1use std::collections::{HashMap, HashSet};
2
3use std::io::{BufRead, BufReader, Cursor};
4use std::sync::Arc;
5
6use arrow_array::{
7 Array, ArrayRef, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array,
8 Int32Array, Int64Array, RecordBatch, StringArray, UInt32Array, UInt64Array,
9 builder::{
10 ArrayBuilder, BooleanBuilder, Date32Builder, Date64Builder, FixedSizeListBuilder,
11 Float32Builder, Float64Builder, Int32Builder, Int64Builder, ListBuilder, StringBuilder,
12 UInt32Builder, UInt64Builder,
13 },
14};
15use arrow_schema::DataType;
16use base64::Engine;
17use lance::blob::BlobArrayBuilder;
18use omnigraph_compiler::catalog::NodeType;
19use serde::{Deserialize, Serialize};
20use serde_json::Value as JsonValue;
21
22use crate::db::Omnigraph;
23use crate::error::{OmniError, Result};
24use crate::exec::staging::{MutationStaging, PendingMode};
25
/// Summary of a completed load: per-table row counts.
#[derive(Debug, Clone, Default)]
pub struct LoadResult {
    // Rows loaded per node type, keyed by node type name.
    pub nodes_loaded: HashMap<String, usize>,
    // Rows loaded per edge type, keyed by canonical edge name.
    pub edges_loaded: HashMap<String, usize>,
}
32
/// Per-table outcome of an ingest; `table_key` is "node:<type>" or "edge:<name>".
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct IngestTableResult {
    pub table_key: String,
    pub rows_loaded: usize,
}
38
/// Outcome of an ingest: which branch received the data, where it was
/// branched from, whether the branch was newly created, and per-table counts.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct IngestResult {
    pub branch: String,
    pub base_branch: String,
    // True when `branch` did not exist and was created from `base_branch`.
    pub branch_created: bool,
    pub mode: LoadMode,
    pub tables: Vec<IngestTableResult>,
}
47
/// How incoming rows are applied to existing table data.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LoadMode {
    // Replace the table contents with the incoming rows.
    Overwrite,
    // Add the incoming rows alongside the existing data.
    Append,
    // Upsert keyed on `id`: update matching rows, insert the rest.
    Merge,
}
59
60pub async fn load_jsonl(db: &mut Omnigraph, data: &str, mode: LoadMode) -> Result<LoadResult> {
62 let current_branch = db.active_branch().map(str::to_string);
63 let branch = current_branch.as_deref().unwrap_or("main");
64 db.load(branch, data, mode).await
65}
66
67pub async fn load_jsonl_file(db: &mut Omnigraph, path: &str, mode: LoadMode) -> Result<LoadResult> {
69 let current_branch = db.active_branch().map(str::to_string);
70 let branch = current_branch.as_deref().unwrap_or("main");
71 db.load_file(branch, path, mode).await
72}
73
74impl Omnigraph {
75 pub async fn ingest(
76 &mut self,
77 branch: &str,
78 from: Option<&str>,
79 data: &str,
80 mode: LoadMode,
81 ) -> Result<IngestResult> {
82 self.ingest_as(branch, from, data, mode, None).await
83 }
84
85 pub async fn ingest_as(
86 &mut self,
87 branch: &str,
88 from: Option<&str>,
89 data: &str,
90 mode: LoadMode,
91 actor_id: Option<&str>,
92 ) -> Result<IngestResult> {
93 let previous_actor = self.audit_actor_id.clone();
94 self.audit_actor_id = actor_id.map(str::to_string);
95 let result = self
96 .ingest_with_current_actor(branch, from, data, mode)
97 .await;
98 self.audit_actor_id = previous_actor;
99 result
100 }
101
102 pub async fn ingest_file(
103 &mut self,
104 branch: &str,
105 from: Option<&str>,
106 path: &str,
107 mode: LoadMode,
108 ) -> Result<IngestResult> {
109 self.ingest_file_as(branch, from, path, mode, None).await
110 }
111
112 pub async fn ingest_file_as(
113 &mut self,
114 branch: &str,
115 from: Option<&str>,
116 path: &str,
117 mode: LoadMode,
118 actor_id: Option<&str>,
119 ) -> Result<IngestResult> {
120 let data = std::fs::read_to_string(path).map_err(OmniError::Io)?;
121 self.ingest_as(branch, from, &data, mode, actor_id).await
122 }
123
124 async fn ingest_with_current_actor(
125 &mut self,
126 branch: &str,
127 from: Option<&str>,
128 data: &str,
129 mode: LoadMode,
130 ) -> Result<IngestResult> {
131 self.ensure_schema_state_valid().await?;
132 let target_branch =
133 Self::normalize_branch_name(branch)?.unwrap_or_else(|| "main".to_string());
134 let base_branch = Self::normalize_branch_name(from.unwrap_or("main"))?
135 .unwrap_or_else(|| "main".to_string());
136 let branch_created = !self
137 .branch_list()
138 .await?
139 .iter()
140 .any(|name| name == &target_branch);
141 if branch_created {
142 self.branch_create_from(crate::db::ReadTarget::branch(&base_branch), &target_branch)
143 .await?;
144 }
145
146 let result = self.load(&target_branch, data, mode).await?;
147 Ok(IngestResult {
148 branch: target_branch,
149 base_branch,
150 branch_created,
151 mode,
152 tables: result.to_ingest_tables(),
153 })
154 }
155
156 pub async fn load(&mut self, branch: &str, data: &str, mode: LoadMode) -> Result<LoadResult> {
157 self.ensure_schema_state_valid().await?;
158 crate::db::ensure_public_branch_ref(branch, "load")?;
163 let requested = Self::normalize_branch_name(branch)?;
169 self.load_direct_on_branch(requested.as_deref(), data, mode)
173 .await
174 }
175
176 pub async fn load_file(
177 &mut self,
178 branch: &str,
179 path: &str,
180 mode: LoadMode,
181 ) -> Result<LoadResult> {
182 let data = std::fs::read_to_string(path).map_err(|e| OmniError::Io(e))?;
183 self.load(branch, &data, mode).await
184 }
185
186 async fn load_direct_on_branch(
187 &mut self,
188 branch: Option<&str>,
189 data: &str,
190 mode: LoadMode,
191 ) -> Result<LoadResult> {
192 let reader = BufReader::new(Cursor::new(data.as_bytes()));
193 load_jsonl_reader(self, branch, reader, mode).await
194 }
195}
196
197impl LoadMode {
198 pub fn as_str(self) -> &'static str {
199 match self {
200 LoadMode::Overwrite => "overwrite",
201 LoadMode::Append => "append",
202 LoadMode::Merge => "merge",
203 }
204 }
205}
206
207impl LoadResult {
208 pub fn to_ingest_tables(&self) -> Vec<IngestTableResult> {
209 let mut tables = self
210 .nodes_loaded
211 .iter()
212 .map(|(type_name, rows_loaded)| IngestTableResult {
213 table_key: format!("node:{type_name}"),
214 rows_loaded: *rows_loaded,
215 })
216 .chain(
217 self.edges_loaded
218 .iter()
219 .map(|(edge_name, rows_loaded)| IngestTableResult {
220 table_key: format!("edge:{edge_name}"),
221 rows_loaded: *rows_loaded,
222 }),
223 )
224 .collect::<Vec<_>>();
225 tables.sort_by(|a, b| a.table_key.cmp(&b.table_key));
226 tables
227 }
228}
229
230async fn load_jsonl_reader<R: BufRead>(
231 db: &mut Omnigraph,
232 branch: Option<&str>,
233 reader: R,
234 mode: LoadMode,
235) -> Result<LoadResult> {
236 let catalog = db.catalog().clone();
237
238 let mut node_rows: HashMap<String, Vec<JsonValue>> = HashMap::new();
240 let mut edge_rows: HashMap<String, Vec<(String, String, JsonValue)>> = HashMap::new();
241
242 for (line_num, line) in reader.lines().enumerate() {
243 let line = line?;
244 let line = line.trim();
245 if line.is_empty() {
246 continue;
247 }
248 let value: JsonValue = serde_json::from_str(line).map_err(|e| {
249 OmniError::manifest(format!("invalid JSON on line {}: {}", line_num + 1, e))
250 })?;
251
252 if let Some(type_name) = value.get("type").and_then(|v| v.as_str()) {
253 if !catalog.node_types.contains_key(type_name) {
254 return Err(OmniError::manifest(format!(
255 "line {}: unknown node type '{}'",
256 line_num + 1,
257 type_name
258 )));
259 }
260 let data = value
261 .get("data")
262 .cloned()
263 .unwrap_or(JsonValue::Object(serde_json::Map::new()));
264 node_rows
265 .entry(type_name.to_string())
266 .or_default()
267 .push(data);
268 } else if let Some(edge_name) = value.get("edge").and_then(|v| v.as_str()) {
269 if catalog.lookup_edge_by_name(edge_name).is_none() {
270 return Err(OmniError::manifest(format!(
271 "line {}: unknown edge type '{}'",
272 line_num + 1,
273 edge_name
274 )));
275 }
276 let from = value
277 .get("from")
278 .and_then(|v| v.as_str())
279 .ok_or_else(|| {
280 OmniError::manifest(format!("line {}: edge missing 'from'", line_num + 1))
281 })?
282 .to_string();
283 let to = value
284 .get("to")
285 .and_then(|v| v.as_str())
286 .ok_or_else(|| {
287 OmniError::manifest(format!("line {}: edge missing 'to'", line_num + 1))
288 })?
289 .to_string();
290 let data = value
291 .get("data")
292 .cloned()
293 .unwrap_or(JsonValue::Object(serde_json::Map::new()));
294 let canonical = catalog.lookup_edge_by_name(edge_name).unwrap().name.clone();
295 edge_rows
296 .entry(canonical)
297 .or_default()
298 .push((from, to, data));
299 } else {
300 return Err(OmniError::manifest(format!(
301 "line {}: expected 'type' or 'edge' field",
302 line_num + 1
303 )));
304 }
305 }
306
307 let mut result = LoadResult::default();
317 let snapshot = db.snapshot_for_branch(branch).await?;
318 let use_staging = !matches!(mode, LoadMode::Overwrite);
319 let mut staging = MutationStaging::default();
320 let mut overwrite_updates: Vec<crate::db::SubTableUpdate> = Vec::new();
321 let mut overwrite_expected: HashMap<String, u64> = HashMap::new();
322 let pending_mode = match mode {
323 LoadMode::Merge => PendingMode::Merge,
324 LoadMode::Append => PendingMode::Append,
328 LoadMode::Overwrite => PendingMode::Append, };
330
331 let mut prepared_nodes: Vec<(String, String, RecordBatch, usize)> =
334 Vec::with_capacity(node_rows.len());
335 for (type_name, rows) in &node_rows {
336 let node_type = &catalog.node_types[type_name];
337 let batch = build_node_batch(node_type, rows)?;
338 validate_value_constraints(&batch, node_type)?;
339 validate_enum_constraints(&batch, &node_type.properties, type_name)?;
340 let unique_props = unique_property_names_for_node(node_type);
341 if !unique_props.is_empty() {
342 enforce_unique_constraints_intra_batch(&batch, type_name, &unique_props)?;
343 }
344 let loaded_count = batch.num_rows();
345 let table_key = format!("node:{}", type_name);
346 let entry = snapshot
347 .entry(&table_key)
348 .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
349 if !use_staging {
350 overwrite_expected.insert(table_key.clone(), entry.table_version);
351 }
352 prepared_nodes.push((type_name.clone(), table_key, batch, loaded_count));
353 }
354
355 if use_staging {
358 for (type_name, table_key, batch, loaded_count) in prepared_nodes {
359 let (ds, full_path, table_branch) = db
360 .open_for_mutation_on_branch(branch, &table_key)
361 .await?;
362 let expected_version = ds.version().version;
363 staging.ensure_path(
364 &table_key,
365 full_path,
366 table_branch,
367 expected_version,
368 );
369 let schema = batch.schema();
370 staging.append_batch(&table_key, schema, pending_mode, batch)?;
371 result.nodes_loaded.insert(type_name, loaded_count);
372 }
373 } else {
374 let node_write_results =
375 write_batches_concurrently(db, branch, mode, prepared_nodes).await?;
376 for (type_name, table_key, loaded_count, state, table_branch) in node_write_results {
377 overwrite_updates.push(crate::db::SubTableUpdate {
378 table_key,
379 table_version: state.version,
380 table_branch,
381 row_count: state.row_count,
382 version_metadata: state.version_metadata,
383 });
384 result.nodes_loaded.insert(type_name, loaded_count);
385 }
386 }
387
388 for (edge_name, rows) in &edge_rows {
393 let edge_type = &catalog.edge_types[edge_name];
394 let from_ids = if use_staging {
395 collect_node_ids_with_pending(
396 db,
397 branch,
398 &edge_type.from_type,
399 &staging,
400 )
401 .await?
402 } else {
403 collect_node_ids(
404 db,
405 branch,
406 &edge_type.from_type,
407 &node_rows,
408 &catalog,
409 &overwrite_updates,
410 )
411 .await?
412 };
413 let to_ids = if use_staging {
414 collect_node_ids_with_pending(
415 db,
416 branch,
417 &edge_type.to_type,
418 &staging,
419 )
420 .await?
421 } else {
422 collect_node_ids(
423 db,
424 branch,
425 &edge_type.to_type,
426 &node_rows,
427 &catalog,
428 &overwrite_updates,
429 )
430 .await?
431 };
432
433 for (i, (src, dst, _)) in rows.iter().enumerate() {
434 if !from_ids.contains(src.as_str()) {
435 return Err(OmniError::manifest(format!(
436 "edge {} row {}: src '{}' not found in {}",
437 edge_name,
438 i + 1,
439 src,
440 edge_type.from_type
441 )));
442 }
443 if !to_ids.contains(dst.as_str()) {
444 return Err(OmniError::manifest(format!(
445 "edge {} row {}: dst '{}' not found in {}",
446 edge_name,
447 i + 1,
448 dst,
449 edge_type.to_type
450 )));
451 }
452 }
453 }
454
455 let mut prepared_edges: Vec<(String, String, RecordBatch, usize)> =
457 Vec::with_capacity(edge_rows.len());
458 for (edge_name, rows) in &edge_rows {
459 let edge_type = &catalog.edge_types[edge_name];
460 let batch = build_edge_batch(edge_type, rows)?;
461 validate_enum_constraints(&batch, &edge_type.properties, edge_name)?;
462 let unique_props = unique_property_names_for_edge(edge_type);
463 if !unique_props.is_empty() {
464 enforce_unique_constraints_intra_batch(&batch, edge_name, &unique_props)?;
465 }
466 let loaded_count = batch.num_rows();
467 let table_key = format!("edge:{}", edge_name);
468 let entry = snapshot
469 .entry(&table_key)
470 .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
471 if !use_staging {
472 overwrite_expected.insert(table_key.clone(), entry.table_version);
473 }
474 prepared_edges.push((edge_name.clone(), table_key, batch, loaded_count));
475 }
476
477 if use_staging {
479 for (edge_name, table_key, batch, loaded_count) in prepared_edges {
480 let (ds, full_path, table_branch) = db
481 .open_for_mutation_on_branch(branch, &table_key)
482 .await?;
483 let expected_version = ds.version().version;
484 staging.ensure_path(
485 &table_key,
486 full_path,
487 table_branch,
488 expected_version,
489 );
490 let schema = batch.schema();
491 staging.append_batch(&table_key, schema, pending_mode, batch)?;
492 result.edges_loaded.insert(edge_name, loaded_count);
493 }
494 } else {
495 let edge_write_results =
496 write_batches_concurrently(db, branch, mode, prepared_edges).await?;
497 for (edge_name, table_key, loaded_count, state, table_branch) in edge_write_results {
498 overwrite_updates.push(crate::db::SubTableUpdate {
499 table_key,
500 table_version: state.version,
501 table_branch,
502 row_count: state.row_count,
503 version_metadata: state.version_metadata,
504 });
505 result.edges_loaded.insert(edge_name, loaded_count);
506 }
507 }
508
509 for (edge_name, _) in &edge_rows {
514 let edge_type = &catalog.edge_types[edge_name];
515 let table_key = format!("edge:{}", edge_name);
516 if use_staging {
517 validate_edge_cardinality_with_pending_loader(
518 db,
519 branch,
520 edge_type,
521 &table_key,
522 &staging,
523 mode,
524 )
525 .await?;
526 } else if let Some(update) = overwrite_updates.iter().find(|u| u.table_key == table_key) {
527 validate_edge_cardinality(
528 db,
529 branch,
530 edge_name,
531 update.table_version,
532 update.table_branch.as_deref(),
533 )
534 .await?;
535 }
536 }
537
538 if use_staging {
540 let (updates, expected_versions) = staging.finalize(db, branch).await?;
541 db.commit_updates_on_branch_with_expected(branch, &updates, &expected_versions)
542 .await?;
543 } else {
544 db.commit_updates_on_branch_with_expected(
545 branch,
546 &overwrite_updates,
547 &overwrite_expected,
548 )
549 .await?;
550 }
551
552 Ok(result)
553}
554
/// Builds an Arrow `RecordBatch` for one node type from raw JSONL row
/// objects.
///
/// The id column is pushed first: when the type declares a @key property the
/// id is taken from it (an explicit "id" must then match), otherwise from an
/// explicit "id" field, otherwise a fresh ULID is generated.
fn build_node_batch(node_type: &NodeType, rows: &[JsonValue]) -> Result<RecordBatch> {
    let schema = node_type.arrow_schema.clone();

    let ids: Vec<String> = rows
        .iter()
        .map(|row| {
            let explicit_id = row.get("id").and_then(|v| v.as_str()).map(str::to_string);
            if let Some(key_prop) = node_type.key_property() {
                // @key property is mandatory and must be a string.
                let key_value = row
                    .get(key_prop)
                    .and_then(|v| v.as_str())
                    .map(|s| s.to_string())
                    .ok_or_else(|| {
                        OmniError::manifest(format!(
                            "node {} missing @key property '{}'",
                            node_type.name, key_prop
                        ))
                    })?;
                // An explicit id that disagrees with the @key is an error.
                if let Some(explicit_id) = explicit_id {
                    if explicit_id != key_value {
                        return Err(OmniError::manifest(format!(
                            "node {} has explicit id '{}' that does not match @key property '{}' value '{}'",
                            node_type.name, explicit_id, key_prop, key_value
                        )));
                    }
                }
                Ok(key_value)
            } else if let Some(explicit_id) = explicit_id {
                Ok(explicit_id)
            } else {
                Ok(generate_id())
            }
        })
        .collect::<Result<Vec<_>>>()?;

    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
    columns.push(Arc::new(StringArray::from(ids)));

    // Skip the first schema field: the id column was just pushed above.
    for field in schema.fields().iter().skip(1) {
        if node_type.blob_properties.contains(field.name()) {
            let col = build_blob_column(field.name(), field.is_nullable(), rows)?;
            columns.push(col);
        } else {
            let col =
                build_column_from_json(field.name(), field.data_type(), field.is_nullable(), rows)?;
            columns.push(col);
        }
    }

    RecordBatch::try_new(schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
}
608
/// Builds an Arrow `RecordBatch` for one edge type from (from, to, data)
/// row triples.
///
/// The first three columns hold id, src, and dst (presumably matching the
/// first three schema fields — field names are not visible here); the
/// remaining columns are the declared properties taken from "data".
fn build_edge_batch(
    edge_type: &omnigraph_compiler::catalog::EdgeType,
    rows: &[(String, String, JsonValue)],
) -> Result<RecordBatch> {
    let schema = edge_type.arrow_schema.clone();

    // Edge ids may be supplied inside "data"; otherwise a ULID is generated.
    let ids: Vec<String> = rows
        .iter()
        .map(|(_, _, data)| {
            data.get("id")
                .and_then(|v| v.as_str())
                .map(str::to_string)
                .unwrap_or_else(generate_id)
        })
        .collect();
    let srcs: Vec<&str> = rows.iter().map(|(from, _, _)| from.as_str()).collect();
    let dsts: Vec<&str> = rows.iter().map(|(_, to, _)| to.as_str()).collect();

    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
    columns.push(Arc::new(StringArray::from(ids)));
    columns.push(Arc::new(StringArray::from(srcs)));
    columns.push(Arc::new(StringArray::from(dsts)));

    // Clone the data objects once so the column builders can take &[JsonValue].
    let data_values: Vec<JsonValue> = rows.iter().map(|(_, _, data)| data.clone()).collect();
    // Skip the first three schema fields: id/src/dst were pushed above.
    for field in schema.fields().iter().skip(3) {
        if edge_type.blob_properties.contains(field.name()) {
            let col = build_blob_column(field.name(), field.is_nullable(), &data_values)?;
            columns.push(col);
        } else {
            let col = build_column_from_json(
                field.name(),
                field.data_type(),
                field.is_nullable(),
                &data_values,
            )?;
            columns.push(col);
        }
    }

    RecordBatch::try_new(schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
}
651
652pub(crate) fn append_blob_value(builder: &mut BlobArrayBuilder, value: &str) -> Result<()> {
654 if let Some(encoded) = value.strip_prefix("base64:") {
655 let bytes = base64::engine::general_purpose::STANDARD
656 .decode(encoded)
657 .map_err(|e| OmniError::manifest(format!("invalid base64 blob data: {}", e)))?;
658 builder
659 .push_bytes(bytes)
660 .map_err(|e| OmniError::Lance(e.to_string()))
661 } else {
662 builder
664 .push_uri(value)
665 .map_err(|e| OmniError::Lance(e.to_string()))
666 }
667}
668
/// Builds a blob column for property `name`: each row must hold a string
/// ("base64:" inline data or a URI); null/absent values are only accepted
/// when `nullable`.
fn build_blob_column(name: &str, nullable: bool, rows: &[JsonValue]) -> Result<ArrayRef> {
    let mut builder = BlobArrayBuilder::new(rows.len());
    for row in rows {
        match row.get(name) {
            Some(JsonValue::String(s)) => {
                append_blob_value(&mut builder, s)?;
            }
            // Arm order matters: this guarded arm must precede the
            // unconditional null arm below.
            Some(JsonValue::Null) | None if nullable => {
                builder
                    .push_null()
                    .map_err(|e| OmniError::Lance(e.to_string()))?;
            }
            Some(JsonValue::Null) | None => {
                return Err(OmniError::manifest(format!(
                    "non-nullable blob property '{}' has null values",
                    name
                )));
            }
            // Any non-string, non-null JSON type is rejected outright.
            _ => {
                return Err(OmniError::manifest(format!(
                    "blob property '{}' must be a URI string or base64: prefixed data",
                    name
                )));
            }
        }
    }
    builder
        .finish()
        .map_err(|e| OmniError::Lance(e.to_string()))
}
700
/// Builds one Arrow column for property `name` from raw JSON row objects.
///
/// Values of the wrong JSON type generally map to null; for non-nullable
/// fields that is rejected by the final null-count check at the bottom
/// rather than at the point of conversion.
fn build_column_from_json(
    name: &str,
    data_type: &DataType,
    nullable: bool,
    rows: &[JsonValue],
) -> Result<ArrayRef> {
    let array: ArrayRef = match data_type {
        DataType::Utf8 => {
            let values: Vec<Option<String>> = rows
                .iter()
                .map(|row| {
                    row.get(name)
                        .and_then(|v| v.as_str())
                        .map(|s| s.to_string())
                })
                .collect();
            Arc::new(StringArray::from(values))
        }
        DataType::Int32 => {
            // NOTE(review): `as i32` truncates values outside i32 range
            // rather than erroring — confirm that is acceptable.
            let values: Vec<Option<i32>> = rows
                .iter()
                .map(|row| row.get(name).and_then(|v| v.as_i64()).map(|v| v as i32))
                .collect();
            Arc::new(Int32Array::from(values))
        }
        DataType::Int64 => {
            let values: Vec<Option<i64>> = rows
                .iter()
                .map(|row| row.get(name).and_then(|v| v.as_i64()))
                .collect();
            Arc::new(Int64Array::from(values))
        }
        DataType::UInt32 => {
            let values: Vec<Option<u32>> = rows
                .iter()
                .map(|row| row.get(name).and_then(|v| v.as_u64()).map(|v| v as u32))
                .collect();
            Arc::new(UInt32Array::from(values))
        }
        DataType::UInt64 => {
            let values: Vec<Option<u64>> = rows
                .iter()
                .map(|row| row.get(name).and_then(|v| v.as_u64()))
                .collect();
            Arc::new(UInt64Array::from(values))
        }
        DataType::Float32 => {
            let values: Vec<Option<f32>> = rows
                .iter()
                .map(|row| row.get(name).and_then(|v| v.as_f64()).map(|v| v as f32))
                .collect();
            Arc::new(Float32Array::from(values))
        }
        DataType::Float64 => {
            let values: Vec<Option<f64>> = rows
                .iter()
                .map(|row| row.get(name).and_then(|v| v.as_f64()))
                .collect();
            Arc::new(Float64Array::from(values))
        }
        DataType::Boolean => {
            let values: Vec<Option<bool>> = rows
                .iter()
                .map(|row| row.get(name).and_then(|v| v.as_bool()))
                .collect();
            Arc::new(BooleanArray::from(values))
        }
        DataType::Date32 => {
            // Dates accept integers (days) or strings; see the parser.
            let mut values = Vec::with_capacity(rows.len());
            for row in rows {
                values.push(parse_date32_json_value(
                    row.get(name).unwrap_or(&JsonValue::Null),
                )?);
            }
            Arc::new(Date32Array::from(values))
        }
        DataType::Date64 => {
            let mut values = Vec::with_capacity(rows.len());
            for row in rows {
                values.push(parse_date64_json_value(
                    row.get(name).unwrap_or(&JsonValue::Null),
                )?);
            }
            Arc::new(Date64Array::from(values))
        }
        DataType::List(field) => {
            let mut builder = ListBuilder::with_capacity(
                make_list_value_builder(field.data_type(), rows.len())?,
                rows.len(),
            )
            .with_field(field.clone());
            for row in rows {
                let value = row.get(name).unwrap_or(&JsonValue::Null);
                if value.is_null() {
                    // Null/absent list -> null list entry.
                    builder.append(false);
                    continue;
                }
                // A present, non-null value must be a JSON array.
                let items = value.as_array().ok_or_else(|| {
                    OmniError::manifest(format!(
                        "list property '{}' expects a JSON array, got {}",
                        name, value
                    ))
                })?;
                for item in items {
                    append_json_list_item(builder.values(), field.data_type(), item)?;
                }
                builder.append(true);
            }
            Arc::new(builder.finish())
        }
        DataType::FixedSizeList(child_field, dim) => {
            let dim = *dim;
            // NOTE(review): the value builder is always Float32 — this
            // assumes fixed-size-list (vector) columns carry f32 elements
            // regardless of `child_field`'s declared type; confirm.
            let mut builder = FixedSizeListBuilder::with_capacity(
                Float32Builder::with_capacity(rows.len() * dim as usize),
                dim,
                rows.len(),
            )
            .with_field(child_field.clone());
            for row in rows {
                if let Some(arr) = row.get(name).and_then(|v| v.as_array()) {
                    if arr.len() != dim as usize {
                        return Err(OmniError::manifest(format!(
                            "vector property '{}' expects {} dimensions, got {}",
                            name,
                            dim,
                            arr.len()
                        )));
                    }
                    for val in arr {
                        // NOTE(review): non-numeric entries silently become
                        // 0.0 here — confirm this is intended.
                        builder
                            .values()
                            .append_value(val.as_f64().unwrap_or(0.0) as f32);
                    }
                    builder.append(true);
                } else if nullable {
                    // Pad the child builder so lengths stay aligned, then
                    // mark the whole list entry as null.
                    for _ in 0..dim as usize {
                        builder.values().append_null();
                    }
                    builder.append(false);
                } else {
                    return Err(OmniError::manifest(format!(
                        "non-nullable vector property '{}' has null values",
                        name
                    )));
                }
            }
            Arc::new(builder.finish())
        }
        _ => {
            // Fallback: an all-null Utf8 column. NOTE(review): this only
            // type-checks when the schema field is itself Utf8; other
            // unsupported types would fail later in RecordBatch::try_new.
            let values: Vec<Option<&str>> = vec![None; rows.len()];
            Arc::new(StringArray::from(values))
        }
    };

    // Wrong-typed values were converted to null above; reject them here for
    // non-nullable fields.
    if !nullable && array.null_count() > 0 {
        return Err(OmniError::manifest(format!(
            "non-nullable property '{}' has null or invalid values",
            name
        )));
    }

    Ok(array)
}
866
867fn make_list_value_builder(data_type: &DataType, capacity: usize) -> Result<Box<dyn ArrayBuilder>> {
868 Ok(match data_type {
869 DataType::Utf8 => Box::new(StringBuilder::with_capacity(capacity, capacity * 8)),
870 DataType::Boolean => Box::new(BooleanBuilder::with_capacity(capacity)),
871 DataType::Int32 => Box::new(Int32Builder::with_capacity(capacity)),
872 DataType::Int64 => Box::new(Int64Builder::with_capacity(capacity)),
873 DataType::UInt32 => Box::new(UInt32Builder::with_capacity(capacity)),
874 DataType::UInt64 => Box::new(UInt64Builder::with_capacity(capacity)),
875 DataType::Float32 => Box::new(Float32Builder::with_capacity(capacity)),
876 DataType::Float64 => Box::new(Float64Builder::with_capacity(capacity)),
877 DataType::Date32 => Box::new(Date32Builder::with_capacity(capacity)),
878 DataType::Date64 => Box::new(Date64Builder::with_capacity(capacity)),
879 other => {
880 return Err(OmniError::manifest(format!(
881 "unsupported list element data type {:?}",
882 other
883 )));
884 }
885 })
886}
887
/// Appends a single JSON `value` to a dynamically-typed list element builder.
///
/// The builder is downcast to the concrete builder matching `data_type`; a
/// value of the wrong JSON type is appended as null, except Int32/UInt32
/// range overflows, which are hard errors.
fn append_json_list_item(
    builder: &mut Box<dyn ArrayBuilder>,
    data_type: &DataType,
    value: &JsonValue,
) -> Result<()> {
    match data_type {
        DataType::Utf8 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<StringBuilder>()
                .ok_or_else(|| OmniError::manifest("list Utf8 builder downcast failed"))?;
            if let Some(value) = value.as_str() {
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::Boolean => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<BooleanBuilder>()
                .ok_or_else(|| OmniError::manifest("list Boolean builder downcast failed"))?;
            if let Some(value) = value.as_bool() {
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::Int32 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<Int32Builder>()
                .ok_or_else(|| OmniError::manifest("list Int32 builder downcast failed"))?;
            if let Some(value) = value.as_i64() {
                // Checked narrowing: out-of-range is an error, not null.
                let value = i32::try_from(value).map_err(|_| {
                    OmniError::manifest(format!("list value {} exceeds Int32 range", value))
                })?;
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::Int64 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<Int64Builder>()
                .ok_or_else(|| OmniError::manifest("list Int64 builder downcast failed"))?;
            if let Some(value) = value.as_i64() {
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::UInt32 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<UInt32Builder>()
                .ok_or_else(|| OmniError::manifest("list UInt32 builder downcast failed"))?;
            if let Some(value) = value.as_u64() {
                // Checked narrowing: out-of-range is an error, not null.
                let value = u32::try_from(value).map_err(|_| {
                    OmniError::manifest(format!("list value {} exceeds UInt32 range", value))
                })?;
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::UInt64 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<UInt64Builder>()
                .ok_or_else(|| OmniError::manifest("list UInt64 builder downcast failed"))?;
            if let Some(value) = value.as_u64() {
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::Float32 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<Float32Builder>()
                .ok_or_else(|| OmniError::manifest("list Float32 builder downcast failed"))?;
            if let Some(value) = value.as_f64() {
                builder.append_value(value as f32);
            } else {
                builder.append_null();
            }
        }
        DataType::Float64 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<Float64Builder>()
                .ok_or_else(|| OmniError::manifest("list Float64 builder downcast failed"))?;
            if let Some(value) = value.as_f64() {
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::Date32 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<Date32Builder>()
                .ok_or_else(|| OmniError::manifest("list Date32 builder downcast failed"))?;
            if let Some(value) = parse_date32_json_value(value)? {
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::Date64 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<Date64Builder>()
                .ok_or_else(|| OmniError::manifest("list Date64 builder downcast failed"))?;
            if let Some(value) = parse_date64_json_value(value)? {
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        other => {
            return Err(OmniError::manifest(format!(
                "unsupported list element data type {:?}",
                other
            )));
        }
    }

    Ok(())
}
1020
1021fn parse_date32_json_value(value: &JsonValue) -> Result<Option<i32>> {
1022 if value.is_null() {
1023 return Ok(None);
1024 }
1025 if let Some(days) = value.as_i64() {
1026 let days = i32::try_from(days)
1027 .map_err(|_| OmniError::manifest(format!("Date value out of range: {}", days)))?;
1028 return Ok(Some(days));
1029 }
1030 if let Some(days) = value.as_u64() {
1031 let days = i32::try_from(days)
1032 .map_err(|_| OmniError::manifest(format!("Date value out of range: {}", days)))?;
1033 return Ok(Some(days));
1034 }
1035 if let Some(value) = value.as_str() {
1036 return Ok(Some(parse_date32_literal(value)?));
1037 }
1038 Ok(None)
1039}
1040
1041fn parse_date64_json_value(value: &JsonValue) -> Result<Option<i64>> {
1042 if value.is_null() {
1043 return Ok(None);
1044 }
1045 if let Some(ms) = value.as_i64() {
1046 return Ok(Some(ms));
1047 }
1048 if let Some(ms) = value.as_u64() {
1049 let ms = i64::try_from(ms)
1050 .map_err(|_| OmniError::manifest(format!("DateTime value out of range: {}", ms)))?;
1051 return Ok(Some(ms));
1052 }
1053 if let Some(value) = value.as_str() {
1054 return Ok(Some(parse_date64_literal(value)?));
1055 }
1056 Ok(None)
1057}
1058
/// Fallback number of concurrent table writes used by overwrite loads.
const DEFAULT_LOAD_WRITE_CONCURRENCY: usize = 8;

/// Reads the write concurrency from `OMNIGRAPH_LOAD_CONCURRENCY`, falling
/// back to the default when the variable is unset, unparsable, or zero.
fn load_write_concurrency() -> usize {
    match std::env::var("OMNIGRAPH_LOAD_CONCURRENCY") {
        Ok(raw) => match raw.parse::<usize>() {
            Ok(n) if n > 0 => n,
            _ => DEFAULT_LOAD_WRITE_CONCURRENCY,
        },
        Err(_) => DEFAULT_LOAD_WRITE_CONCURRENCY,
    }
}
1078
/// Writes prepared `(name, table_key, batch, loaded_count)` tuples to their
/// datasets with bounded concurrency.
///
/// Returns one `(name, table_key, loaded_count, state, table_branch)` tuple
/// per input; `buffered` preserves the input order. The first write error
/// aborts the whole call.
async fn write_batches_concurrently(
    db: &Omnigraph,
    branch: Option<&str>,
    mode: LoadMode,
    prepared: Vec<(String, String, RecordBatch, usize)>,
) -> Result<
    Vec<(
        String,
        String,
        usize,
        crate::table_store::TableState,
        Option<String>,
    )>,
> {
    use futures::stream::StreamExt;

    if prepared.is_empty() {
        return Ok(Vec::new());
    }

    // Never exceed the number of tables; always run at least one writer.
    let concurrency = load_write_concurrency().min(prepared.len()).max(1);

    futures::stream::iter(prepared.into_iter().map(
        |(type_name, table_key, batch, loaded_count)| async move {
            let (state, table_branch) =
                write_batch_to_dataset(db, branch, &table_key, batch, mode).await?;
            Ok::<_, OmniError>((type_name, table_key, loaded_count, state, table_branch))
        },
    ))
    .buffered(concurrency)
    .collect::<Vec<Result<_>>>()
    .await
    .into_iter()
    // Collapse Vec<Result<_>> into Result<Vec<_>>, short-circuiting on error.
    .collect()
}
1117
/// Applies a single batch to one table's dataset according to `mode`:
/// overwrite replaces the contents, append adds rows, merge upserts on `id`.
///
/// Returns the resulting table state and the branch the table was opened on.
async fn write_batch_to_dataset(
    db: &Omnigraph,
    branch: Option<&str>,
    table_key: &str,
    batch: RecordBatch,
    mode: LoadMode,
) -> Result<(crate::table_store::TableState, Option<String>)> {
    let (mut ds, full_path, table_branch) =
        db.open_for_mutation_on_branch(branch, table_key).await?;
    let table_store = db.table_store();

    match mode {
        LoadMode::Overwrite => {
            let state = table_store
                .overwrite_batch(&full_path, &mut ds, batch)
                .await?;
            Ok((state, table_branch))
        }
        LoadMode::Append => {
            let state = table_store.append_batch(&full_path, &mut ds, batch).await?;
            Ok((state, table_branch))
        }
        LoadMode::Merge => {
            // Upsert keyed on "id": update matching rows, insert the rest.
            let state = table_store
                .merge_insert_batch(
                    &full_path,
                    ds,
                    batch,
                    vec!["id".to_string()],
                    lance::dataset::WhenMatched::UpdateAll,
                    lance::dataset::WhenNotMatched::InsertAll,
                )
                .await?;
            Ok((state, table_branch))
        }
    }
}
1155
1156fn generate_id() -> String {
1157 ulid::Ulid::new().to_string()
1158}
1159
1160pub(crate) fn parse_date32_literal(value: &str) -> Result<i32> {
1161 let raw: Arc<dyn Array> = Arc::new(StringArray::from(vec![Some(value)]));
1162 let casted = arrow_cast::cast::cast(raw.as_ref(), &DataType::Date32)
1163 .map_err(|e| OmniError::manifest(format!("invalid Date literal '{}': {}", value, e)))?;
1164 let out = casted
1165 .as_any()
1166 .downcast_ref::<Date32Array>()
1167 .ok_or_else(|| OmniError::manifest("Date32 cast produced unexpected array"))?;
1168 if out.is_null(0) {
1169 return Err(OmniError::manifest(format!(
1170 "invalid Date literal '{}'",
1171 value
1172 )));
1173 }
1174 Ok(out.value(0))
1175}
1176
1177pub(crate) fn parse_date64_literal(value: &str) -> Result<i64> {
1178 let raw: Arc<dyn Array> = Arc::new(StringArray::from(vec![Some(value)]));
1179 let casted = arrow_cast::cast::cast(raw.as_ref(), &DataType::Date64)
1180 .map_err(|e| OmniError::manifest(format!("invalid DateTime literal '{}': {}", value, e)))?;
1181 let out = casted
1182 .as_any()
1183 .downcast_ref::<Date64Array>()
1184 .ok_or_else(|| OmniError::manifest("Date64 cast produced unexpected array"))?;
1185 if out.is_null(0) {
1186 return Err(OmniError::manifest(format!(
1187 "invalid DateTime literal '{}'",
1188 value
1189 )));
1190 }
1191 Ok(out.value(0))
1192}
1193
1194pub(crate) fn validate_value_constraints(
1197 batch: &RecordBatch,
1198 node_type: &omnigraph_compiler::catalog::NodeType,
1199) -> Result<()> {
1200 use arrow_array::Array;
1201
1202 for rc in &node_type.range_constraints {
1204 let Some(col) = batch.column_by_name(&rc.property) else {
1205 continue;
1206 };
1207 for row in 0..batch.num_rows() {
1208 if col.is_null(row) {
1209 continue;
1210 }
1211 let value = extract_numeric_value(col, row);
1212 if let Some(val) = value {
1213 if val.is_nan() {
1214 return Err(OmniError::manifest(format!(
1215 "@range violation on {}.{}: value is NaN",
1216 node_type.name, rc.property
1217 )));
1218 }
1219 if let Some(ref min) = rc.min {
1220 let min_f = literal_value_to_f64(min);
1221 if val < min_f {
1222 return Err(OmniError::manifest(format!(
1223 "@range violation on {}.{}: value {} < min {}",
1224 node_type.name, rc.property, val, min_f
1225 )));
1226 }
1227 }
1228 if let Some(ref max) = rc.max {
1229 let max_f = literal_value_to_f64(max);
1230 if val > max_f {
1231 return Err(OmniError::manifest(format!(
1232 "@range violation on {}.{}: value {} > max {}",
1233 node_type.name, rc.property, val, max_f
1234 )));
1235 }
1236 }
1237 }
1238 }
1239 }
1240
1241 for cc in &node_type.check_constraints {
1243 let re = regex::Regex::new(&cc.pattern).map_err(|e| {
1244 OmniError::manifest(format!(
1245 "@check on {}.{} has invalid regex '{}': {}",
1246 node_type.name, cc.property, cc.pattern, e
1247 ))
1248 })?;
1249 let Some(col) = batch.column_by_name(&cc.property) else {
1250 continue;
1251 };
1252 let str_col = col.as_any().downcast_ref::<StringArray>();
1253 if let Some(str_col) = str_col {
1254 for row in 0..str_col.len() {
1255 if str_col.is_null(row) {
1256 continue;
1257 }
1258 let val = str_col.value(row);
1259 if !re.is_match(val) {
1260 return Err(OmniError::manifest(format!(
1261 "@check violation on {}.{}: value '{}' does not match pattern '{}'",
1262 node_type.name, cc.property, val, cc.pattern
1263 )));
1264 }
1265 }
1266 }
1267 }
1268
1269 Ok(())
1270}
1271
1272pub(crate) fn validate_enum_constraints(
1279 batch: &RecordBatch,
1280 properties: &HashMap<String, omnigraph_compiler::types::PropType>,
1281 type_name: &str,
1282) -> Result<()> {
1283 use arrow_array::{Array, ListArray};
1284
1285 for (prop_name, prop_type) in properties {
1286 let Some(allowed) = prop_type.enum_values.as_ref() else {
1287 continue;
1288 };
1289 let Some(col) = batch.column_by_name(prop_name) else {
1290 continue;
1291 };
1292 if prop_type.list {
1293 let Some(list_col) = col.as_any().downcast_ref::<ListArray>() else {
1294 continue;
1295 };
1296 for row in 0..list_col.len() {
1297 if list_col.is_null(row) {
1298 continue;
1299 }
1300 let item_arr = list_col.value(row);
1301 let Some(str_arr) = item_arr.as_any().downcast_ref::<StringArray>() else {
1302 continue;
1303 };
1304 for i in 0..str_arr.len() {
1305 if str_arr.is_null(i) {
1306 continue;
1307 }
1308 let val = str_arr.value(i);
1309 if !allowed.iter().any(|a| a.as_str() == val) {
1310 return Err(OmniError::manifest(format!(
1311 "invalid enum value '{}' for {}.{} (expected: {})",
1312 val,
1313 type_name,
1314 prop_name,
1315 allowed.join(", ")
1316 )));
1317 }
1318 }
1319 }
1320 } else if let Some(str_col) = col.as_any().downcast_ref::<StringArray>() {
1321 for row in 0..str_col.len() {
1322 if str_col.is_null(row) {
1323 continue;
1324 }
1325 let val = str_col.value(row);
1326 if !allowed.iter().any(|a| a.as_str() == val) {
1327 return Err(OmniError::manifest(format!(
1328 "invalid enum value '{}' for {}.{} (expected: {})",
1329 val,
1330 type_name,
1331 prop_name,
1332 allowed.join(", ")
1333 )));
1334 }
1335 }
1336 }
1337 }
1338 Ok(())
1339}
1340
1341pub(crate) fn enforce_unique_constraints_intra_batch(
1348 batch: &RecordBatch,
1349 type_name: &str,
1350 unique_properties: &[String],
1351) -> Result<()> {
1352 for property in unique_properties {
1353 let Some(col_idx) = batch.schema().index_of(property).ok() else {
1354 continue;
1355 };
1356 let arr = batch.column(col_idx);
1357 let mut seen: HashMap<String, usize> = HashMap::new();
1358 for row in 0..batch.num_rows() {
1359 let Some(value) = scalar_to_string(arr, row) else {
1360 continue;
1361 };
1362 if let Some(prev_row) = seen.insert(value.clone(), row) {
1363 return Err(OmniError::manifest(format!(
1364 "@unique violation on {}.{}: value '{}' appears in rows {} and {}",
1365 type_name, property, value, prev_row, row
1366 )));
1367 }
1368 }
1369 }
1370 Ok(())
1371}
1372
1373fn scalar_to_string(array: &ArrayRef, row: usize) -> Option<String> {
1377 use arrow_array::Array;
1378 if array.is_null(row) {
1379 return None;
1380 }
1381 if let Some(a) = array.as_any().downcast_ref::<StringArray>() {
1382 return Some(a.value(row).to_string());
1383 }
1384 if let Some(a) = array.as_any().downcast_ref::<Int32Array>() {
1385 return Some(a.value(row).to_string());
1386 }
1387 if let Some(a) = array.as_any().downcast_ref::<Int64Array>() {
1388 return Some(a.value(row).to_string());
1389 }
1390 if let Some(a) = array.as_any().downcast_ref::<UInt32Array>() {
1391 return Some(a.value(row).to_string());
1392 }
1393 if let Some(a) = array.as_any().downcast_ref::<UInt64Array>() {
1394 return Some(a.value(row).to_string());
1395 }
1396 if let Some(a) = array.as_any().downcast_ref::<Float32Array>() {
1397 return Some(a.value(row).to_string());
1398 }
1399 if let Some(a) = array.as_any().downcast_ref::<Float64Array>() {
1400 return Some(a.value(row).to_string());
1401 }
1402 if let Some(a) = array.as_any().downcast_ref::<BooleanArray>() {
1403 return Some(a.value(row).to_string());
1404 }
1405 if let Some(a) = array.as_any().downcast_ref::<Date32Array>() {
1406 return Some(a.value(row).to_string());
1407 }
1408 if let Some(a) = array.as_any().downcast_ref::<Date64Array>() {
1409 return Some(a.value(row).to_string());
1410 }
1411 None
1412}
1413
1414pub(crate) fn unique_property_names_for_node(
1418 node_type: &omnigraph_compiler::catalog::NodeType,
1419) -> Vec<String> {
1420 let mut props: Vec<String> = node_type
1421 .unique_constraints
1422 .iter()
1423 .flatten()
1424 .cloned()
1425 .collect();
1426 if let Some(key) = &node_type.key {
1427 props.extend(key.iter().cloned());
1428 }
1429 props.sort();
1430 props.dedup();
1431 props
1432}
1433
1434pub(crate) fn unique_property_names_for_edge(
1436 edge_type: &omnigraph_compiler::catalog::EdgeType,
1437) -> Vec<String> {
1438 let mut props: Vec<String> = edge_type
1439 .unique_constraints
1440 .iter()
1441 .flatten()
1442 .cloned()
1443 .collect();
1444 props.sort();
1445 props.dedup();
1446 props
1447}
1448
1449fn extract_numeric_value(col: &ArrayRef, row: usize) -> Option<f64> {
1450 use arrow_array::{
1451 Array, Float32Array, Float64Array, Int32Array, Int64Array, UInt32Array, UInt64Array,
1452 };
1453 if let Some(a) = col.as_any().downcast_ref::<Int32Array>() {
1454 return Some(a.value(row) as f64);
1455 }
1456 if let Some(a) = col.as_any().downcast_ref::<Int64Array>() {
1457 return Some(a.value(row) as f64);
1458 }
1459 if let Some(a) = col.as_any().downcast_ref::<UInt32Array>() {
1460 return Some(a.value(row) as f64);
1461 }
1462 if let Some(a) = col.as_any().downcast_ref::<UInt64Array>() {
1463 return Some(a.value(row) as f64);
1464 }
1465 if let Some(a) = col.as_any().downcast_ref::<Float32Array>() {
1466 return Some(a.value(row) as f64);
1467 }
1468 if let Some(a) = col.as_any().downcast_ref::<Float64Array>() {
1469 return Some(a.value(row));
1470 }
1471 None
1472}
1473
1474fn literal_value_to_f64(v: &omnigraph_compiler::catalog::LiteralValue) -> f64 {
1475 use omnigraph_compiler::catalog::LiteralValue;
1476 match v {
1477 LiteralValue::Integer(n) => *n as f64,
1478 LiteralValue::Float(f) => *f,
1479 }
1480}
1481
1482pub(crate) async fn validate_edge_cardinality(
1485 db: &crate::db::Omnigraph,
1486 branch: Option<&str>,
1487 edge_name: &str,
1488 written_version: u64,
1489 written_branch: Option<&str>,
1490) -> Result<()> {
1491 use arrow_array::Array;
1492 let catalog = db.catalog();
1493 let edge_type = &catalog.edge_types[edge_name];
1494 if edge_type.cardinality.is_default() {
1495 return Ok(());
1496 }
1497
1498 let snapshot = db.snapshot_for_branch(branch).await?;
1501 let table_key = format!("edge:{}", edge_name);
1502 let entry = snapshot
1503 .entry(&table_key)
1504 .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
1505 let ds = db
1506 .open_dataset_at_state(
1507 &entry.table_path,
1508 written_branch.or(entry.table_branch.as_deref()),
1509 written_version,
1510 )
1511 .await?;
1512
1513 let batches = db
1515 .table_store()
1516 .scan(&ds, Some(&["src"]), None, None)
1517 .await?;
1518
1519 let mut counts: HashMap<String, u32> = HashMap::new();
1520 for batch in &batches {
1521 let srcs = batch
1522 .column_by_name("src")
1523 .unwrap()
1524 .as_any()
1525 .downcast_ref::<StringArray>()
1526 .unwrap();
1527 for i in 0..srcs.len() {
1528 *counts.entry(srcs.value(i).to_string()).or_insert(0) += 1;
1529 }
1530 }
1531
1532 let card = &edge_type.cardinality;
1533 for (src, count) in &counts {
1534 if let Some(max) = card.max {
1535 if *count > max {
1536 return Err(OmniError::manifest(format!(
1537 "@card violation on edge {}: source '{}' has {} edges (max {})",
1538 edge_name, src, count, max
1539 )));
1540 }
1541 }
1542 if *count < card.min {
1543 return Err(OmniError::manifest(format!(
1544 "@card violation on edge {}: source '{}' has {} edges (min {})",
1545 edge_name, src, count, card.min
1546 )));
1547 }
1548 }
1549
1550 Ok(())
1551}
1552
1553async fn validate_edge_cardinality_with_pending_loader(
1568 db: &Omnigraph,
1569 branch: Option<&str>,
1570 edge_type: &omnigraph_compiler::catalog::EdgeType,
1571 table_key: &str,
1572 staging: &MutationStaging,
1573 mode: LoadMode,
1574) -> Result<()> {
1575 if edge_type.cardinality.is_default() {
1576 return Ok(());
1577 }
1578 let snapshot = db.snapshot_for_branch(branch).await?;
1579 let Some(entry) = snapshot.entry(table_key) else {
1580 return Ok(());
1583 };
1584 let ds = db
1585 .open_dataset_at_state(
1586 &entry.table_path,
1587 entry.table_branch.as_deref(),
1588 entry.table_version,
1589 )
1590 .await?;
1591 let dedupe_key = match mode {
1592 LoadMode::Merge => Some("id"),
1593 LoadMode::Append | LoadMode::Overwrite => None,
1594 };
1595 let counts =
1596 crate::exec::staging::count_src_per_edge(db, &ds, table_key, staging, dedupe_key)
1597 .await?;
1598 crate::exec::staging::enforce_cardinality_bounds(edge_type, &counts)
1599}
1600
1601async fn collect_node_ids_with_pending(
1610 db: &Omnigraph,
1611 branch: Option<&str>,
1612 type_name: &str,
1613 staging: &MutationStaging,
1614) -> Result<HashSet<String>> {
1615 let mut ids = HashSet::new();
1616 let table_key = format!("node:{}", type_name);
1617
1618 for batch in staging.pending_batches(&table_key) {
1620 if let Some(col) = batch.column_by_name("id") {
1621 if let Some(arr) = col.as_any().downcast_ref::<StringArray>() {
1622 for i in 0..arr.len() {
1623 if arr.is_valid(i) {
1624 ids.insert(arr.value(i).to_string());
1625 }
1626 }
1627 }
1628 }
1629 }
1630
1631 let snapshot = db.snapshot_for_branch(branch).await?;
1633 let Some(entry) = snapshot.entry(&table_key) else {
1634 return Ok(ids);
1635 };
1636 let ds = db
1637 .open_dataset_at_state(
1638 &entry.table_path,
1639 entry.table_branch.as_deref(),
1640 entry.table_version,
1641 )
1642 .await?;
1643
1644 let batches = db
1645 .table_store()
1646 .scan(&ds, Some(&["id"]), None, None)
1647 .await?;
1648
1649 for batch in &batches {
1650 let id_col = batch
1651 .column_by_name("id")
1652 .ok_or_else(|| OmniError::Lance("missing 'id' column".into()))?
1653 .as_any()
1654 .downcast_ref::<StringArray>()
1655 .ok_or_else(|| OmniError::Lance("'id' column is not Utf8".into()))?;
1656 for i in 0..batch.num_rows() {
1657 if id_col.is_valid(i) {
1662 ids.insert(id_col.value(i).to_string());
1663 }
1664 }
1665 }
1666
1667 Ok(ids)
1668}
1669
1670async fn collect_node_ids(
1675 db: &Omnigraph,
1676 branch: Option<&str>,
1677 type_name: &str,
1678 node_rows: &HashMap<String, Vec<JsonValue>>,
1679 catalog: &omnigraph_compiler::catalog::Catalog,
1680 updates: &[crate::db::SubTableUpdate],
1681) -> Result<HashSet<String>> {
1682 let mut ids = HashSet::new();
1683
1684 if let Some(rows) = node_rows.get(type_name) {
1686 if let Some(node_type) = catalog.node_types.get(type_name) {
1687 if let Some(key_prop) = node_type.key_property() {
1688 for row in rows {
1689 if let Some(id) = row.get(key_prop).and_then(|v| v.as_str()) {
1690 ids.insert(id.to_string());
1691 }
1692 }
1693 }
1694 }
1695 }
1696
1697 let table_key = format!("node:{}", type_name);
1699 let snapshot = db.snapshot_for_branch(branch).await?;
1700 let Some(entry) = snapshot.entry(&table_key) else {
1701 return Ok(ids);
1702 };
1703 let updated = updates
1705 .iter()
1706 .find(|u| u.table_key == table_key)
1707 .map(|u| (u.table_version, u.table_branch.as_deref()));
1708 let (version, branch) = updated.unwrap_or((entry.table_version, entry.table_branch.as_deref()));
1709 let ds = db
1710 .open_dataset_at_state(&entry.table_path, branch, version)
1711 .await?;
1712
1713 let batches = db
1714 .table_store()
1715 .scan(&ds, Some(&["id"]), None, None)
1716 .await?;
1717
1718 for batch in &batches {
1719 let id_col = batch
1720 .column_by_name("id")
1721 .unwrap()
1722 .as_any()
1723 .downcast_ref::<StringArray>()
1724 .unwrap();
1725 for i in 0..batch.num_rows() {
1726 if !id_col.is_valid(i) {
1727 continue;
1728 }
1729 ids.insert(id_col.value(i).to_string());
1730 }
1731 }
1732
1733 Ok(ids)
1734}
1735
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::Omnigraph;
    use arrow_array::Array;
    use futures::TryStreamExt;
    use std::collections::HashMap;

    // Minimal schema: two keyed node types plus one property-bearing edge
    // and one bare edge — enough to exercise both load paths.
    const TEST_SCHEMA: &str = r#"
node Person {
    name: String @key
    age: I32?
}
node Company {
    name: String @key
}
edge Knows: Person -> Person {
    since: Date?
}
edge WorksAt: Person -> Company
"#;

    // JSONL payload matching TEST_SCHEMA: three nodes and two edges.
    const TEST_DATA: &str = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}
{"type": "Person", "data": {"name": "Bob", "age": 25}}
{"type": "Company", "data": {"name": "Acme"}}
{"edge": "Knows", "from": "Alice", "to": "Bob"}
{"edge": "WorksAt", "from": "Alice", "to": "Acme"}
"#;

    // Loading reports per-type node and edge counts.
    #[tokio::test]
    async fn test_load_creates_data() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let result = load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert_eq!(result.nodes_loaded["Person"], 2);
        assert_eq!(result.nodes_loaded["Company"], 1);
        assert_eq!(result.edges_loaded["Knows"], 1);
        assert_eq!(result.edges_loaded["WorksAt"], 1);
    }

    // Loaded node rows are readable back through a raw Lance scan, and the
    // "id" column carries the @key values.
    #[tokio::test]
    async fn test_load_data_readable_via_lance() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        let snap = db.snapshot();
        let person_ds = snap.open("node:Person").await.unwrap();

        assert_eq!(person_ds.count_rows(None).await.unwrap(), 2);

        let batches: Vec<RecordBatch> = person_ds
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();

        let batch = &batches[0];
        let ids = batch
            .column_by_name("id")
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let id_values: Vec<&str> = (0..ids.len()).map(|i| ids.value(i)).collect();
        assert!(id_values.contains(&"Alice"));
        assert!(id_values.contains(&"Bob"));
    }

    // Edge rows store src/dst as the referenced nodes' key values.
    #[tokio::test]
    async fn test_load_edges_reference_node_keys() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        let snap = db.snapshot();
        let knows_ds = snap.open("edge:Knows").await.unwrap();

        let batches: Vec<RecordBatch> = knows_ds
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();

        let batch = &batches[0];
        let srcs = batch
            .column_by_name("src")
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let dsts = batch
            .column_by_name("dst")
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();

        assert_eq!(srcs.value(0), "Alice");
        assert_eq!(dsts.value(0), "Bob");
    }

    // A load commits a new manifest version.
    #[tokio::test]
    async fn test_load_manifest_version_advances() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        let v1 = db.version();

        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert!(db.version() > v1);
    }

    // Append mode adds rows on top of an earlier overwrite load.
    #[tokio::test]
    async fn test_load_append_adds_rows() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let batch1 = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#;
        let batch2 = r#"{"type": "Person", "data": {"name": "Bob", "age": 25}}"#;

        load_jsonl(&mut db, batch1, LoadMode::Overwrite)
            .await
            .unwrap();
        load_jsonl(&mut db, batch2, LoadMode::Append).await.unwrap();

        let snap = db.snapshot();
        let person_ds = snap.open("node:Person").await.unwrap();
        assert_eq!(person_ds.count_rows(None).await.unwrap(), 2);
    }

    // A type name not present in the schema is rejected.
    #[tokio::test]
    async fn test_load_unknown_type_rejected() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let bad = r#"{"type": "FakeType", "data": {"name": "x"}}"#;
        let result = load_jsonl(&mut db, bad, LoadMode::Overwrite).await;
        assert!(result.is_err());
    }

    // Ingesting into a new branch creates it from the base branch and
    // reports per-table row counts (sorted by table key).
    #[tokio::test]
    async fn test_ingest_creates_branch_and_reports_tables() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let result = db
            .ingest("feature", Some("main"), TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert_eq!(result.branch, "feature");
        assert_eq!(result.base_branch, "main");
        assert!(result.branch_created);
        assert_eq!(result.mode, LoadMode::Overwrite);
        assert_eq!(
            result.tables,
            vec![
                IngestTableResult {
                    table_key: "edge:Knows".to_string(),
                    rows_loaded: 1
                },
                IngestTableResult {
                    table_key: "edge:WorksAt".to_string(),
                    rows_loaded: 1
                },
                IngestTableResult {
                    table_key: "node:Company".to_string(),
                    rows_loaded: 1
                },
                IngestTableResult {
                    table_key: "node:Person".to_string(),
                    rows_loaded: 2
                },
            ]
        );
        assert!(
            db.branch_list()
                .await
                .unwrap()
                .contains(&"feature".to_string())
        );
    }

    // Ingesting into an existing branch ignores the `from` argument (no
    // branch is created) and Merge mode upserts by id.
    #[tokio::test]
    async fn test_ingest_existing_branch_ignores_from_and_merges_data() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();
        db.branch_create_from(crate::db::ReadTarget::branch("main"), "feature")
            .await
            .unwrap();

        let result = db
            .ingest(
                "feature",
                Some("missing-base"),
                r#"{"type":"Person","data":{"name":"Bob","age":26}}
{"type":"Person","data":{"name":"Eve","age":31}}"#,
                LoadMode::Merge,
            )
            .await
            .unwrap();

        assert_eq!(result.branch, "feature");
        assert_eq!(result.base_branch, "missing-base");
        assert!(!result.branch_created);
        assert_eq!(result.mode, LoadMode::Merge);
        assert_eq!(
            result.tables,
            vec![IngestTableResult {
                table_key: "node:Person".to_string(),
                rows_loaded: 2
            }]
        );

        let snap = db
            .snapshot_of(crate::db::ReadTarget::branch("feature"))
            .await
            .unwrap();
        let person_ds = snap.open("node:Person").await.unwrap();
        // Alice survives, Bob is updated, Eve is inserted -> 3 rows total.
        assert_eq!(person_ds.count_rows(None).await.unwrap(), 3);

        let batches: Vec<RecordBatch> = person_ds
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();
        let mut ages_by_id = HashMap::new();
        for batch in &batches {
            let ids = batch
                .column_by_name("id")
                .unwrap()
                .as_any()
                .downcast_ref::<StringArray>()
                .unwrap();
            let ages = batch
                .column_by_name("age")
                .unwrap()
                .as_any()
                .downcast_ref::<Int32Array>()
                .unwrap();
            for idx in 0..ids.len() {
                ages_by_id.insert(ids.value(idx).to_string(), ages.value(idx));
            }
        }

        assert_eq!(ages_by_id.get("Bob"), Some(&26));
        assert_eq!(ages_by_id.get("Eve"), Some(&31));
        assert_eq!(ages_by_id.get("Alice"), Some(&30));
    }

    // ingest_as records the supplied actor id on the branch head commit.
    #[tokio::test]
    async fn test_ingest_as_stamps_actor_on_branch_head_commit() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        db.ingest_as(
            "feature",
            Some("main"),
            TEST_DATA,
            LoadMode::Overwrite,
            Some("act-andrew"),
        )
        .await
        .unwrap();

        let head = db
            .list_commits(Some("feature"))
            .await
            .unwrap()
            .into_iter()
            .last()
            .unwrap();
        assert_eq!(head.actor_id.as_deref(), Some("act-andrew"));
    }

    // A NaN value must be rejected by @range validation even though NaN
    // compares false against both min and max.
    #[test]
    fn test_range_constraint_rejects_nan() {
        use arrow_array::{Float64Array, RecordBatch, StringArray};
        use omnigraph_compiler::catalog::{LiteralValue, NodeType, RangeConstraint};
        use std::sync::Arc;

        let schema = Arc::new(arrow_schema::Schema::new(vec![
            arrow_schema::Field::new("name", arrow_schema::DataType::Utf8, false),
            arrow_schema::Field::new("score", arrow_schema::DataType::Float64, true),
        ]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["bad"])),
                Arc::new(Float64Array::from(vec![f64::NAN])),
            ],
        )
        .unwrap();

        let node_type = NodeType {
            name: "Test".to_string(),
            implements: vec![],
            properties: Default::default(),
            key: None,
            unique_constraints: vec![],
            indices: vec![],
            range_constraints: vec![RangeConstraint {
                property: "score".to_string(),
                min: Some(LiteralValue::Float(0.0)),
                max: Some(LiteralValue::Float(1.0)),
            }],
            check_constraints: vec![],
            embed_sources: Default::default(),
            blob_properties: Default::default(),
            arrow_schema: schema,
        };

        let result = validate_value_constraints(&batch, &node_type);
        assert!(result.is_err(), "expected NaN to be rejected");
        let err = result.unwrap_err().to_string();
        assert!(err.contains("NaN"), "error should mention NaN: {}", err);
    }
}