Skip to main content

omnigraph/loader/
mod.rs

1use std::collections::{HashMap, HashSet};
2
3use std::io::{BufRead, BufReader, Cursor};
4use std::sync::Arc;
5
6use arrow_array::{
7    Array, ArrayRef, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array,
8    Int32Array, Int64Array, RecordBatch, StringArray, UInt32Array, UInt64Array,
9    builder::{
10        ArrayBuilder, BooleanBuilder, Date32Builder, Date64Builder, FixedSizeListBuilder,
11        Float32Builder, Float64Builder, Int32Builder, Int64Builder, ListBuilder, StringBuilder,
12        UInt32Builder, UInt64Builder,
13    },
14};
15use arrow_schema::DataType;
16use base64::Engine;
17use lance::blob::BlobArrayBuilder;
18use omnigraph_compiler::catalog::NodeType;
19use serde::{Deserialize, Serialize};
20use serde_json::Value as JsonValue;
21
22use crate::db::Omnigraph;
23use crate::error::{OmniError, Result};
24use crate::exec::staging::{MutationStaging, PendingMode};
25
/// Outcome of a single load call: where it landed and how many rows were
/// written per node type and per edge type.
#[derive(Clone, Debug, Default)]
pub struct LoadResult {
    /// Target branch of the load; `"main"` when the caller targeted main.
    pub branch: String,
    /// The `base` argument passed to `load_as` (normalized), kept as-is even
    /// if the target branch already existed and therefore was not forked.
    pub base_branch: Option<String>,
    /// Set when this load forked `branch` off `base_branch` before writing.
    pub branch_created: bool,
    /// Rows written, keyed by node type name.
    pub nodes_loaded: HashMap<String, usize>,
    /// Rows written, keyed by canonical edge type name.
    pub edges_loaded: HashMap<String, usize>,
}
40
41#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
42pub struct IngestTableResult {
43    pub table_key: String,
44    pub rows_loaded: usize,
45}
46
47#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
48pub struct IngestResult {
49    pub branch: String,
50    pub base_branch: String,
51    pub branch_created: bool,
52    pub mode: LoadMode,
53    pub tables: Vec<IngestTableResult>,
54}
55
56/// Load mode for data ingestion.
57#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
58#[serde(rename_all = "snake_case")]
59pub enum LoadMode {
60    /// Overwrite existing data.
61    Overwrite,
62    /// Append to existing data.
63    Append,
64    /// Merge by `id` key (upsert).
65    Merge,
66}
67
68/// Convenience: load JSONL data onto the database handle's *active branch*
69/// (`main` when unbound). Equivalent to `db.load(active_branch, data, mode)`;
70/// use `Omnigraph::load`/`load_as` directly when targeting an explicit branch
71/// or when fork-from-base semantics are needed.
72pub async fn load_jsonl(db: &Omnigraph, data: &str, mode: LoadMode) -> Result<LoadResult> {
73    let current_branch = db.active_branch().await;
74    let branch = current_branch.as_deref().unwrap_or("main");
75    db.load(branch, data, mode).await
76}
77
78/// Convenience: like [`load_jsonl`] but reading from a file path.
79pub async fn load_jsonl_file(db: &Omnigraph, path: &str, mode: LoadMode) -> Result<LoadResult> {
80    let current_branch = db.active_branch().await;
81    let branch = current_branch.as_deref().unwrap_or("main");
82    db.load_file(branch, path, mode).await
83}
84
85impl Omnigraph {
86    #[deprecated(
87        note = "use `load_as` with an explicit `base` instead; the ingest family will be removed in a future release"
88    )]
89    pub async fn ingest(
90        &self,
91        branch: &str,
92        from: Option<&str>,
93        data: &str,
94        mode: LoadMode,
95    ) -> Result<IngestResult> {
96        #[allow(deprecated)]
97        self.ingest_as(branch, from, data, mode, None).await
98    }
99
100    /// Deprecated shim over the unified `load_as`. Preserves the historical
101    /// ingest contract exactly: `from: None` means fork from `main`, and the
102    /// base branch is recorded in the result even when the target branch
103    /// already existed (no fork happened).
104    #[deprecated(
105        note = "use `load_as` with an explicit `base` instead; the ingest family will be removed in a future release"
106    )]
107    pub async fn ingest_as(
108        &self,
109        branch: &str,
110        from: Option<&str>,
111        data: &str,
112        mode: LoadMode,
113        actor_id: Option<&str>,
114    ) -> Result<IngestResult> {
115        let result = self
116            .load_as(branch, Some(from.unwrap_or("main")), data, mode, actor_id)
117            .await?;
118        Ok(IngestResult {
119            branch: result.branch.clone(),
120            base_branch: result
121                .base_branch
122                .clone()
123                .unwrap_or_else(|| "main".to_string()),
124            branch_created: result.branch_created,
125            mode,
126            tables: result.to_ingest_tables(),
127        })
128    }
129
130    #[deprecated(
131        note = "use `load_file_as` with an explicit `base` instead; the ingest family will be removed in a future release"
132    )]
133    pub async fn ingest_file(
134        &self,
135        branch: &str,
136        from: Option<&str>,
137        path: &str,
138        mode: LoadMode,
139    ) -> Result<IngestResult> {
140        #[allow(deprecated)]
141        self.ingest_file_as(branch, from, path, mode, None).await
142    }
143
144    #[deprecated(
145        note = "use `load_file_as` with an explicit `base` instead; the ingest family will be removed in a future release"
146    )]
147    pub async fn ingest_file_as(
148        &self,
149        branch: &str,
150        from: Option<&str>,
151        path: &str,
152        mode: LoadMode,
153        actor_id: Option<&str>,
154    ) -> Result<IngestResult> {
155        let data = std::fs::read_to_string(path).map_err(OmniError::Io)?;
156        #[allow(deprecated)]
157        self.ingest_as(branch, from, &data, mode, actor_id).await
158    }
159
160    pub async fn load(&self, branch: &str, data: &str, mode: LoadMode) -> Result<LoadResult> {
161        self.load_as(branch, None, data, mode, None).await
162    }
163
164    /// Load JSONL data onto `branch`.
165    ///
166    /// `base` selects the branch-creation behavior: with `Some(base)`, a
167    /// missing target branch is forked from `base` first (the former
168    /// `ingest` semantics); with `None`, the target branch must already
169    /// exist — staging fails on an unknown branch when it resolves the
170    /// manifest snapshot, so a typo'd branch name can never create one.
171    pub async fn load_as(
172        &self,
173        branch: &str,
174        base: Option<&str>,
175        data: &str,
176        mode: LoadMode,
177        actor_id: Option<&str>,
178    ) -> Result<LoadResult> {
179        // Engine-layer policy gate (MR-722 fan-out / PR #3). Scope is
180        // `Branch(branch)` to match the HTTP-layer Change convention.
181        // When a fork happens below, `branch_create_from_as` additionally
182        // checks `BranchCreate` — both authorities are genuinely needed
183        // for "load into a fresh branch", so the layered check is
184        // correct, not redundant.
185        self.enforce(
186            omnigraph_policy::PolicyAction::Change,
187            &omnigraph_policy::ResourceScope::Branch(branch.to_string()),
188            actor_id,
189        )?;
190        // Schema-contract validation is captured ONCE per write via the
191        // `WriteTxn` opened in `load_jsonl_reader` (after branch resolution).
192        // The redundant `ensure_schema_state_valid` that used to run here is
193        // subsumed by `open_write_txn`'s `resolved_branch_target` call.
194        // Converge any pending recovery sidecar (a previously failed
195        // writer's Phase B → Phase C residual) before staging anything:
196        // without this, sidecar-covered drift wedges every load on the
197        // commit-time drift guard until a process restart — `repair`
198        // refuses while a sidecar is pending. One `list_dir` when no
199        // sidecars exist (the steady state).
200        self.heal_pending_recovery_sidecars().await?;
201        // Reject internal `__run__*` / system-prefixed branches at the
202        // public write boundary. Direct-publish paths assert this
203        // explicitly so a caller can't write to legacy or system
204        // staging branches by passing the prefix verbatim.
205        crate::db::ensure_public_branch_ref(branch, "load")?;
206        // Branch convention: `None` represents `main`. Re-normalizing to
207        // `Some("main")` here would route the publisher commit through a
208        // separate coordinator (the cross-branch path in
209        // `commit_prepared_updates_on_branch_with_expected`) and leave
210        // `self.coordinator` with a stale manifest snapshot.
211        let requested = Self::normalize_branch_name(branch)?;
212        let base_branch = match base {
213            Some(base) => {
214                Some(Self::normalize_branch_name(base)?.unwrap_or_else(|| "main".to_string()))
215            }
216            None => None,
217        };
218        // Fork-if-missing only when a base branch was explicitly given.
219        // `requested == None` is `main`, which always exists.
220        let mut branch_created = false;
221        if let (Some(target), Some(base_name)) = (requested.as_deref(), base_branch.as_deref()) {
222            let exists = self.branch_list().await?.iter().any(|name| name == target);
223            if !exists {
224                // Thread the actor through to the implicit BranchCreate so
225                // policy decisions match what an explicit `branch_create_from_as`
226                // call would see. Calling the no-actor variant here would
227                // bypass BranchCreate enforcement when policy is installed —
228                // the footgun guard catches that case too, but threading is
229                // the correct fix.
230                self.branch_create_from_as(
231                    crate::db::ReadTarget::branch(base_name),
232                    target,
233                    actor_id,
234                )
235                .await?;
236                branch_created = true;
237            }
238        }
239        // Direct-to-target writes: no Run state machine, no `__run__` staging
240        // branch. Cross-table OCC is enforced by the publisher's
241        // `expected_table_versions` CAS inside `load_jsonl_reader`.
242        let mut result = self
243            .load_direct_on_branch(requested.as_deref(), data, mode, actor_id)
244            .await?;
245        result.branch = requested.unwrap_or_else(|| "main".to_string());
246        result.base_branch = base_branch;
247        result.branch_created = branch_created;
248        Ok(result)
249    }
250
251    pub async fn load_file(&self, branch: &str, path: &str, mode: LoadMode) -> Result<LoadResult> {
252        self.load_file_as(branch, None, path, mode, None).await
253    }
254
255    /// Read a file into memory and delegate to `load_as`. Used by the
256    /// CLI's `omnigraph load` so file-path-based writes flow through
257    /// the same engine-layer policy gate as in-memory `load_as` calls.
258    pub async fn load_file_as(
259        &self,
260        branch: &str,
261        base: Option<&str>,
262        path: &str,
263        mode: LoadMode,
264        actor_id: Option<&str>,
265    ) -> Result<LoadResult> {
266        let data = std::fs::read_to_string(path).map_err(OmniError::Io)?;
267        self.load_as(branch, base, &data, mode, actor_id).await
268    }
269
270    async fn load_direct_on_branch(
271        &self,
272        branch: Option<&str>,
273        data: &str,
274        mode: LoadMode,
275        actor_id: Option<&str>,
276    ) -> Result<LoadResult> {
277        let reader = BufReader::new(Cursor::new(data.as_bytes()));
278        load_jsonl_reader(self, branch, reader, mode, actor_id).await
279    }
280}
281
282impl LoadMode {
283    pub fn as_str(self) -> &'static str {
284        match self {
285            LoadMode::Overwrite => "overwrite",
286            LoadMode::Append => "append",
287            LoadMode::Merge => "merge",
288        }
289    }
290}
291
292impl LoadResult {
293    pub fn to_ingest_tables(&self) -> Vec<IngestTableResult> {
294        let mut tables = self
295            .nodes_loaded
296            .iter()
297            .map(|(type_name, rows_loaded)| IngestTableResult {
298                table_key: format!("node:{type_name}"),
299                rows_loaded: *rows_loaded,
300            })
301            .chain(
302                self.edges_loaded
303                    .iter()
304                    .map(|(edge_name, rows_loaded)| IngestTableResult {
305                        table_key: format!("edge:{edge_name}"),
306                        rows_loaded: *rows_loaded,
307                    }),
308            )
309            .collect::<Vec<_>>();
310        tables.sort_by(|a, b| a.table_key.cmp(&b.table_key));
311        tables
312    }
313}
314
315async fn load_jsonl_reader<R: BufRead>(
316    db: &Omnigraph,
317    branch: Option<&str>,
318    reader: R,
319    mode: LoadMode,
320    actor_id: Option<&str>,
321) -> Result<LoadResult> {
322    let catalog = db.catalog().clone();
323
324    // Phase 1: Parse all lines, spool into per-type collections
325    let mut node_rows: HashMap<String, Vec<JsonValue>> = HashMap::new();
326    let mut edge_rows: HashMap<String, Vec<(String, String, JsonValue)>> = HashMap::new();
327
328    // Parse a stream of JSON values. Accepts both compact JSONL (one object
329    // per line) and pretty-printed JSON where a single object spans multiple
330    // lines — serde's streaming deserializer treats any whitespace (including
331    // newlines) between top-level values as a separator.
332    for (idx, parsed) in serde_json::Deserializer::from_reader(reader)
333        .into_iter::<JsonValue>()
334        .enumerate()
335    {
336        let record_num = idx + 1;
337        let value: JsonValue = parsed.map_err(|e| {
338            OmniError::manifest(format!("invalid JSON at record {}: {}", record_num, e))
339        })?;
340
341        if let Some(type_name) = value.get("type").and_then(|v| v.as_str()) {
342            if !catalog.node_types.contains_key(type_name) {
343                return Err(OmniError::manifest(format!(
344                    "record {}: unknown node type '{}'",
345                    record_num, type_name
346                )));
347            }
348            let data = value
349                .get("data")
350                .cloned()
351                .unwrap_or(JsonValue::Object(serde_json::Map::new()));
352            node_rows
353                .entry(type_name.to_string())
354                .or_default()
355                .push(data);
356        } else if let Some(edge_name) = value.get("edge").and_then(|v| v.as_str()) {
357            if catalog.lookup_edge_by_name(edge_name).is_none() {
358                return Err(OmniError::manifest(format!(
359                    "record {}: unknown edge type '{}'",
360                    record_num, edge_name
361                )));
362            }
363            let from = value
364                .get("from")
365                .and_then(|v| v.as_str())
366                .ok_or_else(|| {
367                    OmniError::manifest(format!("record {}: edge missing 'from'", record_num))
368                })?
369                .to_string();
370            let to = value
371                .get("to")
372                .and_then(|v| v.as_str())
373                .ok_or_else(|| {
374                    OmniError::manifest(format!("record {}: edge missing 'to'", record_num))
375                })?
376                .to_string();
377            let data = value
378                .get("data")
379                .cloned()
380                .unwrap_or(JsonValue::Object(serde_json::Map::new()));
381            let canonical = catalog.lookup_edge_by_name(edge_name).unwrap().name.clone();
382            edge_rows
383                .entry(canonical)
384                .or_default()
385                .push((from, to, data));
386        } else {
387            return Err(OmniError::manifest(format!(
388                "record {}: expected 'type' or 'edge' field",
389                record_num
390            )));
391        }
392    }
393
394    // Phase 2: Build per-type RecordBatches and accumulate into the
395    // staging pipeline. Batches go into an in-memory accumulator and a
396    // single `stage_*` + `commit_staged` per touched table runs at
397    // end-of-load — a mid-load failure (RI / cardinality violation) leaves
398    // Lance HEAD untouched. `LoadMode::Overwrite` uses Lance's staged
399    // `Overwrite` transaction rather than the former truncate-then-append
400    // inline path.
401
402    let mut result = LoadResult::default();
403    // Capture-once write transaction (RFC-013 step 3b). `open_write_txn`
404    // validates the schema contract ONCE and pins the base snapshot. Threaded
405    // as `Some(&txn)` through the per-table opens and the manifest publish so
406    // each resolve point reuses the pinned base instead of re-validating the
407    // contract. The branch already exists here (fork-if-missing ran in
408    // `load_as` before this), so this captures the post-fork snapshot. The
409    // load's own base read (`db.snapshot_for_branch` previously) is the same
410    // per-branch snapshot, so reuse `txn.base` for it — dropping a validation.
411    let txn = db.open_write_txn(branch).await?;
412    let snapshot = txn.base.clone();
413    let mut staging = MutationStaging::default();
414    let pending_mode = match mode {
415        LoadMode::Merge => PendingMode::Merge,
416        // Append-mode loads accumulate as Append. Edge tables (no @key)
417        // and no-key node tables stay safe on the stage_append path. The
418        // Merge mode applies dedupe-by-id; Append assumes unique inputs.
419        LoadMode::Append => PendingMode::Append,
420        LoadMode::Overwrite => PendingMode::Overwrite,
421    };
422    // Map LoadMode to MutationOpKind for the version-check policy.
423    // Append/Merge skip the strict pre-stage check (concurrency-safe
424    // under the per-(table, branch) queue + publisher CAS); Overwrite
425    // uses the strict check because it truncates and replaces the
426    // dataset — concurrent advances change what "replace" means.
427    let load_op_kind = match mode {
428        LoadMode::Append => crate::db::MutationOpKind::Insert,
429        LoadMode::Merge => crate::db::MutationOpKind::Merge,
430        LoadMode::Overwrite => crate::db::MutationOpKind::SchemaRewrite,
431    };
432
433    // Up-front fork-queue acquisition. The first write to a table on a
434    // non-main branch forks it (create_branch), which advances Lance state
435    // before the manifest publish; the reclaim of any manifest-unreferenced
436    // leftover (`reclaim_orphaned_fork_and_refork`) must not race a concurrent
437    // in-process fork. So when this load will fork at least one touched table,
438    // acquire the per-(table, branch) write queues for ALL touched tables up
439    // front (one sorted `acquire_many`, keyed uniformly by the target branch
440    // so it covers what `commit_all` recomputes) and hold them through the
441    // publish. Main-branch loads never fork; branch loads where every touched
442    // table is already forked skip this and let `commit_all` acquire at commit.
443    let fork_queue_guards: Option<(
444        Vec<(String, Option<String>)>,
445        Vec<tokio::sync::OwnedMutexGuard<()>>,
446    )> = if let Some(active) = branch {
447        let touched: Vec<(String, Option<String>)> = node_rows
448            .keys()
449            .map(|t| (format!("node:{t}"), Some(active.to_string())))
450            .chain(
451                edge_rows
452                    .keys()
453                    .map(|e| (format!("edge:{e}"), Some(active.to_string()))),
454            )
455            .collect();
456        let needs_fork = touched.iter().any(|(table_key, _)| {
457            snapshot
458                .entry(table_key)
459                .map(|e| e.table_branch.as_deref() != Some(active))
460                .unwrap_or(false)
461        });
462        if needs_fork {
463            let guards = db.write_queue().acquire_many(&touched).await;
464            Some((touched, guards))
465        } else {
466            None
467        }
468    } else {
469        None
470    };
471
472    // Phase 2a: build and validate every node batch up front. Cheap and
473    // synchronous — surfaces validation errors before any S3 traffic.
474    let mut prepared_nodes: Vec<(String, String, RecordBatch, usize)> =
475        Vec::with_capacity(node_rows.len());
476    for (type_name, rows) in &node_rows {
477        let node_type = &catalog.node_types[type_name];
478        let batch = build_node_batch(node_type, rows)?;
479        validate_value_constraints(&batch, node_type)?;
480        validate_enum_constraints(&batch, &node_type.properties, type_name)?;
481        let unique_groups = unique_constraint_groups_for_node(node_type);
482        if !unique_groups.is_empty() {
483            enforce_unique_constraints_intra_batch(&batch, type_name, &unique_groups)?;
484        }
485        let loaded_count = batch.num_rows();
486        let table_key = format!("node:{}", type_name);
487        let _entry = snapshot
488            .entry(&table_key)
489            .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
490        prepared_nodes.push((type_name.clone(), table_key, batch, loaded_count));
491    }
492
493    // Phase 2b: accumulate every node type in memory. Fragment writes are
494    // delayed until after all validation succeeds.
495    for (type_name, table_key, batch, loaded_count) in prepared_nodes {
496        // The loader only needs the captured expected version (the publisher's
497        // CAS fence) for `ensure_path` — it discards the handle. With a
498        // non-strict load op (Merge/Append) and a `WriteTxn`, collapse #1 skips
499        // the dataset open and returns the pinned base version directly.
500        let opened = db
501            .open_for_mutation_on_branch(branch, &table_key, load_op_kind, Some(&txn))
502            .await?;
503        staging.ensure_path(
504            &table_key,
505            opened.full_path,
506            opened.table_branch,
507            opened.expected_version,
508            load_op_kind,
509        );
510        let schema = batch.schema();
511        staging.append_batch(&table_key, schema, pending_mode, batch)?;
512        result.nodes_loaded.insert(type_name, loaded_count);
513    }
514
515    // Phase 2c: Validate edge referential integrity — every src/dst must
516    // reference an existing node ID in the appropriate type. For
517    // Append/Merge the lookup unions snapshot-committed IDs with the
518    // in-memory pending batches. For Overwrite, a touched node table's
519    // pending batch is the replacement image, so committed rows are not
520    // included for that table.
521    for (edge_name, rows) in &edge_rows {
522        let edge_type = &catalog.edge_types[edge_name];
523        let from_ids =
524            collect_node_ids_with_pending(db, branch, &edge_type.from_type, &staging).await?;
525        let to_ids =
526            collect_node_ids_with_pending(db, branch, &edge_type.to_type, &staging).await?;
527
528        for (i, (src, dst, _)) in rows.iter().enumerate() {
529            if !from_ids.contains(src.as_str()) {
530                return Err(OmniError::manifest(format!(
531                    "edge {} row {}: src '{}' not found in {}",
532                    edge_name,
533                    i + 1,
534                    src,
535                    edge_type.from_type
536                )));
537            }
538            if !to_ids.contains(dst.as_str()) {
539                return Err(OmniError::manifest(format!(
540                    "edge {} row {}: dst '{}' not found in {}",
541                    edge_name,
542                    i + 1,
543                    dst,
544                    edge_type.to_type
545                )));
546            }
547        }
548    }
549
550    // Phase 2d: build edge batches.
551    let mut prepared_edges: Vec<(String, String, RecordBatch, usize)> =
552        Vec::with_capacity(edge_rows.len());
553    for (edge_name, rows) in &edge_rows {
554        let edge_type = &catalog.edge_types[edge_name];
555        let batch = build_edge_batch(edge_type, rows)?;
556        validate_enum_constraints(&batch, &edge_type.properties, edge_name)?;
557        let unique_groups = unique_constraint_groups_for_edge(edge_type);
558        if !unique_groups.is_empty() {
559            enforce_unique_constraints_intra_batch(&batch, edge_name, &unique_groups)?;
560        }
561        let loaded_count = batch.num_rows();
562        let table_key = format!("edge:{}", edge_name);
563        let _entry = snapshot
564            .entry(&table_key)
565            .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
566        prepared_edges.push((edge_name.clone(), table_key, batch, loaded_count));
567    }
568
569    // Phase 2e: accumulate every edge type. Same dispatch as Phase 2b.
570    for (edge_name, table_key, batch, loaded_count) in prepared_edges {
571        // Same as the node phase: only the captured expected version is used;
572        // collapse #1 skips the open for a non-strict load op under a `WriteTxn`.
573        let opened = db
574            .open_for_mutation_on_branch(branch, &table_key, load_op_kind, Some(&txn))
575            .await?;
576        staging.ensure_path(
577            &table_key,
578            opened.full_path,
579            opened.table_branch,
580            opened.expected_version,
581            load_op_kind,
582        );
583        let schema = batch.schema();
584        staging.append_batch(&table_key, schema, pending_mode, batch)?;
585        result.edges_loaded.insert(edge_name, loaded_count);
586    }
587
588    // Phase 3: Validate edge cardinality constraints (before commit —
589    // invalid data must not be committed). The helper scans committed
590    // edges via Lance + iterates pending edges in-memory; for Overwrite it
591    // treats the pending edge batches as the replacement table image.
592    for (edge_name, _) in &edge_rows {
593        let edge_type = &catalog.edge_types[edge_name];
594        let table_key = format!("edge:{}", edge_name);
595        validate_edge_cardinality_with_pending_loader(
596            db, branch, edge_type, &table_key, &staging, mode,
597        )
598        .await?;
599    }
600
601    // Phase 4: Atomic manifest commit with publisher-level OCC.
602    let staged = staging
603        .stage_all_with_concurrency(db, branch, load_write_concurrency())
604        .await?;
605    // `_queue_guards` holds per-(table_key, branch) write queues
606    // across the manifest publish below — see exec/mutation.rs for
607    // the rationale (interleaving prevention).
608    let crate::exec::staging::CommittedMutation {
609        updates,
610        expected_versions,
611        sidecar_handle,
612        guards: _queue_guards,
613        committed_handles,
614    } = staged
615        .commit_all(
616            db,
617            branch,
618            crate::db::manifest::SidecarKind::Load,
619            actor_id,
620            fork_queue_guards,
621            Some(&txn),
622        )
623        .await?;
624    // Same finalize → publisher residual as mutations: per-table
625    // staged commits have advanced Lance HEAD, but the manifest
626    // publish has not run yet. Reuse the mutation failpoint name so
627    // one failpoint pins the shared `MutationStaging` boundary.
628    crate::failpoints::maybe_fail(crate::failpoints::names::MUTATION_POST_FINALIZE_PRE_PUBLISHER)?;
629    db.commit_updates_on_branch_with_expected(
630        branch,
631        &updates,
632        &expected_versions,
633        actor_id,
634        Some(&txn),
635        committed_handles,
636    )
637    .await?;
638    // The recovery sidecar protects the per-table commit_staged →
639    // manifest publish window. Phase C succeeded — clean up
640    // best-effort: failing the user here would error out a write
641    // that already landed durably.
642    if let Some(handle) = sidecar_handle {
643        if let Err(err) = crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await {
644            tracing::warn!(
645                error = %err,
646                operation_id = handle.operation_id.as_str(),
647                "recovery sidecar cleanup failed; the next open's recovery sweep will resolve it"
648            );
649        }
650    }
651
652    Ok(result)
653}
654
655fn build_node_batch(node_type: &NodeType, rows: &[JsonValue]) -> Result<RecordBatch> {
656    let schema = node_type.arrow_schema.clone();
657
658    // Build id column: explicit id, @key value, or generated ULID.
659    let ids: Vec<String> = rows
660        .iter()
661        .map(|row| {
662            let explicit_id = row.get("id").and_then(|v| v.as_str()).map(str::to_string);
663            if let Some(key_prop) = node_type.key_property() {
664                let key_value = row
665                    .get(key_prop)
666                    .and_then(|v| v.as_str())
667                    .map(|s| s.to_string())
668                    .ok_or_else(|| {
669                        OmniError::manifest(format!(
670                            "node {} missing @key property '{}'",
671                            node_type.name, key_prop
672                        ))
673                    })?;
674                if let Some(explicit_id) = explicit_id {
675                    if explicit_id != key_value {
676                        return Err(OmniError::manifest(format!(
677                            "node {} has explicit id '{}' that does not match @key property '{}' value '{}'",
678                            node_type.name, explicit_id, key_prop, key_value
679                        )));
680                    }
681                }
682                Ok(key_value)
683            } else if let Some(explicit_id) = explicit_id {
684                Ok(explicit_id)
685            } else {
686                Ok(generate_id())
687            }
688        })
689        .collect::<Result<Vec<_>>>()?;
690
691    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
692    columns.push(Arc::new(StringArray::from(ids)));
693
694    // Build property columns (skip "id" field at index 0)
695    for field in schema.fields().iter().skip(1) {
696        if node_type.blob_properties.contains(field.name()) {
697            let col = build_blob_column(field.name(), field.is_nullable(), rows)?;
698            columns.push(col);
699        } else {
700            let col =
701                build_column_from_json(field.name(), field.data_type(), field.is_nullable(), rows)?;
702            columns.push(col);
703        }
704    }
705
706    RecordBatch::try_new(schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
707}
708
709fn build_edge_batch(
710    edge_type: &omnigraph_compiler::catalog::EdgeType,
711    rows: &[(String, String, JsonValue)],
712) -> Result<RecordBatch> {
713    let schema = edge_type.arrow_schema.clone();
714
715    let ids: Vec<String> = rows
716        .iter()
717        .map(|(_, _, data)| {
718            data.get("id")
719                .and_then(|v| v.as_str())
720                .map(str::to_string)
721                .unwrap_or_else(generate_id)
722        })
723        .collect();
724    let srcs: Vec<&str> = rows.iter().map(|(from, _, _)| from.as_str()).collect();
725    let dsts: Vec<&str> = rows.iter().map(|(_, to, _)| to.as_str()).collect();
726
727    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
728    columns.push(Arc::new(StringArray::from(ids)));
729    columns.push(Arc::new(StringArray::from(srcs)));
730    columns.push(Arc::new(StringArray::from(dsts)));
731
732    // Build edge property columns (skip id, src, dst at indices 0-2)
733    let data_values: Vec<JsonValue> = rows.iter().map(|(_, _, data)| data.clone()).collect();
734    for field in schema.fields().iter().skip(3) {
735        if edge_type.blob_properties.contains(field.name()) {
736            let col = build_blob_column(field.name(), field.is_nullable(), &data_values)?;
737            columns.push(col);
738        } else {
739            let col = build_column_from_json(
740                field.name(),
741                field.data_type(),
742                field.is_nullable(),
743                &data_values,
744            )?;
745            columns.push(col);
746        }
747    }
748
749    RecordBatch::try_new(schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
750}
751
752/// Append a blob value (URI or base64 bytes) to a BlobArrayBuilder.
753pub(crate) fn append_blob_value(builder: &mut BlobArrayBuilder, value: &str) -> Result<()> {
754    if let Some(encoded) = value.strip_prefix("base64:") {
755        let bytes = base64::engine::general_purpose::STANDARD
756            .decode(encoded)
757            .map_err(|e| OmniError::manifest(format!("invalid base64 blob data: {}", e)))?;
758        builder
759            .push_bytes(bytes)
760            .map_err(|e| OmniError::Lance(e.to_string()))
761    } else {
762        // Treat as URI (file://, s3://, gs://, or any other scheme)
763        builder
764            .push_uri(value)
765            .map_err(|e| OmniError::Lance(e.to_string()))
766    }
767}
768
769/// Build a blob column from JSON values using Lance BlobArrayBuilder.
770fn build_blob_column(name: &str, nullable: bool, rows: &[JsonValue]) -> Result<ArrayRef> {
771    let mut builder = BlobArrayBuilder::new(rows.len());
772    for row in rows {
773        match row.get(name) {
774            Some(JsonValue::String(s)) => {
775                append_blob_value(&mut builder, s)?;
776            }
777            Some(JsonValue::Null) | None if nullable => {
778                builder
779                    .push_null()
780                    .map_err(|e| OmniError::Lance(e.to_string()))?;
781            }
782            Some(JsonValue::Null) | None => {
783                return Err(OmniError::manifest(format!(
784                    "non-nullable blob property '{}' has null values",
785                    name
786                )));
787            }
788            _ => {
789                return Err(OmniError::manifest(format!(
790                    "blob property '{}' must be a URI string or base64: prefixed data",
791                    name
792                )));
793            }
794        }
795    }
796    builder
797        .finish()
798        .map_err(|e| OmniError::Lance(e.to_string()))
799}
800
801fn build_column_from_json(
802    name: &str,
803    data_type: &DataType,
804    nullable: bool,
805    rows: &[JsonValue],
806) -> Result<ArrayRef> {
807    let array: ArrayRef = match data_type {
808        DataType::Utf8 => {
809            let values: Vec<Option<String>> = rows
810                .iter()
811                .map(|row| {
812                    row.get(name)
813                        .and_then(|v| v.as_str())
814                        .map(|s| s.to_string())
815                })
816                .collect();
817            Arc::new(StringArray::from(values))
818        }
819        DataType::Int32 => {
820            let values: Vec<Option<i32>> = rows
821                .iter()
822                .map(|row| row.get(name).and_then(|v| v.as_i64()).map(|v| v as i32))
823                .collect();
824            Arc::new(Int32Array::from(values))
825        }
826        DataType::Int64 => {
827            let values: Vec<Option<i64>> = rows
828                .iter()
829                .map(|row| row.get(name).and_then(|v| v.as_i64()))
830                .collect();
831            Arc::new(Int64Array::from(values))
832        }
833        DataType::UInt32 => {
834            let values: Vec<Option<u32>> = rows
835                .iter()
836                .map(|row| row.get(name).and_then(|v| v.as_u64()).map(|v| v as u32))
837                .collect();
838            Arc::new(UInt32Array::from(values))
839        }
840        DataType::UInt64 => {
841            let values: Vec<Option<u64>> = rows
842                .iter()
843                .map(|row| row.get(name).and_then(|v| v.as_u64()))
844                .collect();
845            Arc::new(UInt64Array::from(values))
846        }
847        DataType::Float32 => {
848            let values: Vec<Option<f32>> = rows
849                .iter()
850                .map(|row| row.get(name).and_then(|v| v.as_f64()).map(|v| v as f32))
851                .collect();
852            Arc::new(Float32Array::from(values))
853        }
854        DataType::Float64 => {
855            let values: Vec<Option<f64>> = rows
856                .iter()
857                .map(|row| row.get(name).and_then(|v| v.as_f64()))
858                .collect();
859            Arc::new(Float64Array::from(values))
860        }
861        DataType::Boolean => {
862            let values: Vec<Option<bool>> = rows
863                .iter()
864                .map(|row| row.get(name).and_then(|v| v.as_bool()))
865                .collect();
866            Arc::new(BooleanArray::from(values))
867        }
868        DataType::Date32 => {
869            let mut values = Vec::with_capacity(rows.len());
870            for row in rows {
871                values.push(parse_date32_json_value(
872                    row.get(name).unwrap_or(&JsonValue::Null),
873                )?);
874            }
875            Arc::new(Date32Array::from(values))
876        }
877        DataType::Date64 => {
878            let mut values = Vec::with_capacity(rows.len());
879            for row in rows {
880                values.push(parse_date64_json_value(
881                    row.get(name).unwrap_or(&JsonValue::Null),
882                )?);
883            }
884            Arc::new(Date64Array::from(values))
885        }
886        DataType::List(field) => {
887            let mut builder = ListBuilder::with_capacity(
888                make_list_value_builder(field.data_type(), rows.len())?,
889                rows.len(),
890            )
891            .with_field(field.clone());
892            for row in rows {
893                let value = row.get(name).unwrap_or(&JsonValue::Null);
894                if value.is_null() {
895                    builder.append(false);
896                    continue;
897                }
898                let items = value.as_array().ok_or_else(|| {
899                    OmniError::manifest(format!(
900                        "list property '{}' expects a JSON array, got {}",
901                        name, value
902                    ))
903                })?;
904                for item in items {
905                    append_json_list_item(builder.values(), field.data_type(), item)?;
906                }
907                builder.append(true);
908            }
909            Arc::new(builder.finish())
910        }
911        DataType::FixedSizeList(child_field, dim) => {
912            // Vector type: parse JSON array of floats into FixedSizeList<Float32>
913            let dim = *dim;
914            let mut builder = FixedSizeListBuilder::with_capacity(
915                Float32Builder::with_capacity(rows.len() * dim as usize),
916                dim,
917                rows.len(),
918            )
919            .with_field(child_field.clone());
920            for row in rows {
921                if let Some(arr) = row.get(name).and_then(|v| v.as_array()) {
922                    if arr.len() != dim as usize {
923                        return Err(OmniError::manifest(format!(
924                            "vector property '{}' expects {} dimensions, got {}",
925                            name,
926                            dim,
927                            arr.len()
928                        )));
929                    }
930                    for val in arr {
931                        builder
932                            .values()
933                            .append_value(val.as_f64().unwrap_or(0.0) as f32);
934                    }
935                    builder.append(true);
936                } else if nullable {
937                    for _ in 0..dim as usize {
938                        builder.values().append_null();
939                    }
940                    builder.append(false);
941                } else {
942                    return Err(OmniError::manifest(format!(
943                        "non-nullable vector property '{}' has null values",
944                        name
945                    )));
946                }
947            }
948            Arc::new(builder.finish())
949        }
950        _ => {
951            // Unsupported type: fill with nulls
952            let values: Vec<Option<&str>> = vec![None; rows.len()];
953            Arc::new(StringArray::from(values))
954        }
955    };
956
957    if !nullable && array.null_count() > 0 {
958        return Err(OmniError::manifest(format!(
959            "non-nullable property '{}' has null or invalid values",
960            name
961        )));
962    }
963
964    Ok(array)
965}
966
967fn make_list_value_builder(data_type: &DataType, capacity: usize) -> Result<Box<dyn ArrayBuilder>> {
968    Ok(match data_type {
969        DataType::Utf8 => Box::new(StringBuilder::with_capacity(capacity, capacity * 8)),
970        DataType::Boolean => Box::new(BooleanBuilder::with_capacity(capacity)),
971        DataType::Int32 => Box::new(Int32Builder::with_capacity(capacity)),
972        DataType::Int64 => Box::new(Int64Builder::with_capacity(capacity)),
973        DataType::UInt32 => Box::new(UInt32Builder::with_capacity(capacity)),
974        DataType::UInt64 => Box::new(UInt64Builder::with_capacity(capacity)),
975        DataType::Float32 => Box::new(Float32Builder::with_capacity(capacity)),
976        DataType::Float64 => Box::new(Float64Builder::with_capacity(capacity)),
977        DataType::Date32 => Box::new(Date32Builder::with_capacity(capacity)),
978        DataType::Date64 => Box::new(Date64Builder::with_capacity(capacity)),
979        other => {
980            return Err(OmniError::manifest(format!(
981                "unsupported list element data type {:?}",
982                other
983            )));
984        }
985    })
986}
987
988fn append_json_list_item(
989    builder: &mut Box<dyn ArrayBuilder>,
990    data_type: &DataType,
991    value: &JsonValue,
992) -> Result<()> {
993    match data_type {
994        DataType::Utf8 => {
995            let builder = builder
996                .as_any_mut()
997                .downcast_mut::<StringBuilder>()
998                .ok_or_else(|| OmniError::manifest("list Utf8 builder downcast failed"))?;
999            if let Some(value) = value.as_str() {
1000                builder.append_value(value);
1001            } else {
1002                builder.append_null();
1003            }
1004        }
1005        DataType::Boolean => {
1006            let builder = builder
1007                .as_any_mut()
1008                .downcast_mut::<BooleanBuilder>()
1009                .ok_or_else(|| OmniError::manifest("list Boolean builder downcast failed"))?;
1010            if let Some(value) = value.as_bool() {
1011                builder.append_value(value);
1012            } else {
1013                builder.append_null();
1014            }
1015        }
1016        DataType::Int32 => {
1017            let builder = builder
1018                .as_any_mut()
1019                .downcast_mut::<Int32Builder>()
1020                .ok_or_else(|| OmniError::manifest("list Int32 builder downcast failed"))?;
1021            if let Some(value) = value.as_i64() {
1022                let value = i32::try_from(value).map_err(|_| {
1023                    OmniError::manifest(format!("list value {} exceeds Int32 range", value))
1024                })?;
1025                builder.append_value(value);
1026            } else {
1027                builder.append_null();
1028            }
1029        }
1030        DataType::Int64 => {
1031            let builder = builder
1032                .as_any_mut()
1033                .downcast_mut::<Int64Builder>()
1034                .ok_or_else(|| OmniError::manifest("list Int64 builder downcast failed"))?;
1035            if let Some(value) = value.as_i64() {
1036                builder.append_value(value);
1037            } else {
1038                builder.append_null();
1039            }
1040        }
1041        DataType::UInt32 => {
1042            let builder = builder
1043                .as_any_mut()
1044                .downcast_mut::<UInt32Builder>()
1045                .ok_or_else(|| OmniError::manifest("list UInt32 builder downcast failed"))?;
1046            if let Some(value) = value.as_u64() {
1047                let value = u32::try_from(value).map_err(|_| {
1048                    OmniError::manifest(format!("list value {} exceeds UInt32 range", value))
1049                })?;
1050                builder.append_value(value);
1051            } else {
1052                builder.append_null();
1053            }
1054        }
1055        DataType::UInt64 => {
1056            let builder = builder
1057                .as_any_mut()
1058                .downcast_mut::<UInt64Builder>()
1059                .ok_or_else(|| OmniError::manifest("list UInt64 builder downcast failed"))?;
1060            if let Some(value) = value.as_u64() {
1061                builder.append_value(value);
1062            } else {
1063                builder.append_null();
1064            }
1065        }
1066        DataType::Float32 => {
1067            let builder = builder
1068                .as_any_mut()
1069                .downcast_mut::<Float32Builder>()
1070                .ok_or_else(|| OmniError::manifest("list Float32 builder downcast failed"))?;
1071            if let Some(value) = value.as_f64() {
1072                builder.append_value(value as f32);
1073            } else {
1074                builder.append_null();
1075            }
1076        }
1077        DataType::Float64 => {
1078            let builder = builder
1079                .as_any_mut()
1080                .downcast_mut::<Float64Builder>()
1081                .ok_or_else(|| OmniError::manifest("list Float64 builder downcast failed"))?;
1082            if let Some(value) = value.as_f64() {
1083                builder.append_value(value);
1084            } else {
1085                builder.append_null();
1086            }
1087        }
1088        DataType::Date32 => {
1089            let builder = builder
1090                .as_any_mut()
1091                .downcast_mut::<Date32Builder>()
1092                .ok_or_else(|| OmniError::manifest("list Date32 builder downcast failed"))?;
1093            if let Some(value) = parse_date32_json_value(value)? {
1094                builder.append_value(value);
1095            } else {
1096                builder.append_null();
1097            }
1098        }
1099        DataType::Date64 => {
1100            let builder = builder
1101                .as_any_mut()
1102                .downcast_mut::<Date64Builder>()
1103                .ok_or_else(|| OmniError::manifest("list Date64 builder downcast failed"))?;
1104            if let Some(value) = parse_date64_json_value(value)? {
1105                builder.append_value(value);
1106            } else {
1107                builder.append_null();
1108            }
1109        }
1110        other => {
1111            return Err(OmniError::manifest(format!(
1112                "unsupported list element data type {:?}",
1113                other
1114            )));
1115        }
1116    }
1117
1118    Ok(())
1119}
1120
1121fn parse_date32_json_value(value: &JsonValue) -> Result<Option<i32>> {
1122    if value.is_null() {
1123        return Ok(None);
1124    }
1125    if let Some(days) = value.as_i64() {
1126        let days = i32::try_from(days)
1127            .map_err(|_| OmniError::manifest(format!("Date value out of range: {}", days)))?;
1128        return Ok(Some(days));
1129    }
1130    if let Some(days) = value.as_u64() {
1131        let days = i32::try_from(days)
1132            .map_err(|_| OmniError::manifest(format!("Date value out of range: {}", days)))?;
1133        return Ok(Some(days));
1134    }
1135    if let Some(value) = value.as_str() {
1136        return Ok(Some(parse_date32_literal(value)?));
1137    }
1138    Ok(None)
1139}
1140
1141fn parse_date64_json_value(value: &JsonValue) -> Result<Option<i64>> {
1142    if value.is_null() {
1143        return Ok(None);
1144    }
1145    if let Some(ms) = value.as_i64() {
1146        return Ok(Some(ms));
1147    }
1148    if let Some(ms) = value.as_u64() {
1149        let ms = i64::try_from(ms)
1150            .map_err(|_| OmniError::manifest(format!("DateTime value out of range: {}", ms)))?;
1151        return Ok(Some(ms));
1152    }
1153    if let Some(value) = value.as_str() {
1154        return Ok(Some(parse_date64_literal(value)?));
1155    }
1156    Ok(None)
1157}
1158
/// How many per-type Lance writes to run concurrently during a load.
///
/// Each write is an independent S3 manifest + fragment write against a
/// different table. Ops within a single table must still be serial (Lance
/// OCC on the manifest), but cross-table writes have no shared state.
///
/// 8 is a conservative default — enough to overlap S3 round-trip latency
/// across the typical 10-30 table schemas without flooding the runtime.
/// Override via `OMNIGRAPH_LOAD_CONCURRENCY` for benchmarking.
const DEFAULT_LOAD_WRITE_CONCURRENCY: usize = 8;

/// Resolve the load write concurrency.
///
/// Reads `OMNIGRAPH_LOAD_CONCURRENCY` on every call. Surrounding whitespace is
/// ignored, so values such as `" 16\n"` from shell or env files are honored.
/// An unset variable, a non-integer, or `0` falls back to
/// [`DEFAULT_LOAD_WRITE_CONCURRENCY`].
fn load_write_concurrency() -> usize {
    std::env::var("OMNIGRAPH_LOAD_CONCURRENCY")
        .ok()
        .and_then(|v| v.trim().parse::<usize>().ok())
        .filter(|v| *v > 0)
        .unwrap_or(DEFAULT_LOAD_WRITE_CONCURRENCY)
}
1178
1179fn generate_id() -> String {
1180    ulid::Ulid::new().to_string()
1181}
1182
1183pub(crate) fn parse_date32_literal(value: &str) -> Result<i32> {
1184    let raw: Arc<dyn Array> = Arc::new(StringArray::from(vec![Some(value)]));
1185    let casted = arrow_cast::cast::cast(raw.as_ref(), &DataType::Date32)
1186        .map_err(|e| OmniError::manifest(format!("invalid Date literal '{}': {}", value, e)))?;
1187    let out = casted
1188        .as_any()
1189        .downcast_ref::<Date32Array>()
1190        .ok_or_else(|| OmniError::manifest("Date32 cast produced unexpected array"))?;
1191    if out.is_null(0) {
1192        return Err(OmniError::manifest(format!(
1193            "invalid Date literal '{}'",
1194            value
1195        )));
1196    }
1197    Ok(out.value(0))
1198}
1199
1200pub(crate) fn parse_date64_literal(value: &str) -> Result<i64> {
1201    let raw: Arc<dyn Array> = Arc::new(StringArray::from(vec![Some(value)]));
1202    let casted = arrow_cast::cast::cast(raw.as_ref(), &DataType::Date64)
1203        .map_err(|e| OmniError::manifest(format!("invalid DateTime literal '{}': {}", value, e)))?;
1204    let out = casted
1205        .as_any()
1206        .downcast_ref::<Date64Array>()
1207        .ok_or_else(|| OmniError::manifest("Date64 cast produced unexpected array"))?;
1208    if out.is_null(0) {
1209        return Err(OmniError::manifest(format!(
1210            "invalid DateTime literal '{}'",
1211            value
1212        )));
1213    }
1214    Ok(out.value(0))
1215}
1216
1217// ─── Value constraint validation ─────────────────────────────────────────────
1218
1219pub(crate) fn validate_value_constraints(
1220    batch: &RecordBatch,
1221    node_type: &omnigraph_compiler::catalog::NodeType,
1222) -> Result<()> {
1223    use arrow_array::Array;
1224
1225    // Range constraints
1226    for rc in &node_type.range_constraints {
1227        let Some(col) = batch.column_by_name(&rc.property) else {
1228            continue;
1229        };
1230        for row in 0..batch.num_rows() {
1231            if col.is_null(row) {
1232                continue;
1233            }
1234            let value = extract_numeric_value(col, row);
1235            if let Some(val) = value {
1236                if val.is_nan() {
1237                    return Err(OmniError::manifest(format!(
1238                        "@range violation on {}.{}: value is NaN",
1239                        node_type.name, rc.property
1240                    )));
1241                }
1242                if let Some(ref min) = rc.min {
1243                    let min_f = literal_value_to_f64(min);
1244                    if val < min_f {
1245                        return Err(OmniError::manifest(format!(
1246                            "@range violation on {}.{}: value {} < min {}",
1247                            node_type.name, rc.property, val, min_f
1248                        )));
1249                    }
1250                }
1251                if let Some(ref max) = rc.max {
1252                    let max_f = literal_value_to_f64(max);
1253                    if val > max_f {
1254                        return Err(OmniError::manifest(format!(
1255                            "@range violation on {}.{}: value {} > max {}",
1256                            node_type.name, rc.property, val, max_f
1257                        )));
1258                    }
1259                }
1260            }
1261        }
1262    }
1263
1264    // Check constraints (regex)
1265    for cc in &node_type.check_constraints {
1266        let re = regex::Regex::new(&cc.pattern).map_err(|e| {
1267            OmniError::manifest(format!(
1268                "@check on {}.{} has invalid regex '{}': {}",
1269                node_type.name, cc.property, cc.pattern, e
1270            ))
1271        })?;
1272        let Some(col) = batch.column_by_name(&cc.property) else {
1273            continue;
1274        };
1275        let str_col = col.as_any().downcast_ref::<StringArray>();
1276        if let Some(str_col) = str_col {
1277            for row in 0..str_col.len() {
1278                if str_col.is_null(row) {
1279                    continue;
1280                }
1281                let val = str_col.value(row);
1282                if !re.is_match(val) {
1283                    return Err(OmniError::manifest(format!(
1284                        "@check violation on {}.{}: value '{}' does not match pattern '{}'",
1285                        node_type.name, cc.property, val, cc.pattern
1286                    )));
1287                }
1288            }
1289        }
1290    }
1291
1292    Ok(())
1293}
1294
1295/// Validate that every enum-typed property in `properties` only contains values
1296/// from its declared enum value set. Operates on a single `RecordBatch` so it
1297/// can be called from any write path that already holds a batch.
1298///
1299/// Scalar string enums are checked directly. List-of-enum properties are
1300/// checked element-by-element across the underlying string values.
1301pub(crate) fn validate_enum_constraints(
1302    batch: &RecordBatch,
1303    properties: &HashMap<String, omnigraph_compiler::types::PropType>,
1304    type_name: &str,
1305) -> Result<()> {
1306    use arrow_array::{Array, ListArray};
1307
1308    for (prop_name, prop_type) in properties {
1309        let Some(allowed) = prop_type.enum_values.as_ref() else {
1310            continue;
1311        };
1312        let Some(col) = batch.column_by_name(prop_name) else {
1313            continue;
1314        };
1315        if prop_type.list {
1316            let Some(list_col) = col.as_any().downcast_ref::<ListArray>() else {
1317                continue;
1318            };
1319            for row in 0..list_col.len() {
1320                if list_col.is_null(row) {
1321                    continue;
1322                }
1323                let item_arr = list_col.value(row);
1324                let Some(str_arr) = item_arr.as_any().downcast_ref::<StringArray>() else {
1325                    continue;
1326                };
1327                for i in 0..str_arr.len() {
1328                    if str_arr.is_null(i) {
1329                        continue;
1330                    }
1331                    let val = str_arr.value(i);
1332                    if !allowed.iter().any(|a| a.as_str() == val) {
1333                        return Err(OmniError::manifest(format!(
1334                            "invalid enum value '{}' for {}.{} (expected: {})",
1335                            val,
1336                            type_name,
1337                            prop_name,
1338                            allowed.join(", ")
1339                        )));
1340                    }
1341                }
1342            }
1343        } else if let Some(str_col) = col.as_any().downcast_ref::<StringArray>() {
1344            for row in 0..str_col.len() {
1345                if str_col.is_null(row) {
1346                    continue;
1347                }
1348                let val = str_col.value(row);
1349                if !allowed.iter().any(|a| a.as_str() == val) {
1350                    return Err(OmniError::manifest(format!(
1351                        "invalid enum value '{}' for {}.{} (expected: {})",
1352                        val,
1353                        type_name,
1354                        prop_name,
1355                        allowed.join(", ")
1356                    )));
1357                }
1358            }
1359        }
1360    }
1361    Ok(())
1362}
1363
1364/// Detect duplicate values within a single `RecordBatch` for any of the
1365/// `unique_constraints` groups. Each group is a list of one or more columns
1366/// that together form a uniqueness key: a violation occurs when two rows share
1367/// the same tuple of values across *all* columns in a group, so a composite
1368/// `@unique(a, b)` only conflicts when both `a` and `b` match. Returns an
1369/// error on the first duplicate found.
1370///
1371/// Rows where any column in a group is null are exempt (standard SQL semantics
1372/// for uniqueness over nullable columns), as is any group whose columns are
1373/// not all present in the batch (e.g. a partial-schema load).
1374///
1375/// Note: this only catches duplicates *within* the batch. Cross-batch
1376/// uniqueness against already-committed rows is not enforced here — that
1377/// requires a dataset scan and is tracked separately.
1378pub(crate) fn enforce_unique_constraints_intra_batch(
1379    batch: &RecordBatch,
1380    type_name: &str,
1381    unique_constraints: &[Vec<String>],
1382) -> Result<()> {
1383    for columns in unique_constraints {
1384        // Resolve the group's columns once. A group whose columns aren't all
1385        // present in this batch is skipped (e.g. a partial-schema load).
1386        let Some(group_columns) = columns
1387            .iter()
1388            .map(|name| {
1389                batch
1390                    .schema()
1391                    .index_of(name)
1392                    .ok()
1393                    .map(|i| batch.column(i).clone())
1394            })
1395            .collect::<Option<Vec<ArrayRef>>>()
1396        else {
1397            continue;
1398        };
1399        let mut seen: HashMap<Vec<String>, usize> = HashMap::new();
1400        for row in 0..batch.num_rows() {
1401            let Some(key) = composite_unique_key(&group_columns, row)? else {
1402                continue;
1403            };
1404            if let Some(prev_row) = seen.insert(key.clone(), row) {
1405                return Err(OmniError::manifest(format!(
1406                    "@unique violation on {}.{}: value '{}' appears in rows {} and {}",
1407                    type_name,
1408                    format_tuple(columns),
1409                    format_tuple(&key),
1410                    prev_row,
1411                    row
1412                )));
1413            }
1414        }
1415    }
1416    Ok(())
1417}
1418
1419/// Build the composite uniqueness key for `row` over a constraint group's
1420/// already-resolved columns (in declaration order).
1421///
1422/// The key is the *tuple* of per-column scalar strings (`Vec<String>`), keyed
1423/// directly in the dedup map — there is no separator, so no data value can
1424/// forge a collision (an earlier version joined on `U+001F`, which a value
1425/// containing that control char could still defeat).
1426///
1427/// - `Ok(None)` if any column is null: the row is exempt (a partial tuple
1428///   can't violate uniqueness under SQL null semantics).
1429/// - `Ok(Some(tuple))` otherwise.
1430/// - `Err(..)` propagated from [`unique_key_scalar`] on an un-keyable value.
1431///
1432/// Shared by the intake path (`enforce_unique_constraints_intra_batch`) and the
1433/// branch-merge path (`exec/merge.rs::update_unique_constraints`) so the two
1434/// derive identical keys and cannot drift on separator or scalar conversion.
1435pub(crate) fn composite_unique_key(
1436    group_columns: &[ArrayRef],
1437    row: usize,
1438) -> Result<Option<Vec<String>>> {
1439    let mut parts = Vec::with_capacity(group_columns.len());
1440    for column in group_columns {
1441        match unique_key_scalar(column, row)? {
1442            Some(value) => parts.push(value),
1443            None => return Ok(None),
1444        }
1445    }
1446    Ok(Some(parts))
1447}
1448
/// Format a constraint's column list, or an offending value tuple, for error
/// messages.
///
/// A single item is shown bare (`col`). Any other count is parenthesized and
/// comma-separated (`(a, b)`). An empty slice renders as `()`.
fn format_tuple(items: &[String]) -> String {
    if let [only] = items {
        return only.clone();
    }
    let mut rendered = String::from("(");
    rendered.push_str(&items.join(", "));
    rendered.push(')');
    rendered
}
1458
1459/// Reduce a single Arrow scalar at (`array`, `row`) to its uniqueness-key
1460/// string.
1461///
1462/// - `Ok(None)` for a null value: nulls are exempt from uniqueness (standard
1463///   SQL semantics over nullable columns).
1464/// - `Ok(Some(s))` for every scalar type a `@unique` / `@key` column can hold.
1465///   Strings are covered in all three physical Arrow encodings (`Utf8`,
1466///   `LargeUtf8`, `Utf8View`), so a legal string column is always keyable
1467///   regardless of how Lance materializes it on read-back.
1468/// - `Err(..)` for a non-null value whose Arrow type can't be reduced to a key
1469///   (a list, blob, or vector column). This fails loudly rather than silently
1470///   exempting the row, and because every legal scalar encoding is handled
1471///   above, the error fires only for a genuinely un-keyable column type — never
1472///   for a legal value that merely arrived in an unenumerated encoding.
1473fn unique_key_scalar(array: &ArrayRef, row: usize) -> Result<Option<String>> {
1474    use arrow_array::{Array, LargeStringArray, StringViewArray};
1475    if array.is_null(row) {
1476        return Ok(None);
1477    }
1478    if let Some(a) = array.as_any().downcast_ref::<StringArray>() {
1479        return Ok(Some(a.value(row).to_string()));
1480    }
1481    if let Some(a) = array.as_any().downcast_ref::<LargeStringArray>() {
1482        return Ok(Some(a.value(row).to_string()));
1483    }
1484    if let Some(a) = array.as_any().downcast_ref::<StringViewArray>() {
1485        return Ok(Some(a.value(row).to_string()));
1486    }
1487    if let Some(a) = array.as_any().downcast_ref::<Int32Array>() {
1488        return Ok(Some(a.value(row).to_string()));
1489    }
1490    if let Some(a) = array.as_any().downcast_ref::<Int64Array>() {
1491        return Ok(Some(a.value(row).to_string()));
1492    }
1493    if let Some(a) = array.as_any().downcast_ref::<UInt32Array>() {
1494        return Ok(Some(a.value(row).to_string()));
1495    }
1496    if let Some(a) = array.as_any().downcast_ref::<UInt64Array>() {
1497        return Ok(Some(a.value(row).to_string()));
1498    }
1499    if let Some(a) = array.as_any().downcast_ref::<Float32Array>() {
1500        return Ok(Some(a.value(row).to_string()));
1501    }
1502    if let Some(a) = array.as_any().downcast_ref::<Float64Array>() {
1503        return Ok(Some(a.value(row).to_string()));
1504    }
1505    if let Some(a) = array.as_any().downcast_ref::<BooleanArray>() {
1506        return Ok(Some(a.value(row).to_string()));
1507    }
1508    if let Some(a) = array.as_any().downcast_ref::<Date32Array>() {
1509        return Ok(Some(a.value(row).to_string()));
1510    }
1511    if let Some(a) = array.as_any().downcast_ref::<Date64Array>() {
1512        return Ok(Some(a.value(row).to_string()));
1513    }
1514    Err(OmniError::manifest(format!(
1515        "uniqueness key: unsupported column type {:?} for @unique/@key enforcement",
1516        array.data_type()
1517    )))
1518}
1519
1520/// Build the list of uniqueness constraint groups to enforce on a node type.
1521/// Each group is the column tuple of one constraint. Includes every
1522/// `@unique(...)` constraint (from `NodeType.unique_constraints`) and the
1523/// `@key` (which implies uniqueness over its column tuple). Grouping is
1524/// preserved so a composite `@unique(a, b)` is enforced as a composite key
1525/// rather than degraded into independent single-field checks.
1526pub(crate) fn unique_constraint_groups_for_node(
1527    node_type: &omnigraph_compiler::catalog::NodeType,
1528) -> Vec<Vec<String>> {
1529    let mut groups: Vec<Vec<String>> = node_type.unique_constraints.clone();
1530    if let Some(key) = &node_type.key
1531        && !groups.contains(key)
1532    {
1533        groups.push(key.clone());
1534    }
1535    groups
1536}
1537
1538/// Same as [`unique_constraint_groups_for_node`] but for an edge type (edges
1539/// have no `@key`).
1540pub(crate) fn unique_constraint_groups_for_edge(
1541    edge_type: &omnigraph_compiler::catalog::EdgeType,
1542) -> Vec<Vec<String>> {
1543    edge_type.unique_constraints.clone()
1544}
1545
1546fn extract_numeric_value(col: &ArrayRef, row: usize) -> Option<f64> {
1547    use arrow_array::{
1548        Array, Float32Array, Float64Array, Int32Array, Int64Array, UInt32Array, UInt64Array,
1549    };
1550    if let Some(a) = col.as_any().downcast_ref::<Int32Array>() {
1551        return Some(a.value(row) as f64);
1552    }
1553    if let Some(a) = col.as_any().downcast_ref::<Int64Array>() {
1554        return Some(a.value(row) as f64);
1555    }
1556    if let Some(a) = col.as_any().downcast_ref::<UInt32Array>() {
1557        return Some(a.value(row) as f64);
1558    }
1559    if let Some(a) = col.as_any().downcast_ref::<UInt64Array>() {
1560        return Some(a.value(row) as f64);
1561    }
1562    if let Some(a) = col.as_any().downcast_ref::<Float32Array>() {
1563        return Some(a.value(row) as f64);
1564    }
1565    if let Some(a) = col.as_any().downcast_ref::<Float64Array>() {
1566        return Some(a.value(row));
1567    }
1568    None
1569}
1570
1571fn literal_value_to_f64(v: &omnigraph_compiler::catalog::LiteralValue) -> f64 {
1572    use omnigraph_compiler::catalog::LiteralValue;
1573    match v {
1574        LiteralValue::Integer(n) => *n as f64,
1575        LiteralValue::Float(f) => *f,
1576    }
1577}
1578
1579// ─── Edge cardinality validation ─────────────────────────────────────────────
1580
1581pub(crate) async fn validate_edge_cardinality(
1582    db: &crate::db::Omnigraph,
1583    branch: Option<&str>,
1584    edge_name: &str,
1585    written_version: u64,
1586    written_branch: Option<&str>,
1587) -> Result<()> {
1588    use arrow_array::Array;
1589    let catalog = db.catalog();
1590    let edge_type = &catalog.edge_types[edge_name];
1591    if edge_type.cardinality.is_default() {
1592        return Ok(());
1593    }
1594
1595    // Open edge sub-table at the just-written version, not the snapshot's
1596    // (the snapshot still pins to the pre-write version).
1597    let snapshot = db.snapshot_for_branch(branch).await?;
1598    let table_key = format!("edge:{}", edge_name);
1599    let entry = snapshot
1600        .entry(&table_key)
1601        .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
1602    let ds = db
1603        .open_dataset_at_state(
1604            &entry.table_path,
1605            written_branch.or(entry.table_branch.as_deref()),
1606            written_version,
1607        )
1608        .await?;
1609
1610    // Scan src column, count per source
1611    let batches = db.storage().scan(&ds, Some(&["src"]), None, None).await?;
1612
1613    let mut counts: HashMap<String, u32> = HashMap::new();
1614    for batch in &batches {
1615        let srcs = batch
1616            .column_by_name("src")
1617            .unwrap()
1618            .as_any()
1619            .downcast_ref::<StringArray>()
1620            .unwrap();
1621        for i in 0..srcs.len() {
1622            *counts.entry(srcs.value(i).to_string()).or_insert(0) += 1;
1623        }
1624    }
1625
1626    let card = &edge_type.cardinality;
1627    for (src, count) in &counts {
1628        if let Some(max) = card.max {
1629            if *count > max {
1630                return Err(OmniError::manifest(format!(
1631                    "@card violation on edge {}: source '{}' has {} edges (max {})",
1632                    edge_name, src, count, max
1633                )));
1634            }
1635        }
1636        if *count < card.min {
1637            return Err(OmniError::manifest(format!(
1638                "@card violation on edge {}: source '{}' has {} edges (min {})",
1639                edge_name, src, count, card.min
1640            )));
1641        }
1642    }
1643
1644    Ok(())
1645}
1646
1647/// Validate edge `@card` cardinality with in-memory pending edges visible.
1648///
1649/// Loader-level analog to `exec::mutation::validate_edge_cardinality_with_pending`:
1650/// opens the committed dataset at the pre-load snapshot version, then
1651/// delegates to the shared `count_src_per_edge` + `enforce_cardinality_bounds`
1652/// helpers in `exec::staging`. Used by Append/Merge loads (the Overwrite
1653/// path uses `validate_edge_cardinality` which opens the just-written
1654/// Lance version).
1655///
1656/// `mode` controls dedup behavior. `LoadMode::Merge` passes `Some("id")`
1657/// so committed edges that the load is *updating* (same edge id,
1658/// possibly changed `src`) are not double-counted. `LoadMode::Append`
1659/// passes `None` because each line generates a fresh ULID id that
1660/// never collides with committed.
1661async fn validate_edge_cardinality_with_pending_loader(
1662    db: &Omnigraph,
1663    branch: Option<&str>,
1664    edge_type: &omnigraph_compiler::catalog::EdgeType,
1665    table_key: &str,
1666    staging: &MutationStaging,
1667    mode: LoadMode,
1668) -> Result<()> {
1669    if edge_type.cardinality.is_default() {
1670        return Ok(());
1671    }
1672    let snapshot = db.snapshot_for_branch(branch).await?;
1673    let Some(entry) = snapshot.entry(table_key) else {
1674        // No manifest entry — table doesn't exist yet. Pending-only is
1675        // fine; the helper handles empty committed scans.
1676        return Ok(());
1677    };
1678    let ds = db
1679        .open_dataset_at_state(
1680            &entry.table_path,
1681            entry.table_branch.as_deref(),
1682            entry.table_version,
1683        )
1684        .await?;
1685    let dedupe_key = match mode {
1686        LoadMode::Merge => Some("id"),
1687        LoadMode::Append | LoadMode::Overwrite => None,
1688    };
1689    let counts =
1690        crate::exec::staging::count_src_per_edge(db, &ds, table_key, staging, dedupe_key).await?;
1691    crate::exec::staging::enforce_cardinality_bounds(edge_type, &counts)
1692}
1693
1694/// Collect all valid node IDs for a given type, with in-memory pending
1695/// node inserts visible. Used by the staged loader's Phase 2c
1696/// referential-integrity validation.
1697///
1698/// Union of:
1699/// - IDs from the staged loader's pending batches (in-memory; just-staged
1700///   inserts of this type)
1701/// - IDs from the committed sub-table at the pre-load snapshot version
1702///
1703/// For `LoadMode::Overwrite`, if the node table is touched then the pending
1704/// batches are the replacement image. In that case committed IDs are not
1705/// included, so edge RI is validated against exactly what the overwrite will
1706/// publish.
1707async fn collect_node_ids_with_pending(
1708    db: &Omnigraph,
1709    branch: Option<&str>,
1710    type_name: &str,
1711    staging: &MutationStaging,
1712) -> Result<HashSet<String>> {
1713    let mut ids = HashSet::new();
1714    let table_key = format!("node:{}", type_name);
1715
1716    // From staging.pending: walk the in-memory accumulator's id column.
1717    for batch in staging.pending_batches(&table_key) {
1718        if let Some(col) = batch.column_by_name("id") {
1719            if let Some(arr) = col.as_any().downcast_ref::<StringArray>() {
1720                for i in 0..arr.len() {
1721                    if arr.is_valid(i) {
1722                        ids.insert(arr.value(i).to_string());
1723                    }
1724                }
1725            }
1726        }
1727    }
1728
1729    if staging.pending_mode(&table_key) == Some(PendingMode::Overwrite) {
1730        return Ok(ids);
1731    }
1732
1733    // From the committed Lance sub-table at the pre-load snapshot version.
1734    let snapshot = db.snapshot_for_branch(branch).await?;
1735    let Some(entry) = snapshot.entry(&table_key) else {
1736        return Ok(ids);
1737    };
1738    let ds = db
1739        .open_dataset_at_state(
1740            &entry.table_path,
1741            entry.table_branch.as_deref(),
1742            entry.table_version,
1743        )
1744        .await?;
1745
1746    let batches = db.storage().scan(&ds, Some(&["id"]), None, None).await?;
1747
1748    for batch in &batches {
1749        let id_col = batch
1750            .column_by_name("id")
1751            .ok_or_else(|| OmniError::Lance("missing 'id' column".into()))?
1752            .as_any()
1753            .downcast_ref::<StringArray>()
1754            .ok_or_else(|| OmniError::Lance("'id' column is not Utf8".into()))?;
1755        for i in 0..batch.num_rows() {
1756            // Defensive: `id` is the @key column on every node type and
1757            // is non-nullable by schema, but a committed-row corruption
1758            // (or future schema change) could surface a NULL. Skip
1759            // rather than insert "" — pending-side does the same.
1760            if id_col.is_valid(i) {
1761                ids.insert(id_col.value(i).to_string());
1762            }
1763        }
1764    }
1765
1766    Ok(ids)
1767}
1768
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::Omnigraph;
    use arrow_array::Array;
    use futures::TryStreamExt;
    use std::collections::HashMap;

    const TEST_SCHEMA: &str = r#"
node Person {
    name: String @key
    age: I32?
}
node Company {
    name: String @key
}
edge Knows: Person -> Person {
    since: Date?
}
edge WorksAt: Person -> Company
"#;

    const TEST_DATA: &str = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}
{"type": "Person", "data": {"name": "Bob", "age": 25}}
{"type": "Company", "data": {"name": "Acme"}}
{"edge": "Knows", "from": "Alice", "to": "Bob"}
{"edge": "WorksAt", "from": "Alice", "to": "Acme"}
"#;

    #[tokio::test]
    async fn test_load_creates_data() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let result = load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert_eq!(result.nodes_loaded["Person"], 2);
        assert_eq!(result.nodes_loaded["Company"], 1);
        assert_eq!(result.edges_loaded["Knows"], 1);
        assert_eq!(result.edges_loaded["WorksAt"], 1);
    }

    #[tokio::test]
    async fn test_load_data_readable_via_lance() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        // Read back via snapshot
        let snap = db.snapshot().await;
        let person_ds = snap.open("node:Person").await.unwrap();

        assert_eq!(person_ds.count_rows(None).await.unwrap(), 2);

        // Verify data across every returned batch — a scan is not guaranteed
        // to hand back a single batch, so looking only at batches[0] could
        // miss rows.
        let batches: Vec<RecordBatch> = person_ds
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();

        let mut id_values: Vec<String> = Vec::new();
        for batch in &batches {
            let ids = batch
                .column_by_name("id")
                .unwrap()
                .as_any()
                .downcast_ref::<StringArray>()
                .unwrap();
            id_values.extend(ids.iter().flatten().map(str::to_owned));
        }
        // @key=name, so ids should be exactly "Alice" and "Bob"
        id_values.sort();
        assert_eq!(id_values, ["Alice", "Bob"]);
    }

    #[tokio::test]
    async fn test_load_edges_reference_node_keys() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        let snap = db.snapshot().await;
        let knows_ds = snap.open("edge:Knows").await.unwrap();

        let batches: Vec<RecordBatch> = knows_ds
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();

        // Gather (src, dst) from every batch rather than assuming one batch.
        let mut pairs: Vec<(String, String)> = Vec::new();
        for batch in &batches {
            let srcs = batch
                .column_by_name("src")
                .unwrap()
                .as_any()
                .downcast_ref::<StringArray>()
                .unwrap();
            let dsts = batch
                .column_by_name("dst")
                .unwrap()
                .as_any()
                .downcast_ref::<StringArray>()
                .unwrap();
            for i in 0..batch.num_rows() {
                pairs.push((srcs.value(i).to_string(), dsts.value(i).to_string()));
            }
        }

        assert_eq!(pairs, vec![("Alice".to_string(), "Bob".to_string())]);
    }

    #[tokio::test]
    async fn test_load_manifest_version_advances() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        let v1 = db.version().await;

        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert!(db.version().await > v1);
    }

    #[tokio::test]
    async fn test_load_append_adds_rows() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let batch1 = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#;
        let batch2 = r#"{"type": "Person", "data": {"name": "Bob", "age": 25}}"#;

        load_jsonl(&mut db, batch1, LoadMode::Overwrite)
            .await
            .unwrap();
        load_jsonl(&mut db, batch2, LoadMode::Append).await.unwrap();

        let snap = db.snapshot().await;
        let person_ds = snap.open("node:Person").await.unwrap();
        assert_eq!(person_ds.count_rows(None).await.unwrap(), 2);
    }

    #[tokio::test]
    async fn test_load_unknown_type_rejected() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let bad = r#"{"type": "FakeType", "data": {"name": "x"}}"#;
        let result = load_jsonl(&mut db, bad, LoadMode::Overwrite).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    #[allow(deprecated)]
    async fn test_ingest_creates_branch_and_reports_tables() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let result = db
            .ingest("feature", Some("main"), TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert_eq!(result.branch, "feature");
        assert_eq!(result.base_branch, "main");
        assert!(result.branch_created);
        assert_eq!(result.mode, LoadMode::Overwrite);
        assert_eq!(
            result.tables,
            vec![
                IngestTableResult {
                    table_key: "edge:Knows".to_string(),
                    rows_loaded: 1
                },
                IngestTableResult {
                    table_key: "edge:WorksAt".to_string(),
                    rows_loaded: 1
                },
                IngestTableResult {
                    table_key: "node:Company".to_string(),
                    rows_loaded: 1
                },
                IngestTableResult {
                    table_key: "node:Person".to_string(),
                    rows_loaded: 2
                },
            ]
        );
        assert!(
            db.branch_list()
                .await
                .unwrap()
                .contains(&"feature".to_string())
        );
    }

    #[tokio::test]
    #[allow(deprecated)]
    async fn test_ingest_existing_branch_ignores_from_and_merges_data() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();
        db.branch_create_from(crate::db::ReadTarget::branch("main"), "feature")
            .await
            .unwrap();

        let result = db
            .ingest(
                "feature",
                Some("missing-base"),
                r#"{"type":"Person","data":{"name":"Bob","age":26}}
{"type":"Person","data":{"name":"Eve","age":31}}"#,
                LoadMode::Merge,
            )
            .await
            .unwrap();

        assert_eq!(result.branch, "feature");
        assert_eq!(result.base_branch, "missing-base");
        assert!(!result.branch_created);
        assert_eq!(result.mode, LoadMode::Merge);
        assert_eq!(
            result.tables,
            vec![IngestTableResult {
                table_key: "node:Person".to_string(),
                rows_loaded: 2
            }]
        );

        let snap = db
            .snapshot_of(crate::db::ReadTarget::branch("feature"))
            .await
            .unwrap();
        let person_ds = snap.open("node:Person").await.unwrap();
        assert_eq!(person_ds.count_rows(None).await.unwrap(), 3);

        let batches: Vec<RecordBatch> = person_ds
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();
        let mut ages_by_id = HashMap::new();
        for batch in &batches {
            let ids = batch
                .column_by_name("id")
                .unwrap()
                .as_any()
                .downcast_ref::<StringArray>()
                .unwrap();
            let ages = batch
                .column_by_name("age")
                .unwrap()
                .as_any()
                .downcast_ref::<Int32Array>()
                .unwrap();
            for idx in 0..ids.len() {
                ages_by_id.insert(ids.value(idx).to_string(), ages.value(idx));
            }
        }

        assert_eq!(ages_by_id.get("Bob"), Some(&26));
        assert_eq!(ages_by_id.get("Eve"), Some(&31));
        assert_eq!(ages_by_id.get("Alice"), Some(&30));
    }

    #[tokio::test]
    #[allow(deprecated)]
    async fn test_ingest_as_stamps_actor_on_branch_head_commit() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        db.ingest_as(
            "feature",
            Some("main"),
            TEST_DATA,
            LoadMode::Overwrite,
            Some("act-andrew"),
        )
        .await
        .unwrap();

        let head = db
            .list_commits(Some("feature"))
            .await
            .unwrap()
            .into_iter()
            .last()
            .unwrap();
        assert_eq!(head.actor_id.as_deref(), Some("act-andrew"));
    }

    #[tokio::test]
    async fn test_load_as_with_base_forks_missing_branch_and_stamps_metadata() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let result = db
            .load_as("feature", Some("main"), TEST_DATA, LoadMode::Merge, None)
            .await
            .unwrap();

        assert_eq!(result.branch, "feature");
        assert_eq!(result.base_branch.as_deref(), Some("main"));
        assert!(result.branch_created);
        assert!(
            db.branch_list()
                .await
                .unwrap()
                .contains(&"feature".to_string())
        );

        // Re-loading onto the now-existing branch records the base but
        // performs no fork.
        let again = db
            .load_as(
                "feature",
                Some("main"),
                r#"{"type":"Person","data":{"name":"Bob","age":26}}"#,
                LoadMode::Merge,
                None,
            )
            .await
            .unwrap();
        assert!(!again.branch_created);
        assert_eq!(again.base_branch.as_deref(), Some("main"));
    }

    #[tokio::test]
    async fn test_load_as_without_base_errors_on_missing_branch() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let result = db
            .load_as("nonexistent", None, TEST_DATA, LoadMode::Merge, None)
            .await;
        assert!(result.is_err(), "load without base must not create branches");
        assert!(
            !db.branch_list()
                .await
                .unwrap()
                .contains(&"nonexistent".to_string()),
            "failed load must not leave a branch behind"
        );

        // Loads to main carry the default branch metadata.
        let main_load = db.load("main", TEST_DATA, LoadMode::Overwrite).await.unwrap();
        assert_eq!(main_load.branch, "main");
        assert_eq!(main_load.base_branch, None);
        assert!(!main_load.branch_created);
    }

    #[test]
    fn test_range_constraint_rejects_nan() {
        use arrow_array::{Float64Array, RecordBatch, StringArray};
        use omnigraph_compiler::catalog::{LiteralValue, NodeType, RangeConstraint};
        use std::sync::Arc;

        let schema = Arc::new(arrow_schema::Schema::new(vec![
            arrow_schema::Field::new("name", arrow_schema::DataType::Utf8, false),
            arrow_schema::Field::new("score", arrow_schema::DataType::Float64, true),
        ]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["bad"])),
                Arc::new(Float64Array::from(vec![f64::NAN])),
            ],
        )
        .unwrap();

        let node_type = NodeType {
            name: "Test".to_string(),
            implements: vec![],
            properties: Default::default(),
            key: None,
            unique_constraints: vec![],
            indices: vec![],
            range_constraints: vec![RangeConstraint {
                property: "score".to_string(),
                min: Some(LiteralValue::Float(0.0)),
                max: Some(LiteralValue::Float(1.0)),
            }],
            check_constraints: vec![],
            embed_sources: Default::default(),
            blob_properties: Default::default(),
            arrow_schema: schema,
        };

        let result = validate_value_constraints(&batch, &node_type);
        assert!(result.is_err(), "expected NaN to be rejected");
        let err = result.unwrap_err().to_string();
        assert!(err.contains("NaN"), "error should mention NaN: {}", err);
    }

    #[test]
    fn composite_unique_key_builds_tuple_and_exempts_null() {
        let a: ArrayRef = Arc::new(StringArray::from(vec![Some("x|y"), Some("x"), None]));
        let b: ArrayRef = Arc::new(StringArray::from(vec![Some("z"), Some("y|z"), Some("q")]));
        let cols = [a, b];

        // Tuple key, so `("x|y", "z")` and `("x", "y|z")` stay distinct —
        // a separator-joined key (the old `|` join) would collapse both to
        // `x|y|z`.
        assert_eq!(
            composite_unique_key(&cols, 0).unwrap(),
            Some(vec!["x|y".to_string(), "z".to_string()])
        );
        assert_eq!(
            composite_unique_key(&cols, 1).unwrap(),
            Some(vec!["x".to_string(), "y|z".to_string()])
        );
        assert_ne!(
            composite_unique_key(&cols, 0).unwrap(),
            composite_unique_key(&cols, 1).unwrap()
        );

        // Any null column → the whole row is exempt (SQL null semantics).
        assert_eq!(composite_unique_key(&cols, 2).unwrap(), None);
    }

    #[test]
    fn unique_key_scalar_errors_loudly_on_unkeyable_type() {
        use arrow_array::LargeBinaryArray;
        // A binary/blob column can't be reduced to a uniqueness key. Before the
        // hardening this returned `None`, so a `@unique` on such a column was
        // silently un-enforced; now it errors instead of weakening the
        // constraint in silence.
        let blob: ArrayRef = Arc::new(LargeBinaryArray::from(vec![Some(&b"abc"[..])]));
        let err = unique_key_scalar(&blob, 0).unwrap_err();
        assert!(
            err.to_string().contains("unsupported column type"),
            "un-keyable type must fail loudly (got: {err})"
        );
    }

    #[test]
    fn unique_key_scalar_handles_all_string_encodings() {
        use arrow_array::{LargeStringArray, StringViewArray};
        // A legal string column is keyable in every physical Arrow encoding
        // Lance might hand back (Utf8 / LargeUtf8 / Utf8View). None of these may
        // fall through to the loud `Err` path — that branch is reserved for
        // genuinely un-keyable column types, not a legal value in an
        // unenumerated encoding.
        let utf8: ArrayRef = Arc::new(StringArray::from(vec![Some("v")]));
        let large: ArrayRef = Arc::new(LargeStringArray::from(vec![Some("v")]));
        let view: ArrayRef = Arc::new(StringViewArray::from(vec![Some("v")]));
        for array in [&utf8, &large, &view] {
            assert_eq!(
                unique_key_scalar(array, 0).unwrap(),
                Some("v".to_string()),
                "string array {:?} must render, not error",
                array.data_type()
            );
        }
    }
}