Skip to main content

omnigraph/loader/
mod.rs

1use std::collections::{HashMap, HashSet};
2
3use std::io::{BufRead, BufReader, Cursor};
4use std::sync::Arc;
5
6use arrow_array::{
7    Array, ArrayRef, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array,
8    Int32Array, Int64Array, RecordBatch, StringArray, UInt32Array, UInt64Array,
9    builder::{
10        ArrayBuilder, BooleanBuilder, Date32Builder, Date64Builder, FixedSizeListBuilder,
11        Float32Builder, Float64Builder, Int32Builder, Int64Builder, ListBuilder, StringBuilder,
12        UInt32Builder, UInt64Builder,
13    },
14};
15use arrow_schema::DataType;
16use base64::Engine;
17use lance::blob::BlobArrayBuilder;
18use omnigraph_compiler::catalog::NodeType;
19use serde::{Deserialize, Serialize};
20use serde_json::Value as JsonValue;
21
22use crate::db::Omnigraph;
23use crate::error::{OmniError, Result};
24use crate::exec::staging::{MutationStaging, PendingMode};
25
/// Result of a load operation: per-table row counts, keyed by node
/// type name / canonical edge name respectively.
#[derive(Debug, Clone, Default)]
pub struct LoadResult {
    /// Rows loaded per node type, keyed by the node type name.
    pub nodes_loaded: HashMap<String, usize>,
    /// Rows loaded per edge type, keyed by the canonical edge name.
    pub edges_loaded: HashMap<String, usize>,
}
32
/// Per-table outcome of an ingest, reported in [`IngestResult::tables`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct IngestTableResult {
    /// Table identifier in `node:<type>` / `edge:<name>` form.
    pub table_key: String,
    /// Number of rows loaded into this table.
    pub rows_loaded: usize,
}
38
/// Outcome of an ingest call: which branch was written, which branch it
/// was based on, and per-table row counts.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct IngestResult {
    /// Branch the data was written to.
    pub branch: String,
    /// Branch the target was created from (when it did not already exist).
    pub base_branch: String,
    /// True when this ingest created `branch`.
    pub branch_created: bool,
    /// Load mode the ingest ran with.
    pub mode: LoadMode,
    /// Per-table results, sorted by `table_key`.
    pub tables: Vec<IngestTableResult>,
}
47
/// Load mode for data ingestion. Serialized in `snake_case`
/// (`"overwrite"` / `"append"` / `"merge"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LoadMode {
    /// Overwrite existing data (truncate-and-replace; uses the legacy
    /// inline-commit path rather than staging).
    Overwrite,
    /// Append to existing data. Assumes inputs are unique — no
    /// dedupe-by-id is applied.
    Append,
    /// Merge by `id` key (upsert).
    Merge,
}
59
60/// Load JSONL data into an Omnigraph database.
61pub async fn load_jsonl(db: &mut Omnigraph, data: &str, mode: LoadMode) -> Result<LoadResult> {
62    let current_branch = db.active_branch().await;
63    let branch = current_branch.as_deref().unwrap_or("main");
64    db.load(branch, data, mode).await
65}
66
67/// Load JSONL data from a file path.
68pub async fn load_jsonl_file(db: &mut Omnigraph, path: &str, mode: LoadMode) -> Result<LoadResult> {
69    let current_branch = db.active_branch().await;
70    let branch = current_branch.as_deref().unwrap_or("main");
71    db.load_file(branch, path, mode).await
72}
73
74impl Omnigraph {
75    pub async fn ingest(
76        &self,
77        branch: &str,
78        from: Option<&str>,
79        data: &str,
80        mode: LoadMode,
81    ) -> Result<IngestResult> {
82        self.ingest_as(branch, from, data, mode, None).await
83    }
84
85    pub async fn ingest_as(
86        &self,
87        branch: &str,
88        from: Option<&str>,
89        data: &str,
90        mode: LoadMode,
91        actor_id: Option<&str>,
92    ) -> Result<IngestResult> {
93        self.ingest_with_current_actor(branch, from, data, mode, actor_id)
94            .await
95    }
96
97    pub async fn ingest_file(
98        &self,
99        branch: &str,
100        from: Option<&str>,
101        path: &str,
102        mode: LoadMode,
103    ) -> Result<IngestResult> {
104        self.ingest_file_as(branch, from, path, mode, None).await
105    }
106
107    pub async fn ingest_file_as(
108        &self,
109        branch: &str,
110        from: Option<&str>,
111        path: &str,
112        mode: LoadMode,
113        actor_id: Option<&str>,
114    ) -> Result<IngestResult> {
115        let data = std::fs::read_to_string(path).map_err(OmniError::Io)?;
116        self.ingest_as(branch, from, &data, mode, actor_id).await
117    }
118
119    async fn ingest_with_current_actor(
120        &self,
121        branch: &str,
122        from: Option<&str>,
123        data: &str,
124        mode: LoadMode,
125        actor_id: Option<&str>,
126    ) -> Result<IngestResult> {
127        self.ensure_schema_state_valid().await?;
128        let target_branch =
129            Self::normalize_branch_name(branch)?.unwrap_or_else(|| "main".to_string());
130        let base_branch = Self::normalize_branch_name(from.unwrap_or("main"))?
131            .unwrap_or_else(|| "main".to_string());
132        let branch_created = !self
133            .branch_list()
134            .await?
135            .iter()
136            .any(|name| name == &target_branch);
137        if branch_created {
138            self.branch_create_from(crate::db::ReadTarget::branch(&base_branch), &target_branch)
139                .await?;
140        }
141
142        let result = self.load_as(&target_branch, data, mode, actor_id).await?;
143        Ok(IngestResult {
144            branch: target_branch,
145            base_branch,
146            branch_created,
147            mode,
148            tables: result.to_ingest_tables(),
149        })
150    }
151
152    pub async fn load(&self, branch: &str, data: &str, mode: LoadMode) -> Result<LoadResult> {
153        self.load_as(branch, data, mode, None).await
154    }
155
156    pub async fn load_as(
157        &self,
158        branch: &str,
159        data: &str,
160        mode: LoadMode,
161        actor_id: Option<&str>,
162    ) -> Result<LoadResult> {
163        self.ensure_schema_state_valid().await?;
164        // Reject internal `__run__*` / system-prefixed branches at the
165        // public write boundary. Direct-publish paths assert this
166        // explicitly so a caller can't write to legacy or system
167        // staging branches by passing the prefix verbatim.
168        crate::db::ensure_public_branch_ref(branch, "load")?;
169        // Branch convention: `None` represents `main`. Re-normalizing to
170        // `Some("main")` here would route the publisher commit through a
171        // separate coordinator (the cross-branch path in
172        // `commit_prepared_updates_on_branch_with_expected`) and leave
173        // `self.coordinator` with a stale manifest snapshot.
174        let requested = Self::normalize_branch_name(branch)?;
175        // Direct-to-target writes: no Run state machine, no `__run__` staging
176        // branch. Cross-table OCC is enforced by the publisher's
177        // `expected_table_versions` CAS inside `load_jsonl_reader`.
178        self.load_direct_on_branch(requested.as_deref(), data, mode, actor_id)
179            .await
180    }
181
182    pub async fn load_file(
183        &self,
184        branch: &str,
185        path: &str,
186        mode: LoadMode,
187    ) -> Result<LoadResult> {
188        let data = std::fs::read_to_string(path).map_err(|e| OmniError::Io(e))?;
189        self.load(branch, &data, mode).await
190    }
191
192    async fn load_direct_on_branch(
193        &self,
194        branch: Option<&str>,
195        data: &str,
196        mode: LoadMode,
197        actor_id: Option<&str>,
198    ) -> Result<LoadResult> {
199        let reader = BufReader::new(Cursor::new(data.as_bytes()));
200        load_jsonl_reader(self, branch, reader, mode, actor_id).await
201    }
202}
203
204impl LoadMode {
205    pub fn as_str(self) -> &'static str {
206        match self {
207            LoadMode::Overwrite => "overwrite",
208            LoadMode::Append => "append",
209            LoadMode::Merge => "merge",
210        }
211    }
212}
213
214impl LoadResult {
215    pub fn to_ingest_tables(&self) -> Vec<IngestTableResult> {
216        let mut tables = self
217            .nodes_loaded
218            .iter()
219            .map(|(type_name, rows_loaded)| IngestTableResult {
220                table_key: format!("node:{type_name}"),
221                rows_loaded: *rows_loaded,
222            })
223            .chain(
224                self.edges_loaded
225                    .iter()
226                    .map(|(edge_name, rows_loaded)| IngestTableResult {
227                        table_key: format!("edge:{edge_name}"),
228                        rows_loaded: *rows_loaded,
229                    }),
230            )
231            .collect::<Vec<_>>();
232        tables.sort_by(|a, b| a.table_key.cmp(&b.table_key));
233        tables
234    }
235}
236
/// Core JSONL load pipeline shared by `load` / `ingest`: parse every
/// line, build and validate Arrow batches per table, enforce edge
/// referential integrity and cardinality, then commit — staged
/// (Append/Merge) or inline (Overwrite). All validation errors surface
/// before anything is published to the manifest.
async fn load_jsonl_reader<R: BufRead>(
    db: &Omnigraph,
    branch: Option<&str>,
    reader: R,
    mode: LoadMode,
    actor_id: Option<&str>,
) -> Result<LoadResult> {
    let catalog = db.catalog().clone();

    // Phase 1: Parse all lines, spool into per-type collections.
    // Node lines carry a `type` field, edge lines carry `edge` +
    // `from` + `to`; anything else is rejected. Errors report 1-based
    // line numbers.
    let mut node_rows: HashMap<String, Vec<JsonValue>> = HashMap::new();
    let mut edge_rows: HashMap<String, Vec<(String, String, JsonValue)>> = HashMap::new();

    for (line_num, line) in reader.lines().enumerate() {
        let line = line?;
        let line = line.trim();
        if line.is_empty() {
            continue;
        }
        let value: JsonValue = serde_json::from_str(line).map_err(|e| {
            OmniError::manifest(format!("invalid JSON on line {}: {}", line_num + 1, e))
        })?;

        if let Some(type_name) = value.get("type").and_then(|v| v.as_str()) {
            if !catalog.node_types.contains_key(type_name) {
                return Err(OmniError::manifest(format!(
                    "line {}: unknown node type '{}'",
                    line_num + 1,
                    type_name
                )));
            }
            // A missing `data` field is treated as an empty object.
            let data = value
                .get("data")
                .cloned()
                .unwrap_or(JsonValue::Object(serde_json::Map::new()));
            node_rows
                .entry(type_name.to_string())
                .or_default()
                .push(data);
        } else if let Some(edge_name) = value.get("edge").and_then(|v| v.as_str()) {
            if catalog.lookup_edge_by_name(edge_name).is_none() {
                return Err(OmniError::manifest(format!(
                    "line {}: unknown edge type '{}'",
                    line_num + 1,
                    edge_name
                )));
            }
            let from = value
                .get("from")
                .and_then(|v| v.as_str())
                .ok_or_else(|| {
                    OmniError::manifest(format!("line {}: edge missing 'from'", line_num + 1))
                })?
                .to_string();
            let to = value
                .get("to")
                .and_then(|v| v.as_str())
                .ok_or_else(|| {
                    OmniError::manifest(format!("line {}: edge missing 'to'", line_num + 1))
                })?
                .to_string();
            let data = value
                .get("data")
                .cloned()
                .unwrap_or(JsonValue::Object(serde_json::Map::new()));
            // Spool under the catalog's canonical edge name; the unwrap
            // is safe — `is_none()` was checked just above.
            let canonical = catalog.lookup_edge_by_name(edge_name).unwrap().name.clone();
            edge_rows
                .entry(canonical)
                .or_default()
                .push((from, to, data));
        } else {
            return Err(OmniError::manifest(format!(
                "line {}: expected 'type' or 'edge' field",
                line_num + 1
            )));
        }
    }

    // Phase 2: Build per-type RecordBatches and accumulate into the
    // staging pipeline. For Append/Merge, batches go into an in-memory
    // accumulator and a single `stage_*` + `commit_staged` per touched
    // table runs at end-of-load — a mid-load failure (RI / cardinality
    // violation) leaves Lance HEAD untouched. For Overwrite, the legacy
    // inline-commit path is preserved (truncate+append doesn't fit the
    // staged shape cleanly, and overwrite has no in-flight read-your-writes
    // requirement).

    let mut result = LoadResult::default();
    let snapshot = db.snapshot_for_branch(branch).await?;
    let use_staging = !matches!(mode, LoadMode::Overwrite);
    let mut staging = MutationStaging::default();
    let mut overwrite_updates: Vec<crate::db::SubTableUpdate> = Vec::new();
    let mut overwrite_expected: HashMap<String, u64> = HashMap::new();
    let pending_mode = match mode {
        LoadMode::Merge => PendingMode::Merge,
        // Append-mode loads accumulate as Append. Edge tables (no @key)
        // and no-key node tables stay safe on the stage_append path. The
        // Merge mode applies dedupe-by-id; Append assumes unique inputs.
        LoadMode::Append => PendingMode::Append,
        LoadMode::Overwrite => PendingMode::Append, // unused — Overwrite bypasses staging
    };
    // Map LoadMode to MutationOpKind for the version-check policy.
    // Append/Merge skip the strict pre-stage check (concurrency-safe
    // under the per-(table, branch) queue + publisher CAS); Overwrite
    // uses the strict check because it truncates and replaces the
    // dataset — concurrent advances change what "replace" means.
    let load_op_kind = match mode {
        LoadMode::Append => crate::db::MutationOpKind::Insert,
        LoadMode::Merge => crate::db::MutationOpKind::Merge,
        LoadMode::Overwrite => crate::db::MutationOpKind::SchemaRewrite,
    };

    // Phase 2a: build and validate every node batch up front. Cheap and
    // synchronous — surfaces validation errors before any S3 traffic.
    let mut prepared_nodes: Vec<(String, String, RecordBatch, usize)> =
        Vec::with_capacity(node_rows.len());
    for (type_name, rows) in &node_rows {
        let node_type = &catalog.node_types[type_name];
        let batch = build_node_batch(node_type, rows)?;
        validate_value_constraints(&batch, node_type)?;
        validate_enum_constraints(&batch, &node_type.properties, type_name)?;
        let unique_props = unique_property_names_for_node(node_type);
        if !unique_props.is_empty() {
            enforce_unique_constraints_intra_batch(&batch, type_name, &unique_props)?;
        }
        let loaded_count = batch.num_rows();
        let table_key = format!("node:{}", type_name);
        let entry = snapshot
            .entry(&table_key)
            .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
        if !use_staging {
            // Overwrite OCC baseline: the table version observed now must
            // still hold at the final manifest publish.
            overwrite_expected.insert(table_key.clone(), entry.table_version);
        }
        prepared_nodes.push((type_name.clone(), table_key, batch, loaded_count));
    }

    // Phase 2b: write every node type. Append/Merge → in-memory
    // accumulator. Overwrite → concurrent inline-commit (legacy path).
    if use_staging {
        for (type_name, table_key, batch, loaded_count) in prepared_nodes {
            let (ds, full_path, table_branch) = db
                .open_for_mutation_on_branch(branch, &table_key, load_op_kind)
                .await?;
            let expected_version = ds.version().version;
            staging.ensure_path(
                &table_key,
                full_path,
                table_branch,
                expected_version,
                load_op_kind,
            );
            let schema = batch.schema();
            staging.append_batch(&table_key, schema, pending_mode, batch)?;
            result.nodes_loaded.insert(type_name, loaded_count);
        }
    } else {
        let node_write_results =
            write_batches_concurrently(db, branch, mode, prepared_nodes).await?;
        for (type_name, table_key, loaded_count, state, table_branch) in node_write_results {
            overwrite_updates.push(crate::db::SubTableUpdate {
                table_key,
                table_version: state.version,
                table_branch,
                row_count: state.row_count,
                version_metadata: state.version_metadata,
            });
            result.nodes_loaded.insert(type_name, loaded_count);
        }
    }

    // Phase 2c: Validate edge referential integrity — every src/dst must
    // reference an existing node ID in the appropriate type. For staged
    // loads, the lookup unions snapshot-committed IDs with the in-memory
    // pending batches (which carry the just-staged node inserts).
    for (edge_name, rows) in &edge_rows {
        let edge_type = &catalog.edge_types[edge_name];
        let from_ids = if use_staging {
            collect_node_ids_with_pending(
                db,
                branch,
                &edge_type.from_type,
                &staging,
            )
            .await?
        } else {
            collect_node_ids(
                db,
                branch,
                &edge_type.from_type,
                &node_rows,
                &catalog,
                &overwrite_updates,
            )
            .await?
        };
        let to_ids = if use_staging {
            collect_node_ids_with_pending(
                db,
                branch,
                &edge_type.to_type,
                &staging,
            )
            .await?
        } else {
            collect_node_ids(
                db,
                branch,
                &edge_type.to_type,
                &node_rows,
                &catalog,
                &overwrite_updates,
            )
            .await?
        };

        for (i, (src, dst, _)) in rows.iter().enumerate() {
            if !from_ids.contains(src.as_str()) {
                return Err(OmniError::manifest(format!(
                    "edge {} row {}: src '{}' not found in {}",
                    edge_name,
                    i + 1,
                    src,
                    edge_type.from_type
                )));
            }
            if !to_ids.contains(dst.as_str()) {
                return Err(OmniError::manifest(format!(
                    "edge {} row {}: dst '{}' not found in {}",
                    edge_name,
                    i + 1,
                    dst,
                    edge_type.to_type
                )));
            }
        }
    }

    // Phase 2d: build edge batches. Same up-front validation shape as
    // Phase 2a (enum + intra-batch unique constraints).
    let mut prepared_edges: Vec<(String, String, RecordBatch, usize)> =
        Vec::with_capacity(edge_rows.len());
    for (edge_name, rows) in &edge_rows {
        let edge_type = &catalog.edge_types[edge_name];
        let batch = build_edge_batch(edge_type, rows)?;
        validate_enum_constraints(&batch, &edge_type.properties, edge_name)?;
        let unique_props = unique_property_names_for_edge(edge_type);
        if !unique_props.is_empty() {
            enforce_unique_constraints_intra_batch(&batch, edge_name, &unique_props)?;
        }
        let loaded_count = batch.num_rows();
        let table_key = format!("edge:{}", edge_name);
        let entry = snapshot
            .entry(&table_key)
            .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
        if !use_staging {
            overwrite_expected.insert(table_key.clone(), entry.table_version);
        }
        prepared_edges.push((edge_name.clone(), table_key, batch, loaded_count));
    }

    // Phase 2e: write every edge type. Same dispatch as Phase 2b.
    if use_staging {
        for (edge_name, table_key, batch, loaded_count) in prepared_edges {
            let (ds, full_path, table_branch) = db
                .open_for_mutation_on_branch(branch, &table_key, load_op_kind)
                .await?;
            let expected_version = ds.version().version;
            staging.ensure_path(
                &table_key,
                full_path,
                table_branch,
                expected_version,
                load_op_kind,
            );
            let schema = batch.schema();
            staging.append_batch(&table_key, schema, pending_mode, batch)?;
            result.edges_loaded.insert(edge_name, loaded_count);
        }
    } else {
        let edge_write_results =
            write_batches_concurrently(db, branch, mode, prepared_edges).await?;
        for (edge_name, table_key, loaded_count, state, table_branch) in edge_write_results {
            overwrite_updates.push(crate::db::SubTableUpdate {
                table_key,
                table_version: state.version,
                table_branch,
                row_count: state.row_count,
                version_metadata: state.version_metadata,
            });
            result.edges_loaded.insert(edge_name, loaded_count);
        }
    }

    // Phase 3: Validate edge cardinality constraints (before commit —
    // invalid data must not be committed). Staged path scans committed
    // edges via Lance + iterates pending edges in-memory. Overwrite path
    // opens the just-written version (legacy behavior).
    for (edge_name, _) in &edge_rows {
        let edge_type = &catalog.edge_types[edge_name];
        let table_key = format!("edge:{}", edge_name);
        if use_staging {
            validate_edge_cardinality_with_pending_loader(
                db,
                branch,
                edge_type,
                &table_key,
                &staging,
                mode,
            )
            .await?;
        } else if let Some(update) = overwrite_updates.iter().find(|u| u.table_key == table_key) {
            validate_edge_cardinality(
                db,
                branch,
                edge_name,
                update.table_version,
                update.table_branch.as_deref(),
            )
            .await?;
        }
    }

    // Phase 4: Atomic manifest commit with publisher-level OCC.
    if use_staging {
        let staged = staging.stage_all(db, branch).await?;
        // `_queue_guards` holds per-(table_key, branch) write queues
        // across the manifest publish below — see exec/mutation.rs for
        // the rationale (interleaving prevention).
        let (updates, expected_versions, sidecar_handle, _queue_guards) = staged
            .commit_all(db, branch, crate::db::manifest::SidecarKind::Load, actor_id)
            .await?;
        // Same finalize → publisher residual as mutations: per-table
        // staged commits have advanced Lance HEAD, but the manifest
        // publish has not run yet. Reuse the mutation failpoint name so
        // one failpoint pins the shared `MutationStaging` boundary.
        crate::failpoints::maybe_fail("mutation.post_finalize_pre_publisher")?;
        db.commit_updates_on_branch_with_expected(branch, &updates, &expected_versions, actor_id)
            .await?;
        // The recovery sidecar protects the per-table commit_staged →
        // manifest publish window. Phase C succeeded — clean up
        // best-effort: failing the user here would error out a write
        // that already landed durably.
        if let Some(handle) = sidecar_handle {
            if let Err(err) =
                crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await
            {
                tracing::warn!(
                    error = %err,
                    operation_id = handle.operation_id.as_str(),
                    "recovery sidecar cleanup failed; the next open's recovery sweep will resolve it"
                );
            }
        }
    } else {
        // LoadMode::Overwrite keeps the legacy inline-commit path —
        // truncate-then-append doesn't fit the staged shape (see
        // `docs/runs.md` "LoadMode::Overwrite residual"). The recovery
        // sidecar is not applicable here because the writer doesn't go
        // through MutationStaging; per-table inline commits + a final
        // manifest publish handle their own residual via the documented
        // operator workflow (re-run overwrite to recover).
        db.commit_updates_on_branch_with_expected(
            branch,
            &overwrite_updates,
            &overwrite_expected,
            actor_id,
        )
        .await?;
    }

    Ok(result)
}
608
609fn build_node_batch(node_type: &NodeType, rows: &[JsonValue]) -> Result<RecordBatch> {
610    let schema = node_type.arrow_schema.clone();
611
612    // Build id column: explicit id, @key value, or generated ULID.
613    let ids: Vec<String> = rows
614        .iter()
615        .map(|row| {
616            let explicit_id = row.get("id").and_then(|v| v.as_str()).map(str::to_string);
617            if let Some(key_prop) = node_type.key_property() {
618                let key_value = row
619                    .get(key_prop)
620                    .and_then(|v| v.as_str())
621                    .map(|s| s.to_string())
622                    .ok_or_else(|| {
623                        OmniError::manifest(format!(
624                            "node {} missing @key property '{}'",
625                            node_type.name, key_prop
626                        ))
627                    })?;
628                if let Some(explicit_id) = explicit_id {
629                    if explicit_id != key_value {
630                        return Err(OmniError::manifest(format!(
631                            "node {} has explicit id '{}' that does not match @key property '{}' value '{}'",
632                            node_type.name, explicit_id, key_prop, key_value
633                        )));
634                    }
635                }
636                Ok(key_value)
637            } else if let Some(explicit_id) = explicit_id {
638                Ok(explicit_id)
639            } else {
640                Ok(generate_id())
641            }
642        })
643        .collect::<Result<Vec<_>>>()?;
644
645    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
646    columns.push(Arc::new(StringArray::from(ids)));
647
648    // Build property columns (skip "id" field at index 0)
649    for field in schema.fields().iter().skip(1) {
650        if node_type.blob_properties.contains(field.name()) {
651            let col = build_blob_column(field.name(), field.is_nullable(), rows)?;
652            columns.push(col);
653        } else {
654            let col =
655                build_column_from_json(field.name(), field.data_type(), field.is_nullable(), rows)?;
656            columns.push(col);
657        }
658    }
659
660    RecordBatch::try_new(schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
661}
662
663fn build_edge_batch(
664    edge_type: &omnigraph_compiler::catalog::EdgeType,
665    rows: &[(String, String, JsonValue)],
666) -> Result<RecordBatch> {
667    let schema = edge_type.arrow_schema.clone();
668
669    let ids: Vec<String> = rows
670        .iter()
671        .map(|(_, _, data)| {
672            data.get("id")
673                .and_then(|v| v.as_str())
674                .map(str::to_string)
675                .unwrap_or_else(generate_id)
676        })
677        .collect();
678    let srcs: Vec<&str> = rows.iter().map(|(from, _, _)| from.as_str()).collect();
679    let dsts: Vec<&str> = rows.iter().map(|(_, to, _)| to.as_str()).collect();
680
681    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
682    columns.push(Arc::new(StringArray::from(ids)));
683    columns.push(Arc::new(StringArray::from(srcs)));
684    columns.push(Arc::new(StringArray::from(dsts)));
685
686    // Build edge property columns (skip id, src, dst at indices 0-2)
687    let data_values: Vec<JsonValue> = rows.iter().map(|(_, _, data)| data.clone()).collect();
688    for field in schema.fields().iter().skip(3) {
689        if edge_type.blob_properties.contains(field.name()) {
690            let col = build_blob_column(field.name(), field.is_nullable(), &data_values)?;
691            columns.push(col);
692        } else {
693            let col = build_column_from_json(
694                field.name(),
695                field.data_type(),
696                field.is_nullable(),
697                &data_values,
698            )?;
699            columns.push(col);
700        }
701    }
702
703    RecordBatch::try_new(schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
704}
705
706/// Append a blob value (URI or base64 bytes) to a BlobArrayBuilder.
707pub(crate) fn append_blob_value(builder: &mut BlobArrayBuilder, value: &str) -> Result<()> {
708    if let Some(encoded) = value.strip_prefix("base64:") {
709        let bytes = base64::engine::general_purpose::STANDARD
710            .decode(encoded)
711            .map_err(|e| OmniError::manifest(format!("invalid base64 blob data: {}", e)))?;
712        builder
713            .push_bytes(bytes)
714            .map_err(|e| OmniError::Lance(e.to_string()))
715    } else {
716        // Treat as URI (file://, s3://, gs://, or any other scheme)
717        builder
718            .push_uri(value)
719            .map_err(|e| OmniError::Lance(e.to_string()))
720    }
721}
722
723/// Build a blob column from JSON values using Lance BlobArrayBuilder.
724fn build_blob_column(name: &str, nullable: bool, rows: &[JsonValue]) -> Result<ArrayRef> {
725    let mut builder = BlobArrayBuilder::new(rows.len());
726    for row in rows {
727        match row.get(name) {
728            Some(JsonValue::String(s)) => {
729                append_blob_value(&mut builder, s)?;
730            }
731            Some(JsonValue::Null) | None if nullable => {
732                builder
733                    .push_null()
734                    .map_err(|e| OmniError::Lance(e.to_string()))?;
735            }
736            Some(JsonValue::Null) | None => {
737                return Err(OmniError::manifest(format!(
738                    "non-nullable blob property '{}' has null values",
739                    name
740                )));
741            }
742            _ => {
743                return Err(OmniError::manifest(format!(
744                    "blob property '{}' must be a URI string or base64: prefixed data",
745                    name
746                )));
747            }
748        }
749    }
750    builder
751        .finish()
752        .map_err(|e| OmniError::Lance(e.to_string()))
753}
754
755fn build_column_from_json(
756    name: &str,
757    data_type: &DataType,
758    nullable: bool,
759    rows: &[JsonValue],
760) -> Result<ArrayRef> {
761    let array: ArrayRef = match data_type {
762        DataType::Utf8 => {
763            let values: Vec<Option<String>> = rows
764                .iter()
765                .map(|row| {
766                    row.get(name)
767                        .and_then(|v| v.as_str())
768                        .map(|s| s.to_string())
769                })
770                .collect();
771            Arc::new(StringArray::from(values))
772        }
773        DataType::Int32 => {
774            let values: Vec<Option<i32>> = rows
775                .iter()
776                .map(|row| row.get(name).and_then(|v| v.as_i64()).map(|v| v as i32))
777                .collect();
778            Arc::new(Int32Array::from(values))
779        }
780        DataType::Int64 => {
781            let values: Vec<Option<i64>> = rows
782                .iter()
783                .map(|row| row.get(name).and_then(|v| v.as_i64()))
784                .collect();
785            Arc::new(Int64Array::from(values))
786        }
787        DataType::UInt32 => {
788            let values: Vec<Option<u32>> = rows
789                .iter()
790                .map(|row| row.get(name).and_then(|v| v.as_u64()).map(|v| v as u32))
791                .collect();
792            Arc::new(UInt32Array::from(values))
793        }
794        DataType::UInt64 => {
795            let values: Vec<Option<u64>> = rows
796                .iter()
797                .map(|row| row.get(name).and_then(|v| v.as_u64()))
798                .collect();
799            Arc::new(UInt64Array::from(values))
800        }
801        DataType::Float32 => {
802            let values: Vec<Option<f32>> = rows
803                .iter()
804                .map(|row| row.get(name).and_then(|v| v.as_f64()).map(|v| v as f32))
805                .collect();
806            Arc::new(Float32Array::from(values))
807        }
808        DataType::Float64 => {
809            let values: Vec<Option<f64>> = rows
810                .iter()
811                .map(|row| row.get(name).and_then(|v| v.as_f64()))
812                .collect();
813            Arc::new(Float64Array::from(values))
814        }
815        DataType::Boolean => {
816            let values: Vec<Option<bool>> = rows
817                .iter()
818                .map(|row| row.get(name).and_then(|v| v.as_bool()))
819                .collect();
820            Arc::new(BooleanArray::from(values))
821        }
822        DataType::Date32 => {
823            let mut values = Vec::with_capacity(rows.len());
824            for row in rows {
825                values.push(parse_date32_json_value(
826                    row.get(name).unwrap_or(&JsonValue::Null),
827                )?);
828            }
829            Arc::new(Date32Array::from(values))
830        }
831        DataType::Date64 => {
832            let mut values = Vec::with_capacity(rows.len());
833            for row in rows {
834                values.push(parse_date64_json_value(
835                    row.get(name).unwrap_or(&JsonValue::Null),
836                )?);
837            }
838            Arc::new(Date64Array::from(values))
839        }
840        DataType::List(field) => {
841            let mut builder = ListBuilder::with_capacity(
842                make_list_value_builder(field.data_type(), rows.len())?,
843                rows.len(),
844            )
845            .with_field(field.clone());
846            for row in rows {
847                let value = row.get(name).unwrap_or(&JsonValue::Null);
848                if value.is_null() {
849                    builder.append(false);
850                    continue;
851                }
852                let items = value.as_array().ok_or_else(|| {
853                    OmniError::manifest(format!(
854                        "list property '{}' expects a JSON array, got {}",
855                        name, value
856                    ))
857                })?;
858                for item in items {
859                    append_json_list_item(builder.values(), field.data_type(), item)?;
860                }
861                builder.append(true);
862            }
863            Arc::new(builder.finish())
864        }
865        DataType::FixedSizeList(child_field, dim) => {
866            // Vector type: parse JSON array of floats into FixedSizeList<Float32>
867            let dim = *dim;
868            let mut builder = FixedSizeListBuilder::with_capacity(
869                Float32Builder::with_capacity(rows.len() * dim as usize),
870                dim,
871                rows.len(),
872            )
873            .with_field(child_field.clone());
874            for row in rows {
875                if let Some(arr) = row.get(name).and_then(|v| v.as_array()) {
876                    if arr.len() != dim as usize {
877                        return Err(OmniError::manifest(format!(
878                            "vector property '{}' expects {} dimensions, got {}",
879                            name,
880                            dim,
881                            arr.len()
882                        )));
883                    }
884                    for val in arr {
885                        builder
886                            .values()
887                            .append_value(val.as_f64().unwrap_or(0.0) as f32);
888                    }
889                    builder.append(true);
890                } else if nullable {
891                    for _ in 0..dim as usize {
892                        builder.values().append_null();
893                    }
894                    builder.append(false);
895                } else {
896                    return Err(OmniError::manifest(format!(
897                        "non-nullable vector property '{}' has null values",
898                        name
899                    )));
900                }
901            }
902            Arc::new(builder.finish())
903        }
904        _ => {
905            // Unsupported type: fill with nulls
906            let values: Vec<Option<&str>> = vec![None; rows.len()];
907            Arc::new(StringArray::from(values))
908        }
909    };
910
911    if !nullable && array.null_count() > 0 {
912        return Err(OmniError::manifest(format!(
913            "non-nullable property '{}' has null or invalid values",
914            name
915        )));
916    }
917
918    Ok(array)
919}
920
921fn make_list_value_builder(data_type: &DataType, capacity: usize) -> Result<Box<dyn ArrayBuilder>> {
922    Ok(match data_type {
923        DataType::Utf8 => Box::new(StringBuilder::with_capacity(capacity, capacity * 8)),
924        DataType::Boolean => Box::new(BooleanBuilder::with_capacity(capacity)),
925        DataType::Int32 => Box::new(Int32Builder::with_capacity(capacity)),
926        DataType::Int64 => Box::new(Int64Builder::with_capacity(capacity)),
927        DataType::UInt32 => Box::new(UInt32Builder::with_capacity(capacity)),
928        DataType::UInt64 => Box::new(UInt64Builder::with_capacity(capacity)),
929        DataType::Float32 => Box::new(Float32Builder::with_capacity(capacity)),
930        DataType::Float64 => Box::new(Float64Builder::with_capacity(capacity)),
931        DataType::Date32 => Box::new(Date32Builder::with_capacity(capacity)),
932        DataType::Date64 => Box::new(Date64Builder::with_capacity(capacity)),
933        other => {
934            return Err(OmniError::manifest(format!(
935                "unsupported list element data type {:?}",
936                other
937            )));
938        }
939    })
940}
941
/// Append one JSON element to the (type-erased) value builder of a `List`
/// column.
///
/// `builder` is the child builder produced by `make_list_value_builder`; it
/// is downcast to the concrete builder matching `data_type` before the
/// append. JSON values of the wrong shape (e.g. a string in an int list)
/// are appended as null; integers that overflow a 32-bit target are
/// rejected with an error instead.
///
/// # Errors
/// Returns `OmniError` when the downcast fails (builder/type mismatch —
/// indicates a bug upstream in builder construction), when an Int32/UInt32
/// value is out of range, when a date string cannot be parsed, or when
/// `data_type` is not a supported list element type.
fn append_json_list_item(
    builder: &mut Box<dyn ArrayBuilder>,
    data_type: &DataType,
    value: &JsonValue,
) -> Result<()> {
    match data_type {
        DataType::Utf8 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<StringBuilder>()
                .ok_or_else(|| OmniError::manifest("list Utf8 builder downcast failed"))?;
            if let Some(value) = value.as_str() {
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::Boolean => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<BooleanBuilder>()
                .ok_or_else(|| OmniError::manifest("list Boolean builder downcast failed"))?;
            if let Some(value) = value.as_bool() {
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::Int32 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<Int32Builder>()
                .ok_or_else(|| OmniError::manifest("list Int32 builder downcast failed"))?;
            if let Some(value) = value.as_i64() {
                // Overflow is an error (not a silent wrap or null): the
                // value was numeric, just too wide for the declared type.
                let value = i32::try_from(value).map_err(|_| {
                    OmniError::manifest(format!("list value {} exceeds Int32 range", value))
                })?;
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::Int64 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<Int64Builder>()
                .ok_or_else(|| OmniError::manifest("list Int64 builder downcast failed"))?;
            if let Some(value) = value.as_i64() {
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::UInt32 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<UInt32Builder>()
                .ok_or_else(|| OmniError::manifest("list UInt32 builder downcast failed"))?;
            if let Some(value) = value.as_u64() {
                // Same overflow discipline as Int32.
                let value = u32::try_from(value).map_err(|_| {
                    OmniError::manifest(format!("list value {} exceeds UInt32 range", value))
                })?;
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::UInt64 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<UInt64Builder>()
                .ok_or_else(|| OmniError::manifest("list UInt64 builder downcast failed"))?;
            if let Some(value) = value.as_u64() {
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::Float32 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<Float32Builder>()
                .ok_or_else(|| OmniError::manifest("list Float32 builder downcast failed"))?;
            if let Some(value) = value.as_f64() {
                // f64 -> f32 narrowing is expected lossy rounding.
                builder.append_value(value as f32);
            } else {
                builder.append_null();
            }
        }
        DataType::Float64 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<Float64Builder>()
                .ok_or_else(|| OmniError::manifest("list Float64 builder downcast failed"))?;
            if let Some(value) = value.as_f64() {
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::Date32 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<Date32Builder>()
                .ok_or_else(|| OmniError::manifest("list Date32 builder downcast failed"))?;
            // Accepts integer day counts or date strings; errors on
            // unparsable strings, null on other JSON shapes.
            if let Some(value) = parse_date32_json_value(value)? {
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        DataType::Date64 => {
            let builder = builder
                .as_any_mut()
                .downcast_mut::<Date64Builder>()
                .ok_or_else(|| OmniError::manifest("list Date64 builder downcast failed"))?;
            if let Some(value) = parse_date64_json_value(value)? {
                builder.append_value(value);
            } else {
                builder.append_null();
            }
        }
        other => {
            return Err(OmniError::manifest(format!(
                "unsupported list element data type {:?}",
                other
            )));
        }
    }

    Ok(())
}
1074
1075fn parse_date32_json_value(value: &JsonValue) -> Result<Option<i32>> {
1076    if value.is_null() {
1077        return Ok(None);
1078    }
1079    if let Some(days) = value.as_i64() {
1080        let days = i32::try_from(days)
1081            .map_err(|_| OmniError::manifest(format!("Date value out of range: {}", days)))?;
1082        return Ok(Some(days));
1083    }
1084    if let Some(days) = value.as_u64() {
1085        let days = i32::try_from(days)
1086            .map_err(|_| OmniError::manifest(format!("Date value out of range: {}", days)))?;
1087        return Ok(Some(days));
1088    }
1089    if let Some(value) = value.as_str() {
1090        return Ok(Some(parse_date32_literal(value)?));
1091    }
1092    Ok(None)
1093}
1094
1095fn parse_date64_json_value(value: &JsonValue) -> Result<Option<i64>> {
1096    if value.is_null() {
1097        return Ok(None);
1098    }
1099    if let Some(ms) = value.as_i64() {
1100        return Ok(Some(ms));
1101    }
1102    if let Some(ms) = value.as_u64() {
1103        let ms = i64::try_from(ms)
1104            .map_err(|_| OmniError::manifest(format!("DateTime value out of range: {}", ms)))?;
1105        return Ok(Some(ms));
1106    }
1107    if let Some(value) = value.as_str() {
1108        return Ok(Some(parse_date64_literal(value)?));
1109    }
1110    Ok(None)
1111}
1112
/// How many per-type Lance writes to run concurrently during a load.
///
/// Each write is an independent S3 manifest + fragment write against a
/// different table. Ops within a single table must still be serial (Lance
/// OCC on the manifest), but cross-table writes have no shared state.
///
/// 8 is a conservative default — enough to overlap S3 round-trip latency
/// across the typical 10-30 table schemas without flooding the runtime.
/// Override via `OMNIGRAPH_LOAD_CONCURRENCY` for benchmarking.
const DEFAULT_LOAD_WRITE_CONCURRENCY: usize = 8;
1124
1125fn load_write_concurrency() -> usize {
1126    std::env::var("OMNIGRAPH_LOAD_CONCURRENCY")
1127        .ok()
1128        .and_then(|v| v.parse::<usize>().ok())
1129        .filter(|v| *v > 0)
1130        .unwrap_or(DEFAULT_LOAD_WRITE_CONCURRENCY)
1131}
1132
1133/// Write a set of prepared `(type_name, table_key, batch, row_count)` tuples
1134/// concurrently. Returns results in original iteration order so callers can
1135/// zip them back to per-type metadata.
1136async fn write_batches_concurrently(
1137    db: &Omnigraph,
1138    branch: Option<&str>,
1139    mode: LoadMode,
1140    prepared: Vec<(String, String, RecordBatch, usize)>,
1141) -> Result<
1142    Vec<(
1143        String,
1144        String,
1145        usize,
1146        crate::table_store::TableState,
1147        Option<String>,
1148    )>,
1149> {
1150    use futures::stream::StreamExt;
1151
1152    if prepared.is_empty() {
1153        return Ok(Vec::new());
1154    }
1155
1156    let concurrency = load_write_concurrency().min(prepared.len()).max(1);
1157
1158    futures::stream::iter(prepared.into_iter().map(
1159        |(type_name, table_key, batch, loaded_count)| async move {
1160            let (state, table_branch) =
1161                write_batch_to_dataset(db, branch, &table_key, batch, mode).await?;
1162            Ok::<_, OmniError>((type_name, table_key, loaded_count, state, table_branch))
1163        },
1164    ))
1165    .buffered(concurrency)
1166    .collect::<Vec<Result<_>>>()
1167    .await
1168    .into_iter()
1169    .collect()
1170}
1171
1172async fn write_batch_to_dataset(
1173    db: &Omnigraph,
1174    branch: Option<&str>,
1175    table_key: &str,
1176    batch: RecordBatch,
1177    mode: LoadMode,
1178) -> Result<(crate::table_store::TableState, Option<String>)> {
1179    let op_kind = match mode {
1180        LoadMode::Append => crate::db::MutationOpKind::Insert,
1181        LoadMode::Merge => crate::db::MutationOpKind::Merge,
1182        LoadMode::Overwrite => crate::db::MutationOpKind::SchemaRewrite,
1183    };
1184    let (mut ds, full_path, table_branch) = db
1185        .open_for_mutation_on_branch(branch, table_key, op_kind)
1186        .await?;
1187    let table_store = db.table_store();
1188
1189    match mode {
1190        LoadMode::Overwrite => {
1191            let state = table_store
1192                .overwrite_batch(&full_path, &mut ds, batch)
1193                .await?;
1194            Ok((state, table_branch))
1195        }
1196        LoadMode::Append => {
1197            let state = table_store.append_batch(&full_path, &mut ds, batch).await?;
1198            Ok((state, table_branch))
1199        }
1200        LoadMode::Merge => {
1201            let state = table_store
1202                .merge_insert_batch(
1203                    &full_path,
1204                    ds,
1205                    batch,
1206                    vec!["id".to_string()],
1207                    lance::dataset::WhenMatched::UpdateAll,
1208                    lance::dataset::WhenNotMatched::InsertAll,
1209                )
1210                .await?;
1211            Ok((state, table_branch))
1212        }
1213    }
1214}
1215
/// Produce a fresh row id: a ULID, which is globally unique and
/// lexicographically sortable by creation time.
fn generate_id() -> String {
    ulid::Ulid::new().to_string()
}
1219
1220pub(crate) fn parse_date32_literal(value: &str) -> Result<i32> {
1221    let raw: Arc<dyn Array> = Arc::new(StringArray::from(vec![Some(value)]));
1222    let casted = arrow_cast::cast::cast(raw.as_ref(), &DataType::Date32)
1223        .map_err(|e| OmniError::manifest(format!("invalid Date literal '{}': {}", value, e)))?;
1224    let out = casted
1225        .as_any()
1226        .downcast_ref::<Date32Array>()
1227        .ok_or_else(|| OmniError::manifest("Date32 cast produced unexpected array"))?;
1228    if out.is_null(0) {
1229        return Err(OmniError::manifest(format!(
1230            "invalid Date literal '{}'",
1231            value
1232        )));
1233    }
1234    Ok(out.value(0))
1235}
1236
1237pub(crate) fn parse_date64_literal(value: &str) -> Result<i64> {
1238    let raw: Arc<dyn Array> = Arc::new(StringArray::from(vec![Some(value)]));
1239    let casted = arrow_cast::cast::cast(raw.as_ref(), &DataType::Date64)
1240        .map_err(|e| OmniError::manifest(format!("invalid DateTime literal '{}': {}", value, e)))?;
1241    let out = casted
1242        .as_any()
1243        .downcast_ref::<Date64Array>()
1244        .ok_or_else(|| OmniError::manifest("Date64 cast produced unexpected array"))?;
1245    if out.is_null(0) {
1246        return Err(OmniError::manifest(format!(
1247            "invalid DateTime literal '{}'",
1248            value
1249        )));
1250    }
1251    Ok(out.value(0))
1252}
1253
1254// ─── Value constraint validation ─────────────────────────────────────────────
1255
1256pub(crate) fn validate_value_constraints(
1257    batch: &RecordBatch,
1258    node_type: &omnigraph_compiler::catalog::NodeType,
1259) -> Result<()> {
1260    use arrow_array::Array;
1261
1262    // Range constraints
1263    for rc in &node_type.range_constraints {
1264        let Some(col) = batch.column_by_name(&rc.property) else {
1265            continue;
1266        };
1267        for row in 0..batch.num_rows() {
1268            if col.is_null(row) {
1269                continue;
1270            }
1271            let value = extract_numeric_value(col, row);
1272            if let Some(val) = value {
1273                if val.is_nan() {
1274                    return Err(OmniError::manifest(format!(
1275                        "@range violation on {}.{}: value is NaN",
1276                        node_type.name, rc.property
1277                    )));
1278                }
1279                if let Some(ref min) = rc.min {
1280                    let min_f = literal_value_to_f64(min);
1281                    if val < min_f {
1282                        return Err(OmniError::manifest(format!(
1283                            "@range violation on {}.{}: value {} < min {}",
1284                            node_type.name, rc.property, val, min_f
1285                        )));
1286                    }
1287                }
1288                if let Some(ref max) = rc.max {
1289                    let max_f = literal_value_to_f64(max);
1290                    if val > max_f {
1291                        return Err(OmniError::manifest(format!(
1292                            "@range violation on {}.{}: value {} > max {}",
1293                            node_type.name, rc.property, val, max_f
1294                        )));
1295                    }
1296                }
1297            }
1298        }
1299    }
1300
1301    // Check constraints (regex)
1302    for cc in &node_type.check_constraints {
1303        let re = regex::Regex::new(&cc.pattern).map_err(|e| {
1304            OmniError::manifest(format!(
1305                "@check on {}.{} has invalid regex '{}': {}",
1306                node_type.name, cc.property, cc.pattern, e
1307            ))
1308        })?;
1309        let Some(col) = batch.column_by_name(&cc.property) else {
1310            continue;
1311        };
1312        let str_col = col.as_any().downcast_ref::<StringArray>();
1313        if let Some(str_col) = str_col {
1314            for row in 0..str_col.len() {
1315                if str_col.is_null(row) {
1316                    continue;
1317                }
1318                let val = str_col.value(row);
1319                if !re.is_match(val) {
1320                    return Err(OmniError::manifest(format!(
1321                        "@check violation on {}.{}: value '{}' does not match pattern '{}'",
1322                        node_type.name, cc.property, val, cc.pattern
1323                    )));
1324                }
1325            }
1326        }
1327    }
1328
1329    Ok(())
1330}
1331
1332/// Validate that every enum-typed property in `properties` only contains values
1333/// from its declared enum value set. Operates on a single `RecordBatch` so it
1334/// can be called from any write path that already holds a batch.
1335///
1336/// Scalar string enums are checked directly. List-of-enum properties are
1337/// checked element-by-element across the underlying string values.
1338pub(crate) fn validate_enum_constraints(
1339    batch: &RecordBatch,
1340    properties: &HashMap<String, omnigraph_compiler::types::PropType>,
1341    type_name: &str,
1342) -> Result<()> {
1343    use arrow_array::{Array, ListArray};
1344
1345    for (prop_name, prop_type) in properties {
1346        let Some(allowed) = prop_type.enum_values.as_ref() else {
1347            continue;
1348        };
1349        let Some(col) = batch.column_by_name(prop_name) else {
1350            continue;
1351        };
1352        if prop_type.list {
1353            let Some(list_col) = col.as_any().downcast_ref::<ListArray>() else {
1354                continue;
1355            };
1356            for row in 0..list_col.len() {
1357                if list_col.is_null(row) {
1358                    continue;
1359                }
1360                let item_arr = list_col.value(row);
1361                let Some(str_arr) = item_arr.as_any().downcast_ref::<StringArray>() else {
1362                    continue;
1363                };
1364                for i in 0..str_arr.len() {
1365                    if str_arr.is_null(i) {
1366                        continue;
1367                    }
1368                    let val = str_arr.value(i);
1369                    if !allowed.iter().any(|a| a.as_str() == val) {
1370                        return Err(OmniError::manifest(format!(
1371                            "invalid enum value '{}' for {}.{} (expected: {})",
1372                            val,
1373                            type_name,
1374                            prop_name,
1375                            allowed.join(", ")
1376                        )));
1377                    }
1378                }
1379            }
1380        } else if let Some(str_col) = col.as_any().downcast_ref::<StringArray>() {
1381            for row in 0..str_col.len() {
1382                if str_col.is_null(row) {
1383                    continue;
1384                }
1385                let val = str_col.value(row);
1386                if !allowed.iter().any(|a| a.as_str() == val) {
1387                    return Err(OmniError::manifest(format!(
1388                        "invalid enum value '{}' for {}.{} (expected: {})",
1389                        val,
1390                        type_name,
1391                        prop_name,
1392                        allowed.join(", ")
1393                    )));
1394                }
1395            }
1396        }
1397    }
1398    Ok(())
1399}
1400
1401/// Detect duplicate values within a single `RecordBatch` for any of the named
1402/// `unique_properties`. Returns an error on the first duplicate found.
1403///
1404/// Note: this only catches duplicates *within* the batch. Cross-batch
1405/// uniqueness against already-committed rows is not enforced here — that
1406/// requires a dataset scan and is tracked separately.
1407pub(crate) fn enforce_unique_constraints_intra_batch(
1408    batch: &RecordBatch,
1409    type_name: &str,
1410    unique_properties: &[String],
1411) -> Result<()> {
1412    for property in unique_properties {
1413        let Some(col_idx) = batch.schema().index_of(property).ok() else {
1414            continue;
1415        };
1416        let arr = batch.column(col_idx);
1417        let mut seen: HashMap<String, usize> = HashMap::new();
1418        for row in 0..batch.num_rows() {
1419            let Some(value) = scalar_to_string(arr, row) else {
1420                continue;
1421            };
1422            if let Some(prev_row) = seen.insert(value.clone(), row) {
1423                return Err(OmniError::manifest(format!(
1424                    "@unique violation on {}.{}: value '{}' appears in rows {} and {}",
1425                    type_name, property, value, prev_row, row
1426                )));
1427            }
1428        }
1429    }
1430    Ok(())
1431}
1432
1433/// Reduce a single Arrow scalar at (`array`, `row`) to a `String` for
1434/// uniqueness comparison. Returns `None` for null values (nulls are exempt
1435/// from uniqueness in standard SQL semantics).
1436fn scalar_to_string(array: &ArrayRef, row: usize) -> Option<String> {
1437    use arrow_array::Array;
1438    if array.is_null(row) {
1439        return None;
1440    }
1441    if let Some(a) = array.as_any().downcast_ref::<StringArray>() {
1442        return Some(a.value(row).to_string());
1443    }
1444    if let Some(a) = array.as_any().downcast_ref::<Int32Array>() {
1445        return Some(a.value(row).to_string());
1446    }
1447    if let Some(a) = array.as_any().downcast_ref::<Int64Array>() {
1448        return Some(a.value(row).to_string());
1449    }
1450    if let Some(a) = array.as_any().downcast_ref::<UInt32Array>() {
1451        return Some(a.value(row).to_string());
1452    }
1453    if let Some(a) = array.as_any().downcast_ref::<UInt64Array>() {
1454        return Some(a.value(row).to_string());
1455    }
1456    if let Some(a) = array.as_any().downcast_ref::<Float32Array>() {
1457        return Some(a.value(row).to_string());
1458    }
1459    if let Some(a) = array.as_any().downcast_ref::<Float64Array>() {
1460        return Some(a.value(row).to_string());
1461    }
1462    if let Some(a) = array.as_any().downcast_ref::<BooleanArray>() {
1463        return Some(a.value(row).to_string());
1464    }
1465    if let Some(a) = array.as_any().downcast_ref::<Date32Array>() {
1466        return Some(a.value(row).to_string());
1467    }
1468    if let Some(a) = array.as_any().downcast_ref::<Date64Array>() {
1469        return Some(a.value(row).to_string());
1470    }
1471    None
1472}
1473
1474/// Build the flat list of property names that must be checked for uniqueness
1475/// on a node type. Includes both `@unique` properties (from
1476/// `NodeType.unique_constraints`) and the `@key` (which implies uniqueness).
1477pub(crate) fn unique_property_names_for_node(
1478    node_type: &omnigraph_compiler::catalog::NodeType,
1479) -> Vec<String> {
1480    let mut props: Vec<String> = node_type
1481        .unique_constraints
1482        .iter()
1483        .flatten()
1484        .cloned()
1485        .collect();
1486    if let Some(key) = &node_type.key {
1487        props.extend(key.iter().cloned());
1488    }
1489    props.sort();
1490    props.dedup();
1491    props
1492}
1493
1494/// Same as [`unique_property_names_for_node`] but for an edge type.
1495pub(crate) fn unique_property_names_for_edge(
1496    edge_type: &omnigraph_compiler::catalog::EdgeType,
1497) -> Vec<String> {
1498    let mut props: Vec<String> = edge_type
1499        .unique_constraints
1500        .iter()
1501        .flatten()
1502        .cloned()
1503        .collect();
1504    props.sort();
1505    props.dedup();
1506    props
1507}
1508
1509fn extract_numeric_value(col: &ArrayRef, row: usize) -> Option<f64> {
1510    use arrow_array::{
1511        Array, Float32Array, Float64Array, Int32Array, Int64Array, UInt32Array, UInt64Array,
1512    };
1513    if let Some(a) = col.as_any().downcast_ref::<Int32Array>() {
1514        return Some(a.value(row) as f64);
1515    }
1516    if let Some(a) = col.as_any().downcast_ref::<Int64Array>() {
1517        return Some(a.value(row) as f64);
1518    }
1519    if let Some(a) = col.as_any().downcast_ref::<UInt32Array>() {
1520        return Some(a.value(row) as f64);
1521    }
1522    if let Some(a) = col.as_any().downcast_ref::<UInt64Array>() {
1523        return Some(a.value(row) as f64);
1524    }
1525    if let Some(a) = col.as_any().downcast_ref::<Float32Array>() {
1526        return Some(a.value(row) as f64);
1527    }
1528    if let Some(a) = col.as_any().downcast_ref::<Float64Array>() {
1529        return Some(a.value(row));
1530    }
1531    None
1532}
1533
/// Widen a catalog literal (integer or float) to `f64` so `@range` bounds
/// compare uniformly against extracted numeric values.
fn literal_value_to_f64(v: &omnigraph_compiler::catalog::LiteralValue) -> f64 {
    use omnigraph_compiler::catalog::LiteralValue;
    match v {
        LiteralValue::Integer(n) => *n as f64,
        LiteralValue::Float(f) => *f,
    }
}
1541
1542// ─── Edge cardinality validation ─────────────────────────────────────────────
1543
/// Enforce an edge type's `@card` min/max bounds after a write by scanning
/// the `src` column of the edge table at `written_version` and counting
/// edges per source id. Returns immediately when the edge type uses the
/// default (unbounded) cardinality.
///
/// # Errors
/// Returns `OmniError` when any source's edge count exceeds `max` or falls
/// below `min`, when the manifest has no entry for the edge table, or when
/// opening/scanning the dataset fails.
pub(crate) async fn validate_edge_cardinality(
    db: &crate::db::Omnigraph,
    branch: Option<&str>,
    edge_name: &str,
    written_version: u64,
    written_branch: Option<&str>,
) -> Result<()> {
    use arrow_array::Array;
    let catalog = db.catalog();
    // NOTE(review): indexing panics if edge_name is not in the catalog —
    // assumes callers only pass names resolved from the catalog; confirm.
    let edge_type = &catalog.edge_types[edge_name];
    if edge_type.cardinality.is_default() {
        return Ok(());
    }

    // Open edge sub-table at the just-written version, not the snapshot's
    // (the snapshot still pins to the pre-write version).
    let snapshot = db.snapshot_for_branch(branch).await?;
    let table_key = format!("edge:{}", edge_name);
    let entry = snapshot
        .entry(&table_key)
        .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
    let ds = db
        .open_dataset_at_state(
            &entry.table_path,
            // Prefer the branch the write actually landed on, falling back
            // to the branch recorded in the manifest entry.
            written_branch.or(entry.table_branch.as_deref()),
            written_version,
        )
        .await?;

    // Scan src column, count per source
    let batches = db
        .table_store()
        .scan(&ds, Some(&["src"]), None, None)
        .await?;

    let mut counts: HashMap<String, u32> = HashMap::new();
    for batch in &batches {
        // NOTE(review): unwraps assume every edge table has a Utf8 `src`
        // column — confirm the edge schema guarantees this.
        let srcs = batch
            .column_by_name("src")
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        for i in 0..srcs.len() {
            *counts.entry(srcs.value(i).to_string()).or_insert(0) += 1;
        }
    }

    let card = &edge_type.cardinality;
    // NOTE(review): sources with zero edges never appear in `counts`, so a
    // min > 0 is only enforced for sources that already have at least one
    // edge — confirm this is the intended semantics.
    for (src, count) in &counts {
        if let Some(max) = card.max {
            if *count > max {
                return Err(OmniError::manifest(format!(
                    "@card violation on edge {}: source '{}' has {} edges (max {})",
                    edge_name, src, count, max
                )));
            }
        }
        if *count < card.min {
            return Err(OmniError::manifest(format!(
                "@card violation on edge {}: source '{}' has {} edges (min {})",
                edge_name, src, count, card.min
            )));
        }
    }

    Ok(())
}
1612
1613/// Validate edge `@card` cardinality with in-memory pending edges visible.
1614///
1615/// Loader-level analog to `exec::mutation::validate_edge_cardinality_with_pending`:
1616/// opens the committed dataset at the pre-load snapshot version, then
1617/// delegates to the shared `count_src_per_edge` + `enforce_cardinality_bounds`
1618/// helpers in `exec::staging`. Used by Append/Merge loads (the Overwrite
1619/// path uses `validate_edge_cardinality` which opens the just-written
1620/// Lance version).
1621///
1622/// `mode` controls dedup behavior. `LoadMode::Merge` passes `Some("id")`
1623/// so committed edges that the load is *updating* (same edge id,
1624/// possibly changed `src`) are not double-counted. `LoadMode::Append`
1625/// passes `None` because each line generates a fresh ULID id that
1626/// never collides with committed.
1627async fn validate_edge_cardinality_with_pending_loader(
1628    db: &Omnigraph,
1629    branch: Option<&str>,
1630    edge_type: &omnigraph_compiler::catalog::EdgeType,
1631    table_key: &str,
1632    staging: &MutationStaging,
1633    mode: LoadMode,
1634) -> Result<()> {
1635    if edge_type.cardinality.is_default() {
1636        return Ok(());
1637    }
1638    let snapshot = db.snapshot_for_branch(branch).await?;
1639    let Some(entry) = snapshot.entry(table_key) else {
1640        // No manifest entry — table doesn't exist yet. Pending-only is
1641        // fine; the helper handles empty committed scans.
1642        return Ok(());
1643    };
1644    let ds = db
1645        .open_dataset_at_state(
1646            &entry.table_path,
1647            entry.table_branch.as_deref(),
1648            entry.table_version,
1649        )
1650        .await?;
1651    let dedupe_key = match mode {
1652        LoadMode::Merge => Some("id"),
1653        LoadMode::Append | LoadMode::Overwrite => None,
1654    };
1655    let counts =
1656        crate::exec::staging::count_src_per_edge(db, &ds, table_key, staging, dedupe_key)
1657            .await?;
1658    crate::exec::staging::enforce_cardinality_bounds(edge_type, &counts)
1659}
1660
1661/// Collect all valid node IDs for a given type, with in-memory pending
1662/// node inserts visible. Used by the staged loader's Phase 2c
1663/// referential-integrity validation.
1664///
1665/// Union of:
1666/// - IDs from the staged loader's pending batches (in-memory; just-staged
1667///   inserts of this type)
1668/// - IDs from the committed sub-table at the pre-load snapshot version
1669async fn collect_node_ids_with_pending(
1670    db: &Omnigraph,
1671    branch: Option<&str>,
1672    type_name: &str,
1673    staging: &MutationStaging,
1674) -> Result<HashSet<String>> {
1675    let mut ids = HashSet::new();
1676    let table_key = format!("node:{}", type_name);
1677
1678    // From staging.pending: walk the in-memory accumulator's id column.
1679    for batch in staging.pending_batches(&table_key) {
1680        if let Some(col) = batch.column_by_name("id") {
1681            if let Some(arr) = col.as_any().downcast_ref::<StringArray>() {
1682                for i in 0..arr.len() {
1683                    if arr.is_valid(i) {
1684                        ids.insert(arr.value(i).to_string());
1685                    }
1686                }
1687            }
1688        }
1689    }
1690
1691    // From the committed Lance sub-table at the pre-load snapshot version.
1692    let snapshot = db.snapshot_for_branch(branch).await?;
1693    let Some(entry) = snapshot.entry(&table_key) else {
1694        return Ok(ids);
1695    };
1696    let ds = db
1697        .open_dataset_at_state(
1698            &entry.table_path,
1699            entry.table_branch.as_deref(),
1700            entry.table_version,
1701        )
1702        .await?;
1703
1704    let batches = db
1705        .table_store()
1706        .scan(&ds, Some(&["id"]), None, None)
1707        .await?;
1708
1709    for batch in &batches {
1710        let id_col = batch
1711            .column_by_name("id")
1712            .ok_or_else(|| OmniError::Lance("missing 'id' column".into()))?
1713            .as_any()
1714            .downcast_ref::<StringArray>()
1715            .ok_or_else(|| OmniError::Lance("'id' column is not Utf8".into()))?;
1716        for i in 0..batch.num_rows() {
1717            // Defensive: `id` is the @key column on every node type and
1718            // is non-nullable by schema, but a committed-row corruption
1719            // (or future schema change) could surface a NULL. Skip
1720            // rather than insert "" — pending-side does the same.
1721            if id_col.is_valid(i) {
1722                ids.insert(id_col.value(i).to_string());
1723            }
1724        }
1725    }
1726
1727    Ok(ids)
1728}
1729
1730/// Collect all valid node IDs for a given type. Union of:
1731/// - IDs from the just-loaded batch (in memory, from node_rows)
1732/// - IDs from the sub-table at the just-written version (if it was updated)
1733/// - IDs from the sub-table at the snapshot-pinned version (if it was not updated)
1734async fn collect_node_ids(
1735    db: &Omnigraph,
1736    branch: Option<&str>,
1737    type_name: &str,
1738    node_rows: &HashMap<String, Vec<JsonValue>>,
1739    catalog: &omnigraph_compiler::catalog::Catalog,
1740    updates: &[crate::db::SubTableUpdate],
1741) -> Result<HashSet<String>> {
1742    let mut ids = HashSet::new();
1743
1744    // IDs from the in-memory batch (just loaded in this operation)
1745    if let Some(rows) = node_rows.get(type_name) {
1746        if let Some(node_type) = catalog.node_types.get(type_name) {
1747            if let Some(key_prop) = node_type.key_property() {
1748                for row in rows {
1749                    if let Some(id) = row.get(key_prop).and_then(|v| v.as_str()) {
1750                        ids.insert(id.to_string());
1751                    }
1752                }
1753            }
1754        }
1755    }
1756
1757    // IDs from the Lance sub-table
1758    let table_key = format!("node:{}", type_name);
1759    let snapshot = db.snapshot_for_branch(branch).await?;
1760    let Some(entry) = snapshot.entry(&table_key) else {
1761        return Ok(ids);
1762    };
1763    // Use the just-written version if this type was updated, else snapshot version
1764    let updated = updates
1765        .iter()
1766        .find(|u| u.table_key == table_key)
1767        .map(|u| (u.table_version, u.table_branch.as_deref()));
1768    let (version, branch) = updated.unwrap_or((entry.table_version, entry.table_branch.as_deref()));
1769    let ds = db
1770        .open_dataset_at_state(&entry.table_path, branch, version)
1771        .await?;
1772
1773    let batches = db
1774        .table_store()
1775        .scan(&ds, Some(&["id"]), None, None)
1776        .await?;
1777
1778    for batch in &batches {
1779        let id_col = batch
1780            .column_by_name("id")
1781            .unwrap()
1782            .as_any()
1783            .downcast_ref::<StringArray>()
1784            .unwrap();
1785        for i in 0..batch.num_rows() {
1786            if !id_col.is_valid(i) {
1787                continue;
1788            }
1789            ids.insert(id_col.value(i).to_string());
1790        }
1791    }
1792
1793    Ok(ids)
1794}
1795
1796#[cfg(test)]
1797mod tests {
1798    use super::*;
1799    use crate::db::Omnigraph;
1800    use arrow_array::Array;
1801    use futures::TryStreamExt;
1802    use std::collections::HashMap;
1803
    // Minimal schema shared by the loader tests: two node types keyed by
    // `name`, one edge with an optional property, one bare edge.
    const TEST_SCHEMA: &str = r#"
node Person {
    name: String @key
    age: I32?
}
node Company {
    name: String @key
}
edge Knows: Person -> Person {
    since: Date?
}
edge WorksAt: Person -> Company
"#;

    // JSONL fixture: three nodes plus two edges that reference the nodes
    // by their @key values.
    const TEST_DATA: &str = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}
{"type": "Person", "data": {"name": "Bob", "age": 25}}
{"type": "Company", "data": {"name": "Acme"}}
{"edge": "Knows", "from": "Alice", "to": "Bob"}
{"edge": "WorksAt", "from": "Alice", "to": "Acme"}
"#;
1824
1825    #[tokio::test]
1826    async fn test_load_creates_data() {
1827        let dir = tempfile::tempdir().unwrap();
1828        let uri = dir.path().to_str().unwrap();
1829        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1830
1831        let result = load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
1832            .await
1833            .unwrap();
1834
1835        assert_eq!(result.nodes_loaded["Person"], 2);
1836        assert_eq!(result.nodes_loaded["Company"], 1);
1837        assert_eq!(result.edges_loaded["Knows"], 1);
1838        assert_eq!(result.edges_loaded["WorksAt"], 1);
1839    }
1840
1841    #[tokio::test]
1842    async fn test_load_data_readable_via_lance() {
1843        let dir = tempfile::tempdir().unwrap();
1844        let uri = dir.path().to_str().unwrap();
1845        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1846        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
1847            .await
1848            .unwrap();
1849
1850        // Read back via snapshot
1851        let snap = db.snapshot().await;
1852        let person_ds = snap.open("node:Person").await.unwrap();
1853
1854        assert_eq!(person_ds.count_rows(None).await.unwrap(), 2);
1855
1856        // Verify data
1857        let batches: Vec<RecordBatch> = person_ds
1858            .scan()
1859            .try_into_stream()
1860            .await
1861            .unwrap()
1862            .try_collect()
1863            .await
1864            .unwrap();
1865
1866        let batch = &batches[0];
1867        let ids = batch
1868            .column_by_name("id")
1869            .unwrap()
1870            .as_any()
1871            .downcast_ref::<StringArray>()
1872            .unwrap();
1873        // @key=name, so ids should be "Alice" and "Bob"
1874        let id_values: Vec<&str> = (0..ids.len()).map(|i| ids.value(i)).collect();
1875        assert!(id_values.contains(&"Alice"));
1876        assert!(id_values.contains(&"Bob"));
1877    }
1878
1879    #[tokio::test]
1880    async fn test_load_edges_reference_node_keys() {
1881        let dir = tempfile::tempdir().unwrap();
1882        let uri = dir.path().to_str().unwrap();
1883        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1884        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
1885            .await
1886            .unwrap();
1887
1888        let snap = db.snapshot().await;
1889        let knows_ds = snap.open("edge:Knows").await.unwrap();
1890
1891        let batches: Vec<RecordBatch> = knows_ds
1892            .scan()
1893            .try_into_stream()
1894            .await
1895            .unwrap()
1896            .try_collect()
1897            .await
1898            .unwrap();
1899
1900        let batch = &batches[0];
1901        let srcs = batch
1902            .column_by_name("src")
1903            .unwrap()
1904            .as_any()
1905            .downcast_ref::<StringArray>()
1906            .unwrap();
1907        let dsts = batch
1908            .column_by_name("dst")
1909            .unwrap()
1910            .as_any()
1911            .downcast_ref::<StringArray>()
1912            .unwrap();
1913
1914        assert_eq!(srcs.value(0), "Alice");
1915        assert_eq!(dsts.value(0), "Bob");
1916    }
1917
1918    #[tokio::test]
1919    async fn test_load_manifest_version_advances() {
1920        let dir = tempfile::tempdir().unwrap();
1921        let uri = dir.path().to_str().unwrap();
1922        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1923        let v1 = db.version().await;
1924
1925        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
1926            .await
1927            .unwrap();
1928
1929        assert!(db.version().await > v1);
1930    }
1931
1932    #[tokio::test]
1933    async fn test_load_append_adds_rows() {
1934        let dir = tempfile::tempdir().unwrap();
1935        let uri = dir.path().to_str().unwrap();
1936        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1937
1938        let batch1 = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#;
1939        let batch2 = r#"{"type": "Person", "data": {"name": "Bob", "age": 25}}"#;
1940
1941        load_jsonl(&mut db, batch1, LoadMode::Overwrite)
1942            .await
1943            .unwrap();
1944        load_jsonl(&mut db, batch2, LoadMode::Append).await.unwrap();
1945
1946        let snap = db.snapshot().await;
1947        let person_ds = snap.open("node:Person").await.unwrap();
1948        assert_eq!(person_ds.count_rows(None).await.unwrap(), 2);
1949    }
1950
1951    #[tokio::test]
1952    async fn test_load_unknown_type_rejected() {
1953        let dir = tempfile::tempdir().unwrap();
1954        let uri = dir.path().to_str().unwrap();
1955        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1956
1957        let bad = r#"{"type": "FakeType", "data": {"name": "x"}}"#;
1958        let result = load_jsonl(&mut db, bad, LoadMode::Overwrite).await;
1959        assert!(result.is_err());
1960    }
1961
    #[tokio::test]
    async fn test_ingest_creates_branch_and_reports_tables() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        // Ingest onto a branch that does not exist yet: it should be
        // created from "main" and reported as such.
        let result = db
            .ingest("feature", Some("main"), TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert_eq!(result.branch, "feature");
        assert_eq!(result.base_branch, "main");
        assert!(result.branch_created);
        assert_eq!(result.mode, LoadMode::Overwrite);
        // Per-table results come back ordered by table_key (edge:* before
        // node:*, alphabetical within each) — NOTE(review): ordering is
        // asserted from this fixture; confirm `ingest` sorts its output.
        assert_eq!(
            result.tables,
            vec![
                IngestTableResult {
                    table_key: "edge:Knows".to_string(),
                    rows_loaded: 1
                },
                IngestTableResult {
                    table_key: "edge:WorksAt".to_string(),
                    rows_loaded: 1
                },
                IngestTableResult {
                    table_key: "node:Company".to_string(),
                    rows_loaded: 1
                },
                IngestTableResult {
                    table_key: "node:Person".to_string(),
                    rows_loaded: 2
                },
            ]
        );
        // The freshly created branch must be visible in the branch list.
        assert!(
            db.branch_list()
                .await
                .unwrap()
                .contains(&"feature".to_string())
        );
    }
2005
    #[tokio::test]
    async fn test_ingest_existing_branch_ignores_from_and_merges_data() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();
        db.branch_create_from(crate::db::ReadTarget::branch("main"), "feature")
            .await
            .unwrap();

        // "feature" already exists, so the requested base "missing-base"
        // is only echoed back in the result — it is never resolved, and
        // no error is raised even though no such branch exists.
        let result = db
            .ingest(
                "feature",
                Some("missing-base"),
                r#"{"type":"Person","data":{"name":"Bob","age":26}}
{"type":"Person","data":{"name":"Eve","age":31}}"#,
                LoadMode::Merge,
            )
            .await
            .unwrap();

        assert_eq!(result.branch, "feature");
        assert_eq!(result.base_branch, "missing-base");
        assert!(!result.branch_created);
        assert_eq!(result.mode, LoadMode::Merge);
        assert_eq!(
            result.tables,
            vec![IngestTableResult {
                table_key: "node:Person".to_string(),
                rows_loaded: 2
            }]
        );

        // Merge semantics on the feature branch: Bob upserted (25 -> 26),
        // Eve inserted, Alice untouched — three rows total.
        let snap = db
            .snapshot_of(crate::db::ReadTarget::branch("feature"))
            .await
            .unwrap();
        let person_ds = snap.open("node:Person").await.unwrap();
        assert_eq!(person_ds.count_rows(None).await.unwrap(), 3);

        let batches: Vec<RecordBatch> = person_ds
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();
        // Index ages by id (= @key name) across all scanned batches.
        let mut ages_by_id = HashMap::new();
        for batch in &batches {
            let ids = batch
                .column_by_name("id")
                .unwrap()
                .as_any()
                .downcast_ref::<StringArray>()
                .unwrap();
            let ages = batch
                .column_by_name("age")
                .unwrap()
                .as_any()
                .downcast_ref::<Int32Array>()
                .unwrap();
            for idx in 0..ids.len() {
                ages_by_id.insert(ids.value(idx).to_string(), ages.value(idx));
            }
        }

        assert_eq!(ages_by_id.get("Bob"), Some(&26));
        assert_eq!(ages_by_id.get("Eve"), Some(&31));
        assert_eq!(ages_by_id.get("Alice"), Some(&30));
    }
2079
2080    #[tokio::test]
2081    async fn test_ingest_as_stamps_actor_on_branch_head_commit() {
2082        let dir = tempfile::tempdir().unwrap();
2083        let uri = dir.path().to_str().unwrap();
2084        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2085
2086        db.ingest_as(
2087            "feature",
2088            Some("main"),
2089            TEST_DATA,
2090            LoadMode::Overwrite,
2091            Some("act-andrew"),
2092        )
2093        .await
2094        .unwrap();
2095
2096        let head = db
2097            .list_commits(Some("feature"))
2098            .await
2099            .unwrap()
2100            .into_iter()
2101            .last()
2102            .unwrap();
2103        assert_eq!(head.actor_id.as_deref(), Some("act-andrew"));
2104    }
2105
2106    #[test]
2107    fn test_range_constraint_rejects_nan() {
2108        use arrow_array::{Float64Array, RecordBatch, StringArray};
2109        use omnigraph_compiler::catalog::{LiteralValue, NodeType, RangeConstraint};
2110        use std::sync::Arc;
2111
2112        let schema = Arc::new(arrow_schema::Schema::new(vec![
2113            arrow_schema::Field::new("name", arrow_schema::DataType::Utf8, false),
2114            arrow_schema::Field::new("score", arrow_schema::DataType::Float64, true),
2115        ]));
2116
2117        let batch = RecordBatch::try_new(
2118            schema.clone(),
2119            vec![
2120                Arc::new(StringArray::from(vec!["bad"])),
2121                Arc::new(Float64Array::from(vec![f64::NAN])),
2122            ],
2123        )
2124        .unwrap();
2125
2126        let node_type = NodeType {
2127            name: "Test".to_string(),
2128            implements: vec![],
2129            properties: Default::default(),
2130            key: None,
2131            unique_constraints: vec![],
2132            indices: vec![],
2133            range_constraints: vec![RangeConstraint {
2134                property: "score".to_string(),
2135                min: Some(LiteralValue::Float(0.0)),
2136                max: Some(LiteralValue::Float(1.0)),
2137            }],
2138            check_constraints: vec![],
2139            embed_sources: Default::default(),
2140            blob_properties: Default::default(),
2141            arrow_schema: schema,
2142        };
2143
2144        let result = validate_value_constraints(&batch, &node_type);
2145        assert!(result.is_err(), "expected NaN to be rejected");
2146        let err = result.unwrap_err().to_string();
2147        assert!(err.contains("NaN"), "error should mention NaN: {}", err);
2148    }
2149}