// omnigraph/db/omnigraph.rs

1use std::collections::{BTreeSet, HashMap, HashSet};
2use std::io::Write;
3use std::sync::Arc;
4
5use arc_swap::ArcSwap;
6use arrow_array::{
7    Array, BinaryArray, BooleanArray, Date32Array, FixedSizeListArray, Float32Array, Float64Array,
8    Int32Array, Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray,
9    RecordBatch, StringArray, StructArray, UInt32Array, UInt64Array, new_null_array,
10};
11use arrow_schema::{DataType, Field, Schema};
12use lance::Dataset;
13use lance::blob::{BlobArrayBuilder, blob_field};
14use lance::dataset::BlobFile;
15use lance::dataset::scanner::ColumnOrdering;
16use lance::datatypes::BlobKind;
17use omnigraph_compiler::catalog::{Catalog, EdgeType, NodeType};
18use omnigraph_compiler::schema::parser::parse_schema;
19use omnigraph_compiler::types::ScalarType;
20use omnigraph_compiler::{
21    SchemaIR, SchemaMigrationPlan, SchemaMigrationStep, SchemaTypeKind, build_catalog_from_ir,
22    build_schema_ir, plan_schema_migration,
23};
24
25use crate::db::graph_coordinator::{GraphCoordinator, PublishedSnapshot};
26use crate::error::{OmniError, Result};
27use crate::runtime_cache::RuntimeCache;
28use crate::storage::{StorageAdapter, join_uri, normalize_root_uri, storage_for_uri};
29use crate::table_store::TableStore;
30
31mod export;
32mod optimize;
33mod schema_apply;
34mod table_ops;
35
36pub use optimize::{CleanupPolicyOptions, TableCleanupStats, TableOptimizeStats};
37
38use super::commit_graph::GraphCommit;
39use super::manifest::{
40    ManifestChange, Snapshot, SubTableEntry, TableRegistration, TableTombstone,
41    table_path_for_table_key,
42};
43use super::schema_state::{
44    SCHEMA_SOURCE_FILENAME, load_or_bootstrap_schema_contract, read_accepted_schema_ir,
45    recover_schema_state_files, schema_ir_staging_uri, schema_ir_uri, schema_source_staging_uri,
46    schema_source_uri, schema_state_staging_uri, schema_state_uri, validate_schema_contract,
47    write_schema_contract, write_schema_contract_staging,
48};
49use super::{
50    ReadTarget, ResolvedTarget, SCHEMA_APPLY_LOCK_BRANCH, SnapshotId, is_internal_system_branch,
51    is_schema_apply_lock_branch,
52};
53
/// How a branch merge concluded. Returned by the merge entry points so
/// callers can distinguish a no-op from actual history movement.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MergeOutcome {
    /// The target already contained everything from the source; nothing changed.
    AlreadyUpToDate,
    /// The target ref moved forward without combining divergent histories.
    FastForward,
    /// Divergent histories were combined into a merged result.
    Merged,
}
60
/// Outcome of [`Omnigraph::apply_schema`].
#[derive(Debug, Clone)]
pub struct SchemaApplyResult {
    /// Whether the requested migration is supported by the engine.
    pub supported: bool,
    /// Whether the migration was actually applied (false when unsupported
    /// or when there was nothing to do).
    pub applied: bool,
    /// Manifest version observed after the operation.
    pub manifest_version: u64,
    /// Steps in the computed migration plan.
    pub steps: Vec<SchemaMigrationStep>,
}
68
/// Top-level handle to an Omnigraph database.
///
/// An Omnigraph is a Lance-native graph database with git-style branching.
/// It stores typed property graphs as per-type Lance datasets coordinated
/// through a Lance manifest table.
pub struct Omnigraph {
    /// Normalized repo root URI (output of `normalize_root_uri`).
    root_uri: String,
    /// Object-store adapter (S3 / local fs) rooted at `root_uri`.
    storage: Arc<dyn StorageAdapter>,
    /// Coordinator state behind a tokio `RwLock`. PR 2 (MR-686) wraps
    /// this so engine write APIs can be `&self` (the HTTP server's
    /// `AppState` holds `Arc<Omnigraph>` and dispatches concurrent
    /// calls without a global write lock). Reads (`snapshot`, `version`,
    /// `current_branch`, `branch_list`, `resolve_*`, `head_commit_id`,
    /// `list_commits`, …) acquire `.read().await` and parallelize.
    /// Writes (`refresh`, `branch_create`, `branch_delete`, `commit_*`,
    /// `record_*`) acquire `.write().await` and serialize. The atomic
    /// commit invariant — `commit_manifest_updates` followed by
    /// `record_graph_commit` must be atomic — is preserved by the
    /// single `.write()` covering both calls inside
    /// `commit_updates_with_actor_with_expected`. PR 2 Phase 2
    /// converted from `Mutex` to `RwLock` because the bench showed
    /// the Mutex was the dominant serializer for disjoint-table
    /// workloads. Lock acquisition order: always before `runtime_cache`
    /// (when both are needed in one scope).
    coordinator: Arc<tokio::sync::RwLock<GraphCoordinator>>,
    /// Per-table Lance dataset access. Also serves as the engine-facing
    /// `TableStorage` trait object via [`Omnigraph::storage`].
    table_store: TableStore,
    /// Per-snapshot runtime caches; invalidated by `refresh`,
    /// `sync_branch`, and `refresh_coordinator_only`.
    runtime_cache: RuntimeCache,
    /// Read-heavy on every query, written only by `apply_schema`. ArcSwap
    /// gives atomic pointer swap with zero-cost reads (`load()` returns a
    /// `Guard<Arc<Catalog>>`), so concurrent queries on different actors
    /// don't contend on a lock to read the catalog.
    catalog: Arc<ArcSwap<Catalog>>,
    /// Read-heavy on schema introspection paths, written only by
    /// `apply_schema`. Same ArcSwap rationale as `catalog`.
    schema_source: Arc<ArcSwap<String>>,
    /// Per-`(table_key, branch)` writer queues. Reachable from engine
    /// internals (mutation finalize, schema_apply, branch_merge,
    /// ensure_indices, delete_where) and from future MR-870 recovery
    /// reconciler. PR 1b adds the field; callers acquire in commits 4+.
    write_queue: Arc<crate::db::write_queue::WriteQueueManager>,
    /// Process-wide mutex held across the swap → operate → restore window
    /// in `branch_merge_impl`. Two concurrent merges with distinct targets
    /// would otherwise interleave their three separate
    /// `coordinator.write().await` acquisitions, leaving each merge's
    /// inner body running against the other's swapped coord. Pinned by
    /// `concurrent_branch_merges_distinct_targets_do_not_swap_into_each_other`
    /// in `crates/omnigraph-server/tests/server.rs`.
    ///
    /// Cost: serializes ALL concurrent branch merges process-wide.
    /// Acceptable because branch merges are heavy (table rewrites, index
    /// rebuilds), per-(table, branch) queues inside `commit_all` already
    /// serialize the data path, and merges are rare relative to /change
    /// or /ingest. A finer-grained per-target-branch mutex is a follow-up
    /// if telemetry shows merge concurrency matters.
    ///
    /// The deeper fix — refactor `branch_merge_on_current_target` to take
    /// an explicit target coord parameter so `self.coordinator` is never
    /// used as scratch space — is the round-1 shape applied to
    /// `branch_create_from_impl`. Deferred because it requires unwinding
    /// every `self.snapshot()` and `self.ensure_commit_graph_initialized()`
    /// call inside the merge body.
    merge_exclusive: Arc<tokio::sync::Mutex<()>>,
}
132
/// Whether [`Omnigraph::open`] runs the open-time recovery sweep.
///
/// Recovery requires Lance writes (`Dataset::restore`, `ManifestBatchPublisher::publish`).
/// Read-only consumers — NDJSON export, `commit list`, `read`, schema
/// inspection — should not trigger writes (they may run with read-only
/// object-store credentials, and silent open-time mutations are
/// surprising). They also don't need recovery: reads always resolve
/// through the manifest pin, which is the consistent snapshot regardless
/// of any Phase B → Phase C drift on the per-table side.
///
/// See `Omnigraph::open_with_storage_and_mode` for how the mode gates
/// both the schema-state and manifest-drift recovery sweeps.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OpenMode {
    /// Run the recovery sweep on open. Default for `Omnigraph::open`.
    ReadWrite,
    /// Skip the recovery sweep. Use for read-only consumers via
    /// [`Omnigraph::open_read_only`].
    ReadOnly,
}
150
151impl Omnigraph {
152    /// Create a new repo at `uri` from schema source.
153    ///
154    /// Creates `_schema.pg`, per-type Lance datasets, and `__manifest`.
155    pub async fn init(uri: &str, schema_source: &str) -> Result<Self> {
156        Self::init_with_storage(uri, schema_source, storage_for_uri(uri)?).await
157    }
158
159    pub(crate) async fn init_with_storage(
160        uri: &str,
161        schema_source: &str,
162        storage: Arc<dyn StorageAdapter>,
163    ) -> Result<Self> {
164        let root = normalize_root_uri(uri)?;
165        let schema_ir = read_schema_ir_from_source(schema_source)?;
166        let mut catalog = build_catalog_from_ir(&schema_ir)?;
167        fixup_blob_schemas(&mut catalog);
168
169        // Write _schema.pg
170        let schema_path = join_uri(&root, SCHEMA_SOURCE_FILENAME);
171        storage.write_text(&schema_path, schema_source).await?;
172        write_schema_contract(&root, storage.as_ref(), &schema_ir).await?;
173
174        // Create manifest + per-type datasets
175        let coordinator = GraphCoordinator::init(&root, &catalog, Arc::clone(&storage)).await?;
176
177        Ok(Self {
178            root_uri: root.clone(),
179            storage,
180            coordinator: Arc::new(tokio::sync::RwLock::new(coordinator)),
181            table_store: TableStore::new(&root),
182            runtime_cache: RuntimeCache::default(),
183            catalog: Arc::new(ArcSwap::from_pointee(catalog)),
184            schema_source: Arc::new(ArcSwap::from_pointee(schema_source.to_string())),
185            write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()),
186            merge_exclusive: Arc::new(tokio::sync::Mutex::new(())),
187        })
188    }
189
190    /// Open an existing repo (read-write).
191    ///
192    /// Reads `_schema.pg`, parses it, builds the catalog, and opens `__manifest`.
193    /// Runs the open-time recovery sweep before returning — see [`OpenMode`].
194    pub async fn open(uri: &str) -> Result<Self> {
195        Self::open_with_storage_and_mode(uri, storage_for_uri(uri)?, OpenMode::ReadWrite).await
196    }
197
198    /// Open an existing repo for read-only consumers (NDJSON export,
199    /// `commit list`, etc.). Skips the recovery sweep — see [`OpenMode`].
200    pub async fn open_read_only(uri: &str) -> Result<Self> {
201        Self::open_with_storage_and_mode(uri, storage_for_uri(uri)?, OpenMode::ReadOnly).await
202    }
203
204    /// `open_with_storage` retained for existing callers (init/test paths).
205    /// Defaults to `OpenMode::ReadWrite`.
206    pub(crate) async fn open_with_storage(
207        uri: &str,
208        storage: Arc<dyn StorageAdapter>,
209    ) -> Result<Self> {
210        Self::open_with_storage_and_mode(uri, storage, OpenMode::ReadWrite).await
211    }
212
    /// Shared open path behind [`Self::open`], [`Self::open_read_only`],
    /// and [`Self::open_with_storage`]: opens the coordinator, runs both
    /// recovery sweeps when `mode` is `ReadWrite`, then loads the accepted
    /// schema contract and builds the in-memory catalog.
    pub(crate) async fn open_with_storage_and_mode(
        uri: &str,
        storage: Arc<dyn StorageAdapter>,
        mode: OpenMode,
    ) -> Result<Self> {
        let root = normalize_root_uri(uri)?;
        // Open the coordinator first so the schema-staging recovery sweep can
        // compare its snapshot against any leftover staging files.
        let mut coordinator = GraphCoordinator::open(&root, Arc::clone(&storage)).await?;
        // Both the schema-state recovery sweep AND the manifest-drift
        // recovery sweep are gated on `OpenMode::ReadWrite`. Read-only
        // consumers (NDJSON export, `commit list`, schema show) shouldn't
        // trigger object-store mutations: they may run with read-only
        // credentials, and silent open-time writes are surprising. Both
        // sweeps' work is recoverable on the next ReadWrite open, so
        // skipping under ReadOnly doesn't lose any safety guarantees —
        // the manifest pin is the consistent snapshot regardless of
        // drift on the per-table side or leftover schema-staging files.
        if matches!(mode, OpenMode::ReadWrite) {
            // Schema-state sweep must run before the drift sweep (same
            // ordering constraint as documented on `refresh`).
            let schema_state_recovery =
                recover_schema_state_files(&root, Arc::clone(&storage), &coordinator.snapshot())
                    .await?;
            // Recovery sweep: close the Phase B → Phase C residual on
            // any sidecar left over from a crashed writer. Continuous
            // in-process recovery for long-running servers (no restart
            // required between Phase B failure and recovery) is a
            // separate background-reconciler effort.
            crate::db::manifest::recover_manifest_drift(
                &root,
                Arc::clone(&storage),
                &mut coordinator,
                crate::db::manifest::RecoveryMode::Full,
                schema_state_recovery,
            )
            .await?;
        }
        // Read _schema.pg (post-recovery — may have just been renamed in).
        let schema_path = schema_source_uri(&root);
        let schema_source = storage.read_text(&schema_path).await?;
        let current_source_ir = read_schema_ir_from_source(&schema_source)?;
        let branches = coordinator.branch_list().await?;
        let (accepted_ir, _) = load_or_bootstrap_schema_contract(
            &root,
            Arc::clone(&storage),
            &branches,
            &current_source_ir,
        )
        .await?;
        let mut catalog = build_catalog_from_ir(&accepted_ir)?;
        fixup_blob_schemas(&mut catalog);

        Ok(Self {
            root_uri: root.clone(),
            storage,
            coordinator: Arc::new(tokio::sync::RwLock::new(coordinator)),
            table_store: TableStore::new(&root),
            runtime_cache: RuntimeCache::default(),
            catalog: Arc::new(ArcSwap::from_pointee(catalog)),
            schema_source: Arc::new(ArcSwap::from_pointee(schema_source)),
            write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()),
            merge_exclusive: Arc::new(tokio::sync::Mutex::new(())),
        })
    }
276
    /// Returns an `Arc<Catalog>` snapshot. Cheap clone of the current
    /// catalog pointer; callers can hold the returned `Arc` across awaits
    /// without blocking concurrent `apply_schema`.
    pub fn catalog(&self) -> Arc<Catalog> {
        self.catalog.load_full()
    }

    /// Returns an `Arc<String>` snapshot of the schema source.
    pub fn schema_source(&self) -> Arc<String> {
        self.schema_source.load_full()
    }

    /// Atomically swap the in-memory catalog. Concurrent readers see
    /// either the old or the new pointer; never a torn state. Used by
    /// `apply_schema` and `reload_schema_if_source_changed`.
    pub(crate) fn store_catalog(&self, catalog: Catalog) {
        self.catalog.store(Arc::new(catalog));
    }

    /// Atomically swap the in-memory schema source. Same rationale as
    /// [`store_catalog`](Self::store_catalog).
    pub(crate) fn store_schema_source(&self, schema_source: String) {
        self.schema_source.store(Arc::new(schema_source));
    }

    /// The repo's normalized root URI.
    pub fn uri(&self) -> &str {
        &self.root_uri
    }

    /// Validate the persisted schema contract before an operation
    /// proceeds; delegates to `validate_schema_contract`.
    pub(crate) async fn ensure_schema_state_valid(&self) -> Result<()> {
        validate_schema_contract(self.uri(), Arc::clone(&self.storage)).await
    }
309
    /// Compute a [`SchemaMigrationPlan`] for `desired_schema_source`
    /// without applying it. Delegates to `schema_apply::plan_schema`.
    pub async fn plan_schema(&self, desired_schema_source: &str) -> Result<SchemaMigrationPlan> {
        schema_apply::plan_schema(self, desired_schema_source).await
    }

    /// Apply `desired_schema_source` to the repo; returns a
    /// [`SchemaApplyResult`]. Delegates to `schema_apply::apply_schema`.
    pub async fn apply_schema(&self, desired_schema_source: &str) -> Result<SchemaApplyResult> {
        schema_apply::apply_schema(self, desired_schema_source).await
    }

    /// Guard used by `operation`: delegates to
    /// `schema_apply::ensure_schema_apply_idle`.
    pub(crate) async fn ensure_schema_apply_idle(&self, operation: &str) -> Result<()> {
        schema_apply::ensure_schema_apply_idle(self, operation).await
    }

    /// Guard used by `operation`: delegates to
    /// `schema_apply::ensure_schema_apply_not_locked`.
    async fn ensure_schema_apply_not_locked(&self, operation: &str) -> Result<()> {
        schema_apply::ensure_schema_apply_not_locked(self, operation).await
    }
325
    /// Direct access to the inherent `TableStore`. Legacy call sites use
    /// this; new engine code should prefer [`storage`](Self::storage).
    pub(crate) fn table_store(&self) -> &TableStore {
        &self.table_store
    }

    /// Engine-facing trait surface around `TableStore`.
    ///
    /// This is the canonical accessor for newly-written engine code. The
    /// trait's signatures use opaque `SnapshotHandle` / `StagedHandle`
    /// instead of leaking `lance::Dataset` /
    /// `lance::dataset::transaction::Transaction`. Existing call sites
    /// that still use `db.table_store.X(...)` (the inherent struct
    /// methods) are migrated incrementally.
    pub(crate) fn storage(&self) -> &dyn crate::storage_layer::TableStorage {
        &self.table_store
    }

    /// Engine-level access to the object-store adapter (S3 / local fs).
    /// Used by the recovery sidecar protocol — writers in the engine
    /// call this to write/delete sidecars at `__recovery/{ulid}.json`.
    pub(crate) fn storage_adapter(&self) -> &dyn crate::storage::StorageAdapter {
        self.storage.as_ref()
    }

    /// Per-`(table_key, branch)` writer queues.
    ///
    /// Engine-internal writers (mutation finalize, schema_apply,
    /// branch_merge, ensure_indices, delete_where) and the future MR-870
    /// recovery reconciler reach the queue manager via this accessor.
    /// Returns an `Arc` clone so callers can hold the manager across
    /// `&mut self` engine API boundaries.
    pub(crate) fn write_queue(&self) -> Arc<crate::db::write_queue::WriteQueueManager> {
        Arc::clone(&self.write_queue)
    }

    /// Engine-internal access to the merge-exclusive mutex. Held across
    /// the swap → operate → restore window in `branch_merge_impl` so
    /// concurrent merges with distinct targets don't corrupt
    /// `self.coordinator` mid-operation. See the field doc on
    /// `Omnigraph::merge_exclusive` for the full design rationale.
    pub(crate) fn merge_exclusive(&self) -> Arc<tokio::sync::Mutex<()>> {
        Arc::clone(&self.merge_exclusive)
    }

    /// Engine-level access to the repo's normalized root URI. Used by
    /// the recovery sidecar protocol to compute `__recovery/` paths.
    pub(crate) fn root_uri(&self) -> &str {
        &self.root_uri
    }
374
375    pub(crate) async fn open_coordinator_for_branch(
376        &self,
377        branch: Option<&str>,
378    ) -> Result<GraphCoordinator> {
379        match branch {
380            Some(branch) => {
381                GraphCoordinator::open_branch(self.uri(), branch, Arc::clone(&self.storage)).await
382            }
383            None => GraphCoordinator::open(self.uri(), Arc::clone(&self.storage)).await,
384        }
385    }
386
387    pub(crate) async fn swap_coordinator_for_branch(
388        &self,
389        branch: Option<&str>,
390    ) -> Result<GraphCoordinator> {
391        let next = self.open_coordinator_for_branch(branch).await?;
392        let mut coord = self.coordinator.write().await;
393        Ok(std::mem::replace(&mut *coord, next))
394    }
395
396    pub(crate) async fn restore_coordinator(&self, coordinator: GraphCoordinator) {
397        *self.coordinator.write().await = coordinator;
398    }
399
400    pub(crate) async fn resolved_branch_target(
401        &self,
402        branch: Option<&str>,
403    ) -> Result<ResolvedTarget> {
404        self.ensure_schema_state_valid().await?;
405        let requested = ReadTarget::Branch(branch.unwrap_or("main").to_string());
406        let normalized = normalize_branch_name(branch.unwrap_or("main"))?;
407        let coord = self.coordinator.read().await;
408        if normalized.as_deref() == coord.current_branch() {
409            let snapshot_id = coord.head_commit_id().await?.unwrap_or_else(|| {
410                SnapshotId::synthetic(coord.current_branch(), coord.version())
411            });
412            return Ok(ResolvedTarget {
413                requested,
414                branch: coord.current_branch().map(str::to_string),
415                snapshot_id,
416                snapshot: coord.snapshot(),
417            });
418        }
419        coord.resolve_target(&requested).await
420    }
421
422    pub(crate) async fn snapshot_for_branch(&self, branch: Option<&str>) -> Result<Snapshot> {
423        self.resolved_branch_target(branch)
424            .await
425            .map(|resolved| resolved.snapshot)
426    }
427
428    pub(crate) async fn version(&self) -> u64 {
429        self.coordinator.read().await.version()
430    }
431
432    /// Return an immutable Snapshot from the known manifest state. No storage I/O.
433    pub(crate) async fn snapshot(&self) -> Snapshot {
434        self.coordinator.read().await.snapshot()
435    }
436
437    pub async fn snapshot_of(&self, target: impl Into<ReadTarget>) -> Result<Snapshot> {
438        self.resolved_target(target)
439            .await
440            .map(|resolved| resolved.snapshot)
441    }
442
443    pub async fn version_of(&self, target: impl Into<ReadTarget>) -> Result<u64> {
444        self.snapshot_of(target)
445            .await
446            .map(|snapshot| snapshot.version())
447    }
448
449    pub async fn resolved_branch_of(
450        &self,
451        target: impl Into<ReadTarget>,
452    ) -> Result<Option<String>> {
453        self.resolved_target(target)
454            .await
455            .map(|resolved| resolved.branch)
456    }
457
458    /// Synchronize this handle's write base to the latest head of the named branch.
459    pub async fn sync_branch(&self, branch: &str) -> Result<()> {
460        self.ensure_schema_state_valid().await?;
461        let branch = normalize_branch_name(branch)?;
462        let next = self.open_coordinator_for_branch(branch.as_deref()).await?;
463        *self.coordinator.write().await = next;
464        self.runtime_cache.invalidate_all().await;
465        Ok(())
466    }
467
    /// Re-read the handle-local coordinator state from storage AND run
    /// in-process recovery. Closes the Phase B → Phase C residual (e.g.
    /// `MutationStaging::finalize` crash mid-publish in a long-running
    /// server) without restart.
    ///
    /// Composition mirrors `Omnigraph::open_with_storage_and_mode`'s
    /// recovery sequence, in the same order, with one restriction: the
    /// manifest-drift sweep runs in `RollForwardOnly` mode (rollback /
    /// abort cases defer to the next ReadWrite open because
    /// `Dataset::restore` is unsafe under concurrency). Each step:
    ///
    /// 1. `coordinator.refresh()` — re-read manifest.
    /// 2. `recover_schema_state_files` — complete an in-flight
    ///    schema_apply's staging→final rename if a SchemaApply sidecar
    ///    is on disk; idempotent + early-returns when no staging files
    ///    exist. Required BEFORE manifest-drift recovery so a
    ///    SchemaApply roll-forward doesn't publish the manifest while
    ///    the staging files remain unrenamed (which would corrupt the
    ///    repo: data on new schema, catalog on old).
    /// 3. `recover_manifest_drift(... RollForwardOnly)` — close the
    ///    finalize→publisher residual via roll-forward; defer rollback
    ///    work to next ReadWrite open.
    /// 4. `runtime_cache.invalidate_all` — drop stale per-snapshot caches.
    ///
    /// Steady state cost: one `list_dir` of `__recovery/` (typically
    /// returns empty → early return for both passes). No additional
    /// Lance reads.
    ///
    /// Engine-internal callers that already hold an in-flight sidecar
    /// (e.g. `schema_apply` mid-write) MUST use
    /// [`refresh_coordinator_only`](Self::refresh_coordinator_only) to
    /// avoid the recovery sweep racing their own sidecar.
    pub async fn refresh(&self) -> Result<()> {
        // Scope the coord write guard to the recovery section only.
        // `reload_schema_if_source_changed` (below) acquires
        // `self.coordinator.read().await` when the on-disk schema source
        // has drifted from the cached `schema_source`. Tokio's RwLock is
        // not reentrant, so holding the write across that call deadlocks.
        // Pinned by `composite_flow_schema_apply_then_branch_ops_no_deadlock_in_refresh`.
        {
            let mut coord = self.coordinator.write().await;
            // Step 1: re-read the manifest.
            coord.refresh().await?;
            // Step 2: schema-state staging recovery (must precede the drift sweep).
            let schema_state_recovery = recover_schema_state_files(
                &self.root_uri,
                Arc::clone(&self.storage),
                &coord.snapshot(),
            )
            .await?;
            // Step 3: roll-forward-only manifest drift recovery.
            crate::db::manifest::recover_manifest_drift(
                &self.root_uri,
                Arc::clone(&self.storage),
                &mut *coord,
                crate::db::manifest::RecoveryMode::RollForwardOnly,
                schema_state_recovery,
            )
            .await?;
        } // ← write guard released before reload's read acquisition
        self.reload_schema_if_source_changed().await?;
        // Step 4: drop stale per-snapshot caches.
        self.runtime_cache.invalidate_all().await;
        Ok(())
    }
529
530    async fn reload_schema_if_source_changed(&self) -> Result<()> {
531        let schema_path = schema_source_uri(&self.root_uri);
532        let schema_source = self.storage.read_text(&schema_path).await?;
533        if schema_source == *self.schema_source.load_full() {
534            return Ok(());
535        }
536        let current_source_ir = read_schema_ir_from_source(&schema_source)?;
537        let branches = self.coordinator.read().await.branch_list().await?;
538        let (accepted_ir, _) = load_or_bootstrap_schema_contract(
539            &self.root_uri,
540            Arc::clone(&self.storage),
541            &branches,
542            &current_source_ir,
543        )
544        .await?;
545        let mut catalog = build_catalog_from_ir(&accepted_ir)?;
546        fixup_blob_schemas(&mut catalog);
547        self.store_schema_source(schema_source);
548        self.store_catalog(catalog);
549        Ok(())
550    }
551
552    /// Refresh coordinator state and invalidate the runtime cache WITHOUT
553    /// running the recovery sweep. Engine-internal callers that hold an
554    /// in-flight sidecar (e.g. `schema_apply::apply_schema_with_lock`'s
555    /// internal lease-check refresh) need this variant: running recovery
556    /// here would observe the caller's own sidecar, classify it as
557    /// RolledPastExpected, and roll it forward — racing the caller's
558    /// own publish path.
559    pub(crate) async fn refresh_coordinator_only(&self) -> Result<()> {
560        self.coordinator.write().await.refresh().await?;
561        self.runtime_cache.invalidate_all().await;
562        Ok(())
563    }
564
565    pub async fn resolve_snapshot(&self, branch: &str) -> Result<SnapshotId> {
566        self.ensure_schema_state_valid().await?;
567        self.coordinator.read().await.resolve_snapshot_id(branch).await
568    }
569
570    pub(crate) async fn resolved_target(
571        &self,
572        target: impl Into<ReadTarget>,
573    ) -> Result<ResolvedTarget> {
574        self.ensure_schema_state_valid().await?;
575        self.coordinator.read().await.resolve_target(&target.into()).await
576    }
577
578    // ─── Change detection ────────────────────────────────────────────────
579
580    pub async fn diff_between(
581        &self,
582        from: impl Into<ReadTarget>,
583        to: impl Into<ReadTarget>,
584        filter: &crate::changes::ChangeFilter,
585    ) -> Result<crate::changes::ChangeSet> {
586        let from_resolved = self.resolved_target(from).await?;
587        let to_resolved = self.resolved_target(to).await?;
588        crate::changes::diff_snapshots(
589            self.uri(),
590            &from_resolved.snapshot,
591            &to_resolved.snapshot,
592            filter,
593            to_resolved.branch.clone().or(from_resolved.branch.clone()),
594        )
595        .await
596    }
597
598    /// Diff two graph commits. Resolves each commit to `(manifest_branch, manifest_version)`
599    /// and creates branch-aware snapshots. Supports cross-branch comparison.
600    pub async fn diff_commits(
601        &self,
602        from_commit_id: &str,
603        to_commit_id: &str,
604        filter: &crate::changes::ChangeFilter,
605    ) -> Result<crate::changes::ChangeSet> {
606        let coord = self.coordinator.read().await;
607        let from_commit = coord.resolve_commit(&SnapshotId::new(from_commit_id)).await?;
608        let to_commit = coord.resolve_commit(&SnapshotId::new(to_commit_id)).await?;
609        let from_snap = coord
610            .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
611                from_commit.graph_commit_id.clone(),
612            )))
613            .await?;
614        let to_snap = coord
615            .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
616                to_commit.graph_commit_id.clone(),
617            )))
618            .await?;
619        drop(coord);
620        crate::changes::diff_snapshots(
621            self.uri(),
622            &from_snap.snapshot,
623            &to_snap.snapshot,
624            filter,
625            to_snap.branch.clone().or(from_snap.branch.clone()),
626        )
627        .await
628    }
629
    /// Read one entity (as JSON) from `table_key` at an arbitrary
    /// [`ReadTarget`]. Delegates to `export::entity_at_target`.
    pub async fn entity_at_target(
        &self,
        target: impl Into<ReadTarget>,
        table_key: &str,
        id: &str,
    ) -> Result<Option<serde_json::Value>> {
        export::entity_at_target(self, target, table_key, id).await
    }

    /// Read one entity at a specific manifest version via time travel (on-demand enrichment).
    pub async fn entity_at(
        &self,
        table_key: &str,
        id: &str,
        version: u64,
    ) -> Result<Option<serde_json::Value>> {
        export::entity_at(self, table_key, id, version).await
    }

    /// Create a Snapshot at any historical manifest version.
    pub async fn snapshot_at_version(&self, version: u64) -> Result<Snapshot> {
        self.ensure_schema_state_valid().await?;
        self.coordinator.read().await.snapshot_at_version(version).await
    }

    /// Export the selected types/tables on `branch` as a JSONL string.
    /// Delegates to `export::export_jsonl`.
    pub async fn export_jsonl(
        &self,
        branch: &str,
        type_names: &[String],
        table_keys: &[String],
    ) -> Result<String> {
        export::export_jsonl(self, branch, type_names, table_keys).await
    }

    /// Streaming variant of [`export_jsonl`](Self::export_jsonl) that
    /// writes into `writer` instead of building a `String`.
    pub async fn export_jsonl_to_writer<W: Write>(
        &self,
        branch: &str,
        type_names: &[String],
        table_keys: &[String],
        writer: &mut W,
    ) -> Result<()> {
        export::export_jsonl_to_writer(self, branch, type_names, table_keys, writer).await
    }
673
674    // ─── Graph index ──────────────────────────────────────────────────────
675
676    /// Get or build the graph index for the current snapshot.
677    pub async fn graph_index(&self) -> Result<Arc<crate::graph_index::GraphIndex>> {
678        table_ops::graph_index(self).await
679    }
680
681    pub(crate) async fn graph_index_for_resolved(
682        &self,
683        resolved: &ResolvedTarget,
684    ) -> Result<Arc<crate::graph_index::GraphIndex>> {
685        table_ops::graph_index_for_resolved(self, resolved).await
686    }
687
688    /// Ensure BTree scalar indices exist on key columns.
689    /// Idempotent — Lance skips if index already exists.
690    ///
691    /// Opens sub-tables at their latest version (not snapshot-pinned) because
692    /// indices must be created on the current head. Any version drift from the
693    /// snapshot is expected and logged. The resulting versions are committed
694    /// back to the manifest.
695    ///
696    /// On named branches, indexing preserves lazy branching:
697    /// unbranched subtables keep inheriting `main`, while subtables inherited
698    /// from an ancestor branch are first forked into the active branch before
699    /// their index metadata is updated.
700    pub async fn ensure_indices(&self) -> Result<()> {
701        table_ops::ensure_indices(self).await
702    }
703
704    pub async fn ensure_indices_on(&self, branch: &str) -> Result<()> {
705        table_ops::ensure_indices_on(self, branch).await
706    }
707
    // Test-only failpoint hook: publishes a table head while skipping the
    // index rebuild, so tests can exercise the state between those two steps.
    #[cfg(feature = "failpoints")]
    #[doc(hidden)]
    pub async fn failpoint_publish_table_head_without_index_rebuild_for_test(
        &mut self,
        branch: &str,
        table_key: &str,
        table_branch: Option<&str>,
    ) -> Result<u64> {
        table_ops::failpoint_publish_table_head_without_index_rebuild_for_test(
            self,
            branch,
            table_key,
            table_branch,
        )
        .await
    }
724
    /// Compact small Lance fragments into fewer larger ones across every
    /// node + edge table on `main`. See [`optimize`] for details.
    ///
    /// Delegates to [`optimize::optimize_all_tables`].
    pub async fn optimize(&self) -> Result<Vec<optimize::TableOptimizeStats>> {
        optimize::optimize_all_tables(self).await
    }
730
    /// Remove Lance manifests (and the fragments they uniquely own) per the
    /// given [`optimize::CleanupPolicyOptions`]. Destructive to version
    /// history. See [`optimize`] for details.
    ///
    /// Delegates to [`optimize::cleanup_all_tables`].
    pub async fn cleanup(
        &mut self,
        options: optimize::CleanupPolicyOptions,
    ) -> Result<Vec<optimize::TableCleanupStats>> {
        optimize::cleanup_all_tables(self, options).await
    }
740
741    /// Read a blob from a node by its string ID and property name.
742    ///
743    /// Returns a `BlobFile` handle with async `read()`, `seek()`, `tell()`,
744    /// and metadata accessors (`size()`, `kind()`, `uri()`).
745    ///
746    /// ```ignore
747    /// let blob = db.read_blob("Document", "readme", "content").await?;
748    /// let bytes = blob.read().await?;
749    /// ```
750    pub async fn read_blob(&self, type_name: &str, id: &str, property: &str) -> Result<BlobFile> {
751        self.ensure_schema_state_valid().await?;
752        let catalog = self.catalog();
753        let node_type = catalog
754            .node_types
755            .get(type_name)
756            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
757        if !node_type.blob_properties.contains(property) {
758            return Err(OmniError::manifest(format!(
759                "property '{}' on type '{}' is not a Blob",
760                property, type_name
761            )));
762        }
763
764        let snapshot = self.snapshot().await;
765        let table_key = format!("node:{}", type_name);
766        let ds = snapshot.open(&table_key).await?;
767
768        let filter_sql = format!("id = '{}'", id.replace('\'', "''"));
769        let row_id = self
770            .table_store
771            .first_row_id_for_filter(&ds, &filter_sql)
772            .await?
773            .ok_or_else(|| {
774                OmniError::manifest(format!("no {} with id '{}' found", type_name, id))
775            })?;
776
777        // Use take_blobs to get the BlobFile handle
778        let ds = Arc::new(ds);
779        let mut blobs = ds
780            .take_blobs(&[row_id], property)
781            .await
782            .map_err(|e| OmniError::Lance(e.to_string()))?;
783
784        blobs.pop().ok_or_else(|| {
785            OmniError::manifest(format!(
786                "blob '{}' on {} '{}' returned no data",
787                property, type_name, id
788            ))
789        })
790    }
791
    /// The coordinator's current branch name, if one is set.
    pub(crate) async fn active_branch(&self) -> Option<String> {
        self.coordinator.read().await.current_branch().map(str::to_string)
    }
795
796    async fn ensure_branch_delete_safe(&self, branch: &str, branches: &[String]) -> Result<()> {
797        let descendants = self.coordinator.read().await.branch_descendants(branch).await?;
798        if let Some(descendant) = descendants.first() {
799            return Err(OmniError::manifest_conflict(format!(
800                "cannot delete branch '{}' because descendant branch '{}' still depends on it",
801                branch, descendant
802            )));
803        }
804
805        for other_branch in branches
806            .iter()
807            .filter(|candidate| candidate.as_str() != branch)
808        {
809            let snapshot = self
810                .snapshot_of(ReadTarget::branch(other_branch.as_str()))
811                .await?;
812            if snapshot
813                .entries()
814                .any(|entry| entry.table_branch.as_deref() == Some(branch))
815            {
816                return Err(OmniError::manifest_conflict(format!(
817                    "cannot delete branch '{}' because branch '{}' still depends on it",
818                    branch, other_branch
819                )));
820            }
821        }
822
823        Ok(())
824    }
825
826    async fn cleanup_deleted_branch_tables(
827        &self,
828        branch: &str,
829        owned_tables: &[(String, String)],
830    ) -> Result<()> {
831        let mut seen_paths = HashSet::new();
832        let mut cleanup_targets = owned_tables
833            .iter()
834            .filter(|(_, table_path)| seen_paths.insert(table_path.clone()))
835            .cloned()
836            .collect::<Vec<_>>();
837        cleanup_targets.sort_by(|left, right| left.0.cmp(&right.0));
838
839        for (table_key, table_path) in cleanup_targets {
840            let dataset_uri = self.table_store.dataset_uri(&table_path);
841            if let Err(err) = self.table_store.delete_branch(&dataset_uri, branch).await {
842                return Err(OmniError::manifest_internal(format!(
843                    "branch '{}' was deleted but cleanup failed for {}: {}",
844                    branch, table_key, err
845                )));
846            }
847        }
848
849        Ok(())
850    }
851
852    async fn delete_branch_storage_only(&self, branch: &str) -> Result<()> {
853        let active = self.coordinator.read().await.current_branch().map(str::to_string);
854        if active.as_deref() == Some(branch) {
855            return Err(OmniError::manifest_conflict(format!(
856                "cannot delete currently active branch '{}'",
857                branch
858            )));
859        }
860
861        let branch_snapshot = self.snapshot_of(ReadTarget::branch(branch)).await?;
862        let owned_tables = branch_snapshot
863            .entries()
864            .filter(|entry| entry.table_branch.as_deref() == Some(branch))
865            .map(|entry| (entry.table_key.clone(), entry.table_path.clone()))
866            .collect::<Vec<_>>();
867
868        self.coordinator.write().await.branch_delete(branch).await?;
869        self.cleanup_deleted_branch_tables(branch, &owned_tables)
870            .await
871    }
872
    /// Associated-function re-export of the module-level
    /// [`normalize_branch_name`] helper.
    pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
        normalize_branch_name(branch)
    }
876
    /// Resolve the HEAD commit id for `branch`, initializing the commit
    /// graph on demand. Uses a locally-opened coordinator rather than
    /// `self.coordinator`, so the shared handle is never retargeted.
    pub(crate) async fn head_commit_id_for_branch(
        &self,
        branch: Option<&str>,
    ) -> Result<Option<String>> {
        let mut coordinator = self.open_coordinator_for_branch(branch).await?;
        coordinator.ensure_commit_graph_initialized().await?;
        coordinator
            .head_commit_id()
            .await
            // Flatten the SnapshotId newtype into a plain string for callers.
            .map(|id| id.map(|snapshot_id| snapshot_id.as_str().to_string()))
    }
888
    /// Create branch `name` via the shared coordinator.
    ///
    /// Rejected while a schema apply is in flight or when `name` is an
    /// internal (run/system) ref.
    pub async fn branch_create(&self, name: &str) -> Result<()> {
        self.ensure_schema_state_valid().await?;
        self.ensure_schema_apply_idle("branch_create").await?;
        ensure_public_branch_ref(name, "branch_create")?;
        self.coordinator.write().await.branch_create(name).await
    }
895
    /// Create branch `name` forked from the `from` target.
    ///
    /// Public wrapper around [`Self::branch_create_from_impl`] that keeps the
    /// internal-ref guard enabled for both source and destination names.
    pub async fn branch_create_from(
        &self,
        from: impl Into<ReadTarget>,
        name: &str,
    ) -> Result<()> {
        self.ensure_schema_apply_idle("branch_create_from").await?;
        self.branch_create_from_impl(from, name, false).await
    }
904
    /// Shared implementation for branch forking. `allow_internal_refs`
    /// bypasses the public-ref guard for internal callers; only branch
    /// targets are supported as sources (not pinned snapshots).
    async fn branch_create_from_impl(
        &self,
        from: impl Into<ReadTarget>,
        name: &str,
        allow_internal_refs: bool,
    ) -> Result<()> {
        let target = from.into();
        let ReadTarget::Branch(branch_name) = target else {
            return Err(OmniError::manifest(
                "branch creation from pinned snapshots is not supported yet".to_string(),
            ));
        };
        if !allow_internal_refs {
            ensure_public_branch_ref(&branch_name, "branch_create_from")?;
            ensure_public_branch_ref(name, "branch_create_from")?;
        }
        let branch = normalize_branch_name(&branch_name)?;
        // Operate on a freshly-opened source coordinator that's owned locally
        // — never touch `self.coordinator`. The pre-fix implementation used
        // `swap_coordinator_for_branch` + operate + `restore_coordinator` as
        // three separate `coordinator.write().await` acquisitions; under
        // `&self` concurrency, a second `branch_create_from` could swap
        // self.coordinator between this caller's swap and operate steps,
        // making the operate run against the wrong source branch and
        // forking off the wrong HEAD. Pinned by
        // `concurrent_branch_create_from_distinct_parents_does_not_corrupt_coordinator`
        // in `crates/omnigraph-server/tests/server.rs`.
        //
        // `branch_create` mutates only the local coord's commit-graph cache;
        // the manifest write is durable on disk regardless of which
        // coord-handle issued it. Discarding `source_coord` after the call
        // is the right shape — the new branch is reachable from any
        // subsequent open of any coord.
        let mut source_coord = self.open_coordinator_for_branch(branch.as_deref()).await?;
        source_coord.branch_create(name).await
    }
941
    /// List all branches known to the coordinator.
    pub async fn branch_list(&self) -> Result<Vec<String>> {
        self.ensure_schema_state_valid().await?;
        self.coordinator.read().await.branch_list().await
    }
946
947    pub async fn branch_delete(&self, name: &str) -> Result<()> {
948        self.ensure_schema_state_valid().await?;
949        self.ensure_schema_apply_idle("branch_delete").await?;
950        ensure_public_branch_ref(name, "branch_delete")?;
951        self.refresh().await?;
952        let branch = normalize_branch_name(name)?
953            .ok_or_else(|| OmniError::manifest("cannot delete branch 'main'".to_string()))?;
954        let branches = self.coordinator.read().await.branch_list().await?;
955        if !branches.iter().any(|candidate| candidate == &branch) {
956            return Err(OmniError::manifest_not_found(format!(
957                "branch '{}' not found",
958                branch
959            )));
960        }
961
962        self.ensure_branch_delete_safe(&branch, &branches).await?;
963        self.delete_branch_storage_only(&branch).await
964    }
965
    /// Resolve a single commit by its id string.
    pub async fn get_commit(&self, commit_id: &str) -> Result<GraphCommit> {
        self.ensure_schema_state_valid().await?;
        self.coordinator.read().await
            .resolve_commit(&SnapshotId::new(commit_id))
            .await
    }
972
973    pub async fn list_commits(&self, branch: Option<&str>) -> Result<Vec<GraphCommit>> {
974        self.ensure_schema_state_valid().await?;
975        let branch = match branch {
976            Some(branch) => normalize_branch_name(branch)?,
977            None => None,
978        };
979        let coordinator = self.open_coordinator_for_branch(branch.as_deref()).await?;
980        coordinator.list_commits().await
981    }
982
    /// Open a sub-table for mutation with version-drift guard.
    ///
    /// Checks that the dataset's current version matches the snapshot-pinned
    /// version. If another writer has advanced the version, returns an error
    /// prompting the caller to refresh and retry (optimistic concurrency).
    ///
    /// Delegates to [`table_ops::open_for_mutation`].
    pub(crate) async fn open_for_mutation(
        &self,
        table_key: &str,
        op_kind: crate::db::MutationOpKind,
    ) -> Result<(Dataset, String, Option<String>)> {
        table_ops::open_for_mutation(self, table_key, op_kind).await
    }
995
    /// Branch-targeted variant of [`Self::open_for_mutation`].
    pub(crate) async fn open_for_mutation_on_branch(
        &self,
        branch: Option<&str>,
        table_key: &str,
        op_kind: crate::db::MutationOpKind,
    ) -> Result<(Dataset, String, Option<String>)> {
        table_ops::open_for_mutation_on_branch(self, branch, table_key, op_kind).await
    }
1004
    /// Fork a dataset into `active_branch` from the (branch, version) state
    /// recorded in a manifest entry. Delegates to
    /// [`table_ops::fork_dataset_from_entry_state`].
    pub(crate) async fn fork_dataset_from_entry_state(
        &self,
        table_key: &str,
        full_path: &str,
        source_branch: Option<&str>,
        source_version: u64,
        active_branch: &str,
    ) -> Result<Dataset> {
        table_ops::fork_dataset_from_entry_state(
            self,
            table_key,
            full_path,
            source_branch,
            source_version,
            active_branch,
        )
        .await
    }
1023
    /// Re-open a sub-table for mutation at a known path/branch, checking it
    /// against `expected_version`. Delegates to
    /// [`table_ops::reopen_for_mutation`].
    pub(crate) async fn reopen_for_mutation(
        &self,
        table_key: &str,
        full_path: &str,
        table_branch: Option<&str>,
        expected_version: u64,
        op_kind: crate::db::MutationOpKind,
    ) -> Result<Dataset> {
        table_ops::reopen_for_mutation(
            self,
            table_key,
            full_path,
            table_branch,
            expected_version,
            op_kind,
        )
        .await
    }
1042
    /// Open a dataset pinned at an exact (path, branch, version) state.
    /// Delegates to [`table_ops::open_dataset_at_state`].
    pub(crate) async fn open_dataset_at_state(
        &self,
        table_path: &str,
        table_branch: Option<&str>,
        table_version: u64,
    ) -> Result<Dataset> {
        table_ops::open_dataset_at_state(self, table_path, table_branch, table_version).await
    }
1051
    /// Build the configured indices on an already-open dataset.
    /// Delegates to [`table_ops::build_indices_on_dataset`].
    pub(crate) async fn build_indices_on_dataset(
        &self,
        table_key: &str,
        ds: &mut Dataset,
    ) -> Result<()> {
        table_ops::build_indices_on_dataset(self, table_key, ds).await
    }
1059
    /// Variant of [`Self::build_indices_on_dataset`] that uses an explicit
    /// `catalog` instead of the instance's current one.
    pub(crate) async fn build_indices_on_dataset_for_catalog(
        &self,
        catalog: &Catalog,
        table_key: &str,
        ds: &mut Dataset,
    ) -> Result<()> {
        table_ops::build_indices_on_dataset_for_catalog(self, catalog, table_key, ds).await
    }
1068
    // Used only by in-tree tests (`#[cfg(test)]`); the runtime path now
    // uses `commit_updates_on_branch_with_expected` exclusively.
    // Commits sub-table updates without a branch target or expected-version
    // guard, delegating to `table_ops::commit_updates`.
    #[cfg(test)]
    pub(crate) async fn commit_updates(
        &mut self,
        updates: &[crate::db::SubTableUpdate],
    ) -> Result<u64> {
        table_ops::commit_updates(self, updates).await
    }
1078
    /// Commit sub-table updates to the manifest; returns the new manifest
    /// version. Delegates to [`table_ops::commit_manifest_updates`].
    pub(crate) async fn commit_manifest_updates(
        &self,
        updates: &[crate::db::SubTableUpdate],
    ) -> Result<u64> {
        table_ops::commit_manifest_updates(self, updates).await
    }
1085
    /// Record a merge commit with two parents in the commit graph; returns
    /// the new commit's id. Delegates to [`table_ops::record_merge_commit`].
    pub(crate) async fn record_merge_commit(
        &self,
        manifest_version: u64,
        parent_commit_id: &str,
        merged_parent_commit_id: &str,
        actor_id: Option<&str>,
    ) -> Result<String> {
        table_ops::record_merge_commit(
            self,
            manifest_version,
            parent_commit_id,
            merged_parent_commit_id,
            actor_id,
        )
        .await
    }
1102
    /// Commit sub-table updates on `branch`. `expected_table_versions` carries
    /// the per-table versions the caller observed, used to detect concurrent
    /// writers (see `table_ops` for the check). This is the primary runtime
    /// commit path.
    pub(crate) async fn commit_updates_on_branch_with_expected(
        &self,
        branch: Option<&str>,
        updates: &[crate::db::SubTableUpdate],
        expected_table_versions: &std::collections::HashMap<String, u64>,
        actor_id: Option<&str>,
    ) -> Result<u64> {
        table_ops::commit_updates_on_branch_with_expected(
            self,
            branch,
            updates,
            expected_table_versions,
            actor_id,
        )
        .await
    }
1119
    /// Initialize the commit graph if it has not been set up yet.
    /// Delegates to [`table_ops::ensure_commit_graph_initialized`].
    pub(crate) async fn ensure_commit_graph_initialized(&self) -> Result<()> {
        table_ops::ensure_commit_graph_initialized(self).await
    }
1123
    /// Invalidate the cached graph index. Called after edge mutations.
    /// Delegates to [`table_ops::invalidate_graph_index`].
    pub(crate) async fn invalidate_graph_index(&self) {
        table_ops::invalidate_graph_index(self).await
    }
1128}
1129
1130pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
1131    let branch = branch.trim();
1132    if branch.is_empty() {
1133        return Err(OmniError::manifest(
1134            "branch name cannot be empty".to_string(),
1135        ));
1136    }
1137    if branch == "main" {
1138        return Ok(None);
1139    }
1140    Ok(Some(branch.to_string()))
1141}
1142
1143pub(crate) fn ensure_public_branch_ref(branch: &str, operation: &str) -> Result<()> {
1144    if super::is_internal_run_branch(branch) {
1145        return Err(OmniError::manifest(format!(
1146            "{} does not allow internal run ref '{}'",
1147            operation, branch
1148        )));
1149    }
1150    if is_internal_system_branch(branch) {
1151        return Err(OmniError::manifest(format!(
1152            "{} does not allow internal system ref '{}'",
1153            operation, branch
1154        )));
1155    }
1156    Ok(())
1157}
1158
1159fn concat_or_empty_batches(schema: Arc<Schema>, batches: Vec<RecordBatch>) -> Result<RecordBatch> {
1160    if batches.is_empty() {
1161        return Ok(RecordBatch::new_empty(schema));
1162    }
1163    if batches.len() == 1 {
1164        return Ok(batches.into_iter().next().unwrap());
1165    }
1166    let batch_schema = batches[0].schema();
1167    arrow_select::concat::concat_batches(&batch_schema, &batches)
1168        .map_err(|e| OmniError::Lance(e.to_string()))
1169}
1170
1171fn blob_properties_for_table_key<'a>(
1172    catalog: &'a Catalog,
1173    table_key: &str,
1174) -> Result<&'a std::collections::HashSet<String>> {
1175    if let Some(type_name) = table_key.strip_prefix("node:") {
1176        return catalog
1177            .node_types
1178            .get(type_name)
1179            .map(|node_type| &node_type.blob_properties)
1180            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)));
1181    }
1182    if let Some(type_name) = table_key.strip_prefix("edge:") {
1183        return catalog
1184            .edge_types
1185            .get(type_name)
1186            .map(|edge_type| &edge_type.blob_properties)
1187            .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)));
1188    }
1189    Err(OmniError::manifest(format!(
1190        "invalid table key '{}'",
1191        table_key
1192    )))
1193}
1194
/// Heuristically decide whether a blob *description* struct row encodes a
/// null blob.
///
/// A row counts as null when:
/// - the struct itself is null at `row`, or
/// - the `kind` column is null/missing, or
/// - the kind is `Inline` and `position`, `size`, and `blob_uri` are all
///   absent/zero/empty (the all-defaults inline description).
///
/// Any non-`Inline` kind with a present `kind` value is never null.
fn blob_description_is_null(descriptions: &StructArray, row: usize) -> Result<bool> {
    if descriptions.is_null(row) {
        return Ok(true);
    }

    // NOTE(review): `kind` may arrive as either UInt32 or UInt8 depending on
    // how the column was written; try the wide type first, then fall back.
    let kind = descriptions
        .column_by_name("kind")
        .and_then(|col| col.as_any().downcast_ref::<UInt32Array>())
        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row) as u8))
        .or_else(|| {
            descriptions
                .column_by_name("kind")
                .and_then(|col| col.as_any().downcast_ref::<arrow_array::UInt8Array>())
                .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)))
        });
    // Each accessor yields `None` when the column is missing, the wrong
    // type, or null at `row`.
    let position = descriptions
        .column_by_name("position")
        .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
    let size = descriptions
        .column_by_name("size")
        .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
    let blob_uri = descriptions
        .column_by_name("blob_uri")
        .and_then(|col| col.as_any().downcast_ref::<StringArray>())
        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));

    let Some(kind) = kind else {
        return Ok(true);
    };
    let kind = BlobKind::try_from(kind).map_err(|e| OmniError::Lance(e.to_string()))?;
    if kind != BlobKind::Inline {
        return Ok(false);
    }

    // Inline blob: null iff every field is at its default.
    Ok(position.unwrap_or(0) == 0 && size.unwrap_or(0) == 0 && blob_uri.unwrap_or("").is_empty())
}
1233
1234/// Replace placeholder `LargeBinary` fields with Lance blob v2 fields.
1235///
1236/// The compiler crate has no Lance dependency, so `ScalarType::Blob` maps to
1237/// `DataType::LargeBinary` as a placeholder. This function replaces those
1238/// fields with the real blob v2 struct type via `lance::blob::blob_field()`.
1239fn fixup_blob_schemas(catalog: &mut Catalog) {
1240    for node_type in catalog.node_types.values_mut() {
1241        if node_type.blob_properties.is_empty() {
1242            continue;
1243        }
1244        let fields: Vec<Field> = node_type
1245            .arrow_schema
1246            .fields()
1247            .iter()
1248            .map(|f| {
1249                if node_type.blob_properties.contains(f.name()) {
1250                    blob_field(f.name(), f.is_nullable())
1251                } else {
1252                    f.as_ref().clone()
1253                }
1254            })
1255            .collect();
1256        node_type.arrow_schema = Arc::new(Schema::new(fields));
1257    }
1258    for edge_type in catalog.edge_types.values_mut() {
1259        if edge_type.blob_properties.is_empty() {
1260            continue;
1261        }
1262        let fields: Vec<Field> = edge_type
1263            .arrow_schema
1264            .fields()
1265            .iter()
1266            .map(|f| {
1267                if edge_type.blob_properties.contains(f.name()) {
1268                    blob_field(f.name(), f.is_nullable())
1269                } else {
1270                    f.as_ref().clone()
1271                }
1272            })
1273            .collect();
1274        edge_type.arrow_schema = Arc::new(Schema::new(fields));
1275    }
1276}
1277
1278fn read_schema_ir_from_source(schema_source: &str) -> Result<SchemaIR> {
1279    let schema_ast = parse_schema(schema_source)?;
1280    build_schema_ir(&schema_ast).map_err(|err| OmniError::manifest(err.to_string()))
1281}
1282
1283fn schema_table_key(type_kind: SchemaTypeKind, name: &str) -> String {
1284    match type_kind {
1285        SchemaTypeKind::Node => format!("node:{}", name),
1286        SchemaTypeKind::Edge => format!("edge:{}", name),
1287        SchemaTypeKind::Interface => unreachable!("interfaces do not map to tables"),
1288    }
1289}
1290
1291fn schema_for_table_key(catalog: &Catalog, table_key: &str) -> Result<Arc<Schema>> {
1292    if let Some(type_name) = table_key.strip_prefix("node:") {
1293        let node_type: &NodeType = catalog
1294            .node_types
1295            .get(type_name)
1296            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
1297        return Ok(node_type.arrow_schema.clone());
1298    }
1299    if let Some(type_name) = table_key.strip_prefix("edge:") {
1300        let edge_type: &EdgeType = catalog
1301            .edge_types
1302            .get(type_name)
1303            .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)))?;
1304        return Ok(edge_type.arrow_schema.clone());
1305    }
1306    Err(OmniError::manifest(format!(
1307        "invalid table key '{}'",
1308        table_key
1309    )))
1310}
1311
1312fn record_batch_row_to_json(batch: &RecordBatch, row: usize) -> Result<serde_json::Value> {
1313    let mut obj = serde_json::Map::new();
1314    for (i, field) in batch.schema().fields().iter().enumerate() {
1315        obj.insert(
1316            field.name().clone(),
1317            json_value_from_array(batch.column(i).as_ref(), row)?,
1318        );
1319    }
1320    Ok(serde_json::Value::Object(obj))
1321}
1322
1323fn json_value_from_array(array: &dyn Array, row: usize) -> Result<serde_json::Value> {
1324    if array.is_null(row) {
1325        return Ok(serde_json::Value::Null);
1326    }
1327
1328    match array.data_type() {
1329        DataType::Utf8 => Ok(serde_json::Value::String(
1330            array
1331                .as_any()
1332                .downcast_ref::<StringArray>()
1333                .ok_or_else(|| OmniError::Lance("expected StringArray".to_string()))?
1334                .value(row)
1335                .to_string(),
1336        )),
1337        DataType::LargeUtf8 => Ok(serde_json::Value::String(
1338            array
1339                .as_any()
1340                .downcast_ref::<LargeStringArray>()
1341                .ok_or_else(|| OmniError::Lance("expected LargeStringArray".to_string()))?
1342                .value(row)
1343                .to_string(),
1344        )),
1345        DataType::Boolean => Ok(serde_json::Value::Bool(
1346            array
1347                .as_any()
1348                .downcast_ref::<BooleanArray>()
1349                .ok_or_else(|| OmniError::Lance("expected BooleanArray".to_string()))?
1350                .value(row),
1351        )),
1352        DataType::Int32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1353            array
1354                .as_any()
1355                .downcast_ref::<Int32Array>()
1356                .ok_or_else(|| OmniError::Lance("expected Int32Array".to_string()))?
1357                .value(row),
1358        ))),
1359        DataType::Int64 => Ok(serde_json::Value::Number(serde_json::Number::from(
1360            array
1361                .as_any()
1362                .downcast_ref::<Int64Array>()
1363                .ok_or_else(|| OmniError::Lance("expected Int64Array".to_string()))?
1364                .value(row),
1365        ))),
1366        DataType::UInt32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1367            array
1368                .as_any()
1369                .downcast_ref::<UInt32Array>()
1370                .ok_or_else(|| OmniError::Lance("expected UInt32Array".to_string()))?
1371                .value(row),
1372        ))),
1373        DataType::UInt64 => Ok(serde_json::Value::Number(serde_json::Number::from(
1374            array
1375                .as_any()
1376                .downcast_ref::<UInt64Array>()
1377                .ok_or_else(|| OmniError::Lance("expected UInt64Array".to_string()))?
1378                .value(row),
1379        ))),
1380        DataType::Float32 => {
1381            let value = array
1382                .as_any()
1383                .downcast_ref::<Float32Array>()
1384                .ok_or_else(|| OmniError::Lance("expected Float32Array".to_string()))?
1385                .value(row) as f64;
1386            Ok(serde_json::Value::Number(
1387                serde_json::Number::from_f64(value).ok_or_else(|| {
1388                    OmniError::Lance(format!("cannot encode f32 value '{}' as JSON", value))
1389                })?,
1390            ))
1391        }
1392        DataType::Float64 => {
1393            let value = array
1394                .as_any()
1395                .downcast_ref::<Float64Array>()
1396                .ok_or_else(|| OmniError::Lance("expected Float64Array".to_string()))?
1397                .value(row);
1398            Ok(serde_json::Value::Number(
1399                serde_json::Number::from_f64(value).ok_or_else(|| {
1400                    OmniError::Lance(format!("cannot encode f64 value '{}' as JSON", value))
1401                })?,
1402            ))
1403        }
1404        DataType::Date32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1405            array
1406                .as_any()
1407                .downcast_ref::<Date32Array>()
1408                .ok_or_else(|| OmniError::Lance("expected Date32Array".to_string()))?
1409                .value(row),
1410        ))),
1411        DataType::Binary => Ok(serde_json::Value::String(base64::Engine::encode(
1412            &base64::engine::general_purpose::STANDARD,
1413            array
1414                .as_any()
1415                .downcast_ref::<BinaryArray>()
1416                .ok_or_else(|| OmniError::Lance("expected BinaryArray".to_string()))?
1417                .value(row),
1418        ))),
1419        DataType::LargeBinary => Ok(serde_json::Value::String(base64::Engine::encode(
1420            &base64::engine::general_purpose::STANDARD,
1421            array
1422                .as_any()
1423                .downcast_ref::<LargeBinaryArray>()
1424                .ok_or_else(|| OmniError::Lance("expected LargeBinaryArray".to_string()))?
1425                .value(row),
1426        ))),
1427        DataType::List(_) => {
1428            let list = array
1429                .as_any()
1430                .downcast_ref::<ListArray>()
1431                .ok_or_else(|| OmniError::Lance("expected ListArray".to_string()))?;
1432            let values = list.value(row);
1433            let mut out = Vec::with_capacity(values.len());
1434            for idx in 0..values.len() {
1435                out.push(json_value_from_array(values.as_ref(), idx)?);
1436            }
1437            Ok(serde_json::Value::Array(out))
1438        }
1439        DataType::LargeList(_) => {
1440            let list = array
1441                .as_any()
1442                .downcast_ref::<LargeListArray>()
1443                .ok_or_else(|| OmniError::Lance("expected LargeListArray".to_string()))?;
1444            let values = list.value(row);
1445            let mut out = Vec::with_capacity(values.len());
1446            for idx in 0..values.len() {
1447                out.push(json_value_from_array(values.as_ref(), idx)?);
1448            }
1449            Ok(serde_json::Value::Array(out))
1450        }
1451        DataType::FixedSizeList(_, _) => {
1452            let list = array
1453                .as_any()
1454                .downcast_ref::<FixedSizeListArray>()
1455                .ok_or_else(|| OmniError::Lance("expected FixedSizeListArray".to_string()))?;
1456            let values = list.value(row);
1457            let mut out = Vec::with_capacity(values.len());
1458            for idx in 0..values.len() {
1459                out.push(json_value_from_array(values.as_ref(), idx)?);
1460            }
1461            Ok(serde_json::Value::Array(out))
1462        }
1463        DataType::Struct(fields) => {
1464            let struct_array = array
1465                .as_any()
1466                .downcast_ref::<StructArray>()
1467                .ok_or_else(|| OmniError::Lance("expected StructArray".to_string()))?;
1468            let mut obj = serde_json::Map::new();
1469            for (field_idx, field) in fields.iter().enumerate() {
1470                obj.insert(
1471                    field.name().clone(),
1472                    json_value_from_array(struct_array.column(field_idx).as_ref(), row)?,
1473                );
1474            }
1475            Ok(serde_json::Value::Object(obj))
1476        }
1477        _ => {
1478            let value = arrow_cast::display::array_value_to_string(array, row)
1479                .map_err(|e| OmniError::Lance(e.to_string()))?;
1480            Ok(serde_json::Value::String(value))
1481        }
1482    }
1483}
1484
1485#[cfg(test)]
1486mod tests {
1487    use super::*;
1488    use crate::db::is_internal_run_branch;
1489    use crate::db::manifest::ManifestCoordinator;
1490    use async_trait::async_trait;
1491    use serde_json::Value;
1492    use std::sync::Mutex;
1493
1494    use crate::storage::{LocalStorageAdapter, StorageAdapter, join_uri};
1495
    // Minimal graph schema shared by the tests below: two node types with
    // string keys (one with an optional scalar) and two edge types (one with
    // an optional property, one bare).
    const TEST_SCHEMA: &str = r#"
node Person {
    name: String @key
    age: I32?
}
node Company {
    name: String @key
}
edge Knows: Person -> Person {
    since: Date?
}
edge WorksAt: Person -> Company
"#;
1509
1510    #[derive(Debug, Default)]
1511    struct RecordingStorageAdapter {
1512        inner: LocalStorageAdapter,
1513        reads: Mutex<Vec<String>>,
1514        writes: Mutex<Vec<String>>,
1515        exists_checks: Mutex<Vec<String>>,
1516        renames: Mutex<Vec<(String, String)>>,
1517        deletes: Mutex<Vec<String>>,
1518    }
1519
1520    impl RecordingStorageAdapter {
1521        fn reads(&self) -> Vec<String> {
1522            self.reads.lock().unwrap().clone()
1523        }
1524
1525        fn writes(&self) -> Vec<String> {
1526            self.writes.lock().unwrap().clone()
1527        }
1528
1529        fn exists_checks(&self) -> Vec<String> {
1530            self.exists_checks.lock().unwrap().clone()
1531        }
1532    }
1533
1534    #[async_trait]
1535    impl StorageAdapter for RecordingStorageAdapter {
1536        async fn read_text(&self, uri: &str) -> Result<String> {
1537            self.reads.lock().unwrap().push(uri.to_string());
1538            self.inner.read_text(uri).await
1539        }
1540
1541        async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
1542            self.writes.lock().unwrap().push(uri.to_string());
1543            self.inner.write_text(uri, contents).await
1544        }
1545
1546        async fn exists(&self, uri: &str) -> Result<bool> {
1547            self.exists_checks.lock().unwrap().push(uri.to_string());
1548            self.inner.exists(uri).await
1549        }
1550
1551        async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
1552            self.renames
1553                .lock()
1554                .unwrap()
1555                .push((from_uri.to_string(), to_uri.to_string()));
1556            self.inner.rename_text(from_uri, to_uri).await
1557        }
1558
1559        async fn delete(&self, uri: &str) -> Result<()> {
1560            self.deletes.lock().unwrap().push(uri.to_string());
1561            self.inner.delete(uri).await
1562        }
1563
1564        async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
1565            self.inner.list_dir(dir_uri).await
1566        }
1567    }
1568
1569    #[tokio::test]
1570    async fn test_init_and_open_route_graph_metadata_through_storage_adapter() {
1571        let dir = tempfile::tempdir().unwrap();
1572        let uri = dir.path().to_str().unwrap();
1573        let adapter = Arc::new(RecordingStorageAdapter::default());
1574
1575        Omnigraph::init_with_storage(uri, TEST_SCHEMA, adapter.clone())
1576            .await
1577            .unwrap();
1578        assert!(adapter.writes().contains(&join_uri(uri, "_schema.pg")));
1579        assert!(adapter.writes().contains(&join_uri(uri, "_schema.ir.json")));
1580        assert!(
1581            adapter
1582                .writes()
1583                .contains(&join_uri(uri, "__schema_state.json"))
1584        );
1585
1586        Omnigraph::open_with_storage(uri, adapter.clone())
1587            .await
1588            .unwrap();
1589        assert!(adapter.reads().contains(&join_uri(uri, "_schema.pg")));
1590        assert!(adapter.reads().contains(&join_uri(uri, "_schema.ir.json")));
1591        assert!(
1592            adapter
1593                .reads()
1594                .contains(&join_uri(uri, "__schema_state.json"))
1595        );
1596        assert!(
1597            adapter
1598                .exists_checks()
1599                .contains(&join_uri(uri, "_schema.ir.json"))
1600        );
1601        assert!(
1602            adapter
1603                .exists_checks()
1604                .contains(&join_uri(uri, "__schema_state.json"))
1605        );
1606        assert!(
1607            adapter
1608                .exists_checks()
1609                .contains(&join_uri(uri, "_graph_commits.lance"))
1610        );
1611    }
1612
1613    async fn table_rows_json(db: &Omnigraph, table_key: &str) -> Vec<Value> {
1614        let snapshot = db.snapshot().await;
1615        let ds = snapshot.open(table_key).await.unwrap();
1616        let batches = db.table_store().scan_batches(&ds).await.unwrap();
1617        batches
1618            .into_iter()
1619            .flat_map(|batch| {
1620                (0..batch.num_rows())
1621                    .map(|row| record_batch_row_to_json(&batch, row).unwrap())
1622                    .collect::<Vec<_>>()
1623            })
1624            .collect()
1625    }
1626
1627    async fn seed_person_row(db: &mut Omnigraph, name: &str, age: Option<i32>) {
1628        let (mut ds, full_path, table_branch) = db
1629            .open_for_mutation("node:Person", crate::db::MutationOpKind::Insert)
1630            .await
1631            .unwrap();
1632        let schema: Arc<Schema> = Arc::new(ds.schema().into());
1633        let columns: Vec<Arc<dyn Array>> = schema
1634            .fields()
1635            .iter()
1636            .map(|field| match field.name().as_str() {
1637                "id" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
1638                "name" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
1639                "age" => Arc::new(Int32Array::from(vec![age])) as Arc<dyn Array>,
1640                _ => new_null_array(field.data_type(), 1),
1641            })
1642            .collect();
1643        let batch = RecordBatch::try_new(Arc::clone(&schema), columns).unwrap();
1644        let state = db
1645            .table_store()
1646            .append_batch(&full_path, &mut ds, batch)
1647            .await
1648            .unwrap();
1649        db.commit_updates(&[crate::db::SubTableUpdate {
1650            table_key: "node:Person".to_string(),
1651            table_version: state.version,
1652            table_branch,
1653            row_count: state.row_count,
1654            version_metadata: state.version_metadata,
1655        }])
1656        .await
1657        .unwrap();
1658    }
1659
1660    #[tokio::test]
1661    async fn test_apply_schema_adds_nullable_property_and_preserves_rows() {
1662        let dir = tempfile::tempdir().unwrap();
1663        let uri = dir.path().to_str().unwrap();
1664        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1665        seed_person_row(&mut db, "Alice", Some(30)).await;
1666
1667        let desired = TEST_SCHEMA.replace(
1668            "    age: I32?\n}",
1669            "    age: I32?\n    nickname: String?\n}",
1670        );
1671        let result = db.apply_schema(&desired).await.unwrap();
1672        assert!(result.applied);
1673
1674        let reopened = Omnigraph::open(uri).await.unwrap();
1675        let rows = table_rows_json(&reopened, "node:Person").await;
1676        assert_eq!(rows.len(), 1);
1677        assert_eq!(rows[0]["name"], "Alice");
1678        assert_eq!(rows[0]["age"], 30);
1679        assert!(rows[0]["nickname"].is_null());
1680        assert!(
1681            reopened.catalog().node_types["Person"]
1682                .properties
1683                .contains_key("nickname")
1684        );
1685        assert!(dir.path().join("_schema.pg").exists());
1686    }
1687
1688    #[tokio::test]
1689    async fn test_apply_schema_renames_property_and_preserves_values() {
1690        let dir = tempfile::tempdir().unwrap();
1691        let uri = dir.path().to_str().unwrap();
1692        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1693        seed_person_row(&mut db, "Alice", Some(30)).await;
1694
1695        let desired = TEST_SCHEMA.replace(
1696            "    age: I32?\n}",
1697            "    years: I32? @rename_from(\"age\")\n}",
1698        );
1699        db.apply_schema(&desired).await.unwrap();
1700
1701        let reopened = Omnigraph::open(uri).await.unwrap();
1702        let rows = table_rows_json(&reopened, "node:Person").await;
1703        assert_eq!(rows[0]["name"], "Alice");
1704        assert_eq!(rows[0]["years"], 30);
1705        assert!(rows[0].get("age").is_none());
1706    }
1707
1708    #[tokio::test]
1709    async fn test_apply_schema_renames_type_and_preserves_historical_snapshot() {
1710        let dir = tempfile::tempdir().unwrap();
1711        let uri = dir.path().to_str().unwrap();
1712        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1713        seed_person_row(&mut db, "Alice", Some(30)).await;
1714        let before_version = db.snapshot().await.version();
1715
1716        let desired = TEST_SCHEMA
1717            .replace("node Person {\n", "node Human @rename_from(\"Person\") {\n")
1718            .replace("edge Knows: Person -> Person", "edge Knows: Human -> Human")
1719            .replace(
1720                "edge WorksAt: Person -> Company",
1721                "edge WorksAt: Human -> Company",
1722            );
1723        db.apply_schema(&desired).await.unwrap();
1724
1725        let head = db.snapshot().await;
1726        assert!(head.entry("node:Person").is_none());
1727        assert!(head.entry("node:Human").is_some());
1728        let historical = ManifestCoordinator::snapshot_at(uri, None, before_version)
1729            .await
1730            .unwrap();
1731        assert!(historical.entry("node:Person").is_some());
1732        assert!(historical.entry("node:Human").is_none());
1733    }
1734
1735    #[tokio::test]
1736    async fn test_apply_schema_succeeds_after_load() {
1737        // Historical: schema apply used to be blocked by leftover
1738        // `__run__` branches. A defense-in-depth filter now skips
1739        // internal system branches, and run branches were made
1740        // ephemeral on every terminal state — so in practice no
1741        // `__run__` branch survives publish. The filter still guards
1742        // the invariant.
1743        let dir = tempfile::tempdir().unwrap();
1744        let uri = dir.path().to_str().unwrap();
1745        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1746
1747        crate::loader::load_jsonl(
1748            &mut db,
1749            r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#,
1750            crate::loader::LoadMode::Overwrite,
1751        )
1752        .await
1753        .unwrap();
1754
1755        let all_branches = db.coordinator.read().await.all_branches().await.unwrap();
1756        assert!(
1757            !all_branches.iter().any(|b| is_internal_run_branch(b)),
1758            "run branch should be deleted after publish, got: {:?}",
1759            all_branches
1760        );
1761
1762        let desired = TEST_SCHEMA.replace(
1763            "    age: I32?\n}",
1764            "    age: I32?\n    nickname: String?\n}",
1765        );
1766        let result = db.apply_schema(&desired).await.unwrap();
1767        assert!(result.applied, "schema apply should have applied");
1768    }
1769
1770    #[tokio::test]
1771    async fn test_apply_schema_adds_index_for_existing_property() {
1772        let dir = tempfile::tempdir().unwrap();
1773        let uri = dir.path().to_str().unwrap();
1774        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1775
1776        let desired = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
1777        db.apply_schema(&desired).await.unwrap();
1778
1779        let snapshot = db.snapshot().await;
1780        let ds = snapshot.open("node:Person").await.unwrap();
1781        assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
1782    }
1783
1784    #[tokio::test]
1785    async fn test_apply_schema_rewrite_preserves_existing_indices() {
1786        let dir = tempfile::tempdir().unwrap();
1787        let uri = dir.path().to_str().unwrap();
1788        let initial_schema = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
1789        let mut db = Omnigraph::init(uri, &initial_schema).await.unwrap();
1790        seed_person_row(&mut db, "Alice", Some(30)).await;
1791
1792        let desired = initial_schema.replace(
1793            "    age: I32?\n}",
1794            "    age: I32?\n    nickname: String?\n}",
1795        );
1796        db.apply_schema(&desired).await.unwrap();
1797
1798        let snapshot = db.snapshot().await;
1799        let ds = snapshot.open("node:Person").await.unwrap();
1800        assert!(db.table_store().has_btree_index(&ds, "id").await.unwrap());
1801        assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
1802    }
1803
1804    #[tokio::test]
1805    async fn test_open_for_mutation_rejects_while_schema_apply_locked() {
1806        let dir = tempfile::tempdir().unwrap();
1807        let uri = dir.path().to_str().unwrap();
1808        let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1809        let mut db = db;
1810        db.coordinator
1811            .write()
1812            .await
1813            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
1814            .await
1815            .unwrap();
1816
1817        let err = db
1818            .open_for_mutation("node:Person", crate::db::MutationOpKind::Insert)
1819            .await
1820            .unwrap_err();
1821        assert!(
1822            err.to_string()
1823                .contains("write is unavailable while schema apply is in progress")
1824        );
1825    }
1826
1827    #[tokio::test]
1828    async fn test_commit_updates_rejects_while_schema_apply_locked() {
1829        let dir = tempfile::tempdir().unwrap();
1830        let uri = dir.path().to_str().unwrap();
1831        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1832        db.coordinator
1833            .write()
1834            .await
1835            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
1836            .await
1837            .unwrap();
1838
1839        let err = db.commit_updates(&[]).await.unwrap_err();
1840        assert!(
1841            err.to_string()
1842                .contains("write commit is unavailable while schema apply is in progress")
1843        );
1844    }
1845
1846    #[tokio::test]
1847    async fn test_branch_list_hides_schema_apply_lock_branch() {
1848        let dir = tempfile::tempdir().unwrap();
1849        let uri = dir.path().to_str().unwrap();
1850        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1851        db.coordinator
1852            .write()
1853            .await
1854            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
1855            .await
1856            .unwrap();
1857
1858        let branches = db.branch_list().await.unwrap();
1859        assert_eq!(branches, vec!["main".to_string()]);
1860    }
1861}