use std::collections::{HashMap, HashSet};

use std::io::{BufRead, BufReader, Cursor};
use std::sync::Arc;

use arrow_array::{
    Array, ArrayRef, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array,
    Int32Array, Int64Array, RecordBatch, StringArray, UInt32Array, UInt64Array,
    builder::{
        ArrayBuilder, BooleanBuilder, Date32Builder, Date64Builder, FixedSizeListBuilder,
        Float32Builder, Float64Builder, Int32Builder, Int64Builder, ListBuilder, StringBuilder,
        UInt32Builder, UInt64Builder,
    },
};
use arrow_schema::DataType;
use base64::Engine;
use lance::blob::BlobArrayBuilder;
use omnigraph_compiler::catalog::NodeType;
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;

use crate::db::Omnigraph;
use crate::error::{OmniError, Result};
use crate::exec::staging::{MutationStaging, PendingMode};

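/// Per-table row counts produced by a JSONL load, keyed by node type name and
/// edge type name respectively.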
#[derive(Debug, Clone, Default)]
pub struct LoadResult {
    pub nodes_loaded: HashMap<String, usize>,
    pub edges_loaded: HashMap<String, usize>,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct IngestTableResult {
    pub table_key: String,
    pub rows_loaded: usize,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct IngestResult {
    pub branch: String,
    pub base_branch: String,
    pub branch_created: bool,
    pub mode: LoadMode,
    pub tables: Vec<IngestTableResult>,
}

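/// How loaded rows are applied to each target table: `Overwrite` replaces the
/// table contents, `Append` adds new rows, and `Merge` upserts on the `id`
/// column.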
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LoadMode {
    Overwrite,
    Append,
    Merge,
}

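/// Loads JSONL `data` into the caller's active branch (falling back to
/// "main"). Each line is either a node record such as
/// `{"type": "Person", "data": {"name": "Alice"}}` or an edge record such as
/// `{"edge": "Knows", "from": "Alice", "to": "Bob"}`; see the tests at the
/// bottom of this file.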
pub async fn load_jsonl(db: &mut Omnigraph, data: &str, mode: LoadMode) -> Result<LoadResult> {
    let current_branch = db.active_branch().await;
    let branch = current_branch.as_deref().unwrap_or("main");
    db.load(branch, data, mode).await
}

pub async fn load_jsonl_file(db: &mut Omnigraph, path: &str, mode: LoadMode) -> Result<LoadResult> {
    let current_branch = db.active_branch().await;
    let branch = current_branch.as_deref().unwrap_or("main");
    db.load_file(branch, path, mode).await
}

impl Omnigraph {
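    /// Loads JSONL `data` into `branch`, creating it from `from` (defaulting
    /// to "main") when it does not exist yet.
    ///
    /// A minimal usage sketch, mirroring the tests at the bottom of this file
    /// (the path and data are illustrative):
    ///
    /// ```ignore
    /// let db = Omnigraph::init("/tmp/graph", SCHEMA).await?;
    /// let result = db
    ///     .ingest("feature", Some("main"), jsonl_data, LoadMode::Merge)
    ///     .await?;
    /// assert_eq!(result.branch, "feature");
    /// ```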
    pub async fn ingest(
        &self,
        branch: &str,
        from: Option<&str>,
        data: &str,
        mode: LoadMode,
    ) -> Result<IngestResult> {
        self.ingest_as(branch, from, data, mode, None).await
    }

    pub async fn ingest_as(
        &self,
        branch: &str,
        from: Option<&str>,
        data: &str,
        mode: LoadMode,
        actor_id: Option<&str>,
    ) -> Result<IngestResult> {
        self.ingest_with_current_actor(branch, from, data, mode, actor_id)
            .await
    }

    pub async fn ingest_file(
        &self,
        branch: &str,
        from: Option<&str>,
        path: &str,
        mode: LoadMode,
    ) -> Result<IngestResult> {
        self.ingest_file_as(branch, from, path, mode, None).await
    }

    pub async fn ingest_file_as(
        &self,
        branch: &str,
        from: Option<&str>,
        path: &str,
        mode: LoadMode,
        actor_id: Option<&str>,
    ) -> Result<IngestResult> {
        let data = std::fs::read_to_string(path).map_err(OmniError::Io)?;
        self.ingest_as(branch, from, &data, mode, actor_id).await
    }

    async fn ingest_with_current_actor(
        &self,
        branch: &str,
        from: Option<&str>,
        data: &str,
        mode: LoadMode,
        actor_id: Option<&str>,
    ) -> Result<IngestResult> {
        self.ensure_schema_state_valid().await?;
        let target_branch =
            Self::normalize_branch_name(branch)?.unwrap_or_else(|| "main".to_string());
        let base_branch = Self::normalize_branch_name(from.unwrap_or("main"))?
            .unwrap_or_else(|| "main".to_string());
        let branch_created = !self
            .branch_list()
            .await?
            .iter()
            .any(|name| name == &target_branch);
        if branch_created {
            self.branch_create_from(crate::db::ReadTarget::branch(&base_branch), &target_branch)
                .await?;
        }

        let result = self.load_as(&target_branch, data, mode, actor_id).await?;
        Ok(IngestResult {
            branch: target_branch,
            base_branch,
            branch_created,
            mode,
            tables: result.to_ingest_tables(),
        })
    }

    pub async fn load(&self, branch: &str, data: &str, mode: LoadMode) -> Result<LoadResult> {
        self.load_as(branch, data, mode, None).await
    }

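    /// Loads JSONL `data` into `branch` (which must be a public branch ref),
    /// optionally stamping `actor_id` on the resulting commit.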
    pub async fn load_as(
        &self,
        branch: &str,
        data: &str,
        mode: LoadMode,
        actor_id: Option<&str>,
    ) -> Result<LoadResult> {
        self.ensure_schema_state_valid().await?;
        crate::db::ensure_public_branch_ref(branch, "load")?;
        let requested = Self::normalize_branch_name(branch)?;
        self.load_direct_on_branch(requested.as_deref(), data, mode, actor_id)
            .await
    }

    pub async fn load_file(
        &self,
        branch: &str,
        path: &str,
        mode: LoadMode,
    ) -> Result<LoadResult> {
        let data = std::fs::read_to_string(path).map_err(OmniError::Io)?;
        self.load(branch, &data, mode).await
    }

    async fn load_direct_on_branch(
        &self,
        branch: Option<&str>,
        data: &str,
        mode: LoadMode,
        actor_id: Option<&str>,
    ) -> Result<LoadResult> {
        let reader = BufReader::new(Cursor::new(data.as_bytes()));
        load_jsonl_reader(self, branch, reader, mode, actor_id).await
    }
}

impl LoadMode {
    pub fn as_str(self) -> &'static str {
        match self {
            LoadMode::Overwrite => "overwrite",
            LoadMode::Append => "append",
            LoadMode::Merge => "merge",
        }
    }
}

impl LoadResult {
    pub fn to_ingest_tables(&self) -> Vec<IngestTableResult> {
        let mut tables = self
            .nodes_loaded
            .iter()
            .map(|(type_name, rows_loaded)| IngestTableResult {
                table_key: format!("node:{type_name}"),
                rows_loaded: *rows_loaded,
            })
            .chain(
                self.edges_loaded
                    .iter()
                    .map(|(edge_name, rows_loaded)| IngestTableResult {
                        table_key: format!("edge:{edge_name}"),
                        rows_loaded: *rows_loaded,
                    }),
            )
            .collect::<Vec<_>>();
        tables.sort_by(|a, b| a.table_key.cmp(&b.table_key));
        tables
    }
}

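/// Parses the JSONL stream, groups node rows by type and edge rows by
/// canonical edge name, builds Arrow batches, validates constraints, and
/// writes the result either through mutation staging (append/merge) or by
/// direct table writes (overwrite).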
async fn load_jsonl_reader<R: BufRead>(
    db: &Omnigraph,
    branch: Option<&str>,
    reader: R,
    mode: LoadMode,
    actor_id: Option<&str>,
) -> Result<LoadResult> {
    let catalog = db.catalog().clone();

    let mut node_rows: HashMap<String, Vec<JsonValue>> = HashMap::new();
    let mut edge_rows: HashMap<String, Vec<(String, String, JsonValue)>> = HashMap::new();

    for (line_num, line) in reader.lines().enumerate() {
        let line = line?;
        let line = line.trim();
        if line.is_empty() {
            continue;
        }
        let value: JsonValue = serde_json::from_str(line).map_err(|e| {
            OmniError::manifest(format!("invalid JSON on line {}: {}", line_num + 1, e))
        })?;

        if let Some(type_name) = value.get("type").and_then(|v| v.as_str()) {
            if !catalog.node_types.contains_key(type_name) {
                return Err(OmniError::manifest(format!(
                    "line {}: unknown node type '{}'",
                    line_num + 1,
                    type_name
                )));
            }
            let data = value
                .get("data")
                .cloned()
                .unwrap_or(JsonValue::Object(serde_json::Map::new()));
            node_rows
                .entry(type_name.to_string())
                .or_default()
                .push(data);
        } else if let Some(edge_name) = value.get("edge").and_then(|v| v.as_str()) {
            if catalog.lookup_edge_by_name(edge_name).is_none() {
                return Err(OmniError::manifest(format!(
                    "line {}: unknown edge type '{}'",
                    line_num + 1,
                    edge_name
                )));
            }
            let from = value
                .get("from")
                .and_then(|v| v.as_str())
                .ok_or_else(|| {
                    OmniError::manifest(format!("line {}: edge missing 'from'", line_num + 1))
                })?
                .to_string();
            let to = value
                .get("to")
                .and_then(|v| v.as_str())
                .ok_or_else(|| {
                    OmniError::manifest(format!("line {}: edge missing 'to'", line_num + 1))
                })?
                .to_string();
            let data = value
                .get("data")
                .cloned()
                .unwrap_or(JsonValue::Object(serde_json::Map::new()));
            let canonical = catalog.lookup_edge_by_name(edge_name).unwrap().name.clone();
            edge_rows
                .entry(canonical)
                .or_default()
                .push((from, to, data));
        } else {
            return Err(OmniError::manifest(format!(
                "line {}: expected 'type' or 'edge' field",
                line_num + 1
            )));
        }
    }

    let mut result = LoadResult::default();
    let snapshot = db.snapshot_for_branch(branch).await?;
    let use_staging = !matches!(mode, LoadMode::Overwrite);
    let mut staging = MutationStaging::default();
    let mut overwrite_updates: Vec<crate::db::SubTableUpdate> = Vec::new();
    let mut overwrite_expected: HashMap<String, u64> = HashMap::new();
    let pending_mode = match mode {
        LoadMode::Merge => PendingMode::Merge,
        LoadMode::Append => PendingMode::Append,
        LoadMode::Overwrite => PendingMode::Append,
    };
    let load_op_kind = match mode {
        LoadMode::Append => crate::db::MutationOpKind::Insert,
        LoadMode::Merge => crate::db::MutationOpKind::Merge,
        LoadMode::Overwrite => crate::db::MutationOpKind::SchemaRewrite,
    };

    let mut prepared_nodes: Vec<(String, String, RecordBatch, usize)> =
        Vec::with_capacity(node_rows.len());
    for (type_name, rows) in &node_rows {
        let node_type = &catalog.node_types[type_name];
        let batch = build_node_batch(node_type, rows)?;
        validate_value_constraints(&batch, node_type)?;
        validate_enum_constraints(&batch, &node_type.properties, type_name)?;
        let unique_props = unique_property_names_for_node(node_type);
        if !unique_props.is_empty() {
            enforce_unique_constraints_intra_batch(&batch, type_name, &unique_props)?;
        }
        let loaded_count = batch.num_rows();
        let table_key = format!("node:{}", type_name);
        let entry = snapshot
            .entry(&table_key)
            .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
        if !use_staging {
            overwrite_expected.insert(table_key.clone(), entry.table_version);
        }
        prepared_nodes.push((type_name.clone(), table_key, batch, loaded_count));
    }

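    // Append and Merge go through mutation staging and a single commit at the
    // end; Overwrite writes each table directly (concurrently) and commits the
    // collected updates against the expected table versions.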
    if use_staging {
        for (type_name, table_key, batch, loaded_count) in prepared_nodes {
            let (ds, full_path, table_branch) = db
                .open_for_mutation_on_branch(branch, &table_key, load_op_kind)
                .await?;
            let expected_version = ds.version().version;
            staging.ensure_path(
                &table_key,
                full_path,
                table_branch,
                expected_version,
                load_op_kind,
            );
            let schema = batch.schema();
            staging.append_batch(&table_key, schema, pending_mode, batch)?;
            result.nodes_loaded.insert(type_name, loaded_count);
        }
    } else {
        let node_write_results =
            write_batches_concurrently(db, branch, mode, prepared_nodes).await?;
        for (type_name, table_key, loaded_count, state, table_branch) in node_write_results {
            overwrite_updates.push(crate::db::SubTableUpdate {
                table_key,
                table_version: state.version,
                table_branch,
                row_count: state.row_count,
                version_metadata: state.version_metadata,
            });
            result.nodes_loaded.insert(type_name, loaded_count);
        }
    }

    for (edge_name, rows) in &edge_rows {
        let edge_type = &catalog.edge_types[edge_name];
        let from_ids = if use_staging {
            collect_node_ids_with_pending(
                db,
                branch,
                &edge_type.from_type,
                &staging,
            )
            .await?
        } else {
            collect_node_ids(
                db,
                branch,
                &edge_type.from_type,
                &node_rows,
                &catalog,
                &overwrite_updates,
            )
            .await?
        };
        let to_ids = if use_staging {
            collect_node_ids_with_pending(
                db,
                branch,
                &edge_type.to_type,
                &staging,
            )
            .await?
        } else {
            collect_node_ids(
                db,
                branch,
                &edge_type.to_type,
                &node_rows,
                &catalog,
                &overwrite_updates,
            )
            .await?
        };

        for (i, (src, dst, _)) in rows.iter().enumerate() {
            if !from_ids.contains(src.as_str()) {
                return Err(OmniError::manifest(format!(
                    "edge {} row {}: src '{}' not found in {}",
                    edge_name,
                    i + 1,
                    src,
                    edge_type.from_type
                )));
            }
            if !to_ids.contains(dst.as_str()) {
                return Err(OmniError::manifest(format!(
                    "edge {} row {}: dst '{}' not found in {}",
                    edge_name,
                    i + 1,
                    dst,
                    edge_type.to_type
                )));
            }
        }
    }

    let mut prepared_edges: Vec<(String, String, RecordBatch, usize)> =
        Vec::with_capacity(edge_rows.len());
    for (edge_name, rows) in &edge_rows {
        let edge_type = &catalog.edge_types[edge_name];
        let batch = build_edge_batch(edge_type, rows)?;
        validate_enum_constraints(&batch, &edge_type.properties, edge_name)?;
        let unique_props = unique_property_names_for_edge(edge_type);
        if !unique_props.is_empty() {
            enforce_unique_constraints_intra_batch(&batch, edge_name, &unique_props)?;
        }
        let loaded_count = batch.num_rows();
        let table_key = format!("edge:{}", edge_name);
        let entry = snapshot
            .entry(&table_key)
            .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
        if !use_staging {
            overwrite_expected.insert(table_key.clone(), entry.table_version);
        }
        prepared_edges.push((edge_name.clone(), table_key, batch, loaded_count));
    }

    if use_staging {
        for (edge_name, table_key, batch, loaded_count) in prepared_edges {
            let (ds, full_path, table_branch) = db
                .open_for_mutation_on_branch(branch, &table_key, load_op_kind)
                .await?;
            let expected_version = ds.version().version;
            staging.ensure_path(
                &table_key,
                full_path,
                table_branch,
                expected_version,
                load_op_kind,
            );
            let schema = batch.schema();
            staging.append_batch(&table_key, schema, pending_mode, batch)?;
            result.edges_loaded.insert(edge_name, loaded_count);
        }
    } else {
        let edge_write_results =
            write_batches_concurrently(db, branch, mode, prepared_edges).await?;
        for (edge_name, table_key, loaded_count, state, table_branch) in edge_write_results {
            overwrite_updates.push(crate::db::SubTableUpdate {
                table_key,
                table_version: state.version,
                table_branch,
                row_count: state.row_count,
                version_metadata: state.version_metadata,
            });
            result.edges_loaded.insert(edge_name, loaded_count);
        }
    }

    for (edge_name, _) in &edge_rows {
        let edge_type = &catalog.edge_types[edge_name];
        let table_key = format!("edge:{}", edge_name);
        if use_staging {
            validate_edge_cardinality_with_pending_loader(
                db,
                branch,
                edge_type,
                &table_key,
                &staging,
                mode,
            )
            .await?;
        } else if let Some(update) = overwrite_updates.iter().find(|u| u.table_key == table_key) {
            validate_edge_cardinality(
                db,
                branch,
                edge_name,
                update.table_version,
                update.table_branch.as_deref(),
            )
            .await?;
        }
    }

    if use_staging {
        let staged = staging.stage_all(db, branch).await?;
        let (updates, expected_versions, sidecar_handle, _queue_guards) = staged
            .commit_all(db, branch, crate::db::manifest::SidecarKind::Load, actor_id)
            .await?;
        crate::failpoints::maybe_fail("mutation.post_finalize_pre_publisher")?;
        db.commit_updates_on_branch_with_expected(branch, &updates, &expected_versions, actor_id)
            .await?;
        if let Some(handle) = sidecar_handle {
            if let Err(err) =
                crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await
            {
                tracing::warn!(
                    error = %err,
                    operation_id = handle.operation_id.as_str(),
                    "recovery sidecar cleanup failed; the next open's recovery sweep will resolve it"
                );
            }
        }
    } else {
        db.commit_updates_on_branch_with_expected(
            branch,
            &overwrite_updates,
            &overwrite_expected,
            actor_id,
        )
        .await?;
    }

    Ok(result)
}

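/// Builds an Arrow batch for one node type. The row id comes from the @key
/// property when the type declares one (an explicit "id" must match it),
/// otherwise from an explicit "id", otherwise a generated ULID.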
fn build_node_batch(node_type: &NodeType, rows: &[JsonValue]) -> Result<RecordBatch> {
    let schema = node_type.arrow_schema.clone();

    let ids: Vec<String> = rows
        .iter()
        .map(|row| {
            let explicit_id = row.get("id").and_then(|v| v.as_str()).map(str::to_string);
            if let Some(key_prop) = node_type.key_property() {
                let key_value = row
                    .get(key_prop)
                    .and_then(|v| v.as_str())
                    .map(|s| s.to_string())
                    .ok_or_else(|| {
                        OmniError::manifest(format!(
                            "node {} missing @key property '{}'",
                            node_type.name, key_prop
                        ))
                    })?;
                if let Some(explicit_id) = explicit_id {
                    if explicit_id != key_value {
                        return Err(OmniError::manifest(format!(
                            "node {} has explicit id '{}' that does not match @key property '{}' value '{}'",
                            node_type.name, explicit_id, key_prop, key_value
                        )));
                    }
                }
                Ok(key_value)
            } else if let Some(explicit_id) = explicit_id {
                Ok(explicit_id)
            } else {
                Ok(generate_id())
            }
        })
        .collect::<Result<Vec<_>>>()?;

    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
    columns.push(Arc::new(StringArray::from(ids)));

    for field in schema.fields().iter().skip(1) {
        if node_type.blob_properties.contains(field.name()) {
            let col = build_blob_column(field.name(), field.is_nullable(), rows)?;
            columns.push(col);
        } else {
            let col =
                build_column_from_json(field.name(), field.data_type(), field.is_nullable(), rows)?;
            columns.push(col);
        }
    }

    RecordBatch::try_new(schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
}

fn build_edge_batch(
    edge_type: &omnigraph_compiler::catalog::EdgeType,
    rows: &[(String, String, JsonValue)],
) -> Result<RecordBatch> {
    let schema = edge_type.arrow_schema.clone();

    let ids: Vec<String> = rows
        .iter()
        .map(|(_, _, data)| {
            data.get("id")
                .and_then(|v| v.as_str())
                .map(str::to_string)
                .unwrap_or_else(generate_id)
        })
        .collect();
    let srcs: Vec<&str> = rows.iter().map(|(from, _, _)| from.as_str()).collect();
    let dsts: Vec<&str> = rows.iter().map(|(_, to, _)| to.as_str()).collect();

    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
    columns.push(Arc::new(StringArray::from(ids)));
    columns.push(Arc::new(StringArray::from(srcs)));
    columns.push(Arc::new(StringArray::from(dsts)));

    let data_values: Vec<JsonValue> = rows.iter().map(|(_, _, data)| data.clone()).collect();
    for field in schema.fields().iter().skip(3) {
        if edge_type.blob_properties.contains(field.name()) {
            let col = build_blob_column(field.name(), field.is_nullable(), &data_values)?;
            columns.push(col);
        } else {
            let col = build_column_from_json(
                field.name(),
                field.data_type(),
                field.is_nullable(),
                &data_values,
            )?;
            columns.push(col);
        }
    }

    RecordBatch::try_new(schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
}

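/// Appends a blob cell: values prefixed with "base64:" are decoded and stored
/// inline, anything else is treated as a URI reference.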
pub(crate) fn append_blob_value(builder: &mut BlobArrayBuilder, value: &str) -> Result<()> {
    if let Some(encoded) = value.strip_prefix("base64:") {
        let bytes = base64::engine::general_purpose::STANDARD
            .decode(encoded)
            .map_err(|e| OmniError::manifest(format!("invalid base64 blob data: {}", e)))?;
        builder
            .push_bytes(bytes)
            .map_err(|e| OmniError::Lance(e.to_string()))
    } else {
        builder
            .push_uri(value)
            .map_err(|e| OmniError::Lance(e.to_string()))
    }
}

fn build_blob_column(name: &str, nullable: bool, rows: &[JsonValue]) -> Result<ArrayRef> {
    let mut builder = BlobArrayBuilder::new(rows.len());
    for row in rows {
        match row.get(name) {
            Some(JsonValue::String(s)) => {
                append_blob_value(&mut builder, s)?;
            }
            Some(JsonValue::Null) | None if nullable => {
                builder
                    .push_null()
                    .map_err(|e| OmniError::Lance(e.to_string()))?;
            }
            Some(JsonValue::Null) | None => {
                return Err(OmniError::manifest(format!(
                    "non-nullable blob property '{}' has null values",
                    name
                )));
            }
            _ => {
                return Err(OmniError::manifest(format!(
                    "blob property '{}' must be a URI string or base64: prefixed data",
                    name
                )));
            }
        }
    }
    builder
        .finish()
        .map_err(|e| OmniError::Lance(e.to_string()))
}

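/// Converts one JSON property across all rows into an Arrow column of the
/// field's declared type. Unsupported data types fall back to an all-null
/// Utf8 column, which the trailing non-nullable check then rejects when the
/// field is required.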
fn build_column_from_json(
    name: &str,
    data_type: &DataType,
    nullable: bool,
    rows: &[JsonValue],
) -> Result<ArrayRef> {
    let array: ArrayRef = match data_type {
        DataType::Utf8 => {
            let values: Vec<Option<String>> = rows
                .iter()
                .map(|row| {
                    row.get(name)
                        .and_then(|v| v.as_str())
                        .map(|s| s.to_string())
                })
                .collect();
            Arc::new(StringArray::from(values))
        }
        DataType::Int32 => {
            let values: Vec<Option<i32>> = rows
                .iter()
                .map(|row| row.get(name).and_then(|v| v.as_i64()).map(|v| v as i32))
                .collect();
            Arc::new(Int32Array::from(values))
        }
        DataType::Int64 => {
            let values: Vec<Option<i64>> = rows
                .iter()
                .map(|row| row.get(name).and_then(|v| v.as_i64()))
                .collect();
            Arc::new(Int64Array::from(values))
        }
        DataType::UInt32 => {
            let values: Vec<Option<u32>> = rows
                .iter()
                .map(|row| row.get(name).and_then(|v| v.as_u64()).map(|v| v as u32))
                .collect();
            Arc::new(UInt32Array::from(values))
        }
        DataType::UInt64 => {
            let values: Vec<Option<u64>> = rows
                .iter()
                .map(|row| row.get(name).and_then(|v| v.as_u64()))
                .collect();
            Arc::new(UInt64Array::from(values))
        }
        DataType::Float32 => {
            let values: Vec<Option<f32>> = rows
                .iter()
                .map(|row| row.get(name).and_then(|v| v.as_f64()).map(|v| v as f32))
                .collect();
            Arc::new(Float32Array::from(values))
        }
        DataType::Float64 => {
            let values: Vec<Option<f64>> = rows
                .iter()
                .map(|row| row.get(name).and_then(|v| v.as_f64()))
                .collect();
            Arc::new(Float64Array::from(values))
        }
        DataType::Boolean => {
            let values: Vec<Option<bool>> = rows
                .iter()
                .map(|row| row.get(name).and_then(|v| v.as_bool()))
                .collect();
            Arc::new(BooleanArray::from(values))
        }
        DataType::Date32 => {
            let mut values = Vec::with_capacity(rows.len());
            for row in rows {
                values.push(parse_date32_json_value(
                    row.get(name).unwrap_or(&JsonValue::Null),
                )?);
            }
            Arc::new(Date32Array::from(values))
        }
        DataType::Date64 => {
            let mut values = Vec::with_capacity(rows.len());
            for row in rows {
                values.push(parse_date64_json_value(
                    row.get(name).unwrap_or(&JsonValue::Null),
                )?);
            }
            Arc::new(Date64Array::from(values))
        }
        DataType::List(field) => {
            let mut builder = ListBuilder::with_capacity(
                make_list_value_builder(field.data_type(), rows.len())?,
                rows.len(),
            )
            .with_field(field.clone());
            for row in rows {
                let value = row.get(name).unwrap_or(&JsonValue::Null);
                if value.is_null() {
                    builder.append(false);
                    continue;
                }
                let items = value.as_array().ok_or_else(|| {
                    OmniError::manifest(format!(
                        "list property '{}' expects a JSON array, got {}",
                        name, value
                    ))
                })?;
                for item in items {
                    append_json_list_item(builder.values(), field.data_type(), item)?;
                }
                builder.append(true);
            }
            Arc::new(builder.finish())
        }
        DataType::FixedSizeList(child_field, dim) => {
            let dim = *dim;
            let mut builder = FixedSizeListBuilder::with_capacity(
                Float32Builder::with_capacity(rows.len() * dim as usize),
                dim,
                rows.len(),
            )
            .with_field(child_field.clone());
            for row in rows {
                if let Some(arr) = row.get(name).and_then(|v| v.as_array()) {
                    if arr.len() != dim as usize {
                        return Err(OmniError::manifest(format!(
                            "vector property '{}' expects {} dimensions, got {}",
                            name,
                            dim,
                            arr.len()
                        )));
                    }
                    for val in arr {
                        builder
                            .values()
                            .append_value(val.as_f64().unwrap_or(0.0) as f32);
                    }
                    builder.append(true);
                } else if nullable {
                    for _ in 0..dim as usize {
                        builder.values().append_null();
                    }
                    builder.append(false);
                } else {
                    return Err(OmniError::manifest(format!(
                        "non-nullable vector property '{}' has null values",
                        name
                    )));
                }
            }
            Arc::new(builder.finish())
        }
        _ => {
            let values: Vec<Option<&str>> = vec![None; rows.len()];
            Arc::new(StringArray::from(values))
        }
    };

    if !nullable && array.null_count() > 0 {
        return Err(OmniError::manifest(format!(
            "non-nullable property '{}' has null or invalid values",
            name
        )));
    }

    Ok(array)
}

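/// Creates an empty value builder for the supported list element types.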
fn make_list_value_builder(data_type: &DataType, capacity: usize) -> Result<Box<dyn ArrayBuilder>> {
    Ok(match data_type {
        DataType::Utf8 => Box::new(StringBuilder::with_capacity(capacity, capacity * 8)),
        DataType::Boolean => Box::new(BooleanBuilder::with_capacity(capacity)),
        DataType::Int32 => Box::new(Int32Builder::with_capacity(capacity)),
        DataType::Int64 => Box::new(Int64Builder::with_capacity(capacity)),
        DataType::UInt32 => Box::new(UInt32Builder::with_capacity(capacity)),
        DataType::UInt64 => Box::new(UInt64Builder::with_capacity(capacity)),
        DataType::Float32 => Box::new(Float32Builder::with_capacity(capacity)),
        DataType::Float64 => Box::new(Float64Builder::with_capacity(capacity)),
        DataType::Date32 => Box::new(Date32Builder::with_capacity(capacity)),
        DataType::Date64 => Box::new(Date64Builder::with_capacity(capacity)),
        other => {
            return Err(OmniError::manifest(format!(
                "unsupported list element data type {:?}",
                other
            )));
        }
    })
}

fn append_json_list_item(
    builder: &mut Box<dyn ArrayBuilder>,
    data_type: &DataType,
    value: &JsonValue,
) -> Result<()> {
    match data_type {
        DataType::Utf8 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<StringBuilder>()
                .ok_or_else(|| OmniError::manifest("list Utf8 builder downcast failed"))?;
            if let Some(value) = value.as_str() {
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::Boolean => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<BooleanBuilder>()
                .ok_or_else(|| OmniError::manifest("list Boolean builder downcast failed"))?;
            if let Some(value) = value.as_bool() {
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::Int32 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<Int32Builder>()
                .ok_or_else(|| OmniError::manifest("list Int32 builder downcast failed"))?;
            if let Some(value) = value.as_i64() {
                let value = i32::try_from(value).map_err(|_| {
                    OmniError::manifest(format!("list value {} exceeds Int32 range", value))
                })?;
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::Int64 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<Int64Builder>()
                .ok_or_else(|| OmniError::manifest("list Int64 builder downcast failed"))?;
            if let Some(value) = value.as_i64() {
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::UInt32 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<UInt32Builder>()
                .ok_or_else(|| OmniError::manifest("list UInt32 builder downcast failed"))?;
            if let Some(value) = value.as_u64() {
                let value = u32::try_from(value).map_err(|_| {
                    OmniError::manifest(format!("list value {} exceeds UInt32 range", value))
                })?;
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::UInt64 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<UInt64Builder>()
                .ok_or_else(|| OmniError::manifest("list UInt64 builder downcast failed"))?;
            if let Some(value) = value.as_u64() {
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::Float32 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<Float32Builder>()
                .ok_or_else(|| OmniError::manifest("list Float32 builder downcast failed"))?;
            if let Some(value) = value.as_f64() {
                builder.append_value(value as f32);
            } else {
                builder.append_null();
            }
        }
        DataType::Float64 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<Float64Builder>()
                .ok_or_else(|| OmniError::manifest("list Float64 builder downcast failed"))?;
            if let Some(value) = value.as_f64() {
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::Date32 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<Date32Builder>()
                .ok_or_else(|| OmniError::manifest("list Date32 builder downcast failed"))?;
            if let Some(value) = parse_date32_json_value(value)? {
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::Date64 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<Date64Builder>()
                .ok_or_else(|| OmniError::manifest("list Date64 builder downcast failed"))?;
            if let Some(value) = parse_date64_json_value(value)? {
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        other => {
            return Err(OmniError::manifest(format!(
                "unsupported list element data type {:?}",
                other
            )));
        }
    }

    Ok(())
}

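/// Parses a Date cell from JSON: accepts integer day counts or a string
/// literal handled by the Arrow cast in `parse_date32_literal`; other JSON
/// values become null.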
fn parse_date32_json_value(value: &JsonValue) -> Result<Option<i32>> {
    if value.is_null() {
        return Ok(None);
    }
    if let Some(days) = value.as_i64() {
        let days = i32::try_from(days)
            .map_err(|_| OmniError::manifest(format!("Date value out of range: {}", days)))?;
        return Ok(Some(days));
    }
    if let Some(days) = value.as_u64() {
        let days = i32::try_from(days)
            .map_err(|_| OmniError::manifest(format!("Date value out of range: {}", days)))?;
        return Ok(Some(days));
    }
    if let Some(value) = value.as_str() {
        return Ok(Some(parse_date32_literal(value)?));
    }
    Ok(None)
}

fn parse_date64_json_value(value: &JsonValue) -> Result<Option<i64>> {
    if value.is_null() {
        return Ok(None);
    }
    if let Some(ms) = value.as_i64() {
        return Ok(Some(ms));
    }
    if let Some(ms) = value.as_u64() {
        let ms = i64::try_from(ms)
            .map_err(|_| OmniError::manifest(format!("DateTime value out of range: {}", ms)))?;
        return Ok(Some(ms));
    }
    if let Some(value) = value.as_str() {
        return Ok(Some(parse_date64_literal(value)?));
    }
    Ok(None)
}

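/// Default number of concurrent table writes for overwrite loads; can be
/// overridden with the OMNIGRAPH_LOAD_CONCURRENCY environment variable.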
const DEFAULT_LOAD_WRITE_CONCURRENCY: usize = 8;

fn load_write_concurrency() -> usize {
    std::env::var("OMNIGRAPH_LOAD_CONCURRENCY")
        .ok()
        .and_then(|v| v.parse::<usize>().ok())
        .filter(|v| *v > 0)
        .unwrap_or(DEFAULT_LOAD_WRITE_CONCURRENCY)
}

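/// Writes the prepared (type, table_key, batch) tuples with bounded
/// concurrency, returning each table's new state.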
async fn write_batches_concurrently(
    db: &Omnigraph,
    branch: Option<&str>,
    mode: LoadMode,
    prepared: Vec<(String, String, RecordBatch, usize)>,
) -> Result<
    Vec<(
        String,
        String,
        usize,
        crate::table_store::TableState,
        Option<String>,
    )>,
> {
    use futures::stream::StreamExt;

    if prepared.is_empty() {
        return Ok(Vec::new());
    }

    let concurrency = load_write_concurrency().min(prepared.len()).max(1);

    futures::stream::iter(prepared.into_iter().map(
        |(type_name, table_key, batch, loaded_count)| async move {
            let (state, table_branch) =
                write_batch_to_dataset(db, branch, &table_key, batch, mode).await?;
            Ok::<_, OmniError>((type_name, table_key, loaded_count, state, table_branch))
        },
    ))
    .buffered(concurrency)
    .collect::<Vec<Result<_>>>()
    .await
    .into_iter()
    .collect()
}

async fn write_batch_to_dataset(
    db: &Omnigraph,
    branch: Option<&str>,
    table_key: &str,
    batch: RecordBatch,
    mode: LoadMode,
) -> Result<(crate::table_store::TableState, Option<String>)> {
    let op_kind = match mode {
        LoadMode::Append => crate::db::MutationOpKind::Insert,
        LoadMode::Merge => crate::db::MutationOpKind::Merge,
        LoadMode::Overwrite => crate::db::MutationOpKind::SchemaRewrite,
    };
    let (mut ds, full_path, table_branch) = db
        .open_for_mutation_on_branch(branch, table_key, op_kind)
        .await?;
    let table_store = db.table_store();

    match mode {
        LoadMode::Overwrite => {
            let state = table_store
                .overwrite_batch(&full_path, &mut ds, batch)
                .await?;
            Ok((state, table_branch))
        }
        LoadMode::Append => {
            let state = table_store.append_batch(&full_path, &mut ds, batch).await?;
            Ok((state, table_branch))
        }
        LoadMode::Merge => {
            let state = table_store
                .merge_insert_batch(
                    &full_path,
                    ds,
                    batch,
                    vec!["id".to_string()],
                    lance::dataset::WhenMatched::UpdateAll,
                    lance::dataset::WhenNotMatched::InsertAll,
                )
                .await?;
            Ok((state, table_branch))
        }
    }
}

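/// Generates a ULID string for rows that do not carry an explicit id.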
fn generate_id() -> String {
    ulid::Ulid::new().to_string()
}

pub(crate) fn parse_date32_literal(value: &str) -> Result<i32> {
    let raw: Arc<dyn Array> = Arc::new(StringArray::from(vec![Some(value)]));
    let casted = arrow_cast::cast::cast(raw.as_ref(), &DataType::Date32)
        .map_err(|e| OmniError::manifest(format!("invalid Date literal '{}': {}", value, e)))?;
    let out = casted
        .as_any()
        .downcast_ref::<Date32Array>()
        .ok_or_else(|| OmniError::manifest("Date32 cast produced unexpected array"))?;
    if out.is_null(0) {
        return Err(OmniError::manifest(format!(
            "invalid Date literal '{}'",
            value
        )));
    }
    Ok(out.value(0))
}

pub(crate) fn parse_date64_literal(value: &str) -> Result<i64> {
    let raw: Arc<dyn Array> = Arc::new(StringArray::from(vec![Some(value)]));
    let casted = arrow_cast::cast::cast(raw.as_ref(), &DataType::Date64)
        .map_err(|e| OmniError::manifest(format!("invalid DateTime literal '{}': {}", value, e)))?;
    let out = casted
        .as_any()
        .downcast_ref::<Date64Array>()
        .ok_or_else(|| OmniError::manifest("Date64 cast produced unexpected array"))?;
    if out.is_null(0) {
        return Err(OmniError::manifest(format!(
            "invalid DateTime literal '{}'",
            value
        )));
    }
    Ok(out.value(0))
}

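/// Enforces @range (including NaN rejection) and @check regex constraints on
/// a node batch.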
pub(crate) fn validate_value_constraints(
    batch: &RecordBatch,
    node_type: &omnigraph_compiler::catalog::NodeType,
) -> Result<()> {
    use arrow_array::Array;

    for rc in &node_type.range_constraints {
        let Some(col) = batch.column_by_name(&rc.property) else {
            continue;
        };
        for row in 0..batch.num_rows() {
            if col.is_null(row) {
                continue;
            }
            let value = extract_numeric_value(col, row);
            if let Some(val) = value {
                if val.is_nan() {
                    return Err(OmniError::manifest(format!(
                        "@range violation on {}.{}: value is NaN",
                        node_type.name, rc.property
                    )));
                }
                if let Some(ref min) = rc.min {
                    let min_f = literal_value_to_f64(min);
                    if val < min_f {
                        return Err(OmniError::manifest(format!(
                            "@range violation on {}.{}: value {} < min {}",
                            node_type.name, rc.property, val, min_f
                        )));
                    }
                }
                if let Some(ref max) = rc.max {
                    let max_f = literal_value_to_f64(max);
                    if val > max_f {
                        return Err(OmniError::manifest(format!(
                            "@range violation on {}.{}: value {} > max {}",
                            node_type.name, rc.property, val, max_f
                        )));
                    }
                }
            }
        }
    }

    for cc in &node_type.check_constraints {
        let re = regex::Regex::new(&cc.pattern).map_err(|e| {
            OmniError::manifest(format!(
                "@check on {}.{} has invalid regex '{}': {}",
                node_type.name, cc.property, cc.pattern, e
            ))
        })?;
        let Some(col) = batch.column_by_name(&cc.property) else {
            continue;
        };
        let str_col = col.as_any().downcast_ref::<StringArray>();
        if let Some(str_col) = str_col {
            for row in 0..str_col.len() {
                if str_col.is_null(row) {
                    continue;
                }
                let val = str_col.value(row);
                if !re.is_match(val) {
                    return Err(OmniError::manifest(format!(
                        "@check violation on {}.{}: value '{}' does not match pattern '{}'",
                        node_type.name, cc.property, val, cc.pattern
                    )));
                }
            }
        }
    }

    Ok(())
}

pub(crate) fn validate_enum_constraints(
    batch: &RecordBatch,
    properties: &HashMap<String, omnigraph_compiler::types::PropType>,
    type_name: &str,
) -> Result<()> {
    use arrow_array::{Array, ListArray};

    for (prop_name, prop_type) in properties {
        let Some(allowed) = prop_type.enum_values.as_ref() else {
            continue;
        };
        let Some(col) = batch.column_by_name(prop_name) else {
            continue;
        };
        if prop_type.list {
            let Some(list_col) = col.as_any().downcast_ref::<ListArray>() else {
                continue;
            };
            for row in 0..list_col.len() {
                if list_col.is_null(row) {
                    continue;
                }
                let item_arr = list_col.value(row);
                let Some(str_arr) = item_arr.as_any().downcast_ref::<StringArray>() else {
                    continue;
                };
                for i in 0..str_arr.len() {
                    if str_arr.is_null(i) {
                        continue;
                    }
                    let val = str_arr.value(i);
                    if !allowed.iter().any(|a| a.as_str() == val) {
                        return Err(OmniError::manifest(format!(
                            "invalid enum value '{}' for {}.{} (expected: {})",
                            val,
                            type_name,
                            prop_name,
                            allowed.join(", ")
                        )));
                    }
                }
            }
        } else if let Some(str_col) = col.as_any().downcast_ref::<StringArray>() {
            for row in 0..str_col.len() {
                if str_col.is_null(row) {
                    continue;
                }
                let val = str_col.value(row);
                if !allowed.iter().any(|a| a.as_str() == val) {
                    return Err(OmniError::manifest(format!(
                        "invalid enum value '{}' for {}.{} (expected: {})",
                        val,
                        type_name,
                        prop_name,
                        allowed.join(", ")
                    )));
                }
            }
        }
    }
    Ok(())
}

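/// Rejects duplicate values for @unique (and @key) properties within a single
/// incoming batch; existing rows in the table are not consulted here.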
pub(crate) fn enforce_unique_constraints_intra_batch(
    batch: &RecordBatch,
    type_name: &str,
    unique_properties: &[String],
) -> Result<()> {
    for property in unique_properties {
        let Some(col_idx) = batch.schema().index_of(property).ok() else {
            continue;
        };
        let arr = batch.column(col_idx);
        let mut seen: HashMap<String, usize> = HashMap::new();
        for row in 0..batch.num_rows() {
            let Some(value) = scalar_to_string(arr, row) else {
                continue;
            };
            if let Some(prev_row) = seen.insert(value.clone(), row) {
                return Err(OmniError::manifest(format!(
                    "@unique violation on {}.{}: value '{}' appears in rows {} and {}",
                    type_name, property, value, prev_row, row
                )));
            }
        }
    }
    Ok(())
}

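/// Renders a scalar cell as a string for uniqueness comparison; unsupported
/// array types yield None and are skipped.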
fn scalar_to_string(array: &ArrayRef, row: usize) -> Option<String> {
    use arrow_array::Array;
    if array.is_null(row) {
        return None;
    }
    if let Some(a) = array.as_any().downcast_ref::<StringArray>() {
        return Some(a.value(row).to_string());
    }
    if let Some(a) = array.as_any().downcast_ref::<Int32Array>() {
        return Some(a.value(row).to_string());
    }
    if let Some(a) = array.as_any().downcast_ref::<Int64Array>() {
        return Some(a.value(row).to_string());
    }
    if let Some(a) = array.as_any().downcast_ref::<UInt32Array>() {
        return Some(a.value(row).to_string());
    }
    if let Some(a) = array.as_any().downcast_ref::<UInt64Array>() {
        return Some(a.value(row).to_string());
    }
    if let Some(a) = array.as_any().downcast_ref::<Float32Array>() {
        return Some(a.value(row).to_string());
    }
    if let Some(a) = array.as_any().downcast_ref::<Float64Array>() {
        return Some(a.value(row).to_string());
    }
    if let Some(a) = array.as_any().downcast_ref::<BooleanArray>() {
        return Some(a.value(row).to_string());
    }
    if let Some(a) = array.as_any().downcast_ref::<Date32Array>() {
        return Some(a.value(row).to_string());
    }
    if let Some(a) = array.as_any().downcast_ref::<Date64Array>() {
        return Some(a.value(row).to_string());
    }
    None
}

pub(crate) fn unique_property_names_for_node(
    node_type: &omnigraph_compiler::catalog::NodeType,
) -> Vec<String> {
    let mut props: Vec<String> = node_type
        .unique_constraints
        .iter()
        .flatten()
        .cloned()
        .collect();
    if let Some(key) = &node_type.key {
        props.extend(key.iter().cloned());
    }
    props.sort();
    props.dedup();
    props
}

pub(crate) fn unique_property_names_for_edge(
    edge_type: &omnigraph_compiler::catalog::EdgeType,
) -> Vec<String> {
    let mut props: Vec<String> = edge_type
        .unique_constraints
        .iter()
        .flatten()
        .cloned()
        .collect();
    props.sort();
    props.dedup();
    props
}

fn extract_numeric_value(col: &ArrayRef, row: usize) -> Option<f64> {
    use arrow_array::{
        Array, Float32Array, Float64Array, Int32Array, Int64Array, UInt32Array, UInt64Array,
    };
    if let Some(a) = col.as_any().downcast_ref::<Int32Array>() {
        return Some(a.value(row) as f64);
    }
    if let Some(a) = col.as_any().downcast_ref::<Int64Array>() {
        return Some(a.value(row) as f64);
    }
    if let Some(a) = col.as_any().downcast_ref::<UInt32Array>() {
        return Some(a.value(row) as f64);
    }
    if let Some(a) = col.as_any().downcast_ref::<UInt64Array>() {
        return Some(a.value(row) as f64);
    }
    if let Some(a) = col.as_any().downcast_ref::<Float32Array>() {
        return Some(a.value(row) as f64);
    }
    if let Some(a) = col.as_any().downcast_ref::<Float64Array>() {
        return Some(a.value(row));
    }
    None
}

fn literal_value_to_f64(v: &omnigraph_compiler::catalog::LiteralValue) -> f64 {
    use omnigraph_compiler::catalog::LiteralValue;
    match v {
        LiteralValue::Integer(n) => *n as f64,
        LiteralValue::Float(f) => *f,
    }
}

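/// Re-scans the `src` column of the freshly written edge table and enforces
/// the edge's @card min/max bounds per source id.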
pub(crate) async fn validate_edge_cardinality(
    db: &crate::db::Omnigraph,
    branch: Option<&str>,
    edge_name: &str,
    written_version: u64,
    written_branch: Option<&str>,
) -> Result<()> {
    use arrow_array::Array;
    let catalog = db.catalog();
    let edge_type = &catalog.edge_types[edge_name];
    if edge_type.cardinality.is_default() {
        return Ok(());
    }

    let snapshot = db.snapshot_for_branch(branch).await?;
    let table_key = format!("edge:{}", edge_name);
    let entry = snapshot
        .entry(&table_key)
        .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
    let ds = db
        .open_dataset_at_state(
            &entry.table_path,
            written_branch.or(entry.table_branch.as_deref()),
            written_version,
        )
        .await?;

    let batches = db
        .table_store()
        .scan(&ds, Some(&["src"]), None, None)
        .await?;

    let mut counts: HashMap<String, u32> = HashMap::new();
    for batch in &batches {
        let srcs = batch
            .column_by_name("src")
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        for i in 0..srcs.len() {
            *counts.entry(srcs.value(i).to_string()).or_insert(0) += 1;
        }
    }

    let card = &edge_type.cardinality;
    for (src, count) in &counts {
        if let Some(max) = card.max {
            if *count > max {
                return Err(OmniError::manifest(format!(
                    "@card violation on edge {}: source '{}' has {} edges (max {})",
                    edge_name, src, count, max
                )));
            }
        }
        if *count < card.min {
            return Err(OmniError::manifest(format!(
                "@card violation on edge {}: source '{}' has {} edges (min {})",
                edge_name, src, count, card.min
            )));
        }
    }

    Ok(())
}

async fn validate_edge_cardinality_with_pending_loader(
    db: &Omnigraph,
    branch: Option<&str>,
    edge_type: &omnigraph_compiler::catalog::EdgeType,
    table_key: &str,
    staging: &MutationStaging,
    mode: LoadMode,
) -> Result<()> {
    if edge_type.cardinality.is_default() {
        return Ok(());
    }
    let snapshot = db.snapshot_for_branch(branch).await?;
    let Some(entry) = snapshot.entry(table_key) else {
        return Ok(());
    };
    let ds = db
        .open_dataset_at_state(
            &entry.table_path,
            entry.table_branch.as_deref(),
            entry.table_version,
        )
        .await?;
    let dedupe_key = match mode {
        LoadMode::Merge => Some("id"),
        LoadMode::Append | LoadMode::Overwrite => None,
    };
    let counts =
        crate::exec::staging::count_src_per_edge(db, &ds, table_key, staging, dedupe_key)
            .await?;
    crate::exec::staging::enforce_cardinality_bounds(edge_type, &counts)
}

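/// Collects node ids for `type_name` from both the staged pending batches and
/// the branch's current table.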
async fn collect_node_ids_with_pending(
    db: &Omnigraph,
    branch: Option<&str>,
    type_name: &str,
    staging: &MutationStaging,
) -> Result<HashSet<String>> {
    let mut ids = HashSet::new();
    let table_key = format!("node:{}", type_name);

    for batch in staging.pending_batches(&table_key) {
        if let Some(col) = batch.column_by_name("id") {
            if let Some(arr) = col.as_any().downcast_ref::<StringArray>() {
                for i in 0..arr.len() {
                    if arr.is_valid(i) {
                        ids.insert(arr.value(i).to_string());
                    }
                }
            }
        }
    }

    let snapshot = db.snapshot_for_branch(branch).await?;
    let Some(entry) = snapshot.entry(&table_key) else {
        return Ok(ids);
    };
    let ds = db
        .open_dataset_at_state(
            &entry.table_path,
            entry.table_branch.as_deref(),
            entry.table_version,
        )
        .await?;

    let batches = db
        .table_store()
        .scan(&ds, Some(&["id"]), None, None)
        .await?;

    for batch in &batches {
        let id_col = batch
            .column_by_name("id")
            .ok_or_else(|| OmniError::Lance("missing 'id' column".into()))?
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| OmniError::Lance("'id' column is not Utf8".into()))?;
        for i in 0..batch.num_rows() {
            if id_col.is_valid(i) {
                ids.insert(id_col.value(i).to_string());
            }
        }
    }

    Ok(ids)
}

async fn collect_node_ids(
    db: &Omnigraph,
    branch: Option<&str>,
    type_name: &str,
    node_rows: &HashMap<String, Vec<JsonValue>>,
    catalog: &omnigraph_compiler::catalog::Catalog,
    updates: &[crate::db::SubTableUpdate],
) -> Result<HashSet<String>> {
    let mut ids = HashSet::new();

    if let Some(rows) = node_rows.get(type_name) {
        if let Some(node_type) = catalog.node_types.get(type_name) {
            if let Some(key_prop) = node_type.key_property() {
                for row in rows {
                    if let Some(id) = row.get(key_prop).and_then(|v| v.as_str()) {
                        ids.insert(id.to_string());
                    }
                }
            }
        }
    }

    let table_key = format!("node:{}", type_name);
    let snapshot = db.snapshot_for_branch(branch).await?;
    let Some(entry) = snapshot.entry(&table_key) else {
        return Ok(ids);
    };
    let updated = updates
        .iter()
        .find(|u| u.table_key == table_key)
        .map(|u| (u.table_version, u.table_branch.as_deref()));
    let (version, branch) = updated.unwrap_or((entry.table_version, entry.table_branch.as_deref()));
    let ds = db
        .open_dataset_at_state(&entry.table_path, branch, version)
        .await?;

    let batches = db
        .table_store()
        .scan(&ds, Some(&["id"]), None, None)
        .await?;

    for batch in &batches {
        let id_col = batch
            .column_by_name("id")
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        for i in 0..batch.num_rows() {
            if !id_col.is_valid(i) {
                continue;
            }
            ids.insert(id_col.value(i).to_string());
        }
    }

    Ok(ids)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::Omnigraph;
    use arrow_array::Array;
    use futures::TryStreamExt;
    use std::collections::HashMap;

    const TEST_SCHEMA: &str = r#"
node Person {
    name: String @key
    age: I32?
}
node Company {
    name: String @key
}
edge Knows: Person -> Person {
    since: Date?
}
edge WorksAt: Person -> Company
"#;

    const TEST_DATA: &str = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}
{"type": "Person", "data": {"name": "Bob", "age": 25}}
{"type": "Company", "data": {"name": "Acme"}}
{"edge": "Knows", "from": "Alice", "to": "Bob"}
{"edge": "WorksAt", "from": "Alice", "to": "Acme"}
"#;

    #[tokio::test]
    async fn test_load_creates_data() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let result = load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert_eq!(result.nodes_loaded["Person"], 2);
        assert_eq!(result.nodes_loaded["Company"], 1);
        assert_eq!(result.edges_loaded["Knows"], 1);
        assert_eq!(result.edges_loaded["WorksAt"], 1);
    }

    #[tokio::test]
    async fn test_load_data_readable_via_lance() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        let snap = db.snapshot().await;
        let person_ds = snap.open("node:Person").await.unwrap();

        assert_eq!(person_ds.count_rows(None).await.unwrap(), 2);

        let batches: Vec<RecordBatch> = person_ds
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();

        let batch = &batches[0];
        let ids = batch
            .column_by_name("id")
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let id_values: Vec<&str> = (0..ids.len()).map(|i| ids.value(i)).collect();
        assert!(id_values.contains(&"Alice"));
        assert!(id_values.contains(&"Bob"));
    }

    #[tokio::test]
    async fn test_load_edges_reference_node_keys() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        let snap = db.snapshot().await;
        let knows_ds = snap.open("edge:Knows").await.unwrap();

        let batches: Vec<RecordBatch> = knows_ds
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();

        let batch = &batches[0];
        let srcs = batch
            .column_by_name("src")
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let dsts = batch
            .column_by_name("dst")
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();

        assert_eq!(srcs.value(0), "Alice");
        assert_eq!(dsts.value(0), "Bob");
    }

    #[tokio::test]
    async fn test_load_manifest_version_advances() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        let v1 = db.version().await;

        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert!(db.version().await > v1);
    }

    #[tokio::test]
    async fn test_load_append_adds_rows() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let batch1 = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#;
        let batch2 = r#"{"type": "Person", "data": {"name": "Bob", "age": 25}}"#;

        load_jsonl(&mut db, batch1, LoadMode::Overwrite)
            .await
            .unwrap();
        load_jsonl(&mut db, batch2, LoadMode::Append).await.unwrap();

        let snap = db.snapshot().await;
        let person_ds = snap.open("node:Person").await.unwrap();
        assert_eq!(person_ds.count_rows(None).await.unwrap(), 2);
    }

    #[tokio::test]
    async fn test_load_unknown_type_rejected() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let bad = r#"{"type": "FakeType", "data": {"name": "x"}}"#;
        let result = load_jsonl(&mut db, bad, LoadMode::Overwrite).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_ingest_creates_branch_and_reports_tables() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let result = db
            .ingest("feature", Some("main"), TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert_eq!(result.branch, "feature");
        assert_eq!(result.base_branch, "main");
        assert!(result.branch_created);
        assert_eq!(result.mode, LoadMode::Overwrite);
        assert_eq!(
            result.tables,
            vec![
                IngestTableResult {
                    table_key: "edge:Knows".to_string(),
                    rows_loaded: 1
                },
                IngestTableResult {
                    table_key: "edge:WorksAt".to_string(),
                    rows_loaded: 1
                },
                IngestTableResult {
                    table_key: "node:Company".to_string(),
                    rows_loaded: 1
                },
                IngestTableResult {
                    table_key: "node:Person".to_string(),
                    rows_loaded: 2
                },
            ]
        );
        assert!(
            db.branch_list()
                .await
                .unwrap()
                .contains(&"feature".to_string())
        );
    }

    #[tokio::test]
    async fn test_ingest_existing_branch_ignores_from_and_merges_data() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();
        db.branch_create_from(crate::db::ReadTarget::branch("main"), "feature")
            .await
            .unwrap();

        let result = db
            .ingest(
                "feature",
                Some("missing-base"),
                r#"{"type":"Person","data":{"name":"Bob","age":26}}
{"type":"Person","data":{"name":"Eve","age":31}}"#,
                LoadMode::Merge,
            )
            .await
            .unwrap();

        assert_eq!(result.branch, "feature");
        assert_eq!(result.base_branch, "missing-base");
        assert!(!result.branch_created);
        assert_eq!(result.mode, LoadMode::Merge);
        assert_eq!(
            result.tables,
            vec![IngestTableResult {
                table_key: "node:Person".to_string(),
                rows_loaded: 2
            }]
        );

        let snap = db
            .snapshot_of(crate::db::ReadTarget::branch("feature"))
            .await
            .unwrap();
        let person_ds = snap.open("node:Person").await.unwrap();
        assert_eq!(person_ds.count_rows(None).await.unwrap(), 3);

        let batches: Vec<RecordBatch> = person_ds
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();
        let mut ages_by_id = HashMap::new();
        for batch in &batches {
            let ids = batch
                .column_by_name("id")
                .unwrap()
                .as_any()
                .downcast_ref::<StringArray>()
                .unwrap();
            let ages = batch
                .column_by_name("age")
                .unwrap()
                .as_any()
                .downcast_ref::<Int32Array>()
                .unwrap();
            for idx in 0..ids.len() {
                ages_by_id.insert(ids.value(idx).to_string(), ages.value(idx));
            }
        }

        assert_eq!(ages_by_id.get("Bob"), Some(&26));
        assert_eq!(ages_by_id.get("Eve"), Some(&31));
        assert_eq!(ages_by_id.get("Alice"), Some(&30));
    }

    #[tokio::test]
    async fn test_ingest_as_stamps_actor_on_branch_head_commit() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        db.ingest_as(
            "feature",
            Some("main"),
            TEST_DATA,
            LoadMode::Overwrite,
            Some("act-andrew"),
        )
        .await
        .unwrap();

        let head = db
            .list_commits(Some("feature"))
            .await
            .unwrap()
            .into_iter()
            .last()
            .unwrap();
        assert_eq!(head.actor_id.as_deref(), Some("act-andrew"));
    }

    #[test]
    fn test_range_constraint_rejects_nan() {
        use arrow_array::{Float64Array, RecordBatch, StringArray};
        use omnigraph_compiler::catalog::{LiteralValue, NodeType, RangeConstraint};
        use std::sync::Arc;

        let schema = Arc::new(arrow_schema::Schema::new(vec![
            arrow_schema::Field::new("name", arrow_schema::DataType::Utf8, false),
            arrow_schema::Field::new("score", arrow_schema::DataType::Float64, true),
        ]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["bad"])),
                Arc::new(Float64Array::from(vec![f64::NAN])),
            ],
        )
        .unwrap();

        let node_type = NodeType {
            name: "Test".to_string(),
            implements: vec![],
            properties: Default::default(),
            key: None,
            unique_constraints: vec![],
            indices: vec![],
            range_constraints: vec![RangeConstraint {
                property: "score".to_string(),
                min: Some(LiteralValue::Float(0.0)),
                max: Some(LiteralValue::Float(1.0)),
            }],
            check_constraints: vec![],
            embed_sources: Default::default(),
            blob_properties: Default::default(),
            arrow_schema: schema,
        };

        let result = validate_value_constraints(&batch, &node_type);
        assert!(result.is_err(), "expected NaN to be rejected");
        let err = result.unwrap_err().to_string();
        assert!(err.contains("NaN"), "error should mention NaN: {}", err);
    }
}