use arrow::array::RecordBatch;
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use std::sync::Arc;
/// Vector width used by `embeddings_schema()` when no explicit dimension is
/// requested.
pub const DEFAULT_EMBEDDING_DIM: i32 = 768;
/// Column indices into record batches built with `triples_schema()`.
///
/// The indices are declared in schema field order, each one past the
/// previous; `CERTIFIABILITY_CLASS` is the last (17th) column.
pub mod col {
    /// `triple_id` (Utf8, non-null).
    pub const TRIPLE_ID: usize = 0;
    /// `subject` (Utf8, non-null).
    pub const SUBJECT: usize = TRIPLE_ID + 1;
    /// `predicate` (Utf8, non-null).
    pub const PREDICATE: usize = SUBJECT + 1;
    /// `object` (Utf8, non-null).
    pub const OBJECT: usize = PREDICATE + 1;
    /// `graph` (Utf8, nullable).
    pub const GRAPH: usize = OBJECT + 1;
    /// `namespace` (Utf8, non-null).
    pub const NAMESPACE: usize = GRAPH + 1;
    /// `y_layer` (UInt8, non-null).
    pub const Y_LAYER: usize = NAMESPACE + 1;
    /// `confidence` (Float64, nullable).
    pub const CONFIDENCE: usize = Y_LAYER + 1;
    /// `source_document` (Utf8, nullable).
    pub const SOURCE_DOCUMENT: usize = CONFIDENCE + 1;
    /// `source_chunk_id` (Utf8, nullable).
    pub const SOURCE_CHUNK_ID: usize = SOURCE_DOCUMENT + 1;
    /// `extracted_by` (Utf8, nullable).
    pub const EXTRACTED_BY: usize = SOURCE_CHUNK_ID + 1;
    /// `created_at` (UTC millisecond timestamp, non-null).
    pub const CREATED_AT: usize = EXTRACTED_BY + 1;
    /// `caused_by` (Utf8, nullable).
    pub const CAUSED_BY: usize = CREATED_AT + 1;
    /// `derived_from` (Utf8, nullable).
    pub const DERIVED_FROM: usize = CAUSED_BY + 1;
    /// `consolidated_at` (UTC millisecond timestamp, nullable).
    pub const CONSOLIDATED_AT: usize = DERIVED_FROM + 1;
    /// `deleted` (Boolean, non-null).
    pub const DELETED: usize = CONSOLIDATED_AT + 1;
    /// `certifiability_class` (Utf8, nullable).
    pub const CERTIFIABILITY_CLASS: usize = DELETED + 1;
}
/// Column indices into record batches built with `chunks_schema()`.
///
/// The indices are declared in schema field order, each one past the
/// previous; `CREATED_AT` is the last (17th) column.
pub mod chunk_col {
    /// `chunk_id` (Utf8, non-null).
    pub const CHUNK_ID: usize = 0;
    /// `document_path` (Utf8, non-null).
    pub const DOCUMENT_PATH: usize = CHUNK_ID + 1;
    /// `content` (LargeUtf8, nullable).
    pub const CONTENT: usize = DOCUMENT_PATH + 1;
    /// `token_count` (UInt32, non-null).
    pub const TOKEN_COUNT: usize = CONTENT + 1;
    /// `chunk_index` (UInt32, non-null).
    pub const CHUNK_INDEX: usize = TOKEN_COUNT + 1;
    /// `total_chunks` (UInt32, non-null).
    pub const TOTAL_CHUNKS: usize = CHUNK_INDEX + 1;
    /// `char_offset_start` (UInt64, nullable).
    pub const CHAR_OFFSET_START: usize = TOTAL_CHUNKS + 1;
    /// `char_offset_end` (UInt64, nullable).
    pub const CHAR_OFFSET_END: usize = CHAR_OFFSET_START + 1;
    /// `page_number` (UInt32, nullable).
    pub const PAGE_NUMBER: usize = CHAR_OFFSET_END + 1;
    /// `section_heading` (Utf8, nullable).
    pub const SECTION_HEADING: usize = PAGE_NUMBER + 1;
    /// `section_level` (UInt8, nullable).
    pub const SECTION_LEVEL: usize = SECTION_HEADING + 1;
    /// `paragraph_index` (UInt32, nullable).
    pub const PARAGRAPH_INDEX: usize = SECTION_LEVEL + 1;
    /// `element_type` (Utf8, non-null).
    pub const ELEMENT_TYPE: usize = PARAGRAPH_INDEX + 1;
    /// `namespace` (Utf8, non-null).
    pub const NAMESPACE: usize = ELEMENT_TYPE + 1;
    /// `y_layer` (UInt8, non-null).
    pub const Y_LAYER: usize = NAMESPACE + 1;
    /// `extracted_by` (Utf8, nullable).
    pub const EXTRACTED_BY: usize = Y_LAYER + 1;
    /// `created_at` (UTC millisecond timestamp, non-null).
    pub const CREATED_AT: usize = EXTRACTED_BY + 1;
}
/// Version of the layout produced by `triples_schema()`; this is the version
/// `normalize_to_current` passes through unchanged.
pub const TRIPLES_SCHEMA_VERSION: &str = "1.2.0";
/// Version of the layout produced by `chunks_schema()`.
pub const CHUNKS_SCHEMA_VERSION: &str = "1.0.0";
pub fn triples_schema() -> Schema {
Schema::new(vec![
Field::new("triple_id", DataType::Utf8, false),
Field::new("subject", DataType::Utf8, false),
Field::new("predicate", DataType::Utf8, false),
Field::new("object", DataType::Utf8, false),
Field::new("graph", DataType::Utf8, true),
Field::new("namespace", DataType::Utf8, false),
Field::new("y_layer", DataType::UInt8, false),
Field::new("confidence", DataType::Float64, true),
Field::new("source_document", DataType::Utf8, true),
Field::new("source_chunk_id", DataType::Utf8, true),
Field::new("extracted_by", DataType::Utf8, true),
Field::new(
"created_at",
DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
false,
),
Field::new("caused_by", DataType::Utf8, true),
Field::new("derived_from", DataType::Utf8, true),
Field::new(
"consolidated_at",
DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
true,
),
Field::new("deleted", DataType::Boolean, false),
Field::new("certifiability_class", DataType::Utf8, true),
])
}
pub fn chunks_schema() -> Schema {
Schema::new(vec![
Field::new("chunk_id", DataType::Utf8, false),
Field::new("document_path", DataType::Utf8, false),
Field::new("content", DataType::LargeUtf8, true),
Field::new("token_count", DataType::UInt32, false),
Field::new("chunk_index", DataType::UInt32, false),
Field::new("total_chunks", DataType::UInt32, false),
Field::new("char_offset_start", DataType::UInt64, true),
Field::new("char_offset_end", DataType::UInt64, true),
Field::new("page_number", DataType::UInt32, true),
Field::new("section_heading", DataType::Utf8, true),
Field::new("section_level", DataType::UInt8, true),
Field::new("paragraph_index", DataType::UInt32, true),
Field::new("element_type", DataType::Utf8, false),
Field::new("namespace", DataType::Utf8, false),
Field::new("y_layer", DataType::UInt8, false),
Field::new("extracted_by", DataType::Utf8, true),
Field::new(
"created_at",
DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
false,
),
])
}
pub fn normalize_to_current(
batch: &RecordBatch,
from_version: &str,
) -> std::result::Result<RecordBatch, arrow::error::ArrowError> {
use arrow::array::StringArray;
match from_version {
"1.2.0" => Ok(batch.clone()),
"1.1.0" => {
let num_rows = batch.num_rows();
let mut columns: Vec<Arc<dyn arrow::array::Array>> = Vec::with_capacity(17);
for i in 0..batch.num_columns() {
columns.push(batch.column(i).clone());
}
let nulls: Vec<Option<&str>> = vec![None; num_rows];
columns.push(Arc::new(StringArray::from(nulls)));
let schema = Arc::new(triples_schema());
RecordBatch::try_new(schema, columns)
}
"1.0.0" => {
let num_rows = batch.num_rows();
let mut columns: Vec<Arc<dyn arrow::array::Array>> = Vec::with_capacity(17);
for i in 0..9 {
columns.push(batch.column(i).clone());
}
let nulls: Vec<Option<&str>> = vec![None; num_rows];
columns.push(Arc::new(StringArray::from(nulls.clone())));
for i in 9..batch.num_columns() {
columns.push(batch.column(i).clone());
}
columns.push(Arc::new(StringArray::from(nulls)));
let schema = Arc::new(triples_schema());
RecordBatch::try_new(schema, columns)
}
other => Err(arrow::error::ArrowError::InvalidArgumentError(format!(
"Unknown schema version '{}'. Supported: 1.0.0, 1.1.0, 1.2.0. \
Upgrade nusy-arrow-core to read data from newer versions.",
other
))),
}
}
/// Build the embeddings schema with the default vector width,
/// [`DEFAULT_EMBEDDING_DIM`].
///
/// Shorthand for `embeddings_schema_with_dim(DEFAULT_EMBEDDING_DIM)`.
pub fn embeddings_schema() -> Schema {
    let dim = DEFAULT_EMBEDDING_DIM;
    embeddings_schema_with_dim(dim)
}
/// Build a two-column embeddings schema with a caller-chosen vector width.
///
/// * `entity_id`: non-null Utf8.
/// * `vector`: non-null fixed-size list of `dim` non-null Float32 items
///   (the child field is named `"item"`).
///
/// NOTE(review): `dim` is passed straight through; zero or negative values
/// are not rejected here.
pub fn embeddings_schema_with_dim(dim: i32) -> Schema {
    let element = Arc::new(Field::new("item", DataType::Float32, false));
    let vector_type = DataType::FixedSizeList(element, dim);

    let id_field = Field::new("entity_id", DataType::Utf8, false);
    let vector_field = Field::new("vector", vector_type, false);
    Schema::new(vec![id_field, vector_field])
}
pub fn metadata_schema() -> Schema {
Schema::new(vec![
Field::new("entity_id", DataType::Utf8, false),
Field::new("y_layer", DataType::UInt8, false),
Field::new("namespace", DataType::Utf8, false),
Field::new("access_count", DataType::UInt64, false),
Field::new(
"last_accessed",
DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
true,
),
])
}
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{
        Array, BooleanArray, Float64Array, RecordBatch, StringArray, TimestampMillisecondArray,
        UInt8Array,
    };

    /// Downcast column `index` of `batch` to a `StringArray`.
    fn utf8_column(batch: &RecordBatch, index: usize) -> &StringArray {
        batch
            .column(index)
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("column should be Utf8")
    }

    /// Build a one-row batch that already uses the current triples layout.
    fn sample_current_triples_batch() -> RecordBatch {
        let now_ms = chrono::Utc::now().timestamp_millis();
        RecordBatch::try_new(
            Arc::new(triples_schema()),
            vec![
                Arc::new(StringArray::from(vec!["t-001"])),
                Arc::new(StringArray::from(vec!["sub"])),
                Arc::new(StringArray::from(vec!["pred"])),
                Arc::new(StringArray::from(vec!["obj"])),
                Arc::new(StringArray::from(vec![Some("default")])),
                Arc::new(StringArray::from(vec!["world"])),
                Arc::new(UInt8Array::from(vec![1u8])),
                Arc::new(Float64Array::from(vec![Some(0.9)])),
                Arc::new(StringArray::from(vec![Some("doc.md")])),
                Arc::new(StringArray::from(vec![None::<&str>])),
                Arc::new(StringArray::from(vec![Some("DGX")])),
                Arc::new(TimestampMillisecondArray::from(vec![now_ms]).with_timezone("UTC")),
                Arc::new(StringArray::from(vec![None::<&str>])),
                Arc::new(StringArray::from(vec![None::<&str>])),
                Arc::new(TimestampMillisecondArray::from(vec![None]).with_timezone("UTC")),
                Arc::new(BooleanArray::from(vec![false])),
                Arc::new(StringArray::from(vec![None::<&str>])),
            ],
        )
        .expect("Failed to create current triples RecordBatch")
    }

    #[test]
    fn test_triples_schema_creates_record_batch() {
        let schema = Arc::new(triples_schema());
        let now_ms = chrono::Utc::now().timestamp_millis();
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(vec!["t-001"])),
                Arc::new(StringArray::from(vec!["nusy:Santiago"])),
                Arc::new(StringArray::from(vec!["rdf:type"])),
                Arc::new(StringArray::from(vec!["nusy:Being"])),
                Arc::new(StringArray::from(vec![Some("default")])),
                Arc::new(StringArray::from(vec!["world"])),
                Arc::new(UInt8Array::from(vec![1u8])),
                Arc::new(Float64Array::from(vec![Some(0.95)])),
                Arc::new(StringArray::from(vec![Some("ontology.md")])),
                Arc::new(StringArray::from(vec![Some("chunk_onto_001")])),
                Arc::new(StringArray::from(vec![Some("DGX")])),
                Arc::new(TimestampMillisecondArray::from(vec![now_ms]).with_timezone("UTC")),
                Arc::new(StringArray::from(vec![Some("t-000")])),
                Arc::new(StringArray::from(vec![Some("t-base")])),
                Arc::new(
                    TimestampMillisecondArray::from(vec![Some(now_ms)]).with_timezone("UTC"),
                ),
                Arc::new(BooleanArray::from(vec![false])),
                Arc::new(StringArray::from(vec![None::<&str>])),
            ],
        )
        .expect("Failed to create triples RecordBatch");
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 17);
    }

    #[test]
    fn test_col_indices_match_triples_schema() {
        let schema = triples_schema();
        let expected = [
            (col::TRIPLE_ID, "triple_id"),
            (col::SUBJECT, "subject"),
            (col::PREDICATE, "predicate"),
            (col::OBJECT, "object"),
            (col::GRAPH, "graph"),
            (col::NAMESPACE, "namespace"),
            (col::Y_LAYER, "y_layer"),
            (col::CONFIDENCE, "confidence"),
            (col::SOURCE_DOCUMENT, "source_document"),
            (col::SOURCE_CHUNK_ID, "source_chunk_id"),
            (col::EXTRACTED_BY, "extracted_by"),
            (col::CREATED_AT, "created_at"),
            (col::CAUSED_BY, "caused_by"),
            (col::DERIVED_FROM, "derived_from"),
            (col::CONSOLIDATED_AT, "consolidated_at"),
            (col::DELETED, "deleted"),
            (col::CERTIFIABILITY_CLASS, "certifiability_class"),
        ];
        assert_eq!(schema.fields().len(), expected.len());
        for &(index, name) in expected.iter() {
            assert_eq!(schema.field(index).name().as_str(), name, "col index {}", index);
        }
    }

    #[test]
    fn test_chunks_schema_creates_record_batch() {
        use arrow::array::{LargeStringArray, UInt32Array, UInt64Array};
        let schema = Arc::new(chunks_schema());
        let now_ms = chrono::Utc::now().timestamp_millis();
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(vec!["chunk_woz_001"])),
                Arc::new(StringArray::from(vec!["wizard-of-oz.md"])),
                Arc::new(LargeStringArray::from(vec![Some(
                    "Dorothy carried the shoes...",
                )])),
                Arc::new(UInt32Array::from(vec![42u32])),
                Arc::new(UInt32Array::from(vec![0u32])),
                Arc::new(UInt32Array::from(vec![10u32])),
                Arc::new(UInt64Array::from(vec![Some(0u64)])),
                Arc::new(UInt64Array::from(vec![Some(156u64)])),
                Arc::new(UInt32Array::from(vec![Some(36u32)])),
                Arc::new(StringArray::from(vec![Some("Chapter 2: The Council")])),
                Arc::new(UInt8Array::from(vec![Some(2u8)])),
                Arc::new(UInt32Array::from(vec![Some(7u32)])),
                Arc::new(StringArray::from(vec!["prose"])),
                Arc::new(StringArray::from(vec!["world"])),
                Arc::new(UInt8Array::from(vec![0u8])),
                Arc::new(StringArray::from(vec![Some("DGX")])),
                Arc::new(TimestampMillisecondArray::from(vec![now_ms]).with_timezone("UTC")),
            ],
        )
        .expect("Failed to create chunks RecordBatch");
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 17);
    }

    #[test]
    fn test_chunk_col_indices_match_chunks_schema() {
        let schema = chunks_schema();
        let expected = [
            (chunk_col::CHUNK_ID, "chunk_id"),
            (chunk_col::DOCUMENT_PATH, "document_path"),
            (chunk_col::CONTENT, "content"),
            (chunk_col::TOKEN_COUNT, "token_count"),
            (chunk_col::CHUNK_INDEX, "chunk_index"),
            (chunk_col::TOTAL_CHUNKS, "total_chunks"),
            (chunk_col::CHAR_OFFSET_START, "char_offset_start"),
            (chunk_col::CHAR_OFFSET_END, "char_offset_end"),
            (chunk_col::PAGE_NUMBER, "page_number"),
            (chunk_col::SECTION_HEADING, "section_heading"),
            (chunk_col::SECTION_LEVEL, "section_level"),
            (chunk_col::PARAGRAPH_INDEX, "paragraph_index"),
            (chunk_col::ELEMENT_TYPE, "element_type"),
            (chunk_col::NAMESPACE, "namespace"),
            (chunk_col::Y_LAYER, "y_layer"),
            (chunk_col::EXTRACTED_BY, "extracted_by"),
            (chunk_col::CREATED_AT, "created_at"),
        ];
        assert_eq!(schema.fields().len(), expected.len());
        for &(index, name) in expected.iter() {
            assert_eq!(schema.field(index).name().as_str(), name, "chunk_col index {}", index);
        }
    }

    #[test]
    fn test_normalize_v1_0_0_to_current() {
        let v1_0_schema = Arc::new(Schema::new(vec![
            Field::new("triple_id", DataType::Utf8, false),
            Field::new("subject", DataType::Utf8, false),
            Field::new("predicate", DataType::Utf8, false),
            Field::new("object", DataType::Utf8, false),
            Field::new("graph", DataType::Utf8, true),
            Field::new("namespace", DataType::Utf8, false),
            Field::new("y_layer", DataType::UInt8, false),
            Field::new("confidence", DataType::Float64, true),
            Field::new("source_document", DataType::Utf8, true),
            Field::new("extracted_by", DataType::Utf8, true),
            Field::new(
                "created_at",
                DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
                false,
            ),
            Field::new("caused_by", DataType::Utf8, true),
            Field::new("derived_from", DataType::Utf8, true),
            Field::new(
                "consolidated_at",
                DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
                true,
            ),
            Field::new("deleted", DataType::Boolean, false),
        ]));
        let now_ms = chrono::Utc::now().timestamp_millis();
        let old_batch = RecordBatch::try_new(
            v1_0_schema,
            vec![
                Arc::new(StringArray::from(vec!["t-001"])),
                Arc::new(StringArray::from(vec!["sub"])),
                Arc::new(StringArray::from(vec!["pred"])),
                Arc::new(StringArray::from(vec!["obj"])),
                Arc::new(StringArray::from(vec![Some("default")])),
                Arc::new(StringArray::from(vec!["world"])),
                Arc::new(UInt8Array::from(vec![1u8])),
                Arc::new(Float64Array::from(vec![Some(0.9)])),
                Arc::new(StringArray::from(vec![Some("doc.md")])),
                Arc::new(StringArray::from(vec![Some("DGX")])),
                Arc::new(TimestampMillisecondArray::from(vec![now_ms]).with_timezone("UTC")),
                Arc::new(StringArray::from(vec![None::<&str>])),
                Arc::new(StringArray::from(vec![None::<&str>])),
                Arc::new(TimestampMillisecondArray::from(vec![None]).with_timezone("UTC")),
                Arc::new(BooleanArray::from(vec![false])),
            ],
        )
        .unwrap();
        assert_eq!(old_batch.num_columns(), 15);

        let normalized = normalize_to_current(&old_batch, "1.0.0").unwrap();
        assert_eq!(normalized.num_columns(), 17);
        assert_eq!(normalized.schema(), Arc::new(triples_schema()));

        // Columns before the inserted one keep their position.
        assert_eq!(utf8_column(&normalized, col::SOURCE_DOCUMENT).value(0), "doc.md");
        // New columns are null.
        assert!(utf8_column(&normalized, col::SOURCE_CHUNK_ID).is_null(0));
        assert!(utf8_column(&normalized, col::CERTIFIABILITY_CLASS).is_null(0));
        // Columns after the inserted one are shifted by one.
        assert_eq!(utf8_column(&normalized, col::EXTRACTED_BY).value(0), "DGX");
        let deleted = normalized
            .column(col::DELETED)
            .as_any()
            .downcast_ref::<BooleanArray>()
            .unwrap();
        assert!(!deleted.value(0));
    }

    #[test]
    fn test_normalize_v1_1_0_adds_certifiability_class() {
        let v1_1_schema = Arc::new(Schema::new(vec![
            Field::new("triple_id", DataType::Utf8, false),
            Field::new("subject", DataType::Utf8, false),
            Field::new("predicate", DataType::Utf8, false),
            Field::new("object", DataType::Utf8, false),
            Field::new("graph", DataType::Utf8, true),
            Field::new("namespace", DataType::Utf8, false),
            Field::new("y_layer", DataType::UInt8, false),
            Field::new("confidence", DataType::Float64, true),
            Field::new("source_document", DataType::Utf8, true),
            Field::new("source_chunk_id", DataType::Utf8, true),
            Field::new("extracted_by", DataType::Utf8, true),
            Field::new(
                "created_at",
                DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
                false,
            ),
            Field::new("caused_by", DataType::Utf8, true),
            Field::new("derived_from", DataType::Utf8, true),
            Field::new(
                "consolidated_at",
                DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
                true,
            ),
            Field::new("deleted", DataType::Boolean, false),
        ]));
        let now_ms = chrono::Utc::now().timestamp_millis();
        let batch = RecordBatch::try_new(
            v1_1_schema,
            vec![
                Arc::new(StringArray::from(vec!["t-001"])),
                Arc::new(StringArray::from(vec!["sub"])),
                Arc::new(StringArray::from(vec!["pred"])),
                Arc::new(StringArray::from(vec!["obj"])),
                Arc::new(StringArray::from(vec![Some("default")])),
                Arc::new(StringArray::from(vec!["world"])),
                Arc::new(UInt8Array::from(vec![1u8])),
                Arc::new(Float64Array::from(vec![Some(0.9)])),
                Arc::new(StringArray::from(vec![Some("doc.md")])),
                Arc::new(StringArray::from(vec![None::<&str>])),
                Arc::new(StringArray::from(vec![Some("DGX")])),
                Arc::new(TimestampMillisecondArray::from(vec![now_ms]).with_timezone("UTC")),
                Arc::new(StringArray::from(vec![None::<&str>])),
                Arc::new(StringArray::from(vec![None::<&str>])),
                Arc::new(TimestampMillisecondArray::from(vec![None]).with_timezone("UTC")),
                Arc::new(BooleanArray::from(vec![false])),
            ],
        )
        .unwrap();
        assert_eq!(batch.num_columns(), 16);

        let normalized = normalize_to_current(&batch, "1.1.0").unwrap();
        assert_eq!(normalized.num_columns(), 17);
        assert_eq!(normalized.schema(), Arc::new(triples_schema()));
        assert!(utf8_column(&normalized, col::CERTIFIABILITY_CLASS).is_null(0));
        assert_eq!(utf8_column(&normalized, col::EXTRACTED_BY).value(0), "DGX");
    }

    #[test]
    fn test_normalize_current_version_is_identity() {
        let batch = sample_current_triples_batch();
        let normalized = normalize_to_current(&batch, TRIPLES_SCHEMA_VERSION).unwrap();
        assert_eq!(normalized, batch);
    }

    #[test]
    fn test_normalize_unknown_version_errors() {
        let batch = sample_current_triples_batch();
        let result = normalize_to_current(&batch, "2.0.0");
        assert!(result.is_err());
        let err_msg = result.unwrap_err().to_string();
        assert!(err_msg.contains("Unknown schema version"));
    }

    #[test]
    fn test_embeddings_schema_creates_record_batch() {
        use arrow::array::{FixedSizeListArray, Float32Array};
        let schema = Arc::new(embeddings_schema_with_dim(4));
        let values = Float32Array::from(vec![0.1, 0.2, 0.3, 0.4]);
        let list = FixedSizeListArray::try_new(
            Arc::new(Field::new("item", DataType::Float32, false)),
            4,
            Arc::new(values),
            None,
        )
        .unwrap();
        let batch = RecordBatch::try_new(
            schema,
            vec![Arc::new(StringArray::from(vec!["e-001"])), Arc::new(list)],
        )
        .expect("Failed to create embeddings RecordBatch");
        assert_eq!(batch.num_rows(), 1);
    }

    #[test]
    fn test_metadata_schema_creates_record_batch() {
        use arrow::array::UInt64Array;
        let schema = Arc::new(metadata_schema());
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(vec!["e-001"])),
                Arc::new(UInt8Array::from(vec![2u8])),
                Arc::new(StringArray::from(vec!["work"])),
                Arc::new(UInt64Array::from(vec![42u64])),
                Arc::new(
                    TimestampMillisecondArray::from(vec![Some(
                        chrono::Utc::now().timestamp_millis(),
                    )])
                    .with_timezone("UTC"),
                ),
            ],
        )
        .expect("Failed to create metadata RecordBatch");
        assert_eq!(batch.num_rows(), 1);
    }
}