1use std::collections::{HashMap, HashSet};
2
3use std::io::{BufRead, BufReader, Cursor};
4use std::sync::Arc;
5
6use arrow_array::{
7 Array, ArrayRef, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array,
8 Int32Array, Int64Array, RecordBatch, StringArray, UInt32Array, UInt64Array,
9 builder::{
10 ArrayBuilder, BooleanBuilder, Date32Builder, Date64Builder, FixedSizeListBuilder,
11 Float32Builder, Float64Builder, Int32Builder, Int64Builder, ListBuilder, StringBuilder,
12 UInt32Builder, UInt64Builder,
13 },
14};
15use arrow_schema::DataType;
16use base64::Engine;
17use lance::blob::BlobArrayBuilder;
18use omnigraph_compiler::catalog::NodeType;
19use serde::{Deserialize, Serialize};
20use serde_json::Value as JsonValue;
21
22use crate::db::Omnigraph;
23use crate::error::{OmniError, Result};
24use crate::exec::staging::{MutationStaging, PendingMode};
25
/// Per-type row counts produced by a single JSONL load.
///
/// `nodes_loaded` is keyed by node type name and `edges_loaded` by canonical
/// edge type name; each value is the number of rows written for that type.
#[derive(Default, Clone, Debug)]
pub struct LoadResult {
    /// Rows written per node type name.
    pub nodes_loaded: HashMap<String, usize>,
    /// Rows written per canonical edge type name.
    pub edges_loaded: HashMap<String, usize>,
}
32
33#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
34pub struct IngestTableResult {
35 pub table_key: String,
36 pub rows_loaded: usize,
37}
38
39#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
40pub struct IngestResult {
41 pub branch: String,
42 pub base_branch: String,
43 pub branch_created: bool,
44 pub mode: LoadMode,
45 pub tables: Vec<IngestTableResult>,
46}
47
48#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
50#[serde(rename_all = "snake_case")]
51pub enum LoadMode {
52 Overwrite,
54 Append,
56 Merge,
58}
59
60pub async fn load_jsonl(db: &mut Omnigraph, data: &str, mode: LoadMode) -> Result<LoadResult> {
62 let current_branch = db.active_branch().await;
63 let branch = current_branch.as_deref().unwrap_or("main");
64 db.load(branch, data, mode).await
65}
66
67pub async fn load_jsonl_file(db: &mut Omnigraph, path: &str, mode: LoadMode) -> Result<LoadResult> {
69 let current_branch = db.active_branch().await;
70 let branch = current_branch.as_deref().unwrap_or("main");
71 db.load_file(branch, path, mode).await
72}
73
74impl Omnigraph {
75 pub async fn ingest(
76 &self,
77 branch: &str,
78 from: Option<&str>,
79 data: &str,
80 mode: LoadMode,
81 ) -> Result<IngestResult> {
82 self.ingest_as(branch, from, data, mode, None).await
83 }
84
85 pub async fn ingest_as(
86 &self,
87 branch: &str,
88 from: Option<&str>,
89 data: &str,
90 mode: LoadMode,
91 actor_id: Option<&str>,
92 ) -> Result<IngestResult> {
93 self.enforce(
101 omnigraph_policy::PolicyAction::Change,
102 &omnigraph_policy::ResourceScope::Branch(branch.to_string()),
103 actor_id,
104 )?;
105 self.ingest_with_current_actor(branch, from, data, mode, actor_id)
106 .await
107 }
108
109 pub async fn ingest_file(
110 &self,
111 branch: &str,
112 from: Option<&str>,
113 path: &str,
114 mode: LoadMode,
115 ) -> Result<IngestResult> {
116 self.ingest_file_as(branch, from, path, mode, None).await
117 }
118
119 pub async fn ingest_file_as(
120 &self,
121 branch: &str,
122 from: Option<&str>,
123 path: &str,
124 mode: LoadMode,
125 actor_id: Option<&str>,
126 ) -> Result<IngestResult> {
127 let data = std::fs::read_to_string(path).map_err(OmniError::Io)?;
128 self.ingest_as(branch, from, &data, mode, actor_id).await
129 }
130
131 async fn ingest_with_current_actor(
132 &self,
133 branch: &str,
134 from: Option<&str>,
135 data: &str,
136 mode: LoadMode,
137 actor_id: Option<&str>,
138 ) -> Result<IngestResult> {
139 self.ensure_schema_state_valid().await?;
140 let target_branch =
141 Self::normalize_branch_name(branch)?.unwrap_or_else(|| "main".to_string());
142 let base_branch = Self::normalize_branch_name(from.unwrap_or("main"))?
143 .unwrap_or_else(|| "main".to_string());
144 let branch_created = !self
145 .branch_list()
146 .await?
147 .iter()
148 .any(|name| name == &target_branch);
149 if branch_created {
150 self.branch_create_from_as(
157 crate::db::ReadTarget::branch(&base_branch),
158 &target_branch,
159 actor_id,
160 )
161 .await?;
162 }
163
164 let result = self.load_as(&target_branch, data, mode, actor_id).await?;
165 Ok(IngestResult {
166 branch: target_branch,
167 base_branch,
168 branch_created,
169 mode,
170 tables: result.to_ingest_tables(),
171 })
172 }
173
174 pub async fn load(&self, branch: &str, data: &str, mode: LoadMode) -> Result<LoadResult> {
175 self.load_as(branch, data, mode, None).await
176 }
177
178 pub async fn load_as(
179 &self,
180 branch: &str,
181 data: &str,
182 mode: LoadMode,
183 actor_id: Option<&str>,
184 ) -> Result<LoadResult> {
185 self.enforce(
192 omnigraph_policy::PolicyAction::Change,
193 &omnigraph_policy::ResourceScope::Branch(branch.to_string()),
194 actor_id,
195 )?;
196 self.ensure_schema_state_valid().await?;
197 crate::db::ensure_public_branch_ref(branch, "load")?;
202 let requested = Self::normalize_branch_name(branch)?;
208 self.load_direct_on_branch(requested.as_deref(), data, mode, actor_id)
212 .await
213 }
214
215 pub async fn load_file(&self, branch: &str, path: &str, mode: LoadMode) -> Result<LoadResult> {
216 self.load_file_as(branch, path, mode, None).await
217 }
218
219 pub async fn load_file_as(
223 &self,
224 branch: &str,
225 path: &str,
226 mode: LoadMode,
227 actor_id: Option<&str>,
228 ) -> Result<LoadResult> {
229 let data = std::fs::read_to_string(path).map_err(|e| OmniError::Io(e))?;
230 self.load_as(branch, &data, mode, actor_id).await
231 }
232
233 async fn load_direct_on_branch(
234 &self,
235 branch: Option<&str>,
236 data: &str,
237 mode: LoadMode,
238 actor_id: Option<&str>,
239 ) -> Result<LoadResult> {
240 let reader = BufReader::new(Cursor::new(data.as_bytes()));
241 load_jsonl_reader(self, branch, reader, mode, actor_id).await
242 }
243}
244
245impl LoadMode {
246 pub fn as_str(self) -> &'static str {
247 match self {
248 LoadMode::Overwrite => "overwrite",
249 LoadMode::Append => "append",
250 LoadMode::Merge => "merge",
251 }
252 }
253}
254
255impl LoadResult {
256 pub fn to_ingest_tables(&self) -> Vec<IngestTableResult> {
257 let mut tables = self
258 .nodes_loaded
259 .iter()
260 .map(|(type_name, rows_loaded)| IngestTableResult {
261 table_key: format!("node:{type_name}"),
262 rows_loaded: *rows_loaded,
263 })
264 .chain(
265 self.edges_loaded
266 .iter()
267 .map(|(edge_name, rows_loaded)| IngestTableResult {
268 table_key: format!("edge:{edge_name}"),
269 rows_loaded: *rows_loaded,
270 }),
271 )
272 .collect::<Vec<_>>();
273 tables.sort_by(|a, b| a.table_key.cmp(&b.table_key));
274 tables
275 }
276}
277
278async fn load_jsonl_reader<R: BufRead>(
279 db: &Omnigraph,
280 branch: Option<&str>,
281 reader: R,
282 mode: LoadMode,
283 actor_id: Option<&str>,
284) -> Result<LoadResult> {
285 let catalog = db.catalog().clone();
286
287 let mut node_rows: HashMap<String, Vec<JsonValue>> = HashMap::new();
289 let mut edge_rows: HashMap<String, Vec<(String, String, JsonValue)>> = HashMap::new();
290
291 for (idx, parsed) in serde_json::Deserializer::from_reader(reader)
296 .into_iter::<JsonValue>()
297 .enumerate()
298 {
299 let record_num = idx + 1;
300 let value: JsonValue = parsed.map_err(|e| {
301 OmniError::manifest(format!("invalid JSON at record {}: {}", record_num, e))
302 })?;
303
304 if let Some(type_name) = value.get("type").and_then(|v| v.as_str()) {
305 if !catalog.node_types.contains_key(type_name) {
306 return Err(OmniError::manifest(format!(
307 "record {}: unknown node type '{}'",
308 record_num,
309 type_name
310 )));
311 }
312 let data = value
313 .get("data")
314 .cloned()
315 .unwrap_or(JsonValue::Object(serde_json::Map::new()));
316 node_rows
317 .entry(type_name.to_string())
318 .or_default()
319 .push(data);
320 } else if let Some(edge_name) = value.get("edge").and_then(|v| v.as_str()) {
321 if catalog.lookup_edge_by_name(edge_name).is_none() {
322 return Err(OmniError::manifest(format!(
323 "record {}: unknown edge type '{}'",
324 record_num,
325 edge_name
326 )));
327 }
328 let from = value
329 .get("from")
330 .and_then(|v| v.as_str())
331 .ok_or_else(|| {
332 OmniError::manifest(format!("record {}: edge missing 'from'", record_num))
333 })?
334 .to_string();
335 let to = value
336 .get("to")
337 .and_then(|v| v.as_str())
338 .ok_or_else(|| {
339 OmniError::manifest(format!("record {}: edge missing 'to'", record_num))
340 })?
341 .to_string();
342 let data = value
343 .get("data")
344 .cloned()
345 .unwrap_or(JsonValue::Object(serde_json::Map::new()));
346 let canonical = catalog.lookup_edge_by_name(edge_name).unwrap().name.clone();
347 edge_rows
348 .entry(canonical)
349 .or_default()
350 .push((from, to, data));
351 } else {
352 return Err(OmniError::manifest(format!(
353 "record {}: expected 'type' or 'edge' field",
354 record_num
355 )));
356 }
357 }
358
359 let mut result = LoadResult::default();
369 let snapshot = db.snapshot_for_branch(branch).await?;
370 let use_staging = !matches!(mode, LoadMode::Overwrite);
371 let mut staging = MutationStaging::default();
372 let mut overwrite_updates: Vec<crate::db::SubTableUpdate> = Vec::new();
373 let mut overwrite_expected: HashMap<String, u64> = HashMap::new();
374 let pending_mode = match mode {
375 LoadMode::Merge => PendingMode::Merge,
376 LoadMode::Append => PendingMode::Append,
380 LoadMode::Overwrite => PendingMode::Append, };
382 let load_op_kind = match mode {
388 LoadMode::Append => crate::db::MutationOpKind::Insert,
389 LoadMode::Merge => crate::db::MutationOpKind::Merge,
390 LoadMode::Overwrite => crate::db::MutationOpKind::SchemaRewrite,
391 };
392
393 let mut prepared_nodes: Vec<(String, String, RecordBatch, usize)> =
396 Vec::with_capacity(node_rows.len());
397 for (type_name, rows) in &node_rows {
398 let node_type = &catalog.node_types[type_name];
399 let batch = build_node_batch(node_type, rows)?;
400 validate_value_constraints(&batch, node_type)?;
401 validate_enum_constraints(&batch, &node_type.properties, type_name)?;
402 let unique_props = unique_property_names_for_node(node_type);
403 if !unique_props.is_empty() {
404 enforce_unique_constraints_intra_batch(&batch, type_name, &unique_props)?;
405 }
406 let loaded_count = batch.num_rows();
407 let table_key = format!("node:{}", type_name);
408 let entry = snapshot
409 .entry(&table_key)
410 .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
411 if !use_staging {
412 overwrite_expected.insert(table_key.clone(), entry.table_version);
413 }
414 prepared_nodes.push((type_name.clone(), table_key, batch, loaded_count));
415 }
416
417 if use_staging {
420 for (type_name, table_key, batch, loaded_count) in prepared_nodes {
421 let (ds, full_path, table_branch) = db
422 .open_for_mutation_on_branch(branch, &table_key, load_op_kind)
423 .await?;
424 let expected_version = ds.version().version;
425 staging.ensure_path(
426 &table_key,
427 full_path,
428 table_branch,
429 expected_version,
430 load_op_kind,
431 );
432 let schema = batch.schema();
433 staging.append_batch(&table_key, schema, pending_mode, batch)?;
434 result.nodes_loaded.insert(type_name, loaded_count);
435 }
436 } else {
437 let node_write_results =
438 write_batches_concurrently(db, branch, mode, prepared_nodes).await?;
439 for (type_name, table_key, loaded_count, state, table_branch) in node_write_results {
440 overwrite_updates.push(crate::db::SubTableUpdate {
441 table_key,
442 table_version: state.version,
443 table_branch,
444 row_count: state.row_count,
445 version_metadata: state.version_metadata,
446 });
447 result.nodes_loaded.insert(type_name, loaded_count);
448 }
449 }
450
451 for (edge_name, rows) in &edge_rows {
456 let edge_type = &catalog.edge_types[edge_name];
457 let from_ids = if use_staging {
458 collect_node_ids_with_pending(db, branch, &edge_type.from_type, &staging).await?
459 } else {
460 collect_node_ids(
461 db,
462 branch,
463 &edge_type.from_type,
464 &node_rows,
465 &catalog,
466 &overwrite_updates,
467 )
468 .await?
469 };
470 let to_ids = if use_staging {
471 collect_node_ids_with_pending(db, branch, &edge_type.to_type, &staging).await?
472 } else {
473 collect_node_ids(
474 db,
475 branch,
476 &edge_type.to_type,
477 &node_rows,
478 &catalog,
479 &overwrite_updates,
480 )
481 .await?
482 };
483
484 for (i, (src, dst, _)) in rows.iter().enumerate() {
485 if !from_ids.contains(src.as_str()) {
486 return Err(OmniError::manifest(format!(
487 "edge {} row {}: src '{}' not found in {}",
488 edge_name,
489 i + 1,
490 src,
491 edge_type.from_type
492 )));
493 }
494 if !to_ids.contains(dst.as_str()) {
495 return Err(OmniError::manifest(format!(
496 "edge {} row {}: dst '{}' not found in {}",
497 edge_name,
498 i + 1,
499 dst,
500 edge_type.to_type
501 )));
502 }
503 }
504 }
505
506 let mut prepared_edges: Vec<(String, String, RecordBatch, usize)> =
508 Vec::with_capacity(edge_rows.len());
509 for (edge_name, rows) in &edge_rows {
510 let edge_type = &catalog.edge_types[edge_name];
511 let batch = build_edge_batch(edge_type, rows)?;
512 validate_enum_constraints(&batch, &edge_type.properties, edge_name)?;
513 let unique_props = unique_property_names_for_edge(edge_type);
514 if !unique_props.is_empty() {
515 enforce_unique_constraints_intra_batch(&batch, edge_name, &unique_props)?;
516 }
517 let loaded_count = batch.num_rows();
518 let table_key = format!("edge:{}", edge_name);
519 let entry = snapshot
520 .entry(&table_key)
521 .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
522 if !use_staging {
523 overwrite_expected.insert(table_key.clone(), entry.table_version);
524 }
525 prepared_edges.push((edge_name.clone(), table_key, batch, loaded_count));
526 }
527
528 if use_staging {
530 for (edge_name, table_key, batch, loaded_count) in prepared_edges {
531 let (ds, full_path, table_branch) = db
532 .open_for_mutation_on_branch(branch, &table_key, load_op_kind)
533 .await?;
534 let expected_version = ds.version().version;
535 staging.ensure_path(
536 &table_key,
537 full_path,
538 table_branch,
539 expected_version,
540 load_op_kind,
541 );
542 let schema = batch.schema();
543 staging.append_batch(&table_key, schema, pending_mode, batch)?;
544 result.edges_loaded.insert(edge_name, loaded_count);
545 }
546 } else {
547 let edge_write_results =
548 write_batches_concurrently(db, branch, mode, prepared_edges).await?;
549 for (edge_name, table_key, loaded_count, state, table_branch) in edge_write_results {
550 overwrite_updates.push(crate::db::SubTableUpdate {
551 table_key,
552 table_version: state.version,
553 table_branch,
554 row_count: state.row_count,
555 version_metadata: state.version_metadata,
556 });
557 result.edges_loaded.insert(edge_name, loaded_count);
558 }
559 }
560
561 for (edge_name, _) in &edge_rows {
566 let edge_type = &catalog.edge_types[edge_name];
567 let table_key = format!("edge:{}", edge_name);
568 if use_staging {
569 validate_edge_cardinality_with_pending_loader(
570 db, branch, edge_type, &table_key, &staging, mode,
571 )
572 .await?;
573 } else if let Some(update) = overwrite_updates.iter().find(|u| u.table_key == table_key) {
574 validate_edge_cardinality(
575 db,
576 branch,
577 edge_name,
578 update.table_version,
579 update.table_branch.as_deref(),
580 )
581 .await?;
582 }
583 }
584
585 if use_staging {
587 let staged = staging.stage_all(db, branch).await?;
588 let (updates, expected_versions, sidecar_handle, _queue_guards) = staged
592 .commit_all(db, branch, crate::db::manifest::SidecarKind::Load, actor_id)
593 .await?;
594 crate::failpoints::maybe_fail("mutation.post_finalize_pre_publisher")?;
599 db.commit_updates_on_branch_with_expected(branch, &updates, &expected_versions, actor_id)
600 .await?;
601 if let Some(handle) = sidecar_handle {
606 if let Err(err) =
607 crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await
608 {
609 tracing::warn!(
610 error = %err,
611 operation_id = handle.operation_id.as_str(),
612 "recovery sidecar cleanup failed; the next open's recovery sweep will resolve it"
613 );
614 }
615 }
616 } else {
617 db.commit_updates_on_branch_with_expected(
625 branch,
626 &overwrite_updates,
627 &overwrite_expected,
628 actor_id,
629 )
630 .await?;
631 }
632
633 Ok(result)
634}
635
636fn build_node_batch(node_type: &NodeType, rows: &[JsonValue]) -> Result<RecordBatch> {
637 let schema = node_type.arrow_schema.clone();
638
639 let ids: Vec<String> = rows
641 .iter()
642 .map(|row| {
643 let explicit_id = row.get("id").and_then(|v| v.as_str()).map(str::to_string);
644 if let Some(key_prop) = node_type.key_property() {
645 let key_value = row
646 .get(key_prop)
647 .and_then(|v| v.as_str())
648 .map(|s| s.to_string())
649 .ok_or_else(|| {
650 OmniError::manifest(format!(
651 "node {} missing @key property '{}'",
652 node_type.name, key_prop
653 ))
654 })?;
655 if let Some(explicit_id) = explicit_id {
656 if explicit_id != key_value {
657 return Err(OmniError::manifest(format!(
658 "node {} has explicit id '{}' that does not match @key property '{}' value '{}'",
659 node_type.name, explicit_id, key_prop, key_value
660 )));
661 }
662 }
663 Ok(key_value)
664 } else if let Some(explicit_id) = explicit_id {
665 Ok(explicit_id)
666 } else {
667 Ok(generate_id())
668 }
669 })
670 .collect::<Result<Vec<_>>>()?;
671
672 let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
673 columns.push(Arc::new(StringArray::from(ids)));
674
675 for field in schema.fields().iter().skip(1) {
677 if node_type.blob_properties.contains(field.name()) {
678 let col = build_blob_column(field.name(), field.is_nullable(), rows)?;
679 columns.push(col);
680 } else {
681 let col =
682 build_column_from_json(field.name(), field.data_type(), field.is_nullable(), rows)?;
683 columns.push(col);
684 }
685 }
686
687 RecordBatch::try_new(schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
688}
689
690fn build_edge_batch(
691 edge_type: &omnigraph_compiler::catalog::EdgeType,
692 rows: &[(String, String, JsonValue)],
693) -> Result<RecordBatch> {
694 let schema = edge_type.arrow_schema.clone();
695
696 let ids: Vec<String> = rows
697 .iter()
698 .map(|(_, _, data)| {
699 data.get("id")
700 .and_then(|v| v.as_str())
701 .map(str::to_string)
702 .unwrap_or_else(generate_id)
703 })
704 .collect();
705 let srcs: Vec<&str> = rows.iter().map(|(from, _, _)| from.as_str()).collect();
706 let dsts: Vec<&str> = rows.iter().map(|(_, to, _)| to.as_str()).collect();
707
708 let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
709 columns.push(Arc::new(StringArray::from(ids)));
710 columns.push(Arc::new(StringArray::from(srcs)));
711 columns.push(Arc::new(StringArray::from(dsts)));
712
713 let data_values: Vec<JsonValue> = rows.iter().map(|(_, _, data)| data.clone()).collect();
715 for field in schema.fields().iter().skip(3) {
716 if edge_type.blob_properties.contains(field.name()) {
717 let col = build_blob_column(field.name(), field.is_nullable(), &data_values)?;
718 columns.push(col);
719 } else {
720 let col = build_column_from_json(
721 field.name(),
722 field.data_type(),
723 field.is_nullable(),
724 &data_values,
725 )?;
726 columns.push(col);
727 }
728 }
729
730 RecordBatch::try_new(schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
731}
732
733pub(crate) fn append_blob_value(builder: &mut BlobArrayBuilder, value: &str) -> Result<()> {
735 if let Some(encoded) = value.strip_prefix("base64:") {
736 let bytes = base64::engine::general_purpose::STANDARD
737 .decode(encoded)
738 .map_err(|e| OmniError::manifest(format!("invalid base64 blob data: {}", e)))?;
739 builder
740 .push_bytes(bytes)
741 .map_err(|e| OmniError::Lance(e.to_string()))
742 } else {
743 builder
745 .push_uri(value)
746 .map_err(|e| OmniError::Lance(e.to_string()))
747 }
748}
749
750fn build_blob_column(name: &str, nullable: bool, rows: &[JsonValue]) -> Result<ArrayRef> {
752 let mut builder = BlobArrayBuilder::new(rows.len());
753 for row in rows {
754 match row.get(name) {
755 Some(JsonValue::String(s)) => {
756 append_blob_value(&mut builder, s)?;
757 }
758 Some(JsonValue::Null) | None if nullable => {
759 builder
760 .push_null()
761 .map_err(|e| OmniError::Lance(e.to_string()))?;
762 }
763 Some(JsonValue::Null) | None => {
764 return Err(OmniError::manifest(format!(
765 "non-nullable blob property '{}' has null values",
766 name
767 )));
768 }
769 _ => {
770 return Err(OmniError::manifest(format!(
771 "blob property '{}' must be a URI string or base64: prefixed data",
772 name
773 )));
774 }
775 }
776 }
777 builder
778 .finish()
779 .map_err(|e| OmniError::Lance(e.to_string()))
780}
781
782fn build_column_from_json(
783 name: &str,
784 data_type: &DataType,
785 nullable: bool,
786 rows: &[JsonValue],
787) -> Result<ArrayRef> {
788 let array: ArrayRef = match data_type {
789 DataType::Utf8 => {
790 let values: Vec<Option<String>> = rows
791 .iter()
792 .map(|row| {
793 row.get(name)
794 .and_then(|v| v.as_str())
795 .map(|s| s.to_string())
796 })
797 .collect();
798 Arc::new(StringArray::from(values))
799 }
800 DataType::Int32 => {
801 let values: Vec<Option<i32>> = rows
802 .iter()
803 .map(|row| row.get(name).and_then(|v| v.as_i64()).map(|v| v as i32))
804 .collect();
805 Arc::new(Int32Array::from(values))
806 }
807 DataType::Int64 => {
808 let values: Vec<Option<i64>> = rows
809 .iter()
810 .map(|row| row.get(name).and_then(|v| v.as_i64()))
811 .collect();
812 Arc::new(Int64Array::from(values))
813 }
814 DataType::UInt32 => {
815 let values: Vec<Option<u32>> = rows
816 .iter()
817 .map(|row| row.get(name).and_then(|v| v.as_u64()).map(|v| v as u32))
818 .collect();
819 Arc::new(UInt32Array::from(values))
820 }
821 DataType::UInt64 => {
822 let values: Vec<Option<u64>> = rows
823 .iter()
824 .map(|row| row.get(name).and_then(|v| v.as_u64()))
825 .collect();
826 Arc::new(UInt64Array::from(values))
827 }
828 DataType::Float32 => {
829 let values: Vec<Option<f32>> = rows
830 .iter()
831 .map(|row| row.get(name).and_then(|v| v.as_f64()).map(|v| v as f32))
832 .collect();
833 Arc::new(Float32Array::from(values))
834 }
835 DataType::Float64 => {
836 let values: Vec<Option<f64>> = rows
837 .iter()
838 .map(|row| row.get(name).and_then(|v| v.as_f64()))
839 .collect();
840 Arc::new(Float64Array::from(values))
841 }
842 DataType::Boolean => {
843 let values: Vec<Option<bool>> = rows
844 .iter()
845 .map(|row| row.get(name).and_then(|v| v.as_bool()))
846 .collect();
847 Arc::new(BooleanArray::from(values))
848 }
849 DataType::Date32 => {
850 let mut values = Vec::with_capacity(rows.len());
851 for row in rows {
852 values.push(parse_date32_json_value(
853 row.get(name).unwrap_or(&JsonValue::Null),
854 )?);
855 }
856 Arc::new(Date32Array::from(values))
857 }
858 DataType::Date64 => {
859 let mut values = Vec::with_capacity(rows.len());
860 for row in rows {
861 values.push(parse_date64_json_value(
862 row.get(name).unwrap_or(&JsonValue::Null),
863 )?);
864 }
865 Arc::new(Date64Array::from(values))
866 }
867 DataType::List(field) => {
868 let mut builder = ListBuilder::with_capacity(
869 make_list_value_builder(field.data_type(), rows.len())?,
870 rows.len(),
871 )
872 .with_field(field.clone());
873 for row in rows {
874 let value = row.get(name).unwrap_or(&JsonValue::Null);
875 if value.is_null() {
876 builder.append(false);
877 continue;
878 }
879 let items = value.as_array().ok_or_else(|| {
880 OmniError::manifest(format!(
881 "list property '{}' expects a JSON array, got {}",
882 name, value
883 ))
884 })?;
885 for item in items {
886 append_json_list_item(builder.values(), field.data_type(), item)?;
887 }
888 builder.append(true);
889 }
890 Arc::new(builder.finish())
891 }
892 DataType::FixedSizeList(child_field, dim) => {
893 let dim = *dim;
895 let mut builder = FixedSizeListBuilder::with_capacity(
896 Float32Builder::with_capacity(rows.len() * dim as usize),
897 dim,
898 rows.len(),
899 )
900 .with_field(child_field.clone());
901 for row in rows {
902 if let Some(arr) = row.get(name).and_then(|v| v.as_array()) {
903 if arr.len() != dim as usize {
904 return Err(OmniError::manifest(format!(
905 "vector property '{}' expects {} dimensions, got {}",
906 name,
907 dim,
908 arr.len()
909 )));
910 }
911 for val in arr {
912 builder
913 .values()
914 .append_value(val.as_f64().unwrap_or(0.0) as f32);
915 }
916 builder.append(true);
917 } else if nullable {
918 for _ in 0..dim as usize {
919 builder.values().append_null();
920 }
921 builder.append(false);
922 } else {
923 return Err(OmniError::manifest(format!(
924 "non-nullable vector property '{}' has null values",
925 name
926 )));
927 }
928 }
929 Arc::new(builder.finish())
930 }
931 _ => {
932 let values: Vec<Option<&str>> = vec![None; rows.len()];
934 Arc::new(StringArray::from(values))
935 }
936 };
937
938 if !nullable && array.null_count() > 0 {
939 return Err(OmniError::manifest(format!(
940 "non-nullable property '{}' has null or invalid values",
941 name
942 )));
943 }
944
945 Ok(array)
946}
947
948fn make_list_value_builder(data_type: &DataType, capacity: usize) -> Result<Box<dyn ArrayBuilder>> {
949 Ok(match data_type {
950 DataType::Utf8 => Box::new(StringBuilder::with_capacity(capacity, capacity * 8)),
951 DataType::Boolean => Box::new(BooleanBuilder::with_capacity(capacity)),
952 DataType::Int32 => Box::new(Int32Builder::with_capacity(capacity)),
953 DataType::Int64 => Box::new(Int64Builder::with_capacity(capacity)),
954 DataType::UInt32 => Box::new(UInt32Builder::with_capacity(capacity)),
955 DataType::UInt64 => Box::new(UInt64Builder::with_capacity(capacity)),
956 DataType::Float32 => Box::new(Float32Builder::with_capacity(capacity)),
957 DataType::Float64 => Box::new(Float64Builder::with_capacity(capacity)),
958 DataType::Date32 => Box::new(Date32Builder::with_capacity(capacity)),
959 DataType::Date64 => Box::new(Date64Builder::with_capacity(capacity)),
960 other => {
961 return Err(OmniError::manifest(format!(
962 "unsupported list element data type {:?}",
963 other
964 )));
965 }
966 })
967}
968
969fn append_json_list_item(
970 builder: &mut Box<dyn ArrayBuilder>,
971 data_type: &DataType,
972 value: &JsonValue,
973) -> Result<()> {
974 match data_type {
975 DataType::Utf8 => {
976 let builder = builder
977 .as_any_mut()
978 .downcast_mut::<StringBuilder>()
979 .ok_or_else(|| OmniError::manifest("list Utf8 builder downcast failed"))?;
980 if let Some(value) = value.as_str() {
981 builder.append_value(value);
982 } else {
983 builder.append_null();
984 }
985 }
986 DataType::Boolean => {
987 let builder = builder
988 .as_any_mut()
989 .downcast_mut::<BooleanBuilder>()
990 .ok_or_else(|| OmniError::manifest("list Boolean builder downcast failed"))?;
991 if let Some(value) = value.as_bool() {
992 builder.append_value(value);
993 } else {
994 builder.append_null();
995 }
996 }
997 DataType::Int32 => {
998 let builder = builder
999 .as_any_mut()
1000 .downcast_mut::<Int32Builder>()
1001 .ok_or_else(|| OmniError::manifest("list Int32 builder downcast failed"))?;
1002 if let Some(value) = value.as_i64() {
1003 let value = i32::try_from(value).map_err(|_| {
1004 OmniError::manifest(format!("list value {} exceeds Int32 range", value))
1005 })?;
1006 builder.append_value(value);
1007 } else {
1008 builder.append_null();
1009 }
1010 }
1011 DataType::Int64 => {
1012 let builder = builder
1013 .as_any_mut()
1014 .downcast_mut::<Int64Builder>()
1015 .ok_or_else(|| OmniError::manifest("list Int64 builder downcast failed"))?;
1016 if let Some(value) = value.as_i64() {
1017 builder.append_value(value);
1018 } else {
1019 builder.append_null();
1020 }
1021 }
1022 DataType::UInt32 => {
1023 let builder = builder
1024 .as_any_mut()
1025 .downcast_mut::<UInt32Builder>()
1026 .ok_or_else(|| OmniError::manifest("list UInt32 builder downcast failed"))?;
1027 if let Some(value) = value.as_u64() {
1028 let value = u32::try_from(value).map_err(|_| {
1029 OmniError::manifest(format!("list value {} exceeds UInt32 range", value))
1030 })?;
1031 builder.append_value(value);
1032 } else {
1033 builder.append_null();
1034 }
1035 }
1036 DataType::UInt64 => {
1037 let builder = builder
1038 .as_any_mut()
1039 .downcast_mut::<UInt64Builder>()
1040 .ok_or_else(|| OmniError::manifest("list UInt64 builder downcast failed"))?;
1041 if let Some(value) = value.as_u64() {
1042 builder.append_value(value);
1043 } else {
1044 builder.append_null();
1045 }
1046 }
1047 DataType::Float32 => {
1048 let builder = builder
1049 .as_any_mut()
1050 .downcast_mut::<Float32Builder>()
1051 .ok_or_else(|| OmniError::manifest("list Float32 builder downcast failed"))?;
1052 if let Some(value) = value.as_f64() {
1053 builder.append_value(value as f32);
1054 } else {
1055 builder.append_null();
1056 }
1057 }
1058 DataType::Float64 => {
1059 let builder = builder
1060 .as_any_mut()
1061 .downcast_mut::<Float64Builder>()
1062 .ok_or_else(|| OmniError::manifest("list Float64 builder downcast failed"))?;
1063 if let Some(value) = value.as_f64() {
1064 builder.append_value(value);
1065 } else {
1066 builder.append_null();
1067 }
1068 }
1069 DataType::Date32 => {
1070 let builder = builder
1071 .as_any_mut()
1072 .downcast_mut::<Date32Builder>()
1073 .ok_or_else(|| OmniError::manifest("list Date32 builder downcast failed"))?;
1074 if let Some(value) = parse_date32_json_value(value)? {
1075 builder.append_value(value);
1076 } else {
1077 builder.append_null();
1078 }
1079 }
1080 DataType::Date64 => {
1081 let builder = builder
1082 .as_any_mut()
1083 .downcast_mut::<Date64Builder>()
1084 .ok_or_else(|| OmniError::manifest("list Date64 builder downcast failed"))?;
1085 if let Some(value) = parse_date64_json_value(value)? {
1086 builder.append_value(value);
1087 } else {
1088 builder.append_null();
1089 }
1090 }
1091 other => {
1092 return Err(OmniError::manifest(format!(
1093 "unsupported list element data type {:?}",
1094 other
1095 )));
1096 }
1097 }
1098
1099 Ok(())
1100}
1101
1102fn parse_date32_json_value(value: &JsonValue) -> Result<Option<i32>> {
1103 if value.is_null() {
1104 return Ok(None);
1105 }
1106 if let Some(days) = value.as_i64() {
1107 let days = i32::try_from(days)
1108 .map_err(|_| OmniError::manifest(format!("Date value out of range: {}", days)))?;
1109 return Ok(Some(days));
1110 }
1111 if let Some(days) = value.as_u64() {
1112 let days = i32::try_from(days)
1113 .map_err(|_| OmniError::manifest(format!("Date value out of range: {}", days)))?;
1114 return Ok(Some(days));
1115 }
1116 if let Some(value) = value.as_str() {
1117 return Ok(Some(parse_date32_literal(value)?));
1118 }
1119 Ok(None)
1120}
1121
1122fn parse_date64_json_value(value: &JsonValue) -> Result<Option<i64>> {
1123 if value.is_null() {
1124 return Ok(None);
1125 }
1126 if let Some(ms) = value.as_i64() {
1127 return Ok(Some(ms));
1128 }
1129 if let Some(ms) = value.as_u64() {
1130 let ms = i64::try_from(ms)
1131 .map_err(|_| OmniError::manifest(format!("DateTime value out of range: {}", ms)))?;
1132 return Ok(Some(ms));
1133 }
1134 if let Some(value) = value.as_str() {
1135 return Ok(Some(parse_date64_literal(value)?));
1136 }
1137 Ok(None)
1138}
1139
/// Default cap on concurrent table writes during a load.
const DEFAULT_LOAD_WRITE_CONCURRENCY: usize = 8;

/// Returns the maximum number of concurrent table writes for a load.
///
/// Reads `OMNIGRAPH_LOAD_CONCURRENCY`. If it is unset, not a valid `usize`,
/// or zero, [`DEFAULT_LOAD_WRITE_CONCURRENCY`] is used instead.
fn load_write_concurrency() -> usize {
    match std::env::var("OMNIGRAPH_LOAD_CONCURRENCY") {
        Ok(raw) => match raw.parse::<usize>() {
            Ok(n) if n > 0 => n,
            _ => DEFAULT_LOAD_WRITE_CONCURRENCY,
        },
        Err(_) => DEFAULT_LOAD_WRITE_CONCURRENCY,
    }
}
1159
1160async fn write_batches_concurrently(
1164 db: &Omnigraph,
1165 branch: Option<&str>,
1166 mode: LoadMode,
1167 prepared: Vec<(String, String, RecordBatch, usize)>,
1168) -> Result<
1169 Vec<(
1170 String,
1171 String,
1172 usize,
1173 crate::table_store::TableState,
1174 Option<String>,
1175 )>,
1176> {
1177 use futures::stream::StreamExt;
1178
1179 if prepared.is_empty() {
1180 return Ok(Vec::new());
1181 }
1182
1183 let concurrency = load_write_concurrency().min(prepared.len()).max(1);
1184
1185 futures::stream::iter(prepared.into_iter().map(
1186 |(type_name, table_key, batch, loaded_count)| async move {
1187 let (state, table_branch) =
1188 write_batch_to_dataset(db, branch, &table_key, batch, mode).await?;
1189 Ok::<_, OmniError>((type_name, table_key, loaded_count, state, table_branch))
1190 },
1191 ))
1192 .buffered(concurrency)
1193 .collect::<Vec<Result<_>>>()
1194 .await
1195 .into_iter()
1196 .collect()
1197}
1198
1199async fn write_batch_to_dataset(
1200 db: &Omnigraph,
1201 branch: Option<&str>,
1202 table_key: &str,
1203 batch: RecordBatch,
1204 mode: LoadMode,
1205) -> Result<(crate::table_store::TableState, Option<String>)> {
1206 let op_kind = match mode {
1207 LoadMode::Append => crate::db::MutationOpKind::Insert,
1208 LoadMode::Merge => crate::db::MutationOpKind::Merge,
1209 LoadMode::Overwrite => crate::db::MutationOpKind::SchemaRewrite,
1210 };
1211 let (mut ds, full_path, table_branch) = db
1212 .open_for_mutation_on_branch(branch, table_key, op_kind)
1213 .await?;
1214 let table_store = db.table_store();
1215
1216 match mode {
1217 LoadMode::Overwrite => {
1218 let state = table_store
1219 .overwrite_batch(&full_path, &mut ds, batch)
1220 .await?;
1221 Ok((state, table_branch))
1222 }
1223 LoadMode::Append => {
1224 let state = table_store.append_batch(&full_path, &mut ds, batch).await?;
1225 Ok((state, table_branch))
1226 }
1227 LoadMode::Merge => {
1228 let state = table_store
1229 .merge_insert_batch(
1230 &full_path,
1231 ds,
1232 batch,
1233 vec!["id".to_string()],
1234 lance::dataset::WhenMatched::UpdateAll,
1235 lance::dataset::WhenNotMatched::InsertAll,
1236 )
1237 .await?;
1238 Ok((state, table_branch))
1239 }
1240 }
1241}
1242
1243fn generate_id() -> String {
1244 ulid::Ulid::new().to_string()
1245}
1246
1247pub(crate) fn parse_date32_literal(value: &str) -> Result<i32> {
1248 let raw: Arc<dyn Array> = Arc::new(StringArray::from(vec![Some(value)]));
1249 let casted = arrow_cast::cast::cast(raw.as_ref(), &DataType::Date32)
1250 .map_err(|e| OmniError::manifest(format!("invalid Date literal '{}': {}", value, e)))?;
1251 let out = casted
1252 .as_any()
1253 .downcast_ref::<Date32Array>()
1254 .ok_or_else(|| OmniError::manifest("Date32 cast produced unexpected array"))?;
1255 if out.is_null(0) {
1256 return Err(OmniError::manifest(format!(
1257 "invalid Date literal '{}'",
1258 value
1259 )));
1260 }
1261 Ok(out.value(0))
1262}
1263
1264pub(crate) fn parse_date64_literal(value: &str) -> Result<i64> {
1265 let raw: Arc<dyn Array> = Arc::new(StringArray::from(vec![Some(value)]));
1266 let casted = arrow_cast::cast::cast(raw.as_ref(), &DataType::Date64)
1267 .map_err(|e| OmniError::manifest(format!("invalid DateTime literal '{}': {}", value, e)))?;
1268 let out = casted
1269 .as_any()
1270 .downcast_ref::<Date64Array>()
1271 .ok_or_else(|| OmniError::manifest("Date64 cast produced unexpected array"))?;
1272 if out.is_null(0) {
1273 return Err(OmniError::manifest(format!(
1274 "invalid DateTime literal '{}'",
1275 value
1276 )));
1277 }
1278 Ok(out.value(0))
1279}
1280
1281pub(crate) fn validate_value_constraints(
1284 batch: &RecordBatch,
1285 node_type: &omnigraph_compiler::catalog::NodeType,
1286) -> Result<()> {
1287 use arrow_array::Array;
1288
1289 for rc in &node_type.range_constraints {
1291 let Some(col) = batch.column_by_name(&rc.property) else {
1292 continue;
1293 };
1294 for row in 0..batch.num_rows() {
1295 if col.is_null(row) {
1296 continue;
1297 }
1298 let value = extract_numeric_value(col, row);
1299 if let Some(val) = value {
1300 if val.is_nan() {
1301 return Err(OmniError::manifest(format!(
1302 "@range violation on {}.{}: value is NaN",
1303 node_type.name, rc.property
1304 )));
1305 }
1306 if let Some(ref min) = rc.min {
1307 let min_f = literal_value_to_f64(min);
1308 if val < min_f {
1309 return Err(OmniError::manifest(format!(
1310 "@range violation on {}.{}: value {} < min {}",
1311 node_type.name, rc.property, val, min_f
1312 )));
1313 }
1314 }
1315 if let Some(ref max) = rc.max {
1316 let max_f = literal_value_to_f64(max);
1317 if val > max_f {
1318 return Err(OmniError::manifest(format!(
1319 "@range violation on {}.{}: value {} > max {}",
1320 node_type.name, rc.property, val, max_f
1321 )));
1322 }
1323 }
1324 }
1325 }
1326 }
1327
1328 for cc in &node_type.check_constraints {
1330 let re = regex::Regex::new(&cc.pattern).map_err(|e| {
1331 OmniError::manifest(format!(
1332 "@check on {}.{} has invalid regex '{}': {}",
1333 node_type.name, cc.property, cc.pattern, e
1334 ))
1335 })?;
1336 let Some(col) = batch.column_by_name(&cc.property) else {
1337 continue;
1338 };
1339 let str_col = col.as_any().downcast_ref::<StringArray>();
1340 if let Some(str_col) = str_col {
1341 for row in 0..str_col.len() {
1342 if str_col.is_null(row) {
1343 continue;
1344 }
1345 let val = str_col.value(row);
1346 if !re.is_match(val) {
1347 return Err(OmniError::manifest(format!(
1348 "@check violation on {}.{}: value '{}' does not match pattern '{}'",
1349 node_type.name, cc.property, val, cc.pattern
1350 )));
1351 }
1352 }
1353 }
1354 }
1355
1356 Ok(())
1357}
1358
1359pub(crate) fn validate_enum_constraints(
1366 batch: &RecordBatch,
1367 properties: &HashMap<String, omnigraph_compiler::types::PropType>,
1368 type_name: &str,
1369) -> Result<()> {
1370 use arrow_array::{Array, ListArray};
1371
1372 for (prop_name, prop_type) in properties {
1373 let Some(allowed) = prop_type.enum_values.as_ref() else {
1374 continue;
1375 };
1376 let Some(col) = batch.column_by_name(prop_name) else {
1377 continue;
1378 };
1379 if prop_type.list {
1380 let Some(list_col) = col.as_any().downcast_ref::<ListArray>() else {
1381 continue;
1382 };
1383 for row in 0..list_col.len() {
1384 if list_col.is_null(row) {
1385 continue;
1386 }
1387 let item_arr = list_col.value(row);
1388 let Some(str_arr) = item_arr.as_any().downcast_ref::<StringArray>() else {
1389 continue;
1390 };
1391 for i in 0..str_arr.len() {
1392 if str_arr.is_null(i) {
1393 continue;
1394 }
1395 let val = str_arr.value(i);
1396 if !allowed.iter().any(|a| a.as_str() == val) {
1397 return Err(OmniError::manifest(format!(
1398 "invalid enum value '{}' for {}.{} (expected: {})",
1399 val,
1400 type_name,
1401 prop_name,
1402 allowed.join(", ")
1403 )));
1404 }
1405 }
1406 }
1407 } else if let Some(str_col) = col.as_any().downcast_ref::<StringArray>() {
1408 for row in 0..str_col.len() {
1409 if str_col.is_null(row) {
1410 continue;
1411 }
1412 let val = str_col.value(row);
1413 if !allowed.iter().any(|a| a.as_str() == val) {
1414 return Err(OmniError::manifest(format!(
1415 "invalid enum value '{}' for {}.{} (expected: {})",
1416 val,
1417 type_name,
1418 prop_name,
1419 allowed.join(", ")
1420 )));
1421 }
1422 }
1423 }
1424 }
1425 Ok(())
1426}
1427
1428pub(crate) fn enforce_unique_constraints_intra_batch(
1435 batch: &RecordBatch,
1436 type_name: &str,
1437 unique_properties: &[String],
1438) -> Result<()> {
1439 for property in unique_properties {
1440 let Some(col_idx) = batch.schema().index_of(property).ok() else {
1441 continue;
1442 };
1443 let arr = batch.column(col_idx);
1444 let mut seen: HashMap<String, usize> = HashMap::new();
1445 for row in 0..batch.num_rows() {
1446 let Some(value) = scalar_to_string(arr, row) else {
1447 continue;
1448 };
1449 if let Some(prev_row) = seen.insert(value.clone(), row) {
1450 return Err(OmniError::manifest(format!(
1451 "@unique violation on {}.{}: value '{}' appears in rows {} and {}",
1452 type_name, property, value, prev_row, row
1453 )));
1454 }
1455 }
1456 }
1457 Ok(())
1458}
1459
1460fn scalar_to_string(array: &ArrayRef, row: usize) -> Option<String> {
1464 use arrow_array::Array;
1465 if array.is_null(row) {
1466 return None;
1467 }
1468 if let Some(a) = array.as_any().downcast_ref::<StringArray>() {
1469 return Some(a.value(row).to_string());
1470 }
1471 if let Some(a) = array.as_any().downcast_ref::<Int32Array>() {
1472 return Some(a.value(row).to_string());
1473 }
1474 if let Some(a) = array.as_any().downcast_ref::<Int64Array>() {
1475 return Some(a.value(row).to_string());
1476 }
1477 if let Some(a) = array.as_any().downcast_ref::<UInt32Array>() {
1478 return Some(a.value(row).to_string());
1479 }
1480 if let Some(a) = array.as_any().downcast_ref::<UInt64Array>() {
1481 return Some(a.value(row).to_string());
1482 }
1483 if let Some(a) = array.as_any().downcast_ref::<Float32Array>() {
1484 return Some(a.value(row).to_string());
1485 }
1486 if let Some(a) = array.as_any().downcast_ref::<Float64Array>() {
1487 return Some(a.value(row).to_string());
1488 }
1489 if let Some(a) = array.as_any().downcast_ref::<BooleanArray>() {
1490 return Some(a.value(row).to_string());
1491 }
1492 if let Some(a) = array.as_any().downcast_ref::<Date32Array>() {
1493 return Some(a.value(row).to_string());
1494 }
1495 if let Some(a) = array.as_any().downcast_ref::<Date64Array>() {
1496 return Some(a.value(row).to_string());
1497 }
1498 None
1499}
1500
1501pub(crate) fn unique_property_names_for_node(
1505 node_type: &omnigraph_compiler::catalog::NodeType,
1506) -> Vec<String> {
1507 let mut props: Vec<String> = node_type
1508 .unique_constraints
1509 .iter()
1510 .flatten()
1511 .cloned()
1512 .collect();
1513 if let Some(key) = &node_type.key {
1514 props.extend(key.iter().cloned());
1515 }
1516 props.sort();
1517 props.dedup();
1518 props
1519}
1520
1521pub(crate) fn unique_property_names_for_edge(
1523 edge_type: &omnigraph_compiler::catalog::EdgeType,
1524) -> Vec<String> {
1525 let mut props: Vec<String> = edge_type
1526 .unique_constraints
1527 .iter()
1528 .flatten()
1529 .cloned()
1530 .collect();
1531 props.sort();
1532 props.dedup();
1533 props
1534}
1535
1536fn extract_numeric_value(col: &ArrayRef, row: usize) -> Option<f64> {
1537 use arrow_array::{
1538 Array, Float32Array, Float64Array, Int32Array, Int64Array, UInt32Array, UInt64Array,
1539 };
1540 if let Some(a) = col.as_any().downcast_ref::<Int32Array>() {
1541 return Some(a.value(row) as f64);
1542 }
1543 if let Some(a) = col.as_any().downcast_ref::<Int64Array>() {
1544 return Some(a.value(row) as f64);
1545 }
1546 if let Some(a) = col.as_any().downcast_ref::<UInt32Array>() {
1547 return Some(a.value(row) as f64);
1548 }
1549 if let Some(a) = col.as_any().downcast_ref::<UInt64Array>() {
1550 return Some(a.value(row) as f64);
1551 }
1552 if let Some(a) = col.as_any().downcast_ref::<Float32Array>() {
1553 return Some(a.value(row) as f64);
1554 }
1555 if let Some(a) = col.as_any().downcast_ref::<Float64Array>() {
1556 return Some(a.value(row));
1557 }
1558 None
1559}
1560
1561fn literal_value_to_f64(v: &omnigraph_compiler::catalog::LiteralValue) -> f64 {
1562 use omnigraph_compiler::catalog::LiteralValue;
1563 match v {
1564 LiteralValue::Integer(n) => *n as f64,
1565 LiteralValue::Float(f) => *f,
1566 }
1567}
1568
1569pub(crate) async fn validate_edge_cardinality(
1572 db: &crate::db::Omnigraph,
1573 branch: Option<&str>,
1574 edge_name: &str,
1575 written_version: u64,
1576 written_branch: Option<&str>,
1577) -> Result<()> {
1578 use arrow_array::Array;
1579 let catalog = db.catalog();
1580 let edge_type = &catalog.edge_types[edge_name];
1581 if edge_type.cardinality.is_default() {
1582 return Ok(());
1583 }
1584
1585 let snapshot = db.snapshot_for_branch(branch).await?;
1588 let table_key = format!("edge:{}", edge_name);
1589 let entry = snapshot
1590 .entry(&table_key)
1591 .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
1592 let ds = db
1593 .open_dataset_at_state(
1594 &entry.table_path,
1595 written_branch.or(entry.table_branch.as_deref()),
1596 written_version,
1597 )
1598 .await?;
1599
1600 let batches = db
1602 .table_store()
1603 .scan(&ds, Some(&["src"]), None, None)
1604 .await?;
1605
1606 let mut counts: HashMap<String, u32> = HashMap::new();
1607 for batch in &batches {
1608 let srcs = batch
1609 .column_by_name("src")
1610 .unwrap()
1611 .as_any()
1612 .downcast_ref::<StringArray>()
1613 .unwrap();
1614 for i in 0..srcs.len() {
1615 *counts.entry(srcs.value(i).to_string()).or_insert(0) += 1;
1616 }
1617 }
1618
1619 let card = &edge_type.cardinality;
1620 for (src, count) in &counts {
1621 if let Some(max) = card.max {
1622 if *count > max {
1623 return Err(OmniError::manifest(format!(
1624 "@card violation on edge {}: source '{}' has {} edges (max {})",
1625 edge_name, src, count, max
1626 )));
1627 }
1628 }
1629 if *count < card.min {
1630 return Err(OmniError::manifest(format!(
1631 "@card violation on edge {}: source '{}' has {} edges (min {})",
1632 edge_name, src, count, card.min
1633 )));
1634 }
1635 }
1636
1637 Ok(())
1638}
1639
1640async fn validate_edge_cardinality_with_pending_loader(
1655 db: &Omnigraph,
1656 branch: Option<&str>,
1657 edge_type: &omnigraph_compiler::catalog::EdgeType,
1658 table_key: &str,
1659 staging: &MutationStaging,
1660 mode: LoadMode,
1661) -> Result<()> {
1662 if edge_type.cardinality.is_default() {
1663 return Ok(());
1664 }
1665 let snapshot = db.snapshot_for_branch(branch).await?;
1666 let Some(entry) = snapshot.entry(table_key) else {
1667 return Ok(());
1670 };
1671 let ds = db
1672 .open_dataset_at_state(
1673 &entry.table_path,
1674 entry.table_branch.as_deref(),
1675 entry.table_version,
1676 )
1677 .await?;
1678 let dedupe_key = match mode {
1679 LoadMode::Merge => Some("id"),
1680 LoadMode::Append | LoadMode::Overwrite => None,
1681 };
1682 let counts =
1683 crate::exec::staging::count_src_per_edge(db, &ds, table_key, staging, dedupe_key).await?;
1684 crate::exec::staging::enforce_cardinality_bounds(edge_type, &counts)
1685}
1686
1687async fn collect_node_ids_with_pending(
1696 db: &Omnigraph,
1697 branch: Option<&str>,
1698 type_name: &str,
1699 staging: &MutationStaging,
1700) -> Result<HashSet<String>> {
1701 let mut ids = HashSet::new();
1702 let table_key = format!("node:{}", type_name);
1703
1704 for batch in staging.pending_batches(&table_key) {
1706 if let Some(col) = batch.column_by_name("id") {
1707 if let Some(arr) = col.as_any().downcast_ref::<StringArray>() {
1708 for i in 0..arr.len() {
1709 if arr.is_valid(i) {
1710 ids.insert(arr.value(i).to_string());
1711 }
1712 }
1713 }
1714 }
1715 }
1716
1717 let snapshot = db.snapshot_for_branch(branch).await?;
1719 let Some(entry) = snapshot.entry(&table_key) else {
1720 return Ok(ids);
1721 };
1722 let ds = db
1723 .open_dataset_at_state(
1724 &entry.table_path,
1725 entry.table_branch.as_deref(),
1726 entry.table_version,
1727 )
1728 .await?;
1729
1730 let batches = db
1731 .table_store()
1732 .scan(&ds, Some(&["id"]), None, None)
1733 .await?;
1734
1735 for batch in &batches {
1736 let id_col = batch
1737 .column_by_name("id")
1738 .ok_or_else(|| OmniError::Lance("missing 'id' column".into()))?
1739 .as_any()
1740 .downcast_ref::<StringArray>()
1741 .ok_or_else(|| OmniError::Lance("'id' column is not Utf8".into()))?;
1742 for i in 0..batch.num_rows() {
1743 if id_col.is_valid(i) {
1748 ids.insert(id_col.value(i).to_string());
1749 }
1750 }
1751 }
1752
1753 Ok(ids)
1754}
1755
1756async fn collect_node_ids(
1761 db: &Omnigraph,
1762 branch: Option<&str>,
1763 type_name: &str,
1764 node_rows: &HashMap<String, Vec<JsonValue>>,
1765 catalog: &omnigraph_compiler::catalog::Catalog,
1766 updates: &[crate::db::SubTableUpdate],
1767) -> Result<HashSet<String>> {
1768 let mut ids = HashSet::new();
1769
1770 if let Some(rows) = node_rows.get(type_name) {
1772 if let Some(node_type) = catalog.node_types.get(type_name) {
1773 if let Some(key_prop) = node_type.key_property() {
1774 for row in rows {
1775 if let Some(id) = row.get(key_prop).and_then(|v| v.as_str()) {
1776 ids.insert(id.to_string());
1777 }
1778 }
1779 }
1780 }
1781 }
1782
1783 let table_key = format!("node:{}", type_name);
1785 let snapshot = db.snapshot_for_branch(branch).await?;
1786 let Some(entry) = snapshot.entry(&table_key) else {
1787 return Ok(ids);
1788 };
1789 let updated = updates
1791 .iter()
1792 .find(|u| u.table_key == table_key)
1793 .map(|u| (u.table_version, u.table_branch.as_deref()));
1794 let (version, branch) = updated.unwrap_or((entry.table_version, entry.table_branch.as_deref()));
1795 let ds = db
1796 .open_dataset_at_state(&entry.table_path, branch, version)
1797 .await?;
1798
1799 let batches = db
1800 .table_store()
1801 .scan(&ds, Some(&["id"]), None, None)
1802 .await?;
1803
1804 for batch in &batches {
1805 let id_col = batch
1806 .column_by_name("id")
1807 .unwrap()
1808 .as_any()
1809 .downcast_ref::<StringArray>()
1810 .unwrap();
1811 for i in 0..batch.num_rows() {
1812 if !id_col.is_valid(i) {
1813 continue;
1814 }
1815 ids.insert(id_col.value(i).to_string());
1816 }
1817 }
1818
1819 Ok(ids)
1820}
1821
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::Omnigraph;
    use arrow_array::Array;
    use futures::TryStreamExt;
    use std::collections::HashMap;

    const TEST_SCHEMA: &str = r#"
node Person {
    name: String @key
    age: I32?
}
node Company {
    name: String @key
}
edge Knows: Person -> Person {
    since: Date?
}
edge WorksAt: Person -> Company
"#;

    const TEST_DATA: &str = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}
{"type": "Person", "data": {"name": "Bob", "age": 25}}
{"type": "Company", "data": {"name": "Acme"}}
{"edge": "Knows", "from": "Alice", "to": "Bob"}
{"edge": "WorksAt", "from": "Alice", "to": "Acme"}
"#;

    #[tokio::test]
    async fn test_load_creates_data() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let result = load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert_eq!(result.nodes_loaded["Person"], 2);
        assert_eq!(result.nodes_loaded["Company"], 1);
        assert_eq!(result.edges_loaded["Knows"], 1);
        assert_eq!(result.edges_loaded["WorksAt"], 1);
    }

    #[tokio::test]
    async fn test_load_data_readable_via_lance() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        let snap = db.snapshot().await;
        let person_ds = snap.open("node:Person").await.unwrap();

        assert_eq!(person_ds.count_rows(None).await.unwrap(), 2);

        let batches: Vec<RecordBatch> = person_ds
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();

        // Gather ids from every batch: the scanner is free to split rows
        // across more than one batch.
        let mut id_values: Vec<String> = Vec::new();
        for batch in &batches {
            let ids = batch
                .column_by_name("id")
                .unwrap()
                .as_any()
                .downcast_ref::<StringArray>()
                .unwrap();
            id_values.extend((0..ids.len()).map(|i| ids.value(i).to_string()));
        }
        assert!(id_values.iter().any(|id| id == "Alice"));
        assert!(id_values.iter().any(|id| id == "Bob"));
    }

    #[tokio::test]
    async fn test_load_edges_reference_node_keys() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        let snap = db.snapshot().await;
        let knows_ds = snap.open("edge:Knows").await.unwrap();

        let batches: Vec<RecordBatch> = knows_ds
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();

        // Collect (src, dst) pairs across all batches rather than assuming one.
        let mut pairs: Vec<(String, String)> = Vec::new();
        for batch in &batches {
            let srcs = batch
                .column_by_name("src")
                .unwrap()
                .as_any()
                .downcast_ref::<StringArray>()
                .unwrap();
            let dsts = batch
                .column_by_name("dst")
                .unwrap()
                .as_any()
                .downcast_ref::<StringArray>()
                .unwrap();
            for i in 0..srcs.len() {
                pairs.push((srcs.value(i).to_string(), dsts.value(i).to_string()));
            }
        }

        assert_eq!(pairs, vec![("Alice".to_string(), "Bob".to_string())]);
    }

    #[tokio::test]
    async fn test_load_manifest_version_advances() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        let v1 = db.version().await;

        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert!(db.version().await > v1);
    }

    #[tokio::test]
    async fn test_load_append_adds_rows() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let batch1 = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#;
        let batch2 = r#"{"type": "Person", "data": {"name": "Bob", "age": 25}}"#;

        load_jsonl(&mut db, batch1, LoadMode::Overwrite)
            .await
            .unwrap();
        load_jsonl(&mut db, batch2, LoadMode::Append).await.unwrap();

        let snap = db.snapshot().await;
        let person_ds = snap.open("node:Person").await.unwrap();
        assert_eq!(person_ds.count_rows(None).await.unwrap(), 2);
    }

    #[tokio::test]
    async fn test_load_unknown_type_rejected() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let bad = r#"{"type": "FakeType", "data": {"name": "x"}}"#;
        let result = load_jsonl(&mut db, bad, LoadMode::Overwrite).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_ingest_creates_branch_and_reports_tables() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let result = db
            .ingest("feature", Some("main"), TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert_eq!(result.branch, "feature");
        assert_eq!(result.base_branch, "main");
        assert!(result.branch_created);
        assert_eq!(result.mode, LoadMode::Overwrite);
        assert_eq!(
            result.tables,
            vec![
                IngestTableResult {
                    table_key: "edge:Knows".to_string(),
                    rows_loaded: 1
                },
                IngestTableResult {
                    table_key: "edge:WorksAt".to_string(),
                    rows_loaded: 1
                },
                IngestTableResult {
                    table_key: "node:Company".to_string(),
                    rows_loaded: 1
                },
                IngestTableResult {
                    table_key: "node:Person".to_string(),
                    rows_loaded: 2
                },
            ]
        );
        assert!(
            db.branch_list()
                .await
                .unwrap()
                .contains(&"feature".to_string())
        );
    }

    #[tokio::test]
    async fn test_ingest_existing_branch_ignores_from_and_merges_data() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();
        db.branch_create_from(crate::db::ReadTarget::branch("main"), "feature")
            .await
            .unwrap();

        let result = db
            .ingest(
                "feature",
                Some("missing-base"),
                r#"{"type":"Person","data":{"name":"Bob","age":26}}
{"type":"Person","data":{"name":"Eve","age":31}}"#,
                LoadMode::Merge,
            )
            .await
            .unwrap();

        assert_eq!(result.branch, "feature");
        assert_eq!(result.base_branch, "missing-base");
        assert!(!result.branch_created);
        assert_eq!(result.mode, LoadMode::Merge);
        assert_eq!(
            result.tables,
            vec![IngestTableResult {
                table_key: "node:Person".to_string(),
                rows_loaded: 2
            }]
        );

        let snap = db
            .snapshot_of(crate::db::ReadTarget::branch("feature"))
            .await
            .unwrap();
        let person_ds = snap.open("node:Person").await.unwrap();
        assert_eq!(person_ds.count_rows(None).await.unwrap(), 3);

        let batches: Vec<RecordBatch> = person_ds
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();
        let mut ages_by_id = HashMap::new();
        for batch in &batches {
            let ids = batch
                .column_by_name("id")
                .unwrap()
                .as_any()
                .downcast_ref::<StringArray>()
                .unwrap();
            let ages = batch
                .column_by_name("age")
                .unwrap()
                .as_any()
                .downcast_ref::<Int32Array>()
                .unwrap();
            for idx in 0..ids.len() {
                ages_by_id.insert(ids.value(idx).to_string(), ages.value(idx));
            }
        }

        assert_eq!(ages_by_id.get("Bob"), Some(&26));
        assert_eq!(ages_by_id.get("Eve"), Some(&31));
        assert_eq!(ages_by_id.get("Alice"), Some(&30));
    }

    #[tokio::test]
    async fn test_ingest_as_stamps_actor_on_branch_head_commit() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        db.ingest_as(
            "feature",
            Some("main"),
            TEST_DATA,
            LoadMode::Overwrite,
            Some("act-andrew"),
        )
        .await
        .unwrap();

        let head = db
            .list_commits(Some("feature"))
            .await
            .unwrap()
            .into_iter()
            .last()
            .unwrap();
        assert_eq!(head.actor_id.as_deref(), Some("act-andrew"));
    }

    #[test]
    fn test_range_constraint_rejects_nan() {
        use arrow_array::{Float64Array, RecordBatch, StringArray};
        use omnigraph_compiler::catalog::{LiteralValue, NodeType, RangeConstraint};
        use std::sync::Arc;

        let schema = Arc::new(arrow_schema::Schema::new(vec![
            arrow_schema::Field::new("name", arrow_schema::DataType::Utf8, false),
            arrow_schema::Field::new("score", arrow_schema::DataType::Float64, true),
        ]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["bad"])),
                Arc::new(Float64Array::from(vec![f64::NAN])),
            ],
        )
        .unwrap();

        let node_type = NodeType {
            name: "Test".to_string(),
            implements: vec![],
            properties: Default::default(),
            key: None,
            unique_constraints: vec![],
            indices: vec![],
            range_constraints: vec![RangeConstraint {
                property: "score".to_string(),
                min: Some(LiteralValue::Float(0.0)),
                max: Some(LiteralValue::Float(1.0)),
            }],
            check_constraints: vec![],
            embed_sources: Default::default(),
            blob_properties: Default::default(),
            arrow_schema: schema,
        };

        let result = validate_value_constraints(&batch, &node_type);
        assert!(result.is_err(), "expected NaN to be rejected");
        let err = result.unwrap_err().to_string();
        assert!(err.contains("NaN"), "error should mention NaN: {}", err);
    }
}