Skip to main content

omnigraph/loader/
mod.rs

1use std::collections::{HashMap, HashSet};
2
3use std::io::{BufRead, BufReader, Cursor};
4use std::sync::Arc;
5
6use arrow_array::{
7    Array, ArrayRef, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array,
8    Int32Array, Int64Array, RecordBatch, StringArray, UInt32Array, UInt64Array,
9    builder::{
10        ArrayBuilder, BooleanBuilder, Date32Builder, Date64Builder, FixedSizeListBuilder,
11        Float32Builder, Float64Builder, Int32Builder, Int64Builder, ListBuilder, StringBuilder,
12        UInt32Builder, UInt64Builder,
13    },
14};
15use arrow_schema::DataType;
16use base64::Engine;
17use lance::blob::BlobArrayBuilder;
18use omnigraph_compiler::catalog::NodeType;
19use serde::{Deserialize, Serialize};
20use serde_json::Value as JsonValue;
21
22use crate::db::Omnigraph;
23use crate::error::{OmniError, Result};
24use crate::exec::staging::{MutationStaging, PendingMode};
25
/// Outcome of a load operation: the branch the rows landed on, whether that
/// branch was forked for this load, and per-type row counts.
#[derive(Debug, Clone)]
#[derive(Default)]
pub struct LoadResult {
    /// Branch the load landed on (`"main"` when no branch was given).
    pub branch: String,
    /// Base branch a fork was requested from: the normalized `base`
    /// argument of `load_as`. It is recorded even when the target branch
    /// already existed and no fork happened.
    pub base_branch: Option<String>,
    /// True when this load created `branch` by forking it from `base_branch`.
    pub branch_created: bool,
    /// Rows loaded, keyed by node type name.
    pub nodes_loaded: HashMap<String, usize>,
    /// Rows loaded, keyed by canonical edge type name.
    pub edges_loaded: HashMap<String, usize>,
}
40
41#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
42pub struct IngestTableResult {
43    pub table_key: String,
44    pub rows_loaded: usize,
45}
46
47#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
48pub struct IngestResult {
49    pub branch: String,
50    pub base_branch: String,
51    pub branch_created: bool,
52    pub mode: LoadMode,
53    pub tables: Vec<IngestTableResult>,
54}
55
56/// Load mode for data ingestion.
57#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
58#[serde(rename_all = "snake_case")]
59pub enum LoadMode {
60    /// Overwrite existing data.
61    Overwrite,
62    /// Append to existing data.
63    Append,
64    /// Merge by `id` key (upsert).
65    Merge,
66}
67
68/// Convenience: load JSONL data onto the database handle's *active branch*
69/// (`main` when unbound). Equivalent to `db.load(active_branch, data, mode)`;
70/// use `Omnigraph::load`/`load_as` directly when targeting an explicit branch
71/// or when fork-from-base semantics are needed.
72pub async fn load_jsonl(db: &Omnigraph, data: &str, mode: LoadMode) -> Result<LoadResult> {
73    let current_branch = db.active_branch().await;
74    let branch = current_branch.as_deref().unwrap_or("main");
75    db.load(branch, data, mode).await
76}
77
78/// Convenience: like [`load_jsonl`] but reading from a file path.
79pub async fn load_jsonl_file(db: &Omnigraph, path: &str, mode: LoadMode) -> Result<LoadResult> {
80    let current_branch = db.active_branch().await;
81    let branch = current_branch.as_deref().unwrap_or("main");
82    db.load_file(branch, path, mode).await
83}
84
85impl Omnigraph {
86    #[deprecated(
87        note = "use `load_as` with an explicit `base` instead; the ingest family will be removed in a future release"
88    )]
89    pub async fn ingest(
90        &self,
91        branch: &str,
92        from: Option<&str>,
93        data: &str,
94        mode: LoadMode,
95    ) -> Result<IngestResult> {
96        #[allow(deprecated)]
97        self.ingest_as(branch, from, data, mode, None).await
98    }
99
100    /// Deprecated shim over the unified `load_as`. Preserves the historical
101    /// ingest contract exactly: `from: None` means fork from `main`, and the
102    /// base branch is recorded in the result even when the target branch
103    /// already existed (no fork happened).
104    #[deprecated(
105        note = "use `load_as` with an explicit `base` instead; the ingest family will be removed in a future release"
106    )]
107    pub async fn ingest_as(
108        &self,
109        branch: &str,
110        from: Option<&str>,
111        data: &str,
112        mode: LoadMode,
113        actor_id: Option<&str>,
114    ) -> Result<IngestResult> {
115        let result = self
116            .load_as(branch, Some(from.unwrap_or("main")), data, mode, actor_id)
117            .await?;
118        Ok(IngestResult {
119            branch: result.branch.clone(),
120            base_branch: result
121                .base_branch
122                .clone()
123                .unwrap_or_else(|| "main".to_string()),
124            branch_created: result.branch_created,
125            mode,
126            tables: result.to_ingest_tables(),
127        })
128    }
129
130    #[deprecated(
131        note = "use `load_file_as` with an explicit `base` instead; the ingest family will be removed in a future release"
132    )]
133    pub async fn ingest_file(
134        &self,
135        branch: &str,
136        from: Option<&str>,
137        path: &str,
138        mode: LoadMode,
139    ) -> Result<IngestResult> {
140        #[allow(deprecated)]
141        self.ingest_file_as(branch, from, path, mode, None).await
142    }
143
144    #[deprecated(
145        note = "use `load_file_as` with an explicit `base` instead; the ingest family will be removed in a future release"
146    )]
147    pub async fn ingest_file_as(
148        &self,
149        branch: &str,
150        from: Option<&str>,
151        path: &str,
152        mode: LoadMode,
153        actor_id: Option<&str>,
154    ) -> Result<IngestResult> {
155        let data = std::fs::read_to_string(path).map_err(OmniError::Io)?;
156        #[allow(deprecated)]
157        self.ingest_as(branch, from, &data, mode, actor_id).await
158    }
159
160    pub async fn load(&self, branch: &str, data: &str, mode: LoadMode) -> Result<LoadResult> {
161        self.load_as(branch, None, data, mode, None).await
162    }
163
164    /// Load JSONL data onto `branch`.
165    ///
166    /// `base` selects the branch-creation behavior: with `Some(base)`, a
167    /// missing target branch is forked from `base` first (the former
168    /// `ingest` semantics); with `None`, the target branch must already
169    /// exist — staging fails on an unknown branch when it resolves the
170    /// manifest snapshot, so a typo'd branch name can never create one.
171    pub async fn load_as(
172        &self,
173        branch: &str,
174        base: Option<&str>,
175        data: &str,
176        mode: LoadMode,
177        actor_id: Option<&str>,
178    ) -> Result<LoadResult> {
179        // Engine-layer policy gate (MR-722 fan-out / PR #3). Scope is
180        // `Branch(branch)` to match the HTTP-layer Change convention.
181        // When a fork happens below, `branch_create_from_as` additionally
182        // checks `BranchCreate` — both authorities are genuinely needed
183        // for "load into a fresh branch", so the layered check is
184        // correct, not redundant.
185        self.enforce(
186            omnigraph_policy::PolicyAction::Change,
187            &omnigraph_policy::ResourceScope::Branch(branch.to_string()),
188            actor_id,
189        )?;
190        self.ensure_schema_state_valid().await?;
191        // Converge any pending recovery sidecar (a previously failed
192        // writer's Phase B → Phase C residual) before staging anything:
193        // without this, sidecar-covered drift wedges every load on the
194        // commit-time drift guard until a process restart — `repair`
195        // refuses while a sidecar is pending. One `list_dir` when no
196        // sidecars exist (the steady state).
197        self.heal_pending_recovery_sidecars().await?;
198        // Reject internal `__run__*` / system-prefixed branches at the
199        // public write boundary. Direct-publish paths assert this
200        // explicitly so a caller can't write to legacy or system
201        // staging branches by passing the prefix verbatim.
202        crate::db::ensure_public_branch_ref(branch, "load")?;
203        // Branch convention: `None` represents `main`. Re-normalizing to
204        // `Some("main")` here would route the publisher commit through a
205        // separate coordinator (the cross-branch path in
206        // `commit_prepared_updates_on_branch_with_expected`) and leave
207        // `self.coordinator` with a stale manifest snapshot.
208        let requested = Self::normalize_branch_name(branch)?;
209        let base_branch = match base {
210            Some(base) => {
211                Some(Self::normalize_branch_name(base)?.unwrap_or_else(|| "main".to_string()))
212            }
213            None => None,
214        };
215        // Fork-if-missing only when a base branch was explicitly given.
216        // `requested == None` is `main`, which always exists.
217        let mut branch_created = false;
218        if let (Some(target), Some(base_name)) = (requested.as_deref(), base_branch.as_deref()) {
219            let exists = self.branch_list().await?.iter().any(|name| name == target);
220            if !exists {
221                // Thread the actor through to the implicit BranchCreate so
222                // policy decisions match what an explicit `branch_create_from_as`
223                // call would see. Calling the no-actor variant here would
224                // bypass BranchCreate enforcement when policy is installed —
225                // the footgun guard catches that case too, but threading is
226                // the correct fix.
227                self.branch_create_from_as(
228                    crate::db::ReadTarget::branch(base_name),
229                    target,
230                    actor_id,
231                )
232                .await?;
233                branch_created = true;
234            }
235        }
236        // Direct-to-target writes: no Run state machine, no `__run__` staging
237        // branch. Cross-table OCC is enforced by the publisher's
238        // `expected_table_versions` CAS inside `load_jsonl_reader`.
239        let mut result = self
240            .load_direct_on_branch(requested.as_deref(), data, mode, actor_id)
241            .await?;
242        result.branch = requested.unwrap_or_else(|| "main".to_string());
243        result.base_branch = base_branch;
244        result.branch_created = branch_created;
245        Ok(result)
246    }
247
248    pub async fn load_file(&self, branch: &str, path: &str, mode: LoadMode) -> Result<LoadResult> {
249        self.load_file_as(branch, None, path, mode, None).await
250    }
251
252    /// Read a file into memory and delegate to `load_as`. Used by the
253    /// CLI's `omnigraph load` so file-path-based writes flow through
254    /// the same engine-layer policy gate as in-memory `load_as` calls.
255    pub async fn load_file_as(
256        &self,
257        branch: &str,
258        base: Option<&str>,
259        path: &str,
260        mode: LoadMode,
261        actor_id: Option<&str>,
262    ) -> Result<LoadResult> {
263        let data = std::fs::read_to_string(path).map_err(OmniError::Io)?;
264        self.load_as(branch, base, &data, mode, actor_id).await
265    }
266
267    async fn load_direct_on_branch(
268        &self,
269        branch: Option<&str>,
270        data: &str,
271        mode: LoadMode,
272        actor_id: Option<&str>,
273    ) -> Result<LoadResult> {
274        let reader = BufReader::new(Cursor::new(data.as_bytes()));
275        load_jsonl_reader(self, branch, reader, mode, actor_id).await
276    }
277}
278
279impl LoadMode {
280    pub fn as_str(self) -> &'static str {
281        match self {
282            LoadMode::Overwrite => "overwrite",
283            LoadMode::Append => "append",
284            LoadMode::Merge => "merge",
285        }
286    }
287}
288
289impl LoadResult {
290    pub fn to_ingest_tables(&self) -> Vec<IngestTableResult> {
291        let mut tables = self
292            .nodes_loaded
293            .iter()
294            .map(|(type_name, rows_loaded)| IngestTableResult {
295                table_key: format!("node:{type_name}"),
296                rows_loaded: *rows_loaded,
297            })
298            .chain(
299                self.edges_loaded
300                    .iter()
301                    .map(|(edge_name, rows_loaded)| IngestTableResult {
302                        table_key: format!("edge:{edge_name}"),
303                        rows_loaded: *rows_loaded,
304                    }),
305            )
306            .collect::<Vec<_>>();
307        tables.sort_by(|a, b| a.table_key.cmp(&b.table_key));
308        tables
309    }
310}
311
312async fn load_jsonl_reader<R: BufRead>(
313    db: &Omnigraph,
314    branch: Option<&str>,
315    reader: R,
316    mode: LoadMode,
317    actor_id: Option<&str>,
318) -> Result<LoadResult> {
319    let catalog = db.catalog().clone();
320
321    // Phase 1: Parse all lines, spool into per-type collections
322    let mut node_rows: HashMap<String, Vec<JsonValue>> = HashMap::new();
323    let mut edge_rows: HashMap<String, Vec<(String, String, JsonValue)>> = HashMap::new();
324
325    // Parse a stream of JSON values. Accepts both compact JSONL (one object
326    // per line) and pretty-printed JSON where a single object spans multiple
327    // lines — serde's streaming deserializer treats any whitespace (including
328    // newlines) between top-level values as a separator.
329    for (idx, parsed) in serde_json::Deserializer::from_reader(reader)
330        .into_iter::<JsonValue>()
331        .enumerate()
332    {
333        let record_num = idx + 1;
334        let value: JsonValue = parsed.map_err(|e| {
335            OmniError::manifest(format!("invalid JSON at record {}: {}", record_num, e))
336        })?;
337
338        if let Some(type_name) = value.get("type").and_then(|v| v.as_str()) {
339            if !catalog.node_types.contains_key(type_name) {
340                return Err(OmniError::manifest(format!(
341                    "record {}: unknown node type '{}'",
342                    record_num, type_name
343                )));
344            }
345            let data = value
346                .get("data")
347                .cloned()
348                .unwrap_or(JsonValue::Object(serde_json::Map::new()));
349            node_rows
350                .entry(type_name.to_string())
351                .or_default()
352                .push(data);
353        } else if let Some(edge_name) = value.get("edge").and_then(|v| v.as_str()) {
354            if catalog.lookup_edge_by_name(edge_name).is_none() {
355                return Err(OmniError::manifest(format!(
356                    "record {}: unknown edge type '{}'",
357                    record_num, edge_name
358                )));
359            }
360            let from = value
361                .get("from")
362                .and_then(|v| v.as_str())
363                .ok_or_else(|| {
364                    OmniError::manifest(format!("record {}: edge missing 'from'", record_num))
365                })?
366                .to_string();
367            let to = value
368                .get("to")
369                .and_then(|v| v.as_str())
370                .ok_or_else(|| {
371                    OmniError::manifest(format!("record {}: edge missing 'to'", record_num))
372                })?
373                .to_string();
374            let data = value
375                .get("data")
376                .cloned()
377                .unwrap_or(JsonValue::Object(serde_json::Map::new()));
378            let canonical = catalog.lookup_edge_by_name(edge_name).unwrap().name.clone();
379            edge_rows
380                .entry(canonical)
381                .or_default()
382                .push((from, to, data));
383        } else {
384            return Err(OmniError::manifest(format!(
385                "record {}: expected 'type' or 'edge' field",
386                record_num
387            )));
388        }
389    }
390
391    // Phase 2: Build per-type RecordBatches and accumulate into the
392    // staging pipeline. Batches go into an in-memory accumulator and a
393    // single `stage_*` + `commit_staged` per touched table runs at
394    // end-of-load — a mid-load failure (RI / cardinality violation) leaves
395    // Lance HEAD untouched. `LoadMode::Overwrite` uses Lance's staged
396    // `Overwrite` transaction rather than the former truncate-then-append
397    // inline path.
398
399    let mut result = LoadResult::default();
400    let snapshot = db.snapshot_for_branch(branch).await?;
401    let mut staging = MutationStaging::default();
402    let pending_mode = match mode {
403        LoadMode::Merge => PendingMode::Merge,
404        // Append-mode loads accumulate as Append. Edge tables (no @key)
405        // and no-key node tables stay safe on the stage_append path. The
406        // Merge mode applies dedupe-by-id; Append assumes unique inputs.
407        LoadMode::Append => PendingMode::Append,
408        LoadMode::Overwrite => PendingMode::Overwrite,
409    };
410    // Map LoadMode to MutationOpKind for the version-check policy.
411    // Append/Merge skip the strict pre-stage check (concurrency-safe
412    // under the per-(table, branch) queue + publisher CAS); Overwrite
413    // uses the strict check because it truncates and replaces the
414    // dataset — concurrent advances change what "replace" means.
415    let load_op_kind = match mode {
416        LoadMode::Append => crate::db::MutationOpKind::Insert,
417        LoadMode::Merge => crate::db::MutationOpKind::Merge,
418        LoadMode::Overwrite => crate::db::MutationOpKind::SchemaRewrite,
419    };
420
421    // Up-front fork-queue acquisition. The first write to a table on a
422    // non-main branch forks it (create_branch), which advances Lance state
423    // before the manifest publish; the reclaim of any manifest-unreferenced
424    // leftover (`reclaim_orphaned_fork_and_refork`) must not race a concurrent
425    // in-process fork. So when this load will fork at least one touched table,
426    // acquire the per-(table, branch) write queues for ALL touched tables up
427    // front (one sorted `acquire_many`, keyed uniformly by the target branch
428    // so it covers what `commit_all` recomputes) and hold them through the
429    // publish. Main-branch loads never fork; branch loads where every touched
430    // table is already forked skip this and let `commit_all` acquire at commit.
431    let fork_queue_guards: Option<(
432        Vec<(String, Option<String>)>,
433        Vec<tokio::sync::OwnedMutexGuard<()>>,
434    )> = if let Some(active) = branch {
435        let touched: Vec<(String, Option<String>)> = node_rows
436            .keys()
437            .map(|t| (format!("node:{t}"), Some(active.to_string())))
438            .chain(
439                edge_rows
440                    .keys()
441                    .map(|e| (format!("edge:{e}"), Some(active.to_string()))),
442            )
443            .collect();
444        let needs_fork = touched.iter().any(|(table_key, _)| {
445            snapshot
446                .entry(table_key)
447                .map(|e| e.table_branch.as_deref() != Some(active))
448                .unwrap_or(false)
449        });
450        if needs_fork {
451            let guards = db.write_queue().acquire_many(&touched).await;
452            Some((touched, guards))
453        } else {
454            None
455        }
456    } else {
457        None
458    };
459
460    // Phase 2a: build and validate every node batch up front. Cheap and
461    // synchronous — surfaces validation errors before any S3 traffic.
462    let mut prepared_nodes: Vec<(String, String, RecordBatch, usize)> =
463        Vec::with_capacity(node_rows.len());
464    for (type_name, rows) in &node_rows {
465        let node_type = &catalog.node_types[type_name];
466        let batch = build_node_batch(node_type, rows)?;
467        validate_value_constraints(&batch, node_type)?;
468        validate_enum_constraints(&batch, &node_type.properties, type_name)?;
469        let unique_groups = unique_constraint_groups_for_node(node_type);
470        if !unique_groups.is_empty() {
471            enforce_unique_constraints_intra_batch(&batch, type_name, &unique_groups)?;
472        }
473        let loaded_count = batch.num_rows();
474        let table_key = format!("node:{}", type_name);
475        let _entry = snapshot
476            .entry(&table_key)
477            .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
478        prepared_nodes.push((type_name.clone(), table_key, batch, loaded_count));
479    }
480
481    // Phase 2b: accumulate every node type in memory. Fragment writes are
482    // delayed until after all validation succeeds.
483    for (type_name, table_key, batch, loaded_count) in prepared_nodes {
484        let (ds, full_path, table_branch) = db
485            .open_for_mutation_on_branch(branch, &table_key, load_op_kind)
486            .await?;
487        let expected_version = ds.version();
488        staging.ensure_path(
489            &table_key,
490            full_path,
491            table_branch,
492            expected_version,
493            load_op_kind,
494        );
495        let schema = batch.schema();
496        staging.append_batch(&table_key, schema, pending_mode, batch)?;
497        result.nodes_loaded.insert(type_name, loaded_count);
498    }
499
500    // Phase 2c: Validate edge referential integrity — every src/dst must
501    // reference an existing node ID in the appropriate type. For
502    // Append/Merge the lookup unions snapshot-committed IDs with the
503    // in-memory pending batches. For Overwrite, a touched node table's
504    // pending batch is the replacement image, so committed rows are not
505    // included for that table.
506    for (edge_name, rows) in &edge_rows {
507        let edge_type = &catalog.edge_types[edge_name];
508        let from_ids =
509            collect_node_ids_with_pending(db, branch, &edge_type.from_type, &staging).await?;
510        let to_ids =
511            collect_node_ids_with_pending(db, branch, &edge_type.to_type, &staging).await?;
512
513        for (i, (src, dst, _)) in rows.iter().enumerate() {
514            if !from_ids.contains(src.as_str()) {
515                return Err(OmniError::manifest(format!(
516                    "edge {} row {}: src '{}' not found in {}",
517                    edge_name,
518                    i + 1,
519                    src,
520                    edge_type.from_type
521                )));
522            }
523            if !to_ids.contains(dst.as_str()) {
524                return Err(OmniError::manifest(format!(
525                    "edge {} row {}: dst '{}' not found in {}",
526                    edge_name,
527                    i + 1,
528                    dst,
529                    edge_type.to_type
530                )));
531            }
532        }
533    }
534
535    // Phase 2d: build edge batches.
536    let mut prepared_edges: Vec<(String, String, RecordBatch, usize)> =
537        Vec::with_capacity(edge_rows.len());
538    for (edge_name, rows) in &edge_rows {
539        let edge_type = &catalog.edge_types[edge_name];
540        let batch = build_edge_batch(edge_type, rows)?;
541        validate_enum_constraints(&batch, &edge_type.properties, edge_name)?;
542        let unique_groups = unique_constraint_groups_for_edge(edge_type);
543        if !unique_groups.is_empty() {
544            enforce_unique_constraints_intra_batch(&batch, edge_name, &unique_groups)?;
545        }
546        let loaded_count = batch.num_rows();
547        let table_key = format!("edge:{}", edge_name);
548        let _entry = snapshot
549            .entry(&table_key)
550            .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
551        prepared_edges.push((edge_name.clone(), table_key, batch, loaded_count));
552    }
553
554    // Phase 2e: accumulate every edge type. Same dispatch as Phase 2b.
555    for (edge_name, table_key, batch, loaded_count) in prepared_edges {
556        let (ds, full_path, table_branch) = db
557            .open_for_mutation_on_branch(branch, &table_key, load_op_kind)
558            .await?;
559        let expected_version = ds.version();
560        staging.ensure_path(
561            &table_key,
562            full_path,
563            table_branch,
564            expected_version,
565            load_op_kind,
566        );
567        let schema = batch.schema();
568        staging.append_batch(&table_key, schema, pending_mode, batch)?;
569        result.edges_loaded.insert(edge_name, loaded_count);
570    }
571
572    // Phase 3: Validate edge cardinality constraints (before commit —
573    // invalid data must not be committed). The helper scans committed
574    // edges via Lance + iterates pending edges in-memory; for Overwrite it
575    // treats the pending edge batches as the replacement table image.
576    for (edge_name, _) in &edge_rows {
577        let edge_type = &catalog.edge_types[edge_name];
578        let table_key = format!("edge:{}", edge_name);
579        validate_edge_cardinality_with_pending_loader(
580            db, branch, edge_type, &table_key, &staging, mode,
581        )
582        .await?;
583    }
584
585    // Phase 4: Atomic manifest commit with publisher-level OCC.
586    let staged = staging
587        .stage_all_with_concurrency(db, branch, load_write_concurrency())
588        .await?;
589    // `_queue_guards` holds per-(table_key, branch) write queues
590    // across the manifest publish below — see exec/mutation.rs for
591    // the rationale (interleaving prevention).
592    let (updates, expected_versions, sidecar_handle, _queue_guards) = staged
593        .commit_all(
594            db,
595            branch,
596            crate::db::manifest::SidecarKind::Load,
597            actor_id,
598            fork_queue_guards,
599        )
600        .await?;
601    // Same finalize → publisher residual as mutations: per-table
602    // staged commits have advanced Lance HEAD, but the manifest
603    // publish has not run yet. Reuse the mutation failpoint name so
604    // one failpoint pins the shared `MutationStaging` boundary.
605    crate::failpoints::maybe_fail("mutation.post_finalize_pre_publisher")?;
606    db.commit_updates_on_branch_with_expected(branch, &updates, &expected_versions, actor_id)
607        .await?;
608    // The recovery sidecar protects the per-table commit_staged →
609    // manifest publish window. Phase C succeeded — clean up
610    // best-effort: failing the user here would error out a write
611    // that already landed durably.
612    if let Some(handle) = sidecar_handle {
613        if let Err(err) = crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await {
614            tracing::warn!(
615                error = %err,
616                operation_id = handle.operation_id.as_str(),
617                "recovery sidecar cleanup failed; the next open's recovery sweep will resolve it"
618            );
619        }
620    }
621
622    Ok(result)
623}
624
625fn build_node_batch(node_type: &NodeType, rows: &[JsonValue]) -> Result<RecordBatch> {
626    let schema = node_type.arrow_schema.clone();
627
628    // Build id column: explicit id, @key value, or generated ULID.
629    let ids: Vec<String> = rows
630        .iter()
631        .map(|row| {
632            let explicit_id = row.get("id").and_then(|v| v.as_str()).map(str::to_string);
633            if let Some(key_prop) = node_type.key_property() {
634                let key_value = row
635                    .get(key_prop)
636                    .and_then(|v| v.as_str())
637                    .map(|s| s.to_string())
638                    .ok_or_else(|| {
639                        OmniError::manifest(format!(
640                            "node {} missing @key property '{}'",
641                            node_type.name, key_prop
642                        ))
643                    })?;
644                if let Some(explicit_id) = explicit_id {
645                    if explicit_id != key_value {
646                        return Err(OmniError::manifest(format!(
647                            "node {} has explicit id '{}' that does not match @key property '{}' value '{}'",
648                            node_type.name, explicit_id, key_prop, key_value
649                        )));
650                    }
651                }
652                Ok(key_value)
653            } else if let Some(explicit_id) = explicit_id {
654                Ok(explicit_id)
655            } else {
656                Ok(generate_id())
657            }
658        })
659        .collect::<Result<Vec<_>>>()?;
660
661    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
662    columns.push(Arc::new(StringArray::from(ids)));
663
664    // Build property columns (skip "id" field at index 0)
665    for field in schema.fields().iter().skip(1) {
666        if node_type.blob_properties.contains(field.name()) {
667            let col = build_blob_column(field.name(), field.is_nullable(), rows)?;
668            columns.push(col);
669        } else {
670            let col =
671                build_column_from_json(field.name(), field.data_type(), field.is_nullable(), rows)?;
672            columns.push(col);
673        }
674    }
675
676    RecordBatch::try_new(schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
677}
678
679fn build_edge_batch(
680    edge_type: &omnigraph_compiler::catalog::EdgeType,
681    rows: &[(String, String, JsonValue)],
682) -> Result<RecordBatch> {
683    let schema = edge_type.arrow_schema.clone();
684
685    let ids: Vec<String> = rows
686        .iter()
687        .map(|(_, _, data)| {
688            data.get("id")
689                .and_then(|v| v.as_str())
690                .map(str::to_string)
691                .unwrap_or_else(generate_id)
692        })
693        .collect();
694    let srcs: Vec<&str> = rows.iter().map(|(from, _, _)| from.as_str()).collect();
695    let dsts: Vec<&str> = rows.iter().map(|(_, to, _)| to.as_str()).collect();
696
697    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
698    columns.push(Arc::new(StringArray::from(ids)));
699    columns.push(Arc::new(StringArray::from(srcs)));
700    columns.push(Arc::new(StringArray::from(dsts)));
701
702    // Build edge property columns (skip id, src, dst at indices 0-2)
703    let data_values: Vec<JsonValue> = rows.iter().map(|(_, _, data)| data.clone()).collect();
704    for field in schema.fields().iter().skip(3) {
705        if edge_type.blob_properties.contains(field.name()) {
706            let col = build_blob_column(field.name(), field.is_nullable(), &data_values)?;
707            columns.push(col);
708        } else {
709            let col = build_column_from_json(
710                field.name(),
711                field.data_type(),
712                field.is_nullable(),
713                &data_values,
714            )?;
715            columns.push(col);
716        }
717    }
718
719    RecordBatch::try_new(schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
720}
721
722/// Append a blob value (URI or base64 bytes) to a BlobArrayBuilder.
723pub(crate) fn append_blob_value(builder: &mut BlobArrayBuilder, value: &str) -> Result<()> {
724    if let Some(encoded) = value.strip_prefix("base64:") {
725        let bytes = base64::engine::general_purpose::STANDARD
726            .decode(encoded)
727            .map_err(|e| OmniError::manifest(format!("invalid base64 blob data: {}", e)))?;
728        builder
729            .push_bytes(bytes)
730            .map_err(|e| OmniError::Lance(e.to_string()))
731    } else {
732        // Treat as URI (file://, s3://, gs://, or any other scheme)
733        builder
734            .push_uri(value)
735            .map_err(|e| OmniError::Lance(e.to_string()))
736    }
737}
738
739/// Build a blob column from JSON values using Lance BlobArrayBuilder.
740fn build_blob_column(name: &str, nullable: bool, rows: &[JsonValue]) -> Result<ArrayRef> {
741    let mut builder = BlobArrayBuilder::new(rows.len());
742    for row in rows {
743        match row.get(name) {
744            Some(JsonValue::String(s)) => {
745                append_blob_value(&mut builder, s)?;
746            }
747            Some(JsonValue::Null) | None if nullable => {
748                builder
749                    .push_null()
750                    .map_err(|e| OmniError::Lance(e.to_string()))?;
751            }
752            Some(JsonValue::Null) | None => {
753                return Err(OmniError::manifest(format!(
754                    "non-nullable blob property '{}' has null values",
755                    name
756                )));
757            }
758            _ => {
759                return Err(OmniError::manifest(format!(
760                    "blob property '{}' must be a URI string or base64: prefixed data",
761                    name
762                )));
763            }
764        }
765    }
766    builder
767        .finish()
768        .map_err(|e| OmniError::Lance(e.to_string()))
769}
770
771fn build_column_from_json(
772    name: &str,
773    data_type: &DataType,
774    nullable: bool,
775    rows: &[JsonValue],
776) -> Result<ArrayRef> {
777    let array: ArrayRef = match data_type {
778        DataType::Utf8 => {
779            let values: Vec<Option<String>> = rows
780                .iter()
781                .map(|row| {
782                    row.get(name)
783                        .and_then(|v| v.as_str())
784                        .map(|s| s.to_string())
785                })
786                .collect();
787            Arc::new(StringArray::from(values))
788        }
789        DataType::Int32 => {
790            let values: Vec<Option<i32>> = rows
791                .iter()
792                .map(|row| row.get(name).and_then(|v| v.as_i64()).map(|v| v as i32))
793                .collect();
794            Arc::new(Int32Array::from(values))
795        }
796        DataType::Int64 => {
797            let values: Vec<Option<i64>> = rows
798                .iter()
799                .map(|row| row.get(name).and_then(|v| v.as_i64()))
800                .collect();
801            Arc::new(Int64Array::from(values))
802        }
803        DataType::UInt32 => {
804            let values: Vec<Option<u32>> = rows
805                .iter()
806                .map(|row| row.get(name).and_then(|v| v.as_u64()).map(|v| v as u32))
807                .collect();
808            Arc::new(UInt32Array::from(values))
809        }
810        DataType::UInt64 => {
811            let values: Vec<Option<u64>> = rows
812                .iter()
813                .map(|row| row.get(name).and_then(|v| v.as_u64()))
814                .collect();
815            Arc::new(UInt64Array::from(values))
816        }
817        DataType::Float32 => {
818            let values: Vec<Option<f32>> = rows
819                .iter()
820                .map(|row| row.get(name).and_then(|v| v.as_f64()).map(|v| v as f32))
821                .collect();
822            Arc::new(Float32Array::from(values))
823        }
824        DataType::Float64 => {
825            let values: Vec<Option<f64>> = rows
826                .iter()
827                .map(|row| row.get(name).and_then(|v| v.as_f64()))
828                .collect();
829            Arc::new(Float64Array::from(values))
830        }
831        DataType::Boolean => {
832            let values: Vec<Option<bool>> = rows
833                .iter()
834                .map(|row| row.get(name).and_then(|v| v.as_bool()))
835                .collect();
836            Arc::new(BooleanArray::from(values))
837        }
838        DataType::Date32 => {
839            let mut values = Vec::with_capacity(rows.len());
840            for row in rows {
841                values.push(parse_date32_json_value(
842                    row.get(name).unwrap_or(&JsonValue::Null),
843                )?);
844            }
845            Arc::new(Date32Array::from(values))
846        }
847        DataType::Date64 => {
848            let mut values = Vec::with_capacity(rows.len());
849            for row in rows {
850                values.push(parse_date64_json_value(
851                    row.get(name).unwrap_or(&JsonValue::Null),
852                )?);
853            }
854            Arc::new(Date64Array::from(values))
855        }
856        DataType::List(field) => {
857            let mut builder = ListBuilder::with_capacity(
858                make_list_value_builder(field.data_type(), rows.len())?,
859                rows.len(),
860            )
861            .with_field(field.clone());
862            for row in rows {
863                let value = row.get(name).unwrap_or(&JsonValue::Null);
864                if value.is_null() {
865                    builder.append(false);
866                    continue;
867                }
868                let items = value.as_array().ok_or_else(|| {
869                    OmniError::manifest(format!(
870                        "list property '{}' expects a JSON array, got {}",
871                        name, value
872                    ))
873                })?;
874                for item in items {
875                    append_json_list_item(builder.values(), field.data_type(), item)?;
876                }
877                builder.append(true);
878            }
879            Arc::new(builder.finish())
880        }
881        DataType::FixedSizeList(child_field, dim) => {
882            // Vector type: parse JSON array of floats into FixedSizeList<Float32>
883            let dim = *dim;
884            let mut builder = FixedSizeListBuilder::with_capacity(
885                Float32Builder::with_capacity(rows.len() * dim as usize),
886                dim,
887                rows.len(),
888            )
889            .with_field(child_field.clone());
890            for row in rows {
891                if let Some(arr) = row.get(name).and_then(|v| v.as_array()) {
892                    if arr.len() != dim as usize {
893                        return Err(OmniError::manifest(format!(
894                            "vector property '{}' expects {} dimensions, got {}",
895                            name,
896                            dim,
897                            arr.len()
898                        )));
899                    }
900                    for val in arr {
901                        builder
902                            .values()
903                            .append_value(val.as_f64().unwrap_or(0.0) as f32);
904                    }
905                    builder.append(true);
906                } else if nullable {
907                    for _ in 0..dim as usize {
908                        builder.values().append_null();
909                    }
910                    builder.append(false);
911                } else {
912                    return Err(OmniError::manifest(format!(
913                        "non-nullable vector property '{}' has null values",
914                        name
915                    )));
916                }
917            }
918            Arc::new(builder.finish())
919        }
920        _ => {
921            // Unsupported type: fill with nulls
922            let values: Vec<Option<&str>> = vec![None; rows.len()];
923            Arc::new(StringArray::from(values))
924        }
925    };
926
927    if !nullable && array.null_count() > 0 {
928        return Err(OmniError::manifest(format!(
929            "non-nullable property '{}' has null or invalid values",
930            name
931        )));
932    }
933
934    Ok(array)
935}
936
937fn make_list_value_builder(data_type: &DataType, capacity: usize) -> Result<Box<dyn ArrayBuilder>> {
938    Ok(match data_type {
939        DataType::Utf8 => Box::new(StringBuilder::with_capacity(capacity, capacity * 8)),
940        DataType::Boolean => Box::new(BooleanBuilder::with_capacity(capacity)),
941        DataType::Int32 => Box::new(Int32Builder::with_capacity(capacity)),
942        DataType::Int64 => Box::new(Int64Builder::with_capacity(capacity)),
943        DataType::UInt32 => Box::new(UInt32Builder::with_capacity(capacity)),
944        DataType::UInt64 => Box::new(UInt64Builder::with_capacity(capacity)),
945        DataType::Float32 => Box::new(Float32Builder::with_capacity(capacity)),
946        DataType::Float64 => Box::new(Float64Builder::with_capacity(capacity)),
947        DataType::Date32 => Box::new(Date32Builder::with_capacity(capacity)),
948        DataType::Date64 => Box::new(Date64Builder::with_capacity(capacity)),
949        other => {
950            return Err(OmniError::manifest(format!(
951                "unsupported list element data type {:?}",
952                other
953            )));
954        }
955    })
956}
957
958fn append_json_list_item(
959    builder: &mut Box<dyn ArrayBuilder>,
960    data_type: &DataType,
961    value: &JsonValue,
962) -> Result<()> {
963    match data_type {
964        DataType::Utf8 => {
965            let builder = builder
966                .as_any_mut()
967                .downcast_mut::<StringBuilder>()
968                .ok_or_else(|| OmniError::manifest("list Utf8 builder downcast failed"))?;
969            if let Some(value) = value.as_str() {
970                builder.append_value(value);
971            } else {
972                builder.append_null();
973            }
974        }
975        DataType::Boolean => {
976            let builder = builder
977                .as_any_mut()
978                .downcast_mut::<BooleanBuilder>()
979                .ok_or_else(|| OmniError::manifest("list Boolean builder downcast failed"))?;
980            if let Some(value) = value.as_bool() {
981                builder.append_value(value);
982            } else {
983                builder.append_null();
984            }
985        }
986        DataType::Int32 => {
987            let builder = builder
988                .as_any_mut()
989                .downcast_mut::<Int32Builder>()
990                .ok_or_else(|| OmniError::manifest("list Int32 builder downcast failed"))?;
991            if let Some(value) = value.as_i64() {
992                let value = i32::try_from(value).map_err(|_| {
993                    OmniError::manifest(format!("list value {} exceeds Int32 range", value))
994                })?;
995                builder.append_value(value);
996            } else {
997                builder.append_null();
998            }
999        }
1000        DataType::Int64 => {
1001            let builder = builder
1002                .as_any_mut()
1003                .downcast_mut::<Int64Builder>()
1004                .ok_or_else(|| OmniError::manifest("list Int64 builder downcast failed"))?;
1005            if let Some(value) = value.as_i64() {
1006                builder.append_value(value);
1007            } else {
1008                builder.append_null();
1009            }
1010        }
1011        DataType::UInt32 => {
1012            let builder = builder
1013                .as_any_mut()
1014                .downcast_mut::<UInt32Builder>()
1015                .ok_or_else(|| OmniError::manifest("list UInt32 builder downcast failed"))?;
1016            if let Some(value) = value.as_u64() {
1017                let value = u32::try_from(value).map_err(|_| {
1018                    OmniError::manifest(format!("list value {} exceeds UInt32 range", value))
1019                })?;
1020                builder.append_value(value);
1021            } else {
1022                builder.append_null();
1023            }
1024        }
1025        DataType::UInt64 => {
1026            let builder = builder
1027                .as_any_mut()
1028                .downcast_mut::<UInt64Builder>()
1029                .ok_or_else(|| OmniError::manifest("list UInt64 builder downcast failed"))?;
1030            if let Some(value) = value.as_u64() {
1031                builder.append_value(value);
1032            } else {
1033                builder.append_null();
1034            }
1035        }
1036        DataType::Float32 => {
1037            let builder = builder
1038                .as_any_mut()
1039                .downcast_mut::<Float32Builder>()
1040                .ok_or_else(|| OmniError::manifest("list Float32 builder downcast failed"))?;
1041            if let Some(value) = value.as_f64() {
1042                builder.append_value(value as f32);
1043            } else {
1044                builder.append_null();
1045            }
1046        }
1047        DataType::Float64 => {
1048            let builder = builder
1049                .as_any_mut()
1050                .downcast_mut::<Float64Builder>()
1051                .ok_or_else(|| OmniError::manifest("list Float64 builder downcast failed"))?;
1052            if let Some(value) = value.as_f64() {
1053                builder.append_value(value);
1054            } else {
1055                builder.append_null();
1056            }
1057        }
1058        DataType::Date32 => {
1059            let builder = builder
1060                .as_any_mut()
1061                .downcast_mut::<Date32Builder>()
1062                .ok_or_else(|| OmniError::manifest("list Date32 builder downcast failed"))?;
1063            if let Some(value) = parse_date32_json_value(value)? {
1064                builder.append_value(value);
1065            } else {
1066                builder.append_null();
1067            }
1068        }
1069        DataType::Date64 => {
1070            let builder = builder
1071                .as_any_mut()
1072                .downcast_mut::<Date64Builder>()
1073                .ok_or_else(|| OmniError::manifest("list Date64 builder downcast failed"))?;
1074            if let Some(value) = parse_date64_json_value(value)? {
1075                builder.append_value(value);
1076            } else {
1077                builder.append_null();
1078            }
1079        }
1080        other => {
1081            return Err(OmniError::manifest(format!(
1082                "unsupported list element data type {:?}",
1083                other
1084            )));
1085        }
1086    }
1087
1088    Ok(())
1089}
1090
1091fn parse_date32_json_value(value: &JsonValue) -> Result<Option<i32>> {
1092    if value.is_null() {
1093        return Ok(None);
1094    }
1095    if let Some(days) = value.as_i64() {
1096        let days = i32::try_from(days)
1097            .map_err(|_| OmniError::manifest(format!("Date value out of range: {}", days)))?;
1098        return Ok(Some(days));
1099    }
1100    if let Some(days) = value.as_u64() {
1101        let days = i32::try_from(days)
1102            .map_err(|_| OmniError::manifest(format!("Date value out of range: {}", days)))?;
1103        return Ok(Some(days));
1104    }
1105    if let Some(value) = value.as_str() {
1106        return Ok(Some(parse_date32_literal(value)?));
1107    }
1108    Ok(None)
1109}
1110
1111fn parse_date64_json_value(value: &JsonValue) -> Result<Option<i64>> {
1112    if value.is_null() {
1113        return Ok(None);
1114    }
1115    if let Some(ms) = value.as_i64() {
1116        return Ok(Some(ms));
1117    }
1118    if let Some(ms) = value.as_u64() {
1119        let ms = i64::try_from(ms)
1120            .map_err(|_| OmniError::manifest(format!("DateTime value out of range: {}", ms)))?;
1121        return Ok(Some(ms));
1122    }
1123    if let Some(value) = value.as_str() {
1124        return Ok(Some(parse_date64_literal(value)?));
1125    }
1126    Ok(None)
1127}
1128
/// How many per-type Lance writes to run concurrently during a load.
///
/// Each write is an independent S3 manifest + fragment write against a
/// different table. Ops within a single table must still be serial (Lance
/// OCC on the manifest), but cross-table writes have no shared state.
///
/// 8 is a conservative default — enough to overlap S3 round-trip latency
/// across the typical 10-30 table schemas without flooding the runtime.
/// Override via `OMNIGRAPH_LOAD_CONCURRENCY` for benchmarking.
const DEFAULT_LOAD_WRITE_CONCURRENCY: usize = 8;

/// Effective per-table write concurrency for a load.
///
/// Reads `OMNIGRAPH_LOAD_CONCURRENCY` (surrounding whitespace is ignored).
/// Falls back to [`DEFAULT_LOAD_WRITE_CONCURRENCY`] when the variable is
/// unset, not a valid `usize`, or zero.
fn load_write_concurrency() -> usize {
    std::env::var("OMNIGRAPH_LOAD_CONCURRENCY")
        .ok()
        .and_then(|v| v.trim().parse::<usize>().ok())
        .filter(|&v| v > 0)
        .unwrap_or(DEFAULT_LOAD_WRITE_CONCURRENCY)
}
1148
1149fn generate_id() -> String {
1150    ulid::Ulid::new().to_string()
1151}
1152
1153pub(crate) fn parse_date32_literal(value: &str) -> Result<i32> {
1154    let raw: Arc<dyn Array> = Arc::new(StringArray::from(vec![Some(value)]));
1155    let casted = arrow_cast::cast::cast(raw.as_ref(), &DataType::Date32)
1156        .map_err(|e| OmniError::manifest(format!("invalid Date literal '{}': {}", value, e)))?;
1157    let out = casted
1158        .as_any()
1159        .downcast_ref::<Date32Array>()
1160        .ok_or_else(|| OmniError::manifest("Date32 cast produced unexpected array"))?;
1161    if out.is_null(0) {
1162        return Err(OmniError::manifest(format!(
1163            "invalid Date literal '{}'",
1164            value
1165        )));
1166    }
1167    Ok(out.value(0))
1168}
1169
1170pub(crate) fn parse_date64_literal(value: &str) -> Result<i64> {
1171    let raw: Arc<dyn Array> = Arc::new(StringArray::from(vec![Some(value)]));
1172    let casted = arrow_cast::cast::cast(raw.as_ref(), &DataType::Date64)
1173        .map_err(|e| OmniError::manifest(format!("invalid DateTime literal '{}': {}", value, e)))?;
1174    let out = casted
1175        .as_any()
1176        .downcast_ref::<Date64Array>()
1177        .ok_or_else(|| OmniError::manifest("Date64 cast produced unexpected array"))?;
1178    if out.is_null(0) {
1179        return Err(OmniError::manifest(format!(
1180            "invalid DateTime literal '{}'",
1181            value
1182        )));
1183    }
1184    Ok(out.value(0))
1185}
1186
1187// ─── Value constraint validation ─────────────────────────────────────────────
1188
1189pub(crate) fn validate_value_constraints(
1190    batch: &RecordBatch,
1191    node_type: &omnigraph_compiler::catalog::NodeType,
1192) -> Result<()> {
1193    use arrow_array::Array;
1194
1195    // Range constraints
1196    for rc in &node_type.range_constraints {
1197        let Some(col) = batch.column_by_name(&rc.property) else {
1198            continue;
1199        };
1200        for row in 0..batch.num_rows() {
1201            if col.is_null(row) {
1202                continue;
1203            }
1204            let value = extract_numeric_value(col, row);
1205            if let Some(val) = value {
1206                if val.is_nan() {
1207                    return Err(OmniError::manifest(format!(
1208                        "@range violation on {}.{}: value is NaN",
1209                        node_type.name, rc.property
1210                    )));
1211                }
1212                if let Some(ref min) = rc.min {
1213                    let min_f = literal_value_to_f64(min);
1214                    if val < min_f {
1215                        return Err(OmniError::manifest(format!(
1216                            "@range violation on {}.{}: value {} < min {}",
1217                            node_type.name, rc.property, val, min_f
1218                        )));
1219                    }
1220                }
1221                if let Some(ref max) = rc.max {
1222                    let max_f = literal_value_to_f64(max);
1223                    if val > max_f {
1224                        return Err(OmniError::manifest(format!(
1225                            "@range violation on {}.{}: value {} > max {}",
1226                            node_type.name, rc.property, val, max_f
1227                        )));
1228                    }
1229                }
1230            }
1231        }
1232    }
1233
1234    // Check constraints (regex)
1235    for cc in &node_type.check_constraints {
1236        let re = regex::Regex::new(&cc.pattern).map_err(|e| {
1237            OmniError::manifest(format!(
1238                "@check on {}.{} has invalid regex '{}': {}",
1239                node_type.name, cc.property, cc.pattern, e
1240            ))
1241        })?;
1242        let Some(col) = batch.column_by_name(&cc.property) else {
1243            continue;
1244        };
1245        let str_col = col.as_any().downcast_ref::<StringArray>();
1246        if let Some(str_col) = str_col {
1247            for row in 0..str_col.len() {
1248                if str_col.is_null(row) {
1249                    continue;
1250                }
1251                let val = str_col.value(row);
1252                if !re.is_match(val) {
1253                    return Err(OmniError::manifest(format!(
1254                        "@check violation on {}.{}: value '{}' does not match pattern '{}'",
1255                        node_type.name, cc.property, val, cc.pattern
1256                    )));
1257                }
1258            }
1259        }
1260    }
1261
1262    Ok(())
1263}
1264
1265/// Validate that every enum-typed property in `properties` only contains values
1266/// from its declared enum value set. Operates on a single `RecordBatch` so it
1267/// can be called from any write path that already holds a batch.
1268///
1269/// Scalar string enums are checked directly. List-of-enum properties are
1270/// checked element-by-element across the underlying string values.
1271pub(crate) fn validate_enum_constraints(
1272    batch: &RecordBatch,
1273    properties: &HashMap<String, omnigraph_compiler::types::PropType>,
1274    type_name: &str,
1275) -> Result<()> {
1276    use arrow_array::{Array, ListArray};
1277
1278    for (prop_name, prop_type) in properties {
1279        let Some(allowed) = prop_type.enum_values.as_ref() else {
1280            continue;
1281        };
1282        let Some(col) = batch.column_by_name(prop_name) else {
1283            continue;
1284        };
1285        if prop_type.list {
1286            let Some(list_col) = col.as_any().downcast_ref::<ListArray>() else {
1287                continue;
1288            };
1289            for row in 0..list_col.len() {
1290                if list_col.is_null(row) {
1291                    continue;
1292                }
1293                let item_arr = list_col.value(row);
1294                let Some(str_arr) = item_arr.as_any().downcast_ref::<StringArray>() else {
1295                    continue;
1296                };
1297                for i in 0..str_arr.len() {
1298                    if str_arr.is_null(i) {
1299                        continue;
1300                    }
1301                    let val = str_arr.value(i);
1302                    if !allowed.iter().any(|a| a.as_str() == val) {
1303                        return Err(OmniError::manifest(format!(
1304                            "invalid enum value '{}' for {}.{} (expected: {})",
1305                            val,
1306                            type_name,
1307                            prop_name,
1308                            allowed.join(", ")
1309                        )));
1310                    }
1311                }
1312            }
1313        } else if let Some(str_col) = col.as_any().downcast_ref::<StringArray>() {
1314            for row in 0..str_col.len() {
1315                if str_col.is_null(row) {
1316                    continue;
1317                }
1318                let val = str_col.value(row);
1319                if !allowed.iter().any(|a| a.as_str() == val) {
1320                    return Err(OmniError::manifest(format!(
1321                        "invalid enum value '{}' for {}.{} (expected: {})",
1322                        val,
1323                        type_name,
1324                        prop_name,
1325                        allowed.join(", ")
1326                    )));
1327                }
1328            }
1329        }
1330    }
1331    Ok(())
1332}
1333
1334/// Detect duplicate values within a single `RecordBatch` for any of the
1335/// `unique_constraints` groups. Each group is a list of one or more columns
1336/// that together form a uniqueness key: a violation occurs when two rows share
1337/// the same tuple of values across *all* columns in a group, so a composite
1338/// `@unique(a, b)` only conflicts when both `a` and `b` match. Returns an
1339/// error on the first duplicate found.
1340///
1341/// Rows where any column in a group is null are exempt (standard SQL semantics
1342/// for uniqueness over nullable columns), as is any group whose columns are
1343/// not all present in the batch (e.g. a partial-schema load).
1344///
1345/// Note: this only catches duplicates *within* the batch. Cross-batch
1346/// uniqueness against already-committed rows is not enforced here — that
1347/// requires a dataset scan and is tracked separately.
1348pub(crate) fn enforce_unique_constraints_intra_batch(
1349    batch: &RecordBatch,
1350    type_name: &str,
1351    unique_constraints: &[Vec<String>],
1352) -> Result<()> {
1353    for columns in unique_constraints {
1354        // Resolve the group's columns once. A group whose columns aren't all
1355        // present in this batch is skipped (e.g. a partial-schema load).
1356        let Some(group_columns) = columns
1357            .iter()
1358            .map(|name| {
1359                batch
1360                    .schema()
1361                    .index_of(name)
1362                    .ok()
1363                    .map(|i| batch.column(i).clone())
1364            })
1365            .collect::<Option<Vec<ArrayRef>>>()
1366        else {
1367            continue;
1368        };
1369        let mut seen: HashMap<Vec<String>, usize> = HashMap::new();
1370        for row in 0..batch.num_rows() {
1371            let Some(key) = composite_unique_key(&group_columns, row)? else {
1372                continue;
1373            };
1374            if let Some(prev_row) = seen.insert(key.clone(), row) {
1375                return Err(OmniError::manifest(format!(
1376                    "@unique violation on {}.{}: value '{}' appears in rows {} and {}",
1377                    type_name,
1378                    format_tuple(columns),
1379                    format_tuple(&key),
1380                    prev_row,
1381                    row
1382                )));
1383            }
1384        }
1385    }
1386    Ok(())
1387}
1388
1389/// Build the composite uniqueness key for `row` over a constraint group's
1390/// already-resolved columns (in declaration order).
1391///
1392/// The key is the *tuple* of per-column scalar strings (`Vec<String>`), keyed
1393/// directly in the dedup map — there is no separator, so no data value can
1394/// forge a collision (an earlier version joined on `U+001F`, which a value
1395/// containing that control char could still defeat).
1396///
1397/// - `Ok(None)` if any column is null: the row is exempt (a partial tuple
1398///   can't violate uniqueness under SQL null semantics).
1399/// - `Ok(Some(tuple))` otherwise.
1400/// - `Err(..)` propagated from [`unique_key_scalar`] on an un-keyable value.
1401///
1402/// Shared by the intake path (`enforce_unique_constraints_intra_batch`) and the
1403/// branch-merge path (`exec/merge.rs::update_unique_constraints`) so the two
1404/// derive identical keys and cannot drift on separator or scalar conversion.
1405pub(crate) fn composite_unique_key(
1406    group_columns: &[ArrayRef],
1407    row: usize,
1408) -> Result<Option<Vec<String>>> {
1409    let mut parts = Vec::with_capacity(group_columns.len());
1410    for column in group_columns {
1411        match unique_key_scalar(column, row)? {
1412            Some(value) => parts.push(value),
1413            None => return Ok(None),
1414        }
1415    }
1416    Ok(Some(parts))
1417}
1418
/// Render a tuple of names or values for error messages.
///
/// A single item is printed bare (`col`); any other count is parenthesized and
/// comma-separated (`(a, b)`, or `()` when empty). Used for both a constraint's
/// column list and the offending value tuple, which share the same shape.
fn format_tuple(items: &[String]) -> String {
    if let [only] = items {
        return only.clone();
    }
    format!("({})", items.join(", "))
}
1428
1429/// Reduce a single Arrow scalar at (`array`, `row`) to its uniqueness-key
1430/// string.
1431///
1432/// - `Ok(None)` for a null value: nulls are exempt from uniqueness (standard
1433///   SQL semantics over nullable columns).
1434/// - `Ok(Some(s))` for every scalar type a `@unique` / `@key` column can hold.
1435///   Strings are covered in all three physical Arrow encodings (`Utf8`,
1436///   `LargeUtf8`, `Utf8View`), so a legal string column is always keyable
1437///   regardless of how Lance materializes it on read-back.
1438/// - `Err(..)` for a non-null value whose Arrow type can't be reduced to a key
1439///   (a list, blob, or vector column). This fails loudly rather than silently
1440///   exempting the row, and because every legal scalar encoding is handled
1441///   above, the error fires only for a genuinely un-keyable column type — never
1442///   for a legal value that merely arrived in an unenumerated encoding.
1443fn unique_key_scalar(array: &ArrayRef, row: usize) -> Result<Option<String>> {
1444    use arrow_array::{Array, LargeStringArray, StringViewArray};
1445    if array.is_null(row) {
1446        return Ok(None);
1447    }
1448    if let Some(a) = array.as_any().downcast_ref::<StringArray>() {
1449        return Ok(Some(a.value(row).to_string()));
1450    }
1451    if let Some(a) = array.as_any().downcast_ref::<LargeStringArray>() {
1452        return Ok(Some(a.value(row).to_string()));
1453    }
1454    if let Some(a) = array.as_any().downcast_ref::<StringViewArray>() {
1455        return Ok(Some(a.value(row).to_string()));
1456    }
1457    if let Some(a) = array.as_any().downcast_ref::<Int32Array>() {
1458        return Ok(Some(a.value(row).to_string()));
1459    }
1460    if let Some(a) = array.as_any().downcast_ref::<Int64Array>() {
1461        return Ok(Some(a.value(row).to_string()));
1462    }
1463    if let Some(a) = array.as_any().downcast_ref::<UInt32Array>() {
1464        return Ok(Some(a.value(row).to_string()));
1465    }
1466    if let Some(a) = array.as_any().downcast_ref::<UInt64Array>() {
1467        return Ok(Some(a.value(row).to_string()));
1468    }
1469    if let Some(a) = array.as_any().downcast_ref::<Float32Array>() {
1470        return Ok(Some(a.value(row).to_string()));
1471    }
1472    if let Some(a) = array.as_any().downcast_ref::<Float64Array>() {
1473        return Ok(Some(a.value(row).to_string()));
1474    }
1475    if let Some(a) = array.as_any().downcast_ref::<BooleanArray>() {
1476        return Ok(Some(a.value(row).to_string()));
1477    }
1478    if let Some(a) = array.as_any().downcast_ref::<Date32Array>() {
1479        return Ok(Some(a.value(row).to_string()));
1480    }
1481    if let Some(a) = array.as_any().downcast_ref::<Date64Array>() {
1482        return Ok(Some(a.value(row).to_string()));
1483    }
1484    Err(OmniError::manifest(format!(
1485        "uniqueness key: unsupported column type {:?} for @unique/@key enforcement",
1486        array.data_type()
1487    )))
1488}
1489
1490/// Build the list of uniqueness constraint groups to enforce on a node type.
1491/// Each group is the column tuple of one constraint. Includes every
1492/// `@unique(...)` constraint (from `NodeType.unique_constraints`) and the
1493/// `@key` (which implies uniqueness over its column tuple). Grouping is
1494/// preserved so a composite `@unique(a, b)` is enforced as a composite key
1495/// rather than degraded into independent single-field checks.
1496pub(crate) fn unique_constraint_groups_for_node(
1497    node_type: &omnigraph_compiler::catalog::NodeType,
1498) -> Vec<Vec<String>> {
1499    let mut groups: Vec<Vec<String>> = node_type.unique_constraints.clone();
1500    if let Some(key) = &node_type.key
1501        && !groups.contains(key)
1502    {
1503        groups.push(key.clone());
1504    }
1505    groups
1506}
1507
1508/// Same as [`unique_constraint_groups_for_node`] but for an edge type (edges
1509/// have no `@key`).
1510pub(crate) fn unique_constraint_groups_for_edge(
1511    edge_type: &omnigraph_compiler::catalog::EdgeType,
1512) -> Vec<Vec<String>> {
1513    edge_type.unique_constraints.clone()
1514}
1515
1516fn extract_numeric_value(col: &ArrayRef, row: usize) -> Option<f64> {
1517    use arrow_array::{
1518        Array, Float32Array, Float64Array, Int32Array, Int64Array, UInt32Array, UInt64Array,
1519    };
1520    if let Some(a) = col.as_any().downcast_ref::<Int32Array>() {
1521        return Some(a.value(row) as f64);
1522    }
1523    if let Some(a) = col.as_any().downcast_ref::<Int64Array>() {
1524        return Some(a.value(row) as f64);
1525    }
1526    if let Some(a) = col.as_any().downcast_ref::<UInt32Array>() {
1527        return Some(a.value(row) as f64);
1528    }
1529    if let Some(a) = col.as_any().downcast_ref::<UInt64Array>() {
1530        return Some(a.value(row) as f64);
1531    }
1532    if let Some(a) = col.as_any().downcast_ref::<Float32Array>() {
1533        return Some(a.value(row) as f64);
1534    }
1535    if let Some(a) = col.as_any().downcast_ref::<Float64Array>() {
1536        return Some(a.value(row));
1537    }
1538    None
1539}
1540
1541fn literal_value_to_f64(v: &omnigraph_compiler::catalog::LiteralValue) -> f64 {
1542    use omnigraph_compiler::catalog::LiteralValue;
1543    match v {
1544        LiteralValue::Integer(n) => *n as f64,
1545        LiteralValue::Float(f) => *f,
1546    }
1547}
1548
1549// ─── Edge cardinality validation ─────────────────────────────────────────────
1550
1551pub(crate) async fn validate_edge_cardinality(
1552    db: &crate::db::Omnigraph,
1553    branch: Option<&str>,
1554    edge_name: &str,
1555    written_version: u64,
1556    written_branch: Option<&str>,
1557) -> Result<()> {
1558    use arrow_array::Array;
1559    let catalog = db.catalog();
1560    let edge_type = &catalog.edge_types[edge_name];
1561    if edge_type.cardinality.is_default() {
1562        return Ok(());
1563    }
1564
1565    // Open edge sub-table at the just-written version, not the snapshot's
1566    // (the snapshot still pins to the pre-write version).
1567    let snapshot = db.snapshot_for_branch(branch).await?;
1568    let table_key = format!("edge:{}", edge_name);
1569    let entry = snapshot
1570        .entry(&table_key)
1571        .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
1572    let ds = db
1573        .open_dataset_at_state(
1574            &entry.table_path,
1575            written_branch.or(entry.table_branch.as_deref()),
1576            written_version,
1577        )
1578        .await?;
1579
1580    // Scan src column, count per source
1581    let batches = db.storage().scan(&ds, Some(&["src"]), None, None).await?;
1582
1583    let mut counts: HashMap<String, u32> = HashMap::new();
1584    for batch in &batches {
1585        let srcs = batch
1586            .column_by_name("src")
1587            .unwrap()
1588            .as_any()
1589            .downcast_ref::<StringArray>()
1590            .unwrap();
1591        for i in 0..srcs.len() {
1592            *counts.entry(srcs.value(i).to_string()).or_insert(0) += 1;
1593        }
1594    }
1595
1596    let card = &edge_type.cardinality;
1597    for (src, count) in &counts {
1598        if let Some(max) = card.max {
1599            if *count > max {
1600                return Err(OmniError::manifest(format!(
1601                    "@card violation on edge {}: source '{}' has {} edges (max {})",
1602                    edge_name, src, count, max
1603                )));
1604            }
1605        }
1606        if *count < card.min {
1607            return Err(OmniError::manifest(format!(
1608                "@card violation on edge {}: source '{}' has {} edges (min {})",
1609                edge_name, src, count, card.min
1610            )));
1611        }
1612    }
1613
1614    Ok(())
1615}
1616
1617/// Validate edge `@card` cardinality with in-memory pending edges visible.
1618///
1619/// Loader-level analog to `exec::mutation::validate_edge_cardinality_with_pending`:
1620/// opens the committed dataset at the pre-load snapshot version, then
1621/// delegates to the shared `count_src_per_edge` + `enforce_cardinality_bounds`
1622/// helpers in `exec::staging`. Used by Append/Merge loads (the Overwrite
1623/// path uses `validate_edge_cardinality` which opens the just-written
1624/// Lance version).
1625///
1626/// `mode` controls dedup behavior. `LoadMode::Merge` passes `Some("id")`
1627/// so committed edges that the load is *updating* (same edge id,
1628/// possibly changed `src`) are not double-counted. `LoadMode::Append`
1629/// passes `None` because each line generates a fresh ULID id that
1630/// never collides with committed.
1631async fn validate_edge_cardinality_with_pending_loader(
1632    db: &Omnigraph,
1633    branch: Option<&str>,
1634    edge_type: &omnigraph_compiler::catalog::EdgeType,
1635    table_key: &str,
1636    staging: &MutationStaging,
1637    mode: LoadMode,
1638) -> Result<()> {
1639    if edge_type.cardinality.is_default() {
1640        return Ok(());
1641    }
1642    let snapshot = db.snapshot_for_branch(branch).await?;
1643    let Some(entry) = snapshot.entry(table_key) else {
1644        // No manifest entry — table doesn't exist yet. Pending-only is
1645        // fine; the helper handles empty committed scans.
1646        return Ok(());
1647    };
1648    let ds = db
1649        .open_dataset_at_state(
1650            &entry.table_path,
1651            entry.table_branch.as_deref(),
1652            entry.table_version,
1653        )
1654        .await?;
1655    let dedupe_key = match mode {
1656        LoadMode::Merge => Some("id"),
1657        LoadMode::Append | LoadMode::Overwrite => None,
1658    };
1659    let counts =
1660        crate::exec::staging::count_src_per_edge(db, &ds, table_key, staging, dedupe_key).await?;
1661    crate::exec::staging::enforce_cardinality_bounds(edge_type, &counts)
1662}
1663
1664/// Collect all valid node IDs for a given type, with in-memory pending
1665/// node inserts visible. Used by the staged loader's Phase 2c
1666/// referential-integrity validation.
1667///
1668/// Union of:
1669/// - IDs from the staged loader's pending batches (in-memory; just-staged
1670///   inserts of this type)
1671/// - IDs from the committed sub-table at the pre-load snapshot version
1672///
1673/// For `LoadMode::Overwrite`, if the node table is touched then the pending
1674/// batches are the replacement image. In that case committed IDs are not
1675/// included, so edge RI is validated against exactly what the overwrite will
1676/// publish.
1677async fn collect_node_ids_with_pending(
1678    db: &Omnigraph,
1679    branch: Option<&str>,
1680    type_name: &str,
1681    staging: &MutationStaging,
1682) -> Result<HashSet<String>> {
1683    let mut ids = HashSet::new();
1684    let table_key = format!("node:{}", type_name);
1685
1686    // From staging.pending: walk the in-memory accumulator's id column.
1687    for batch in staging.pending_batches(&table_key) {
1688        if let Some(col) = batch.column_by_name("id") {
1689            if let Some(arr) = col.as_any().downcast_ref::<StringArray>() {
1690                for i in 0..arr.len() {
1691                    if arr.is_valid(i) {
1692                        ids.insert(arr.value(i).to_string());
1693                    }
1694                }
1695            }
1696        }
1697    }
1698
1699    if staging.pending_mode(&table_key) == Some(PendingMode::Overwrite) {
1700        return Ok(ids);
1701    }
1702
1703    // From the committed Lance sub-table at the pre-load snapshot version.
1704    let snapshot = db.snapshot_for_branch(branch).await?;
1705    let Some(entry) = snapshot.entry(&table_key) else {
1706        return Ok(ids);
1707    };
1708    let ds = db
1709        .open_dataset_at_state(
1710            &entry.table_path,
1711            entry.table_branch.as_deref(),
1712            entry.table_version,
1713        )
1714        .await?;
1715
1716    let batches = db.storage().scan(&ds, Some(&["id"]), None, None).await?;
1717
1718    for batch in &batches {
1719        let id_col = batch
1720            .column_by_name("id")
1721            .ok_or_else(|| OmniError::Lance("missing 'id' column".into()))?
1722            .as_any()
1723            .downcast_ref::<StringArray>()
1724            .ok_or_else(|| OmniError::Lance("'id' column is not Utf8".into()))?;
1725        for i in 0..batch.num_rows() {
1726            // Defensive: `id` is the @key column on every node type and
1727            // is non-nullable by schema, but a committed-row corruption
1728            // (or future schema change) could surface a NULL. Skip
1729            // rather than insert "" — pending-side does the same.
1730            if id_col.is_valid(i) {
1731                ids.insert(id_col.value(i).to_string());
1732            }
1733        }
1734    }
1735
1736    Ok(ids)
1737}
1738
#[cfg(test)]
mod tests {
    // Loader tests: end-to-end JSONL loads against a temp-dir Omnigraph,
    // plus unit tests for the constraint helpers in this module.
    use super::*;
    use crate::db::Omnigraph;
    use arrow_array::Array;
    use futures::TryStreamExt;
    use std::collections::HashMap;

    const TEST_SCHEMA: &str = r#"
node Person {
    name: String @key
    age: I32?
}
node Company {
    name: String @key
}
edge Knows: Person -> Person {
    since: Date?
}
edge WorksAt: Person -> Company
"#;

    const TEST_DATA: &str = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}
{"type": "Person", "data": {"name": "Bob", "age": 25}}
{"type": "Company", "data": {"name": "Acme"}}
{"edge": "Knows", "from": "Alice", "to": "Bob"}
{"edge": "WorksAt", "from": "Alice", "to": "Acme"}
"#;

    #[tokio::test]
    async fn test_load_creates_data() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let result = load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert_eq!(result.nodes_loaded["Person"], 2);
        assert_eq!(result.nodes_loaded["Company"], 1);
        assert_eq!(result.edges_loaded["Knows"], 1);
        assert_eq!(result.edges_loaded["WorksAt"], 1);
    }

    #[tokio::test]
    async fn test_load_data_readable_via_lance() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        // Read back via snapshot
        let snap = db.snapshot().await;
        let person_ds = snap.open("node:Person").await.unwrap();

        assert_eq!(person_ds.count_rows(None).await.unwrap(), 2);

        // Verify data
        let batches: Vec<RecordBatch> = person_ds
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();

        // The scan may yield the rows across several RecordBatches, so collect
        // ids from all of them rather than inspecting only `batches[0]`.
        let mut id_values: Vec<String> = Vec::new();
        for batch in &batches {
            let ids = batch
                .column_by_name("id")
                .unwrap()
                .as_any()
                .downcast_ref::<StringArray>()
                .unwrap();
            id_values.extend((0..ids.len()).map(|i| ids.value(i).to_string()));
        }
        // @key=name, so ids should be "Alice" and "Bob"
        assert!(id_values.iter().any(|id| id == "Alice"));
        assert!(id_values.iter().any(|id| id == "Bob"));
    }

    #[tokio::test]
    async fn test_load_edges_reference_node_keys() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        let snap = db.snapshot().await;
        let knows_ds = snap.open("edge:Knows").await.unwrap();

        let batches: Vec<RecordBatch> = knows_ds
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();

        let batch = &batches[0];
        let srcs = batch
            .column_by_name("src")
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let dsts = batch
            .column_by_name("dst")
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();

        assert_eq!(srcs.value(0), "Alice");
        assert_eq!(dsts.value(0), "Bob");
    }

    #[tokio::test]
    async fn test_load_manifest_version_advances() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        let v1 = db.version().await;

        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert!(db.version().await > v1);
    }

    #[tokio::test]
    async fn test_load_append_adds_rows() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let batch1 = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#;
        let batch2 = r#"{"type": "Person", "data": {"name": "Bob", "age": 25}}"#;

        load_jsonl(&mut db, batch1, LoadMode::Overwrite)
            .await
            .unwrap();
        load_jsonl(&mut db, batch2, LoadMode::Append).await.unwrap();

        let snap = db.snapshot().await;
        let person_ds = snap.open("node:Person").await.unwrap();
        assert_eq!(person_ds.count_rows(None).await.unwrap(), 2);
    }

    #[tokio::test]
    async fn test_load_unknown_type_rejected() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let bad = r#"{"type": "FakeType", "data": {"name": "x"}}"#;
        let result = load_jsonl(&mut db, bad, LoadMode::Overwrite).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    #[allow(deprecated)]
    async fn test_ingest_creates_branch_and_reports_tables() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let result = db
            .ingest("feature", Some("main"), TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert_eq!(result.branch, "feature");
        assert_eq!(result.base_branch, "main");
        assert!(result.branch_created);
        assert_eq!(result.mode, LoadMode::Overwrite);
        assert_eq!(
            result.tables,
            vec![
                IngestTableResult {
                    table_key: "edge:Knows".to_string(),
                    rows_loaded: 1
                },
                IngestTableResult {
                    table_key: "edge:WorksAt".to_string(),
                    rows_loaded: 1
                },
                IngestTableResult {
                    table_key: "node:Company".to_string(),
                    rows_loaded: 1
                },
                IngestTableResult {
                    table_key: "node:Person".to_string(),
                    rows_loaded: 2
                },
            ]
        );
        assert!(
            db.branch_list()
                .await
                .unwrap()
                .contains(&"feature".to_string())
        );
    }

    #[tokio::test]
    #[allow(deprecated)]
    async fn test_ingest_existing_branch_ignores_from_and_merges_data() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();
        db.branch_create_from(crate::db::ReadTarget::branch("main"), "feature")
            .await
            .unwrap();

        let result = db
            .ingest(
                "feature",
                Some("missing-base"),
                r#"{"type":"Person","data":{"name":"Bob","age":26}}
{"type":"Person","data":{"name":"Eve","age":31}}"#,
                LoadMode::Merge,
            )
            .await
            .unwrap();

        assert_eq!(result.branch, "feature");
        assert_eq!(result.base_branch, "missing-base");
        assert!(!result.branch_created);
        assert_eq!(result.mode, LoadMode::Merge);
        assert_eq!(
            result.tables,
            vec![IngestTableResult {
                table_key: "node:Person".to_string(),
                rows_loaded: 2
            }]
        );

        let snap = db
            .snapshot_of(crate::db::ReadTarget::branch("feature"))
            .await
            .unwrap();
        let person_ds = snap.open("node:Person").await.unwrap();
        assert_eq!(person_ds.count_rows(None).await.unwrap(), 3);

        let batches: Vec<RecordBatch> = person_ds
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();
        let mut ages_by_id = HashMap::new();
        for batch in &batches {
            let ids = batch
                .column_by_name("id")
                .unwrap()
                .as_any()
                .downcast_ref::<StringArray>()
                .unwrap();
            let ages = batch
                .column_by_name("age")
                .unwrap()
                .as_any()
                .downcast_ref::<Int32Array>()
                .unwrap();
            for idx in 0..ids.len() {
                ages_by_id.insert(ids.value(idx).to_string(), ages.value(idx));
            }
        }

        assert_eq!(ages_by_id.get("Bob"), Some(&26));
        assert_eq!(ages_by_id.get("Eve"), Some(&31));
        assert_eq!(ages_by_id.get("Alice"), Some(&30));
    }

    #[tokio::test]
    #[allow(deprecated)]
    async fn test_ingest_as_stamps_actor_on_branch_head_commit() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        db.ingest_as(
            "feature",
            Some("main"),
            TEST_DATA,
            LoadMode::Overwrite,
            Some("act-andrew"),
        )
        .await
        .unwrap();

        let head = db
            .list_commits(Some("feature"))
            .await
            .unwrap()
            .into_iter()
            .last()
            .unwrap();
        assert_eq!(head.actor_id.as_deref(), Some("act-andrew"));
    }

    #[tokio::test]
    async fn test_load_as_with_base_forks_missing_branch_and_stamps_metadata() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let result = db
            .load_as("feature", Some("main"), TEST_DATA, LoadMode::Merge, None)
            .await
            .unwrap();

        assert_eq!(result.branch, "feature");
        assert_eq!(result.base_branch.as_deref(), Some("main"));
        assert!(result.branch_created);
        assert!(
            db.branch_list()
                .await
                .unwrap()
                .contains(&"feature".to_string())
        );

        // Re-loading onto the now-existing branch records the base but
        // performs no fork.
        let again = db
            .load_as(
                "feature",
                Some("main"),
                r#"{"type":"Person","data":{"name":"Bob","age":26}}"#,
                LoadMode::Merge,
                None,
            )
            .await
            .unwrap();
        assert!(!again.branch_created);
        assert_eq!(again.base_branch.as_deref(), Some("main"));
    }

    #[tokio::test]
    async fn test_load_as_without_base_errors_on_missing_branch() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let result = db
            .load_as("nonexistent", None, TEST_DATA, LoadMode::Merge, None)
            .await;
        assert!(result.is_err(), "load without base must not create branches");
        assert!(
            !db.branch_list()
                .await
                .unwrap()
                .contains(&"nonexistent".to_string()),
            "failed load must not leave a branch behind"
        );

        // Loads to main carry the default branch metadata.
        let main_load = db.load("main", TEST_DATA, LoadMode::Overwrite).await.unwrap();
        assert_eq!(main_load.branch, "main");
        assert_eq!(main_load.base_branch, None);
        assert!(!main_load.branch_created);
    }

    #[test]
    fn test_range_constraint_rejects_nan() {
        use arrow_array::{Float64Array, RecordBatch, StringArray};
        use omnigraph_compiler::catalog::{LiteralValue, NodeType, RangeConstraint};
        use std::sync::Arc;

        let schema = Arc::new(arrow_schema::Schema::new(vec![
            arrow_schema::Field::new("name", arrow_schema::DataType::Utf8, false),
            arrow_schema::Field::new("score", arrow_schema::DataType::Float64, true),
        ]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["bad"])),
                Arc::new(Float64Array::from(vec![f64::NAN])),
            ],
        )
        .unwrap();

        let node_type = NodeType {
            name: "Test".to_string(),
            implements: vec![],
            properties: Default::default(),
            key: None,
            unique_constraints: vec![],
            indices: vec![],
            range_constraints: vec![RangeConstraint {
                property: "score".to_string(),
                min: Some(LiteralValue::Float(0.0)),
                max: Some(LiteralValue::Float(1.0)),
            }],
            check_constraints: vec![],
            embed_sources: Default::default(),
            blob_properties: Default::default(),
            arrow_schema: schema,
        };

        let result = validate_value_constraints(&batch, &node_type);
        assert!(result.is_err(), "expected NaN to be rejected");
        let err = result.unwrap_err().to_string();
        assert!(err.contains("NaN"), "error should mention NaN: {}", err);
    }

    #[test]
    fn composite_unique_key_builds_tuple_and_exempts_null() {
        let a: ArrayRef = Arc::new(StringArray::from(vec![Some("x|y"), Some("x"), None]));
        let b: ArrayRef = Arc::new(StringArray::from(vec![Some("z"), Some("y|z"), Some("q")]));
        let cols = [a, b];

        // Tuple key, so `("x|y", "z")` and `("x", "y|z")` stay distinct —
        // a separator-joined key (the old `|` join) would collapse both to
        // `x|y|z`.
        assert_eq!(
            composite_unique_key(&cols, 0).unwrap(),
            Some(vec!["x|y".to_string(), "z".to_string()])
        );
        assert_eq!(
            composite_unique_key(&cols, 1).unwrap(),
            Some(vec!["x".to_string(), "y|z".to_string()])
        );
        assert_ne!(
            composite_unique_key(&cols, 0).unwrap(),
            composite_unique_key(&cols, 1).unwrap()
        );

        // Any null column → the whole row is exempt (SQL null semantics).
        assert_eq!(composite_unique_key(&cols, 2).unwrap(), None);
    }

    #[test]
    fn unique_key_scalar_errors_loudly_on_unkeyable_type() {
        use arrow_array::LargeBinaryArray;
        // A binary/blob column can't be reduced to a uniqueness key. Before the
        // hardening this returned `None`, so a `@unique` on such a column was
        // silently un-enforced; now it errors instead of weakening the
        // constraint in silence.
        let blob: ArrayRef = Arc::new(LargeBinaryArray::from(vec![Some(&b"abc"[..])]));
        let err = unique_key_scalar(&blob, 0).unwrap_err();
        assert!(
            err.to_string().contains("unsupported column type"),
            "un-keyable type must fail loudly (got: {err})"
        );
    }

    #[test]
    fn unique_key_scalar_handles_all_string_encodings() {
        use arrow_array::{LargeStringArray, StringViewArray};
        // A legal string column is keyable in every physical Arrow encoding
        // Lance might hand back (Utf8 / LargeUtf8 / Utf8View). None of these may
        // fall through to the loud `Err` path — that branch is reserved for
        // genuinely un-keyable column types, not a legal value in an
        // unenumerated encoding.
        let utf8: ArrayRef = Arc::new(StringArray::from(vec![Some("v")]));
        let large: ArrayRef = Arc::new(LargeStringArray::from(vec![Some("v")]));
        let view: ArrayRef = Arc::new(StringViewArray::from(vec![Some("v")]));
        for array in [&utf8, &large, &view] {
            assert_eq!(
                unique_key_scalar(array, 0).unwrap(),
                Some("v".to_string()),
                "string array {:?} must render, not error",
                array.data_type()
            );
        }
    }
}