1use std::collections::{HashMap, HashSet};
2
3use std::io::{BufRead, BufReader, Cursor};
4use std::sync::Arc;
5
6use arrow_array::{
7 Array, ArrayRef, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array,
8 Int32Array, Int64Array, RecordBatch, StringArray, UInt32Array, UInt64Array,
9 builder::{
10 ArrayBuilder, BooleanBuilder, Date32Builder, Date64Builder, FixedSizeListBuilder,
11 Float32Builder, Float64Builder, Int32Builder, Int64Builder, ListBuilder, StringBuilder,
12 UInt32Builder, UInt64Builder,
13 },
14};
15use arrow_schema::DataType;
16use base64::Engine;
17use lance::blob::BlobArrayBuilder;
18use omnigraph_compiler::catalog::NodeType;
19use serde::{Deserialize, Serialize};
20use serde_json::Value as JsonValue;
21
22use crate::db::Omnigraph;
23use crate::error::{OmniError, Result};
24use crate::exec::staging::{MutationStaging, PendingMode};
25
/// Per-table row counts produced by a load.
///
/// The loader inserts one entry per node type and one per edge name that
/// appeared in the input, with the number of rows in that table's batch.
#[derive(Debug, Clone, Default)]
pub struct LoadResult {
    /// Rows loaded, keyed by node type name.
    pub nodes_loaded: HashMap<String, usize>,
    /// Rows loaded, keyed by edge name.
    pub edges_loaded: HashMap<String, usize>,
}
32
/// Row count for a single table touched by an ingest.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct IngestTableResult {
    /// Table identifier; `LoadResult::to_ingest_tables` builds these as
    /// `node:<type name>` or `edge:<edge name>`.
    pub table_key: String,
    /// Number of rows loaded into that table.
    pub rows_loaded: usize,
}
38
/// Summary returned by the `ingest*` entry points.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct IngestResult {
    /// Normalized name of the branch the data was loaded into.
    pub branch: String,
    /// Normalized name of the branch the target would be created from
    /// (`main` when the caller did not pass `from`).
    pub base_branch: String,
    /// `true` when the target branch was absent from the branch list and was
    /// therefore created before loading.
    pub branch_created: bool,
    /// Load mode that was requested.
    pub mode: LoadMode,
    /// Per-table row counts, sorted by `table_key`.
    pub tables: Vec<IngestTableResult>,
}
47
/// How incoming rows are applied to the existing tables.
///
/// Serialized in `snake_case` (`overwrite`, `append`, `merge`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LoadMode {
    /// Written directly per table via the table store's overwrite path; the
    /// loader does not use mutation staging for this mode.
    Overwrite,
    /// Staged as an append (`PendingMode::Append`) and committed with the
    /// other staged tables.
    Append,
    /// Staged as a merge (`PendingMode::Merge`) and committed with the other
    /// staged tables.
    Merge,
}
59
60pub async fn load_jsonl(db: &mut Omnigraph, data: &str, mode: LoadMode) -> Result<LoadResult> {
62 let current_branch = db.active_branch().await;
63 let branch = current_branch.as_deref().unwrap_or("main");
64 db.load(branch, data, mode).await
65}
66
67pub async fn load_jsonl_file(db: &mut Omnigraph, path: &str, mode: LoadMode) -> Result<LoadResult> {
69 let current_branch = db.active_branch().await;
70 let branch = current_branch.as_deref().unwrap_or("main");
71 db.load_file(branch, path, mode).await
72}
73
74impl Omnigraph {
75 pub async fn ingest(
76 &self,
77 branch: &str,
78 from: Option<&str>,
79 data: &str,
80 mode: LoadMode,
81 ) -> Result<IngestResult> {
82 self.ingest_as(branch, from, data, mode, None).await
83 }
84
85 pub async fn ingest_as(
86 &self,
87 branch: &str,
88 from: Option<&str>,
89 data: &str,
90 mode: LoadMode,
91 actor_id: Option<&str>,
92 ) -> Result<IngestResult> {
93 self.enforce(
101 omnigraph_policy::PolicyAction::Change,
102 &omnigraph_policy::ResourceScope::Branch(branch.to_string()),
103 actor_id,
104 )?;
105 self.ingest_with_current_actor(branch, from, data, mode, actor_id)
106 .await
107 }
108
109 pub async fn ingest_file(
110 &self,
111 branch: &str,
112 from: Option<&str>,
113 path: &str,
114 mode: LoadMode,
115 ) -> Result<IngestResult> {
116 self.ingest_file_as(branch, from, path, mode, None).await
117 }
118
119 pub async fn ingest_file_as(
120 &self,
121 branch: &str,
122 from: Option<&str>,
123 path: &str,
124 mode: LoadMode,
125 actor_id: Option<&str>,
126 ) -> Result<IngestResult> {
127 let data = std::fs::read_to_string(path).map_err(OmniError::Io)?;
128 self.ingest_as(branch, from, &data, mode, actor_id).await
129 }
130
131 async fn ingest_with_current_actor(
132 &self,
133 branch: &str,
134 from: Option<&str>,
135 data: &str,
136 mode: LoadMode,
137 actor_id: Option<&str>,
138 ) -> Result<IngestResult> {
139 self.ensure_schema_state_valid().await?;
140 let target_branch =
141 Self::normalize_branch_name(branch)?.unwrap_or_else(|| "main".to_string());
142 let base_branch = Self::normalize_branch_name(from.unwrap_or("main"))?
143 .unwrap_or_else(|| "main".to_string());
144 let branch_created = !self
145 .branch_list()
146 .await?
147 .iter()
148 .any(|name| name == &target_branch);
149 if branch_created {
150 self.branch_create_from_as(
157 crate::db::ReadTarget::branch(&base_branch),
158 &target_branch,
159 actor_id,
160 )
161 .await?;
162 }
163
164 let result = self.load_as(&target_branch, data, mode, actor_id).await?;
165 Ok(IngestResult {
166 branch: target_branch,
167 base_branch,
168 branch_created,
169 mode,
170 tables: result.to_ingest_tables(),
171 })
172 }
173
174 pub async fn load(&self, branch: &str, data: &str, mode: LoadMode) -> Result<LoadResult> {
175 self.load_as(branch, data, mode, None).await
176 }
177
178 pub async fn load_as(
179 &self,
180 branch: &str,
181 data: &str,
182 mode: LoadMode,
183 actor_id: Option<&str>,
184 ) -> Result<LoadResult> {
185 self.enforce(
192 omnigraph_policy::PolicyAction::Change,
193 &omnigraph_policy::ResourceScope::Branch(branch.to_string()),
194 actor_id,
195 )?;
196 self.ensure_schema_state_valid().await?;
197 crate::db::ensure_public_branch_ref(branch, "load")?;
202 let requested = Self::normalize_branch_name(branch)?;
208 self.load_direct_on_branch(requested.as_deref(), data, mode, actor_id)
212 .await
213 }
214
215 pub async fn load_file(&self, branch: &str, path: &str, mode: LoadMode) -> Result<LoadResult> {
216 self.load_file_as(branch, path, mode, None).await
217 }
218
219 pub async fn load_file_as(
223 &self,
224 branch: &str,
225 path: &str,
226 mode: LoadMode,
227 actor_id: Option<&str>,
228 ) -> Result<LoadResult> {
229 let data = std::fs::read_to_string(path).map_err(|e| OmniError::Io(e))?;
230 self.load_as(branch, &data, mode, actor_id).await
231 }
232
233 async fn load_direct_on_branch(
234 &self,
235 branch: Option<&str>,
236 data: &str,
237 mode: LoadMode,
238 actor_id: Option<&str>,
239 ) -> Result<LoadResult> {
240 let reader = BufReader::new(Cursor::new(data.as_bytes()));
241 load_jsonl_reader(self, branch, reader, mode, actor_id).await
242 }
243}
244
245impl LoadMode {
246 pub fn as_str(self) -> &'static str {
247 match self {
248 LoadMode::Overwrite => "overwrite",
249 LoadMode::Append => "append",
250 LoadMode::Merge => "merge",
251 }
252 }
253}
254
255impl LoadResult {
256 pub fn to_ingest_tables(&self) -> Vec<IngestTableResult> {
257 let mut tables = self
258 .nodes_loaded
259 .iter()
260 .map(|(type_name, rows_loaded)| IngestTableResult {
261 table_key: format!("node:{type_name}"),
262 rows_loaded: *rows_loaded,
263 })
264 .chain(
265 self.edges_loaded
266 .iter()
267 .map(|(edge_name, rows_loaded)| IngestTableResult {
268 table_key: format!("edge:{edge_name}"),
269 rows_loaded: *rows_loaded,
270 }),
271 )
272 .collect::<Vec<_>>();
273 tables.sort_by(|a, b| a.table_key.cmp(&b.table_key));
274 tables
275 }
276}
277
/// Parses JSONL records from `reader` and loads them into `branch` using `mode`.
///
/// Every non-blank line must be a JSON value carrying either a string `type`
/// field (a node record) or a string `edge` field (an edge record, which also
/// needs string `from` and `to` fields). A missing `data` field is treated as
/// an empty JSON object.
///
/// `Append` and `Merge` accumulate batches in a `MutationStaging` and commit
/// them together at the end. `Overwrite` writes each table through
/// `write_batches_concurrently` and then commits the collected updates.
///
/// # Errors
///
/// Returns an error when the input cannot be read, a line is not valid JSON,
/// a node or edge type is unknown, a line has neither `type` nor `edge`, an
/// edge lacks `from`/`to` or references an id that is not found, a table has
/// no manifest entry, or any validation, staging, write, or commit helper
/// fails.
async fn load_jsonl_reader<R: BufRead>(
    db: &Omnigraph,
    branch: Option<&str>,
    reader: R,
    mode: LoadMode,
    actor_id: Option<&str>,
) -> Result<LoadResult> {
    let catalog = db.catalog().clone();

    // Phase 1: bucket the raw records by node type / canonical edge name.
    let mut node_rows: HashMap<String, Vec<JsonValue>> = HashMap::new();
    // Edge rows are stored as (from id, to id, data payload).
    let mut edge_rows: HashMap<String, Vec<(String, String, JsonValue)>> = HashMap::new();

    for (line_num, line) in reader.lines().enumerate() {
        let line = line?;
        let line = line.trim();
        if line.is_empty() {
            continue;
        }
        // `line_num` is zero-based; messages report one-based line numbers.
        let value: JsonValue = serde_json::from_str(line).map_err(|e| {
            OmniError::manifest(format!("invalid JSON on line {}: {}", line_num + 1, e))
        })?;

        // A string `type` field wins over `edge` when both are present.
        if let Some(type_name) = value.get("type").and_then(|v| v.as_str()) {
            if !catalog.node_types.contains_key(type_name) {
                return Err(OmniError::manifest(format!(
                    "line {}: unknown node type '{}'",
                    line_num + 1,
                    type_name
                )));
            }
            let data = value
                .get("data")
                .cloned()
                .unwrap_or(JsonValue::Object(serde_json::Map::new()));
            node_rows
                .entry(type_name.to_string())
                .or_default()
                .push(data);
        } else if let Some(edge_name) = value.get("edge").and_then(|v| v.as_str()) {
            if catalog.lookup_edge_by_name(edge_name).is_none() {
                return Err(OmniError::manifest(format!(
                    "line {}: unknown edge type '{}'",
                    line_num + 1,
                    edge_name
                )));
            }
            let from = value
                .get("from")
                .and_then(|v| v.as_str())
                .ok_or_else(|| {
                    OmniError::manifest(format!("line {}: edge missing 'from'", line_num + 1))
                })?
                .to_string();
            let to = value
                .get("to")
                .and_then(|v| v.as_str())
                .ok_or_else(|| {
                    OmniError::manifest(format!("line {}: edge missing 'to'", line_num + 1))
                })?
                .to_string();
            let data = value
                .get("data")
                .cloned()
                .unwrap_or(JsonValue::Object(serde_json::Map::new()));
            // The lookup cannot fail here: the same name was resolved just above.
            // Rows are grouped under the name stored on the catalog entry, which
            // may differ from the spelling used in the input.
            let canonical = catalog.lookup_edge_by_name(edge_name).unwrap().name.clone();
            edge_rows
                .entry(canonical)
                .or_default()
                .push((from, to, data));
        } else {
            return Err(OmniError::manifest(format!(
                "line {}: expected 'type' or 'edge' field",
                line_num + 1
            )));
        }
    }

    let mut result = LoadResult::default();
    let snapshot = db.snapshot_for_branch(branch).await?;
    // Overwrite is the only mode that bypasses staging and writes directly.
    let use_staging = !matches!(mode, LoadMode::Overwrite);
    let mut staging = MutationStaging::default();
    // Only filled on the overwrite path.
    let mut overwrite_updates: Vec<crate::db::SubTableUpdate> = Vec::new();
    // Table versions read from the snapshot, handed to the final commit as the
    // expected versions (presumably for a concurrency check — TODO confirm).
    let mut overwrite_expected: HashMap<String, u64> = HashMap::new();
    let pending_mode = match mode {
        LoadMode::Merge => PendingMode::Merge,
        LoadMode::Append => PendingMode::Append,
        // `pending_mode` is only read inside `use_staging` branches, so this
        // value is never consulted for Overwrite.
        LoadMode::Overwrite => PendingMode::Append,
    };
    let load_op_kind = match mode {
        LoadMode::Append => crate::db::MutationOpKind::Insert,
        LoadMode::Merge => crate::db::MutationOpKind::Merge,
        LoadMode::Overwrite => crate::db::MutationOpKind::SchemaRewrite,
    };

    // Phase 2: build and validate every node batch before anything is written.
    // Tuple layout: (type name, table key, batch, row count).
    let mut prepared_nodes: Vec<(String, String, RecordBatch, usize)> =
        Vec::with_capacity(node_rows.len());
    for (type_name, rows) in &node_rows {
        // Indexing is safe: every key was checked against the catalog above.
        let node_type = &catalog.node_types[type_name];
        let batch = build_node_batch(node_type, rows)?;
        validate_value_constraints(&batch, node_type)?;
        validate_enum_constraints(&batch, &node_type.properties, type_name)?;
        let unique_props = unique_property_names_for_node(node_type);
        if !unique_props.is_empty() {
            enforce_unique_constraints_intra_batch(&batch, type_name, &unique_props)?;
        }
        let loaded_count = batch.num_rows();
        let table_key = format!("node:{}", type_name);
        let entry = snapshot
            .entry(&table_key)
            .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
        if !use_staging {
            overwrite_expected.insert(table_key.clone(), entry.table_version);
        }
        prepared_nodes.push((type_name.clone(), table_key, batch, loaded_count));
    }

    // Phase 3: hand node batches to staging, or write them directly on overwrite.
    if use_staging {
        for (type_name, table_key, batch, loaded_count) in prepared_nodes {
            let (ds, full_path, table_branch) = db
                .open_for_mutation_on_branch(branch, &table_key, load_op_kind)
                .await?;
            let expected_version = ds.version().version;
            staging.ensure_path(
                &table_key,
                full_path,
                table_branch,
                expected_version,
                load_op_kind,
            );
            let schema = batch.schema();
            staging.append_batch(&table_key, schema, pending_mode, batch)?;
            result.nodes_loaded.insert(type_name, loaded_count);
        }
    } else {
        let node_write_results =
            write_batches_concurrently(db, branch, mode, prepared_nodes).await?;
        for (type_name, table_key, loaded_count, state, table_branch) in node_write_results {
            overwrite_updates.push(crate::db::SubTableUpdate {
                table_key,
                table_version: state.version,
                table_branch,
                row_count: state.row_count,
                version_metadata: state.version_metadata,
            });
            result.nodes_loaded.insert(type_name, loaded_count);
        }
    }

    // Phase 4: referential check — every edge endpoint must be a known node id.
    // This runs after the node phase so the id sets can take the nodes from this
    // load into account (via `staging`, or via `node_rows`/`overwrite_updates`).
    for (edge_name, rows) in &edge_rows {
        let edge_type = &catalog.edge_types[edge_name];
        let from_ids = if use_staging {
            collect_node_ids_with_pending(db, branch, &edge_type.from_type, &staging).await?
        } else {
            collect_node_ids(
                db,
                branch,
                &edge_type.from_type,
                &node_rows,
                &catalog,
                &overwrite_updates,
            )
            .await?
        };
        let to_ids = if use_staging {
            collect_node_ids_with_pending(db, branch, &edge_type.to_type, &staging).await?
        } else {
            collect_node_ids(
                db,
                branch,
                &edge_type.to_type,
                &node_rows,
                &catalog,
                &overwrite_updates,
            )
            .await?
        };

        // Row numbers in these messages are one-based within the edge group,
        // not line numbers of the original input.
        for (i, (src, dst, _)) in rows.iter().enumerate() {
            if !from_ids.contains(src.as_str()) {
                return Err(OmniError::manifest(format!(
                    "edge {} row {}: src '{}' not found in {}",
                    edge_name,
                    i + 1,
                    src,
                    edge_type.from_type
                )));
            }
            if !to_ids.contains(dst.as_str()) {
                return Err(OmniError::manifest(format!(
                    "edge {} row {}: dst '{}' not found in {}",
                    edge_name,
                    i + 1,
                    dst,
                    edge_type.to_type
                )));
            }
        }
    }

    // Phase 5: build and validate every edge batch.
    let mut prepared_edges: Vec<(String, String, RecordBatch, usize)> =
        Vec::with_capacity(edge_rows.len());
    for (edge_name, rows) in &edge_rows {
        let edge_type = &catalog.edge_types[edge_name];
        let batch = build_edge_batch(edge_type, rows)?;
        validate_enum_constraints(&batch, &edge_type.properties, edge_name)?;
        let unique_props = unique_property_names_for_edge(edge_type);
        if !unique_props.is_empty() {
            enforce_unique_constraints_intra_batch(&batch, edge_name, &unique_props)?;
        }
        let loaded_count = batch.num_rows();
        let table_key = format!("edge:{}", edge_name);
        let entry = snapshot
            .entry(&table_key)
            .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
        if !use_staging {
            overwrite_expected.insert(table_key.clone(), entry.table_version);
        }
        prepared_edges.push((edge_name.clone(), table_key, batch, loaded_count));
    }

    // Phase 6: stage or write the edge batches, mirroring the node phase.
    if use_staging {
        for (edge_name, table_key, batch, loaded_count) in prepared_edges {
            let (ds, full_path, table_branch) = db
                .open_for_mutation_on_branch(branch, &table_key, load_op_kind)
                .await?;
            let expected_version = ds.version().version;
            staging.ensure_path(
                &table_key,
                full_path,
                table_branch,
                expected_version,
                load_op_kind,
            );
            let schema = batch.schema();
            staging.append_batch(&table_key, schema, pending_mode, batch)?;
            result.edges_loaded.insert(edge_name, loaded_count);
        }
    } else {
        let edge_write_results =
            write_batches_concurrently(db, branch, mode, prepared_edges).await?;
        for (edge_name, table_key, loaded_count, state, table_branch) in edge_write_results {
            overwrite_updates.push(crate::db::SubTableUpdate {
                table_key,
                table_version: state.version,
                table_branch,
                row_count: state.row_count,
                version_metadata: state.version_metadata,
            });
            result.edges_loaded.insert(edge_name, loaded_count);
        }
    }

    // Phase 7: cardinality validation for every edge type touched by this load.
    for (edge_name, _) in &edge_rows {
        let edge_type = &catalog.edge_types[edge_name];
        let table_key = format!("edge:{}", edge_name);
        if use_staging {
            validate_edge_cardinality_with_pending_loader(
                db, branch, edge_type, &table_key, &staging, mode,
            )
            .await?;
        } else if let Some(update) = overwrite_updates.iter().find(|u| u.table_key == table_key) {
            // Overwrite path: validate against the table version just written.
            validate_edge_cardinality(
                db,
                branch,
                edge_name,
                update.table_version,
                update.table_branch.as_deref(),
            )
            .await?;
        }
    }

    // Phase 8: commit.
    if use_staging {
        let staged = staging.stage_all(db, branch).await?;
        // `_queue_guards` is bound to a name (not `_`), so it is held until the
        // end of this block rather than dropped immediately.
        let (updates, expected_versions, sidecar_handle, _queue_guards) = staged
            .commit_all(db, branch, crate::db::manifest::SidecarKind::Load, actor_id)
            .await?;
        // NOTE(review): presumably a fault-injection hook between finalizing the
        // tables and publishing the manifest update — confirm in `failpoints`.
        crate::failpoints::maybe_fail("mutation.post_finalize_pre_publisher")?;
        db.commit_updates_on_branch_with_expected(branch, &updates, &expected_versions, actor_id)
            .await?;
        if let Some(handle) = sidecar_handle {
            // Sidecar deletion is best-effort: a failure is logged, not returned.
            if let Err(err) =
                crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await
            {
                tracing::warn!(
                    error = %err,
                    operation_id = handle.operation_id.as_str(),
                    "recovery sidecar cleanup failed; the next open's recovery sweep will resolve it"
                );
            }
        }
    } else {
        db.commit_updates_on_branch_with_expected(
            branch,
            &overwrite_updates,
            &overwrite_expected,
            actor_id,
        )
        .await?;
    }

    Ok(result)
}
632
633fn build_node_batch(node_type: &NodeType, rows: &[JsonValue]) -> Result<RecordBatch> {
634 let schema = node_type.arrow_schema.clone();
635
636 let ids: Vec<String> = rows
638 .iter()
639 .map(|row| {
640 let explicit_id = row.get("id").and_then(|v| v.as_str()).map(str::to_string);
641 if let Some(key_prop) = node_type.key_property() {
642 let key_value = row
643 .get(key_prop)
644 .and_then(|v| v.as_str())
645 .map(|s| s.to_string())
646 .ok_or_else(|| {
647 OmniError::manifest(format!(
648 "node {} missing @key property '{}'",
649 node_type.name, key_prop
650 ))
651 })?;
652 if let Some(explicit_id) = explicit_id {
653 if explicit_id != key_value {
654 return Err(OmniError::manifest(format!(
655 "node {} has explicit id '{}' that does not match @key property '{}' value '{}'",
656 node_type.name, explicit_id, key_prop, key_value
657 )));
658 }
659 }
660 Ok(key_value)
661 } else if let Some(explicit_id) = explicit_id {
662 Ok(explicit_id)
663 } else {
664 Ok(generate_id())
665 }
666 })
667 .collect::<Result<Vec<_>>>()?;
668
669 let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
670 columns.push(Arc::new(StringArray::from(ids)));
671
672 for field in schema.fields().iter().skip(1) {
674 if node_type.blob_properties.contains(field.name()) {
675 let col = build_blob_column(field.name(), field.is_nullable(), rows)?;
676 columns.push(col);
677 } else {
678 let col =
679 build_column_from_json(field.name(), field.data_type(), field.is_nullable(), rows)?;
680 columns.push(col);
681 }
682 }
683
684 RecordBatch::try_new(schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
685}
686
687fn build_edge_batch(
688 edge_type: &omnigraph_compiler::catalog::EdgeType,
689 rows: &[(String, String, JsonValue)],
690) -> Result<RecordBatch> {
691 let schema = edge_type.arrow_schema.clone();
692
693 let ids: Vec<String> = rows
694 .iter()
695 .map(|(_, _, data)| {
696 data.get("id")
697 .and_then(|v| v.as_str())
698 .map(str::to_string)
699 .unwrap_or_else(generate_id)
700 })
701 .collect();
702 let srcs: Vec<&str> = rows.iter().map(|(from, _, _)| from.as_str()).collect();
703 let dsts: Vec<&str> = rows.iter().map(|(_, to, _)| to.as_str()).collect();
704
705 let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
706 columns.push(Arc::new(StringArray::from(ids)));
707 columns.push(Arc::new(StringArray::from(srcs)));
708 columns.push(Arc::new(StringArray::from(dsts)));
709
710 let data_values: Vec<JsonValue> = rows.iter().map(|(_, _, data)| data.clone()).collect();
712 for field in schema.fields().iter().skip(3) {
713 if edge_type.blob_properties.contains(field.name()) {
714 let col = build_blob_column(field.name(), field.is_nullable(), &data_values)?;
715 columns.push(col);
716 } else {
717 let col = build_column_from_json(
718 field.name(),
719 field.data_type(),
720 field.is_nullable(),
721 &data_values,
722 )?;
723 columns.push(col);
724 }
725 }
726
727 RecordBatch::try_new(schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
728}
729
730pub(crate) fn append_blob_value(builder: &mut BlobArrayBuilder, value: &str) -> Result<()> {
732 if let Some(encoded) = value.strip_prefix("base64:") {
733 let bytes = base64::engine::general_purpose::STANDARD
734 .decode(encoded)
735 .map_err(|e| OmniError::manifest(format!("invalid base64 blob data: {}", e)))?;
736 builder
737 .push_bytes(bytes)
738 .map_err(|e| OmniError::Lance(e.to_string()))
739 } else {
740 builder
742 .push_uri(value)
743 .map_err(|e| OmniError::Lance(e.to_string()))
744 }
745}
746
747fn build_blob_column(name: &str, nullable: bool, rows: &[JsonValue]) -> Result<ArrayRef> {
749 let mut builder = BlobArrayBuilder::new(rows.len());
750 for row in rows {
751 match row.get(name) {
752 Some(JsonValue::String(s)) => {
753 append_blob_value(&mut builder, s)?;
754 }
755 Some(JsonValue::Null) | None if nullable => {
756 builder
757 .push_null()
758 .map_err(|e| OmniError::Lance(e.to_string()))?;
759 }
760 Some(JsonValue::Null) | None => {
761 return Err(OmniError::manifest(format!(
762 "non-nullable blob property '{}' has null values",
763 name
764 )));
765 }
766 _ => {
767 return Err(OmniError::manifest(format!(
768 "blob property '{}' must be a URI string or base64: prefixed data",
769 name
770 )));
771 }
772 }
773 }
774 builder
775 .finish()
776 .map_err(|e| OmniError::Lance(e.to_string()))
777}
778
779fn build_column_from_json(
780 name: &str,
781 data_type: &DataType,
782 nullable: bool,
783 rows: &[JsonValue],
784) -> Result<ArrayRef> {
785 let array: ArrayRef = match data_type {
786 DataType::Utf8 => {
787 let values: Vec<Option<String>> = rows
788 .iter()
789 .map(|row| {
790 row.get(name)
791 .and_then(|v| v.as_str())
792 .map(|s| s.to_string())
793 })
794 .collect();
795 Arc::new(StringArray::from(values))
796 }
797 DataType::Int32 => {
798 let values: Vec<Option<i32>> = rows
799 .iter()
800 .map(|row| row.get(name).and_then(|v| v.as_i64()).map(|v| v as i32))
801 .collect();
802 Arc::new(Int32Array::from(values))
803 }
804 DataType::Int64 => {
805 let values: Vec<Option<i64>> = rows
806 .iter()
807 .map(|row| row.get(name).and_then(|v| v.as_i64()))
808 .collect();
809 Arc::new(Int64Array::from(values))
810 }
811 DataType::UInt32 => {
812 let values: Vec<Option<u32>> = rows
813 .iter()
814 .map(|row| row.get(name).and_then(|v| v.as_u64()).map(|v| v as u32))
815 .collect();
816 Arc::new(UInt32Array::from(values))
817 }
818 DataType::UInt64 => {
819 let values: Vec<Option<u64>> = rows
820 .iter()
821 .map(|row| row.get(name).and_then(|v| v.as_u64()))
822 .collect();
823 Arc::new(UInt64Array::from(values))
824 }
825 DataType::Float32 => {
826 let values: Vec<Option<f32>> = rows
827 .iter()
828 .map(|row| row.get(name).and_then(|v| v.as_f64()).map(|v| v as f32))
829 .collect();
830 Arc::new(Float32Array::from(values))
831 }
832 DataType::Float64 => {
833 let values: Vec<Option<f64>> = rows
834 .iter()
835 .map(|row| row.get(name).and_then(|v| v.as_f64()))
836 .collect();
837 Arc::new(Float64Array::from(values))
838 }
839 DataType::Boolean => {
840 let values: Vec<Option<bool>> = rows
841 .iter()
842 .map(|row| row.get(name).and_then(|v| v.as_bool()))
843 .collect();
844 Arc::new(BooleanArray::from(values))
845 }
846 DataType::Date32 => {
847 let mut values = Vec::with_capacity(rows.len());
848 for row in rows {
849 values.push(parse_date32_json_value(
850 row.get(name).unwrap_or(&JsonValue::Null),
851 )?);
852 }
853 Arc::new(Date32Array::from(values))
854 }
855 DataType::Date64 => {
856 let mut values = Vec::with_capacity(rows.len());
857 for row in rows {
858 values.push(parse_date64_json_value(
859 row.get(name).unwrap_or(&JsonValue::Null),
860 )?);
861 }
862 Arc::new(Date64Array::from(values))
863 }
864 DataType::List(field) => {
865 let mut builder = ListBuilder::with_capacity(
866 make_list_value_builder(field.data_type(), rows.len())?,
867 rows.len(),
868 )
869 .with_field(field.clone());
870 for row in rows {
871 let value = row.get(name).unwrap_or(&JsonValue::Null);
872 if value.is_null() {
873 builder.append(false);
874 continue;
875 }
876 let items = value.as_array().ok_or_else(|| {
877 OmniError::manifest(format!(
878 "list property '{}' expects a JSON array, got {}",
879 name, value
880 ))
881 })?;
882 for item in items {
883 append_json_list_item(builder.values(), field.data_type(), item)?;
884 }
885 builder.append(true);
886 }
887 Arc::new(builder.finish())
888 }
889 DataType::FixedSizeList(child_field, dim) => {
890 let dim = *dim;
892 let mut builder = FixedSizeListBuilder::with_capacity(
893 Float32Builder::with_capacity(rows.len() * dim as usize),
894 dim,
895 rows.len(),
896 )
897 .with_field(child_field.clone());
898 for row in rows {
899 if let Some(arr) = row.get(name).and_then(|v| v.as_array()) {
900 if arr.len() != dim as usize {
901 return Err(OmniError::manifest(format!(
902 "vector property '{}' expects {} dimensions, got {}",
903 name,
904 dim,
905 arr.len()
906 )));
907 }
908 for val in arr {
909 builder
910 .values()
911 .append_value(val.as_f64().unwrap_or(0.0) as f32);
912 }
913 builder.append(true);
914 } else if nullable {
915 for _ in 0..dim as usize {
916 builder.values().append_null();
917 }
918 builder.append(false);
919 } else {
920 return Err(OmniError::manifest(format!(
921 "non-nullable vector property '{}' has null values",
922 name
923 )));
924 }
925 }
926 Arc::new(builder.finish())
927 }
928 _ => {
929 let values: Vec<Option<&str>> = vec![None; rows.len()];
931 Arc::new(StringArray::from(values))
932 }
933 };
934
935 if !nullable && array.null_count() > 0 {
936 return Err(OmniError::manifest(format!(
937 "non-nullable property '{}' has null or invalid values",
938 name
939 )));
940 }
941
942 Ok(array)
943}
944
945fn make_list_value_builder(data_type: &DataType, capacity: usize) -> Result<Box<dyn ArrayBuilder>> {
946 Ok(match data_type {
947 DataType::Utf8 => Box::new(StringBuilder::with_capacity(capacity, capacity * 8)),
948 DataType::Boolean => Box::new(BooleanBuilder::with_capacity(capacity)),
949 DataType::Int32 => Box::new(Int32Builder::with_capacity(capacity)),
950 DataType::Int64 => Box::new(Int64Builder::with_capacity(capacity)),
951 DataType::UInt32 => Box::new(UInt32Builder::with_capacity(capacity)),
952 DataType::UInt64 => Box::new(UInt64Builder::with_capacity(capacity)),
953 DataType::Float32 => Box::new(Float32Builder::with_capacity(capacity)),
954 DataType::Float64 => Box::new(Float64Builder::with_capacity(capacity)),
955 DataType::Date32 => Box::new(Date32Builder::with_capacity(capacity)),
956 DataType::Date64 => Box::new(Date64Builder::with_capacity(capacity)),
957 other => {
958 return Err(OmniError::manifest(format!(
959 "unsupported list element data type {:?}",
960 other
961 )));
962 }
963 })
964}
965
966fn append_json_list_item(
967 builder: &mut Box<dyn ArrayBuilder>,
968 data_type: &DataType,
969 value: &JsonValue,
970) -> Result<()> {
971 match data_type {
972 DataType::Utf8 => {
973 let builder = builder
974 .as_any_mut()
975 .downcast_mut::<StringBuilder>()
976 .ok_or_else(|| OmniError::manifest("list Utf8 builder downcast failed"))?;
977 if let Some(value) = value.as_str() {
978 builder.append_value(value);
979 } else {
980 builder.append_null();
981 }
982 }
983 DataType::Boolean => {
984 let builder = builder
985 .as_any_mut()
986 .downcast_mut::<BooleanBuilder>()
987 .ok_or_else(|| OmniError::manifest("list Boolean builder downcast failed"))?;
988 if let Some(value) = value.as_bool() {
989 builder.append_value(value);
990 } else {
991 builder.append_null();
992 }
993 }
994 DataType::Int32 => {
995 let builder = builder
996 .as_any_mut()
997 .downcast_mut::<Int32Builder>()
998 .ok_or_else(|| OmniError::manifest("list Int32 builder downcast failed"))?;
999 if let Some(value) = value.as_i64() {
1000 let value = i32::try_from(value).map_err(|_| {
1001 OmniError::manifest(format!("list value {} exceeds Int32 range", value))
1002 })?;
1003 builder.append_value(value);
1004 } else {
1005 builder.append_null();
1006 }
1007 }
1008 DataType::Int64 => {
1009 let builder = builder
1010 .as_any_mut()
1011 .downcast_mut::<Int64Builder>()
1012 .ok_or_else(|| OmniError::manifest("list Int64 builder downcast failed"))?;
1013 if let Some(value) = value.as_i64() {
1014 builder.append_value(value);
1015 } else {
1016 builder.append_null();
1017 }
1018 }
1019 DataType::UInt32 => {
1020 let builder = builder
1021 .as_any_mut()
1022 .downcast_mut::<UInt32Builder>()
1023 .ok_or_else(|| OmniError::manifest("list UInt32 builder downcast failed"))?;
1024 if let Some(value) = value.as_u64() {
1025 let value = u32::try_from(value).map_err(|_| {
1026 OmniError::manifest(format!("list value {} exceeds UInt32 range", value))
1027 })?;
1028 builder.append_value(value);
1029 } else {
1030 builder.append_null();
1031 }
1032 }
1033 DataType::UInt64 => {
1034 let builder = builder
1035 .as_any_mut()
1036 .downcast_mut::<UInt64Builder>()
1037 .ok_or_else(|| OmniError::manifest("list UInt64 builder downcast failed"))?;
1038 if let Some(value) = value.as_u64() {
1039 builder.append_value(value);
1040 } else {
1041 builder.append_null();
1042 }
1043 }
1044 DataType::Float32 => {
1045 let builder = builder
1046 .as_any_mut()
1047 .downcast_mut::<Float32Builder>()
1048 .ok_or_else(|| OmniError::manifest("list Float32 builder downcast failed"))?;
1049 if let Some(value) = value.as_f64() {
1050 builder.append_value(value as f32);
1051 } else {
1052 builder.append_null();
1053 }
1054 }
1055 DataType::Float64 => {
1056 let builder = builder
1057 .as_any_mut()
1058 .downcast_mut::<Float64Builder>()
1059 .ok_or_else(|| OmniError::manifest("list Float64 builder downcast failed"))?;
1060 if let Some(value) = value.as_f64() {
1061 builder.append_value(value);
1062 } else {
1063 builder.append_null();
1064 }
1065 }
1066 DataType::Date32 => {
1067 let builder = builder
1068 .as_any_mut()
1069 .downcast_mut::<Date32Builder>()
1070 .ok_or_else(|| OmniError::manifest("list Date32 builder downcast failed"))?;
1071 if let Some(value) = parse_date32_json_value(value)? {
1072 builder.append_value(value);
1073 } else {
1074 builder.append_null();
1075 }
1076 }
1077 DataType::Date64 => {
1078 let builder = builder
1079 .as_any_mut()
1080 .downcast_mut::<Date64Builder>()
1081 .ok_or_else(|| OmniError::manifest("list Date64 builder downcast failed"))?;
1082 if let Some(value) = parse_date64_json_value(value)? {
1083 builder.append_value(value);
1084 } else {
1085 builder.append_null();
1086 }
1087 }
1088 other => {
1089 return Err(OmniError::manifest(format!(
1090 "unsupported list element data type {:?}",
1091 other
1092 )));
1093 }
1094 }
1095
1096 Ok(())
1097}
1098
1099fn parse_date32_json_value(value: &JsonValue) -> Result<Option<i32>> {
1100 if value.is_null() {
1101 return Ok(None);
1102 }
1103 if let Some(days) = value.as_i64() {
1104 let days = i32::try_from(days)
1105 .map_err(|_| OmniError::manifest(format!("Date value out of range: {}", days)))?;
1106 return Ok(Some(days));
1107 }
1108 if let Some(days) = value.as_u64() {
1109 let days = i32::try_from(days)
1110 .map_err(|_| OmniError::manifest(format!("Date value out of range: {}", days)))?;
1111 return Ok(Some(days));
1112 }
1113 if let Some(value) = value.as_str() {
1114 return Ok(Some(parse_date32_literal(value)?));
1115 }
1116 Ok(None)
1117}
1118
1119fn parse_date64_json_value(value: &JsonValue) -> Result<Option<i64>> {
1120 if value.is_null() {
1121 return Ok(None);
1122 }
1123 if let Some(ms) = value.as_i64() {
1124 return Ok(Some(ms));
1125 }
1126 if let Some(ms) = value.as_u64() {
1127 let ms = i64::try_from(ms)
1128 .map_err(|_| OmniError::manifest(format!("DateTime value out of range: {}", ms)))?;
1129 return Ok(Some(ms));
1130 }
1131 if let Some(value) = value.as_str() {
1132 return Ok(Some(parse_date64_literal(value)?));
1133 }
1134 Ok(None)
1135}
1136
/// Number of concurrent table writes used when no valid override is set.
const DEFAULT_LOAD_WRITE_CONCURRENCY: usize = 8;

/// Returns the write concurrency for overwrite loads.
///
/// Reads `OMNIGRAPH_LOAD_CONCURRENCY`. An unset variable, a value that does
/// not parse as `usize`, or a value of zero all fall back to
/// `DEFAULT_LOAD_WRITE_CONCURRENCY`.
fn load_write_concurrency() -> usize {
    match std::env::var("OMNIGRAPH_LOAD_CONCURRENCY") {
        Ok(raw) => match raw.parse::<usize>() {
            Ok(requested) if requested > 0 => requested,
            _ => DEFAULT_LOAD_WRITE_CONCURRENCY,
        },
        Err(_) => DEFAULT_LOAD_WRITE_CONCURRENCY,
    }
}
1156
1157async fn write_batches_concurrently(
1161 db: &Omnigraph,
1162 branch: Option<&str>,
1163 mode: LoadMode,
1164 prepared: Vec<(String, String, RecordBatch, usize)>,
1165) -> Result<
1166 Vec<(
1167 String,
1168 String,
1169 usize,
1170 crate::table_store::TableState,
1171 Option<String>,
1172 )>,
1173> {
1174 use futures::stream::StreamExt;
1175
1176 if prepared.is_empty() {
1177 return Ok(Vec::new());
1178 }
1179
1180 let concurrency = load_write_concurrency().min(prepared.len()).max(1);
1181
1182 futures::stream::iter(prepared.into_iter().map(
1183 |(type_name, table_key, batch, loaded_count)| async move {
1184 let (state, table_branch) =
1185 write_batch_to_dataset(db, branch, &table_key, batch, mode).await?;
1186 Ok::<_, OmniError>((type_name, table_key, loaded_count, state, table_branch))
1187 },
1188 ))
1189 .buffered(concurrency)
1190 .collect::<Vec<Result<_>>>()
1191 .await
1192 .into_iter()
1193 .collect()
1194}
1195
1196async fn write_batch_to_dataset(
1197 db: &Omnigraph,
1198 branch: Option<&str>,
1199 table_key: &str,
1200 batch: RecordBatch,
1201 mode: LoadMode,
1202) -> Result<(crate::table_store::TableState, Option<String>)> {
1203 let op_kind = match mode {
1204 LoadMode::Append => crate::db::MutationOpKind::Insert,
1205 LoadMode::Merge => crate::db::MutationOpKind::Merge,
1206 LoadMode::Overwrite => crate::db::MutationOpKind::SchemaRewrite,
1207 };
1208 let (mut ds, full_path, table_branch) = db
1209 .open_for_mutation_on_branch(branch, table_key, op_kind)
1210 .await?;
1211 let table_store = db.table_store();
1212
1213 match mode {
1214 LoadMode::Overwrite => {
1215 let state = table_store
1216 .overwrite_batch(&full_path, &mut ds, batch)
1217 .await?;
1218 Ok((state, table_branch))
1219 }
1220 LoadMode::Append => {
1221 let state = table_store.append_batch(&full_path, &mut ds, batch).await?;
1222 Ok((state, table_branch))
1223 }
1224 LoadMode::Merge => {
1225 let state = table_store
1226 .merge_insert_batch(
1227 &full_path,
1228 ds,
1229 batch,
1230 vec!["id".to_string()],
1231 lance::dataset::WhenMatched::UpdateAll,
1232 lance::dataset::WhenNotMatched::InsertAll,
1233 )
1234 .await?;
1235 Ok((state, table_branch))
1236 }
1237 }
1238}
1239
1240fn generate_id() -> String {
1241 ulid::Ulid::new().to_string()
1242}
1243
1244pub(crate) fn parse_date32_literal(value: &str) -> Result<i32> {
1245 let raw: Arc<dyn Array> = Arc::new(StringArray::from(vec![Some(value)]));
1246 let casted = arrow_cast::cast::cast(raw.as_ref(), &DataType::Date32)
1247 .map_err(|e| OmniError::manifest(format!("invalid Date literal '{}': {}", value, e)))?;
1248 let out = casted
1249 .as_any()
1250 .downcast_ref::<Date32Array>()
1251 .ok_or_else(|| OmniError::manifest("Date32 cast produced unexpected array"))?;
1252 if out.is_null(0) {
1253 return Err(OmniError::manifest(format!(
1254 "invalid Date literal '{}'",
1255 value
1256 )));
1257 }
1258 Ok(out.value(0))
1259}
1260
1261pub(crate) fn parse_date64_literal(value: &str) -> Result<i64> {
1262 let raw: Arc<dyn Array> = Arc::new(StringArray::from(vec![Some(value)]));
1263 let casted = arrow_cast::cast::cast(raw.as_ref(), &DataType::Date64)
1264 .map_err(|e| OmniError::manifest(format!("invalid DateTime literal '{}': {}", value, e)))?;
1265 let out = casted
1266 .as_any()
1267 .downcast_ref::<Date64Array>()
1268 .ok_or_else(|| OmniError::manifest("Date64 cast produced unexpected array"))?;
1269 if out.is_null(0) {
1270 return Err(OmniError::manifest(format!(
1271 "invalid DateTime literal '{}'",
1272 value
1273 )));
1274 }
1275 Ok(out.value(0))
1276}
1277
1278pub(crate) fn validate_value_constraints(
1281 batch: &RecordBatch,
1282 node_type: &omnigraph_compiler::catalog::NodeType,
1283) -> Result<()> {
1284 use arrow_array::Array;
1285
1286 for rc in &node_type.range_constraints {
1288 let Some(col) = batch.column_by_name(&rc.property) else {
1289 continue;
1290 };
1291 for row in 0..batch.num_rows() {
1292 if col.is_null(row) {
1293 continue;
1294 }
1295 let value = extract_numeric_value(col, row);
1296 if let Some(val) = value {
1297 if val.is_nan() {
1298 return Err(OmniError::manifest(format!(
1299 "@range violation on {}.{}: value is NaN",
1300 node_type.name, rc.property
1301 )));
1302 }
1303 if let Some(ref min) = rc.min {
1304 let min_f = literal_value_to_f64(min);
1305 if val < min_f {
1306 return Err(OmniError::manifest(format!(
1307 "@range violation on {}.{}: value {} < min {}",
1308 node_type.name, rc.property, val, min_f
1309 )));
1310 }
1311 }
1312 if let Some(ref max) = rc.max {
1313 let max_f = literal_value_to_f64(max);
1314 if val > max_f {
1315 return Err(OmniError::manifest(format!(
1316 "@range violation on {}.{}: value {} > max {}",
1317 node_type.name, rc.property, val, max_f
1318 )));
1319 }
1320 }
1321 }
1322 }
1323 }
1324
1325 for cc in &node_type.check_constraints {
1327 let re = regex::Regex::new(&cc.pattern).map_err(|e| {
1328 OmniError::manifest(format!(
1329 "@check on {}.{} has invalid regex '{}': {}",
1330 node_type.name, cc.property, cc.pattern, e
1331 ))
1332 })?;
1333 let Some(col) = batch.column_by_name(&cc.property) else {
1334 continue;
1335 };
1336 let str_col = col.as_any().downcast_ref::<StringArray>();
1337 if let Some(str_col) = str_col {
1338 for row in 0..str_col.len() {
1339 if str_col.is_null(row) {
1340 continue;
1341 }
1342 let val = str_col.value(row);
1343 if !re.is_match(val) {
1344 return Err(OmniError::manifest(format!(
1345 "@check violation on {}.{}: value '{}' does not match pattern '{}'",
1346 node_type.name, cc.property, val, cc.pattern
1347 )));
1348 }
1349 }
1350 }
1351 }
1352
1353 Ok(())
1354}
1355
1356pub(crate) fn validate_enum_constraints(
1363 batch: &RecordBatch,
1364 properties: &HashMap<String, omnigraph_compiler::types::PropType>,
1365 type_name: &str,
1366) -> Result<()> {
1367 use arrow_array::{Array, ListArray};
1368
1369 for (prop_name, prop_type) in properties {
1370 let Some(allowed) = prop_type.enum_values.as_ref() else {
1371 continue;
1372 };
1373 let Some(col) = batch.column_by_name(prop_name) else {
1374 continue;
1375 };
1376 if prop_type.list {
1377 let Some(list_col) = col.as_any().downcast_ref::<ListArray>() else {
1378 continue;
1379 };
1380 for row in 0..list_col.len() {
1381 if list_col.is_null(row) {
1382 continue;
1383 }
1384 let item_arr = list_col.value(row);
1385 let Some(str_arr) = item_arr.as_any().downcast_ref::<StringArray>() else {
1386 continue;
1387 };
1388 for i in 0..str_arr.len() {
1389 if str_arr.is_null(i) {
1390 continue;
1391 }
1392 let val = str_arr.value(i);
1393 if !allowed.iter().any(|a| a.as_str() == val) {
1394 return Err(OmniError::manifest(format!(
1395 "invalid enum value '{}' for {}.{} (expected: {})",
1396 val,
1397 type_name,
1398 prop_name,
1399 allowed.join(", ")
1400 )));
1401 }
1402 }
1403 }
1404 } else if let Some(str_col) = col.as_any().downcast_ref::<StringArray>() {
1405 for row in 0..str_col.len() {
1406 if str_col.is_null(row) {
1407 continue;
1408 }
1409 let val = str_col.value(row);
1410 if !allowed.iter().any(|a| a.as_str() == val) {
1411 return Err(OmniError::manifest(format!(
1412 "invalid enum value '{}' for {}.{} (expected: {})",
1413 val,
1414 type_name,
1415 prop_name,
1416 allowed.join(", ")
1417 )));
1418 }
1419 }
1420 }
1421 }
1422 Ok(())
1423}
1424
1425pub(crate) fn enforce_unique_constraints_intra_batch(
1432 batch: &RecordBatch,
1433 type_name: &str,
1434 unique_properties: &[String],
1435) -> Result<()> {
1436 for property in unique_properties {
1437 let Some(col_idx) = batch.schema().index_of(property).ok() else {
1438 continue;
1439 };
1440 let arr = batch.column(col_idx);
1441 let mut seen: HashMap<String, usize> = HashMap::new();
1442 for row in 0..batch.num_rows() {
1443 let Some(value) = scalar_to_string(arr, row) else {
1444 continue;
1445 };
1446 if let Some(prev_row) = seen.insert(value.clone(), row) {
1447 return Err(OmniError::manifest(format!(
1448 "@unique violation on {}.{}: value '{}' appears in rows {} and {}",
1449 type_name, property, value, prev_row, row
1450 )));
1451 }
1452 }
1453 }
1454 Ok(())
1455}
1456
1457fn scalar_to_string(array: &ArrayRef, row: usize) -> Option<String> {
1461 use arrow_array::Array;
1462 if array.is_null(row) {
1463 return None;
1464 }
1465 if let Some(a) = array.as_any().downcast_ref::<StringArray>() {
1466 return Some(a.value(row).to_string());
1467 }
1468 if let Some(a) = array.as_any().downcast_ref::<Int32Array>() {
1469 return Some(a.value(row).to_string());
1470 }
1471 if let Some(a) = array.as_any().downcast_ref::<Int64Array>() {
1472 return Some(a.value(row).to_string());
1473 }
1474 if let Some(a) = array.as_any().downcast_ref::<UInt32Array>() {
1475 return Some(a.value(row).to_string());
1476 }
1477 if let Some(a) = array.as_any().downcast_ref::<UInt64Array>() {
1478 return Some(a.value(row).to_string());
1479 }
1480 if let Some(a) = array.as_any().downcast_ref::<Float32Array>() {
1481 return Some(a.value(row).to_string());
1482 }
1483 if let Some(a) = array.as_any().downcast_ref::<Float64Array>() {
1484 return Some(a.value(row).to_string());
1485 }
1486 if let Some(a) = array.as_any().downcast_ref::<BooleanArray>() {
1487 return Some(a.value(row).to_string());
1488 }
1489 if let Some(a) = array.as_any().downcast_ref::<Date32Array>() {
1490 return Some(a.value(row).to_string());
1491 }
1492 if let Some(a) = array.as_any().downcast_ref::<Date64Array>() {
1493 return Some(a.value(row).to_string());
1494 }
1495 None
1496}
1497
1498pub(crate) fn unique_property_names_for_node(
1502 node_type: &omnigraph_compiler::catalog::NodeType,
1503) -> Vec<String> {
1504 let mut props: Vec<String> = node_type
1505 .unique_constraints
1506 .iter()
1507 .flatten()
1508 .cloned()
1509 .collect();
1510 if let Some(key) = &node_type.key {
1511 props.extend(key.iter().cloned());
1512 }
1513 props.sort();
1514 props.dedup();
1515 props
1516}
1517
1518pub(crate) fn unique_property_names_for_edge(
1520 edge_type: &omnigraph_compiler::catalog::EdgeType,
1521) -> Vec<String> {
1522 let mut props: Vec<String> = edge_type
1523 .unique_constraints
1524 .iter()
1525 .flatten()
1526 .cloned()
1527 .collect();
1528 props.sort();
1529 props.dedup();
1530 props
1531}
1532
1533fn extract_numeric_value(col: &ArrayRef, row: usize) -> Option<f64> {
1534 use arrow_array::{
1535 Array, Float32Array, Float64Array, Int32Array, Int64Array, UInt32Array, UInt64Array,
1536 };
1537 if let Some(a) = col.as_any().downcast_ref::<Int32Array>() {
1538 return Some(a.value(row) as f64);
1539 }
1540 if let Some(a) = col.as_any().downcast_ref::<Int64Array>() {
1541 return Some(a.value(row) as f64);
1542 }
1543 if let Some(a) = col.as_any().downcast_ref::<UInt32Array>() {
1544 return Some(a.value(row) as f64);
1545 }
1546 if let Some(a) = col.as_any().downcast_ref::<UInt64Array>() {
1547 return Some(a.value(row) as f64);
1548 }
1549 if let Some(a) = col.as_any().downcast_ref::<Float32Array>() {
1550 return Some(a.value(row) as f64);
1551 }
1552 if let Some(a) = col.as_any().downcast_ref::<Float64Array>() {
1553 return Some(a.value(row));
1554 }
1555 None
1556}
1557
1558fn literal_value_to_f64(v: &omnigraph_compiler::catalog::LiteralValue) -> f64 {
1559 use omnigraph_compiler::catalog::LiteralValue;
1560 match v {
1561 LiteralValue::Integer(n) => *n as f64,
1562 LiteralValue::Float(f) => *f,
1563 }
1564}
1565
1566pub(crate) async fn validate_edge_cardinality(
1569 db: &crate::db::Omnigraph,
1570 branch: Option<&str>,
1571 edge_name: &str,
1572 written_version: u64,
1573 written_branch: Option<&str>,
1574) -> Result<()> {
1575 use arrow_array::Array;
1576 let catalog = db.catalog();
1577 let edge_type = &catalog.edge_types[edge_name];
1578 if edge_type.cardinality.is_default() {
1579 return Ok(());
1580 }
1581
1582 let snapshot = db.snapshot_for_branch(branch).await?;
1585 let table_key = format!("edge:{}", edge_name);
1586 let entry = snapshot
1587 .entry(&table_key)
1588 .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
1589 let ds = db
1590 .open_dataset_at_state(
1591 &entry.table_path,
1592 written_branch.or(entry.table_branch.as_deref()),
1593 written_version,
1594 )
1595 .await?;
1596
1597 let batches = db
1599 .table_store()
1600 .scan(&ds, Some(&["src"]), None, None)
1601 .await?;
1602
1603 let mut counts: HashMap<String, u32> = HashMap::new();
1604 for batch in &batches {
1605 let srcs = batch
1606 .column_by_name("src")
1607 .unwrap()
1608 .as_any()
1609 .downcast_ref::<StringArray>()
1610 .unwrap();
1611 for i in 0..srcs.len() {
1612 *counts.entry(srcs.value(i).to_string()).or_insert(0) += 1;
1613 }
1614 }
1615
1616 let card = &edge_type.cardinality;
1617 for (src, count) in &counts {
1618 if let Some(max) = card.max {
1619 if *count > max {
1620 return Err(OmniError::manifest(format!(
1621 "@card violation on edge {}: source '{}' has {} edges (max {})",
1622 edge_name, src, count, max
1623 )));
1624 }
1625 }
1626 if *count < card.min {
1627 return Err(OmniError::manifest(format!(
1628 "@card violation on edge {}: source '{}' has {} edges (min {})",
1629 edge_name, src, count, card.min
1630 )));
1631 }
1632 }
1633
1634 Ok(())
1635}
1636
1637async fn validate_edge_cardinality_with_pending_loader(
1652 db: &Omnigraph,
1653 branch: Option<&str>,
1654 edge_type: &omnigraph_compiler::catalog::EdgeType,
1655 table_key: &str,
1656 staging: &MutationStaging,
1657 mode: LoadMode,
1658) -> Result<()> {
1659 if edge_type.cardinality.is_default() {
1660 return Ok(());
1661 }
1662 let snapshot = db.snapshot_for_branch(branch).await?;
1663 let Some(entry) = snapshot.entry(table_key) else {
1664 return Ok(());
1667 };
1668 let ds = db
1669 .open_dataset_at_state(
1670 &entry.table_path,
1671 entry.table_branch.as_deref(),
1672 entry.table_version,
1673 )
1674 .await?;
1675 let dedupe_key = match mode {
1676 LoadMode::Merge => Some("id"),
1677 LoadMode::Append | LoadMode::Overwrite => None,
1678 };
1679 let counts =
1680 crate::exec::staging::count_src_per_edge(db, &ds, table_key, staging, dedupe_key).await?;
1681 crate::exec::staging::enforce_cardinality_bounds(edge_type, &counts)
1682}
1683
1684async fn collect_node_ids_with_pending(
1693 db: &Omnigraph,
1694 branch: Option<&str>,
1695 type_name: &str,
1696 staging: &MutationStaging,
1697) -> Result<HashSet<String>> {
1698 let mut ids = HashSet::new();
1699 let table_key = format!("node:{}", type_name);
1700
1701 for batch in staging.pending_batches(&table_key) {
1703 if let Some(col) = batch.column_by_name("id") {
1704 if let Some(arr) = col.as_any().downcast_ref::<StringArray>() {
1705 for i in 0..arr.len() {
1706 if arr.is_valid(i) {
1707 ids.insert(arr.value(i).to_string());
1708 }
1709 }
1710 }
1711 }
1712 }
1713
1714 let snapshot = db.snapshot_for_branch(branch).await?;
1716 let Some(entry) = snapshot.entry(&table_key) else {
1717 return Ok(ids);
1718 };
1719 let ds = db
1720 .open_dataset_at_state(
1721 &entry.table_path,
1722 entry.table_branch.as_deref(),
1723 entry.table_version,
1724 )
1725 .await?;
1726
1727 let batches = db
1728 .table_store()
1729 .scan(&ds, Some(&["id"]), None, None)
1730 .await?;
1731
1732 for batch in &batches {
1733 let id_col = batch
1734 .column_by_name("id")
1735 .ok_or_else(|| OmniError::Lance("missing 'id' column".into()))?
1736 .as_any()
1737 .downcast_ref::<StringArray>()
1738 .ok_or_else(|| OmniError::Lance("'id' column is not Utf8".into()))?;
1739 for i in 0..batch.num_rows() {
1740 if id_col.is_valid(i) {
1745 ids.insert(id_col.value(i).to_string());
1746 }
1747 }
1748 }
1749
1750 Ok(ids)
1751}
1752
1753async fn collect_node_ids(
1758 db: &Omnigraph,
1759 branch: Option<&str>,
1760 type_name: &str,
1761 node_rows: &HashMap<String, Vec<JsonValue>>,
1762 catalog: &omnigraph_compiler::catalog::Catalog,
1763 updates: &[crate::db::SubTableUpdate],
1764) -> Result<HashSet<String>> {
1765 let mut ids = HashSet::new();
1766
1767 if let Some(rows) = node_rows.get(type_name) {
1769 if let Some(node_type) = catalog.node_types.get(type_name) {
1770 if let Some(key_prop) = node_type.key_property() {
1771 for row in rows {
1772 if let Some(id) = row.get(key_prop).and_then(|v| v.as_str()) {
1773 ids.insert(id.to_string());
1774 }
1775 }
1776 }
1777 }
1778 }
1779
1780 let table_key = format!("node:{}", type_name);
1782 let snapshot = db.snapshot_for_branch(branch).await?;
1783 let Some(entry) = snapshot.entry(&table_key) else {
1784 return Ok(ids);
1785 };
1786 let updated = updates
1788 .iter()
1789 .find(|u| u.table_key == table_key)
1790 .map(|u| (u.table_version, u.table_branch.as_deref()));
1791 let (version, branch) = updated.unwrap_or((entry.table_version, entry.table_branch.as_deref()));
1792 let ds = db
1793 .open_dataset_at_state(&entry.table_path, branch, version)
1794 .await?;
1795
1796 let batches = db
1797 .table_store()
1798 .scan(&ds, Some(&["id"]), None, None)
1799 .await?;
1800
1801 for batch in &batches {
1802 let id_col = batch
1803 .column_by_name("id")
1804 .unwrap()
1805 .as_any()
1806 .downcast_ref::<StringArray>()
1807 .unwrap();
1808 for i in 0..batch.num_rows() {
1809 if !id_col.is_valid(i) {
1810 continue;
1811 }
1812 ids.insert(id_col.value(i).to_string());
1813 }
1814 }
1815
1816 Ok(ids)
1817}
1818
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::Omnigraph;
    use arrow_array::Array;
    use futures::TryStreamExt;
    use std::collections::HashMap;

    /// Schema shared by the loader tests: two keyed node types, two edge types.
    const TEST_SCHEMA: &str = r#"
node Person {
 name: String @key
 age: I32?
}
node Company {
 name: String @key
}
edge Knows: Person -> Person {
 since: Date?
}
edge WorksAt: Person -> Company
"#;

    /// JSONL fixture: two people, one company, and one edge of each type.
    const TEST_DATA: &str = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}
{"type": "Person", "data": {"name": "Bob", "age": 25}}
{"type": "Company", "data": {"name": "Acme"}}
{"edge": "Knows", "from": "Alice", "to": "Bob"}
{"edge": "WorksAt", "from": "Alice", "to": "Acme"}
"#;

    /// Initialises a database with `TEST_SCHEMA` in a fresh temporary
    /// directory. The directory guard is returned so it outlives the database.
    async fn open_test_db() -> (tempfile::TempDir, Omnigraph) {
        let dir = tempfile::tempdir().unwrap();
        let db = Omnigraph::init(dir.path().to_str().unwrap(), TEST_SCHEMA)
            .await
            .unwrap();
        (dir, db)
    }

    /// Returns the named column of `batch` as a Utf8 array, panicking if the
    /// column is missing or has another type.
    fn utf8_column<'a>(batch: &'a RecordBatch, name: &str) -> &'a StringArray {
        batch
            .column_by_name(name)
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap()
    }

    /// Loading the fixture reports the expected per-type row counts.
    #[tokio::test]
    async fn test_load_creates_data() {
        let (_dir, mut db) = open_test_db().await;

        let loaded = load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert_eq!(loaded.nodes_loaded["Person"], 2);
        assert_eq!(loaded.nodes_loaded["Company"], 1);
        assert_eq!(loaded.edges_loaded["Knows"], 1);
        assert_eq!(loaded.edges_loaded["WorksAt"], 1);
    }

    /// Loaded nodes can be read back from the dataset and carry their key as id.
    #[tokio::test]
    async fn test_load_data_readable_via_lance() {
        let (_dir, mut db) = open_test_db().await;
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        let snapshot = db.snapshot().await;
        let people = snapshot.open("node:Person").await.unwrap();

        assert_eq!(people.count_rows(None).await.unwrap(), 2);

        let batches: Vec<RecordBatch> = people
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();

        let id_values: Vec<&str> = utf8_column(&batches[0], "id").iter().flatten().collect();
        assert!(id_values.contains(&"Alice"));
        assert!(id_values.contains(&"Bob"));
    }

    /// Edge rows store the node keys given in `from` / `to` as `src` / `dst`.
    #[tokio::test]
    async fn test_load_edges_reference_node_keys() {
        let (_dir, mut db) = open_test_db().await;
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        let snapshot = db.snapshot().await;
        let knows = snapshot.open("edge:Knows").await.unwrap();

        let batches: Vec<RecordBatch> = knows
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();

        let first = &batches[0];
        assert_eq!(utf8_column(first, "src").value(0), "Alice");
        assert_eq!(utf8_column(first, "dst").value(0), "Bob");
    }

    /// A load moves the database version forward.
    #[tokio::test]
    async fn test_load_manifest_version_advances() {
        let (_dir, mut db) = open_test_db().await;
        let version_before = db.version().await;

        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert!(db.version().await > version_before);
    }

    /// Append keeps the existing rows and adds the new ones.
    #[tokio::test]
    async fn test_load_append_adds_rows() {
        let (_dir, mut db) = open_test_db().await;

        let first = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#;
        let second = r#"{"type": "Person", "data": {"name": "Bob", "age": 25}}"#;

        load_jsonl(&mut db, first, LoadMode::Overwrite)
            .await
            .unwrap();
        load_jsonl(&mut db, second, LoadMode::Append).await.unwrap();

        let snapshot = db.snapshot().await;
        let people = snapshot.open("node:Person").await.unwrap();
        assert_eq!(people.count_rows(None).await.unwrap(), 2);
    }

    /// Rows naming a type that is not in the schema make the load fail.
    #[tokio::test]
    async fn test_load_unknown_type_rejected() {
        let (_dir, mut db) = open_test_db().await;

        let bad = r#"{"type": "FakeType", "data": {"name": "x"}}"#;
        let outcome = load_jsonl(&mut db, bad, LoadMode::Overwrite).await;
        assert!(outcome.is_err());
    }

    /// Ingesting into a missing branch creates it and reports per-table counts.
    #[tokio::test]
    async fn test_ingest_creates_branch_and_reports_tables() {
        let (_dir, mut db) = open_test_db().await;

        let report = db
            .ingest("feature", Some("main"), TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        let expected_tables: Vec<IngestTableResult> = vec![
            ("edge:Knows", 1),
            ("edge:WorksAt", 1),
            ("node:Company", 1),
            ("node:Person", 2),
        ]
        .into_iter()
        .map(|(table_key, rows_loaded)| IngestTableResult {
            table_key: table_key.to_string(),
            rows_loaded,
        })
        .collect();

        assert_eq!(report.branch, "feature");
        assert_eq!(report.base_branch, "main");
        assert!(report.branch_created);
        assert_eq!(report.mode, LoadMode::Overwrite);
        assert_eq!(report.tables, expected_tables);
        assert!(
            db.branch_list()
                .await
                .unwrap()
                .contains(&"feature".to_string())
        );
    }

    /// Ingesting into an existing branch does not need the base branch to
    /// exist, and Merge updates matching rows while inserting new ones.
    #[tokio::test]
    async fn test_ingest_existing_branch_ignores_from_and_merges_data() {
        let (_dir, mut db) = open_test_db().await;
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();
        db.branch_create_from(crate::db::ReadTarget::branch("main"), "feature")
            .await
            .unwrap();

        let report = db
            .ingest(
                "feature",
                Some("missing-base"),
                r#"{"type":"Person","data":{"name":"Bob","age":26}}
{"type":"Person","data":{"name":"Eve","age":31}}"#,
                LoadMode::Merge,
            )
            .await
            .unwrap();

        assert_eq!(report.branch, "feature");
        assert_eq!(report.base_branch, "missing-base");
        assert!(!report.branch_created);
        assert_eq!(report.mode, LoadMode::Merge);
        assert_eq!(
            report.tables,
            vec![IngestTableResult {
                table_key: "node:Person".to_string(),
                rows_loaded: 2
            }]
        );

        let snapshot = db
            .snapshot_of(crate::db::ReadTarget::branch("feature"))
            .await
            .unwrap();
        let people = snapshot.open("node:Person").await.unwrap();
        assert_eq!(people.count_rows(None).await.unwrap(), 3);

        let batches: Vec<RecordBatch> = people
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();

        let mut ages_by_id = HashMap::new();
        for batch in &batches {
            let ids = utf8_column(batch, "id");
            let ages = batch
                .column_by_name("age")
                .unwrap()
                .as_any()
                .downcast_ref::<Int32Array>()
                .unwrap();
            ages_by_id.extend((0..ids.len()).map(|i| (ids.value(i).to_string(), ages.value(i))));
        }

        assert_eq!(ages_by_id.get("Bob"), Some(&26));
        assert_eq!(ages_by_id.get("Eve"), Some(&31));
        assert_eq!(ages_by_id.get("Alice"), Some(&30));
    }

    /// `ingest_as` records the actor on the last commit listed for the branch.
    #[tokio::test]
    async fn test_ingest_as_stamps_actor_on_branch_head_commit() {
        let (_dir, mut db) = open_test_db().await;

        db.ingest_as(
            "feature",
            Some("main"),
            TEST_DATA,
            LoadMode::Overwrite,
            Some("act-andrew"),
        )
        .await
        .unwrap();

        let commits = db.list_commits(Some("feature")).await.unwrap();
        let head = commits.into_iter().last().unwrap();
        assert_eq!(head.actor_id.as_deref(), Some("act-andrew"));
    }

    /// A NaN in a range-constrained column is rejected with a NaN-specific error.
    #[test]
    fn test_range_constraint_rejects_nan() {
        use arrow_array::{Float64Array, RecordBatch, StringArray};
        use omnigraph_compiler::catalog::{LiteralValue, NodeType, RangeConstraint};
        use std::sync::Arc;

        let schema = Arc::new(arrow_schema::Schema::new(vec![
            arrow_schema::Field::new("name", arrow_schema::DataType::Utf8, false),
            arrow_schema::Field::new("score", arrow_schema::DataType::Float64, true),
        ]));

        let names = StringArray::from(vec!["bad"]);
        let scores = Float64Array::from(vec![f64::NAN]);
        let batch =
            RecordBatch::try_new(schema.clone(), vec![Arc::new(names), Arc::new(scores)]).unwrap();

        let score_range = RangeConstraint {
            property: "score".to_string(),
            min: Some(LiteralValue::Float(0.0)),
            max: Some(LiteralValue::Float(1.0)),
        };
        let node_type = NodeType {
            name: "Test".to_string(),
            implements: vec![],
            properties: Default::default(),
            key: None,
            unique_constraints: vec![],
            indices: vec![],
            range_constraints: vec![score_range],
            check_constraints: vec![],
            embed_sources: Default::default(),
            blob_properties: Default::default(),
            arrow_schema: schema,
        };

        let outcome = validate_value_constraints(&batch, &node_type);
        assert!(outcome.is_err(), "expected NaN to be rejected");
        let err = outcome.unwrap_err().to_string();
        assert!(err.contains("NaN"), "error should mention NaN: {}", err);
    }
}