1use std::collections::{HashMap, HashSet};
2
3use std::io::{BufRead, BufReader, Cursor};
4use std::sync::Arc;
5
6use arrow_array::{
7 Array, ArrayRef, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array,
8 Int32Array, Int64Array, RecordBatch, StringArray, UInt32Array, UInt64Array,
9 builder::{
10 ArrayBuilder, BooleanBuilder, Date32Builder, Date64Builder, FixedSizeListBuilder,
11 Float32Builder, Float64Builder, Int32Builder, Int64Builder, ListBuilder, StringBuilder,
12 UInt32Builder, UInt64Builder,
13 },
14};
15use arrow_schema::DataType;
16use base64::Engine;
17use lance::blob::BlobArrayBuilder;
18use omnigraph_compiler::catalog::NodeType;
19use serde::{Deserialize, Serialize};
20use serde_json::Value as JsonValue;
21
22use crate::db::Omnigraph;
23use crate::error::{OmniError, Result};
24
/// Per-table row counts produced by a single JSONL load.
#[derive(Debug, Clone, Default)]
pub struct LoadResult {
    // Rows written per node type, keyed by node type name.
    pub nodes_loaded: HashMap<String, usize>,
    // Rows written per edge type, keyed by canonical edge name.
    pub edges_loaded: HashMap<String, usize>,
}
31
/// Row count for one sub-table touched by an ingest.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct IngestTableResult {
    // Manifest key of the sub-table, e.g. "node:Person" or "edge:KNOWS".
    pub table_key: String,
    // Number of rows written to that table in this ingest.
    pub rows_loaded: usize,
}
37
/// Outcome of an `ingest`/`ingest_file` call.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct IngestResult {
    // Branch the data was loaded into.
    pub branch: String,
    // Branch the target was created from (when it had to be created).
    pub base_branch: String,
    // True when the target branch did not exist and was created by this call.
    pub branch_created: bool,
    // Load mode that was applied.
    pub mode: LoadMode,
    // Per-table row counts, sorted by table key.
    pub tables: Vec<IngestTableResult>,
}
46
/// How incoming rows are combined with existing table data.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LoadMode {
    /// Replace the table contents with the incoming batch.
    Overwrite,
    /// Append incoming rows after the existing ones.
    Append,
    /// Upsert keyed on `id`: update matching rows, insert the rest.
    Merge,
}
58
59pub async fn load_jsonl(db: &mut Omnigraph, data: &str, mode: LoadMode) -> Result<LoadResult> {
61 let current_branch = db.active_branch().map(str::to_string);
62 let branch = current_branch.as_deref().unwrap_or("main");
63 db.load(branch, data, mode).await
64}
65
66pub async fn load_jsonl_file(db: &mut Omnigraph, path: &str, mode: LoadMode) -> Result<LoadResult> {
68 let current_branch = db.active_branch().map(str::to_string);
69 let branch = current_branch.as_deref().unwrap_or("main");
70 db.load_file(branch, path, mode).await
71}
72
73impl Omnigraph {
74 pub async fn ingest(
75 &mut self,
76 branch: &str,
77 from: Option<&str>,
78 data: &str,
79 mode: LoadMode,
80 ) -> Result<IngestResult> {
81 self.ingest_as(branch, from, data, mode, None).await
82 }
83
84 pub async fn ingest_as(
85 &mut self,
86 branch: &str,
87 from: Option<&str>,
88 data: &str,
89 mode: LoadMode,
90 actor_id: Option<&str>,
91 ) -> Result<IngestResult> {
92 let previous_actor = self.audit_actor_id.clone();
93 self.audit_actor_id = actor_id.map(str::to_string);
94 let result = self
95 .ingest_with_current_actor(branch, from, data, mode)
96 .await;
97 self.audit_actor_id = previous_actor;
98 result
99 }
100
101 pub async fn ingest_file(
102 &mut self,
103 branch: &str,
104 from: Option<&str>,
105 path: &str,
106 mode: LoadMode,
107 ) -> Result<IngestResult> {
108 self.ingest_file_as(branch, from, path, mode, None).await
109 }
110
111 pub async fn ingest_file_as(
112 &mut self,
113 branch: &str,
114 from: Option<&str>,
115 path: &str,
116 mode: LoadMode,
117 actor_id: Option<&str>,
118 ) -> Result<IngestResult> {
119 let data = std::fs::read_to_string(path).map_err(OmniError::Io)?;
120 self.ingest_as(branch, from, &data, mode, actor_id).await
121 }
122
123 async fn ingest_with_current_actor(
124 &mut self,
125 branch: &str,
126 from: Option<&str>,
127 data: &str,
128 mode: LoadMode,
129 ) -> Result<IngestResult> {
130 self.ensure_schema_state_valid().await?;
131 let target_branch =
132 Self::normalize_branch_name(branch)?.unwrap_or_else(|| "main".to_string());
133 let base_branch = Self::normalize_branch_name(from.unwrap_or("main"))?
134 .unwrap_or_else(|| "main".to_string());
135 let branch_created = !self
136 .branch_list()
137 .await?
138 .iter()
139 .any(|name| name == &target_branch);
140 if branch_created {
141 self.branch_create_from(crate::db::ReadTarget::branch(&base_branch), &target_branch)
142 .await?;
143 }
144
145 let result = self.load(&target_branch, data, mode).await?;
146 Ok(IngestResult {
147 branch: target_branch,
148 base_branch,
149 branch_created,
150 mode,
151 tables: result.to_ingest_tables(),
152 })
153 }
154
155 pub async fn load(&mut self, branch: &str, data: &str, mode: LoadMode) -> Result<LoadResult> {
156 self.ensure_schema_state_valid().await?;
157 let requested = Self::normalize_branch_name(branch)?.unwrap_or_else(|| "main".to_string());
158 if crate::db::is_internal_run_branch(&requested) {
159 return self
160 .load_direct_on_branch(Some(requested.as_str()), data, mode)
161 .await;
162 }
163
164 let target_head_before = self.latest_branch_snapshot_id(&requested).await?;
165 let op = format!("load_jsonl:branch={}:mode={}", requested, mode.as_str());
166 let run = self.begin_run(&requested, Some(op.as_str())).await?;
167 let staged_result = match self
168 .load_direct_on_branch(Some(run.run_branch.as_str()), data, mode)
169 .await
170 {
171 Ok(result) => result,
172 Err(err) => {
173 let _ = self.fail_run(&run.run_id).await;
174 return Err(err);
175 }
176 };
177
178 let target_head_now = self.latest_branch_snapshot_id(&requested).await?;
179 if target_head_now.as_str() != target_head_before.as_str() {
180 let _ = self.fail_run(&run.run_id).await;
181 return Err(OmniError::manifest_conflict(format!(
182 "target branch '{}' advanced during transactional load; retry",
183 requested
184 )));
185 }
186
187 if let Err(err) = self.publish_run(&run.run_id).await {
188 let _ = self.fail_run(&run.run_id).await;
189 return Err(err);
190 }
191
192 Ok(staged_result)
193 }
194
195 pub async fn load_file(
196 &mut self,
197 branch: &str,
198 path: &str,
199 mode: LoadMode,
200 ) -> Result<LoadResult> {
201 let data = std::fs::read_to_string(path).map_err(|e| OmniError::Io(e))?;
202 self.load(branch, &data, mode).await
203 }
204
205 async fn load_direct_on_branch(
206 &mut self,
207 branch: Option<&str>,
208 data: &str,
209 mode: LoadMode,
210 ) -> Result<LoadResult> {
211 let reader = BufReader::new(Cursor::new(data.as_bytes()));
212 load_jsonl_reader(self, branch, reader, mode).await
213 }
214}
215
216impl LoadMode {
217 pub fn as_str(self) -> &'static str {
218 match self {
219 LoadMode::Overwrite => "overwrite",
220 LoadMode::Append => "append",
221 LoadMode::Merge => "merge",
222 }
223 }
224}
225
226impl LoadResult {
227 pub fn to_ingest_tables(&self) -> Vec<IngestTableResult> {
228 let mut tables = self
229 .nodes_loaded
230 .iter()
231 .map(|(type_name, rows_loaded)| IngestTableResult {
232 table_key: format!("node:{type_name}"),
233 rows_loaded: *rows_loaded,
234 })
235 .chain(
236 self.edges_loaded
237 .iter()
238 .map(|(edge_name, rows_loaded)| IngestTableResult {
239 table_key: format!("edge:{edge_name}"),
240 rows_loaded: *rows_loaded,
241 }),
242 )
243 .collect::<Vec<_>>();
244 tables.sort_by(|a, b| a.table_key.cmp(&b.table_key));
245 tables
246 }
247}
248
249async fn load_jsonl_reader<R: BufRead>(
250 db: &mut Omnigraph,
251 branch: Option<&str>,
252 reader: R,
253 mode: LoadMode,
254) -> Result<LoadResult> {
255 let catalog = db.catalog().clone();
256
257 let mut node_rows: HashMap<String, Vec<JsonValue>> = HashMap::new();
259 let mut edge_rows: HashMap<String, Vec<(String, String, JsonValue)>> = HashMap::new();
260
261 for (line_num, line) in reader.lines().enumerate() {
262 let line = line?;
263 let line = line.trim();
264 if line.is_empty() {
265 continue;
266 }
267 let value: JsonValue = serde_json::from_str(line).map_err(|e| {
268 OmniError::manifest(format!("invalid JSON on line {}: {}", line_num + 1, e))
269 })?;
270
271 if let Some(type_name) = value.get("type").and_then(|v| v.as_str()) {
272 if !catalog.node_types.contains_key(type_name) {
273 return Err(OmniError::manifest(format!(
274 "line {}: unknown node type '{}'",
275 line_num + 1,
276 type_name
277 )));
278 }
279 let data = value
280 .get("data")
281 .cloned()
282 .unwrap_or(JsonValue::Object(serde_json::Map::new()));
283 node_rows
284 .entry(type_name.to_string())
285 .or_default()
286 .push(data);
287 } else if let Some(edge_name) = value.get("edge").and_then(|v| v.as_str()) {
288 if catalog.lookup_edge_by_name(edge_name).is_none() {
289 return Err(OmniError::manifest(format!(
290 "line {}: unknown edge type '{}'",
291 line_num + 1,
292 edge_name
293 )));
294 }
295 let from = value
296 .get("from")
297 .and_then(|v| v.as_str())
298 .ok_or_else(|| {
299 OmniError::manifest(format!("line {}: edge missing 'from'", line_num + 1))
300 })?
301 .to_string();
302 let to = value
303 .get("to")
304 .and_then(|v| v.as_str())
305 .ok_or_else(|| {
306 OmniError::manifest(format!("line {}: edge missing 'to'", line_num + 1))
307 })?
308 .to_string();
309 let data = value
310 .get("data")
311 .cloned()
312 .unwrap_or(JsonValue::Object(serde_json::Map::new()));
313 let canonical = catalog.lookup_edge_by_name(edge_name).unwrap().name.clone();
314 edge_rows
315 .entry(canonical)
316 .or_default()
317 .push((from, to, data));
318 } else {
319 return Err(OmniError::manifest(format!(
320 "line {}: expected 'type' or 'edge' field",
321 line_num + 1
322 )));
323 }
324 }
325
326 let mut updates = Vec::new();
329 let mut result = LoadResult::default();
330 let snapshot = db.snapshot_for_branch(branch).await?;
331
332 for (type_name, rows) in &node_rows {
334 let node_type = &catalog.node_types[type_name];
335 let batch = build_node_batch(node_type, rows)?;
336
337 validate_value_constraints(&batch, node_type)?;
339
340 let loaded_count = batch.num_rows();
341
342 let table_key = format!("node:{}", type_name);
343 snapshot
344 .entry(&table_key)
345 .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
346
347 let (state, table_branch) =
348 write_batch_to_dataset(db, branch, &table_key, batch, mode).await?;
349
350 updates.push(crate::db::SubTableUpdate {
351 table_key,
352 table_version: state.version,
353 table_branch,
354 row_count: state.row_count,
355 version_metadata: state.version_metadata,
356 });
357 result.nodes_loaded.insert(type_name.clone(), loaded_count);
358 }
359
360 for (edge_name, rows) in &edge_rows {
363 let edge_type = &catalog.edge_types[edge_name];
364 let from_ids = collect_node_ids(
365 db,
366 branch,
367 &edge_type.from_type,
368 &node_rows,
369 &catalog,
370 &updates,
371 )
372 .await?;
373 let to_ids = collect_node_ids(
374 db,
375 branch,
376 &edge_type.to_type,
377 &node_rows,
378 &catalog,
379 &updates,
380 )
381 .await?;
382
383 for (i, (src, dst, _)) in rows.iter().enumerate() {
384 if !from_ids.contains(src.as_str()) {
385 return Err(OmniError::manifest(format!(
386 "edge {} row {}: src '{}' not found in {}",
387 edge_name,
388 i + 1,
389 src,
390 edge_type.from_type
391 )));
392 }
393 if !to_ids.contains(dst.as_str()) {
394 return Err(OmniError::manifest(format!(
395 "edge {} row {}: dst '{}' not found in {}",
396 edge_name,
397 i + 1,
398 dst,
399 edge_type.to_type
400 )));
401 }
402 }
403 }
404
405 for (edge_name, rows) in &edge_rows {
407 let edge_type = &catalog.edge_types[edge_name];
408 let batch = build_edge_batch(edge_type, rows)?;
409 let loaded_count = batch.num_rows();
410
411 let table_key = format!("edge:{}", edge_name);
412 snapshot
413 .entry(&table_key)
414 .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
415
416 let (state, table_branch) =
417 write_batch_to_dataset(db, branch, &table_key, batch, mode).await?;
418
419 updates.push(crate::db::SubTableUpdate {
420 table_key,
421 table_version: state.version,
422 table_branch,
423 row_count: state.row_count,
424 version_metadata: state.version_metadata,
425 });
426 result.edges_loaded.insert(edge_name.clone(), loaded_count);
427 }
428
429 for (edge_name, _) in &edge_rows {
433 let table_key = format!("edge:{}", edge_name);
434 if let Some(update) = updates.iter().find(|u| u.table_key == table_key) {
435 validate_edge_cardinality(
436 db,
437 branch,
438 edge_name,
439 update.table_version,
440 update.table_branch.as_deref(),
441 )
442 .await?;
443 }
444 }
445
446 db.commit_updates_on_branch(branch, &updates).await?;
448
449 Ok(result)
450}
451
452fn build_node_batch(node_type: &NodeType, rows: &[JsonValue]) -> Result<RecordBatch> {
453 let schema = node_type.arrow_schema.clone();
454
455 let ids: Vec<String> = rows
457 .iter()
458 .map(|row| {
459 let explicit_id = row.get("id").and_then(|v| v.as_str()).map(str::to_string);
460 if let Some(key_prop) = node_type.key_property() {
461 let key_value = row
462 .get(key_prop)
463 .and_then(|v| v.as_str())
464 .map(|s| s.to_string())
465 .ok_or_else(|| {
466 OmniError::manifest(format!(
467 "node {} missing @key property '{}'",
468 node_type.name, key_prop
469 ))
470 })?;
471 if let Some(explicit_id) = explicit_id {
472 if explicit_id != key_value {
473 return Err(OmniError::manifest(format!(
474 "node {} has explicit id '{}' that does not match @key property '{}' value '{}'",
475 node_type.name, explicit_id, key_prop, key_value
476 )));
477 }
478 }
479 Ok(key_value)
480 } else if let Some(explicit_id) = explicit_id {
481 Ok(explicit_id)
482 } else {
483 Ok(generate_id())
484 }
485 })
486 .collect::<Result<Vec<_>>>()?;
487
488 let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
489 columns.push(Arc::new(StringArray::from(ids)));
490
491 for field in schema.fields().iter().skip(1) {
493 if node_type.blob_properties.contains(field.name()) {
494 let col = build_blob_column(field.name(), field.is_nullable(), rows)?;
495 columns.push(col);
496 } else {
497 let col =
498 build_column_from_json(field.name(), field.data_type(), field.is_nullable(), rows)?;
499 columns.push(col);
500 }
501 }
502
503 RecordBatch::try_new(schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
504}
505
506fn build_edge_batch(
507 edge_type: &omnigraph_compiler::catalog::EdgeType,
508 rows: &[(String, String, JsonValue)],
509) -> Result<RecordBatch> {
510 let schema = edge_type.arrow_schema.clone();
511
512 let ids: Vec<String> = rows
513 .iter()
514 .map(|(_, _, data)| {
515 data.get("id")
516 .and_then(|v| v.as_str())
517 .map(str::to_string)
518 .unwrap_or_else(generate_id)
519 })
520 .collect();
521 let srcs: Vec<&str> = rows.iter().map(|(from, _, _)| from.as_str()).collect();
522 let dsts: Vec<&str> = rows.iter().map(|(_, to, _)| to.as_str()).collect();
523
524 let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
525 columns.push(Arc::new(StringArray::from(ids)));
526 columns.push(Arc::new(StringArray::from(srcs)));
527 columns.push(Arc::new(StringArray::from(dsts)));
528
529 let data_values: Vec<JsonValue> = rows.iter().map(|(_, _, data)| data.clone()).collect();
531 for field in schema.fields().iter().skip(3) {
532 if edge_type.blob_properties.contains(field.name()) {
533 let col = build_blob_column(field.name(), field.is_nullable(), &data_values)?;
534 columns.push(col);
535 } else {
536 let col = build_column_from_json(
537 field.name(),
538 field.data_type(),
539 field.is_nullable(),
540 &data_values,
541 )?;
542 columns.push(col);
543 }
544 }
545
546 RecordBatch::try_new(schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
547}
548
549pub(crate) fn append_blob_value(builder: &mut BlobArrayBuilder, value: &str) -> Result<()> {
551 if let Some(encoded) = value.strip_prefix("base64:") {
552 let bytes = base64::engine::general_purpose::STANDARD
553 .decode(encoded)
554 .map_err(|e| OmniError::manifest(format!("invalid base64 blob data: {}", e)))?;
555 builder
556 .push_bytes(bytes)
557 .map_err(|e| OmniError::Lance(e.to_string()))
558 } else {
559 builder
561 .push_uri(value)
562 .map_err(|e| OmniError::Lance(e.to_string()))
563 }
564}
565
566fn build_blob_column(name: &str, nullable: bool, rows: &[JsonValue]) -> Result<ArrayRef> {
568 let mut builder = BlobArrayBuilder::new(rows.len());
569 for row in rows {
570 match row.get(name) {
571 Some(JsonValue::String(s)) => {
572 append_blob_value(&mut builder, s)?;
573 }
574 Some(JsonValue::Null) | None if nullable => {
575 builder
576 .push_null()
577 .map_err(|e| OmniError::Lance(e.to_string()))?;
578 }
579 Some(JsonValue::Null) | None => {
580 return Err(OmniError::manifest(format!(
581 "non-nullable blob property '{}' has null values",
582 name
583 )));
584 }
585 _ => {
586 return Err(OmniError::manifest(format!(
587 "blob property '{}' must be a URI string or base64: prefixed data",
588 name
589 )));
590 }
591 }
592 }
593 builder
594 .finish()
595 .map_err(|e| OmniError::Lance(e.to_string()))
596}
597
/// Build an Arrow column named `name` of `data_type` by extracting that key
/// from each JSON row.
///
/// Missing keys and type-mismatched values become nulls; the final check
/// rejects the column when nulls appear in a non-nullable field (so type
/// mismatches surface as "null or invalid values" there).
fn build_column_from_json(
    name: &str,
    data_type: &DataType,
    nullable: bool,
    rows: &[JsonValue],
) -> Result<ArrayRef> {
    let array: ArrayRef = match data_type {
        DataType::Utf8 => {
            let values: Vec<Option<String>> = rows
                .iter()
                .map(|row| {
                    row.get(name)
                        .and_then(|v| v.as_str())
                        .map(|s| s.to_string())
                })
                .collect();
            Arc::new(StringArray::from(values))
        }
        DataType::Int32 => {
            // NOTE(review): `as i32` silently truncates out-of-range i64
            // values — confirm this is intended (list items use try_from).
            let values: Vec<Option<i32>> = rows
                .iter()
                .map(|row| row.get(name).and_then(|v| v.as_i64()).map(|v| v as i32))
                .collect();
            Arc::new(Int32Array::from(values))
        }
        DataType::Int64 => {
            let values: Vec<Option<i64>> = rows
                .iter()
                .map(|row| row.get(name).and_then(|v| v.as_i64()))
                .collect();
            Arc::new(Int64Array::from(values))
        }
        DataType::UInt32 => {
            // NOTE(review): `as u32` truncates like the Int32 arm above.
            let values: Vec<Option<u32>> = rows
                .iter()
                .map(|row| row.get(name).and_then(|v| v.as_u64()).map(|v| v as u32))
                .collect();
            Arc::new(UInt32Array::from(values))
        }
        DataType::UInt64 => {
            let values: Vec<Option<u64>> = rows
                .iter()
                .map(|row| row.get(name).and_then(|v| v.as_u64()))
                .collect();
            Arc::new(UInt64Array::from(values))
        }
        DataType::Float32 => {
            let values: Vec<Option<f32>> = rows
                .iter()
                .map(|row| row.get(name).and_then(|v| v.as_f64()).map(|v| v as f32))
                .collect();
            Arc::new(Float32Array::from(values))
        }
        DataType::Float64 => {
            let values: Vec<Option<f64>> = rows
                .iter()
                .map(|row| row.get(name).and_then(|v| v.as_f64()))
                .collect();
            Arc::new(Float64Array::from(values))
        }
        DataType::Boolean => {
            let values: Vec<Option<bool>> = rows
                .iter()
                .map(|row| row.get(name).and_then(|v| v.as_bool()))
                .collect();
            Arc::new(BooleanArray::from(values))
        }
        DataType::Date32 => {
            // Accepts days-since-epoch numbers or date-literal strings.
            let mut values = Vec::with_capacity(rows.len());
            for row in rows {
                values.push(parse_date32_json_value(
                    row.get(name).unwrap_or(&JsonValue::Null),
                )?);
            }
            Arc::new(Date32Array::from(values))
        }
        DataType::Date64 => {
            // Accepts millisecond timestamps or datetime-literal strings.
            let mut values = Vec::with_capacity(rows.len());
            for row in rows {
                values.push(parse_date64_json_value(
                    row.get(name).unwrap_or(&JsonValue::Null),
                )?);
            }
            Arc::new(Date64Array::from(values))
        }
        DataType::List(field) => {
            let mut builder = ListBuilder::with_capacity(
                make_list_value_builder(field.data_type(), rows.len())?,
                rows.len(),
            )
            .with_field(field.clone());
            for row in rows {
                let value = row.get(name).unwrap_or(&JsonValue::Null);
                if value.is_null() {
                    // Null / missing entry becomes a null list slot.
                    builder.append(false);
                    continue;
                }
                let items = value.as_array().ok_or_else(|| {
                    OmniError::manifest(format!(
                        "list property '{}' expects a JSON array, got {}",
                        name, value
                    ))
                })?;
                for item in items {
                    append_json_list_item(builder.values(), field.data_type(), item)?;
                }
                builder.append(true);
            }
            Arc::new(builder.finish())
        }
        DataType::FixedSizeList(child_field, dim) => {
            // Fixed-size lists are built as f32 vectors of exactly `dim`.
            let dim = *dim;
            let mut builder = FixedSizeListBuilder::with_capacity(
                Float32Builder::with_capacity(rows.len() * dim as usize),
                dim,
                rows.len(),
            )
            .with_field(child_field.clone());
            for row in rows {
                if let Some(arr) = row.get(name).and_then(|v| v.as_array()) {
                    if arr.len() != dim as usize {
                        return Err(OmniError::manifest(format!(
                            "vector property '{}' expects {} dimensions, got {}",
                            name,
                            dim,
                            arr.len()
                        )));
                    }
                    for val in arr {
                        // NOTE(review): non-numeric elements coerce to 0.0
                        // rather than erroring — confirm intended.
                        builder
                            .values()
                            .append_value(val.as_f64().unwrap_or(0.0) as f32);
                    }
                    builder.append(true);
                } else if nullable {
                    // Null vector: pad the child builder then mark slot null.
                    for _ in 0..dim as usize {
                        builder.values().append_null();
                    }
                    builder.append(false);
                } else {
                    return Err(OmniError::manifest(format!(
                        "non-nullable vector property '{}' has null values",
                        name
                    )));
                }
            }
            Arc::new(builder.finish())
        }
        _ => {
            // Fallback for unsupported types: an all-null string column.
            // Non-nullable fields of such a type fail the check below.
            let values: Vec<Option<&str>> = vec![None; rows.len()];
            Arc::new(StringArray::from(values))
        }
    };

    // Reject any nulls (missing keys or type mismatches) in non-nullable fields.
    if !nullable && array.null_count() > 0 {
        return Err(OmniError::manifest(format!(
            "non-nullable property '{}' has null or invalid values",
            name
        )));
    }

    Ok(array)
}
763
764fn make_list_value_builder(data_type: &DataType, capacity: usize) -> Result<Box<dyn ArrayBuilder>> {
765 Ok(match data_type {
766 DataType::Utf8 => Box::new(StringBuilder::with_capacity(capacity, capacity * 8)),
767 DataType::Boolean => Box::new(BooleanBuilder::with_capacity(capacity)),
768 DataType::Int32 => Box::new(Int32Builder::with_capacity(capacity)),
769 DataType::Int64 => Box::new(Int64Builder::with_capacity(capacity)),
770 DataType::UInt32 => Box::new(UInt32Builder::with_capacity(capacity)),
771 DataType::UInt64 => Box::new(UInt64Builder::with_capacity(capacity)),
772 DataType::Float32 => Box::new(Float32Builder::with_capacity(capacity)),
773 DataType::Float64 => Box::new(Float64Builder::with_capacity(capacity)),
774 DataType::Date32 => Box::new(Date32Builder::with_capacity(capacity)),
775 DataType::Date64 => Box::new(Date64Builder::with_capacity(capacity)),
776 other => {
777 return Err(OmniError::manifest(format!(
778 "unsupported list element data type {:?}",
779 other
780 )));
781 }
782 })
783}
784
/// Append one JSON value to a list's element builder, downcasting the
/// type-erased builder to the concrete builder for `data_type`.
///
/// Values of the wrong JSON kind become nulls; integer narrowing (Int32 /
/// UInt32) is range-checked and errors instead of truncating. Unsupported
/// element types are rejected (mirrors `make_list_value_builder`).
fn append_json_list_item(
    builder: &mut Box<dyn ArrayBuilder>,
    data_type: &DataType,
    value: &JsonValue,
) -> Result<()> {
    match data_type {
        DataType::Utf8 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<StringBuilder>()
                .ok_or_else(|| OmniError::manifest("list Utf8 builder downcast failed"))?;
            if let Some(value) = value.as_str() {
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::Boolean => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<BooleanBuilder>()
                .ok_or_else(|| OmniError::manifest("list Boolean builder downcast failed"))?;
            if let Some(value) = value.as_bool() {
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::Int32 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<Int32Builder>()
                .ok_or_else(|| OmniError::manifest("list Int32 builder downcast failed"))?;
            if let Some(value) = value.as_i64() {
                // Checked narrowing: out-of-range values error, not truncate.
                let value = i32::try_from(value).map_err(|_| {
                    OmniError::manifest(format!("list value {} exceeds Int32 range", value))
                })?;
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::Int64 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<Int64Builder>()
                .ok_or_else(|| OmniError::manifest("list Int64 builder downcast failed"))?;
            if let Some(value) = value.as_i64() {
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::UInt32 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<UInt32Builder>()
                .ok_or_else(|| OmniError::manifest("list UInt32 builder downcast failed"))?;
            if let Some(value) = value.as_u64() {
                // Checked narrowing: out-of-range values error, not truncate.
                let value = u32::try_from(value).map_err(|_| {
                    OmniError::manifest(format!("list value {} exceeds UInt32 range", value))
                })?;
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::UInt64 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<UInt64Builder>()
                .ok_or_else(|| OmniError::manifest("list UInt64 builder downcast failed"))?;
            if let Some(value) = value.as_u64() {
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::Float32 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<Float32Builder>()
                .ok_or_else(|| OmniError::manifest("list Float32 builder downcast failed"))?;
            if let Some(value) = value.as_f64() {
                builder.append_value(value as f32);
            } else {
                builder.append_null();
            }
        }
        DataType::Float64 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<Float64Builder>()
                .ok_or_else(|| OmniError::manifest("list Float64 builder downcast failed"))?;
            if let Some(value) = value.as_f64() {
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::Date32 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<Date32Builder>()
                .ok_or_else(|| OmniError::manifest("list Date32 builder downcast failed"))?;
            // Dates accept days-since-epoch numbers or literal strings.
            if let Some(value) = parse_date32_json_value(value)? {
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::Date64 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<Date64Builder>()
                .ok_or_else(|| OmniError::manifest("list Date64 builder downcast failed"))?;
            if let Some(value) = parse_date64_json_value(value)? {
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        other => {
            return Err(OmniError::manifest(format!(
                "unsupported list element data type {:?}",
                other
            )));
        }
    }

    Ok(())
}
917
918fn parse_date32_json_value(value: &JsonValue) -> Result<Option<i32>> {
919 if value.is_null() {
920 return Ok(None);
921 }
922 if let Some(days) = value.as_i64() {
923 let days = i32::try_from(days)
924 .map_err(|_| OmniError::manifest(format!("Date value out of range: {}", days)))?;
925 return Ok(Some(days));
926 }
927 if let Some(days) = value.as_u64() {
928 let days = i32::try_from(days)
929 .map_err(|_| OmniError::manifest(format!("Date value out of range: {}", days)))?;
930 return Ok(Some(days));
931 }
932 if let Some(value) = value.as_str() {
933 return Ok(Some(parse_date32_literal(value)?));
934 }
935 Ok(None)
936}
937
938fn parse_date64_json_value(value: &JsonValue) -> Result<Option<i64>> {
939 if value.is_null() {
940 return Ok(None);
941 }
942 if let Some(ms) = value.as_i64() {
943 return Ok(Some(ms));
944 }
945 if let Some(ms) = value.as_u64() {
946 let ms = i64::try_from(ms)
947 .map_err(|_| OmniError::manifest(format!("DateTime value out of range: {}", ms)))?;
948 return Ok(Some(ms));
949 }
950 if let Some(value) = value.as_str() {
951 return Ok(Some(parse_date64_literal(value)?));
952 }
953 Ok(None)
954}
955
/// Apply `batch` to the dataset behind `table_key` according to `mode`,
/// returning the resulting table state and the branch actually written to.
async fn write_batch_to_dataset(
    db: &Omnigraph,
    branch: Option<&str>,
    table_key: &str,
    batch: RecordBatch,
    mode: LoadMode,
) -> Result<(crate::table_store::TableState, Option<String>)> {
    let (mut ds, full_path, table_branch) =
        db.open_for_mutation_on_branch(branch, table_key).await?;
    let table_store = db.table_store();

    match mode {
        LoadMode::Overwrite => {
            // Replace the table contents with this batch.
            let state = table_store
                .overwrite_batch(&full_path, &mut ds, batch)
                .await?;
            Ok((state, table_branch))
        }
        LoadMode::Append => {
            let state = table_store.append_batch(&full_path, &mut ds, batch).await?;
            Ok((state, table_branch))
        }
        LoadMode::Merge => {
            // Upsert keyed on "id": matching rows are fully updated,
            // non-matching rows are inserted.
            let state = table_store
                .merge_insert_batch(
                    &full_path,
                    ds,
                    batch,
                    vec!["id".to_string()],
                    lance::dataset::WhenMatched::UpdateAll,
                    lance::dataset::WhenNotMatched::InsertAll,
                )
                .await?;
            Ok((state, table_branch))
        }
    }
}
994
/// Generate a fresh ULID string for rows that carry no explicit id.
fn generate_id() -> String {
    ulid::Ulid::new().to_string()
}
998
999pub(crate) fn parse_date32_literal(value: &str) -> Result<i32> {
1000 let raw: Arc<dyn Array> = Arc::new(StringArray::from(vec![Some(value)]));
1001 let casted = arrow_cast::cast::cast(raw.as_ref(), &DataType::Date32)
1002 .map_err(|e| OmniError::manifest(format!("invalid Date literal '{}': {}", value, e)))?;
1003 let out = casted
1004 .as_any()
1005 .downcast_ref::<Date32Array>()
1006 .ok_or_else(|| OmniError::manifest("Date32 cast produced unexpected array"))?;
1007 if out.is_null(0) {
1008 return Err(OmniError::manifest(format!(
1009 "invalid Date literal '{}'",
1010 value
1011 )));
1012 }
1013 Ok(out.value(0))
1014}
1015
1016pub(crate) fn parse_date64_literal(value: &str) -> Result<i64> {
1017 let raw: Arc<dyn Array> = Arc::new(StringArray::from(vec![Some(value)]));
1018 let casted = arrow_cast::cast::cast(raw.as_ref(), &DataType::Date64)
1019 .map_err(|e| OmniError::manifest(format!("invalid DateTime literal '{}': {}", value, e)))?;
1020 let out = casted
1021 .as_any()
1022 .downcast_ref::<Date64Array>()
1023 .ok_or_else(|| OmniError::manifest("Date64 cast produced unexpected array"))?;
1024 if out.is_null(0) {
1025 return Err(OmniError::manifest(format!(
1026 "invalid DateTime literal '{}'",
1027 value
1028 )));
1029 }
1030 Ok(out.value(0))
1031}
1032
/// Enforce `@range` (numeric min/max; NaN rejected) and `@check` (regex on
/// string columns) constraints declared on `node_type` against `batch`.
///
/// Null cells are skipped. Constraints on columns absent from the batch are
/// ignored, and `@check` on a non-string column is silently skipped.
pub(crate) fn validate_value_constraints(
    batch: &RecordBatch,
    node_type: &omnigraph_compiler::catalog::NodeType,
) -> Result<()> {
    use arrow_array::Array;

    // @range: numeric bounds, compared in f64.
    for rc in &node_type.range_constraints {
        let Some(col) = batch.column_by_name(&rc.property) else {
            continue;
        };
        for row in 0..batch.num_rows() {
            if col.is_null(row) {
                continue;
            }
            // None for non-numeric column types — those rows are skipped.
            let value = extract_numeric_value(col, row);
            if let Some(val) = value {
                if val.is_nan() {
                    return Err(OmniError::manifest(format!(
                        "@range violation on {}.{}: value is NaN",
                        node_type.name, rc.property
                    )));
                }
                if let Some(ref min) = rc.min {
                    let min_f = literal_value_to_f64(min);
                    if val < min_f {
                        return Err(OmniError::manifest(format!(
                            "@range violation on {}.{}: value {} < min {}",
                            node_type.name, rc.property, val, min_f
                        )));
                    }
                }
                if let Some(ref max) = rc.max {
                    let max_f = literal_value_to_f64(max);
                    if val > max_f {
                        return Err(OmniError::manifest(format!(
                            "@range violation on {}.{}: value {} > max {}",
                            node_type.name, rc.property, val, max_f
                        )));
                    }
                }
            }
        }
    }

    // @check: regex pattern match over string columns. The regex is compiled
    // (and validated) even when the column is absent from the batch.
    for cc in &node_type.check_constraints {
        let re = regex::Regex::new(&cc.pattern).map_err(|e| {
            OmniError::manifest(format!(
                "@check on {}.{} has invalid regex '{}': {}",
                node_type.name, cc.property, cc.pattern, e
            ))
        })?;
        let Some(col) = batch.column_by_name(&cc.property) else {
            continue;
        };
        let str_col = col.as_any().downcast_ref::<StringArray>();
        if let Some(str_col) = str_col {
            for row in 0..str_col.len() {
                if str_col.is_null(row) {
                    continue;
                }
                let val = str_col.value(row);
                if !re.is_match(val) {
                    return Err(OmniError::manifest(format!(
                        "@check violation on {}.{}: value '{}' does not match pattern '{}'",
                        node_type.name, cc.property, val, cc.pattern
                    )));
                }
            }
        }
    }

    Ok(())
}
1110
1111fn extract_numeric_value(col: &ArrayRef, row: usize) -> Option<f64> {
1112 use arrow_array::{
1113 Array, Float32Array, Float64Array, Int32Array, Int64Array, UInt32Array, UInt64Array,
1114 };
1115 if let Some(a) = col.as_any().downcast_ref::<Int32Array>() {
1116 return Some(a.value(row) as f64);
1117 }
1118 if let Some(a) = col.as_any().downcast_ref::<Int64Array>() {
1119 return Some(a.value(row) as f64);
1120 }
1121 if let Some(a) = col.as_any().downcast_ref::<UInt32Array>() {
1122 return Some(a.value(row) as f64);
1123 }
1124 if let Some(a) = col.as_any().downcast_ref::<UInt64Array>() {
1125 return Some(a.value(row) as f64);
1126 }
1127 if let Some(a) = col.as_any().downcast_ref::<Float32Array>() {
1128 return Some(a.value(row) as f64);
1129 }
1130 if let Some(a) = col.as_any().downcast_ref::<Float64Array>() {
1131 return Some(a.value(row));
1132 }
1133 None
1134}
1135
1136fn literal_value_to_f64(v: &omnigraph_compiler::catalog::LiteralValue) -> f64 {
1137 use omnigraph_compiler::catalog::LiteralValue;
1138 match v {
1139 LiteralValue::Integer(n) => *n as f64,
1140 LiteralValue::Float(f) => *f,
1141 }
1142}
1143
1144async fn validate_edge_cardinality(
1147 db: &crate::db::Omnigraph,
1148 branch: Option<&str>,
1149 edge_name: &str,
1150 written_version: u64,
1151 written_branch: Option<&str>,
1152) -> Result<()> {
1153 use arrow_array::Array;
1154 let catalog = db.catalog();
1155 let edge_type = &catalog.edge_types[edge_name];
1156 if edge_type.cardinality.is_default() {
1157 return Ok(());
1158 }
1159
1160 let snapshot = db.snapshot_for_branch(branch).await?;
1163 let table_key = format!("edge:{}", edge_name);
1164 let entry = snapshot
1165 .entry(&table_key)
1166 .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
1167 let ds = db
1168 .open_dataset_at_state(
1169 &entry.table_path,
1170 written_branch.or(entry.table_branch.as_deref()),
1171 written_version,
1172 )
1173 .await?;
1174
1175 let batches = db
1177 .table_store()
1178 .scan(&ds, Some(&["src"]), None, None)
1179 .await?;
1180
1181 let mut counts: HashMap<String, u32> = HashMap::new();
1182 for batch in &batches {
1183 let srcs = batch
1184 .column_by_name("src")
1185 .unwrap()
1186 .as_any()
1187 .downcast_ref::<StringArray>()
1188 .unwrap();
1189 for i in 0..srcs.len() {
1190 *counts.entry(srcs.value(i).to_string()).or_insert(0) += 1;
1191 }
1192 }
1193
1194 let card = &edge_type.cardinality;
1195 for (src, count) in &counts {
1196 if let Some(max) = card.max {
1197 if *count > max {
1198 return Err(OmniError::manifest(format!(
1199 "@card violation on edge {}: source '{}' has {} edges (max {})",
1200 edge_name, src, count, max
1201 )));
1202 }
1203 }
1204 if *count < card.min {
1205 return Err(OmniError::manifest(format!(
1206 "@card violation on edge {}: source '{}' has {} edges (min {})",
1207 edge_name, src, count, card.min
1208 )));
1209 }
1210 }
1211
1212 Ok(())
1213}
1214
1215async fn collect_node_ids(
1220 db: &Omnigraph,
1221 branch: Option<&str>,
1222 type_name: &str,
1223 node_rows: &HashMap<String, Vec<JsonValue>>,
1224 catalog: &omnigraph_compiler::catalog::Catalog,
1225 updates: &[crate::db::SubTableUpdate],
1226) -> Result<HashSet<String>> {
1227 let mut ids = HashSet::new();
1228
1229 if let Some(rows) = node_rows.get(type_name) {
1231 if let Some(node_type) = catalog.node_types.get(type_name) {
1232 if let Some(key_prop) = node_type.key_property() {
1233 for row in rows {
1234 if let Some(id) = row.get(key_prop).and_then(|v| v.as_str()) {
1235 ids.insert(id.to_string());
1236 }
1237 }
1238 }
1239 }
1240 }
1241
1242 let table_key = format!("node:{}", type_name);
1244 let snapshot = db.snapshot_for_branch(branch).await?;
1245 let Some(entry) = snapshot.entry(&table_key) else {
1246 return Ok(ids);
1247 };
1248 let updated = updates
1250 .iter()
1251 .find(|u| u.table_key == table_key)
1252 .map(|u| (u.table_version, u.table_branch.as_deref()));
1253 let (version, branch) = updated.unwrap_or((entry.table_version, entry.table_branch.as_deref()));
1254 let ds = db
1255 .open_dataset_at_state(&entry.table_path, branch, version)
1256 .await?;
1257
1258 let batches = db
1259 .table_store()
1260 .scan(&ds, Some(&["id"]), None, None)
1261 .await?;
1262
1263 for batch in &batches {
1264 let id_col = batch
1265 .column_by_name("id")
1266 .unwrap()
1267 .as_any()
1268 .downcast_ref::<StringArray>()
1269 .unwrap();
1270 for i in 0..batch.num_rows() {
1271 ids.insert(id_col.value(i).to_string());
1272 }
1273 }
1274
1275 Ok(ids)
1276}
1277
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::Omnigraph;
    use arrow_array::Array;
    use futures::TryStreamExt;
    use std::collections::HashMap;

    // Shared schema: two keyed node types plus two edge types (one carrying
    // an optional payload property, one bare).
    const TEST_SCHEMA: &str = r#"
node Person {
    name: String @key
    age: I32?
}
node Company {
    name: String @key
}
edge Knows: Person -> Person {
    since: Date?
}
edge WorksAt: Person -> Company
"#;

    // Shared JSONL payload: three nodes and one edge of each type. Edge
    // endpoints reference node key values ("Alice", "Bob", "Acme").
    const TEST_DATA: &str = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}
{"type": "Person", "data": {"name": "Bob", "age": 25}}
{"type": "Company", "data": {"name": "Acme"}}
{"edge": "Knows", "from": "Alice", "to": "Bob"}
{"edge": "WorksAt", "from": "Alice", "to": "Acme"}
"#;

    // Loading reports per-type node and edge row counts in LoadResult.
    #[tokio::test]
    async fn test_load_creates_data() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let result = load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert_eq!(result.nodes_loaded["Person"], 2);
        assert_eq!(result.nodes_loaded["Company"], 1);
        assert_eq!(result.edges_loaded["Knows"], 1);
        assert_eq!(result.edges_loaded["WorksAt"], 1);
    }

    // Loaded node rows are readable back through a raw Lance dataset scan,
    // with the key property materialized in the `id` column.
    #[tokio::test]
    async fn test_load_data_readable_via_lance() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        let snap = db.snapshot();
        let person_ds = snap.open("node:Person").await.unwrap();

        assert_eq!(person_ds.count_rows(None).await.unwrap(), 2);

        let batches: Vec<RecordBatch> = person_ds
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();

        let batch = &batches[0];
        let ids = batch
            .column_by_name("id")
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let id_values: Vec<&str> = (0..ids.len()).map(|i| ids.value(i)).collect();
        assert!(id_values.contains(&"Alice"));
        assert!(id_values.contains(&"Bob"));
    }

    // Edge rows store node key values in their `src`/`dst` columns.
    #[tokio::test]
    async fn test_load_edges_reference_node_keys() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        let snap = db.snapshot();
        let knows_ds = snap.open("edge:Knows").await.unwrap();

        let batches: Vec<RecordBatch> = knows_ds
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();

        let batch = &batches[0];
        let srcs = batch
            .column_by_name("src")
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let dsts = batch
            .column_by_name("dst")
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();

        assert_eq!(srcs.value(0), "Alice");
        assert_eq!(dsts.value(0), "Bob");
    }

    // A successful load commits a new manifest version.
    #[tokio::test]
    async fn test_load_manifest_version_advances() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        let v1 = db.version();

        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert!(db.version() > v1);
    }

    // Append mode adds rows on top of a previous load instead of replacing.
    #[tokio::test]
    async fn test_load_append_adds_rows() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let batch1 = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#;
        let batch2 = r#"{"type": "Person", "data": {"name": "Bob", "age": 25}}"#;

        load_jsonl(&mut db, batch1, LoadMode::Overwrite)
            .await
            .unwrap();
        load_jsonl(&mut db, batch2, LoadMode::Append).await.unwrap();

        let snap = db.snapshot();
        let person_ds = snap.open("node:Person").await.unwrap();
        assert_eq!(person_ds.count_rows(None).await.unwrap(), 2);
    }

    // Rows referencing a type not present in the schema are rejected.
    #[tokio::test]
    async fn test_load_unknown_type_rejected() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let bad = r#"{"type": "FakeType", "data": {"name": "x"}}"#;
        let result = load_jsonl(&mut db, bad, LoadMode::Overwrite).await;
        assert!(result.is_err());
    }

    // Ingest into a new branch creates it from the base and reports per-table
    // row counts (sorted by table key: edges before nodes, alphabetical).
    #[tokio::test]
    async fn test_ingest_creates_branch_and_reports_tables() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let result = db
            .ingest("feature", Some("main"), TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert_eq!(result.branch, "feature");
        assert_eq!(result.base_branch, "main");
        assert!(result.branch_created);
        assert_eq!(result.mode, LoadMode::Overwrite);
        assert_eq!(
            result.tables,
            vec![
                IngestTableResult {
                    table_key: "edge:Knows".to_string(),
                    rows_loaded: 1
                },
                IngestTableResult {
                    table_key: "edge:WorksAt".to_string(),
                    rows_loaded: 1
                },
                IngestTableResult {
                    table_key: "node:Company".to_string(),
                    rows_loaded: 1
                },
                IngestTableResult {
                    table_key: "node:Person".to_string(),
                    rows_loaded: 2
                },
            ]
        );
        assert!(
            db.branch_list()
                .await
                .unwrap()
                .contains(&"feature".to_string())
        );
    }

    // When the target branch already exists, the `from` base is echoed back
    // but not used, and Merge mode upserts by key (Bob updated, Eve added,
    // Alice untouched).
    #[tokio::test]
    async fn test_ingest_existing_branch_ignores_from_and_merges_data() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();
        db.branch_create_from(crate::db::ReadTarget::branch("main"), "feature")
            .await
            .unwrap();

        let result = db
            .ingest(
                "feature",
                Some("missing-base"),
                r#"{"type":"Person","data":{"name":"Bob","age":26}}
{"type":"Person","data":{"name":"Eve","age":31}}"#,
                LoadMode::Merge,
            )
            .await
            .unwrap();

        assert_eq!(result.branch, "feature");
        assert_eq!(result.base_branch, "missing-base");
        assert!(!result.branch_created);
        assert_eq!(result.mode, LoadMode::Merge);
        assert_eq!(
            result.tables,
            vec![IngestTableResult {
                table_key: "node:Person".to_string(),
                rows_loaded: 2
            }]
        );

        let snap = db
            .snapshot_of(crate::db::ReadTarget::branch("feature"))
            .await
            .unwrap();
        let person_ds = snap.open("node:Person").await.unwrap();
        assert_eq!(person_ds.count_rows(None).await.unwrap(), 3);

        let batches: Vec<RecordBatch> = person_ds
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();
        let mut ages_by_id = HashMap::new();
        for batch in &batches {
            let ids = batch
                .column_by_name("id")
                .unwrap()
                .as_any()
                .downcast_ref::<StringArray>()
                .unwrap();
            let ages = batch
                .column_by_name("age")
                .unwrap()
                .as_any()
                .downcast_ref::<Int32Array>()
                .unwrap();
            for idx in 0..ids.len() {
                ages_by_id.insert(ids.value(idx).to_string(), ages.value(idx));
            }
        }

        assert_eq!(ages_by_id.get("Bob"), Some(&26));
        assert_eq!(ages_by_id.get("Eve"), Some(&31));
        assert_eq!(ages_by_id.get("Alice"), Some(&30));
    }

    // ingest_as records the supplied actor id on the branch head commit.
    #[tokio::test]
    async fn test_ingest_as_stamps_actor_on_branch_head_commit() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        db.ingest_as(
            "feature",
            Some("main"),
            TEST_DATA,
            LoadMode::Overwrite,
            Some("act-andrew"),
        )
        .await
        .unwrap();

        // list_commits is oldest-first; the last entry is the branch head.
        let head = db
            .list_commits(Some("feature"))
            .await
            .unwrap()
            .into_iter()
            .last()
            .unwrap();
        assert_eq!(head.actor_id.as_deref(), Some("act-andrew"));
    }

    // NaN compares false against both bounds, so @range must reject it
    // explicitly rather than silently passing it through.
    #[test]
    fn test_range_constraint_rejects_nan() {
        use arrow_array::{Float64Array, RecordBatch, StringArray};
        use omnigraph_compiler::catalog::{LiteralValue, NodeType, RangeConstraint};
        use std::sync::Arc;

        let schema = Arc::new(arrow_schema::Schema::new(vec![
            arrow_schema::Field::new("name", arrow_schema::DataType::Utf8, false),
            arrow_schema::Field::new("score", arrow_schema::DataType::Float64, true),
        ]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["bad"])),
                Arc::new(Float64Array::from(vec![f64::NAN])),
            ],
        )
        .unwrap();

        let node_type = NodeType {
            name: "Test".to_string(),
            implements: vec![],
            properties: Default::default(),
            key: None,
            unique_constraints: vec![],
            indices: vec![],
            range_constraints: vec![RangeConstraint {
                property: "score".to_string(),
                min: Some(LiteralValue::Float(0.0)),
                max: Some(LiteralValue::Float(1.0)),
            }],
            check_constraints: vec![],
            embed_sources: Default::default(),
            blob_properties: Default::default(),
            arrow_schema: schema,
        };

        let result = validate_value_constraints(&batch, &node_type);
        assert!(result.is_err(), "expected NaN to be rejected");
        let err = result.unwrap_err().to_string();
        assert!(err.contains("NaN"), "error should mention NaN: {}", err);
    }
}
1631}