Skip to main content

omnigraph/loader/
mod.rs

1use std::collections::{HashMap, HashSet};
2
3use std::io::{BufRead, BufReader, Cursor};
4use std::sync::Arc;
5
6use arrow_array::{
7    Array, ArrayRef, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array,
8    Int32Array, Int64Array, RecordBatch, StringArray, UInt32Array, UInt64Array,
9    builder::{
10        ArrayBuilder, BooleanBuilder, Date32Builder, Date64Builder, FixedSizeListBuilder,
11        Float32Builder, Float64Builder, Int32Builder, Int64Builder, ListBuilder, StringBuilder,
12        UInt32Builder, UInt64Builder,
13    },
14};
15use arrow_schema::DataType;
16use base64::Engine;
17use lance::blob::BlobArrayBuilder;
18use omnigraph_compiler::catalog::NodeType;
19use serde::{Deserialize, Serialize};
20use serde_json::Value as JsonValue;
21
22use crate::db::Omnigraph;
23use crate::error::{OmniError, Result};
24use crate::exec::staging::{MutationStaging, PendingMode};
25
/// Per-type row counts reported by a completed load.
///
/// Each map is keyed by the type name and holds the number of rows in the
/// batch that was built for that type during the load.
#[derive(Clone, Debug, Default)]
pub struct LoadResult {
    /// Rows loaded for each node type, keyed by node type name.
    pub nodes_loaded: HashMap<String, usize>,
    /// Rows loaded for each edge type, keyed by edge type name.
    pub edges_loaded: HashMap<String, usize>,
}
32
33#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
34pub struct IngestTableResult {
35    pub table_key: String,
36    pub rows_loaded: usize,
37}
38
39#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
40pub struct IngestResult {
41    pub branch: String,
42    pub base_branch: String,
43    pub branch_created: bool,
44    pub mode: LoadMode,
45    pub tables: Vec<IngestTableResult>,
46}
47
48/// Load mode for data ingestion.
49#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
50#[serde(rename_all = "snake_case")]
51pub enum LoadMode {
52    /// Overwrite existing data.
53    Overwrite,
54    /// Append to existing data.
55    Append,
56    /// Merge by `id` key (upsert).
57    Merge,
58}
59
60/// Load JSONL data into an Omnigraph database.
61pub async fn load_jsonl(db: &mut Omnigraph, data: &str, mode: LoadMode) -> Result<LoadResult> {
62    let current_branch = db.active_branch().await;
63    let branch = current_branch.as_deref().unwrap_or("main");
64    db.load(branch, data, mode).await
65}
66
67/// Load JSONL data from a file path.
68pub async fn load_jsonl_file(db: &mut Omnigraph, path: &str, mode: LoadMode) -> Result<LoadResult> {
69    let current_branch = db.active_branch().await;
70    let branch = current_branch.as_deref().unwrap_or("main");
71    db.load_file(branch, path, mode).await
72}
73
74impl Omnigraph {
75    pub async fn ingest(
76        &self,
77        branch: &str,
78        from: Option<&str>,
79        data: &str,
80        mode: LoadMode,
81    ) -> Result<IngestResult> {
82        self.ingest_as(branch, from, data, mode, None).await
83    }
84
85    pub async fn ingest_as(
86        &self,
87        branch: &str,
88        from: Option<&str>,
89        data: &str,
90        mode: LoadMode,
91        actor_id: Option<&str>,
92    ) -> Result<IngestResult> {
93        // Engine-layer policy gate (MR-722 fan-out / PR #3). Scope is
94        // `Branch(branch)` for the data-write portion. If ingest creates
95        // a new branch as a side-effect (target branch doesn't exist),
96        // the inner `branch_create_from_as` call below additionally
97        // checks `BranchCreate` — both authorities are genuinely needed
98        // for "ingest into a fresh branch", so the layered check is
99        // correct, not redundant.
100        self.enforce(
101            omnigraph_policy::PolicyAction::Change,
102            &omnigraph_policy::ResourceScope::Branch(branch.to_string()),
103            actor_id,
104        )?;
105        self.ingest_with_current_actor(branch, from, data, mode, actor_id)
106            .await
107    }
108
109    pub async fn ingest_file(
110        &self,
111        branch: &str,
112        from: Option<&str>,
113        path: &str,
114        mode: LoadMode,
115    ) -> Result<IngestResult> {
116        self.ingest_file_as(branch, from, path, mode, None).await
117    }
118
119    pub async fn ingest_file_as(
120        &self,
121        branch: &str,
122        from: Option<&str>,
123        path: &str,
124        mode: LoadMode,
125        actor_id: Option<&str>,
126    ) -> Result<IngestResult> {
127        let data = std::fs::read_to_string(path).map_err(OmniError::Io)?;
128        self.ingest_as(branch, from, &data, mode, actor_id).await
129    }
130
131    async fn ingest_with_current_actor(
132        &self,
133        branch: &str,
134        from: Option<&str>,
135        data: &str,
136        mode: LoadMode,
137        actor_id: Option<&str>,
138    ) -> Result<IngestResult> {
139        self.ensure_schema_state_valid().await?;
140        let target_branch =
141            Self::normalize_branch_name(branch)?.unwrap_or_else(|| "main".to_string());
142        let base_branch = Self::normalize_branch_name(from.unwrap_or("main"))?
143            .unwrap_or_else(|| "main".to_string());
144        let branch_created = !self
145            .branch_list()
146            .await?
147            .iter()
148            .any(|name| name == &target_branch);
149        if branch_created {
150            // Thread the actor through to the implicit BranchCreate so
151            // policy decisions match what an explicit `branch_create_from_as`
152            // call would see. Calling the no-actor variant here would
153            // bypass BranchCreate enforcement when policy is installed —
154            // the footgun guard catches that case too, but threading is
155            // the correct fix.
156            self.branch_create_from_as(
157                crate::db::ReadTarget::branch(&base_branch),
158                &target_branch,
159                actor_id,
160            )
161            .await?;
162        }
163
164        let result = self.load_as(&target_branch, data, mode, actor_id).await?;
165        Ok(IngestResult {
166            branch: target_branch,
167            base_branch,
168            branch_created,
169            mode,
170            tables: result.to_ingest_tables(),
171        })
172    }
173
174    pub async fn load(&self, branch: &str, data: &str, mode: LoadMode) -> Result<LoadResult> {
175        self.load_as(branch, data, mode, None).await
176    }
177
178    pub async fn load_as(
179        &self,
180        branch: &str,
181        data: &str,
182        mode: LoadMode,
183        actor_id: Option<&str>,
184    ) -> Result<LoadResult> {
185        // Engine-layer policy gate (MR-722 fan-out / PR #3). Scope is
186        // `Branch(branch)` to match the HTTP-layer Change convention.
187        // `ingest_as` also calls `load_as` after enforcing its own
188        // Change gate — that double-check is fine because both gates
189        // resolve to identical Cedar decisions for the same actor +
190        // branch (the second check is a structurally-correct no-op).
191        self.enforce(
192            omnigraph_policy::PolicyAction::Change,
193            &omnigraph_policy::ResourceScope::Branch(branch.to_string()),
194            actor_id,
195        )?;
196        self.ensure_schema_state_valid().await?;
197        // Reject internal `__run__*` / system-prefixed branches at the
198        // public write boundary. Direct-publish paths assert this
199        // explicitly so a caller can't write to legacy or system
200        // staging branches by passing the prefix verbatim.
201        crate::db::ensure_public_branch_ref(branch, "load")?;
202        // Branch convention: `None` represents `main`. Re-normalizing to
203        // `Some("main")` here would route the publisher commit through a
204        // separate coordinator (the cross-branch path in
205        // `commit_prepared_updates_on_branch_with_expected`) and leave
206        // `self.coordinator` with a stale manifest snapshot.
207        let requested = Self::normalize_branch_name(branch)?;
208        // Direct-to-target writes: no Run state machine, no `__run__` staging
209        // branch. Cross-table OCC is enforced by the publisher's
210        // `expected_table_versions` CAS inside `load_jsonl_reader`.
211        self.load_direct_on_branch(requested.as_deref(), data, mode, actor_id)
212            .await
213    }
214
215    pub async fn load_file(&self, branch: &str, path: &str, mode: LoadMode) -> Result<LoadResult> {
216        self.load_file_as(branch, path, mode, None).await
217    }
218
219    /// Read a file into memory and delegate to `load_as`. Used by the
220    /// CLI's `omnigraph load` so file-path-based writes flow through
221    /// the same engine-layer policy gate as in-memory `load_as` calls.
222    pub async fn load_file_as(
223        &self,
224        branch: &str,
225        path: &str,
226        mode: LoadMode,
227        actor_id: Option<&str>,
228    ) -> Result<LoadResult> {
229        let data = std::fs::read_to_string(path).map_err(|e| OmniError::Io(e))?;
230        self.load_as(branch, &data, mode, actor_id).await
231    }
232
233    async fn load_direct_on_branch(
234        &self,
235        branch: Option<&str>,
236        data: &str,
237        mode: LoadMode,
238        actor_id: Option<&str>,
239    ) -> Result<LoadResult> {
240        let reader = BufReader::new(Cursor::new(data.as_bytes()));
241        load_jsonl_reader(self, branch, reader, mode, actor_id).await
242    }
243}
244
245impl LoadMode {
246    pub fn as_str(self) -> &'static str {
247        match self {
248            LoadMode::Overwrite => "overwrite",
249            LoadMode::Append => "append",
250            LoadMode::Merge => "merge",
251        }
252    }
253}
254
255impl LoadResult {
256    pub fn to_ingest_tables(&self) -> Vec<IngestTableResult> {
257        let mut tables = self
258            .nodes_loaded
259            .iter()
260            .map(|(type_name, rows_loaded)| IngestTableResult {
261                table_key: format!("node:{type_name}"),
262                rows_loaded: *rows_loaded,
263            })
264            .chain(
265                self.edges_loaded
266                    .iter()
267                    .map(|(edge_name, rows_loaded)| IngestTableResult {
268                        table_key: format!("edge:{edge_name}"),
269                        rows_loaded: *rows_loaded,
270                    }),
271            )
272            .collect::<Vec<_>>();
273        tables.sort_by(|a, b| a.table_key.cmp(&b.table_key));
274        tables
275    }
276}
277
278async fn load_jsonl_reader<R: BufRead>(
279    db: &Omnigraph,
280    branch: Option<&str>,
281    reader: R,
282    mode: LoadMode,
283    actor_id: Option<&str>,
284) -> Result<LoadResult> {
285    let catalog = db.catalog().clone();
286
287    // Phase 1: Parse all lines, spool into per-type collections
288    let mut node_rows: HashMap<String, Vec<JsonValue>> = HashMap::new();
289    let mut edge_rows: HashMap<String, Vec<(String, String, JsonValue)>> = HashMap::new();
290
291    // Parse a stream of JSON values. Accepts both compact JSONL (one object
292    // per line) and pretty-printed JSON where a single object spans multiple
293    // lines — serde's streaming deserializer treats any whitespace (including
294    // newlines) between top-level values as a separator.
295    for (idx, parsed) in serde_json::Deserializer::from_reader(reader)
296        .into_iter::<JsonValue>()
297        .enumerate()
298    {
299        let record_num = idx + 1;
300        let value: JsonValue = parsed.map_err(|e| {
301            OmniError::manifest(format!("invalid JSON at record {}: {}", record_num, e))
302        })?;
303
304        if let Some(type_name) = value.get("type").and_then(|v| v.as_str()) {
305            if !catalog.node_types.contains_key(type_name) {
306                return Err(OmniError::manifest(format!(
307                    "record {}: unknown node type '{}'",
308                    record_num,
309                    type_name
310                )));
311            }
312            let data = value
313                .get("data")
314                .cloned()
315                .unwrap_or(JsonValue::Object(serde_json::Map::new()));
316            node_rows
317                .entry(type_name.to_string())
318                .or_default()
319                .push(data);
320        } else if let Some(edge_name) = value.get("edge").and_then(|v| v.as_str()) {
321            if catalog.lookup_edge_by_name(edge_name).is_none() {
322                return Err(OmniError::manifest(format!(
323                    "record {}: unknown edge type '{}'",
324                    record_num,
325                    edge_name
326                )));
327            }
328            let from = value
329                .get("from")
330                .and_then(|v| v.as_str())
331                .ok_or_else(|| {
332                    OmniError::manifest(format!("record {}: edge missing 'from'", record_num))
333                })?
334                .to_string();
335            let to = value
336                .get("to")
337                .and_then(|v| v.as_str())
338                .ok_or_else(|| {
339                    OmniError::manifest(format!("record {}: edge missing 'to'", record_num))
340                })?
341                .to_string();
342            let data = value
343                .get("data")
344                .cloned()
345                .unwrap_or(JsonValue::Object(serde_json::Map::new()));
346            let canonical = catalog.lookup_edge_by_name(edge_name).unwrap().name.clone();
347            edge_rows
348                .entry(canonical)
349                .or_default()
350                .push((from, to, data));
351        } else {
352            return Err(OmniError::manifest(format!(
353                "record {}: expected 'type' or 'edge' field",
354                record_num
355            )));
356        }
357    }
358
359    // Phase 2: Build per-type RecordBatches and accumulate into the
360    // staging pipeline. For Append/Merge, batches go into an in-memory
361    // accumulator and a single `stage_*` + `commit_staged` per touched
362    // table runs at end-of-load — a mid-load failure (RI / cardinality
363    // violation) leaves Lance HEAD untouched. For Overwrite, the legacy
364    // inline-commit path is preserved (truncate+append doesn't fit the
365    // staged shape cleanly, and overwrite has no in-flight read-your-writes
366    // requirement).
367
368    let mut result = LoadResult::default();
369    let snapshot = db.snapshot_for_branch(branch).await?;
370    let use_staging = !matches!(mode, LoadMode::Overwrite);
371    let mut staging = MutationStaging::default();
372    let mut overwrite_updates: Vec<crate::db::SubTableUpdate> = Vec::new();
373    let mut overwrite_expected: HashMap<String, u64> = HashMap::new();
374    let pending_mode = match mode {
375        LoadMode::Merge => PendingMode::Merge,
376        // Append-mode loads accumulate as Append. Edge tables (no @key)
377        // and no-key node tables stay safe on the stage_append path. The
378        // Merge mode applies dedupe-by-id; Append assumes unique inputs.
379        LoadMode::Append => PendingMode::Append,
380        LoadMode::Overwrite => PendingMode::Append, // unused
381    };
382    // Map LoadMode to MutationOpKind for the version-check policy.
383    // Append/Merge skip the strict pre-stage check (concurrency-safe
384    // under the per-(table, branch) queue + publisher CAS); Overwrite
385    // uses the strict check because it truncates and replaces the
386    // dataset — concurrent advances change what "replace" means.
387    let load_op_kind = match mode {
388        LoadMode::Append => crate::db::MutationOpKind::Insert,
389        LoadMode::Merge => crate::db::MutationOpKind::Merge,
390        LoadMode::Overwrite => crate::db::MutationOpKind::SchemaRewrite,
391    };
392
393    // Phase 2a: build and validate every node batch up front. Cheap and
394    // synchronous — surfaces validation errors before any S3 traffic.
395    let mut prepared_nodes: Vec<(String, String, RecordBatch, usize)> =
396        Vec::with_capacity(node_rows.len());
397    for (type_name, rows) in &node_rows {
398        let node_type = &catalog.node_types[type_name];
399        let batch = build_node_batch(node_type, rows)?;
400        validate_value_constraints(&batch, node_type)?;
401        validate_enum_constraints(&batch, &node_type.properties, type_name)?;
402        let unique_props = unique_property_names_for_node(node_type);
403        if !unique_props.is_empty() {
404            enforce_unique_constraints_intra_batch(&batch, type_name, &unique_props)?;
405        }
406        let loaded_count = batch.num_rows();
407        let table_key = format!("node:{}", type_name);
408        let entry = snapshot
409            .entry(&table_key)
410            .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
411        if !use_staging {
412            overwrite_expected.insert(table_key.clone(), entry.table_version);
413        }
414        prepared_nodes.push((type_name.clone(), table_key, batch, loaded_count));
415    }
416
417    // Phase 2b: write every node type. Append/Merge → in-memory
418    // accumulator. Overwrite → concurrent inline-commit (legacy path).
419    if use_staging {
420        for (type_name, table_key, batch, loaded_count) in prepared_nodes {
421            let (ds, full_path, table_branch) = db
422                .open_for_mutation_on_branch(branch, &table_key, load_op_kind)
423                .await?;
424            let expected_version = ds.version().version;
425            staging.ensure_path(
426                &table_key,
427                full_path,
428                table_branch,
429                expected_version,
430                load_op_kind,
431            );
432            let schema = batch.schema();
433            staging.append_batch(&table_key, schema, pending_mode, batch)?;
434            result.nodes_loaded.insert(type_name, loaded_count);
435        }
436    } else {
437        let node_write_results =
438            write_batches_concurrently(db, branch, mode, prepared_nodes).await?;
439        for (type_name, table_key, loaded_count, state, table_branch) in node_write_results {
440            overwrite_updates.push(crate::db::SubTableUpdate {
441                table_key,
442                table_version: state.version,
443                table_branch,
444                row_count: state.row_count,
445                version_metadata: state.version_metadata,
446            });
447            result.nodes_loaded.insert(type_name, loaded_count);
448        }
449    }
450
451    // Phase 2c: Validate edge referential integrity — every src/dst must
452    // reference an existing node ID in the appropriate type. For staged
453    // loads, the lookup unions snapshot-committed IDs with the in-memory
454    // pending batches (which carry the just-staged node inserts).
455    for (edge_name, rows) in &edge_rows {
456        let edge_type = &catalog.edge_types[edge_name];
457        let from_ids = if use_staging {
458            collect_node_ids_with_pending(db, branch, &edge_type.from_type, &staging).await?
459        } else {
460            collect_node_ids(
461                db,
462                branch,
463                &edge_type.from_type,
464                &node_rows,
465                &catalog,
466                &overwrite_updates,
467            )
468            .await?
469        };
470        let to_ids = if use_staging {
471            collect_node_ids_with_pending(db, branch, &edge_type.to_type, &staging).await?
472        } else {
473            collect_node_ids(
474                db,
475                branch,
476                &edge_type.to_type,
477                &node_rows,
478                &catalog,
479                &overwrite_updates,
480            )
481            .await?
482        };
483
484        for (i, (src, dst, _)) in rows.iter().enumerate() {
485            if !from_ids.contains(src.as_str()) {
486                return Err(OmniError::manifest(format!(
487                    "edge {} row {}: src '{}' not found in {}",
488                    edge_name,
489                    i + 1,
490                    src,
491                    edge_type.from_type
492                )));
493            }
494            if !to_ids.contains(dst.as_str()) {
495                return Err(OmniError::manifest(format!(
496                    "edge {} row {}: dst '{}' not found in {}",
497                    edge_name,
498                    i + 1,
499                    dst,
500                    edge_type.to_type
501                )));
502            }
503        }
504    }
505
506    // Phase 2d: build edge batches.
507    let mut prepared_edges: Vec<(String, String, RecordBatch, usize)> =
508        Vec::with_capacity(edge_rows.len());
509    for (edge_name, rows) in &edge_rows {
510        let edge_type = &catalog.edge_types[edge_name];
511        let batch = build_edge_batch(edge_type, rows)?;
512        validate_enum_constraints(&batch, &edge_type.properties, edge_name)?;
513        let unique_props = unique_property_names_for_edge(edge_type);
514        if !unique_props.is_empty() {
515            enforce_unique_constraints_intra_batch(&batch, edge_name, &unique_props)?;
516        }
517        let loaded_count = batch.num_rows();
518        let table_key = format!("edge:{}", edge_name);
519        let entry = snapshot
520            .entry(&table_key)
521            .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
522        if !use_staging {
523            overwrite_expected.insert(table_key.clone(), entry.table_version);
524        }
525        prepared_edges.push((edge_name.clone(), table_key, batch, loaded_count));
526    }
527
528    // Phase 2e: write every edge type. Same dispatch as Phase 2b.
529    if use_staging {
530        for (edge_name, table_key, batch, loaded_count) in prepared_edges {
531            let (ds, full_path, table_branch) = db
532                .open_for_mutation_on_branch(branch, &table_key, load_op_kind)
533                .await?;
534            let expected_version = ds.version().version;
535            staging.ensure_path(
536                &table_key,
537                full_path,
538                table_branch,
539                expected_version,
540                load_op_kind,
541            );
542            let schema = batch.schema();
543            staging.append_batch(&table_key, schema, pending_mode, batch)?;
544            result.edges_loaded.insert(edge_name, loaded_count);
545        }
546    } else {
547        let edge_write_results =
548            write_batches_concurrently(db, branch, mode, prepared_edges).await?;
549        for (edge_name, table_key, loaded_count, state, table_branch) in edge_write_results {
550            overwrite_updates.push(crate::db::SubTableUpdate {
551                table_key,
552                table_version: state.version,
553                table_branch,
554                row_count: state.row_count,
555                version_metadata: state.version_metadata,
556            });
557            result.edges_loaded.insert(edge_name, loaded_count);
558        }
559    }
560
561    // Phase 3: Validate edge cardinality constraints (before commit —
562    // invalid data must not be committed). Staged path scans committed
563    // edges via Lance + iterates pending edges in-memory. Overwrite path
564    // opens the just-written version (legacy behavior).
565    for (edge_name, _) in &edge_rows {
566        let edge_type = &catalog.edge_types[edge_name];
567        let table_key = format!("edge:{}", edge_name);
568        if use_staging {
569            validate_edge_cardinality_with_pending_loader(
570                db, branch, edge_type, &table_key, &staging, mode,
571            )
572            .await?;
573        } else if let Some(update) = overwrite_updates.iter().find(|u| u.table_key == table_key) {
574            validate_edge_cardinality(
575                db,
576                branch,
577                edge_name,
578                update.table_version,
579                update.table_branch.as_deref(),
580            )
581            .await?;
582        }
583    }
584
585    // Phase 4: Atomic manifest commit with publisher-level OCC.
586    if use_staging {
587        let staged = staging.stage_all(db, branch).await?;
588        // `_queue_guards` holds per-(table_key, branch) write queues
589        // across the manifest publish below — see exec/mutation.rs for
590        // the rationale (interleaving prevention).
591        let (updates, expected_versions, sidecar_handle, _queue_guards) = staged
592            .commit_all(db, branch, crate::db::manifest::SidecarKind::Load, actor_id)
593            .await?;
594        // Same finalize → publisher residual as mutations: per-table
595        // staged commits have advanced Lance HEAD, but the manifest
596        // publish has not run yet. Reuse the mutation failpoint name so
597        // one failpoint pins the shared `MutationStaging` boundary.
598        crate::failpoints::maybe_fail("mutation.post_finalize_pre_publisher")?;
599        db.commit_updates_on_branch_with_expected(branch, &updates, &expected_versions, actor_id)
600            .await?;
601        // The recovery sidecar protects the per-table commit_staged →
602        // manifest publish window. Phase C succeeded — clean up
603        // best-effort: failing the user here would error out a write
604        // that already landed durably.
605        if let Some(handle) = sidecar_handle {
606            if let Err(err) =
607                crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await
608            {
609                tracing::warn!(
610                    error = %err,
611                    operation_id = handle.operation_id.as_str(),
612                    "recovery sidecar cleanup failed; the next open's recovery sweep will resolve it"
613                );
614            }
615        }
616    } else {
617        // LoadMode::Overwrite keeps the legacy inline-commit path —
618        // truncate-then-append doesn't fit the staged shape (see
619        // `docs/dev/writes.md` "LoadMode::Overwrite residual"). The recovery
620        // sidecar is not applicable here because the writer doesn't go
621        // through MutationStaging; per-table inline commits + a final
622        // manifest publish handle their own residual via the documented
623        // operator workflow (re-run overwrite to recover).
624        db.commit_updates_on_branch_with_expected(
625            branch,
626            &overwrite_updates,
627            &overwrite_expected,
628            actor_id,
629        )
630        .await?;
631    }
632
633    Ok(result)
634}
635
636fn build_node_batch(node_type: &NodeType, rows: &[JsonValue]) -> Result<RecordBatch> {
637    let schema = node_type.arrow_schema.clone();
638
639    // Build id column: explicit id, @key value, or generated ULID.
640    let ids: Vec<String> = rows
641        .iter()
642        .map(|row| {
643            let explicit_id = row.get("id").and_then(|v| v.as_str()).map(str::to_string);
644            if let Some(key_prop) = node_type.key_property() {
645                let key_value = row
646                    .get(key_prop)
647                    .and_then(|v| v.as_str())
648                    .map(|s| s.to_string())
649                    .ok_or_else(|| {
650                        OmniError::manifest(format!(
651                            "node {} missing @key property '{}'",
652                            node_type.name, key_prop
653                        ))
654                    })?;
655                if let Some(explicit_id) = explicit_id {
656                    if explicit_id != key_value {
657                        return Err(OmniError::manifest(format!(
658                            "node {} has explicit id '{}' that does not match @key property '{}' value '{}'",
659                            node_type.name, explicit_id, key_prop, key_value
660                        )));
661                    }
662                }
663                Ok(key_value)
664            } else if let Some(explicit_id) = explicit_id {
665                Ok(explicit_id)
666            } else {
667                Ok(generate_id())
668            }
669        })
670        .collect::<Result<Vec<_>>>()?;
671
672    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
673    columns.push(Arc::new(StringArray::from(ids)));
674
675    // Build property columns (skip "id" field at index 0)
676    for field in schema.fields().iter().skip(1) {
677        if node_type.blob_properties.contains(field.name()) {
678            let col = build_blob_column(field.name(), field.is_nullable(), rows)?;
679            columns.push(col);
680        } else {
681            let col =
682                build_column_from_json(field.name(), field.data_type(), field.is_nullable(), rows)?;
683            columns.push(col);
684        }
685    }
686
687    RecordBatch::try_new(schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
688}
689
690fn build_edge_batch(
691    edge_type: &omnigraph_compiler::catalog::EdgeType,
692    rows: &[(String, String, JsonValue)],
693) -> Result<RecordBatch> {
694    let schema = edge_type.arrow_schema.clone();
695
696    let ids: Vec<String> = rows
697        .iter()
698        .map(|(_, _, data)| {
699            data.get("id")
700                .and_then(|v| v.as_str())
701                .map(str::to_string)
702                .unwrap_or_else(generate_id)
703        })
704        .collect();
705    let srcs: Vec<&str> = rows.iter().map(|(from, _, _)| from.as_str()).collect();
706    let dsts: Vec<&str> = rows.iter().map(|(_, to, _)| to.as_str()).collect();
707
708    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
709    columns.push(Arc::new(StringArray::from(ids)));
710    columns.push(Arc::new(StringArray::from(srcs)));
711    columns.push(Arc::new(StringArray::from(dsts)));
712
713    // Build edge property columns (skip id, src, dst at indices 0-2)
714    let data_values: Vec<JsonValue> = rows.iter().map(|(_, _, data)| data.clone()).collect();
715    for field in schema.fields().iter().skip(3) {
716        if edge_type.blob_properties.contains(field.name()) {
717            let col = build_blob_column(field.name(), field.is_nullable(), &data_values)?;
718            columns.push(col);
719        } else {
720            let col = build_column_from_json(
721                field.name(),
722                field.data_type(),
723                field.is_nullable(),
724                &data_values,
725            )?;
726            columns.push(col);
727        }
728    }
729
730    RecordBatch::try_new(schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
731}
732
733/// Append a blob value (URI or base64 bytes) to a BlobArrayBuilder.
734pub(crate) fn append_blob_value(builder: &mut BlobArrayBuilder, value: &str) -> Result<()> {
735    if let Some(encoded) = value.strip_prefix("base64:") {
736        let bytes = base64::engine::general_purpose::STANDARD
737            .decode(encoded)
738            .map_err(|e| OmniError::manifest(format!("invalid base64 blob data: {}", e)))?;
739        builder
740            .push_bytes(bytes)
741            .map_err(|e| OmniError::Lance(e.to_string()))
742    } else {
743        // Treat as URI (file://, s3://, gs://, or any other scheme)
744        builder
745            .push_uri(value)
746            .map_err(|e| OmniError::Lance(e.to_string()))
747    }
748}
749
750/// Build a blob column from JSON values using Lance BlobArrayBuilder.
751fn build_blob_column(name: &str, nullable: bool, rows: &[JsonValue]) -> Result<ArrayRef> {
752    let mut builder = BlobArrayBuilder::new(rows.len());
753    for row in rows {
754        match row.get(name) {
755            Some(JsonValue::String(s)) => {
756                append_blob_value(&mut builder, s)?;
757            }
758            Some(JsonValue::Null) | None if nullable => {
759                builder
760                    .push_null()
761                    .map_err(|e| OmniError::Lance(e.to_string()))?;
762            }
763            Some(JsonValue::Null) | None => {
764                return Err(OmniError::manifest(format!(
765                    "non-nullable blob property '{}' has null values",
766                    name
767                )));
768            }
769            _ => {
770                return Err(OmniError::manifest(format!(
771                    "blob property '{}' must be a URI string or base64: prefixed data",
772                    name
773                )));
774            }
775        }
776    }
777    builder
778        .finish()
779        .map_err(|e| OmniError::Lance(e.to_string()))
780}
781
782fn build_column_from_json(
783    name: &str,
784    data_type: &DataType,
785    nullable: bool,
786    rows: &[JsonValue],
787) -> Result<ArrayRef> {
788    let array: ArrayRef = match data_type {
789        DataType::Utf8 => {
790            let values: Vec<Option<String>> = rows
791                .iter()
792                .map(|row| {
793                    row.get(name)
794                        .and_then(|v| v.as_str())
795                        .map(|s| s.to_string())
796                })
797                .collect();
798            Arc::new(StringArray::from(values))
799        }
800        DataType::Int32 => {
801            let values: Vec<Option<i32>> = rows
802                .iter()
803                .map(|row| row.get(name).and_then(|v| v.as_i64()).map(|v| v as i32))
804                .collect();
805            Arc::new(Int32Array::from(values))
806        }
807        DataType::Int64 => {
808            let values: Vec<Option<i64>> = rows
809                .iter()
810                .map(|row| row.get(name).and_then(|v| v.as_i64()))
811                .collect();
812            Arc::new(Int64Array::from(values))
813        }
814        DataType::UInt32 => {
815            let values: Vec<Option<u32>> = rows
816                .iter()
817                .map(|row| row.get(name).and_then(|v| v.as_u64()).map(|v| v as u32))
818                .collect();
819            Arc::new(UInt32Array::from(values))
820        }
821        DataType::UInt64 => {
822            let values: Vec<Option<u64>> = rows
823                .iter()
824                .map(|row| row.get(name).and_then(|v| v.as_u64()))
825                .collect();
826            Arc::new(UInt64Array::from(values))
827        }
828        DataType::Float32 => {
829            let values: Vec<Option<f32>> = rows
830                .iter()
831                .map(|row| row.get(name).and_then(|v| v.as_f64()).map(|v| v as f32))
832                .collect();
833            Arc::new(Float32Array::from(values))
834        }
835        DataType::Float64 => {
836            let values: Vec<Option<f64>> = rows
837                .iter()
838                .map(|row| row.get(name).and_then(|v| v.as_f64()))
839                .collect();
840            Arc::new(Float64Array::from(values))
841        }
842        DataType::Boolean => {
843            let values: Vec<Option<bool>> = rows
844                .iter()
845                .map(|row| row.get(name).and_then(|v| v.as_bool()))
846                .collect();
847            Arc::new(BooleanArray::from(values))
848        }
849        DataType::Date32 => {
850            let mut values = Vec::with_capacity(rows.len());
851            for row in rows {
852                values.push(parse_date32_json_value(
853                    row.get(name).unwrap_or(&JsonValue::Null),
854                )?);
855            }
856            Arc::new(Date32Array::from(values))
857        }
858        DataType::Date64 => {
859            let mut values = Vec::with_capacity(rows.len());
860            for row in rows {
861                values.push(parse_date64_json_value(
862                    row.get(name).unwrap_or(&JsonValue::Null),
863                )?);
864            }
865            Arc::new(Date64Array::from(values))
866        }
867        DataType::List(field) => {
868            let mut builder = ListBuilder::with_capacity(
869                make_list_value_builder(field.data_type(), rows.len())?,
870                rows.len(),
871            )
872            .with_field(field.clone());
873            for row in rows {
874                let value = row.get(name).unwrap_or(&JsonValue::Null);
875                if value.is_null() {
876                    builder.append(false);
877                    continue;
878                }
879                let items = value.as_array().ok_or_else(|| {
880                    OmniError::manifest(format!(
881                        "list property '{}' expects a JSON array, got {}",
882                        name, value
883                    ))
884                })?;
885                for item in items {
886                    append_json_list_item(builder.values(), field.data_type(), item)?;
887                }
888                builder.append(true);
889            }
890            Arc::new(builder.finish())
891        }
892        DataType::FixedSizeList(child_field, dim) => {
893            // Vector type: parse JSON array of floats into FixedSizeList<Float32>
894            let dim = *dim;
895            let mut builder = FixedSizeListBuilder::with_capacity(
896                Float32Builder::with_capacity(rows.len() * dim as usize),
897                dim,
898                rows.len(),
899            )
900            .with_field(child_field.clone());
901            for row in rows {
902                if let Some(arr) = row.get(name).and_then(|v| v.as_array()) {
903                    if arr.len() != dim as usize {
904                        return Err(OmniError::manifest(format!(
905                            "vector property '{}' expects {} dimensions, got {}",
906                            name,
907                            dim,
908                            arr.len()
909                        )));
910                    }
911                    for val in arr {
912                        builder
913                            .values()
914                            .append_value(val.as_f64().unwrap_or(0.0) as f32);
915                    }
916                    builder.append(true);
917                } else if nullable {
918                    for _ in 0..dim as usize {
919                        builder.values().append_null();
920                    }
921                    builder.append(false);
922                } else {
923                    return Err(OmniError::manifest(format!(
924                        "non-nullable vector property '{}' has null values",
925                        name
926                    )));
927                }
928            }
929            Arc::new(builder.finish())
930        }
931        _ => {
932            // Unsupported type: fill with nulls
933            let values: Vec<Option<&str>> = vec![None; rows.len()];
934            Arc::new(StringArray::from(values))
935        }
936    };
937
938    if !nullable && array.null_count() > 0 {
939        return Err(OmniError::manifest(format!(
940            "non-nullable property '{}' has null or invalid values",
941            name
942        )));
943    }
944
945    Ok(array)
946}
947
948fn make_list_value_builder(data_type: &DataType, capacity: usize) -> Result<Box<dyn ArrayBuilder>> {
949    Ok(match data_type {
950        DataType::Utf8 => Box::new(StringBuilder::with_capacity(capacity, capacity * 8)),
951        DataType::Boolean => Box::new(BooleanBuilder::with_capacity(capacity)),
952        DataType::Int32 => Box::new(Int32Builder::with_capacity(capacity)),
953        DataType::Int64 => Box::new(Int64Builder::with_capacity(capacity)),
954        DataType::UInt32 => Box::new(UInt32Builder::with_capacity(capacity)),
955        DataType::UInt64 => Box::new(UInt64Builder::with_capacity(capacity)),
956        DataType::Float32 => Box::new(Float32Builder::with_capacity(capacity)),
957        DataType::Float64 => Box::new(Float64Builder::with_capacity(capacity)),
958        DataType::Date32 => Box::new(Date32Builder::with_capacity(capacity)),
959        DataType::Date64 => Box::new(Date64Builder::with_capacity(capacity)),
960        other => {
961            return Err(OmniError::manifest(format!(
962                "unsupported list element data type {:?}",
963                other
964            )));
965        }
966    })
967}
968
969fn append_json_list_item(
970    builder: &mut Box<dyn ArrayBuilder>,
971    data_type: &DataType,
972    value: &JsonValue,
973) -> Result<()> {
974    match data_type {
975        DataType::Utf8 => {
976            let builder = builder
977                .as_any_mut()
978                .downcast_mut::<StringBuilder>()
979                .ok_or_else(|| OmniError::manifest("list Utf8 builder downcast failed"))?;
980            if let Some(value) = value.as_str() {
981                builder.append_value(value);
982            } else {
983                builder.append_null();
984            }
985        }
986        DataType::Boolean => {
987            let builder = builder
988                .as_any_mut()
989                .downcast_mut::<BooleanBuilder>()
990                .ok_or_else(|| OmniError::manifest("list Boolean builder downcast failed"))?;
991            if let Some(value) = value.as_bool() {
992                builder.append_value(value);
993            } else {
994                builder.append_null();
995            }
996        }
997        DataType::Int32 => {
998            let builder = builder
999                .as_any_mut()
1000                .downcast_mut::<Int32Builder>()
1001                .ok_or_else(|| OmniError::manifest("list Int32 builder downcast failed"))?;
1002            if let Some(value) = value.as_i64() {
1003                let value = i32::try_from(value).map_err(|_| {
1004                    OmniError::manifest(format!("list value {} exceeds Int32 range", value))
1005                })?;
1006                builder.append_value(value);
1007            } else {
1008                builder.append_null();
1009            }
1010        }
1011        DataType::Int64 => {
1012            let builder = builder
1013                .as_any_mut()
1014                .downcast_mut::<Int64Builder>()
1015                .ok_or_else(|| OmniError::manifest("list Int64 builder downcast failed"))?;
1016            if let Some(value) = value.as_i64() {
1017                builder.append_value(value);
1018            } else {
1019                builder.append_null();
1020            }
1021        }
1022        DataType::UInt32 => {
1023            let builder = builder
1024                .as_any_mut()
1025                .downcast_mut::<UInt32Builder>()
1026                .ok_or_else(|| OmniError::manifest("list UInt32 builder downcast failed"))?;
1027            if let Some(value) = value.as_u64() {
1028                let value = u32::try_from(value).map_err(|_| {
1029                    OmniError::manifest(format!("list value {} exceeds UInt32 range", value))
1030                })?;
1031                builder.append_value(value);
1032            } else {
1033                builder.append_null();
1034            }
1035        }
1036        DataType::UInt64 => {
1037            let builder = builder
1038                .as_any_mut()
1039                .downcast_mut::<UInt64Builder>()
1040                .ok_or_else(|| OmniError::manifest("list UInt64 builder downcast failed"))?;
1041            if let Some(value) = value.as_u64() {
1042                builder.append_value(value);
1043            } else {
1044                builder.append_null();
1045            }
1046        }
1047        DataType::Float32 => {
1048            let builder = builder
1049                .as_any_mut()
1050                .downcast_mut::<Float32Builder>()
1051                .ok_or_else(|| OmniError::manifest("list Float32 builder downcast failed"))?;
1052            if let Some(value) = value.as_f64() {
1053                builder.append_value(value as f32);
1054            } else {
1055                builder.append_null();
1056            }
1057        }
1058        DataType::Float64 => {
1059            let builder = builder
1060                .as_any_mut()
1061                .downcast_mut::<Float64Builder>()
1062                .ok_or_else(|| OmniError::manifest("list Float64 builder downcast failed"))?;
1063            if let Some(value) = value.as_f64() {
1064                builder.append_value(value);
1065            } else {
1066                builder.append_null();
1067            }
1068        }
1069        DataType::Date32 => {
1070            let builder = builder
1071                .as_any_mut()
1072                .downcast_mut::<Date32Builder>()
1073                .ok_or_else(|| OmniError::manifest("list Date32 builder downcast failed"))?;
1074            if let Some(value) = parse_date32_json_value(value)? {
1075                builder.append_value(value);
1076            } else {
1077                builder.append_null();
1078            }
1079        }
1080        DataType::Date64 => {
1081            let builder = builder
1082                .as_any_mut()
1083                .downcast_mut::<Date64Builder>()
1084                .ok_or_else(|| OmniError::manifest("list Date64 builder downcast failed"))?;
1085            if let Some(value) = parse_date64_json_value(value)? {
1086                builder.append_value(value);
1087            } else {
1088                builder.append_null();
1089            }
1090        }
1091        other => {
1092            return Err(OmniError::manifest(format!(
1093                "unsupported list element data type {:?}",
1094                other
1095            )));
1096        }
1097    }
1098
1099    Ok(())
1100}
1101
1102fn parse_date32_json_value(value: &JsonValue) -> Result<Option<i32>> {
1103    if value.is_null() {
1104        return Ok(None);
1105    }
1106    if let Some(days) = value.as_i64() {
1107        let days = i32::try_from(days)
1108            .map_err(|_| OmniError::manifest(format!("Date value out of range: {}", days)))?;
1109        return Ok(Some(days));
1110    }
1111    if let Some(days) = value.as_u64() {
1112        let days = i32::try_from(days)
1113            .map_err(|_| OmniError::manifest(format!("Date value out of range: {}", days)))?;
1114        return Ok(Some(days));
1115    }
1116    if let Some(value) = value.as_str() {
1117        return Ok(Some(parse_date32_literal(value)?));
1118    }
1119    Ok(None)
1120}
1121
1122fn parse_date64_json_value(value: &JsonValue) -> Result<Option<i64>> {
1123    if value.is_null() {
1124        return Ok(None);
1125    }
1126    if let Some(ms) = value.as_i64() {
1127        return Ok(Some(ms));
1128    }
1129    if let Some(ms) = value.as_u64() {
1130        let ms = i64::try_from(ms)
1131            .map_err(|_| OmniError::manifest(format!("DateTime value out of range: {}", ms)))?;
1132        return Ok(Some(ms));
1133    }
1134    if let Some(value) = value.as_str() {
1135        return Ok(Some(parse_date64_literal(value)?));
1136    }
1137    Ok(None)
1138}
1139
/// How many per-type Lance writes to run concurrently during a load.
///
/// Each write is an independent S3 manifest + fragment write against a
/// different table. Ops within a single table must still be serial (Lance
/// OCC on the manifest), but cross-table writes have no shared state.
///
/// 8 is a conservative default — enough to overlap S3 round-trip latency
/// across the typical 10-30 table schemas without flooding the runtime.
/// Override via `OMNIGRAPH_LOAD_CONCURRENCY` for benchmarking.
const DEFAULT_LOAD_WRITE_CONCURRENCY: usize = 8;

/// Resolve the write concurrency for a load.
///
/// Reads `OMNIGRAPH_LOAD_CONCURRENCY`; the value is used only when it parses
/// as a `usize` greater than zero. An unset variable, a value that is not
/// valid Unicode, an unparseable value, or `0` all fall back to
/// [`DEFAULT_LOAD_WRITE_CONCURRENCY`].
fn load_write_concurrency() -> usize {
    match std::env::var("OMNIGRAPH_LOAD_CONCURRENCY") {
        Ok(raw) => match raw.parse::<usize>() {
            Ok(requested) if requested > 0 => requested,
            _ => DEFAULT_LOAD_WRITE_CONCURRENCY,
        },
        Err(_) => DEFAULT_LOAD_WRITE_CONCURRENCY,
    }
}
1159
1160/// Write a set of prepared `(type_name, table_key, batch, row_count)` tuples
1161/// concurrently. Returns results in original iteration order so callers can
1162/// zip them back to per-type metadata.
1163async fn write_batches_concurrently(
1164    db: &Omnigraph,
1165    branch: Option<&str>,
1166    mode: LoadMode,
1167    prepared: Vec<(String, String, RecordBatch, usize)>,
1168) -> Result<
1169    Vec<(
1170        String,
1171        String,
1172        usize,
1173        crate::table_store::TableState,
1174        Option<String>,
1175    )>,
1176> {
1177    use futures::stream::StreamExt;
1178
1179    if prepared.is_empty() {
1180        return Ok(Vec::new());
1181    }
1182
1183    let concurrency = load_write_concurrency().min(prepared.len()).max(1);
1184
1185    futures::stream::iter(prepared.into_iter().map(
1186        |(type_name, table_key, batch, loaded_count)| async move {
1187            let (state, table_branch) =
1188                write_batch_to_dataset(db, branch, &table_key, batch, mode).await?;
1189            Ok::<_, OmniError>((type_name, table_key, loaded_count, state, table_branch))
1190        },
1191    ))
1192    .buffered(concurrency)
1193    .collect::<Vec<Result<_>>>()
1194    .await
1195    .into_iter()
1196    .collect()
1197}
1198
1199async fn write_batch_to_dataset(
1200    db: &Omnigraph,
1201    branch: Option<&str>,
1202    table_key: &str,
1203    batch: RecordBatch,
1204    mode: LoadMode,
1205) -> Result<(crate::table_store::TableState, Option<String>)> {
1206    let op_kind = match mode {
1207        LoadMode::Append => crate::db::MutationOpKind::Insert,
1208        LoadMode::Merge => crate::db::MutationOpKind::Merge,
1209        LoadMode::Overwrite => crate::db::MutationOpKind::SchemaRewrite,
1210    };
1211    let (mut ds, full_path, table_branch) = db
1212        .open_for_mutation_on_branch(branch, table_key, op_kind)
1213        .await?;
1214    let table_store = db.table_store();
1215
1216    match mode {
1217        LoadMode::Overwrite => {
1218            let state = table_store
1219                .overwrite_batch(&full_path, &mut ds, batch)
1220                .await?;
1221            Ok((state, table_branch))
1222        }
1223        LoadMode::Append => {
1224            let state = table_store.append_batch(&full_path, &mut ds, batch).await?;
1225            Ok((state, table_branch))
1226        }
1227        LoadMode::Merge => {
1228            let state = table_store
1229                .merge_insert_batch(
1230                    &full_path,
1231                    ds,
1232                    batch,
1233                    vec!["id".to_string()],
1234                    lance::dataset::WhenMatched::UpdateAll,
1235                    lance::dataset::WhenNotMatched::InsertAll,
1236                )
1237                .await?;
1238            Ok((state, table_branch))
1239        }
1240    }
1241}
1242
1243fn generate_id() -> String {
1244    ulid::Ulid::new().to_string()
1245}
1246
1247pub(crate) fn parse_date32_literal(value: &str) -> Result<i32> {
1248    let raw: Arc<dyn Array> = Arc::new(StringArray::from(vec![Some(value)]));
1249    let casted = arrow_cast::cast::cast(raw.as_ref(), &DataType::Date32)
1250        .map_err(|e| OmniError::manifest(format!("invalid Date literal '{}': {}", value, e)))?;
1251    let out = casted
1252        .as_any()
1253        .downcast_ref::<Date32Array>()
1254        .ok_or_else(|| OmniError::manifest("Date32 cast produced unexpected array"))?;
1255    if out.is_null(0) {
1256        return Err(OmniError::manifest(format!(
1257            "invalid Date literal '{}'",
1258            value
1259        )));
1260    }
1261    Ok(out.value(0))
1262}
1263
1264pub(crate) fn parse_date64_literal(value: &str) -> Result<i64> {
1265    let raw: Arc<dyn Array> = Arc::new(StringArray::from(vec![Some(value)]));
1266    let casted = arrow_cast::cast::cast(raw.as_ref(), &DataType::Date64)
1267        .map_err(|e| OmniError::manifest(format!("invalid DateTime literal '{}': {}", value, e)))?;
1268    let out = casted
1269        .as_any()
1270        .downcast_ref::<Date64Array>()
1271        .ok_or_else(|| OmniError::manifest("Date64 cast produced unexpected array"))?;
1272    if out.is_null(0) {
1273        return Err(OmniError::manifest(format!(
1274            "invalid DateTime literal '{}'",
1275            value
1276        )));
1277    }
1278    Ok(out.value(0))
1279}
1280
1281// ─── Value constraint validation ─────────────────────────────────────────────
1282
1283pub(crate) fn validate_value_constraints(
1284    batch: &RecordBatch,
1285    node_type: &omnigraph_compiler::catalog::NodeType,
1286) -> Result<()> {
1287    use arrow_array::Array;
1288
1289    // Range constraints
1290    for rc in &node_type.range_constraints {
1291        let Some(col) = batch.column_by_name(&rc.property) else {
1292            continue;
1293        };
1294        for row in 0..batch.num_rows() {
1295            if col.is_null(row) {
1296                continue;
1297            }
1298            let value = extract_numeric_value(col, row);
1299            if let Some(val) = value {
1300                if val.is_nan() {
1301                    return Err(OmniError::manifest(format!(
1302                        "@range violation on {}.{}: value is NaN",
1303                        node_type.name, rc.property
1304                    )));
1305                }
1306                if let Some(ref min) = rc.min {
1307                    let min_f = literal_value_to_f64(min);
1308                    if val < min_f {
1309                        return Err(OmniError::manifest(format!(
1310                            "@range violation on {}.{}: value {} < min {}",
1311                            node_type.name, rc.property, val, min_f
1312                        )));
1313                    }
1314                }
1315                if let Some(ref max) = rc.max {
1316                    let max_f = literal_value_to_f64(max);
1317                    if val > max_f {
1318                        return Err(OmniError::manifest(format!(
1319                            "@range violation on {}.{}: value {} > max {}",
1320                            node_type.name, rc.property, val, max_f
1321                        )));
1322                    }
1323                }
1324            }
1325        }
1326    }
1327
1328    // Check constraints (regex)
1329    for cc in &node_type.check_constraints {
1330        let re = regex::Regex::new(&cc.pattern).map_err(|e| {
1331            OmniError::manifest(format!(
1332                "@check on {}.{} has invalid regex '{}': {}",
1333                node_type.name, cc.property, cc.pattern, e
1334            ))
1335        })?;
1336        let Some(col) = batch.column_by_name(&cc.property) else {
1337            continue;
1338        };
1339        let str_col = col.as_any().downcast_ref::<StringArray>();
1340        if let Some(str_col) = str_col {
1341            for row in 0..str_col.len() {
1342                if str_col.is_null(row) {
1343                    continue;
1344                }
1345                let val = str_col.value(row);
1346                if !re.is_match(val) {
1347                    return Err(OmniError::manifest(format!(
1348                        "@check violation on {}.{}: value '{}' does not match pattern '{}'",
1349                        node_type.name, cc.property, val, cc.pattern
1350                    )));
1351                }
1352            }
1353        }
1354    }
1355
1356    Ok(())
1357}
1358
1359/// Validate that every enum-typed property in `properties` only contains values
1360/// from its declared enum value set. Operates on a single `RecordBatch` so it
1361/// can be called from any write path that already holds a batch.
1362///
1363/// Scalar string enums are checked directly. List-of-enum properties are
1364/// checked element-by-element across the underlying string values.
1365pub(crate) fn validate_enum_constraints(
1366    batch: &RecordBatch,
1367    properties: &HashMap<String, omnigraph_compiler::types::PropType>,
1368    type_name: &str,
1369) -> Result<()> {
1370    use arrow_array::{Array, ListArray};
1371
1372    for (prop_name, prop_type) in properties {
1373        let Some(allowed) = prop_type.enum_values.as_ref() else {
1374            continue;
1375        };
1376        let Some(col) = batch.column_by_name(prop_name) else {
1377            continue;
1378        };
1379        if prop_type.list {
1380            let Some(list_col) = col.as_any().downcast_ref::<ListArray>() else {
1381                continue;
1382            };
1383            for row in 0..list_col.len() {
1384                if list_col.is_null(row) {
1385                    continue;
1386                }
1387                let item_arr = list_col.value(row);
1388                let Some(str_arr) = item_arr.as_any().downcast_ref::<StringArray>() else {
1389                    continue;
1390                };
1391                for i in 0..str_arr.len() {
1392                    if str_arr.is_null(i) {
1393                        continue;
1394                    }
1395                    let val = str_arr.value(i);
1396                    if !allowed.iter().any(|a| a.as_str() == val) {
1397                        return Err(OmniError::manifest(format!(
1398                            "invalid enum value '{}' for {}.{} (expected: {})",
1399                            val,
1400                            type_name,
1401                            prop_name,
1402                            allowed.join(", ")
1403                        )));
1404                    }
1405                }
1406            }
1407        } else if let Some(str_col) = col.as_any().downcast_ref::<StringArray>() {
1408            for row in 0..str_col.len() {
1409                if str_col.is_null(row) {
1410                    continue;
1411                }
1412                let val = str_col.value(row);
1413                if !allowed.iter().any(|a| a.as_str() == val) {
1414                    return Err(OmniError::manifest(format!(
1415                        "invalid enum value '{}' for {}.{} (expected: {})",
1416                        val,
1417                        type_name,
1418                        prop_name,
1419                        allowed.join(", ")
1420                    )));
1421                }
1422            }
1423        }
1424    }
1425    Ok(())
1426}
1427
1428/// Detect duplicate values within a single `RecordBatch` for any of the named
1429/// `unique_properties`. Returns an error on the first duplicate found.
1430///
1431/// Note: this only catches duplicates *within* the batch. Cross-batch
1432/// uniqueness against already-committed rows is not enforced here — that
1433/// requires a dataset scan and is tracked separately.
1434pub(crate) fn enforce_unique_constraints_intra_batch(
1435    batch: &RecordBatch,
1436    type_name: &str,
1437    unique_properties: &[String],
1438) -> Result<()> {
1439    for property in unique_properties {
1440        let Some(col_idx) = batch.schema().index_of(property).ok() else {
1441            continue;
1442        };
1443        let arr = batch.column(col_idx);
1444        let mut seen: HashMap<String, usize> = HashMap::new();
1445        for row in 0..batch.num_rows() {
1446            let Some(value) = scalar_to_string(arr, row) else {
1447                continue;
1448            };
1449            if let Some(prev_row) = seen.insert(value.clone(), row) {
1450                return Err(OmniError::manifest(format!(
1451                    "@unique violation on {}.{}: value '{}' appears in rows {} and {}",
1452                    type_name, property, value, prev_row, row
1453                )));
1454            }
1455        }
1456    }
1457    Ok(())
1458}
1459
1460/// Reduce a single Arrow scalar at (`array`, `row`) to a `String` for
1461/// uniqueness comparison. Returns `None` for null values (nulls are exempt
1462/// from uniqueness in standard SQL semantics).
1463fn scalar_to_string(array: &ArrayRef, row: usize) -> Option<String> {
1464    use arrow_array::Array;
1465    if array.is_null(row) {
1466        return None;
1467    }
1468    if let Some(a) = array.as_any().downcast_ref::<StringArray>() {
1469        return Some(a.value(row).to_string());
1470    }
1471    if let Some(a) = array.as_any().downcast_ref::<Int32Array>() {
1472        return Some(a.value(row).to_string());
1473    }
1474    if let Some(a) = array.as_any().downcast_ref::<Int64Array>() {
1475        return Some(a.value(row).to_string());
1476    }
1477    if let Some(a) = array.as_any().downcast_ref::<UInt32Array>() {
1478        return Some(a.value(row).to_string());
1479    }
1480    if let Some(a) = array.as_any().downcast_ref::<UInt64Array>() {
1481        return Some(a.value(row).to_string());
1482    }
1483    if let Some(a) = array.as_any().downcast_ref::<Float32Array>() {
1484        return Some(a.value(row).to_string());
1485    }
1486    if let Some(a) = array.as_any().downcast_ref::<Float64Array>() {
1487        return Some(a.value(row).to_string());
1488    }
1489    if let Some(a) = array.as_any().downcast_ref::<BooleanArray>() {
1490        return Some(a.value(row).to_string());
1491    }
1492    if let Some(a) = array.as_any().downcast_ref::<Date32Array>() {
1493        return Some(a.value(row).to_string());
1494    }
1495    if let Some(a) = array.as_any().downcast_ref::<Date64Array>() {
1496        return Some(a.value(row).to_string());
1497    }
1498    None
1499}
1500
1501/// Build the flat list of property names that must be checked for uniqueness
1502/// on a node type. Includes both `@unique` properties (from
1503/// `NodeType.unique_constraints`) and the `@key` (which implies uniqueness).
1504pub(crate) fn unique_property_names_for_node(
1505    node_type: &omnigraph_compiler::catalog::NodeType,
1506) -> Vec<String> {
1507    let mut props: Vec<String> = node_type
1508        .unique_constraints
1509        .iter()
1510        .flatten()
1511        .cloned()
1512        .collect();
1513    if let Some(key) = &node_type.key {
1514        props.extend(key.iter().cloned());
1515    }
1516    props.sort();
1517    props.dedup();
1518    props
1519}
1520
1521/// Same as [`unique_property_names_for_node`] but for an edge type.
1522pub(crate) fn unique_property_names_for_edge(
1523    edge_type: &omnigraph_compiler::catalog::EdgeType,
1524) -> Vec<String> {
1525    let mut props: Vec<String> = edge_type
1526        .unique_constraints
1527        .iter()
1528        .flatten()
1529        .cloned()
1530        .collect();
1531    props.sort();
1532    props.dedup();
1533    props
1534}
1535
1536fn extract_numeric_value(col: &ArrayRef, row: usize) -> Option<f64> {
1537    use arrow_array::{
1538        Array, Float32Array, Float64Array, Int32Array, Int64Array, UInt32Array, UInt64Array,
1539    };
1540    if let Some(a) = col.as_any().downcast_ref::<Int32Array>() {
1541        return Some(a.value(row) as f64);
1542    }
1543    if let Some(a) = col.as_any().downcast_ref::<Int64Array>() {
1544        return Some(a.value(row) as f64);
1545    }
1546    if let Some(a) = col.as_any().downcast_ref::<UInt32Array>() {
1547        return Some(a.value(row) as f64);
1548    }
1549    if let Some(a) = col.as_any().downcast_ref::<UInt64Array>() {
1550        return Some(a.value(row) as f64);
1551    }
1552    if let Some(a) = col.as_any().downcast_ref::<Float32Array>() {
1553        return Some(a.value(row) as f64);
1554    }
1555    if let Some(a) = col.as_any().downcast_ref::<Float64Array>() {
1556        return Some(a.value(row));
1557    }
1558    None
1559}
1560
1561fn literal_value_to_f64(v: &omnigraph_compiler::catalog::LiteralValue) -> f64 {
1562    use omnigraph_compiler::catalog::LiteralValue;
1563    match v {
1564        LiteralValue::Integer(n) => *n as f64,
1565        LiteralValue::Float(f) => *f,
1566    }
1567}
1568
1569// ─── Edge cardinality validation ─────────────────────────────────────────────
1570
1571pub(crate) async fn validate_edge_cardinality(
1572    db: &crate::db::Omnigraph,
1573    branch: Option<&str>,
1574    edge_name: &str,
1575    written_version: u64,
1576    written_branch: Option<&str>,
1577) -> Result<()> {
1578    use arrow_array::Array;
1579    let catalog = db.catalog();
1580    let edge_type = &catalog.edge_types[edge_name];
1581    if edge_type.cardinality.is_default() {
1582        return Ok(());
1583    }
1584
1585    // Open edge sub-table at the just-written version, not the snapshot's
1586    // (the snapshot still pins to the pre-write version).
1587    let snapshot = db.snapshot_for_branch(branch).await?;
1588    let table_key = format!("edge:{}", edge_name);
1589    let entry = snapshot
1590        .entry(&table_key)
1591        .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
1592    let ds = db
1593        .open_dataset_at_state(
1594            &entry.table_path,
1595            written_branch.or(entry.table_branch.as_deref()),
1596            written_version,
1597        )
1598        .await?;
1599
1600    // Scan src column, count per source
1601    let batches = db
1602        .table_store()
1603        .scan(&ds, Some(&["src"]), None, None)
1604        .await?;
1605
1606    let mut counts: HashMap<String, u32> = HashMap::new();
1607    for batch in &batches {
1608        let srcs = batch
1609            .column_by_name("src")
1610            .unwrap()
1611            .as_any()
1612            .downcast_ref::<StringArray>()
1613            .unwrap();
1614        for i in 0..srcs.len() {
1615            *counts.entry(srcs.value(i).to_string()).or_insert(0) += 1;
1616        }
1617    }
1618
1619    let card = &edge_type.cardinality;
1620    for (src, count) in &counts {
1621        if let Some(max) = card.max {
1622            if *count > max {
1623                return Err(OmniError::manifest(format!(
1624                    "@card violation on edge {}: source '{}' has {} edges (max {})",
1625                    edge_name, src, count, max
1626                )));
1627            }
1628        }
1629        if *count < card.min {
1630            return Err(OmniError::manifest(format!(
1631                "@card violation on edge {}: source '{}' has {} edges (min {})",
1632                edge_name, src, count, card.min
1633            )));
1634        }
1635    }
1636
1637    Ok(())
1638}
1639
1640/// Validate edge `@card` cardinality with in-memory pending edges visible.
1641///
1642/// Loader-level analog to `exec::mutation::validate_edge_cardinality_with_pending`:
1643/// opens the committed dataset at the pre-load snapshot version, then
1644/// delegates to the shared `count_src_per_edge` + `enforce_cardinality_bounds`
1645/// helpers in `exec::staging`. Used by Append/Merge loads (the Overwrite
1646/// path uses `validate_edge_cardinality` which opens the just-written
1647/// Lance version).
1648///
1649/// `mode` controls dedup behavior. `LoadMode::Merge` passes `Some("id")`
1650/// so committed edges that the load is *updating* (same edge id,
1651/// possibly changed `src`) are not double-counted. `LoadMode::Append`
1652/// passes `None` because each line generates a fresh ULID id that
1653/// never collides with committed.
1654async fn validate_edge_cardinality_with_pending_loader(
1655    db: &Omnigraph,
1656    branch: Option<&str>,
1657    edge_type: &omnigraph_compiler::catalog::EdgeType,
1658    table_key: &str,
1659    staging: &MutationStaging,
1660    mode: LoadMode,
1661) -> Result<()> {
1662    if edge_type.cardinality.is_default() {
1663        return Ok(());
1664    }
1665    let snapshot = db.snapshot_for_branch(branch).await?;
1666    let Some(entry) = snapshot.entry(table_key) else {
1667        // No manifest entry — table doesn't exist yet. Pending-only is
1668        // fine; the helper handles empty committed scans.
1669        return Ok(());
1670    };
1671    let ds = db
1672        .open_dataset_at_state(
1673            &entry.table_path,
1674            entry.table_branch.as_deref(),
1675            entry.table_version,
1676        )
1677        .await?;
1678    let dedupe_key = match mode {
1679        LoadMode::Merge => Some("id"),
1680        LoadMode::Append | LoadMode::Overwrite => None,
1681    };
1682    let counts =
1683        crate::exec::staging::count_src_per_edge(db, &ds, table_key, staging, dedupe_key).await?;
1684    crate::exec::staging::enforce_cardinality_bounds(edge_type, &counts)
1685}
1686
1687/// Collect all valid node IDs for a given type, with in-memory pending
1688/// node inserts visible. Used by the staged loader's Phase 2c
1689/// referential-integrity validation.
1690///
1691/// Union of:
1692/// - IDs from the staged loader's pending batches (in-memory; just-staged
1693///   inserts of this type)
1694/// - IDs from the committed sub-table at the pre-load snapshot version
1695async fn collect_node_ids_with_pending(
1696    db: &Omnigraph,
1697    branch: Option<&str>,
1698    type_name: &str,
1699    staging: &MutationStaging,
1700) -> Result<HashSet<String>> {
1701    let mut ids = HashSet::new();
1702    let table_key = format!("node:{}", type_name);
1703
1704    // From staging.pending: walk the in-memory accumulator's id column.
1705    for batch in staging.pending_batches(&table_key) {
1706        if let Some(col) = batch.column_by_name("id") {
1707            if let Some(arr) = col.as_any().downcast_ref::<StringArray>() {
1708                for i in 0..arr.len() {
1709                    if arr.is_valid(i) {
1710                        ids.insert(arr.value(i).to_string());
1711                    }
1712                }
1713            }
1714        }
1715    }
1716
1717    // From the committed Lance sub-table at the pre-load snapshot version.
1718    let snapshot = db.snapshot_for_branch(branch).await?;
1719    let Some(entry) = snapshot.entry(&table_key) else {
1720        return Ok(ids);
1721    };
1722    let ds = db
1723        .open_dataset_at_state(
1724            &entry.table_path,
1725            entry.table_branch.as_deref(),
1726            entry.table_version,
1727        )
1728        .await?;
1729
1730    let batches = db
1731        .table_store()
1732        .scan(&ds, Some(&["id"]), None, None)
1733        .await?;
1734
1735    for batch in &batches {
1736        let id_col = batch
1737            .column_by_name("id")
1738            .ok_or_else(|| OmniError::Lance("missing 'id' column".into()))?
1739            .as_any()
1740            .downcast_ref::<StringArray>()
1741            .ok_or_else(|| OmniError::Lance("'id' column is not Utf8".into()))?;
1742        for i in 0..batch.num_rows() {
1743            // Defensive: `id` is the @key column on every node type and
1744            // is non-nullable by schema, but a committed-row corruption
1745            // (or future schema change) could surface a NULL. Skip
1746            // rather than insert "" — pending-side does the same.
1747            if id_col.is_valid(i) {
1748                ids.insert(id_col.value(i).to_string());
1749            }
1750        }
1751    }
1752
1753    Ok(ids)
1754}
1755
1756/// Collect all valid node IDs for a given type. Union of:
1757/// - IDs from the just-loaded batch (in memory, from node_rows)
1758/// - IDs from the sub-table at the just-written version (if it was updated)
1759/// - IDs from the sub-table at the snapshot-pinned version (if it was not updated)
1760async fn collect_node_ids(
1761    db: &Omnigraph,
1762    branch: Option<&str>,
1763    type_name: &str,
1764    node_rows: &HashMap<String, Vec<JsonValue>>,
1765    catalog: &omnigraph_compiler::catalog::Catalog,
1766    updates: &[crate::db::SubTableUpdate],
1767) -> Result<HashSet<String>> {
1768    let mut ids = HashSet::new();
1769
1770    // IDs from the in-memory batch (just loaded in this operation)
1771    if let Some(rows) = node_rows.get(type_name) {
1772        if let Some(node_type) = catalog.node_types.get(type_name) {
1773            if let Some(key_prop) = node_type.key_property() {
1774                for row in rows {
1775                    if let Some(id) = row.get(key_prop).and_then(|v| v.as_str()) {
1776                        ids.insert(id.to_string());
1777                    }
1778                }
1779            }
1780        }
1781    }
1782
1783    // IDs from the Lance sub-table
1784    let table_key = format!("node:{}", type_name);
1785    let snapshot = db.snapshot_for_branch(branch).await?;
1786    let Some(entry) = snapshot.entry(&table_key) else {
1787        return Ok(ids);
1788    };
1789    // Use the just-written version if this type was updated, else snapshot version
1790    let updated = updates
1791        .iter()
1792        .find(|u| u.table_key == table_key)
1793        .map(|u| (u.table_version, u.table_branch.as_deref()));
1794    let (version, branch) = updated.unwrap_or((entry.table_version, entry.table_branch.as_deref()));
1795    let ds = db
1796        .open_dataset_at_state(&entry.table_path, branch, version)
1797        .await?;
1798
1799    let batches = db
1800        .table_store()
1801        .scan(&ds, Some(&["id"]), None, None)
1802        .await?;
1803
1804    for batch in &batches {
1805        let id_col = batch
1806            .column_by_name("id")
1807            .unwrap()
1808            .as_any()
1809            .downcast_ref::<StringArray>()
1810            .unwrap();
1811        for i in 0..batch.num_rows() {
1812            if !id_col.is_valid(i) {
1813                continue;
1814            }
1815            ids.insert(id_col.value(i).to_string());
1816        }
1817    }
1818
1819    Ok(ids)
1820}
1821
#[cfg(test)]
mod tests {
    //! Loader tests: JSONL loading in each `LoadMode`, branch ingestion, and
    //! value-constraint validation.

    use super::*;
    use crate::db::Omnigraph;
    use arrow_array::Array;
    use futures::TryStreamExt;
    use std::collections::HashMap;

    const TEST_SCHEMA: &str = r#"
node Person {
    name: String @key
    age: I32?
}
node Company {
    name: String @key
}
edge Knows: Person -> Person {
    since: Date?
}
edge WorksAt: Person -> Company
"#;

    const TEST_DATA: &str = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}
{"type": "Person", "data": {"name": "Bob", "age": 25}}
{"type": "Company", "data": {"name": "Acme"}}
{"edge": "Knows", "from": "Alice", "to": "Bob"}
{"edge": "WorksAt", "from": "Alice", "to": "Acme"}
"#;

    /// Create a fresh database with `TEST_SCHEMA` under a temporary directory.
    ///
    /// The `TempDir` guard is returned alongside the handle because dropping
    /// it removes the directory the database lives in; callers must keep it
    /// bound for the duration of the test.
    async fn init_test_db() -> (tempfile::TempDir, Omnigraph) {
        let dir = tempfile::tempdir().unwrap();
        let db = Omnigraph::init(dir.path().to_str().unwrap(), TEST_SCHEMA)
            .await
            .unwrap();
        (dir, db)
    }

    /// Borrow the Utf8 column `name` from `batch`, panicking with a
    /// descriptive message when it is absent or not a `StringArray`.
    fn string_column<'a>(batch: &'a RecordBatch, name: &str) -> &'a StringArray {
        batch
            .column_by_name(name)
            .unwrap_or_else(|| panic!("missing column '{name}'"))
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap_or_else(|| panic!("column '{name}' is not Utf8"))
    }

    #[tokio::test]
    async fn test_load_creates_data() {
        let (_dir, mut db) = init_test_db().await;

        let result = load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert_eq!(result.nodes_loaded["Person"], 2);
        assert_eq!(result.nodes_loaded["Company"], 1);
        assert_eq!(result.edges_loaded["Knows"], 1);
        assert_eq!(result.edges_loaded["WorksAt"], 1);
    }

    #[tokio::test]
    async fn test_load_data_readable_via_lance() {
        let (_dir, mut db) = init_test_db().await;
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        // Read back via snapshot
        let snap = db.snapshot().await;
        let person_ds = snap.open("node:Person").await.unwrap();

        assert_eq!(person_ds.count_rows(None).await.unwrap(), 2);

        // Verify data
        let batches: Vec<RecordBatch> = person_ds
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();

        // Gather ids from every batch: the scan is not guaranteed to return
        // all rows in the first one.
        let mut id_values: Vec<&str> = Vec::new();
        for batch in &batches {
            id_values.extend(string_column(batch, "id").iter().flatten());
        }
        // @key=name, so ids should be "Alice" and "Bob"
        assert!(id_values.contains(&"Alice"));
        assert!(id_values.contains(&"Bob"));
    }

    #[tokio::test]
    async fn test_load_edges_reference_node_keys() {
        let (_dir, mut db) = init_test_db().await;
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        let snap = db.snapshot().await;
        let knows_ds = snap.open("edge:Knows").await.unwrap();

        let batches: Vec<RecordBatch> = knows_ds
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();

        // Collect (src, dst) across all batches so an empty scan fails the
        // assertion below instead of panicking on an out-of-range index.
        let mut endpoints: Vec<(Option<&str>, Option<&str>)> = Vec::new();
        for batch in &batches {
            let srcs = string_column(batch, "src");
            let dsts = string_column(batch, "dst");
            endpoints.extend(srcs.iter().zip(dsts.iter()));
        }

        // TEST_DATA contains exactly one Knows edge: Alice -> Bob.
        assert_eq!(endpoints, vec![(Some("Alice"), Some("Bob"))]);
    }

    #[tokio::test]
    async fn test_load_manifest_version_advances() {
        let (_dir, mut db) = init_test_db().await;
        let v1 = db.version().await;

        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert!(db.version().await > v1);
    }

    #[tokio::test]
    async fn test_load_append_adds_rows() {
        let (_dir, mut db) = init_test_db().await;

        let batch1 = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#;
        let batch2 = r#"{"type": "Person", "data": {"name": "Bob", "age": 25}}"#;

        load_jsonl(&mut db, batch1, LoadMode::Overwrite)
            .await
            .unwrap();
        load_jsonl(&mut db, batch2, LoadMode::Append).await.unwrap();

        let snap = db.snapshot().await;
        let person_ds = snap.open("node:Person").await.unwrap();
        assert_eq!(person_ds.count_rows(None).await.unwrap(), 2);
    }

    #[tokio::test]
    async fn test_load_unknown_type_rejected() {
        let (_dir, mut db) = init_test_db().await;

        let bad = r#"{"type": "FakeType", "data": {"name": "x"}}"#;
        let result = load_jsonl(&mut db, bad, LoadMode::Overwrite).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_ingest_creates_branch_and_reports_tables() {
        let (_dir, mut db) = init_test_db().await;

        let result = db
            .ingest("feature", Some("main"), TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert_eq!(result.branch, "feature");
        assert_eq!(result.base_branch, "main");
        assert!(result.branch_created);
        assert_eq!(result.mode, LoadMode::Overwrite);
        assert_eq!(
            result.tables,
            vec![
                IngestTableResult {
                    table_key: "edge:Knows".to_string(),
                    rows_loaded: 1
                },
                IngestTableResult {
                    table_key: "edge:WorksAt".to_string(),
                    rows_loaded: 1
                },
                IngestTableResult {
                    table_key: "node:Company".to_string(),
                    rows_loaded: 1
                },
                IngestTableResult {
                    table_key: "node:Person".to_string(),
                    rows_loaded: 2
                },
            ]
        );
        assert!(
            db.branch_list()
                .await
                .unwrap()
                .contains(&"feature".to_string())
        );
    }

    #[tokio::test]
    async fn test_ingest_existing_branch_ignores_from_and_merges_data() {
        let (_dir, mut db) = init_test_db().await;
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();
        db.branch_create_from(crate::db::ReadTarget::branch("main"), "feature")
            .await
            .unwrap();

        // "missing-base" does not exist; the ingest must still succeed
        // because the target branch already exists.
        let result = db
            .ingest(
                "feature",
                Some("missing-base"),
                r#"{"type":"Person","data":{"name":"Bob","age":26}}
{"type":"Person","data":{"name":"Eve","age":31}}"#,
                LoadMode::Merge,
            )
            .await
            .unwrap();

        assert_eq!(result.branch, "feature");
        assert_eq!(result.base_branch, "missing-base");
        assert!(!result.branch_created);
        assert_eq!(result.mode, LoadMode::Merge);
        assert_eq!(
            result.tables,
            vec![IngestTableResult {
                table_key: "node:Person".to_string(),
                rows_loaded: 2
            }]
        );

        let snap = db
            .snapshot_of(crate::db::ReadTarget::branch("feature"))
            .await
            .unwrap();
        let person_ds = snap.open("node:Person").await.unwrap();
        assert_eq!(person_ds.count_rows(None).await.unwrap(), 3);

        let batches: Vec<RecordBatch> = person_ds
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();
        let mut ages_by_id = HashMap::new();
        for batch in &batches {
            let ids = string_column(batch, "id");
            let ages = batch
                .column_by_name("age")
                .unwrap()
                .as_any()
                .downcast_ref::<Int32Array>()
                .unwrap();
            for idx in 0..ids.len() {
                ages_by_id.insert(ids.value(idx).to_string(), ages.value(idx));
            }
        }

        // Bob was updated, Eve inserted, Alice left untouched.
        assert_eq!(ages_by_id.get("Bob"), Some(&26));
        assert_eq!(ages_by_id.get("Eve"), Some(&31));
        assert_eq!(ages_by_id.get("Alice"), Some(&30));
    }

    #[tokio::test]
    async fn test_ingest_as_stamps_actor_on_branch_head_commit() {
        let (_dir, mut db) = init_test_db().await;

        db.ingest_as(
            "feature",
            Some("main"),
            TEST_DATA,
            LoadMode::Overwrite,
            Some("act-andrew"),
        )
        .await
        .unwrap();

        // NOTE(review): treats the last listed commit as the branch head —
        // confirm `list_commits` returns commits oldest-first.
        let head = db
            .list_commits(Some("feature"))
            .await
            .unwrap()
            .into_iter()
            .last()
            .unwrap();
        assert_eq!(head.actor_id.as_deref(), Some("act-andrew"));
    }

    #[test]
    fn test_range_constraint_rejects_nan() {
        use arrow_array::{Float64Array, RecordBatch, StringArray};
        use omnigraph_compiler::catalog::{LiteralValue, NodeType, RangeConstraint};
        use std::sync::Arc;

        let schema = Arc::new(arrow_schema::Schema::new(vec![
            arrow_schema::Field::new("name", arrow_schema::DataType::Utf8, false),
            arrow_schema::Field::new("score", arrow_schema::DataType::Float64, true),
        ]));

        // A single row whose `score` is NaN.
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["bad"])),
                Arc::new(Float64Array::from(vec![f64::NAN])),
            ],
        )
        .unwrap();

        let node_type = NodeType {
            name: "Test".to_string(),
            implements: vec![],
            properties: Default::default(),
            key: None,
            unique_constraints: vec![],
            indices: vec![],
            range_constraints: vec![RangeConstraint {
                property: "score".to_string(),
                min: Some(LiteralValue::Float(0.0)),
                max: Some(LiteralValue::Float(1.0)),
            }],
            check_constraints: vec![],
            embed_sources: Default::default(),
            blob_properties: Default::default(),
            arrow_schema: schema,
        };

        let result = validate_value_constraints(&batch, &node_type);
        assert!(result.is_err(), "expected NaN to be rejected");
        let err = result.unwrap_err().to_string();
        assert!(err.contains("NaN"), "error should mention NaN: {}", err);
    }
}