1use std::collections::{HashMap, HashSet};
2
3use std::io::{BufRead, BufReader, Cursor};
4use std::sync::Arc;
5
6use arrow_array::{
7 Array, ArrayRef, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array,
8 Int32Array, Int64Array, RecordBatch, StringArray, UInt32Array, UInt64Array,
9 builder::{
10 ArrayBuilder, BooleanBuilder, Date32Builder, Date64Builder, FixedSizeListBuilder,
11 Float32Builder, Float64Builder, Int32Builder, Int64Builder, ListBuilder, StringBuilder,
12 UInt32Builder, UInt64Builder,
13 },
14};
15use arrow_schema::DataType;
16use base64::Engine;
17use lance::blob::BlobArrayBuilder;
18use omnigraph_compiler::catalog::NodeType;
19use serde::{Deserialize, Serialize};
20use serde_json::Value as JsonValue;
21
22use crate::db::Omnigraph;
23use crate::error::{OmniError, Result};
24use crate::exec::staging::{MutationStaging, PendingMode};
25
/// Per-table row counts produced by a load.
///
/// The loader fills both maps with the number of rows in the Arrow batch it
/// built for each type (`batch.num_rows()`).
#[derive(Debug, Clone, Default)]
pub struct LoadResult {
    /// Rows loaded per node type, keyed by node type name.
    pub nodes_loaded: HashMap<String, usize>,
    /// Rows loaded per edge type, keyed by the catalog's canonical edge name.
    pub edges_loaded: HashMap<String, usize>,
}
32
/// Row count for a single table touched by an ingest.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct IngestTableResult {
    /// Table identifier; `LoadResult::to_ingest_tables` produces keys of the
    /// form `node:<type name>` or `edge:<edge name>`.
    pub table_key: String,
    /// Number of rows loaded into that table.
    pub rows_loaded: usize,
}
38
/// Summary of an ingest operation, serializable with serde.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct IngestResult {
    /// Normalized name of the branch the data was loaded into.
    pub branch: String,
    /// Normalized name of the branch used as the base when the target had to
    /// be created (`"main"` when the caller passed no `from`).
    pub base_branch: String,
    /// `true` when the target branch was absent from the branch list and was
    /// created as part of this ingest.
    pub branch_created: bool,
    /// Load mode that was requested.
    pub mode: LoadMode,
    /// Per-table row counts, sorted by `table_key`.
    pub tables: Vec<IngestTableResult>,
}
47
/// How loaded rows are applied to existing tables.
///
/// Serialized in `snake_case` (`"overwrite"`, `"append"`, `"merge"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LoadMode {
    /// Written directly to the tables, bypassing mutation staging; the loader
    /// tags these writes as `MutationOpKind::SchemaRewrite`.
    Overwrite,
    /// Staged as `PendingMode::Append` / `MutationOpKind::Insert`.
    Append,
    /// Staged as `PendingMode::Merge` / `MutationOpKind::Merge`.
    Merge,
}
59
60pub async fn load_jsonl(db: &mut Omnigraph, data: &str, mode: LoadMode) -> Result<LoadResult> {
62 let current_branch = db.active_branch().await;
63 let branch = current_branch.as_deref().unwrap_or("main");
64 db.load(branch, data, mode).await
65}
66
67pub async fn load_jsonl_file(db: &mut Omnigraph, path: &str, mode: LoadMode) -> Result<LoadResult> {
69 let current_branch = db.active_branch().await;
70 let branch = current_branch.as_deref().unwrap_or("main");
71 db.load_file(branch, path, mode).await
72}
73
74impl Omnigraph {
75 pub async fn ingest(
76 &self,
77 branch: &str,
78 from: Option<&str>,
79 data: &str,
80 mode: LoadMode,
81 ) -> Result<IngestResult> {
82 self.ingest_as(branch, from, data, mode, None).await
83 }
84
85 pub async fn ingest_as(
86 &self,
87 branch: &str,
88 from: Option<&str>,
89 data: &str,
90 mode: LoadMode,
91 actor_id: Option<&str>,
92 ) -> Result<IngestResult> {
93 self.enforce(
101 omnigraph_policy::PolicyAction::Change,
102 &omnigraph_policy::ResourceScope::Branch(branch.to_string()),
103 actor_id,
104 )?;
105 self.ingest_with_current_actor(branch, from, data, mode, actor_id)
106 .await
107 }
108
109 pub async fn ingest_file(
110 &self,
111 branch: &str,
112 from: Option<&str>,
113 path: &str,
114 mode: LoadMode,
115 ) -> Result<IngestResult> {
116 self.ingest_file_as(branch, from, path, mode, None).await
117 }
118
119 pub async fn ingest_file_as(
120 &self,
121 branch: &str,
122 from: Option<&str>,
123 path: &str,
124 mode: LoadMode,
125 actor_id: Option<&str>,
126 ) -> Result<IngestResult> {
127 let data = std::fs::read_to_string(path).map_err(OmniError::Io)?;
128 self.ingest_as(branch, from, &data, mode, actor_id).await
129 }
130
131 async fn ingest_with_current_actor(
132 &self,
133 branch: &str,
134 from: Option<&str>,
135 data: &str,
136 mode: LoadMode,
137 actor_id: Option<&str>,
138 ) -> Result<IngestResult> {
139 self.ensure_schema_state_valid().await?;
140 let target_branch =
141 Self::normalize_branch_name(branch)?.unwrap_or_else(|| "main".to_string());
142 let base_branch = Self::normalize_branch_name(from.unwrap_or("main"))?
143 .unwrap_or_else(|| "main".to_string());
144 let branch_created = !self
145 .branch_list()
146 .await?
147 .iter()
148 .any(|name| name == &target_branch);
149 if branch_created {
150 self.branch_create_from_as(
157 crate::db::ReadTarget::branch(&base_branch),
158 &target_branch,
159 actor_id,
160 )
161 .await?;
162 }
163
164 let result = self.load_as(&target_branch, data, mode, actor_id).await?;
165 Ok(IngestResult {
166 branch: target_branch,
167 base_branch,
168 branch_created,
169 mode,
170 tables: result.to_ingest_tables(),
171 })
172 }
173
174 pub async fn load(&self, branch: &str, data: &str, mode: LoadMode) -> Result<LoadResult> {
175 self.load_as(branch, data, mode, None).await
176 }
177
178 pub async fn load_as(
179 &self,
180 branch: &str,
181 data: &str,
182 mode: LoadMode,
183 actor_id: Option<&str>,
184 ) -> Result<LoadResult> {
185 self.enforce(
192 omnigraph_policy::PolicyAction::Change,
193 &omnigraph_policy::ResourceScope::Branch(branch.to_string()),
194 actor_id,
195 )?;
196 self.ensure_schema_state_valid().await?;
197 crate::db::ensure_public_branch_ref(branch, "load")?;
202 let requested = Self::normalize_branch_name(branch)?;
208 self.load_direct_on_branch(requested.as_deref(), data, mode, actor_id)
212 .await
213 }
214
215 pub async fn load_file(
216 &self,
217 branch: &str,
218 path: &str,
219 mode: LoadMode,
220 ) -> Result<LoadResult> {
221 self.load_file_as(branch, path, mode, None).await
222 }
223
224 pub async fn load_file_as(
228 &self,
229 branch: &str,
230 path: &str,
231 mode: LoadMode,
232 actor_id: Option<&str>,
233 ) -> Result<LoadResult> {
234 let data = std::fs::read_to_string(path).map_err(|e| OmniError::Io(e))?;
235 self.load_as(branch, &data, mode, actor_id).await
236 }
237
238 async fn load_direct_on_branch(
239 &self,
240 branch: Option<&str>,
241 data: &str,
242 mode: LoadMode,
243 actor_id: Option<&str>,
244 ) -> Result<LoadResult> {
245 let reader = BufReader::new(Cursor::new(data.as_bytes()));
246 load_jsonl_reader(self, branch, reader, mode, actor_id).await
247 }
248}
249
250impl LoadMode {
251 pub fn as_str(self) -> &'static str {
252 match self {
253 LoadMode::Overwrite => "overwrite",
254 LoadMode::Append => "append",
255 LoadMode::Merge => "merge",
256 }
257 }
258}
259
260impl LoadResult {
261 pub fn to_ingest_tables(&self) -> Vec<IngestTableResult> {
262 let mut tables = self
263 .nodes_loaded
264 .iter()
265 .map(|(type_name, rows_loaded)| IngestTableResult {
266 table_key: format!("node:{type_name}"),
267 rows_loaded: *rows_loaded,
268 })
269 .chain(
270 self.edges_loaded
271 .iter()
272 .map(|(edge_name, rows_loaded)| IngestTableResult {
273 table_key: format!("edge:{edge_name}"),
274 rows_loaded: *rows_loaded,
275 }),
276 )
277 .collect::<Vec<_>>();
278 tables.sort_by(|a, b| a.table_key.cmp(&b.table_key));
279 tables
280 }
281}
282
/// Parses JSONL records from `reader`, validates them against the catalog and
/// writes them to the node and edge tables of `branch`.
///
/// Each non-empty line must be a JSON value with either a string `"type"`
/// field (a node of that type) or a string `"edge"` field (an edge, which also
/// needs string `"from"` and `"to"` fields). `"type"` is checked first, so a
/// record carrying both is treated as a node. The optional `"data"` field
/// holds the properties and defaults to an empty object.
///
/// All lines are parsed and grouped per table before anything is written.
/// Nodes are handled before edges. `LoadMode::Append` and `LoadMode::Merge`
/// go through `MutationStaging`; `LoadMode::Overwrite` writes batches directly
/// and publishes them with one `commit_updates_on_branch_with_expected` call.
///
/// Returns the number of rows loaded per node type and per edge type.
///
/// # Errors
/// Propagates reader I/O errors and returns a manifest error for invalid JSON,
/// unknown node/edge types, edges missing `from`/`to`, records with neither
/// `type` nor `edge`, missing manifest entries, and edge endpoints that are
/// not found. Errors from batch building, constraint validation, staging and
/// commit are propagated as well.
async fn load_jsonl_reader<R: BufRead>(
    db: &Omnigraph,
    branch: Option<&str>,
    reader: R,
    mode: LoadMode,
    actor_id: Option<&str>,
) -> Result<LoadResult> {
    let catalog = db.catalog().clone();

    // Node rows grouped by node type name; each entry is the record's `data`.
    let mut node_rows: HashMap<String, Vec<JsonValue>> = HashMap::new();
    // Edge rows grouped by canonical edge name; each entry is (from, to, data).
    let mut edge_rows: HashMap<String, Vec<(String, String, JsonValue)>> = HashMap::new();

    for (line_num, line) in reader.lines().enumerate() {
        let line = line?;
        let line = line.trim();
        // Blank lines are skipped but still count towards reported line numbers.
        if line.is_empty() {
            continue;
        }
        let value: JsonValue = serde_json::from_str(line).map_err(|e| {
            OmniError::manifest(format!("invalid JSON on line {}: {}", line_num + 1, e))
        })?;

        if let Some(type_name) = value.get("type").and_then(|v| v.as_str()) {
            if !catalog.node_types.contains_key(type_name) {
                return Err(OmniError::manifest(format!(
                    "line {}: unknown node type '{}'",
                    line_num + 1,
                    type_name
                )));
            }
            let data = value
                .get("data")
                .cloned()
                .unwrap_or(JsonValue::Object(serde_json::Map::new()));
            node_rows
                .entry(type_name.to_string())
                .or_default()
                .push(data);
        } else if let Some(edge_name) = value.get("edge").and_then(|v| v.as_str()) {
            if catalog.lookup_edge_by_name(edge_name).is_none() {
                return Err(OmniError::manifest(format!(
                    "line {}: unknown edge type '{}'",
                    line_num + 1,
                    edge_name
                )));
            }
            let from = value
                .get("from")
                .and_then(|v| v.as_str())
                .ok_or_else(|| {
                    OmniError::manifest(format!("line {}: edge missing 'from'", line_num + 1))
                })?
                .to_string();
            let to = value
                .get("to")
                .and_then(|v| v.as_str())
                .ok_or_else(|| {
                    OmniError::manifest(format!("line {}: edge missing 'to'", line_num + 1))
                })?
                .to_string();
            let data = value
                .get("data")
                .cloned()
                .unwrap_or(JsonValue::Object(serde_json::Map::new()));
            // The lookup was checked for `None` above, so this `unwrap` cannot
            // panic. Rows are grouped under the catalog's own name for the edge.
            let canonical = catalog.lookup_edge_by_name(edge_name).unwrap().name.clone();
            edge_rows
                .entry(canonical)
                .or_default()
                .push((from, to, data));
        } else {
            return Err(OmniError::manifest(format!(
                "line {}: expected 'type' or 'edge' field",
                line_num + 1
            )));
        }
    }

    let mut result = LoadResult::default();
    let snapshot = db.snapshot_for_branch(branch).await?;
    // Append and Merge are staged; Overwrite writes directly.
    let use_staging = !matches!(mode, LoadMode::Overwrite);
    let mut staging = MutationStaging::default();
    // Only filled on the Overwrite path: the per-table updates to publish and
    // the table versions recorded in the snapshot taken above.
    let mut overwrite_updates: Vec<crate::db::SubTableUpdate> = Vec::new();
    let mut overwrite_expected: HashMap<String, u64> = HashMap::new();
    let pending_mode = match mode {
        LoadMode::Merge => PendingMode::Merge,
        LoadMode::Append => PendingMode::Append,
        // `pending_mode` is only read on the staging path, which Overwrite
        // never takes; this arm just keeps the match exhaustive.
        LoadMode::Overwrite => PendingMode::Append,
    };
    let load_op_kind = match mode {
        LoadMode::Append => crate::db::MutationOpKind::Insert,
        LoadMode::Merge => crate::db::MutationOpKind::Merge,
        LoadMode::Overwrite => crate::db::MutationOpKind::SchemaRewrite,
    };

    // Build and validate every node batch before any node write or staging.
    // Tuple layout: (type name, table key, batch, row count).
    let mut prepared_nodes: Vec<(String, String, RecordBatch, usize)> =
        Vec::with_capacity(node_rows.len());
    for (type_name, rows) in &node_rows {
        let node_type = &catalog.node_types[type_name];
        let batch = build_node_batch(node_type, rows)?;
        validate_value_constraints(&batch, node_type)?;
        validate_enum_constraints(&batch, &node_type.properties, type_name)?;
        let unique_props = unique_property_names_for_node(node_type);
        if !unique_props.is_empty() {
            enforce_unique_constraints_intra_batch(&batch, type_name, &unique_props)?;
        }
        let loaded_count = batch.num_rows();
        let table_key = format!("node:{}", type_name);
        let entry = snapshot
            .entry(&table_key)
            .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
        if !use_staging {
            overwrite_expected.insert(table_key.clone(), entry.table_version);
        }
        prepared_nodes.push((type_name.clone(), table_key, batch, loaded_count));
    }

    if use_staging {
        // Register each node table with the staging area and queue its batch.
        for (type_name, table_key, batch, loaded_count) in prepared_nodes {
            let (ds, full_path, table_branch) = db
                .open_for_mutation_on_branch(branch, &table_key, load_op_kind)
                .await?;
            let expected_version = ds.version().version;
            staging.ensure_path(
                &table_key,
                full_path,
                table_branch,
                expected_version,
                load_op_kind,
            );
            let schema = batch.schema();
            staging.append_batch(&table_key, schema, pending_mode, batch)?;
            result.nodes_loaded.insert(type_name, loaded_count);
        }
    } else {
        // NOTE(review): on the Overwrite path node tables are written here,
        // before edge endpoints are validated below. Whether these writes are
        // observable before the final commit call is not evident from this
        // function; confirm against `write_batch_to_dataset`.
        let node_write_results =
            write_batches_concurrently(db, branch, mode, prepared_nodes).await?;
        for (type_name, table_key, loaded_count, state, table_branch) in node_write_results {
            overwrite_updates.push(crate::db::SubTableUpdate {
                table_key,
                table_version: state.version,
                table_branch,
                row_count: state.row_count,
                version_metadata: state.version_metadata,
            });
            result.nodes_loaded.insert(type_name, loaded_count);
        }
    }

    // Referential check: every edge endpoint must be among the ids collected
    // for the edge's source / destination node type.
    for (edge_name, rows) in &edge_rows {
        let edge_type = &catalog.edge_types[edge_name];
        // Presumably the staged variant also sees ids queued in `staging`
        // above; verify against `collect_node_ids_with_pending`.
        let from_ids = if use_staging {
            collect_node_ids_with_pending(
                db,
                branch,
                &edge_type.from_type,
                &staging,
            )
            .await?
        } else {
            collect_node_ids(
                db,
                branch,
                &edge_type.from_type,
                &node_rows,
                &catalog,
                &overwrite_updates,
            )
            .await?
        };
        let to_ids = if use_staging {
            collect_node_ids_with_pending(
                db,
                branch,
                &edge_type.to_type,
                &staging,
            )
            .await?
        } else {
            collect_node_ids(
                db,
                branch,
                &edge_type.to_type,
                &node_rows,
                &catalog,
                &overwrite_updates,
            )
            .await?
        };

        // Row numbers in these messages are 1-based positions within this
        // edge type's rows, not line numbers of the input.
        for (i, (src, dst, _)) in rows.iter().enumerate() {
            if !from_ids.contains(src.as_str()) {
                return Err(OmniError::manifest(format!(
                    "edge {} row {}: src '{}' not found in {}",
                    edge_name,
                    i + 1,
                    src,
                    edge_type.from_type
                )));
            }
            if !to_ids.contains(dst.as_str()) {
                return Err(OmniError::manifest(format!(
                    "edge {} row {}: dst '{}' not found in {}",
                    edge_name,
                    i + 1,
                    dst,
                    edge_type.to_type
                )));
            }
        }
    }

    // Build and validate every edge batch, mirroring the node preparation.
    let mut prepared_edges: Vec<(String, String, RecordBatch, usize)> =
        Vec::with_capacity(edge_rows.len());
    for (edge_name, rows) in &edge_rows {
        let edge_type = &catalog.edge_types[edge_name];
        let batch = build_edge_batch(edge_type, rows)?;
        validate_enum_constraints(&batch, &edge_type.properties, edge_name)?;
        let unique_props = unique_property_names_for_edge(edge_type);
        if !unique_props.is_empty() {
            enforce_unique_constraints_intra_batch(&batch, edge_name, &unique_props)?;
        }
        let loaded_count = batch.num_rows();
        let table_key = format!("edge:{}", edge_name);
        let entry = snapshot
            .entry(&table_key)
            .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
        if !use_staging {
            overwrite_expected.insert(table_key.clone(), entry.table_version);
        }
        prepared_edges.push((edge_name.clone(), table_key, batch, loaded_count));
    }

    if use_staging {
        // Register each edge table with the staging area and queue its batch.
        for (edge_name, table_key, batch, loaded_count) in prepared_edges {
            let (ds, full_path, table_branch) = db
                .open_for_mutation_on_branch(branch, &table_key, load_op_kind)
                .await?;
            let expected_version = ds.version().version;
            staging.ensure_path(
                &table_key,
                full_path,
                table_branch,
                expected_version,
                load_op_kind,
            );
            let schema = batch.schema();
            staging.append_batch(&table_key, schema, pending_mode, batch)?;
            result.edges_loaded.insert(edge_name, loaded_count);
        }
    } else {
        let edge_write_results =
            write_batches_concurrently(db, branch, mode, prepared_edges).await?;
        for (edge_name, table_key, loaded_count, state, table_branch) in edge_write_results {
            overwrite_updates.push(crate::db::SubTableUpdate {
                table_key,
                table_version: state.version,
                table_branch,
                row_count: state.row_count,
                version_metadata: state.version_metadata,
            });
            result.edges_loaded.insert(edge_name, loaded_count);
        }
    }

    // Cardinality validation runs after the edge batches are staged/written.
    for (edge_name, _) in &edge_rows {
        let edge_type = &catalog.edge_types[edge_name];
        let table_key = format!("edge:{}", edge_name);
        if use_staging {
            validate_edge_cardinality_with_pending_loader(
                db,
                branch,
                edge_type,
                &table_key,
                &staging,
                mode,
            )
            .await?;
        } else if let Some(update) = overwrite_updates.iter().find(|u| u.table_key == table_key) {
            // Overwrite path: validate against the table version just written.
            validate_edge_cardinality(
                db,
                branch,
                edge_name,
                update.table_version,
                update.table_branch.as_deref(),
            )
            .await?;
        }
    }

    if use_staging {
        let staged = staging.stage_all(db, branch).await?;
        // `_queue_guards` is a named binding (not `_`), so the value stays
        // alive until the end of this block instead of being dropped at once.
        let (updates, expected_versions, sidecar_handle, _queue_guards) = staged
            .commit_all(db, branch, crate::db::manifest::SidecarKind::Load, actor_id)
            .await?;
        // Test hook between `commit_all` and the branch-level commit below.
        crate::failpoints::maybe_fail("mutation.post_finalize_pre_publisher")?;
        db.commit_updates_on_branch_with_expected(branch, &updates, &expected_versions, actor_id)
            .await?;
        if let Some(handle) = sidecar_handle {
            // Best-effort cleanup: a failure is logged and does not fail the load.
            if let Err(err) =
                crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await
            {
                tracing::warn!(
                    error = %err,
                    operation_id = handle.operation_id.as_str(),
                    "recovery sidecar cleanup failed; the next open's recovery sweep will resolve it"
                );
            }
        }
    } else {
        // Publish all direct writes together, guarded by the table versions
        // captured from the snapshot.
        db.commit_updates_on_branch_with_expected(
            branch,
            &overwrite_updates,
            &overwrite_expected,
            actor_id,
        )
        .await?;
    }

    Ok(result)
}
654
655fn build_node_batch(node_type: &NodeType, rows: &[JsonValue]) -> Result<RecordBatch> {
656 let schema = node_type.arrow_schema.clone();
657
658 let ids: Vec<String> = rows
660 .iter()
661 .map(|row| {
662 let explicit_id = row.get("id").and_then(|v| v.as_str()).map(str::to_string);
663 if let Some(key_prop) = node_type.key_property() {
664 let key_value = row
665 .get(key_prop)
666 .and_then(|v| v.as_str())
667 .map(|s| s.to_string())
668 .ok_or_else(|| {
669 OmniError::manifest(format!(
670 "node {} missing @key property '{}'",
671 node_type.name, key_prop
672 ))
673 })?;
674 if let Some(explicit_id) = explicit_id {
675 if explicit_id != key_value {
676 return Err(OmniError::manifest(format!(
677 "node {} has explicit id '{}' that does not match @key property '{}' value '{}'",
678 node_type.name, explicit_id, key_prop, key_value
679 )));
680 }
681 }
682 Ok(key_value)
683 } else if let Some(explicit_id) = explicit_id {
684 Ok(explicit_id)
685 } else {
686 Ok(generate_id())
687 }
688 })
689 .collect::<Result<Vec<_>>>()?;
690
691 let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
692 columns.push(Arc::new(StringArray::from(ids)));
693
694 for field in schema.fields().iter().skip(1) {
696 if node_type.blob_properties.contains(field.name()) {
697 let col = build_blob_column(field.name(), field.is_nullable(), rows)?;
698 columns.push(col);
699 } else {
700 let col =
701 build_column_from_json(field.name(), field.data_type(), field.is_nullable(), rows)?;
702 columns.push(col);
703 }
704 }
705
706 RecordBatch::try_new(schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
707}
708
709fn build_edge_batch(
710 edge_type: &omnigraph_compiler::catalog::EdgeType,
711 rows: &[(String, String, JsonValue)],
712) -> Result<RecordBatch> {
713 let schema = edge_type.arrow_schema.clone();
714
715 let ids: Vec<String> = rows
716 .iter()
717 .map(|(_, _, data)| {
718 data.get("id")
719 .and_then(|v| v.as_str())
720 .map(str::to_string)
721 .unwrap_or_else(generate_id)
722 })
723 .collect();
724 let srcs: Vec<&str> = rows.iter().map(|(from, _, _)| from.as_str()).collect();
725 let dsts: Vec<&str> = rows.iter().map(|(_, to, _)| to.as_str()).collect();
726
727 let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
728 columns.push(Arc::new(StringArray::from(ids)));
729 columns.push(Arc::new(StringArray::from(srcs)));
730 columns.push(Arc::new(StringArray::from(dsts)));
731
732 let data_values: Vec<JsonValue> = rows.iter().map(|(_, _, data)| data.clone()).collect();
734 for field in schema.fields().iter().skip(3) {
735 if edge_type.blob_properties.contains(field.name()) {
736 let col = build_blob_column(field.name(), field.is_nullable(), &data_values)?;
737 columns.push(col);
738 } else {
739 let col = build_column_from_json(
740 field.name(),
741 field.data_type(),
742 field.is_nullable(),
743 &data_values,
744 )?;
745 columns.push(col);
746 }
747 }
748
749 RecordBatch::try_new(schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
750}
751
752pub(crate) fn append_blob_value(builder: &mut BlobArrayBuilder, value: &str) -> Result<()> {
754 if let Some(encoded) = value.strip_prefix("base64:") {
755 let bytes = base64::engine::general_purpose::STANDARD
756 .decode(encoded)
757 .map_err(|e| OmniError::manifest(format!("invalid base64 blob data: {}", e)))?;
758 builder
759 .push_bytes(bytes)
760 .map_err(|e| OmniError::Lance(e.to_string()))
761 } else {
762 builder
764 .push_uri(value)
765 .map_err(|e| OmniError::Lance(e.to_string()))
766 }
767}
768
769fn build_blob_column(name: &str, nullable: bool, rows: &[JsonValue]) -> Result<ArrayRef> {
771 let mut builder = BlobArrayBuilder::new(rows.len());
772 for row in rows {
773 match row.get(name) {
774 Some(JsonValue::String(s)) => {
775 append_blob_value(&mut builder, s)?;
776 }
777 Some(JsonValue::Null) | None if nullable => {
778 builder
779 .push_null()
780 .map_err(|e| OmniError::Lance(e.to_string()))?;
781 }
782 Some(JsonValue::Null) | None => {
783 return Err(OmniError::manifest(format!(
784 "non-nullable blob property '{}' has null values",
785 name
786 )));
787 }
788 _ => {
789 return Err(OmniError::manifest(format!(
790 "blob property '{}' must be a URI string or base64: prefixed data",
791 name
792 )));
793 }
794 }
795 }
796 builder
797 .finish()
798 .map_err(|e| OmniError::Lance(e.to_string()))
799}
800
801fn build_column_from_json(
802 name: &str,
803 data_type: &DataType,
804 nullable: bool,
805 rows: &[JsonValue],
806) -> Result<ArrayRef> {
807 let array: ArrayRef = match data_type {
808 DataType::Utf8 => {
809 let values: Vec<Option<String>> = rows
810 .iter()
811 .map(|row| {
812 row.get(name)
813 .and_then(|v| v.as_str())
814 .map(|s| s.to_string())
815 })
816 .collect();
817 Arc::new(StringArray::from(values))
818 }
819 DataType::Int32 => {
820 let values: Vec<Option<i32>> = rows
821 .iter()
822 .map(|row| row.get(name).and_then(|v| v.as_i64()).map(|v| v as i32))
823 .collect();
824 Arc::new(Int32Array::from(values))
825 }
826 DataType::Int64 => {
827 let values: Vec<Option<i64>> = rows
828 .iter()
829 .map(|row| row.get(name).and_then(|v| v.as_i64()))
830 .collect();
831 Arc::new(Int64Array::from(values))
832 }
833 DataType::UInt32 => {
834 let values: Vec<Option<u32>> = rows
835 .iter()
836 .map(|row| row.get(name).and_then(|v| v.as_u64()).map(|v| v as u32))
837 .collect();
838 Arc::new(UInt32Array::from(values))
839 }
840 DataType::UInt64 => {
841 let values: Vec<Option<u64>> = rows
842 .iter()
843 .map(|row| row.get(name).and_then(|v| v.as_u64()))
844 .collect();
845 Arc::new(UInt64Array::from(values))
846 }
847 DataType::Float32 => {
848 let values: Vec<Option<f32>> = rows
849 .iter()
850 .map(|row| row.get(name).and_then(|v| v.as_f64()).map(|v| v as f32))
851 .collect();
852 Arc::new(Float32Array::from(values))
853 }
854 DataType::Float64 => {
855 let values: Vec<Option<f64>> = rows
856 .iter()
857 .map(|row| row.get(name).and_then(|v| v.as_f64()))
858 .collect();
859 Arc::new(Float64Array::from(values))
860 }
861 DataType::Boolean => {
862 let values: Vec<Option<bool>> = rows
863 .iter()
864 .map(|row| row.get(name).and_then(|v| v.as_bool()))
865 .collect();
866 Arc::new(BooleanArray::from(values))
867 }
868 DataType::Date32 => {
869 let mut values = Vec::with_capacity(rows.len());
870 for row in rows {
871 values.push(parse_date32_json_value(
872 row.get(name).unwrap_or(&JsonValue::Null),
873 )?);
874 }
875 Arc::new(Date32Array::from(values))
876 }
877 DataType::Date64 => {
878 let mut values = Vec::with_capacity(rows.len());
879 for row in rows {
880 values.push(parse_date64_json_value(
881 row.get(name).unwrap_or(&JsonValue::Null),
882 )?);
883 }
884 Arc::new(Date64Array::from(values))
885 }
886 DataType::List(field) => {
887 let mut builder = ListBuilder::with_capacity(
888 make_list_value_builder(field.data_type(), rows.len())?,
889 rows.len(),
890 )
891 .with_field(field.clone());
892 for row in rows {
893 let value = row.get(name).unwrap_or(&JsonValue::Null);
894 if value.is_null() {
895 builder.append(false);
896 continue;
897 }
898 let items = value.as_array().ok_or_else(|| {
899 OmniError::manifest(format!(
900 "list property '{}' expects a JSON array, got {}",
901 name, value
902 ))
903 })?;
904 for item in items {
905 append_json_list_item(builder.values(), field.data_type(), item)?;
906 }
907 builder.append(true);
908 }
909 Arc::new(builder.finish())
910 }
911 DataType::FixedSizeList(child_field, dim) => {
912 let dim = *dim;
914 let mut builder = FixedSizeListBuilder::with_capacity(
915 Float32Builder::with_capacity(rows.len() * dim as usize),
916 dim,
917 rows.len(),
918 )
919 .with_field(child_field.clone());
920 for row in rows {
921 if let Some(arr) = row.get(name).and_then(|v| v.as_array()) {
922 if arr.len() != dim as usize {
923 return Err(OmniError::manifest(format!(
924 "vector property '{}' expects {} dimensions, got {}",
925 name,
926 dim,
927 arr.len()
928 )));
929 }
930 for val in arr {
931 builder
932 .values()
933 .append_value(val.as_f64().unwrap_or(0.0) as f32);
934 }
935 builder.append(true);
936 } else if nullable {
937 for _ in 0..dim as usize {
938 builder.values().append_null();
939 }
940 builder.append(false);
941 } else {
942 return Err(OmniError::manifest(format!(
943 "non-nullable vector property '{}' has null values",
944 name
945 )));
946 }
947 }
948 Arc::new(builder.finish())
949 }
950 _ => {
951 let values: Vec<Option<&str>> = vec![None; rows.len()];
953 Arc::new(StringArray::from(values))
954 }
955 };
956
957 if !nullable && array.null_count() > 0 {
958 return Err(OmniError::manifest(format!(
959 "non-nullable property '{}' has null or invalid values",
960 name
961 )));
962 }
963
964 Ok(array)
965}
966
967fn make_list_value_builder(data_type: &DataType, capacity: usize) -> Result<Box<dyn ArrayBuilder>> {
968 Ok(match data_type {
969 DataType::Utf8 => Box::new(StringBuilder::with_capacity(capacity, capacity * 8)),
970 DataType::Boolean => Box::new(BooleanBuilder::with_capacity(capacity)),
971 DataType::Int32 => Box::new(Int32Builder::with_capacity(capacity)),
972 DataType::Int64 => Box::new(Int64Builder::with_capacity(capacity)),
973 DataType::UInt32 => Box::new(UInt32Builder::with_capacity(capacity)),
974 DataType::UInt64 => Box::new(UInt64Builder::with_capacity(capacity)),
975 DataType::Float32 => Box::new(Float32Builder::with_capacity(capacity)),
976 DataType::Float64 => Box::new(Float64Builder::with_capacity(capacity)),
977 DataType::Date32 => Box::new(Date32Builder::with_capacity(capacity)),
978 DataType::Date64 => Box::new(Date64Builder::with_capacity(capacity)),
979 other => {
980 return Err(OmniError::manifest(format!(
981 "unsupported list element data type {:?}",
982 other
983 )));
984 }
985 })
986}
987
988fn append_json_list_item(
989 builder: &mut Box<dyn ArrayBuilder>,
990 data_type: &DataType,
991 value: &JsonValue,
992) -> Result<()> {
993 match data_type {
994 DataType::Utf8 => {
995 let builder = builder
996 .as_any_mut()
997 .downcast_mut::<StringBuilder>()
998 .ok_or_else(|| OmniError::manifest("list Utf8 builder downcast failed"))?;
999 if let Some(value) = value.as_str() {
1000 builder.append_value(value);
1001 } else {
1002 builder.append_null();
1003 }
1004 }
1005 DataType::Boolean => {
1006 let builder = builder
1007 .as_any_mut()
1008 .downcast_mut::<BooleanBuilder>()
1009 .ok_or_else(|| OmniError::manifest("list Boolean builder downcast failed"))?;
1010 if let Some(value) = value.as_bool() {
1011 builder.append_value(value);
1012 } else {
1013 builder.append_null();
1014 }
1015 }
1016 DataType::Int32 => {
1017 let builder = builder
1018 .as_any_mut()
1019 .downcast_mut::<Int32Builder>()
1020 .ok_or_else(|| OmniError::manifest("list Int32 builder downcast failed"))?;
1021 if let Some(value) = value.as_i64() {
1022 let value = i32::try_from(value).map_err(|_| {
1023 OmniError::manifest(format!("list value {} exceeds Int32 range", value))
1024 })?;
1025 builder.append_value(value);
1026 } else {
1027 builder.append_null();
1028 }
1029 }
1030 DataType::Int64 => {
1031 let builder = builder
1032 .as_any_mut()
1033 .downcast_mut::<Int64Builder>()
1034 .ok_or_else(|| OmniError::manifest("list Int64 builder downcast failed"))?;
1035 if let Some(value) = value.as_i64() {
1036 builder.append_value(value);
1037 } else {
1038 builder.append_null();
1039 }
1040 }
1041 DataType::UInt32 => {
1042 let builder = builder
1043 .as_any_mut()
1044 .downcast_mut::<UInt32Builder>()
1045 .ok_or_else(|| OmniError::manifest("list UInt32 builder downcast failed"))?;
1046 if let Some(value) = value.as_u64() {
1047 let value = u32::try_from(value).map_err(|_| {
1048 OmniError::manifest(format!("list value {} exceeds UInt32 range", value))
1049 })?;
1050 builder.append_value(value);
1051 } else {
1052 builder.append_null();
1053 }
1054 }
1055 DataType::UInt64 => {
1056 let builder = builder
1057 .as_any_mut()
1058 .downcast_mut::<UInt64Builder>()
1059 .ok_or_else(|| OmniError::manifest("list UInt64 builder downcast failed"))?;
1060 if let Some(value) = value.as_u64() {
1061 builder.append_value(value);
1062 } else {
1063 builder.append_null();
1064 }
1065 }
1066 DataType::Float32 => {
1067 let builder = builder
1068 .as_any_mut()
1069 .downcast_mut::<Float32Builder>()
1070 .ok_or_else(|| OmniError::manifest("list Float32 builder downcast failed"))?;
1071 if let Some(value) = value.as_f64() {
1072 builder.append_value(value as f32);
1073 } else {
1074 builder.append_null();
1075 }
1076 }
1077 DataType::Float64 => {
1078 let builder = builder
1079 .as_any_mut()
1080 .downcast_mut::<Float64Builder>()
1081 .ok_or_else(|| OmniError::manifest("list Float64 builder downcast failed"))?;
1082 if let Some(value) = value.as_f64() {
1083 builder.append_value(value);
1084 } else {
1085 builder.append_null();
1086 }
1087 }
1088 DataType::Date32 => {
1089 let builder = builder
1090 .as_any_mut()
1091 .downcast_mut::<Date32Builder>()
1092 .ok_or_else(|| OmniError::manifest("list Date32 builder downcast failed"))?;
1093 if let Some(value) = parse_date32_json_value(value)? {
1094 builder.append_value(value);
1095 } else {
1096 builder.append_null();
1097 }
1098 }
1099 DataType::Date64 => {
1100 let builder = builder
1101 .as_any_mut()
1102 .downcast_mut::<Date64Builder>()
1103 .ok_or_else(|| OmniError::manifest("list Date64 builder downcast failed"))?;
1104 if let Some(value) = parse_date64_json_value(value)? {
1105 builder.append_value(value);
1106 } else {
1107 builder.append_null();
1108 }
1109 }
1110 other => {
1111 return Err(OmniError::manifest(format!(
1112 "unsupported list element data type {:?}",
1113 other
1114 )));
1115 }
1116 }
1117
1118 Ok(())
1119}
1120
1121fn parse_date32_json_value(value: &JsonValue) -> Result<Option<i32>> {
1122 if value.is_null() {
1123 return Ok(None);
1124 }
1125 if let Some(days) = value.as_i64() {
1126 let days = i32::try_from(days)
1127 .map_err(|_| OmniError::manifest(format!("Date value out of range: {}", days)))?;
1128 return Ok(Some(days));
1129 }
1130 if let Some(days) = value.as_u64() {
1131 let days = i32::try_from(days)
1132 .map_err(|_| OmniError::manifest(format!("Date value out of range: {}", days)))?;
1133 return Ok(Some(days));
1134 }
1135 if let Some(value) = value.as_str() {
1136 return Ok(Some(parse_date32_literal(value)?));
1137 }
1138 Ok(None)
1139}
1140
1141fn parse_date64_json_value(value: &JsonValue) -> Result<Option<i64>> {
1142 if value.is_null() {
1143 return Ok(None);
1144 }
1145 if let Some(ms) = value.as_i64() {
1146 return Ok(Some(ms));
1147 }
1148 if let Some(ms) = value.as_u64() {
1149 let ms = i64::try_from(ms)
1150 .map_err(|_| OmniError::manifest(format!("DateTime value out of range: {}", ms)))?;
1151 return Ok(Some(ms));
1152 }
1153 if let Some(value) = value.as_str() {
1154 return Ok(Some(parse_date64_literal(value)?));
1155 }
1156 Ok(None)
1157}
1158
/// Concurrency used for direct table writes when the environment does not
/// provide a usable override.
const DEFAULT_LOAD_WRITE_CONCURRENCY: usize = 8;

/// Returns the write concurrency for loads.
///
/// Reads `OMNIGRAPH_LOAD_CONCURRENCY`; the value is used only when it parses
/// as a `usize` greater than zero. In every other case (unset, not valid
/// Unicode, not a number, zero) the default is returned.
fn load_write_concurrency() -> usize {
    match std::env::var("OMNIGRAPH_LOAD_CONCURRENCY") {
        Ok(raw) => match raw.parse::<usize>() {
            Ok(requested) if requested > 0 => requested,
            _ => DEFAULT_LOAD_WRITE_CONCURRENCY,
        },
        Err(_) => DEFAULT_LOAD_WRITE_CONCURRENCY,
    }
}
1178
1179async fn write_batches_concurrently(
1183 db: &Omnigraph,
1184 branch: Option<&str>,
1185 mode: LoadMode,
1186 prepared: Vec<(String, String, RecordBatch, usize)>,
1187) -> Result<
1188 Vec<(
1189 String,
1190 String,
1191 usize,
1192 crate::table_store::TableState,
1193 Option<String>,
1194 )>,
1195> {
1196 use futures::stream::StreamExt;
1197
1198 if prepared.is_empty() {
1199 return Ok(Vec::new());
1200 }
1201
1202 let concurrency = load_write_concurrency().min(prepared.len()).max(1);
1203
1204 futures::stream::iter(prepared.into_iter().map(
1205 |(type_name, table_key, batch, loaded_count)| async move {
1206 let (state, table_branch) =
1207 write_batch_to_dataset(db, branch, &table_key, batch, mode).await?;
1208 Ok::<_, OmniError>((type_name, table_key, loaded_count, state, table_branch))
1209 },
1210 ))
1211 .buffered(concurrency)
1212 .collect::<Vec<Result<_>>>()
1213 .await
1214 .into_iter()
1215 .collect()
1216}
1217
1218async fn write_batch_to_dataset(
1219 db: &Omnigraph,
1220 branch: Option<&str>,
1221 table_key: &str,
1222 batch: RecordBatch,
1223 mode: LoadMode,
1224) -> Result<(crate::table_store::TableState, Option<String>)> {
1225 let op_kind = match mode {
1226 LoadMode::Append => crate::db::MutationOpKind::Insert,
1227 LoadMode::Merge => crate::db::MutationOpKind::Merge,
1228 LoadMode::Overwrite => crate::db::MutationOpKind::SchemaRewrite,
1229 };
1230 let (mut ds, full_path, table_branch) = db
1231 .open_for_mutation_on_branch(branch, table_key, op_kind)
1232 .await?;
1233 let table_store = db.table_store();
1234
1235 match mode {
1236 LoadMode::Overwrite => {
1237 let state = table_store
1238 .overwrite_batch(&full_path, &mut ds, batch)
1239 .await?;
1240 Ok((state, table_branch))
1241 }
1242 LoadMode::Append => {
1243 let state = table_store.append_batch(&full_path, &mut ds, batch).await?;
1244 Ok((state, table_branch))
1245 }
1246 LoadMode::Merge => {
1247 let state = table_store
1248 .merge_insert_batch(
1249 &full_path,
1250 ds,
1251 batch,
1252 vec!["id".to_string()],
1253 lance::dataset::WhenMatched::UpdateAll,
1254 lance::dataset::WhenNotMatched::InsertAll,
1255 )
1256 .await?;
1257 Ok((state, table_branch))
1258 }
1259 }
1260}
1261
1262fn generate_id() -> String {
1263 ulid::Ulid::new().to_string()
1264}
1265
1266pub(crate) fn parse_date32_literal(value: &str) -> Result<i32> {
1267 let raw: Arc<dyn Array> = Arc::new(StringArray::from(vec![Some(value)]));
1268 let casted = arrow_cast::cast::cast(raw.as_ref(), &DataType::Date32)
1269 .map_err(|e| OmniError::manifest(format!("invalid Date literal '{}': {}", value, e)))?;
1270 let out = casted
1271 .as_any()
1272 .downcast_ref::<Date32Array>()
1273 .ok_or_else(|| OmniError::manifest("Date32 cast produced unexpected array"))?;
1274 if out.is_null(0) {
1275 return Err(OmniError::manifest(format!(
1276 "invalid Date literal '{}'",
1277 value
1278 )));
1279 }
1280 Ok(out.value(0))
1281}
1282
1283pub(crate) fn parse_date64_literal(value: &str) -> Result<i64> {
1284 let raw: Arc<dyn Array> = Arc::new(StringArray::from(vec![Some(value)]));
1285 let casted = arrow_cast::cast::cast(raw.as_ref(), &DataType::Date64)
1286 .map_err(|e| OmniError::manifest(format!("invalid DateTime literal '{}': {}", value, e)))?;
1287 let out = casted
1288 .as_any()
1289 .downcast_ref::<Date64Array>()
1290 .ok_or_else(|| OmniError::manifest("Date64 cast produced unexpected array"))?;
1291 if out.is_null(0) {
1292 return Err(OmniError::manifest(format!(
1293 "invalid DateTime literal '{}'",
1294 value
1295 )));
1296 }
1297 Ok(out.value(0))
1298}
1299
1300pub(crate) fn validate_value_constraints(
1303 batch: &RecordBatch,
1304 node_type: &omnigraph_compiler::catalog::NodeType,
1305) -> Result<()> {
1306 use arrow_array::Array;
1307
1308 for rc in &node_type.range_constraints {
1310 let Some(col) = batch.column_by_name(&rc.property) else {
1311 continue;
1312 };
1313 for row in 0..batch.num_rows() {
1314 if col.is_null(row) {
1315 continue;
1316 }
1317 let value = extract_numeric_value(col, row);
1318 if let Some(val) = value {
1319 if val.is_nan() {
1320 return Err(OmniError::manifest(format!(
1321 "@range violation on {}.{}: value is NaN",
1322 node_type.name, rc.property
1323 )));
1324 }
1325 if let Some(ref min) = rc.min {
1326 let min_f = literal_value_to_f64(min);
1327 if val < min_f {
1328 return Err(OmniError::manifest(format!(
1329 "@range violation on {}.{}: value {} < min {}",
1330 node_type.name, rc.property, val, min_f
1331 )));
1332 }
1333 }
1334 if let Some(ref max) = rc.max {
1335 let max_f = literal_value_to_f64(max);
1336 if val > max_f {
1337 return Err(OmniError::manifest(format!(
1338 "@range violation on {}.{}: value {} > max {}",
1339 node_type.name, rc.property, val, max_f
1340 )));
1341 }
1342 }
1343 }
1344 }
1345 }
1346
1347 for cc in &node_type.check_constraints {
1349 let re = regex::Regex::new(&cc.pattern).map_err(|e| {
1350 OmniError::manifest(format!(
1351 "@check on {}.{} has invalid regex '{}': {}",
1352 node_type.name, cc.property, cc.pattern, e
1353 ))
1354 })?;
1355 let Some(col) = batch.column_by_name(&cc.property) else {
1356 continue;
1357 };
1358 let str_col = col.as_any().downcast_ref::<StringArray>();
1359 if let Some(str_col) = str_col {
1360 for row in 0..str_col.len() {
1361 if str_col.is_null(row) {
1362 continue;
1363 }
1364 let val = str_col.value(row);
1365 if !re.is_match(val) {
1366 return Err(OmniError::manifest(format!(
1367 "@check violation on {}.{}: value '{}' does not match pattern '{}'",
1368 node_type.name, cc.property, val, cc.pattern
1369 )));
1370 }
1371 }
1372 }
1373 }
1374
1375 Ok(())
1376}
1377
1378pub(crate) fn validate_enum_constraints(
1385 batch: &RecordBatch,
1386 properties: &HashMap<String, omnigraph_compiler::types::PropType>,
1387 type_name: &str,
1388) -> Result<()> {
1389 use arrow_array::{Array, ListArray};
1390
1391 for (prop_name, prop_type) in properties {
1392 let Some(allowed) = prop_type.enum_values.as_ref() else {
1393 continue;
1394 };
1395 let Some(col) = batch.column_by_name(prop_name) else {
1396 continue;
1397 };
1398 if prop_type.list {
1399 let Some(list_col) = col.as_any().downcast_ref::<ListArray>() else {
1400 continue;
1401 };
1402 for row in 0..list_col.len() {
1403 if list_col.is_null(row) {
1404 continue;
1405 }
1406 let item_arr = list_col.value(row);
1407 let Some(str_arr) = item_arr.as_any().downcast_ref::<StringArray>() else {
1408 continue;
1409 };
1410 for i in 0..str_arr.len() {
1411 if str_arr.is_null(i) {
1412 continue;
1413 }
1414 let val = str_arr.value(i);
1415 if !allowed.iter().any(|a| a.as_str() == val) {
1416 return Err(OmniError::manifest(format!(
1417 "invalid enum value '{}' for {}.{} (expected: {})",
1418 val,
1419 type_name,
1420 prop_name,
1421 allowed.join(", ")
1422 )));
1423 }
1424 }
1425 }
1426 } else if let Some(str_col) = col.as_any().downcast_ref::<StringArray>() {
1427 for row in 0..str_col.len() {
1428 if str_col.is_null(row) {
1429 continue;
1430 }
1431 let val = str_col.value(row);
1432 if !allowed.iter().any(|a| a.as_str() == val) {
1433 return Err(OmniError::manifest(format!(
1434 "invalid enum value '{}' for {}.{} (expected: {})",
1435 val,
1436 type_name,
1437 prop_name,
1438 allowed.join(", ")
1439 )));
1440 }
1441 }
1442 }
1443 }
1444 Ok(())
1445}
1446
1447pub(crate) fn enforce_unique_constraints_intra_batch(
1454 batch: &RecordBatch,
1455 type_name: &str,
1456 unique_properties: &[String],
1457) -> Result<()> {
1458 for property in unique_properties {
1459 let Some(col_idx) = batch.schema().index_of(property).ok() else {
1460 continue;
1461 };
1462 let arr = batch.column(col_idx);
1463 let mut seen: HashMap<String, usize> = HashMap::new();
1464 for row in 0..batch.num_rows() {
1465 let Some(value) = scalar_to_string(arr, row) else {
1466 continue;
1467 };
1468 if let Some(prev_row) = seen.insert(value.clone(), row) {
1469 return Err(OmniError::manifest(format!(
1470 "@unique violation on {}.{}: value '{}' appears in rows {} and {}",
1471 type_name, property, value, prev_row, row
1472 )));
1473 }
1474 }
1475 }
1476 Ok(())
1477}
1478
1479fn scalar_to_string(array: &ArrayRef, row: usize) -> Option<String> {
1483 use arrow_array::Array;
1484 if array.is_null(row) {
1485 return None;
1486 }
1487 if let Some(a) = array.as_any().downcast_ref::<StringArray>() {
1488 return Some(a.value(row).to_string());
1489 }
1490 if let Some(a) = array.as_any().downcast_ref::<Int32Array>() {
1491 return Some(a.value(row).to_string());
1492 }
1493 if let Some(a) = array.as_any().downcast_ref::<Int64Array>() {
1494 return Some(a.value(row).to_string());
1495 }
1496 if let Some(a) = array.as_any().downcast_ref::<UInt32Array>() {
1497 return Some(a.value(row).to_string());
1498 }
1499 if let Some(a) = array.as_any().downcast_ref::<UInt64Array>() {
1500 return Some(a.value(row).to_string());
1501 }
1502 if let Some(a) = array.as_any().downcast_ref::<Float32Array>() {
1503 return Some(a.value(row).to_string());
1504 }
1505 if let Some(a) = array.as_any().downcast_ref::<Float64Array>() {
1506 return Some(a.value(row).to_string());
1507 }
1508 if let Some(a) = array.as_any().downcast_ref::<BooleanArray>() {
1509 return Some(a.value(row).to_string());
1510 }
1511 if let Some(a) = array.as_any().downcast_ref::<Date32Array>() {
1512 return Some(a.value(row).to_string());
1513 }
1514 if let Some(a) = array.as_any().downcast_ref::<Date64Array>() {
1515 return Some(a.value(row).to_string());
1516 }
1517 None
1518}
1519
1520pub(crate) fn unique_property_names_for_node(
1524 node_type: &omnigraph_compiler::catalog::NodeType,
1525) -> Vec<String> {
1526 let mut props: Vec<String> = node_type
1527 .unique_constraints
1528 .iter()
1529 .flatten()
1530 .cloned()
1531 .collect();
1532 if let Some(key) = &node_type.key {
1533 props.extend(key.iter().cloned());
1534 }
1535 props.sort();
1536 props.dedup();
1537 props
1538}
1539
1540pub(crate) fn unique_property_names_for_edge(
1542 edge_type: &omnigraph_compiler::catalog::EdgeType,
1543) -> Vec<String> {
1544 let mut props: Vec<String> = edge_type
1545 .unique_constraints
1546 .iter()
1547 .flatten()
1548 .cloned()
1549 .collect();
1550 props.sort();
1551 props.dedup();
1552 props
1553}
1554
1555fn extract_numeric_value(col: &ArrayRef, row: usize) -> Option<f64> {
1556 use arrow_array::{
1557 Array, Float32Array, Float64Array, Int32Array, Int64Array, UInt32Array, UInt64Array,
1558 };
1559 if let Some(a) = col.as_any().downcast_ref::<Int32Array>() {
1560 return Some(a.value(row) as f64);
1561 }
1562 if let Some(a) = col.as_any().downcast_ref::<Int64Array>() {
1563 return Some(a.value(row) as f64);
1564 }
1565 if let Some(a) = col.as_any().downcast_ref::<UInt32Array>() {
1566 return Some(a.value(row) as f64);
1567 }
1568 if let Some(a) = col.as_any().downcast_ref::<UInt64Array>() {
1569 return Some(a.value(row) as f64);
1570 }
1571 if let Some(a) = col.as_any().downcast_ref::<Float32Array>() {
1572 return Some(a.value(row) as f64);
1573 }
1574 if let Some(a) = col.as_any().downcast_ref::<Float64Array>() {
1575 return Some(a.value(row));
1576 }
1577 None
1578}
1579
1580fn literal_value_to_f64(v: &omnigraph_compiler::catalog::LiteralValue) -> f64 {
1581 use omnigraph_compiler::catalog::LiteralValue;
1582 match v {
1583 LiteralValue::Integer(n) => *n as f64,
1584 LiteralValue::Float(f) => *f,
1585 }
1586}
1587
1588pub(crate) async fn validate_edge_cardinality(
1591 db: &crate::db::Omnigraph,
1592 branch: Option<&str>,
1593 edge_name: &str,
1594 written_version: u64,
1595 written_branch: Option<&str>,
1596) -> Result<()> {
1597 use arrow_array::Array;
1598 let catalog = db.catalog();
1599 let edge_type = &catalog.edge_types[edge_name];
1600 if edge_type.cardinality.is_default() {
1601 return Ok(());
1602 }
1603
1604 let snapshot = db.snapshot_for_branch(branch).await?;
1607 let table_key = format!("edge:{}", edge_name);
1608 let entry = snapshot
1609 .entry(&table_key)
1610 .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
1611 let ds = db
1612 .open_dataset_at_state(
1613 &entry.table_path,
1614 written_branch.or(entry.table_branch.as_deref()),
1615 written_version,
1616 )
1617 .await?;
1618
1619 let batches = db
1621 .table_store()
1622 .scan(&ds, Some(&["src"]), None, None)
1623 .await?;
1624
1625 let mut counts: HashMap<String, u32> = HashMap::new();
1626 for batch in &batches {
1627 let srcs = batch
1628 .column_by_name("src")
1629 .unwrap()
1630 .as_any()
1631 .downcast_ref::<StringArray>()
1632 .unwrap();
1633 for i in 0..srcs.len() {
1634 *counts.entry(srcs.value(i).to_string()).or_insert(0) += 1;
1635 }
1636 }
1637
1638 let card = &edge_type.cardinality;
1639 for (src, count) in &counts {
1640 if let Some(max) = card.max {
1641 if *count > max {
1642 return Err(OmniError::manifest(format!(
1643 "@card violation on edge {}: source '{}' has {} edges (max {})",
1644 edge_name, src, count, max
1645 )));
1646 }
1647 }
1648 if *count < card.min {
1649 return Err(OmniError::manifest(format!(
1650 "@card violation on edge {}: source '{}' has {} edges (min {})",
1651 edge_name, src, count, card.min
1652 )));
1653 }
1654 }
1655
1656 Ok(())
1657}
1658
1659async fn validate_edge_cardinality_with_pending_loader(
1674 db: &Omnigraph,
1675 branch: Option<&str>,
1676 edge_type: &omnigraph_compiler::catalog::EdgeType,
1677 table_key: &str,
1678 staging: &MutationStaging,
1679 mode: LoadMode,
1680) -> Result<()> {
1681 if edge_type.cardinality.is_default() {
1682 return Ok(());
1683 }
1684 let snapshot = db.snapshot_for_branch(branch).await?;
1685 let Some(entry) = snapshot.entry(table_key) else {
1686 return Ok(());
1689 };
1690 let ds = db
1691 .open_dataset_at_state(
1692 &entry.table_path,
1693 entry.table_branch.as_deref(),
1694 entry.table_version,
1695 )
1696 .await?;
1697 let dedupe_key = match mode {
1698 LoadMode::Merge => Some("id"),
1699 LoadMode::Append | LoadMode::Overwrite => None,
1700 };
1701 let counts =
1702 crate::exec::staging::count_src_per_edge(db, &ds, table_key, staging, dedupe_key)
1703 .await?;
1704 crate::exec::staging::enforce_cardinality_bounds(edge_type, &counts)
1705}
1706
1707async fn collect_node_ids_with_pending(
1716 db: &Omnigraph,
1717 branch: Option<&str>,
1718 type_name: &str,
1719 staging: &MutationStaging,
1720) -> Result<HashSet<String>> {
1721 let mut ids = HashSet::new();
1722 let table_key = format!("node:{}", type_name);
1723
1724 for batch in staging.pending_batches(&table_key) {
1726 if let Some(col) = batch.column_by_name("id") {
1727 if let Some(arr) = col.as_any().downcast_ref::<StringArray>() {
1728 for i in 0..arr.len() {
1729 if arr.is_valid(i) {
1730 ids.insert(arr.value(i).to_string());
1731 }
1732 }
1733 }
1734 }
1735 }
1736
1737 let snapshot = db.snapshot_for_branch(branch).await?;
1739 let Some(entry) = snapshot.entry(&table_key) else {
1740 return Ok(ids);
1741 };
1742 let ds = db
1743 .open_dataset_at_state(
1744 &entry.table_path,
1745 entry.table_branch.as_deref(),
1746 entry.table_version,
1747 )
1748 .await?;
1749
1750 let batches = db
1751 .table_store()
1752 .scan(&ds, Some(&["id"]), None, None)
1753 .await?;
1754
1755 for batch in &batches {
1756 let id_col = batch
1757 .column_by_name("id")
1758 .ok_or_else(|| OmniError::Lance("missing 'id' column".into()))?
1759 .as_any()
1760 .downcast_ref::<StringArray>()
1761 .ok_or_else(|| OmniError::Lance("'id' column is not Utf8".into()))?;
1762 for i in 0..batch.num_rows() {
1763 if id_col.is_valid(i) {
1768 ids.insert(id_col.value(i).to_string());
1769 }
1770 }
1771 }
1772
1773 Ok(ids)
1774}
1775
1776async fn collect_node_ids(
1781 db: &Omnigraph,
1782 branch: Option<&str>,
1783 type_name: &str,
1784 node_rows: &HashMap<String, Vec<JsonValue>>,
1785 catalog: &omnigraph_compiler::catalog::Catalog,
1786 updates: &[crate::db::SubTableUpdate],
1787) -> Result<HashSet<String>> {
1788 let mut ids = HashSet::new();
1789
1790 if let Some(rows) = node_rows.get(type_name) {
1792 if let Some(node_type) = catalog.node_types.get(type_name) {
1793 if let Some(key_prop) = node_type.key_property() {
1794 for row in rows {
1795 if let Some(id) = row.get(key_prop).and_then(|v| v.as_str()) {
1796 ids.insert(id.to_string());
1797 }
1798 }
1799 }
1800 }
1801 }
1802
1803 let table_key = format!("node:{}", type_name);
1805 let snapshot = db.snapshot_for_branch(branch).await?;
1806 let Some(entry) = snapshot.entry(&table_key) else {
1807 return Ok(ids);
1808 };
1809 let updated = updates
1811 .iter()
1812 .find(|u| u.table_key == table_key)
1813 .map(|u| (u.table_version, u.table_branch.as_deref()));
1814 let (version, branch) = updated.unwrap_or((entry.table_version, entry.table_branch.as_deref()));
1815 let ds = db
1816 .open_dataset_at_state(&entry.table_path, branch, version)
1817 .await?;
1818
1819 let batches = db
1820 .table_store()
1821 .scan(&ds, Some(&["id"]), None, None)
1822 .await?;
1823
1824 for batch in &batches {
1825 let id_col = batch
1826 .column_by_name("id")
1827 .unwrap()
1828 .as_any()
1829 .downcast_ref::<StringArray>()
1830 .unwrap();
1831 for i in 0..batch.num_rows() {
1832 if !id_col.is_valid(i) {
1833 continue;
1834 }
1835 ids.insert(id_col.value(i).to_string());
1836 }
1837 }
1838
1839 Ok(ids)
1840}
1841
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::Omnigraph;
    use arrow_array::Array;
    use futures::TryStreamExt;
    use std::collections::HashMap;

    const TEST_SCHEMA: &str = r#"
node Person {
    name: String @key
    age: I32?
}
node Company {
    name: String @key
}
edge Knows: Person -> Person {
    since: Date?
}
edge WorksAt: Person -> Company
"#;

    const TEST_DATA: &str = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}
{"type": "Person", "data": {"name": "Bob", "age": 25}}
{"type": "Company", "data": {"name": "Acme"}}
{"edge": "Knows", "from": "Alice", "to": "Bob"}
{"edge": "WorksAt", "from": "Alice", "to": "Acme"}
"#;

    /// Initialises a graph with `TEST_SCHEMA` inside a fresh temporary directory.
    ///
    /// The directory guard is returned alongside the database so the caller
    /// keeps the directory alive for the duration of the test.
    async fn fresh_db() -> (tempfile::TempDir, Omnigraph) {
        let dir = tempfile::tempdir().unwrap();
        let db = Omnigraph::init(dir.path().to_str().unwrap(), TEST_SCHEMA)
            .await
            .unwrap();
        (dir, db)
    }

    /// Returns the named column of `batch` as a string array, panicking if it
    /// is missing or has another type.
    fn utf8_column<'a>(batch: &'a RecordBatch, name: &str) -> &'a StringArray {
        batch
            .column_by_name(name)
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap()
    }

    /// Shorthand for building an expected `IngestTableResult`.
    fn table_result(table_key: &str, rows_loaded: usize) -> IngestTableResult {
        IngestTableResult {
            table_key: table_key.to_string(),
            rows_loaded,
        }
    }

    #[tokio::test]
    async fn test_load_creates_data() {
        let (_dir, mut db) = fresh_db().await;

        let result = load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert_eq!(result.nodes_loaded["Person"], 2);
        assert_eq!(result.nodes_loaded["Company"], 1);
        assert_eq!(result.edges_loaded["Knows"], 1);
        assert_eq!(result.edges_loaded["WorksAt"], 1);
    }

    #[tokio::test]
    async fn test_load_data_readable_via_lance() {
        let (_dir, mut db) = fresh_db().await;
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        let snap = db.snapshot().await;
        let person_ds = snap.open("node:Person").await.unwrap();

        assert_eq!(person_ds.count_rows(None).await.unwrap(), 2);

        let batches: Vec<RecordBatch> = person_ds
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();

        // Only the first batch is inspected, as in the assertions below.
        let ids = utf8_column(&batches[0], "id");
        let id_values: Vec<&str> = (0..ids.len()).map(|i| ids.value(i)).collect();
        assert!(id_values.contains(&"Alice"));
        assert!(id_values.contains(&"Bob"));
    }

    #[tokio::test]
    async fn test_load_edges_reference_node_keys() {
        let (_dir, mut db) = fresh_db().await;
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        let snap = db.snapshot().await;
        let knows_ds = snap.open("edge:Knows").await.unwrap();

        let batches: Vec<RecordBatch> = knows_ds
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();

        let first = &batches[0];
        let srcs = utf8_column(first, "src");
        let dsts = utf8_column(first, "dst");

        assert_eq!(srcs.value(0), "Alice");
        assert_eq!(dsts.value(0), "Bob");
    }

    #[tokio::test]
    async fn test_load_manifest_version_advances() {
        let (_dir, mut db) = fresh_db().await;
        let version_before = db.version().await;

        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert!(db.version().await > version_before);
    }

    #[tokio::test]
    async fn test_load_append_adds_rows() {
        let (_dir, mut db) = fresh_db().await;

        let batch1 = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#;
        let batch2 = r#"{"type": "Person", "data": {"name": "Bob", "age": 25}}"#;

        load_jsonl(&mut db, batch1, LoadMode::Overwrite)
            .await
            .unwrap();
        load_jsonl(&mut db, batch2, LoadMode::Append).await.unwrap();

        let snap = db.snapshot().await;
        let person_ds = snap.open("node:Person").await.unwrap();
        assert_eq!(person_ds.count_rows(None).await.unwrap(), 2);
    }

    #[tokio::test]
    async fn test_load_unknown_type_rejected() {
        let (_dir, mut db) = fresh_db().await;

        let bad = r#"{"type": "FakeType", "data": {"name": "x"}}"#;
        let result = load_jsonl(&mut db, bad, LoadMode::Overwrite).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_ingest_creates_branch_and_reports_tables() {
        let (_dir, mut db) = fresh_db().await;

        let result = db
            .ingest("feature", Some("main"), TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert_eq!(result.branch, "feature");
        assert_eq!(result.base_branch, "main");
        assert!(result.branch_created);
        assert_eq!(result.mode, LoadMode::Overwrite);
        assert_eq!(
            result.tables,
            vec![
                table_result("edge:Knows", 1),
                table_result("edge:WorksAt", 1),
                table_result("node:Company", 1),
                table_result("node:Person", 2),
            ]
        );
        assert!(
            db.branch_list()
                .await
                .unwrap()
                .contains(&"feature".to_string())
        );
    }

    #[tokio::test]
    async fn test_ingest_existing_branch_ignores_from_and_merges_data() {
        let (_dir, mut db) = fresh_db().await;
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();
        db.branch_create_from(crate::db::ReadTarget::branch("main"), "feature")
            .await
            .unwrap();

        // "missing-base" names no existing branch; the ingest must still succeed
        // because the target branch already exists.
        let result = db
            .ingest(
                "feature",
                Some("missing-base"),
                r#"{"type":"Person","data":{"name":"Bob","age":26}}
{"type":"Person","data":{"name":"Eve","age":31}}"#,
                LoadMode::Merge,
            )
            .await
            .unwrap();

        assert_eq!(result.branch, "feature");
        assert_eq!(result.base_branch, "missing-base");
        assert!(!result.branch_created);
        assert_eq!(result.mode, LoadMode::Merge);
        assert_eq!(result.tables, vec![table_result("node:Person", 2)]);

        let snap = db
            .snapshot_of(crate::db::ReadTarget::branch("feature"))
            .await
            .unwrap();
        let person_ds = snap.open("node:Person").await.unwrap();
        assert_eq!(person_ds.count_rows(None).await.unwrap(), 3);

        let batches: Vec<RecordBatch> = person_ds
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();

        let mut ages_by_id = HashMap::new();
        for batch in &batches {
            let ids = utf8_column(batch, "id");
            let ages = batch
                .column_by_name("age")
                .unwrap()
                .as_any()
                .downcast_ref::<Int32Array>()
                .unwrap();
            for idx in 0..ids.len() {
                ages_by_id.insert(ids.value(idx).to_string(), ages.value(idx));
            }
        }

        assert_eq!(ages_by_id.get("Bob"), Some(&26));
        assert_eq!(ages_by_id.get("Eve"), Some(&31));
        assert_eq!(ages_by_id.get("Alice"), Some(&30));
    }

    #[tokio::test]
    async fn test_ingest_as_stamps_actor_on_branch_head_commit() {
        let (_dir, mut db) = fresh_db().await;

        db.ingest_as(
            "feature",
            Some("main"),
            TEST_DATA,
            LoadMode::Overwrite,
            Some("act-andrew"),
        )
        .await
        .unwrap();

        // The last listed commit is treated as the branch head.
        let head = db
            .list_commits(Some("feature"))
            .await
            .unwrap()
            .into_iter()
            .last()
            .unwrap();
        assert_eq!(head.actor_id.as_deref(), Some("act-andrew"));
    }

    #[test]
    fn test_range_constraint_rejects_nan() {
        use arrow_array::{Float64Array, RecordBatch, StringArray};
        use omnigraph_compiler::catalog::{LiteralValue, NodeType, RangeConstraint};
        use std::sync::Arc;

        let schema = Arc::new(arrow_schema::Schema::new(vec![
            arrow_schema::Field::new("name", arrow_schema::DataType::Utf8, false),
            arrow_schema::Field::new("score", arrow_schema::DataType::Float64, true),
        ]));

        // A single row whose constrained column holds NaN.
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["bad"])),
                Arc::new(Float64Array::from(vec![f64::NAN])),
            ],
        )
        .unwrap();

        let score_range = RangeConstraint {
            property: "score".to_string(),
            min: Some(LiteralValue::Float(0.0)),
            max: Some(LiteralValue::Float(1.0)),
        };
        let node_type = NodeType {
            name: "Test".to_string(),
            implements: vec![],
            properties: Default::default(),
            key: None,
            unique_constraints: vec![],
            indices: vec![],
            range_constraints: vec![score_range],
            check_constraints: vec![],
            embed_sources: Default::default(),
            blob_properties: Default::default(),
            arrow_schema: schema,
        };

        let result = validate_value_constraints(&batch, &node_type);
        assert!(result.is_err(), "expected NaN to be rejected");
        let err = result.unwrap_err().to_string();
        assert!(err.contains("NaN"), "error should mention NaN: {}", err);
    }
}