//! omnigraph/loader/mod.rs — JSONL load/ingest pipeline for Omnigraph databases.
1use std::collections::{HashMap, HashSet};
2
3use std::io::{BufRead, BufReader, Cursor};
4use std::sync::Arc;
5
6use arrow_array::{
7    Array, ArrayRef, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array,
8    Int32Array, Int64Array, RecordBatch, StringArray, UInt32Array, UInt64Array,
9    builder::{
10        ArrayBuilder, BooleanBuilder, Date32Builder, Date64Builder, FixedSizeListBuilder,
11        Float32Builder, Float64Builder, Int32Builder, Int64Builder, ListBuilder, StringBuilder,
12        UInt32Builder, UInt64Builder,
13    },
14};
15use arrow_schema::DataType;
16use base64::Engine;
17use lance::blob::BlobArrayBuilder;
18use omnigraph_compiler::catalog::NodeType;
19use serde::{Deserialize, Serialize};
20use serde_json::Value as JsonValue;
21
22use crate::db::Omnigraph;
23use crate::error::{OmniError, Result};
24
/// Result of a load operation.
///
/// Maps each node/edge type name to the number of rows written for it
/// during a single load call.
#[derive(Debug, Clone, Default)]
pub struct LoadResult {
    // Rows written per node type, keyed by node type name.
    pub nodes_loaded: HashMap<String, usize>,
    // Rows written per edge type, keyed by canonical edge name.
    pub edges_loaded: HashMap<String, usize>,
}
31
/// Per-table row count reported by an ingest operation.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct IngestTableResult {
    // Table identifier in "node:<type>" or "edge:<name>" form.
    pub table_key: String,
    // Number of rows written to this table.
    pub rows_loaded: usize,
}
37
/// Summary of a completed ingest: which branch was written, where it was
/// branched from, and how many rows landed in each table.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct IngestResult {
    // Branch the data was loaded onto.
    pub branch: String,
    // Branch the target was created from (when it had to be created).
    pub base_branch: String,
    // True when the target branch did not exist and was created by this ingest.
    pub branch_created: bool,
    // Load mode that was applied.
    pub mode: LoadMode,
    // Per-table row counts, sorted by table key.
    pub tables: Vec<IngestTableResult>,
}
46
/// Load mode for data ingestion.
///
/// Serialized in snake_case (e.g. `"overwrite"`) for JSON round-trips.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LoadMode {
    /// Overwrite existing data.
    Overwrite,
    /// Append to existing data.
    Append,
    /// Merge by `id` key (upsert).
    Merge,
}
58
59/// Load JSONL data into an Omnigraph database.
60pub async fn load_jsonl(db: &mut Omnigraph, data: &str, mode: LoadMode) -> Result<LoadResult> {
61    let current_branch = db.active_branch().map(str::to_string);
62    let branch = current_branch.as_deref().unwrap_or("main");
63    db.load(branch, data, mode).await
64}
65
66/// Load JSONL data from a file path.
67pub async fn load_jsonl_file(db: &mut Omnigraph, path: &str, mode: LoadMode) -> Result<LoadResult> {
68    let current_branch = db.active_branch().map(str::to_string);
69    let branch = current_branch.as_deref().unwrap_or("main");
70    db.load_file(branch, path, mode).await
71}
72
73impl Omnigraph {
74    pub async fn ingest(
75        &mut self,
76        branch: &str,
77        from: Option<&str>,
78        data: &str,
79        mode: LoadMode,
80    ) -> Result<IngestResult> {
81        self.ingest_as(branch, from, data, mode, None).await
82    }
83
84    pub async fn ingest_as(
85        &mut self,
86        branch: &str,
87        from: Option<&str>,
88        data: &str,
89        mode: LoadMode,
90        actor_id: Option<&str>,
91    ) -> Result<IngestResult> {
92        let previous_actor = self.audit_actor_id.clone();
93        self.audit_actor_id = actor_id.map(str::to_string);
94        let result = self
95            .ingest_with_current_actor(branch, from, data, mode)
96            .await;
97        self.audit_actor_id = previous_actor;
98        result
99    }
100
101    pub async fn ingest_file(
102        &mut self,
103        branch: &str,
104        from: Option<&str>,
105        path: &str,
106        mode: LoadMode,
107    ) -> Result<IngestResult> {
108        self.ingest_file_as(branch, from, path, mode, None).await
109    }
110
111    pub async fn ingest_file_as(
112        &mut self,
113        branch: &str,
114        from: Option<&str>,
115        path: &str,
116        mode: LoadMode,
117        actor_id: Option<&str>,
118    ) -> Result<IngestResult> {
119        let data = std::fs::read_to_string(path).map_err(OmniError::Io)?;
120        self.ingest_as(branch, from, &data, mode, actor_id).await
121    }
122
123    async fn ingest_with_current_actor(
124        &mut self,
125        branch: &str,
126        from: Option<&str>,
127        data: &str,
128        mode: LoadMode,
129    ) -> Result<IngestResult> {
130        self.ensure_schema_state_valid().await?;
131        let target_branch =
132            Self::normalize_branch_name(branch)?.unwrap_or_else(|| "main".to_string());
133        let base_branch = Self::normalize_branch_name(from.unwrap_or("main"))?
134            .unwrap_or_else(|| "main".to_string());
135        let branch_created = !self
136            .branch_list()
137            .await?
138            .iter()
139            .any(|name| name == &target_branch);
140        if branch_created {
141            self.branch_create_from(crate::db::ReadTarget::branch(&base_branch), &target_branch)
142                .await?;
143        }
144
145        let result = self.load(&target_branch, data, mode).await?;
146        Ok(IngestResult {
147            branch: target_branch,
148            base_branch,
149            branch_created,
150            mode,
151            tables: result.to_ingest_tables(),
152        })
153    }
154
155    pub async fn load(&mut self, branch: &str, data: &str, mode: LoadMode) -> Result<LoadResult> {
156        self.ensure_schema_state_valid().await?;
157        let requested = Self::normalize_branch_name(branch)?.unwrap_or_else(|| "main".to_string());
158        if crate::db::is_internal_run_branch(&requested) {
159            return self
160                .load_direct_on_branch(Some(requested.as_str()), data, mode)
161                .await;
162        }
163
164        let target_head_before = self.latest_branch_snapshot_id(&requested).await?;
165        let op = format!("load_jsonl:branch={}:mode={}", requested, mode.as_str());
166        let run = self.begin_run(&requested, Some(op.as_str())).await?;
167        let staged_result = match self
168            .load_direct_on_branch(Some(run.run_branch.as_str()), data, mode)
169            .await
170        {
171            Ok(result) => result,
172            Err(err) => {
173                let _ = self.fail_run(&run.run_id).await;
174                return Err(err);
175            }
176        };
177
178        let target_head_now = self.latest_branch_snapshot_id(&requested).await?;
179        if target_head_now.as_str() != target_head_before.as_str() {
180            let _ = self.fail_run(&run.run_id).await;
181            return Err(OmniError::manifest_conflict(format!(
182                "target branch '{}' advanced during transactional load; retry",
183                requested
184            )));
185        }
186
187        if let Err(err) = self.publish_run(&run.run_id).await {
188            let _ = self.fail_run(&run.run_id).await;
189            return Err(err);
190        }
191
192        Ok(staged_result)
193    }
194
195    pub async fn load_file(
196        &mut self,
197        branch: &str,
198        path: &str,
199        mode: LoadMode,
200    ) -> Result<LoadResult> {
201        let data = std::fs::read_to_string(path).map_err(|e| OmniError::Io(e))?;
202        self.load(branch, &data, mode).await
203    }
204
205    async fn load_direct_on_branch(
206        &mut self,
207        branch: Option<&str>,
208        data: &str,
209        mode: LoadMode,
210    ) -> Result<LoadResult> {
211        let reader = BufReader::new(Cursor::new(data.as_bytes()));
212        load_jsonl_reader(self, branch, reader, mode).await
213    }
214}
215
216impl LoadMode {
217    pub fn as_str(self) -> &'static str {
218        match self {
219            LoadMode::Overwrite => "overwrite",
220            LoadMode::Append => "append",
221            LoadMode::Merge => "merge",
222        }
223    }
224}
225
226impl LoadResult {
227    pub fn to_ingest_tables(&self) -> Vec<IngestTableResult> {
228        let mut tables = self
229            .nodes_loaded
230            .iter()
231            .map(|(type_name, rows_loaded)| IngestTableResult {
232                table_key: format!("node:{type_name}"),
233                rows_loaded: *rows_loaded,
234            })
235            .chain(
236                self.edges_loaded
237                    .iter()
238                    .map(|(edge_name, rows_loaded)| IngestTableResult {
239                        table_key: format!("edge:{edge_name}"),
240                        rows_loaded: *rows_loaded,
241                    }),
242            )
243            .collect::<Vec<_>>();
244        tables.sort_by(|a, b| a.table_key.cmp(&b.table_key));
245        tables
246    }
247}
248
249async fn load_jsonl_reader<R: BufRead>(
250    db: &mut Omnigraph,
251    branch: Option<&str>,
252    reader: R,
253    mode: LoadMode,
254) -> Result<LoadResult> {
255    let catalog = db.catalog().clone();
256
257    // Phase 1: Parse all lines, spool into per-type collections
258    let mut node_rows: HashMap<String, Vec<JsonValue>> = HashMap::new();
259    let mut edge_rows: HashMap<String, Vec<(String, String, JsonValue)>> = HashMap::new();
260
261    for (line_num, line) in reader.lines().enumerate() {
262        let line = line?;
263        let line = line.trim();
264        if line.is_empty() {
265            continue;
266        }
267        let value: JsonValue = serde_json::from_str(line).map_err(|e| {
268            OmniError::manifest(format!("invalid JSON on line {}: {}", line_num + 1, e))
269        })?;
270
271        if let Some(type_name) = value.get("type").and_then(|v| v.as_str()) {
272            if !catalog.node_types.contains_key(type_name) {
273                return Err(OmniError::manifest(format!(
274                    "line {}: unknown node type '{}'",
275                    line_num + 1,
276                    type_name
277                )));
278            }
279            let data = value
280                .get("data")
281                .cloned()
282                .unwrap_or(JsonValue::Object(serde_json::Map::new()));
283            node_rows
284                .entry(type_name.to_string())
285                .or_default()
286                .push(data);
287        } else if let Some(edge_name) = value.get("edge").and_then(|v| v.as_str()) {
288            if catalog.lookup_edge_by_name(edge_name).is_none() {
289                return Err(OmniError::manifest(format!(
290                    "line {}: unknown edge type '{}'",
291                    line_num + 1,
292                    edge_name
293                )));
294            }
295            let from = value
296                .get("from")
297                .and_then(|v| v.as_str())
298                .ok_or_else(|| {
299                    OmniError::manifest(format!("line {}: edge missing 'from'", line_num + 1))
300                })?
301                .to_string();
302            let to = value
303                .get("to")
304                .and_then(|v| v.as_str())
305                .ok_or_else(|| {
306                    OmniError::manifest(format!("line {}: edge missing 'to'", line_num + 1))
307                })?
308                .to_string();
309            let data = value
310                .get("data")
311                .cloned()
312                .unwrap_or(JsonValue::Object(serde_json::Map::new()));
313            let canonical = catalog.lookup_edge_by_name(edge_name).unwrap().name.clone();
314            edge_rows
315                .entry(canonical)
316                .or_default()
317                .push((from, to, data));
318        } else {
319            return Err(OmniError::manifest(format!(
320                "line {}: expected 'type' or 'edge' field",
321                line_num + 1
322            )));
323        }
324    }
325
326    // Phase 2: Build per-type RecordBatches and write to Lance
327
328    let mut updates = Vec::new();
329    let mut result = LoadResult::default();
330    let snapshot = db.snapshot_for_branch(branch).await?;
331
332    // Write nodes first (edges reference node IDs)
333    for (type_name, rows) in &node_rows {
334        let node_type = &catalog.node_types[type_name];
335        let batch = build_node_batch(node_type, rows)?;
336
337        // Validate value constraints before writing
338        validate_value_constraints(&batch, node_type)?;
339
340        let loaded_count = batch.num_rows();
341
342        let table_key = format!("node:{}", type_name);
343        snapshot
344            .entry(&table_key)
345            .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
346
347        let (state, table_branch) =
348            write_batch_to_dataset(db, branch, &table_key, batch, mode).await?;
349
350        updates.push(crate::db::SubTableUpdate {
351            table_key,
352            table_version: state.version,
353            table_branch,
354            row_count: state.row_count,
355            version_metadata: state.version_metadata,
356        });
357        result.nodes_loaded.insert(type_name.clone(), loaded_count);
358    }
359
360    // Phase 2b: Validate edge referential integrity — every src/dst must
361    // reference an existing node ID in the appropriate type.
362    for (edge_name, rows) in &edge_rows {
363        let edge_type = &catalog.edge_types[edge_name];
364        let from_ids = collect_node_ids(
365            db,
366            branch,
367            &edge_type.from_type,
368            &node_rows,
369            &catalog,
370            &updates,
371        )
372        .await?;
373        let to_ids = collect_node_ids(
374            db,
375            branch,
376            &edge_type.to_type,
377            &node_rows,
378            &catalog,
379            &updates,
380        )
381        .await?;
382
383        for (i, (src, dst, _)) in rows.iter().enumerate() {
384            if !from_ids.contains(src.as_str()) {
385                return Err(OmniError::manifest(format!(
386                    "edge {} row {}: src '{}' not found in {}",
387                    edge_name,
388                    i + 1,
389                    src,
390                    edge_type.from_type
391                )));
392            }
393            if !to_ids.contains(dst.as_str()) {
394                return Err(OmniError::manifest(format!(
395                    "edge {} row {}: dst '{}' not found in {}",
396                    edge_name,
397                    i + 1,
398                    dst,
399                    edge_type.to_type
400                )));
401            }
402        }
403    }
404
405    // Write edges
406    for (edge_name, rows) in &edge_rows {
407        let edge_type = &catalog.edge_types[edge_name];
408        let batch = build_edge_batch(edge_type, rows)?;
409        let loaded_count = batch.num_rows();
410
411        let table_key = format!("edge:{}", edge_name);
412        snapshot
413            .entry(&table_key)
414            .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
415
416        let (state, table_branch) =
417            write_batch_to_dataset(db, branch, &table_key, batch, mode).await?;
418
419        updates.push(crate::db::SubTableUpdate {
420            table_key,
421            table_version: state.version,
422            table_branch,
423            row_count: state.row_count,
424            version_metadata: state.version_metadata,
425        });
426        result.edges_loaded.insert(edge_name.clone(), loaded_count);
427    }
428
429    // Phase 3: Validate edge cardinality constraints (before commit — invalid
430    // data must not be committed). Opens edge sub-tables at their just-written
431    // versions, not through the snapshot (which still pins to pre-write state).
432    for (edge_name, _) in &edge_rows {
433        let table_key = format!("edge:{}", edge_name);
434        if let Some(update) = updates.iter().find(|u| u.table_key == table_key) {
435            validate_edge_cardinality(
436                db,
437                branch,
438                edge_name,
439                update.table_version,
440                update.table_branch.as_deref(),
441            )
442            .await?;
443        }
444    }
445
446    // Phase 4: Atomic manifest commit
447    db.commit_updates_on_branch(branch, &updates).await?;
448
449    Ok(result)
450}
451
452fn build_node_batch(node_type: &NodeType, rows: &[JsonValue]) -> Result<RecordBatch> {
453    let schema = node_type.arrow_schema.clone();
454
455    // Build id column: explicit id, @key value, or generated ULID.
456    let ids: Vec<String> = rows
457        .iter()
458        .map(|row| {
459            let explicit_id = row.get("id").and_then(|v| v.as_str()).map(str::to_string);
460            if let Some(key_prop) = node_type.key_property() {
461                let key_value = row
462                    .get(key_prop)
463                    .and_then(|v| v.as_str())
464                    .map(|s| s.to_string())
465                    .ok_or_else(|| {
466                        OmniError::manifest(format!(
467                            "node {} missing @key property '{}'",
468                            node_type.name, key_prop
469                        ))
470                    })?;
471                if let Some(explicit_id) = explicit_id {
472                    if explicit_id != key_value {
473                        return Err(OmniError::manifest(format!(
474                            "node {} has explicit id '{}' that does not match @key property '{}' value '{}'",
475                            node_type.name, explicit_id, key_prop, key_value
476                        )));
477                    }
478                }
479                Ok(key_value)
480            } else if let Some(explicit_id) = explicit_id {
481                Ok(explicit_id)
482            } else {
483                Ok(generate_id())
484            }
485        })
486        .collect::<Result<Vec<_>>>()?;
487
488    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
489    columns.push(Arc::new(StringArray::from(ids)));
490
491    // Build property columns (skip "id" field at index 0)
492    for field in schema.fields().iter().skip(1) {
493        if node_type.blob_properties.contains(field.name()) {
494            let col = build_blob_column(field.name(), field.is_nullable(), rows)?;
495            columns.push(col);
496        } else {
497            let col =
498                build_column_from_json(field.name(), field.data_type(), field.is_nullable(), rows)?;
499            columns.push(col);
500        }
501    }
502
503    RecordBatch::try_new(schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
504}
505
506fn build_edge_batch(
507    edge_type: &omnigraph_compiler::catalog::EdgeType,
508    rows: &[(String, String, JsonValue)],
509) -> Result<RecordBatch> {
510    let schema = edge_type.arrow_schema.clone();
511
512    let ids: Vec<String> = rows
513        .iter()
514        .map(|(_, _, data)| {
515            data.get("id")
516                .and_then(|v| v.as_str())
517                .map(str::to_string)
518                .unwrap_or_else(generate_id)
519        })
520        .collect();
521    let srcs: Vec<&str> = rows.iter().map(|(from, _, _)| from.as_str()).collect();
522    let dsts: Vec<&str> = rows.iter().map(|(_, to, _)| to.as_str()).collect();
523
524    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
525    columns.push(Arc::new(StringArray::from(ids)));
526    columns.push(Arc::new(StringArray::from(srcs)));
527    columns.push(Arc::new(StringArray::from(dsts)));
528
529    // Build edge property columns (skip id, src, dst at indices 0-2)
530    let data_values: Vec<JsonValue> = rows.iter().map(|(_, _, data)| data.clone()).collect();
531    for field in schema.fields().iter().skip(3) {
532        if edge_type.blob_properties.contains(field.name()) {
533            let col = build_blob_column(field.name(), field.is_nullable(), &data_values)?;
534            columns.push(col);
535        } else {
536            let col = build_column_from_json(
537                field.name(),
538                field.data_type(),
539                field.is_nullable(),
540                &data_values,
541            )?;
542            columns.push(col);
543        }
544    }
545
546    RecordBatch::try_new(schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
547}
548
549/// Append a blob value (URI or base64 bytes) to a BlobArrayBuilder.
550pub(crate) fn append_blob_value(builder: &mut BlobArrayBuilder, value: &str) -> Result<()> {
551    if let Some(encoded) = value.strip_prefix("base64:") {
552        let bytes = base64::engine::general_purpose::STANDARD
553            .decode(encoded)
554            .map_err(|e| OmniError::manifest(format!("invalid base64 blob data: {}", e)))?;
555        builder
556            .push_bytes(bytes)
557            .map_err(|e| OmniError::Lance(e.to_string()))
558    } else {
559        // Treat as URI (file://, s3://, gs://, or any other scheme)
560        builder
561            .push_uri(value)
562            .map_err(|e| OmniError::Lance(e.to_string()))
563    }
564}
565
566/// Build a blob column from JSON values using Lance BlobArrayBuilder.
567fn build_blob_column(name: &str, nullable: bool, rows: &[JsonValue]) -> Result<ArrayRef> {
568    let mut builder = BlobArrayBuilder::new(rows.len());
569    for row in rows {
570        match row.get(name) {
571            Some(JsonValue::String(s)) => {
572                append_blob_value(&mut builder, s)?;
573            }
574            Some(JsonValue::Null) | None if nullable => {
575                builder
576                    .push_null()
577                    .map_err(|e| OmniError::Lance(e.to_string()))?;
578            }
579            Some(JsonValue::Null) | None => {
580                return Err(OmniError::manifest(format!(
581                    "non-nullable blob property '{}' has null values",
582                    name
583                )));
584            }
585            _ => {
586                return Err(OmniError::manifest(format!(
587                    "blob property '{}' must be a URI string or base64: prefixed data",
588                    name
589                )));
590            }
591        }
592    }
593    builder
594        .finish()
595        .map_err(|e| OmniError::Lance(e.to_string()))
596}
597
/// Build one Arrow column for property `name` by extracting and coercing the
/// matching field from each JSON row according to `data_type`.
///
/// Coercion is lenient: values of the wrong JSON type become null and are
/// only rejected at the end when the field is non-nullable. Callers:
/// `build_node_batch` / `build_edge_batch`.
fn build_column_from_json(
    name: &str,
    data_type: &DataType,
    nullable: bool,
    rows: &[JsonValue],
) -> Result<ArrayRef> {
    let array: ArrayRef = match data_type {
        DataType::Utf8 => {
            // Non-string JSON values map to None here (caught below if
            // the field is non-nullable).
            let values: Vec<Option<String>> = rows
                .iter()
                .map(|row| {
                    row.get(name)
                        .and_then(|v| v.as_str())
                        .map(|s| s.to_string())
                })
                .collect();
            Arc::new(StringArray::from(values))
        }
        DataType::Int32 => {
            // NOTE(review): `as i32` silently truncates values outside the
            // i32 range — confirm whether out-of-range input should error.
            let values: Vec<Option<i32>> = rows
                .iter()
                .map(|row| row.get(name).and_then(|v| v.as_i64()).map(|v| v as i32))
                .collect();
            Arc::new(Int32Array::from(values))
        }
        DataType::Int64 => {
            let values: Vec<Option<i64>> = rows
                .iter()
                .map(|row| row.get(name).and_then(|v| v.as_i64()))
                .collect();
            Arc::new(Int64Array::from(values))
        }
        DataType::UInt32 => {
            // NOTE(review): `as u32` silently truncates values above
            // u32::MAX — same caveat as the Int32 arm.
            let values: Vec<Option<u32>> = rows
                .iter()
                .map(|row| row.get(name).and_then(|v| v.as_u64()).map(|v| v as u32))
                .collect();
            Arc::new(UInt32Array::from(values))
        }
        DataType::UInt64 => {
            let values: Vec<Option<u64>> = rows
                .iter()
                .map(|row| row.get(name).and_then(|v| v.as_u64()))
                .collect();
            Arc::new(UInt64Array::from(values))
        }
        DataType::Float32 => {
            // f64 -> f32 narrowing may lose precision; accepted by design
            // for float columns.
            let values: Vec<Option<f32>> = rows
                .iter()
                .map(|row| row.get(name).and_then(|v| v.as_f64()).map(|v| v as f32))
                .collect();
            Arc::new(Float32Array::from(values))
        }
        DataType::Float64 => {
            let values: Vec<Option<f64>> = rows
                .iter()
                .map(|row| row.get(name).and_then(|v| v.as_f64()))
                .collect();
            Arc::new(Float64Array::from(values))
        }
        DataType::Boolean => {
            let values: Vec<Option<bool>> = rows
                .iter()
                .map(|row| row.get(name).and_then(|v| v.as_bool()))
                .collect();
            Arc::new(BooleanArray::from(values))
        }
        DataType::Date32 => {
            // Date parsing is fallible (unlike the numeric arms), so a loop
            // with `?` is used instead of a map/collect.
            let mut values = Vec::with_capacity(rows.len());
            for row in rows {
                values.push(parse_date32_json_value(
                    row.get(name).unwrap_or(&JsonValue::Null),
                )?);
            }
            Arc::new(Date32Array::from(values))
        }
        DataType::Date64 => {
            let mut values = Vec::with_capacity(rows.len());
            for row in rows {
                values.push(parse_date64_json_value(
                    row.get(name).unwrap_or(&JsonValue::Null),
                )?);
            }
            Arc::new(Date64Array::from(values))
        }
        DataType::List(field) => {
            // Variable-length list: a child builder typed by the element
            // data type, with one append(true/false) per row.
            let mut builder = ListBuilder::with_capacity(
                make_list_value_builder(field.data_type(), rows.len())?,
                rows.len(),
            )
            .with_field(field.clone());
            for row in rows {
                let value = row.get(name).unwrap_or(&JsonValue::Null);
                if value.is_null() {
                    // Null list (distinct from an empty list).
                    builder.append(false);
                    continue;
                }
                let items = value.as_array().ok_or_else(|| {
                    OmniError::manifest(format!(
                        "list property '{}' expects a JSON array, got {}",
                        name, value
                    ))
                })?;
                for item in items {
                    append_json_list_item(builder.values(), field.data_type(), item)?;
                }
                builder.append(true);
            }
            Arc::new(builder.finish())
        }
        DataType::FixedSizeList(child_field, dim) => {
            // Vector type: parse JSON array of floats into FixedSizeList<Float32>
            let dim = *dim;
            let mut builder = FixedSizeListBuilder::with_capacity(
                Float32Builder::with_capacity(rows.len() * dim as usize),
                dim,
                rows.len(),
            )
            .with_field(child_field.clone());
            for row in rows {
                if let Some(arr) = row.get(name).and_then(|v| v.as_array()) {
                    if arr.len() != dim as usize {
                        return Err(OmniError::manifest(format!(
                            "vector property '{}' expects {} dimensions, got {}",
                            name,
                            dim,
                            arr.len()
                        )));
                    }
                    for val in arr {
                        // NOTE(review): non-numeric array items silently
                        // become 0.0 here — confirm this is intentional.
                        builder
                            .values()
                            .append_value(val.as_f64().unwrap_or(0.0) as f32);
                    }
                    builder.append(true);
                } else if nullable {
                    // FixedSizeList still requires `dim` child slots for a
                    // null row.
                    for _ in 0..dim as usize {
                        builder.values().append_null();
                    }
                    builder.append(false);
                } else {
                    return Err(OmniError::manifest(format!(
                        "non-nullable vector property '{}' has null values",
                        name
                    )));
                }
            }
            Arc::new(builder.finish())
        }
        _ => {
            // Unsupported type: fill with nulls
            // NOTE(review): this emits an all-null Utf8 array; if the schema
            // field is not Utf8, RecordBatch::try_new will reject it —
            // confirm unsupported types cannot reach this path in practice.
            let values: Vec<Option<&str>> = vec![None; rows.len()];
            Arc::new(StringArray::from(values))
        }
    };

    // Final nullability gate: catches both genuinely-missing values and
    // wrong-typed values that were coerced to null above.
    if !nullable && array.null_count() > 0 {
        return Err(OmniError::manifest(format!(
            "non-nullable property '{}' has null or invalid values",
            name
        )));
    }

    Ok(array)
}
763
764fn make_list_value_builder(data_type: &DataType, capacity: usize) -> Result<Box<dyn ArrayBuilder>> {
765    Ok(match data_type {
766        DataType::Utf8 => Box::new(StringBuilder::with_capacity(capacity, capacity * 8)),
767        DataType::Boolean => Box::new(BooleanBuilder::with_capacity(capacity)),
768        DataType::Int32 => Box::new(Int32Builder::with_capacity(capacity)),
769        DataType::Int64 => Box::new(Int64Builder::with_capacity(capacity)),
770        DataType::UInt32 => Box::new(UInt32Builder::with_capacity(capacity)),
771        DataType::UInt64 => Box::new(UInt64Builder::with_capacity(capacity)),
772        DataType::Float32 => Box::new(Float32Builder::with_capacity(capacity)),
773        DataType::Float64 => Box::new(Float64Builder::with_capacity(capacity)),
774        DataType::Date32 => Box::new(Date32Builder::with_capacity(capacity)),
775        DataType::Date64 => Box::new(Date64Builder::with_capacity(capacity)),
776        other => {
777            return Err(OmniError::manifest(format!(
778                "unsupported list element data type {:?}",
779                other
780            )));
781        }
782    })
783}
784
785fn append_json_list_item(
786    builder: &mut Box<dyn ArrayBuilder>,
787    data_type: &DataType,
788    value: &JsonValue,
789) -> Result<()> {
790    match data_type {
791        DataType::Utf8 => {
792            let builder = builder
793                .as_any_mut()
794                .downcast_mut::<StringBuilder>()
795                .ok_or_else(|| OmniError::manifest("list Utf8 builder downcast failed"))?;
796            if let Some(value) = value.as_str() {
797                builder.append_value(value);
798            } else {
799                builder.append_null();
800            }
801        }
802        DataType::Boolean => {
803            let builder = builder
804                .as_any_mut()
805                .downcast_mut::<BooleanBuilder>()
806                .ok_or_else(|| OmniError::manifest("list Boolean builder downcast failed"))?;
807            if let Some(value) = value.as_bool() {
808                builder.append_value(value);
809            } else {
810                builder.append_null();
811            }
812        }
813        DataType::Int32 => {
814            let builder = builder
815                .as_any_mut()
816                .downcast_mut::<Int32Builder>()
817                .ok_or_else(|| OmniError::manifest("list Int32 builder downcast failed"))?;
818            if let Some(value) = value.as_i64() {
819                let value = i32::try_from(value).map_err(|_| {
820                    OmniError::manifest(format!("list value {} exceeds Int32 range", value))
821                })?;
822                builder.append_value(value);
823            } else {
824                builder.append_null();
825            }
826        }
827        DataType::Int64 => {
828            let builder = builder
829                .as_any_mut()
830                .downcast_mut::<Int64Builder>()
831                .ok_or_else(|| OmniError::manifest("list Int64 builder downcast failed"))?;
832            if let Some(value) = value.as_i64() {
833                builder.append_value(value);
834            } else {
835                builder.append_null();
836            }
837        }
838        DataType::UInt32 => {
839            let builder = builder
840                .as_any_mut()
841                .downcast_mut::<UInt32Builder>()
842                .ok_or_else(|| OmniError::manifest("list UInt32 builder downcast failed"))?;
843            if let Some(value) = value.as_u64() {
844                let value = u32::try_from(value).map_err(|_| {
845                    OmniError::manifest(format!("list value {} exceeds UInt32 range", value))
846                })?;
847                builder.append_value(value);
848            } else {
849                builder.append_null();
850            }
851        }
852        DataType::UInt64 => {
853            let builder = builder
854                .as_any_mut()
855                .downcast_mut::<UInt64Builder>()
856                .ok_or_else(|| OmniError::manifest("list UInt64 builder downcast failed"))?;
857            if let Some(value) = value.as_u64() {
858                builder.append_value(value);
859            } else {
860                builder.append_null();
861            }
862        }
863        DataType::Float32 => {
864            let builder = builder
865                .as_any_mut()
866                .downcast_mut::<Float32Builder>()
867                .ok_or_else(|| OmniError::manifest("list Float32 builder downcast failed"))?;
868            if let Some(value) = value.as_f64() {
869                builder.append_value(value as f32);
870            } else {
871                builder.append_null();
872            }
873        }
874        DataType::Float64 => {
875            let builder = builder
876                .as_any_mut()
877                .downcast_mut::<Float64Builder>()
878                .ok_or_else(|| OmniError::manifest("list Float64 builder downcast failed"))?;
879            if let Some(value) = value.as_f64() {
880                builder.append_value(value);
881            } else {
882                builder.append_null();
883            }
884        }
885        DataType::Date32 => {
886            let builder = builder
887                .as_any_mut()
888                .downcast_mut::<Date32Builder>()
889                .ok_or_else(|| OmniError::manifest("list Date32 builder downcast failed"))?;
890            if let Some(value) = parse_date32_json_value(value)? {
891                builder.append_value(value);
892            } else {
893                builder.append_null();
894            }
895        }
896        DataType::Date64 => {
897            let builder = builder
898                .as_any_mut()
899                .downcast_mut::<Date64Builder>()
900                .ok_or_else(|| OmniError::manifest("list Date64 builder downcast failed"))?;
901            if let Some(value) = parse_date64_json_value(value)? {
902                builder.append_value(value);
903            } else {
904                builder.append_null();
905            }
906        }
907        other => {
908            return Err(OmniError::manifest(format!(
909                "unsupported list element data type {:?}",
910                other
911            )));
912        }
913    }
914
915    Ok(())
916}
917
918fn parse_date32_json_value(value: &JsonValue) -> Result<Option<i32>> {
919    if value.is_null() {
920        return Ok(None);
921    }
922    if let Some(days) = value.as_i64() {
923        let days = i32::try_from(days)
924            .map_err(|_| OmniError::manifest(format!("Date value out of range: {}", days)))?;
925        return Ok(Some(days));
926    }
927    if let Some(days) = value.as_u64() {
928        let days = i32::try_from(days)
929            .map_err(|_| OmniError::manifest(format!("Date value out of range: {}", days)))?;
930        return Ok(Some(days));
931    }
932    if let Some(value) = value.as_str() {
933        return Ok(Some(parse_date32_literal(value)?));
934    }
935    Ok(None)
936}
937
938fn parse_date64_json_value(value: &JsonValue) -> Result<Option<i64>> {
939    if value.is_null() {
940        return Ok(None);
941    }
942    if let Some(ms) = value.as_i64() {
943        return Ok(Some(ms));
944    }
945    if let Some(ms) = value.as_u64() {
946        let ms = i64::try_from(ms)
947            .map_err(|_| OmniError::manifest(format!("DateTime value out of range: {}", ms)))?;
948        return Ok(Some(ms));
949    }
950    if let Some(value) = value.as_str() {
951        return Ok(Some(parse_date64_literal(value)?));
952    }
953    Ok(None)
954}
955
956/// Write a batch to a Lance dataset, returning (new_version, total_row_count).
957async fn write_batch_to_dataset(
958    db: &Omnigraph,
959    branch: Option<&str>,
960    table_key: &str,
961    batch: RecordBatch,
962    mode: LoadMode,
963) -> Result<(crate::table_store::TableState, Option<String>)> {
964    let (mut ds, full_path, table_branch) =
965        db.open_for_mutation_on_branch(branch, table_key).await?;
966    let table_store = db.table_store();
967
968    match mode {
969        LoadMode::Overwrite => {
970            let state = table_store
971                .overwrite_batch(&full_path, &mut ds, batch)
972                .await?;
973            Ok((state, table_branch))
974        }
975        LoadMode::Append => {
976            let state = table_store.append_batch(&full_path, &mut ds, batch).await?;
977            Ok((state, table_branch))
978        }
979        LoadMode::Merge => {
980            let state = table_store
981                .merge_insert_batch(
982                    &full_path,
983                    ds,
984                    batch,
985                    vec!["id".to_string()],
986                    lance::dataset::WhenMatched::UpdateAll,
987                    lance::dataset::WhenNotMatched::InsertAll,
988                )
989                .await?;
990            Ok((state, table_branch))
991        }
992    }
993}
994
995fn generate_id() -> String {
996    ulid::Ulid::new().to_string()
997}
998
999pub(crate) fn parse_date32_literal(value: &str) -> Result<i32> {
1000    let raw: Arc<dyn Array> = Arc::new(StringArray::from(vec![Some(value)]));
1001    let casted = arrow_cast::cast::cast(raw.as_ref(), &DataType::Date32)
1002        .map_err(|e| OmniError::manifest(format!("invalid Date literal '{}': {}", value, e)))?;
1003    let out = casted
1004        .as_any()
1005        .downcast_ref::<Date32Array>()
1006        .ok_or_else(|| OmniError::manifest("Date32 cast produced unexpected array"))?;
1007    if out.is_null(0) {
1008        return Err(OmniError::manifest(format!(
1009            "invalid Date literal '{}'",
1010            value
1011        )));
1012    }
1013    Ok(out.value(0))
1014}
1015
1016pub(crate) fn parse_date64_literal(value: &str) -> Result<i64> {
1017    let raw: Arc<dyn Array> = Arc::new(StringArray::from(vec![Some(value)]));
1018    let casted = arrow_cast::cast::cast(raw.as_ref(), &DataType::Date64)
1019        .map_err(|e| OmniError::manifest(format!("invalid DateTime literal '{}': {}", value, e)))?;
1020    let out = casted
1021        .as_any()
1022        .downcast_ref::<Date64Array>()
1023        .ok_or_else(|| OmniError::manifest("Date64 cast produced unexpected array"))?;
1024    if out.is_null(0) {
1025        return Err(OmniError::manifest(format!(
1026            "invalid DateTime literal '{}'",
1027            value
1028        )));
1029    }
1030    Ok(out.value(0))
1031}
1032
1033// ─── Value constraint validation ─────────────────────────────────────────────
1034
1035pub(crate) fn validate_value_constraints(
1036    batch: &RecordBatch,
1037    node_type: &omnigraph_compiler::catalog::NodeType,
1038) -> Result<()> {
1039    use arrow_array::Array;
1040
1041    // Range constraints
1042    for rc in &node_type.range_constraints {
1043        let Some(col) = batch.column_by_name(&rc.property) else {
1044            continue;
1045        };
1046        for row in 0..batch.num_rows() {
1047            if col.is_null(row) {
1048                continue;
1049            }
1050            let value = extract_numeric_value(col, row);
1051            if let Some(val) = value {
1052                if val.is_nan() {
1053                    return Err(OmniError::manifest(format!(
1054                        "@range violation on {}.{}: value is NaN",
1055                        node_type.name, rc.property
1056                    )));
1057                }
1058                if let Some(ref min) = rc.min {
1059                    let min_f = literal_value_to_f64(min);
1060                    if val < min_f {
1061                        return Err(OmniError::manifest(format!(
1062                            "@range violation on {}.{}: value {} < min {}",
1063                            node_type.name, rc.property, val, min_f
1064                        )));
1065                    }
1066                }
1067                if let Some(ref max) = rc.max {
1068                    let max_f = literal_value_to_f64(max);
1069                    if val > max_f {
1070                        return Err(OmniError::manifest(format!(
1071                            "@range violation on {}.{}: value {} > max {}",
1072                            node_type.name, rc.property, val, max_f
1073                        )));
1074                    }
1075                }
1076            }
1077        }
1078    }
1079
1080    // Check constraints (regex)
1081    for cc in &node_type.check_constraints {
1082        let re = regex::Regex::new(&cc.pattern).map_err(|e| {
1083            OmniError::manifest(format!(
1084                "@check on {}.{} has invalid regex '{}': {}",
1085                node_type.name, cc.property, cc.pattern, e
1086            ))
1087        })?;
1088        let Some(col) = batch.column_by_name(&cc.property) else {
1089            continue;
1090        };
1091        let str_col = col.as_any().downcast_ref::<StringArray>();
1092        if let Some(str_col) = str_col {
1093            for row in 0..str_col.len() {
1094                if str_col.is_null(row) {
1095                    continue;
1096                }
1097                let val = str_col.value(row);
1098                if !re.is_match(val) {
1099                    return Err(OmniError::manifest(format!(
1100                        "@check violation on {}.{}: value '{}' does not match pattern '{}'",
1101                        node_type.name, cc.property, val, cc.pattern
1102                    )));
1103                }
1104            }
1105        }
1106    }
1107
1108    Ok(())
1109}
1110
1111fn extract_numeric_value(col: &ArrayRef, row: usize) -> Option<f64> {
1112    use arrow_array::{
1113        Array, Float32Array, Float64Array, Int32Array, Int64Array, UInt32Array, UInt64Array,
1114    };
1115    if let Some(a) = col.as_any().downcast_ref::<Int32Array>() {
1116        return Some(a.value(row) as f64);
1117    }
1118    if let Some(a) = col.as_any().downcast_ref::<Int64Array>() {
1119        return Some(a.value(row) as f64);
1120    }
1121    if let Some(a) = col.as_any().downcast_ref::<UInt32Array>() {
1122        return Some(a.value(row) as f64);
1123    }
1124    if let Some(a) = col.as_any().downcast_ref::<UInt64Array>() {
1125        return Some(a.value(row) as f64);
1126    }
1127    if let Some(a) = col.as_any().downcast_ref::<Float32Array>() {
1128        return Some(a.value(row) as f64);
1129    }
1130    if let Some(a) = col.as_any().downcast_ref::<Float64Array>() {
1131        return Some(a.value(row));
1132    }
1133    None
1134}
1135
1136fn literal_value_to_f64(v: &omnigraph_compiler::catalog::LiteralValue) -> f64 {
1137    use omnigraph_compiler::catalog::LiteralValue;
1138    match v {
1139        LiteralValue::Integer(n) => *n as f64,
1140        LiteralValue::Float(f) => *f,
1141    }
1142}
1143
1144// ─── Edge cardinality validation ─────────────────────────────────────────────
1145
/// Enforce the `@card` (per-source edge count) constraint for `edge_name`.
///
/// Reads the edge sub-table at `written_version` / `written_branch` — the
/// state produced by the write that just happened — rather than the version
/// pinned by the branch snapshot, counts edges per `src`, and errors on the
/// first source whose count falls outside the declared min/max.
///
/// NOTE(review): a source with zero edges never appears in `counts`, so a
/// `min > 0` bound is only enforced for sources that have at least one edge
/// in the table — confirm this is the intended semantics.
async fn validate_edge_cardinality(
    db: &crate::db::Omnigraph,
    branch: Option<&str>,
    edge_name: &str,
    written_version: u64,
    written_branch: Option<&str>,
) -> Result<()> {
    use arrow_array::Array;
    let catalog = db.catalog();
    // Indexing panics if `edge_name` is absent; callers pass catalog-derived
    // names, so a miss would be a bug rather than a recoverable error.
    let edge_type = &catalog.edge_types[edge_name];
    // Default cardinality means "no constraint" — nothing to validate.
    if edge_type.cardinality.is_default() {
        return Ok(());
    }

    // Open edge sub-table at the just-written version, not the snapshot's
    // (the snapshot still pins to the pre-write version).
    let snapshot = db.snapshot_for_branch(branch).await?;
    let table_key = format!("edge:{}", edge_name);
    let entry = snapshot
        .entry(&table_key)
        .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
    let ds = db
        .open_dataset_at_state(
            &entry.table_path,
            // Prefer the branch the write landed on; fall back to the
            // manifest entry's branch when the write did not record one.
            written_branch.or(entry.table_branch.as_deref()),
            written_version,
        )
        .await?;

    // Scan src column, count per source
    let batches = db
        .table_store()
        .scan(&ds, Some(&["src"]), None, None)
        .await?;

    let mut counts: HashMap<String, u32> = HashMap::new();
    for batch in &batches {
        // `src` is part of every edge table's schema and is Utf8, so the
        // unwraps encode an invariant rather than a recoverable condition.
        let srcs = batch
            .column_by_name("src")
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        for i in 0..srcs.len() {
            *counts.entry(srcs.value(i).to_string()).or_insert(0) += 1;
        }
    }

    let card = &edge_type.cardinality;
    for (src, count) in &counts {
        if let Some(max) = card.max {
            if *count > max {
                return Err(OmniError::manifest(format!(
                    "@card violation on edge {}: source '{}' has {} edges (max {})",
                    edge_name, src, count, max
                )));
            }
        }
        if *count < card.min {
            return Err(OmniError::manifest(format!(
                "@card violation on edge {}: source '{}' has {} edges (min {})",
                edge_name, src, count, card.min
            )));
        }
    }

    Ok(())
}
1214
/// Collect all valid node IDs for a given type. Union of:
/// - IDs from the just-loaded batch (in memory, from node_rows)
/// - IDs from the sub-table at the just-written version (if it was updated)
/// - IDs from the sub-table at the snapshot-pinned version (if it was not updated)
///
/// Returns an empty set (minus in-memory IDs) when the type has no manifest
/// entry yet, i.e. no sub-table has ever been written for it.
async fn collect_node_ids(
    db: &Omnigraph,
    branch: Option<&str>,
    type_name: &str,
    node_rows: &HashMap<String, Vec<JsonValue>>,
    catalog: &omnigraph_compiler::catalog::Catalog,
    updates: &[crate::db::SubTableUpdate],
) -> Result<HashSet<String>> {
    let mut ids = HashSet::new();

    // IDs from the in-memory batch (just loaded in this operation)
    if let Some(rows) = node_rows.get(type_name) {
        if let Some(node_type) = catalog.node_types.get(type_name) {
            if let Some(key_prop) = node_type.key_property() {
                // Only string-valued keys are collected; non-string or
                // missing key values are silently skipped here.
                for row in rows {
                    if let Some(id) = row.get(key_prop).and_then(|v| v.as_str()) {
                        ids.insert(id.to_string());
                    }
                }
            }
        }
    }

    // IDs from the Lance sub-table
    let table_key = format!("node:{}", type_name);
    let snapshot = db.snapshot_for_branch(branch).await?;
    let Some(entry) = snapshot.entry(&table_key) else {
        return Ok(ids);
    };
    // Use the just-written version if this type was updated, else snapshot version
    let updated = updates
        .iter()
        .find(|u| u.table_key == table_key)
        .map(|u| (u.table_version, u.table_branch.as_deref()));
    // NB: `branch` here shadows the function parameter with the branch the
    // sub-table is read from, which may differ from the requested branch.
    let (version, branch) = updated.unwrap_or((entry.table_version, entry.table_branch.as_deref()));
    let ds = db
        .open_dataset_at_state(&entry.table_path, branch, version)
        .await?;

    let batches = db
        .table_store()
        .scan(&ds, Some(&["id"]), None, None)
        .await?;

    for batch in &batches {
        // `id` is part of every node table's schema and is Utf8, so the
        // unwraps encode an invariant rather than a recoverable condition.
        let id_col = batch
            .column_by_name("id")
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        for i in 0..batch.num_rows() {
            ids.insert(id_col.value(i).to_string());
        }
    }

    Ok(ids)
}
1277
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::Omnigraph;
    use arrow_array::Array;
    use futures::TryStreamExt;
    use std::collections::HashMap;

    // Minimal schema covering keyed nodes, an optional scalar property,
    // and two edge types (one with a payload property, one without).
    const TEST_SCHEMA: &str = r#"
node Person {
    name: String @key
    age: I32?
}
node Company {
    name: String @key
}
edge Knows: Person -> Person {
    since: Date?
}
edge WorksAt: Person -> Company
"#;

    // JSONL fixture: three nodes and two edges referencing them by key.
    const TEST_DATA: &str = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}
{"type": "Person", "data": {"name": "Bob", "age": 25}}
{"type": "Company", "data": {"name": "Acme"}}
{"edge": "Knows", "from": "Alice", "to": "Bob"}
{"edge": "WorksAt", "from": "Alice", "to": "Acme"}
"#;

    // Loading the fixture reports per-type node and edge counts.
    #[tokio::test]
    async fn test_load_creates_data() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let result = load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert_eq!(result.nodes_loaded["Person"], 2);
        assert_eq!(result.nodes_loaded["Company"], 1);
        assert_eq!(result.edges_loaded["Knows"], 1);
        assert_eq!(result.edges_loaded["WorksAt"], 1);
    }

    // Loaded node rows are readable back through a Lance scan, with the
    // `id` column populated from the @key property.
    #[tokio::test]
    async fn test_load_data_readable_via_lance() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        // Read back via snapshot
        let snap = db.snapshot();
        let person_ds = snap.open("node:Person").await.unwrap();

        assert_eq!(person_ds.count_rows(None).await.unwrap(), 2);

        // Verify data
        let batches: Vec<RecordBatch> = person_ds
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();

        let batch = &batches[0];
        let ids = batch
            .column_by_name("id")
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        // @key=name, so ids should be "Alice" and "Bob"
        let id_values: Vec<&str> = (0..ids.len()).map(|i| ids.value(i)).collect();
        assert!(id_values.contains(&"Alice"));
        assert!(id_values.contains(&"Bob"));
    }

    // Edge rows store src/dst as the referenced nodes' key values.
    #[tokio::test]
    async fn test_load_edges_reference_node_keys() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        let snap = db.snapshot();
        let knows_ds = snap.open("edge:Knows").await.unwrap();

        let batches: Vec<RecordBatch> = knows_ds
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();

        let batch = &batches[0];
        let srcs = batch
            .column_by_name("src")
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let dsts = batch
            .column_by_name("dst")
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();

        assert_eq!(srcs.value(0), "Alice");
        assert_eq!(dsts.value(0), "Bob");
    }

    // A load commits a new manifest version.
    #[tokio::test]
    async fn test_load_manifest_version_advances() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        let v1 = db.version();

        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert!(db.version() > v1);
    }

    // Append mode adds rows to the existing table instead of replacing it.
    #[tokio::test]
    async fn test_load_append_adds_rows() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let batch1 = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#;
        let batch2 = r#"{"type": "Person", "data": {"name": "Bob", "age": 25}}"#;

        load_jsonl(&mut db, batch1, LoadMode::Overwrite)
            .await
            .unwrap();
        load_jsonl(&mut db, batch2, LoadMode::Append).await.unwrap();

        let snap = db.snapshot();
        let person_ds = snap.open("node:Person").await.unwrap();
        assert_eq!(person_ds.count_rows(None).await.unwrap(), 2);
    }

    // Rows referencing a type absent from the schema are rejected.
    #[tokio::test]
    async fn test_load_unknown_type_rejected() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let bad = r#"{"type": "FakeType", "data": {"name": "x"}}"#;
        let result = load_jsonl(&mut db, bad, LoadMode::Overwrite).await;
        assert!(result.is_err());
    }

    // Ingesting into a missing branch creates it from the base branch and
    // reports a sorted per-table breakdown of loaded rows.
    #[tokio::test]
    async fn test_ingest_creates_branch_and_reports_tables() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let result = db
            .ingest("feature", Some("main"), TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert_eq!(result.branch, "feature");
        assert_eq!(result.base_branch, "main");
        assert!(result.branch_created);
        assert_eq!(result.mode, LoadMode::Overwrite);
        assert_eq!(
            result.tables,
            vec![
                IngestTableResult {
                    table_key: "edge:Knows".to_string(),
                    rows_loaded: 1
                },
                IngestTableResult {
                    table_key: "edge:WorksAt".to_string(),
                    rows_loaded: 1
                },
                IngestTableResult {
                    table_key: "node:Company".to_string(),
                    rows_loaded: 1
                },
                IngestTableResult {
                    table_key: "node:Person".to_string(),
                    rows_loaded: 2
                },
            ]
        );
        assert!(
            db.branch_list()
                .await
                .unwrap()
                .contains(&"feature".to_string())
        );
    }

    // When the target branch already exists the `from` base is ignored
    // (though still echoed back) and Merge upserts by id.
    #[tokio::test]
    async fn test_ingest_existing_branch_ignores_from_and_merges_data() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();
        db.branch_create_from(crate::db::ReadTarget::branch("main"), "feature")
            .await
            .unwrap();

        let result = db
            .ingest(
                "feature",
                Some("missing-base"),
                r#"{"type":"Person","data":{"name":"Bob","age":26}}
{"type":"Person","data":{"name":"Eve","age":31}}"#,
                LoadMode::Merge,
            )
            .await
            .unwrap();

        assert_eq!(result.branch, "feature");
        assert_eq!(result.base_branch, "missing-base");
        assert!(!result.branch_created);
        assert_eq!(result.mode, LoadMode::Merge);
        assert_eq!(
            result.tables,
            vec![IngestTableResult {
                table_key: "node:Person".to_string(),
                rows_loaded: 2
            }]
        );

        let snap = db
            .snapshot_of(crate::db::ReadTarget::branch("feature"))
            .await
            .unwrap();
        let person_ds = snap.open("node:Person").await.unwrap();
        assert_eq!(person_ds.count_rows(None).await.unwrap(), 3);

        let batches: Vec<RecordBatch> = person_ds
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();
        let mut ages_by_id = HashMap::new();
        for batch in &batches {
            let ids = batch
                .column_by_name("id")
                .unwrap()
                .as_any()
                .downcast_ref::<StringArray>()
                .unwrap();
            let ages = batch
                .column_by_name("age")
                .unwrap()
                .as_any()
                .downcast_ref::<Int32Array>()
                .unwrap();
            for idx in 0..ids.len() {
                ages_by_id.insert(ids.value(idx).to_string(), ages.value(idx));
            }
        }

        // Bob updated, Eve inserted, Alice untouched.
        assert_eq!(ages_by_id.get("Bob"), Some(&26));
        assert_eq!(ages_by_id.get("Eve"), Some(&31));
        assert_eq!(ages_by_id.get("Alice"), Some(&30));
    }

    // ingest_as records the supplied actor id on the resulting head commit.
    #[tokio::test]
    async fn test_ingest_as_stamps_actor_on_branch_head_commit() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        db.ingest_as(
            "feature",
            Some("main"),
            TEST_DATA,
            LoadMode::Overwrite,
            Some("act-andrew"),
        )
        .await
        .unwrap();

        // `.last()` assumes list_commits returns oldest-first, so the last
        // element is the branch head.
        let head = db
            .list_commits(Some("feature"))
            .await
            .unwrap()
            .into_iter()
            .last()
            .unwrap();
        assert_eq!(head.actor_id.as_deref(), Some("act-andrew"));
    }

    // NaN must be rejected by @range validation even though NaN comparisons
    // against min/max bounds are always false.
    #[test]
    fn test_range_constraint_rejects_nan() {
        use arrow_array::{Float64Array, RecordBatch, StringArray};
        use omnigraph_compiler::catalog::{LiteralValue, NodeType, RangeConstraint};
        use std::sync::Arc;

        let schema = Arc::new(arrow_schema::Schema::new(vec![
            arrow_schema::Field::new("name", arrow_schema::DataType::Utf8, false),
            arrow_schema::Field::new("score", arrow_schema::DataType::Float64, true),
        ]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["bad"])),
                Arc::new(Float64Array::from(vec![f64::NAN])),
            ],
        )
        .unwrap();

        let node_type = NodeType {
            name: "Test".to_string(),
            implements: vec![],
            properties: Default::default(),
            key: None,
            unique_constraints: vec![],
            indices: vec![],
            range_constraints: vec![RangeConstraint {
                property: "score".to_string(),
                min: Some(LiteralValue::Float(0.0)),
                max: Some(LiteralValue::Float(1.0)),
            }],
            check_constraints: vec![],
            embed_sources: Default::default(),
            blob_properties: Default::default(),
            arrow_schema: schema,
        };

        let result = validate_value_constraints(&batch, &node_type);
        assert!(result.is_err(), "expected NaN to be rejected");
        let err = result.unwrap_err().to_string();
        assert!(err.contains("NaN"), "error should mention NaN: {}", err);
    }
}