// omnigraph/db/omnigraph.rs
1use std::collections::{BTreeSet, HashMap, HashSet};
2use std::io::Write;
3use std::sync::Arc;
4
5use arrow_array::{
6    Array, BinaryArray, BooleanArray, Date32Array, FixedSizeListArray, Float32Array, Float64Array,
7    Int32Array, Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray,
8    RecordBatch, StringArray, StructArray, UInt32Array, UInt64Array, new_null_array,
9};
10use arrow_schema::{DataType, Field, Schema};
11use lance::Dataset;
12use lance::blob::{BlobArrayBuilder, blob_field};
13use lance::dataset::BlobFile;
14use lance::dataset::scanner::ColumnOrdering;
15use lance::datatypes::BlobKind;
16use omnigraph_compiler::catalog::{Catalog, EdgeType, NodeType};
17use omnigraph_compiler::schema::parser::parse_schema;
18use omnigraph_compiler::types::ScalarType;
19use omnigraph_compiler::{
20    SchemaIR, SchemaMigrationPlan, SchemaMigrationStep, SchemaTypeKind, build_catalog_from_ir,
21    build_schema_ir, plan_schema_migration,
22};
23
24use crate::db::graph_coordinator::{GraphCoordinator, PublishedSnapshot};
25use crate::db::run_registry::{RunRecord, RunStatus};
26use crate::error::{ManifestErrorKind, OmniError, Result};
27use crate::runtime_cache::RuntimeCache;
28use crate::storage::{StorageAdapter, join_uri, normalize_root_uri, storage_for_uri};
29use crate::table_store::TableStore;
30
31mod export;
32mod optimize;
33mod schema_apply;
34mod table_ops;
35
36pub use optimize::{CleanupPolicyOptions, TableCleanupStats, TableOptimizeStats};
37
38use super::commit_graph::GraphCommit;
39use super::manifest::{
40    ManifestChange, Snapshot, SubTableEntry, TableRegistration, TableTombstone,
41    table_path_for_table_key,
42};
43use super::schema_state::{
44    SCHEMA_SOURCE_FILENAME, load_or_bootstrap_schema_contract, read_accepted_schema_ir,
45    validate_schema_contract, write_schema_contract,
46};
47use super::{
48    ReadTarget, ResolvedTarget, RunId, SCHEMA_APPLY_LOCK_BRANCH, SnapshotId,
49    is_internal_system_branch, is_schema_apply_lock_branch,
50};
51
/// Result of a branch merge operation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MergeOutcome {
    /// No changes were needed; the target already contained the source.
    AlreadyUpToDate,
    /// The target was advanced without creating a new merge state.
    FastForward,
    /// A real merge of divergent histories was performed.
    Merged,
}
58
/// Outcome of [`Omnigraph::apply_schema`].
#[derive(Debug, Clone)]
pub struct SchemaApplyResult {
    /// Whether the requested migration is supported at all.
    pub supported: bool,
    /// Whether the migration was actually applied.
    pub applied: bool,
    /// Manifest version after the operation.
    pub manifest_version: u64,
    /// The migration steps that were planned (and, if `applied`, executed).
    pub steps: Vec<SchemaMigrationStep>,
}
66
/// Top-level handle to an Omnigraph database.
///
/// An Omnigraph is a Lance-native graph database with git-style branching.
/// It stores typed property graphs as per-type Lance datasets coordinated
/// through a Lance manifest table.
pub struct Omnigraph {
    // Normalized root URI of the repo (see `normalize_root_uri`).
    root_uri: String,
    // Backend used for all file/object reads and writes.
    storage: Arc<dyn StorageAdapter>,
    // Manifest/branch coordinator this handle is currently pinned to.
    coordinator: GraphCoordinator,
    // Low-level access to per-type Lance tables under the root.
    table_store: TableStore,
    // Handle-local caches, invalidated whenever the base snapshot moves.
    runtime_cache: RuntimeCache,
    // Catalog built from the accepted schema contract.
    catalog: Catalog,
    // Raw `_schema.pg` source text.
    schema_source: String,
    // Optional actor id recorded on audited operations (e.g. runs).
    pub(crate) audit_actor_id: Option<String>,
}
82
83impl Omnigraph {
84    /// Create a new repo at `uri` from schema source.
85    ///
86    /// Creates `_schema.pg`, per-type Lance datasets, and `__manifest`.
87    pub async fn init(uri: &str, schema_source: &str) -> Result<Self> {
88        Self::init_with_storage(uri, schema_source, storage_for_uri(uri)?).await
89    }
90
91    pub(crate) async fn init_with_storage(
92        uri: &str,
93        schema_source: &str,
94        storage: Arc<dyn StorageAdapter>,
95    ) -> Result<Self> {
96        let root = normalize_root_uri(uri)?;
97        let schema_ir = read_schema_ir_from_source(schema_source)?;
98        let mut catalog = build_catalog_from_ir(&schema_ir)?;
99        fixup_blob_schemas(&mut catalog);
100
101        // Write _schema.pg
102        let schema_path = join_uri(&root, SCHEMA_SOURCE_FILENAME);
103        storage.write_text(&schema_path, schema_source).await?;
104        write_schema_contract(&root, storage.as_ref(), &schema_ir).await?;
105
106        // Create manifest + per-type datasets
107        let coordinator = GraphCoordinator::init(&root, &catalog, Arc::clone(&storage)).await?;
108
109        Ok(Self {
110            root_uri: root.clone(),
111            storage,
112            coordinator,
113            table_store: TableStore::new(&root),
114            runtime_cache: RuntimeCache::default(),
115            catalog,
116            schema_source: schema_source.to_string(),
117            audit_actor_id: None,
118        })
119    }
120
121    /// Open an existing repo.
122    ///
123    /// Reads `_schema.pg`, parses it, builds the catalog, and opens `__manifest`.
124    pub async fn open(uri: &str) -> Result<Self> {
125        Self::open_with_storage(uri, storage_for_uri(uri)?).await
126    }
127
128    pub(crate) async fn open_with_storage(
129        uri: &str,
130        storage: Arc<dyn StorageAdapter>,
131    ) -> Result<Self> {
132        let root = normalize_root_uri(uri)?;
133        // Read _schema.pg
134        let schema_path = join_uri(&root, SCHEMA_SOURCE_FILENAME);
135        let schema_source = storage.read_text(&schema_path).await?;
136        let current_source_ir = read_schema_ir_from_source(&schema_source)?;
137        let coordinator = GraphCoordinator::open(&root, Arc::clone(&storage)).await?;
138        let branches = coordinator.branch_list().await?;
139        let (accepted_ir, _) = load_or_bootstrap_schema_contract(
140            &root,
141            Arc::clone(&storage),
142            &branches,
143            &current_source_ir,
144        )
145        .await?;
146        let mut catalog = build_catalog_from_ir(&accepted_ir)?;
147        fixup_blob_schemas(&mut catalog);
148
149        Ok(Self {
150            root_uri: root.clone(),
151            storage,
152            coordinator,
153            table_store: TableStore::new(&root),
154            runtime_cache: RuntimeCache::default(),
155            catalog,
156            schema_source,
157            audit_actor_id: None,
158        })
159    }
160
    /// Catalog built from the accepted schema contract.
    pub fn catalog(&self) -> &Catalog {
        &self.catalog
    }

    /// Raw `_schema.pg` source text this handle was opened/initialized with.
    pub fn schema_source(&self) -> &str {
        &self.schema_source
    }

    /// Normalized root URI of the repo.
    pub fn uri(&self) -> &str {
        &self.root_uri
    }
172
    /// Validate the on-storage schema contract before serving any operation.
    pub(crate) async fn ensure_schema_state_valid(&self) -> Result<()> {
        validate_schema_contract(self.uri(), Arc::clone(&self.storage)).await
    }
176
    /// Plan (without applying) a migration to `desired_schema_source`.
    pub async fn plan_schema(&self, desired_schema_source: &str) -> Result<SchemaMigrationPlan> {
        schema_apply::plan_schema(self, desired_schema_source).await
    }

    /// Plan and apply a migration to `desired_schema_source`.
    pub async fn apply_schema(&mut self, desired_schema_source: &str) -> Result<SchemaApplyResult> {
        schema_apply::apply_schema(self, desired_schema_source).await
    }

    /// Fail `operation` unless no schema apply is in progress (delegates to
    /// `schema_apply`; `operation` names the caller for error messages).
    pub(crate) async fn ensure_schema_apply_idle(&mut self, operation: &str) -> Result<()> {
        schema_apply::ensure_schema_apply_idle(self, operation).await
    }

    /// Read-only variant of the schema-apply lock check.
    async fn ensure_schema_apply_not_locked(&self, operation: &str) -> Result<()> {
        schema_apply::ensure_schema_apply_not_locked(self, operation).await
    }

    /// Low-level table accessor shared with the sub-modules.
    pub(crate) fn table_store(&self) -> &TableStore {
        &self.table_store
    }
196
197    pub(crate) async fn open_coordinator_for_branch(
198        &self,
199        branch: Option<&str>,
200    ) -> Result<GraphCoordinator> {
201        match branch {
202            Some(branch) => {
203                GraphCoordinator::open_branch(self.uri(), branch, Arc::clone(&self.storage)).await
204            }
205            None => GraphCoordinator::open(self.uri(), Arc::clone(&self.storage)).await,
206        }
207    }
208
209    pub(crate) async fn swap_coordinator_for_branch(
210        &mut self,
211        branch: Option<&str>,
212    ) -> Result<GraphCoordinator> {
213        let next = self.open_coordinator_for_branch(branch).await?;
214        Ok(std::mem::replace(&mut self.coordinator, next))
215    }
216
    /// Restore a coordinator previously taken via [`Self::swap_coordinator_for_branch`].
    pub(crate) fn restore_coordinator(&mut self, coordinator: GraphCoordinator) {
        self.coordinator = coordinator;
    }
220
221    pub(crate) async fn resolved_branch_target(
222        &self,
223        branch: Option<&str>,
224    ) -> Result<ResolvedTarget> {
225        self.ensure_schema_state_valid().await?;
226        let requested = ReadTarget::Branch(branch.unwrap_or("main").to_string());
227        let normalized = normalize_branch_name(branch.unwrap_or("main"))?;
228        if normalized.as_deref() == self.coordinator.current_branch() {
229            let snapshot_id = self.coordinator.head_commit_id().await?.unwrap_or_else(|| {
230                SnapshotId::synthetic(
231                    self.coordinator.current_branch(),
232                    self.coordinator.version(),
233                )
234            });
235            return Ok(ResolvedTarget {
236                requested,
237                branch: self.coordinator.current_branch().map(str::to_string),
238                snapshot_id,
239                snapshot: self.coordinator.snapshot(),
240            });
241        }
242        self.coordinator.resolve_target(&requested).await
243    }
244
245    pub(crate) async fn snapshot_for_branch(&self, branch: Option<&str>) -> Result<Snapshot> {
246        self.resolved_branch_target(branch)
247            .await
248            .map(|resolved| resolved.snapshot)
249    }
250
    /// Manifest version currently loaded in this handle's coordinator.
    pub(crate) fn version(&self) -> u64 {
        self.coordinator.version()
    }

    /// Return an immutable Snapshot from the known manifest state. No storage I/O.
    pub(crate) fn snapshot(&self) -> Snapshot {
        self.coordinator.snapshot()
    }
259
260    pub async fn snapshot_of(&self, target: impl Into<ReadTarget>) -> Result<Snapshot> {
261        self.resolved_target(target)
262            .await
263            .map(|resolved| resolved.snapshot)
264    }
265
266    pub async fn version_of(&self, target: impl Into<ReadTarget>) -> Result<u64> {
267        self.snapshot_of(target)
268            .await
269            .map(|snapshot| snapshot.version())
270    }
271
272    pub async fn resolved_branch_of(
273        &self,
274        target: impl Into<ReadTarget>,
275    ) -> Result<Option<String>> {
276        self.resolved_target(target)
277            .await
278            .map(|resolved| resolved.branch)
279    }
280
281    /// Synchronize this handle's write base to the latest head of the named branch.
282    pub async fn sync_branch(&mut self, branch: &str) -> Result<()> {
283        self.ensure_schema_state_valid().await?;
284        let branch = normalize_branch_name(branch)?;
285        self.coordinator = self.open_coordinator_for_branch(branch.as_deref()).await?;
286        self.runtime_cache.invalidate_all().await;
287        Ok(())
288    }
289
    /// Re-read the handle-local coordinator state from storage.
    pub(crate) async fn refresh(&mut self) -> Result<()> {
        self.coordinator.refresh().await?;
        // Anything cached against the previous manifest version is now stale.
        self.runtime_cache.invalidate_all().await;
        Ok(())
    }
296
    /// Resolve `branch` to its current head snapshot id.
    pub async fn resolve_snapshot(&self, branch: &str) -> Result<SnapshotId> {
        self.ensure_schema_state_valid().await?;
        self.coordinator.resolve_snapshot_id(branch).await
    }
301
302    pub(crate) async fn resolved_target(
303        &self,
304        target: impl Into<ReadTarget>,
305    ) -> Result<ResolvedTarget> {
306        self.ensure_schema_state_valid().await?;
307        self.coordinator.resolve_target(&target.into()).await
308    }
309
310    // ─── Change detection ────────────────────────────────────────────────
311
312    pub async fn diff_between(
313        &self,
314        from: impl Into<ReadTarget>,
315        to: impl Into<ReadTarget>,
316        filter: &crate::changes::ChangeFilter,
317    ) -> Result<crate::changes::ChangeSet> {
318        let from_resolved = self.resolved_target(from).await?;
319        let to_resolved = self.resolved_target(to).await?;
320        crate::changes::diff_snapshots(
321            self.uri(),
322            &from_resolved.snapshot,
323            &to_resolved.snapshot,
324            filter,
325            to_resolved.branch.clone().or(from_resolved.branch.clone()),
326        )
327        .await
328    }
329
330    /// Diff two graph commits. Resolves each commit to `(manifest_branch, manifest_version)`
331    /// and creates branch-aware snapshots. Supports cross-branch comparison.
332    pub async fn diff_commits(
333        &self,
334        from_commit_id: &str,
335        to_commit_id: &str,
336        filter: &crate::changes::ChangeFilter,
337    ) -> Result<crate::changes::ChangeSet> {
338        let from_commit = self
339            .coordinator
340            .resolve_commit(&SnapshotId::new(from_commit_id))
341            .await?;
342        let to_commit = self
343            .coordinator
344            .resolve_commit(&SnapshotId::new(to_commit_id))
345            .await?;
346        let from_snap = self
347            .coordinator
348            .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
349                from_commit.graph_commit_id.clone(),
350            )))
351            .await?;
352        let to_snap = self
353            .coordinator
354            .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
355                to_commit.graph_commit_id.clone(),
356            )))
357            .await?;
358        crate::changes::diff_snapshots(
359            self.uri(),
360            &from_snap.snapshot,
361            &to_snap.snapshot,
362            filter,
363            to_snap.branch.clone().or(from_snap.branch.clone()),
364        )
365        .await
366    }
367
    /// Read one entity as JSON at an arbitrary read target.
    pub async fn entity_at_target(
        &self,
        target: impl Into<ReadTarget>,
        table_key: &str,
        id: &str,
    ) -> Result<Option<serde_json::Value>> {
        export::entity_at_target(self, target, table_key, id).await
    }

    /// Read one entity at a specific manifest version via time travel (on-demand enrichment).
    pub async fn entity_at(
        &self,
        table_key: &str,
        id: &str,
        version: u64,
    ) -> Result<Option<serde_json::Value>> {
        export::entity_at(self, table_key, id, version).await
    }
386
    /// Create a Snapshot at any historical manifest version.
    pub async fn snapshot_at_version(&self, version: u64) -> Result<Snapshot> {
        self.ensure_schema_state_valid().await?;
        self.coordinator.snapshot_at_version(version).await
    }
392
    /// Export selected types/tables from `branch` as a JSONL string.
    pub async fn export_jsonl(
        &self,
        branch: &str,
        type_names: &[String],
        table_keys: &[String],
    ) -> Result<String> {
        export::export_jsonl(self, branch, type_names, table_keys).await
    }

    /// Stream the JSONL export into `writer` instead of buffering it in memory.
    pub async fn export_jsonl_to_writer<W: Write>(
        &self,
        branch: &str,
        type_names: &[String],
        table_keys: &[String],
        writer: &mut W,
    ) -> Result<()> {
        export::export_jsonl_to_writer(self, branch, type_names, table_keys, writer).await
    }
411
412    // ─── Graph index ──────────────────────────────────────────────────────
413
    /// Get or build the graph index for the current snapshot.
    pub async fn graph_index(&self) -> Result<Arc<crate::graph_index::GraphIndex>> {
        table_ops::graph_index(self).await
    }

    /// Get or build the graph index for an already-resolved target.
    pub(crate) async fn graph_index_for_resolved(
        &self,
        resolved: &ResolvedTarget,
    ) -> Result<Arc<crate::graph_index::GraphIndex>> {
        table_ops::graph_index_for_resolved(self, resolved).await
    }
425
    /// Ensure BTree scalar indices exist on key columns.
    /// Idempotent — Lance skips if index already exists.
    ///
    /// Opens sub-tables at their latest version (not snapshot-pinned) because
    /// indices must be created on the current head. Any version drift from the
    /// snapshot is expected and logged. The resulting versions are committed
    /// back to the manifest.
    ///
    /// On named branches, indexing preserves lazy branching:
    /// unbranched subtables keep inheriting `main`, while subtables inherited
    /// from an ancestor branch are first forked into the active branch before
    /// their index metadata is updated.
    pub async fn ensure_indices(&mut self) -> Result<()> {
        table_ops::ensure_indices(self).await
    }

    /// Same as [`Self::ensure_indices`], but targeting the named branch.
    pub async fn ensure_indices_on(&mut self, branch: &str) -> Result<()> {
        table_ops::ensure_indices_on(self, branch).await
    }
445
    /// Compact small Lance fragments into fewer larger ones across every
    /// node + edge table on `main`. See [`optimize`] for details.
    pub async fn optimize(&mut self) -> Result<Vec<optimize::TableOptimizeStats>> {
        optimize::optimize_all_tables(self).await
    }

    /// Remove Lance manifests (and the fragments they uniquely own) per the
    /// given [`optimize::CleanupPolicyOptions`]. Destructive to version
    /// history. See [`optimize`] for details.
    pub async fn cleanup(
        &mut self,
        options: optimize::CleanupPolicyOptions,
    ) -> Result<Vec<optimize::TableCleanupStats>> {
        optimize::cleanup_all_tables(self, options).await
    }
461
462    /// Read a blob from a node by its string ID and property name.
463    ///
464    /// Returns a `BlobFile` handle with async `read()`, `seek()`, `tell()`,
465    /// and metadata accessors (`size()`, `kind()`, `uri()`).
466    ///
467    /// ```ignore
468    /// let blob = db.read_blob("Document", "readme", "content").await?;
469    /// let bytes = blob.read().await?;
470    /// ```
471    pub async fn read_blob(&self, type_name: &str, id: &str, property: &str) -> Result<BlobFile> {
472        self.ensure_schema_state_valid().await?;
473        let node_type = self
474            .catalog
475            .node_types
476            .get(type_name)
477            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
478        if !node_type.blob_properties.contains(property) {
479            return Err(OmniError::manifest(format!(
480                "property '{}' on type '{}' is not a Blob",
481                property, type_name
482            )));
483        }
484
485        let snapshot = self.snapshot();
486        let table_key = format!("node:{}", type_name);
487        let ds = snapshot.open(&table_key).await?;
488
489        let filter_sql = format!("id = '{}'", id.replace('\'', "''"));
490        let row_id = self
491            .table_store
492            .first_row_id_for_filter(&ds, &filter_sql)
493            .await?
494            .ok_or_else(|| {
495                OmniError::manifest(format!("no {} with id '{}' found", type_name, id))
496            })?;
497
498        // Use take_blobs to get the BlobFile handle
499        let ds = Arc::new(ds);
500        let mut blobs = ds
501            .take_blobs(&[row_id], property)
502            .await
503            .map_err(|e| OmniError::Lance(e.to_string()))?;
504
505        blobs.pop().ok_or_else(|| {
506            OmniError::manifest(format!(
507                "blob '{}' on {} '{}' returned no data",
508                property, type_name, id
509            ))
510        })
511    }
512
    /// Branch this handle's coordinator is currently pinned to, if any.
    pub(crate) fn active_branch(&self) -> Option<&str> {
        self.coordinator.current_branch()
    }
516
517    async fn ensure_branch_delete_safe(&self, branch: &str, branches: &[String]) -> Result<()> {
518        let descendants = self.coordinator.branch_descendants(branch).await?;
519        if let Some(descendant) = descendants.first() {
520            return Err(OmniError::manifest_conflict(format!(
521                "cannot delete branch '{}' because descendant branch '{}' still depends on it",
522                branch, descendant
523            )));
524        }
525
526        for run in self.list_runs().await? {
527            if run.target_branch == branch && matches!(run.status, RunStatus::Running) {
528                return Err(OmniError::manifest_conflict(format!(
529                    "cannot delete branch '{}' while run '{}' targeting it is {}",
530                    branch,
531                    run.run_id,
532                    run.status.as_str()
533                )));
534            }
535        }
536
537        for other_branch in branches
538            .iter()
539            .filter(|candidate| candidate.as_str() != branch)
540        {
541            let snapshot = self
542                .snapshot_of(ReadTarget::branch(other_branch.as_str()))
543                .await?;
544            if snapshot
545                .entries()
546                .any(|entry| entry.table_branch.as_deref() == Some(branch))
547            {
548                return Err(OmniError::manifest_conflict(format!(
549                    "cannot delete branch '{}' because branch '{}' still depends on it",
550                    branch, other_branch
551                )));
552            }
553        }
554
555        Ok(())
556    }
557
558    async fn cleanup_deleted_branch_tables(
559        &self,
560        branch: &str,
561        owned_tables: &[(String, String)],
562    ) -> Result<()> {
563        let mut seen_paths = HashSet::new();
564        let mut cleanup_targets = owned_tables
565            .iter()
566            .filter(|(_, table_path)| seen_paths.insert(table_path.clone()))
567            .cloned()
568            .collect::<Vec<_>>();
569        cleanup_targets.sort_by(|left, right| left.0.cmp(&right.0));
570
571        for (table_key, table_path) in cleanup_targets {
572            let dataset_uri = self.table_store.dataset_uri(&table_path);
573            if let Err(err) = self.table_store.delete_branch(&dataset_uri, branch).await {
574                return Err(OmniError::manifest_internal(format!(
575                    "branch '{}' was deleted but cleanup failed for {}: {}",
576                    branch, table_key, err
577                )));
578            }
579        }
580
581        Ok(())
582    }
583
584    async fn delete_branch_storage_only(&mut self, branch: &str) -> Result<()> {
585        if self.coordinator.current_branch() == Some(branch) {
586            return Err(OmniError::manifest_conflict(format!(
587                "cannot delete currently active branch '{}'",
588                branch
589            )));
590        }
591
592        let branch_snapshot = self.snapshot_of(ReadTarget::branch(branch)).await?;
593        let owned_tables = branch_snapshot
594            .entries()
595            .filter(|entry| entry.table_branch.as_deref() == Some(branch))
596            .map(|entry| (entry.table_key.clone(), entry.table_path.clone()))
597            .collect::<Vec<_>>();
598
599        self.coordinator.branch_delete(branch).await?;
600        self.cleanup_deleted_branch_tables(branch, &owned_tables)
601            .await
602    }
603
604    async fn cleanup_terminal_run_branches_for_target(&mut self, branch: &str) -> Result<()> {
605        let terminal_run_branches = self
606            .list_runs()
607            .await?
608            .into_iter()
609            .filter(|run| {
610                run.target_branch == branch
611                    && matches!(
612                        run.status,
613                        RunStatus::Published | RunStatus::Aborted | RunStatus::Failed
614                    )
615            })
616            .map(|run| run.run_branch)
617            .collect::<Vec<_>>();
618
619        let live_branches: HashSet<String> =
620            self.coordinator.all_branches().await?.into_iter().collect();
621
622        for run_branch in terminal_run_branches {
623            if !live_branches.contains(&run_branch) {
624                continue;
625            }
626            match self.delete_branch_storage_only(&run_branch).await {
627                Ok(()) => {}
628                Err(OmniError::Manifest(err)) if err.kind == ManifestErrorKind::NotFound => {}
629                Err(err) => return Err(err),
630            }
631        }
632
633        Ok(())
634    }
635
    /// Associated-function wrapper around the module-level `normalize_branch_name`.
    pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
        normalize_branch_name(branch)
    }
639
640    pub(crate) async fn head_commit_id_for_branch(
641        &self,
642        branch: Option<&str>,
643    ) -> Result<Option<String>> {
644        let mut coordinator = self.open_coordinator_for_branch(branch).await?;
645        coordinator.ensure_commit_graph_initialized().await?;
646        coordinator
647            .head_commit_id()
648            .await
649            .map(|id| id.map(|snapshot_id| snapshot_id.as_str().to_string()))
650    }
651
    /// Create a new branch from this handle's current branch head.
    pub async fn branch_create(&mut self, name: &str) -> Result<()> {
        self.ensure_schema_state_valid().await?;
        self.ensure_schema_apply_idle("branch_create").await?;
        // Reserved/internal branch names are rejected for public callers.
        ensure_public_branch_ref(name, "branch_create")?;
        self.coordinator.branch_create(name).await
    }

    /// Actor id recorded on audited operations, if one was configured.
    pub(crate) fn current_audit_actor(&self) -> Option<&str> {
        self.audit_actor_id.as_deref()
    }
662
    /// Create branch `name` from the head of another branch.
    pub async fn branch_create_from(
        &mut self,
        from: impl Into<ReadTarget>,
        name: &str,
    ) -> Result<()> {
        self.ensure_schema_apply_idle("branch_create_from").await?;
        // Public entry point: internal (reserved) branch refs are disallowed.
        self.branch_create_from_impl(from, name, false).await
    }
671
672    async fn branch_create_from_impl(
673        &mut self,
674        from: impl Into<ReadTarget>,
675        name: &str,
676        allow_internal_refs: bool,
677    ) -> Result<()> {
678        let target = from.into();
679        let ReadTarget::Branch(branch_name) = target else {
680            return Err(OmniError::manifest(
681                "branch creation from pinned snapshots is not supported yet".to_string(),
682            ));
683        };
684        if !allow_internal_refs {
685            ensure_public_branch_ref(&branch_name, "branch_create_from")?;
686            ensure_public_branch_ref(name, "branch_create_from")?;
687        }
688        let branch = normalize_branch_name(&branch_name)?;
689        let previous = self.swap_coordinator_for_branch(branch.as_deref()).await?;
690        let result = self.coordinator.branch_create(name).await;
691        self.restore_coordinator(previous);
692        result
693    }
694
    /// List all branches known to the coordinator.
    pub async fn branch_list(&self) -> Result<Vec<String>> {
        self.ensure_schema_state_valid().await?;
        self.coordinator.branch_list().await
    }
699
700    pub async fn branch_delete(&mut self, name: &str) -> Result<()> {
701        self.ensure_schema_state_valid().await?;
702        self.ensure_schema_apply_idle("branch_delete").await?;
703        ensure_public_branch_ref(name, "branch_delete")?;
704        self.refresh().await?;
705        let branch = normalize_branch_name(name)?
706            .ok_or_else(|| OmniError::manifest("cannot delete branch 'main'".to_string()))?;
707        let branches = self.coordinator.branch_list().await?;
708        if !branches.iter().any(|candidate| candidate == &branch) {
709            return Err(OmniError::manifest_not_found(format!(
710                "branch '{}' not found",
711                branch
712            )));
713        }
714
715        self.ensure_branch_delete_safe(&branch, &branches).await?;
716        self.cleanup_terminal_run_branches_for_target(&branch)
717            .await?;
718        self.delete_branch_storage_only(&branch).await
719    }
720
721    pub(crate) async fn latest_branch_snapshot_id(&self, branch: &str) -> Result<SnapshotId> {
722        let normalized = normalize_branch_name(branch)?;
723        let fresh = self
724            .open_coordinator_for_branch(normalized.as_deref())
725            .await?;
726        fresh.resolve_snapshot_id(branch).await
727    }
728
    /// Begin a run against `target_branch` without an explicit actor id
    /// (falls back to this handle's configured audit actor).
    pub async fn begin_run(
        &mut self,
        target_branch: &str,
        operation_hash: Option<&str>,
    ) -> Result<RunRecord> {
        self.begin_run_as(target_branch, operation_hash, None).await
    }
736
737    pub async fn begin_run_as(
738        &mut self,
739        target_branch: &str,
740        operation_hash: Option<&str>,
741        actor_id: Option<&str>,
742    ) -> Result<RunRecord> {
743        self.ensure_schema_state_valid().await?;
744        self.ensure_schema_apply_idle("begin_run").await?;
745        ensure_public_branch_ref(target_branch, "begin_run")?;
746        let target_branch =
747            normalize_branch_name(target_branch)?.unwrap_or_else(|| "main".to_string());
748        let fresh = self
749            .open_coordinator_for_branch(Self::normalize_branch_name(&target_branch)?.as_deref())
750            .await?;
751        let base_snapshot_id = fresh.resolve_snapshot_id(&target_branch).await?;
752        let base_manifest_version = fresh.version();
753        let record = RunRecord::new(
754            target_branch.clone(),
755            base_snapshot_id.as_str(),
756            base_manifest_version,
757            operation_hash.map(str::to_string),
758            actor_id
759                .map(str::to_string)
760                .or_else(|| self.current_audit_actor().map(str::to_string)),
761        )?;
762
763        self.branch_create_from_impl(
764            ReadTarget::branch(target_branch.clone()),
765            &record.run_branch,
766            true,
767        )
768        .await?;
769        self.coordinator.append_run_record(&record).await?;
770        Ok(record)
771    }
772
    /// Look up a run record by id.
    pub async fn get_run(&self, run_id: &RunId) -> Result<RunRecord> {
        self.ensure_schema_state_valid().await?;
        self.coordinator.get_run(run_id).await
    }

    /// All run records in the registry.
    pub async fn list_runs(&self) -> Result<Vec<RunRecord>> {
        self.ensure_schema_state_valid().await?;
        self.coordinator.list_runs().await
    }

    /// Resolve a graph commit by its id.
    pub async fn get_commit(&self, commit_id: &str) -> Result<GraphCommit> {
        self.ensure_schema_state_valid().await?;
        self.coordinator
            .resolve_commit(&SnapshotId::new(commit_id))
            .await
    }
789
790    pub async fn list_commits(&self, branch: Option<&str>) -> Result<Vec<GraphCommit>> {
791        self.ensure_schema_state_valid().await?;
792        let branch = match branch {
793            Some(branch) => normalize_branch_name(branch)?,
794            None => None,
795        };
796        let coordinator = self.open_coordinator_for_branch(branch.as_deref()).await?;
797        coordinator.list_commits().await
798    }
799
    /// Abort a run. Allowed from `Running` or `Failed`; already-terminal
    /// `Published`/`Aborted` runs produce a conflict error.
    pub async fn abort_run(&mut self, run_id: &RunId) -> Result<RunRecord> {
        self.ensure_schema_state_valid().await?;
        let run = self.get_run(run_id).await?;
        match run.status {
            RunStatus::Running | RunStatus::Failed => {
                self.terminate_run(&run, RunStatus::Aborted, None).await
            }
            RunStatus::Published => Err(OmniError::manifest_conflict(format!(
                "run '{}' is already published",
                run_id
            ))),
            RunStatus::Aborted => Err(OmniError::manifest_conflict(format!(
                "run '{}' is already aborted",
                run_id
            ))),
        }
    }

    /// Mark a run as failed. Idempotent when the run is already `Failed`;
    /// `Published`/`Aborted` runs produce a conflict error.
    pub async fn fail_run(&mut self, run_id: &RunId) -> Result<RunRecord> {
        self.ensure_schema_state_valid().await?;
        let run = self.get_run(run_id).await?;
        match run.status {
            RunStatus::Running => self.terminate_run(&run, RunStatus::Failed, None).await,
            RunStatus::Failed => Ok(run),
            RunStatus::Published => Err(OmniError::manifest_conflict(format!(
                "run '{}' is already published",
                run_id
            ))),
            RunStatus::Aborted => Err(OmniError::manifest_conflict(format!(
                "run '{}' is already aborted",
                run_id
            ))),
        }
    }
834
835    /// Append a terminal-state run record and delete the `__run__` branch.
836    /// The status record is authoritative; the branch is scaffolding. Delete
837    /// errors are swallowed — a later `branch_delete` of the target will
838    /// retry via `cleanup_terminal_run_branches_for_target`.
839    async fn terminate_run(
840        &mut self,
841        run: &RunRecord,
842        status: RunStatus,
843        published_snapshot_id: Option<String>,
844    ) -> Result<RunRecord> {
845        let updated = run.with_status(status, published_snapshot_id)?;
846        self.coordinator.append_run_record(&updated).await?;
847        let _ = self.delete_branch_storage_only(&updated.run_branch).await;
848        Ok(updated)
849    }
850
    /// Publish a run, attributing the publish to the run's recorded actor.
    ///
    /// Thin wrapper over [`Self::publish_run_as`] with no actor override.
    pub async fn publish_run(&mut self, run_id: &RunId) -> Result<SnapshotId> {
        self.publish_run_as(run_id, None).await
    }
854
855    pub async fn publish_run_as(
856        &mut self,
857        run_id: &RunId,
858        actor_id: Option<&str>,
859    ) -> Result<SnapshotId> {
860        self.ensure_schema_state_valid().await?;
861        self.ensure_schema_apply_idle("publish_run").await?;
862        let run = self.get_run(run_id).await?;
863        match run.status {
864            RunStatus::Running => {}
865            RunStatus::Published => {
866                return run
867                    .published_snapshot_id
868                    .clone()
869                    .map(SnapshotId::new)
870                    .ok_or_else(|| {
871                        OmniError::manifest(format!(
872                            "run '{}' is published but missing published snapshot id",
873                            run_id
874                        ))
875                    });
876            }
877            RunStatus::Failed | RunStatus::Aborted => {
878                return Err(OmniError::manifest_conflict(format!(
879                    "run '{}' is not publishable from status '{}'",
880                    run_id,
881                    run.status.as_str()
882                )));
883            }
884        }
885
886        let publish_actor = actor_id
887            .map(str::to_string)
888            .or_else(|| run.actor_id.clone());
889        let current_target_snapshot_id = self.resolve_snapshot(&run.target_branch).await?;
890        let previous_actor = self.audit_actor_id.clone();
891        self.audit_actor_id = publish_actor.clone();
892        let publish_result = if current_target_snapshot_id.as_str() == run.base_snapshot_id {
893            let run_for_promotion = run.clone();
894            self.sync_branch(&run_for_promotion.target_branch).await?;
895            self.promote_run_snapshot_to_target(&run_for_promotion)
896                .await
897        } else {
898            let run_branch = run.run_branch.clone();
899            let target_branch = run.target_branch.clone();
900            self.branch_merge_internal(&run_branch, &target_branch)
901                .await?;
902            self.reify_internal_run_refs(&target_branch, &run_branch)
903                .await
904        };
905        self.audit_actor_id = previous_actor;
906        publish_result?;
907        let published_snapshot_id = self.resolve_snapshot(&run.target_branch).await?;
908        self.terminate_run(
909            &run,
910            RunStatus::Published,
911            Some(published_snapshot_id.as_str().to_string()),
912        )
913        .await?;
914        Ok(published_snapshot_id)
915    }
916
917    async fn promote_run_snapshot_to_target(&mut self, run: &RunRecord) -> Result<()> {
918        let target_snapshot = self
919            .snapshot_of(ReadTarget::branch(run.target_branch.as_str()))
920            .await?;
921        let run_snapshot = self
922            .snapshot_of(ReadTarget::branch(run.run_branch.as_str()))
923            .await?;
924        let mut table_keys = std::collections::BTreeSet::new();
925        for entry in target_snapshot.entries() {
926            table_keys.insert(entry.table_key.clone());
927        }
928        for entry in run_snapshot.entries() {
929            table_keys.insert(entry.table_key.clone());
930        }
931
932        let mut updates = Vec::new();
933        let mut changed_edge_tables = false;
934        let target_branch = normalize_branch_name(&run.target_branch)?;
935
936        for table_key in table_keys {
937            let target_entry = target_snapshot.entry(&table_key);
938            let run_entry = run_snapshot.entry(&table_key);
939            if same_manifest_state(target_entry, run_entry) {
940                continue;
941            }
942            let Some(_run_entry) = run_entry else {
943                return Err(OmniError::manifest(format!(
944                    "run '{}' removed table '{}' which publish_run does not support",
945                    run.run_id, table_key
946                )));
947            };
948
949            let source_ds = run_snapshot.open(&table_key).await?;
950            let batch = self.batch_for_table_rewrite(&source_ds, &table_key).await?;
951
952            let (mut target_ds, full_path, table_branch) = self
953                .open_for_mutation_on_branch(target_branch.as_deref(), &table_key)
954                .await?;
955            let state = self
956                .table_store()
957                .overwrite_batch(&full_path, &mut target_ds, batch)
958                .await?;
959            updates.push(crate::db::SubTableUpdate {
960                table_key: table_key.clone(),
961                table_version: state.version,
962                table_branch,
963                row_count: state.row_count,
964                version_metadata: state.version_metadata,
965            });
966            if table_key.starts_with("edge:") {
967                changed_edge_tables = true;
968            }
969        }
970
971        if !updates.is_empty() {
972            self.commit_updates_on_branch(target_branch.as_deref(), &updates)
973                .await?;
974            if changed_edge_tables {
975                self.invalidate_graph_index().await;
976            }
977        }
978
979        Ok(())
980    }
981
    /// Rewrite every target-branch table whose manifest entry still pins the
    /// internal run branch, copying its data into a target-branch table so
    /// the published manifest no longer references run scaffolding. Mirrors
    /// the rewrite-and-commit flow of `promote_run_snapshot_to_target`.
    async fn reify_internal_run_refs(
        &mut self,
        target_branch: &str,
        run_branch: &str,
    ) -> Result<()> {
        let target_snapshot = self.snapshot_of(ReadTarget::branch(target_branch)).await?;
        let mut updates = Vec::new();
        let mut changed_edge_tables = false;
        let target_branch = normalize_branch_name(target_branch)?;

        for entry in target_snapshot.entries() {
            // Only entries still pointing at the run branch need rewriting.
            if entry.table_branch.as_deref() != Some(run_branch) {
                continue;
            }

            // Materialize the data and overwrite the table on the target.
            let source_ds = target_snapshot.open(&entry.table_key).await?;
            let batch = self
                .batch_for_table_rewrite(&source_ds, &entry.table_key)
                .await?;

            let (mut target_ds, full_path, table_branch) = self
                .open_for_mutation_on_branch(target_branch.as_deref(), &entry.table_key)
                .await?;
            let state = self
                .table_store()
                .overwrite_batch(&full_path, &mut target_ds, batch)
                .await?;
            updates.push(crate::db::SubTableUpdate {
                table_key: entry.table_key.clone(),
                table_version: state.version,
                table_branch,
                row_count: state.row_count,
                version_metadata: state.version_metadata,
            });
            if entry.table_key.starts_with("edge:") {
                changed_edge_tables = true;
            }
        }

        if !updates.is_empty() {
            // One manifest commit for all rewrites; edge changes invalidate
            // the cached graph index.
            self.commit_updates_on_branch(target_branch.as_deref(), &updates)
                .await?;
            if changed_edge_tables {
                self.invalidate_graph_index().await;
            }
        }

        Ok(())
    }
1031
    /// Open a sub-table for mutation with version-drift guard.
    ///
    /// Checks that the dataset's current version matches the snapshot-pinned
    /// version. If another writer has advanced the version, returns an error
    /// prompting the caller to refresh and retry (optimistic concurrency).
    pub(crate) async fn open_for_mutation(
        &self,
        table_key: &str,
    ) -> Result<(Dataset, String, Option<String>)> {
        table_ops::open_for_mutation(self, table_key).await
    }

    /// Branch-aware variant of [`Self::open_for_mutation`]; `branch = None`
    /// targets the default branch. Returns the opened dataset, its full
    /// path, and the table's branch. Delegates to `table_ops`.
    pub(crate) async fn open_for_mutation_on_branch(
        &self,
        branch: Option<&str>,
        table_key: &str,
    ) -> Result<(Dataset, String, Option<String>)> {
        table_ops::open_for_mutation_on_branch(self, branch, table_key).await
    }
1051
    /// Fork the dataset for `table_key` at `full_path` onto `active_branch`,
    /// starting from the recorded source state (`source_branch` at
    /// `source_version`). Delegates to `table_ops`.
    pub(crate) async fn fork_dataset_from_entry_state(
        &self,
        table_key: &str,
        full_path: &str,
        source_branch: Option<&str>,
        source_version: u64,
        active_branch: &str,
    ) -> Result<Dataset> {
        table_ops::fork_dataset_from_entry_state(
            self,
            table_key,
            full_path,
            source_branch,
            source_version,
            active_branch,
        )
        .await
    }

    /// Re-open a sub-table for mutation. Delegates to `table_ops`;
    /// `expected_version` presumably re-applies the same version-drift
    /// guard as `open_for_mutation` — confirm in `table_ops`.
    pub(crate) async fn reopen_for_mutation(
        &self,
        table_key: &str,
        full_path: &str,
        table_branch: Option<&str>,
        expected_version: u64,
    ) -> Result<Dataset> {
        table_ops::reopen_for_mutation(self, table_key, full_path, table_branch, expected_version)
            .await
    }

    /// Open a dataset pinned at an exact branch/version pair. Delegates to
    /// `table_ops`.
    pub(crate) async fn open_dataset_at_state(
        &self,
        table_path: &str,
        table_branch: Option<&str>,
        table_version: u64,
    ) -> Result<Dataset> {
        table_ops::open_dataset_at_state(self, table_path, table_branch, table_version).await
    }
1090
    /// Build indices for `table_key` on an open dataset. Delegates to
    /// `table_ops`.
    pub(crate) async fn build_indices_on_dataset(
        &self,
        table_key: &str,
        ds: &mut Dataset,
    ) -> Result<()> {
        table_ops::build_indices_on_dataset(self, table_key, ds).await
    }

    /// Like [`Self::build_indices_on_dataset`], but resolves the table's
    /// index configuration from an explicit `catalog`. Delegates to
    /// `table_ops`.
    pub(crate) async fn build_indices_on_dataset_for_catalog(
        &self,
        catalog: &Catalog,
        table_key: &str,
        ds: &mut Dataset,
    ) -> Result<()> {
        table_ops::build_indices_on_dataset_for_catalog(self, catalog, table_key, ds).await
    }

    /// Commit a batch of sub-table updates; returns a `u64` (presumably the
    /// resulting manifest version — see `table_ops`). Delegates to
    /// `table_ops`.
    pub(crate) async fn commit_updates(
        &mut self,
        updates: &[crate::db::SubTableUpdate],
    ) -> Result<u64> {
        table_ops::commit_updates(self, updates).await
    }

    /// Commit manifest-level updates for the given sub-tables. Delegates to
    /// `table_ops`.
    pub(crate) async fn commit_manifest_updates(
        &mut self,
        updates: &[crate::db::SubTableUpdate],
    ) -> Result<u64> {
        table_ops::commit_manifest_updates(self, updates).await
    }
1121
    /// Record a merge commit at `manifest_version` joining
    /// `parent_commit_id` and `merged_parent_commit_id`; returns the new
    /// commit id. Delegates to `table_ops`.
    pub(crate) async fn record_merge_commit(
        &mut self,
        manifest_version: u64,
        parent_commit_id: &str,
        merged_parent_commit_id: &str,
    ) -> Result<String> {
        table_ops::record_merge_commit(
            self,
            manifest_version,
            parent_commit_id,
            merged_parent_commit_id,
        )
        .await
    }

    /// Branch-aware variant of [`Self::commit_updates`]; `branch = None`
    /// targets the default branch. Delegates to `table_ops`.
    pub(crate) async fn commit_updates_on_branch(
        &mut self,
        branch: Option<&str>,
        updates: &[crate::db::SubTableUpdate],
    ) -> Result<u64> {
        table_ops::commit_updates_on_branch(self, branch, updates).await
    }

    /// Ensure the commit-graph store is initialized before use. Delegates
    /// to `table_ops`.
    pub(crate) async fn ensure_commit_graph_initialized(&mut self) -> Result<()> {
        table_ops::ensure_commit_graph_initialized(self).await
    }

    /// Invalidate the cached graph index. Called after edge mutations.
    pub(crate) async fn invalidate_graph_index(&self) {
        table_ops::invalidate_graph_index(self).await
    }

    /// Produce the single `RecordBatch` used to rewrite `table_key` from
    /// `source_ds` (used by publish/reification above). Delegates to
    /// `schema_apply`.
    async fn batch_for_table_rewrite(
        &self,
        source_ds: &Dataset,
        table_key: &str,
    ) -> Result<RecordBatch> {
        schema_apply::batch_for_table_rewrite(self, source_ds, table_key).await
    }
1161}
1162
1163pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
1164    let branch = branch.trim();
1165    if branch.is_empty() {
1166        return Err(OmniError::manifest(
1167            "branch name cannot be empty".to_string(),
1168        ));
1169    }
1170    if branch == "main" {
1171        return Ok(None);
1172    }
1173    Ok(Some(branch.to_string()))
1174}
1175
1176fn ensure_public_branch_ref(branch: &str, operation: &str) -> Result<()> {
1177    if super::is_internal_run_branch(branch) {
1178        return Err(OmniError::manifest(format!(
1179            "{} does not allow internal run ref '{}'",
1180            operation, branch
1181        )));
1182    }
1183    if is_internal_system_branch(branch) {
1184        return Err(OmniError::manifest(format!(
1185            "{} does not allow internal system ref '{}'",
1186            operation, branch
1187        )));
1188    }
1189    Ok(())
1190}
1191
1192fn same_manifest_state(
1193    left: Option<&crate::db::SubTableEntry>,
1194    right: Option<&crate::db::SubTableEntry>,
1195) -> bool {
1196    match (left, right) {
1197        (None, None) => true,
1198        (Some(left), Some(right)) => {
1199            left.table_path == right.table_path
1200                && left.table_version == right.table_version
1201                && left.table_branch == right.table_branch
1202                && left.row_count == right.row_count
1203        }
1204        _ => false,
1205    }
1206}
1207
1208fn concat_or_empty_batches(schema: Arc<Schema>, batches: Vec<RecordBatch>) -> Result<RecordBatch> {
1209    if batches.is_empty() {
1210        return Ok(RecordBatch::new_empty(schema));
1211    }
1212    if batches.len() == 1 {
1213        return Ok(batches.into_iter().next().unwrap());
1214    }
1215    let batch_schema = batches[0].schema();
1216    arrow_select::concat::concat_batches(&batch_schema, &batches)
1217        .map_err(|e| OmniError::Lance(e.to_string()))
1218}
1219
1220fn blob_properties_for_table_key<'a>(
1221    catalog: &'a Catalog,
1222    table_key: &str,
1223) -> Result<&'a std::collections::HashSet<String>> {
1224    if let Some(type_name) = table_key.strip_prefix("node:") {
1225        return catalog
1226            .node_types
1227            .get(type_name)
1228            .map(|node_type| &node_type.blob_properties)
1229            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)));
1230    }
1231    if let Some(type_name) = table_key.strip_prefix("edge:") {
1232        return catalog
1233            .edge_types
1234            .get(type_name)
1235            .map(|edge_type| &edge_type.blob_properties)
1236            .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)));
1237    }
1238    Err(OmniError::manifest(format!(
1239        "invalid table key '{}'",
1240        table_key
1241    )))
1242}
1243
/// Decide whether one row of a blob-description `StructArray` represents
/// "no blob".
///
/// A row counts as null when the struct value itself is null, when no
/// `kind` field can be read (missing column, unexpected type, or null
/// value), or when the kind is `Inline` with zero `position`, zero `size`,
/// and an empty `blob_uri` (the all-default sentinel). Any other readable
/// kind is a real blob reference; an unrecognized `kind` value is an error.
fn blob_description_is_null(descriptions: &StructArray, row: usize) -> Result<bool> {
    if descriptions.is_null(row) {
        return Ok(true);
    }

    // Probe `kind` as UInt32 first, then fall back to UInt8.
    let kind = descriptions
        .column_by_name("kind")
        .and_then(|col| col.as_any().downcast_ref::<UInt32Array>())
        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row) as u8))
        .or_else(|| {
            descriptions
                .column_by_name("kind")
                .and_then(|col| col.as_any().downcast_ref::<arrow_array::UInt8Array>())
                .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)))
        });
    // Remaining fields are read leniently: a missing, mistyped, or null
    // field simply yields None.
    let position = descriptions
        .column_by_name("position")
        .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
    let size = descriptions
        .column_by_name("size")
        .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
    let blob_uri = descriptions
        .column_by_name("blob_uri")
        .and_then(|col| col.as_any().downcast_ref::<StringArray>())
        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));

    let Some(kind) = kind else {
        // No readable kind: treated as null rather than an error.
        return Ok(true);
    };
    let kind = BlobKind::try_from(kind).map_err(|e| OmniError::Lance(e.to_string()))?;
    if kind != BlobKind::Inline {
        return Ok(false);
    }

    // Inline kind: only the all-default (0 / 0 / "") row counts as null.
    Ok(position.unwrap_or(0) == 0 && size.unwrap_or(0) == 0 && blob_uri.unwrap_or("").is_empty())
}
1282
1283/// Replace placeholder `LargeBinary` fields with Lance blob v2 fields.
1284///
1285/// The compiler crate has no Lance dependency, so `ScalarType::Blob` maps to
1286/// `DataType::LargeBinary` as a placeholder. This function replaces those
1287/// fields with the real blob v2 struct type via `lance::blob::blob_field()`.
1288fn fixup_blob_schemas(catalog: &mut Catalog) {
1289    for node_type in catalog.node_types.values_mut() {
1290        if node_type.blob_properties.is_empty() {
1291            continue;
1292        }
1293        let fields: Vec<Field> = node_type
1294            .arrow_schema
1295            .fields()
1296            .iter()
1297            .map(|f| {
1298                if node_type.blob_properties.contains(f.name()) {
1299                    blob_field(f.name(), f.is_nullable())
1300                } else {
1301                    f.as_ref().clone()
1302                }
1303            })
1304            .collect();
1305        node_type.arrow_schema = Arc::new(Schema::new(fields));
1306    }
1307    for edge_type in catalog.edge_types.values_mut() {
1308        if edge_type.blob_properties.is_empty() {
1309            continue;
1310        }
1311        let fields: Vec<Field> = edge_type
1312            .arrow_schema
1313            .fields()
1314            .iter()
1315            .map(|f| {
1316                if edge_type.blob_properties.contains(f.name()) {
1317                    blob_field(f.name(), f.is_nullable())
1318                } else {
1319                    f.as_ref().clone()
1320                }
1321            })
1322            .collect();
1323        edge_type.arrow_schema = Arc::new(Schema::new(fields));
1324    }
1325}
1326
/// Parse graph schema source text and lower it to `SchemaIR`.
///
/// Parse errors propagate via `?`; IR-building errors are wrapped as
/// manifest errors.
fn read_schema_ir_from_source(schema_source: &str) -> Result<SchemaIR> {
    let schema_ast = parse_schema(schema_source)?;
    build_schema_ir(&schema_ast).map_err(|err| OmniError::manifest(err.to_string()))
}
1331
1332fn schema_table_key(type_kind: SchemaTypeKind, name: &str) -> String {
1333    match type_kind {
1334        SchemaTypeKind::Node => format!("node:{}", name),
1335        SchemaTypeKind::Edge => format!("edge:{}", name),
1336        SchemaTypeKind::Interface => unreachable!("interfaces do not map to tables"),
1337    }
1338}
1339
1340fn schema_for_table_key(catalog: &Catalog, table_key: &str) -> Result<Arc<Schema>> {
1341    if let Some(type_name) = table_key.strip_prefix("node:") {
1342        let node_type: &NodeType = catalog
1343            .node_types
1344            .get(type_name)
1345            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
1346        return Ok(node_type.arrow_schema.clone());
1347    }
1348    if let Some(type_name) = table_key.strip_prefix("edge:") {
1349        let edge_type: &EdgeType = catalog
1350            .edge_types
1351            .get(type_name)
1352            .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)))?;
1353        return Ok(edge_type.arrow_schema.clone());
1354    }
1355    Err(OmniError::manifest(format!(
1356        "invalid table key '{}'",
1357        table_key
1358    )))
1359}
1360
1361fn record_batch_row_to_json(batch: &RecordBatch, row: usize) -> Result<serde_json::Value> {
1362    let mut obj = serde_json::Map::new();
1363    for (i, field) in batch.schema().fields().iter().enumerate() {
1364        obj.insert(
1365            field.name().clone(),
1366            json_value_from_array(batch.column(i).as_ref(), row)?,
1367        );
1368    }
1369    Ok(serde_json::Value::Object(obj))
1370}
1371
1372fn json_value_from_array(array: &dyn Array, row: usize) -> Result<serde_json::Value> {
1373    if array.is_null(row) {
1374        return Ok(serde_json::Value::Null);
1375    }
1376
1377    match array.data_type() {
1378        DataType::Utf8 => Ok(serde_json::Value::String(
1379            array
1380                .as_any()
1381                .downcast_ref::<StringArray>()
1382                .ok_or_else(|| OmniError::Lance("expected StringArray".to_string()))?
1383                .value(row)
1384                .to_string(),
1385        )),
1386        DataType::LargeUtf8 => Ok(serde_json::Value::String(
1387            array
1388                .as_any()
1389                .downcast_ref::<LargeStringArray>()
1390                .ok_or_else(|| OmniError::Lance("expected LargeStringArray".to_string()))?
1391                .value(row)
1392                .to_string(),
1393        )),
1394        DataType::Boolean => Ok(serde_json::Value::Bool(
1395            array
1396                .as_any()
1397                .downcast_ref::<BooleanArray>()
1398                .ok_or_else(|| OmniError::Lance("expected BooleanArray".to_string()))?
1399                .value(row),
1400        )),
1401        DataType::Int32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1402            array
1403                .as_any()
1404                .downcast_ref::<Int32Array>()
1405                .ok_or_else(|| OmniError::Lance("expected Int32Array".to_string()))?
1406                .value(row),
1407        ))),
1408        DataType::Int64 => Ok(serde_json::Value::Number(serde_json::Number::from(
1409            array
1410                .as_any()
1411                .downcast_ref::<Int64Array>()
1412                .ok_or_else(|| OmniError::Lance("expected Int64Array".to_string()))?
1413                .value(row),
1414        ))),
1415        DataType::UInt32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1416            array
1417                .as_any()
1418                .downcast_ref::<UInt32Array>()
1419                .ok_or_else(|| OmniError::Lance("expected UInt32Array".to_string()))?
1420                .value(row),
1421        ))),
1422        DataType::UInt64 => Ok(serde_json::Value::Number(serde_json::Number::from(
1423            array
1424                .as_any()
1425                .downcast_ref::<UInt64Array>()
1426                .ok_or_else(|| OmniError::Lance("expected UInt64Array".to_string()))?
1427                .value(row),
1428        ))),
1429        DataType::Float32 => {
1430            let value = array
1431                .as_any()
1432                .downcast_ref::<Float32Array>()
1433                .ok_or_else(|| OmniError::Lance("expected Float32Array".to_string()))?
1434                .value(row) as f64;
1435            Ok(serde_json::Value::Number(
1436                serde_json::Number::from_f64(value).ok_or_else(|| {
1437                    OmniError::Lance(format!("cannot encode f32 value '{}' as JSON", value))
1438                })?,
1439            ))
1440        }
1441        DataType::Float64 => {
1442            let value = array
1443                .as_any()
1444                .downcast_ref::<Float64Array>()
1445                .ok_or_else(|| OmniError::Lance("expected Float64Array".to_string()))?
1446                .value(row);
1447            Ok(serde_json::Value::Number(
1448                serde_json::Number::from_f64(value).ok_or_else(|| {
1449                    OmniError::Lance(format!("cannot encode f64 value '{}' as JSON", value))
1450                })?,
1451            ))
1452        }
1453        DataType::Date32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1454            array
1455                .as_any()
1456                .downcast_ref::<Date32Array>()
1457                .ok_or_else(|| OmniError::Lance("expected Date32Array".to_string()))?
1458                .value(row),
1459        ))),
1460        DataType::Binary => Ok(serde_json::Value::String(base64::Engine::encode(
1461            &base64::engine::general_purpose::STANDARD,
1462            array
1463                .as_any()
1464                .downcast_ref::<BinaryArray>()
1465                .ok_or_else(|| OmniError::Lance("expected BinaryArray".to_string()))?
1466                .value(row),
1467        ))),
1468        DataType::LargeBinary => Ok(serde_json::Value::String(base64::Engine::encode(
1469            &base64::engine::general_purpose::STANDARD,
1470            array
1471                .as_any()
1472                .downcast_ref::<LargeBinaryArray>()
1473                .ok_or_else(|| OmniError::Lance("expected LargeBinaryArray".to_string()))?
1474                .value(row),
1475        ))),
1476        DataType::List(_) => {
1477            let list = array
1478                .as_any()
1479                .downcast_ref::<ListArray>()
1480                .ok_or_else(|| OmniError::Lance("expected ListArray".to_string()))?;
1481            let values = list.value(row);
1482            let mut out = Vec::with_capacity(values.len());
1483            for idx in 0..values.len() {
1484                out.push(json_value_from_array(values.as_ref(), idx)?);
1485            }
1486            Ok(serde_json::Value::Array(out))
1487        }
1488        DataType::LargeList(_) => {
1489            let list = array
1490                .as_any()
1491                .downcast_ref::<LargeListArray>()
1492                .ok_or_else(|| OmniError::Lance("expected LargeListArray".to_string()))?;
1493            let values = list.value(row);
1494            let mut out = Vec::with_capacity(values.len());
1495            for idx in 0..values.len() {
1496                out.push(json_value_from_array(values.as_ref(), idx)?);
1497            }
1498            Ok(serde_json::Value::Array(out))
1499        }
1500        DataType::FixedSizeList(_, _) => {
1501            let list = array
1502                .as_any()
1503                .downcast_ref::<FixedSizeListArray>()
1504                .ok_or_else(|| OmniError::Lance("expected FixedSizeListArray".to_string()))?;
1505            let values = list.value(row);
1506            let mut out = Vec::with_capacity(values.len());
1507            for idx in 0..values.len() {
1508                out.push(json_value_from_array(values.as_ref(), idx)?);
1509            }
1510            Ok(serde_json::Value::Array(out))
1511        }
1512        DataType::Struct(fields) => {
1513            let struct_array = array
1514                .as_any()
1515                .downcast_ref::<StructArray>()
1516                .ok_or_else(|| OmniError::Lance("expected StructArray".to_string()))?;
1517            let mut obj = serde_json::Map::new();
1518            for (field_idx, field) in fields.iter().enumerate() {
1519                obj.insert(
1520                    field.name().clone(),
1521                    json_value_from_array(struct_array.column(field_idx).as_ref(), row)?,
1522                );
1523            }
1524            Ok(serde_json::Value::Object(obj))
1525        }
1526        _ => {
1527            let value = arrow_cast::display::array_value_to_string(array, row)
1528                .map_err(|e| OmniError::Lance(e.to_string()))?;
1529            Ok(serde_json::Value::String(value))
1530        }
1531    }
1532}
1533
1534#[cfg(test)]
1535mod tests {
1536    use super::*;
1537    use crate::db::is_internal_run_branch;
1538    use crate::db::manifest::ManifestCoordinator;
1539    use async_trait::async_trait;
1540    use serde_json::Value;
1541    use std::sync::Mutex;
1542
1543    use crate::storage::{LocalStorageAdapter, StorageAdapter, join_uri};
1544
    /// Minimal test schema: two node types (one with an optional property),
    /// a property-bearing edge, and a property-less edge.
    const TEST_SCHEMA: &str = r#"
node Person {
    name: String @key
    age: I32?
}
node Company {
    name: String @key
}
edge Knows: Person -> Person {
    since: Date?
}
edge WorksAt: Person -> Company
"#;
1558
    /// Test double wrapping a [`LocalStorageAdapter`] that records every
    /// URI passed to `read_text` / `write_text` / `exists`, so tests can
    /// assert which files went through the adapter.
    #[derive(Debug, Default)]
    struct RecordingStorageAdapter {
        inner: LocalStorageAdapter,
        // URIs passed to read_text.
        reads: Mutex<Vec<String>>,
        // URIs passed to write_text.
        writes: Mutex<Vec<String>>,
        // URIs passed to exists.
        exists_checks: Mutex<Vec<String>>,
    }

    impl RecordingStorageAdapter {
        /// Clone of all recorded read URIs.
        fn reads(&self) -> Vec<String> {
            self.reads.lock().unwrap().clone()
        }

        /// Clone of all recorded write URIs.
        fn writes(&self) -> Vec<String> {
            self.writes.lock().unwrap().clone()
        }

        /// Clone of all recorded existence-check URIs.
        fn exists_checks(&self) -> Vec<String> {
            self.exists_checks.lock().unwrap().clone()
        }
    }
1580
    // Forward each storage call to the inner local adapter, recording the
    // URI first.
    #[async_trait]
    impl StorageAdapter for RecordingStorageAdapter {
        async fn read_text(&self, uri: &str) -> Result<String> {
            self.reads.lock().unwrap().push(uri.to_string());
            self.inner.read_text(uri).await
        }

        async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
            self.writes.lock().unwrap().push(uri.to_string());
            self.inner.write_text(uri, contents).await
        }

        async fn exists(&self, uri: &str) -> Result<bool> {
            self.exists_checks.lock().unwrap().push(uri.to_string());
            self.inner.exists(uri).await
        }
    }
1598
    /// Init and open must route graph-metadata file access (schema source,
    /// schema IR, schema state, commit graph) through the injected storage
    /// adapter.
    #[tokio::test]
    async fn test_init_and_open_route_graph_metadata_through_storage_adapter() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let adapter = Arc::new(RecordingStorageAdapter::default());

        // init writes the schema source, IR, and schema state via the adapter.
        Omnigraph::init_with_storage(uri, TEST_SCHEMA, adapter.clone())
            .await
            .unwrap();
        assert!(adapter.writes().contains(&join_uri(uri, "_schema.pg")));
        assert!(adapter.writes().contains(&join_uri(uri, "_schema.ir.json")));
        assert!(
            adapter
                .writes()
                .contains(&join_uri(uri, "__schema_state.json"))
        );

        // open reads the same files back through the adapter…
        Omnigraph::open_with_storage(uri, adapter.clone())
            .await
            .unwrap();
        assert!(adapter.reads().contains(&join_uri(uri, "_schema.pg")));
        assert!(adapter.reads().contains(&join_uri(uri, "_schema.ir.json")));
        assert!(
            adapter
                .reads()
                .contains(&join_uri(uri, "__schema_state.json"))
        );
        // …and probes existence of the IR, schema state, and commit graph.
        assert!(
            adapter
                .exists_checks()
                .contains(&join_uri(uri, "_schema.ir.json"))
        );
        assert!(
            adapter
                .exists_checks()
                .contains(&join_uri(uri, "__schema_state.json"))
        );
        assert!(
            adapter
                .exists_checks()
                .contains(&join_uri(uri, "_graph_commits.lance"))
        );
    }
1642
1643    async fn table_rows_json(db: &Omnigraph, table_key: &str) -> Vec<Value> {
1644        let snapshot = db.snapshot();
1645        let ds = snapshot.open(table_key).await.unwrap();
1646        let batches = db.table_store().scan_batches(&ds).await.unwrap();
1647        batches
1648            .into_iter()
1649            .flat_map(|batch| {
1650                (0..batch.num_rows())
1651                    .map(|row| record_batch_row_to_json(&batch, row).unwrap())
1652                    .collect::<Vec<_>>()
1653            })
1654            .collect()
1655    }
1656
1657    async fn seed_person_row(db: &mut Omnigraph, name: &str, age: Option<i32>) {
1658        let (mut ds, full_path, table_branch) = db.open_for_mutation("node:Person").await.unwrap();
1659        let schema: Arc<Schema> = Arc::new(ds.schema().into());
1660        let columns: Vec<Arc<dyn Array>> = schema
1661            .fields()
1662            .iter()
1663            .map(|field| match field.name().as_str() {
1664                "id" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
1665                "name" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
1666                "age" => Arc::new(Int32Array::from(vec![age])) as Arc<dyn Array>,
1667                _ => new_null_array(field.data_type(), 1),
1668            })
1669            .collect();
1670        let batch = RecordBatch::try_new(Arc::clone(&schema), columns).unwrap();
1671        let state = db
1672            .table_store()
1673            .append_batch(&full_path, &mut ds, batch)
1674            .await
1675            .unwrap();
1676        db.commit_updates(&[crate::db::SubTableUpdate {
1677            table_key: "node:Person".to_string(),
1678            table_version: state.version,
1679            table_branch,
1680            row_count: state.row_count,
1681            version_metadata: state.version_metadata,
1682        }])
1683        .await
1684        .unwrap();
1685    }
1686
1687    #[tokio::test]
1688    async fn test_apply_schema_adds_nullable_property_and_preserves_rows() {
1689        let dir = tempfile::tempdir().unwrap();
1690        let uri = dir.path().to_str().unwrap();
1691        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1692        seed_person_row(&mut db, "Alice", Some(30)).await;
1693
1694        let desired = TEST_SCHEMA.replace(
1695            "    age: I32?\n}",
1696            "    age: I32?\n    nickname: String?\n}",
1697        );
1698        let result = db.apply_schema(&desired).await.unwrap();
1699        assert!(result.applied);
1700
1701        let reopened = Omnigraph::open(uri).await.unwrap();
1702        let rows = table_rows_json(&reopened, "node:Person").await;
1703        assert_eq!(rows.len(), 1);
1704        assert_eq!(rows[0]["name"], "Alice");
1705        assert_eq!(rows[0]["age"], 30);
1706        assert!(rows[0]["nickname"].is_null());
1707        assert!(
1708            reopened.catalog().node_types["Person"]
1709                .properties
1710                .contains_key("nickname")
1711        );
1712        assert!(dir.path().join("_schema.pg").exists());
1713    }
1714
1715    #[tokio::test]
1716    async fn test_apply_schema_renames_property_and_preserves_values() {
1717        let dir = tempfile::tempdir().unwrap();
1718        let uri = dir.path().to_str().unwrap();
1719        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1720        seed_person_row(&mut db, "Alice", Some(30)).await;
1721
1722        let desired = TEST_SCHEMA.replace(
1723            "    age: I32?\n}",
1724            "    years: I32? @rename_from(\"age\")\n}",
1725        );
1726        db.apply_schema(&desired).await.unwrap();
1727
1728        let reopened = Omnigraph::open(uri).await.unwrap();
1729        let rows = table_rows_json(&reopened, "node:Person").await;
1730        assert_eq!(rows[0]["name"], "Alice");
1731        assert_eq!(rows[0]["years"], 30);
1732        assert!(rows[0].get("age").is_none());
1733    }
1734
1735    #[tokio::test]
1736    async fn test_apply_schema_renames_type_and_preserves_historical_snapshot() {
1737        let dir = tempfile::tempdir().unwrap();
1738        let uri = dir.path().to_str().unwrap();
1739        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1740        seed_person_row(&mut db, "Alice", Some(30)).await;
1741        let before_version = db.snapshot().version();
1742
1743        let desired = TEST_SCHEMA
1744            .replace("node Person {\n", "node Human @rename_from(\"Person\") {\n")
1745            .replace("edge Knows: Person -> Person", "edge Knows: Human -> Human")
1746            .replace(
1747                "edge WorksAt: Person -> Company",
1748                "edge WorksAt: Human -> Company",
1749            );
1750        db.apply_schema(&desired).await.unwrap();
1751
1752        let head = db.snapshot();
1753        assert!(head.entry("node:Person").is_none());
1754        assert!(head.entry("node:Human").is_some());
1755        let historical = ManifestCoordinator::snapshot_at(uri, None, before_version)
1756            .await
1757            .unwrap();
1758        assert!(historical.entry("node:Person").is_some());
1759        assert!(historical.entry("node:Human").is_none());
1760    }
1761
1762    #[tokio::test]
1763    async fn test_apply_schema_succeeds_after_load() {
1764        // MR-670 + MR-674: schema apply used to be blocked by leftover
1765        // __run__ branches. MR-670 added a defense-in-depth filter that
1766        // skips internal system branches. MR-674 made run branches
1767        // ephemeral on every terminal state, so in practice no __run__
1768        // branch survives publish — but the filter still guards the
1769        // invariant.
1770        let dir = tempfile::tempdir().unwrap();
1771        let uri = dir.path().to_str().unwrap();
1772        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1773
1774        crate::loader::load_jsonl(
1775            &mut db,
1776            r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#,
1777            crate::loader::LoadMode::Overwrite,
1778        )
1779        .await
1780        .unwrap();
1781
1782        let all_branches = db.coordinator.all_branches().await.unwrap();
1783        assert!(
1784            !all_branches.iter().any(|b| is_internal_run_branch(b)),
1785            "MR-674: run branch should be deleted after publish, got: {:?}",
1786            all_branches
1787        );
1788
1789        let desired = TEST_SCHEMA.replace(
1790            "    age: I32?\n}",
1791            "    age: I32?\n    nickname: String?\n}",
1792        );
1793        let result = db.apply_schema(&desired).await.unwrap();
1794        assert!(result.applied, "schema apply should have applied");
1795    }
1796
1797    #[tokio::test]
1798    async fn test_apply_schema_adds_index_for_existing_property() {
1799        let dir = tempfile::tempdir().unwrap();
1800        let uri = dir.path().to_str().unwrap();
1801        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1802
1803        let desired = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
1804        db.apply_schema(&desired).await.unwrap();
1805
1806        let snapshot = db.snapshot();
1807        let ds = snapshot.open("node:Person").await.unwrap();
1808        assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
1809    }
1810
1811    #[tokio::test]
1812    async fn test_apply_schema_rewrite_preserves_existing_indices() {
1813        let dir = tempfile::tempdir().unwrap();
1814        let uri = dir.path().to_str().unwrap();
1815        let initial_schema = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
1816        let mut db = Omnigraph::init(uri, &initial_schema).await.unwrap();
1817        seed_person_row(&mut db, "Alice", Some(30)).await;
1818
1819        let desired = initial_schema.replace(
1820            "    age: I32?\n}",
1821            "    age: I32?\n    nickname: String?\n}",
1822        );
1823        db.apply_schema(&desired).await.unwrap();
1824
1825        let snapshot = db.snapshot();
1826        let ds = snapshot.open("node:Person").await.unwrap();
1827        assert!(db.table_store().has_btree_index(&ds, "id").await.unwrap());
1828        assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
1829    }
1830
1831    #[tokio::test]
1832    async fn test_open_for_mutation_rejects_while_schema_apply_locked() {
1833        let dir = tempfile::tempdir().unwrap();
1834        let uri = dir.path().to_str().unwrap();
1835        let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1836        let mut db = db;
1837        db.coordinator
1838            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
1839            .await
1840            .unwrap();
1841
1842        let err = db.open_for_mutation("node:Person").await.unwrap_err();
1843        assert!(
1844            err.to_string()
1845                .contains("write is unavailable while schema apply is in progress")
1846        );
1847    }
1848
1849    #[tokio::test]
1850    async fn test_commit_updates_rejects_while_schema_apply_locked() {
1851        let dir = tempfile::tempdir().unwrap();
1852        let uri = dir.path().to_str().unwrap();
1853        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1854        db.coordinator
1855            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
1856            .await
1857            .unwrap();
1858
1859        let err = db.commit_updates(&[]).await.unwrap_err();
1860        assert!(
1861            err.to_string()
1862                .contains("write commit is unavailable while schema apply is in progress")
1863        );
1864    }
1865
1866    #[tokio::test]
1867    async fn test_branch_list_hides_schema_apply_lock_branch() {
1868        let dir = tempfile::tempdir().unwrap();
1869        let uri = dir.path().to_str().unwrap();
1870        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1871        db.coordinator
1872            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
1873            .await
1874            .unwrap();
1875
1876        let branches = db.branch_list().await.unwrap();
1877        assert_eq!(branches, vec!["main".to_string()]);
1878    }
1879}