//! JSONL loader for Omnigraph (`omnigraph/loader/mod.rs`).

1use std::collections::{HashMap, HashSet};
2
3use std::io::{BufRead, BufReader, Cursor};
4use std::sync::Arc;
5
6use arrow_array::{
7    Array, ArrayRef, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array,
8    Int32Array, Int64Array, RecordBatch, StringArray, UInt32Array, UInt64Array,
9    builder::{
10        ArrayBuilder, BooleanBuilder, Date32Builder, Date64Builder, FixedSizeListBuilder,
11        Float32Builder, Float64Builder, Int32Builder, Int64Builder, ListBuilder, StringBuilder,
12        UInt32Builder, UInt64Builder,
13    },
14};
15use arrow_schema::DataType;
16use base64::Engine;
17use lance::blob::BlobArrayBuilder;
18use omnigraph_compiler::catalog::NodeType;
19use serde::{Deserialize, Serialize};
20use serde_json::Value as JsonValue;
21
22use crate::db::Omnigraph;
23use crate::error::{OmniError, Result};
24use crate::exec::staging::{MutationStaging, PendingMode};
25
/// Result of a load operation.
///
/// Each map records, per type name, how many rows were in the batch built
/// for that type during the load.
#[derive(Debug, Clone, Default)]
pub struct LoadResult {
    /// Node type name -> number of rows in the batch built for that type.
    pub nodes_loaded: HashMap<String, usize>,
    /// Edge type name -> number of rows in the batch built for that type.
    pub edges_loaded: HashMap<String, usize>,
}
32
/// Row count for a single table touched by an ingest.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct IngestTableResult {
    /// Table identifier, formatted as `node:<type name>` or `edge:<edge name>`.
    pub table_key: String,
    /// Number of rows loaded into this table.
    pub rows_loaded: usize,
}
38
/// Summary of an ingest: which branch was written, where it was forked
/// from, and how many rows landed in each table.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct IngestResult {
    /// Normalized name of the branch the data was loaded into.
    pub branch: String,
    /// Normalized name of the `from` branch (defaults to `main`). Reported
    /// even when the target branch already existed and no fork happened.
    pub base_branch: String,
    /// `true` when the target branch was absent from the branch list and the
    /// ingest created it.
    pub branch_created: bool,
    /// Load mode the ingest ran with.
    pub mode: LoadMode,
    /// Per-table row counts, sorted by `table_key`.
    pub tables: Vec<IngestTableResult>,
}
47
/// Load mode for data ingestion.
///
/// Serialized with serde in `snake_case` (`overwrite`, `append`, `merge`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LoadMode {
    /// Overwrite existing data.
    Overwrite,
    /// Append to existing data.
    Append,
    /// Merge by `id` key (upsert).
    Merge,
}
59
60/// Load JSONL data into an Omnigraph database.
61pub async fn load_jsonl(db: &mut Omnigraph, data: &str, mode: LoadMode) -> Result<LoadResult> {
62    let current_branch = db.active_branch().await;
63    let branch = current_branch.as_deref().unwrap_or("main");
64    db.load(branch, data, mode).await
65}
66
67/// Load JSONL data from a file path.
68pub async fn load_jsonl_file(db: &mut Omnigraph, path: &str, mode: LoadMode) -> Result<LoadResult> {
69    let current_branch = db.active_branch().await;
70    let branch = current_branch.as_deref().unwrap_or("main");
71    db.load_file(branch, path, mode).await
72}
73
74impl Omnigraph {
75    pub async fn ingest(
76        &self,
77        branch: &str,
78        from: Option<&str>,
79        data: &str,
80        mode: LoadMode,
81    ) -> Result<IngestResult> {
82        self.ingest_as(branch, from, data, mode, None).await
83    }
84
85    pub async fn ingest_as(
86        &self,
87        branch: &str,
88        from: Option<&str>,
89        data: &str,
90        mode: LoadMode,
91        actor_id: Option<&str>,
92    ) -> Result<IngestResult> {
93        // Engine-layer policy gate (MR-722 fan-out / PR #3). Scope is
94        // `Branch(branch)` for the data-write portion. If ingest creates
95        // a new branch as a side-effect (target branch doesn't exist),
96        // the inner `branch_create_from_as` call below additionally
97        // checks `BranchCreate` — both authorities are genuinely needed
98        // for "ingest into a fresh branch", so the layered check is
99        // correct, not redundant.
100        self.enforce(
101            omnigraph_policy::PolicyAction::Change,
102            &omnigraph_policy::ResourceScope::Branch(branch.to_string()),
103            actor_id,
104        )?;
105        self.ingest_with_current_actor(branch, from, data, mode, actor_id)
106            .await
107    }
108
109    pub async fn ingest_file(
110        &self,
111        branch: &str,
112        from: Option<&str>,
113        path: &str,
114        mode: LoadMode,
115    ) -> Result<IngestResult> {
116        self.ingest_file_as(branch, from, path, mode, None).await
117    }
118
119    pub async fn ingest_file_as(
120        &self,
121        branch: &str,
122        from: Option<&str>,
123        path: &str,
124        mode: LoadMode,
125        actor_id: Option<&str>,
126    ) -> Result<IngestResult> {
127        let data = std::fs::read_to_string(path).map_err(OmniError::Io)?;
128        self.ingest_as(branch, from, &data, mode, actor_id).await
129    }
130
131    async fn ingest_with_current_actor(
132        &self,
133        branch: &str,
134        from: Option<&str>,
135        data: &str,
136        mode: LoadMode,
137        actor_id: Option<&str>,
138    ) -> Result<IngestResult> {
139        self.ensure_schema_state_valid().await?;
140        let target_branch =
141            Self::normalize_branch_name(branch)?.unwrap_or_else(|| "main".to_string());
142        let base_branch = Self::normalize_branch_name(from.unwrap_or("main"))?
143            .unwrap_or_else(|| "main".to_string());
144        let branch_created = !self
145            .branch_list()
146            .await?
147            .iter()
148            .any(|name| name == &target_branch);
149        if branch_created {
150            // Thread the actor through to the implicit BranchCreate so
151            // policy decisions match what an explicit `branch_create_from_as`
152            // call would see. Calling the no-actor variant here would
153            // bypass BranchCreate enforcement when policy is installed —
154            // the footgun guard catches that case too, but threading is
155            // the correct fix.
156            self.branch_create_from_as(
157                crate::db::ReadTarget::branch(&base_branch),
158                &target_branch,
159                actor_id,
160            )
161            .await?;
162        }
163
164        let result = self.load_as(&target_branch, data, mode, actor_id).await?;
165        Ok(IngestResult {
166            branch: target_branch,
167            base_branch,
168            branch_created,
169            mode,
170            tables: result.to_ingest_tables(),
171        })
172    }
173
174    pub async fn load(&self, branch: &str, data: &str, mode: LoadMode) -> Result<LoadResult> {
175        self.load_as(branch, data, mode, None).await
176    }
177
178    pub async fn load_as(
179        &self,
180        branch: &str,
181        data: &str,
182        mode: LoadMode,
183        actor_id: Option<&str>,
184    ) -> Result<LoadResult> {
185        // Engine-layer policy gate (MR-722 fan-out / PR #3). Scope is
186        // `Branch(branch)` to match the HTTP-layer Change convention.
187        // `ingest_as` also calls `load_as` after enforcing its own
188        // Change gate — that double-check is fine because both gates
189        // resolve to identical Cedar decisions for the same actor +
190        // branch (the second check is a structurally-correct no-op).
191        self.enforce(
192            omnigraph_policy::PolicyAction::Change,
193            &omnigraph_policy::ResourceScope::Branch(branch.to_string()),
194            actor_id,
195        )?;
196        self.ensure_schema_state_valid().await?;
197        // Reject internal `__run__*` / system-prefixed branches at the
198        // public write boundary. Direct-publish paths assert this
199        // explicitly so a caller can't write to legacy or system
200        // staging branches by passing the prefix verbatim.
201        crate::db::ensure_public_branch_ref(branch, "load")?;
202        // Branch convention: `None` represents `main`. Re-normalizing to
203        // `Some("main")` here would route the publisher commit through a
204        // separate coordinator (the cross-branch path in
205        // `commit_prepared_updates_on_branch_with_expected`) and leave
206        // `self.coordinator` with a stale manifest snapshot.
207        let requested = Self::normalize_branch_name(branch)?;
208        // Direct-to-target writes: no Run state machine, no `__run__` staging
209        // branch. Cross-table OCC is enforced by the publisher's
210        // `expected_table_versions` CAS inside `load_jsonl_reader`.
211        self.load_direct_on_branch(requested.as_deref(), data, mode, actor_id)
212            .await
213    }
214
215    pub async fn load_file(
216        &self,
217        branch: &str,
218        path: &str,
219        mode: LoadMode,
220    ) -> Result<LoadResult> {
221        self.load_file_as(branch, path, mode, None).await
222    }
223
224    /// Read a file into memory and delegate to `load_as`. Used by the
225    /// CLI's `omnigraph load` so file-path-based writes flow through
226    /// the same engine-layer policy gate as in-memory `load_as` calls.
227    pub async fn load_file_as(
228        &self,
229        branch: &str,
230        path: &str,
231        mode: LoadMode,
232        actor_id: Option<&str>,
233    ) -> Result<LoadResult> {
234        let data = std::fs::read_to_string(path).map_err(|e| OmniError::Io(e))?;
235        self.load_as(branch, &data, mode, actor_id).await
236    }
237
238    async fn load_direct_on_branch(
239        &self,
240        branch: Option<&str>,
241        data: &str,
242        mode: LoadMode,
243        actor_id: Option<&str>,
244    ) -> Result<LoadResult> {
245        let reader = BufReader::new(Cursor::new(data.as_bytes()));
246        load_jsonl_reader(self, branch, reader, mode, actor_id).await
247    }
248}
249
250impl LoadMode {
251    pub fn as_str(self) -> &'static str {
252        match self {
253            LoadMode::Overwrite => "overwrite",
254            LoadMode::Append => "append",
255            LoadMode::Merge => "merge",
256        }
257    }
258}
259
260impl LoadResult {
261    pub fn to_ingest_tables(&self) -> Vec<IngestTableResult> {
262        let mut tables = self
263            .nodes_loaded
264            .iter()
265            .map(|(type_name, rows_loaded)| IngestTableResult {
266                table_key: format!("node:{type_name}"),
267                rows_loaded: *rows_loaded,
268            })
269            .chain(
270                self.edges_loaded
271                    .iter()
272                    .map(|(edge_name, rows_loaded)| IngestTableResult {
273                        table_key: format!("edge:{edge_name}"),
274                        rows_loaded: *rows_loaded,
275                    }),
276            )
277            .collect::<Vec<_>>();
278        tables.sort_by(|a, b| a.table_key.cmp(&b.table_key));
279        tables
280    }
281}
282
283async fn load_jsonl_reader<R: BufRead>(
284    db: &Omnigraph,
285    branch: Option<&str>,
286    reader: R,
287    mode: LoadMode,
288    actor_id: Option<&str>,
289) -> Result<LoadResult> {
290    let catalog = db.catalog().clone();
291
292    // Phase 1: Parse all lines, spool into per-type collections
293    let mut node_rows: HashMap<String, Vec<JsonValue>> = HashMap::new();
294    let mut edge_rows: HashMap<String, Vec<(String, String, JsonValue)>> = HashMap::new();
295
296    for (line_num, line) in reader.lines().enumerate() {
297        let line = line?;
298        let line = line.trim();
299        if line.is_empty() {
300            continue;
301        }
302        let value: JsonValue = serde_json::from_str(line).map_err(|e| {
303            OmniError::manifest(format!("invalid JSON on line {}: {}", line_num + 1, e))
304        })?;
305
306        if let Some(type_name) = value.get("type").and_then(|v| v.as_str()) {
307            if !catalog.node_types.contains_key(type_name) {
308                return Err(OmniError::manifest(format!(
309                    "line {}: unknown node type '{}'",
310                    line_num + 1,
311                    type_name
312                )));
313            }
314            let data = value
315                .get("data")
316                .cloned()
317                .unwrap_or(JsonValue::Object(serde_json::Map::new()));
318            node_rows
319                .entry(type_name.to_string())
320                .or_default()
321                .push(data);
322        } else if let Some(edge_name) = value.get("edge").and_then(|v| v.as_str()) {
323            if catalog.lookup_edge_by_name(edge_name).is_none() {
324                return Err(OmniError::manifest(format!(
325                    "line {}: unknown edge type '{}'",
326                    line_num + 1,
327                    edge_name
328                )));
329            }
330            let from = value
331                .get("from")
332                .and_then(|v| v.as_str())
333                .ok_or_else(|| {
334                    OmniError::manifest(format!("line {}: edge missing 'from'", line_num + 1))
335                })?
336                .to_string();
337            let to = value
338                .get("to")
339                .and_then(|v| v.as_str())
340                .ok_or_else(|| {
341                    OmniError::manifest(format!("line {}: edge missing 'to'", line_num + 1))
342                })?
343                .to_string();
344            let data = value
345                .get("data")
346                .cloned()
347                .unwrap_or(JsonValue::Object(serde_json::Map::new()));
348            let canonical = catalog.lookup_edge_by_name(edge_name).unwrap().name.clone();
349            edge_rows
350                .entry(canonical)
351                .or_default()
352                .push((from, to, data));
353        } else {
354            return Err(OmniError::manifest(format!(
355                "line {}: expected 'type' or 'edge' field",
356                line_num + 1
357            )));
358        }
359    }
360
361    // Phase 2: Build per-type RecordBatches and accumulate into the
362    // staging pipeline. For Append/Merge, batches go into an in-memory
363    // accumulator and a single `stage_*` + `commit_staged` per touched
364    // table runs at end-of-load — a mid-load failure (RI / cardinality
365    // violation) leaves Lance HEAD untouched. For Overwrite, the legacy
366    // inline-commit path is preserved (truncate+append doesn't fit the
367    // staged shape cleanly, and overwrite has no in-flight read-your-writes
368    // requirement).
369
370    let mut result = LoadResult::default();
371    let snapshot = db.snapshot_for_branch(branch).await?;
372    let use_staging = !matches!(mode, LoadMode::Overwrite);
373    let mut staging = MutationStaging::default();
374    let mut overwrite_updates: Vec<crate::db::SubTableUpdate> = Vec::new();
375    let mut overwrite_expected: HashMap<String, u64> = HashMap::new();
376    let pending_mode = match mode {
377        LoadMode::Merge => PendingMode::Merge,
378        // Append-mode loads accumulate as Append. Edge tables (no @key)
379        // and no-key node tables stay safe on the stage_append path. The
380        // Merge mode applies dedupe-by-id; Append assumes unique inputs.
381        LoadMode::Append => PendingMode::Append,
382        LoadMode::Overwrite => PendingMode::Append, // unused
383    };
384    // Map LoadMode to MutationOpKind for the version-check policy.
385    // Append/Merge skip the strict pre-stage check (concurrency-safe
386    // under the per-(table, branch) queue + publisher CAS); Overwrite
387    // uses the strict check because it truncates and replaces the
388    // dataset — concurrent advances change what "replace" means.
389    let load_op_kind = match mode {
390        LoadMode::Append => crate::db::MutationOpKind::Insert,
391        LoadMode::Merge => crate::db::MutationOpKind::Merge,
392        LoadMode::Overwrite => crate::db::MutationOpKind::SchemaRewrite,
393    };
394
395    // Phase 2a: build and validate every node batch up front. Cheap and
396    // synchronous — surfaces validation errors before any S3 traffic.
397    let mut prepared_nodes: Vec<(String, String, RecordBatch, usize)> =
398        Vec::with_capacity(node_rows.len());
399    for (type_name, rows) in &node_rows {
400        let node_type = &catalog.node_types[type_name];
401        let batch = build_node_batch(node_type, rows)?;
402        validate_value_constraints(&batch, node_type)?;
403        validate_enum_constraints(&batch, &node_type.properties, type_name)?;
404        let unique_props = unique_property_names_for_node(node_type);
405        if !unique_props.is_empty() {
406            enforce_unique_constraints_intra_batch(&batch, type_name, &unique_props)?;
407        }
408        let loaded_count = batch.num_rows();
409        let table_key = format!("node:{}", type_name);
410        let entry = snapshot
411            .entry(&table_key)
412            .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
413        if !use_staging {
414            overwrite_expected.insert(table_key.clone(), entry.table_version);
415        }
416        prepared_nodes.push((type_name.clone(), table_key, batch, loaded_count));
417    }
418
419    // Phase 2b: write every node type. Append/Merge → in-memory
420    // accumulator. Overwrite → concurrent inline-commit (legacy path).
421    if use_staging {
422        for (type_name, table_key, batch, loaded_count) in prepared_nodes {
423            let (ds, full_path, table_branch) = db
424                .open_for_mutation_on_branch(branch, &table_key, load_op_kind)
425                .await?;
426            let expected_version = ds.version().version;
427            staging.ensure_path(
428                &table_key,
429                full_path,
430                table_branch,
431                expected_version,
432                load_op_kind,
433            );
434            let schema = batch.schema();
435            staging.append_batch(&table_key, schema, pending_mode, batch)?;
436            result.nodes_loaded.insert(type_name, loaded_count);
437        }
438    } else {
439        let node_write_results =
440            write_batches_concurrently(db, branch, mode, prepared_nodes).await?;
441        for (type_name, table_key, loaded_count, state, table_branch) in node_write_results {
442            overwrite_updates.push(crate::db::SubTableUpdate {
443                table_key,
444                table_version: state.version,
445                table_branch,
446                row_count: state.row_count,
447                version_metadata: state.version_metadata,
448            });
449            result.nodes_loaded.insert(type_name, loaded_count);
450        }
451    }
452
453    // Phase 2c: Validate edge referential integrity — every src/dst must
454    // reference an existing node ID in the appropriate type. For staged
455    // loads, the lookup unions snapshot-committed IDs with the in-memory
456    // pending batches (which carry the just-staged node inserts).
457    for (edge_name, rows) in &edge_rows {
458        let edge_type = &catalog.edge_types[edge_name];
459        let from_ids = if use_staging {
460            collect_node_ids_with_pending(
461                db,
462                branch,
463                &edge_type.from_type,
464                &staging,
465            )
466            .await?
467        } else {
468            collect_node_ids(
469                db,
470                branch,
471                &edge_type.from_type,
472                &node_rows,
473                &catalog,
474                &overwrite_updates,
475            )
476            .await?
477        };
478        let to_ids = if use_staging {
479            collect_node_ids_with_pending(
480                db,
481                branch,
482                &edge_type.to_type,
483                &staging,
484            )
485            .await?
486        } else {
487            collect_node_ids(
488                db,
489                branch,
490                &edge_type.to_type,
491                &node_rows,
492                &catalog,
493                &overwrite_updates,
494            )
495            .await?
496        };
497
498        for (i, (src, dst, _)) in rows.iter().enumerate() {
499            if !from_ids.contains(src.as_str()) {
500                return Err(OmniError::manifest(format!(
501                    "edge {} row {}: src '{}' not found in {}",
502                    edge_name,
503                    i + 1,
504                    src,
505                    edge_type.from_type
506                )));
507            }
508            if !to_ids.contains(dst.as_str()) {
509                return Err(OmniError::manifest(format!(
510                    "edge {} row {}: dst '{}' not found in {}",
511                    edge_name,
512                    i + 1,
513                    dst,
514                    edge_type.to_type
515                )));
516            }
517        }
518    }
519
520    // Phase 2d: build edge batches.
521    let mut prepared_edges: Vec<(String, String, RecordBatch, usize)> =
522        Vec::with_capacity(edge_rows.len());
523    for (edge_name, rows) in &edge_rows {
524        let edge_type = &catalog.edge_types[edge_name];
525        let batch = build_edge_batch(edge_type, rows)?;
526        validate_enum_constraints(&batch, &edge_type.properties, edge_name)?;
527        let unique_props = unique_property_names_for_edge(edge_type);
528        if !unique_props.is_empty() {
529            enforce_unique_constraints_intra_batch(&batch, edge_name, &unique_props)?;
530        }
531        let loaded_count = batch.num_rows();
532        let table_key = format!("edge:{}", edge_name);
533        let entry = snapshot
534            .entry(&table_key)
535            .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
536        if !use_staging {
537            overwrite_expected.insert(table_key.clone(), entry.table_version);
538        }
539        prepared_edges.push((edge_name.clone(), table_key, batch, loaded_count));
540    }
541
542    // Phase 2e: write every edge type. Same dispatch as Phase 2b.
543    if use_staging {
544        for (edge_name, table_key, batch, loaded_count) in prepared_edges {
545            let (ds, full_path, table_branch) = db
546                .open_for_mutation_on_branch(branch, &table_key, load_op_kind)
547                .await?;
548            let expected_version = ds.version().version;
549            staging.ensure_path(
550                &table_key,
551                full_path,
552                table_branch,
553                expected_version,
554                load_op_kind,
555            );
556            let schema = batch.schema();
557            staging.append_batch(&table_key, schema, pending_mode, batch)?;
558            result.edges_loaded.insert(edge_name, loaded_count);
559        }
560    } else {
561        let edge_write_results =
562            write_batches_concurrently(db, branch, mode, prepared_edges).await?;
563        for (edge_name, table_key, loaded_count, state, table_branch) in edge_write_results {
564            overwrite_updates.push(crate::db::SubTableUpdate {
565                table_key,
566                table_version: state.version,
567                table_branch,
568                row_count: state.row_count,
569                version_metadata: state.version_metadata,
570            });
571            result.edges_loaded.insert(edge_name, loaded_count);
572        }
573    }
574
575    // Phase 3: Validate edge cardinality constraints (before commit —
576    // invalid data must not be committed). Staged path scans committed
577    // edges via Lance + iterates pending edges in-memory. Overwrite path
578    // opens the just-written version (legacy behavior).
579    for (edge_name, _) in &edge_rows {
580        let edge_type = &catalog.edge_types[edge_name];
581        let table_key = format!("edge:{}", edge_name);
582        if use_staging {
583            validate_edge_cardinality_with_pending_loader(
584                db,
585                branch,
586                edge_type,
587                &table_key,
588                &staging,
589                mode,
590            )
591            .await?;
592        } else if let Some(update) = overwrite_updates.iter().find(|u| u.table_key == table_key) {
593            validate_edge_cardinality(
594                db,
595                branch,
596                edge_name,
597                update.table_version,
598                update.table_branch.as_deref(),
599            )
600            .await?;
601        }
602    }
603
604    // Phase 4: Atomic manifest commit with publisher-level OCC.
605    if use_staging {
606        let staged = staging.stage_all(db, branch).await?;
607        // `_queue_guards` holds per-(table_key, branch) write queues
608        // across the manifest publish below — see exec/mutation.rs for
609        // the rationale (interleaving prevention).
610        let (updates, expected_versions, sidecar_handle, _queue_guards) = staged
611            .commit_all(db, branch, crate::db::manifest::SidecarKind::Load, actor_id)
612            .await?;
613        // Same finalize → publisher residual as mutations: per-table
614        // staged commits have advanced Lance HEAD, but the manifest
615        // publish has not run yet. Reuse the mutation failpoint name so
616        // one failpoint pins the shared `MutationStaging` boundary.
617        crate::failpoints::maybe_fail("mutation.post_finalize_pre_publisher")?;
618        db.commit_updates_on_branch_with_expected(branch, &updates, &expected_versions, actor_id)
619            .await?;
620        // The recovery sidecar protects the per-table commit_staged →
621        // manifest publish window. Phase C succeeded — clean up
622        // best-effort: failing the user here would error out a write
623        // that already landed durably.
624        if let Some(handle) = sidecar_handle {
625            if let Err(err) =
626                crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await
627            {
628                tracing::warn!(
629                    error = %err,
630                    operation_id = handle.operation_id.as_str(),
631                    "recovery sidecar cleanup failed; the next open's recovery sweep will resolve it"
632                );
633            }
634        }
635    } else {
636        // LoadMode::Overwrite keeps the legacy inline-commit path —
637        // truncate-then-append doesn't fit the staged shape (see
638        // `docs/runs.md` "LoadMode::Overwrite residual"). The recovery
639        // sidecar is not applicable here because the writer doesn't go
640        // through MutationStaging; per-table inline commits + a final
641        // manifest publish handle their own residual via the documented
642        // operator workflow (re-run overwrite to recover).
643        db.commit_updates_on_branch_with_expected(
644            branch,
645            &overwrite_updates,
646            &overwrite_expected,
647            actor_id,
648        )
649        .await?;
650    }
651
652    Ok(result)
653}
654
655fn build_node_batch(node_type: &NodeType, rows: &[JsonValue]) -> Result<RecordBatch> {
656    let schema = node_type.arrow_schema.clone();
657
658    // Build id column: explicit id, @key value, or generated ULID.
659    let ids: Vec<String> = rows
660        .iter()
661        .map(|row| {
662            let explicit_id = row.get("id").and_then(|v| v.as_str()).map(str::to_string);
663            if let Some(key_prop) = node_type.key_property() {
664                let key_value = row
665                    .get(key_prop)
666                    .and_then(|v| v.as_str())
667                    .map(|s| s.to_string())
668                    .ok_or_else(|| {
669                        OmniError::manifest(format!(
670                            "node {} missing @key property '{}'",
671                            node_type.name, key_prop
672                        ))
673                    })?;
674                if let Some(explicit_id) = explicit_id {
675                    if explicit_id != key_value {
676                        return Err(OmniError::manifest(format!(
677                            "node {} has explicit id '{}' that does not match @key property '{}' value '{}'",
678                            node_type.name, explicit_id, key_prop, key_value
679                        )));
680                    }
681                }
682                Ok(key_value)
683            } else if let Some(explicit_id) = explicit_id {
684                Ok(explicit_id)
685            } else {
686                Ok(generate_id())
687            }
688        })
689        .collect::<Result<Vec<_>>>()?;
690
691    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
692    columns.push(Arc::new(StringArray::from(ids)));
693
694    // Build property columns (skip "id" field at index 0)
695    for field in schema.fields().iter().skip(1) {
696        if node_type.blob_properties.contains(field.name()) {
697            let col = build_blob_column(field.name(), field.is_nullable(), rows)?;
698            columns.push(col);
699        } else {
700            let col =
701                build_column_from_json(field.name(), field.data_type(), field.is_nullable(), rows)?;
702            columns.push(col);
703        }
704    }
705
706    RecordBatch::try_new(schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
707}
708
709fn build_edge_batch(
710    edge_type: &omnigraph_compiler::catalog::EdgeType,
711    rows: &[(String, String, JsonValue)],
712) -> Result<RecordBatch> {
713    let schema = edge_type.arrow_schema.clone();
714
715    let ids: Vec<String> = rows
716        .iter()
717        .map(|(_, _, data)| {
718            data.get("id")
719                .and_then(|v| v.as_str())
720                .map(str::to_string)
721                .unwrap_or_else(generate_id)
722        })
723        .collect();
724    let srcs: Vec<&str> = rows.iter().map(|(from, _, _)| from.as_str()).collect();
725    let dsts: Vec<&str> = rows.iter().map(|(_, to, _)| to.as_str()).collect();
726
727    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
728    columns.push(Arc::new(StringArray::from(ids)));
729    columns.push(Arc::new(StringArray::from(srcs)));
730    columns.push(Arc::new(StringArray::from(dsts)));
731
732    // Build edge property columns (skip id, src, dst at indices 0-2)
733    let data_values: Vec<JsonValue> = rows.iter().map(|(_, _, data)| data.clone()).collect();
734    for field in schema.fields().iter().skip(3) {
735        if edge_type.blob_properties.contains(field.name()) {
736            let col = build_blob_column(field.name(), field.is_nullable(), &data_values)?;
737            columns.push(col);
738        } else {
739            let col = build_column_from_json(
740                field.name(),
741                field.data_type(),
742                field.is_nullable(),
743                &data_values,
744            )?;
745            columns.push(col);
746        }
747    }
748
749    RecordBatch::try_new(schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
750}
751
752/// Append a blob value (URI or base64 bytes) to a BlobArrayBuilder.
753pub(crate) fn append_blob_value(builder: &mut BlobArrayBuilder, value: &str) -> Result<()> {
754    if let Some(encoded) = value.strip_prefix("base64:") {
755        let bytes = base64::engine::general_purpose::STANDARD
756            .decode(encoded)
757            .map_err(|e| OmniError::manifest(format!("invalid base64 blob data: {}", e)))?;
758        builder
759            .push_bytes(bytes)
760            .map_err(|e| OmniError::Lance(e.to_string()))
761    } else {
762        // Treat as URI (file://, s3://, gs://, or any other scheme)
763        builder
764            .push_uri(value)
765            .map_err(|e| OmniError::Lance(e.to_string()))
766    }
767}
768
769/// Build a blob column from JSON values using Lance BlobArrayBuilder.
770fn build_blob_column(name: &str, nullable: bool, rows: &[JsonValue]) -> Result<ArrayRef> {
771    let mut builder = BlobArrayBuilder::new(rows.len());
772    for row in rows {
773        match row.get(name) {
774            Some(JsonValue::String(s)) => {
775                append_blob_value(&mut builder, s)?;
776            }
777            Some(JsonValue::Null) | None if nullable => {
778                builder
779                    .push_null()
780                    .map_err(|e| OmniError::Lance(e.to_string()))?;
781            }
782            Some(JsonValue::Null) | None => {
783                return Err(OmniError::manifest(format!(
784                    "non-nullable blob property '{}' has null values",
785                    name
786                )));
787            }
788            _ => {
789                return Err(OmniError::manifest(format!(
790                    "blob property '{}' must be a URI string or base64: prefixed data",
791                    name
792                )));
793            }
794        }
795    }
796    builder
797        .finish()
798        .map_err(|e| OmniError::Lance(e.to_string()))
799}
800
801fn build_column_from_json(
802    name: &str,
803    data_type: &DataType,
804    nullable: bool,
805    rows: &[JsonValue],
806) -> Result<ArrayRef> {
807    let array: ArrayRef = match data_type {
808        DataType::Utf8 => {
809            let values: Vec<Option<String>> = rows
810                .iter()
811                .map(|row| {
812                    row.get(name)
813                        .and_then(|v| v.as_str())
814                        .map(|s| s.to_string())
815                })
816                .collect();
817            Arc::new(StringArray::from(values))
818        }
819        DataType::Int32 => {
820            let values: Vec<Option<i32>> = rows
821                .iter()
822                .map(|row| row.get(name).and_then(|v| v.as_i64()).map(|v| v as i32))
823                .collect();
824            Arc::new(Int32Array::from(values))
825        }
826        DataType::Int64 => {
827            let values: Vec<Option<i64>> = rows
828                .iter()
829                .map(|row| row.get(name).and_then(|v| v.as_i64()))
830                .collect();
831            Arc::new(Int64Array::from(values))
832        }
833        DataType::UInt32 => {
834            let values: Vec<Option<u32>> = rows
835                .iter()
836                .map(|row| row.get(name).and_then(|v| v.as_u64()).map(|v| v as u32))
837                .collect();
838            Arc::new(UInt32Array::from(values))
839        }
840        DataType::UInt64 => {
841            let values: Vec<Option<u64>> = rows
842                .iter()
843                .map(|row| row.get(name).and_then(|v| v.as_u64()))
844                .collect();
845            Arc::new(UInt64Array::from(values))
846        }
847        DataType::Float32 => {
848            let values: Vec<Option<f32>> = rows
849                .iter()
850                .map(|row| row.get(name).and_then(|v| v.as_f64()).map(|v| v as f32))
851                .collect();
852            Arc::new(Float32Array::from(values))
853        }
854        DataType::Float64 => {
855            let values: Vec<Option<f64>> = rows
856                .iter()
857                .map(|row| row.get(name).and_then(|v| v.as_f64()))
858                .collect();
859            Arc::new(Float64Array::from(values))
860        }
861        DataType::Boolean => {
862            let values: Vec<Option<bool>> = rows
863                .iter()
864                .map(|row| row.get(name).and_then(|v| v.as_bool()))
865                .collect();
866            Arc::new(BooleanArray::from(values))
867        }
868        DataType::Date32 => {
869            let mut values = Vec::with_capacity(rows.len());
870            for row in rows {
871                values.push(parse_date32_json_value(
872                    row.get(name).unwrap_or(&JsonValue::Null),
873                )?);
874            }
875            Arc::new(Date32Array::from(values))
876        }
877        DataType::Date64 => {
878            let mut values = Vec::with_capacity(rows.len());
879            for row in rows {
880                values.push(parse_date64_json_value(
881                    row.get(name).unwrap_or(&JsonValue::Null),
882                )?);
883            }
884            Arc::new(Date64Array::from(values))
885        }
886        DataType::List(field) => {
887            let mut builder = ListBuilder::with_capacity(
888                make_list_value_builder(field.data_type(), rows.len())?,
889                rows.len(),
890            )
891            .with_field(field.clone());
892            for row in rows {
893                let value = row.get(name).unwrap_or(&JsonValue::Null);
894                if value.is_null() {
895                    builder.append(false);
896                    continue;
897                }
898                let items = value.as_array().ok_or_else(|| {
899                    OmniError::manifest(format!(
900                        "list property '{}' expects a JSON array, got {}",
901                        name, value
902                    ))
903                })?;
904                for item in items {
905                    append_json_list_item(builder.values(), field.data_type(), item)?;
906                }
907                builder.append(true);
908            }
909            Arc::new(builder.finish())
910        }
911        DataType::FixedSizeList(child_field, dim) => {
912            // Vector type: parse JSON array of floats into FixedSizeList<Float32>
913            let dim = *dim;
914            let mut builder = FixedSizeListBuilder::with_capacity(
915                Float32Builder::with_capacity(rows.len() * dim as usize),
916                dim,
917                rows.len(),
918            )
919            .with_field(child_field.clone());
920            for row in rows {
921                if let Some(arr) = row.get(name).and_then(|v| v.as_array()) {
922                    if arr.len() != dim as usize {
923                        return Err(OmniError::manifest(format!(
924                            "vector property '{}' expects {} dimensions, got {}",
925                            name,
926                            dim,
927                            arr.len()
928                        )));
929                    }
930                    for val in arr {
931                        builder
932                            .values()
933                            .append_value(val.as_f64().unwrap_or(0.0) as f32);
934                    }
935                    builder.append(true);
936                } else if nullable {
937                    for _ in 0..dim as usize {
938                        builder.values().append_null();
939                    }
940                    builder.append(false);
941                } else {
942                    return Err(OmniError::manifest(format!(
943                        "non-nullable vector property '{}' has null values",
944                        name
945                    )));
946                }
947            }
948            Arc::new(builder.finish())
949        }
950        _ => {
951            // Unsupported type: fill with nulls
952            let values: Vec<Option<&str>> = vec![None; rows.len()];
953            Arc::new(StringArray::from(values))
954        }
955    };
956
957    if !nullable && array.null_count() > 0 {
958        return Err(OmniError::manifest(format!(
959            "non-nullable property '{}' has null or invalid values",
960            name
961        )));
962    }
963
964    Ok(array)
965}
966
967fn make_list_value_builder(data_type: &DataType, capacity: usize) -> Result<Box<dyn ArrayBuilder>> {
968    Ok(match data_type {
969        DataType::Utf8 => Box::new(StringBuilder::with_capacity(capacity, capacity * 8)),
970        DataType::Boolean => Box::new(BooleanBuilder::with_capacity(capacity)),
971        DataType::Int32 => Box::new(Int32Builder::with_capacity(capacity)),
972        DataType::Int64 => Box::new(Int64Builder::with_capacity(capacity)),
973        DataType::UInt32 => Box::new(UInt32Builder::with_capacity(capacity)),
974        DataType::UInt64 => Box::new(UInt64Builder::with_capacity(capacity)),
975        DataType::Float32 => Box::new(Float32Builder::with_capacity(capacity)),
976        DataType::Float64 => Box::new(Float64Builder::with_capacity(capacity)),
977        DataType::Date32 => Box::new(Date32Builder::with_capacity(capacity)),
978        DataType::Date64 => Box::new(Date64Builder::with_capacity(capacity)),
979        other => {
980            return Err(OmniError::manifest(format!(
981                "unsupported list element data type {:?}",
982                other
983            )));
984        }
985    })
986}
987
988fn append_json_list_item(
989    builder: &mut Box<dyn ArrayBuilder>,
990    data_type: &DataType,
991    value: &JsonValue,
992) -> Result<()> {
993    match data_type {
994        DataType::Utf8 => {
995            let builder = builder
996                .as_any_mut()
997                .downcast_mut::<StringBuilder>()
998                .ok_or_else(|| OmniError::manifest("list Utf8 builder downcast failed"))?;
999            if let Some(value) = value.as_str() {
1000                builder.append_value(value);
1001            } else {
1002                builder.append_null();
1003            }
1004        }
1005        DataType::Boolean => {
1006            let builder = builder
1007                .as_any_mut()
1008                .downcast_mut::<BooleanBuilder>()
1009                .ok_or_else(|| OmniError::manifest("list Boolean builder downcast failed"))?;
1010            if let Some(value) = value.as_bool() {
1011                builder.append_value(value);
1012            } else {
1013                builder.append_null();
1014            }
1015        }
1016        DataType::Int32 => {
1017            let builder = builder
1018                .as_any_mut()
1019                .downcast_mut::<Int32Builder>()
1020                .ok_or_else(|| OmniError::manifest("list Int32 builder downcast failed"))?;
1021            if let Some(value) = value.as_i64() {
1022                let value = i32::try_from(value).map_err(|_| {
1023                    OmniError::manifest(format!("list value {} exceeds Int32 range", value))
1024                })?;
1025                builder.append_value(value);
1026            } else {
1027                builder.append_null();
1028            }
1029        }
1030        DataType::Int64 => {
1031            let builder = builder
1032                .as_any_mut()
1033                .downcast_mut::<Int64Builder>()
1034                .ok_or_else(|| OmniError::manifest("list Int64 builder downcast failed"))?;
1035            if let Some(value) = value.as_i64() {
1036                builder.append_value(value);
1037            } else {
1038                builder.append_null();
1039            }
1040        }
1041        DataType::UInt32 => {
1042            let builder = builder
1043                .as_any_mut()
1044                .downcast_mut::<UInt32Builder>()
1045                .ok_or_else(|| OmniError::manifest("list UInt32 builder downcast failed"))?;
1046            if let Some(value) = value.as_u64() {
1047                let value = u32::try_from(value).map_err(|_| {
1048                    OmniError::manifest(format!("list value {} exceeds UInt32 range", value))
1049                })?;
1050                builder.append_value(value);
1051            } else {
1052                builder.append_null();
1053            }
1054        }
1055        DataType::UInt64 => {
1056            let builder = builder
1057                .as_any_mut()
1058                .downcast_mut::<UInt64Builder>()
1059                .ok_or_else(|| OmniError::manifest("list UInt64 builder downcast failed"))?;
1060            if let Some(value) = value.as_u64() {
1061                builder.append_value(value);
1062            } else {
1063                builder.append_null();
1064            }
1065        }
1066        DataType::Float32 => {
1067            let builder = builder
1068                .as_any_mut()
1069                .downcast_mut::<Float32Builder>()
1070                .ok_or_else(|| OmniError::manifest("list Float32 builder downcast failed"))?;
1071            if let Some(value) = value.as_f64() {
1072                builder.append_value(value as f32);
1073            } else {
1074                builder.append_null();
1075            }
1076        }
1077        DataType::Float64 => {
1078            let builder = builder
1079                .as_any_mut()
1080                .downcast_mut::<Float64Builder>()
1081                .ok_or_else(|| OmniError::manifest("list Float64 builder downcast failed"))?;
1082            if let Some(value) = value.as_f64() {
1083                builder.append_value(value);
1084            } else {
1085                builder.append_null();
1086            }
1087        }
1088        DataType::Date32 => {
1089            let builder = builder
1090                .as_any_mut()
1091                .downcast_mut::<Date32Builder>()
1092                .ok_or_else(|| OmniError::manifest("list Date32 builder downcast failed"))?;
1093            if let Some(value) = parse_date32_json_value(value)? {
1094                builder.append_value(value);
1095            } else {
1096                builder.append_null();
1097            }
1098        }
1099        DataType::Date64 => {
1100            let builder = builder
1101                .as_any_mut()
1102                .downcast_mut::<Date64Builder>()
1103                .ok_or_else(|| OmniError::manifest("list Date64 builder downcast failed"))?;
1104            if let Some(value) = parse_date64_json_value(value)? {
1105                builder.append_value(value);
1106            } else {
1107                builder.append_null();
1108            }
1109        }
1110        other => {
1111            return Err(OmniError::manifest(format!(
1112                "unsupported list element data type {:?}",
1113                other
1114            )));
1115        }
1116    }
1117
1118    Ok(())
1119}
1120
1121fn parse_date32_json_value(value: &JsonValue) -> Result<Option<i32>> {
1122    if value.is_null() {
1123        return Ok(None);
1124    }
1125    if let Some(days) = value.as_i64() {
1126        let days = i32::try_from(days)
1127            .map_err(|_| OmniError::manifest(format!("Date value out of range: {}", days)))?;
1128        return Ok(Some(days));
1129    }
1130    if let Some(days) = value.as_u64() {
1131        let days = i32::try_from(days)
1132            .map_err(|_| OmniError::manifest(format!("Date value out of range: {}", days)))?;
1133        return Ok(Some(days));
1134    }
1135    if let Some(value) = value.as_str() {
1136        return Ok(Some(parse_date32_literal(value)?));
1137    }
1138    Ok(None)
1139}
1140
1141fn parse_date64_json_value(value: &JsonValue) -> Result<Option<i64>> {
1142    if value.is_null() {
1143        return Ok(None);
1144    }
1145    if let Some(ms) = value.as_i64() {
1146        return Ok(Some(ms));
1147    }
1148    if let Some(ms) = value.as_u64() {
1149        let ms = i64::try_from(ms)
1150            .map_err(|_| OmniError::manifest(format!("DateTime value out of range: {}", ms)))?;
1151        return Ok(Some(ms));
1152    }
1153    if let Some(value) = value.as_str() {
1154        return Ok(Some(parse_date64_literal(value)?));
1155    }
1156    Ok(None)
1157}
1158
/// How many per-type Lance writes to run concurrently during a load.
///
/// Each write is an independent S3 manifest + fragment write against a
/// different table. Ops within a single table must still be serial (Lance
/// OCC on the manifest), but cross-table writes have no shared state.
///
/// 8 is a conservative default — enough to overlap S3 round-trip latency
/// across the typical 10-30 table schemas without flooding the runtime.
/// Override via `OMNIGRAPH_LOAD_CONCURRENCY` for benchmarking.
const DEFAULT_LOAD_WRITE_CONCURRENCY: usize = 8;

/// Resolve the write concurrency to use for a load.
///
/// Reads `OMNIGRAPH_LOAD_CONCURRENCY` and accepts it when, after trimming
/// surrounding whitespace, it parses as a positive integer. An unset variable,
/// a non-numeric value, or `0` falls back to
/// `DEFAULT_LOAD_WRITE_CONCURRENCY`.
fn load_write_concurrency() -> usize {
    std::env::var("OMNIGRAPH_LOAD_CONCURRENCY")
        .ok()
        // Trim first: values coming from shell scripts or env files often carry
        // a trailing newline or padding, which `parse` would otherwise reject.
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        .filter(|&requested| requested > 0)
        .unwrap_or(DEFAULT_LOAD_WRITE_CONCURRENCY)
}
1178
1179/// Write a set of prepared `(type_name, table_key, batch, row_count)` tuples
1180/// concurrently. Returns results in original iteration order so callers can
1181/// zip them back to per-type metadata.
1182async fn write_batches_concurrently(
1183    db: &Omnigraph,
1184    branch: Option<&str>,
1185    mode: LoadMode,
1186    prepared: Vec<(String, String, RecordBatch, usize)>,
1187) -> Result<
1188    Vec<(
1189        String,
1190        String,
1191        usize,
1192        crate::table_store::TableState,
1193        Option<String>,
1194    )>,
1195> {
1196    use futures::stream::StreamExt;
1197
1198    if prepared.is_empty() {
1199        return Ok(Vec::new());
1200    }
1201
1202    let concurrency = load_write_concurrency().min(prepared.len()).max(1);
1203
1204    futures::stream::iter(prepared.into_iter().map(
1205        |(type_name, table_key, batch, loaded_count)| async move {
1206            let (state, table_branch) =
1207                write_batch_to_dataset(db, branch, &table_key, batch, mode).await?;
1208            Ok::<_, OmniError>((type_name, table_key, loaded_count, state, table_branch))
1209        },
1210    ))
1211    .buffered(concurrency)
1212    .collect::<Vec<Result<_>>>()
1213    .await
1214    .into_iter()
1215    .collect()
1216}
1217
1218async fn write_batch_to_dataset(
1219    db: &Omnigraph,
1220    branch: Option<&str>,
1221    table_key: &str,
1222    batch: RecordBatch,
1223    mode: LoadMode,
1224) -> Result<(crate::table_store::TableState, Option<String>)> {
1225    let op_kind = match mode {
1226        LoadMode::Append => crate::db::MutationOpKind::Insert,
1227        LoadMode::Merge => crate::db::MutationOpKind::Merge,
1228        LoadMode::Overwrite => crate::db::MutationOpKind::SchemaRewrite,
1229    };
1230    let (mut ds, full_path, table_branch) = db
1231        .open_for_mutation_on_branch(branch, table_key, op_kind)
1232        .await?;
1233    let table_store = db.table_store();
1234
1235    match mode {
1236        LoadMode::Overwrite => {
1237            let state = table_store
1238                .overwrite_batch(&full_path, &mut ds, batch)
1239                .await?;
1240            Ok((state, table_branch))
1241        }
1242        LoadMode::Append => {
1243            let state = table_store.append_batch(&full_path, &mut ds, batch).await?;
1244            Ok((state, table_branch))
1245        }
1246        LoadMode::Merge => {
1247            let state = table_store
1248                .merge_insert_batch(
1249                    &full_path,
1250                    ds,
1251                    batch,
1252                    vec!["id".to_string()],
1253                    lance::dataset::WhenMatched::UpdateAll,
1254                    lance::dataset::WhenNotMatched::InsertAll,
1255                )
1256                .await?;
1257            Ok((state, table_branch))
1258        }
1259    }
1260}
1261
1262fn generate_id() -> String {
1263    ulid::Ulid::new().to_string()
1264}
1265
1266pub(crate) fn parse_date32_literal(value: &str) -> Result<i32> {
1267    let raw: Arc<dyn Array> = Arc::new(StringArray::from(vec![Some(value)]));
1268    let casted = arrow_cast::cast::cast(raw.as_ref(), &DataType::Date32)
1269        .map_err(|e| OmniError::manifest(format!("invalid Date literal '{}': {}", value, e)))?;
1270    let out = casted
1271        .as_any()
1272        .downcast_ref::<Date32Array>()
1273        .ok_or_else(|| OmniError::manifest("Date32 cast produced unexpected array"))?;
1274    if out.is_null(0) {
1275        return Err(OmniError::manifest(format!(
1276            "invalid Date literal '{}'",
1277            value
1278        )));
1279    }
1280    Ok(out.value(0))
1281}
1282
1283pub(crate) fn parse_date64_literal(value: &str) -> Result<i64> {
1284    let raw: Arc<dyn Array> = Arc::new(StringArray::from(vec![Some(value)]));
1285    let casted = arrow_cast::cast::cast(raw.as_ref(), &DataType::Date64)
1286        .map_err(|e| OmniError::manifest(format!("invalid DateTime literal '{}': {}", value, e)))?;
1287    let out = casted
1288        .as_any()
1289        .downcast_ref::<Date64Array>()
1290        .ok_or_else(|| OmniError::manifest("Date64 cast produced unexpected array"))?;
1291    if out.is_null(0) {
1292        return Err(OmniError::manifest(format!(
1293            "invalid DateTime literal '{}'",
1294            value
1295        )));
1296    }
1297    Ok(out.value(0))
1298}
1299
1300// ─── Value constraint validation ─────────────────────────────────────────────
1301
1302pub(crate) fn validate_value_constraints(
1303    batch: &RecordBatch,
1304    node_type: &omnigraph_compiler::catalog::NodeType,
1305) -> Result<()> {
1306    use arrow_array::Array;
1307
1308    // Range constraints
1309    for rc in &node_type.range_constraints {
1310        let Some(col) = batch.column_by_name(&rc.property) else {
1311            continue;
1312        };
1313        for row in 0..batch.num_rows() {
1314            if col.is_null(row) {
1315                continue;
1316            }
1317            let value = extract_numeric_value(col, row);
1318            if let Some(val) = value {
1319                if val.is_nan() {
1320                    return Err(OmniError::manifest(format!(
1321                        "@range violation on {}.{}: value is NaN",
1322                        node_type.name, rc.property
1323                    )));
1324                }
1325                if let Some(ref min) = rc.min {
1326                    let min_f = literal_value_to_f64(min);
1327                    if val < min_f {
1328                        return Err(OmniError::manifest(format!(
1329                            "@range violation on {}.{}: value {} < min {}",
1330                            node_type.name, rc.property, val, min_f
1331                        )));
1332                    }
1333                }
1334                if let Some(ref max) = rc.max {
1335                    let max_f = literal_value_to_f64(max);
1336                    if val > max_f {
1337                        return Err(OmniError::manifest(format!(
1338                            "@range violation on {}.{}: value {} > max {}",
1339                            node_type.name, rc.property, val, max_f
1340                        )));
1341                    }
1342                }
1343            }
1344        }
1345    }
1346
1347    // Check constraints (regex)
1348    for cc in &node_type.check_constraints {
1349        let re = regex::Regex::new(&cc.pattern).map_err(|e| {
1350            OmniError::manifest(format!(
1351                "@check on {}.{} has invalid regex '{}': {}",
1352                node_type.name, cc.property, cc.pattern, e
1353            ))
1354        })?;
1355        let Some(col) = batch.column_by_name(&cc.property) else {
1356            continue;
1357        };
1358        let str_col = col.as_any().downcast_ref::<StringArray>();
1359        if let Some(str_col) = str_col {
1360            for row in 0..str_col.len() {
1361                if str_col.is_null(row) {
1362                    continue;
1363                }
1364                let val = str_col.value(row);
1365                if !re.is_match(val) {
1366                    return Err(OmniError::manifest(format!(
1367                        "@check violation on {}.{}: value '{}' does not match pattern '{}'",
1368                        node_type.name, cc.property, val, cc.pattern
1369                    )));
1370                }
1371            }
1372        }
1373    }
1374
1375    Ok(())
1376}
1377
1378/// Validate that every enum-typed property in `properties` only contains values
1379/// from its declared enum value set. Operates on a single `RecordBatch` so it
1380/// can be called from any write path that already holds a batch.
1381///
1382/// Scalar string enums are checked directly. List-of-enum properties are
1383/// checked element-by-element across the underlying string values.
1384pub(crate) fn validate_enum_constraints(
1385    batch: &RecordBatch,
1386    properties: &HashMap<String, omnigraph_compiler::types::PropType>,
1387    type_name: &str,
1388) -> Result<()> {
1389    use arrow_array::{Array, ListArray};
1390
1391    for (prop_name, prop_type) in properties {
1392        let Some(allowed) = prop_type.enum_values.as_ref() else {
1393            continue;
1394        };
1395        let Some(col) = batch.column_by_name(prop_name) else {
1396            continue;
1397        };
1398        if prop_type.list {
1399            let Some(list_col) = col.as_any().downcast_ref::<ListArray>() else {
1400                continue;
1401            };
1402            for row in 0..list_col.len() {
1403                if list_col.is_null(row) {
1404                    continue;
1405                }
1406                let item_arr = list_col.value(row);
1407                let Some(str_arr) = item_arr.as_any().downcast_ref::<StringArray>() else {
1408                    continue;
1409                };
1410                for i in 0..str_arr.len() {
1411                    if str_arr.is_null(i) {
1412                        continue;
1413                    }
1414                    let val = str_arr.value(i);
1415                    if !allowed.iter().any(|a| a.as_str() == val) {
1416                        return Err(OmniError::manifest(format!(
1417                            "invalid enum value '{}' for {}.{} (expected: {})",
1418                            val,
1419                            type_name,
1420                            prop_name,
1421                            allowed.join(", ")
1422                        )));
1423                    }
1424                }
1425            }
1426        } else if let Some(str_col) = col.as_any().downcast_ref::<StringArray>() {
1427            for row in 0..str_col.len() {
1428                if str_col.is_null(row) {
1429                    continue;
1430                }
1431                let val = str_col.value(row);
1432                if !allowed.iter().any(|a| a.as_str() == val) {
1433                    return Err(OmniError::manifest(format!(
1434                        "invalid enum value '{}' for {}.{} (expected: {})",
1435                        val,
1436                        type_name,
1437                        prop_name,
1438                        allowed.join(", ")
1439                    )));
1440                }
1441            }
1442        }
1443    }
1444    Ok(())
1445}
1446
1447/// Detect duplicate values within a single `RecordBatch` for any of the named
1448/// `unique_properties`. Returns an error on the first duplicate found.
1449///
1450/// Note: this only catches duplicates *within* the batch. Cross-batch
1451/// uniqueness against already-committed rows is not enforced here — that
1452/// requires a dataset scan and is tracked separately.
1453pub(crate) fn enforce_unique_constraints_intra_batch(
1454    batch: &RecordBatch,
1455    type_name: &str,
1456    unique_properties: &[String],
1457) -> Result<()> {
1458    for property in unique_properties {
1459        let Some(col_idx) = batch.schema().index_of(property).ok() else {
1460            continue;
1461        };
1462        let arr = batch.column(col_idx);
1463        let mut seen: HashMap<String, usize> = HashMap::new();
1464        for row in 0..batch.num_rows() {
1465            let Some(value) = scalar_to_string(arr, row) else {
1466                continue;
1467            };
1468            if let Some(prev_row) = seen.insert(value.clone(), row) {
1469                return Err(OmniError::manifest(format!(
1470                    "@unique violation on {}.{}: value '{}' appears in rows {} and {}",
1471                    type_name, property, value, prev_row, row
1472                )));
1473            }
1474        }
1475    }
1476    Ok(())
1477}
1478
1479/// Reduce a single Arrow scalar at (`array`, `row`) to a `String` for
1480/// uniqueness comparison. Returns `None` for null values (nulls are exempt
1481/// from uniqueness in standard SQL semantics).
1482fn scalar_to_string(array: &ArrayRef, row: usize) -> Option<String> {
1483    use arrow_array::Array;
1484    if array.is_null(row) {
1485        return None;
1486    }
1487    if let Some(a) = array.as_any().downcast_ref::<StringArray>() {
1488        return Some(a.value(row).to_string());
1489    }
1490    if let Some(a) = array.as_any().downcast_ref::<Int32Array>() {
1491        return Some(a.value(row).to_string());
1492    }
1493    if let Some(a) = array.as_any().downcast_ref::<Int64Array>() {
1494        return Some(a.value(row).to_string());
1495    }
1496    if let Some(a) = array.as_any().downcast_ref::<UInt32Array>() {
1497        return Some(a.value(row).to_string());
1498    }
1499    if let Some(a) = array.as_any().downcast_ref::<UInt64Array>() {
1500        return Some(a.value(row).to_string());
1501    }
1502    if let Some(a) = array.as_any().downcast_ref::<Float32Array>() {
1503        return Some(a.value(row).to_string());
1504    }
1505    if let Some(a) = array.as_any().downcast_ref::<Float64Array>() {
1506        return Some(a.value(row).to_string());
1507    }
1508    if let Some(a) = array.as_any().downcast_ref::<BooleanArray>() {
1509        return Some(a.value(row).to_string());
1510    }
1511    if let Some(a) = array.as_any().downcast_ref::<Date32Array>() {
1512        return Some(a.value(row).to_string());
1513    }
1514    if let Some(a) = array.as_any().downcast_ref::<Date64Array>() {
1515        return Some(a.value(row).to_string());
1516    }
1517    None
1518}
1519
1520/// Build the flat list of property names that must be checked for uniqueness
1521/// on a node type. Includes both `@unique` properties (from
1522/// `NodeType.unique_constraints`) and the `@key` (which implies uniqueness).
1523pub(crate) fn unique_property_names_for_node(
1524    node_type: &omnigraph_compiler::catalog::NodeType,
1525) -> Vec<String> {
1526    let mut props: Vec<String> = node_type
1527        .unique_constraints
1528        .iter()
1529        .flatten()
1530        .cloned()
1531        .collect();
1532    if let Some(key) = &node_type.key {
1533        props.extend(key.iter().cloned());
1534    }
1535    props.sort();
1536    props.dedup();
1537    props
1538}
1539
1540/// Same as [`unique_property_names_for_node`] but for an edge type.
1541pub(crate) fn unique_property_names_for_edge(
1542    edge_type: &omnigraph_compiler::catalog::EdgeType,
1543) -> Vec<String> {
1544    let mut props: Vec<String> = edge_type
1545        .unique_constraints
1546        .iter()
1547        .flatten()
1548        .cloned()
1549        .collect();
1550    props.sort();
1551    props.dedup();
1552    props
1553}
1554
1555fn extract_numeric_value(col: &ArrayRef, row: usize) -> Option<f64> {
1556    use arrow_array::{
1557        Array, Float32Array, Float64Array, Int32Array, Int64Array, UInt32Array, UInt64Array,
1558    };
1559    if let Some(a) = col.as_any().downcast_ref::<Int32Array>() {
1560        return Some(a.value(row) as f64);
1561    }
1562    if let Some(a) = col.as_any().downcast_ref::<Int64Array>() {
1563        return Some(a.value(row) as f64);
1564    }
1565    if let Some(a) = col.as_any().downcast_ref::<UInt32Array>() {
1566        return Some(a.value(row) as f64);
1567    }
1568    if let Some(a) = col.as_any().downcast_ref::<UInt64Array>() {
1569        return Some(a.value(row) as f64);
1570    }
1571    if let Some(a) = col.as_any().downcast_ref::<Float32Array>() {
1572        return Some(a.value(row) as f64);
1573    }
1574    if let Some(a) = col.as_any().downcast_ref::<Float64Array>() {
1575        return Some(a.value(row));
1576    }
1577    None
1578}
1579
1580fn literal_value_to_f64(v: &omnigraph_compiler::catalog::LiteralValue) -> f64 {
1581    use omnigraph_compiler::catalog::LiteralValue;
1582    match v {
1583        LiteralValue::Integer(n) => *n as f64,
1584        LiteralValue::Float(f) => *f,
1585    }
1586}
1587
1588// ─── Edge cardinality validation ─────────────────────────────────────────────
1589
1590pub(crate) async fn validate_edge_cardinality(
1591    db: &crate::db::Omnigraph,
1592    branch: Option<&str>,
1593    edge_name: &str,
1594    written_version: u64,
1595    written_branch: Option<&str>,
1596) -> Result<()> {
1597    use arrow_array::Array;
1598    let catalog = db.catalog();
1599    let edge_type = &catalog.edge_types[edge_name];
1600    if edge_type.cardinality.is_default() {
1601        return Ok(());
1602    }
1603
1604    // Open edge sub-table at the just-written version, not the snapshot's
1605    // (the snapshot still pins to the pre-write version).
1606    let snapshot = db.snapshot_for_branch(branch).await?;
1607    let table_key = format!("edge:{}", edge_name);
1608    let entry = snapshot
1609        .entry(&table_key)
1610        .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
1611    let ds = db
1612        .open_dataset_at_state(
1613            &entry.table_path,
1614            written_branch.or(entry.table_branch.as_deref()),
1615            written_version,
1616        )
1617        .await?;
1618
1619    // Scan src column, count per source
1620    let batches = db
1621        .table_store()
1622        .scan(&ds, Some(&["src"]), None, None)
1623        .await?;
1624
1625    let mut counts: HashMap<String, u32> = HashMap::new();
1626    for batch in &batches {
1627        let srcs = batch
1628            .column_by_name("src")
1629            .unwrap()
1630            .as_any()
1631            .downcast_ref::<StringArray>()
1632            .unwrap();
1633        for i in 0..srcs.len() {
1634            *counts.entry(srcs.value(i).to_string()).or_insert(0) += 1;
1635        }
1636    }
1637
1638    let card = &edge_type.cardinality;
1639    for (src, count) in &counts {
1640        if let Some(max) = card.max {
1641            if *count > max {
1642                return Err(OmniError::manifest(format!(
1643                    "@card violation on edge {}: source '{}' has {} edges (max {})",
1644                    edge_name, src, count, max
1645                )));
1646            }
1647        }
1648        if *count < card.min {
1649            return Err(OmniError::manifest(format!(
1650                "@card violation on edge {}: source '{}' has {} edges (min {})",
1651                edge_name, src, count, card.min
1652            )));
1653        }
1654    }
1655
1656    Ok(())
1657}
1658
1659/// Validate edge `@card` cardinality with in-memory pending edges visible.
1660///
1661/// Loader-level analog to `exec::mutation::validate_edge_cardinality_with_pending`:
1662/// opens the committed dataset at the pre-load snapshot version, then
1663/// delegates to the shared `count_src_per_edge` + `enforce_cardinality_bounds`
1664/// helpers in `exec::staging`. Used by Append/Merge loads (the Overwrite
1665/// path uses `validate_edge_cardinality` which opens the just-written
1666/// Lance version).
1667///
1668/// `mode` controls dedup behavior. `LoadMode::Merge` passes `Some("id")`
1669/// so committed edges that the load is *updating* (same edge id,
1670/// possibly changed `src`) are not double-counted. `LoadMode::Append`
1671/// passes `None` because each line generates a fresh ULID id that
1672/// never collides with committed.
1673async fn validate_edge_cardinality_with_pending_loader(
1674    db: &Omnigraph,
1675    branch: Option<&str>,
1676    edge_type: &omnigraph_compiler::catalog::EdgeType,
1677    table_key: &str,
1678    staging: &MutationStaging,
1679    mode: LoadMode,
1680) -> Result<()> {
1681    if edge_type.cardinality.is_default() {
1682        return Ok(());
1683    }
1684    let snapshot = db.snapshot_for_branch(branch).await?;
1685    let Some(entry) = snapshot.entry(table_key) else {
1686        // No manifest entry — table doesn't exist yet. Pending-only is
1687        // fine; the helper handles empty committed scans.
1688        return Ok(());
1689    };
1690    let ds = db
1691        .open_dataset_at_state(
1692            &entry.table_path,
1693            entry.table_branch.as_deref(),
1694            entry.table_version,
1695        )
1696        .await?;
1697    let dedupe_key = match mode {
1698        LoadMode::Merge => Some("id"),
1699        LoadMode::Append | LoadMode::Overwrite => None,
1700    };
1701    let counts =
1702        crate::exec::staging::count_src_per_edge(db, &ds, table_key, staging, dedupe_key)
1703            .await?;
1704    crate::exec::staging::enforce_cardinality_bounds(edge_type, &counts)
1705}
1706
1707/// Collect all valid node IDs for a given type, with in-memory pending
1708/// node inserts visible. Used by the staged loader's Phase 2c
1709/// referential-integrity validation.
1710///
1711/// Union of:
1712/// - IDs from the staged loader's pending batches (in-memory; just-staged
1713///   inserts of this type)
1714/// - IDs from the committed sub-table at the pre-load snapshot version
1715async fn collect_node_ids_with_pending(
1716    db: &Omnigraph,
1717    branch: Option<&str>,
1718    type_name: &str,
1719    staging: &MutationStaging,
1720) -> Result<HashSet<String>> {
1721    let mut ids = HashSet::new();
1722    let table_key = format!("node:{}", type_name);
1723
1724    // From staging.pending: walk the in-memory accumulator's id column.
1725    for batch in staging.pending_batches(&table_key) {
1726        if let Some(col) = batch.column_by_name("id") {
1727            if let Some(arr) = col.as_any().downcast_ref::<StringArray>() {
1728                for i in 0..arr.len() {
1729                    if arr.is_valid(i) {
1730                        ids.insert(arr.value(i).to_string());
1731                    }
1732                }
1733            }
1734        }
1735    }
1736
1737    // From the committed Lance sub-table at the pre-load snapshot version.
1738    let snapshot = db.snapshot_for_branch(branch).await?;
1739    let Some(entry) = snapshot.entry(&table_key) else {
1740        return Ok(ids);
1741    };
1742    let ds = db
1743        .open_dataset_at_state(
1744            &entry.table_path,
1745            entry.table_branch.as_deref(),
1746            entry.table_version,
1747        )
1748        .await?;
1749
1750    let batches = db
1751        .table_store()
1752        .scan(&ds, Some(&["id"]), None, None)
1753        .await?;
1754
1755    for batch in &batches {
1756        let id_col = batch
1757            .column_by_name("id")
1758            .ok_or_else(|| OmniError::Lance("missing 'id' column".into()))?
1759            .as_any()
1760            .downcast_ref::<StringArray>()
1761            .ok_or_else(|| OmniError::Lance("'id' column is not Utf8".into()))?;
1762        for i in 0..batch.num_rows() {
1763            // Defensive: `id` is the @key column on every node type and
1764            // is non-nullable by schema, but a committed-row corruption
1765            // (or future schema change) could surface a NULL. Skip
1766            // rather than insert "" — pending-side does the same.
1767            if id_col.is_valid(i) {
1768                ids.insert(id_col.value(i).to_string());
1769            }
1770        }
1771    }
1772
1773    Ok(ids)
1774}
1775
1776/// Collect all valid node IDs for a given type. Union of:
1777/// - IDs from the just-loaded batch (in memory, from node_rows)
1778/// - IDs from the sub-table at the just-written version (if it was updated)
1779/// - IDs from the sub-table at the snapshot-pinned version (if it was not updated)
1780async fn collect_node_ids(
1781    db: &Omnigraph,
1782    branch: Option<&str>,
1783    type_name: &str,
1784    node_rows: &HashMap<String, Vec<JsonValue>>,
1785    catalog: &omnigraph_compiler::catalog::Catalog,
1786    updates: &[crate::db::SubTableUpdate],
1787) -> Result<HashSet<String>> {
1788    let mut ids = HashSet::new();
1789
1790    // IDs from the in-memory batch (just loaded in this operation)
1791    if let Some(rows) = node_rows.get(type_name) {
1792        if let Some(node_type) = catalog.node_types.get(type_name) {
1793            if let Some(key_prop) = node_type.key_property() {
1794                for row in rows {
1795                    if let Some(id) = row.get(key_prop).and_then(|v| v.as_str()) {
1796                        ids.insert(id.to_string());
1797                    }
1798                }
1799            }
1800        }
1801    }
1802
1803    // IDs from the Lance sub-table
1804    let table_key = format!("node:{}", type_name);
1805    let snapshot = db.snapshot_for_branch(branch).await?;
1806    let Some(entry) = snapshot.entry(&table_key) else {
1807        return Ok(ids);
1808    };
1809    // Use the just-written version if this type was updated, else snapshot version
1810    let updated = updates
1811        .iter()
1812        .find(|u| u.table_key == table_key)
1813        .map(|u| (u.table_version, u.table_branch.as_deref()));
1814    let (version, branch) = updated.unwrap_or((entry.table_version, entry.table_branch.as_deref()));
1815    let ds = db
1816        .open_dataset_at_state(&entry.table_path, branch, version)
1817        .await?;
1818
1819    let batches = db
1820        .table_store()
1821        .scan(&ds, Some(&["id"]), None, None)
1822        .await?;
1823
1824    for batch in &batches {
1825        let id_col = batch
1826            .column_by_name("id")
1827            .unwrap()
1828            .as_any()
1829            .downcast_ref::<StringArray>()
1830            .unwrap();
1831        for i in 0..batch.num_rows() {
1832            if !id_col.is_valid(i) {
1833                continue;
1834            }
1835            ids.insert(id_col.value(i).to_string());
1836        }
1837    }
1838
1839    Ok(ids)
1840}
1841
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::Omnigraph;
    use arrow_array::Array;
    use futures::TryStreamExt;
    use std::collections::HashMap;

    /// Two node types keyed by `name` and two edge types between them.
    const TEST_SCHEMA: &str = r#"
node Person {
    name: String @key
    age: I32?
}
node Company {
    name: String @key
}
edge Knows: Person -> Person {
    since: Date?
}
edge WorksAt: Person -> Company
"#;

    /// Two people, one company, and one edge of each type.
    const TEST_DATA: &str = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}
{"type": "Person", "data": {"name": "Bob", "age": 25}}
{"type": "Company", "data": {"name": "Acme"}}
{"edge": "Knows", "from": "Alice", "to": "Bob"}
{"edge": "WorksAt", "from": "Alice", "to": "Acme"}
"#;

    /// Initialises a database with `TEST_SCHEMA` inside a fresh temporary
    /// directory. The directory guard is returned so that it outlives the
    /// database for the duration of the test.
    async fn fresh_db() -> (tempfile::TempDir, Omnigraph) {
        let dir = tempfile::tempdir().unwrap();
        let db = Omnigraph::init(dir.path().to_str().unwrap(), TEST_SCHEMA)
            .await
            .unwrap();
        (dir, db)
    }

    /// Fetches column `name` from `batch` as a `StringArray`, panicking if
    /// the column is missing or has another type.
    fn string_column<'a>(batch: &'a RecordBatch, name: &str) -> &'a StringArray {
        batch
            .column_by_name(name)
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap()
    }

    /// An overwrite load reports per-type node and edge counts.
    #[tokio::test]
    async fn test_load_creates_data() {
        let (_dir, mut db) = fresh_db().await;

        let loaded = load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert_eq!(loaded.nodes_loaded["Person"], 2);
        assert_eq!(loaded.nodes_loaded["Company"], 1);
        assert_eq!(loaded.edges_loaded["Knows"], 1);
        assert_eq!(loaded.edges_loaded["WorksAt"], 1);
    }

    /// Loaded nodes can be read back through the snapshot's Lance dataset.
    #[tokio::test]
    async fn test_load_data_readable_via_lance() {
        let (_dir, mut db) = fresh_db().await;
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        // Read back via snapshot
        let snapshot = db.snapshot().await;
        let people = snapshot.open("node:Person").await.unwrap();

        assert_eq!(people.count_rows(None).await.unwrap(), 2);

        // Verify data
        let batches: Vec<RecordBatch> = people
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();

        let ids = string_column(&batches[0], "id");
        // @key=name, so ids should be "Alice" and "Bob"
        let seen: Vec<&str> = (0..ids.len()).map(|row| ids.value(row)).collect();
        assert!(seen.contains(&"Alice"));
        assert!(seen.contains(&"Bob"));
    }

    /// Edge rows store the endpoint node keys in `src` / `dst`.
    #[tokio::test]
    async fn test_load_edges_reference_node_keys() {
        let (_dir, mut db) = fresh_db().await;
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        let snapshot = db.snapshot().await;
        let knows = snapshot.open("edge:Knows").await.unwrap();

        let batches: Vec<RecordBatch> = knows
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();

        let first = &batches[0];
        let sources = string_column(first, "src");
        let targets = string_column(first, "dst");

        assert_eq!(sources.value(0), "Alice");
        assert_eq!(targets.value(0), "Bob");
    }

    /// A load bumps the database version.
    #[tokio::test]
    async fn test_load_manifest_version_advances() {
        let (_dir, mut db) = fresh_db().await;
        let version_before = db.version().await;

        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert!(db.version().await > version_before);
    }

    /// An append load adds rows on top of previously loaded ones.
    #[tokio::test]
    async fn test_load_append_adds_rows() {
        let (_dir, mut db) = fresh_db().await;

        let first_line = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#;
        let second_line = r#"{"type": "Person", "data": {"name": "Bob", "age": 25}}"#;

        load_jsonl(&mut db, first_line, LoadMode::Overwrite)
            .await
            .unwrap();
        load_jsonl(&mut db, second_line, LoadMode::Append)
            .await
            .unwrap();

        let snapshot = db.snapshot().await;
        let people = snapshot.open("node:Person").await.unwrap();
        assert_eq!(people.count_rows(None).await.unwrap(), 2);
    }

    /// A line naming a type that is not in the schema fails the load.
    #[tokio::test]
    async fn test_load_unknown_type_rejected() {
        let (_dir, mut db) = fresh_db().await;

        let unknown = r#"{"type": "FakeType", "data": {"name": "x"}}"#;
        let outcome = load_jsonl(&mut db, unknown, LoadMode::Overwrite).await;
        assert!(outcome.is_err());
    }

    /// Ingesting into a missing branch creates it and reports every table
    /// that received rows.
    #[tokio::test]
    async fn test_ingest_creates_branch_and_reports_tables() {
        let (_dir, mut db) = fresh_db().await;

        let report = db
            .ingest("feature", Some("main"), TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert_eq!(report.branch, "feature");
        assert_eq!(report.base_branch, "main");
        assert!(report.branch_created);
        assert_eq!(report.mode, LoadMode::Overwrite);

        let expected_tables: Vec<IngestTableResult> = [
            ("edge:Knows", 1),
            ("edge:WorksAt", 1),
            ("node:Company", 1),
            ("node:Person", 2),
        ]
        .into_iter()
        .map(|(table_key, rows_loaded)| IngestTableResult {
            table_key: table_key.to_string(),
            rows_loaded,
        })
        .collect();
        assert_eq!(report.tables, expected_tables);

        let branches = db.branch_list().await.unwrap();
        assert!(branches.contains(&"feature".to_string()));
    }

    /// When the target branch already exists, the `from` argument is not
    /// used to create anything and the data is merged by key.
    #[tokio::test]
    async fn test_ingest_existing_branch_ignores_from_and_merges_data() {
        let (_dir, mut db) = fresh_db().await;
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();
        db.branch_create_from(crate::db::ReadTarget::branch("main"), "feature")
            .await
            .unwrap();

        let merge_lines = r#"{"type":"Person","data":{"name":"Bob","age":26}}
{"type":"Person","data":{"name":"Eve","age":31}}"#;
        let report = db
            .ingest(
                "feature",
                Some("missing-base"),
                merge_lines,
                LoadMode::Merge,
            )
            .await
            .unwrap();

        assert_eq!(report.branch, "feature");
        assert_eq!(report.base_branch, "missing-base");
        assert!(!report.branch_created);
        assert_eq!(report.mode, LoadMode::Merge);
        assert_eq!(
            report.tables,
            vec![IngestTableResult {
                table_key: "node:Person".to_string(),
                rows_loaded: 2
            }]
        );

        let snapshot = db
            .snapshot_of(crate::db::ReadTarget::branch("feature"))
            .await
            .unwrap();
        let people = snapshot.open("node:Person").await.unwrap();
        assert_eq!(people.count_rows(None).await.unwrap(), 3);

        let batches: Vec<RecordBatch> = people
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();

        // Map each person id to the age stored for it.
        let mut ages_by_id: HashMap<String, i32> = HashMap::new();
        for batch in &batches {
            let ids = string_column(batch, "id");
            let ages = batch
                .column_by_name("age")
                .unwrap()
                .as_any()
                .downcast_ref::<Int32Array>()
                .unwrap();
            ages_by_id
                .extend((0..ids.len()).map(|row| (ids.value(row).to_string(), ages.value(row))));
        }

        assert_eq!(ages_by_id.get("Bob"), Some(&26));
        assert_eq!(ages_by_id.get("Eve"), Some(&31));
        assert_eq!(ages_by_id.get("Alice"), Some(&30));
    }

    /// `ingest_as` records the supplied actor id on the last commit listed
    /// for the branch.
    #[tokio::test]
    async fn test_ingest_as_stamps_actor_on_branch_head_commit() {
        let (_dir, mut db) = fresh_db().await;

        db.ingest_as(
            "feature",
            Some("main"),
            TEST_DATA,
            LoadMode::Overwrite,
            Some("act-andrew"),
        )
        .await
        .unwrap();

        let commits = db.list_commits(Some("feature")).await.unwrap();
        let head = commits.into_iter().last().unwrap();
        assert_eq!(head.actor_id.as_deref(), Some("act-andrew"));
    }

    /// A NaN in a range-constrained float column is rejected, and the error
    /// text mentions NaN.
    #[test]
    fn test_range_constraint_rejects_nan() {
        use arrow_array::{Float64Array, RecordBatch, StringArray};
        use omnigraph_compiler::catalog::{LiteralValue, NodeType, RangeConstraint};
        use std::sync::Arc;

        let fields = vec![
            arrow_schema::Field::new("name", arrow_schema::DataType::Utf8, false),
            arrow_schema::Field::new("score", arrow_schema::DataType::Float64, true),
        ];
        let schema = Arc::new(arrow_schema::Schema::new(fields));

        let names = StringArray::from(vec!["bad"]);
        let scores = Float64Array::from(vec![f64::NAN]);
        let batch =
            RecordBatch::try_new(schema.clone(), vec![Arc::new(names), Arc::new(scores)]).unwrap();

        // `score` must lie in [0.0, 1.0].
        let score_range = RangeConstraint {
            property: "score".to_string(),
            min: Some(LiteralValue::Float(0.0)),
            max: Some(LiteralValue::Float(1.0)),
        };
        let node_type = NodeType {
            name: "Test".to_string(),
            implements: vec![],
            properties: Default::default(),
            key: None,
            unique_constraints: vec![],
            indices: vec![],
            range_constraints: vec![score_range],
            check_constraints: vec![],
            embed_sources: Default::default(),
            blob_properties: Default::default(),
            arrow_schema: schema,
        };

        let outcome = validate_value_constraints(&batch, &node_type);
        assert!(outcome.is_err(), "expected NaN to be rejected");
        let err = outcome.unwrap_err().to_string();
        assert!(err.contains("NaN"), "error should mention NaN: {}", err);
    }
}