
omnigraph/loader/mod.rs

use std::collections::{HashMap, HashSet};
use std::io::{BufRead, BufReader, Cursor};
use std::sync::Arc;

use arrow_array::{
    Array, ArrayRef, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array,
    Int32Array, Int64Array, RecordBatch, StringArray, UInt32Array, UInt64Array,
    builder::{
        ArrayBuilder, BooleanBuilder, Date32Builder, Date64Builder, FixedSizeListBuilder,
        Float32Builder, Float64Builder, Int32Builder, Int64Builder, ListBuilder, StringBuilder,
        UInt32Builder, UInt64Builder,
    },
};
use arrow_schema::DataType;
use base64::Engine;
use lance::blob::BlobArrayBuilder;
use omnigraph_compiler::catalog::NodeType;
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;

use crate::db::Omnigraph;
use crate::error::{OmniError, Result};

/// Result of a load operation.
#[derive(Debug, Clone, Default)]
pub struct LoadResult {
    pub nodes_loaded: HashMap<String, usize>,
    pub edges_loaded: HashMap<String, usize>,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct IngestTableResult {
    pub table_key: String,
    pub rows_loaded: usize,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct IngestResult {
    pub branch: String,
    pub base_branch: String,
    pub branch_created: bool,
    pub mode: LoadMode,
    pub tables: Vec<IngestTableResult>,
}

/// Load mode for data ingestion.
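/// Serialized in snake_case: `"overwrite"`, `"append"`, `"merge"`.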
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LoadMode {
    /// Overwrite existing data.
    Overwrite,
    /// Append to existing data.
    Append,
    /// Merge by `id` key (upsert).
    Merge,
}

/// Load JSONL data into an Omnigraph database.
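///
/// Each input line is one node record (`{"type": ..., "data": {...}}`) or one
/// edge record (`{"edge": ..., "from": ..., "to": ..., "data": {...}}`); blank
/// lines are skipped.
///
/// A minimal usage sketch, assuming an already-opened `db: &mut Omnigraph` and
/// that the crate re-exports these items as `omnigraph::loader` (marked
/// `ignore` because it needs a live store):
///
/// ```ignore
/// use omnigraph::loader::{load_jsonl, LoadMode};
///
/// let data = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}
/// {"type": "Person", "data": {"name": "Bob", "age": 25}}
/// {"edge": "Knows", "from": "Alice", "to": "Bob"}"#;
/// let result = load_jsonl(&mut db, data, LoadMode::Overwrite).await?;
/// assert_eq!(result.nodes_loaded["Person"], 2);
/// ```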
pub async fn load_jsonl(db: &mut Omnigraph, data: &str, mode: LoadMode) -> Result<LoadResult> {
    let current_branch = db.active_branch().map(str::to_string);
    let branch = current_branch.as_deref().unwrap_or("main");
    db.load(branch, data, mode).await
}

/// Load JSONL data from a file path.
pub async fn load_jsonl_file(db: &mut Omnigraph, path: &str, mode: LoadMode) -> Result<LoadResult> {
    let current_branch = db.active_branch().map(str::to_string);
    let branch = current_branch.as_deref().unwrap_or("main");
    db.load_file(branch, path, mode).await
}

impl Omnigraph {
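    /// Load JSONL into `branch`, creating it from `from` (defaulting to
    /// `"main"`) when it does not exist yet. If `branch` already exists,
    /// `from` is ignored and the data lands on the existing branch.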
    pub async fn ingest(
        &mut self,
        branch: &str,
        from: Option<&str>,
        data: &str,
        mode: LoadMode,
    ) -> Result<IngestResult> {
        self.ingest_as(branch, from, data, mode, None).await
    }

    /// Like `ingest`, but sets the audit actor for the duration of the call.
    pub async fn ingest_as(
        &mut self,
        branch: &str,
        from: Option<&str>,
        data: &str,
        mode: LoadMode,
        actor_id: Option<&str>,
    ) -> Result<IngestResult> {
        let previous_actor = self.audit_actor_id.clone();
        self.audit_actor_id = actor_id.map(str::to_string);
        let result = self
            .ingest_with_current_actor(branch, from, data, mode)
            .await;
        self.audit_actor_id = previous_actor;
        result
    }

    /// Like `ingest`, reading the JSONL payload from a file at `path`.
    pub async fn ingest_file(
        &mut self,
        branch: &str,
        from: Option<&str>,
        path: &str,
        mode: LoadMode,
    ) -> Result<IngestResult> {
        self.ingest_file_as(branch, from, path, mode, None).await
    }

    /// Like `ingest_file`, but sets the audit actor for the duration of the call.
    pub async fn ingest_file_as(
        &mut self,
        branch: &str,
        from: Option<&str>,
        path: &str,
        mode: LoadMode,
        actor_id: Option<&str>,
    ) -> Result<IngestResult> {
        let data = std::fs::read_to_string(path).map_err(OmniError::Io)?;
        self.ingest_as(branch, from, &data, mode, actor_id).await
    }

    async fn ingest_with_current_actor(
        &mut self,
        branch: &str,
        from: Option<&str>,
        data: &str,
        mode: LoadMode,
    ) -> Result<IngestResult> {
        self.ensure_schema_state_valid().await?;
        let target_branch =
            Self::normalize_branch_name(branch)?.unwrap_or_else(|| "main".to_string());
        let base_branch = Self::normalize_branch_name(from.unwrap_or("main"))?
            .unwrap_or_else(|| "main".to_string());
        let branch_created = !self
            .branch_list()
            .await?
            .iter()
            .any(|name| name == &target_branch);
        if branch_created {
            self.branch_create_from(crate::db::ReadTarget::branch(&base_branch), &target_branch)
                .await?;
        }

        let result = self.load(&target_branch, data, mode).await?;
        Ok(IngestResult {
            branch: target_branch,
            base_branch,
            branch_created,
            mode,
            tables: result.to_ingest_tables(),
        })
    }

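    /// Load JSONL into `branch` transactionally: the data is staged on an
    /// internal run branch and only published if the target branch head has
    /// not advanced in the meantime; otherwise the run is failed and a
    /// manifest-conflict error asks the caller to retry.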
    pub async fn load(&mut self, branch: &str, data: &str, mode: LoadMode) -> Result<LoadResult> {
        self.ensure_schema_state_valid().await?;
        let requested = Self::normalize_branch_name(branch)?.unwrap_or_else(|| "main".to_string());
        if crate::db::is_internal_run_branch(&requested) {
            return self
                .load_direct_on_branch(Some(requested.as_str()), data, mode)
                .await;
        }

        let target_head_before = self.latest_branch_snapshot_id(&requested).await?;
        let op = format!("load_jsonl:branch={}:mode={}", requested, mode.as_str());
        let run = self.begin_run(&requested, Some(op.as_str())).await?;
        let staged_result = match self
            .load_direct_on_branch(Some(run.run_branch.as_str()), data, mode)
            .await
        {
            Ok(result) => result,
            Err(err) => {
                let _ = self.fail_run(&run.run_id).await;
                return Err(err);
            }
        };

        let target_head_now = self.latest_branch_snapshot_id(&requested).await?;
        if target_head_now.as_str() != target_head_before.as_str() {
            let _ = self.fail_run(&run.run_id).await;
            return Err(OmniError::manifest_conflict(format!(
                "target branch '{}' advanced during transactional load; retry",
                requested
            )));
        }

        if let Err(err) = self.publish_run(&run.run_id).await {
            let _ = self.fail_run(&run.run_id).await;
            return Err(err);
        }

        Ok(staged_result)
    }

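    /// Read a JSONL file from `path` and load it into `branch`; see `load`
    /// for the transactional semantics.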
    pub async fn load_file(
        &mut self,
        branch: &str,
        path: &str,
        mode: LoadMode,
    ) -> Result<LoadResult> {
        let data = std::fs::read_to_string(path).map_err(OmniError::Io)?;
        self.load(branch, &data, mode).await
    }

    async fn load_direct_on_branch(
        &mut self,
        branch: Option<&str>,
        data: &str,
        mode: LoadMode,
    ) -> Result<LoadResult> {
        let reader = BufReader::new(Cursor::new(data.as_bytes()));
        load_jsonl_reader(self, branch, reader, mode).await
    }
}

impl LoadMode {
    pub fn as_str(self) -> &'static str {
        match self {
            LoadMode::Overwrite => "overwrite",
            LoadMode::Append => "append",
            LoadMode::Merge => "merge",
        }
    }
}

impl LoadResult {
    pub fn to_ingest_tables(&self) -> Vec<IngestTableResult> {
        let mut tables = self
            .nodes_loaded
            .iter()
            .map(|(type_name, rows_loaded)| IngestTableResult {
                table_key: format!("node:{type_name}"),
                rows_loaded: *rows_loaded,
            })
            .chain(
                self.edges_loaded
                    .iter()
                    .map(|(edge_name, rows_loaded)| IngestTableResult {
                        table_key: format!("edge:{edge_name}"),
                        rows_loaded: *rows_loaded,
                    }),
            )
            .collect::<Vec<_>>();
        tables.sort_by(|a, b| a.table_key.cmp(&b.table_key));
        tables
    }
}

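/// Core JSONL load pipeline: parse and spool rows per type (phase 1); build,
/// validate, and concurrently write per-type batches (phase 2); validate edge
/// cardinality at the just-written versions (phase 3); then commit all
/// manifest updates atomically (phase 4).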
async fn load_jsonl_reader<R: BufRead>(
    db: &mut Omnigraph,
    branch: Option<&str>,
    reader: R,
    mode: LoadMode,
) -> Result<LoadResult> {
    let catalog = db.catalog().clone();

    // Phase 1: Parse all lines, spool into per-type collections
    let mut node_rows: HashMap<String, Vec<JsonValue>> = HashMap::new();
    let mut edge_rows: HashMap<String, Vec<(String, String, JsonValue)>> = HashMap::new();

    for (line_num, line) in reader.lines().enumerate() {
        let line = line?;
        let line = line.trim();
        if line.is_empty() {
            continue;
        }
        let value: JsonValue = serde_json::from_str(line).map_err(|e| {
            OmniError::manifest(format!("invalid JSON on line {}: {}", line_num + 1, e))
        })?;

        if let Some(type_name) = value.get("type").and_then(|v| v.as_str()) {
            if !catalog.node_types.contains_key(type_name) {
                return Err(OmniError::manifest(format!(
                    "line {}: unknown node type '{}'",
                    line_num + 1,
                    type_name
                )));
            }
            let data = value
                .get("data")
                .cloned()
                .unwrap_or(JsonValue::Object(serde_json::Map::new()));
            node_rows
                .entry(type_name.to_string())
                .or_default()
                .push(data);
        } else if let Some(edge_name) = value.get("edge").and_then(|v| v.as_str()) {
            if catalog.lookup_edge_by_name(edge_name).is_none() {
                return Err(OmniError::manifest(format!(
                    "line {}: unknown edge type '{}'",
                    line_num + 1,
                    edge_name
                )));
            }
            let from = value
                .get("from")
                .and_then(|v| v.as_str())
                .ok_or_else(|| {
                    OmniError::manifest(format!("line {}: edge missing 'from'", line_num + 1))
                })?
                .to_string();
            let to = value
                .get("to")
                .and_then(|v| v.as_str())
                .ok_or_else(|| {
                    OmniError::manifest(format!("line {}: edge missing 'to'", line_num + 1))
                })?
                .to_string();
            let data = value
                .get("data")
                .cloned()
                .unwrap_or(JsonValue::Object(serde_json::Map::new()));
            let canonical = catalog.lookup_edge_by_name(edge_name).unwrap().name.clone();
            edge_rows
                .entry(canonical)
                .or_default()
                .push((from, to, data));
        } else {
            return Err(OmniError::manifest(format!(
                "line {}: expected 'type' or 'edge' field",
                line_num + 1
            )));
        }
    }

    // Phase 2: Build per-type RecordBatches and write to Lance.
    //
    // Writes to different tables are independent in Lance (each table has its
    // own manifest + fragments), so we parallelize across types with a bounded
    // concurrency limit. Serial writes against S3 were the dominant cost of
    // load — batching and parallelizing per-type cuts wall time by roughly
    // `DEFAULT_LOAD_WRITE_CONCURRENCY`× for wide schemas (see MR-677).

    let mut updates = Vec::new();
    let mut result = LoadResult::default();
    let snapshot = db.snapshot_for_branch(branch).await?;

    // Phase 2a: build and validate every node batch up front. Cheap and
    // synchronous — surfaces validation errors before any S3 traffic.
    let mut prepared_nodes: Vec<(String, String, RecordBatch, usize)> =
        Vec::with_capacity(node_rows.len());
    for (type_name, rows) in &node_rows {
        let node_type = &catalog.node_types[type_name];
        let batch = build_node_batch(node_type, rows)?;
        validate_value_constraints(&batch, node_type)?;
        let loaded_count = batch.num_rows();
        let table_key = format!("node:{}", type_name);
        snapshot
            .entry(&table_key)
            .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
        prepared_nodes.push((type_name.clone(), table_key, batch, loaded_count));
    }

    // Phase 2b: write every node type concurrently, bounded.
    let node_write_results = write_batches_concurrently(db, branch, mode, prepared_nodes).await?;

    for (type_name, table_key, loaded_count, state, table_branch) in node_write_results {
        updates.push(crate::db::SubTableUpdate {
            table_key,
            table_version: state.version,
            table_branch,
            row_count: state.row_count,
            version_metadata: state.version_metadata,
        });
        result.nodes_loaded.insert(type_name, loaded_count);
    }

    // Phase 2c: Validate edge referential integrity — every src/dst must
    // reference an existing node ID in the appropriate type.
    for (edge_name, rows) in &edge_rows {
        let edge_type = &catalog.edge_types[edge_name];
        let from_ids = collect_node_ids(
            db,
            branch,
            &edge_type.from_type,
            &node_rows,
            &catalog,
            &updates,
        )
        .await?;
        let to_ids = collect_node_ids(
            db,
            branch,
            &edge_type.to_type,
            &node_rows,
            &catalog,
            &updates,
        )
        .await?;

        for (i, (src, dst, _)) in rows.iter().enumerate() {
            if !from_ids.contains(src.as_str()) {
                return Err(OmniError::manifest(format!(
                    "edge {} row {}: src '{}' not found in {}",
                    edge_name,
                    i + 1,
                    src,
                    edge_type.from_type
                )));
            }
            if !to_ids.contains(dst.as_str()) {
                return Err(OmniError::manifest(format!(
                    "edge {} row {}: dst '{}' not found in {}",
                    edge_name,
                    i + 1,
                    dst,
                    edge_type.to_type
                )));
            }
        }
    }

    // Phase 2d: write edges (parallel per edge type, same pattern as nodes)
    let mut prepared_edges: Vec<(String, String, RecordBatch, usize)> =
        Vec::with_capacity(edge_rows.len());
    for (edge_name, rows) in &edge_rows {
        let edge_type = &catalog.edge_types[edge_name];
        let batch = build_edge_batch(edge_type, rows)?;
        let loaded_count = batch.num_rows();
        let table_key = format!("edge:{}", edge_name);
        snapshot
            .entry(&table_key)
            .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
        prepared_edges.push((edge_name.clone(), table_key, batch, loaded_count));
    }

    let edge_write_results = write_batches_concurrently(db, branch, mode, prepared_edges).await?;

    for (edge_name, table_key, loaded_count, state, table_branch) in edge_write_results {
        updates.push(crate::db::SubTableUpdate {
            table_key,
            table_version: state.version,
            table_branch,
            row_count: state.row_count,
            version_metadata: state.version_metadata,
        });
        result.edges_loaded.insert(edge_name, loaded_count);
    }

    // Phase 3: Validate edge cardinality constraints (before commit — invalid
    // data must not be committed). Opens edge sub-tables at their just-written
    // versions, not through the snapshot (which still pins to pre-write state).
    for (edge_name, _) in &edge_rows {
        let table_key = format!("edge:{}", edge_name);
        if let Some(update) = updates.iter().find(|u| u.table_key == table_key) {
            validate_edge_cardinality(
                db,
                branch,
                edge_name,
                update.table_version,
                update.table_branch.as_deref(),
            )
            .await?;
        }
    }

    // Phase 4: Atomic manifest commit
    db.commit_updates_on_branch(branch, &updates).await?;

    Ok(result)
}

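/// Build an Arrow `RecordBatch` for one node type. Row ids resolve in order:
/// the `@key` property value when the type declares one (an explicit `id`, if
/// present, must match it), else an explicit `id`, else a freshly generated
/// ULID.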
fn build_node_batch(node_type: &NodeType, rows: &[JsonValue]) -> Result<RecordBatch> {
    let schema = node_type.arrow_schema.clone();

    // Build id column: @key value, explicit id, or generated ULID.
    let ids: Vec<String> = rows
        .iter()
        .map(|row| {
            let explicit_id = row.get("id").and_then(|v| v.as_str()).map(str::to_string);
            if let Some(key_prop) = node_type.key_property() {
                let key_value = row
                    .get(key_prop)
                    .and_then(|v| v.as_str())
                    .map(|s| s.to_string())
                    .ok_or_else(|| {
                        OmniError::manifest(format!(
                            "node {} missing @key property '{}'",
                            node_type.name, key_prop
                        ))
                    })?;
                if let Some(explicit_id) = explicit_id {
                    if explicit_id != key_value {
                        return Err(OmniError::manifest(format!(
                            "node {} has explicit id '{}' that does not match @key property '{}' value '{}'",
                            node_type.name, explicit_id, key_prop, key_value
                        )));
                    }
                }
                Ok(key_value)
            } else if let Some(explicit_id) = explicit_id {
                Ok(explicit_id)
            } else {
                Ok(generate_id())
            }
        })
        .collect::<Result<Vec<_>>>()?;

    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
    columns.push(Arc::new(StringArray::from(ids)));

    // Build property columns (skip "id" field at index 0)
    for field in schema.fields().iter().skip(1) {
        if node_type.blob_properties.contains(field.name()) {
            let col = build_blob_column(field.name(), field.is_nullable(), rows)?;
            columns.push(col);
        } else {
            let col =
                build_column_from_json(field.name(), field.data_type(), field.is_nullable(), rows)?;
            columns.push(col);
        }
    }

    RecordBatch::try_new(schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
}

fn build_edge_batch(
    edge_type: &omnigraph_compiler::catalog::EdgeType,
    rows: &[(String, String, JsonValue)],
) -> Result<RecordBatch> {
    let schema = edge_type.arrow_schema.clone();

    let ids: Vec<String> = rows
        .iter()
        .map(|(_, _, data)| {
            data.get("id")
                .and_then(|v| v.as_str())
                .map(str::to_string)
                .unwrap_or_else(generate_id)
        })
        .collect();
    let srcs: Vec<&str> = rows.iter().map(|(from, _, _)| from.as_str()).collect();
    let dsts: Vec<&str> = rows.iter().map(|(_, to, _)| to.as_str()).collect();

    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
    columns.push(Arc::new(StringArray::from(ids)));
    columns.push(Arc::new(StringArray::from(srcs)));
    columns.push(Arc::new(StringArray::from(dsts)));

    // Build edge property columns (skip id, src, dst at indices 0-2)
    let data_values: Vec<JsonValue> = rows.iter().map(|(_, _, data)| data.clone()).collect();
    for field in schema.fields().iter().skip(3) {
        if edge_type.blob_properties.contains(field.name()) {
            let col = build_blob_column(field.name(), field.is_nullable(), &data_values)?;
            columns.push(col);
        } else {
            let col = build_column_from_json(
                field.name(),
                field.data_type(),
                field.is_nullable(),
                &data_values,
            )?;
            columns.push(col);
        }
    }

    RecordBatch::try_new(schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
}

/// Append a blob value (URI or base64 bytes) to a BlobArrayBuilder.
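///
/// Accepted forms:
/// - `"base64:SGVsbG8="`: inline bytes decoded with the standard alphabet
/// - any other string: treated as a URI, e.g. `"s3://bucket/blob.bin"` or
///   `"file:///tmp/blob.bin"` (hypothetical paths, for illustration only)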
pub(crate) fn append_blob_value(builder: &mut BlobArrayBuilder, value: &str) -> Result<()> {
    if let Some(encoded) = value.strip_prefix("base64:") {
        let bytes = base64::engine::general_purpose::STANDARD
            .decode(encoded)
            .map_err(|e| OmniError::manifest(format!("invalid base64 blob data: {}", e)))?;
        builder
            .push_bytes(bytes)
            .map_err(|e| OmniError::Lance(e.to_string()))
    } else {
        // Treat as URI (file://, s3://, gs://, or any other scheme)
        builder
            .push_uri(value)
            .map_err(|e| OmniError::Lance(e.to_string()))
    }
}

/// Build a blob column from JSON values using Lance BlobArrayBuilder.
fn build_blob_column(name: &str, nullable: bool, rows: &[JsonValue]) -> Result<ArrayRef> {
    let mut builder = BlobArrayBuilder::new(rows.len());
    for row in rows {
        match row.get(name) {
            Some(JsonValue::String(s)) => {
                append_blob_value(&mut builder, s)?;
            }
            Some(JsonValue::Null) | None if nullable => {
                builder
                    .push_null()
                    .map_err(|e| OmniError::Lance(e.to_string()))?;
            }
            Some(JsonValue::Null) | None => {
                return Err(OmniError::manifest(format!(
                    "non-nullable blob property '{}' has null values",
                    name
                )));
            }
            _ => {
                return Err(OmniError::manifest(format!(
                    "blob property '{}' must be a URI string or base64: prefixed data",
                    name
                )));
            }
        }
    }
    builder
        .finish()
        .map_err(|e| OmniError::Lance(e.to_string()))
}

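/// Convert one JSON property across all rows into an Arrow array of the
/// field's declared type. Values that fail to coerce become nulls, which the
/// trailing non-nullability check rejects for required fields; unsupported
/// types degrade to an all-null Utf8 column.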
fn build_column_from_json(
    name: &str,
    data_type: &DataType,
    nullable: bool,
    rows: &[JsonValue],
) -> Result<ArrayRef> {
    let array: ArrayRef = match data_type {
        DataType::Utf8 => {
            let values: Vec<Option<String>> = rows
                .iter()
                .map(|row| {
                    row.get(name)
                        .and_then(|v| v.as_str())
                        .map(|s| s.to_string())
                })
                .collect();
            Arc::new(StringArray::from(values))
        }
        DataType::Int32 => {
            let values: Vec<Option<i32>> = rows
                .iter()
                .map(|row| row.get(name).and_then(|v| v.as_i64()).map(|v| v as i32))
                .collect();
            Arc::new(Int32Array::from(values))
        }
        DataType::Int64 => {
            let values: Vec<Option<i64>> = rows
                .iter()
                .map(|row| row.get(name).and_then(|v| v.as_i64()))
                .collect();
            Arc::new(Int64Array::from(values))
        }
        DataType::UInt32 => {
            let values: Vec<Option<u32>> = rows
                .iter()
                .map(|row| row.get(name).and_then(|v| v.as_u64()).map(|v| v as u32))
                .collect();
            Arc::new(UInt32Array::from(values))
        }
        DataType::UInt64 => {
            let values: Vec<Option<u64>> = rows
                .iter()
                .map(|row| row.get(name).and_then(|v| v.as_u64()))
                .collect();
            Arc::new(UInt64Array::from(values))
        }
        DataType::Float32 => {
            let values: Vec<Option<f32>> = rows
                .iter()
                .map(|row| row.get(name).and_then(|v| v.as_f64()).map(|v| v as f32))
                .collect();
            Arc::new(Float32Array::from(values))
        }
        DataType::Float64 => {
            let values: Vec<Option<f64>> = rows
                .iter()
                .map(|row| row.get(name).and_then(|v| v.as_f64()))
                .collect();
            Arc::new(Float64Array::from(values))
        }
        DataType::Boolean => {
            let values: Vec<Option<bool>> = rows
                .iter()
                .map(|row| row.get(name).and_then(|v| v.as_bool()))
                .collect();
            Arc::new(BooleanArray::from(values))
        }
        DataType::Date32 => {
            let mut values = Vec::with_capacity(rows.len());
            for row in rows {
                values.push(parse_date32_json_value(
                    row.get(name).unwrap_or(&JsonValue::Null),
                )?);
            }
            Arc::new(Date32Array::from(values))
        }
        DataType::Date64 => {
            let mut values = Vec::with_capacity(rows.len());
            for row in rows {
                values.push(parse_date64_json_value(
                    row.get(name).unwrap_or(&JsonValue::Null),
                )?);
            }
            Arc::new(Date64Array::from(values))
        }
        DataType::List(field) => {
            let mut builder = ListBuilder::with_capacity(
                make_list_value_builder(field.data_type(), rows.len())?,
                rows.len(),
            )
            .with_field(field.clone());
            for row in rows {
                let value = row.get(name).unwrap_or(&JsonValue::Null);
                if value.is_null() {
                    builder.append(false);
                    continue;
                }
                let items = value.as_array().ok_or_else(|| {
                    OmniError::manifest(format!(
                        "list property '{}' expects a JSON array, got {}",
                        name, value
                    ))
                })?;
                for item in items {
                    append_json_list_item(builder.values(), field.data_type(), item)?;
                }
                builder.append(true);
            }
            Arc::new(builder.finish())
        }
        DataType::FixedSizeList(child_field, dim) => {
            // Vector type: parse JSON array of floats into FixedSizeList<Float32>
            let dim = *dim;
            let mut builder = FixedSizeListBuilder::with_capacity(
                Float32Builder::with_capacity(rows.len() * dim as usize),
                dim,
                rows.len(),
            )
            .with_field(child_field.clone());
            for row in rows {
                if let Some(arr) = row.get(name).and_then(|v| v.as_array()) {
                    if arr.len() != dim as usize {
                        return Err(OmniError::manifest(format!(
                            "vector property '{}' expects {} dimensions, got {}",
                            name,
                            dim,
                            arr.len()
                        )));
                    }
                    for val in arr {
                        builder
                            .values()
                            // Non-numeric entries coerce to 0.0 rather than erroring.
                            .append_value(val.as_f64().unwrap_or(0.0) as f32);
                    }
                    builder.append(true);
                } else if nullable {
                    for _ in 0..dim as usize {
                        builder.values().append_null();
                    }
                    builder.append(false);
                } else {
                    return Err(OmniError::manifest(format!(
                        "non-nullable vector property '{}' has null values",
                        name
                    )));
                }
            }
            Arc::new(builder.finish())
        }
        _ => {
            // Unsupported type: fill with nulls (the non-nullable check below
            // rejects these for required fields).
            let values: Vec<Option<&str>> = vec![None; rows.len()];
            Arc::new(StringArray::from(values))
        }
    };

    if !nullable && array.null_count() > 0 {
        return Err(OmniError::manifest(format!(
            "non-nullable property '{}' has null or invalid values",
            name
        )));
    }

    Ok(array)
}

fn make_list_value_builder(data_type: &DataType, capacity: usize) -> Result<Box<dyn ArrayBuilder>> {
    Ok(match data_type {
        DataType::Utf8 => Box::new(StringBuilder::with_capacity(capacity, capacity * 8)),
        DataType::Boolean => Box::new(BooleanBuilder::with_capacity(capacity)),
        DataType::Int32 => Box::new(Int32Builder::with_capacity(capacity)),
        DataType::Int64 => Box::new(Int64Builder::with_capacity(capacity)),
        DataType::UInt32 => Box::new(UInt32Builder::with_capacity(capacity)),
        DataType::UInt64 => Box::new(UInt64Builder::with_capacity(capacity)),
        DataType::Float32 => Box::new(Float32Builder::with_capacity(capacity)),
        DataType::Float64 => Box::new(Float64Builder::with_capacity(capacity)),
        DataType::Date32 => Box::new(Date32Builder::with_capacity(capacity)),
        DataType::Date64 => Box::new(Date64Builder::with_capacity(capacity)),
        other => {
            return Err(OmniError::manifest(format!(
                "unsupported list element data type {:?}",
                other
            )));
        }
    })
}

fn append_json_list_item(
    builder: &mut Box<dyn ArrayBuilder>,
    data_type: &DataType,
    value: &JsonValue,
) -> Result<()> {
    match data_type {
        DataType::Utf8 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<StringBuilder>()
                .ok_or_else(|| OmniError::manifest("list Utf8 builder downcast failed"))?;
            if let Some(value) = value.as_str() {
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::Boolean => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<BooleanBuilder>()
                .ok_or_else(|| OmniError::manifest("list Boolean builder downcast failed"))?;
            if let Some(value) = value.as_bool() {
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::Int32 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<Int32Builder>()
                .ok_or_else(|| OmniError::manifest("list Int32 builder downcast failed"))?;
            if let Some(value) = value.as_i64() {
                let value = i32::try_from(value).map_err(|_| {
                    OmniError::manifest(format!("list value {} exceeds Int32 range", value))
                })?;
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::Int64 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<Int64Builder>()
                .ok_or_else(|| OmniError::manifest("list Int64 builder downcast failed"))?;
            if let Some(value) = value.as_i64() {
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::UInt32 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<UInt32Builder>()
                .ok_or_else(|| OmniError::manifest("list UInt32 builder downcast failed"))?;
            if let Some(value) = value.as_u64() {
                let value = u32::try_from(value).map_err(|_| {
                    OmniError::manifest(format!("list value {} exceeds UInt32 range", value))
                })?;
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::UInt64 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<UInt64Builder>()
                .ok_or_else(|| OmniError::manifest("list UInt64 builder downcast failed"))?;
            if let Some(value) = value.as_u64() {
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::Float32 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<Float32Builder>()
                .ok_or_else(|| OmniError::manifest("list Float32 builder downcast failed"))?;
            if let Some(value) = value.as_f64() {
                builder.append_value(value as f32);
            } else {
                builder.append_null();
            }
        }
        DataType::Float64 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<Float64Builder>()
                .ok_or_else(|| OmniError::manifest("list Float64 builder downcast failed"))?;
            if let Some(value) = value.as_f64() {
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::Date32 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<Date32Builder>()
                .ok_or_else(|| OmniError::manifest("list Date32 builder downcast failed"))?;
            if let Some(value) = parse_date32_json_value(value)? {
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::Date64 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<Date64Builder>()
                .ok_or_else(|| OmniError::manifest("list Date64 builder downcast failed"))?;
            if let Some(value) = parse_date64_json_value(value)? {
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        other => {
            return Err(OmniError::manifest(format!(
                "unsupported list element data type {:?}",
                other
            )));
        }
    }

    Ok(())
}

fn parse_date32_json_value(value: &JsonValue) -> Result<Option<i32>> {
    if value.is_null() {
        return Ok(None);
    }
    if let Some(days) = value.as_i64() {
        let days = i32::try_from(days)
            .map_err(|_| OmniError::manifest(format!("Date value out of range: {}", days)))?;
        return Ok(Some(days));
    }
    if let Some(days) = value.as_u64() {
        let days = i32::try_from(days)
            .map_err(|_| OmniError::manifest(format!("Date value out of range: {}", days)))?;
        return Ok(Some(days));
    }
    if let Some(value) = value.as_str() {
        return Ok(Some(parse_date32_literal(value)?));
    }
    Ok(None)
}

fn parse_date64_json_value(value: &JsonValue) -> Result<Option<i64>> {
    if value.is_null() {
        return Ok(None);
    }
    if let Some(ms) = value.as_i64() {
        return Ok(Some(ms));
    }
    if let Some(ms) = value.as_u64() {
        let ms = i64::try_from(ms)
            .map_err(|_| OmniError::manifest(format!("DateTime value out of range: {}", ms)))?;
        return Ok(Some(ms));
    }
    if let Some(value) = value.as_str() {
        return Ok(Some(parse_date64_literal(value)?));
    }
    Ok(None)
}

/// How many per-type Lance writes to run concurrently during a load.
///
/// Each write is an independent S3 manifest + fragment write against a
/// different table. Ops within a single table must still be serial (Lance
/// OCC on the manifest), but cross-table writes have no shared state.
///
/// 8 is a conservative default — enough to overlap S3 round-trip latency
/// across the typical 10-30 table schemas without flooding the runtime.
/// Override via `OMNIGRAPH_LOAD_CONCURRENCY` for benchmarking.
const DEFAULT_LOAD_WRITE_CONCURRENCY: usize = 8;

fn load_write_concurrency() -> usize {
    std::env::var("OMNIGRAPH_LOAD_CONCURRENCY")
        .ok()
        .and_then(|v| v.parse::<usize>().ok())
        .filter(|v| *v > 0)
        .unwrap_or(DEFAULT_LOAD_WRITE_CONCURRENCY)
}

/// Write a set of prepared `(type_name, table_key, batch, row_count)` tuples
/// concurrently. Returns results in original iteration order so callers can
/// zip them back to per-type metadata.
async fn write_batches_concurrently(
    db: &Omnigraph,
    branch: Option<&str>,
    mode: LoadMode,
    prepared: Vec<(String, String, RecordBatch, usize)>,
) -> Result<
    Vec<(
        String,
        String,
        usize,
        crate::table_store::TableState,
        Option<String>,
    )>,
> {
    use futures::stream::StreamExt;

    if prepared.is_empty() {
        return Ok(Vec::new());
    }

    let concurrency = load_write_concurrency().min(prepared.len()).max(1);

    futures::stream::iter(prepared.into_iter().map(
        |(type_name, table_key, batch, loaded_count)| async move {
            let (state, table_branch) =
                write_batch_to_dataset(db, branch, &table_key, batch, mode).await?;
            Ok::<_, OmniError>((type_name, table_key, loaded_count, state, table_branch))
        },
    ))
    .buffered(concurrency)
    .collect::<Vec<Result<_>>>()
    .await
    .into_iter()
    .collect()
}

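/// Write one batch to a single Lance sub-table according to `mode`, returning
/// the resulting table state and the branch the write landed on.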
async fn write_batch_to_dataset(
    db: &Omnigraph,
    branch: Option<&str>,
    table_key: &str,
    batch: RecordBatch,
    mode: LoadMode,
) -> Result<(crate::table_store::TableState, Option<String>)> {
    let (mut ds, full_path, table_branch) =
        db.open_for_mutation_on_branch(branch, table_key).await?;
    let table_store = db.table_store();

    match mode {
        LoadMode::Overwrite => {
            let state = table_store
                .overwrite_batch(&full_path, &mut ds, batch)
                .await?;
            Ok((state, table_branch))
        }
        LoadMode::Append => {
            let state = table_store.append_batch(&full_path, &mut ds, batch).await?;
            Ok((state, table_branch))
        }
        LoadMode::Merge => {
            let state = table_store
                .merge_insert_batch(
                    &full_path,
                    ds,
                    batch,
                    vec!["id".to_string()],
                    lance::dataset::WhenMatched::UpdateAll,
                    lance::dataset::WhenNotMatched::InsertAll,
                )
                .await?;
            Ok((state, table_branch))
        }
    }
}

fn generate_id() -> String {
    ulid::Ulid::new().to_string()
}

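/// Parse a date string into Date32 (days since the Unix epoch) via an Arrow
/// cast; e.g. `"1970-01-02"` parses to `1`.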
pub(crate) fn parse_date32_literal(value: &str) -> Result<i32> {
    let raw: Arc<dyn Array> = Arc::new(StringArray::from(vec![Some(value)]));
    let casted = arrow_cast::cast::cast(raw.as_ref(), &DataType::Date32)
        .map_err(|e| OmniError::manifest(format!("invalid Date literal '{}': {}", value, e)))?;
    let out = casted
        .as_any()
        .downcast_ref::<Date32Array>()
        .ok_or_else(|| OmniError::manifest("Date32 cast produced unexpected array"))?;
    if out.is_null(0) {
        return Err(OmniError::manifest(format!(
            "invalid Date literal '{}'",
            value
        )));
    }
    Ok(out.value(0))
}

/// Parse a datetime string into Date64 (milliseconds since the Unix epoch)
/// via an Arrow cast.
pub(crate) fn parse_date64_literal(value: &str) -> Result<i64> {
    let raw: Arc<dyn Array> = Arc::new(StringArray::from(vec![Some(value)]));
    let casted = arrow_cast::cast::cast(raw.as_ref(), &DataType::Date64)
        .map_err(|e| OmniError::manifest(format!("invalid DateTime literal '{}': {}", value, e)))?;
    let out = casted
        .as_any()
        .downcast_ref::<Date64Array>()
        .ok_or_else(|| OmniError::manifest("Date64 cast produced unexpected array"))?;
    if out.is_null(0) {
        return Err(OmniError::manifest(format!(
            "invalid DateTime literal '{}'",
            value
        )));
    }
    Ok(out.value(0))
}

// ─── Value constraint validation ─────────────────────────────────────────────

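/// Enforce `@range` (numeric min/max; NaN is rejected) and `@check` (regex
/// over string columns) constraints declared on the node type, failing on the
/// first violation.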
pub(crate) fn validate_value_constraints(
    batch: &RecordBatch,
    node_type: &omnigraph_compiler::catalog::NodeType,
) -> Result<()> {
    use arrow_array::Array;

    // Range constraints
    for rc in &node_type.range_constraints {
        let Some(col) = batch.column_by_name(&rc.property) else {
            continue;
        };
        for row in 0..batch.num_rows() {
            if col.is_null(row) {
                continue;
            }
            let value = extract_numeric_value(col, row);
            if let Some(val) = value {
                if val.is_nan() {
                    return Err(OmniError::manifest(format!(
                        "@range violation on {}.{}: value is NaN",
                        node_type.name, rc.property
                    )));
                }
                if let Some(ref min) = rc.min {
                    let min_f = literal_value_to_f64(min);
                    if val < min_f {
                        return Err(OmniError::manifest(format!(
                            "@range violation on {}.{}: value {} < min {}",
                            node_type.name, rc.property, val, min_f
                        )));
                    }
                }
                if let Some(ref max) = rc.max {
                    let max_f = literal_value_to_f64(max);
                    if val > max_f {
                        return Err(OmniError::manifest(format!(
                            "@range violation on {}.{}: value {} > max {}",
                            node_type.name, rc.property, val, max_f
                        )));
                    }
                }
            }
        }
    }

    // Check constraints (regex)
    for cc in &node_type.check_constraints {
        let re = regex::Regex::new(&cc.pattern).map_err(|e| {
            OmniError::manifest(format!(
                "@check on {}.{} has invalid regex '{}': {}",
                node_type.name, cc.property, cc.pattern, e
            ))
        })?;
        let Some(col) = batch.column_by_name(&cc.property) else {
            continue;
        };
        let str_col = col.as_any().downcast_ref::<StringArray>();
        if let Some(str_col) = str_col {
            for row in 0..str_col.len() {
                if str_col.is_null(row) {
                    continue;
                }
                let val = str_col.value(row);
                if !re.is_match(val) {
                    return Err(OmniError::manifest(format!(
                        "@check violation on {}.{}: value '{}' does not match pattern '{}'",
                        node_type.name, cc.property, val, cc.pattern
                    )));
                }
            }
        }
    }

    Ok(())
}

fn extract_numeric_value(col: &ArrayRef, row: usize) -> Option<f64> {
    use arrow_array::{
        Array, Float32Array, Float64Array, Int32Array, Int64Array, UInt32Array, UInt64Array,
    };
    if let Some(a) = col.as_any().downcast_ref::<Int32Array>() {
        return Some(a.value(row) as f64);
    }
    if let Some(a) = col.as_any().downcast_ref::<Int64Array>() {
        return Some(a.value(row) as f64);
    }
    if let Some(a) = col.as_any().downcast_ref::<UInt32Array>() {
        return Some(a.value(row) as f64);
    }
    if let Some(a) = col.as_any().downcast_ref::<UInt64Array>() {
        return Some(a.value(row) as f64);
    }
    if let Some(a) = col.as_any().downcast_ref::<Float32Array>() {
        return Some(a.value(row) as f64);
    }
    if let Some(a) = col.as_any().downcast_ref::<Float64Array>() {
        return Some(a.value(row));
    }
    None
}

fn literal_value_to_f64(v: &omnigraph_compiler::catalog::LiteralValue) -> f64 {
    use omnigraph_compiler::catalog::LiteralValue;
    match v {
        LiteralValue::Integer(n) => *n as f64,
        LiteralValue::Float(f) => *f,
    }
}

// ─── Edge cardinality validation ─────────────────────────────────────────────

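/// Enforce `@card` min/max on the per-source edge count, scanning the `src`
/// column of the edge table at its just-written version.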
1215async fn validate_edge_cardinality(
1216    db: &crate::db::Omnigraph,
1217    branch: Option<&str>,
1218    edge_name: &str,
1219    written_version: u64,
1220    written_branch: Option<&str>,
1221) -> Result<()> {
1222    use arrow_array::Array;
1223    let catalog = db.catalog();
1224    let edge_type = &catalog.edge_types[edge_name];
1225    if edge_type.cardinality.is_default() {
1226        return Ok(());
1227    }
1228
1229    // Open edge sub-table at the just-written version, not the snapshot's
1230    // (the snapshot still pins to the pre-write version).
1231    let snapshot = db.snapshot_for_branch(branch).await?;
1232    let table_key = format!("edge:{}", edge_name);
1233    let entry = snapshot
1234        .entry(&table_key)
1235        .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
1236    let ds = db
1237        .open_dataset_at_state(
1238            &entry.table_path,
1239            written_branch.or(entry.table_branch.as_deref()),
1240            written_version,
1241        )
1242        .await?;
1243
1244    // Scan src column, count per source
1245    let batches = db
1246        .table_store()
1247        .scan(&ds, Some(&["src"]), None, None)
1248        .await?;
1249
1250    let mut counts: HashMap<String, u32> = HashMap::new();
1251    for batch in &batches {
1252        let srcs = batch
1253            .column_by_name("src")
1254            .unwrap()
1255            .as_any()
1256            .downcast_ref::<StringArray>()
1257            .unwrap();
1258        for i in 0..srcs.len() {
1259            *counts.entry(srcs.value(i).to_string()).or_insert(0) += 1;
1260        }
1261    }
1262
1263    let card = &edge_type.cardinality;
1264    for (src, count) in &counts {
1265        if let Some(max) = card.max {
1266            if *count > max {
1267                return Err(OmniError::manifest(format!(
1268                    "@card violation on edge {}: source '{}' has {} edges (max {})",
1269                    edge_name, src, count, max
1270                )));
1271            }
1272        }
1273        if *count < card.min {
1274            return Err(OmniError::manifest(format!(
1275                "@card violation on edge {}: source '{}' has {} edges (min {})",
1276                edge_name, src, count, card.min
1277            )));
1278        }
1279    }
1280
1281    Ok(())
1282}
1283
1284/// Collect all valid node IDs for a given type. Union of:
1285/// - IDs from the just-loaded batch (in memory, from node_rows)
1286/// - IDs from the sub-table at the just-written version (if it was updated)
1287/// - IDs from the sub-table at the snapshot-pinned version (if it was not updated)
1288async fn collect_node_ids(
1289    db: &Omnigraph,
1290    branch: Option<&str>,
1291    type_name: &str,
1292    node_rows: &HashMap<String, Vec<JsonValue>>,
1293    catalog: &omnigraph_compiler::catalog::Catalog,
1294    updates: &[crate::db::SubTableUpdate],
1295) -> Result<HashSet<String>> {
1296    let mut ids = HashSet::new();
1297
1298    // IDs from the in-memory batch (just loaded in this operation)
1299    if let Some(rows) = node_rows.get(type_name) {
1300        if let Some(node_type) = catalog.node_types.get(type_name) {
1301            if let Some(key_prop) = node_type.key_property() {
1302                for row in rows {
1303                    if let Some(id) = row.get(key_prop).and_then(|v| v.as_str()) {
1304                        ids.insert(id.to_string());
1305                    }
1306                }
1307            }
1308        }
1309    }
1310
1311    // IDs from the Lance sub-table
1312    let table_key = format!("node:{}", type_name);
1313    let snapshot = db.snapshot_for_branch(branch).await?;
1314    let Some(entry) = snapshot.entry(&table_key) else {
1315        return Ok(ids);
1316    };
1317    // Use the just-written version if this type was updated, else snapshot version
1318    let updated = updates
1319        .iter()
1320        .find(|u| u.table_key == table_key)
1321        .map(|u| (u.table_version, u.table_branch.as_deref()));
1322    let (version, branch) = updated.unwrap_or((entry.table_version, entry.table_branch.as_deref()));
1323    let ds = db
1324        .open_dataset_at_state(&entry.table_path, branch, version)
1325        .await?;
1326
1327    let batches = db
1328        .table_store()
1329        .scan(&ds, Some(&["id"]), None, None)
1330        .await?;
1331
1332    for batch in &batches {
1333        let id_col = batch
1334            .column_by_name("id")
1335            .unwrap()
1336            .as_any()
1337            .downcast_ref::<StringArray>()
1338            .unwrap();
1339        for i in 0..batch.num_rows() {
1340            ids.insert(id_col.value(i).to_string());
1341        }
1342    }
1343
1344    Ok(ids)
1345}
1346
1347#[cfg(test)]
1348mod tests {
1349    use super::*;
1350    use crate::db::Omnigraph;
1351    use arrow_array::Array;
1352    use futures::TryStreamExt;
1353    use std::collections::HashMap;
1354
1355    const TEST_SCHEMA: &str = r#"
1356node Person {
1357    name: String @key
1358    age: I32?
1359}
1360node Company {
1361    name: String @key
1362}
1363edge Knows: Person -> Person {
1364    since: Date?
1365}
1366edge WorksAt: Person -> Company
1367"#;
1368
1369    const TEST_DATA: &str = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}
1370{"type": "Person", "data": {"name": "Bob", "age": 25}}
1371{"type": "Company", "data": {"name": "Acme"}}
1372{"edge": "Knows", "from": "Alice", "to": "Bob"}
1373{"edge": "WorksAt", "from": "Alice", "to": "Acme"}
1374"#;
1375
1376    #[tokio::test]
1377    async fn test_load_creates_data() {
1378        let dir = tempfile::tempdir().unwrap();
1379        let uri = dir.path().to_str().unwrap();
1380        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1381
1382        let result = load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
1383            .await
1384            .unwrap();
1385
1386        assert_eq!(result.nodes_loaded["Person"], 2);
1387        assert_eq!(result.nodes_loaded["Company"], 1);
1388        assert_eq!(result.edges_loaded["Knows"], 1);
1389        assert_eq!(result.edges_loaded["WorksAt"], 1);
1390    }
1391
1392    #[tokio::test]
1393    async fn test_load_data_readable_via_lance() {
1394        let dir = tempfile::tempdir().unwrap();
1395        let uri = dir.path().to_str().unwrap();
1396        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1397        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
1398            .await
1399            .unwrap();
1400
1401        // Read back via snapshot
1402        let snap = db.snapshot();
1403        let person_ds = snap.open("node:Person").await.unwrap();
1404
1405        assert_eq!(person_ds.count_rows(None).await.unwrap(), 2);
1406
1407        // Verify data
1408        let batches: Vec<RecordBatch> = person_ds
1409            .scan()
1410            .try_into_stream()
1411            .await
1412            .unwrap()
1413            .try_collect()
1414            .await
1415            .unwrap();
1416
1417        let batch = &batches[0];
1418        let ids = batch
1419            .column_by_name("id")
1420            .unwrap()
1421            .as_any()
1422            .downcast_ref::<StringArray>()
1423            .unwrap();
1424        // @key=name, so ids should be "Alice" and "Bob"
1425        let id_values: Vec<&str> = (0..ids.len()).map(|i| ids.value(i)).collect();
1426        assert!(id_values.contains(&"Alice"));
1427        assert!(id_values.contains(&"Bob"));
1428    }
1429
1430    #[tokio::test]
1431    async fn test_load_edges_reference_node_keys() {
1432        let dir = tempfile::tempdir().unwrap();
1433        let uri = dir.path().to_str().unwrap();
1434        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1435        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
1436            .await
1437            .unwrap();
1438
1439        let snap = db.snapshot();
1440        let knows_ds = snap.open("edge:Knows").await.unwrap();
1441
1442        let batches: Vec<RecordBatch> = knows_ds
1443            .scan()
1444            .try_into_stream()
1445            .await
1446            .unwrap()
1447            .try_collect()
1448            .await
1449            .unwrap();
1450
1451        let batch = &batches[0];
1452        let srcs = batch
1453            .column_by_name("src")
1454            .unwrap()
1455            .as_any()
1456            .downcast_ref::<StringArray>()
1457            .unwrap();
1458        let dsts = batch
1459            .column_by_name("dst")
1460            .unwrap()
1461            .as_any()
1462            .downcast_ref::<StringArray>()
1463            .unwrap();
1464
1465        assert_eq!(srcs.value(0), "Alice");
1466        assert_eq!(dsts.value(0), "Bob");
1467    }
1468
1469    #[tokio::test]
1470    async fn test_load_manifest_version_advances() {
1471        let dir = tempfile::tempdir().unwrap();
1472        let uri = dir.path().to_str().unwrap();
1473        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1474        let v1 = db.version();
1475
1476        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
1477            .await
1478            .unwrap();
1479
1480        assert!(db.version() > v1);
1481    }

    #[tokio::test]
    async fn test_load_append_adds_rows() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let batch1 = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#;
        let batch2 = r#"{"type": "Person", "data": {"name": "Bob", "age": 25}}"#;

        load_jsonl(&mut db, batch1, LoadMode::Overwrite)
            .await
            .unwrap();
        load_jsonl(&mut db, batch2, LoadMode::Append).await.unwrap();

        let snap = db.snapshot();
        let person_ds = snap.open("node:Person").await.unwrap();
        assert_eq!(person_ds.count_rows(None).await.unwrap(), 2);
    }
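
    // Not part of the original suite: a minimal sketch of the complementary
    // Merge behavior, using only APIs exercised by the tests above. It assumes
    // LoadMode::Merge upserts by the node key (here Person's key is `name`),
    // as the existing-branch merge test below also demonstrates, so re-loading
    // an existing key updates the row in place rather than appending a new one.
    #[tokio::test]
    async fn test_load_merge_upserts_by_key_sketch() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        load_jsonl(
            &mut db,
            r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#,
            LoadMode::Overwrite,
        )
        .await
        .unwrap();
        // Same key, new age: Merge should update in place, not add a row.
        load_jsonl(
            &mut db,
            r#"{"type": "Person", "data": {"name": "Alice", "age": 31}}"#,
            LoadMode::Merge,
        )
        .await
        .unwrap();

        let snap = db.snapshot();
        let person_ds = snap.open("node:Person").await.unwrap();
        assert_eq!(person_ds.count_rows(None).await.unwrap(), 1);
    }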

    #[tokio::test]
    async fn test_load_unknown_type_rejected() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let bad = r#"{"type": "FakeType", "data": {"name": "x"}}"#;
        let result = load_jsonl(&mut db, bad, LoadMode::Overwrite).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_ingest_creates_branch_and_reports_tables() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let result = db
            .ingest("feature", Some("main"), TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert_eq!(result.branch, "feature");
        assert_eq!(result.base_branch, "main");
        assert!(result.branch_created);
        assert_eq!(result.mode, LoadMode::Overwrite);
        // One entry per table touched, listed in table_key order.
        assert_eq!(
            result.tables,
            vec![
                IngestTableResult {
                    table_key: "edge:Knows".to_string(),
                    rows_loaded: 1
                },
                IngestTableResult {
                    table_key: "edge:WorksAt".to_string(),
                    rows_loaded: 1
                },
                IngestTableResult {
                    table_key: "node:Company".to_string(),
                    rows_loaded: 1
                },
                IngestTableResult {
                    table_key: "node:Person".to_string(),
                    rows_loaded: 2
                },
            ]
        );
        assert!(
            db.branch_list()
                .await
                .unwrap()
                .contains(&"feature".to_string())
        );
    }

    #[tokio::test]
    async fn test_ingest_existing_branch_ignores_from_and_merges_data() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();
        db.branch_create_from(crate::db::ReadTarget::branch("main"), "feature")
            .await
            .unwrap();

        let result = db
            .ingest(
                "feature",
                Some("missing-base"),
                r#"{"type":"Person","data":{"name":"Bob","age":26}}
{"type":"Person","data":{"name":"Eve","age":31}}"#,
                LoadMode::Merge,
            )
            .await
            .unwrap();

        assert_eq!(result.branch, "feature");
        assert_eq!(result.base_branch, "missing-base");
        assert!(!result.branch_created);
        assert_eq!(result.mode, LoadMode::Merge);
        assert_eq!(
            result.tables,
            vec![IngestTableResult {
                table_key: "node:Person".to_string(),
                rows_loaded: 2
            }]
        );

        let snap = db
            .snapshot_of(crate::db::ReadTarget::branch("feature"))
            .await
            .unwrap();
        let person_ds = snap.open("node:Person").await.unwrap();
        assert_eq!(person_ds.count_rows(None).await.unwrap(), 3);

        let batches: Vec<RecordBatch> = person_ds
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();
        let mut ages_by_id = HashMap::new();
        for batch in &batches {
            let ids = batch
                .column_by_name("id")
                .unwrap()
                .as_any()
                .downcast_ref::<StringArray>()
                .unwrap();
            let ages = batch
                .column_by_name("age")
                .unwrap()
                .as_any()
                .downcast_ref::<Int32Array>()
                .unwrap();
            for idx in 0..ids.len() {
                ages_by_id.insert(ids.value(idx).to_string(), ages.value(idx));
            }
        }

        // Bob was upserted in place, Eve inserted, and Alice left unchanged.
        assert_eq!(ages_by_id.get("Bob"), Some(&26));
        assert_eq!(ages_by_id.get("Eve"), Some(&31));
        assert_eq!(ages_by_id.get("Alice"), Some(&30));
    }

    #[tokio::test]
    async fn test_ingest_as_stamps_actor_on_branch_head_commit() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        db.ingest_as(
            "feature",
            Some("main"),
            TEST_DATA,
            LoadMode::Overwrite,
            Some("act-andrew"),
        )
        .await
        .unwrap();

        // The branch head is taken as the last commit in the listing.
        let head = db
            .list_commits(Some("feature"))
            .await
            .unwrap()
            .into_iter()
            .last()
            .unwrap();
        assert_eq!(head.actor_id.as_deref(), Some("act-andrew"));
    }

    #[test]
    fn test_range_constraint_rejects_nan() {
        use arrow_array::{Float64Array, RecordBatch, StringArray};
        use omnigraph_compiler::catalog::{LiteralValue, NodeType, RangeConstraint};
        use std::sync::Arc;

        let schema = Arc::new(arrow_schema::Schema::new(vec![
            arrow_schema::Field::new("name", arrow_schema::DataType::Utf8, false),
            arrow_schema::Field::new("score", arrow_schema::DataType::Float64, true),
        ]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["bad"])),
                Arc::new(Float64Array::from(vec![f64::NAN])),
            ],
        )
        .unwrap();

        let node_type = NodeType {
            name: "Test".to_string(),
            implements: vec![],
            properties: Default::default(),
            key: None,
            unique_constraints: vec![],
            indices: vec![],
            range_constraints: vec![RangeConstraint {
                property: "score".to_string(),
                min: Some(LiteralValue::Float(0.0)),
                max: Some(LiteralValue::Float(1.0)),
            }],
            check_constraints: vec![],
            embed_sources: Default::default(),
            blob_properties: Default::default(),
            arrow_schema: schema,
        };

        // IEEE-754 comparisons involving NaN are always false, so a naive
        // min/max check would silently accept NaN; it must be rejected explicitly.
        let result = validate_value_constraints(&batch, &node_type);
        assert!(result.is_err(), "expected NaN to be rejected");
        let err = result.unwrap_err().to_string();
        assert!(err.contains("NaN"), "error should mention NaN: {}", err);
    }
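
    // Not part of the original suite: a hedged sketch checking that an
    // out-of-range finite value is rejected by the same [0.0, 1.0]
    // RangeConstraint the NaN test above exercises. It reuses only the types
    // and the validate_value_constraints call shown there; the error-message
    // contents are not asserted, since only the NaN wording is known.
    #[test]
    fn test_range_constraint_rejects_out_of_range_sketch() {
        use arrow_array::{Float64Array, RecordBatch, StringArray};
        use omnigraph_compiler::catalog::{LiteralValue, NodeType, RangeConstraint};
        use std::sync::Arc;

        let schema = Arc::new(arrow_schema::Schema::new(vec![
            arrow_schema::Field::new("name", arrow_schema::DataType::Utf8, false),
            arrow_schema::Field::new("score", arrow_schema::DataType::Float64, true),
        ]));

        // 1.5 lies above the declared max of 1.0, so validation should fail.
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["bad"])),
                Arc::new(Float64Array::from(vec![1.5])),
            ],
        )
        .unwrap();

        let node_type = NodeType {
            name: "Test".to_string(),
            implements: vec![],
            properties: Default::default(),
            key: None,
            unique_constraints: vec![],
            indices: vec![],
            range_constraints: vec![RangeConstraint {
                property: "score".to_string(),
                min: Some(LiteralValue::Float(0.0)),
                max: Some(LiteralValue::Float(1.0)),
            }],
            check_constraints: vec![],
            embed_sources: Default::default(),
            blob_properties: Default::default(),
            arrow_schema: schema,
        };

        assert!(validate_value_constraints(&batch, &node_type).is_err());
    }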
}