// arrow_graph_core/schema.rs
//! Arrow schemas for the graph store.
//!
//! Three foundational tables:
//! - **Triples**: subject/predicate/object quads with provenance
//! - **Embeddings**: entity vectors (`FixedSizeList<f32>`)
//! - **Metadata**: per-entity access tracking
//!
//! The triples schema includes an optional `layer` column (UInt8) for
//! sub-partitioning within namespaces. When layers are not needed,
//! the column is still present but set to 0.

use arrow::array::RecordBatch;
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use std::sync::Arc;

/// Default embedding dimension (all-MiniLM-L6-v2 = 384, but 768 is future-proof).
pub const DEFAULT_EMBEDDING_DIM: i32 = 768;
18
/// Named column indices for the Triples schema.
/// Use these instead of hardcoded integers when accessing RecordBatch columns.
///
/// Must stay in lockstep with the field order in `triples_schema()` (16 columns,
/// v1.1.0 — `SOURCE_CHUNK_ID` at index 9 was added in 1.0.0 → 1.1.0).
pub mod col {
    // Identity and triple content (0..=3).
    pub const TRIPLE_ID: usize = 0;
    pub const SUBJECT: usize = 1;
    pub const PREDICATE: usize = 2;
    pub const OBJECT: usize = 3;
    // Context / partitioning (4..=6).
    pub const GRAPH: usize = 4;
    pub const NAMESPACE: usize = 5;
    pub const LAYER: usize = 6;
    // Quality and provenance (7..=10).
    pub const CONFIDENCE: usize = 7;
    pub const SOURCE_DOCUMENT: usize = 8;
    pub const SOURCE_CHUNK_ID: usize = 9;
    pub const EXTRACTED_BY: usize = 10;
    // Lifecycle and lineage (11..=15).
    pub const CREATED_AT: usize = 11;
    pub const CAUSED_BY: usize = 12;
    pub const DERIVED_FROM: usize = 13;
    pub const CONSOLIDATED_AT: usize = 14;
    pub const DELETED: usize = 15;
}
39
/// Named column indices for the Chunks schema (fine-grained provenance).
///
/// Must stay in lockstep with the field order in `chunks_schema()` (17 columns).
pub mod chunk_col {
    // Identity (0..=1).
    pub const CHUNK_ID: usize = 0;
    pub const DOCUMENT_PATH: usize = 1;
    // Content (2..=3).
    pub const CONTENT: usize = 2;
    pub const TOKEN_COUNT: usize = 3;
    // Position within the document (4..=7).
    pub const CHUNK_INDEX: usize = 4;
    pub const TOTAL_CHUNKS: usize = 5;
    pub const CHAR_OFFSET_START: usize = 6;
    pub const CHAR_OFFSET_END: usize = 7;
    // Structural metadata (8..=12).
    pub const PAGE_NUMBER: usize = 8;
    pub const SECTION_HEADING: usize = 9;
    pub const SECTION_LEVEL: usize = 10;
    pub const PARAGRAPH_INDEX: usize = 11;
    pub const ELEMENT_TYPE: usize = 12;
    // Provenance (13..=16).
    pub const NAMESPACE: usize = 13;
    pub const LAYER: usize = 14;
    pub const EXTRACTED_BY: usize = 15;
    pub const CREATED_AT: usize = 16;
}
60
/// Current schema version for the Triples table.
/// Older on-disk data is migrated on read via `normalize_to_current`.
pub const TRIPLES_SCHEMA_VERSION: &str = "1.1.0";

/// Current schema version for the Chunks table.
pub const CHUNKS_SCHEMA_VERSION: &str = "1.0.0";
66
67/// Schema for the Triples table — the core knowledge representation.
68///
69/// Columns (16 total, v1.1.0):
70/// - `triple_id`: unique identifier (UUID string)
71/// - `subject`, `predicate`, `object`: the RDF-like triple
72/// - `graph`: named graph / context URI
73/// - `namespace`: partition key (user-defined string)
74/// - `layer`: optional sub-partition (UInt8, 0 if unused)
75/// - `confidence`: float64 confidence score
76/// - `source_document`: provenance document path
77/// - `source_chunk_id`: FK to ChunkTable for fine-grained provenance
78/// - `extracted_by`: agent/process that created this triple
79/// - `created_at`: timestamp
80/// - `caused_by`: triple_id of the causal predecessor
81/// - `derived_from`: triple_id of the derivation source
82/// - `consolidated_at`: timestamp when consolidated
83/// - `deleted`: logical delete flag
84pub fn triples_schema() -> Schema {
85    Schema::new(vec![
86        Field::new("triple_id", DataType::Utf8, false),
87        Field::new("subject", DataType::Utf8, false),
88        Field::new("predicate", DataType::Utf8, false),
89        Field::new("object", DataType::Utf8, false),
90        Field::new("graph", DataType::Utf8, true),
91        Field::new("namespace", DataType::Utf8, false),
92        Field::new("layer", DataType::UInt8, false),
93        Field::new("confidence", DataType::Float64, true),
94        Field::new("source_document", DataType::Utf8, true),
95        Field::new("source_chunk_id", DataType::Utf8, true),
96        Field::new("extracted_by", DataType::Utf8, true),
97        Field::new(
98            "created_at",
99            DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
100            false,
101        ),
102        Field::new("caused_by", DataType::Utf8, true),
103        Field::new("derived_from", DataType::Utf8, true),
104        Field::new(
105            "consolidated_at",
106            DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
107            true,
108        ),
109        Field::new("deleted", DataType::Boolean, false),
110    ])
111}
112
113/// Schema for the Chunks table — fine-grained document provenance.
114///
115/// Each row represents a chunk of a source document. Triples reference chunks
116/// via `source_chunk_id` (FK). This enables WHY chains that resolve to
117/// paragraph-level or finer granularity.
118///
119/// 17 columns total.
120pub fn chunks_schema() -> Schema {
121    Schema::new(vec![
122        // Identity
123        Field::new("chunk_id", DataType::Utf8, false),
124        Field::new("document_path", DataType::Utf8, false),
125        // Content
126        Field::new("content", DataType::LargeUtf8, true),
127        Field::new("token_count", DataType::UInt32, false),
128        // Position within document
129        Field::new("chunk_index", DataType::UInt32, false),
130        Field::new("total_chunks", DataType::UInt32, false),
131        Field::new("char_offset_start", DataType::UInt64, true),
132        Field::new("char_offset_end", DataType::UInt64, true),
133        // Structural metadata
134        Field::new("page_number", DataType::UInt32, true),
135        Field::new("section_heading", DataType::Utf8, true),
136        Field::new("section_level", DataType::UInt8, true),
137        Field::new("paragraph_index", DataType::UInt32, true),
138        Field::new("element_type", DataType::Utf8, false),
139        // Provenance
140        Field::new("namespace", DataType::Utf8, false),
141        Field::new("layer", DataType::UInt8, false),
142        Field::new("extracted_by", DataType::Utf8, true),
143        Field::new(
144            "created_at",
145            DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
146            false,
147        ),
148    ])
149}
150
151/// Normalize a RecordBatch from an older schema version to the current version.
152///
153/// This is the read-path migration: when loading Parquet files written with older
154/// schemas, the normalizer adds missing columns with default values.
155///
156/// Supported migrations:
157/// - v1.0.0 → v1.1.0: adds null `source_chunk_id` column at index 9
158pub fn normalize_to_current(
159    batch: &RecordBatch,
160    from_version: &str,
161) -> std::result::Result<RecordBatch, arrow::error::ArrowError> {
162    match from_version {
163        "1.1.0" => Ok(batch.clone()),
164        "1.0.0" => {
165            // v1.0.0 has 15 columns; v1.1.0 has 16 (source_chunk_id at index 9)
166            let num_rows = batch.num_rows();
167            let mut columns: Vec<Arc<dyn arrow::array::Array>> = Vec::with_capacity(16);
168
169            // Copy columns 0..9 (triple_id through source_document)
170            for i in 0..9 {
171                columns.push(batch.column(i).clone());
172            }
173
174            // Insert null source_chunk_id at index 9
175            use arrow::array::StringArray;
176            let nulls: Vec<Option<&str>> = vec![None; num_rows];
177            columns.push(Arc::new(StringArray::from(nulls)));
178
179            // Copy remaining columns 9..15 (extracted_by through deleted)
180            for i in 9..batch.num_columns() {
181                columns.push(batch.column(i).clone());
182            }
183
184            let schema = Arc::new(triples_schema());
185            RecordBatch::try_new(schema, columns)
186        }
187        other => Err(arrow::error::ArrowError::InvalidArgumentError(format!(
188            "Unknown schema version '{}'. Supported: 1.0.0, 1.1.0. \
189             Upgrade arrow-graph-core to read data from newer versions.",
190            other
191        ))),
192    }
193}
194
/// Schema for the Embeddings table — vector representations of entities.
///
/// Uses [`DEFAULT_EMBEDDING_DIM`]; call [`embeddings_schema_with_dim`] to pick
/// a different vector dimension.
pub fn embeddings_schema() -> Schema {
    embeddings_schema_with_dim(DEFAULT_EMBEDDING_DIM)
}
199
200/// Embeddings schema with a custom vector dimension.
201pub fn embeddings_schema_with_dim(dim: i32) -> Schema {
202    Schema::new(vec![
203        Field::new("entity_id", DataType::Utf8, false),
204        Field::new(
205            "vector",
206            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, false)), dim),
207            false,
208        ),
209    ])
210}
211
212/// Schema for the Metadata table — per-entity access tracking.
213pub fn metadata_schema() -> Schema {
214    Schema::new(vec![
215        Field::new("entity_id", DataType::Utf8, false),
216        Field::new("layer", DataType::UInt8, false),
217        Field::new("namespace", DataType::Utf8, false),
218        Field::new("access_count", DataType::UInt64, false),
219        Field::new(
220            "last_accessed",
221            DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
222            true,
223        ),
224    ])
225}
226
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{
        Array, BooleanArray, Float64Array, RecordBatch, StringArray, TimestampMillisecondArray,
        UInt8Array,
    };

    // Smoke test: a fully-populated single-row batch round-trips through
    // triples_schema() — 16 columns in the documented order.
    #[test]
    fn test_triples_schema_creates_record_batch() {
        let schema = Arc::new(triples_schema());
        let now_ms = chrono::Utc::now().timestamp_millis();

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["t-001"])),
                Arc::new(StringArray::from(vec!["example:Alice"])),
                Arc::new(StringArray::from(vec!["rdf:type"])),
                Arc::new(StringArray::from(vec!["example:Person"])),
                Arc::new(StringArray::from(vec![Some("default")])),
                Arc::new(StringArray::from(vec!["world"])),
                Arc::new(UInt8Array::from(vec![1u8])),
                Arc::new(Float64Array::from(vec![Some(0.95)])),
                Arc::new(StringArray::from(vec![Some("ontology.md")])),
                Arc::new(StringArray::from(vec![Some("chunk_001")])),
                Arc::new(StringArray::from(vec![Some("agent-1")])),
                Arc::new(TimestampMillisecondArray::from(vec![now_ms]).with_timezone("UTC")),
                Arc::new(StringArray::from(vec![Some("t-000")])),
                Arc::new(StringArray::from(vec![Some("t-base")])),
                Arc::new(TimestampMillisecondArray::from(vec![Some(now_ms)]).with_timezone("UTC")),
                Arc::new(BooleanArray::from(vec![false])),
            ],
        )
        .expect("Failed to create triples RecordBatch");

        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 16);
    }

    // Smoke test: a single-row batch satisfies chunks_schema() — 17 columns.
    #[test]
    fn test_chunks_schema_creates_record_batch() {
        use arrow::array::{LargeStringArray, UInt32Array, UInt64Array};

        let schema = Arc::new(chunks_schema());
        let now_ms = chrono::Utc::now().timestamp_millis();

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(vec!["chunk_001"])),
                Arc::new(StringArray::from(vec!["document.md"])),
                Arc::new(LargeStringArray::from(vec![Some("The quick brown fox...")])),
                Arc::new(UInt32Array::from(vec![42u32])),
                Arc::new(UInt32Array::from(vec![0u32])),
                Arc::new(UInt32Array::from(vec![10u32])),
                Arc::new(UInt64Array::from(vec![Some(0u64)])),
                Arc::new(UInt64Array::from(vec![Some(156u64)])),
                Arc::new(UInt32Array::from(vec![Some(36u32)])),
                Arc::new(StringArray::from(vec![Some("Chapter 2")])),
                Arc::new(UInt8Array::from(vec![Some(2u8)])),
                Arc::new(UInt32Array::from(vec![Some(7u32)])),
                Arc::new(StringArray::from(vec!["prose"])),
                Arc::new(StringArray::from(vec!["world"])),
                Arc::new(UInt8Array::from(vec![0u8])),
                Arc::new(StringArray::from(vec![Some("agent-1")])),
                Arc::new(TimestampMillisecondArray::from(vec![now_ms]).with_timezone("UTC")),
            ],
        )
        .expect("Failed to create chunks RecordBatch");

        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 17);
    }

    // Migration test: a hand-built 15-column v1.0.0 batch gains a null
    // source_chunk_id at index 9, and later columns keep their values.
    #[test]
    fn test_normalize_v1_0_0_to_v1_1_0() {
        // v1.0.0 schema: identical to v1.1.0 minus `source_chunk_id`.
        let v1_0_schema = Arc::new(Schema::new(vec![
            Field::new("triple_id", DataType::Utf8, false),
            Field::new("subject", DataType::Utf8, false),
            Field::new("predicate", DataType::Utf8, false),
            Field::new("object", DataType::Utf8, false),
            Field::new("graph", DataType::Utf8, true),
            Field::new("namespace", DataType::Utf8, false),
            Field::new("layer", DataType::UInt8, false),
            Field::new("confidence", DataType::Float64, true),
            Field::new("source_document", DataType::Utf8, true),
            Field::new("extracted_by", DataType::Utf8, true),
            Field::new(
                "created_at",
                DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
                false,
            ),
            Field::new("caused_by", DataType::Utf8, true),
            Field::new("derived_from", DataType::Utf8, true),
            Field::new(
                "consolidated_at",
                DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
                true,
            ),
            Field::new("deleted", DataType::Boolean, false),
        ]));
        let now_ms = chrono::Utc::now().timestamp_millis();

        let old_batch = RecordBatch::try_new(
            v1_0_schema,
            vec![
                Arc::new(StringArray::from(vec!["t-001"])),
                Arc::new(StringArray::from(vec!["sub"])),
                Arc::new(StringArray::from(vec!["pred"])),
                Arc::new(StringArray::from(vec!["obj"])),
                Arc::new(StringArray::from(vec![Some("default")])),
                Arc::new(StringArray::from(vec!["world"])),
                Arc::new(UInt8Array::from(vec![1u8])),
                Arc::new(Float64Array::from(vec![Some(0.9)])),
                Arc::new(StringArray::from(vec![Some("doc.md")])),
                Arc::new(StringArray::from(vec![Some("agent-1")])),
                Arc::new(TimestampMillisecondArray::from(vec![now_ms]).with_timezone("UTC")),
                Arc::new(StringArray::from(vec![None::<&str>])),
                Arc::new(StringArray::from(vec![None::<&str>])),
                Arc::new(TimestampMillisecondArray::from(vec![None]).with_timezone("UTC")),
                Arc::new(BooleanArray::from(vec![false])),
            ],
        )
        .unwrap();

        assert_eq!(old_batch.num_columns(), 15);

        let normalized = normalize_to_current(&old_batch, "1.0.0").unwrap();
        assert_eq!(normalized.num_columns(), 16);
        assert_eq!(normalized.schema(), Arc::new(triples_schema()));

        // The inserted column must be all-null.
        let chunk_id_col = normalized
            .column(col::SOURCE_CHUNK_ID)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert!(chunk_id_col.is_null(0));

        // extracted_by was old index 9; it must have shifted to index 10 intact.
        let extracted = normalized
            .column(col::EXTRACTED_BY)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(extracted.value(0), "agent-1");
    }

    // A batch already at the current version passes through unchanged.
    #[test]
    fn test_normalize_v1_1_0_passthrough() {
        let schema = Arc::new(triples_schema());
        let now_ms = chrono::Utc::now().timestamp_millis();

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(vec!["t-001"])),
                Arc::new(StringArray::from(vec!["sub"])),
                Arc::new(StringArray::from(vec!["pred"])),
                Arc::new(StringArray::from(vec!["obj"])),
                Arc::new(StringArray::from(vec![Some("default")])),
                Arc::new(StringArray::from(vec!["world"])),
                Arc::new(UInt8Array::from(vec![1u8])),
                Arc::new(Float64Array::from(vec![Some(0.9)])),
                Arc::new(StringArray::from(vec![Some("doc.md")])),
                Arc::new(StringArray::from(vec![None::<&str>])),
                Arc::new(StringArray::from(vec![Some("agent-1")])),
                Arc::new(TimestampMillisecondArray::from(vec![now_ms]).with_timezone("UTC")),
                Arc::new(StringArray::from(vec![None::<&str>])),
                Arc::new(StringArray::from(vec![None::<&str>])),
                Arc::new(TimestampMillisecondArray::from(vec![None]).with_timezone("UTC")),
                Arc::new(BooleanArray::from(vec![false])),
            ],
        )
        .unwrap();

        let normalized = normalize_to_current(&batch, "1.1.0").unwrap();
        assert_eq!(normalized.num_columns(), 16);
    }

    // Unsupported versions (e.g. data written by a newer library) must error,
    // not silently pass through.
    #[test]
    fn test_normalize_unknown_version_errors() {
        let schema = Arc::new(triples_schema());
        let now_ms = chrono::Utc::now().timestamp_millis();

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(vec!["t-001"])),
                Arc::new(StringArray::from(vec!["sub"])),
                Arc::new(StringArray::from(vec!["pred"])),
                Arc::new(StringArray::from(vec!["obj"])),
                Arc::new(StringArray::from(vec![Some("default")])),
                Arc::new(StringArray::from(vec!["world"])),
                Arc::new(UInt8Array::from(vec![1u8])),
                Arc::new(Float64Array::from(vec![Some(0.9)])),
                Arc::new(StringArray::from(vec![Some("doc.md")])),
                Arc::new(StringArray::from(vec![None::<&str>])),
                Arc::new(StringArray::from(vec![Some("agent-1")])),
                Arc::new(TimestampMillisecondArray::from(vec![now_ms]).with_timezone("UTC")),
                Arc::new(StringArray::from(vec![None::<&str>])),
                Arc::new(StringArray::from(vec![None::<&str>])),
                Arc::new(TimestampMillisecondArray::from(vec![None]).with_timezone("UTC")),
                Arc::new(BooleanArray::from(vec![false])),
            ],
        )
        .unwrap();

        let result = normalize_to_current(&batch, "2.0.0");
        assert!(result.is_err());
        let err_msg = result.unwrap_err().to_string();
        assert!(err_msg.contains("Unknown schema version"));
    }

    // Smoke test: a FixedSizeList vector column matches the dim-4 schema.
    #[test]
    fn test_embeddings_schema_creates_record_batch() {
        use arrow::array::{FixedSizeListArray, Float32Array};

        let schema = Arc::new(embeddings_schema_with_dim(4));
        let values = Float32Array::from(vec![0.1, 0.2, 0.3, 0.4]);
        let list = FixedSizeListArray::try_new(
            Arc::new(Field::new("item", DataType::Float32, false)),
            4,
            Arc::new(values),
            None,
        )
        .unwrap();

        let batch = RecordBatch::try_new(
            schema,
            vec![Arc::new(StringArray::from(vec!["e-001"])), Arc::new(list)],
        )
        .expect("Failed to create embeddings RecordBatch");

        assert_eq!(batch.num_rows(), 1);
    }

    // Smoke test: a single-row batch satisfies metadata_schema().
    #[test]
    fn test_metadata_schema_creates_record_batch() {
        use arrow::array::UInt64Array;

        let schema = Arc::new(metadata_schema());
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(vec!["e-001"])),
                Arc::new(UInt8Array::from(vec![2u8])),
                Arc::new(StringArray::from(vec!["work"])),
                Arc::new(UInt64Array::from(vec![42u64])),
                Arc::new(
                    TimestampMillisecondArray::from(vec![Some(
                        chrono::Utc::now().timestamp_millis(),
                    )])
                    .with_timezone("UTC"),
                ),
            ],
        )
        .expect("Failed to create metadata RecordBatch");

        assert_eq!(batch.num_rows(), 1);
    }
}