// omnigraph/loader/mod.rs

1use std::collections::{HashMap, HashSet};
2
3use std::io::{BufRead, BufReader, Cursor};
4use std::sync::Arc;
5
6use arrow_array::{
7    Array, ArrayRef, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array,
8    Int32Array, Int64Array, RecordBatch, StringArray, UInt32Array, UInt64Array,
9    builder::{
10        ArrayBuilder, BooleanBuilder, Date32Builder, Date64Builder, FixedSizeListBuilder,
11        Float32Builder, Float64Builder, Int32Builder, Int64Builder, ListBuilder, StringBuilder,
12        UInt32Builder, UInt64Builder,
13    },
14};
15use arrow_schema::DataType;
16use base64::Engine;
17use lance::blob::BlobArrayBuilder;
18use omnigraph_compiler::catalog::NodeType;
19use serde::{Deserialize, Serialize};
20use serde_json::Value as JsonValue;
21
22use crate::db::Omnigraph;
23use crate::error::{OmniError, Result};
24use crate::exec::staging::{MutationStaging, PendingMode};
25
/// Per-type row counts reported by a load operation.
///
/// Both maps are keyed by type name; each value is the number of rows in
/// the batch that was built for that type.
#[derive(Clone, Debug, Default)]
pub struct LoadResult {
    /// Row count per node type name.
    pub nodes_loaded: HashMap<String, usize>,
    /// Row count per edge type name.
    pub edges_loaded: HashMap<String, usize>,
}
32
33#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
34pub struct IngestTableResult {
35    pub table_key: String,
36    pub rows_loaded: usize,
37}
38
39#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
40pub struct IngestResult {
41    pub branch: String,
42    pub base_branch: String,
43    pub branch_created: bool,
44    pub mode: LoadMode,
45    pub tables: Vec<IngestTableResult>,
46}
47
48/// Load mode for data ingestion.
49#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
50#[serde(rename_all = "snake_case")]
51pub enum LoadMode {
52    /// Overwrite existing data.
53    Overwrite,
54    /// Append to existing data.
55    Append,
56    /// Merge by `id` key (upsert).
57    Merge,
58}
59
60/// Load JSONL data into an Omnigraph database.
61pub async fn load_jsonl(db: &mut Omnigraph, data: &str, mode: LoadMode) -> Result<LoadResult> {
62    let current_branch = db.active_branch().await;
63    let branch = current_branch.as_deref().unwrap_or("main");
64    db.load(branch, data, mode).await
65}
66
67/// Load JSONL data from a file path.
68pub async fn load_jsonl_file(db: &mut Omnigraph, path: &str, mode: LoadMode) -> Result<LoadResult> {
69    let current_branch = db.active_branch().await;
70    let branch = current_branch.as_deref().unwrap_or("main");
71    db.load_file(branch, path, mode).await
72}
73
74impl Omnigraph {
75    pub async fn ingest(
76        &self,
77        branch: &str,
78        from: Option<&str>,
79        data: &str,
80        mode: LoadMode,
81    ) -> Result<IngestResult> {
82        self.ingest_as(branch, from, data, mode, None).await
83    }
84
85    pub async fn ingest_as(
86        &self,
87        branch: &str,
88        from: Option<&str>,
89        data: &str,
90        mode: LoadMode,
91        actor_id: Option<&str>,
92    ) -> Result<IngestResult> {
93        // Engine-layer policy gate (MR-722 fan-out / PR #3). Scope is
94        // `Branch(branch)` for the data-write portion. If ingest creates
95        // a new branch as a side-effect (target branch doesn't exist),
96        // the inner `branch_create_from_as` call below additionally
97        // checks `BranchCreate` — both authorities are genuinely needed
98        // for "ingest into a fresh branch", so the layered check is
99        // correct, not redundant.
100        self.enforce(
101            omnigraph_policy::PolicyAction::Change,
102            &omnigraph_policy::ResourceScope::Branch(branch.to_string()),
103            actor_id,
104        )?;
105        self.ingest_with_current_actor(branch, from, data, mode, actor_id)
106            .await
107    }
108
109    pub async fn ingest_file(
110        &self,
111        branch: &str,
112        from: Option<&str>,
113        path: &str,
114        mode: LoadMode,
115    ) -> Result<IngestResult> {
116        self.ingest_file_as(branch, from, path, mode, None).await
117    }
118
119    pub async fn ingest_file_as(
120        &self,
121        branch: &str,
122        from: Option<&str>,
123        path: &str,
124        mode: LoadMode,
125        actor_id: Option<&str>,
126    ) -> Result<IngestResult> {
127        let data = std::fs::read_to_string(path).map_err(OmniError::Io)?;
128        self.ingest_as(branch, from, &data, mode, actor_id).await
129    }
130
131    async fn ingest_with_current_actor(
132        &self,
133        branch: &str,
134        from: Option<&str>,
135        data: &str,
136        mode: LoadMode,
137        actor_id: Option<&str>,
138    ) -> Result<IngestResult> {
139        self.ensure_schema_state_valid().await?;
140        let target_branch =
141            Self::normalize_branch_name(branch)?.unwrap_or_else(|| "main".to_string());
142        let base_branch = Self::normalize_branch_name(from.unwrap_or("main"))?
143            .unwrap_or_else(|| "main".to_string());
144        let branch_created = !self
145            .branch_list()
146            .await?
147            .iter()
148            .any(|name| name == &target_branch);
149        if branch_created {
150            // Thread the actor through to the implicit BranchCreate so
151            // policy decisions match what an explicit `branch_create_from_as`
152            // call would see. Calling the no-actor variant here would
153            // bypass BranchCreate enforcement when policy is installed —
154            // the footgun guard catches that case too, but threading is
155            // the correct fix.
156            self.branch_create_from_as(
157                crate::db::ReadTarget::branch(&base_branch),
158                &target_branch,
159                actor_id,
160            )
161            .await?;
162        }
163
164        let result = self.load_as(&target_branch, data, mode, actor_id).await?;
165        Ok(IngestResult {
166            branch: target_branch,
167            base_branch,
168            branch_created,
169            mode,
170            tables: result.to_ingest_tables(),
171        })
172    }
173
174    pub async fn load(&self, branch: &str, data: &str, mode: LoadMode) -> Result<LoadResult> {
175        self.load_as(branch, data, mode, None).await
176    }
177
178    pub async fn load_as(
179        &self,
180        branch: &str,
181        data: &str,
182        mode: LoadMode,
183        actor_id: Option<&str>,
184    ) -> Result<LoadResult> {
185        // Engine-layer policy gate (MR-722 fan-out / PR #3). Scope is
186        // `Branch(branch)` to match the HTTP-layer Change convention.
187        // `ingest_as` also calls `load_as` after enforcing its own
188        // Change gate — that double-check is fine because both gates
189        // resolve to identical Cedar decisions for the same actor +
190        // branch (the second check is a structurally-correct no-op).
191        self.enforce(
192            omnigraph_policy::PolicyAction::Change,
193            &omnigraph_policy::ResourceScope::Branch(branch.to_string()),
194            actor_id,
195        )?;
196        self.ensure_schema_state_valid().await?;
197        // Reject internal `__run__*` / system-prefixed branches at the
198        // public write boundary. Direct-publish paths assert this
199        // explicitly so a caller can't write to legacy or system
200        // staging branches by passing the prefix verbatim.
201        crate::db::ensure_public_branch_ref(branch, "load")?;
202        // Branch convention: `None` represents `main`. Re-normalizing to
203        // `Some("main")` here would route the publisher commit through a
204        // separate coordinator (the cross-branch path in
205        // `commit_prepared_updates_on_branch_with_expected`) and leave
206        // `self.coordinator` with a stale manifest snapshot.
207        let requested = Self::normalize_branch_name(branch)?;
208        // Direct-to-target writes: no Run state machine, no `__run__` staging
209        // branch. Cross-table OCC is enforced by the publisher's
210        // `expected_table_versions` CAS inside `load_jsonl_reader`.
211        self.load_direct_on_branch(requested.as_deref(), data, mode, actor_id)
212            .await
213    }
214
215    pub async fn load_file(&self, branch: &str, path: &str, mode: LoadMode) -> Result<LoadResult> {
216        self.load_file_as(branch, path, mode, None).await
217    }
218
219    /// Read a file into memory and delegate to `load_as`. Used by the
220    /// CLI's `omnigraph load` so file-path-based writes flow through
221    /// the same engine-layer policy gate as in-memory `load_as` calls.
222    pub async fn load_file_as(
223        &self,
224        branch: &str,
225        path: &str,
226        mode: LoadMode,
227        actor_id: Option<&str>,
228    ) -> Result<LoadResult> {
229        let data = std::fs::read_to_string(path).map_err(|e| OmniError::Io(e))?;
230        self.load_as(branch, &data, mode, actor_id).await
231    }
232
233    async fn load_direct_on_branch(
234        &self,
235        branch: Option<&str>,
236        data: &str,
237        mode: LoadMode,
238        actor_id: Option<&str>,
239    ) -> Result<LoadResult> {
240        let reader = BufReader::new(Cursor::new(data.as_bytes()));
241        load_jsonl_reader(self, branch, reader, mode, actor_id).await
242    }
243}
244
245impl LoadMode {
246    pub fn as_str(self) -> &'static str {
247        match self {
248            LoadMode::Overwrite => "overwrite",
249            LoadMode::Append => "append",
250            LoadMode::Merge => "merge",
251        }
252    }
253}
254
255impl LoadResult {
256    pub fn to_ingest_tables(&self) -> Vec<IngestTableResult> {
257        let mut tables = self
258            .nodes_loaded
259            .iter()
260            .map(|(type_name, rows_loaded)| IngestTableResult {
261                table_key: format!("node:{type_name}"),
262                rows_loaded: *rows_loaded,
263            })
264            .chain(
265                self.edges_loaded
266                    .iter()
267                    .map(|(edge_name, rows_loaded)| IngestTableResult {
268                        table_key: format!("edge:{edge_name}"),
269                        rows_loaded: *rows_loaded,
270                    }),
271            )
272            .collect::<Vec<_>>();
273        tables.sort_by(|a, b| a.table_key.cmp(&b.table_key));
274        tables
275    }
276}
277
278async fn load_jsonl_reader<R: BufRead>(
279    db: &Omnigraph,
280    branch: Option<&str>,
281    reader: R,
282    mode: LoadMode,
283    actor_id: Option<&str>,
284) -> Result<LoadResult> {
285    let catalog = db.catalog().clone();
286
287    // Phase 1: Parse all lines, spool into per-type collections
288    let mut node_rows: HashMap<String, Vec<JsonValue>> = HashMap::new();
289    let mut edge_rows: HashMap<String, Vec<(String, String, JsonValue)>> = HashMap::new();
290
291    for (line_num, line) in reader.lines().enumerate() {
292        let line = line?;
293        let line = line.trim();
294        if line.is_empty() {
295            continue;
296        }
297        let value: JsonValue = serde_json::from_str(line).map_err(|e| {
298            OmniError::manifest(format!("invalid JSON on line {}: {}", line_num + 1, e))
299        })?;
300
301        if let Some(type_name) = value.get("type").and_then(|v| v.as_str()) {
302            if !catalog.node_types.contains_key(type_name) {
303                return Err(OmniError::manifest(format!(
304                    "line {}: unknown node type '{}'",
305                    line_num + 1,
306                    type_name
307                )));
308            }
309            let data = value
310                .get("data")
311                .cloned()
312                .unwrap_or(JsonValue::Object(serde_json::Map::new()));
313            node_rows
314                .entry(type_name.to_string())
315                .or_default()
316                .push(data);
317        } else if let Some(edge_name) = value.get("edge").and_then(|v| v.as_str()) {
318            if catalog.lookup_edge_by_name(edge_name).is_none() {
319                return Err(OmniError::manifest(format!(
320                    "line {}: unknown edge type '{}'",
321                    line_num + 1,
322                    edge_name
323                )));
324            }
325            let from = value
326                .get("from")
327                .and_then(|v| v.as_str())
328                .ok_or_else(|| {
329                    OmniError::manifest(format!("line {}: edge missing 'from'", line_num + 1))
330                })?
331                .to_string();
332            let to = value
333                .get("to")
334                .and_then(|v| v.as_str())
335                .ok_or_else(|| {
336                    OmniError::manifest(format!("line {}: edge missing 'to'", line_num + 1))
337                })?
338                .to_string();
339            let data = value
340                .get("data")
341                .cloned()
342                .unwrap_or(JsonValue::Object(serde_json::Map::new()));
343            let canonical = catalog.lookup_edge_by_name(edge_name).unwrap().name.clone();
344            edge_rows
345                .entry(canonical)
346                .or_default()
347                .push((from, to, data));
348        } else {
349            return Err(OmniError::manifest(format!(
350                "line {}: expected 'type' or 'edge' field",
351                line_num + 1
352            )));
353        }
354    }
355
356    // Phase 2: Build per-type RecordBatches and accumulate into the
357    // staging pipeline. For Append/Merge, batches go into an in-memory
358    // accumulator and a single `stage_*` + `commit_staged` per touched
359    // table runs at end-of-load — a mid-load failure (RI / cardinality
360    // violation) leaves Lance HEAD untouched. For Overwrite, the legacy
361    // inline-commit path is preserved (truncate+append doesn't fit the
362    // staged shape cleanly, and overwrite has no in-flight read-your-writes
363    // requirement).
364
365    let mut result = LoadResult::default();
366    let snapshot = db.snapshot_for_branch(branch).await?;
367    let use_staging = !matches!(mode, LoadMode::Overwrite);
368    let mut staging = MutationStaging::default();
369    let mut overwrite_updates: Vec<crate::db::SubTableUpdate> = Vec::new();
370    let mut overwrite_expected: HashMap<String, u64> = HashMap::new();
371    let pending_mode = match mode {
372        LoadMode::Merge => PendingMode::Merge,
373        // Append-mode loads accumulate as Append. Edge tables (no @key)
374        // and no-key node tables stay safe on the stage_append path. The
375        // Merge mode applies dedupe-by-id; Append assumes unique inputs.
376        LoadMode::Append => PendingMode::Append,
377        LoadMode::Overwrite => PendingMode::Append, // unused
378    };
379    // Map LoadMode to MutationOpKind for the version-check policy.
380    // Append/Merge skip the strict pre-stage check (concurrency-safe
381    // under the per-(table, branch) queue + publisher CAS); Overwrite
382    // uses the strict check because it truncates and replaces the
383    // dataset — concurrent advances change what "replace" means.
384    let load_op_kind = match mode {
385        LoadMode::Append => crate::db::MutationOpKind::Insert,
386        LoadMode::Merge => crate::db::MutationOpKind::Merge,
387        LoadMode::Overwrite => crate::db::MutationOpKind::SchemaRewrite,
388    };
389
390    // Phase 2a: build and validate every node batch up front. Cheap and
391    // synchronous — surfaces validation errors before any S3 traffic.
392    let mut prepared_nodes: Vec<(String, String, RecordBatch, usize)> =
393        Vec::with_capacity(node_rows.len());
394    for (type_name, rows) in &node_rows {
395        let node_type = &catalog.node_types[type_name];
396        let batch = build_node_batch(node_type, rows)?;
397        validate_value_constraints(&batch, node_type)?;
398        validate_enum_constraints(&batch, &node_type.properties, type_name)?;
399        let unique_props = unique_property_names_for_node(node_type);
400        if !unique_props.is_empty() {
401            enforce_unique_constraints_intra_batch(&batch, type_name, &unique_props)?;
402        }
403        let loaded_count = batch.num_rows();
404        let table_key = format!("node:{}", type_name);
405        let entry = snapshot
406            .entry(&table_key)
407            .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
408        if !use_staging {
409            overwrite_expected.insert(table_key.clone(), entry.table_version);
410        }
411        prepared_nodes.push((type_name.clone(), table_key, batch, loaded_count));
412    }
413
414    // Phase 2b: write every node type. Append/Merge → in-memory
415    // accumulator. Overwrite → concurrent inline-commit (legacy path).
416    if use_staging {
417        for (type_name, table_key, batch, loaded_count) in prepared_nodes {
418            let (ds, full_path, table_branch) = db
419                .open_for_mutation_on_branch(branch, &table_key, load_op_kind)
420                .await?;
421            let expected_version = ds.version().version;
422            staging.ensure_path(
423                &table_key,
424                full_path,
425                table_branch,
426                expected_version,
427                load_op_kind,
428            );
429            let schema = batch.schema();
430            staging.append_batch(&table_key, schema, pending_mode, batch)?;
431            result.nodes_loaded.insert(type_name, loaded_count);
432        }
433    } else {
434        let node_write_results =
435            write_batches_concurrently(db, branch, mode, prepared_nodes).await?;
436        for (type_name, table_key, loaded_count, state, table_branch) in node_write_results {
437            overwrite_updates.push(crate::db::SubTableUpdate {
438                table_key,
439                table_version: state.version,
440                table_branch,
441                row_count: state.row_count,
442                version_metadata: state.version_metadata,
443            });
444            result.nodes_loaded.insert(type_name, loaded_count);
445        }
446    }
447
448    // Phase 2c: Validate edge referential integrity — every src/dst must
449    // reference an existing node ID in the appropriate type. For staged
450    // loads, the lookup unions snapshot-committed IDs with the in-memory
451    // pending batches (which carry the just-staged node inserts).
452    for (edge_name, rows) in &edge_rows {
453        let edge_type = &catalog.edge_types[edge_name];
454        let from_ids = if use_staging {
455            collect_node_ids_with_pending(db, branch, &edge_type.from_type, &staging).await?
456        } else {
457            collect_node_ids(
458                db,
459                branch,
460                &edge_type.from_type,
461                &node_rows,
462                &catalog,
463                &overwrite_updates,
464            )
465            .await?
466        };
467        let to_ids = if use_staging {
468            collect_node_ids_with_pending(db, branch, &edge_type.to_type, &staging).await?
469        } else {
470            collect_node_ids(
471                db,
472                branch,
473                &edge_type.to_type,
474                &node_rows,
475                &catalog,
476                &overwrite_updates,
477            )
478            .await?
479        };
480
481        for (i, (src, dst, _)) in rows.iter().enumerate() {
482            if !from_ids.contains(src.as_str()) {
483                return Err(OmniError::manifest(format!(
484                    "edge {} row {}: src '{}' not found in {}",
485                    edge_name,
486                    i + 1,
487                    src,
488                    edge_type.from_type
489                )));
490            }
491            if !to_ids.contains(dst.as_str()) {
492                return Err(OmniError::manifest(format!(
493                    "edge {} row {}: dst '{}' not found in {}",
494                    edge_name,
495                    i + 1,
496                    dst,
497                    edge_type.to_type
498                )));
499            }
500        }
501    }
502
503    // Phase 2d: build edge batches.
504    let mut prepared_edges: Vec<(String, String, RecordBatch, usize)> =
505        Vec::with_capacity(edge_rows.len());
506    for (edge_name, rows) in &edge_rows {
507        let edge_type = &catalog.edge_types[edge_name];
508        let batch = build_edge_batch(edge_type, rows)?;
509        validate_enum_constraints(&batch, &edge_type.properties, edge_name)?;
510        let unique_props = unique_property_names_for_edge(edge_type);
511        if !unique_props.is_empty() {
512            enforce_unique_constraints_intra_batch(&batch, edge_name, &unique_props)?;
513        }
514        let loaded_count = batch.num_rows();
515        let table_key = format!("edge:{}", edge_name);
516        let entry = snapshot
517            .entry(&table_key)
518            .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
519        if !use_staging {
520            overwrite_expected.insert(table_key.clone(), entry.table_version);
521        }
522        prepared_edges.push((edge_name.clone(), table_key, batch, loaded_count));
523    }
524
525    // Phase 2e: write every edge type. Same dispatch as Phase 2b.
526    if use_staging {
527        for (edge_name, table_key, batch, loaded_count) in prepared_edges {
528            let (ds, full_path, table_branch) = db
529                .open_for_mutation_on_branch(branch, &table_key, load_op_kind)
530                .await?;
531            let expected_version = ds.version().version;
532            staging.ensure_path(
533                &table_key,
534                full_path,
535                table_branch,
536                expected_version,
537                load_op_kind,
538            );
539            let schema = batch.schema();
540            staging.append_batch(&table_key, schema, pending_mode, batch)?;
541            result.edges_loaded.insert(edge_name, loaded_count);
542        }
543    } else {
544        let edge_write_results =
545            write_batches_concurrently(db, branch, mode, prepared_edges).await?;
546        for (edge_name, table_key, loaded_count, state, table_branch) in edge_write_results {
547            overwrite_updates.push(crate::db::SubTableUpdate {
548                table_key,
549                table_version: state.version,
550                table_branch,
551                row_count: state.row_count,
552                version_metadata: state.version_metadata,
553            });
554            result.edges_loaded.insert(edge_name, loaded_count);
555        }
556    }
557
558    // Phase 3: Validate edge cardinality constraints (before commit —
559    // invalid data must not be committed). Staged path scans committed
560    // edges via Lance + iterates pending edges in-memory. Overwrite path
561    // opens the just-written version (legacy behavior).
562    for (edge_name, _) in &edge_rows {
563        let edge_type = &catalog.edge_types[edge_name];
564        let table_key = format!("edge:{}", edge_name);
565        if use_staging {
566            validate_edge_cardinality_with_pending_loader(
567                db, branch, edge_type, &table_key, &staging, mode,
568            )
569            .await?;
570        } else if let Some(update) = overwrite_updates.iter().find(|u| u.table_key == table_key) {
571            validate_edge_cardinality(
572                db,
573                branch,
574                edge_name,
575                update.table_version,
576                update.table_branch.as_deref(),
577            )
578            .await?;
579        }
580    }
581
582    // Phase 4: Atomic manifest commit with publisher-level OCC.
583    if use_staging {
584        let staged = staging.stage_all(db, branch).await?;
585        // `_queue_guards` holds per-(table_key, branch) write queues
586        // across the manifest publish below — see exec/mutation.rs for
587        // the rationale (interleaving prevention).
588        let (updates, expected_versions, sidecar_handle, _queue_guards) = staged
589            .commit_all(db, branch, crate::db::manifest::SidecarKind::Load, actor_id)
590            .await?;
591        // Same finalize → publisher residual as mutations: per-table
592        // staged commits have advanced Lance HEAD, but the manifest
593        // publish has not run yet. Reuse the mutation failpoint name so
594        // one failpoint pins the shared `MutationStaging` boundary.
595        crate::failpoints::maybe_fail("mutation.post_finalize_pre_publisher")?;
596        db.commit_updates_on_branch_with_expected(branch, &updates, &expected_versions, actor_id)
597            .await?;
598        // The recovery sidecar protects the per-table commit_staged →
599        // manifest publish window. Phase C succeeded — clean up
600        // best-effort: failing the user here would error out a write
601        // that already landed durably.
602        if let Some(handle) = sidecar_handle {
603            if let Err(err) =
604                crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await
605            {
606                tracing::warn!(
607                    error = %err,
608                    operation_id = handle.operation_id.as_str(),
609                    "recovery sidecar cleanup failed; the next open's recovery sweep will resolve it"
610                );
611            }
612        }
613    } else {
614        // LoadMode::Overwrite keeps the legacy inline-commit path —
615        // truncate-then-append doesn't fit the staged shape (see
616        // `docs/runs.md` "LoadMode::Overwrite residual"). The recovery
617        // sidecar is not applicable here because the writer doesn't go
618        // through MutationStaging; per-table inline commits + a final
619        // manifest publish handle their own residual via the documented
620        // operator workflow (re-run overwrite to recover).
621        db.commit_updates_on_branch_with_expected(
622            branch,
623            &overwrite_updates,
624            &overwrite_expected,
625            actor_id,
626        )
627        .await?;
628    }
629
630    Ok(result)
631}
632
633fn build_node_batch(node_type: &NodeType, rows: &[JsonValue]) -> Result<RecordBatch> {
634    let schema = node_type.arrow_schema.clone();
635
636    // Build id column: explicit id, @key value, or generated ULID.
637    let ids: Vec<String> = rows
638        .iter()
639        .map(|row| {
640            let explicit_id = row.get("id").and_then(|v| v.as_str()).map(str::to_string);
641            if let Some(key_prop) = node_type.key_property() {
642                let key_value = row
643                    .get(key_prop)
644                    .and_then(|v| v.as_str())
645                    .map(|s| s.to_string())
646                    .ok_or_else(|| {
647                        OmniError::manifest(format!(
648                            "node {} missing @key property '{}'",
649                            node_type.name, key_prop
650                        ))
651                    })?;
652                if let Some(explicit_id) = explicit_id {
653                    if explicit_id != key_value {
654                        return Err(OmniError::manifest(format!(
655                            "node {} has explicit id '{}' that does not match @key property '{}' value '{}'",
656                            node_type.name, explicit_id, key_prop, key_value
657                        )));
658                    }
659                }
660                Ok(key_value)
661            } else if let Some(explicit_id) = explicit_id {
662                Ok(explicit_id)
663            } else {
664                Ok(generate_id())
665            }
666        })
667        .collect::<Result<Vec<_>>>()?;
668
669    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
670    columns.push(Arc::new(StringArray::from(ids)));
671
672    // Build property columns (skip "id" field at index 0)
673    for field in schema.fields().iter().skip(1) {
674        if node_type.blob_properties.contains(field.name()) {
675            let col = build_blob_column(field.name(), field.is_nullable(), rows)?;
676            columns.push(col);
677        } else {
678            let col =
679                build_column_from_json(field.name(), field.data_type(), field.is_nullable(), rows)?;
680            columns.push(col);
681        }
682    }
683
684    RecordBatch::try_new(schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
685}
686
687fn build_edge_batch(
688    edge_type: &omnigraph_compiler::catalog::EdgeType,
689    rows: &[(String, String, JsonValue)],
690) -> Result<RecordBatch> {
691    let schema = edge_type.arrow_schema.clone();
692
693    let ids: Vec<String> = rows
694        .iter()
695        .map(|(_, _, data)| {
696            data.get("id")
697                .and_then(|v| v.as_str())
698                .map(str::to_string)
699                .unwrap_or_else(generate_id)
700        })
701        .collect();
702    let srcs: Vec<&str> = rows.iter().map(|(from, _, _)| from.as_str()).collect();
703    let dsts: Vec<&str> = rows.iter().map(|(_, to, _)| to.as_str()).collect();
704
705    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
706    columns.push(Arc::new(StringArray::from(ids)));
707    columns.push(Arc::new(StringArray::from(srcs)));
708    columns.push(Arc::new(StringArray::from(dsts)));
709
710    // Build edge property columns (skip id, src, dst at indices 0-2)
711    let data_values: Vec<JsonValue> = rows.iter().map(|(_, _, data)| data.clone()).collect();
712    for field in schema.fields().iter().skip(3) {
713        if edge_type.blob_properties.contains(field.name()) {
714            let col = build_blob_column(field.name(), field.is_nullable(), &data_values)?;
715            columns.push(col);
716        } else {
717            let col = build_column_from_json(
718                field.name(),
719                field.data_type(),
720                field.is_nullable(),
721                &data_values,
722            )?;
723            columns.push(col);
724        }
725    }
726
727    RecordBatch::try_new(schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
728}
729
730/// Append a blob value (URI or base64 bytes) to a BlobArrayBuilder.
731pub(crate) fn append_blob_value(builder: &mut BlobArrayBuilder, value: &str) -> Result<()> {
732    if let Some(encoded) = value.strip_prefix("base64:") {
733        let bytes = base64::engine::general_purpose::STANDARD
734            .decode(encoded)
735            .map_err(|e| OmniError::manifest(format!("invalid base64 blob data: {}", e)))?;
736        builder
737            .push_bytes(bytes)
738            .map_err(|e| OmniError::Lance(e.to_string()))
739    } else {
740        // Treat as URI (file://, s3://, gs://, or any other scheme)
741        builder
742            .push_uri(value)
743            .map_err(|e| OmniError::Lance(e.to_string()))
744    }
745}
746
747/// Build a blob column from JSON values using Lance BlobArrayBuilder.
748fn build_blob_column(name: &str, nullable: bool, rows: &[JsonValue]) -> Result<ArrayRef> {
749    let mut builder = BlobArrayBuilder::new(rows.len());
750    for row in rows {
751        match row.get(name) {
752            Some(JsonValue::String(s)) => {
753                append_blob_value(&mut builder, s)?;
754            }
755            Some(JsonValue::Null) | None if nullable => {
756                builder
757                    .push_null()
758                    .map_err(|e| OmniError::Lance(e.to_string()))?;
759            }
760            Some(JsonValue::Null) | None => {
761                return Err(OmniError::manifest(format!(
762                    "non-nullable blob property '{}' has null values",
763                    name
764                )));
765            }
766            _ => {
767                return Err(OmniError::manifest(format!(
768                    "blob property '{}' must be a URI string or base64: prefixed data",
769                    name
770                )));
771            }
772        }
773    }
774    builder
775        .finish()
776        .map_err(|e| OmniError::Lance(e.to_string()))
777}
778
779fn build_column_from_json(
780    name: &str,
781    data_type: &DataType,
782    nullable: bool,
783    rows: &[JsonValue],
784) -> Result<ArrayRef> {
785    let array: ArrayRef = match data_type {
786        DataType::Utf8 => {
787            let values: Vec<Option<String>> = rows
788                .iter()
789                .map(|row| {
790                    row.get(name)
791                        .and_then(|v| v.as_str())
792                        .map(|s| s.to_string())
793                })
794                .collect();
795            Arc::new(StringArray::from(values))
796        }
797        DataType::Int32 => {
798            let values: Vec<Option<i32>> = rows
799                .iter()
800                .map(|row| row.get(name).and_then(|v| v.as_i64()).map(|v| v as i32))
801                .collect();
802            Arc::new(Int32Array::from(values))
803        }
804        DataType::Int64 => {
805            let values: Vec<Option<i64>> = rows
806                .iter()
807                .map(|row| row.get(name).and_then(|v| v.as_i64()))
808                .collect();
809            Arc::new(Int64Array::from(values))
810        }
811        DataType::UInt32 => {
812            let values: Vec<Option<u32>> = rows
813                .iter()
814                .map(|row| row.get(name).and_then(|v| v.as_u64()).map(|v| v as u32))
815                .collect();
816            Arc::new(UInt32Array::from(values))
817        }
818        DataType::UInt64 => {
819            let values: Vec<Option<u64>> = rows
820                .iter()
821                .map(|row| row.get(name).and_then(|v| v.as_u64()))
822                .collect();
823            Arc::new(UInt64Array::from(values))
824        }
825        DataType::Float32 => {
826            let values: Vec<Option<f32>> = rows
827                .iter()
828                .map(|row| row.get(name).and_then(|v| v.as_f64()).map(|v| v as f32))
829                .collect();
830            Arc::new(Float32Array::from(values))
831        }
832        DataType::Float64 => {
833            let values: Vec<Option<f64>> = rows
834                .iter()
835                .map(|row| row.get(name).and_then(|v| v.as_f64()))
836                .collect();
837            Arc::new(Float64Array::from(values))
838        }
839        DataType::Boolean => {
840            let values: Vec<Option<bool>> = rows
841                .iter()
842                .map(|row| row.get(name).and_then(|v| v.as_bool()))
843                .collect();
844            Arc::new(BooleanArray::from(values))
845        }
846        DataType::Date32 => {
847            let mut values = Vec::with_capacity(rows.len());
848            for row in rows {
849                values.push(parse_date32_json_value(
850                    row.get(name).unwrap_or(&JsonValue::Null),
851                )?);
852            }
853            Arc::new(Date32Array::from(values))
854        }
855        DataType::Date64 => {
856            let mut values = Vec::with_capacity(rows.len());
857            for row in rows {
858                values.push(parse_date64_json_value(
859                    row.get(name).unwrap_or(&JsonValue::Null),
860                )?);
861            }
862            Arc::new(Date64Array::from(values))
863        }
864        DataType::List(field) => {
865            let mut builder = ListBuilder::with_capacity(
866                make_list_value_builder(field.data_type(), rows.len())?,
867                rows.len(),
868            )
869            .with_field(field.clone());
870            for row in rows {
871                let value = row.get(name).unwrap_or(&JsonValue::Null);
872                if value.is_null() {
873                    builder.append(false);
874                    continue;
875                }
876                let items = value.as_array().ok_or_else(|| {
877                    OmniError::manifest(format!(
878                        "list property '{}' expects a JSON array, got {}",
879                        name, value
880                    ))
881                })?;
882                for item in items {
883                    append_json_list_item(builder.values(), field.data_type(), item)?;
884                }
885                builder.append(true);
886            }
887            Arc::new(builder.finish())
888        }
889        DataType::FixedSizeList(child_field, dim) => {
890            // Vector type: parse JSON array of floats into FixedSizeList<Float32>
891            let dim = *dim;
892            let mut builder = FixedSizeListBuilder::with_capacity(
893                Float32Builder::with_capacity(rows.len() * dim as usize),
894                dim,
895                rows.len(),
896            )
897            .with_field(child_field.clone());
898            for row in rows {
899                if let Some(arr) = row.get(name).and_then(|v| v.as_array()) {
900                    if arr.len() != dim as usize {
901                        return Err(OmniError::manifest(format!(
902                            "vector property '{}' expects {} dimensions, got {}",
903                            name,
904                            dim,
905                            arr.len()
906                        )));
907                    }
908                    for val in arr {
909                        builder
910                            .values()
911                            .append_value(val.as_f64().unwrap_or(0.0) as f32);
912                    }
913                    builder.append(true);
914                } else if nullable {
915                    for _ in 0..dim as usize {
916                        builder.values().append_null();
917                    }
918                    builder.append(false);
919                } else {
920                    return Err(OmniError::manifest(format!(
921                        "non-nullable vector property '{}' has null values",
922                        name
923                    )));
924                }
925            }
926            Arc::new(builder.finish())
927        }
928        _ => {
929            // Unsupported type: fill with nulls
930            let values: Vec<Option<&str>> = vec![None; rows.len()];
931            Arc::new(StringArray::from(values))
932        }
933    };
934
935    if !nullable && array.null_count() > 0 {
936        return Err(OmniError::manifest(format!(
937            "non-nullable property '{}' has null or invalid values",
938            name
939        )));
940    }
941
942    Ok(array)
943}
944
945fn make_list_value_builder(data_type: &DataType, capacity: usize) -> Result<Box<dyn ArrayBuilder>> {
946    Ok(match data_type {
947        DataType::Utf8 => Box::new(StringBuilder::with_capacity(capacity, capacity * 8)),
948        DataType::Boolean => Box::new(BooleanBuilder::with_capacity(capacity)),
949        DataType::Int32 => Box::new(Int32Builder::with_capacity(capacity)),
950        DataType::Int64 => Box::new(Int64Builder::with_capacity(capacity)),
951        DataType::UInt32 => Box::new(UInt32Builder::with_capacity(capacity)),
952        DataType::UInt64 => Box::new(UInt64Builder::with_capacity(capacity)),
953        DataType::Float32 => Box::new(Float32Builder::with_capacity(capacity)),
954        DataType::Float64 => Box::new(Float64Builder::with_capacity(capacity)),
955        DataType::Date32 => Box::new(Date32Builder::with_capacity(capacity)),
956        DataType::Date64 => Box::new(Date64Builder::with_capacity(capacity)),
957        other => {
958            return Err(OmniError::manifest(format!(
959                "unsupported list element data type {:?}",
960                other
961            )));
962        }
963    })
964}
965
966fn append_json_list_item(
967    builder: &mut Box<dyn ArrayBuilder>,
968    data_type: &DataType,
969    value: &JsonValue,
970) -> Result<()> {
971    match data_type {
972        DataType::Utf8 => {
973            let builder = builder
974                .as_any_mut()
975                .downcast_mut::<StringBuilder>()
976                .ok_or_else(|| OmniError::manifest("list Utf8 builder downcast failed"))?;
977            if let Some(value) = value.as_str() {
978                builder.append_value(value);
979            } else {
980                builder.append_null();
981            }
982        }
983        DataType::Boolean => {
984            let builder = builder
985                .as_any_mut()
986                .downcast_mut::<BooleanBuilder>()
987                .ok_or_else(|| OmniError::manifest("list Boolean builder downcast failed"))?;
988            if let Some(value) = value.as_bool() {
989                builder.append_value(value);
990            } else {
991                builder.append_null();
992            }
993        }
994        DataType::Int32 => {
995            let builder = builder
996                .as_any_mut()
997                .downcast_mut::<Int32Builder>()
998                .ok_or_else(|| OmniError::manifest("list Int32 builder downcast failed"))?;
999            if let Some(value) = value.as_i64() {
1000                let value = i32::try_from(value).map_err(|_| {
1001                    OmniError::manifest(format!("list value {} exceeds Int32 range", value))
1002                })?;
1003                builder.append_value(value);
1004            } else {
1005                builder.append_null();
1006            }
1007        }
1008        DataType::Int64 => {
1009            let builder = builder
1010                .as_any_mut()
1011                .downcast_mut::<Int64Builder>()
1012                .ok_or_else(|| OmniError::manifest("list Int64 builder downcast failed"))?;
1013            if let Some(value) = value.as_i64() {
1014                builder.append_value(value);
1015            } else {
1016                builder.append_null();
1017            }
1018        }
1019        DataType::UInt32 => {
1020            let builder = builder
1021                .as_any_mut()
1022                .downcast_mut::<UInt32Builder>()
1023                .ok_or_else(|| OmniError::manifest("list UInt32 builder downcast failed"))?;
1024            if let Some(value) = value.as_u64() {
1025                let value = u32::try_from(value).map_err(|_| {
1026                    OmniError::manifest(format!("list value {} exceeds UInt32 range", value))
1027                })?;
1028                builder.append_value(value);
1029            } else {
1030                builder.append_null();
1031            }
1032        }
1033        DataType::UInt64 => {
1034            let builder = builder
1035                .as_any_mut()
1036                .downcast_mut::<UInt64Builder>()
1037                .ok_or_else(|| OmniError::manifest("list UInt64 builder downcast failed"))?;
1038            if let Some(value) = value.as_u64() {
1039                builder.append_value(value);
1040            } else {
1041                builder.append_null();
1042            }
1043        }
1044        DataType::Float32 => {
1045            let builder = builder
1046                .as_any_mut()
1047                .downcast_mut::<Float32Builder>()
1048                .ok_or_else(|| OmniError::manifest("list Float32 builder downcast failed"))?;
1049            if let Some(value) = value.as_f64() {
1050                builder.append_value(value as f32);
1051            } else {
1052                builder.append_null();
1053            }
1054        }
1055        DataType::Float64 => {
1056            let builder = builder
1057                .as_any_mut()
1058                .downcast_mut::<Float64Builder>()
1059                .ok_or_else(|| OmniError::manifest("list Float64 builder downcast failed"))?;
1060            if let Some(value) = value.as_f64() {
1061                builder.append_value(value);
1062            } else {
1063                builder.append_null();
1064            }
1065        }
1066        DataType::Date32 => {
1067            let builder = builder
1068                .as_any_mut()
1069                .downcast_mut::<Date32Builder>()
1070                .ok_or_else(|| OmniError::manifest("list Date32 builder downcast failed"))?;
1071            if let Some(value) = parse_date32_json_value(value)? {
1072                builder.append_value(value);
1073            } else {
1074                builder.append_null();
1075            }
1076        }
1077        DataType::Date64 => {
1078            let builder = builder
1079                .as_any_mut()
1080                .downcast_mut::<Date64Builder>()
1081                .ok_or_else(|| OmniError::manifest("list Date64 builder downcast failed"))?;
1082            if let Some(value) = parse_date64_json_value(value)? {
1083                builder.append_value(value);
1084            } else {
1085                builder.append_null();
1086            }
1087        }
1088        other => {
1089            return Err(OmniError::manifest(format!(
1090                "unsupported list element data type {:?}",
1091                other
1092            )));
1093        }
1094    }
1095
1096    Ok(())
1097}
1098
1099fn parse_date32_json_value(value: &JsonValue) -> Result<Option<i32>> {
1100    if value.is_null() {
1101        return Ok(None);
1102    }
1103    if let Some(days) = value.as_i64() {
1104        let days = i32::try_from(days)
1105            .map_err(|_| OmniError::manifest(format!("Date value out of range: {}", days)))?;
1106        return Ok(Some(days));
1107    }
1108    if let Some(days) = value.as_u64() {
1109        let days = i32::try_from(days)
1110            .map_err(|_| OmniError::manifest(format!("Date value out of range: {}", days)))?;
1111        return Ok(Some(days));
1112    }
1113    if let Some(value) = value.as_str() {
1114        return Ok(Some(parse_date32_literal(value)?));
1115    }
1116    Ok(None)
1117}
1118
1119fn parse_date64_json_value(value: &JsonValue) -> Result<Option<i64>> {
1120    if value.is_null() {
1121        return Ok(None);
1122    }
1123    if let Some(ms) = value.as_i64() {
1124        return Ok(Some(ms));
1125    }
1126    if let Some(ms) = value.as_u64() {
1127        let ms = i64::try_from(ms)
1128            .map_err(|_| OmniError::manifest(format!("DateTime value out of range: {}", ms)))?;
1129        return Ok(Some(ms));
1130    }
1131    if let Some(value) = value.as_str() {
1132        return Ok(Some(parse_date64_literal(value)?));
1133    }
1134    Ok(None)
1135}
1136
/// How many per-type Lance writes to run concurrently during a load.
1139///
1140/// Each write is an independent S3 manifest + fragment write against a
1141/// different table. Ops within a single table must still be serial (Lance
1142/// OCC on the manifest), but cross-table writes have no shared state.
1143///
1144/// 8 is a conservative default — enough to overlap S3 round-trip latency
1145/// across the typical 10-30 table schemas without flooding the runtime.
1146/// Override via `OMNIGRAPH_LOAD_CONCURRENCY` for benchmarking.
1147const DEFAULT_LOAD_WRITE_CONCURRENCY: usize = 8;
1148
1149fn load_write_concurrency() -> usize {
1150    std::env::var("OMNIGRAPH_LOAD_CONCURRENCY")
1151        .ok()
1152        .and_then(|v| v.parse::<usize>().ok())
1153        .filter(|v| *v > 0)
1154        .unwrap_or(DEFAULT_LOAD_WRITE_CONCURRENCY)
1155}
1156
1157/// Write a set of prepared `(type_name, table_key, batch, row_count)` tuples
1158/// concurrently. Returns results in original iteration order so callers can
1159/// zip them back to per-type metadata.
1160async fn write_batches_concurrently(
1161    db: &Omnigraph,
1162    branch: Option<&str>,
1163    mode: LoadMode,
1164    prepared: Vec<(String, String, RecordBatch, usize)>,
1165) -> Result<
1166    Vec<(
1167        String,
1168        String,
1169        usize,
1170        crate::table_store::TableState,
1171        Option<String>,
1172    )>,
1173> {
1174    use futures::stream::StreamExt;
1175
1176    if prepared.is_empty() {
1177        return Ok(Vec::new());
1178    }
1179
1180    let concurrency = load_write_concurrency().min(prepared.len()).max(1);
1181
1182    futures::stream::iter(prepared.into_iter().map(
1183        |(type_name, table_key, batch, loaded_count)| async move {
1184            let (state, table_branch) =
1185                write_batch_to_dataset(db, branch, &table_key, batch, mode).await?;
1186            Ok::<_, OmniError>((type_name, table_key, loaded_count, state, table_branch))
1187        },
1188    ))
1189    .buffered(concurrency)
1190    .collect::<Vec<Result<_>>>()
1191    .await
1192    .into_iter()
1193    .collect()
1194}
1195
1196async fn write_batch_to_dataset(
1197    db: &Omnigraph,
1198    branch: Option<&str>,
1199    table_key: &str,
1200    batch: RecordBatch,
1201    mode: LoadMode,
1202) -> Result<(crate::table_store::TableState, Option<String>)> {
1203    let op_kind = match mode {
1204        LoadMode::Append => crate::db::MutationOpKind::Insert,
1205        LoadMode::Merge => crate::db::MutationOpKind::Merge,
1206        LoadMode::Overwrite => crate::db::MutationOpKind::SchemaRewrite,
1207    };
1208    let (mut ds, full_path, table_branch) = db
1209        .open_for_mutation_on_branch(branch, table_key, op_kind)
1210        .await?;
1211    let table_store = db.table_store();
1212
1213    match mode {
1214        LoadMode::Overwrite => {
1215            let state = table_store
1216                .overwrite_batch(&full_path, &mut ds, batch)
1217                .await?;
1218            Ok((state, table_branch))
1219        }
1220        LoadMode::Append => {
1221            let state = table_store.append_batch(&full_path, &mut ds, batch).await?;
1222            Ok((state, table_branch))
1223        }
1224        LoadMode::Merge => {
1225            let state = table_store
1226                .merge_insert_batch(
1227                    &full_path,
1228                    ds,
1229                    batch,
1230                    vec!["id".to_string()],
1231                    lance::dataset::WhenMatched::UpdateAll,
1232                    lance::dataset::WhenNotMatched::InsertAll,
1233                )
1234                .await?;
1235            Ok((state, table_branch))
1236        }
1237    }
1238}
1239
1240fn generate_id() -> String {
1241    ulid::Ulid::new().to_string()
1242}
1243
1244pub(crate) fn parse_date32_literal(value: &str) -> Result<i32> {
1245    let raw: Arc<dyn Array> = Arc::new(StringArray::from(vec![Some(value)]));
1246    let casted = arrow_cast::cast::cast(raw.as_ref(), &DataType::Date32)
1247        .map_err(|e| OmniError::manifest(format!("invalid Date literal '{}': {}", value, e)))?;
1248    let out = casted
1249        .as_any()
1250        .downcast_ref::<Date32Array>()
1251        .ok_or_else(|| OmniError::manifest("Date32 cast produced unexpected array"))?;
1252    if out.is_null(0) {
1253        return Err(OmniError::manifest(format!(
1254            "invalid Date literal '{}'",
1255            value
1256        )));
1257    }
1258    Ok(out.value(0))
1259}
1260
1261pub(crate) fn parse_date64_literal(value: &str) -> Result<i64> {
1262    let raw: Arc<dyn Array> = Arc::new(StringArray::from(vec![Some(value)]));
1263    let casted = arrow_cast::cast::cast(raw.as_ref(), &DataType::Date64)
1264        .map_err(|e| OmniError::manifest(format!("invalid DateTime literal '{}': {}", value, e)))?;
1265    let out = casted
1266        .as_any()
1267        .downcast_ref::<Date64Array>()
1268        .ok_or_else(|| OmniError::manifest("Date64 cast produced unexpected array"))?;
1269    if out.is_null(0) {
1270        return Err(OmniError::manifest(format!(
1271            "invalid DateTime literal '{}'",
1272            value
1273        )));
1274    }
1275    Ok(out.value(0))
1276}
1277
1278// ─── Value constraint validation ─────────────────────────────────────────────
1279
1280pub(crate) fn validate_value_constraints(
1281    batch: &RecordBatch,
1282    node_type: &omnigraph_compiler::catalog::NodeType,
1283) -> Result<()> {
1284    use arrow_array::Array;
1285
1286    // Range constraints
1287    for rc in &node_type.range_constraints {
1288        let Some(col) = batch.column_by_name(&rc.property) else {
1289            continue;
1290        };
1291        for row in 0..batch.num_rows() {
1292            if col.is_null(row) {
1293                continue;
1294            }
1295            let value = extract_numeric_value(col, row);
1296            if let Some(val) = value {
1297                if val.is_nan() {
1298                    return Err(OmniError::manifest(format!(
1299                        "@range violation on {}.{}: value is NaN",
1300                        node_type.name, rc.property
1301                    )));
1302                }
1303                if let Some(ref min) = rc.min {
1304                    let min_f = literal_value_to_f64(min);
1305                    if val < min_f {
1306                        return Err(OmniError::manifest(format!(
1307                            "@range violation on {}.{}: value {} < min {}",
1308                            node_type.name, rc.property, val, min_f
1309                        )));
1310                    }
1311                }
1312                if let Some(ref max) = rc.max {
1313                    let max_f = literal_value_to_f64(max);
1314                    if val > max_f {
1315                        return Err(OmniError::manifest(format!(
1316                            "@range violation on {}.{}: value {} > max {}",
1317                            node_type.name, rc.property, val, max_f
1318                        )));
1319                    }
1320                }
1321            }
1322        }
1323    }
1324
1325    // Check constraints (regex)
1326    for cc in &node_type.check_constraints {
1327        let re = regex::Regex::new(&cc.pattern).map_err(|e| {
1328            OmniError::manifest(format!(
1329                "@check on {}.{} has invalid regex '{}': {}",
1330                node_type.name, cc.property, cc.pattern, e
1331            ))
1332        })?;
1333        let Some(col) = batch.column_by_name(&cc.property) else {
1334            continue;
1335        };
1336        let str_col = col.as_any().downcast_ref::<StringArray>();
1337        if let Some(str_col) = str_col {
1338            for row in 0..str_col.len() {
1339                if str_col.is_null(row) {
1340                    continue;
1341                }
1342                let val = str_col.value(row);
1343                if !re.is_match(val) {
1344                    return Err(OmniError::manifest(format!(
1345                        "@check violation on {}.{}: value '{}' does not match pattern '{}'",
1346                        node_type.name, cc.property, val, cc.pattern
1347                    )));
1348                }
1349            }
1350        }
1351    }
1352
1353    Ok(())
1354}
1355
1356/// Validate that every enum-typed property in `properties` only contains values
1357/// from its declared enum value set. Operates on a single `RecordBatch` so it
1358/// can be called from any write path that already holds a batch.
1359///
1360/// Scalar string enums are checked directly. List-of-enum properties are
1361/// checked element-by-element across the underlying string values.
1362pub(crate) fn validate_enum_constraints(
1363    batch: &RecordBatch,
1364    properties: &HashMap<String, omnigraph_compiler::types::PropType>,
1365    type_name: &str,
1366) -> Result<()> {
1367    use arrow_array::{Array, ListArray};
1368
1369    for (prop_name, prop_type) in properties {
1370        let Some(allowed) = prop_type.enum_values.as_ref() else {
1371            continue;
1372        };
1373        let Some(col) = batch.column_by_name(prop_name) else {
1374            continue;
1375        };
1376        if prop_type.list {
1377            let Some(list_col) = col.as_any().downcast_ref::<ListArray>() else {
1378                continue;
1379            };
1380            for row in 0..list_col.len() {
1381                if list_col.is_null(row) {
1382                    continue;
1383                }
1384                let item_arr = list_col.value(row);
1385                let Some(str_arr) = item_arr.as_any().downcast_ref::<StringArray>() else {
1386                    continue;
1387                };
1388                for i in 0..str_arr.len() {
1389                    if str_arr.is_null(i) {
1390                        continue;
1391                    }
1392                    let val = str_arr.value(i);
1393                    if !allowed.iter().any(|a| a.as_str() == val) {
1394                        return Err(OmniError::manifest(format!(
1395                            "invalid enum value '{}' for {}.{} (expected: {})",
1396                            val,
1397                            type_name,
1398                            prop_name,
1399                            allowed.join(", ")
1400                        )));
1401                    }
1402                }
1403            }
1404        } else if let Some(str_col) = col.as_any().downcast_ref::<StringArray>() {
1405            for row in 0..str_col.len() {
1406                if str_col.is_null(row) {
1407                    continue;
1408                }
1409                let val = str_col.value(row);
1410                if !allowed.iter().any(|a| a.as_str() == val) {
1411                    return Err(OmniError::manifest(format!(
1412                        "invalid enum value '{}' for {}.{} (expected: {})",
1413                        val,
1414                        type_name,
1415                        prop_name,
1416                        allowed.join(", ")
1417                    )));
1418                }
1419            }
1420        }
1421    }
1422    Ok(())
1423}
1424
1425/// Detect duplicate values within a single `RecordBatch` for any of the named
1426/// `unique_properties`. Returns an error on the first duplicate found.
1427///
1428/// Note: this only catches duplicates *within* the batch. Cross-batch
1429/// uniqueness against already-committed rows is not enforced here — that
1430/// requires a dataset scan and is tracked separately.
1431pub(crate) fn enforce_unique_constraints_intra_batch(
1432    batch: &RecordBatch,
1433    type_name: &str,
1434    unique_properties: &[String],
1435) -> Result<()> {
1436    for property in unique_properties {
1437        let Some(col_idx) = batch.schema().index_of(property).ok() else {
1438            continue;
1439        };
1440        let arr = batch.column(col_idx);
1441        let mut seen: HashMap<String, usize> = HashMap::new();
1442        for row in 0..batch.num_rows() {
1443            let Some(value) = scalar_to_string(arr, row) else {
1444                continue;
1445            };
1446            if let Some(prev_row) = seen.insert(value.clone(), row) {
1447                return Err(OmniError::manifest(format!(
1448                    "@unique violation on {}.{}: value '{}' appears in rows {} and {}",
1449                    type_name, property, value, prev_row, row
1450                )));
1451            }
1452        }
1453    }
1454    Ok(())
1455}
1456
1457/// Reduce a single Arrow scalar at (`array`, `row`) to a `String` for
1458/// uniqueness comparison. Returns `None` for null values (nulls are exempt
1459/// from uniqueness in standard SQL semantics).
1460fn scalar_to_string(array: &ArrayRef, row: usize) -> Option<String> {
1461    use arrow_array::Array;
1462    if array.is_null(row) {
1463        return None;
1464    }
1465    if let Some(a) = array.as_any().downcast_ref::<StringArray>() {
1466        return Some(a.value(row).to_string());
1467    }
1468    if let Some(a) = array.as_any().downcast_ref::<Int32Array>() {
1469        return Some(a.value(row).to_string());
1470    }
1471    if let Some(a) = array.as_any().downcast_ref::<Int64Array>() {
1472        return Some(a.value(row).to_string());
1473    }
1474    if let Some(a) = array.as_any().downcast_ref::<UInt32Array>() {
1475        return Some(a.value(row).to_string());
1476    }
1477    if let Some(a) = array.as_any().downcast_ref::<UInt64Array>() {
1478        return Some(a.value(row).to_string());
1479    }
1480    if let Some(a) = array.as_any().downcast_ref::<Float32Array>() {
1481        return Some(a.value(row).to_string());
1482    }
1483    if let Some(a) = array.as_any().downcast_ref::<Float64Array>() {
1484        return Some(a.value(row).to_string());
1485    }
1486    if let Some(a) = array.as_any().downcast_ref::<BooleanArray>() {
1487        return Some(a.value(row).to_string());
1488    }
1489    if let Some(a) = array.as_any().downcast_ref::<Date32Array>() {
1490        return Some(a.value(row).to_string());
1491    }
1492    if let Some(a) = array.as_any().downcast_ref::<Date64Array>() {
1493        return Some(a.value(row).to_string());
1494    }
1495    None
1496}
1497
1498/// Build the flat list of property names that must be checked for uniqueness
1499/// on a node type. Includes both `@unique` properties (from
1500/// `NodeType.unique_constraints`) and the `@key` (which implies uniqueness).
1501pub(crate) fn unique_property_names_for_node(
1502    node_type: &omnigraph_compiler::catalog::NodeType,
1503) -> Vec<String> {
1504    let mut props: Vec<String> = node_type
1505        .unique_constraints
1506        .iter()
1507        .flatten()
1508        .cloned()
1509        .collect();
1510    if let Some(key) = &node_type.key {
1511        props.extend(key.iter().cloned());
1512    }
1513    props.sort();
1514    props.dedup();
1515    props
1516}
1517
1518/// Same as [`unique_property_names_for_node`] but for an edge type.
1519pub(crate) fn unique_property_names_for_edge(
1520    edge_type: &omnigraph_compiler::catalog::EdgeType,
1521) -> Vec<String> {
1522    let mut props: Vec<String> = edge_type
1523        .unique_constraints
1524        .iter()
1525        .flatten()
1526        .cloned()
1527        .collect();
1528    props.sort();
1529    props.dedup();
1530    props
1531}
1532
1533fn extract_numeric_value(col: &ArrayRef, row: usize) -> Option<f64> {
1534    use arrow_array::{
1535        Array, Float32Array, Float64Array, Int32Array, Int64Array, UInt32Array, UInt64Array,
1536    };
1537    if let Some(a) = col.as_any().downcast_ref::<Int32Array>() {
1538        return Some(a.value(row) as f64);
1539    }
1540    if let Some(a) = col.as_any().downcast_ref::<Int64Array>() {
1541        return Some(a.value(row) as f64);
1542    }
1543    if let Some(a) = col.as_any().downcast_ref::<UInt32Array>() {
1544        return Some(a.value(row) as f64);
1545    }
1546    if let Some(a) = col.as_any().downcast_ref::<UInt64Array>() {
1547        return Some(a.value(row) as f64);
1548    }
1549    if let Some(a) = col.as_any().downcast_ref::<Float32Array>() {
1550        return Some(a.value(row) as f64);
1551    }
1552    if let Some(a) = col.as_any().downcast_ref::<Float64Array>() {
1553        return Some(a.value(row));
1554    }
1555    None
1556}
1557
1558fn literal_value_to_f64(v: &omnigraph_compiler::catalog::LiteralValue) -> f64 {
1559    use omnigraph_compiler::catalog::LiteralValue;
1560    match v {
1561        LiteralValue::Integer(n) => *n as f64,
1562        LiteralValue::Float(f) => *f,
1563    }
1564}
1565
1566// ─── Edge cardinality validation ─────────────────────────────────────────────
1567
1568pub(crate) async fn validate_edge_cardinality(
1569    db: &crate::db::Omnigraph,
1570    branch: Option<&str>,
1571    edge_name: &str,
1572    written_version: u64,
1573    written_branch: Option<&str>,
1574) -> Result<()> {
1575    use arrow_array::Array;
1576    let catalog = db.catalog();
1577    let edge_type = &catalog.edge_types[edge_name];
1578    if edge_type.cardinality.is_default() {
1579        return Ok(());
1580    }
1581
1582    // Open edge sub-table at the just-written version, not the snapshot's
1583    // (the snapshot still pins to the pre-write version).
1584    let snapshot = db.snapshot_for_branch(branch).await?;
1585    let table_key = format!("edge:{}", edge_name);
1586    let entry = snapshot
1587        .entry(&table_key)
1588        .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
1589    let ds = db
1590        .open_dataset_at_state(
1591            &entry.table_path,
1592            written_branch.or(entry.table_branch.as_deref()),
1593            written_version,
1594        )
1595        .await?;
1596
1597    // Scan src column, count per source
1598    let batches = db
1599        .table_store()
1600        .scan(&ds, Some(&["src"]), None, None)
1601        .await?;
1602
1603    let mut counts: HashMap<String, u32> = HashMap::new();
1604    for batch in &batches {
1605        let srcs = batch
1606            .column_by_name("src")
1607            .unwrap()
1608            .as_any()
1609            .downcast_ref::<StringArray>()
1610            .unwrap();
1611        for i in 0..srcs.len() {
1612            *counts.entry(srcs.value(i).to_string()).or_insert(0) += 1;
1613        }
1614    }
1615
1616    let card = &edge_type.cardinality;
1617    for (src, count) in &counts {
1618        if let Some(max) = card.max {
1619            if *count > max {
1620                return Err(OmniError::manifest(format!(
1621                    "@card violation on edge {}: source '{}' has {} edges (max {})",
1622                    edge_name, src, count, max
1623                )));
1624            }
1625        }
1626        if *count < card.min {
1627            return Err(OmniError::manifest(format!(
1628                "@card violation on edge {}: source '{}' has {} edges (min {})",
1629                edge_name, src, count, card.min
1630            )));
1631        }
1632    }
1633
1634    Ok(())
1635}
1636
1637/// Validate edge `@card` cardinality with in-memory pending edges visible.
1638///
1639/// Loader-level analog to `exec::mutation::validate_edge_cardinality_with_pending`:
1640/// opens the committed dataset at the pre-load snapshot version, then
1641/// delegates to the shared `count_src_per_edge` + `enforce_cardinality_bounds`
1642/// helpers in `exec::staging`. Used by Append/Merge loads (the Overwrite
1643/// path uses `validate_edge_cardinality` which opens the just-written
1644/// Lance version).
1645///
1646/// `mode` controls dedup behavior. `LoadMode::Merge` passes `Some("id")`
1647/// so committed edges that the load is *updating* (same edge id,
1648/// possibly changed `src`) are not double-counted. `LoadMode::Append`
1649/// passes `None` because each line generates a fresh ULID id that
1650/// never collides with committed.
1651async fn validate_edge_cardinality_with_pending_loader(
1652    db: &Omnigraph,
1653    branch: Option<&str>,
1654    edge_type: &omnigraph_compiler::catalog::EdgeType,
1655    table_key: &str,
1656    staging: &MutationStaging,
1657    mode: LoadMode,
1658) -> Result<()> {
1659    if edge_type.cardinality.is_default() {
1660        return Ok(());
1661    }
1662    let snapshot = db.snapshot_for_branch(branch).await?;
1663    let Some(entry) = snapshot.entry(table_key) else {
1664        // No manifest entry — table doesn't exist yet. Pending-only is
1665        // fine; the helper handles empty committed scans.
1666        return Ok(());
1667    };
1668    let ds = db
1669        .open_dataset_at_state(
1670            &entry.table_path,
1671            entry.table_branch.as_deref(),
1672            entry.table_version,
1673        )
1674        .await?;
1675    let dedupe_key = match mode {
1676        LoadMode::Merge => Some("id"),
1677        LoadMode::Append | LoadMode::Overwrite => None,
1678    };
1679    let counts =
1680        crate::exec::staging::count_src_per_edge(db, &ds, table_key, staging, dedupe_key).await?;
1681    crate::exec::staging::enforce_cardinality_bounds(edge_type, &counts)
1682}
1683
1684/// Collect all valid node IDs for a given type, with in-memory pending
1685/// node inserts visible. Used by the staged loader's Phase 2c
1686/// referential-integrity validation.
1687///
1688/// Union of:
1689/// - IDs from the staged loader's pending batches (in-memory; just-staged
1690///   inserts of this type)
1691/// - IDs from the committed sub-table at the pre-load snapshot version
1692async fn collect_node_ids_with_pending(
1693    db: &Omnigraph,
1694    branch: Option<&str>,
1695    type_name: &str,
1696    staging: &MutationStaging,
1697) -> Result<HashSet<String>> {
1698    let mut ids = HashSet::new();
1699    let table_key = format!("node:{}", type_name);
1700
1701    // From staging.pending: walk the in-memory accumulator's id column.
1702    for batch in staging.pending_batches(&table_key) {
1703        if let Some(col) = batch.column_by_name("id") {
1704            if let Some(arr) = col.as_any().downcast_ref::<StringArray>() {
1705                for i in 0..arr.len() {
1706                    if arr.is_valid(i) {
1707                        ids.insert(arr.value(i).to_string());
1708                    }
1709                }
1710            }
1711        }
1712    }
1713
1714    // From the committed Lance sub-table at the pre-load snapshot version.
1715    let snapshot = db.snapshot_for_branch(branch).await?;
1716    let Some(entry) = snapshot.entry(&table_key) else {
1717        return Ok(ids);
1718    };
1719    let ds = db
1720        .open_dataset_at_state(
1721            &entry.table_path,
1722            entry.table_branch.as_deref(),
1723            entry.table_version,
1724        )
1725        .await?;
1726
1727    let batches = db
1728        .table_store()
1729        .scan(&ds, Some(&["id"]), None, None)
1730        .await?;
1731
1732    for batch in &batches {
1733        let id_col = batch
1734            .column_by_name("id")
1735            .ok_or_else(|| OmniError::Lance("missing 'id' column".into()))?
1736            .as_any()
1737            .downcast_ref::<StringArray>()
1738            .ok_or_else(|| OmniError::Lance("'id' column is not Utf8".into()))?;
1739        for i in 0..batch.num_rows() {
1740            // Defensive: `id` is the @key column on every node type and
1741            // is non-nullable by schema, but a committed-row corruption
1742            // (or future schema change) could surface a NULL. Skip
1743            // rather than insert "" — pending-side does the same.
1744            if id_col.is_valid(i) {
1745                ids.insert(id_col.value(i).to_string());
1746            }
1747        }
1748    }
1749
1750    Ok(ids)
1751}
1752
1753/// Collect all valid node IDs for a given type. Union of:
1754/// - IDs from the just-loaded batch (in memory, from node_rows)
1755/// - IDs from the sub-table at the just-written version (if it was updated)
1756/// - IDs from the sub-table at the snapshot-pinned version (if it was not updated)
1757async fn collect_node_ids(
1758    db: &Omnigraph,
1759    branch: Option<&str>,
1760    type_name: &str,
1761    node_rows: &HashMap<String, Vec<JsonValue>>,
1762    catalog: &omnigraph_compiler::catalog::Catalog,
1763    updates: &[crate::db::SubTableUpdate],
1764) -> Result<HashSet<String>> {
1765    let mut ids = HashSet::new();
1766
1767    // IDs from the in-memory batch (just loaded in this operation)
1768    if let Some(rows) = node_rows.get(type_name) {
1769        if let Some(node_type) = catalog.node_types.get(type_name) {
1770            if let Some(key_prop) = node_type.key_property() {
1771                for row in rows {
1772                    if let Some(id) = row.get(key_prop).and_then(|v| v.as_str()) {
1773                        ids.insert(id.to_string());
1774                    }
1775                }
1776            }
1777        }
1778    }
1779
1780    // IDs from the Lance sub-table
1781    let table_key = format!("node:{}", type_name);
1782    let snapshot = db.snapshot_for_branch(branch).await?;
1783    let Some(entry) = snapshot.entry(&table_key) else {
1784        return Ok(ids);
1785    };
1786    // Use the just-written version if this type was updated, else snapshot version
1787    let updated = updates
1788        .iter()
1789        .find(|u| u.table_key == table_key)
1790        .map(|u| (u.table_version, u.table_branch.as_deref()));
1791    let (version, branch) = updated.unwrap_or((entry.table_version, entry.table_branch.as_deref()));
1792    let ds = db
1793        .open_dataset_at_state(&entry.table_path, branch, version)
1794        .await?;
1795
1796    let batches = db
1797        .table_store()
1798        .scan(&ds, Some(&["id"]), None, None)
1799        .await?;
1800
1801    for batch in &batches {
1802        let id_col = batch
1803            .column_by_name("id")
1804            .unwrap()
1805            .as_any()
1806            .downcast_ref::<StringArray>()
1807            .unwrap();
1808        for i in 0..batch.num_rows() {
1809            if !id_col.is_valid(i) {
1810                continue;
1811            }
1812            ids.insert(id_col.value(i).to_string());
1813        }
1814    }
1815
1816    Ok(ids)
1817}
1818
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::Omnigraph;
    use arrow_array::Array;
    use futures::TryStreamExt;
    use std::collections::HashMap;

    /// Schema shared by the loader tests: two keyed node types and two edge types.
    const TEST_SCHEMA: &str = r#"
node Person {
    name: String @key
    age: I32?
}
node Company {
    name: String @key
}
edge Knows: Person -> Person {
    since: Date?
}
edge WorksAt: Person -> Company
"#;

    /// JSONL fixture: two people, one company, and one edge of each type.
    const TEST_DATA: &str = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}
{"type": "Person", "data": {"name": "Bob", "age": 25}}
{"type": "Company", "data": {"name": "Acme"}}
{"edge": "Knows", "from": "Alice", "to": "Bob"}
{"edge": "WorksAt", "from": "Alice", "to": "Acme"}
"#;

    /// Create a database initialised with `TEST_SCHEMA` inside a fresh temp
    /// directory. The directory guard is returned alongside the database and
    /// must be kept alive for as long as the database is used.
    async fn init_test_db() -> (tempfile::TempDir, Omnigraph) {
        let dir = tempfile::tempdir().unwrap();
        let db = Omnigraph::init(dir.path().to_str().unwrap(), TEST_SCHEMA)
            .await
            .unwrap();
        (dir, db)
    }

    /// Fetch column `name` from `batch` as a `StringArray`, panicking with a
    /// descriptive message when it is missing or has a different type.
    fn string_column<'a>(batch: &'a RecordBatch, name: &str) -> &'a StringArray {
        batch
            .column_by_name(name)
            .unwrap_or_else(|| panic!("missing column '{}'", name))
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap_or_else(|| panic!("column '{}' is not Utf8", name))
    }

    #[tokio::test]
    async fn test_load_creates_data() {
        let (_dir, mut db) = init_test_db().await;

        let result = load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert_eq!(result.nodes_loaded["Person"], 2);
        assert_eq!(result.nodes_loaded["Company"], 1);
        assert_eq!(result.edges_loaded["Knows"], 1);
        assert_eq!(result.edges_loaded["WorksAt"], 1);
    }

    #[tokio::test]
    async fn test_load_data_readable_via_lance() {
        let (_dir, mut db) = init_test_db().await;
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        // Read back via snapshot
        let snap = db.snapshot().await;
        let person_ds = snap.open("node:Person").await.unwrap();

        assert_eq!(person_ds.count_rows(None).await.unwrap(), 2);

        // Verify data
        let batches: Vec<RecordBatch> = person_ds
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();

        // @key=name, so ids should be "Alice" and "Bob". Gather ids from every
        // batch so the check does not depend on how the scan splits the rows.
        let id_values: Vec<&str> = batches
            .iter()
            .flat_map(|batch| string_column(batch, "id").iter().flatten())
            .collect();
        assert!(id_values.contains(&"Alice"));
        assert!(id_values.contains(&"Bob"));
    }

    #[tokio::test]
    async fn test_load_edges_reference_node_keys() {
        let (_dir, mut db) = init_test_db().await;
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        let snap = db.snapshot().await;
        let knows_ds = snap.open("edge:Knows").await.unwrap();

        let batches: Vec<RecordBatch> = knows_ds
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();

        let batch = &batches[0];
        let srcs = string_column(batch, "src");
        let dsts = string_column(batch, "dst");

        assert_eq!(srcs.value(0), "Alice");
        assert_eq!(dsts.value(0), "Bob");
    }

    #[tokio::test]
    async fn test_load_manifest_version_advances() {
        let (_dir, mut db) = init_test_db().await;
        let v1 = db.version().await;

        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert!(db.version().await > v1);
    }

    #[tokio::test]
    async fn test_load_append_adds_rows() {
        let (_dir, mut db) = init_test_db().await;

        let batch1 = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#;
        let batch2 = r#"{"type": "Person", "data": {"name": "Bob", "age": 25}}"#;

        load_jsonl(&mut db, batch1, LoadMode::Overwrite)
            .await
            .unwrap();
        load_jsonl(&mut db, batch2, LoadMode::Append).await.unwrap();

        let snap = db.snapshot().await;
        let person_ds = snap.open("node:Person").await.unwrap();
        assert_eq!(person_ds.count_rows(None).await.unwrap(), 2);
    }

    #[tokio::test]
    async fn test_load_unknown_type_rejected() {
        let (_dir, mut db) = init_test_db().await;

        let bad = r#"{"type": "FakeType", "data": {"name": "x"}}"#;
        let result = load_jsonl(&mut db, bad, LoadMode::Overwrite).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_ingest_creates_branch_and_reports_tables() {
        let (_dir, mut db) = init_test_db().await;

        let result = db
            .ingest("feature", Some("main"), TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert_eq!(result.branch, "feature");
        assert_eq!(result.base_branch, "main");
        assert!(result.branch_created);
        assert_eq!(result.mode, LoadMode::Overwrite);
        assert_eq!(
            result.tables,
            vec![
                IngestTableResult {
                    table_key: "edge:Knows".to_string(),
                    rows_loaded: 1
                },
                IngestTableResult {
                    table_key: "edge:WorksAt".to_string(),
                    rows_loaded: 1
                },
                IngestTableResult {
                    table_key: "node:Company".to_string(),
                    rows_loaded: 1
                },
                IngestTableResult {
                    table_key: "node:Person".to_string(),
                    rows_loaded: 2
                },
            ]
        );
        assert!(
            db.branch_list()
                .await
                .unwrap()
                .contains(&"feature".to_string())
        );
    }

    #[tokio::test]
    async fn test_ingest_existing_branch_ignores_from_and_merges_data() {
        let (_dir, mut db) = init_test_db().await;
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();
        db.branch_create_from(crate::db::ReadTarget::branch("main"), "feature")
            .await
            .unwrap();

        // The branch already exists, so the (nonexistent) base branch name
        // must not cause the ingest to fail.
        let result = db
            .ingest(
                "feature",
                Some("missing-base"),
                r#"{"type":"Person","data":{"name":"Bob","age":26}}
{"type":"Person","data":{"name":"Eve","age":31}}"#,
                LoadMode::Merge,
            )
            .await
            .unwrap();

        assert_eq!(result.branch, "feature");
        assert_eq!(result.base_branch, "missing-base");
        assert!(!result.branch_created);
        assert_eq!(result.mode, LoadMode::Merge);
        assert_eq!(
            result.tables,
            vec![IngestTableResult {
                table_key: "node:Person".to_string(),
                rows_loaded: 2
            }]
        );

        let snap = db
            .snapshot_of(crate::db::ReadTarget::branch("feature"))
            .await
            .unwrap();
        let person_ds = snap.open("node:Person").await.unwrap();
        assert_eq!(person_ds.count_rows(None).await.unwrap(), 3);

        let batches: Vec<RecordBatch> = person_ds
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();
        let mut ages_by_id = HashMap::new();
        for batch in &batches {
            let ids = string_column(batch, "id");
            let ages = batch
                .column_by_name("age")
                .unwrap()
                .as_any()
                .downcast_ref::<Int32Array>()
                .unwrap();
            for idx in 0..ids.len() {
                ages_by_id.insert(ids.value(idx).to_string(), ages.value(idx));
            }
        }

        // Bob is updated, Eve is inserted, Alice is left untouched.
        assert_eq!(ages_by_id.get("Bob"), Some(&26));
        assert_eq!(ages_by_id.get("Eve"), Some(&31));
        assert_eq!(ages_by_id.get("Alice"), Some(&30));
    }

    #[tokio::test]
    async fn test_ingest_as_stamps_actor_on_branch_head_commit() {
        let (_dir, mut db) = init_test_db().await;

        db.ingest_as(
            "feature",
            Some("main"),
            TEST_DATA,
            LoadMode::Overwrite,
            Some("act-andrew"),
        )
        .await
        .unwrap();

        let head = db
            .list_commits(Some("feature"))
            .await
            .unwrap()
            .into_iter()
            .last()
            .unwrap();
        assert_eq!(head.actor_id.as_deref(), Some("act-andrew"));
    }

    #[test]
    fn test_range_constraint_rejects_nan() {
        use arrow_array::{Float64Array, RecordBatch, StringArray};
        use omnigraph_compiler::catalog::{LiteralValue, NodeType, RangeConstraint};
        use std::sync::Arc;

        let schema = Arc::new(arrow_schema::Schema::new(vec![
            arrow_schema::Field::new("name", arrow_schema::DataType::Utf8, false),
            arrow_schema::Field::new("score", arrow_schema::DataType::Float64, true),
        ]));

        // A single row whose `score` is NaN.
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["bad"])),
                Arc::new(Float64Array::from(vec![f64::NAN])),
            ],
        )
        .unwrap();

        let node_type = NodeType {
            name: "Test".to_string(),
            implements: vec![],
            properties: Default::default(),
            key: None,
            unique_constraints: vec![],
            indices: vec![],
            range_constraints: vec![RangeConstraint {
                property: "score".to_string(),
                min: Some(LiteralValue::Float(0.0)),
                max: Some(LiteralValue::Float(1.0)),
            }],
            check_constraints: vec![],
            embed_sources: Default::default(),
            blob_properties: Default::default(),
            arrow_schema: schema,
        };

        let result = validate_value_constraints(&batch, &node_type);
        assert!(result.is_err(), "expected NaN to be rejected");
        let err = result.unwrap_err().to_string();
        assert!(err.contains("NaN"), "error should mention NaN: {}", err);
    }
}