1use std::collections::{HashMap, HashSet};
2
3use std::io::{BufRead, BufReader, Cursor};
4use std::sync::Arc;
5
6use arrow_array::{
7 Array, ArrayRef, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array,
8 Int32Array, Int64Array, RecordBatch, StringArray, UInt32Array, UInt64Array,
9 builder::{
10 ArrayBuilder, BooleanBuilder, Date32Builder, Date64Builder, FixedSizeListBuilder,
11 Float32Builder, Float64Builder, Int32Builder, Int64Builder, ListBuilder, StringBuilder,
12 UInt32Builder, UInt64Builder,
13 },
14};
15use arrow_schema::DataType;
16use base64::Engine;
17use lance::blob::BlobArrayBuilder;
18use omnigraph_compiler::catalog::NodeType;
19use serde::{Deserialize, Serialize};
20use serde_json::Value as JsonValue;
21
22use crate::db::Omnigraph;
23use crate::error::{OmniError, Result};
24
/// Per-table row counts produced by a single JSONL load.
#[derive(Debug, Clone, Default)]
pub struct LoadResult {
    // Rows written per node type, keyed by node type name.
    pub nodes_loaded: HashMap<String, usize>,
    // Rows written per edge table, keyed by canonical edge name.
    pub edges_loaded: HashMap<String, usize>,
}
31
/// Row count for one sub-table touched by an ingest, identified by its
/// manifest table key (`node:<type>` or `edge:<name>`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct IngestTableResult {
    // Manifest key of the table, e.g. "node:User" or "edge:FOLLOWS".
    pub table_key: String,
    // Number of rows written to this table by the ingest.
    pub rows_loaded: usize,
}
37
/// Summary of an `ingest` call: which branch was targeted, which branch it
/// was based on, whether the target had to be created, the load mode used,
/// and the per-table row counts.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct IngestResult {
    // Normalized name of the branch the data was loaded into.
    pub branch: String,
    // Normalized name of the branch the target was (or would be) created from.
    pub base_branch: String,
    // True when the target branch did not exist and was created by this call.
    pub branch_created: bool,
    // The load mode that was applied.
    pub mode: LoadMode,
    // Sorted per-table results (see LoadResult::to_ingest_tables).
    pub tables: Vec<IngestTableResult>,
}
46
/// How incoming rows are combined with existing table data.
/// Serialized in snake_case ("overwrite" / "append" / "merge").
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LoadMode {
    /// Replace the table contents with the incoming rows.
    Overwrite,
    /// Append the incoming rows to the existing table.
    Append,
    /// Upsert on the `id` column: matching rows are updated, the rest
    /// inserted (see `write_batch_to_dataset`).
    Merge,
}
58
59pub async fn load_jsonl(db: &mut Omnigraph, data: &str, mode: LoadMode) -> Result<LoadResult> {
61 let current_branch = db.active_branch().map(str::to_string);
62 let branch = current_branch.as_deref().unwrap_or("main");
63 db.load(branch, data, mode).await
64}
65
66pub async fn load_jsonl_file(db: &mut Omnigraph, path: &str, mode: LoadMode) -> Result<LoadResult> {
68 let current_branch = db.active_branch().map(str::to_string);
69 let branch = current_branch.as_deref().unwrap_or("main");
70 db.load_file(branch, path, mode).await
71}
72
73impl Omnigraph {
74 pub async fn ingest(
75 &mut self,
76 branch: &str,
77 from: Option<&str>,
78 data: &str,
79 mode: LoadMode,
80 ) -> Result<IngestResult> {
81 self.ingest_as(branch, from, data, mode, None).await
82 }
83
84 pub async fn ingest_as(
85 &mut self,
86 branch: &str,
87 from: Option<&str>,
88 data: &str,
89 mode: LoadMode,
90 actor_id: Option<&str>,
91 ) -> Result<IngestResult> {
92 let previous_actor = self.audit_actor_id.clone();
93 self.audit_actor_id = actor_id.map(str::to_string);
94 let result = self
95 .ingest_with_current_actor(branch, from, data, mode)
96 .await;
97 self.audit_actor_id = previous_actor;
98 result
99 }
100
101 pub async fn ingest_file(
102 &mut self,
103 branch: &str,
104 from: Option<&str>,
105 path: &str,
106 mode: LoadMode,
107 ) -> Result<IngestResult> {
108 self.ingest_file_as(branch, from, path, mode, None).await
109 }
110
111 pub async fn ingest_file_as(
112 &mut self,
113 branch: &str,
114 from: Option<&str>,
115 path: &str,
116 mode: LoadMode,
117 actor_id: Option<&str>,
118 ) -> Result<IngestResult> {
119 let data = std::fs::read_to_string(path).map_err(OmniError::Io)?;
120 self.ingest_as(branch, from, &data, mode, actor_id).await
121 }
122
123 async fn ingest_with_current_actor(
124 &mut self,
125 branch: &str,
126 from: Option<&str>,
127 data: &str,
128 mode: LoadMode,
129 ) -> Result<IngestResult> {
130 self.ensure_schema_state_valid().await?;
131 let target_branch =
132 Self::normalize_branch_name(branch)?.unwrap_or_else(|| "main".to_string());
133 let base_branch = Self::normalize_branch_name(from.unwrap_or("main"))?
134 .unwrap_or_else(|| "main".to_string());
135 let branch_created = !self
136 .branch_list()
137 .await?
138 .iter()
139 .any(|name| name == &target_branch);
140 if branch_created {
141 self.branch_create_from(crate::db::ReadTarget::branch(&base_branch), &target_branch)
142 .await?;
143 }
144
145 let result = self.load(&target_branch, data, mode).await?;
146 Ok(IngestResult {
147 branch: target_branch,
148 base_branch,
149 branch_created,
150 mode,
151 tables: result.to_ingest_tables(),
152 })
153 }
154
155 pub async fn load(&mut self, branch: &str, data: &str, mode: LoadMode) -> Result<LoadResult> {
156 self.ensure_schema_state_valid().await?;
157 let requested = Self::normalize_branch_name(branch)?.unwrap_or_else(|| "main".to_string());
158 if crate::db::is_internal_run_branch(&requested) {
159 return self
160 .load_direct_on_branch(Some(requested.as_str()), data, mode)
161 .await;
162 }
163
164 let target_head_before = self.latest_branch_snapshot_id(&requested).await?;
165 let op = format!("load_jsonl:branch={}:mode={}", requested, mode.as_str());
166 let run = self.begin_run(&requested, Some(op.as_str())).await?;
167 let staged_result = match self
168 .load_direct_on_branch(Some(run.run_branch.as_str()), data, mode)
169 .await
170 {
171 Ok(result) => result,
172 Err(err) => {
173 let _ = self.fail_run(&run.run_id).await;
174 return Err(err);
175 }
176 };
177
178 let target_head_now = self.latest_branch_snapshot_id(&requested).await?;
179 if target_head_now.as_str() != target_head_before.as_str() {
180 let _ = self.fail_run(&run.run_id).await;
181 return Err(OmniError::manifest_conflict(format!(
182 "target branch '{}' advanced during transactional load; retry",
183 requested
184 )));
185 }
186
187 if let Err(err) = self.publish_run(&run.run_id).await {
188 let _ = self.fail_run(&run.run_id).await;
189 return Err(err);
190 }
191
192 Ok(staged_result)
193 }
194
195 pub async fn load_file(
196 &mut self,
197 branch: &str,
198 path: &str,
199 mode: LoadMode,
200 ) -> Result<LoadResult> {
201 let data = std::fs::read_to_string(path).map_err(|e| OmniError::Io(e))?;
202 self.load(branch, &data, mode).await
203 }
204
205 async fn load_direct_on_branch(
206 &mut self,
207 branch: Option<&str>,
208 data: &str,
209 mode: LoadMode,
210 ) -> Result<LoadResult> {
211 let reader = BufReader::new(Cursor::new(data.as_bytes()));
212 load_jsonl_reader(self, branch, reader, mode).await
213 }
214}
215
216impl LoadMode {
217 pub fn as_str(self) -> &'static str {
218 match self {
219 LoadMode::Overwrite => "overwrite",
220 LoadMode::Append => "append",
221 LoadMode::Merge => "merge",
222 }
223 }
224}
225
226impl LoadResult {
227 pub fn to_ingest_tables(&self) -> Vec<IngestTableResult> {
228 let mut tables = self
229 .nodes_loaded
230 .iter()
231 .map(|(type_name, rows_loaded)| IngestTableResult {
232 table_key: format!("node:{type_name}"),
233 rows_loaded: *rows_loaded,
234 })
235 .chain(
236 self.edges_loaded
237 .iter()
238 .map(|(edge_name, rows_loaded)| IngestTableResult {
239 table_key: format!("edge:{edge_name}"),
240 rows_loaded: *rows_loaded,
241 }),
242 )
243 .collect::<Vec<_>>();
244 tables.sort_by(|a, b| a.table_key.cmp(&b.table_key));
245 tables
246 }
247}
248
/// Parses a JSONL stream and loads it into the node/edge tables on `branch`.
///
/// Each line is a JSON object that is either a node record
/// (`{"type": ..., "data": {...}}`) or an edge record
/// (`{"edge": ..., "from": ..., "to": ..., "data": {...}}`). Processing is
/// phased: (1) parse and validate every line, grouping rows per table;
/// (2) build, constraint-check, and write node batches; (3) verify edge
/// endpoints resolve to node ids; (4) build and write edge batches and
/// validate edge cardinality; (5) commit all sub-table updates at once.
async fn load_jsonl_reader<R: BufRead>(
    db: &mut Omnigraph,
    branch: Option<&str>,
    reader: R,
    mode: LoadMode,
) -> Result<LoadResult> {
    let catalog = db.catalog().clone();

    // Rows grouped by node type name / canonical edge name.
    let mut node_rows: HashMap<String, Vec<JsonValue>> = HashMap::new();
    let mut edge_rows: HashMap<String, Vec<(String, String, JsonValue)>> = HashMap::new();

    // Phase 1: parse everything before touching any table; any malformed or
    // unknown record fails the whole load with a 1-based line number.
    for (line_num, line) in reader.lines().enumerate() {
        let line = line?;
        let line = line.trim();
        if line.is_empty() {
            continue;
        }
        let value: JsonValue = serde_json::from_str(line).map_err(|e| {
            OmniError::manifest(format!("invalid JSON on line {}: {}", line_num + 1, e))
        })?;

        if let Some(type_name) = value.get("type").and_then(|v| v.as_str()) {
            // Node record; a missing "data" payload becomes an empty object.
            if !catalog.node_types.contains_key(type_name) {
                return Err(OmniError::manifest(format!(
                    "line {}: unknown node type '{}'",
                    line_num + 1,
                    type_name
                )));
            }
            let data = value
                .get("data")
                .cloned()
                .unwrap_or(JsonValue::Object(serde_json::Map::new()));
            node_rows
                .entry(type_name.to_string())
                .or_default()
                .push(data);
        } else if let Some(edge_name) = value.get("edge").and_then(|v| v.as_str()) {
            if catalog.lookup_edge_by_name(edge_name).is_none() {
                return Err(OmniError::manifest(format!(
                    "line {}: unknown edge type '{}'",
                    line_num + 1,
                    edge_name
                )));
            }
            let from = value
                .get("from")
                .and_then(|v| v.as_str())
                .ok_or_else(|| {
                    OmniError::manifest(format!("line {}: edge missing 'from'", line_num + 1))
                })?
                .to_string();
            let to = value
                .get("to")
                .and_then(|v| v.as_str())
                .ok_or_else(|| {
                    OmniError::manifest(format!("line {}: edge missing 'to'", line_num + 1))
                })?
                .to_string();
            let data = value
                .get("data")
                .cloned()
                .unwrap_or(JsonValue::Object(serde_json::Map::new()));
            // Group under the canonical name from the catalog (NOTE(review):
            // lookup presumably accepts aliases for `edge_name` — confirm).
            let canonical = catalog.lookup_edge_by_name(edge_name).unwrap().name.clone();
            edge_rows
                .entry(canonical)
                .or_default()
                .push((from, to, data));
        } else {
            return Err(OmniError::manifest(format!(
                "line {}: expected 'type' or 'edge' field",
                line_num + 1
            )));
        }
    }

    let mut updates = Vec::new();
    let mut result = LoadResult::default();
    let snapshot = db.snapshot_for_branch(branch).await?;

    // Phase 2a: build and constraint-check every node batch before writing
    // any of them, so validation errors surface before tables are touched.
    let mut prepared_nodes: Vec<(String, String, RecordBatch, usize)> =
        Vec::with_capacity(node_rows.len());
    for (type_name, rows) in &node_rows {
        let node_type = &catalog.node_types[type_name];
        let batch = build_node_batch(node_type, rows)?;
        validate_value_constraints(&batch, node_type)?;
        let loaded_count = batch.num_rows();
        let table_key = format!("node:{}", type_name);
        // Fail fast when the branch manifest has no entry for this table.
        snapshot
            .entry(&table_key)
            .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
        prepared_nodes.push((type_name.clone(), table_key, batch, loaded_count));
    }

    let node_write_results = write_batches_concurrently(db, branch, mode, prepared_nodes).await?;

    for (type_name, table_key, loaded_count, state, table_branch) in node_write_results {
        updates.push(crate::db::SubTableUpdate {
            table_key,
            table_version: state.version,
            table_branch,
            row_count: state.row_count,
            version_metadata: state.version_metadata,
        });
        result.nodes_loaded.insert(type_name, loaded_count);
    }

    // Phase 3: referential check — every edge endpoint must resolve to a
    // node id of the expected type (NOTE(review): `collect_node_ids` is
    // given both `node_rows` and `updates`, presumably so just-loaded ids
    // count as well — confirm against its definition).
    for (edge_name, rows) in &edge_rows {
        let edge_type = &catalog.edge_types[edge_name];
        let from_ids = collect_node_ids(
            db,
            branch,
            &edge_type.from_type,
            &node_rows,
            &catalog,
            &updates,
        )
        .await?;
        let to_ids = collect_node_ids(
            db,
            branch,
            &edge_type.to_type,
            &node_rows,
            &catalog,
            &updates,
        )
        .await?;

        for (i, (src, dst, _)) in rows.iter().enumerate() {
            if !from_ids.contains(src.as_str()) {
                return Err(OmniError::manifest(format!(
                    "edge {} row {}: src '{}' not found in {}",
                    edge_name,
                    i + 1,
                    src,
                    edge_type.from_type
                )));
            }
            if !to_ids.contains(dst.as_str()) {
                return Err(OmniError::manifest(format!(
                    "edge {} row {}: dst '{}' not found in {}",
                    edge_name,
                    i + 1,
                    dst,
                    edge_type.to_type
                )));
            }
        }
    }

    // Phase 4a: build all edge batches up front, mirroring the node path.
    let mut prepared_edges: Vec<(String, String, RecordBatch, usize)> =
        Vec::with_capacity(edge_rows.len());
    for (edge_name, rows) in &edge_rows {
        let edge_type = &catalog.edge_types[edge_name];
        let batch = build_edge_batch(edge_type, rows)?;
        let loaded_count = batch.num_rows();
        let table_key = format!("edge:{}", edge_name);
        snapshot
            .entry(&table_key)
            .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
        prepared_edges.push((edge_name.clone(), table_key, batch, loaded_count));
    }

    let edge_write_results = write_batches_concurrently(db, branch, mode, prepared_edges).await?;

    for (edge_name, table_key, loaded_count, state, table_branch) in edge_write_results {
        updates.push(crate::db::SubTableUpdate {
            table_key,
            table_version: state.version,
            table_branch,
            row_count: state.row_count,
            version_metadata: state.version_metadata,
        });
        result.edges_loaded.insert(edge_name, loaded_count);
    }

    // Phase 4b: cardinality rules are checked against the freshly written
    // table versions, before the manifest commit makes them visible.
    for (edge_name, _) in &edge_rows {
        let table_key = format!("edge:{}", edge_name);
        if let Some(update) = updates.iter().find(|u| u.table_key == table_key) {
            validate_edge_cardinality(
                db,
                branch,
                edge_name,
                update.table_version,
                update.table_branch.as_deref(),
            )
            .await?;
        }
    }

    // Phase 5: single commit covering every node and edge table written.
    db.commit_updates_on_branch(branch, &updates).await?;

    Ok(result)
}
462
/// Builds an Arrow `RecordBatch` for `node_type` from raw JSON rows.
///
/// The id column (schema field 0) is resolved per row as follows: when the
/// type declares a @key property, that property's value is mandatory and
/// becomes the id (an explicit "id" field must then match it exactly);
/// otherwise an explicit "id" field is used; otherwise a fresh ULID is
/// generated. Blob properties go through the blob builder, everything else
/// through `build_column_from_json`.
fn build_node_batch(node_type: &NodeType, rows: &[JsonValue]) -> Result<RecordBatch> {
    let schema = node_type.arrow_schema.clone();

    let ids: Vec<String> = rows
        .iter()
        .map(|row| {
            let explicit_id = row.get("id").and_then(|v| v.as_str()).map(str::to_string);
            if let Some(key_prop) = node_type.key_property() {
                // Keyed type: the @key property value wins and must exist.
                let key_value = row
                    .get(key_prop)
                    .and_then(|v| v.as_str())
                    .map(|s| s.to_string())
                    .ok_or_else(|| {
                        OmniError::manifest(format!(
                            "node {} missing @key property '{}'",
                            node_type.name, key_prop
                        ))
                    })?;
                if let Some(explicit_id) = explicit_id {
                    if explicit_id != key_value {
                        return Err(OmniError::manifest(format!(
                            "node {} has explicit id '{}' that does not match @key property '{}' value '{}'",
                            node_type.name, explicit_id, key_prop, key_value
                        )));
                    }
                }
                Ok(key_value)
            } else if let Some(explicit_id) = explicit_id {
                Ok(explicit_id)
            } else {
                Ok(generate_id())
            }
        })
        .collect::<Result<Vec<_>>>()?;

    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
    columns.push(Arc::new(StringArray::from(ids)));

    // Skip field 0 — the id column built above.
    for field in schema.fields().iter().skip(1) {
        if node_type.blob_properties.contains(field.name()) {
            let col = build_blob_column(field.name(), field.is_nullable(), rows)?;
            columns.push(col);
        } else {
            let col =
                build_column_from_json(field.name(), field.data_type(), field.is_nullable(), rows)?;
            columns.push(col);
        }
    }

    RecordBatch::try_new(schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
}
516
/// Builds an Arrow `RecordBatch` for `edge_type` from `(from, to, data)`
/// rows.
///
/// The first three columns are populated with id / src / dst values
/// (NOTE(review): assumes the edge arrow schema declares them in that order
/// — confirm against the schema builder). The id comes from an explicit
/// "id" in the data payload or a fresh ULID; remaining fields are filled
/// from the payload, with blob properties going through the blob builder.
fn build_edge_batch(
    edge_type: &omnigraph_compiler::catalog::EdgeType,
    rows: &[(String, String, JsonValue)],
) -> Result<RecordBatch> {
    let schema = edge_type.arrow_schema.clone();

    let ids: Vec<String> = rows
        .iter()
        .map(|(_, _, data)| {
            data.get("id")
                .and_then(|v| v.as_str())
                .map(str::to_string)
                .unwrap_or_else(generate_id)
        })
        .collect();
    let srcs: Vec<&str> = rows.iter().map(|(from, _, _)| from.as_str()).collect();
    let dsts: Vec<&str> = rows.iter().map(|(_, to, _)| to.as_str()).collect();

    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
    columns.push(Arc::new(StringArray::from(ids)));
    columns.push(Arc::new(StringArray::from(srcs)));
    columns.push(Arc::new(StringArray::from(dsts)));

    // Skip fields 0..=2 — the id/src/dst columns built above.
    let data_values: Vec<JsonValue> = rows.iter().map(|(_, _, data)| data.clone()).collect();
    for field in schema.fields().iter().skip(3) {
        if edge_type.blob_properties.contains(field.name()) {
            let col = build_blob_column(field.name(), field.is_nullable(), &data_values)?;
            columns.push(col);
        } else {
            let col = build_column_from_json(
                field.name(),
                field.data_type(),
                field.is_nullable(),
                &data_values,
            )?;
            columns.push(col);
        }
    }

    RecordBatch::try_new(schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
}
559
560pub(crate) fn append_blob_value(builder: &mut BlobArrayBuilder, value: &str) -> Result<()> {
562 if let Some(encoded) = value.strip_prefix("base64:") {
563 let bytes = base64::engine::general_purpose::STANDARD
564 .decode(encoded)
565 .map_err(|e| OmniError::manifest(format!("invalid base64 blob data: {}", e)))?;
566 builder
567 .push_bytes(bytes)
568 .map_err(|e| OmniError::Lance(e.to_string()))
569 } else {
570 builder
572 .push_uri(value)
573 .map_err(|e| OmniError::Lance(e.to_string()))
574 }
575}
576
577fn build_blob_column(name: &str, nullable: bool, rows: &[JsonValue]) -> Result<ArrayRef> {
579 let mut builder = BlobArrayBuilder::new(rows.len());
580 for row in rows {
581 match row.get(name) {
582 Some(JsonValue::String(s)) => {
583 append_blob_value(&mut builder, s)?;
584 }
585 Some(JsonValue::Null) | None if nullable => {
586 builder
587 .push_null()
588 .map_err(|e| OmniError::Lance(e.to_string()))?;
589 }
590 Some(JsonValue::Null) | None => {
591 return Err(OmniError::manifest(format!(
592 "non-nullable blob property '{}' has null values",
593 name
594 )));
595 }
596 _ => {
597 return Err(OmniError::manifest(format!(
598 "blob property '{}' must be a URI string or base64: prefixed data",
599 name
600 )));
601 }
602 }
603 }
604 builder
605 .finish()
606 .map_err(|e| OmniError::Lance(e.to_string()))
607}
608
/// Converts one JSON property across all `rows` into an Arrow array of
/// `data_type`.
///
/// Scalar conversion is lenient: a value that is missing, null, or of the
/// wrong JSON type becomes null in the output; the final null-count check
/// then rejects nulls on non-nullable fields, so mismatches on required
/// columns still fail the load. Dates accept integer or string literals
/// (errors propagate); lists and fixed-size vectors are built element by
/// element.
fn build_column_from_json(
    name: &str,
    data_type: &DataType,
    nullable: bool,
    rows: &[JsonValue],
) -> Result<ArrayRef> {
    let array: ArrayRef = match data_type {
        DataType::Utf8 => {
            let values: Vec<Option<String>> = rows
                .iter()
                .map(|row| {
                    row.get(name)
                        .and_then(|v| v.as_str())
                        .map(|s| s.to_string())
                })
                .collect();
            Arc::new(StringArray::from(values))
        }
        DataType::Int32 => {
            // NOTE(review): `as i32` silently truncates out-of-range i64
            // values (the list path uses try_from instead) — confirm intended.
            let values: Vec<Option<i32>> = rows
                .iter()
                .map(|row| row.get(name).and_then(|v| v.as_i64()).map(|v| v as i32))
                .collect();
            Arc::new(Int32Array::from(values))
        }
        DataType::Int64 => {
            let values: Vec<Option<i64>> = rows
                .iter()
                .map(|row| row.get(name).and_then(|v| v.as_i64()))
                .collect();
            Arc::new(Int64Array::from(values))
        }
        DataType::UInt32 => {
            // NOTE(review): `as u32` silently truncates, same as Int32 above.
            let values: Vec<Option<u32>> = rows
                .iter()
                .map(|row| row.get(name).and_then(|v| v.as_u64()).map(|v| v as u32))
                .collect();
            Arc::new(UInt32Array::from(values))
        }
        DataType::UInt64 => {
            let values: Vec<Option<u64>> = rows
                .iter()
                .map(|row| row.get(name).and_then(|v| v.as_u64()))
                .collect();
            Arc::new(UInt64Array::from(values))
        }
        DataType::Float32 => {
            let values: Vec<Option<f32>> = rows
                .iter()
                .map(|row| row.get(name).and_then(|v| v.as_f64()).map(|v| v as f32))
                .collect();
            Arc::new(Float32Array::from(values))
        }
        DataType::Float64 => {
            let values: Vec<Option<f64>> = rows
                .iter()
                .map(|row| row.get(name).and_then(|v| v.as_f64()))
                .collect();
            Arc::new(Float64Array::from(values))
        }
        DataType::Boolean => {
            let values: Vec<Option<bool>> = rows
                .iter()
                .map(|row| row.get(name).and_then(|v| v.as_bool()))
                .collect();
            Arc::new(BooleanArray::from(values))
        }
        DataType::Date32 => {
            // Unlike the scalar arms above, date parsing errors propagate.
            let mut values = Vec::with_capacity(rows.len());
            for row in rows {
                values.push(parse_date32_json_value(
                    row.get(name).unwrap_or(&JsonValue::Null),
                )?);
            }
            Arc::new(Date32Array::from(values))
        }
        DataType::Date64 => {
            let mut values = Vec::with_capacity(rows.len());
            for row in rows {
                values.push(parse_date64_json_value(
                    row.get(name).unwrap_or(&JsonValue::Null),
                )?);
            }
            Arc::new(Date64Array::from(values))
        }
        DataType::List(field) => {
            let mut builder = ListBuilder::with_capacity(
                make_list_value_builder(field.data_type(), rows.len())?,
                rows.len(),
            )
            .with_field(field.clone());
            for row in rows {
                let value = row.get(name).unwrap_or(&JsonValue::Null);
                if value.is_null() {
                    // Missing/null entry -> null list slot.
                    builder.append(false);
                    continue;
                }
                let items = value.as_array().ok_or_else(|| {
                    OmniError::manifest(format!(
                        "list property '{}' expects a JSON array, got {}",
                        name, value
                    ))
                })?;
                for item in items {
                    append_json_list_item(builder.values(), field.data_type(), item)?;
                }
                builder.append(true);
            }
            Arc::new(builder.finish())
        }
        DataType::FixedSizeList(child_field, dim) => {
            let dim = *dim;
            let mut builder = FixedSizeListBuilder::with_capacity(
                Float32Builder::with_capacity(rows.len() * dim as usize),
                dim,
                rows.len(),
            )
            .with_field(child_field.clone());
            for row in rows {
                if let Some(arr) = row.get(name).and_then(|v| v.as_array()) {
                    // Vector length must match the declared dimension exactly.
                    if arr.len() != dim as usize {
                        return Err(OmniError::manifest(format!(
                            "vector property '{}' expects {} dimensions, got {}",
                            name,
                            dim,
                            arr.len()
                        )));
                    }
                    for val in arr {
                        // NOTE(review): non-numeric elements silently become
                        // 0.0 here — confirm that is intended.
                        builder
                            .values()
                            .append_value(val.as_f64().unwrap_or(0.0) as f32);
                    }
                    builder.append(true);
                } else if nullable {
                    // FixedSizeList still needs dim child slots for a null row.
                    for _ in 0..dim as usize {
                        builder.values().append_null();
                    }
                    builder.append(false);
                } else {
                    return Err(OmniError::manifest(format!(
                        "non-nullable vector property '{}' has null values",
                        name
                    )));
                }
            }
            Arc::new(builder.finish())
        }
        _ => {
            // Unsupported types become an all-null Utf8 column; the
            // nullability check below rejects them for required fields.
            let values: Vec<Option<&str>> = vec![None; rows.len()];
            Arc::new(StringArray::from(values))
        }
    };

    if !nullable && array.null_count() > 0 {
        return Err(OmniError::manifest(format!(
            "non-nullable property '{}' has null or invalid values",
            name
        )));
    }

    Ok(array)
}
774
775fn make_list_value_builder(data_type: &DataType, capacity: usize) -> Result<Box<dyn ArrayBuilder>> {
776 Ok(match data_type {
777 DataType::Utf8 => Box::new(StringBuilder::with_capacity(capacity, capacity * 8)),
778 DataType::Boolean => Box::new(BooleanBuilder::with_capacity(capacity)),
779 DataType::Int32 => Box::new(Int32Builder::with_capacity(capacity)),
780 DataType::Int64 => Box::new(Int64Builder::with_capacity(capacity)),
781 DataType::UInt32 => Box::new(UInt32Builder::with_capacity(capacity)),
782 DataType::UInt64 => Box::new(UInt64Builder::with_capacity(capacity)),
783 DataType::Float32 => Box::new(Float32Builder::with_capacity(capacity)),
784 DataType::Float64 => Box::new(Float64Builder::with_capacity(capacity)),
785 DataType::Date32 => Box::new(Date32Builder::with_capacity(capacity)),
786 DataType::Date64 => Box::new(Date64Builder::with_capacity(capacity)),
787 other => {
788 return Err(OmniError::manifest(format!(
789 "unsupported list element data type {:?}",
790 other
791 )));
792 }
793 })
794}
795
/// Appends a single JSON element to a list's value builder, downcasting the
/// type-erased builder to the concrete builder for `data_type`.
///
/// Elements of the wrong JSON type are appended as null. Int32/UInt32
/// elements that do not fit the target range are an error (stricter than
/// the scalar column path, which truncates with `as`). The downcast
/// failures should be unreachable when the builder was created by
/// `make_list_value_builder` for the same `data_type`.
fn append_json_list_item(
    builder: &mut Box<dyn ArrayBuilder>,
    data_type: &DataType,
    value: &JsonValue,
) -> Result<()> {
    match data_type {
        DataType::Utf8 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<StringBuilder>()
                .ok_or_else(|| OmniError::manifest("list Utf8 builder downcast failed"))?;
            if let Some(value) = value.as_str() {
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::Boolean => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<BooleanBuilder>()
                .ok_or_else(|| OmniError::manifest("list Boolean builder downcast failed"))?;
            if let Some(value) = value.as_bool() {
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::Int32 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<Int32Builder>()
                .ok_or_else(|| OmniError::manifest("list Int32 builder downcast failed"))?;
            if let Some(value) = value.as_i64() {
                // Checked narrowing: out-of-range values are an error.
                let value = i32::try_from(value).map_err(|_| {
                    OmniError::manifest(format!("list value {} exceeds Int32 range", value))
                })?;
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::Int64 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<Int64Builder>()
                .ok_or_else(|| OmniError::manifest("list Int64 builder downcast failed"))?;
            if let Some(value) = value.as_i64() {
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::UInt32 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<UInt32Builder>()
                .ok_or_else(|| OmniError::manifest("list UInt32 builder downcast failed"))?;
            if let Some(value) = value.as_u64() {
                // Checked narrowing: out-of-range values are an error.
                let value = u32::try_from(value).map_err(|_| {
                    OmniError::manifest(format!("list value {} exceeds UInt32 range", value))
                })?;
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::UInt64 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<UInt64Builder>()
                .ok_or_else(|| OmniError::manifest("list UInt64 builder downcast failed"))?;
            if let Some(value) = value.as_u64() {
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::Float32 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<Float32Builder>()
                .ok_or_else(|| OmniError::manifest("list Float32 builder downcast failed"))?;
            if let Some(value) = value.as_f64() {
                builder.append_value(value as f32);
            } else {
                builder.append_null();
            }
        }
        DataType::Float64 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<Float64Builder>()
                .ok_or_else(|| OmniError::manifest("list Float64 builder downcast failed"))?;
            if let Some(value) = value.as_f64() {
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::Date32 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<Date32Builder>()
                .ok_or_else(|| OmniError::manifest("list Date32 builder downcast failed"))?;
            if let Some(value) = parse_date32_json_value(value)? {
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::Date64 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<Date64Builder>()
                .ok_or_else(|| OmniError::manifest("list Date64 builder downcast failed"))?;
            if let Some(value) = parse_date64_json_value(value)? {
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        other => {
            return Err(OmniError::manifest(format!(
                "unsupported list element data type {:?}",
                other
            )));
        }
    }

    Ok(())
}
928
929fn parse_date32_json_value(value: &JsonValue) -> Result<Option<i32>> {
930 if value.is_null() {
931 return Ok(None);
932 }
933 if let Some(days) = value.as_i64() {
934 let days = i32::try_from(days)
935 .map_err(|_| OmniError::manifest(format!("Date value out of range: {}", days)))?;
936 return Ok(Some(days));
937 }
938 if let Some(days) = value.as_u64() {
939 let days = i32::try_from(days)
940 .map_err(|_| OmniError::manifest(format!("Date value out of range: {}", days)))?;
941 return Ok(Some(days));
942 }
943 if let Some(value) = value.as_str() {
944 return Ok(Some(parse_date32_literal(value)?));
945 }
946 Ok(None)
947}
948
949fn parse_date64_json_value(value: &JsonValue) -> Result<Option<i64>> {
950 if value.is_null() {
951 return Ok(None);
952 }
953 if let Some(ms) = value.as_i64() {
954 return Ok(Some(ms));
955 }
956 if let Some(ms) = value.as_u64() {
957 let ms = i64::try_from(ms)
958 .map_err(|_| OmniError::manifest(format!("DateTime value out of range: {}", ms)))?;
959 return Ok(Some(ms));
960 }
961 if let Some(value) = value.as_str() {
962 return Ok(Some(parse_date64_literal(value)?));
963 }
964 Ok(None)
965}
966
// Fallback number of concurrent table writes during a load.
const DEFAULT_LOAD_WRITE_CONCURRENCY: usize = 8;

/// Number of concurrent per-table writes, overridable via the
/// `OMNIGRAPH_LOAD_CONCURRENCY` environment variable. Unset, unparseable,
/// or zero values fall back to the default.
fn load_write_concurrency() -> usize {
    match std::env::var("OMNIGRAPH_LOAD_CONCURRENCY") {
        Ok(raw) => match raw.parse::<usize>() {
            Ok(v) if v > 0 => v,
            _ => DEFAULT_LOAD_WRITE_CONCURRENCY,
        },
        Err(_) => DEFAULT_LOAD_WRITE_CONCURRENCY,
    }
}
986
/// Writes prepared `(name, table_key, batch, loaded_count)` tuples to their
/// datasets with bounded concurrency.
///
/// Returns one `(name, table_key, loaded_count, table_state, table_branch)`
/// tuple per input, in input order (`buffered` preserves ordering). All
/// writes are driven to completion before the final collect surfaces the
/// first error, if any.
async fn write_batches_concurrently(
    db: &Omnigraph,
    branch: Option<&str>,
    mode: LoadMode,
    prepared: Vec<(String, String, RecordBatch, usize)>,
) -> Result<
    Vec<(
        String,
        String,
        usize,
        crate::table_store::TableState,
        Option<String>,
    )>,
> {
    use futures::stream::StreamExt;

    if prepared.is_empty() {
        return Ok(Vec::new());
    }

    // Env-tunable concurrency, capped by the number of tables, minimum 1.
    let concurrency = load_write_concurrency().min(prepared.len()).max(1);

    futures::stream::iter(prepared.into_iter().map(
        |(type_name, table_key, batch, loaded_count)| async move {
            let (state, table_branch) =
                write_batch_to_dataset(db, branch, &table_key, batch, mode).await?;
            Ok::<_, OmniError>((type_name, table_key, loaded_count, state, table_branch))
        },
    ))
    .buffered(concurrency)
    .collect::<Vec<Result<_>>>()
    .await
    .into_iter()
    .collect()
}
1025
/// Applies one record batch to the dataset behind `table_key` on `branch`
/// according to `mode`, returning the resulting table state and the branch
/// the write actually landed on.
///
/// Overwrite replaces the table contents, Append adds rows, and Merge
/// upserts on the `id` column (matched rows updated, unmatched inserted).
/// Note the merge path consumes the dataset handle by value, while the
/// other two mutate it in place.
async fn write_batch_to_dataset(
    db: &Omnigraph,
    branch: Option<&str>,
    table_key: &str,
    batch: RecordBatch,
    mode: LoadMode,
) -> Result<(crate::table_store::TableState, Option<String>)> {
    let (mut ds, full_path, table_branch) =
        db.open_for_mutation_on_branch(branch, table_key).await?;
    let table_store = db.table_store();

    match mode {
        LoadMode::Overwrite => {
            let state = table_store
                .overwrite_batch(&full_path, &mut ds, batch)
                .await?;
            Ok((state, table_branch))
        }
        LoadMode::Append => {
            let state = table_store.append_batch(&full_path, &mut ds, batch).await?;
            Ok((state, table_branch))
        }
        LoadMode::Merge => {
            let state = table_store
                .merge_insert_batch(
                    &full_path,
                    ds,
                    batch,
                    vec!["id".to_string()],
                    lance::dataset::WhenMatched::UpdateAll,
                    lance::dataset::WhenNotMatched::InsertAll,
                )
                .await?;
            Ok((state, table_branch))
        }
    }
}
1063
/// Generates a fresh ULID string, used as the row id when a record supplies
/// no explicit "id" and no @key property applies.
fn generate_id() -> String {
    ulid::Ulid::new().to_string()
}
1067
1068pub(crate) fn parse_date32_literal(value: &str) -> Result<i32> {
1069 let raw: Arc<dyn Array> = Arc::new(StringArray::from(vec![Some(value)]));
1070 let casted = arrow_cast::cast::cast(raw.as_ref(), &DataType::Date32)
1071 .map_err(|e| OmniError::manifest(format!("invalid Date literal '{}': {}", value, e)))?;
1072 let out = casted
1073 .as_any()
1074 .downcast_ref::<Date32Array>()
1075 .ok_or_else(|| OmniError::manifest("Date32 cast produced unexpected array"))?;
1076 if out.is_null(0) {
1077 return Err(OmniError::manifest(format!(
1078 "invalid Date literal '{}'",
1079 value
1080 )));
1081 }
1082 Ok(out.value(0))
1083}
1084
1085pub(crate) fn parse_date64_literal(value: &str) -> Result<i64> {
1086 let raw: Arc<dyn Array> = Arc::new(StringArray::from(vec![Some(value)]));
1087 let casted = arrow_cast::cast::cast(raw.as_ref(), &DataType::Date64)
1088 .map_err(|e| OmniError::manifest(format!("invalid DateTime literal '{}': {}", value, e)))?;
1089 let out = casted
1090 .as_any()
1091 .downcast_ref::<Date64Array>()
1092 .ok_or_else(|| OmniError::manifest("Date64 cast produced unexpected array"))?;
1093 if out.is_null(0) {
1094 return Err(OmniError::manifest(format!(
1095 "invalid DateTime literal '{}'",
1096 value
1097 )));
1098 }
1099 Ok(out.value(0))
1100}
1101
/// Enforces @range and @check value constraints declared on `node_type`
/// against a freshly built record batch.
///
/// @range constraints apply to numeric columns: NaN is always rejected and
/// min/max bounds are compared as f64. @check constraints apply a regex to
/// string columns. Constraints whose property is missing from the batch are
/// skipped, as are null cells and (for @check) non-string columns.
pub(crate) fn validate_value_constraints(
    batch: &RecordBatch,
    node_type: &omnigraph_compiler::catalog::NodeType,
) -> Result<()> {
    use arrow_array::Array;

    for rc in &node_type.range_constraints {
        let Some(col) = batch.column_by_name(&rc.property) else {
            continue;
        };
        for row in 0..batch.num_rows() {
            if col.is_null(row) {
                continue;
            }
            let value = extract_numeric_value(col, row);
            if let Some(val) = value {
                if val.is_nan() {
                    return Err(OmniError::manifest(format!(
                        "@range violation on {}.{}: value is NaN",
                        node_type.name, rc.property
                    )));
                }
                if let Some(ref min) = rc.min {
                    let min_f = literal_value_to_f64(min);
                    if val < min_f {
                        return Err(OmniError::manifest(format!(
                            "@range violation on {}.{}: value {} < min {}",
                            node_type.name, rc.property, val, min_f
                        )));
                    }
                }
                if let Some(ref max) = rc.max {
                    let max_f = literal_value_to_f64(max);
                    if val > max_f {
                        return Err(OmniError::manifest(format!(
                            "@range violation on {}.{}: value {} > max {}",
                            node_type.name, rc.property, val, max_f
                        )));
                    }
                }
            }
        }
    }

    for cc in &node_type.check_constraints {
        // The pattern is compiled (and thus validated) even when the column
        // is absent from this batch.
        let re = regex::Regex::new(&cc.pattern).map_err(|e| {
            OmniError::manifest(format!(
                "@check on {}.{} has invalid regex '{}': {}",
                node_type.name, cc.property, cc.pattern, e
            ))
        })?;
        let Some(col) = batch.column_by_name(&cc.property) else {
            continue;
        };
        // Non-string columns fail the downcast and are silently skipped.
        let str_col = col.as_any().downcast_ref::<StringArray>();
        if let Some(str_col) = str_col {
            for row in 0..str_col.len() {
                if str_col.is_null(row) {
                    continue;
                }
                let val = str_col.value(row);
                if !re.is_match(val) {
                    return Err(OmniError::manifest(format!(
                        "@check violation on {}.{}: value '{}' does not match pattern '{}'",
                        node_type.name, cc.property, val, cc.pattern
                    )));
                }
            }
        }
    }

    Ok(())
}
1179
1180fn extract_numeric_value(col: &ArrayRef, row: usize) -> Option<f64> {
1181 use arrow_array::{
1182 Array, Float32Array, Float64Array, Int32Array, Int64Array, UInt32Array, UInt64Array,
1183 };
1184 if let Some(a) = col.as_any().downcast_ref::<Int32Array>() {
1185 return Some(a.value(row) as f64);
1186 }
1187 if let Some(a) = col.as_any().downcast_ref::<Int64Array>() {
1188 return Some(a.value(row) as f64);
1189 }
1190 if let Some(a) = col.as_any().downcast_ref::<UInt32Array>() {
1191 return Some(a.value(row) as f64);
1192 }
1193 if let Some(a) = col.as_any().downcast_ref::<UInt64Array>() {
1194 return Some(a.value(row) as f64);
1195 }
1196 if let Some(a) = col.as_any().downcast_ref::<Float32Array>() {
1197 return Some(a.value(row) as f64);
1198 }
1199 if let Some(a) = col.as_any().downcast_ref::<Float64Array>() {
1200 return Some(a.value(row));
1201 }
1202 None
1203}
1204
1205fn literal_value_to_f64(v: &omnigraph_compiler::catalog::LiteralValue) -> f64 {
1206 use omnigraph_compiler::catalog::LiteralValue;
1207 match v {
1208 LiteralValue::Integer(n) => *n as f64,
1209 LiteralValue::Float(f) => *f,
1210 }
1211}
1212
1213async fn validate_edge_cardinality(
1216 db: &crate::db::Omnigraph,
1217 branch: Option<&str>,
1218 edge_name: &str,
1219 written_version: u64,
1220 written_branch: Option<&str>,
1221) -> Result<()> {
1222 use arrow_array::Array;
1223 let catalog = db.catalog();
1224 let edge_type = &catalog.edge_types[edge_name];
1225 if edge_type.cardinality.is_default() {
1226 return Ok(());
1227 }
1228
1229 let snapshot = db.snapshot_for_branch(branch).await?;
1232 let table_key = format!("edge:{}", edge_name);
1233 let entry = snapshot
1234 .entry(&table_key)
1235 .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
1236 let ds = db
1237 .open_dataset_at_state(
1238 &entry.table_path,
1239 written_branch.or(entry.table_branch.as_deref()),
1240 written_version,
1241 )
1242 .await?;
1243
1244 let batches = db
1246 .table_store()
1247 .scan(&ds, Some(&["src"]), None, None)
1248 .await?;
1249
1250 let mut counts: HashMap<String, u32> = HashMap::new();
1251 for batch in &batches {
1252 let srcs = batch
1253 .column_by_name("src")
1254 .unwrap()
1255 .as_any()
1256 .downcast_ref::<StringArray>()
1257 .unwrap();
1258 for i in 0..srcs.len() {
1259 *counts.entry(srcs.value(i).to_string()).or_insert(0) += 1;
1260 }
1261 }
1262
1263 let card = &edge_type.cardinality;
1264 for (src, count) in &counts {
1265 if let Some(max) = card.max {
1266 if *count > max {
1267 return Err(OmniError::manifest(format!(
1268 "@card violation on edge {}: source '{}' has {} edges (max {})",
1269 edge_name, src, count, max
1270 )));
1271 }
1272 }
1273 if *count < card.min {
1274 return Err(OmniError::manifest(format!(
1275 "@card violation on edge {}: source '{}' has {} edges (min {})",
1276 edge_name, src, count, card.min
1277 )));
1278 }
1279 }
1280
1281 Ok(())
1282}
1283
1284async fn collect_node_ids(
1289 db: &Omnigraph,
1290 branch: Option<&str>,
1291 type_name: &str,
1292 node_rows: &HashMap<String, Vec<JsonValue>>,
1293 catalog: &omnigraph_compiler::catalog::Catalog,
1294 updates: &[crate::db::SubTableUpdate],
1295) -> Result<HashSet<String>> {
1296 let mut ids = HashSet::new();
1297
1298 if let Some(rows) = node_rows.get(type_name) {
1300 if let Some(node_type) = catalog.node_types.get(type_name) {
1301 if let Some(key_prop) = node_type.key_property() {
1302 for row in rows {
1303 if let Some(id) = row.get(key_prop).and_then(|v| v.as_str()) {
1304 ids.insert(id.to_string());
1305 }
1306 }
1307 }
1308 }
1309 }
1310
1311 let table_key = format!("node:{}", type_name);
1313 let snapshot = db.snapshot_for_branch(branch).await?;
1314 let Some(entry) = snapshot.entry(&table_key) else {
1315 return Ok(ids);
1316 };
1317 let updated = updates
1319 .iter()
1320 .find(|u| u.table_key == table_key)
1321 .map(|u| (u.table_version, u.table_branch.as_deref()));
1322 let (version, branch) = updated.unwrap_or((entry.table_version, entry.table_branch.as_deref()));
1323 let ds = db
1324 .open_dataset_at_state(&entry.table_path, branch, version)
1325 .await?;
1326
1327 let batches = db
1328 .table_store()
1329 .scan(&ds, Some(&["id"]), None, None)
1330 .await?;
1331
1332 for batch in &batches {
1333 let id_col = batch
1334 .column_by_name("id")
1335 .unwrap()
1336 .as_any()
1337 .downcast_ref::<StringArray>()
1338 .unwrap();
1339 for i in 0..batch.num_rows() {
1340 ids.insert(id_col.value(i).to_string());
1341 }
1342 }
1343
1344 Ok(ids)
1345}
1346
1347#[cfg(test)]
1348mod tests {
1349 use super::*;
1350 use crate::db::Omnigraph;
1351 use arrow_array::Array;
1352 use futures::TryStreamExt;
1353 use std::collections::HashMap;
1354
    // Minimal schema exercised by the tests below: two node types keyed by
    // `name`, one edge carrying an optional property, and one bare edge.
    const TEST_SCHEMA: &str = r#"
node Person {
    name: String @key
    age: I32?
}
node Company {
    name: String @key
}
edge Knows: Person -> Person {
    since: Date?
}
edge WorksAt: Person -> Company
"#;

    // JSONL payload matching TEST_SCHEMA: three node rows and two edge rows.
    const TEST_DATA: &str = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}
{"type": "Person", "data": {"name": "Bob", "age": 25}}
{"type": "Company", "data": {"name": "Acme"}}
{"edge": "Knows", "from": "Alice", "to": "Bob"}
{"edge": "WorksAt", "from": "Alice", "to": "Acme"}
"#;
1375
1376 #[tokio::test]
1377 async fn test_load_creates_data() {
1378 let dir = tempfile::tempdir().unwrap();
1379 let uri = dir.path().to_str().unwrap();
1380 let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1381
1382 let result = load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
1383 .await
1384 .unwrap();
1385
1386 assert_eq!(result.nodes_loaded["Person"], 2);
1387 assert_eq!(result.nodes_loaded["Company"], 1);
1388 assert_eq!(result.edges_loaded["Knows"], 1);
1389 assert_eq!(result.edges_loaded["WorksAt"], 1);
1390 }
1391
1392 #[tokio::test]
1393 async fn test_load_data_readable_via_lance() {
1394 let dir = tempfile::tempdir().unwrap();
1395 let uri = dir.path().to_str().unwrap();
1396 let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1397 load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
1398 .await
1399 .unwrap();
1400
1401 let snap = db.snapshot();
1403 let person_ds = snap.open("node:Person").await.unwrap();
1404
1405 assert_eq!(person_ds.count_rows(None).await.unwrap(), 2);
1406
1407 let batches: Vec<RecordBatch> = person_ds
1409 .scan()
1410 .try_into_stream()
1411 .await
1412 .unwrap()
1413 .try_collect()
1414 .await
1415 .unwrap();
1416
1417 let batch = &batches[0];
1418 let ids = batch
1419 .column_by_name("id")
1420 .unwrap()
1421 .as_any()
1422 .downcast_ref::<StringArray>()
1423 .unwrap();
1424 let id_values: Vec<&str> = (0..ids.len()).map(|i| ids.value(i)).collect();
1426 assert!(id_values.contains(&"Alice"));
1427 assert!(id_values.contains(&"Bob"));
1428 }
1429
1430 #[tokio::test]
1431 async fn test_load_edges_reference_node_keys() {
1432 let dir = tempfile::tempdir().unwrap();
1433 let uri = dir.path().to_str().unwrap();
1434 let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1435 load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
1436 .await
1437 .unwrap();
1438
1439 let snap = db.snapshot();
1440 let knows_ds = snap.open("edge:Knows").await.unwrap();
1441
1442 let batches: Vec<RecordBatch> = knows_ds
1443 .scan()
1444 .try_into_stream()
1445 .await
1446 .unwrap()
1447 .try_collect()
1448 .await
1449 .unwrap();
1450
1451 let batch = &batches[0];
1452 let srcs = batch
1453 .column_by_name("src")
1454 .unwrap()
1455 .as_any()
1456 .downcast_ref::<StringArray>()
1457 .unwrap();
1458 let dsts = batch
1459 .column_by_name("dst")
1460 .unwrap()
1461 .as_any()
1462 .downcast_ref::<StringArray>()
1463 .unwrap();
1464
1465 assert_eq!(srcs.value(0), "Alice");
1466 assert_eq!(dsts.value(0), "Bob");
1467 }
1468
1469 #[tokio::test]
1470 async fn test_load_manifest_version_advances() {
1471 let dir = tempfile::tempdir().unwrap();
1472 let uri = dir.path().to_str().unwrap();
1473 let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1474 let v1 = db.version();
1475
1476 load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
1477 .await
1478 .unwrap();
1479
1480 assert!(db.version() > v1);
1481 }
1482
1483 #[tokio::test]
1484 async fn test_load_append_adds_rows() {
1485 let dir = tempfile::tempdir().unwrap();
1486 let uri = dir.path().to_str().unwrap();
1487 let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1488
1489 let batch1 = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#;
1490 let batch2 = r#"{"type": "Person", "data": {"name": "Bob", "age": 25}}"#;
1491
1492 load_jsonl(&mut db, batch1, LoadMode::Overwrite)
1493 .await
1494 .unwrap();
1495 load_jsonl(&mut db, batch2, LoadMode::Append).await.unwrap();
1496
1497 let snap = db.snapshot();
1498 let person_ds = snap.open("node:Person").await.unwrap();
1499 assert_eq!(person_ds.count_rows(None).await.unwrap(), 2);
1500 }
1501
1502 #[tokio::test]
1503 async fn test_load_unknown_type_rejected() {
1504 let dir = tempfile::tempdir().unwrap();
1505 let uri = dir.path().to_str().unwrap();
1506 let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1507
1508 let bad = r#"{"type": "FakeType", "data": {"name": "x"}}"#;
1509 let result = load_jsonl(&mut db, bad, LoadMode::Overwrite).await;
1510 assert!(result.is_err());
1511 }
1512
1513 #[tokio::test]
1514 async fn test_ingest_creates_branch_and_reports_tables() {
1515 let dir = tempfile::tempdir().unwrap();
1516 let uri = dir.path().to_str().unwrap();
1517 let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1518
1519 let result = db
1520 .ingest("feature", Some("main"), TEST_DATA, LoadMode::Overwrite)
1521 .await
1522 .unwrap();
1523
1524 assert_eq!(result.branch, "feature");
1525 assert_eq!(result.base_branch, "main");
1526 assert!(result.branch_created);
1527 assert_eq!(result.mode, LoadMode::Overwrite);
1528 assert_eq!(
1529 result.tables,
1530 vec![
1531 IngestTableResult {
1532 table_key: "edge:Knows".to_string(),
1533 rows_loaded: 1
1534 },
1535 IngestTableResult {
1536 table_key: "edge:WorksAt".to_string(),
1537 rows_loaded: 1
1538 },
1539 IngestTableResult {
1540 table_key: "node:Company".to_string(),
1541 rows_loaded: 1
1542 },
1543 IngestTableResult {
1544 table_key: "node:Person".to_string(),
1545 rows_loaded: 2
1546 },
1547 ]
1548 );
1549 assert!(
1550 db.branch_list()
1551 .await
1552 .unwrap()
1553 .contains(&"feature".to_string())
1554 );
1555 }
1556
    /// When the target branch already exists, `ingest` reuses it: the `from`
    /// base is echoed back in the result but not used, and Merge mode upserts
    /// rows by key.
    #[tokio::test]
    async fn test_ingest_existing_branch_ignores_from_and_merges_data() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();
        db.branch_create_from(crate::db::ReadTarget::branch("main"), "feature")
            .await
            .unwrap();

        // "missing-base" does not exist; because "feature" already exists the
        // base must be ignored rather than cause an error. Bob is updated,
        // Eve is new.
        let result = db
            .ingest(
                "feature",
                Some("missing-base"),
                r#"{"type":"Person","data":{"name":"Bob","age":26}}
{"type":"Person","data":{"name":"Eve","age":31}}"#,
                LoadMode::Merge,
            )
            .await
            .unwrap();

        assert_eq!(result.branch, "feature");
        assert_eq!(result.base_branch, "missing-base");
        assert!(!result.branch_created);
        assert_eq!(result.mode, LoadMode::Merge);
        assert_eq!(
            result.tables,
            vec![IngestTableResult {
                table_key: "node:Person".to_string(),
                rows_loaded: 2
            }]
        );

        // Alice (untouched) + Bob (merged) + Eve (inserted) => 3 rows.
        let snap = db
            .snapshot_of(crate::db::ReadTarget::branch("feature"))
            .await
            .unwrap();
        let person_ds = snap.open("node:Person").await.unwrap();
        assert_eq!(person_ds.count_rows(None).await.unwrap(), 3);

        let batches: Vec<RecordBatch> = person_ds
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();
        let mut ages_by_id = HashMap::new();
        for batch in &batches {
            let ids = batch
                .column_by_name("id")
                .unwrap()
                .as_any()
                .downcast_ref::<StringArray>()
                .unwrap();
            let ages = batch
                .column_by_name("age")
                .unwrap()
                .as_any()
                .downcast_ref::<Int32Array>()
                .unwrap();
            for idx in 0..ids.len() {
                ages_by_id.insert(ids.value(idx).to_string(), ages.value(idx));
            }
        }

        // Merge semantics: existing key updated in place, new key added,
        // untouched key preserved.
        assert_eq!(ages_by_id.get("Bob"), Some(&26));
        assert_eq!(ages_by_id.get("Eve"), Some(&31));
        assert_eq!(ages_by_id.get("Alice"), Some(&30));
    }
1630
1631 #[tokio::test]
1632 async fn test_ingest_as_stamps_actor_on_branch_head_commit() {
1633 let dir = tempfile::tempdir().unwrap();
1634 let uri = dir.path().to_str().unwrap();
1635 let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1636
1637 db.ingest_as(
1638 "feature",
1639 Some("main"),
1640 TEST_DATA,
1641 LoadMode::Overwrite,
1642 Some("act-andrew"),
1643 )
1644 .await
1645 .unwrap();
1646
1647 let head = db
1648 .list_commits(Some("feature"))
1649 .await
1650 .unwrap()
1651 .into_iter()
1652 .last()
1653 .unwrap();
1654 assert_eq!(head.actor_id.as_deref(), Some("act-andrew"));
1655 }
1656
1657 #[test]
1658 fn test_range_constraint_rejects_nan() {
1659 use arrow_array::{Float64Array, RecordBatch, StringArray};
1660 use omnigraph_compiler::catalog::{LiteralValue, NodeType, RangeConstraint};
1661 use std::sync::Arc;
1662
1663 let schema = Arc::new(arrow_schema::Schema::new(vec![
1664 arrow_schema::Field::new("name", arrow_schema::DataType::Utf8, false),
1665 arrow_schema::Field::new("score", arrow_schema::DataType::Float64, true),
1666 ]));
1667
1668 let batch = RecordBatch::try_new(
1669 schema.clone(),
1670 vec![
1671 Arc::new(StringArray::from(vec!["bad"])),
1672 Arc::new(Float64Array::from(vec![f64::NAN])),
1673 ],
1674 )
1675 .unwrap();
1676
1677 let node_type = NodeType {
1678 name: "Test".to_string(),
1679 implements: vec![],
1680 properties: Default::default(),
1681 key: None,
1682 unique_constraints: vec![],
1683 indices: vec![],
1684 range_constraints: vec![RangeConstraint {
1685 property: "score".to_string(),
1686 min: Some(LiteralValue::Float(0.0)),
1687 max: Some(LiteralValue::Float(1.0)),
1688 }],
1689 check_constraints: vec![],
1690 embed_sources: Default::default(),
1691 blob_properties: Default::default(),
1692 arrow_schema: schema,
1693 };
1694
1695 let result = validate_value_constraints(&batch, &node_type);
1696 assert!(result.is_err(), "expected NaN to be rejected");
1697 let err = result.unwrap_err().to_string();
1698 assert!(err.contains("NaN"), "error should mention NaN: {}", err);
1699 }
1700}