1use std::collections::{HashMap, HashSet};
2
3use std::io::{BufRead, BufReader, Cursor};
4use std::sync::Arc;
5
6use arrow_array::{
7 Array, ArrayRef, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array,
8 Int32Array, Int64Array, RecordBatch, StringArray, UInt32Array, UInt64Array,
9 builder::{
10 ArrayBuilder, BooleanBuilder, Date32Builder, Date64Builder, FixedSizeListBuilder,
11 Float32Builder, Float64Builder, Int32Builder, Int64Builder, ListBuilder, StringBuilder,
12 UInt32Builder, UInt64Builder,
13 },
14};
15use arrow_schema::DataType;
16use base64::Engine;
17use lance::blob::BlobArrayBuilder;
18use omnigraph_compiler::catalog::NodeType;
19use serde::{Deserialize, Serialize};
20use serde_json::Value as JsonValue;
21
22use crate::db::Omnigraph;
23use crate::error::{OmniError, Result};
24use crate::exec::staging::{MutationStaging, PendingMode};
25
/// Summary returned by the `load*` entry points.
#[derive(Debug, Clone, Default)]
pub struct LoadResult {
    /// Branch the data was loaded into. `load_as` stores the normalized branch name here,
    /// falling back to `"main"` when normalization yields `None`.
    pub branch: String,
    /// Normalized base branch handed to `load_as`, or `None` when no base was supplied.
    pub base_branch: Option<String>,
    /// `true` when `load_as` created the target branch from the base before loading.
    pub branch_created: bool,
    /// Row count of the batch built for each node type, keyed by node type name.
    pub nodes_loaded: HashMap<String, usize>,
    /// Row count of the batch built for each edge type, keyed by the catalog's edge name.
    pub edges_loaded: HashMap<String, usize>,
}
40
/// Per-table row count reported by the deprecated `ingest*` family.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct IngestTableResult {
    /// Table identifier in the form `node:<type name>` or `edge:<edge name>`
    /// (see `LoadResult::to_ingest_tables`).
    pub table_key: String,
    /// Number of rows loaded into that table.
    pub rows_loaded: usize,
}
46
/// Result shape of the deprecated `ingest*` family; built from a [`LoadResult`] by `ingest_as`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct IngestResult {
    /// Branch the data was loaded into.
    pub branch: String,
    /// Base branch used for the load; `ingest_as` reports `"main"` when the load result has none.
    pub base_branch: String,
    /// `true` when the target branch was created as part of this call.
    pub branch_created: bool,
    /// Load mode that was requested by the caller.
    pub mode: LoadMode,
    /// Per-table row counts, sorted by `table_key`.
    pub tables: Vec<IngestTableResult>,
}
55
/// How loaded rows are applied to the target tables.
///
/// Serialized in snake_case (`"overwrite"`, `"append"`, `"merge"`), matching [`LoadMode::as_str`].
/// NOTE(review): the exact storage semantics of each mode live in the staging layer
/// (`PendingMode` / `MutationOpKind`), which is not visible here — confirm there.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LoadMode {
    /// Staged as `PendingMode::Overwrite`; tables are opened with `MutationOpKind::SchemaRewrite`.
    Overwrite,
    /// Staged as `PendingMode::Append`; tables are opened with `MutationOpKind::Insert`.
    Append,
    /// Staged as `PendingMode::Merge`; tables are opened with `MutationOpKind::Merge`.
    Merge,
}
67
68pub async fn load_jsonl(db: &Omnigraph, data: &str, mode: LoadMode) -> Result<LoadResult> {
73 let current_branch = db.active_branch().await;
74 let branch = current_branch.as_deref().unwrap_or("main");
75 db.load(branch, data, mode).await
76}
77
78pub async fn load_jsonl_file(db: &Omnigraph, path: &str, mode: LoadMode) -> Result<LoadResult> {
80 let current_branch = db.active_branch().await;
81 let branch = current_branch.as_deref().unwrap_or("main");
82 db.load_file(branch, path, mode).await
83}
84
85impl Omnigraph {
86 #[deprecated(
87 note = "use `load_as` with an explicit `base` instead; the ingest family will be removed in a future release"
88 )]
89 pub async fn ingest(
90 &self,
91 branch: &str,
92 from: Option<&str>,
93 data: &str,
94 mode: LoadMode,
95 ) -> Result<IngestResult> {
96 #[allow(deprecated)]
97 self.ingest_as(branch, from, data, mode, None).await
98 }
99
100 #[deprecated(
105 note = "use `load_as` with an explicit `base` instead; the ingest family will be removed in a future release"
106 )]
107 pub async fn ingest_as(
108 &self,
109 branch: &str,
110 from: Option<&str>,
111 data: &str,
112 mode: LoadMode,
113 actor_id: Option<&str>,
114 ) -> Result<IngestResult> {
115 let result = self
116 .load_as(branch, Some(from.unwrap_or("main")), data, mode, actor_id)
117 .await?;
118 Ok(IngestResult {
119 branch: result.branch.clone(),
120 base_branch: result
121 .base_branch
122 .clone()
123 .unwrap_or_else(|| "main".to_string()),
124 branch_created: result.branch_created,
125 mode,
126 tables: result.to_ingest_tables(),
127 })
128 }
129
130 #[deprecated(
131 note = "use `load_file_as` with an explicit `base` instead; the ingest family will be removed in a future release"
132 )]
133 pub async fn ingest_file(
134 &self,
135 branch: &str,
136 from: Option<&str>,
137 path: &str,
138 mode: LoadMode,
139 ) -> Result<IngestResult> {
140 #[allow(deprecated)]
141 self.ingest_file_as(branch, from, path, mode, None).await
142 }
143
144 #[deprecated(
145 note = "use `load_file_as` with an explicit `base` instead; the ingest family will be removed in a future release"
146 )]
147 pub async fn ingest_file_as(
148 &self,
149 branch: &str,
150 from: Option<&str>,
151 path: &str,
152 mode: LoadMode,
153 actor_id: Option<&str>,
154 ) -> Result<IngestResult> {
155 let data = std::fs::read_to_string(path).map_err(OmniError::Io)?;
156 #[allow(deprecated)]
157 self.ingest_as(branch, from, &data, mode, actor_id).await
158 }
159
160 pub async fn load(&self, branch: &str, data: &str, mode: LoadMode) -> Result<LoadResult> {
161 self.load_as(branch, None, data, mode, None).await
162 }
163
164 pub async fn load_as(
172 &self,
173 branch: &str,
174 base: Option<&str>,
175 data: &str,
176 mode: LoadMode,
177 actor_id: Option<&str>,
178 ) -> Result<LoadResult> {
179 self.enforce(
186 omnigraph_policy::PolicyAction::Change,
187 &omnigraph_policy::ResourceScope::Branch(branch.to_string()),
188 actor_id,
189 )?;
190 self.ensure_schema_state_valid().await?;
191 self.heal_pending_recovery_sidecars().await?;
198 crate::db::ensure_public_branch_ref(branch, "load")?;
203 let requested = Self::normalize_branch_name(branch)?;
209 let base_branch = match base {
210 Some(base) => {
211 Some(Self::normalize_branch_name(base)?.unwrap_or_else(|| "main".to_string()))
212 }
213 None => None,
214 };
215 let mut branch_created = false;
218 if let (Some(target), Some(base_name)) = (requested.as_deref(), base_branch.as_deref()) {
219 let exists = self.branch_list().await?.iter().any(|name| name == target);
220 if !exists {
221 self.branch_create_from_as(
228 crate::db::ReadTarget::branch(base_name),
229 target,
230 actor_id,
231 )
232 .await?;
233 branch_created = true;
234 }
235 }
236 let mut result = self
240 .load_direct_on_branch(requested.as_deref(), data, mode, actor_id)
241 .await?;
242 result.branch = requested.unwrap_or_else(|| "main".to_string());
243 result.base_branch = base_branch;
244 result.branch_created = branch_created;
245 Ok(result)
246 }
247
248 pub async fn load_file(&self, branch: &str, path: &str, mode: LoadMode) -> Result<LoadResult> {
249 self.load_file_as(branch, None, path, mode, None).await
250 }
251
252 pub async fn load_file_as(
256 &self,
257 branch: &str,
258 base: Option<&str>,
259 path: &str,
260 mode: LoadMode,
261 actor_id: Option<&str>,
262 ) -> Result<LoadResult> {
263 let data = std::fs::read_to_string(path).map_err(OmniError::Io)?;
264 self.load_as(branch, base, &data, mode, actor_id).await
265 }
266
267 async fn load_direct_on_branch(
268 &self,
269 branch: Option<&str>,
270 data: &str,
271 mode: LoadMode,
272 actor_id: Option<&str>,
273 ) -> Result<LoadResult> {
274 let reader = BufReader::new(Cursor::new(data.as_bytes()));
275 load_jsonl_reader(self, branch, reader, mode, actor_id).await
276 }
277}
278
279impl LoadMode {
280 pub fn as_str(self) -> &'static str {
281 match self {
282 LoadMode::Overwrite => "overwrite",
283 LoadMode::Append => "append",
284 LoadMode::Merge => "merge",
285 }
286 }
287}
288
289impl LoadResult {
290 pub fn to_ingest_tables(&self) -> Vec<IngestTableResult> {
291 let mut tables = self
292 .nodes_loaded
293 .iter()
294 .map(|(type_name, rows_loaded)| IngestTableResult {
295 table_key: format!("node:{type_name}"),
296 rows_loaded: *rows_loaded,
297 })
298 .chain(
299 self.edges_loaded
300 .iter()
301 .map(|(edge_name, rows_loaded)| IngestTableResult {
302 table_key: format!("edge:{edge_name}"),
303 rows_loaded: *rows_loaded,
304 }),
305 )
306 .collect::<Vec<_>>();
307 tables.sort_by(|a, b| a.table_key.cmp(&b.table_key));
308 tables
309 }
310}
311
/// Parses JSON records from `reader`, validates them against the catalog, stages them per
/// table, and commits the staged writes on `branch`.
///
/// Records are read as consecutive JSON values (serde_json's streaming deserializer). Each
/// record must carry either a string `"type"` field (a node of that type) or a string
/// `"edge"` field (an edge, which additionally needs string `"from"` and `"to"` fields).
/// The optional `"data"` field holds the properties; an empty object is used when absent.
///
/// Returns a [`LoadResult`] in which only `nodes_loaded` / `edges_loaded` are filled in;
/// the branch-related fields keep their default values here.
///
/// # Errors
/// Returns a manifest error for invalid JSON, an unknown node or edge type, a record with
/// neither `"type"` nor `"edge"`, an edge missing `"from"` / `"to"`, an edge endpoint that is
/// not a known node id, or a table with no manifest entry in the snapshot. Errors from batch
/// building, constraint validation, staging and commit are propagated unchanged.
async fn load_jsonl_reader<R: BufRead>(
    db: &Omnigraph,
    branch: Option<&str>,
    reader: R,
    mode: LoadMode,
    actor_id: Option<&str>,
) -> Result<LoadResult> {
    let catalog = db.catalog().clone();

    // Phase 1: parse every record and bucket it by type. Nothing is written yet.
    // Node rows keep only their `data` object; edge rows keep (from, to, data).
    let mut node_rows: HashMap<String, Vec<JsonValue>> = HashMap::new();
    let mut edge_rows: HashMap<String, Vec<(String, String, JsonValue)>> = HashMap::new();

    for (idx, parsed) in serde_json::Deserializer::from_reader(reader)
        .into_iter::<JsonValue>()
        .enumerate()
    {
        // 1-based record number used in error messages.
        let record_num = idx + 1;
        let value: JsonValue = parsed.map_err(|e| {
            OmniError::manifest(format!("invalid JSON at record {}: {}", record_num, e))
        })?;

        // A record with a string `type` field is treated as a node, even if it also has `edge`.
        if let Some(type_name) = value.get("type").and_then(|v| v.as_str()) {
            if !catalog.node_types.contains_key(type_name) {
                return Err(OmniError::manifest(format!(
                    "record {}: unknown node type '{}'",
                    record_num, type_name
                )));
            }
            let data = value
                .get("data")
                .cloned()
                .unwrap_or(JsonValue::Object(serde_json::Map::new()));
            node_rows
                .entry(type_name.to_string())
                .or_default()
                .push(data);
        } else if let Some(edge_name) = value.get("edge").and_then(|v| v.as_str()) {
            if catalog.lookup_edge_by_name(edge_name).is_none() {
                return Err(OmniError::manifest(format!(
                    "record {}: unknown edge type '{}'",
                    record_num, edge_name
                )));
            }
            let from = value
                .get("from")
                .and_then(|v| v.as_str())
                .ok_or_else(|| {
                    OmniError::manifest(format!("record {}: edge missing 'from'", record_num))
                })?
                .to_string();
            let to = value
                .get("to")
                .and_then(|v| v.as_str())
                .ok_or_else(|| {
                    OmniError::manifest(format!("record {}: edge missing 'to'", record_num))
                })?
                .to_string();
            let data = value
                .get("data")
                .cloned()
                .unwrap_or(JsonValue::Object(serde_json::Map::new()));
            // Group under the catalog's own name for the edge, which may differ from the
            // spelling used in the record. The unwrap cannot fail: the `is_none` check above
            // already returned for unknown names.
            let canonical = catalog.lookup_edge_by_name(edge_name).unwrap().name.clone();
            edge_rows
                .entry(canonical)
                .or_default()
                .push((from, to, data));
        } else {
            return Err(OmniError::manifest(format!(
                "record {}: expected 'type' or 'edge' field",
                record_num
            )));
        }
    }

    let mut result = LoadResult::default();
    let snapshot = db.snapshot_for_branch(branch).await?;
    let mut staging = MutationStaging::default();
    // Translate the public load mode into the staging mode and the mutation kind used when
    // opening tables.
    let pending_mode = match mode {
        LoadMode::Merge => PendingMode::Merge,
        LoadMode::Append => PendingMode::Append,
        LoadMode::Overwrite => PendingMode::Overwrite,
    };
    let load_op_kind = match mode {
        LoadMode::Append => crate::db::MutationOpKind::Insert,
        LoadMode::Merge => crate::db::MutationOpKind::Merge,
        LoadMode::Overwrite => crate::db::MutationOpKind::SchemaRewrite,
    };

    // Only when an explicit branch is given: if the snapshot shows any touched table whose
    // `table_branch` is not `active`, acquire write-queue guards for all touched tables up
    // front. Tables without a snapshot entry do not trigger this. The guards are handed to
    // `commit_all` below.
    // NOTE(review): presumably this serializes the first write that forks a table onto the
    // branch — confirm against `write_queue` / `commit_all`.
    let fork_queue_guards: Option<(
        Vec<(String, Option<String>)>,
        Vec<tokio::sync::OwnedMutexGuard<()>>,
    )> = if let Some(active) = branch {
        let touched: Vec<(String, Option<String>)> = node_rows
            .keys()
            .map(|t| (format!("node:{t}"), Some(active.to_string())))
            .chain(
                edge_rows
                    .keys()
                    .map(|e| (format!("edge:{e}"), Some(active.to_string()))),
            )
            .collect();
        let needs_fork = touched.iter().any(|(table_key, _)| {
            snapshot
                .entry(table_key)
                .map(|e| e.table_branch.as_deref() != Some(active))
                .unwrap_or(false)
        });
        if needs_fork {
            let guards = db.write_queue().acquire_many(&touched).await;
            Some((touched, guards))
        } else {
            None
        }
    } else {
        None
    };

    // Phase 2a: build and validate every node batch before any dataset is opened, so a
    // validation failure returns before `open_for_mutation_on_branch` is called.
    let mut prepared_nodes: Vec<(String, String, RecordBatch, usize)> =
        Vec::with_capacity(node_rows.len());
    for (type_name, rows) in &node_rows {
        let node_type = &catalog.node_types[type_name];
        let batch = build_node_batch(node_type, rows)?;
        validate_value_constraints(&batch, node_type)?;
        validate_enum_constraints(&batch, &node_type.properties, type_name)?;
        let unique_groups = unique_constraint_groups_for_node(node_type);
        if !unique_groups.is_empty() {
            enforce_unique_constraints_intra_batch(&batch, type_name, &unique_groups)?;
        }
        let loaded_count = batch.num_rows();
        let table_key = format!("node:{}", type_name);
        // Existence check only: the entry itself is not used.
        let _entry = snapshot
            .entry(&table_key)
            .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
        prepared_nodes.push((type_name.clone(), table_key, batch, loaded_count));
    }

    // Phase 2b: open each node table, record its current version as the expected version,
    // and add the batch to the staging area.
    for (type_name, table_key, batch, loaded_count) in prepared_nodes {
        let (ds, full_path, table_branch) = db
            .open_for_mutation_on_branch(branch, &table_key, load_op_kind)
            .await?;
        let expected_version = ds.version();
        staging.ensure_path(
            &table_key,
            full_path,
            table_branch,
            expected_version,
            load_op_kind,
        );
        let schema = batch.schema();
        staging.append_batch(&table_key, schema, pending_mode, batch)?;
        result.nodes_loaded.insert(type_name, loaded_count);
    }

    // Phase 3a: every edge endpoint must be a known node id of the edge's from/to type.
    // The id sets are collected with the staging area passed in.
    // NOTE(review): judging by the helper's name, node rows staged above count as existing —
    // confirm in `collect_node_ids_with_pending`.
    for (edge_name, rows) in &edge_rows {
        let edge_type = &catalog.edge_types[edge_name];
        let from_ids =
            collect_node_ids_with_pending(db, branch, &edge_type.from_type, &staging).await?;
        let to_ids =
            collect_node_ids_with_pending(db, branch, &edge_type.to_type, &staging).await?;

        // Row numbers in these messages are 1-based within the edge type, not record numbers.
        for (i, (src, dst, _)) in rows.iter().enumerate() {
            if !from_ids.contains(src.as_str()) {
                return Err(OmniError::manifest(format!(
                    "edge {} row {}: src '{}' not found in {}",
                    edge_name,
                    i + 1,
                    src,
                    edge_type.from_type
                )));
            }
            if !to_ids.contains(dst.as_str()) {
                return Err(OmniError::manifest(format!(
                    "edge {} row {}: dst '{}' not found in {}",
                    edge_name,
                    i + 1,
                    dst,
                    edge_type.to_type
                )));
            }
        }
    }

    // Phase 3b: build and validate every edge batch (enum and intra-batch unique checks)
    // before opening any edge table.
    let mut prepared_edges: Vec<(String, String, RecordBatch, usize)> =
        Vec::with_capacity(edge_rows.len());
    for (edge_name, rows) in &edge_rows {
        let edge_type = &catalog.edge_types[edge_name];
        let batch = build_edge_batch(edge_type, rows)?;
        validate_enum_constraints(&batch, &edge_type.properties, edge_name)?;
        let unique_groups = unique_constraint_groups_for_edge(edge_type);
        if !unique_groups.is_empty() {
            enforce_unique_constraints_intra_batch(&batch, edge_name, &unique_groups)?;
        }
        let loaded_count = batch.num_rows();
        let table_key = format!("edge:{}", edge_name);
        // Existence check only: the entry itself is not used.
        let _entry = snapshot
            .entry(&table_key)
            .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
        prepared_edges.push((edge_name.clone(), table_key, batch, loaded_count));
    }

    // Phase 3c: open each edge table and stage its batch, mirroring the node path above.
    for (edge_name, table_key, batch, loaded_count) in prepared_edges {
        let (ds, full_path, table_branch) = db
            .open_for_mutation_on_branch(branch, &table_key, load_op_kind)
            .await?;
        let expected_version = ds.version();
        staging.ensure_path(
            &table_key,
            full_path,
            table_branch,
            expected_version,
            load_op_kind,
        );
        let schema = batch.schema();
        staging.append_batch(&table_key, schema, pending_mode, batch)?;
        result.edges_loaded.insert(edge_name, loaded_count);
    }

    // Phase 3d: cardinality checks run once all edge batches are staged, so the validator
    // receives the complete staging area.
    for (edge_name, _) in &edge_rows {
        let edge_type = &catalog.edge_types[edge_name];
        let table_key = format!("edge:{}", edge_name);
        validate_edge_cardinality_with_pending_loader(
            db, branch, edge_type, &table_key, &staging, mode,
        )
        .await?;
    }

    // Phase 4: write the staged batches, commit them, then publish the table updates on the
    // branch together with the expected versions recorded above.
    let staged = staging
        .stage_all_with_concurrency(db, branch, load_write_concurrency())
        .await?;
    // `_queue_guards` is a named binding, so whatever `commit_all` returns there stays alive
    // until this function returns.
    let (updates, expected_versions, sidecar_handle, _queue_guards) = staged
        .commit_all(
            db,
            branch,
            crate::db::manifest::SidecarKind::Load,
            actor_id,
            fork_queue_guards,
        )
        .await?;
    // Failpoint hook between `commit_all` and the manifest publish below.
    crate::failpoints::maybe_fail("mutation.post_finalize_pre_publisher")?;
    db.commit_updates_on_branch_with_expected(branch, &updates, &expected_versions, actor_id)
        .await?;
    // Sidecar cleanup is best-effort: a failure is logged and does not fail the load.
    if let Some(handle) = sidecar_handle {
        if let Err(err) = crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await {
            tracing::warn!(
                error = %err,
                operation_id = handle.operation_id.as_str(),
                "recovery sidecar cleanup failed; the next open's recovery sweep will resolve it"
            );
        }
    }

    Ok(result)
}
624
625fn build_node_batch(node_type: &NodeType, rows: &[JsonValue]) -> Result<RecordBatch> {
626 let schema = node_type.arrow_schema.clone();
627
628 let ids: Vec<String> = rows
630 .iter()
631 .map(|row| {
632 let explicit_id = row.get("id").and_then(|v| v.as_str()).map(str::to_string);
633 if let Some(key_prop) = node_type.key_property() {
634 let key_value = row
635 .get(key_prop)
636 .and_then(|v| v.as_str())
637 .map(|s| s.to_string())
638 .ok_or_else(|| {
639 OmniError::manifest(format!(
640 "node {} missing @key property '{}'",
641 node_type.name, key_prop
642 ))
643 })?;
644 if let Some(explicit_id) = explicit_id {
645 if explicit_id != key_value {
646 return Err(OmniError::manifest(format!(
647 "node {} has explicit id '{}' that does not match @key property '{}' value '{}'",
648 node_type.name, explicit_id, key_prop, key_value
649 )));
650 }
651 }
652 Ok(key_value)
653 } else if let Some(explicit_id) = explicit_id {
654 Ok(explicit_id)
655 } else {
656 Ok(generate_id())
657 }
658 })
659 .collect::<Result<Vec<_>>>()?;
660
661 let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
662 columns.push(Arc::new(StringArray::from(ids)));
663
664 for field in schema.fields().iter().skip(1) {
666 if node_type.blob_properties.contains(field.name()) {
667 let col = build_blob_column(field.name(), field.is_nullable(), rows)?;
668 columns.push(col);
669 } else {
670 let col =
671 build_column_from_json(field.name(), field.data_type(), field.is_nullable(), rows)?;
672 columns.push(col);
673 }
674 }
675
676 RecordBatch::try_new(schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
677}
678
679fn build_edge_batch(
680 edge_type: &omnigraph_compiler::catalog::EdgeType,
681 rows: &[(String, String, JsonValue)],
682) -> Result<RecordBatch> {
683 let schema = edge_type.arrow_schema.clone();
684
685 let ids: Vec<String> = rows
686 .iter()
687 .map(|(_, _, data)| {
688 data.get("id")
689 .and_then(|v| v.as_str())
690 .map(str::to_string)
691 .unwrap_or_else(generate_id)
692 })
693 .collect();
694 let srcs: Vec<&str> = rows.iter().map(|(from, _, _)| from.as_str()).collect();
695 let dsts: Vec<&str> = rows.iter().map(|(_, to, _)| to.as_str()).collect();
696
697 let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
698 columns.push(Arc::new(StringArray::from(ids)));
699 columns.push(Arc::new(StringArray::from(srcs)));
700 columns.push(Arc::new(StringArray::from(dsts)));
701
702 let data_values: Vec<JsonValue> = rows.iter().map(|(_, _, data)| data.clone()).collect();
704 for field in schema.fields().iter().skip(3) {
705 if edge_type.blob_properties.contains(field.name()) {
706 let col = build_blob_column(field.name(), field.is_nullable(), &data_values)?;
707 columns.push(col);
708 } else {
709 let col = build_column_from_json(
710 field.name(),
711 field.data_type(),
712 field.is_nullable(),
713 &data_values,
714 )?;
715 columns.push(col);
716 }
717 }
718
719 RecordBatch::try_new(schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
720}
721
722pub(crate) fn append_blob_value(builder: &mut BlobArrayBuilder, value: &str) -> Result<()> {
724 if let Some(encoded) = value.strip_prefix("base64:") {
725 let bytes = base64::engine::general_purpose::STANDARD
726 .decode(encoded)
727 .map_err(|e| OmniError::manifest(format!("invalid base64 blob data: {}", e)))?;
728 builder
729 .push_bytes(bytes)
730 .map_err(|e| OmniError::Lance(e.to_string()))
731 } else {
732 builder
734 .push_uri(value)
735 .map_err(|e| OmniError::Lance(e.to_string()))
736 }
737}
738
739fn build_blob_column(name: &str, nullable: bool, rows: &[JsonValue]) -> Result<ArrayRef> {
741 let mut builder = BlobArrayBuilder::new(rows.len());
742 for row in rows {
743 match row.get(name) {
744 Some(JsonValue::String(s)) => {
745 append_blob_value(&mut builder, s)?;
746 }
747 Some(JsonValue::Null) | None if nullable => {
748 builder
749 .push_null()
750 .map_err(|e| OmniError::Lance(e.to_string()))?;
751 }
752 Some(JsonValue::Null) | None => {
753 return Err(OmniError::manifest(format!(
754 "non-nullable blob property '{}' has null values",
755 name
756 )));
757 }
758 _ => {
759 return Err(OmniError::manifest(format!(
760 "blob property '{}' must be a URI string or base64: prefixed data",
761 name
762 )));
763 }
764 }
765 }
766 builder
767 .finish()
768 .map_err(|e| OmniError::Lance(e.to_string()))
769}
770
771fn build_column_from_json(
772 name: &str,
773 data_type: &DataType,
774 nullable: bool,
775 rows: &[JsonValue],
776) -> Result<ArrayRef> {
777 let array: ArrayRef = match data_type {
778 DataType::Utf8 => {
779 let values: Vec<Option<String>> = rows
780 .iter()
781 .map(|row| {
782 row.get(name)
783 .and_then(|v| v.as_str())
784 .map(|s| s.to_string())
785 })
786 .collect();
787 Arc::new(StringArray::from(values))
788 }
789 DataType::Int32 => {
790 let values: Vec<Option<i32>> = rows
791 .iter()
792 .map(|row| row.get(name).and_then(|v| v.as_i64()).map(|v| v as i32))
793 .collect();
794 Arc::new(Int32Array::from(values))
795 }
796 DataType::Int64 => {
797 let values: Vec<Option<i64>> = rows
798 .iter()
799 .map(|row| row.get(name).and_then(|v| v.as_i64()))
800 .collect();
801 Arc::new(Int64Array::from(values))
802 }
803 DataType::UInt32 => {
804 let values: Vec<Option<u32>> = rows
805 .iter()
806 .map(|row| row.get(name).and_then(|v| v.as_u64()).map(|v| v as u32))
807 .collect();
808 Arc::new(UInt32Array::from(values))
809 }
810 DataType::UInt64 => {
811 let values: Vec<Option<u64>> = rows
812 .iter()
813 .map(|row| row.get(name).and_then(|v| v.as_u64()))
814 .collect();
815 Arc::new(UInt64Array::from(values))
816 }
817 DataType::Float32 => {
818 let values: Vec<Option<f32>> = rows
819 .iter()
820 .map(|row| row.get(name).and_then(|v| v.as_f64()).map(|v| v as f32))
821 .collect();
822 Arc::new(Float32Array::from(values))
823 }
824 DataType::Float64 => {
825 let values: Vec<Option<f64>> = rows
826 .iter()
827 .map(|row| row.get(name).and_then(|v| v.as_f64()))
828 .collect();
829 Arc::new(Float64Array::from(values))
830 }
831 DataType::Boolean => {
832 let values: Vec<Option<bool>> = rows
833 .iter()
834 .map(|row| row.get(name).and_then(|v| v.as_bool()))
835 .collect();
836 Arc::new(BooleanArray::from(values))
837 }
838 DataType::Date32 => {
839 let mut values = Vec::with_capacity(rows.len());
840 for row in rows {
841 values.push(parse_date32_json_value(
842 row.get(name).unwrap_or(&JsonValue::Null),
843 )?);
844 }
845 Arc::new(Date32Array::from(values))
846 }
847 DataType::Date64 => {
848 let mut values = Vec::with_capacity(rows.len());
849 for row in rows {
850 values.push(parse_date64_json_value(
851 row.get(name).unwrap_or(&JsonValue::Null),
852 )?);
853 }
854 Arc::new(Date64Array::from(values))
855 }
856 DataType::List(field) => {
857 let mut builder = ListBuilder::with_capacity(
858 make_list_value_builder(field.data_type(), rows.len())?,
859 rows.len(),
860 )
861 .with_field(field.clone());
862 for row in rows {
863 let value = row.get(name).unwrap_or(&JsonValue::Null);
864 if value.is_null() {
865 builder.append(false);
866 continue;
867 }
868 let items = value.as_array().ok_or_else(|| {
869 OmniError::manifest(format!(
870 "list property '{}' expects a JSON array, got {}",
871 name, value
872 ))
873 })?;
874 for item in items {
875 append_json_list_item(builder.values(), field.data_type(), item)?;
876 }
877 builder.append(true);
878 }
879 Arc::new(builder.finish())
880 }
881 DataType::FixedSizeList(child_field, dim) => {
882 let dim = *dim;
884 let mut builder = FixedSizeListBuilder::with_capacity(
885 Float32Builder::with_capacity(rows.len() * dim as usize),
886 dim,
887 rows.len(),
888 )
889 .with_field(child_field.clone());
890 for row in rows {
891 if let Some(arr) = row.get(name).and_then(|v| v.as_array()) {
892 if arr.len() != dim as usize {
893 return Err(OmniError::manifest(format!(
894 "vector property '{}' expects {} dimensions, got {}",
895 name,
896 dim,
897 arr.len()
898 )));
899 }
900 for val in arr {
901 builder
902 .values()
903 .append_value(val.as_f64().unwrap_or(0.0) as f32);
904 }
905 builder.append(true);
906 } else if nullable {
907 for _ in 0..dim as usize {
908 builder.values().append_null();
909 }
910 builder.append(false);
911 } else {
912 return Err(OmniError::manifest(format!(
913 "non-nullable vector property '{}' has null values",
914 name
915 )));
916 }
917 }
918 Arc::new(builder.finish())
919 }
920 _ => {
921 let values: Vec<Option<&str>> = vec![None; rows.len()];
923 Arc::new(StringArray::from(values))
924 }
925 };
926
927 if !nullable && array.null_count() > 0 {
928 return Err(OmniError::manifest(format!(
929 "non-nullable property '{}' has null or invalid values",
930 name
931 )));
932 }
933
934 Ok(array)
935}
936
937fn make_list_value_builder(data_type: &DataType, capacity: usize) -> Result<Box<dyn ArrayBuilder>> {
938 Ok(match data_type {
939 DataType::Utf8 => Box::new(StringBuilder::with_capacity(capacity, capacity * 8)),
940 DataType::Boolean => Box::new(BooleanBuilder::with_capacity(capacity)),
941 DataType::Int32 => Box::new(Int32Builder::with_capacity(capacity)),
942 DataType::Int64 => Box::new(Int64Builder::with_capacity(capacity)),
943 DataType::UInt32 => Box::new(UInt32Builder::with_capacity(capacity)),
944 DataType::UInt64 => Box::new(UInt64Builder::with_capacity(capacity)),
945 DataType::Float32 => Box::new(Float32Builder::with_capacity(capacity)),
946 DataType::Float64 => Box::new(Float64Builder::with_capacity(capacity)),
947 DataType::Date32 => Box::new(Date32Builder::with_capacity(capacity)),
948 DataType::Date64 => Box::new(Date64Builder::with_capacity(capacity)),
949 other => {
950 return Err(OmniError::manifest(format!(
951 "unsupported list element data type {:?}",
952 other
953 )));
954 }
955 })
956}
957
958fn append_json_list_item(
959 builder: &mut Box<dyn ArrayBuilder>,
960 data_type: &DataType,
961 value: &JsonValue,
962) -> Result<()> {
963 match data_type {
964 DataType::Utf8 => {
965 let builder = builder
966 .as_any_mut()
967 .downcast_mut::<StringBuilder>()
968 .ok_or_else(|| OmniError::manifest("list Utf8 builder downcast failed"))?;
969 if let Some(value) = value.as_str() {
970 builder.append_value(value);
971 } else {
972 builder.append_null();
973 }
974 }
975 DataType::Boolean => {
976 let builder = builder
977 .as_any_mut()
978 .downcast_mut::<BooleanBuilder>()
979 .ok_or_else(|| OmniError::manifest("list Boolean builder downcast failed"))?;
980 if let Some(value) = value.as_bool() {
981 builder.append_value(value);
982 } else {
983 builder.append_null();
984 }
985 }
986 DataType::Int32 => {
987 let builder = builder
988 .as_any_mut()
989 .downcast_mut::<Int32Builder>()
990 .ok_or_else(|| OmniError::manifest("list Int32 builder downcast failed"))?;
991 if let Some(value) = value.as_i64() {
992 let value = i32::try_from(value).map_err(|_| {
993 OmniError::manifest(format!("list value {} exceeds Int32 range", value))
994 })?;
995 builder.append_value(value);
996 } else {
997 builder.append_null();
998 }
999 }
1000 DataType::Int64 => {
1001 let builder = builder
1002 .as_any_mut()
1003 .downcast_mut::<Int64Builder>()
1004 .ok_or_else(|| OmniError::manifest("list Int64 builder downcast failed"))?;
1005 if let Some(value) = value.as_i64() {
1006 builder.append_value(value);
1007 } else {
1008 builder.append_null();
1009 }
1010 }
1011 DataType::UInt32 => {
1012 let builder = builder
1013 .as_any_mut()
1014 .downcast_mut::<UInt32Builder>()
1015 .ok_or_else(|| OmniError::manifest("list UInt32 builder downcast failed"))?;
1016 if let Some(value) = value.as_u64() {
1017 let value = u32::try_from(value).map_err(|_| {
1018 OmniError::manifest(format!("list value {} exceeds UInt32 range", value))
1019 })?;
1020 builder.append_value(value);
1021 } else {
1022 builder.append_null();
1023 }
1024 }
1025 DataType::UInt64 => {
1026 let builder = builder
1027 .as_any_mut()
1028 .downcast_mut::<UInt64Builder>()
1029 .ok_or_else(|| OmniError::manifest("list UInt64 builder downcast failed"))?;
1030 if let Some(value) = value.as_u64() {
1031 builder.append_value(value);
1032 } else {
1033 builder.append_null();
1034 }
1035 }
1036 DataType::Float32 => {
1037 let builder = builder
1038 .as_any_mut()
1039 .downcast_mut::<Float32Builder>()
1040 .ok_or_else(|| OmniError::manifest("list Float32 builder downcast failed"))?;
1041 if let Some(value) = value.as_f64() {
1042 builder.append_value(value as f32);
1043 } else {
1044 builder.append_null();
1045 }
1046 }
1047 DataType::Float64 => {
1048 let builder = builder
1049 .as_any_mut()
1050 .downcast_mut::<Float64Builder>()
1051 .ok_or_else(|| OmniError::manifest("list Float64 builder downcast failed"))?;
1052 if let Some(value) = value.as_f64() {
1053 builder.append_value(value);
1054 } else {
1055 builder.append_null();
1056 }
1057 }
1058 DataType::Date32 => {
1059 let builder = builder
1060 .as_any_mut()
1061 .downcast_mut::<Date32Builder>()
1062 .ok_or_else(|| OmniError::manifest("list Date32 builder downcast failed"))?;
1063 if let Some(value) = parse_date32_json_value(value)? {
1064 builder.append_value(value);
1065 } else {
1066 builder.append_null();
1067 }
1068 }
1069 DataType::Date64 => {
1070 let builder = builder
1071 .as_any_mut()
1072 .downcast_mut::<Date64Builder>()
1073 .ok_or_else(|| OmniError::manifest("list Date64 builder downcast failed"))?;
1074 if let Some(value) = parse_date64_json_value(value)? {
1075 builder.append_value(value);
1076 } else {
1077 builder.append_null();
1078 }
1079 }
1080 other => {
1081 return Err(OmniError::manifest(format!(
1082 "unsupported list element data type {:?}",
1083 other
1084 )));
1085 }
1086 }
1087
1088 Ok(())
1089}
1090
1091fn parse_date32_json_value(value: &JsonValue) -> Result<Option<i32>> {
1092 if value.is_null() {
1093 return Ok(None);
1094 }
1095 if let Some(days) = value.as_i64() {
1096 let days = i32::try_from(days)
1097 .map_err(|_| OmniError::manifest(format!("Date value out of range: {}", days)))?;
1098 return Ok(Some(days));
1099 }
1100 if let Some(days) = value.as_u64() {
1101 let days = i32::try_from(days)
1102 .map_err(|_| OmniError::manifest(format!("Date value out of range: {}", days)))?;
1103 return Ok(Some(days));
1104 }
1105 if let Some(value) = value.as_str() {
1106 return Ok(Some(parse_date32_literal(value)?));
1107 }
1108 Ok(None)
1109}
1110
1111fn parse_date64_json_value(value: &JsonValue) -> Result<Option<i64>> {
1112 if value.is_null() {
1113 return Ok(None);
1114 }
1115 if let Some(ms) = value.as_i64() {
1116 return Ok(Some(ms));
1117 }
1118 if let Some(ms) = value.as_u64() {
1119 let ms = i64::try_from(ms)
1120 .map_err(|_| OmniError::manifest(format!("DateTime value out of range: {}", ms)))?;
1121 return Ok(Some(ms));
1122 }
1123 if let Some(value) = value.as_str() {
1124 return Ok(Some(parse_date64_literal(value)?));
1125 }
1126 Ok(None)
1127}
1128
/// Write concurrency used by the loader when the environment does not override it.
const DEFAULT_LOAD_WRITE_CONCURRENCY: usize = 8;

/// Returns the write concurrency for staged loads.
///
/// Reads `OMNIGRAPH_LOAD_CONCURRENCY`; a value that parses as a positive `usize` wins.
/// An unset, unparsable or zero value falls back to [`DEFAULT_LOAD_WRITE_CONCURRENCY`].
fn load_write_concurrency() -> usize {
    match std::env::var("OMNIGRAPH_LOAD_CONCURRENCY") {
        Ok(raw) => match raw.parse::<usize>() {
            Ok(requested) if requested > 0 => requested,
            _ => DEFAULT_LOAD_WRITE_CONCURRENCY,
        },
        Err(_) => DEFAULT_LOAD_WRITE_CONCURRENCY,
    }
}
1148
1149fn generate_id() -> String {
1150 ulid::Ulid::new().to_string()
1151}
1152
1153pub(crate) fn parse_date32_literal(value: &str) -> Result<i32> {
1154 let raw: Arc<dyn Array> = Arc::new(StringArray::from(vec![Some(value)]));
1155 let casted = arrow_cast::cast::cast(raw.as_ref(), &DataType::Date32)
1156 .map_err(|e| OmniError::manifest(format!("invalid Date literal '{}': {}", value, e)))?;
1157 let out = casted
1158 .as_any()
1159 .downcast_ref::<Date32Array>()
1160 .ok_or_else(|| OmniError::manifest("Date32 cast produced unexpected array"))?;
1161 if out.is_null(0) {
1162 return Err(OmniError::manifest(format!(
1163 "invalid Date literal '{}'",
1164 value
1165 )));
1166 }
1167 Ok(out.value(0))
1168}
1169
1170pub(crate) fn parse_date64_literal(value: &str) -> Result<i64> {
1171 let raw: Arc<dyn Array> = Arc::new(StringArray::from(vec![Some(value)]));
1172 let casted = arrow_cast::cast::cast(raw.as_ref(), &DataType::Date64)
1173 .map_err(|e| OmniError::manifest(format!("invalid DateTime literal '{}': {}", value, e)))?;
1174 let out = casted
1175 .as_any()
1176 .downcast_ref::<Date64Array>()
1177 .ok_or_else(|| OmniError::manifest("Date64 cast produced unexpected array"))?;
1178 if out.is_null(0) {
1179 return Err(OmniError::manifest(format!(
1180 "invalid DateTime literal '{}'",
1181 value
1182 )));
1183 }
1184 Ok(out.value(0))
1185}
1186
1187pub(crate) fn validate_value_constraints(
1190 batch: &RecordBatch,
1191 node_type: &omnigraph_compiler::catalog::NodeType,
1192) -> Result<()> {
1193 use arrow_array::Array;
1194
1195 for rc in &node_type.range_constraints {
1197 let Some(col) = batch.column_by_name(&rc.property) else {
1198 continue;
1199 };
1200 for row in 0..batch.num_rows() {
1201 if col.is_null(row) {
1202 continue;
1203 }
1204 let value = extract_numeric_value(col, row);
1205 if let Some(val) = value {
1206 if val.is_nan() {
1207 return Err(OmniError::manifest(format!(
1208 "@range violation on {}.{}: value is NaN",
1209 node_type.name, rc.property
1210 )));
1211 }
1212 if let Some(ref min) = rc.min {
1213 let min_f = literal_value_to_f64(min);
1214 if val < min_f {
1215 return Err(OmniError::manifest(format!(
1216 "@range violation on {}.{}: value {} < min {}",
1217 node_type.name, rc.property, val, min_f
1218 )));
1219 }
1220 }
1221 if let Some(ref max) = rc.max {
1222 let max_f = literal_value_to_f64(max);
1223 if val > max_f {
1224 return Err(OmniError::manifest(format!(
1225 "@range violation on {}.{}: value {} > max {}",
1226 node_type.name, rc.property, val, max_f
1227 )));
1228 }
1229 }
1230 }
1231 }
1232 }
1233
1234 for cc in &node_type.check_constraints {
1236 let re = regex::Regex::new(&cc.pattern).map_err(|e| {
1237 OmniError::manifest(format!(
1238 "@check on {}.{} has invalid regex '{}': {}",
1239 node_type.name, cc.property, cc.pattern, e
1240 ))
1241 })?;
1242 let Some(col) = batch.column_by_name(&cc.property) else {
1243 continue;
1244 };
1245 let str_col = col.as_any().downcast_ref::<StringArray>();
1246 if let Some(str_col) = str_col {
1247 for row in 0..str_col.len() {
1248 if str_col.is_null(row) {
1249 continue;
1250 }
1251 let val = str_col.value(row);
1252 if !re.is_match(val) {
1253 return Err(OmniError::manifest(format!(
1254 "@check violation on {}.{}: value '{}' does not match pattern '{}'",
1255 node_type.name, cc.property, val, cc.pattern
1256 )));
1257 }
1258 }
1259 }
1260 }
1261
1262 Ok(())
1263}
1264
1265pub(crate) fn validate_enum_constraints(
1272 batch: &RecordBatch,
1273 properties: &HashMap<String, omnigraph_compiler::types::PropType>,
1274 type_name: &str,
1275) -> Result<()> {
1276 use arrow_array::{Array, ListArray};
1277
1278 for (prop_name, prop_type) in properties {
1279 let Some(allowed) = prop_type.enum_values.as_ref() else {
1280 continue;
1281 };
1282 let Some(col) = batch.column_by_name(prop_name) else {
1283 continue;
1284 };
1285 if prop_type.list {
1286 let Some(list_col) = col.as_any().downcast_ref::<ListArray>() else {
1287 continue;
1288 };
1289 for row in 0..list_col.len() {
1290 if list_col.is_null(row) {
1291 continue;
1292 }
1293 let item_arr = list_col.value(row);
1294 let Some(str_arr) = item_arr.as_any().downcast_ref::<StringArray>() else {
1295 continue;
1296 };
1297 for i in 0..str_arr.len() {
1298 if str_arr.is_null(i) {
1299 continue;
1300 }
1301 let val = str_arr.value(i);
1302 if !allowed.iter().any(|a| a.as_str() == val) {
1303 return Err(OmniError::manifest(format!(
1304 "invalid enum value '{}' for {}.{} (expected: {})",
1305 val,
1306 type_name,
1307 prop_name,
1308 allowed.join(", ")
1309 )));
1310 }
1311 }
1312 }
1313 } else if let Some(str_col) = col.as_any().downcast_ref::<StringArray>() {
1314 for row in 0..str_col.len() {
1315 if str_col.is_null(row) {
1316 continue;
1317 }
1318 let val = str_col.value(row);
1319 if !allowed.iter().any(|a| a.as_str() == val) {
1320 return Err(OmniError::manifest(format!(
1321 "invalid enum value '{}' for {}.{} (expected: {})",
1322 val,
1323 type_name,
1324 prop_name,
1325 allowed.join(", ")
1326 )));
1327 }
1328 }
1329 }
1330 }
1331 Ok(())
1332}
1333
1334pub(crate) fn enforce_unique_constraints_intra_batch(
1349 batch: &RecordBatch,
1350 type_name: &str,
1351 unique_constraints: &[Vec<String>],
1352) -> Result<()> {
1353 for columns in unique_constraints {
1354 let Some(group_columns) = columns
1357 .iter()
1358 .map(|name| {
1359 batch
1360 .schema()
1361 .index_of(name)
1362 .ok()
1363 .map(|i| batch.column(i).clone())
1364 })
1365 .collect::<Option<Vec<ArrayRef>>>()
1366 else {
1367 continue;
1368 };
1369 let mut seen: HashMap<Vec<String>, usize> = HashMap::new();
1370 for row in 0..batch.num_rows() {
1371 let Some(key) = composite_unique_key(&group_columns, row)? else {
1372 continue;
1373 };
1374 if let Some(prev_row) = seen.insert(key.clone(), row) {
1375 return Err(OmniError::manifest(format!(
1376 "@unique violation on {}.{}: value '{}' appears in rows {} and {}",
1377 type_name,
1378 format_tuple(columns),
1379 format_tuple(&key),
1380 prev_row,
1381 row
1382 )));
1383 }
1384 }
1385 }
1386 Ok(())
1387}
1388
1389pub(crate) fn composite_unique_key(
1406 group_columns: &[ArrayRef],
1407 row: usize,
1408) -> Result<Option<Vec<String>>> {
1409 let mut parts = Vec::with_capacity(group_columns.len());
1410 for column in group_columns {
1411 match unique_key_scalar(column, row)? {
1412 Some(value) => parts.push(value),
1413 None => return Ok(None),
1414 }
1415 }
1416 Ok(Some(parts))
1417}
1418
/// Renders a list of strings for use in error messages.
///
/// A single item is returned bare; any other count (including zero) is
/// rendered as a parenthesised, comma-separated tuple.
fn format_tuple(items: &[String]) -> String {
    if let [only] = items {
        return only.clone();
    }
    let joined = items.join(", ");
    format!("({})", joined)
}
1428
1429fn unique_key_scalar(array: &ArrayRef, row: usize) -> Result<Option<String>> {
1444 use arrow_array::{Array, LargeStringArray, StringViewArray};
1445 if array.is_null(row) {
1446 return Ok(None);
1447 }
1448 if let Some(a) = array.as_any().downcast_ref::<StringArray>() {
1449 return Ok(Some(a.value(row).to_string()));
1450 }
1451 if let Some(a) = array.as_any().downcast_ref::<LargeStringArray>() {
1452 return Ok(Some(a.value(row).to_string()));
1453 }
1454 if let Some(a) = array.as_any().downcast_ref::<StringViewArray>() {
1455 return Ok(Some(a.value(row).to_string()));
1456 }
1457 if let Some(a) = array.as_any().downcast_ref::<Int32Array>() {
1458 return Ok(Some(a.value(row).to_string()));
1459 }
1460 if let Some(a) = array.as_any().downcast_ref::<Int64Array>() {
1461 return Ok(Some(a.value(row).to_string()));
1462 }
1463 if let Some(a) = array.as_any().downcast_ref::<UInt32Array>() {
1464 return Ok(Some(a.value(row).to_string()));
1465 }
1466 if let Some(a) = array.as_any().downcast_ref::<UInt64Array>() {
1467 return Ok(Some(a.value(row).to_string()));
1468 }
1469 if let Some(a) = array.as_any().downcast_ref::<Float32Array>() {
1470 return Ok(Some(a.value(row).to_string()));
1471 }
1472 if let Some(a) = array.as_any().downcast_ref::<Float64Array>() {
1473 return Ok(Some(a.value(row).to_string()));
1474 }
1475 if let Some(a) = array.as_any().downcast_ref::<BooleanArray>() {
1476 return Ok(Some(a.value(row).to_string()));
1477 }
1478 if let Some(a) = array.as_any().downcast_ref::<Date32Array>() {
1479 return Ok(Some(a.value(row).to_string()));
1480 }
1481 if let Some(a) = array.as_any().downcast_ref::<Date64Array>() {
1482 return Ok(Some(a.value(row).to_string()));
1483 }
1484 Err(OmniError::manifest(format!(
1485 "uniqueness key: unsupported column type {:?} for @unique/@key enforcement",
1486 array.data_type()
1487 )))
1488}
1489
1490pub(crate) fn unique_constraint_groups_for_node(
1497 node_type: &omnigraph_compiler::catalog::NodeType,
1498) -> Vec<Vec<String>> {
1499 let mut groups: Vec<Vec<String>> = node_type.unique_constraints.clone();
1500 if let Some(key) = &node_type.key
1501 && !groups.contains(key)
1502 {
1503 groups.push(key.clone());
1504 }
1505 groups
1506}
1507
1508pub(crate) fn unique_constraint_groups_for_edge(
1511 edge_type: &omnigraph_compiler::catalog::EdgeType,
1512) -> Vec<Vec<String>> {
1513 edge_type.unique_constraints.clone()
1514}
1515
1516fn extract_numeric_value(col: &ArrayRef, row: usize) -> Option<f64> {
1517 use arrow_array::{
1518 Array, Float32Array, Float64Array, Int32Array, Int64Array, UInt32Array, UInt64Array,
1519 };
1520 if let Some(a) = col.as_any().downcast_ref::<Int32Array>() {
1521 return Some(a.value(row) as f64);
1522 }
1523 if let Some(a) = col.as_any().downcast_ref::<Int64Array>() {
1524 return Some(a.value(row) as f64);
1525 }
1526 if let Some(a) = col.as_any().downcast_ref::<UInt32Array>() {
1527 return Some(a.value(row) as f64);
1528 }
1529 if let Some(a) = col.as_any().downcast_ref::<UInt64Array>() {
1530 return Some(a.value(row) as f64);
1531 }
1532 if let Some(a) = col.as_any().downcast_ref::<Float32Array>() {
1533 return Some(a.value(row) as f64);
1534 }
1535 if let Some(a) = col.as_any().downcast_ref::<Float64Array>() {
1536 return Some(a.value(row));
1537 }
1538 None
1539}
1540
1541fn literal_value_to_f64(v: &omnigraph_compiler::catalog::LiteralValue) -> f64 {
1542 use omnigraph_compiler::catalog::LiteralValue;
1543 match v {
1544 LiteralValue::Integer(n) => *n as f64,
1545 LiteralValue::Float(f) => *f,
1546 }
1547}
1548
1549pub(crate) async fn validate_edge_cardinality(
1552 db: &crate::db::Omnigraph,
1553 branch: Option<&str>,
1554 edge_name: &str,
1555 written_version: u64,
1556 written_branch: Option<&str>,
1557) -> Result<()> {
1558 use arrow_array::Array;
1559 let catalog = db.catalog();
1560 let edge_type = &catalog.edge_types[edge_name];
1561 if edge_type.cardinality.is_default() {
1562 return Ok(());
1563 }
1564
1565 let snapshot = db.snapshot_for_branch(branch).await?;
1568 let table_key = format!("edge:{}", edge_name);
1569 let entry = snapshot
1570 .entry(&table_key)
1571 .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
1572 let ds = db
1573 .open_dataset_at_state(
1574 &entry.table_path,
1575 written_branch.or(entry.table_branch.as_deref()),
1576 written_version,
1577 )
1578 .await?;
1579
1580 let batches = db.storage().scan(&ds, Some(&["src"]), None, None).await?;
1582
1583 let mut counts: HashMap<String, u32> = HashMap::new();
1584 for batch in &batches {
1585 let srcs = batch
1586 .column_by_name("src")
1587 .unwrap()
1588 .as_any()
1589 .downcast_ref::<StringArray>()
1590 .unwrap();
1591 for i in 0..srcs.len() {
1592 *counts.entry(srcs.value(i).to_string()).or_insert(0) += 1;
1593 }
1594 }
1595
1596 let card = &edge_type.cardinality;
1597 for (src, count) in &counts {
1598 if let Some(max) = card.max {
1599 if *count > max {
1600 return Err(OmniError::manifest(format!(
1601 "@card violation on edge {}: source '{}' has {} edges (max {})",
1602 edge_name, src, count, max
1603 )));
1604 }
1605 }
1606 if *count < card.min {
1607 return Err(OmniError::manifest(format!(
1608 "@card violation on edge {}: source '{}' has {} edges (min {})",
1609 edge_name, src, count, card.min
1610 )));
1611 }
1612 }
1613
1614 Ok(())
1615}
1616
1617async fn validate_edge_cardinality_with_pending_loader(
1632 db: &Omnigraph,
1633 branch: Option<&str>,
1634 edge_type: &omnigraph_compiler::catalog::EdgeType,
1635 table_key: &str,
1636 staging: &MutationStaging,
1637 mode: LoadMode,
1638) -> Result<()> {
1639 if edge_type.cardinality.is_default() {
1640 return Ok(());
1641 }
1642 let snapshot = db.snapshot_for_branch(branch).await?;
1643 let Some(entry) = snapshot.entry(table_key) else {
1644 return Ok(());
1647 };
1648 let ds = db
1649 .open_dataset_at_state(
1650 &entry.table_path,
1651 entry.table_branch.as_deref(),
1652 entry.table_version,
1653 )
1654 .await?;
1655 let dedupe_key = match mode {
1656 LoadMode::Merge => Some("id"),
1657 LoadMode::Append | LoadMode::Overwrite => None,
1658 };
1659 let counts =
1660 crate::exec::staging::count_src_per_edge(db, &ds, table_key, staging, dedupe_key).await?;
1661 crate::exec::staging::enforce_cardinality_bounds(edge_type, &counts)
1662}
1663
1664async fn collect_node_ids_with_pending(
1678 db: &Omnigraph,
1679 branch: Option<&str>,
1680 type_name: &str,
1681 staging: &MutationStaging,
1682) -> Result<HashSet<String>> {
1683 let mut ids = HashSet::new();
1684 let table_key = format!("node:{}", type_name);
1685
1686 for batch in staging.pending_batches(&table_key) {
1688 if let Some(col) = batch.column_by_name("id") {
1689 if let Some(arr) = col.as_any().downcast_ref::<StringArray>() {
1690 for i in 0..arr.len() {
1691 if arr.is_valid(i) {
1692 ids.insert(arr.value(i).to_string());
1693 }
1694 }
1695 }
1696 }
1697 }
1698
1699 if staging.pending_mode(&table_key) == Some(PendingMode::Overwrite) {
1700 return Ok(ids);
1701 }
1702
1703 let snapshot = db.snapshot_for_branch(branch).await?;
1705 let Some(entry) = snapshot.entry(&table_key) else {
1706 return Ok(ids);
1707 };
1708 let ds = db
1709 .open_dataset_at_state(
1710 &entry.table_path,
1711 entry.table_branch.as_deref(),
1712 entry.table_version,
1713 )
1714 .await?;
1715
1716 let batches = db.storage().scan(&ds, Some(&["id"]), None, None).await?;
1717
1718 for batch in &batches {
1719 let id_col = batch
1720 .column_by_name("id")
1721 .ok_or_else(|| OmniError::Lance("missing 'id' column".into()))?
1722 .as_any()
1723 .downcast_ref::<StringArray>()
1724 .ok_or_else(|| OmniError::Lance("'id' column is not Utf8".into()))?;
1725 for i in 0..batch.num_rows() {
1726 if id_col.is_valid(i) {
1731 ids.insert(id_col.value(i).to_string());
1732 }
1733 }
1734 }
1735
1736 Ok(ids)
1737}
1738
// Tests for the JSONL loader entry points and the standalone constraint
// helpers defined in this file. The async tests each initialise a fresh
// database in a temporary directory.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::Omnigraph;
    use arrow_array::Array;
    use futures::TryStreamExt;
    use std::collections::HashMap;

    // Schema shared by the loader tests: two node types keyed by `name` and two
    // edge types connecting them.
    const TEST_SCHEMA: &str = r#"
node Person {
 name: String @key
 age: I32?
}
node Company {
 name: String @key
}
edge Knows: Person -> Person {
 since: Date?
}
edge WorksAt: Person -> Company
"#;

    // JSONL fixture: two Person nodes, one Company node, one Knows edge and one
    // WorksAt edge. Edge endpoints are given as the nodes' `name` values.
    const TEST_DATA: &str = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}
{"type": "Person", "data": {"name": "Bob", "age": 25}}
{"type": "Company", "data": {"name": "Acme"}}
{"edge": "Knows", "from": "Alice", "to": "Bob"}
{"edge": "WorksAt", "from": "Alice", "to": "Acme"}
"#;

    // The load result reports per-type row counts for nodes and edges.
    #[tokio::test]
    async fn test_load_creates_data() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let result = load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert_eq!(result.nodes_loaded["Person"], 2);
        assert_eq!(result.nodes_loaded["Company"], 1);
        assert_eq!(result.edges_loaded["Knows"], 1);
        assert_eq!(result.edges_loaded["WorksAt"], 1);
    }

    // Loaded Person rows can be read back from the `node:Person` dataset, and
    // the `id` column contains the `name` values from the fixture.
    #[tokio::test]
    async fn test_load_data_readable_via_lance() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        let snap = db.snapshot().await;
        let person_ds = snap.open("node:Person").await.unwrap();

        assert_eq!(person_ds.count_rows(None).await.unwrap(), 2);

        let batches: Vec<RecordBatch> = person_ds
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();

        // Only the first batch is inspected; this assumes both rows arrive in it.
        let batch = &batches[0];
        let ids = batch
            .column_by_name("id")
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let id_values: Vec<&str> = (0..ids.len()).map(|i| ids.value(i)).collect();
        assert!(id_values.contains(&"Alice"));
        assert!(id_values.contains(&"Bob"));
    }

    // The `src` / `dst` columns of a loaded edge hold the `from` / `to` values
    // given in the fixture.
    #[tokio::test]
    async fn test_load_edges_reference_node_keys() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        let snap = db.snapshot().await;
        let knows_ds = snap.open("edge:Knows").await.unwrap();

        let batches: Vec<RecordBatch> = knows_ds
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();

        let batch = &batches[0];
        let srcs = batch
            .column_by_name("src")
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let dsts = batch
            .column_by_name("dst")
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();

        assert_eq!(srcs.value(0), "Alice");
        assert_eq!(dsts.value(0), "Bob");
    }

    // A successful load moves the database version forward.
    #[tokio::test]
    async fn test_load_manifest_version_advances() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        let v1 = db.version().await;

        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert!(db.version().await > v1);
    }

    // An `Append` load after an initial load keeps the earlier row and adds
    // the new one.
    #[tokio::test]
    async fn test_load_append_adds_rows() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let batch1 = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#;
        let batch2 = r#"{"type": "Person", "data": {"name": "Bob", "age": 25}}"#;

        load_jsonl(&mut db, batch1, LoadMode::Overwrite)
            .await
            .unwrap();
        load_jsonl(&mut db, batch2, LoadMode::Append).await.unwrap();

        let snap = db.snapshot().await;
        let person_ds = snap.open("node:Person").await.unwrap();
        assert_eq!(person_ds.count_rows(None).await.unwrap(), 2);
    }

    // A record whose `type` is not declared in the schema makes the load fail.
    #[tokio::test]
    async fn test_load_unknown_type_rejected() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let bad = r#"{"type": "FakeType", "data": {"name": "x"}}"#;
        let result = load_jsonl(&mut db, bad, LoadMode::Overwrite).await;
        assert!(result.is_err());
    }

    // Ingesting into a branch that does not exist yet creates it and reports
    // one entry per loaded table; the expected entries are listed in ascending
    // `table_key` order.
    #[tokio::test]
    #[allow(deprecated)]
    async fn test_ingest_creates_branch_and_reports_tables() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let result = db
            .ingest("feature", Some("main"), TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert_eq!(result.branch, "feature");
        assert_eq!(result.base_branch, "main");
        assert!(result.branch_created);
        assert_eq!(result.mode, LoadMode::Overwrite);
        assert_eq!(
            result.tables,
            vec![
                IngestTableResult {
                    table_key: "edge:Knows".to_string(),
                    rows_loaded: 1
                },
                IngestTableResult {
                    table_key: "edge:WorksAt".to_string(),
                    rows_loaded: 1
                },
                IngestTableResult {
                    table_key: "node:Company".to_string(),
                    rows_loaded: 1
                },
                IngestTableResult {
                    table_key: "node:Person".to_string(),
                    rows_loaded: 2
                },
            ]
        );
        assert!(
            db.branch_list()
                .await
                .unwrap()
                .contains(&"feature".to_string())
        );
    }

    // When the target branch already exists, the `from` argument is not used to
    // create anything: the ingest succeeds even though this test never creates
    // a branch called `missing-base`. The result still echoes the requested
    // base name. `Merge` updates Bob, adds Eve and leaves Alice untouched.
    #[tokio::test]
    #[allow(deprecated)]
    async fn test_ingest_existing_branch_ignores_from_and_merges_data() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();
        db.branch_create_from(crate::db::ReadTarget::branch("main"), "feature")
            .await
            .unwrap();

        let result = db
            .ingest(
                "feature",
                Some("missing-base"),
                r#"{"type":"Person","data":{"name":"Bob","age":26}}
{"type":"Person","data":{"name":"Eve","age":31}}"#,
                LoadMode::Merge,
            )
            .await
            .unwrap();

        assert_eq!(result.branch, "feature");
        assert_eq!(result.base_branch, "missing-base");
        assert!(!result.branch_created);
        assert_eq!(result.mode, LoadMode::Merge);
        assert_eq!(
            result.tables,
            vec![IngestTableResult {
                table_key: "node:Person".to_string(),
                rows_loaded: 2
            }]
        );

        let snap = db
            .snapshot_of(crate::db::ReadTarget::branch("feature"))
            .await
            .unwrap();
        let person_ds = snap.open("node:Person").await.unwrap();
        // Alice and Bob from the initial load plus Eve; Bob is updated, not duplicated.
        assert_eq!(person_ds.count_rows(None).await.unwrap(), 3);

        let batches: Vec<RecordBatch> = person_ds
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();
        // Gather id -> age across every batch so the check does not depend on
        // how rows are split between batches.
        let mut ages_by_id = HashMap::new();
        for batch in &batches {
            let ids = batch
                .column_by_name("id")
                .unwrap()
                .as_any()
                .downcast_ref::<StringArray>()
                .unwrap();
            let ages = batch
                .column_by_name("age")
                .unwrap()
                .as_any()
                .downcast_ref::<Int32Array>()
                .unwrap();
            for idx in 0..ids.len() {
                ages_by_id.insert(ids.value(idx).to_string(), ages.value(idx));
            }
        }

        assert_eq!(ages_by_id.get("Bob"), Some(&26));
        assert_eq!(ages_by_id.get("Eve"), Some(&31));
        assert_eq!(ages_by_id.get("Alice"), Some(&30));
    }

    // The actor id passed to `ingest_as` is recorded on the last commit
    // returned by `list_commits` for the branch.
    // NOTE(review): treating the last element as the branch head assumes
    // `list_commits` returns commits oldest-first; confirm.
    #[tokio::test]
    #[allow(deprecated)]
    async fn test_ingest_as_stamps_actor_on_branch_head_commit() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        db.ingest_as(
            "feature",
            Some("main"),
            TEST_DATA,
            LoadMode::Overwrite,
            Some("act-andrew"),
        )
        .await
        .unwrap();

        let head = db
            .list_commits(Some("feature"))
            .await
            .unwrap()
            .into_iter()
            .last()
            .unwrap();
        assert_eq!(head.actor_id.as_deref(), Some("act-andrew"));
    }

    // `load_as` with a base creates the missing target branch on the first call
    // and reports `branch_created`; a second call against the now-existing
    // branch reports `branch_created == false` while still echoing the base.
    #[tokio::test]
    async fn test_load_as_with_base_forks_missing_branch_and_stamps_metadata() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let result = db
            .load_as("feature", Some("main"), TEST_DATA, LoadMode::Merge, None)
            .await
            .unwrap();

        assert_eq!(result.branch, "feature");
        assert_eq!(result.base_branch.as_deref(), Some("main"));
        assert!(result.branch_created);
        assert!(
            db.branch_list()
                .await
                .unwrap()
                .contains(&"feature".to_string())
        );

        let again = db
            .load_as(
                "feature",
                Some("main"),
                r#"{"type":"Person","data":{"name":"Bob","age":26}}"#,
                LoadMode::Merge,
                None,
            )
            .await
            .unwrap();
        assert!(!again.branch_created);
        assert_eq!(again.base_branch.as_deref(), Some("main"));
    }

    // Without a base, `load_as` must fail for a missing branch and must not
    // leave that branch behind. A plain `load` into `main` reports no base
    // branch and no branch creation.
    #[tokio::test]
    async fn test_load_as_without_base_errors_on_missing_branch() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let result = db
            .load_as("nonexistent", None, TEST_DATA, LoadMode::Merge, None)
            .await;
        assert!(result.is_err(), "load without base must not create branches");
        assert!(
            !db.branch_list()
                .await
                .unwrap()
                .contains(&"nonexistent".to_string()),
            "failed load must not leave a branch behind"
        );

        let main_load = db.load("main", TEST_DATA, LoadMode::Overwrite).await.unwrap();
        assert_eq!(main_load.branch, "main");
        assert_eq!(main_load.base_branch, None);
        assert!(!main_load.branch_created);
    }

    // A NaN in a range-constrained column is rejected explicitly; NaN compares
    // false against both bounds, so the min/max comparisons alone would let it through.
    #[test]
    fn test_range_constraint_rejects_nan() {
        use arrow_array::{Float64Array, RecordBatch, StringArray};
        use omnigraph_compiler::catalog::{LiteralValue, NodeType, RangeConstraint};
        use std::sync::Arc;

        let schema = Arc::new(arrow_schema::Schema::new(vec![
            arrow_schema::Field::new("name", arrow_schema::DataType::Utf8, false),
            arrow_schema::Field::new("score", arrow_schema::DataType::Float64, true),
        ]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["bad"])),
                Arc::new(Float64Array::from(vec![f64::NAN])),
            ],
        )
        .unwrap();

        // Minimal node type: only the range constraint on `score` matters here.
        let node_type = NodeType {
            name: "Test".to_string(),
            implements: vec![],
            properties: Default::default(),
            key: None,
            unique_constraints: vec![],
            indices: vec![],
            range_constraints: vec![RangeConstraint {
                property: "score".to_string(),
                min: Some(LiteralValue::Float(0.0)),
                max: Some(LiteralValue::Float(1.0)),
            }],
            check_constraints: vec![],
            embed_sources: Default::default(),
            blob_properties: Default::default(),
            arrow_schema: schema,
        };

        let result = validate_value_constraints(&batch, &node_type);
        assert!(result.is_err(), "expected NaN to be rejected");
        let err = result.unwrap_err().to_string();
        assert!(err.contains("NaN"), "error should mention NaN: {}", err);
    }

    // The composite key is a vector with one element per column, so rows 0 and
    // 1 stay distinct even though joining their parts with `|` would give the
    // same text. A null in any column yields no key at all.
    #[test]
    fn composite_unique_key_builds_tuple_and_exempts_null() {
        let a: ArrayRef = Arc::new(StringArray::from(vec![Some("x|y"), Some("x"), None]));
        let b: ArrayRef = Arc::new(StringArray::from(vec![Some("z"), Some("y|z"), Some("q")]));
        let cols = [a, b];

        assert_eq!(
            composite_unique_key(&cols, 0).unwrap(),
            Some(vec!["x|y".to_string(), "z".to_string()])
        );
        assert_eq!(
            composite_unique_key(&cols, 1).unwrap(),
            Some(vec!["x".to_string(), "y|z".to_string()])
        );
        assert_ne!(
            composite_unique_key(&cols, 0).unwrap(),
            composite_unique_key(&cols, 1).unwrap()
        );

        assert_eq!(composite_unique_key(&cols, 2).unwrap(), None);
    }

    // A column type that `unique_key_scalar` cannot render must produce an
    // error rather than being skipped.
    #[test]
    fn unique_key_scalar_errors_loudly_on_unkeyable_type() {
        use arrow_array::LargeBinaryArray;
        let blob: ArrayRef = Arc::new(LargeBinaryArray::from(vec![Some(&b"abc"[..])]));
        let err = unique_key_scalar(&blob, 0).unwrap_err();
        assert!(
            err.to_string().contains("unsupported column type"),
            "un-keyable type must fail loudly (got: {err})"
        );
    }

    // All three Arrow string encodings render to the same key text.
    #[test]
    fn unique_key_scalar_handles_all_string_encodings() {
        use arrow_array::{LargeStringArray, StringViewArray};
        let utf8: ArrayRef = Arc::new(StringArray::from(vec![Some("v")]));
        let large: ArrayRef = Arc::new(LargeStringArray::from(vec![Some("v")]));
        let view: ArrayRef = Arc::new(StringViewArray::from(vec![Some("v")]));
        for array in [&utf8, &large, &view] {
            assert_eq!(
                unique_key_scalar(array, 0).unwrap(),
                Some("v".to_string()),
                "string array {:?} must render, not error",
                array.data_type()
            );
        }
    }
}