1use std::collections::{HashMap, HashSet};
2
3use std::io::{BufRead, BufReader, Cursor};
4use std::sync::Arc;
5
6use arrow_array::{
7 Array, ArrayRef, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array,
8 Int32Array, Int64Array, RecordBatch, StringArray, UInt32Array, UInt64Array,
9 builder::{
10 ArrayBuilder, BooleanBuilder, Date32Builder, Date64Builder, FixedSizeListBuilder,
11 Float32Builder, Float64Builder, Int32Builder, Int64Builder, ListBuilder, StringBuilder,
12 UInt32Builder, UInt64Builder,
13 },
14};
15use arrow_schema::DataType;
16use base64::Engine;
17use lance::blob::BlobArrayBuilder;
18use omnigraph_compiler::catalog::NodeType;
19use serde::{Deserialize, Serialize};
20use serde_json::Value as JsonValue;
21
22use crate::db::Omnigraph;
23use crate::error::{OmniError, Result};
24use crate::exec::staging::{MutationStaging, PendingMode};
25
/// Summary of a completed load, returned by the `load*` entry points.
#[derive(Debug, Clone, Default)]
pub struct LoadResult {
    /// Branch the data was loaded into; `load_as` fills in `"main"` when the
    /// requested branch name normalizes to `None`.
    pub branch: String,
    /// Normalized base branch when the caller supplied one, otherwise `None`.
    pub base_branch: Option<String>,
    /// `true` when the target branch was missing and `load_as` created it from
    /// the base branch before loading.
    pub branch_created: bool,
    /// Rows loaded per node type name.
    pub nodes_loaded: HashMap<String, usize>,
    /// Rows loaded per edge name (the catalog's canonical edge name).
    pub edges_loaded: HashMap<String, usize>,
}
40
/// Per-table row count reported by the deprecated `ingest*` API.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct IngestTableResult {
    /// Table identifier, formatted as `node:<type>` or `edge:<name>`.
    pub table_key: String,
    /// Number of rows loaded into that table.
    pub rows_loaded: usize,
}
46
/// Result of the deprecated `ingest*` API; a serializable view of a [`LoadResult`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct IngestResult {
    /// Branch the data was loaded into.
    pub branch: String,
    /// Base branch used for the load; `ingest_as` defaults it to `"main"`.
    pub base_branch: String,
    /// `true` when the target branch was created as part of this call.
    pub branch_created: bool,
    /// Load mode the caller requested.
    pub mode: LoadMode,
    /// Per-table row counts, sorted by `table_key`.
    pub tables: Vec<IngestTableResult>,
}
55
/// How loaded rows are applied to the target tables.
///
/// Serialized in `snake_case` (`"overwrite"`, `"append"`, `"merge"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LoadMode {
    /// Staged as `PendingMode::Overwrite`; the loader tags the mutation as a
    /// `SchemaRewrite` op.
    Overwrite,
    /// Staged as `PendingMode::Append`; the loader tags the mutation as an
    /// `Insert` op.
    Append,
    /// Staged as `PendingMode::Merge`; the loader tags the mutation as a
    /// `Merge` op.
    Merge,
}
67
68pub async fn load_jsonl(db: &Omnigraph, data: &str, mode: LoadMode) -> Result<LoadResult> {
73 let current_branch = db.active_branch().await;
74 let branch = current_branch.as_deref().unwrap_or("main");
75 db.load(branch, data, mode).await
76}
77
78pub async fn load_jsonl_file(db: &Omnigraph, path: &str, mode: LoadMode) -> Result<LoadResult> {
80 let current_branch = db.active_branch().await;
81 let branch = current_branch.as_deref().unwrap_or("main");
82 db.load_file(branch, path, mode).await
83}
84
85impl Omnigraph {
86 #[deprecated(
87 note = "use `load_as` with an explicit `base` instead; the ingest family will be removed in a future release"
88 )]
89 pub async fn ingest(
90 &self,
91 branch: &str,
92 from: Option<&str>,
93 data: &str,
94 mode: LoadMode,
95 ) -> Result<IngestResult> {
96 #[allow(deprecated)]
97 self.ingest_as(branch, from, data, mode, None).await
98 }
99
100 #[deprecated(
105 note = "use `load_as` with an explicit `base` instead; the ingest family will be removed in a future release"
106 )]
107 pub async fn ingest_as(
108 &self,
109 branch: &str,
110 from: Option<&str>,
111 data: &str,
112 mode: LoadMode,
113 actor_id: Option<&str>,
114 ) -> Result<IngestResult> {
115 let result = self
116 .load_as(branch, Some(from.unwrap_or("main")), data, mode, actor_id)
117 .await?;
118 Ok(IngestResult {
119 branch: result.branch.clone(),
120 base_branch: result
121 .base_branch
122 .clone()
123 .unwrap_or_else(|| "main".to_string()),
124 branch_created: result.branch_created,
125 mode,
126 tables: result.to_ingest_tables(),
127 })
128 }
129
130 #[deprecated(
131 note = "use `load_file_as` with an explicit `base` instead; the ingest family will be removed in a future release"
132 )]
133 pub async fn ingest_file(
134 &self,
135 branch: &str,
136 from: Option<&str>,
137 path: &str,
138 mode: LoadMode,
139 ) -> Result<IngestResult> {
140 #[allow(deprecated)]
141 self.ingest_file_as(branch, from, path, mode, None).await
142 }
143
144 #[deprecated(
145 note = "use `load_file_as` with an explicit `base` instead; the ingest family will be removed in a future release"
146 )]
147 pub async fn ingest_file_as(
148 &self,
149 branch: &str,
150 from: Option<&str>,
151 path: &str,
152 mode: LoadMode,
153 actor_id: Option<&str>,
154 ) -> Result<IngestResult> {
155 let data = std::fs::read_to_string(path).map_err(OmniError::Io)?;
156 #[allow(deprecated)]
157 self.ingest_as(branch, from, &data, mode, actor_id).await
158 }
159
160 pub async fn load(&self, branch: &str, data: &str, mode: LoadMode) -> Result<LoadResult> {
161 self.load_as(branch, None, data, mode, None).await
162 }
163
164 pub async fn load_as(
172 &self,
173 branch: &str,
174 base: Option<&str>,
175 data: &str,
176 mode: LoadMode,
177 actor_id: Option<&str>,
178 ) -> Result<LoadResult> {
179 self.enforce(
186 omnigraph_policy::PolicyAction::Change,
187 &omnigraph_policy::ResourceScope::Branch(branch.to_string()),
188 actor_id,
189 )?;
190 self.heal_pending_recovery_sidecars().await?;
201 crate::db::ensure_public_branch_ref(branch, "load")?;
206 let requested = Self::normalize_branch_name(branch)?;
212 let base_branch = match base {
213 Some(base) => {
214 Some(Self::normalize_branch_name(base)?.unwrap_or_else(|| "main".to_string()))
215 }
216 None => None,
217 };
218 let mut branch_created = false;
221 if let (Some(target), Some(base_name)) = (requested.as_deref(), base_branch.as_deref()) {
222 let exists = self.branch_list().await?.iter().any(|name| name == target);
223 if !exists {
224 self.branch_create_from_as(
231 crate::db::ReadTarget::branch(base_name),
232 target,
233 actor_id,
234 )
235 .await?;
236 branch_created = true;
237 }
238 }
239 let mut result = self
243 .load_direct_on_branch(requested.as_deref(), data, mode, actor_id)
244 .await?;
245 result.branch = requested.unwrap_or_else(|| "main".to_string());
246 result.base_branch = base_branch;
247 result.branch_created = branch_created;
248 Ok(result)
249 }
250
251 pub async fn load_file(&self, branch: &str, path: &str, mode: LoadMode) -> Result<LoadResult> {
252 self.load_file_as(branch, None, path, mode, None).await
253 }
254
255 pub async fn load_file_as(
259 &self,
260 branch: &str,
261 base: Option<&str>,
262 path: &str,
263 mode: LoadMode,
264 actor_id: Option<&str>,
265 ) -> Result<LoadResult> {
266 let data = std::fs::read_to_string(path).map_err(OmniError::Io)?;
267 self.load_as(branch, base, &data, mode, actor_id).await
268 }
269
270 async fn load_direct_on_branch(
271 &self,
272 branch: Option<&str>,
273 data: &str,
274 mode: LoadMode,
275 actor_id: Option<&str>,
276 ) -> Result<LoadResult> {
277 let reader = BufReader::new(Cursor::new(data.as_bytes()));
278 load_jsonl_reader(self, branch, reader, mode, actor_id).await
279 }
280}
281
282impl LoadMode {
283 pub fn as_str(self) -> &'static str {
284 match self {
285 LoadMode::Overwrite => "overwrite",
286 LoadMode::Append => "append",
287 LoadMode::Merge => "merge",
288 }
289 }
290}
291
292impl LoadResult {
293 pub fn to_ingest_tables(&self) -> Vec<IngestTableResult> {
294 let mut tables = self
295 .nodes_loaded
296 .iter()
297 .map(|(type_name, rows_loaded)| IngestTableResult {
298 table_key: format!("node:{type_name}"),
299 rows_loaded: *rows_loaded,
300 })
301 .chain(
302 self.edges_loaded
303 .iter()
304 .map(|(edge_name, rows_loaded)| IngestTableResult {
305 table_key: format!("edge:{edge_name}"),
306 rows_loaded: *rows_loaded,
307 }),
308 )
309 .collect::<Vec<_>>();
310 tables.sort_by(|a, b| a.table_key.cmp(&b.table_key));
311 tables
312 }
313}
314
/// Parses a stream of JSON records from `reader` and loads them into `branch`.
///
/// Each record is a JSON value that is either a node (`{"type": ..., "data": {...}}`)
/// or an edge (`{"edge": ..., "from": ..., "to": ..., "data": {...}}`). Records
/// are grouped per node type / edge name, converted to Arrow batches, validated,
/// staged, and then committed in one mutation.
///
/// Returns a [`LoadResult`] with `nodes_loaded` / `edges_loaded` filled in; the
/// branch-related fields are left at their defaults.
///
/// # Errors
/// Returns a manifest error for invalid JSON, unknown node/edge types, edges
/// missing `from`/`to`, edge endpoints that do not resolve to a known node id,
/// or tables without a manifest entry. Errors from the validation, staging and
/// commit helpers are propagated.
async fn load_jsonl_reader<R: BufRead>(
    db: &Omnigraph,
    branch: Option<&str>,
    reader: R,
    mode: LoadMode,
    actor_id: Option<&str>,
) -> Result<LoadResult> {
    let catalog = db.catalog().clone();

    // Rows grouped by node type name.
    let mut node_rows: HashMap<String, Vec<JsonValue>> = HashMap::new();
    // Rows grouped by canonical edge name, as (from, to, data).
    let mut edge_rows: HashMap<String, Vec<(String, String, JsonValue)>> = HashMap::new();

    // Streaming deserializer: consumes consecutive JSON values from the reader.
    for (idx, parsed) in serde_json::Deserializer::from_reader(reader)
        .into_iter::<JsonValue>()
        .enumerate()
    {
        // 1-based record number, used only in error messages.
        let record_num = idx + 1;
        let value: JsonValue = parsed.map_err(|e| {
            OmniError::manifest(format!("invalid JSON at record {}: {}", record_num, e))
        })?;

        if let Some(type_name) = value.get("type").and_then(|v| v.as_str()) {
            if !catalog.node_types.contains_key(type_name) {
                return Err(OmniError::manifest(format!(
                    "record {}: unknown node type '{}'",
                    record_num, type_name
                )));
            }
            // A missing "data" field is treated as an empty object.
            let data = value
                .get("data")
                .cloned()
                .unwrap_or(JsonValue::Object(serde_json::Map::new()));
            node_rows
                .entry(type_name.to_string())
                .or_default()
                .push(data);
        } else if let Some(edge_name) = value.get("edge").and_then(|v| v.as_str()) {
            if catalog.lookup_edge_by_name(edge_name).is_none() {
                return Err(OmniError::manifest(format!(
                    "record {}: unknown edge type '{}'",
                    record_num, edge_name
                )));
            }
            let from = value
                .get("from")
                .and_then(|v| v.as_str())
                .ok_or_else(|| {
                    OmniError::manifest(format!("record {}: edge missing 'from'", record_num))
                })?
                .to_string();
            let to = value
                .get("to")
                .and_then(|v| v.as_str())
                .ok_or_else(|| {
                    OmniError::manifest(format!("record {}: edge missing 'to'", record_num))
                })?
                .to_string();
            let data = value
                .get("data")
                .cloned()
                .unwrap_or(JsonValue::Object(serde_json::Map::new()));
            // The lookup cannot fail here: it was checked to be `Some` above.
            // Rows are grouped under the catalog's own name for the edge.
            let canonical = catalog.lookup_edge_by_name(edge_name).unwrap().name.clone();
            edge_rows
                .entry(canonical)
                .or_default()
                .push((from, to, data));
        } else {
            return Err(OmniError::manifest(format!(
                "record {}: expected 'type' or 'edge' field",
                record_num
            )));
        }
    }

    let mut result = LoadResult::default();
    let txn = db.open_write_txn(branch).await?;
    // Manifest state the transaction is based on; used for entry lookups below.
    let snapshot = txn.base.clone();
    let mut staging = MutationStaging::default();
    let pending_mode = match mode {
        LoadMode::Merge => PendingMode::Merge,
        LoadMode::Append => PendingMode::Append,
        LoadMode::Overwrite => PendingMode::Overwrite,
    };
    let load_op_kind = match mode {
        LoadMode::Append => crate::db::MutationOpKind::Insert,
        LoadMode::Merge => crate::db::MutationOpKind::Merge,
        LoadMode::Overwrite => crate::db::MutationOpKind::SchemaRewrite,
    };

    // On a named branch, if any touched table's manifest entry is not already
    // on that branch, take write-queue guards for all touched tables up front
    // and hand them to `commit_all` later.
    // NOTE(review): presumably this serializes the first-write fork of those
    // tables onto the branch — confirm against the write queue's contract.
    let fork_queue_guards: Option<(
        Vec<(String, Option<String>)>,
        Vec<tokio::sync::OwnedMutexGuard<()>>,
    )> = if let Some(active) = branch {
        let touched: Vec<(String, Option<String>)> = node_rows
            .keys()
            .map(|t| (format!("node:{t}"), Some(active.to_string())))
            .chain(
                edge_rows
                    .keys()
                    .map(|e| (format!("edge:{e}"), Some(active.to_string()))),
            )
            .collect();
        let needs_fork = touched.iter().any(|(table_key, _)| {
            snapshot
                .entry(table_key)
                .map(|e| e.table_branch.as_deref() != Some(active))
                .unwrap_or(false)
        });
        if needs_fork {
            let guards = db.write_queue().acquire_many(&touched).await;
            Some((touched, guards))
        } else {
            None
        }
    } else {
        None
    };

    // Phase 1: build and validate every node batch before touching any table.
    // Tuple layout: (type name, table key, batch, row count).
    let mut prepared_nodes: Vec<(String, String, RecordBatch, usize)> =
        Vec::with_capacity(node_rows.len());
    for (type_name, rows) in &node_rows {
        let node_type = &catalog.node_types[type_name];
        let batch = build_node_batch(node_type, rows)?;
        validate_value_constraints(&batch, node_type)?;
        validate_enum_constraints(&batch, &node_type.properties, type_name)?;
        let unique_groups = unique_constraint_groups_for_node(node_type);
        if !unique_groups.is_empty() {
            enforce_unique_constraints_intra_batch(&batch, type_name, &unique_groups)?;
        }
        let loaded_count = batch.num_rows();
        let table_key = format!("node:{}", type_name);
        // Only checks that the table exists in the manifest; the entry itself is unused.
        let _entry = snapshot
            .entry(&table_key)
            .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
        prepared_nodes.push((type_name.clone(), table_key, batch, loaded_count));
    }

    // Phase 2: open each node table for mutation and stage its batch.
    for (type_name, table_key, batch, loaded_count) in prepared_nodes {
        let opened = db
            .open_for_mutation_on_branch(branch, &table_key, load_op_kind, Some(&txn))
            .await?;
        staging.ensure_path(
            &table_key,
            opened.full_path,
            opened.table_branch,
            opened.expected_version,
            load_op_kind,
        );
        let schema = batch.schema();
        staging.append_batch(&table_key, schema, pending_mode, batch)?;
        result.nodes_loaded.insert(type_name, loaded_count);
    }

    // Phase 3: every edge endpoint must resolve to a node id. The helper is
    // given `staging`, so it can also consider the node rows staged above.
    for (edge_name, rows) in &edge_rows {
        let edge_type = &catalog.edge_types[edge_name];
        let from_ids =
            collect_node_ids_with_pending(db, branch, &edge_type.from_type, &staging).await?;
        let to_ids =
            collect_node_ids_with_pending(db, branch, &edge_type.to_type, &staging).await?;

        for (i, (src, dst, _)) in rows.iter().enumerate() {
            if !from_ids.contains(src.as_str()) {
                return Err(OmniError::manifest(format!(
                    "edge {} row {}: src '{}' not found in {}",
                    edge_name,
                    i + 1,
                    src,
                    edge_type.from_type
                )));
            }
            if !to_ids.contains(dst.as_str()) {
                return Err(OmniError::manifest(format!(
                    "edge {} row {}: dst '{}' not found in {}",
                    edge_name,
                    i + 1,
                    dst,
                    edge_type.to_type
                )));
            }
        }
    }

    // Phase 4: build and validate every edge batch.
    // Tuple layout: (edge name, table key, batch, row count).
    let mut prepared_edges: Vec<(String, String, RecordBatch, usize)> =
        Vec::with_capacity(edge_rows.len());
    for (edge_name, rows) in &edge_rows {
        let edge_type = &catalog.edge_types[edge_name];
        let batch = build_edge_batch(edge_type, rows)?;
        validate_enum_constraints(&batch, &edge_type.properties, edge_name)?;
        let unique_groups = unique_constraint_groups_for_edge(edge_type);
        if !unique_groups.is_empty() {
            enforce_unique_constraints_intra_batch(&batch, edge_name, &unique_groups)?;
        }
        let loaded_count = batch.num_rows();
        let table_key = format!("edge:{}", edge_name);
        // Only checks that the table exists in the manifest; the entry itself is unused.
        let _entry = snapshot
            .entry(&table_key)
            .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
        prepared_edges.push((edge_name.clone(), table_key, batch, loaded_count));
    }

    // Phase 5: open each edge table for mutation and stage its batch.
    for (edge_name, table_key, batch, loaded_count) in prepared_edges {
        let opened = db
            .open_for_mutation_on_branch(branch, &table_key, load_op_kind, Some(&txn))
            .await?;
        staging.ensure_path(
            &table_key,
            opened.full_path,
            opened.table_branch,
            opened.expected_version,
            load_op_kind,
        );
        let schema = batch.schema();
        staging.append_batch(&table_key, schema, pending_mode, batch)?;
        result.edges_loaded.insert(edge_name, loaded_count);
    }

    // Phase 6: cardinality checks run after all edge batches are staged.
    for (edge_name, _) in &edge_rows {
        let edge_type = &catalog.edge_types[edge_name];
        let table_key = format!("edge:{}", edge_name);
        validate_edge_cardinality_with_pending_loader(
            db, branch, edge_type, &table_key, &staging, mode,
        )
        .await?;
    }

    // Phase 7: write the staged data, then commit the per-table mutations.
    let staged = staging
        .stage_all_with_concurrency(db, branch, load_write_concurrency())
        .await?;
    let crate::exec::staging::CommittedMutation {
        updates,
        expected_versions,
        sidecar_handle,
        // Bound to a named local so the guards live until the function returns.
        guards: _queue_guards,
        committed_handles,
    } = staged
        .commit_all(
            db,
            branch,
            crate::db::manifest::SidecarKind::Load,
            actor_id,
            fork_queue_guards,
            Some(&txn),
        )
        .await?;
    // Failpoint hook between `commit_all` and the branch-level commit below.
    crate::failpoints::maybe_fail(crate::failpoints::names::MUTATION_POST_FINALIZE_PRE_PUBLISHER)?;
    db.commit_updates_on_branch_with_expected(
        branch,
        &updates,
        &expected_versions,
        actor_id,
        Some(&txn),
        committed_handles,
    )
    .await?;
    // Sidecar cleanup is best-effort: a failure is logged, not returned.
    if let Some(handle) = sidecar_handle {
        if let Err(err) = crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await {
            tracing::warn!(
                error = %err,
                operation_id = handle.operation_id.as_str(),
                "recovery sidecar cleanup failed; the next open's recovery sweep will resolve it"
            );
        }
    }

    Ok(result)
}
654
655fn build_node_batch(node_type: &NodeType, rows: &[JsonValue]) -> Result<RecordBatch> {
656 let schema = node_type.arrow_schema.clone();
657
658 let ids: Vec<String> = rows
660 .iter()
661 .map(|row| {
662 let explicit_id = row.get("id").and_then(|v| v.as_str()).map(str::to_string);
663 if let Some(key_prop) = node_type.key_property() {
664 let key_value = row
665 .get(key_prop)
666 .and_then(|v| v.as_str())
667 .map(|s| s.to_string())
668 .ok_or_else(|| {
669 OmniError::manifest(format!(
670 "node {} missing @key property '{}'",
671 node_type.name, key_prop
672 ))
673 })?;
674 if let Some(explicit_id) = explicit_id {
675 if explicit_id != key_value {
676 return Err(OmniError::manifest(format!(
677 "node {} has explicit id '{}' that does not match @key property '{}' value '{}'",
678 node_type.name, explicit_id, key_prop, key_value
679 )));
680 }
681 }
682 Ok(key_value)
683 } else if let Some(explicit_id) = explicit_id {
684 Ok(explicit_id)
685 } else {
686 Ok(generate_id())
687 }
688 })
689 .collect::<Result<Vec<_>>>()?;
690
691 let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
692 columns.push(Arc::new(StringArray::from(ids)));
693
694 for field in schema.fields().iter().skip(1) {
696 if node_type.blob_properties.contains(field.name()) {
697 let col = build_blob_column(field.name(), field.is_nullable(), rows)?;
698 columns.push(col);
699 } else {
700 let col =
701 build_column_from_json(field.name(), field.data_type(), field.is_nullable(), rows)?;
702 columns.push(col);
703 }
704 }
705
706 RecordBatch::try_new(schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
707}
708
709fn build_edge_batch(
710 edge_type: &omnigraph_compiler::catalog::EdgeType,
711 rows: &[(String, String, JsonValue)],
712) -> Result<RecordBatch> {
713 let schema = edge_type.arrow_schema.clone();
714
715 let ids: Vec<String> = rows
716 .iter()
717 .map(|(_, _, data)| {
718 data.get("id")
719 .and_then(|v| v.as_str())
720 .map(str::to_string)
721 .unwrap_or_else(generate_id)
722 })
723 .collect();
724 let srcs: Vec<&str> = rows.iter().map(|(from, _, _)| from.as_str()).collect();
725 let dsts: Vec<&str> = rows.iter().map(|(_, to, _)| to.as_str()).collect();
726
727 let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
728 columns.push(Arc::new(StringArray::from(ids)));
729 columns.push(Arc::new(StringArray::from(srcs)));
730 columns.push(Arc::new(StringArray::from(dsts)));
731
732 let data_values: Vec<JsonValue> = rows.iter().map(|(_, _, data)| data.clone()).collect();
734 for field in schema.fields().iter().skip(3) {
735 if edge_type.blob_properties.contains(field.name()) {
736 let col = build_blob_column(field.name(), field.is_nullable(), &data_values)?;
737 columns.push(col);
738 } else {
739 let col = build_column_from_json(
740 field.name(),
741 field.data_type(),
742 field.is_nullable(),
743 &data_values,
744 )?;
745 columns.push(col);
746 }
747 }
748
749 RecordBatch::try_new(schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
750}
751
752pub(crate) fn append_blob_value(builder: &mut BlobArrayBuilder, value: &str) -> Result<()> {
754 if let Some(encoded) = value.strip_prefix("base64:") {
755 let bytes = base64::engine::general_purpose::STANDARD
756 .decode(encoded)
757 .map_err(|e| OmniError::manifest(format!("invalid base64 blob data: {}", e)))?;
758 builder
759 .push_bytes(bytes)
760 .map_err(|e| OmniError::Lance(e.to_string()))
761 } else {
762 builder
764 .push_uri(value)
765 .map_err(|e| OmniError::Lance(e.to_string()))
766 }
767}
768
769fn build_blob_column(name: &str, nullable: bool, rows: &[JsonValue]) -> Result<ArrayRef> {
771 let mut builder = BlobArrayBuilder::new(rows.len());
772 for row in rows {
773 match row.get(name) {
774 Some(JsonValue::String(s)) => {
775 append_blob_value(&mut builder, s)?;
776 }
777 Some(JsonValue::Null) | None if nullable => {
778 builder
779 .push_null()
780 .map_err(|e| OmniError::Lance(e.to_string()))?;
781 }
782 Some(JsonValue::Null) | None => {
783 return Err(OmniError::manifest(format!(
784 "non-nullable blob property '{}' has null values",
785 name
786 )));
787 }
788 _ => {
789 return Err(OmniError::manifest(format!(
790 "blob property '{}' must be a URI string or base64: prefixed data",
791 name
792 )));
793 }
794 }
795 }
796 builder
797 .finish()
798 .map_err(|e| OmniError::Lance(e.to_string()))
799}
800
801fn build_column_from_json(
802 name: &str,
803 data_type: &DataType,
804 nullable: bool,
805 rows: &[JsonValue],
806) -> Result<ArrayRef> {
807 let array: ArrayRef = match data_type {
808 DataType::Utf8 => {
809 let values: Vec<Option<String>> = rows
810 .iter()
811 .map(|row| {
812 row.get(name)
813 .and_then(|v| v.as_str())
814 .map(|s| s.to_string())
815 })
816 .collect();
817 Arc::new(StringArray::from(values))
818 }
819 DataType::Int32 => {
820 let values: Vec<Option<i32>> = rows
821 .iter()
822 .map(|row| row.get(name).and_then(|v| v.as_i64()).map(|v| v as i32))
823 .collect();
824 Arc::new(Int32Array::from(values))
825 }
826 DataType::Int64 => {
827 let values: Vec<Option<i64>> = rows
828 .iter()
829 .map(|row| row.get(name).and_then(|v| v.as_i64()))
830 .collect();
831 Arc::new(Int64Array::from(values))
832 }
833 DataType::UInt32 => {
834 let values: Vec<Option<u32>> = rows
835 .iter()
836 .map(|row| row.get(name).and_then(|v| v.as_u64()).map(|v| v as u32))
837 .collect();
838 Arc::new(UInt32Array::from(values))
839 }
840 DataType::UInt64 => {
841 let values: Vec<Option<u64>> = rows
842 .iter()
843 .map(|row| row.get(name).and_then(|v| v.as_u64()))
844 .collect();
845 Arc::new(UInt64Array::from(values))
846 }
847 DataType::Float32 => {
848 let values: Vec<Option<f32>> = rows
849 .iter()
850 .map(|row| row.get(name).and_then(|v| v.as_f64()).map(|v| v as f32))
851 .collect();
852 Arc::new(Float32Array::from(values))
853 }
854 DataType::Float64 => {
855 let values: Vec<Option<f64>> = rows
856 .iter()
857 .map(|row| row.get(name).and_then(|v| v.as_f64()))
858 .collect();
859 Arc::new(Float64Array::from(values))
860 }
861 DataType::Boolean => {
862 let values: Vec<Option<bool>> = rows
863 .iter()
864 .map(|row| row.get(name).and_then(|v| v.as_bool()))
865 .collect();
866 Arc::new(BooleanArray::from(values))
867 }
868 DataType::Date32 => {
869 let mut values = Vec::with_capacity(rows.len());
870 for row in rows {
871 values.push(parse_date32_json_value(
872 row.get(name).unwrap_or(&JsonValue::Null),
873 )?);
874 }
875 Arc::new(Date32Array::from(values))
876 }
877 DataType::Date64 => {
878 let mut values = Vec::with_capacity(rows.len());
879 for row in rows {
880 values.push(parse_date64_json_value(
881 row.get(name).unwrap_or(&JsonValue::Null),
882 )?);
883 }
884 Arc::new(Date64Array::from(values))
885 }
886 DataType::List(field) => {
887 let mut builder = ListBuilder::with_capacity(
888 make_list_value_builder(field.data_type(), rows.len())?,
889 rows.len(),
890 )
891 .with_field(field.clone());
892 for row in rows {
893 let value = row.get(name).unwrap_or(&JsonValue::Null);
894 if value.is_null() {
895 builder.append(false);
896 continue;
897 }
898 let items = value.as_array().ok_or_else(|| {
899 OmniError::manifest(format!(
900 "list property '{}' expects a JSON array, got {}",
901 name, value
902 ))
903 })?;
904 for item in items {
905 append_json_list_item(builder.values(), field.data_type(), item)?;
906 }
907 builder.append(true);
908 }
909 Arc::new(builder.finish())
910 }
911 DataType::FixedSizeList(child_field, dim) => {
912 let dim = *dim;
914 let mut builder = FixedSizeListBuilder::with_capacity(
915 Float32Builder::with_capacity(rows.len() * dim as usize),
916 dim,
917 rows.len(),
918 )
919 .with_field(child_field.clone());
920 for row in rows {
921 if let Some(arr) = row.get(name).and_then(|v| v.as_array()) {
922 if arr.len() != dim as usize {
923 return Err(OmniError::manifest(format!(
924 "vector property '{}' expects {} dimensions, got {}",
925 name,
926 dim,
927 arr.len()
928 )));
929 }
930 for val in arr {
931 builder
932 .values()
933 .append_value(val.as_f64().unwrap_or(0.0) as f32);
934 }
935 builder.append(true);
936 } else if nullable {
937 for _ in 0..dim as usize {
938 builder.values().append_null();
939 }
940 builder.append(false);
941 } else {
942 return Err(OmniError::manifest(format!(
943 "non-nullable vector property '{}' has null values",
944 name
945 )));
946 }
947 }
948 Arc::new(builder.finish())
949 }
950 _ => {
951 let values: Vec<Option<&str>> = vec![None; rows.len()];
953 Arc::new(StringArray::from(values))
954 }
955 };
956
957 if !nullable && array.null_count() > 0 {
958 return Err(OmniError::manifest(format!(
959 "non-nullable property '{}' has null or invalid values",
960 name
961 )));
962 }
963
964 Ok(array)
965}
966
967fn make_list_value_builder(data_type: &DataType, capacity: usize) -> Result<Box<dyn ArrayBuilder>> {
968 Ok(match data_type {
969 DataType::Utf8 => Box::new(StringBuilder::with_capacity(capacity, capacity * 8)),
970 DataType::Boolean => Box::new(BooleanBuilder::with_capacity(capacity)),
971 DataType::Int32 => Box::new(Int32Builder::with_capacity(capacity)),
972 DataType::Int64 => Box::new(Int64Builder::with_capacity(capacity)),
973 DataType::UInt32 => Box::new(UInt32Builder::with_capacity(capacity)),
974 DataType::UInt64 => Box::new(UInt64Builder::with_capacity(capacity)),
975 DataType::Float32 => Box::new(Float32Builder::with_capacity(capacity)),
976 DataType::Float64 => Box::new(Float64Builder::with_capacity(capacity)),
977 DataType::Date32 => Box::new(Date32Builder::with_capacity(capacity)),
978 DataType::Date64 => Box::new(Date64Builder::with_capacity(capacity)),
979 other => {
980 return Err(OmniError::manifest(format!(
981 "unsupported list element data type {:?}",
982 other
983 )));
984 }
985 })
986}
987
988fn append_json_list_item(
989 builder: &mut Box<dyn ArrayBuilder>,
990 data_type: &DataType,
991 value: &JsonValue,
992) -> Result<()> {
993 match data_type {
994 DataType::Utf8 => {
995 let builder = builder
996 .as_any_mut()
997 .downcast_mut::<StringBuilder>()
998 .ok_or_else(|| OmniError::manifest("list Utf8 builder downcast failed"))?;
999 if let Some(value) = value.as_str() {
1000 builder.append_value(value);
1001 } else {
1002 builder.append_null();
1003 }
1004 }
1005 DataType::Boolean => {
1006 let builder = builder
1007 .as_any_mut()
1008 .downcast_mut::<BooleanBuilder>()
1009 .ok_or_else(|| OmniError::manifest("list Boolean builder downcast failed"))?;
1010 if let Some(value) = value.as_bool() {
1011 builder.append_value(value);
1012 } else {
1013 builder.append_null();
1014 }
1015 }
1016 DataType::Int32 => {
1017 let builder = builder
1018 .as_any_mut()
1019 .downcast_mut::<Int32Builder>()
1020 .ok_or_else(|| OmniError::manifest("list Int32 builder downcast failed"))?;
1021 if let Some(value) = value.as_i64() {
1022 let value = i32::try_from(value).map_err(|_| {
1023 OmniError::manifest(format!("list value {} exceeds Int32 range", value))
1024 })?;
1025 builder.append_value(value);
1026 } else {
1027 builder.append_null();
1028 }
1029 }
1030 DataType::Int64 => {
1031 let builder = builder
1032 .as_any_mut()
1033 .downcast_mut::<Int64Builder>()
1034 .ok_or_else(|| OmniError::manifest("list Int64 builder downcast failed"))?;
1035 if let Some(value) = value.as_i64() {
1036 builder.append_value(value);
1037 } else {
1038 builder.append_null();
1039 }
1040 }
1041 DataType::UInt32 => {
1042 let builder = builder
1043 .as_any_mut()
1044 .downcast_mut::<UInt32Builder>()
1045 .ok_or_else(|| OmniError::manifest("list UInt32 builder downcast failed"))?;
1046 if let Some(value) = value.as_u64() {
1047 let value = u32::try_from(value).map_err(|_| {
1048 OmniError::manifest(format!("list value {} exceeds UInt32 range", value))
1049 })?;
1050 builder.append_value(value);
1051 } else {
1052 builder.append_null();
1053 }
1054 }
1055 DataType::UInt64 => {
1056 let builder = builder
1057 .as_any_mut()
1058 .downcast_mut::<UInt64Builder>()
1059 .ok_or_else(|| OmniError::manifest("list UInt64 builder downcast failed"))?;
1060 if let Some(value) = value.as_u64() {
1061 builder.append_value(value);
1062 } else {
1063 builder.append_null();
1064 }
1065 }
1066 DataType::Float32 => {
1067 let builder = builder
1068 .as_any_mut()
1069 .downcast_mut::<Float32Builder>()
1070 .ok_or_else(|| OmniError::manifest("list Float32 builder downcast failed"))?;
1071 if let Some(value) = value.as_f64() {
1072 builder.append_value(value as f32);
1073 } else {
1074 builder.append_null();
1075 }
1076 }
1077 DataType::Float64 => {
1078 let builder = builder
1079 .as_any_mut()
1080 .downcast_mut::<Float64Builder>()
1081 .ok_or_else(|| OmniError::manifest("list Float64 builder downcast failed"))?;
1082 if let Some(value) = value.as_f64() {
1083 builder.append_value(value);
1084 } else {
1085 builder.append_null();
1086 }
1087 }
1088 DataType::Date32 => {
1089 let builder = builder
1090 .as_any_mut()
1091 .downcast_mut::<Date32Builder>()
1092 .ok_or_else(|| OmniError::manifest("list Date32 builder downcast failed"))?;
1093 if let Some(value) = parse_date32_json_value(value)? {
1094 builder.append_value(value);
1095 } else {
1096 builder.append_null();
1097 }
1098 }
1099 DataType::Date64 => {
1100 let builder = builder
1101 .as_any_mut()
1102 .downcast_mut::<Date64Builder>()
1103 .ok_or_else(|| OmniError::manifest("list Date64 builder downcast failed"))?;
1104 if let Some(value) = parse_date64_json_value(value)? {
1105 builder.append_value(value);
1106 } else {
1107 builder.append_null();
1108 }
1109 }
1110 other => {
1111 return Err(OmniError::manifest(format!(
1112 "unsupported list element data type {:?}",
1113 other
1114 )));
1115 }
1116 }
1117
1118 Ok(())
1119}
1120
1121fn parse_date32_json_value(value: &JsonValue) -> Result<Option<i32>> {
1122 if value.is_null() {
1123 return Ok(None);
1124 }
1125 if let Some(days) = value.as_i64() {
1126 let days = i32::try_from(days)
1127 .map_err(|_| OmniError::manifest(format!("Date value out of range: {}", days)))?;
1128 return Ok(Some(days));
1129 }
1130 if let Some(days) = value.as_u64() {
1131 let days = i32::try_from(days)
1132 .map_err(|_| OmniError::manifest(format!("Date value out of range: {}", days)))?;
1133 return Ok(Some(days));
1134 }
1135 if let Some(value) = value.as_str() {
1136 return Ok(Some(parse_date32_literal(value)?));
1137 }
1138 Ok(None)
1139}
1140
1141fn parse_date64_json_value(value: &JsonValue) -> Result<Option<i64>> {
1142 if value.is_null() {
1143 return Ok(None);
1144 }
1145 if let Some(ms) = value.as_i64() {
1146 return Ok(Some(ms));
1147 }
1148 if let Some(ms) = value.as_u64() {
1149 let ms = i64::try_from(ms)
1150 .map_err(|_| OmniError::manifest(format!("DateTime value out of range: {}", ms)))?;
1151 return Ok(Some(ms));
1152 }
1153 if let Some(value) = value.as_str() {
1154 return Ok(Some(parse_date64_literal(value)?));
1155 }
1156 Ok(None)
1157}
1158
/// Concurrency used for staging writes when `OMNIGRAPH_LOAD_CONCURRENCY` is
/// unset or invalid.
const DEFAULT_LOAD_WRITE_CONCURRENCY: usize = 8;

/// Returns the concurrency limit passed to the staging step of a load.
///
/// Reads `OMNIGRAPH_LOAD_CONCURRENCY`; surrounding whitespace is ignored so
/// values such as `"4 "` or `"4\n"` are honored. Falls back to
/// [`DEFAULT_LOAD_WRITE_CONCURRENCY`] when the variable is unset, not valid
/// UTF-8, not a positive integer, or zero.
fn load_write_concurrency() -> usize {
    std::env::var("OMNIGRAPH_LOAD_CONCURRENCY")
        .ok()
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        .filter(|limit| *limit > 0)
        .unwrap_or(DEFAULT_LOAD_WRITE_CONCURRENCY)
}
1178
1179fn generate_id() -> String {
1180 ulid::Ulid::new().to_string()
1181}
1182
1183pub(crate) fn parse_date32_literal(value: &str) -> Result<i32> {
1184 let raw: Arc<dyn Array> = Arc::new(StringArray::from(vec![Some(value)]));
1185 let casted = arrow_cast::cast::cast(raw.as_ref(), &DataType::Date32)
1186 .map_err(|e| OmniError::manifest(format!("invalid Date literal '{}': {}", value, e)))?;
1187 let out = casted
1188 .as_any()
1189 .downcast_ref::<Date32Array>()
1190 .ok_or_else(|| OmniError::manifest("Date32 cast produced unexpected array"))?;
1191 if out.is_null(0) {
1192 return Err(OmniError::manifest(format!(
1193 "invalid Date literal '{}'",
1194 value
1195 )));
1196 }
1197 Ok(out.value(0))
1198}
1199
1200pub(crate) fn parse_date64_literal(value: &str) -> Result<i64> {
1201 let raw: Arc<dyn Array> = Arc::new(StringArray::from(vec![Some(value)]));
1202 let casted = arrow_cast::cast::cast(raw.as_ref(), &DataType::Date64)
1203 .map_err(|e| OmniError::manifest(format!("invalid DateTime literal '{}': {}", value, e)))?;
1204 let out = casted
1205 .as_any()
1206 .downcast_ref::<Date64Array>()
1207 .ok_or_else(|| OmniError::manifest("Date64 cast produced unexpected array"))?;
1208 if out.is_null(0) {
1209 return Err(OmniError::manifest(format!(
1210 "invalid DateTime literal '{}'",
1211 value
1212 )));
1213 }
1214 Ok(out.value(0))
1215}
1216
1217pub(crate) fn validate_value_constraints(
1220 batch: &RecordBatch,
1221 node_type: &omnigraph_compiler::catalog::NodeType,
1222) -> Result<()> {
1223 use arrow_array::Array;
1224
1225 for rc in &node_type.range_constraints {
1227 let Some(col) = batch.column_by_name(&rc.property) else {
1228 continue;
1229 };
1230 for row in 0..batch.num_rows() {
1231 if col.is_null(row) {
1232 continue;
1233 }
1234 let value = extract_numeric_value(col, row);
1235 if let Some(val) = value {
1236 if val.is_nan() {
1237 return Err(OmniError::manifest(format!(
1238 "@range violation on {}.{}: value is NaN",
1239 node_type.name, rc.property
1240 )));
1241 }
1242 if let Some(ref min) = rc.min {
1243 let min_f = literal_value_to_f64(min);
1244 if val < min_f {
1245 return Err(OmniError::manifest(format!(
1246 "@range violation on {}.{}: value {} < min {}",
1247 node_type.name, rc.property, val, min_f
1248 )));
1249 }
1250 }
1251 if let Some(ref max) = rc.max {
1252 let max_f = literal_value_to_f64(max);
1253 if val > max_f {
1254 return Err(OmniError::manifest(format!(
1255 "@range violation on {}.{}: value {} > max {}",
1256 node_type.name, rc.property, val, max_f
1257 )));
1258 }
1259 }
1260 }
1261 }
1262 }
1263
1264 for cc in &node_type.check_constraints {
1266 let re = regex::Regex::new(&cc.pattern).map_err(|e| {
1267 OmniError::manifest(format!(
1268 "@check on {}.{} has invalid regex '{}': {}",
1269 node_type.name, cc.property, cc.pattern, e
1270 ))
1271 })?;
1272 let Some(col) = batch.column_by_name(&cc.property) else {
1273 continue;
1274 };
1275 let str_col = col.as_any().downcast_ref::<StringArray>();
1276 if let Some(str_col) = str_col {
1277 for row in 0..str_col.len() {
1278 if str_col.is_null(row) {
1279 continue;
1280 }
1281 let val = str_col.value(row);
1282 if !re.is_match(val) {
1283 return Err(OmniError::manifest(format!(
1284 "@check violation on {}.{}: value '{}' does not match pattern '{}'",
1285 node_type.name, cc.property, val, cc.pattern
1286 )));
1287 }
1288 }
1289 }
1290 }
1291
1292 Ok(())
1293}
1294
1295pub(crate) fn validate_enum_constraints(
1302 batch: &RecordBatch,
1303 properties: &HashMap<String, omnigraph_compiler::types::PropType>,
1304 type_name: &str,
1305) -> Result<()> {
1306 use arrow_array::{Array, ListArray};
1307
1308 for (prop_name, prop_type) in properties {
1309 let Some(allowed) = prop_type.enum_values.as_ref() else {
1310 continue;
1311 };
1312 let Some(col) = batch.column_by_name(prop_name) else {
1313 continue;
1314 };
1315 if prop_type.list {
1316 let Some(list_col) = col.as_any().downcast_ref::<ListArray>() else {
1317 continue;
1318 };
1319 for row in 0..list_col.len() {
1320 if list_col.is_null(row) {
1321 continue;
1322 }
1323 let item_arr = list_col.value(row);
1324 let Some(str_arr) = item_arr.as_any().downcast_ref::<StringArray>() else {
1325 continue;
1326 };
1327 for i in 0..str_arr.len() {
1328 if str_arr.is_null(i) {
1329 continue;
1330 }
1331 let val = str_arr.value(i);
1332 if !allowed.iter().any(|a| a.as_str() == val) {
1333 return Err(OmniError::manifest(format!(
1334 "invalid enum value '{}' for {}.{} (expected: {})",
1335 val,
1336 type_name,
1337 prop_name,
1338 allowed.join(", ")
1339 )));
1340 }
1341 }
1342 }
1343 } else if let Some(str_col) = col.as_any().downcast_ref::<StringArray>() {
1344 for row in 0..str_col.len() {
1345 if str_col.is_null(row) {
1346 continue;
1347 }
1348 let val = str_col.value(row);
1349 if !allowed.iter().any(|a| a.as_str() == val) {
1350 return Err(OmniError::manifest(format!(
1351 "invalid enum value '{}' for {}.{} (expected: {})",
1352 val,
1353 type_name,
1354 prop_name,
1355 allowed.join(", ")
1356 )));
1357 }
1358 }
1359 }
1360 }
1361 Ok(())
1362}
1363
1364pub(crate) fn enforce_unique_constraints_intra_batch(
1379 batch: &RecordBatch,
1380 type_name: &str,
1381 unique_constraints: &[Vec<String>],
1382) -> Result<()> {
1383 for columns in unique_constraints {
1384 let Some(group_columns) = columns
1387 .iter()
1388 .map(|name| {
1389 batch
1390 .schema()
1391 .index_of(name)
1392 .ok()
1393 .map(|i| batch.column(i).clone())
1394 })
1395 .collect::<Option<Vec<ArrayRef>>>()
1396 else {
1397 continue;
1398 };
1399 let mut seen: HashMap<Vec<String>, usize> = HashMap::new();
1400 for row in 0..batch.num_rows() {
1401 let Some(key) = composite_unique_key(&group_columns, row)? else {
1402 continue;
1403 };
1404 if let Some(prev_row) = seen.insert(key.clone(), row) {
1405 return Err(OmniError::manifest(format!(
1406 "@unique violation on {}.{}: value '{}' appears in rows {} and {}",
1407 type_name,
1408 format_tuple(columns),
1409 format_tuple(&key),
1410 prev_row,
1411 row
1412 )));
1413 }
1414 }
1415 }
1416 Ok(())
1417}
1418
1419pub(crate) fn composite_unique_key(
1436 group_columns: &[ArrayRef],
1437 row: usize,
1438) -> Result<Option<Vec<String>>> {
1439 let mut parts = Vec::with_capacity(group_columns.len());
1440 for column in group_columns {
1441 match unique_key_scalar(column, row)? {
1442 Some(value) => parts.push(value),
1443 None => return Ok(None),
1444 }
1445 }
1446 Ok(Some(parts))
1447}
1448
/// Renders a list of names or values for an error message.
///
/// A single item is returned as-is; any other count (including zero) is
/// rendered as a parenthesised, comma-separated tuple.
fn format_tuple(items: &[String]) -> String {
    if let [only] = items {
        return only.clone();
    }
    let joined = items.join(", ");
    format!("({})", joined)
}
1458
1459fn unique_key_scalar(array: &ArrayRef, row: usize) -> Result<Option<String>> {
1474 use arrow_array::{Array, LargeStringArray, StringViewArray};
1475 if array.is_null(row) {
1476 return Ok(None);
1477 }
1478 if let Some(a) = array.as_any().downcast_ref::<StringArray>() {
1479 return Ok(Some(a.value(row).to_string()));
1480 }
1481 if let Some(a) = array.as_any().downcast_ref::<LargeStringArray>() {
1482 return Ok(Some(a.value(row).to_string()));
1483 }
1484 if let Some(a) = array.as_any().downcast_ref::<StringViewArray>() {
1485 return Ok(Some(a.value(row).to_string()));
1486 }
1487 if let Some(a) = array.as_any().downcast_ref::<Int32Array>() {
1488 return Ok(Some(a.value(row).to_string()));
1489 }
1490 if let Some(a) = array.as_any().downcast_ref::<Int64Array>() {
1491 return Ok(Some(a.value(row).to_string()));
1492 }
1493 if let Some(a) = array.as_any().downcast_ref::<UInt32Array>() {
1494 return Ok(Some(a.value(row).to_string()));
1495 }
1496 if let Some(a) = array.as_any().downcast_ref::<UInt64Array>() {
1497 return Ok(Some(a.value(row).to_string()));
1498 }
1499 if let Some(a) = array.as_any().downcast_ref::<Float32Array>() {
1500 return Ok(Some(a.value(row).to_string()));
1501 }
1502 if let Some(a) = array.as_any().downcast_ref::<Float64Array>() {
1503 return Ok(Some(a.value(row).to_string()));
1504 }
1505 if let Some(a) = array.as_any().downcast_ref::<BooleanArray>() {
1506 return Ok(Some(a.value(row).to_string()));
1507 }
1508 if let Some(a) = array.as_any().downcast_ref::<Date32Array>() {
1509 return Ok(Some(a.value(row).to_string()));
1510 }
1511 if let Some(a) = array.as_any().downcast_ref::<Date64Array>() {
1512 return Ok(Some(a.value(row).to_string()));
1513 }
1514 Err(OmniError::manifest(format!(
1515 "uniqueness key: unsupported column type {:?} for @unique/@key enforcement",
1516 array.data_type()
1517 )))
1518}
1519
1520pub(crate) fn unique_constraint_groups_for_node(
1527 node_type: &omnigraph_compiler::catalog::NodeType,
1528) -> Vec<Vec<String>> {
1529 let mut groups: Vec<Vec<String>> = node_type.unique_constraints.clone();
1530 if let Some(key) = &node_type.key
1531 && !groups.contains(key)
1532 {
1533 groups.push(key.clone());
1534 }
1535 groups
1536}
1537
1538pub(crate) fn unique_constraint_groups_for_edge(
1541 edge_type: &omnigraph_compiler::catalog::EdgeType,
1542) -> Vec<Vec<String>> {
1543 edge_type.unique_constraints.clone()
1544}
1545
1546fn extract_numeric_value(col: &ArrayRef, row: usize) -> Option<f64> {
1547 use arrow_array::{
1548 Array, Float32Array, Float64Array, Int32Array, Int64Array, UInt32Array, UInt64Array,
1549 };
1550 if let Some(a) = col.as_any().downcast_ref::<Int32Array>() {
1551 return Some(a.value(row) as f64);
1552 }
1553 if let Some(a) = col.as_any().downcast_ref::<Int64Array>() {
1554 return Some(a.value(row) as f64);
1555 }
1556 if let Some(a) = col.as_any().downcast_ref::<UInt32Array>() {
1557 return Some(a.value(row) as f64);
1558 }
1559 if let Some(a) = col.as_any().downcast_ref::<UInt64Array>() {
1560 return Some(a.value(row) as f64);
1561 }
1562 if let Some(a) = col.as_any().downcast_ref::<Float32Array>() {
1563 return Some(a.value(row) as f64);
1564 }
1565 if let Some(a) = col.as_any().downcast_ref::<Float64Array>() {
1566 return Some(a.value(row));
1567 }
1568 None
1569}
1570
1571fn literal_value_to_f64(v: &omnigraph_compiler::catalog::LiteralValue) -> f64 {
1572 use omnigraph_compiler::catalog::LiteralValue;
1573 match v {
1574 LiteralValue::Integer(n) => *n as f64,
1575 LiteralValue::Float(f) => *f,
1576 }
1577}
1578
1579pub(crate) async fn validate_edge_cardinality(
1582 db: &crate::db::Omnigraph,
1583 branch: Option<&str>,
1584 edge_name: &str,
1585 written_version: u64,
1586 written_branch: Option<&str>,
1587) -> Result<()> {
1588 use arrow_array::Array;
1589 let catalog = db.catalog();
1590 let edge_type = &catalog.edge_types[edge_name];
1591 if edge_type.cardinality.is_default() {
1592 return Ok(());
1593 }
1594
1595 let snapshot = db.snapshot_for_branch(branch).await?;
1598 let table_key = format!("edge:{}", edge_name);
1599 let entry = snapshot
1600 .entry(&table_key)
1601 .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
1602 let ds = db
1603 .open_dataset_at_state(
1604 &entry.table_path,
1605 written_branch.or(entry.table_branch.as_deref()),
1606 written_version,
1607 )
1608 .await?;
1609
1610 let batches = db.storage().scan(&ds, Some(&["src"]), None, None).await?;
1612
1613 let mut counts: HashMap<String, u32> = HashMap::new();
1614 for batch in &batches {
1615 let srcs = batch
1616 .column_by_name("src")
1617 .unwrap()
1618 .as_any()
1619 .downcast_ref::<StringArray>()
1620 .unwrap();
1621 for i in 0..srcs.len() {
1622 *counts.entry(srcs.value(i).to_string()).or_insert(0) += 1;
1623 }
1624 }
1625
1626 let card = &edge_type.cardinality;
1627 for (src, count) in &counts {
1628 if let Some(max) = card.max {
1629 if *count > max {
1630 return Err(OmniError::manifest(format!(
1631 "@card violation on edge {}: source '{}' has {} edges (max {})",
1632 edge_name, src, count, max
1633 )));
1634 }
1635 }
1636 if *count < card.min {
1637 return Err(OmniError::manifest(format!(
1638 "@card violation on edge {}: source '{}' has {} edges (min {})",
1639 edge_name, src, count, card.min
1640 )));
1641 }
1642 }
1643
1644 Ok(())
1645}
1646
1647async fn validate_edge_cardinality_with_pending_loader(
1662 db: &Omnigraph,
1663 branch: Option<&str>,
1664 edge_type: &omnigraph_compiler::catalog::EdgeType,
1665 table_key: &str,
1666 staging: &MutationStaging,
1667 mode: LoadMode,
1668) -> Result<()> {
1669 if edge_type.cardinality.is_default() {
1670 return Ok(());
1671 }
1672 let snapshot = db.snapshot_for_branch(branch).await?;
1673 let Some(entry) = snapshot.entry(table_key) else {
1674 return Ok(());
1677 };
1678 let ds = db
1679 .open_dataset_at_state(
1680 &entry.table_path,
1681 entry.table_branch.as_deref(),
1682 entry.table_version,
1683 )
1684 .await?;
1685 let dedupe_key = match mode {
1686 LoadMode::Merge => Some("id"),
1687 LoadMode::Append | LoadMode::Overwrite => None,
1688 };
1689 let counts =
1690 crate::exec::staging::count_src_per_edge(db, &ds, table_key, staging, dedupe_key).await?;
1691 crate::exec::staging::enforce_cardinality_bounds(edge_type, &counts)
1692}
1693
1694async fn collect_node_ids_with_pending(
1708 db: &Omnigraph,
1709 branch: Option<&str>,
1710 type_name: &str,
1711 staging: &MutationStaging,
1712) -> Result<HashSet<String>> {
1713 let mut ids = HashSet::new();
1714 let table_key = format!("node:{}", type_name);
1715
1716 for batch in staging.pending_batches(&table_key) {
1718 if let Some(col) = batch.column_by_name("id") {
1719 if let Some(arr) = col.as_any().downcast_ref::<StringArray>() {
1720 for i in 0..arr.len() {
1721 if arr.is_valid(i) {
1722 ids.insert(arr.value(i).to_string());
1723 }
1724 }
1725 }
1726 }
1727 }
1728
1729 if staging.pending_mode(&table_key) == Some(PendingMode::Overwrite) {
1730 return Ok(ids);
1731 }
1732
1733 let snapshot = db.snapshot_for_branch(branch).await?;
1735 let Some(entry) = snapshot.entry(&table_key) else {
1736 return Ok(ids);
1737 };
1738 let ds = db
1739 .open_dataset_at_state(
1740 &entry.table_path,
1741 entry.table_branch.as_deref(),
1742 entry.table_version,
1743 )
1744 .await?;
1745
1746 let batches = db.storage().scan(&ds, Some(&["id"]), None, None).await?;
1747
1748 for batch in &batches {
1749 let id_col = batch
1750 .column_by_name("id")
1751 .ok_or_else(|| OmniError::Lance("missing 'id' column".into()))?
1752 .as_any()
1753 .downcast_ref::<StringArray>()
1754 .ok_or_else(|| OmniError::Lance("'id' column is not Utf8".into()))?;
1755 for i in 0..batch.num_rows() {
1756 if id_col.is_valid(i) {
1761 ids.insert(id_col.value(i).to_string());
1762 }
1763 }
1764 }
1765
1766 Ok(ids)
1767}
1768
// Unit tests for the loader: end-to-end JSONL loads against a temporary
// database, branch-aware `ingest` / `load_as` behaviour, and direct tests of
// the constraint and uniqueness-key helpers.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::Omnigraph;
    use arrow_array::Array;
    use futures::TryStreamExt;
    use std::collections::HashMap;

    // Schema shared by the database-backed tests: two keyed node types and
    // two edge types between them.
    const TEST_SCHEMA: &str = r#"
node Person {
 name: String @key
 age: I32?
}
node Company {
 name: String @key
}
edge Knows: Person -> Person {
 since: Date?
}
edge WorksAt: Person -> Company
"#;

    // JSONL fixture: two Person rows, one Company row, one edge of each type.
    const TEST_DATA: &str = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}
{"type": "Person", "data": {"name": "Bob", "age": 25}}
{"type": "Company", "data": {"name": "Acme"}}
{"edge": "Knows", "from": "Alice", "to": "Bob"}
{"edge": "WorksAt", "from": "Alice", "to": "Acme"}
"#;

    // The load result reports per-type row counts matching the fixture.
    #[tokio::test]
    async fn test_load_creates_data() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let result = load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert_eq!(result.nodes_loaded["Person"], 2);
        assert_eq!(result.nodes_loaded["Company"], 1);
        assert_eq!(result.edges_loaded["Knows"], 1);
        assert_eq!(result.edges_loaded["WorksAt"], 1);
    }

    // Loaded Person rows can be read back from the table dataset, and their
    // `id` column holds the `name` values from the fixture.
    #[tokio::test]
    async fn test_load_data_readable_via_lance() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        let snap = db.snapshot().await;
        let person_ds = snap.open("node:Person").await.unwrap();

        assert_eq!(person_ds.count_rows(None).await.unwrap(), 2);

        let batches: Vec<RecordBatch> = person_ds
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();

        // Only the first batch is inspected; the fixture is assumed to fit in
        // a single batch.
        let batch = &batches[0];
        let ids = batch
            .column_by_name("id")
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let id_values: Vec<&str> = (0..ids.len()).map(|i| ids.value(i)).collect();
        assert!(id_values.contains(&"Alice"));
        assert!(id_values.contains(&"Bob"));
    }

    // Edge rows store the endpoints' key values in their `src` / `dst`
    // columns.
    #[tokio::test]
    async fn test_load_edges_reference_node_keys() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        let snap = db.snapshot().await;
        let knows_ds = snap.open("edge:Knows").await.unwrap();

        let batches: Vec<RecordBatch> = knows_ds
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();

        let batch = &batches[0];
        let srcs = batch
            .column_by_name("src")
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let dsts = batch
            .column_by_name("dst")
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();

        assert_eq!(srcs.value(0), "Alice");
        assert_eq!(dsts.value(0), "Bob");
    }

    // A successful load moves the database version forward.
    #[tokio::test]
    async fn test_load_manifest_version_advances() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        let v1 = db.version().await;

        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert!(db.version().await > v1);
    }

    // An `Append` load after an initial load keeps the earlier rows.
    #[tokio::test]
    async fn test_load_append_adds_rows() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let batch1 = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#;
        let batch2 = r#"{"type": "Person", "data": {"name": "Bob", "age": 25}}"#;

        load_jsonl(&mut db, batch1, LoadMode::Overwrite)
            .await
            .unwrap();
        load_jsonl(&mut db, batch2, LoadMode::Append).await.unwrap();

        let snap = db.snapshot().await;
        let person_ds = snap.open("node:Person").await.unwrap();
        assert_eq!(person_ds.count_rows(None).await.unwrap(), 2);
    }

    // A row whose `type` is not declared in the schema fails the load.
    #[tokio::test]
    async fn test_load_unknown_type_rejected() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let bad = r#"{"type": "FakeType", "data": {"name": "x"}}"#;
        let result = load_jsonl(&mut db, bad, LoadMode::Overwrite).await;
        assert!(result.is_err());
    }

    // `ingest` into a branch that does not exist yet creates it from the
    // given base and reports one entry per loaded table. The expected list is
    // ordered by table key.
    #[tokio::test]
    #[allow(deprecated)]
    async fn test_ingest_creates_branch_and_reports_tables() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let result = db
            .ingest("feature", Some("main"), TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert_eq!(result.branch, "feature");
        assert_eq!(result.base_branch, "main");
        assert!(result.branch_created);
        assert_eq!(result.mode, LoadMode::Overwrite);
        assert_eq!(
            result.tables,
            vec![
                IngestTableResult {
                    table_key: "edge:Knows".to_string(),
                    rows_loaded: 1
                },
                IngestTableResult {
                    table_key: "edge:WorksAt".to_string(),
                    rows_loaded: 1
                },
                IngestTableResult {
                    table_key: "node:Company".to_string(),
                    rows_loaded: 1
                },
                IngestTableResult {
                    table_key: "node:Person".to_string(),
                    rows_loaded: 2
                },
            ]
        );
        assert!(
            db.branch_list()
                .await
                .unwrap()
                .contains(&"feature".to_string())
        );
    }

    // When the target branch already exists, the `from` argument is not used
    // to fork (a non-existent base is accepted) but is still echoed in the
    // result. A `Merge` load updates the existing row (Bob), adds the new one
    // (Eve) and leaves untouched rows (Alice) in place.
    #[tokio::test]
    #[allow(deprecated)]
    async fn test_ingest_existing_branch_ignores_from_and_merges_data() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();
        db.branch_create_from(crate::db::ReadTarget::branch("main"), "feature")
            .await
            .unwrap();

        let result = db
            .ingest(
                "feature",
                Some("missing-base"),
                r#"{"type":"Person","data":{"name":"Bob","age":26}}
{"type":"Person","data":{"name":"Eve","age":31}}"#,
                LoadMode::Merge,
            )
            .await
            .unwrap();

        assert_eq!(result.branch, "feature");
        assert_eq!(result.base_branch, "missing-base");
        assert!(!result.branch_created);
        assert_eq!(result.mode, LoadMode::Merge);
        assert_eq!(
            result.tables,
            vec![IngestTableResult {
                table_key: "node:Person".to_string(),
                rows_loaded: 2
            }]
        );

        let snap = db
            .snapshot_of(crate::db::ReadTarget::branch("feature"))
            .await
            .unwrap();
        let person_ds = snap.open("node:Person").await.unwrap();
        assert_eq!(person_ds.count_rows(None).await.unwrap(), 3);

        let batches: Vec<RecordBatch> = person_ds
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();
        // Collect id -> age across all batches so the assertions do not
        // depend on row order.
        let mut ages_by_id = HashMap::new();
        for batch in &batches {
            let ids = batch
                .column_by_name("id")
                .unwrap()
                .as_any()
                .downcast_ref::<StringArray>()
                .unwrap();
            let ages = batch
                .column_by_name("age")
                .unwrap()
                .as_any()
                .downcast_ref::<Int32Array>()
                .unwrap();
            for idx in 0..ids.len() {
                ages_by_id.insert(ids.value(idx).to_string(), ages.value(idx));
            }
        }

        assert_eq!(ages_by_id.get("Bob"), Some(&26));
        assert_eq!(ages_by_id.get("Eve"), Some(&31));
        assert_eq!(ages_by_id.get("Alice"), Some(&30));
    }

    // `ingest_as` records the supplied actor id on the commit it produces.
    // The last entry returned by `list_commits` is treated as the branch
    // head here.
    #[tokio::test]
    #[allow(deprecated)]
    async fn test_ingest_as_stamps_actor_on_branch_head_commit() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        db.ingest_as(
            "feature",
            Some("main"),
            TEST_DATA,
            LoadMode::Overwrite,
            Some("act-andrew"),
        )
        .await
        .unwrap();

        let head = db
            .list_commits(Some("feature"))
            .await
            .unwrap()
            .into_iter()
            .last()
            .unwrap();
        assert_eq!(head.actor_id.as_deref(), Some("act-andrew"));
    }

    // `load_as` with a base forks a missing branch and reports
    // `branch_created`; a second call on the now-existing branch reports
    // `branch_created == false` while still echoing the base.
    #[tokio::test]
    async fn test_load_as_with_base_forks_missing_branch_and_stamps_metadata() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let result = db
            .load_as("feature", Some("main"), TEST_DATA, LoadMode::Merge, None)
            .await
            .unwrap();

        assert_eq!(result.branch, "feature");
        assert_eq!(result.base_branch.as_deref(), Some("main"));
        assert!(result.branch_created);
        assert!(
            db.branch_list()
                .await
                .unwrap()
                .contains(&"feature".to_string())
        );

        // Same target again: the branch exists now, so nothing is created.
        let again = db
            .load_as(
                "feature",
                Some("main"),
                r#"{"type":"Person","data":{"name":"Bob","age":26}}"#,
                LoadMode::Merge,
                None,
            )
            .await
            .unwrap();
        assert!(!again.branch_created);
        assert_eq!(again.base_branch.as_deref(), Some("main"));
    }

    // Without a base, loading into a missing branch is an error and leaves no
    // branch behind; loading into `main` works and reports no base.
    #[tokio::test]
    async fn test_load_as_without_base_errors_on_missing_branch() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let result = db
            .load_as("nonexistent", None, TEST_DATA, LoadMode::Merge, None)
            .await;
        assert!(result.is_err(), "load without base must not create branches");
        assert!(
            !db.branch_list()
                .await
                .unwrap()
                .contains(&"nonexistent".to_string()),
            "failed load must not leave a branch behind"
        );

        // Plain `load` on the existing `main` branch still succeeds.
        let main_load = db.load("main", TEST_DATA, LoadMode::Overwrite).await.unwrap();
        assert_eq!(main_load.branch, "main");
        assert_eq!(main_load.base_branch, None);
        assert!(!main_load.branch_created);
    }

    // A NaN in a `@range`-constrained column is rejected with an error that
    // mentions NaN, even though NaN compares false against both bounds.
    #[test]
    fn test_range_constraint_rejects_nan() {
        use arrow_array::{Float64Array, RecordBatch, StringArray};
        use omnigraph_compiler::catalog::{LiteralValue, NodeType, RangeConstraint};
        use std::sync::Arc;

        let schema = Arc::new(arrow_schema::Schema::new(vec![
            arrow_schema::Field::new("name", arrow_schema::DataType::Utf8, false),
            arrow_schema::Field::new("score", arrow_schema::DataType::Float64, true),
        ]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["bad"])),
                Arc::new(Float64Array::from(vec![f64::NAN])),
            ],
        )
        .unwrap();

        // Minimal node type: only the range constraint on `score` matters.
        let node_type = NodeType {
            name: "Test".to_string(),
            implements: vec![],
            properties: Default::default(),
            key: None,
            unique_constraints: vec![],
            indices: vec![],
            range_constraints: vec![RangeConstraint {
                property: "score".to_string(),
                min: Some(LiteralValue::Float(0.0)),
                max: Some(LiteralValue::Float(1.0)),
            }],
            check_constraints: vec![],
            embed_sources: Default::default(),
            blob_properties: Default::default(),
            arrow_schema: schema,
        };

        let result = validate_value_constraints(&batch, &node_type);
        assert!(result.is_err(), "expected NaN to be rejected");
        let err = result.unwrap_err().to_string();
        assert!(err.contains("NaN"), "error should mention NaN: {}", err);
    }

    // Composite keys are kept as separate parts, so ("x|y", "z") and
    // ("x", "y|z") stay distinct even though they would collide if joined
    // with '|'. A null in any key column exempts the row.
    #[test]
    fn composite_unique_key_builds_tuple_and_exempts_null() {
        let a: ArrayRef = Arc::new(StringArray::from(vec![Some("x|y"), Some("x"), None]));
        let b: ArrayRef = Arc::new(StringArray::from(vec![Some("z"), Some("y|z"), Some("q")]));
        let cols = [a, b];

        assert_eq!(
            composite_unique_key(&cols, 0).unwrap(),
            Some(vec!["x|y".to_string(), "z".to_string()])
        );
        assert_eq!(
            composite_unique_key(&cols, 1).unwrap(),
            Some(vec!["x".to_string(), "y|z".to_string()])
        );
        assert_ne!(
            composite_unique_key(&cols, 0).unwrap(),
            composite_unique_key(&cols, 1).unwrap()
        );

        // Row 2 has a null in the first column.
        assert_eq!(composite_unique_key(&cols, 2).unwrap(), None);
    }

    // A column type that cannot be rendered as a key produces an error rather
    // than being silently skipped.
    #[test]
    fn unique_key_scalar_errors_loudly_on_unkeyable_type() {
        use arrow_array::LargeBinaryArray;
        let blob: ArrayRef = Arc::new(LargeBinaryArray::from(vec![Some(&b"abc"[..])]));
        let err = unique_key_scalar(&blob, 0).unwrap_err();
        assert!(
            err.to_string().contains("unsupported column type"),
            "un-keyable type must fail loudly (got: {err})"
        );
    }

    // Utf8, LargeUtf8 and Utf8View arrays all render the same key string.
    #[test]
    fn unique_key_scalar_handles_all_string_encodings() {
        use arrow_array::{LargeStringArray, StringViewArray};
        let utf8: ArrayRef = Arc::new(StringArray::from(vec![Some("v")]));
        let large: ArrayRef = Arc::new(LargeStringArray::from(vec![Some("v")]));
        let view: ArrayRef = Arc::new(StringViewArray::from(vec![Some("v")]));
        for array in [&utf8, &large, &view] {
            assert_eq!(
                unique_key_scalar(array, 0).unwrap(),
                Some("v".to_string()),
                "string array {:?} must render, not error",
                array.data_type()
            );
        }
    }
}