//! omnigraph/loader/mod.rs — JSONL loading and ingestion for Omnigraph.
1use std::collections::{HashMap, HashSet};
2
3use std::io::{BufRead, BufReader, Cursor};
4use std::sync::Arc;
5
6use arrow_array::{
7    Array, ArrayRef, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array,
8    Int32Array, Int64Array, RecordBatch, StringArray, UInt32Array, UInt64Array,
9    builder::{
10        ArrayBuilder, BooleanBuilder, Date32Builder, Date64Builder, FixedSizeListBuilder,
11        Float32Builder, Float64Builder, Int32Builder, Int64Builder, ListBuilder, StringBuilder,
12        UInt32Builder, UInt64Builder,
13    },
14};
15use arrow_schema::DataType;
16use base64::Engine;
17use lance::blob::BlobArrayBuilder;
18use omnigraph_compiler::catalog::NodeType;
19use serde::{Deserialize, Serialize};
20use serde_json::Value as JsonValue;
21
22use crate::db::Omnigraph;
23use crate::error::{OmniError, Result};
24use crate::exec::staging::{MutationStaging, PendingMode};
25
/// Result of a load operation.
#[derive(Debug, Clone, Default)]
pub struct LoadResult {
    /// Rows loaded per node type, keyed by node type name.
    pub nodes_loaded: HashMap<String, usize>,
    /// Rows loaded per edge type, keyed by canonical edge name.
    pub edges_loaded: HashMap<String, usize>,
}
32
/// Per-table row count reported by an ingest operation.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct IngestTableResult {
    /// Table identifier, formatted as `node:<type>` or `edge:<name>`.
    pub table_key: String,
    /// Number of rows loaded into this table.
    pub rows_loaded: usize,
}
38
/// Summary of a completed ingest: branch bookkeeping plus per-table counts.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct IngestResult {
    /// Branch the data was written to.
    pub branch: String,
    /// Branch the target was (or would have been) created from.
    pub base_branch: String,
    /// True when the target branch did not exist and this ingest created it.
    pub branch_created: bool,
    /// Load mode used for the write.
    pub mode: LoadMode,
    /// Per-table load results, sorted by table key.
    pub tables: Vec<IngestTableResult>,
}
47
/// Load mode for data ingestion.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LoadMode {
    /// Overwrite existing data.
    Overwrite,
    /// Append to existing data (assumes unique inputs; no dedupe is applied).
    Append,
    /// Merge by `id` key (upsert).
    Merge,
}
59
60/// Load JSONL data into an Omnigraph database.
61pub async fn load_jsonl(db: &mut Omnigraph, data: &str, mode: LoadMode) -> Result<LoadResult> {
62    let current_branch = db.active_branch().map(str::to_string);
63    let branch = current_branch.as_deref().unwrap_or("main");
64    db.load(branch, data, mode).await
65}
66
67/// Load JSONL data from a file path.
68pub async fn load_jsonl_file(db: &mut Omnigraph, path: &str, mode: LoadMode) -> Result<LoadResult> {
69    let current_branch = db.active_branch().map(str::to_string);
70    let branch = current_branch.as_deref().unwrap_or("main");
71    db.load_file(branch, path, mode).await
72}
73
74impl Omnigraph {
75    pub async fn ingest(
76        &mut self,
77        branch: &str,
78        from: Option<&str>,
79        data: &str,
80        mode: LoadMode,
81    ) -> Result<IngestResult> {
82        self.ingest_as(branch, from, data, mode, None).await
83    }
84
85    pub async fn ingest_as(
86        &mut self,
87        branch: &str,
88        from: Option<&str>,
89        data: &str,
90        mode: LoadMode,
91        actor_id: Option<&str>,
92    ) -> Result<IngestResult> {
93        let previous_actor = self.audit_actor_id.clone();
94        self.audit_actor_id = actor_id.map(str::to_string);
95        let result = self
96            .ingest_with_current_actor(branch, from, data, mode)
97            .await;
98        self.audit_actor_id = previous_actor;
99        result
100    }
101
102    pub async fn ingest_file(
103        &mut self,
104        branch: &str,
105        from: Option<&str>,
106        path: &str,
107        mode: LoadMode,
108    ) -> Result<IngestResult> {
109        self.ingest_file_as(branch, from, path, mode, None).await
110    }
111
112    pub async fn ingest_file_as(
113        &mut self,
114        branch: &str,
115        from: Option<&str>,
116        path: &str,
117        mode: LoadMode,
118        actor_id: Option<&str>,
119    ) -> Result<IngestResult> {
120        let data = std::fs::read_to_string(path).map_err(OmniError::Io)?;
121        self.ingest_as(branch, from, &data, mode, actor_id).await
122    }
123
124    async fn ingest_with_current_actor(
125        &mut self,
126        branch: &str,
127        from: Option<&str>,
128        data: &str,
129        mode: LoadMode,
130    ) -> Result<IngestResult> {
131        self.ensure_schema_state_valid().await?;
132        let target_branch =
133            Self::normalize_branch_name(branch)?.unwrap_or_else(|| "main".to_string());
134        let base_branch = Self::normalize_branch_name(from.unwrap_or("main"))?
135            .unwrap_or_else(|| "main".to_string());
136        let branch_created = !self
137            .branch_list()
138            .await?
139            .iter()
140            .any(|name| name == &target_branch);
141        if branch_created {
142            self.branch_create_from(crate::db::ReadTarget::branch(&base_branch), &target_branch)
143                .await?;
144        }
145
146        let result = self.load(&target_branch, data, mode).await?;
147        Ok(IngestResult {
148            branch: target_branch,
149            base_branch,
150            branch_created,
151            mode,
152            tables: result.to_ingest_tables(),
153        })
154    }
155
156    pub async fn load(&mut self, branch: &str, data: &str, mode: LoadMode) -> Result<LoadResult> {
157        self.ensure_schema_state_valid().await?;
158        // Reject internal `__run__*` / system-prefixed branches at the
159        // public write boundary. Direct-publish paths assert this
160        // explicitly so a caller can't write to legacy or system
161        // staging branches by passing the prefix verbatim.
162        crate::db::ensure_public_branch_ref(branch, "load")?;
163        // Branch convention: `None` represents `main`. Re-normalizing to
164        // `Some("main")` here would route the publisher commit through a
165        // separate coordinator (the cross-branch path in
166        // `commit_prepared_updates_on_branch_with_expected`) and leave
167        // `self.coordinator` with a stale manifest snapshot.
168        let requested = Self::normalize_branch_name(branch)?;
169        // Direct-to-target writes: no Run state machine, no `__run__` staging
170        // branch. Cross-table OCC is enforced by the publisher's
171        // `expected_table_versions` CAS inside `load_jsonl_reader`.
172        self.load_direct_on_branch(requested.as_deref(), data, mode)
173            .await
174    }
175
176    pub async fn load_file(
177        &mut self,
178        branch: &str,
179        path: &str,
180        mode: LoadMode,
181    ) -> Result<LoadResult> {
182        let data = std::fs::read_to_string(path).map_err(|e| OmniError::Io(e))?;
183        self.load(branch, &data, mode).await
184    }
185
186    async fn load_direct_on_branch(
187        &mut self,
188        branch: Option<&str>,
189        data: &str,
190        mode: LoadMode,
191    ) -> Result<LoadResult> {
192        let reader = BufReader::new(Cursor::new(data.as_bytes()));
193        load_jsonl_reader(self, branch, reader, mode).await
194    }
195}
196
197impl LoadMode {
198    pub fn as_str(self) -> &'static str {
199        match self {
200            LoadMode::Overwrite => "overwrite",
201            LoadMode::Append => "append",
202            LoadMode::Merge => "merge",
203        }
204    }
205}
206
207impl LoadResult {
208    pub fn to_ingest_tables(&self) -> Vec<IngestTableResult> {
209        let mut tables = self
210            .nodes_loaded
211            .iter()
212            .map(|(type_name, rows_loaded)| IngestTableResult {
213                table_key: format!("node:{type_name}"),
214                rows_loaded: *rows_loaded,
215            })
216            .chain(
217                self.edges_loaded
218                    .iter()
219                    .map(|(edge_name, rows_loaded)| IngestTableResult {
220                        table_key: format!("edge:{edge_name}"),
221                        rows_loaded: *rows_loaded,
222                    }),
223            )
224            .collect::<Vec<_>>();
225        tables.sort_by(|a, b| a.table_key.cmp(&b.table_key));
226        tables
227    }
228}
229
230async fn load_jsonl_reader<R: BufRead>(
231    db: &mut Omnigraph,
232    branch: Option<&str>,
233    reader: R,
234    mode: LoadMode,
235) -> Result<LoadResult> {
236    let catalog = db.catalog().clone();
237
238    // Phase 1: Parse all lines, spool into per-type collections
239    let mut node_rows: HashMap<String, Vec<JsonValue>> = HashMap::new();
240    let mut edge_rows: HashMap<String, Vec<(String, String, JsonValue)>> = HashMap::new();
241
242    for (line_num, line) in reader.lines().enumerate() {
243        let line = line?;
244        let line = line.trim();
245        if line.is_empty() {
246            continue;
247        }
248        let value: JsonValue = serde_json::from_str(line).map_err(|e| {
249            OmniError::manifest(format!("invalid JSON on line {}: {}", line_num + 1, e))
250        })?;
251
252        if let Some(type_name) = value.get("type").and_then(|v| v.as_str()) {
253            if !catalog.node_types.contains_key(type_name) {
254                return Err(OmniError::manifest(format!(
255                    "line {}: unknown node type '{}'",
256                    line_num + 1,
257                    type_name
258                )));
259            }
260            let data = value
261                .get("data")
262                .cloned()
263                .unwrap_or(JsonValue::Object(serde_json::Map::new()));
264            node_rows
265                .entry(type_name.to_string())
266                .or_default()
267                .push(data);
268        } else if let Some(edge_name) = value.get("edge").and_then(|v| v.as_str()) {
269            if catalog.lookup_edge_by_name(edge_name).is_none() {
270                return Err(OmniError::manifest(format!(
271                    "line {}: unknown edge type '{}'",
272                    line_num + 1,
273                    edge_name
274                )));
275            }
276            let from = value
277                .get("from")
278                .and_then(|v| v.as_str())
279                .ok_or_else(|| {
280                    OmniError::manifest(format!("line {}: edge missing 'from'", line_num + 1))
281                })?
282                .to_string();
283            let to = value
284                .get("to")
285                .and_then(|v| v.as_str())
286                .ok_or_else(|| {
287                    OmniError::manifest(format!("line {}: edge missing 'to'", line_num + 1))
288                })?
289                .to_string();
290            let data = value
291                .get("data")
292                .cloned()
293                .unwrap_or(JsonValue::Object(serde_json::Map::new()));
294            let canonical = catalog.lookup_edge_by_name(edge_name).unwrap().name.clone();
295            edge_rows
296                .entry(canonical)
297                .or_default()
298                .push((from, to, data));
299        } else {
300            return Err(OmniError::manifest(format!(
301                "line {}: expected 'type' or 'edge' field",
302                line_num + 1
303            )));
304        }
305    }
306
307    // Phase 2: Build per-type RecordBatches and accumulate into the
308    // staging pipeline. For Append/Merge, batches go into an in-memory
309    // accumulator and a single `stage_*` + `commit_staged` per touched
310    // table runs at end-of-load — a mid-load failure (RI / cardinality
311    // violation) leaves Lance HEAD untouched. For Overwrite, the legacy
312    // inline-commit path is preserved (truncate+append doesn't fit the
313    // staged shape cleanly, and overwrite has no in-flight read-your-writes
314    // requirement).
315
316    let mut result = LoadResult::default();
317    let snapshot = db.snapshot_for_branch(branch).await?;
318    let use_staging = !matches!(mode, LoadMode::Overwrite);
319    let mut staging = MutationStaging::default();
320    let mut overwrite_updates: Vec<crate::db::SubTableUpdate> = Vec::new();
321    let mut overwrite_expected: HashMap<String, u64> = HashMap::new();
322    let pending_mode = match mode {
323        LoadMode::Merge => PendingMode::Merge,
324        // Append-mode loads accumulate as Append. Edge tables (no @key)
325        // and no-key node tables stay safe on the stage_append path. The
326        // Merge mode applies dedupe-by-id; Append assumes unique inputs.
327        LoadMode::Append => PendingMode::Append,
328        LoadMode::Overwrite => PendingMode::Append, // unused
329    };
330
331    // Phase 2a: build and validate every node batch up front. Cheap and
332    // synchronous — surfaces validation errors before any S3 traffic.
333    let mut prepared_nodes: Vec<(String, String, RecordBatch, usize)> =
334        Vec::with_capacity(node_rows.len());
335    for (type_name, rows) in &node_rows {
336        let node_type = &catalog.node_types[type_name];
337        let batch = build_node_batch(node_type, rows)?;
338        validate_value_constraints(&batch, node_type)?;
339        validate_enum_constraints(&batch, &node_type.properties, type_name)?;
340        let unique_props = unique_property_names_for_node(node_type);
341        if !unique_props.is_empty() {
342            enforce_unique_constraints_intra_batch(&batch, type_name, &unique_props)?;
343        }
344        let loaded_count = batch.num_rows();
345        let table_key = format!("node:{}", type_name);
346        let entry = snapshot
347            .entry(&table_key)
348            .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
349        if !use_staging {
350            overwrite_expected.insert(table_key.clone(), entry.table_version);
351        }
352        prepared_nodes.push((type_name.clone(), table_key, batch, loaded_count));
353    }
354
355    // Phase 2b: write every node type. Append/Merge → in-memory
356    // accumulator. Overwrite → concurrent inline-commit (legacy path).
357    if use_staging {
358        for (type_name, table_key, batch, loaded_count) in prepared_nodes {
359            let (ds, full_path, table_branch) = db
360                .open_for_mutation_on_branch(branch, &table_key)
361                .await?;
362            let expected_version = ds.version().version;
363            staging.ensure_path(
364                &table_key,
365                full_path,
366                table_branch,
367                expected_version,
368            );
369            let schema = batch.schema();
370            staging.append_batch(&table_key, schema, pending_mode, batch)?;
371            result.nodes_loaded.insert(type_name, loaded_count);
372        }
373    } else {
374        let node_write_results =
375            write_batches_concurrently(db, branch, mode, prepared_nodes).await?;
376        for (type_name, table_key, loaded_count, state, table_branch) in node_write_results {
377            overwrite_updates.push(crate::db::SubTableUpdate {
378                table_key,
379                table_version: state.version,
380                table_branch,
381                row_count: state.row_count,
382                version_metadata: state.version_metadata,
383            });
384            result.nodes_loaded.insert(type_name, loaded_count);
385        }
386    }
387
388    // Phase 2c: Validate edge referential integrity — every src/dst must
389    // reference an existing node ID in the appropriate type. For staged
390    // loads, the lookup unions snapshot-committed IDs with the in-memory
391    // pending batches (which carry the just-staged node inserts).
392    for (edge_name, rows) in &edge_rows {
393        let edge_type = &catalog.edge_types[edge_name];
394        let from_ids = if use_staging {
395            collect_node_ids_with_pending(
396                db,
397                branch,
398                &edge_type.from_type,
399                &staging,
400            )
401            .await?
402        } else {
403            collect_node_ids(
404                db,
405                branch,
406                &edge_type.from_type,
407                &node_rows,
408                &catalog,
409                &overwrite_updates,
410            )
411            .await?
412        };
413        let to_ids = if use_staging {
414            collect_node_ids_with_pending(
415                db,
416                branch,
417                &edge_type.to_type,
418                &staging,
419            )
420            .await?
421        } else {
422            collect_node_ids(
423                db,
424                branch,
425                &edge_type.to_type,
426                &node_rows,
427                &catalog,
428                &overwrite_updates,
429            )
430            .await?
431        };
432
433        for (i, (src, dst, _)) in rows.iter().enumerate() {
434            if !from_ids.contains(src.as_str()) {
435                return Err(OmniError::manifest(format!(
436                    "edge {} row {}: src '{}' not found in {}",
437                    edge_name,
438                    i + 1,
439                    src,
440                    edge_type.from_type
441                )));
442            }
443            if !to_ids.contains(dst.as_str()) {
444                return Err(OmniError::manifest(format!(
445                    "edge {} row {}: dst '{}' not found in {}",
446                    edge_name,
447                    i + 1,
448                    dst,
449                    edge_type.to_type
450                )));
451            }
452        }
453    }
454
455    // Phase 2d: build edge batches.
456    let mut prepared_edges: Vec<(String, String, RecordBatch, usize)> =
457        Vec::with_capacity(edge_rows.len());
458    for (edge_name, rows) in &edge_rows {
459        let edge_type = &catalog.edge_types[edge_name];
460        let batch = build_edge_batch(edge_type, rows)?;
461        validate_enum_constraints(&batch, &edge_type.properties, edge_name)?;
462        let unique_props = unique_property_names_for_edge(edge_type);
463        if !unique_props.is_empty() {
464            enforce_unique_constraints_intra_batch(&batch, edge_name, &unique_props)?;
465        }
466        let loaded_count = batch.num_rows();
467        let table_key = format!("edge:{}", edge_name);
468        let entry = snapshot
469            .entry(&table_key)
470            .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
471        if !use_staging {
472            overwrite_expected.insert(table_key.clone(), entry.table_version);
473        }
474        prepared_edges.push((edge_name.clone(), table_key, batch, loaded_count));
475    }
476
477    // Phase 2e: write every edge type. Same dispatch as Phase 2b.
478    if use_staging {
479        for (edge_name, table_key, batch, loaded_count) in prepared_edges {
480            let (ds, full_path, table_branch) = db
481                .open_for_mutation_on_branch(branch, &table_key)
482                .await?;
483            let expected_version = ds.version().version;
484            staging.ensure_path(
485                &table_key,
486                full_path,
487                table_branch,
488                expected_version,
489            );
490            let schema = batch.schema();
491            staging.append_batch(&table_key, schema, pending_mode, batch)?;
492            result.edges_loaded.insert(edge_name, loaded_count);
493        }
494    } else {
495        let edge_write_results =
496            write_batches_concurrently(db, branch, mode, prepared_edges).await?;
497        for (edge_name, table_key, loaded_count, state, table_branch) in edge_write_results {
498            overwrite_updates.push(crate::db::SubTableUpdate {
499                table_key,
500                table_version: state.version,
501                table_branch,
502                row_count: state.row_count,
503                version_metadata: state.version_metadata,
504            });
505            result.edges_loaded.insert(edge_name, loaded_count);
506        }
507    }
508
509    // Phase 3: Validate edge cardinality constraints (before commit —
510    // invalid data must not be committed). Staged path scans committed
511    // edges via Lance + iterates pending edges in-memory. Overwrite path
512    // opens the just-written version (legacy behavior).
513    for (edge_name, _) in &edge_rows {
514        let edge_type = &catalog.edge_types[edge_name];
515        let table_key = format!("edge:{}", edge_name);
516        if use_staging {
517            validate_edge_cardinality_with_pending_loader(
518                db,
519                branch,
520                edge_type,
521                &table_key,
522                &staging,
523                mode,
524            )
525            .await?;
526        } else if let Some(update) = overwrite_updates.iter().find(|u| u.table_key == table_key) {
527            validate_edge_cardinality(
528                db,
529                branch,
530                edge_name,
531                update.table_version,
532                update.table_branch.as_deref(),
533            )
534            .await?;
535        }
536    }
537
538    // Phase 4: Atomic manifest commit with publisher-level OCC.
539    if use_staging {
540        let (updates, expected_versions) = staging.finalize(db, branch).await?;
541        db.commit_updates_on_branch_with_expected(branch, &updates, &expected_versions)
542            .await?;
543    } else {
544        db.commit_updates_on_branch_with_expected(
545            branch,
546            &overwrite_updates,
547            &overwrite_expected,
548        )
549        .await?;
550    }
551
552    Ok(result)
553}
554
555fn build_node_batch(node_type: &NodeType, rows: &[JsonValue]) -> Result<RecordBatch> {
556    let schema = node_type.arrow_schema.clone();
557
558    // Build id column: explicit id, @key value, or generated ULID.
559    let ids: Vec<String> = rows
560        .iter()
561        .map(|row| {
562            let explicit_id = row.get("id").and_then(|v| v.as_str()).map(str::to_string);
563            if let Some(key_prop) = node_type.key_property() {
564                let key_value = row
565                    .get(key_prop)
566                    .and_then(|v| v.as_str())
567                    .map(|s| s.to_string())
568                    .ok_or_else(|| {
569                        OmniError::manifest(format!(
570                            "node {} missing @key property '{}'",
571                            node_type.name, key_prop
572                        ))
573                    })?;
574                if let Some(explicit_id) = explicit_id {
575                    if explicit_id != key_value {
576                        return Err(OmniError::manifest(format!(
577                            "node {} has explicit id '{}' that does not match @key property '{}' value '{}'",
578                            node_type.name, explicit_id, key_prop, key_value
579                        )));
580                    }
581                }
582                Ok(key_value)
583            } else if let Some(explicit_id) = explicit_id {
584                Ok(explicit_id)
585            } else {
586                Ok(generate_id())
587            }
588        })
589        .collect::<Result<Vec<_>>>()?;
590
591    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
592    columns.push(Arc::new(StringArray::from(ids)));
593
594    // Build property columns (skip "id" field at index 0)
595    for field in schema.fields().iter().skip(1) {
596        if node_type.blob_properties.contains(field.name()) {
597            let col = build_blob_column(field.name(), field.is_nullable(), rows)?;
598            columns.push(col);
599        } else {
600            let col =
601                build_column_from_json(field.name(), field.data_type(), field.is_nullable(), rows)?;
602            columns.push(col);
603        }
604    }
605
606    RecordBatch::try_new(schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
607}
608
609fn build_edge_batch(
610    edge_type: &omnigraph_compiler::catalog::EdgeType,
611    rows: &[(String, String, JsonValue)],
612) -> Result<RecordBatch> {
613    let schema = edge_type.arrow_schema.clone();
614
615    let ids: Vec<String> = rows
616        .iter()
617        .map(|(_, _, data)| {
618            data.get("id")
619                .and_then(|v| v.as_str())
620                .map(str::to_string)
621                .unwrap_or_else(generate_id)
622        })
623        .collect();
624    let srcs: Vec<&str> = rows.iter().map(|(from, _, _)| from.as_str()).collect();
625    let dsts: Vec<&str> = rows.iter().map(|(_, to, _)| to.as_str()).collect();
626
627    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
628    columns.push(Arc::new(StringArray::from(ids)));
629    columns.push(Arc::new(StringArray::from(srcs)));
630    columns.push(Arc::new(StringArray::from(dsts)));
631
632    // Build edge property columns (skip id, src, dst at indices 0-2)
633    let data_values: Vec<JsonValue> = rows.iter().map(|(_, _, data)| data.clone()).collect();
634    for field in schema.fields().iter().skip(3) {
635        if edge_type.blob_properties.contains(field.name()) {
636            let col = build_blob_column(field.name(), field.is_nullable(), &data_values)?;
637            columns.push(col);
638        } else {
639            let col = build_column_from_json(
640                field.name(),
641                field.data_type(),
642                field.is_nullable(),
643                &data_values,
644            )?;
645            columns.push(col);
646        }
647    }
648
649    RecordBatch::try_new(schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
650}
651
652/// Append a blob value (URI or base64 bytes) to a BlobArrayBuilder.
653pub(crate) fn append_blob_value(builder: &mut BlobArrayBuilder, value: &str) -> Result<()> {
654    if let Some(encoded) = value.strip_prefix("base64:") {
655        let bytes = base64::engine::general_purpose::STANDARD
656            .decode(encoded)
657            .map_err(|e| OmniError::manifest(format!("invalid base64 blob data: {}", e)))?;
658        builder
659            .push_bytes(bytes)
660            .map_err(|e| OmniError::Lance(e.to_string()))
661    } else {
662        // Treat as URI (file://, s3://, gs://, or any other scheme)
663        builder
664            .push_uri(value)
665            .map_err(|e| OmniError::Lance(e.to_string()))
666    }
667}
668
669/// Build a blob column from JSON values using Lance BlobArrayBuilder.
670fn build_blob_column(name: &str, nullable: bool, rows: &[JsonValue]) -> Result<ArrayRef> {
671    let mut builder = BlobArrayBuilder::new(rows.len());
672    for row in rows {
673        match row.get(name) {
674            Some(JsonValue::String(s)) => {
675                append_blob_value(&mut builder, s)?;
676            }
677            Some(JsonValue::Null) | None if nullable => {
678                builder
679                    .push_null()
680                    .map_err(|e| OmniError::Lance(e.to_string()))?;
681            }
682            Some(JsonValue::Null) | None => {
683                return Err(OmniError::manifest(format!(
684                    "non-nullable blob property '{}' has null values",
685                    name
686                )));
687            }
688            _ => {
689                return Err(OmniError::manifest(format!(
690                    "blob property '{}' must be a URI string or base64: prefixed data",
691                    name
692                )));
693            }
694        }
695    }
696    builder
697        .finish()
698        .map_err(|e| OmniError::Lance(e.to_string()))
699}
700
701fn build_column_from_json(
702    name: &str,
703    data_type: &DataType,
704    nullable: bool,
705    rows: &[JsonValue],
706) -> Result<ArrayRef> {
707    let array: ArrayRef = match data_type {
708        DataType::Utf8 => {
709            let values: Vec<Option<String>> = rows
710                .iter()
711                .map(|row| {
712                    row.get(name)
713                        .and_then(|v| v.as_str())
714                        .map(|s| s.to_string())
715                })
716                .collect();
717            Arc::new(StringArray::from(values))
718        }
719        DataType::Int32 => {
720            let values: Vec<Option<i32>> = rows
721                .iter()
722                .map(|row| row.get(name).and_then(|v| v.as_i64()).map(|v| v as i32))
723                .collect();
724            Arc::new(Int32Array::from(values))
725        }
726        DataType::Int64 => {
727            let values: Vec<Option<i64>> = rows
728                .iter()
729                .map(|row| row.get(name).and_then(|v| v.as_i64()))
730                .collect();
731            Arc::new(Int64Array::from(values))
732        }
733        DataType::UInt32 => {
734            let values: Vec<Option<u32>> = rows
735                .iter()
736                .map(|row| row.get(name).and_then(|v| v.as_u64()).map(|v| v as u32))
737                .collect();
738            Arc::new(UInt32Array::from(values))
739        }
740        DataType::UInt64 => {
741            let values: Vec<Option<u64>> = rows
742                .iter()
743                .map(|row| row.get(name).and_then(|v| v.as_u64()))
744                .collect();
745            Arc::new(UInt64Array::from(values))
746        }
747        DataType::Float32 => {
748            let values: Vec<Option<f32>> = rows
749                .iter()
750                .map(|row| row.get(name).and_then(|v| v.as_f64()).map(|v| v as f32))
751                .collect();
752            Arc::new(Float32Array::from(values))
753        }
754        DataType::Float64 => {
755            let values: Vec<Option<f64>> = rows
756                .iter()
757                .map(|row| row.get(name).and_then(|v| v.as_f64()))
758                .collect();
759            Arc::new(Float64Array::from(values))
760        }
761        DataType::Boolean => {
762            let values: Vec<Option<bool>> = rows
763                .iter()
764                .map(|row| row.get(name).and_then(|v| v.as_bool()))
765                .collect();
766            Arc::new(BooleanArray::from(values))
767        }
768        DataType::Date32 => {
769            let mut values = Vec::with_capacity(rows.len());
770            for row in rows {
771                values.push(parse_date32_json_value(
772                    row.get(name).unwrap_or(&JsonValue::Null),
773                )?);
774            }
775            Arc::new(Date32Array::from(values))
776        }
777        DataType::Date64 => {
778            let mut values = Vec::with_capacity(rows.len());
779            for row in rows {
780                values.push(parse_date64_json_value(
781                    row.get(name).unwrap_or(&JsonValue::Null),
782                )?);
783            }
784            Arc::new(Date64Array::from(values))
785        }
786        DataType::List(field) => {
787            let mut builder = ListBuilder::with_capacity(
788                make_list_value_builder(field.data_type(), rows.len())?,
789                rows.len(),
790            )
791            .with_field(field.clone());
792            for row in rows {
793                let value = row.get(name).unwrap_or(&JsonValue::Null);
794                if value.is_null() {
795                    builder.append(false);
796                    continue;
797                }
798                let items = value.as_array().ok_or_else(|| {
799                    OmniError::manifest(format!(
800                        "list property '{}' expects a JSON array, got {}",
801                        name, value
802                    ))
803                })?;
804                for item in items {
805                    append_json_list_item(builder.values(), field.data_type(), item)?;
806                }
807                builder.append(true);
808            }
809            Arc::new(builder.finish())
810        }
811        DataType::FixedSizeList(child_field, dim) => {
812            // Vector type: parse JSON array of floats into FixedSizeList<Float32>
813            let dim = *dim;
814            let mut builder = FixedSizeListBuilder::with_capacity(
815                Float32Builder::with_capacity(rows.len() * dim as usize),
816                dim,
817                rows.len(),
818            )
819            .with_field(child_field.clone());
820            for row in rows {
821                if let Some(arr) = row.get(name).and_then(|v| v.as_array()) {
822                    if arr.len() != dim as usize {
823                        return Err(OmniError::manifest(format!(
824                            "vector property '{}' expects {} dimensions, got {}",
825                            name,
826                            dim,
827                            arr.len()
828                        )));
829                    }
830                    for val in arr {
831                        builder
832                            .values()
833                            .append_value(val.as_f64().unwrap_or(0.0) as f32);
834                    }
835                    builder.append(true);
836                } else if nullable {
837                    for _ in 0..dim as usize {
838                        builder.values().append_null();
839                    }
840                    builder.append(false);
841                } else {
842                    return Err(OmniError::manifest(format!(
843                        "non-nullable vector property '{}' has null values",
844                        name
845                    )));
846                }
847            }
848            Arc::new(builder.finish())
849        }
850        _ => {
851            // Unsupported type: fill with nulls
852            let values: Vec<Option<&str>> = vec![None; rows.len()];
853            Arc::new(StringArray::from(values))
854        }
855    };
856
857    if !nullable && array.null_count() > 0 {
858        return Err(OmniError::manifest(format!(
859            "non-nullable property '{}' has null or invalid values",
860            name
861        )));
862    }
863
864    Ok(array)
865}
866
867fn make_list_value_builder(data_type: &DataType, capacity: usize) -> Result<Box<dyn ArrayBuilder>> {
868    Ok(match data_type {
869        DataType::Utf8 => Box::new(StringBuilder::with_capacity(capacity, capacity * 8)),
870        DataType::Boolean => Box::new(BooleanBuilder::with_capacity(capacity)),
871        DataType::Int32 => Box::new(Int32Builder::with_capacity(capacity)),
872        DataType::Int64 => Box::new(Int64Builder::with_capacity(capacity)),
873        DataType::UInt32 => Box::new(UInt32Builder::with_capacity(capacity)),
874        DataType::UInt64 => Box::new(UInt64Builder::with_capacity(capacity)),
875        DataType::Float32 => Box::new(Float32Builder::with_capacity(capacity)),
876        DataType::Float64 => Box::new(Float64Builder::with_capacity(capacity)),
877        DataType::Date32 => Box::new(Date32Builder::with_capacity(capacity)),
878        DataType::Date64 => Box::new(Date64Builder::with_capacity(capacity)),
879        other => {
880            return Err(OmniError::manifest(format!(
881                "unsupported list element data type {:?}",
882                other
883            )));
884        }
885    })
886}
887
888fn append_json_list_item(
889    builder: &mut Box<dyn ArrayBuilder>,
890    data_type: &DataType,
891    value: &JsonValue,
892) -> Result<()> {
893    match data_type {
894        DataType::Utf8 => {
895            let builder = builder
896                .as_any_mut()
897                .downcast_mut::<StringBuilder>()
898                .ok_or_else(|| OmniError::manifest("list Utf8 builder downcast failed"))?;
899            if let Some(value) = value.as_str() {
900                builder.append_value(value);
901            } else {
902                builder.append_null();
903            }
904        }
905        DataType::Boolean => {
906            let builder = builder
907                .as_any_mut()
908                .downcast_mut::<BooleanBuilder>()
909                .ok_or_else(|| OmniError::manifest("list Boolean builder downcast failed"))?;
910            if let Some(value) = value.as_bool() {
911                builder.append_value(value);
912            } else {
913                builder.append_null();
914            }
915        }
916        DataType::Int32 => {
917            let builder = builder
918                .as_any_mut()
919                .downcast_mut::<Int32Builder>()
920                .ok_or_else(|| OmniError::manifest("list Int32 builder downcast failed"))?;
921            if let Some(value) = value.as_i64() {
922                let value = i32::try_from(value).map_err(|_| {
923                    OmniError::manifest(format!("list value {} exceeds Int32 range", value))
924                })?;
925                builder.append_value(value);
926            } else {
927                builder.append_null();
928            }
929        }
930        DataType::Int64 => {
931            let builder = builder
932                .as_any_mut()
933                .downcast_mut::<Int64Builder>()
934                .ok_or_else(|| OmniError::manifest("list Int64 builder downcast failed"))?;
935            if let Some(value) = value.as_i64() {
936                builder.append_value(value);
937            } else {
938                builder.append_null();
939            }
940        }
941        DataType::UInt32 => {
942            let builder = builder
943                .as_any_mut()
944                .downcast_mut::<UInt32Builder>()
945                .ok_or_else(|| OmniError::manifest("list UInt32 builder downcast failed"))?;
946            if let Some(value) = value.as_u64() {
947                let value = u32::try_from(value).map_err(|_| {
948                    OmniError::manifest(format!("list value {} exceeds UInt32 range", value))
949                })?;
950                builder.append_value(value);
951            } else {
952                builder.append_null();
953            }
954        }
955        DataType::UInt64 => {
956            let builder = builder
957                .as_any_mut()
958                .downcast_mut::<UInt64Builder>()
959                .ok_or_else(|| OmniError::manifest("list UInt64 builder downcast failed"))?;
960            if let Some(value) = value.as_u64() {
961                builder.append_value(value);
962            } else {
963                builder.append_null();
964            }
965        }
966        DataType::Float32 => {
967            let builder = builder
968                .as_any_mut()
969                .downcast_mut::<Float32Builder>()
970                .ok_or_else(|| OmniError::manifest("list Float32 builder downcast failed"))?;
971            if let Some(value) = value.as_f64() {
972                builder.append_value(value as f32);
973            } else {
974                builder.append_null();
975            }
976        }
977        DataType::Float64 => {
978            let builder = builder
979                .as_any_mut()
980                .downcast_mut::<Float64Builder>()
981                .ok_or_else(|| OmniError::manifest("list Float64 builder downcast failed"))?;
982            if let Some(value) = value.as_f64() {
983                builder.append_value(value);
984            } else {
985                builder.append_null();
986            }
987        }
988        DataType::Date32 => {
989            let builder = builder
990                .as_any_mut()
991                .downcast_mut::<Date32Builder>()
992                .ok_or_else(|| OmniError::manifest("list Date32 builder downcast failed"))?;
993            if let Some(value) = parse_date32_json_value(value)? {
994                builder.append_value(value);
995            } else {
996                builder.append_null();
997            }
998        }
999        DataType::Date64 => {
1000            let builder = builder
1001                .as_any_mut()
1002                .downcast_mut::<Date64Builder>()
1003                .ok_or_else(|| OmniError::manifest("list Date64 builder downcast failed"))?;
1004            if let Some(value) = parse_date64_json_value(value)? {
1005                builder.append_value(value);
1006            } else {
1007                builder.append_null();
1008            }
1009        }
1010        other => {
1011            return Err(OmniError::manifest(format!(
1012                "unsupported list element data type {:?}",
1013                other
1014            )));
1015        }
1016    }
1017
1018    Ok(())
1019}
1020
1021fn parse_date32_json_value(value: &JsonValue) -> Result<Option<i32>> {
1022    if value.is_null() {
1023        return Ok(None);
1024    }
1025    if let Some(days) = value.as_i64() {
1026        let days = i32::try_from(days)
1027            .map_err(|_| OmniError::manifest(format!("Date value out of range: {}", days)))?;
1028        return Ok(Some(days));
1029    }
1030    if let Some(days) = value.as_u64() {
1031        let days = i32::try_from(days)
1032            .map_err(|_| OmniError::manifest(format!("Date value out of range: {}", days)))?;
1033        return Ok(Some(days));
1034    }
1035    if let Some(value) = value.as_str() {
1036        return Ok(Some(parse_date32_literal(value)?));
1037    }
1038    Ok(None)
1039}
1040
1041fn parse_date64_json_value(value: &JsonValue) -> Result<Option<i64>> {
1042    if value.is_null() {
1043        return Ok(None);
1044    }
1045    if let Some(ms) = value.as_i64() {
1046        return Ok(Some(ms));
1047    }
1048    if let Some(ms) = value.as_u64() {
1049        let ms = i64::try_from(ms)
1050            .map_err(|_| OmniError::manifest(format!("DateTime value out of range: {}", ms)))?;
1051        return Ok(Some(ms));
1052    }
1053    if let Some(value) = value.as_str() {
1054        return Ok(Some(parse_date64_literal(value)?));
1055    }
1056    Ok(None)
1057}
1058
/// How many per-type Lance writes to run concurrently during a load.
1061///
1062/// Each write is an independent S3 manifest + fragment write against a
1063/// different table. Ops within a single table must still be serial (Lance
1064/// OCC on the manifest), but cross-table writes have no shared state.
1065///
1066/// 8 is a conservative default — enough to overlap S3 round-trip latency
1067/// across the typical 10-30 table schemas without flooding the runtime.
1068/// Override via `OMNIGRAPH_LOAD_CONCURRENCY` for benchmarking.
1069const DEFAULT_LOAD_WRITE_CONCURRENCY: usize = 8;
1070
1071fn load_write_concurrency() -> usize {
1072    std::env::var("OMNIGRAPH_LOAD_CONCURRENCY")
1073        .ok()
1074        .and_then(|v| v.parse::<usize>().ok())
1075        .filter(|v| *v > 0)
1076        .unwrap_or(DEFAULT_LOAD_WRITE_CONCURRENCY)
1077}
1078
1079/// Write a set of prepared `(type_name, table_key, batch, row_count)` tuples
1080/// concurrently. Returns results in original iteration order so callers can
1081/// zip them back to per-type metadata.
1082async fn write_batches_concurrently(
1083    db: &Omnigraph,
1084    branch: Option<&str>,
1085    mode: LoadMode,
1086    prepared: Vec<(String, String, RecordBatch, usize)>,
1087) -> Result<
1088    Vec<(
1089        String,
1090        String,
1091        usize,
1092        crate::table_store::TableState,
1093        Option<String>,
1094    )>,
1095> {
1096    use futures::stream::StreamExt;
1097
1098    if prepared.is_empty() {
1099        return Ok(Vec::new());
1100    }
1101
1102    let concurrency = load_write_concurrency().min(prepared.len()).max(1);
1103
1104    futures::stream::iter(prepared.into_iter().map(
1105        |(type_name, table_key, batch, loaded_count)| async move {
1106            let (state, table_branch) =
1107                write_batch_to_dataset(db, branch, &table_key, batch, mode).await?;
1108            Ok::<_, OmniError>((type_name, table_key, loaded_count, state, table_branch))
1109        },
1110    ))
1111    .buffered(concurrency)
1112    .collect::<Vec<Result<_>>>()
1113    .await
1114    .into_iter()
1115    .collect()
1116}
1117
1118async fn write_batch_to_dataset(
1119    db: &Omnigraph,
1120    branch: Option<&str>,
1121    table_key: &str,
1122    batch: RecordBatch,
1123    mode: LoadMode,
1124) -> Result<(crate::table_store::TableState, Option<String>)> {
1125    let (mut ds, full_path, table_branch) =
1126        db.open_for_mutation_on_branch(branch, table_key).await?;
1127    let table_store = db.table_store();
1128
1129    match mode {
1130        LoadMode::Overwrite => {
1131            let state = table_store
1132                .overwrite_batch(&full_path, &mut ds, batch)
1133                .await?;
1134            Ok((state, table_branch))
1135        }
1136        LoadMode::Append => {
1137            let state = table_store.append_batch(&full_path, &mut ds, batch).await?;
1138            Ok((state, table_branch))
1139        }
1140        LoadMode::Merge => {
1141            let state = table_store
1142                .merge_insert_batch(
1143                    &full_path,
1144                    ds,
1145                    batch,
1146                    vec!["id".to_string()],
1147                    lance::dataset::WhenMatched::UpdateAll,
1148                    lance::dataset::WhenNotMatched::InsertAll,
1149                )
1150                .await?;
1151            Ok((state, table_branch))
1152        }
1153    }
1154}
1155
1156fn generate_id() -> String {
1157    ulid::Ulid::new().to_string()
1158}
1159
1160pub(crate) fn parse_date32_literal(value: &str) -> Result<i32> {
1161    let raw: Arc<dyn Array> = Arc::new(StringArray::from(vec![Some(value)]));
1162    let casted = arrow_cast::cast::cast(raw.as_ref(), &DataType::Date32)
1163        .map_err(|e| OmniError::manifest(format!("invalid Date literal '{}': {}", value, e)))?;
1164    let out = casted
1165        .as_any()
1166        .downcast_ref::<Date32Array>()
1167        .ok_or_else(|| OmniError::manifest("Date32 cast produced unexpected array"))?;
1168    if out.is_null(0) {
1169        return Err(OmniError::manifest(format!(
1170            "invalid Date literal '{}'",
1171            value
1172        )));
1173    }
1174    Ok(out.value(0))
1175}
1176
1177pub(crate) fn parse_date64_literal(value: &str) -> Result<i64> {
1178    let raw: Arc<dyn Array> = Arc::new(StringArray::from(vec![Some(value)]));
1179    let casted = arrow_cast::cast::cast(raw.as_ref(), &DataType::Date64)
1180        .map_err(|e| OmniError::manifest(format!("invalid DateTime literal '{}': {}", value, e)))?;
1181    let out = casted
1182        .as_any()
1183        .downcast_ref::<Date64Array>()
1184        .ok_or_else(|| OmniError::manifest("Date64 cast produced unexpected array"))?;
1185    if out.is_null(0) {
1186        return Err(OmniError::manifest(format!(
1187            "invalid DateTime literal '{}'",
1188            value
1189        )));
1190    }
1191    Ok(out.value(0))
1192}
1193
1194// ─── Value constraint validation ─────────────────────────────────────────────
1195
1196pub(crate) fn validate_value_constraints(
1197    batch: &RecordBatch,
1198    node_type: &omnigraph_compiler::catalog::NodeType,
1199) -> Result<()> {
1200    use arrow_array::Array;
1201
1202    // Range constraints
1203    for rc in &node_type.range_constraints {
1204        let Some(col) = batch.column_by_name(&rc.property) else {
1205            continue;
1206        };
1207        for row in 0..batch.num_rows() {
1208            if col.is_null(row) {
1209                continue;
1210            }
1211            let value = extract_numeric_value(col, row);
1212            if let Some(val) = value {
1213                if val.is_nan() {
1214                    return Err(OmniError::manifest(format!(
1215                        "@range violation on {}.{}: value is NaN",
1216                        node_type.name, rc.property
1217                    )));
1218                }
1219                if let Some(ref min) = rc.min {
1220                    let min_f = literal_value_to_f64(min);
1221                    if val < min_f {
1222                        return Err(OmniError::manifest(format!(
1223                            "@range violation on {}.{}: value {} < min {}",
1224                            node_type.name, rc.property, val, min_f
1225                        )));
1226                    }
1227                }
1228                if let Some(ref max) = rc.max {
1229                    let max_f = literal_value_to_f64(max);
1230                    if val > max_f {
1231                        return Err(OmniError::manifest(format!(
1232                            "@range violation on {}.{}: value {} > max {}",
1233                            node_type.name, rc.property, val, max_f
1234                        )));
1235                    }
1236                }
1237            }
1238        }
1239    }
1240
1241    // Check constraints (regex)
1242    for cc in &node_type.check_constraints {
1243        let re = regex::Regex::new(&cc.pattern).map_err(|e| {
1244            OmniError::manifest(format!(
1245                "@check on {}.{} has invalid regex '{}': {}",
1246                node_type.name, cc.property, cc.pattern, e
1247            ))
1248        })?;
1249        let Some(col) = batch.column_by_name(&cc.property) else {
1250            continue;
1251        };
1252        let str_col = col.as_any().downcast_ref::<StringArray>();
1253        if let Some(str_col) = str_col {
1254            for row in 0..str_col.len() {
1255                if str_col.is_null(row) {
1256                    continue;
1257                }
1258                let val = str_col.value(row);
1259                if !re.is_match(val) {
1260                    return Err(OmniError::manifest(format!(
1261                        "@check violation on {}.{}: value '{}' does not match pattern '{}'",
1262                        node_type.name, cc.property, val, cc.pattern
1263                    )));
1264                }
1265            }
1266        }
1267    }
1268
1269    Ok(())
1270}
1271
1272/// Validate that every enum-typed property in `properties` only contains values
1273/// from its declared enum value set. Operates on a single `RecordBatch` so it
1274/// can be called from any write path that already holds a batch.
1275///
1276/// Scalar string enums are checked directly. List-of-enum properties are
1277/// checked element-by-element across the underlying string values.
1278pub(crate) fn validate_enum_constraints(
1279    batch: &RecordBatch,
1280    properties: &HashMap<String, omnigraph_compiler::types::PropType>,
1281    type_name: &str,
1282) -> Result<()> {
1283    use arrow_array::{Array, ListArray};
1284
1285    for (prop_name, prop_type) in properties {
1286        let Some(allowed) = prop_type.enum_values.as_ref() else {
1287            continue;
1288        };
1289        let Some(col) = batch.column_by_name(prop_name) else {
1290            continue;
1291        };
1292        if prop_type.list {
1293            let Some(list_col) = col.as_any().downcast_ref::<ListArray>() else {
1294                continue;
1295            };
1296            for row in 0..list_col.len() {
1297                if list_col.is_null(row) {
1298                    continue;
1299                }
1300                let item_arr = list_col.value(row);
1301                let Some(str_arr) = item_arr.as_any().downcast_ref::<StringArray>() else {
1302                    continue;
1303                };
1304                for i in 0..str_arr.len() {
1305                    if str_arr.is_null(i) {
1306                        continue;
1307                    }
1308                    let val = str_arr.value(i);
1309                    if !allowed.iter().any(|a| a.as_str() == val) {
1310                        return Err(OmniError::manifest(format!(
1311                            "invalid enum value '{}' for {}.{} (expected: {})",
1312                            val,
1313                            type_name,
1314                            prop_name,
1315                            allowed.join(", ")
1316                        )));
1317                    }
1318                }
1319            }
1320        } else if let Some(str_col) = col.as_any().downcast_ref::<StringArray>() {
1321            for row in 0..str_col.len() {
1322                if str_col.is_null(row) {
1323                    continue;
1324                }
1325                let val = str_col.value(row);
1326                if !allowed.iter().any(|a| a.as_str() == val) {
1327                    return Err(OmniError::manifest(format!(
1328                        "invalid enum value '{}' for {}.{} (expected: {})",
1329                        val,
1330                        type_name,
1331                        prop_name,
1332                        allowed.join(", ")
1333                    )));
1334                }
1335            }
1336        }
1337    }
1338    Ok(())
1339}
1340
1341/// Detect duplicate values within a single `RecordBatch` for any of the named
1342/// `unique_properties`. Returns an error on the first duplicate found.
1343///
1344/// Note: this only catches duplicates *within* the batch. Cross-batch
1345/// uniqueness against already-committed rows is not enforced here — that
1346/// requires a dataset scan and is tracked separately.
1347pub(crate) fn enforce_unique_constraints_intra_batch(
1348    batch: &RecordBatch,
1349    type_name: &str,
1350    unique_properties: &[String],
1351) -> Result<()> {
1352    for property in unique_properties {
1353        let Some(col_idx) = batch.schema().index_of(property).ok() else {
1354            continue;
1355        };
1356        let arr = batch.column(col_idx);
1357        let mut seen: HashMap<String, usize> = HashMap::new();
1358        for row in 0..batch.num_rows() {
1359            let Some(value) = scalar_to_string(arr, row) else {
1360                continue;
1361            };
1362            if let Some(prev_row) = seen.insert(value.clone(), row) {
1363                return Err(OmniError::manifest(format!(
1364                    "@unique violation on {}.{}: value '{}' appears in rows {} and {}",
1365                    type_name, property, value, prev_row, row
1366                )));
1367            }
1368        }
1369    }
1370    Ok(())
1371}
1372
1373/// Reduce a single Arrow scalar at (`array`, `row`) to a `String` for
1374/// uniqueness comparison. Returns `None` for null values (nulls are exempt
1375/// from uniqueness in standard SQL semantics).
1376fn scalar_to_string(array: &ArrayRef, row: usize) -> Option<String> {
1377    use arrow_array::Array;
1378    if array.is_null(row) {
1379        return None;
1380    }
1381    if let Some(a) = array.as_any().downcast_ref::<StringArray>() {
1382        return Some(a.value(row).to_string());
1383    }
1384    if let Some(a) = array.as_any().downcast_ref::<Int32Array>() {
1385        return Some(a.value(row).to_string());
1386    }
1387    if let Some(a) = array.as_any().downcast_ref::<Int64Array>() {
1388        return Some(a.value(row).to_string());
1389    }
1390    if let Some(a) = array.as_any().downcast_ref::<UInt32Array>() {
1391        return Some(a.value(row).to_string());
1392    }
1393    if let Some(a) = array.as_any().downcast_ref::<UInt64Array>() {
1394        return Some(a.value(row).to_string());
1395    }
1396    if let Some(a) = array.as_any().downcast_ref::<Float32Array>() {
1397        return Some(a.value(row).to_string());
1398    }
1399    if let Some(a) = array.as_any().downcast_ref::<Float64Array>() {
1400        return Some(a.value(row).to_string());
1401    }
1402    if let Some(a) = array.as_any().downcast_ref::<BooleanArray>() {
1403        return Some(a.value(row).to_string());
1404    }
1405    if let Some(a) = array.as_any().downcast_ref::<Date32Array>() {
1406        return Some(a.value(row).to_string());
1407    }
1408    if let Some(a) = array.as_any().downcast_ref::<Date64Array>() {
1409        return Some(a.value(row).to_string());
1410    }
1411    None
1412}
1413
1414/// Build the flat list of property names that must be checked for uniqueness
1415/// on a node type. Includes both `@unique` properties (from
1416/// `NodeType.unique_constraints`) and the `@key` (which implies uniqueness).
1417pub(crate) fn unique_property_names_for_node(
1418    node_type: &omnigraph_compiler::catalog::NodeType,
1419) -> Vec<String> {
1420    let mut props: Vec<String> = node_type
1421        .unique_constraints
1422        .iter()
1423        .flatten()
1424        .cloned()
1425        .collect();
1426    if let Some(key) = &node_type.key {
1427        props.extend(key.iter().cloned());
1428    }
1429    props.sort();
1430    props.dedup();
1431    props
1432}
1433
1434/// Same as [`unique_property_names_for_node`] but for an edge type.
1435pub(crate) fn unique_property_names_for_edge(
1436    edge_type: &omnigraph_compiler::catalog::EdgeType,
1437) -> Vec<String> {
1438    let mut props: Vec<String> = edge_type
1439        .unique_constraints
1440        .iter()
1441        .flatten()
1442        .cloned()
1443        .collect();
1444    props.sort();
1445    props.dedup();
1446    props
1447}
1448
1449fn extract_numeric_value(col: &ArrayRef, row: usize) -> Option<f64> {
1450    use arrow_array::{
1451        Array, Float32Array, Float64Array, Int32Array, Int64Array, UInt32Array, UInt64Array,
1452    };
1453    if let Some(a) = col.as_any().downcast_ref::<Int32Array>() {
1454        return Some(a.value(row) as f64);
1455    }
1456    if let Some(a) = col.as_any().downcast_ref::<Int64Array>() {
1457        return Some(a.value(row) as f64);
1458    }
1459    if let Some(a) = col.as_any().downcast_ref::<UInt32Array>() {
1460        return Some(a.value(row) as f64);
1461    }
1462    if let Some(a) = col.as_any().downcast_ref::<UInt64Array>() {
1463        return Some(a.value(row) as f64);
1464    }
1465    if let Some(a) = col.as_any().downcast_ref::<Float32Array>() {
1466        return Some(a.value(row) as f64);
1467    }
1468    if let Some(a) = col.as_any().downcast_ref::<Float64Array>() {
1469        return Some(a.value(row));
1470    }
1471    None
1472}
1473
1474fn literal_value_to_f64(v: &omnigraph_compiler::catalog::LiteralValue) -> f64 {
1475    use omnigraph_compiler::catalog::LiteralValue;
1476    match v {
1477        LiteralValue::Integer(n) => *n as f64,
1478        LiteralValue::Float(f) => *f,
1479    }
1480}
1481
1482// ─── Edge cardinality validation ─────────────────────────────────────────────
1483
1484pub(crate) async fn validate_edge_cardinality(
1485    db: &crate::db::Omnigraph,
1486    branch: Option<&str>,
1487    edge_name: &str,
1488    written_version: u64,
1489    written_branch: Option<&str>,
1490) -> Result<()> {
1491    use arrow_array::Array;
1492    let catalog = db.catalog();
1493    let edge_type = &catalog.edge_types[edge_name];
1494    if edge_type.cardinality.is_default() {
1495        return Ok(());
1496    }
1497
1498    // Open edge sub-table at the just-written version, not the snapshot's
1499    // (the snapshot still pins to the pre-write version).
1500    let snapshot = db.snapshot_for_branch(branch).await?;
1501    let table_key = format!("edge:{}", edge_name);
1502    let entry = snapshot
1503        .entry(&table_key)
1504        .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
1505    let ds = db
1506        .open_dataset_at_state(
1507            &entry.table_path,
1508            written_branch.or(entry.table_branch.as_deref()),
1509            written_version,
1510        )
1511        .await?;
1512
1513    // Scan src column, count per source
1514    let batches = db
1515        .table_store()
1516        .scan(&ds, Some(&["src"]), None, None)
1517        .await?;
1518
1519    let mut counts: HashMap<String, u32> = HashMap::new();
1520    for batch in &batches {
1521        let srcs = batch
1522            .column_by_name("src")
1523            .unwrap()
1524            .as_any()
1525            .downcast_ref::<StringArray>()
1526            .unwrap();
1527        for i in 0..srcs.len() {
1528            *counts.entry(srcs.value(i).to_string()).or_insert(0) += 1;
1529        }
1530    }
1531
1532    let card = &edge_type.cardinality;
1533    for (src, count) in &counts {
1534        if let Some(max) = card.max {
1535            if *count > max {
1536                return Err(OmniError::manifest(format!(
1537                    "@card violation on edge {}: source '{}' has {} edges (max {})",
1538                    edge_name, src, count, max
1539                )));
1540            }
1541        }
1542        if *count < card.min {
1543            return Err(OmniError::manifest(format!(
1544                "@card violation on edge {}: source '{}' has {} edges (min {})",
1545                edge_name, src, count, card.min
1546            )));
1547        }
1548    }
1549
1550    Ok(())
1551}
1552
1553/// Validate edge `@card` cardinality with in-memory pending edges visible.
1554///
1555/// Loader-level analog to `exec::mutation::validate_edge_cardinality_with_pending`:
1556/// opens the committed dataset at the pre-load snapshot version, then
1557/// delegates to the shared `count_src_per_edge` + `enforce_cardinality_bounds`
1558/// helpers in `exec::staging`. Used by Append/Merge loads (the Overwrite
1559/// path uses `validate_edge_cardinality` which opens the just-written
1560/// Lance version).
1561///
1562/// `mode` controls dedup behavior. `LoadMode::Merge` passes `Some("id")`
1563/// so committed edges that the load is *updating* (same edge id,
1564/// possibly changed `src`) are not double-counted. `LoadMode::Append`
1565/// passes `None` because each line generates a fresh ULID id that
1566/// never collides with committed.
1567async fn validate_edge_cardinality_with_pending_loader(
1568    db: &Omnigraph,
1569    branch: Option<&str>,
1570    edge_type: &omnigraph_compiler::catalog::EdgeType,
1571    table_key: &str,
1572    staging: &MutationStaging,
1573    mode: LoadMode,
1574) -> Result<()> {
1575    if edge_type.cardinality.is_default() {
1576        return Ok(());
1577    }
1578    let snapshot = db.snapshot_for_branch(branch).await?;
1579    let Some(entry) = snapshot.entry(table_key) else {
1580        // No manifest entry — table doesn't exist yet. Pending-only is
1581        // fine; the helper handles empty committed scans.
1582        return Ok(());
1583    };
1584    let ds = db
1585        .open_dataset_at_state(
1586            &entry.table_path,
1587            entry.table_branch.as_deref(),
1588            entry.table_version,
1589        )
1590        .await?;
1591    let dedupe_key = match mode {
1592        LoadMode::Merge => Some("id"),
1593        LoadMode::Append | LoadMode::Overwrite => None,
1594    };
1595    let counts =
1596        crate::exec::staging::count_src_per_edge(db, &ds, table_key, staging, dedupe_key)
1597            .await?;
1598    crate::exec::staging::enforce_cardinality_bounds(edge_type, &counts)
1599}
1600
1601/// Collect all valid node IDs for a given type, with in-memory pending
1602/// node inserts visible. Used by the staged loader's Phase 2c
1603/// referential-integrity validation.
1604///
1605/// Union of:
1606/// - IDs from the staged loader's pending batches (in-memory; just-staged
1607///   inserts of this type)
1608/// - IDs from the committed sub-table at the pre-load snapshot version
1609async fn collect_node_ids_with_pending(
1610    db: &Omnigraph,
1611    branch: Option<&str>,
1612    type_name: &str,
1613    staging: &MutationStaging,
1614) -> Result<HashSet<String>> {
1615    let mut ids = HashSet::new();
1616    let table_key = format!("node:{}", type_name);
1617
1618    // From staging.pending: walk the in-memory accumulator's id column.
1619    for batch in staging.pending_batches(&table_key) {
1620        if let Some(col) = batch.column_by_name("id") {
1621            if let Some(arr) = col.as_any().downcast_ref::<StringArray>() {
1622                for i in 0..arr.len() {
1623                    if arr.is_valid(i) {
1624                        ids.insert(arr.value(i).to_string());
1625                    }
1626                }
1627            }
1628        }
1629    }
1630
1631    // From the committed Lance sub-table at the pre-load snapshot version.
1632    let snapshot = db.snapshot_for_branch(branch).await?;
1633    let Some(entry) = snapshot.entry(&table_key) else {
1634        return Ok(ids);
1635    };
1636    let ds = db
1637        .open_dataset_at_state(
1638            &entry.table_path,
1639            entry.table_branch.as_deref(),
1640            entry.table_version,
1641        )
1642        .await?;
1643
1644    let batches = db
1645        .table_store()
1646        .scan(&ds, Some(&["id"]), None, None)
1647        .await?;
1648
1649    for batch in &batches {
1650        let id_col = batch
1651            .column_by_name("id")
1652            .ok_or_else(|| OmniError::Lance("missing 'id' column".into()))?
1653            .as_any()
1654            .downcast_ref::<StringArray>()
1655            .ok_or_else(|| OmniError::Lance("'id' column is not Utf8".into()))?;
1656        for i in 0..batch.num_rows() {
1657            // Defensive: `id` is the @key column on every node type and
1658            // is non-nullable by schema, but a committed-row corruption
1659            // (or future schema change) could surface a NULL. Skip
1660            // rather than insert "" — pending-side does the same.
1661            if id_col.is_valid(i) {
1662                ids.insert(id_col.value(i).to_string());
1663            }
1664        }
1665    }
1666
1667    Ok(ids)
1668}
1669
1670/// Collect all valid node IDs for a given type. Union of:
1671/// - IDs from the just-loaded batch (in memory, from node_rows)
1672/// - IDs from the sub-table at the just-written version (if it was updated)
1673/// - IDs from the sub-table at the snapshot-pinned version (if it was not updated)
1674async fn collect_node_ids(
1675    db: &Omnigraph,
1676    branch: Option<&str>,
1677    type_name: &str,
1678    node_rows: &HashMap<String, Vec<JsonValue>>,
1679    catalog: &omnigraph_compiler::catalog::Catalog,
1680    updates: &[crate::db::SubTableUpdate],
1681) -> Result<HashSet<String>> {
1682    let mut ids = HashSet::new();
1683
1684    // IDs from the in-memory batch (just loaded in this operation)
1685    if let Some(rows) = node_rows.get(type_name) {
1686        if let Some(node_type) = catalog.node_types.get(type_name) {
1687            if let Some(key_prop) = node_type.key_property() {
1688                for row in rows {
1689                    if let Some(id) = row.get(key_prop).and_then(|v| v.as_str()) {
1690                        ids.insert(id.to_string());
1691                    }
1692                }
1693            }
1694        }
1695    }
1696
1697    // IDs from the Lance sub-table
1698    let table_key = format!("node:{}", type_name);
1699    let snapshot = db.snapshot_for_branch(branch).await?;
1700    let Some(entry) = snapshot.entry(&table_key) else {
1701        return Ok(ids);
1702    };
1703    // Use the just-written version if this type was updated, else snapshot version
1704    let updated = updates
1705        .iter()
1706        .find(|u| u.table_key == table_key)
1707        .map(|u| (u.table_version, u.table_branch.as_deref()));
1708    let (version, branch) = updated.unwrap_or((entry.table_version, entry.table_branch.as_deref()));
1709    let ds = db
1710        .open_dataset_at_state(&entry.table_path, branch, version)
1711        .await?;
1712
1713    let batches = db
1714        .table_store()
1715        .scan(&ds, Some(&["id"]), None, None)
1716        .await?;
1717
1718    for batch in &batches {
1719        let id_col = batch
1720            .column_by_name("id")
1721            .unwrap()
1722            .as_any()
1723            .downcast_ref::<StringArray>()
1724            .unwrap();
1725        for i in 0..batch.num_rows() {
1726            if !id_col.is_valid(i) {
1727                continue;
1728            }
1729            ids.insert(id_col.value(i).to_string());
1730        }
1731    }
1732
1733    Ok(ids)
1734}
1735
#[cfg(test)]
mod tests {
    //! End-to-end tests for the JSONL loader (`load_jsonl`) and the
    //! branch-aware ingest workflow (`ingest` / `ingest_as`), driven by
    //! a small Person/Company schema with two edge types.
    use super::*;
    use crate::db::Omnigraph;
    use arrow_array::Array;
    use futures::TryStreamExt;
    use std::collections::HashMap;

    // Schema fixture: Person/Company nodes keyed by `name`, plus a
    // self-referential Knows edge and a Person->Company WorksAt edge.
    const TEST_SCHEMA: &str = r#"
node Person {
    name: String @key
    age: I32?
}
node Company {
    name: String @key
}
edge Knows: Person -> Person {
    since: Date?
}
edge WorksAt: Person -> Company
"#;

    // JSONL fixture matching TEST_SCHEMA: 2 Persons, 1 Company, 2 edges.
    const TEST_DATA: &str = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}
{"type": "Person", "data": {"name": "Bob", "age": 25}}
{"type": "Company", "data": {"name": "Acme"}}
{"edge": "Knows", "from": "Alice", "to": "Bob"}
{"edge": "WorksAt", "from": "Alice", "to": "Acme"}
"#;

    // load_jsonl reports per-type counts for both nodes and edges.
    #[tokio::test]
    async fn test_load_creates_data() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let result = load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert_eq!(result.nodes_loaded["Person"], 2);
        assert_eq!(result.nodes_loaded["Company"], 1);
        assert_eq!(result.edges_loaded["Knows"], 1);
        assert_eq!(result.edges_loaded["WorksAt"], 1);
    }

    // Loaded rows round-trip through a snapshot-opened Lance dataset,
    // and the @key property becomes the `id` column.
    #[tokio::test]
    async fn test_load_data_readable_via_lance() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        // Read back via snapshot
        let snap = db.snapshot();
        let person_ds = snap.open("node:Person").await.unwrap();

        assert_eq!(person_ds.count_rows(None).await.unwrap(), 2);

        // Verify data
        let batches: Vec<RecordBatch> = person_ds
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();

        let batch = &batches[0];
        let ids = batch
            .column_by_name("id")
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        // @key=name, so ids should be "Alice" and "Bob"
        let id_values: Vec<&str> = (0..ids.len()).map(|i| ids.value(i)).collect();
        assert!(id_values.contains(&"Alice"));
        assert!(id_values.contains(&"Bob"));
    }

    // Edge rows store src/dst as the referenced nodes' key values.
    #[tokio::test]
    async fn test_load_edges_reference_node_keys() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        let snap = db.snapshot();
        let knows_ds = snap.open("edge:Knows").await.unwrap();

        let batches: Vec<RecordBatch> = knows_ds
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();

        let batch = &batches[0];
        let srcs = batch
            .column_by_name("src")
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let dsts = batch
            .column_by_name("dst")
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();

        assert_eq!(srcs.value(0), "Alice");
        assert_eq!(dsts.value(0), "Bob");
    }

    // A successful load commits a new manifest version.
    #[tokio::test]
    async fn test_load_manifest_version_advances() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        let v1 = db.version();

        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert!(db.version() > v1);
    }

    // LoadMode::Append adds rows on top of an earlier load rather than
    // replacing them.
    #[tokio::test]
    async fn test_load_append_adds_rows() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let batch1 = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#;
        let batch2 = r#"{"type": "Person", "data": {"name": "Bob", "age": 25}}"#;

        load_jsonl(&mut db, batch1, LoadMode::Overwrite)
            .await
            .unwrap();
        load_jsonl(&mut db, batch2, LoadMode::Append).await.unwrap();

        let snap = db.snapshot();
        let person_ds = snap.open("node:Person").await.unwrap();
        assert_eq!(person_ds.count_rows(None).await.unwrap(), 2);
    }

    // A node type not present in the schema is rejected, not silently
    // dropped.
    #[tokio::test]
    async fn test_load_unknown_type_rejected() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let bad = r#"{"type": "FakeType", "data": {"name": "x"}}"#;
        let result = load_jsonl(&mut db, bad, LoadMode::Overwrite).await;
        assert!(result.is_err());
    }

    // ingest() to a new branch creates it from the base branch and
    // reports per-table row counts (sorted by table key).
    #[tokio::test]
    async fn test_ingest_creates_branch_and_reports_tables() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let result = db
            .ingest("feature", Some("main"), TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();

        assert_eq!(result.branch, "feature");
        assert_eq!(result.base_branch, "main");
        assert!(result.branch_created);
        assert_eq!(result.mode, LoadMode::Overwrite);
        assert_eq!(
            result.tables,
            vec![
                IngestTableResult {
                    table_key: "edge:Knows".to_string(),
                    rows_loaded: 1
                },
                IngestTableResult {
                    table_key: "edge:WorksAt".to_string(),
                    rows_loaded: 1
                },
                IngestTableResult {
                    table_key: "node:Company".to_string(),
                    rows_loaded: 1
                },
                IngestTableResult {
                    table_key: "node:Person".to_string(),
                    rows_loaded: 2
                },
            ]
        );
        assert!(
            db.branch_list()
                .await
                .unwrap()
                .contains(&"feature".to_string())
        );
    }

    // ingest() to an existing branch reports (but does not act on) the
    // requested base, and Merge upserts by id: Bob is updated, Eve is
    // inserted, Alice is untouched.
    #[tokio::test]
    async fn test_ingest_existing_branch_ignores_from_and_merges_data() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();
        db.branch_create_from(crate::db::ReadTarget::branch("main"), "feature")
            .await
            .unwrap();

        let result = db
            .ingest(
                "feature",
                Some("missing-base"),
                r#"{"type":"Person","data":{"name":"Bob","age":26}}
{"type":"Person","data":{"name":"Eve","age":31}}"#,
                LoadMode::Merge,
            )
            .await
            .unwrap();

        assert_eq!(result.branch, "feature");
        assert_eq!(result.base_branch, "missing-base");
        assert!(!result.branch_created);
        assert_eq!(result.mode, LoadMode::Merge);
        assert_eq!(
            result.tables,
            vec![IngestTableResult {
                table_key: "node:Person".to_string(),
                rows_loaded: 2
            }]
        );

        let snap = db
            .snapshot_of(crate::db::ReadTarget::branch("feature"))
            .await
            .unwrap();
        let person_ds = snap.open("node:Person").await.unwrap();
        assert_eq!(person_ds.count_rows(None).await.unwrap(), 3);

        let batches: Vec<RecordBatch> = person_ds
            .scan()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect()
            .await
            .unwrap();
        let mut ages_by_id = HashMap::new();
        for batch in &batches {
            let ids = batch
                .column_by_name("id")
                .unwrap()
                .as_any()
                .downcast_ref::<StringArray>()
                .unwrap();
            let ages = batch
                .column_by_name("age")
                .unwrap()
                .as_any()
                .downcast_ref::<Int32Array>()
                .unwrap();
            for idx in 0..ids.len() {
                ages_by_id.insert(ids.value(idx).to_string(), ages.value(idx));
            }
        }

        assert_eq!(ages_by_id.get("Bob"), Some(&26));
        assert_eq!(ages_by_id.get("Eve"), Some(&31));
        assert_eq!(ages_by_id.get("Alice"), Some(&30));
    }

    // ingest_as() records the supplied actor id on the branch's head
    // commit.
    #[tokio::test]
    async fn test_ingest_as_stamps_actor_on_branch_head_commit() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        db.ingest_as(
            "feature",
            Some("main"),
            TEST_DATA,
            LoadMode::Overwrite,
            Some("act-andrew"),
        )
        .await
        .unwrap();

        let head = db
            .list_commits(Some("feature"))
            .await
            .unwrap()
            .into_iter()
            .last()
            .unwrap();
        assert_eq!(head.actor_id.as_deref(), Some("act-andrew"));
    }

    // Range-constraint validation treats NaN as out of range for a
    // [0.0, 1.0] bound and says so in the error message. This is a
    // pure unit test — no database involved.
    #[test]
    fn test_range_constraint_rejects_nan() {
        use arrow_array::{Float64Array, RecordBatch, StringArray};
        use omnigraph_compiler::catalog::{LiteralValue, NodeType, RangeConstraint};
        use std::sync::Arc;

        let schema = Arc::new(arrow_schema::Schema::new(vec![
            arrow_schema::Field::new("name", arrow_schema::DataType::Utf8, false),
            arrow_schema::Field::new("score", arrow_schema::DataType::Float64, true),
        ]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["bad"])),
                Arc::new(Float64Array::from(vec![f64::NAN])),
            ],
        )
        .unwrap();

        let node_type = NodeType {
            name: "Test".to_string(),
            implements: vec![],
            properties: Default::default(),
            key: None,
            unique_constraints: vec![],
            indices: vec![],
            range_constraints: vec![RangeConstraint {
                property: "score".to_string(),
                min: Some(LiteralValue::Float(0.0)),
                max: Some(LiteralValue::Float(1.0)),
            }],
            check_constraints: vec![],
            embed_sources: Default::default(),
            blob_properties: Default::default(),
            arrow_schema: schema,
        };

        let result = validate_value_constraints(&batch, &node_type);
        assert!(result.is_err(), "expected NaN to be rejected");
        let err = result.unwrap_err().to_string();
        assert!(err.contains("NaN"), "error should mention NaN: {}", err);
    }
}