1use arrow::array::RecordBatch;
13use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
14use std::sync::Arc;
15
/// Vector width used by [`embeddings_schema`] when no explicit dimension is given.
///
/// Typed as `i32` because it is passed directly as the length of an Arrow
/// `FixedSizeList` (see [`embeddings_schema_with_dim`]).
pub const DEFAULT_EMBEDDING_DIM: i32 = 768;
18
/// Zero-based positions of the fields in [`triples_schema`].
///
/// Use these instead of bare numbers, e.g. `batch.column(col::SUBJECT)`.
/// They must stay in sync with the field order in [`triples_schema`].
pub mod col {
    // --- The statement itself -------------------------------------------
    /// `triple_id` — `Utf8`, non-null.
    pub const TRIPLE_ID: usize = 0;
    /// `subject` — `Utf8`, non-null.
    pub const SUBJECT: usize = 1;
    /// `predicate` — `Utf8`, non-null.
    pub const PREDICATE: usize = 2;
    /// `object` — `Utf8`, non-null.
    pub const OBJECT: usize = 3;
    /// `graph` — `Utf8`, nullable.
    pub const GRAPH: usize = 4;

    // --- Partitioning ---------------------------------------------------
    /// `namespace` — `Utf8`, non-null.
    pub const NAMESPACE: usize = 5;
    /// `layer` — `UInt8`, non-null.
    pub const LAYER: usize = 6;

    // --- Provenance -----------------------------------------------------
    /// `confidence` — `Float64`, nullable.
    pub const CONFIDENCE: usize = 7;
    /// `source_document` — `Utf8`, nullable.
    pub const SOURCE_DOCUMENT: usize = 8;
    /// `source_chunk_id` — `Utf8`, nullable. Absent from the 1.0.0 layout;
    /// `normalize_to_current` inserts it as an all-null column.
    pub const SOURCE_CHUNK_ID: usize = 9;
    /// `extracted_by` — `Utf8`, nullable.
    pub const EXTRACTED_BY: usize = 10;
    /// `created_at` — millisecond timestamp (UTC), non-null.
    pub const CREATED_AT: usize = 11;

    // --- Lineage and lifecycle ------------------------------------------
    /// `caused_by` — `Utf8`, nullable.
    pub const CAUSED_BY: usize = 12;
    /// `derived_from` — `Utf8`, nullable.
    pub const DERIVED_FROM: usize = 13;
    /// `consolidated_at` — millisecond timestamp (UTC), nullable.
    pub const CONSOLIDATED_AT: usize = 14;
    /// `deleted` — `Boolean`, non-null.
    pub const DELETED: usize = 15;
}
39
/// Zero-based positions of the fields in [`chunks_schema`].
///
/// Use these instead of bare numbers, e.g. `batch.column(chunk_col::CONTENT)`.
/// They must stay in sync with the field order in [`chunks_schema`].
pub mod chunk_col {
    // --- Identity and payload -------------------------------------------
    /// `chunk_id` — `Utf8`, non-null.
    pub const CHUNK_ID: usize = 0;
    /// `document_path` — `Utf8`, non-null.
    pub const DOCUMENT_PATH: usize = 1;
    /// `content` — `LargeUtf8`, nullable.
    pub const CONTENT: usize = 2;
    /// `token_count` — `UInt32`, non-null.
    pub const TOKEN_COUNT: usize = 3;

    // --- Position within the document -----------------------------------
    /// `chunk_index` — `UInt32`, non-null.
    pub const CHUNK_INDEX: usize = 4;
    /// `total_chunks` — `UInt32`, non-null.
    pub const TOTAL_CHUNKS: usize = 5;
    /// `char_offset_start` — `UInt64`, nullable.
    pub const CHAR_OFFSET_START: usize = 6;
    /// `char_offset_end` — `UInt64`, nullable.
    pub const CHAR_OFFSET_END: usize = 7;
    /// `page_number` — `UInt32`, nullable.
    pub const PAGE_NUMBER: usize = 8;

    // --- Structural context ---------------------------------------------
    /// `section_heading` — `Utf8`, nullable.
    pub const SECTION_HEADING: usize = 9;
    /// `section_level` — `UInt8`, nullable.
    pub const SECTION_LEVEL: usize = 10;
    /// `paragraph_index` — `UInt32`, nullable.
    pub const PARAGRAPH_INDEX: usize = 11;
    /// `element_type` — `Utf8`, non-null.
    pub const ELEMENT_TYPE: usize = 12;

    // --- Partitioning and provenance ------------------------------------
    /// `namespace` — `Utf8`, non-null.
    pub const NAMESPACE: usize = 13;
    /// `layer` — `UInt8`, non-null.
    pub const LAYER: usize = 14;
    /// `extracted_by` — `Utf8`, nullable.
    pub const EXTRACTED_BY: usize = 15;
    /// `created_at` — millisecond timestamp (UTC), non-null.
    pub const CREATED_AT: usize = 16;
}
60
/// Version tag of the layout produced by [`triples_schema`].
///
/// Batches tagged with an older version can be upgraded with `normalize_to_current`.
pub const TRIPLES_SCHEMA_VERSION: &str = "1.1.0";
63
/// Version tag of the layout produced by [`chunks_schema`].
pub const CHUNKS_SCHEMA_VERSION: &str = "1.0.0";
66
67pub fn triples_schema() -> Schema {
85 Schema::new(vec![
86 Field::new("triple_id", DataType::Utf8, false),
87 Field::new("subject", DataType::Utf8, false),
88 Field::new("predicate", DataType::Utf8, false),
89 Field::new("object", DataType::Utf8, false),
90 Field::new("graph", DataType::Utf8, true),
91 Field::new("namespace", DataType::Utf8, false),
92 Field::new("layer", DataType::UInt8, false),
93 Field::new("confidence", DataType::Float64, true),
94 Field::new("source_document", DataType::Utf8, true),
95 Field::new("source_chunk_id", DataType::Utf8, true),
96 Field::new("extracted_by", DataType::Utf8, true),
97 Field::new(
98 "created_at",
99 DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
100 false,
101 ),
102 Field::new("caused_by", DataType::Utf8, true),
103 Field::new("derived_from", DataType::Utf8, true),
104 Field::new(
105 "consolidated_at",
106 DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
107 true,
108 ),
109 Field::new("deleted", DataType::Boolean, false),
110 ])
111}
112
113pub fn chunks_schema() -> Schema {
121 Schema::new(vec![
122 Field::new("chunk_id", DataType::Utf8, false),
124 Field::new("document_path", DataType::Utf8, false),
125 Field::new("content", DataType::LargeUtf8, true),
127 Field::new("token_count", DataType::UInt32, false),
128 Field::new("chunk_index", DataType::UInt32, false),
130 Field::new("total_chunks", DataType::UInt32, false),
131 Field::new("char_offset_start", DataType::UInt64, true),
132 Field::new("char_offset_end", DataType::UInt64, true),
133 Field::new("page_number", DataType::UInt32, true),
135 Field::new("section_heading", DataType::Utf8, true),
136 Field::new("section_level", DataType::UInt8, true),
137 Field::new("paragraph_index", DataType::UInt32, true),
138 Field::new("element_type", DataType::Utf8, false),
139 Field::new("namespace", DataType::Utf8, false),
141 Field::new("layer", DataType::UInt8, false),
142 Field::new("extracted_by", DataType::Utf8, true),
143 Field::new(
144 "created_at",
145 DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
146 false,
147 ),
148 ])
149}
150
151pub fn normalize_to_current(
159 batch: &RecordBatch,
160 from_version: &str,
161) -> std::result::Result<RecordBatch, arrow::error::ArrowError> {
162 match from_version {
163 "1.1.0" => Ok(batch.clone()),
164 "1.0.0" => {
165 let num_rows = batch.num_rows();
167 let mut columns: Vec<Arc<dyn arrow::array::Array>> = Vec::with_capacity(16);
168
169 for i in 0..9 {
171 columns.push(batch.column(i).clone());
172 }
173
174 use arrow::array::StringArray;
176 let nulls: Vec<Option<&str>> = vec![None; num_rows];
177 columns.push(Arc::new(StringArray::from(nulls)));
178
179 for i in 9..batch.num_columns() {
181 columns.push(batch.column(i).clone());
182 }
183
184 let schema = Arc::new(triples_schema());
185 RecordBatch::try_new(schema, columns)
186 }
187 other => Err(arrow::error::ArrowError::InvalidArgumentError(format!(
188 "Unknown schema version '{}'. Supported: 1.0.0, 1.1.0. \
189 Upgrade arrow-graph-core to read data from newer versions.",
190 other
191 ))),
192 }
193}
194
195pub fn embeddings_schema() -> Schema {
197 embeddings_schema_with_dim(DEFAULT_EMBEDDING_DIM)
198}
199
200pub fn embeddings_schema_with_dim(dim: i32) -> Schema {
202 Schema::new(vec![
203 Field::new("entity_id", DataType::Utf8, false),
204 Field::new(
205 "vector",
206 DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, false)), dim),
207 false,
208 ),
209 ])
210}
211
212pub fn metadata_schema() -> Schema {
214 Schema::new(vec![
215 Field::new("entity_id", DataType::Utf8, false),
216 Field::new("layer", DataType::UInt8, false),
217 Field::new("namespace", DataType::Utf8, false),
218 Field::new("access_count", DataType::UInt64, false),
219 Field::new(
220 "last_accessed",
221 DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
222 true,
223 ),
224 ])
225}
226
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{
        Array, BooleanArray, Float64Array, RecordBatch, StringArray, TimestampMillisecondArray,
        UInt8Array,
    };

    /// Builds a single-row batch in the current (1.1.0) triples layout.
    /// Shared by the tests that only need "some valid current batch".
    fn sample_current_batch() -> RecordBatch {
        let now_ms = chrono::Utc::now().timestamp_millis();

        RecordBatch::try_new(
            Arc::new(triples_schema()),
            vec![
                Arc::new(StringArray::from(vec!["t-001"])),
                Arc::new(StringArray::from(vec!["sub"])),
                Arc::new(StringArray::from(vec!["pred"])),
                Arc::new(StringArray::from(vec!["obj"])),
                Arc::new(StringArray::from(vec![Some("default")])),
                Arc::new(StringArray::from(vec!["world"])),
                Arc::new(UInt8Array::from(vec![1u8])),
                Arc::new(Float64Array::from(vec![Some(0.9)])),
                Arc::new(StringArray::from(vec![Some("doc.md")])),
                Arc::new(StringArray::from(vec![None::<&str>])),
                Arc::new(StringArray::from(vec![Some("agent-1")])),
                Arc::new(TimestampMillisecondArray::from(vec![now_ms]).with_timezone("UTC")),
                Arc::new(StringArray::from(vec![None::<&str>])),
                Arc::new(StringArray::from(vec![None::<&str>])),
                Arc::new(TimestampMillisecondArray::from(vec![None]).with_timezone("UTC")),
                Arc::new(BooleanArray::from(vec![false])),
            ],
        )
        .expect("sample 1.1.0 batch must match triples_schema")
    }

    #[test]
    fn test_triples_schema_creates_record_batch() {
        let schema = Arc::new(triples_schema());
        let now_ms = chrono::Utc::now().timestamp_millis();

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["t-001"])),
                Arc::new(StringArray::from(vec!["example:Alice"])),
                Arc::new(StringArray::from(vec!["rdf:type"])),
                Arc::new(StringArray::from(vec!["example:Person"])),
                Arc::new(StringArray::from(vec![Some("default")])),
                Arc::new(StringArray::from(vec!["world"])),
                Arc::new(UInt8Array::from(vec![1u8])),
                Arc::new(Float64Array::from(vec![Some(0.95)])),
                Arc::new(StringArray::from(vec![Some("ontology.md")])),
                Arc::new(StringArray::from(vec![Some("chunk_001")])),
                Arc::new(StringArray::from(vec![Some("agent-1")])),
                Arc::new(TimestampMillisecondArray::from(vec![now_ms]).with_timezone("UTC")),
                Arc::new(StringArray::from(vec![Some("t-000")])),
                Arc::new(StringArray::from(vec![Some("t-base")])),
                Arc::new(TimestampMillisecondArray::from(vec![Some(now_ms)]).with_timezone("UTC")),
                Arc::new(BooleanArray::from(vec![false])),
            ],
        )
        .expect("Failed to create triples RecordBatch");

        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 16);
    }

    #[test]
    fn test_chunks_schema_creates_record_batch() {
        use arrow::array::{LargeStringArray, UInt32Array, UInt64Array};

        let schema = Arc::new(chunks_schema());
        let now_ms = chrono::Utc::now().timestamp_millis();

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(vec!["chunk_001"])),
                Arc::new(StringArray::from(vec!["document.md"])),
                Arc::new(LargeStringArray::from(vec![Some("The quick brown fox...")])),
                Arc::new(UInt32Array::from(vec![42u32])),
                Arc::new(UInt32Array::from(vec![0u32])),
                Arc::new(UInt32Array::from(vec![10u32])),
                Arc::new(UInt64Array::from(vec![Some(0u64)])),
                Arc::new(UInt64Array::from(vec![Some(156u64)])),
                Arc::new(UInt32Array::from(vec![Some(36u32)])),
                Arc::new(StringArray::from(vec![Some("Chapter 2")])),
                Arc::new(UInt8Array::from(vec![Some(2u8)])),
                Arc::new(UInt32Array::from(vec![Some(7u32)])),
                Arc::new(StringArray::from(vec!["prose"])),
                Arc::new(StringArray::from(vec!["world"])),
                Arc::new(UInt8Array::from(vec![0u8])),
                Arc::new(StringArray::from(vec![Some("agent-1")])),
                Arc::new(TimestampMillisecondArray::from(vec![now_ms]).with_timezone("UTC")),
            ],
        )
        .expect("Failed to create chunks RecordBatch");

        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 17);
    }

    #[test]
    fn test_normalize_v1_0_0_to_v1_1_0() {
        // The 1.0.0 layout: identical to the current one minus `source_chunk_id`.
        let v1_0_schema = Arc::new(Schema::new(vec![
            Field::new("triple_id", DataType::Utf8, false),
            Field::new("subject", DataType::Utf8, false),
            Field::new("predicate", DataType::Utf8, false),
            Field::new("object", DataType::Utf8, false),
            Field::new("graph", DataType::Utf8, true),
            Field::new("namespace", DataType::Utf8, false),
            Field::new("layer", DataType::UInt8, false),
            Field::new("confidence", DataType::Float64, true),
            Field::new("source_document", DataType::Utf8, true),
            Field::new("extracted_by", DataType::Utf8, true),
            Field::new(
                "created_at",
                DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
                false,
            ),
            Field::new("caused_by", DataType::Utf8, true),
            Field::new("derived_from", DataType::Utf8, true),
            Field::new(
                "consolidated_at",
                DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
                true,
            ),
            Field::new("deleted", DataType::Boolean, false),
        ]));
        let now_ms = chrono::Utc::now().timestamp_millis();

        let old_batch = RecordBatch::try_new(
            v1_0_schema,
            vec![
                Arc::new(StringArray::from(vec!["t-001"])),
                Arc::new(StringArray::from(vec!["sub"])),
                Arc::new(StringArray::from(vec!["pred"])),
                Arc::new(StringArray::from(vec!["obj"])),
                Arc::new(StringArray::from(vec![Some("default")])),
                Arc::new(StringArray::from(vec!["world"])),
                Arc::new(UInt8Array::from(vec![1u8])),
                Arc::new(Float64Array::from(vec![Some(0.9)])),
                Arc::new(StringArray::from(vec![Some("doc.md")])),
                Arc::new(StringArray::from(vec![Some("agent-1")])),
                Arc::new(TimestampMillisecondArray::from(vec![now_ms]).with_timezone("UTC")),
                Arc::new(StringArray::from(vec![None::<&str>])),
                Arc::new(StringArray::from(vec![None::<&str>])),
                Arc::new(TimestampMillisecondArray::from(vec![None]).with_timezone("UTC")),
                Arc::new(BooleanArray::from(vec![false])),
            ],
        )
        .unwrap();

        assert_eq!(old_batch.num_columns(), 15);

        let normalized = normalize_to_current(&old_batch, "1.0.0").unwrap();
        assert_eq!(normalized.num_columns(), 16);
        assert_eq!(normalized.schema(), Arc::new(triples_schema()));

        // The inserted column is entirely null.
        let chunk_id_col = normalized
            .column(col::SOURCE_CHUNK_ID)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert!(chunk_id_col.is_null(0));

        // The column just before the insertion point is untouched...
        let source_doc = normalized
            .column(col::SOURCE_DOCUMENT)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(source_doc.value(0), "doc.md");

        // ...and the columns after it shifted right by one, keeping their values.
        let extracted = normalized
            .column(col::EXTRACTED_BY)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(extracted.value(0), "agent-1");

        let deleted = normalized
            .column(col::DELETED)
            .as_any()
            .downcast_ref::<BooleanArray>()
            .unwrap();
        assert!(!deleted.value(0));
    }

    #[test]
    fn test_normalize_v1_1_0_passthrough() {
        let batch = sample_current_batch();

        let normalized = normalize_to_current(&batch, "1.1.0").unwrap();
        assert_eq!(normalized.num_columns(), 16);
        assert_eq!(normalized.num_rows(), batch.num_rows());
        // Passthrough must not alter the schema or any column's contents.
        assert_eq!(normalized.schema(), batch.schema());
        assert_eq!(normalized.columns(), batch.columns());
    }

    #[test]
    fn test_normalize_unknown_version_errors() {
        let batch = sample_current_batch();

        let err = normalize_to_current(&batch, "2.0.0")
            .expect_err("unknown schema versions must be rejected");
        assert!(err.to_string().contains("Unknown schema version"));
    }

    #[test]
    fn test_embeddings_schema_creates_record_batch() {
        use arrow::array::{FixedSizeListArray, Float32Array};

        let schema = Arc::new(embeddings_schema_with_dim(4));
        let values = Float32Array::from(vec![0.1, 0.2, 0.3, 0.4]);
        let list = FixedSizeListArray::try_new(
            Arc::new(Field::new("item", DataType::Float32, false)),
            4,
            Arc::new(values),
            None,
        )
        .unwrap();

        let batch = RecordBatch::try_new(
            schema,
            vec![Arc::new(StringArray::from(vec!["e-001"])), Arc::new(list)],
        )
        .expect("Failed to create embeddings RecordBatch");

        assert_eq!(batch.num_rows(), 1);
    }

    #[test]
    fn test_metadata_schema_creates_record_batch() {
        use arrow::array::UInt64Array;

        let schema = Arc::new(metadata_schema());
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(vec!["e-001"])),
                Arc::new(UInt8Array::from(vec![2u8])),
                Arc::new(StringArray::from(vec!["work"])),
                Arc::new(UInt64Array::from(vec![42u64])),
                Arc::new(
                    TimestampMillisecondArray::from(vec![Some(
                        chrono::Utc::now().timestamp_millis(),
                    )])
                    .with_timezone("UTC"),
                ),
            ],
        )
        .expect("Failed to create metadata RecordBatch");

        assert_eq!(batch.num_rows(), 1);
    }
}