// omnigraph/db/omnigraph.rs
1use std::collections::{BTreeSet, HashMap, HashSet};
2use std::io::Write;
3use std::sync::Arc;
4
5use arrow_array::{
6    Array, BinaryArray, BooleanArray, Date32Array, FixedSizeListArray, Float32Array, Float64Array,
7    Int32Array, Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray,
8    RecordBatch, StringArray, StructArray, UInt32Array, UInt64Array, new_null_array,
9};
10use arrow_schema::{DataType, Field, Schema};
11use lance::Dataset;
12use lance::blob::{BlobArrayBuilder, blob_field};
13use lance::dataset::BlobFile;
14use lance::dataset::scanner::ColumnOrdering;
15use lance::datatypes::BlobKind;
16use omnigraph_compiler::catalog::{Catalog, EdgeType, NodeType};
17use omnigraph_compiler::schema::parser::parse_schema;
18use omnigraph_compiler::types::ScalarType;
19use omnigraph_compiler::{
20    SchemaIR, SchemaMigrationPlan, SchemaMigrationStep, SchemaTypeKind, build_catalog_from_ir,
21    build_schema_ir, plan_schema_migration,
22};
23
24use crate::db::graph_coordinator::{GraphCoordinator, PublishedSnapshot};
25use crate::error::{OmniError, Result};
26use crate::runtime_cache::RuntimeCache;
27use crate::storage::{StorageAdapter, join_uri, normalize_root_uri, storage_for_uri};
28use crate::table_store::TableStore;
29
30mod export;
31mod optimize;
32mod schema_apply;
33mod table_ops;
34
35pub use optimize::{CleanupPolicyOptions, TableCleanupStats, TableOptimizeStats};
36
37use super::commit_graph::GraphCommit;
38use super::manifest::{
39    ManifestChange, Snapshot, SubTableEntry, TableRegistration, TableTombstone,
40    table_path_for_table_key,
41};
42use super::schema_state::{
43    SCHEMA_SOURCE_FILENAME, load_or_bootstrap_schema_contract, read_accepted_schema_ir,
44    recover_schema_state_files, schema_ir_staging_uri, schema_ir_uri, schema_source_staging_uri,
45    schema_source_uri, schema_state_staging_uri, schema_state_uri, validate_schema_contract,
46    write_schema_contract, write_schema_contract_staging,
47};
48use super::{
49    ReadTarget, ResolvedTarget, SCHEMA_APPLY_LOCK_BRANCH, SnapshotId, is_internal_system_branch,
50    is_schema_apply_lock_branch,
51};
52
/// Outcome of a branch merge attempt.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MergeOutcome {
    /// The target already contained everything from the source; no work done.
    AlreadyUpToDate,
    /// The target could be advanced without conflict resolution (fast-forward).
    FastForward,
    /// A real merge was performed (presumably producing a merge commit — see
    /// `record_merge_commit`).
    Merged,
}
59
/// Result of [`Omnigraph::apply_schema`].
#[derive(Debug, Clone)]
pub struct SchemaApplyResult {
    // Whether the planned migration is one this engine can execute
    // (semantics defined in `schema_apply` — confirm there).
    pub supported: bool,
    // Whether the migration was actually carried out.
    pub applied: bool,
    // Manifest version after the operation.
    pub manifest_version: u64,
    // Individual migration steps that were planned (and, if `applied`, run).
    pub steps: Vec<SchemaMigrationStep>,
}
67
/// Top-level handle to an Omnigraph database.
///
/// An Omnigraph is a Lance-native graph database with git-style branching.
/// It stores typed property graphs as per-type Lance datasets coordinated
/// through a Lance manifest table.
pub struct Omnigraph {
    // Normalized root URI of the repository (see `normalize_root_uri`).
    root_uri: String,
    // Storage backend shared with the coordinator and schema-state helpers.
    storage: Arc<dyn StorageAdapter>,
    // Manifest/branch coordinator; holds the handle's current branch + version.
    coordinator: GraphCoordinator,
    // Access layer for the per-type Lance sub-tables under `root_uri`.
    table_store: TableStore,
    // Process-local caches (e.g. graph index); invalidated on refresh/sync.
    runtime_cache: RuntimeCache,
    // Catalog built from the accepted schema IR, with blob schemas fixed up.
    catalog: Catalog,
    // Raw `_schema.pg` source text as read at init/open time.
    schema_source: String,
    // Optional actor id recorded for auditing (see `current_audit_actor`).
    pub(crate) audit_actor_id: Option<String>,
}
83
84impl Omnigraph {
    /// Create a new repo at `uri` from schema source.
    ///
    /// Creates `_schema.pg`, per-type Lance datasets, and `__manifest`.
    /// Convenience wrapper that selects a storage adapter from the URI scheme.
    pub async fn init(uri: &str, schema_source: &str) -> Result<Self> {
        Self::init_with_storage(uri, schema_source, storage_for_uri(uri)?).await
    }
91
    /// Like [`Self::init`], but with an explicit storage adapter (used by
    /// tests and callers that construct storage themselves).
    pub(crate) async fn init_with_storage(
        uri: &str,
        schema_source: &str,
        storage: Arc<dyn StorageAdapter>,
    ) -> Result<Self> {
        let root = normalize_root_uri(uri)?;
        // Parse + build the catalog first so a bad schema fails before any
        // files are written.
        let schema_ir = read_schema_ir_from_source(schema_source)?;
        let mut catalog = build_catalog_from_ir(&schema_ir)?;
        fixup_blob_schemas(&mut catalog);

        // Write _schema.pg
        let schema_path = join_uri(&root, SCHEMA_SOURCE_FILENAME);
        storage.write_text(&schema_path, schema_source).await?;
        write_schema_contract(&root, storage.as_ref(), &schema_ir).await?;

        // Create manifest + per-type datasets
        let coordinator = GraphCoordinator::init(&root, &catalog, Arc::clone(&storage)).await?;

        Ok(Self {
            root_uri: root.clone(),
            storage,
            coordinator,
            table_store: TableStore::new(&root),
            runtime_cache: RuntimeCache::default(),
            catalog,
            schema_source: schema_source.to_string(),
            audit_actor_id: None,
        })
    }
121
    /// Open an existing repo.
    ///
    /// Reads `_schema.pg`, parses it, builds the catalog, and opens `__manifest`.
    /// Convenience wrapper that selects a storage adapter from the URI scheme.
    pub async fn open(uri: &str) -> Result<Self> {
        Self::open_with_storage(uri, storage_for_uri(uri)?).await
    }
128
    /// Like [`Self::open`], but with an explicit storage adapter.
    ///
    /// NOTE: the coordinator-open → recovery-sweep → schema-read order below
    /// is load-bearing; do not reorder.
    pub(crate) async fn open_with_storage(
        uri: &str,
        storage: Arc<dyn StorageAdapter>,
    ) -> Result<Self> {
        let root = normalize_root_uri(uri)?;
        // Open the coordinator first so the schema-staging recovery sweep can
        // compare its snapshot against any leftover staging files. Recovery
        // either deletes staging (pre-commit crash) or completes the rename
        // (post-commit crash) before the live schema files are read.
        let coordinator = GraphCoordinator::open(&root, Arc::clone(&storage)).await?;
        recover_schema_state_files(&root, Arc::clone(&storage), &coordinator.snapshot()).await?;
        // Read _schema.pg (post-recovery — may have just been renamed in).
        let schema_path = schema_source_uri(&root);
        let schema_source = storage.read_text(&schema_path).await?;
        let current_source_ir = read_schema_ir_from_source(&schema_source)?;
        let branches = coordinator.branch_list().await?;
        // The catalog is built from the *accepted* contract IR, not the raw
        // source IR — the two can differ mid-migration.
        let (accepted_ir, _) = load_or_bootstrap_schema_contract(
            &root,
            Arc::clone(&storage),
            &branches,
            &current_source_ir,
        )
        .await?;
        let mut catalog = build_catalog_from_ir(&accepted_ir)?;
        fixup_blob_schemas(&mut catalog);

        Ok(Self {
            root_uri: root.clone(),
            storage,
            coordinator,
            table_store: TableStore::new(&root),
            runtime_cache: RuntimeCache::default(),
            catalog,
            schema_source,
            audit_actor_id: None,
        })
    }
166
    /// The catalog built from the accepted schema contract.
    pub fn catalog(&self) -> &Catalog {
        &self.catalog
    }
170
    /// Raw `_schema.pg` source text as loaded at init/open time.
    pub fn schema_source(&self) -> &str {
        &self.schema_source
    }
174
    /// Normalized root URI of this repository.
    pub fn uri(&self) -> &str {
        &self.root_uri
    }
178
    /// Validate the on-disk schema contract; called before most reads/writes.
    pub(crate) async fn ensure_schema_state_valid(&self) -> Result<()> {
        validate_schema_contract(self.uri(), Arc::clone(&self.storage)).await
    }
182
    /// Compute a migration plan from the current schema to `desired_schema_source`.
    pub async fn plan_schema(&self, desired_schema_source: &str) -> Result<SchemaMigrationPlan> {
        schema_apply::plan_schema(self, desired_schema_source).await
    }
186
    /// Plan and apply a schema migration to `desired_schema_source`.
    pub async fn apply_schema(&mut self, desired_schema_source: &str) -> Result<SchemaApplyResult> {
        schema_apply::apply_schema(self, desired_schema_source).await
    }
190
    /// Fail `operation` if a schema-apply is currently in progress.
    pub(crate) async fn ensure_schema_apply_idle(&mut self, operation: &str) -> Result<()> {
        schema_apply::ensure_schema_apply_idle(self, operation).await
    }
194
    /// Fail `operation` if the schema-apply lock branch is held.
    async fn ensure_schema_apply_not_locked(&self, operation: &str) -> Result<()> {
        schema_apply::ensure_schema_apply_not_locked(self, operation).await
    }
198
    /// Direct access to the concrete `TableStore` (legacy call sites; prefer
    /// `storage()` for new engine code).
    pub(crate) fn table_store(&self) -> &TableStore {
        &self.table_store
    }
202
    /// Engine-facing trait surface around `TableStore`.
    ///
    /// MR-793 Phase 1: this is the canonical accessor for newly-written
    /// engine code. The trait's signatures use opaque `SnapshotHandle` /
    /// `StagedHandle` instead of leaking `lance::Dataset` /
    /// `lance::dataset::transaction::Transaction`. Existing call sites
    /// that still use `db.table_store.X(...)` (the inherent struct
    /// methods) are migrated incrementally — see §9 of
    /// `.context/mr-793-design.md`.
    pub(crate) fn storage(&self) -> &dyn crate::storage_layer::TableStorage {
        &self.table_store
    }
215
    /// Open a fresh coordinator pinned to `branch` (`None` means `main`).
    pub(crate) async fn open_coordinator_for_branch(
        &self,
        branch: Option<&str>,
    ) -> Result<GraphCoordinator> {
        match branch {
            Some(branch) => {
                GraphCoordinator::open_branch(self.uri(), branch, Arc::clone(&self.storage)).await
            }
            None => GraphCoordinator::open(self.uri(), Arc::clone(&self.storage)).await,
        }
    }
227
    /// Temporarily switch this handle's coordinator to `branch`, returning
    /// the previous coordinator so the caller can `restore_coordinator` it.
    pub(crate) async fn swap_coordinator_for_branch(
        &mut self,
        branch: Option<&str>,
    ) -> Result<GraphCoordinator> {
        let next = self.open_coordinator_for_branch(branch).await?;
        Ok(std::mem::replace(&mut self.coordinator, next))
    }
235
    /// Put back a coordinator previously taken via `swap_coordinator_for_branch`.
    pub(crate) fn restore_coordinator(&mut self, coordinator: GraphCoordinator) {
        self.coordinator = coordinator;
    }
239
    /// Resolve `branch` (default `main`) to a concrete target.
    ///
    /// Fast path: when the request matches this handle's active branch, the
    /// in-memory snapshot is reused instead of re-resolving through storage.
    pub(crate) async fn resolved_branch_target(
        &self,
        branch: Option<&str>,
    ) -> Result<ResolvedTarget> {
        self.ensure_schema_state_valid().await?;
        let requested = ReadTarget::Branch(branch.unwrap_or("main").to_string());
        let normalized = normalize_branch_name(branch.unwrap_or("main"))?;
        if normalized.as_deref() == self.coordinator.current_branch() {
            // No head commit yet (e.g. pre-commit-graph repos): synthesize a
            // stable id from branch + manifest version.
            let snapshot_id = self.coordinator.head_commit_id().await?.unwrap_or_else(|| {
                SnapshotId::synthetic(
                    self.coordinator.current_branch(),
                    self.coordinator.version(),
                )
            });
            return Ok(ResolvedTarget {
                requested,
                branch: self.coordinator.current_branch().map(str::to_string),
                snapshot_id,
                snapshot: self.coordinator.snapshot(),
            });
        }
        self.coordinator.resolve_target(&requested).await
    }
263
    /// Snapshot of `branch` (default `main`); see `resolved_branch_target`.
    pub(crate) async fn snapshot_for_branch(&self, branch: Option<&str>) -> Result<Snapshot> {
        self.resolved_branch_target(branch)
            .await
            .map(|resolved| resolved.snapshot)
    }
269
    /// Manifest version this handle currently points at. No storage I/O.
    pub(crate) fn version(&self) -> u64 {
        self.coordinator.version()
    }
273
    /// Return an immutable Snapshot from the known manifest state. No storage I/O.
    pub(crate) fn snapshot(&self) -> Snapshot {
        self.coordinator.snapshot()
    }
278
    /// Snapshot of an arbitrary read target (branch or pinned snapshot id).
    pub async fn snapshot_of(&self, target: impl Into<ReadTarget>) -> Result<Snapshot> {
        self.resolved_target(target)
            .await
            .map(|resolved| resolved.snapshot)
    }
284
    /// Manifest version of an arbitrary read target.
    pub async fn version_of(&self, target: impl Into<ReadTarget>) -> Result<u64> {
        self.snapshot_of(target)
            .await
            .map(|snapshot| snapshot.version())
    }
290
    /// Branch name a read target resolves to, if any (`None` for `main` —
    /// TODO confirm against `ResolvedTarget.branch` semantics).
    pub async fn resolved_branch_of(
        &self,
        target: impl Into<ReadTarget>,
    ) -> Result<Option<String>> {
        self.resolved_target(target)
            .await
            .map(|resolved| resolved.branch)
    }
299
    /// Synchronize this handle's write base to the latest head of the named branch.
    ///
    /// Replaces the coordinator wholesale and drops all runtime caches.
    pub async fn sync_branch(&mut self, branch: &str) -> Result<()> {
        self.ensure_schema_state_valid().await?;
        let branch = normalize_branch_name(branch)?;
        self.coordinator = self.open_coordinator_for_branch(branch.as_deref()).await?;
        self.runtime_cache.invalidate_all().await;
        Ok(())
    }
308
    /// Re-read the handle-local coordinator state from storage.
    ///
    /// Also drops runtime caches, since cached state may be stale afterwards.
    pub(crate) async fn refresh(&mut self) -> Result<()> {
        self.coordinator.refresh().await?;
        self.runtime_cache.invalidate_all().await;
        Ok(())
    }
315
    /// Resolve a branch name to its current snapshot id.
    pub async fn resolve_snapshot(&self, branch: &str) -> Result<SnapshotId> {
        self.ensure_schema_state_valid().await?;
        self.coordinator.resolve_snapshot_id(branch).await
    }
320
    /// Resolve any read target through the coordinator (validates schema first).
    pub(crate) async fn resolved_target(
        &self,
        target: impl Into<ReadTarget>,
    ) -> Result<ResolvedTarget> {
        self.ensure_schema_state_valid().await?;
        self.coordinator.resolve_target(&target.into()).await
    }
328
329    // ─── Change detection ────────────────────────────────────────────────
330
    /// Diff two read targets (branches and/or pinned snapshots).
    ///
    /// The branch hint passed to `diff_snapshots` prefers the `to` side,
    /// falling back to `from`.
    pub async fn diff_between(
        &self,
        from: impl Into<ReadTarget>,
        to: impl Into<ReadTarget>,
        filter: &crate::changes::ChangeFilter,
    ) -> Result<crate::changes::ChangeSet> {
        let from_resolved = self.resolved_target(from).await?;
        let to_resolved = self.resolved_target(to).await?;
        crate::changes::diff_snapshots(
            self.uri(),
            &from_resolved.snapshot,
            &to_resolved.snapshot,
            filter,
            to_resolved.branch.clone().or(from_resolved.branch.clone()),
        )
        .await
    }
348
    /// Diff two graph commits. Resolves each commit to `(manifest_branch, manifest_version)`
    /// and creates branch-aware snapshots. Supports cross-branch comparison.
    pub async fn diff_commits(
        &self,
        from_commit_id: &str,
        to_commit_id: &str,
        filter: &crate::changes::ChangeFilter,
    ) -> Result<crate::changes::ChangeSet> {
        // Resolve both commit ids first so a bad id fails before any snapshot work.
        let from_commit = self
            .coordinator
            .resolve_commit(&SnapshotId::new(from_commit_id))
            .await?;
        let to_commit = self
            .coordinator
            .resolve_commit(&SnapshotId::new(to_commit_id))
            .await?;
        // Re-resolve via the canonical graph_commit_id (the input may have
        // been an alias or abbreviated form — TODO confirm in resolve_commit).
        let from_snap = self
            .coordinator
            .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
                from_commit.graph_commit_id.clone(),
            )))
            .await?;
        let to_snap = self
            .coordinator
            .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
                to_commit.graph_commit_id.clone(),
            )))
            .await?;
        crate::changes::diff_snapshots(
            self.uri(),
            &from_snap.snapshot,
            &to_snap.snapshot,
            filter,
            to_snap.branch.clone().or(from_snap.branch.clone()),
        )
        .await
    }
386
    /// Read one entity as JSON at an arbitrary read target.
    pub async fn entity_at_target(
        &self,
        target: impl Into<ReadTarget>,
        table_key: &str,
        id: &str,
    ) -> Result<Option<serde_json::Value>> {
        export::entity_at_target(self, target, table_key, id).await
    }
395
    /// Read one entity at a specific manifest version via time travel (on-demand enrichment).
    pub async fn entity_at(
        &self,
        table_key: &str,
        id: &str,
        version: u64,
    ) -> Result<Option<serde_json::Value>> {
        export::entity_at(self, table_key, id, version).await
    }
405
    /// Create a Snapshot at any historical manifest version.
    pub async fn snapshot_at_version(&self, version: u64) -> Result<Snapshot> {
        self.ensure_schema_state_valid().await?;
        self.coordinator.snapshot_at_version(version).await
    }
411
    /// Export selected types/tables on `branch` as a JSONL string.
    pub async fn export_jsonl(
        &self,
        branch: &str,
        type_names: &[String],
        table_keys: &[String],
    ) -> Result<String> {
        export::export_jsonl(self, branch, type_names, table_keys).await
    }
420
    /// Streaming variant of [`Self::export_jsonl`] writing into `writer`.
    pub async fn export_jsonl_to_writer<W: Write>(
        &self,
        branch: &str,
        type_names: &[String],
        table_keys: &[String],
        writer: &mut W,
    ) -> Result<()> {
        export::export_jsonl_to_writer(self, branch, type_names, table_keys, writer).await
    }
430
431    // ─── Graph index ──────────────────────────────────────────────────────
432
    /// Get or build the graph index for the current snapshot.
    pub async fn graph_index(&self) -> Result<Arc<crate::graph_index::GraphIndex>> {
        table_ops::graph_index(self).await
    }
437
    /// Graph index pinned to an already-resolved target.
    pub(crate) async fn graph_index_for_resolved(
        &self,
        resolved: &ResolvedTarget,
    ) -> Result<Arc<crate::graph_index::GraphIndex>> {
        table_ops::graph_index_for_resolved(self, resolved).await
    }
444
    /// Ensure BTree scalar indices exist on key columns.
    /// Idempotent — Lance skips if index already exists.
    ///
    /// Opens sub-tables at their latest version (not snapshot-pinned) because
    /// indices must be created on the current head. Any version drift from the
    /// snapshot is expected and logged. The resulting versions are committed
    /// back to the manifest.
    ///
    /// On named branches, indexing preserves lazy branching:
    /// unbranched subtables keep inheriting `main`, while subtables inherited
    /// from an ancestor branch are first forked into the active branch before
    /// their index metadata is updated.
    pub async fn ensure_indices(&mut self) -> Result<()> {
        table_ops::ensure_indices(self).await
    }
460
    /// Branch-targeted variant of [`Self::ensure_indices`].
    pub async fn ensure_indices_on(&mut self, branch: &str) -> Result<()> {
        table_ops::ensure_indices_on(self, branch).await
    }
464
    /// Compact small Lance fragments into fewer larger ones across every
    /// node + edge table on `main`. See [`optimize`] for details.
    pub async fn optimize(&mut self) -> Result<Vec<optimize::TableOptimizeStats>> {
        optimize::optimize_all_tables(self).await
    }
470
    /// Remove Lance manifests (and the fragments they uniquely own) per the
    /// given [`optimize::CleanupPolicyOptions`]. Destructive to version
    /// history. See [`optimize`] for details.
    pub async fn cleanup(
        &mut self,
        options: optimize::CleanupPolicyOptions,
    ) -> Result<Vec<optimize::TableCleanupStats>> {
        optimize::cleanup_all_tables(self, options).await
    }
480
    /// Read a blob from a node by its string ID and property name.
    ///
    /// Returns a `BlobFile` handle with async `read()`, `seek()`, `tell()`,
    /// and metadata accessors (`size()`, `kind()`, `uri()`).
    ///
    /// ```ignore
    /// let blob = db.read_blob("Document", "readme", "content").await?;
    /// let bytes = blob.read().await?;
    /// ```
    pub async fn read_blob(&self, type_name: &str, id: &str, property: &str) -> Result<BlobFile> {
        self.ensure_schema_state_valid().await?;
        // Validate the type/property pair before touching storage.
        let node_type = self
            .catalog
            .node_types
            .get(type_name)
            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
        if !node_type.blob_properties.contains(property) {
            return Err(OmniError::manifest(format!(
                "property '{}' on type '{}' is not a Blob",
                property, type_name
            )));
        }

        let snapshot = self.snapshot();
        let table_key = format!("node:{}", type_name);
        let ds = snapshot.open(&table_key).await?;

        // Escape single quotes in the caller-supplied id for the SQL filter.
        let filter_sql = format!("id = '{}'", id.replace('\'', "''"));
        let row_id = self
            .table_store
            .first_row_id_for_filter(&ds, &filter_sql)
            .await?
            .ok_or_else(|| {
                OmniError::manifest(format!("no {} with id '{}' found", type_name, id))
            })?;

        // Use take_blobs to get the BlobFile handle
        let ds = Arc::new(ds);
        let mut blobs = ds
            .take_blobs(&[row_id], property)
            .await
            .map_err(|e| OmniError::Lance(e.to_string()))?;

        // Exactly one row id was requested, so pop() yields its blob (if any).
        blobs.pop().ok_or_else(|| {
            OmniError::manifest(format!(
                "blob '{}' on {} '{}' returned no data",
                property, type_name, id
            ))
        })
    }
531
    /// The branch this handle writes to (`None` means `main`).
    pub(crate) fn active_branch(&self) -> Option<&str> {
        self.coordinator.current_branch()
    }
535
536    async fn ensure_branch_delete_safe(&self, branch: &str, branches: &[String]) -> Result<()> {
537        let descendants = self.coordinator.branch_descendants(branch).await?;
538        if let Some(descendant) = descendants.first() {
539            return Err(OmniError::manifest_conflict(format!(
540                "cannot delete branch '{}' because descendant branch '{}' still depends on it",
541                branch, descendant
542            )));
543        }
544
545        for other_branch in branches
546            .iter()
547            .filter(|candidate| candidate.as_str() != branch)
548        {
549            let snapshot = self
550                .snapshot_of(ReadTarget::branch(other_branch.as_str()))
551                .await?;
552            if snapshot
553                .entries()
554                .any(|entry| entry.table_branch.as_deref() == Some(branch))
555            {
556                return Err(OmniError::manifest_conflict(format!(
557                    "cannot delete branch '{}' because branch '{}' still depends on it",
558                    branch, other_branch
559                )));
560            }
561        }
562
563        Ok(())
564    }
565
566    async fn cleanup_deleted_branch_tables(
567        &self,
568        branch: &str,
569        owned_tables: &[(String, String)],
570    ) -> Result<()> {
571        let mut seen_paths = HashSet::new();
572        let mut cleanup_targets = owned_tables
573            .iter()
574            .filter(|(_, table_path)| seen_paths.insert(table_path.clone()))
575            .cloned()
576            .collect::<Vec<_>>();
577        cleanup_targets.sort_by(|left, right| left.0.cmp(&right.0));
578
579        for (table_key, table_path) in cleanup_targets {
580            let dataset_uri = self.table_store.dataset_uri(&table_path);
581            if let Err(err) = self.table_store.delete_branch(&dataset_uri, branch).await {
582                return Err(OmniError::manifest_internal(format!(
583                    "branch '{}' was deleted but cleanup failed for {}: {}",
584                    branch, table_key, err
585                )));
586            }
587        }
588
589        Ok(())
590    }
591
    /// Delete `branch` from the manifest, then clean up its owned sub-tables.
    ///
    /// Owned-table list is captured *before* the manifest delete so cleanup
    /// still knows what to remove afterwards.
    async fn delete_branch_storage_only(&mut self, branch: &str) -> Result<()> {
        if self.coordinator.current_branch() == Some(branch) {
            return Err(OmniError::manifest_conflict(format!(
                "cannot delete currently active branch '{}'",
                branch
            )));
        }

        let branch_snapshot = self.snapshot_of(ReadTarget::branch(branch)).await?;
        let owned_tables = branch_snapshot
            .entries()
            .filter(|entry| entry.table_branch.as_deref() == Some(branch))
            .map(|entry| (entry.table_key.clone(), entry.table_path.clone()))
            .collect::<Vec<_>>();

        self.coordinator.branch_delete(branch).await?;
        self.cleanup_deleted_branch_tables(branch, &owned_tables)
            .await
    }
611
    /// Associated-function alias for the module-level `normalize_branch_name`.
    pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
        normalize_branch_name(branch)
    }
615
    /// Head commit id of `branch` (default `main`), initializing the commit
    /// graph on a throwaway coordinator if needed.
    pub(crate) async fn head_commit_id_for_branch(
        &self,
        branch: Option<&str>,
    ) -> Result<Option<String>> {
        let mut coordinator = self.open_coordinator_for_branch(branch).await?;
        coordinator.ensure_commit_graph_initialized().await?;
        coordinator
            .head_commit_id()
            .await
            .map(|id| id.map(|snapshot_id| snapshot_id.as_str().to_string()))
    }
627
    /// Create branch `name` off this handle's current branch.
    pub async fn branch_create(&mut self, name: &str) -> Result<()> {
        self.ensure_schema_state_valid().await?;
        self.ensure_schema_apply_idle("branch_create").await?;
        ensure_public_branch_ref(name, "branch_create")?;
        self.coordinator.branch_create(name).await
    }
634
    /// Actor id recorded for audit purposes, if one was set.
    pub(crate) fn current_audit_actor(&self) -> Option<&str> {
        self.audit_actor_id.as_deref()
    }
638
    /// Create branch `name` off an arbitrary source target (public refs only).
    pub async fn branch_create_from(
        &mut self,
        from: impl Into<ReadTarget>,
        name: &str,
    ) -> Result<()> {
        self.ensure_schema_apply_idle("branch_create_from").await?;
        self.branch_create_from_impl(from, name, false).await
    }
647
    /// Shared implementation for branch creation from a source target.
    ///
    /// `allow_internal_refs` skips the public-ref guard for internal callers.
    /// The coordinator is temporarily swapped to the source branch and always
    /// restored, even when `branch_create` fails.
    async fn branch_create_from_impl(
        &mut self,
        from: impl Into<ReadTarget>,
        name: &str,
        allow_internal_refs: bool,
    ) -> Result<()> {
        let target = from.into();
        // Only branch sources are supported today; pinned snapshots are not.
        let ReadTarget::Branch(branch_name) = target else {
            return Err(OmniError::manifest(
                "branch creation from pinned snapshots is not supported yet".to_string(),
            ));
        };
        if !allow_internal_refs {
            ensure_public_branch_ref(&branch_name, "branch_create_from")?;
            ensure_public_branch_ref(name, "branch_create_from")?;
        }
        let branch = normalize_branch_name(&branch_name)?;
        let previous = self.swap_coordinator_for_branch(branch.as_deref()).await?;
        let result = self.coordinator.branch_create(name).await;
        self.restore_coordinator(previous);
        result
    }
670
    /// List all branches known to the coordinator.
    pub async fn branch_list(&self) -> Result<Vec<String>> {
        self.ensure_schema_state_valid().await?;
        self.coordinator.branch_list().await
    }
675
676    pub async fn branch_delete(&mut self, name: &str) -> Result<()> {
677        self.ensure_schema_state_valid().await?;
678        self.ensure_schema_apply_idle("branch_delete").await?;
679        ensure_public_branch_ref(name, "branch_delete")?;
680        self.refresh().await?;
681        let branch = normalize_branch_name(name)?
682            .ok_or_else(|| OmniError::manifest("cannot delete branch 'main'".to_string()))?;
683        let branches = self.coordinator.branch_list().await?;
684        if !branches.iter().any(|candidate| candidate == &branch) {
685            return Err(OmniError::manifest_not_found(format!(
686                "branch '{}' not found",
687                branch
688            )));
689        }
690
691        self.ensure_branch_delete_safe(&branch, &branches).await?;
692        self.delete_branch_storage_only(&branch).await
693    }
694
    /// Resolve a commit id to its `GraphCommit` record.
    pub async fn get_commit(&self, commit_id: &str) -> Result<GraphCommit> {
        self.ensure_schema_state_valid().await?;
        self.coordinator
            .resolve_commit(&SnapshotId::new(commit_id))
            .await
    }
701
    /// List commits on `branch` (default: this handle's resolution of `main`).
    pub async fn list_commits(&self, branch: Option<&str>) -> Result<Vec<GraphCommit>> {
        self.ensure_schema_state_valid().await?;
        let branch = match branch {
            Some(branch) => normalize_branch_name(branch)?,
            None => None,
        };
        let coordinator = self.open_coordinator_for_branch(branch.as_deref()).await?;
        coordinator.list_commits().await
    }
711
    /// Open a sub-table for mutation with version-drift guard.
    ///
    /// Checks that the dataset's current version matches the snapshot-pinned
    /// version. If another writer has advanced the version, returns an error
    /// prompting the caller to refresh and retry (optimistic concurrency).
    pub(crate) async fn open_for_mutation(
        &self,
        table_key: &str,
    ) -> Result<(Dataset, String, Option<String>)> {
        table_ops::open_for_mutation(self, table_key).await
    }
723
    /// Branch-targeted variant of [`Self::open_for_mutation`].
    pub(crate) async fn open_for_mutation_on_branch(
        &self,
        branch: Option<&str>,
        table_key: &str,
    ) -> Result<(Dataset, String, Option<String>)> {
        table_ops::open_for_mutation_on_branch(self, branch, table_key).await
    }
731
    /// Fork a sub-table dataset from an inherited (source) branch/version
    /// into `active_branch` — part of the lazy-branching scheme.
    pub(crate) async fn fork_dataset_from_entry_state(
        &self,
        table_key: &str,
        full_path: &str,
        source_branch: Option<&str>,
        source_version: u64,
        active_branch: &str,
    ) -> Result<Dataset> {
        table_ops::fork_dataset_from_entry_state(
            self,
            table_key,
            full_path,
            source_branch,
            source_version,
            active_branch,
        )
        .await
    }
750
    /// Re-open a sub-table for mutation, verifying it is still at `expected_version`.
    pub(crate) async fn reopen_for_mutation(
        &self,
        table_key: &str,
        full_path: &str,
        table_branch: Option<&str>,
        expected_version: u64,
    ) -> Result<Dataset> {
        table_ops::reopen_for_mutation(self, table_key, full_path, table_branch, expected_version)
            .await
    }
761
    /// Open a sub-table read-only at an exact (branch, version) state.
    pub(crate) async fn open_dataset_at_state(
        &self,
        table_path: &str,
        table_branch: Option<&str>,
        table_version: u64,
    ) -> Result<Dataset> {
        table_ops::open_dataset_at_state(self, table_path, table_branch, table_version).await
    }
770
    /// Build the configured scalar indices for `table_key` on an open dataset.
    pub(crate) async fn build_indices_on_dataset(
        &self,
        table_key: &str,
        ds: &mut Dataset,
    ) -> Result<()> {
        table_ops::build_indices_on_dataset(self, table_key, ds).await
    }
778
    /// As [`Self::build_indices_on_dataset`], but against an explicit catalog
    /// (used mid-migration when `self.catalog` is not yet the target).
    pub(crate) async fn build_indices_on_dataset_for_catalog(
        &self,
        catalog: &Catalog,
        table_key: &str,
        ds: &mut Dataset,
    ) -> Result<()> {
        table_ops::build_indices_on_dataset_for_catalog(self, catalog, table_key, ds).await
    }
787
    // Used only by in-tree tests (`#[cfg(test)]`); the runtime path now
    // uses `commit_updates_on_branch_with_expected` exclusively.
    #[cfg(test)]
    pub(crate) async fn commit_updates(
        &mut self,
        updates: &[crate::db::SubTableUpdate],
    ) -> Result<u64> {
        table_ops::commit_updates(self, updates).await
    }
797
    /// Commit sub-table updates to the manifest; returns the new manifest version.
    pub(crate) async fn commit_manifest_updates(
        &mut self,
        updates: &[crate::db::SubTableUpdate],
    ) -> Result<u64> {
        table_ops::commit_manifest_updates(self, updates).await
    }
804
    /// Record a two-parent merge commit for `manifest_version`; returns its id.
    pub(crate) async fn record_merge_commit(
        &mut self,
        manifest_version: u64,
        parent_commit_id: &str,
        merged_parent_commit_id: &str,
    ) -> Result<String> {
        table_ops::record_merge_commit(
            self,
            manifest_version,
            parent_commit_id,
            merged_parent_commit_id,
        )
        .await
    }
819
    /// Canonical commit path: apply `updates` on `branch`, guarded by the
    /// expected per-table versions (optimistic concurrency check).
    pub(crate) async fn commit_updates_on_branch_with_expected(
        &mut self,
        branch: Option<&str>,
        updates: &[crate::db::SubTableUpdate],
        expected_table_versions: &std::collections::HashMap<String, u64>,
    ) -> Result<u64> {
        table_ops::commit_updates_on_branch_with_expected(
            self,
            branch,
            updates,
            expected_table_versions,
        )
        .await
    }
834
    /// Ensure the commit-graph backing store is initialized before use.
    /// Idempotent by convention — confirm against
    /// `table_ops::ensure_commit_graph_initialized`.
    pub(crate) async fn ensure_commit_graph_initialized(&mut self) -> Result<()> {
        table_ops::ensure_commit_graph_initialized(self).await
    }
838
    /// Invalidate the cached graph index. Called after edge mutations so
    /// later reads rebuild the index from fresh data (delegates to
    /// `table_ops::invalidate_graph_index`).
    pub(crate) async fn invalidate_graph_index(&self) {
        table_ops::invalidate_graph_index(self).await
    }
843
844}
845
846pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
847    let branch = branch.trim();
848    if branch.is_empty() {
849        return Err(OmniError::manifest(
850            "branch name cannot be empty".to_string(),
851        ));
852    }
853    if branch == "main" {
854        return Ok(None);
855    }
856    Ok(Some(branch.to_string()))
857}
858
859pub(crate) fn ensure_public_branch_ref(branch: &str, operation: &str) -> Result<()> {
860    if super::is_internal_run_branch(branch) {
861        return Err(OmniError::manifest(format!(
862            "{} does not allow internal run ref '{}'",
863            operation, branch
864        )));
865    }
866    if is_internal_system_branch(branch) {
867        return Err(OmniError::manifest(format!(
868            "{} does not allow internal system ref '{}'",
869            operation, branch
870        )));
871    }
872    Ok(())
873}
874
875fn concat_or_empty_batches(schema: Arc<Schema>, batches: Vec<RecordBatch>) -> Result<RecordBatch> {
876    if batches.is_empty() {
877        return Ok(RecordBatch::new_empty(schema));
878    }
879    if batches.len() == 1 {
880        return Ok(batches.into_iter().next().unwrap());
881    }
882    let batch_schema = batches[0].schema();
883    arrow_select::concat::concat_batches(&batch_schema, &batches)
884        .map_err(|e| OmniError::Lance(e.to_string()))
885}
886
887fn blob_properties_for_table_key<'a>(
888    catalog: &'a Catalog,
889    table_key: &str,
890) -> Result<&'a std::collections::HashSet<String>> {
891    if let Some(type_name) = table_key.strip_prefix("node:") {
892        return catalog
893            .node_types
894            .get(type_name)
895            .map(|node_type| &node_type.blob_properties)
896            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)));
897    }
898    if let Some(type_name) = table_key.strip_prefix("edge:") {
899        return catalog
900            .edge_types
901            .get(type_name)
902            .map(|edge_type| &edge_type.blob_properties)
903            .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)));
904    }
905    Err(OmniError::manifest(format!(
906        "invalid table key '{}'",
907        table_key
908    )))
909}
910
911fn blob_description_is_null(descriptions: &StructArray, row: usize) -> Result<bool> {
912    if descriptions.is_null(row) {
913        return Ok(true);
914    }
915
916    let kind = descriptions
917        .column_by_name("kind")
918        .and_then(|col| col.as_any().downcast_ref::<UInt32Array>())
919        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row) as u8))
920        .or_else(|| {
921            descriptions
922                .column_by_name("kind")
923                .and_then(|col| col.as_any().downcast_ref::<arrow_array::UInt8Array>())
924                .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)))
925        });
926    let position = descriptions
927        .column_by_name("position")
928        .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
929        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
930    let size = descriptions
931        .column_by_name("size")
932        .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
933        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
934    let blob_uri = descriptions
935        .column_by_name("blob_uri")
936        .and_then(|col| col.as_any().downcast_ref::<StringArray>())
937        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
938
939    let Some(kind) = kind else {
940        return Ok(true);
941    };
942    let kind = BlobKind::try_from(kind).map_err(|e| OmniError::Lance(e.to_string()))?;
943    if kind != BlobKind::Inline {
944        return Ok(false);
945    }
946
947    Ok(position.unwrap_or(0) == 0 && size.unwrap_or(0) == 0 && blob_uri.unwrap_or("").is_empty())
948}
949
950/// Replace placeholder `LargeBinary` fields with Lance blob v2 fields.
951///
952/// The compiler crate has no Lance dependency, so `ScalarType::Blob` maps to
953/// `DataType::LargeBinary` as a placeholder. This function replaces those
954/// fields with the real blob v2 struct type via `lance::blob::blob_field()`.
955fn fixup_blob_schemas(catalog: &mut Catalog) {
956    for node_type in catalog.node_types.values_mut() {
957        if node_type.blob_properties.is_empty() {
958            continue;
959        }
960        let fields: Vec<Field> = node_type
961            .arrow_schema
962            .fields()
963            .iter()
964            .map(|f| {
965                if node_type.blob_properties.contains(f.name()) {
966                    blob_field(f.name(), f.is_nullable())
967                } else {
968                    f.as_ref().clone()
969                }
970            })
971            .collect();
972        node_type.arrow_schema = Arc::new(Schema::new(fields));
973    }
974    for edge_type in catalog.edge_types.values_mut() {
975        if edge_type.blob_properties.is_empty() {
976            continue;
977        }
978        let fields: Vec<Field> = edge_type
979            .arrow_schema
980            .fields()
981            .iter()
982            .map(|f| {
983                if edge_type.blob_properties.contains(f.name()) {
984                    blob_field(f.name(), f.is_nullable())
985                } else {
986                    f.as_ref().clone()
987                }
988            })
989            .collect();
990        edge_type.arrow_schema = Arc::new(Schema::new(fields));
991    }
992}
993
994fn read_schema_ir_from_source(schema_source: &str) -> Result<SchemaIR> {
995    let schema_ast = parse_schema(schema_source)?;
996    build_schema_ir(&schema_ast).map_err(|err| OmniError::manifest(err.to_string()))
997}
998
999fn schema_table_key(type_kind: SchemaTypeKind, name: &str) -> String {
1000    match type_kind {
1001        SchemaTypeKind::Node => format!("node:{}", name),
1002        SchemaTypeKind::Edge => format!("edge:{}", name),
1003        SchemaTypeKind::Interface => unreachable!("interfaces do not map to tables"),
1004    }
1005}
1006
1007fn schema_for_table_key(catalog: &Catalog, table_key: &str) -> Result<Arc<Schema>> {
1008    if let Some(type_name) = table_key.strip_prefix("node:") {
1009        let node_type: &NodeType = catalog
1010            .node_types
1011            .get(type_name)
1012            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
1013        return Ok(node_type.arrow_schema.clone());
1014    }
1015    if let Some(type_name) = table_key.strip_prefix("edge:") {
1016        let edge_type: &EdgeType = catalog
1017            .edge_types
1018            .get(type_name)
1019            .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)))?;
1020        return Ok(edge_type.arrow_schema.clone());
1021    }
1022    Err(OmniError::manifest(format!(
1023        "invalid table key '{}'",
1024        table_key
1025    )))
1026}
1027
1028fn record_batch_row_to_json(batch: &RecordBatch, row: usize) -> Result<serde_json::Value> {
1029    let mut obj = serde_json::Map::new();
1030    for (i, field) in batch.schema().fields().iter().enumerate() {
1031        obj.insert(
1032            field.name().clone(),
1033            json_value_from_array(batch.column(i).as_ref(), row)?,
1034        );
1035    }
1036    Ok(serde_json::Value::Object(obj))
1037}
1038
1039fn json_value_from_array(array: &dyn Array, row: usize) -> Result<serde_json::Value> {
1040    if array.is_null(row) {
1041        return Ok(serde_json::Value::Null);
1042    }
1043
1044    match array.data_type() {
1045        DataType::Utf8 => Ok(serde_json::Value::String(
1046            array
1047                .as_any()
1048                .downcast_ref::<StringArray>()
1049                .ok_or_else(|| OmniError::Lance("expected StringArray".to_string()))?
1050                .value(row)
1051                .to_string(),
1052        )),
1053        DataType::LargeUtf8 => Ok(serde_json::Value::String(
1054            array
1055                .as_any()
1056                .downcast_ref::<LargeStringArray>()
1057                .ok_or_else(|| OmniError::Lance("expected LargeStringArray".to_string()))?
1058                .value(row)
1059                .to_string(),
1060        )),
1061        DataType::Boolean => Ok(serde_json::Value::Bool(
1062            array
1063                .as_any()
1064                .downcast_ref::<BooleanArray>()
1065                .ok_or_else(|| OmniError::Lance("expected BooleanArray".to_string()))?
1066                .value(row),
1067        )),
1068        DataType::Int32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1069            array
1070                .as_any()
1071                .downcast_ref::<Int32Array>()
1072                .ok_or_else(|| OmniError::Lance("expected Int32Array".to_string()))?
1073                .value(row),
1074        ))),
1075        DataType::Int64 => Ok(serde_json::Value::Number(serde_json::Number::from(
1076            array
1077                .as_any()
1078                .downcast_ref::<Int64Array>()
1079                .ok_or_else(|| OmniError::Lance("expected Int64Array".to_string()))?
1080                .value(row),
1081        ))),
1082        DataType::UInt32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1083            array
1084                .as_any()
1085                .downcast_ref::<UInt32Array>()
1086                .ok_or_else(|| OmniError::Lance("expected UInt32Array".to_string()))?
1087                .value(row),
1088        ))),
1089        DataType::UInt64 => Ok(serde_json::Value::Number(serde_json::Number::from(
1090            array
1091                .as_any()
1092                .downcast_ref::<UInt64Array>()
1093                .ok_or_else(|| OmniError::Lance("expected UInt64Array".to_string()))?
1094                .value(row),
1095        ))),
1096        DataType::Float32 => {
1097            let value = array
1098                .as_any()
1099                .downcast_ref::<Float32Array>()
1100                .ok_or_else(|| OmniError::Lance("expected Float32Array".to_string()))?
1101                .value(row) as f64;
1102            Ok(serde_json::Value::Number(
1103                serde_json::Number::from_f64(value).ok_or_else(|| {
1104                    OmniError::Lance(format!("cannot encode f32 value '{}' as JSON", value))
1105                })?,
1106            ))
1107        }
1108        DataType::Float64 => {
1109            let value = array
1110                .as_any()
1111                .downcast_ref::<Float64Array>()
1112                .ok_or_else(|| OmniError::Lance("expected Float64Array".to_string()))?
1113                .value(row);
1114            Ok(serde_json::Value::Number(
1115                serde_json::Number::from_f64(value).ok_or_else(|| {
1116                    OmniError::Lance(format!("cannot encode f64 value '{}' as JSON", value))
1117                })?,
1118            ))
1119        }
1120        DataType::Date32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1121            array
1122                .as_any()
1123                .downcast_ref::<Date32Array>()
1124                .ok_or_else(|| OmniError::Lance("expected Date32Array".to_string()))?
1125                .value(row),
1126        ))),
1127        DataType::Binary => Ok(serde_json::Value::String(base64::Engine::encode(
1128            &base64::engine::general_purpose::STANDARD,
1129            array
1130                .as_any()
1131                .downcast_ref::<BinaryArray>()
1132                .ok_or_else(|| OmniError::Lance("expected BinaryArray".to_string()))?
1133                .value(row),
1134        ))),
1135        DataType::LargeBinary => Ok(serde_json::Value::String(base64::Engine::encode(
1136            &base64::engine::general_purpose::STANDARD,
1137            array
1138                .as_any()
1139                .downcast_ref::<LargeBinaryArray>()
1140                .ok_or_else(|| OmniError::Lance("expected LargeBinaryArray".to_string()))?
1141                .value(row),
1142        ))),
1143        DataType::List(_) => {
1144            let list = array
1145                .as_any()
1146                .downcast_ref::<ListArray>()
1147                .ok_or_else(|| OmniError::Lance("expected ListArray".to_string()))?;
1148            let values = list.value(row);
1149            let mut out = Vec::with_capacity(values.len());
1150            for idx in 0..values.len() {
1151                out.push(json_value_from_array(values.as_ref(), idx)?);
1152            }
1153            Ok(serde_json::Value::Array(out))
1154        }
1155        DataType::LargeList(_) => {
1156            let list = array
1157                .as_any()
1158                .downcast_ref::<LargeListArray>()
1159                .ok_or_else(|| OmniError::Lance("expected LargeListArray".to_string()))?;
1160            let values = list.value(row);
1161            let mut out = Vec::with_capacity(values.len());
1162            for idx in 0..values.len() {
1163                out.push(json_value_from_array(values.as_ref(), idx)?);
1164            }
1165            Ok(serde_json::Value::Array(out))
1166        }
1167        DataType::FixedSizeList(_, _) => {
1168            let list = array
1169                .as_any()
1170                .downcast_ref::<FixedSizeListArray>()
1171                .ok_or_else(|| OmniError::Lance("expected FixedSizeListArray".to_string()))?;
1172            let values = list.value(row);
1173            let mut out = Vec::with_capacity(values.len());
1174            for idx in 0..values.len() {
1175                out.push(json_value_from_array(values.as_ref(), idx)?);
1176            }
1177            Ok(serde_json::Value::Array(out))
1178        }
1179        DataType::Struct(fields) => {
1180            let struct_array = array
1181                .as_any()
1182                .downcast_ref::<StructArray>()
1183                .ok_or_else(|| OmniError::Lance("expected StructArray".to_string()))?;
1184            let mut obj = serde_json::Map::new();
1185            for (field_idx, field) in fields.iter().enumerate() {
1186                obj.insert(
1187                    field.name().clone(),
1188                    json_value_from_array(struct_array.column(field_idx).as_ref(), row)?,
1189                );
1190            }
1191            Ok(serde_json::Value::Object(obj))
1192        }
1193        _ => {
1194            let value = arrow_cast::display::array_value_to_string(array, row)
1195                .map_err(|e| OmniError::Lance(e.to_string()))?;
1196            Ok(serde_json::Value::String(value))
1197        }
1198    }
1199}
1200
#[cfg(test)]
mod tests {
    //! Integration-style tests for schema bootstrap, schema apply, and the
    //! schema-apply lock branch, all running against temp-dir-backed
    //! databases.
    use super::*;
    use crate::db::is_internal_run_branch;
    use crate::db::manifest::ManifestCoordinator;
    use async_trait::async_trait;
    use serde_json::Value;
    use std::sync::Mutex;

    use crate::storage::{LocalStorageAdapter, StorageAdapter, join_uri};

    // Minimal two-node / two-edge schema shared by every test; individual
    // tests derive variants via string replacement.
    const TEST_SCHEMA: &str = r#"
node Person {
    name: String @key
    age: I32?
}
node Company {
    name: String @key
}
edge Knows: Person -> Person {
    since: Date?
}
edge WorksAt: Person -> Company
"#;

    // Storage adapter that forwards to a local adapter while recording
    // every URI touched per operation, so tests can assert which metadata
    // files were read/written through the adapter.
    #[derive(Debug, Default)]
    struct RecordingStorageAdapter {
        inner: LocalStorageAdapter,
        reads: Mutex<Vec<String>>,
        writes: Mutex<Vec<String>>,
        exists_checks: Mutex<Vec<String>>,
        renames: Mutex<Vec<(String, String)>>,
        deletes: Mutex<Vec<String>>,
    }

    impl RecordingStorageAdapter {
        // Snapshot of all URIs read so far.
        fn reads(&self) -> Vec<String> {
            self.reads.lock().unwrap().clone()
        }

        // Snapshot of all URIs written so far.
        fn writes(&self) -> Vec<String> {
            self.writes.lock().unwrap().clone()
        }

        // Snapshot of all URIs whose existence was checked so far.
        fn exists_checks(&self) -> Vec<String> {
            self.exists_checks.lock().unwrap().clone()
        }
    }

    #[async_trait]
    impl StorageAdapter for RecordingStorageAdapter {
        async fn read_text(&self, uri: &str) -> Result<String> {
            self.reads.lock().unwrap().push(uri.to_string());
            self.inner.read_text(uri).await
        }

        async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
            self.writes.lock().unwrap().push(uri.to_string());
            self.inner.write_text(uri, contents).await
        }

        async fn exists(&self, uri: &str) -> Result<bool> {
            self.exists_checks.lock().unwrap().push(uri.to_string());
            self.inner.exists(uri).await
        }

        async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
            self.renames
                .lock()
                .unwrap()
                .push((from_uri.to_string(), to_uri.to_string()));
            self.inner.rename_text(from_uri, to_uri).await
        }

        async fn delete(&self, uri: &str) -> Result<()> {
            self.deletes.lock().unwrap().push(uri.to_string());
            self.inner.delete(uri).await
        }
    }

    // init/open must route all schema-metadata I/O through the injected
    // storage adapter rather than hitting the filesystem directly.
    #[tokio::test]
    async fn test_init_and_open_route_graph_metadata_through_storage_adapter() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let adapter = Arc::new(RecordingStorageAdapter::default());

        Omnigraph::init_with_storage(uri, TEST_SCHEMA, adapter.clone())
            .await
            .unwrap();
        assert!(adapter.writes().contains(&join_uri(uri, "_schema.pg")));
        assert!(adapter.writes().contains(&join_uri(uri, "_schema.ir.json")));
        assert!(
            adapter
                .writes()
                .contains(&join_uri(uri, "__schema_state.json"))
        );

        Omnigraph::open_with_storage(uri, adapter.clone())
            .await
            .unwrap();
        assert!(adapter.reads().contains(&join_uri(uri, "_schema.pg")));
        assert!(adapter.reads().contains(&join_uri(uri, "_schema.ir.json")));
        assert!(
            adapter
                .reads()
                .contains(&join_uri(uri, "__schema_state.json"))
        );
        assert!(
            adapter
                .exists_checks()
                .contains(&join_uri(uri, "_schema.ir.json"))
        );
        assert!(
            adapter
                .exists_checks()
                .contains(&join_uri(uri, "__schema_state.json"))
        );
        assert!(
            adapter
                .exists_checks()
                .contains(&join_uri(uri, "_graph_commits.lance"))
        );
    }

    // Scan every row of `table_key` at the current snapshot and return the
    // rows as JSON objects (via `record_batch_row_to_json`).
    async fn table_rows_json(db: &Omnigraph, table_key: &str) -> Vec<Value> {
        let snapshot = db.snapshot();
        let ds = snapshot.open(table_key).await.unwrap();
        let batches = db.table_store().scan_batches(&ds).await.unwrap();
        batches
            .into_iter()
            .flat_map(|batch| {
                (0..batch.num_rows())
                    .map(|row| record_batch_row_to_json(&batch, row).unwrap())
                    .collect::<Vec<_>>()
            })
            .collect()
    }

    // Append one Person row (id == name) and commit it, filling any other
    // columns with nulls.
    async fn seed_person_row(db: &mut Omnigraph, name: &str, age: Option<i32>) {
        let (mut ds, full_path, table_branch) = db.open_for_mutation("node:Person").await.unwrap();
        let schema: Arc<Schema> = Arc::new(ds.schema().into());
        let columns: Vec<Arc<dyn Array>> = schema
            .fields()
            .iter()
            .map(|field| match field.name().as_str() {
                "id" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
                "name" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
                "age" => Arc::new(Int32Array::from(vec![age])) as Arc<dyn Array>,
                _ => new_null_array(field.data_type(), 1),
            })
            .collect();
        let batch = RecordBatch::try_new(Arc::clone(&schema), columns).unwrap();
        let state = db
            .table_store()
            .append_batch(&full_path, &mut ds, batch)
            .await
            .unwrap();
        db.commit_updates(&[crate::db::SubTableUpdate {
            table_key: "node:Person".to_string(),
            table_version: state.version,
            table_branch,
            row_count: state.row_count,
            version_metadata: state.version_metadata,
        }])
        .await
        .unwrap();
    }

    // Adding a nullable property must keep existing rows readable, with
    // the new column null for them.
    #[tokio::test]
    async fn test_apply_schema_adds_nullable_property_and_preserves_rows() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;

        let desired = TEST_SCHEMA.replace(
            "    age: I32?\n}",
            "    age: I32?\n    nickname: String?\n}",
        );
        let result = db.apply_schema(&desired).await.unwrap();
        assert!(result.applied);

        let reopened = Omnigraph::open(uri).await.unwrap();
        let rows = table_rows_json(&reopened, "node:Person").await;
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0]["name"], "Alice");
        assert_eq!(rows[0]["age"], 30);
        assert!(rows[0]["nickname"].is_null());
        assert!(
            reopened.catalog().node_types["Person"]
                .properties
                .contains_key("nickname")
        );
        assert!(dir.path().join("_schema.pg").exists());
    }

    // `@rename_from` must carry existing column values over to the new
    // property name and drop the old one.
    #[tokio::test]
    async fn test_apply_schema_renames_property_and_preserves_values() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;

        let desired = TEST_SCHEMA.replace(
            "    age: I32?\n}",
            "    years: I32? @rename_from(\"age\")\n}",
        );
        db.apply_schema(&desired).await.unwrap();

        let reopened = Omnigraph::open(uri).await.unwrap();
        let rows = table_rows_json(&reopened, "node:Person").await;
        assert_eq!(rows[0]["name"], "Alice");
        assert_eq!(rows[0]["years"], 30);
        assert!(rows[0].get("age").is_none());
    }

    // Renaming a node type must update the head snapshot while historical
    // snapshots keep resolving the old type name.
    #[tokio::test]
    async fn test_apply_schema_renames_type_and_preserves_historical_snapshot() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;
        let before_version = db.snapshot().version();

        let desired = TEST_SCHEMA
            .replace("node Person {\n", "node Human @rename_from(\"Person\") {\n")
            .replace("edge Knows: Person -> Person", "edge Knows: Human -> Human")
            .replace(
                "edge WorksAt: Person -> Company",
                "edge WorksAt: Human -> Company",
            );
        db.apply_schema(&desired).await.unwrap();

        let head = db.snapshot();
        assert!(head.entry("node:Person").is_none());
        assert!(head.entry("node:Human").is_some());
        let historical = ManifestCoordinator::snapshot_at(uri, None, before_version)
            .await
            .unwrap();
        assert!(historical.entry("node:Person").is_some());
        assert!(historical.entry("node:Human").is_none());
    }

    #[tokio::test]
    async fn test_apply_schema_succeeds_after_load() {
        // Historical: schema apply used to be blocked by leftover
        // `__run__` branches. A defense-in-depth filter now skips
        // internal system branches, and run branches were made
        // ephemeral on every terminal state — so in practice no
        // `__run__` branch survives publish. The filter still guards
        // the invariant.
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        crate::loader::load_jsonl(
            &mut db,
            r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#,
            crate::loader::LoadMode::Overwrite,
        )
        .await
        .unwrap();

        let all_branches = db.coordinator.all_branches().await.unwrap();
        assert!(
            !all_branches.iter().any(|b| is_internal_run_branch(b)),
            "run branch should be deleted after publish, got: {:?}",
            all_branches
        );

        let desired = TEST_SCHEMA.replace(
            "    age: I32?\n}",
            "    age: I32?\n    nickname: String?\n}",
        );
        let result = db.apply_schema(&desired).await.unwrap();
        assert!(result.applied, "schema apply should have applied");
    }

    // Newly adding `@index` on an existing property must create the FTS
    // index on the already-created table.
    #[tokio::test]
    async fn test_apply_schema_adds_index_for_existing_property() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let desired = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
        db.apply_schema(&desired).await.unwrap();

        let snapshot = db.snapshot();
        let ds = snapshot.open("node:Person").await.unwrap();
        assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
    }

    // A schema apply that rewrites the table must keep the indices that
    // already existed (id btree, name FTS).
    #[tokio::test]
    async fn test_apply_schema_rewrite_preserves_existing_indices() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let initial_schema = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
        let mut db = Omnigraph::init(uri, &initial_schema).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;

        let desired = initial_schema.replace(
            "    age: I32?\n}",
            "    age: I32?\n    nickname: String?\n}",
        );
        db.apply_schema(&desired).await.unwrap();

        let snapshot = db.snapshot();
        let ds = snapshot.open("node:Person").await.unwrap();
        assert!(db.table_store().has_btree_index(&ds, "id").await.unwrap());
        assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
    }

    // While the schema-apply lock branch exists, writes must be refused.
    #[tokio::test]
    async fn test_open_for_mutation_rejects_while_schema_apply_locked() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        let mut db = db;
        db.coordinator
            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
            .await
            .unwrap();

        let err = db.open_for_mutation("node:Person").await.unwrap_err();
        assert!(
            err.to_string()
                .contains("write is unavailable while schema apply is in progress")
        );
    }

    // ...and so must commits, even empty ones.
    #[tokio::test]
    async fn test_commit_updates_rejects_while_schema_apply_locked() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        db.coordinator
            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
            .await
            .unwrap();

        let err = db.commit_updates(&[]).await.unwrap_err();
        assert!(
            err.to_string()
                .contains("write commit is unavailable while schema apply is in progress")
        );
    }

    // The lock branch is internal and must not leak into branch listings.
    #[tokio::test]
    async fn test_branch_list_hides_schema_apply_lock_branch() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        db.coordinator
            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
            .await
            .unwrap();

        let branches = db.branch_list().await.unwrap();
        assert_eq!(branches, vec!["main".to_string()]);
    }
}
1561}