Skip to main content

omnigraph/db/
omnigraph.rs

1use std::collections::{BTreeSet, HashMap, HashSet};
2use std::io::Write;
3use std::sync::Arc;
4
5use arc_swap::ArcSwap;
6use arrow_array::{
7    Array, BinaryArray, BooleanArray, Date32Array, FixedSizeListArray, Float32Array, Float64Array,
8    Int32Array, Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray,
9    RecordBatch, StringArray, StructArray, UInt32Array, UInt64Array, new_null_array,
10};
11use arrow_schema::{DataType, Field, Schema};
12use lance::Dataset;
13use lance::blob::{BlobArrayBuilder, blob_field};
14use lance::dataset::BlobFile;
15use lance::dataset::scanner::ColumnOrdering;
16use lance::datatypes::BlobKind;
17use omnigraph_compiler::catalog::{Catalog, EdgeType, NodeType};
18use omnigraph_compiler::schema::parser::parse_schema;
19use omnigraph_compiler::types::ScalarType;
20use omnigraph_compiler::{
21    DropMode, SchemaIR, SchemaMigrationPlan, SchemaMigrationStep, SchemaTypeKind,
22    build_catalog_from_ir, build_schema_ir, plan_schema_migration,
23};
24
25use crate::db::graph_coordinator::{GraphCoordinator, PublishedSnapshot};
26use crate::error::{OmniError, Result};
27use crate::runtime_cache::RuntimeCache;
28use crate::storage::{StorageAdapter, join_uri, normalize_root_uri, storage_for_uri};
29use crate::table_store::TableStore;
30
31mod export;
32mod optimize;
33mod repair;
34mod schema_apply;
35mod table_ops;
36
37pub use optimize::{CleanupPolicyOptions, SkipReason, TableCleanupStats, TableOptimizeStats};
38pub use repair::{
39    RepairAction, RepairClassification, RepairOptions, RepairStats, TableRepairStats,
40};
41pub use schema_apply::SchemaApplyOptions;
42
43use super::commit_graph::GraphCommit;
44use super::manifest::{
45    ManifestChange, Snapshot, SubTableEntry, TableRegistration, TableTombstone,
46    table_path_for_table_key,
47};
48use super::schema_state::{
49    SCHEMA_SOURCE_FILENAME, load_or_bootstrap_schema_contract, read_accepted_schema_ir,
50    recover_schema_state_files, schema_ir_staging_uri, schema_ir_uri, schema_source_staging_uri,
51    schema_source_uri, schema_state_staging_uri, schema_state_uri, validate_schema_contract,
52    write_schema_contract, write_schema_contract_staging,
53};
54use super::{
55    ReadTarget, ResolvedTarget, SCHEMA_APPLY_LOCK_BRANCH, SnapshotId, is_internal_system_branch,
56    is_schema_apply_lock_branch,
57};
58
/// How a branch merge concluded.
///
/// NOTE(review): the per-variant meanings below are inferred from the
/// git-style variant names; confirm against the merge implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MergeOutcome {
    /// Nothing had to change: the target already contained the source.
    AlreadyUpToDate,
    /// The target was advanced straight to the source head.
    FastForward,
    /// Divergent histories were combined into a merge result.
    Merged,
}
65
66#[derive(Debug, Clone)]
67pub struct SchemaApplyResult {
68    pub supported: bool,
69    pub applied: bool,
70    pub manifest_version: u64,
71    pub steps: Vec<SchemaMigrationStep>,
72}
73
74#[derive(Debug, Clone)]
75pub struct SchemaApplyPreview {
76    pub plan: SchemaMigrationPlan,
77    pub catalog: Catalog,
78}
79
80/// Top-level handle to an Omnigraph database.
81///
82/// An Omnigraph is a Lance-native graph database with git-style branching.
83/// It stores typed property graphs as per-type Lance datasets coordinated
84/// through a Lance manifest table.
85pub struct Omnigraph {
86    root_uri: String,
87    storage: Arc<dyn StorageAdapter>,
88    /// Coordinator state behind a tokio `RwLock`. PR 2 (MR-686) wraps
89    /// this so engine write APIs can be `&self` (the HTTP server's
90    /// `AppState` holds `Arc<Omnigraph>` and dispatches concurrent
91    /// calls without a global write lock). Reads (`snapshot`, `version`,
92    /// `current_branch`, `branch_list`, `resolve_*`, `head_commit_id`,
93    /// `list_commits`, …) acquire `.read().await` and parallelize.
94    /// Writes (`refresh`, `branch_create`, `branch_delete`, `commit_*`,
95    /// `record_*`) acquire `.write().await` and serialize. The atomic
96    /// commit invariant — `commit_manifest_updates` followed by
97    /// `record_graph_commit` must be atomic — is preserved by the
98    /// single `.write()` covering both calls inside
99    /// `commit_updates_with_actor_with_expected`. PR 2 Phase 2
100    /// converted from `Mutex` to `RwLock` because the bench showed
101    /// the Mutex was the dominant serializer for disjoint-table
102    /// workloads. Lock acquisition order: always before `runtime_cache`
103    /// (when both are needed in one scope).
104    coordinator: Arc<tokio::sync::RwLock<GraphCoordinator>>,
105    table_store: TableStore,
106    runtime_cache: RuntimeCache,
107    /// Read-heavy on every query, written only by `apply_schema`. ArcSwap
108    /// gives atomic pointer swap with zero-cost reads (`load()` returns a
109    /// `Guard<Arc<Catalog>>`), so concurrent queries on different actors
110    /// don't contend on a lock to read the catalog.
111    catalog: Arc<ArcSwap<Catalog>>,
112    /// Read-heavy on schema introspection paths, written only by
113    /// `apply_schema`. Same ArcSwap rationale as `catalog`.
114    schema_source: Arc<ArcSwap<String>>,
115    /// Per-`(table_key, branch)` writer queues. Reachable from engine
116    /// internals (mutation finalize, schema_apply, branch_merge,
117    /// ensure_indices, delete_where) and from future MR-870 recovery
118    /// reconciler. PR 1b adds the field; callers acquire in commits 4+.
119    write_queue: Arc<crate::db::write_queue::WriteQueueManager>,
120    /// Process-wide mutex held across the swap → operate → restore window
121    /// in `branch_merge_impl`. Two concurrent merges with distinct targets
122    /// would otherwise interleave their three separate
123    /// `coordinator.write().await` acquisitions, leaving each merge's
124    /// inner body running against the other's swapped coord. Pinned by
125    /// `concurrent_branch_merges_distinct_targets_do_not_swap_into_each_other`
126    /// in `crates/omnigraph-server/tests/server.rs`.
127    ///
128    /// Cost: serializes ALL concurrent branch merges process-wide.
129    /// Acceptable because branch merges are heavy (table rewrites, index
130    /// rebuilds), per-(table, branch) queues inside `commit_all` already
131    /// serialize the data path, and merges are rare relative to /change
132    /// or /ingest. A finer-grained per-target-branch mutex is a follow-up
133    /// if telemetry shows merge concurrency matters.
134    ///
135    /// The deeper fix — refactor `branch_merge_on_current_target` to take
136    /// an explicit target coord parameter so `self.coordinator` is never
137    /// used as scratch space — is the round-1 shape applied to
138    /// `branch_create_from_impl`. Deferred because it requires unwinding
139    /// every `self.snapshot()` and `self.ensure_commit_graph_initialized()`
140    /// call inside the merge body.
141    merge_exclusive: Arc<tokio::sync::Mutex<()>>,
142    /// Optional policy checker for engine-layer enforcement (MR-722).
143    /// `None` = no enforcement; mutating methods are unconditionally
144    /// allowed (this is the embedded/dev default). `Some` = every
145    /// mutating method calls `self.enforce(action, scope, actor)` at
146    /// entry; denial returns `OmniError::Policy`.
147    ///
148    /// Per chassis design (see `omnigraph_policy::PolicyChecker`), the
149    /// trait surface is deliberately coarse — action × scope × actor.
150    /// Per-row / per-type / per-column scope lives at the query layer
151    /// (MR-725), which extends the same trait with a different method.
152    /// Don't be tempted to add per-row enforcement here.
153    ///
154    /// Set via `with_policy(checker)` after construction. Today only
155    /// `apply_schema_as` consults this field (PR #2 proof-of-concept);
156    /// PR #3 fans the `enforce()` call out to the remaining writers.
157    policy: Option<Arc<dyn omnigraph_policy::PolicyChecker>>,
158}
159
/// Selects whether opening a graph runs the open-time recovery sweep.
///
/// Recovery performs Lance writes (`Dataset::restore`,
/// `ManifestBatchPublisher::publish`). Read-only consumers should not cause
/// writes; examples are NDJSON export, `commit list`, `read` and schema
/// inspection. Such consumers:
/// * may hold read-only object-store credentials;
/// * would be surprised by silent mutations on open.
///
/// They do not need recovery either. Reads always resolve through the
/// manifest pin, which is a consistent snapshot even when per-table state
/// has drifted between Phase B and Phase C.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OpenMode {
    /// Run the recovery sweep while opening; used by [`Omnigraph::open`].
    ReadWrite,
    /// Skip the recovery sweep; used by [`Omnigraph::open_read_only`].
    ReadOnly,
}
177
/// Options accepted by [`Omnigraph::init_with_options`].
///
/// `force` controls the safety preflight that stops an accidental re-init
/// from overwriting an existing graph's schema metadata:
///
/// * `force: false` (the default) — fail fast with
///   [`OmniError::AlreadyInitialized`] when `_schema.pg`,
///   `_schema.ir.json`, or `__schema_state.json` already exists at the
///   target URI.
/// * `force: true` — skip the preflight and overwrite existing schema
///   files in place.
///
/// Even with `force`, old Lance datasets and `__manifest/` are NOT purged.
/// Reclaiming them still requires deleting the graph directory by hand, or
/// a future `DELETE /graphs/{id}`.
#[derive(Debug, Clone, Copy, Default)]
pub struct InitOptions {
    /// Bypass the existing-graph preflight. Set only when overwriting is
    /// intended, e.g. `omnigraph init --force`.
    pub force: bool,
}
196
197impl Omnigraph {
198    /// Create a new graph at `uri` from schema source.
199    ///
200    /// Strict mode: errors with [`OmniError::AlreadyInitialized`] if
201    /// `uri` already holds any of the three schema artifacts. To
202    /// overwrite an existing graph deliberately, call
203    /// [`Self::init_with_options`] with `InitOptions { force: true }`.
204    pub async fn init(uri: &str, schema_source: &str) -> Result<Self> {
205        Self::init_with_options(uri, schema_source, InitOptions::default()).await
206    }
207
208    /// Create a new graph at `uri`, with explicit init-time options.
209    ///
210    /// See [`InitOptions`] for the safety contract — by default this
211    /// behaves identically to [`Self::init`].
212    pub async fn init_with_options(
213        uri: &str,
214        schema_source: &str,
215        options: InitOptions,
216    ) -> Result<Self> {
217        Self::init_with_storage(uri, schema_source, storage_for_uri(uri)?, options).await
218    }
219
220    pub(crate) async fn init_with_storage(
221        uri: &str,
222        schema_source: &str,
223        storage: Arc<dyn StorageAdapter>,
224        options: InitOptions,
225    ) -> Result<Self> {
226        let root = normalize_root_uri(uri)?;
227
228        // Preflight: refuse to clobber an existing graph unless the
229        // operator passed `force`. This runs BEFORE any parse or
230        // write so a misdirected `init` against an existing graph
231        // URI cannot reach a code path that overwrites or, on a
232        // later cleanup, deletes the schema files.
233        //
234        // Closes the "init is destructive against existing state"
235        // class: there is no longer a code path where strict-mode
236        // `init` can mutate a populated graph root.
237        if !options.force {
238            for candidate in [
239                schema_source_uri(&root),
240                schema_ir_uri(&root),
241                schema_state_uri(&root),
242            ] {
243                if storage.exists(&candidate).await? {
244                    return Err(OmniError::AlreadyInitialized { uri: root.clone() });
245                }
246            }
247        }
248
249        let schema_ir = read_schema_ir_from_source(schema_source)?;
250        let mut catalog = build_catalog_from_ir(&schema_ir)?;
251        fixup_blob_schemas(&mut catalog);
252
253        // Establish an atomic ownership claim on `_schema.pg` before
254        // writing the remaining init artifacts. A check-then-write preflight
255        // is not enough under concurrent `init` calls: two callers can both
256        // observe an empty root, one can successfully initialize, and the
257        // loser can then fail in Lance `WriteMode::Create`. Only the caller
258        // that atomically created `_schema.pg` may clean up schema artifacts
259        // on later failure.
260        let schema_pg_claimed = if options.force {
261            false
262        } else {
263            let schema_path = join_uri(&root, SCHEMA_SOURCE_FILENAME);
264            if !storage
265                .write_text_if_absent(&schema_path, schema_source)
266                .await?
267            {
268                return Err(OmniError::AlreadyInitialized { uri: root.clone() });
269            }
270            if let Err(err) = crate::failpoints::maybe_fail("init.after_schema_pg_written") {
271                best_effort_cleanup_init_artifacts(&root, storage.as_ref()).await;
272                return Err(err);
273            }
274            true
275        };
276
277        // Run the I/O phase. On any error, best-effort-clean schema
278        // artifacts only when this invocation owns them: strict mode owns
279        // them after the atomic `_schema.pg` claim above; force mode owns
280        // destructive overwrite semantics by explicit operator request.
281        //
282        // Coverage gap: Lance per-type datasets and `__manifest/`
283        // directory created by `GraphCoordinator::init` are NOT cleaned
284        // up here — fully recursive directory deletion requires a
285        // `StorageAdapter::delete_prefix` primitive that's deferred
286        // along with `DELETE /graphs/{id}` (PR 2b in the MR-668 plan
287        // is currently deferred). If `init` fails after coordinator
288        // init succeeds, operators may need to remove the graph
289        // directory manually before retrying `init` on the same URI.
290        // Documented in the PR 2a commit message and `init` rustdoc.
291        let coordinator = match init_storage_phase(
292            &root,
293            schema_source,
294            &schema_ir,
295            &catalog,
296            &storage,
297            !schema_pg_claimed,
298        )
299        .await
300        {
301            Ok(coordinator) => coordinator,
302            Err(err) => {
303                if schema_pg_claimed || options.force {
304                    best_effort_cleanup_init_artifacts(&root, storage.as_ref()).await;
305                }
306                return Err(err);
307            }
308        };
309
310        Ok(Self {
311            root_uri: root.clone(),
312            storage,
313            coordinator: Arc::new(tokio::sync::RwLock::new(coordinator)),
314            table_store: TableStore::new(&root),
315            runtime_cache: RuntimeCache::default(),
316            catalog: Arc::new(ArcSwap::from_pointee(catalog)),
317            schema_source: Arc::new(ArcSwap::from_pointee(schema_source.to_string())),
318            write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()),
319            merge_exclusive: Arc::new(tokio::sync::Mutex::new(())),
320            policy: None,
321        })
322    }
323
324    /// Open an existing graph (read-write).
325    ///
326    /// Reads `_schema.pg`, parses it, builds the catalog, and opens `__manifest`.
327    /// Runs the open-time recovery sweep before returning — see [`OpenMode`].
328    pub async fn open(uri: &str) -> Result<Self> {
329        Self::open_with_storage_and_mode(uri, storage_for_uri(uri)?, OpenMode::ReadWrite).await
330    }
331
332    /// Open an existing graph for read-only consumers (NDJSON export,
333    /// `commit list`, etc.). Skips the recovery sweep — see [`OpenMode`].
334    pub async fn open_read_only(uri: &str) -> Result<Self> {
335        Self::open_with_storage_and_mode(uri, storage_for_uri(uri)?, OpenMode::ReadOnly).await
336    }
337
338    /// `open_with_storage` retained for existing callers (init/test paths).
339    /// Defaults to `OpenMode::ReadWrite`.
340    pub(crate) async fn open_with_storage(
341        uri: &str,
342        storage: Arc<dyn StorageAdapter>,
343    ) -> Result<Self> {
344        Self::open_with_storage_and_mode(uri, storage, OpenMode::ReadWrite).await
345    }
346
347    pub(crate) async fn open_with_storage_and_mode(
348        uri: &str,
349        storage: Arc<dyn StorageAdapter>,
350        mode: OpenMode,
351    ) -> Result<Self> {
352        let root = normalize_root_uri(uri)?;
353        // Apply pending internal-schema migrations before the coordinator reads
354        // branch state, so `branch_list` and the schema-apply blocking-branch
355        // checks observe the post-migration graph — notably the v2→v3 sweep of
356        // legacy `__run__*` staging branches (MR-770). ReadWrite only: a
357        // read-only open must not trigger object-store writes, so a read-only
358        // open of an unmigrated legacy graph still lists `__run__*` until its
359        // first read-write open (an accepted, documented limitation).
360        if matches!(mode, OpenMode::ReadWrite) {
361            crate::db::manifest::migrate_on_open(&root).await?;
362        }
363        // Open the coordinator first so the schema-staging recovery sweep can
364        // compare its snapshot against any leftover staging files.
365        let mut coordinator = GraphCoordinator::open(&root, Arc::clone(&storage)).await?;
366        // Both the schema-state recovery sweep AND the manifest-drift
367        // recovery sweep are gated on `OpenMode::ReadWrite`. Read-only
368        // consumers (NDJSON export, `commit list`, schema show) shouldn't
369        // trigger object-store mutations: they may run with read-only
370        // credentials, and silent open-time writes are surprising. Both
371        // sweeps' work is recoverable on the next ReadWrite open, so
372        // skipping under ReadOnly doesn't lose any safety guarantees —
373        // the manifest pin is the consistent snapshot regardless of
374        // drift on the per-table side or leftover schema-staging files.
375        if matches!(mode, OpenMode::ReadWrite) {
376            let schema_state_recovery =
377                recover_schema_state_files(&root, Arc::clone(&storage), &coordinator.snapshot())
378                    .await?;
379            // Recovery sweep: close the Phase B → Phase C residual on
380            // any sidecar left over from a crashed writer. Continuous
381            // in-process recovery for long-running servers (no restart
382            // required between Phase B failure and recovery) is a
383            // separate background-reconciler effort.
384            crate::db::manifest::recover_manifest_drift(
385                &root,
386                Arc::clone(&storage),
387                &mut coordinator,
388                crate::db::manifest::RecoveryMode::Full,
389                schema_state_recovery,
390            )
391            .await?;
392        }
393        // Read _schema.pg (post-recovery — may have just been renamed in).
394        let schema_path = schema_source_uri(&root);
395        let schema_source = storage.read_text(&schema_path).await?;
396        let current_source_ir = read_schema_ir_from_source(&schema_source)?;
397        let branches = coordinator.branch_list().await?;
398        let (accepted_ir, _) = load_or_bootstrap_schema_contract(
399            &root,
400            Arc::clone(&storage),
401            &branches,
402            &current_source_ir,
403        )
404        .await?;
405        let mut catalog = build_catalog_from_ir(&accepted_ir)?;
406        fixup_blob_schemas(&mut catalog);
407
408        Ok(Self {
409            root_uri: root.clone(),
410            storage,
411            coordinator: Arc::new(tokio::sync::RwLock::new(coordinator)),
412            table_store: TableStore::new(&root),
413            runtime_cache: RuntimeCache::default(),
414            catalog: Arc::new(ArcSwap::from_pointee(catalog)),
415            schema_source: Arc::new(ArcSwap::from_pointee(schema_source)),
416            write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()),
417            merge_exclusive: Arc::new(tokio::sync::Mutex::new(())),
418            policy: None,
419        })
420    }
421
422    /// Returns an `Arc<Catalog>` snapshot. Cheap clone of the current
423    /// catalog pointer; callers can hold the returned `Arc` across awaits
424    /// without blocking concurrent `apply_schema`.
425    pub fn catalog(&self) -> Arc<Catalog> {
426        self.catalog.load_full()
427    }
428
429    /// Returns an `Arc<String>` snapshot of the schema source.
430    pub fn schema_source(&self) -> Arc<String> {
431        self.schema_source.load_full()
432    }
433
434    /// Atomically swap the in-memory catalog. Concurrent readers see
435    /// either the old or the new pointer; never a torn state. Used by
436    /// `apply_schema` and `reload_schema_if_source_changed`.
437    pub(crate) fn store_catalog(&self, catalog: Catalog) {
438        self.catalog.store(Arc::new(catalog));
439    }
440
441    /// Atomically swap the in-memory schema source. Same rationale as
442    /// [`store_catalog`](Self::store_catalog).
443    pub(crate) fn store_schema_source(&self, schema_source: String) {
444        self.schema_source.store(Arc::new(schema_source));
445    }
446
447    pub fn uri(&self) -> &str {
448        &self.root_uri
449    }
450
451    /// Install a policy checker for engine-layer enforcement (MR-722).
452    /// Builder-style setter — consumes `self`, returns `Self`. Calling
453    /// this on a `Omnigraph` previously without policy enables
454    /// `enforce()` to fire at every mutating engine method that's been
455    /// wired to call it (currently `apply_schema_as`; PR #3 fans out to
456    /// the remaining writers).
457    ///
458    /// Embedded callers that don't care about authorization should
459    /// just not call this. Server / CLI callers that have loaded a
460    /// `PolicyEngine` from `policy.yaml` pass it here.
461    pub fn with_policy(mut self, checker: Arc<dyn omnigraph_policy::PolicyChecker>) -> Self {
462        self.policy = Some(checker);
463        self
464    }
465
466    /// Engine-layer policy enforcement gate (MR-722 chassis core).
467    ///
468    /// * If no policy is installed → no-op (returns `Ok(())`).
469    /// * If policy is installed AND actor is None → denial with a
470    ///   clear "no actor for engine-layer policy check" message.
471    ///   Forces server / CLI / SDK callers to thread an actor through
472    ///   when policy is configured — silent bypass via "I forgot the
473    ///   actor" is exactly the footgun this gate is here to prevent.
474    /// * If policy is installed AND actor is Some → call
475    ///   `PolicyChecker::check(action, scope, actor)`; map denial /
476    ///   internal failure to `OmniError::Policy(...)`.
477    pub(crate) fn enforce(
478        &self,
479        action: omnigraph_policy::PolicyAction,
480        scope: &omnigraph_policy::ResourceScope,
481        actor: Option<&str>,
482    ) -> Result<()> {
483        let Some(checker) = self.policy.as_ref() else {
484            return Ok(());
485        };
486        let Some(actor) = actor else {
487            return Err(OmniError::Policy(
488                "no actor for engine-layer policy check (policy is configured but the call site \
489                 didn't thread an actor through — this is almost certainly a bug, not an \
490                 intended bypass)"
491                    .to_string(),
492            ));
493        };
494        checker
495            .check(action, scope, actor)
496            .map_err(|err| OmniError::Policy(err.to_string()))
497    }
498
499    pub(crate) async fn ensure_schema_state_valid(&self) -> Result<()> {
500        validate_schema_contract(self.uri(), Arc::clone(&self.storage)).await
501    }
502
503    pub async fn plan_schema(&self, desired_schema_source: &str) -> Result<SchemaMigrationPlan> {
504        self.plan_schema_with_options(desired_schema_source, SchemaApplyOptions::default())
505            .await
506    }
507
508    pub async fn plan_schema_with_options(
509        &self,
510        desired_schema_source: &str,
511        options: SchemaApplyOptions,
512    ) -> Result<SchemaMigrationPlan> {
513        schema_apply::plan_schema(self, desired_schema_source, options).await
514    }
515
516    pub async fn preview_schema_apply_with_options(
517        &self,
518        desired_schema_source: &str,
519        options: SchemaApplyOptions,
520    ) -> Result<SchemaApplyPreview> {
521        schema_apply::preview_schema_apply(self, desired_schema_source, options).await
522    }
523
524    pub async fn apply_schema(&self, desired_schema_source: &str) -> Result<SchemaApplyResult> {
525        self.apply_schema_as(desired_schema_source, SchemaApplyOptions::default(), None)
526            .await
527    }
528
529    pub async fn apply_schema_with_options(
530        &self,
531        desired_schema_source: &str,
532        options: SchemaApplyOptions,
533    ) -> Result<SchemaApplyResult> {
534        self.apply_schema_as(desired_schema_source, options, None)
535            .await
536    }
537
538    /// Apply a schema migration with an explicit actor for engine-layer
539    /// policy enforcement (MR-722). When a `PolicyChecker` is installed
540    /// via [`Self::with_policy`], this method calls `enforce(SchemaApply,
541    /// Branch("main"), actor)` before any apply work happens. Denial
542    /// returns `OmniError::Policy` and leaves the manifest untouched.
543    ///
544    /// The no-actor variants (`apply_schema`, `apply_schema_with_options`)
545    /// pass `None` here. They work fine without a policy; if a policy IS
546    /// installed and actor is None, enforcement intentionally fails to
547    /// prevent silent-bypass-via-forgetting-the-actor footguns.
548    pub async fn apply_schema_as(
549        &self,
550        desired_schema_source: &str,
551        options: SchemaApplyOptions,
552        actor: Option<&str>,
553    ) -> Result<SchemaApplyResult> {
554        self.apply_schema_as_with_catalog_check(desired_schema_source, options, actor, |_| Ok(()))
555            .await
556    }
557
558    pub async fn apply_schema_as_with_catalog_check<F>(
559        &self,
560        desired_schema_source: &str,
561        options: SchemaApplyOptions,
562        actor: Option<&str>,
563        validate_catalog: F,
564    ) -> Result<SchemaApplyResult>
565    where
566        F: FnOnce(&Catalog) -> Result<()>,
567    {
568        schema_apply::apply_schema(
569            self,
570            desired_schema_source,
571            options,
572            actor,
573            validate_catalog,
574        )
575        .await
576    }
577
578    pub(crate) async fn ensure_schema_apply_idle(&self, operation: &str) -> Result<()> {
579        schema_apply::ensure_schema_apply_idle(self, operation).await
580    }
581
582    async fn ensure_schema_apply_not_locked(&self, operation: &str) -> Result<()> {
583        schema_apply::ensure_schema_apply_not_locked(self, operation).await
584    }
585
586    pub(crate) fn table_store(&self) -> &TableStore {
587        &self.table_store
588    }
589
590    /// Engine-facing trait surface around `TableStore`.
591    ///
592    /// This is the canonical accessor for newly-written engine code. The
593    /// trait's signatures use opaque `SnapshotHandle` / `StagedHandle`
594    /// instead of leaking `lance::Dataset` /
595    /// `lance::dataset::transaction::Transaction`. Existing call sites
596    /// that still use `db.table_store.X(...)` (the inherent struct
597    /// methods) are migrated incrementally.
598    pub(crate) fn storage(&self) -> &dyn crate::storage_layer::TableStorage {
599        &self.table_store
600    }
601
602    /// Engine-level access to the object-store adapter (S3 / local fs).
603    /// Used by the recovery sidecar protocol — writers in the engine
604    /// call this to write/delete sidecars at `__recovery/{ulid}.json`.
605    pub(crate) fn storage_adapter(&self) -> &dyn crate::storage::StorageAdapter {
606        self.storage.as_ref()
607    }
608
609    /// Per-`(table_key, branch)` writer queues.
610    ///
611    /// Engine-internal writers (mutation finalize, schema_apply,
612    /// branch_merge, ensure_indices, delete_where) and the future MR-870
613    /// recovery reconciler reach the queue manager via this accessor.
614    /// Returns an `Arc` clone so callers can hold the manager across
615    /// `&mut self` engine API boundaries.
616    pub(crate) fn write_queue(&self) -> Arc<crate::db::write_queue::WriteQueueManager> {
617        Arc::clone(&self.write_queue)
618    }
619
620    /// Engine-internal access to the merge-exclusive mutex. Held across
621    /// the swap → operate → restore window in `branch_merge_impl` so
622    /// concurrent merges with distinct targets don't corrupt
623    /// `self.coordinator` mid-operation. See the field doc on
624    /// `Omnigraph::merge_exclusive` for the full design rationale.
625    pub(crate) fn merge_exclusive(&self) -> Arc<tokio::sync::Mutex<()>> {
626        Arc::clone(&self.merge_exclusive)
627    }
628
629    /// Engine-level access to the graph's normalized root URI. Used by
630    /// the recovery sidecar protocol to compute `__recovery/` paths.
631    pub(crate) fn root_uri(&self) -> &str {
632        &self.root_uri
633    }
634
635    pub(crate) async fn open_coordinator_for_branch(
636        &self,
637        branch: Option<&str>,
638    ) -> Result<GraphCoordinator> {
639        match branch {
640            Some(branch) => {
641                GraphCoordinator::open_branch(self.uri(), branch, Arc::clone(&self.storage)).await
642            }
643            None => GraphCoordinator::open(self.uri(), Arc::clone(&self.storage)).await,
644        }
645    }
646
647    pub(crate) async fn swap_coordinator_for_branch(
648        &self,
649        branch: Option<&str>,
650    ) -> Result<GraphCoordinator> {
651        let next = self.open_coordinator_for_branch(branch).await?;
652        let mut coord = self.coordinator.write().await;
653        Ok(std::mem::replace(&mut *coord, next))
654    }
655
656    pub(crate) async fn restore_coordinator(&self, coordinator: GraphCoordinator) {
657        *self.coordinator.write().await = coordinator;
658    }
659
660    pub(crate) async fn resolved_branch_target(
661        &self,
662        branch: Option<&str>,
663    ) -> Result<ResolvedTarget> {
664        self.ensure_schema_state_valid().await?;
665        let requested = ReadTarget::Branch(branch.unwrap_or("main").to_string());
666        let normalized = normalize_branch_name(branch.unwrap_or("main"))?;
667        let coord = self.coordinator.read().await;
668        if normalized.as_deref() == coord.current_branch() {
669            let snapshot_id = coord
670                .head_commit_id()
671                .await?
672                .unwrap_or_else(|| SnapshotId::synthetic(coord.current_branch(), coord.version()));
673            return Ok(ResolvedTarget {
674                requested,
675                branch: coord.current_branch().map(str::to_string),
676                snapshot_id,
677                snapshot: coord.snapshot(),
678            });
679        }
680        coord.resolve_target(&requested).await
681    }
682
683    pub(crate) async fn snapshot_for_branch(&self, branch: Option<&str>) -> Result<Snapshot> {
684        self.resolved_branch_target(branch)
685            .await
686            .map(|resolved| resolved.snapshot)
687    }
688
689    pub(crate) async fn fresh_snapshot_for_branch(&self, branch: Option<&str>) -> Result<Snapshot> {
690        self.ensure_schema_state_valid().await?;
691        let requested = ReadTarget::Branch(branch.unwrap_or("main").to_string());
692        let coord = self.coordinator.read().await;
693        coord
694            .resolve_target(&requested)
695            .await
696            .map(|resolved| resolved.snapshot)
697    }
698
    /// Version reported by this handle's currently-open coordinator.
    ///
    /// Reads the coordinator's in-memory state under a read guard (the
    /// coordinator call itself is synchronous); nothing is refreshed.
    pub(crate) async fn version(&self) -> u64 {
        self.coordinator.read().await.version()
    }
702
    /// Return an immutable `Snapshot` of the coordinator's currently-known
    /// manifest state. No storage I/O: the coordinator call is synchronous,
    /// so the result reflects whatever was last loaded. It may lag storage
    /// until `refresh` / `sync_branch` re-reads it.
    pub(crate) async fn snapshot(&self) -> Snapshot {
        self.coordinator.read().await.snapshot()
    }
707
708    pub async fn snapshot_of(&self, target: impl Into<ReadTarget>) -> Result<Snapshot> {
709        self.resolved_target(target)
710            .await
711            .map(|resolved| resolved.snapshot)
712    }
713
714    pub async fn version_of(&self, target: impl Into<ReadTarget>) -> Result<u64> {
715        self.snapshot_of(target)
716            .await
717            .map(|snapshot| snapshot.version())
718    }
719
720    pub async fn resolved_branch_of(
721        &self,
722        target: impl Into<ReadTarget>,
723    ) -> Result<Option<String>> {
724        self.resolved_target(target)
725            .await
726            .map(|resolved| resolved.branch)
727    }
728
729    /// Synchronize this handle's write base to the latest head of the named branch.
730    pub async fn sync_branch(&self, branch: &str) -> Result<()> {
731        self.ensure_schema_state_valid().await?;
732        let branch = normalize_branch_name(branch)?;
733        let next = self.open_coordinator_for_branch(branch.as_deref()).await?;
734        *self.coordinator.write().await = next;
735        self.runtime_cache.invalidate_all().await;
736        Ok(())
737    }
738
    /// Re-read the handle-local coordinator state from storage AND run
    /// in-process recovery. Closes the Phase B → Phase C residual (e.g.
    /// `MutationStaging::finalize` crash mid-publish in a long-running
    /// server) without restart.
    ///
    /// Composition mirrors `Omnigraph::open_with_storage_and_mode`'s
    /// recovery sequence, in the same order, with one restriction: the
    /// manifest-drift sweep runs in `RollForwardOnly` mode (rollback /
    /// abort cases defer to the next ReadWrite open because
    /// `Dataset::restore` is unsafe under concurrency). Each step:
    ///
    /// 1. `coordinator.refresh()` — re-read manifest.
    /// 2. `recover_schema_state_files` — complete an in-flight
    ///    schema_apply's staging→final rename if a SchemaApply sidecar
    ///    is on disk; idempotent + early-returns when no staging files
    ///    exist. Required BEFORE manifest-drift recovery so a
    ///    SchemaApply roll-forward doesn't publish the manifest while
    ///    the staging files remain unrenamed (which would corrupt the
    ///    graph: data on new schema, catalog on old).
    /// 3. `recover_manifest_drift(... RollForwardOnly)` — close the
    ///    finalize→publisher residual via roll-forward; defer rollback
    ///    work to next ReadWrite open. Receives step 2's result.
    /// 4. `reload_schema_if_source_changed` — runs only after the
    ///    coordinator write guard held for steps 1–3 is released; re-reads
    ///    the schema source file and rebuilds the catalog only if that
    ///    file differs from the cached source.
    /// 5. `runtime_cache.invalidate_all` — drop stale per-snapshot caches.
    ///
    /// Steps 1–3 run under a single coordinator write guard. Any failing
    /// step returns its error immediately, so later steps (including cache
    /// invalidation) are skipped.
    ///
    /// Steady state cost, beyond `coordinator.refresh()` itself: one
    /// `list_dir` of `__recovery/` (typically returns empty → early return
    /// for both passes) plus one read of the schema source file in step 4.
    /// No additional Lance reads.
    ///
    /// Engine-internal callers that already hold an in-flight sidecar
    /// (e.g. `schema_apply` mid-write) MUST use
    /// [`refresh_coordinator_only`](Self::refresh_coordinator_only) to
    /// avoid the recovery sweep racing their own sidecar.
    pub async fn refresh(&self) -> Result<()> {
        // Scope the coord write guard to the recovery section only.
        // `reload_schema_if_source_changed` (below) acquires
        // `self.coordinator.read().await` when the on-disk schema source
        // has drifted from the cached `schema_source`. Tokio's RwLock is
        // not reentrant, so holding the write across that call deadlocks.
        // Pinned by `composite_flow_schema_apply_then_branch_ops_no_deadlock_in_refresh`.
        {
            let mut coord = self.coordinator.write().await;
            // Step 1: re-read the manifest.
            coord.refresh().await?;
            // Step 2: must complete before drift recovery; its outcome is
            // forwarded to step 3.
            let schema_state_recovery = recover_schema_state_files(
                &self.root_uri,
                Arc::clone(&self.storage),
                &coord.snapshot(),
            )
            .await?;
            // Step 3: roll-forward-only drift recovery on the same coordinator.
            crate::db::manifest::recover_manifest_drift(
                &self.root_uri,
                Arc::clone(&self.storage),
                &mut *coord,
                crate::db::manifest::RecoveryMode::RollForwardOnly,
                schema_state_recovery,
            )
            .await?;
        } // ← write guard released before reload's read acquisition
        // Step 4: catalog reload (takes its own read guard).
        self.reload_schema_if_source_changed().await?;
        // Step 5: cached per-snapshot state is now stale.
        self.runtime_cache.invalidate_all().await;
        Ok(())
    }
800
801    async fn reload_schema_if_source_changed(&self) -> Result<()> {
802        let schema_path = schema_source_uri(&self.root_uri);
803        let schema_source = self.storage.read_text(&schema_path).await?;
804        if schema_source == *self.schema_source.load_full() {
805            return Ok(());
806        }
807        let current_source_ir = read_schema_ir_from_source(&schema_source)?;
808        let branches = self.coordinator.read().await.branch_list().await?;
809        let (accepted_ir, _) = load_or_bootstrap_schema_contract(
810            &self.root_uri,
811            Arc::clone(&self.storage),
812            &branches,
813            &current_source_ir,
814        )
815        .await?;
816        let mut catalog = build_catalog_from_ir(&accepted_ir)?;
817        fixup_blob_schemas(&mut catalog);
818        self.store_schema_source(schema_source);
819        self.store_catalog(catalog);
820        Ok(())
821    }
822
823    /// Refresh coordinator state and invalidate the runtime cache WITHOUT
824    /// running the recovery sweep. Engine-internal callers that hold an
825    /// in-flight sidecar (e.g. `schema_apply::apply_schema_with_lock`'s
826    /// internal lease-check refresh) need this variant: running recovery
827    /// here would observe the caller's own sidecar, classify it as
828    /// RolledPastExpected, and roll it forward — racing the caller's
829    /// own publish path.
830    pub(crate) async fn refresh_coordinator_only(&self) -> Result<()> {
831        self.coordinator.write().await.refresh().await?;
832        self.runtime_cache.invalidate_all().await;
833        Ok(())
834    }
835
836    pub async fn resolve_snapshot(&self, branch: &str) -> Result<SnapshotId> {
837        self.ensure_schema_state_valid().await?;
838        self.coordinator
839            .read()
840            .await
841            .resolve_snapshot_id(branch)
842            .await
843    }
844
845    pub(crate) async fn resolved_target(
846        &self,
847        target: impl Into<ReadTarget>,
848    ) -> Result<ResolvedTarget> {
849        self.ensure_schema_state_valid().await?;
850        self.coordinator
851            .read()
852            .await
853            .resolve_target(&target.into())
854            .await
855    }
856
857    // ─── Change detection ────────────────────────────────────────────────
858
859    pub async fn diff_between(
860        &self,
861        from: impl Into<ReadTarget>,
862        to: impl Into<ReadTarget>,
863        filter: &crate::changes::ChangeFilter,
864    ) -> Result<crate::changes::ChangeSet> {
865        let from_resolved = self.resolved_target(from).await?;
866        let to_resolved = self.resolved_target(to).await?;
867        crate::changes::diff_snapshots(
868            self.uri(),
869            &from_resolved.snapshot,
870            &to_resolved.snapshot,
871            filter,
872            to_resolved.branch.clone().or(from_resolved.branch.clone()),
873        )
874        .await
875    }
876
877    /// Diff two graph commits. Resolves each commit to `(manifest_branch, manifest_version)`
878    /// and creates branch-aware snapshots. Supports cross-branch comparison.
879    pub async fn diff_commits(
880        &self,
881        from_commit_id: &str,
882        to_commit_id: &str,
883        filter: &crate::changes::ChangeFilter,
884    ) -> Result<crate::changes::ChangeSet> {
885        let coord = self.coordinator.read().await;
886        let from_commit = coord
887            .resolve_commit(&SnapshotId::new(from_commit_id))
888            .await?;
889        let to_commit = coord.resolve_commit(&SnapshotId::new(to_commit_id)).await?;
890        let from_snap = coord
891            .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
892                from_commit.graph_commit_id.clone(),
893            )))
894            .await?;
895        let to_snap = coord
896            .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
897                to_commit.graph_commit_id.clone(),
898            )))
899            .await?;
900        drop(coord);
901        crate::changes::diff_snapshots(
902            self.uri(),
903            &from_snap.snapshot,
904            &to_snap.snapshot,
905            filter,
906            to_snap.branch.clone().or(from_snap.branch.clone()),
907        )
908        .await
909    }
910
    /// Read a single entity, identified by `id`, from table `table_key` at
    /// the snapshot that `target` resolves to, rendered as JSON.
    ///
    /// Delegates to `export::entity_at_target`. `Ok(None)` presumably means
    /// no matching row at that snapshot (NOTE(review): confirm in `export`).
    pub async fn entity_at_target(
        &self,
        target: impl Into<ReadTarget>,
        table_key: &str,
        id: &str,
    ) -> Result<Option<serde_json::Value>> {
        export::entity_at_target(self, target, table_key, id).await
    }
919
    /// Read one entity at a specific manifest version via time travel
    /// (on-demand enrichment), rendered as JSON.
    ///
    /// Delegates to `export::entity_at`. `Ok(None)` presumably means the id
    /// has no row at that version (NOTE(review): confirm in `export`).
    pub async fn entity_at(
        &self,
        table_key: &str,
        id: &str,
        version: u64,
    ) -> Result<Option<serde_json::Value>> {
        export::entity_at(self, table_key, id, version).await
    }
929
930    /// Create a Snapshot at any historical manifest version.
931    pub async fn snapshot_at_version(&self, version: u64) -> Result<Snapshot> {
932        self.ensure_schema_state_valid().await?;
933        self.coordinator
934            .read()
935            .await
936            .snapshot_at_version(version)
937            .await
938    }
939
    /// Export data from `branch` as a single JSONL `String`.
    ///
    /// Thin wrapper over `export::export_jsonl`. How `type_names` and
    /// `table_keys` select what gets exported (e.g. whether empty means
    /// "everything") is defined there — TODO confirm. `export_jsonl_to_writer`
    /// writes into a caller-supplied `Write` instead of returning a `String`.
    pub async fn export_jsonl(
        &self,
        branch: &str,
        type_names: &[String],
        table_keys: &[String],
    ) -> Result<String> {
        export::export_jsonl(self, branch, type_names, table_keys).await
    }
948
    /// Export data from `branch` as JSONL into `writer`.
    ///
    /// Same selection arguments as `export_jsonl`; delegates to
    /// `export::export_jsonl_to_writer`. Whether the writer is flushed on
    /// completion is decided there (NOTE(review): confirm).
    pub async fn export_jsonl_to_writer<W: Write>(
        &self,
        branch: &str,
        type_names: &[String],
        table_keys: &[String],
        writer: &mut W,
    ) -> Result<()> {
        export::export_jsonl_to_writer(self, branch, type_names, table_keys, writer).await
    }
958
959    // ─── Graph index ──────────────────────────────────────────────────────
960
    /// Get or build the graph index for the current snapshot.
    ///
    /// Delegates to `table_ops::graph_index`; the index is returned as a
    /// shared `Arc`.
    pub async fn graph_index(&self) -> Result<Arc<crate::graph_index::GraphIndex>> {
        table_ops::graph_index(self).await
    }
965
    /// Get or build the graph index for an already-resolved read target
    /// rather than the handle's current snapshot. Delegates to
    /// `table_ops::graph_index_for_resolved`.
    pub(crate) async fn graph_index_for_resolved(
        &self,
        resolved: &ResolvedTarget,
    ) -> Result<Arc<crate::graph_index::GraphIndex>> {
        table_ops::graph_index_for_resolved(self, resolved).await
    }
972
    /// Ensure BTree scalar indices exist on key columns.
    /// Idempotent — Lance skips if index already exists.
    ///
    /// Opens sub-tables at their latest version (not snapshot-pinned) because
    /// indices must be created on the current head. Any version drift from the
    /// snapshot is expected and logged. The resulting versions are committed
    /// back to the manifest.
    ///
    /// On named branches, indexing preserves lazy branching:
    /// unbranched subtables keep inheriting `main`, while subtables inherited
    /// from an ancestor branch are first forked into the active branch before
    /// their index metadata is updated.
    ///
    /// Delegates to `table_ops::ensure_indices`; `ensure_indices_on` takes an
    /// explicit branch.
    pub async fn ensure_indices(&self) -> Result<()> {
        table_ops::ensure_indices(self).await
    }
988
    /// Branch-targeted counterpart of `ensure_indices`, operating on `branch`
    /// instead of the handle's active branch. Delegates to
    /// `table_ops::ensure_indices_on` (NOTE(review): confirm it applies the
    /// same lazy-branch forking rules described on `ensure_indices`).
    pub async fn ensure_indices_on(&self, branch: &str) -> Result<()> {
        table_ops::ensure_indices_on(self, branch).await
    }
992
    /// Test-only hook, compiled only with the `failpoints` feature and hidden
    /// from rustdoc. Delegates to
    /// `table_ops::failpoint_publish_table_head_without_index_rebuild_for_test`.
    /// The returned `u64` is presumably the published version — TODO confirm.
    #[cfg(feature = "failpoints")]
    #[doc(hidden)]
    pub async fn failpoint_publish_table_head_without_index_rebuild_for_test(
        &mut self,
        branch: &str,
        table_key: &str,
        table_branch: Option<&str>,
    ) -> Result<u64> {
        table_ops::failpoint_publish_table_head_without_index_rebuild_for_test(
            self,
            branch,
            table_key,
            table_branch,
        )
        .await
    }
1009
    /// Compact small Lance fragments into fewer larger ones across every
    /// node + edge table on `main`. See [`optimize`] for details.
    ///
    /// Delegates to `optimize::optimize_all_tables` and returns its per-table
    /// statistics.
    pub async fn optimize(&self) -> Result<Vec<optimize::TableOptimizeStats>> {
        optimize::optimize_all_tables(self).await
    }
1015
    /// Classify and explicitly repair uncovered manifest/head drift. See
    /// [`repair`] for the distinction between safe maintenance drift and
    /// suspicious/unverifiable drift.
    ///
    /// Delegates to `repair::repair_all_tables` with the given `options`.
    pub async fn repair(&self, options: repair::RepairOptions) -> Result<repair::RepairStats> {
        repair::repair_all_tables(self, options).await
    }
1022
    /// Remove Lance manifests (and the fragments they uniquely own) per the
    /// given [`optimize::CleanupPolicyOptions`]. Destructive to version
    /// history. See [`optimize`] for details.
    ///
    /// Takes `&mut self`, so it cannot run concurrently with other operations
    /// through this handle. Delegates to `optimize::cleanup_all_tables`.
    pub async fn cleanup(
        &mut self,
        options: optimize::CleanupPolicyOptions,
    ) -> Result<Vec<optimize::TableCleanupStats>> {
        optimize::cleanup_all_tables(self, options).await
    }
1032
1033    /// Read a blob from a node by its string ID and property name.
1034    ///
1035    /// Returns a `BlobFile` handle with async `read()`, `seek()`, `tell()`,
1036    /// and metadata accessors (`size()`, `kind()`, `uri()`).
1037    ///
1038    /// ```ignore
1039    /// let blob = db.read_blob("Document", "readme", "content").await?;
1040    /// let bytes = blob.read().await?;
1041    /// ```
1042    pub async fn read_blob(&self, type_name: &str, id: &str, property: &str) -> Result<BlobFile> {
1043        self.ensure_schema_state_valid().await?;
1044        let catalog = self.catalog();
1045        let node_type = catalog
1046            .node_types
1047            .get(type_name)
1048            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
1049        if !node_type.blob_properties.contains(property) {
1050            return Err(OmniError::manifest(format!(
1051                "property '{}' on type '{}' is not a Blob",
1052                property, type_name
1053            )));
1054        }
1055
1056        let snapshot = self.snapshot().await;
1057        let table_key = format!("node:{}", type_name);
1058        let ds = snapshot.open(&table_key).await?;
1059
1060        let filter_sql = format!("id = '{}'", id.replace('\'', "''"));
1061        let row_id = self
1062            .table_store
1063            .first_row_id_for_filter(&ds, &filter_sql)
1064            .await?
1065            .ok_or_else(|| {
1066                OmniError::manifest(format!("no {} with id '{}' found", type_name, id))
1067            })?;
1068
1069        // Use take_blobs to get the BlobFile handle
1070        let ds = Arc::new(ds);
1071        let mut blobs = ds
1072            .take_blobs(&[row_id], property)
1073            .await
1074            .map_err(|e| OmniError::Lance(e.to_string()))?;
1075
1076        blobs.pop().ok_or_else(|| {
1077            OmniError::manifest(format!(
1078                "blob '{}' on {} '{}' returned no data",
1079                property, type_name, id
1080            ))
1081        })
1082    }
1083
1084    pub(crate) async fn active_branch(&self) -> Option<String> {
1085        self.coordinator
1086            .read()
1087            .await
1088            .current_branch()
1089            .map(str::to_string)
1090    }
1091
1092    async fn ensure_branch_delete_safe(&self, branch: &str, branches: &[String]) -> Result<()> {
1093        let descendants = self
1094            .coordinator
1095            .read()
1096            .await
1097            .branch_descendants(branch)
1098            .await?;
1099        if let Some(descendant) = descendants.first() {
1100            return Err(OmniError::manifest_conflict(format!(
1101                "cannot delete branch '{}' because descendant branch '{}' still depends on it",
1102                branch, descendant
1103            )));
1104        }
1105
1106        for other_branch in branches
1107            .iter()
1108            .filter(|candidate| candidate.as_str() != branch)
1109        {
1110            let snapshot = self
1111                .snapshot_of(ReadTarget::branch(other_branch.as_str()))
1112                .await?;
1113            if snapshot
1114                .entries()
1115                .any(|entry| entry.table_branch.as_deref() == Some(branch))
1116            {
1117                return Err(OmniError::manifest_conflict(format!(
1118                    "cannot delete branch '{}' because branch '{}' still depends on it",
1119                    branch, other_branch
1120                )));
1121            }
1122        }
1123
1124        Ok(())
1125    }
1126
1127    /// Best-effort reclaim of the per-table Lance forks a just-deleted branch
1128    /// owned. Runs AFTER the manifest authority flip, so the branch is already
1129    /// gone and these forks are unreachable orphans. A failure here (transient
1130    /// object-store error, the `branch_delete.before_table_cleanup` failpoint)
1131    /// is logged and swallowed: the `cleanup` reconciler is the guaranteed
1132    /// backstop that converges any leftover orphan. Uses `force_delete_branch`
1133    /// so a partially-reclaimed retry is idempotent.
1134    async fn cleanup_deleted_branch_tables(&self, branch: &str, owned_tables: &[(String, String)]) {
1135        let mut seen_paths = HashSet::new();
1136        let mut cleanup_targets = owned_tables
1137            .iter()
1138            .filter(|(_, table_path)| seen_paths.insert(table_path.clone()))
1139            .cloned()
1140            .collect::<Vec<_>>();
1141        cleanup_targets.sort_by(|left, right| left.0.cmp(&right.0));
1142
1143        for (table_key, table_path) in cleanup_targets {
1144            let dataset_uri = self.table_store.dataset_uri(&table_path);
1145            let outcome = match crate::failpoints::maybe_fail("branch_delete.before_table_cleanup")
1146            {
1147                Ok(()) => self.table_store.force_delete_branch(&dataset_uri, branch).await,
1148                Err(injected) => Err(injected),
1149            };
1150            if let Err(err) = outcome {
1151                tracing::warn!(
1152                    target: "omnigraph::branch_delete::cleanup",
1153                    branch = %branch,
1154                    table = %table_key,
1155                    error = %err,
1156                    "best-effort fork reclaim failed; cleanup will reconcile the orphan",
1157                );
1158            }
1159        }
1160    }
1161
1162    async fn delete_branch_storage_only(&self, branch: &str) -> Result<()> {
1163        let active = self
1164            .coordinator
1165            .read()
1166            .await
1167            .current_branch()
1168            .map(str::to_string);
1169        if active.as_deref() == Some(branch) {
1170            return Err(OmniError::manifest_conflict(format!(
1171                "cannot delete currently active branch '{}'",
1172                branch
1173            )));
1174        }
1175
1176        let branch_snapshot = self.snapshot_of(ReadTarget::branch(branch)).await?;
1177        let owned_tables = branch_snapshot
1178            .entries()
1179            .filter(|entry| entry.table_branch.as_deref() == Some(branch))
1180            .map(|entry| (entry.table_key.clone(), entry.table_path.clone()))
1181            .collect::<Vec<_>>();
1182
1183        // Authority flip (+ best-effort commit-graph reclaim) — must succeed.
1184        self.coordinator.write().await.branch_delete(branch).await?;
1185        // Best-effort per-table fork reclaim; cleanup reconciles any leftover.
1186        self.cleanup_deleted_branch_tables(branch, &owned_tables)
1187            .await;
1188        Ok(())
1189    }
1190
    /// Crate-visible wrapper around the module-level `normalize_branch_name`.
    /// `Ok(None)` denotes `main`; `branch_delete_as` relies on this to refuse
    /// deleting `main`.
    pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
        normalize_branch_name(branch)
    }
1194
1195    pub(crate) async fn head_commit_id_for_branch(
1196        &self,
1197        branch: Option<&str>,
1198    ) -> Result<Option<String>> {
1199        let mut coordinator = self.open_coordinator_for_branch(branch).await?;
1200        coordinator.ensure_commit_graph_initialized().await?;
1201        coordinator
1202            .head_commit_id()
1203            .await
1204            .map(|id| id.map(|snapshot_id| snapshot_id.as_str().to_string()))
1205    }
1206
    /// Create branch `name` from the coordinator's currently-open snapshot,
    /// with no actor attached for policy checks. See `branch_create_as`.
    pub async fn branch_create(&self, name: &str) -> Result<()> {
        self.branch_create_as(name, None).await
    }
1210
1211    /// Create a branch from the coordinator's currently-open snapshot,
1212    /// with an explicit actor for engine-layer policy enforcement
1213    /// (MR-722 fan-out). Scope is `TargetBranch(name)` — symmetric with
1214    /// `branch_delete_as`: the branch being acted upon is the target.
1215    /// Cedar rules using `target_branch_scope: protected` therefore see
1216    /// the new-branch name and can deny e.g. creating any branch named
1217    /// `main` from a non-privileged actor.
1218    pub async fn branch_create_as(&self, name: &str, actor: Option<&str>) -> Result<()> {
1219        self.enforce(
1220            omnigraph_policy::PolicyAction::BranchCreate,
1221            &omnigraph_policy::ResourceScope::TargetBranch(name.to_string()),
1222            actor,
1223        )?;
1224        self.ensure_schema_state_valid().await?;
1225        self.ensure_schema_apply_idle("branch_create").await?;
1226        ensure_public_branch_ref(name, "branch_create")?;
1227        self.coordinator.write().await.branch_create(name).await
1228    }
1229
    /// Create branch `name` from the source `from`, with no actor attached for
    /// policy checks. See `branch_create_from_as`.
    pub async fn branch_create_from(&self, from: impl Into<ReadTarget>, name: &str) -> Result<()> {
        self.branch_create_from_as(from, name, None).await
    }
1233
1234    /// Create a branch from a specific source branch with an explicit
1235    /// actor for engine-layer policy enforcement (MR-722 fan-out).
1236    ///
1237    /// Scope is `BranchTransition { source, target }` — matches the
1238    /// HTTP-layer convention at `server_branch_create`
1239    /// (branch=Some(from), target_branch=Some(name)), so engine and
1240    /// HTTP fire the same Cedar decision. Pinned-snapshot sources
1241    /// (which aren't a branch ref) materialize as the sentinel
1242    /// `<snapshot>` for the policy check; Cedar rules using
1243    /// `branch_scope: any` still match, rules pinning a specific
1244    /// source branch correctly do not.
1245    pub async fn branch_create_from_as(
1246        &self,
1247        from: impl Into<ReadTarget>,
1248        name: &str,
1249        actor: Option<&str>,
1250    ) -> Result<()> {
1251        let target = from.into();
1252        let source_branch = match &target {
1253            ReadTarget::Branch(b) => b.clone(),
1254            _ => "<snapshot>".to_string(),
1255        };
1256        self.enforce(
1257            omnigraph_policy::PolicyAction::BranchCreate,
1258            &omnigraph_policy::ResourceScope::BranchTransition {
1259                source: source_branch,
1260                target: name.to_string(),
1261            },
1262            actor,
1263        )?;
1264        self.ensure_schema_apply_idle("branch_create_from").await?;
1265        self.branch_create_from_impl(target, name, false).await
1266    }
1267
1268    async fn branch_create_from_impl(
1269        &self,
1270        from: impl Into<ReadTarget>,
1271        name: &str,
1272        allow_internal_refs: bool,
1273    ) -> Result<()> {
1274        let target = from.into();
1275        let ReadTarget::Branch(branch_name) = target else {
1276            return Err(OmniError::manifest(
1277                "branch creation from pinned snapshots is not supported yet".to_string(),
1278            ));
1279        };
1280        if !allow_internal_refs {
1281            ensure_public_branch_ref(&branch_name, "branch_create_from")?;
1282            ensure_public_branch_ref(name, "branch_create_from")?;
1283        }
1284        let branch = normalize_branch_name(&branch_name)?;
1285        // Operate on a freshly-opened source coordinator that's owned locally
1286        // — never touch `self.coordinator`. The pre-fix implementation used
1287        // `swap_coordinator_for_branch` + operate + `restore_coordinator` as
1288        // three separate `coordinator.write().await` acquisitions; under
1289        // `&self` concurrency, a second `branch_create_from` could swap
1290        // self.coordinator between this caller's swap and operate steps,
1291        // making the operate run against the wrong source branch and
1292        // forking off the wrong HEAD. Pinned by
1293        // `concurrent_branch_create_from_distinct_parents_does_not_corrupt_coordinator`
1294        // in `crates/omnigraph-server/tests/server.rs`.
1295        //
1296        // `branch_create` mutates only the local coord's commit-graph cache;
1297        // the manifest write is durable on disk regardless of which
1298        // coord-handle issued it. Discarding `source_coord` after the call
1299        // is the right shape — the new branch is reachable from any
1300        // subsequent open of any coord.
1301        let mut source_coord = self.open_coordinator_for_branch(branch.as_deref()).await?;
1302        source_coord.branch_create(name).await
1303    }
1304
1305    pub async fn branch_list(&self) -> Result<Vec<String>> {
1306        self.ensure_schema_state_valid().await?;
1307        self.coordinator.read().await.branch_list().await
1308    }
1309
    /// Delete branch `name` with no actor attached for policy checks. See
    /// `branch_delete_as`.
    pub async fn branch_delete(&self, name: &str) -> Result<()> {
        self.branch_delete_as(name, None).await
    }
1313
1314    /// Delete a branch with an explicit actor for engine-layer policy
1315    /// enforcement (MR-722 fan-out). Scope is `TargetBranch(name)` —
1316    /// matches the HTTP-layer convention at `server_branch_delete`
1317    /// (branch=None, target_branch=Some(name)). Cedar rules using
1318    /// `target_branch_scope: protected` therefore correctly gate
1319    /// deletion of protected branches (e.g. deny BranchDelete against
1320    /// `main`).
1321    pub async fn branch_delete_as(&self, name: &str, actor: Option<&str>) -> Result<()> {
1322        self.enforce(
1323            omnigraph_policy::PolicyAction::BranchDelete,
1324            &omnigraph_policy::ResourceScope::TargetBranch(name.to_string()),
1325            actor,
1326        )?;
1327        self.ensure_schema_state_valid().await?;
1328        self.ensure_schema_apply_idle("branch_delete").await?;
1329        ensure_public_branch_ref(name, "branch_delete")?;
1330        self.refresh().await?;
1331        let branch = normalize_branch_name(name)?
1332            .ok_or_else(|| OmniError::manifest("cannot delete branch 'main'".to_string()))?;
1333        let branches = self.coordinator.read().await.branch_list().await?;
1334        if !branches.iter().any(|candidate| candidate == &branch) {
1335            return Err(OmniError::manifest_not_found(format!(
1336                "branch '{}' not found",
1337                branch
1338            )));
1339        }
1340
1341        self.ensure_branch_delete_safe(&branch, &branches).await?;
1342        self.delete_branch_storage_only(&branch).await
1343    }
1344
1345    pub async fn get_commit(&self, commit_id: &str) -> Result<GraphCommit> {
1346        self.ensure_schema_state_valid().await?;
1347        self.coordinator
1348            .read()
1349            .await
1350            .resolve_commit(&SnapshotId::new(commit_id))
1351            .await
1352    }
1353
1354    pub async fn list_commits(&self, branch: Option<&str>) -> Result<Vec<GraphCommit>> {
1355        self.ensure_schema_state_valid().await?;
1356        let branch = match branch {
1357            Some(branch) => normalize_branch_name(branch)?,
1358            None => None,
1359        };
1360        let coordinator = self.open_coordinator_for_branch(branch.as_deref()).await?;
1361        coordinator.list_commits().await
1362    }
1363
    /// Open a sub-table for mutation with version-drift guard.
    ///
    /// Checks that the dataset's current version matches the snapshot-pinned
    /// version. If another writer has advanced the version, returns an error
    /// prompting the caller to refresh and retry (optimistic concurrency).
    ///
    /// Delegates to `table_ops::open_for_mutation`; see
    /// `open_for_mutation_on_branch` for an explicit branch. The returned
    /// tuple is presumably (dataset, full table path, table branch) —
    /// NOTE(review): confirm in `table_ops`.
    pub(crate) async fn open_for_mutation(
        &self,
        table_key: &str,
        op_kind: crate::db::MutationOpKind,
    ) -> Result<(Dataset, String, Option<String>)> {
        table_ops::open_for_mutation(self, table_key, op_kind).await
    }
1376
    /// Branch-targeted variant of `open_for_mutation`. Opens `table_key` for
    /// a mutation of kind `op_kind` on `branch` instead of the handle's
    /// active branch. Delegates to `table_ops::open_for_mutation_on_branch`.
    pub(crate) async fn open_for_mutation_on_branch(
        &self,
        branch: Option<&str>,
        table_key: &str,
        op_kind: crate::db::MutationOpKind,
    ) -> Result<(Dataset, String, Option<String>)> {
        table_ops::open_for_mutation_on_branch(self, branch, table_key, op_kind).await
    }
1385
    /// Fork the dataset at `full_path` into `active_branch`, starting from the
    /// manifest entry's recorded state (`source_branch` @ `source_version`).
    /// Delegates to `table_ops::fork_dataset_from_entry_state`.
    /// NOTE(review): presumably used when a table inherited from an ancestor
    /// branch is first written on `active_branch` — confirm.
    pub(crate) async fn fork_dataset_from_entry_state(
        &self,
        table_key: &str,
        full_path: &str,
        source_branch: Option<&str>,
        source_version: u64,
        active_branch: &str,
    ) -> Result<Dataset> {
        table_ops::fork_dataset_from_entry_state(
            self,
            table_key,
            full_path,
            source_branch,
            source_version,
            active_branch,
        )
        .await
    }
1404
    /// Re-open `full_path` on `table_branch` for a mutation of kind
    /// `op_kind`, expecting it to be at `expected_version`. Delegates to
    /// `table_ops::reopen_for_mutation`; the drift check against
    /// `expected_version` presumably mirrors `open_for_mutation`'s guard —
    /// NOTE(review): confirm.
    pub(crate) async fn reopen_for_mutation(
        &self,
        table_key: &str,
        full_path: &str,
        table_branch: Option<&str>,
        expected_version: u64,
        op_kind: crate::db::MutationOpKind,
    ) -> Result<Dataset> {
        table_ops::reopen_for_mutation(
            self,
            table_key,
            full_path,
            table_branch,
            expected_version,
            op_kind,
        )
        .await
    }
1423
    /// Open the dataset at `table_path` on `table_branch`, checked out at
    /// `table_version`. Delegates to `table_ops::open_dataset_at_state`.
    pub(crate) async fn open_dataset_at_state(
        &self,
        table_path: &str,
        table_branch: Option<&str>,
        table_version: u64,
    ) -> Result<Dataset> {
        table_ops::open_dataset_at_state(self, table_path, table_branch, table_version).await
    }
1432
    /// Build the indices for table `table_key` on `ds` in place. Delegates to
    /// `table_ops::build_indices_on_dataset`, which presumably consults the
    /// handle's current catalog; `build_indices_on_dataset_for_catalog` takes
    /// an explicit catalog instead.
    pub(crate) async fn build_indices_on_dataset(
        &self,
        table_key: &str,
        ds: &mut Dataset,
    ) -> Result<()> {
        table_ops::build_indices_on_dataset(self, table_key, ds).await
    }
1440
    /// Like `build_indices_on_dataset`, but driven by an explicitly supplied
    /// `catalog`. This is useful when the catalog to index against is not the
    /// one currently published on the handle. Delegates to
    /// `table_ops::build_indices_on_dataset_for_catalog`.
    pub(crate) async fn build_indices_on_dataset_for_catalog(
        &self,
        catalog: &Catalog,
        table_key: &str,
        ds: &mut Dataset,
    ) -> Result<()> {
        table_ops::build_indices_on_dataset_for_catalog(self, catalog, table_key, ds).await
    }
1449
1450    // Used only by in-tree tests (`#[cfg(test)]`); the runtime path now
1451    // uses `commit_updates_on_branch_with_expected` exclusively.
1452    #[cfg(test)]
1453    pub(crate) async fn commit_updates(
1454        &mut self,
1455        updates: &[crate::db::SubTableUpdate],
1456    ) -> Result<u64> {
1457        table_ops::commit_updates(self, updates).await
1458    }
1459
1460    pub(crate) async fn commit_manifest_updates(
1461        &self,
1462        updates: &[crate::db::SubTableUpdate],
1463    ) -> Result<u64> {
1464        table_ops::commit_manifest_updates(self, updates).await
1465    }
1466
1467    pub(crate) async fn record_merge_commit(
1468        &self,
1469        manifest_version: u64,
1470        parent_commit_id: &str,
1471        merged_parent_commit_id: &str,
1472        actor_id: Option<&str>,
1473    ) -> Result<String> {
1474        table_ops::record_merge_commit(
1475            self,
1476            manifest_version,
1477            parent_commit_id,
1478            merged_parent_commit_id,
1479            actor_id,
1480        )
1481        .await
1482    }
1483
1484    pub(crate) async fn commit_updates_on_branch_with_expected(
1485        &self,
1486        branch: Option<&str>,
1487        updates: &[crate::db::SubTableUpdate],
1488        expected_table_versions: &std::collections::HashMap<String, u64>,
1489        actor_id: Option<&str>,
1490    ) -> Result<u64> {
1491        table_ops::commit_updates_on_branch_with_expected(
1492            self,
1493            branch,
1494            updates,
1495            expected_table_versions,
1496            actor_id,
1497        )
1498        .await
1499    }
1500
1501    pub(crate) async fn ensure_commit_graph_initialized(&self) -> Result<()> {
1502        table_ops::ensure_commit_graph_initialized(self).await
1503    }
1504
1505    /// Invalidate the cached graph index. Called after edge mutations.
1506    pub(crate) async fn invalidate_graph_index(&self) {
1507        table_ops::invalidate_graph_index(self).await
1508    }
1509}
1510
1511pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
1512    let branch = branch.trim();
1513    if branch.is_empty() {
1514        return Err(OmniError::manifest(
1515            "branch name cannot be empty".to_string(),
1516        ));
1517    }
1518    if branch == "main" {
1519        return Ok(None);
1520    }
1521    Ok(Some(branch.to_string()))
1522}
1523
1524pub(crate) fn ensure_public_branch_ref(branch: &str, operation: &str) -> Result<()> {
1525    if is_internal_system_branch(branch) {
1526        return Err(OmniError::manifest(format!(
1527            "{} does not allow internal system ref '{}'",
1528            operation, branch
1529        )));
1530    }
1531    Ok(())
1532}
1533
1534fn concat_or_empty_batches(schema: Arc<Schema>, batches: Vec<RecordBatch>) -> Result<RecordBatch> {
1535    if batches.is_empty() {
1536        return Ok(RecordBatch::new_empty(schema));
1537    }
1538    if batches.len() == 1 {
1539        return Ok(batches.into_iter().next().unwrap());
1540    }
1541    let batch_schema = batches[0].schema();
1542    arrow_select::concat::concat_batches(&batch_schema, &batches)
1543        .map_err(|e| OmniError::Lance(e.to_string()))
1544}
1545
1546fn blob_properties_for_table_key<'a>(
1547    catalog: &'a Catalog,
1548    table_key: &str,
1549) -> Result<&'a std::collections::HashSet<String>> {
1550    if let Some(type_name) = table_key.strip_prefix("node:") {
1551        return catalog
1552            .node_types
1553            .get(type_name)
1554            .map(|node_type| &node_type.blob_properties)
1555            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)));
1556    }
1557    if let Some(type_name) = table_key.strip_prefix("edge:") {
1558        return catalog
1559            .edge_types
1560            .get(type_name)
1561            .map(|edge_type| &edge_type.blob_properties)
1562            .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)));
1563    }
1564    Err(OmniError::manifest(format!(
1565        "invalid table key '{}'",
1566        table_key
1567    )))
1568}
1569
1570fn blob_description_is_null(descriptions: &StructArray, row: usize) -> Result<bool> {
1571    if descriptions.is_null(row) {
1572        return Ok(true);
1573    }
1574
1575    let kind = descriptions
1576        .column_by_name("kind")
1577        .and_then(|col| col.as_any().downcast_ref::<UInt32Array>())
1578        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row) as u8))
1579        .or_else(|| {
1580            descriptions
1581                .column_by_name("kind")
1582                .and_then(|col| col.as_any().downcast_ref::<arrow_array::UInt8Array>())
1583                .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)))
1584        });
1585    let position = descriptions
1586        .column_by_name("position")
1587        .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
1588        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1589    let size = descriptions
1590        .column_by_name("size")
1591        .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
1592        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1593    let blob_uri = descriptions
1594        .column_by_name("blob_uri")
1595        .and_then(|col| col.as_any().downcast_ref::<StringArray>())
1596        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1597
1598    let Some(kind) = kind else {
1599        return Ok(true);
1600    };
1601    let kind = BlobKind::try_from(kind).map_err(|e| OmniError::Lance(e.to_string()))?;
1602    if kind != BlobKind::Inline {
1603        return Ok(false);
1604    }
1605
1606    Ok(position.unwrap_or(0) == 0 && size.unwrap_or(0) == 0 && blob_uri.unwrap_or("").is_empty())
1607}
1608
1609/// Replace placeholder `LargeBinary` fields with Lance blob v2 fields.
1610///
1611/// The compiler crate has no Lance dependency, so `ScalarType::Blob` maps to
1612/// `DataType::LargeBinary` as a placeholder. This function replaces those
1613/// fields with the real blob v2 struct type via `lance::blob::blob_field()`.
1614fn fixup_blob_schemas(catalog: &mut Catalog) {
1615    for node_type in catalog.node_types.values_mut() {
1616        if node_type.blob_properties.is_empty() {
1617            continue;
1618        }
1619        let fields: Vec<Field> = node_type
1620            .arrow_schema
1621            .fields()
1622            .iter()
1623            .map(|f| {
1624                if node_type.blob_properties.contains(f.name()) {
1625                    blob_field(f.name(), f.is_nullable())
1626                } else {
1627                    f.as_ref().clone()
1628                }
1629            })
1630            .collect();
1631        node_type.arrow_schema = Arc::new(Schema::new(fields));
1632    }
1633    for edge_type in catalog.edge_types.values_mut() {
1634        if edge_type.blob_properties.is_empty() {
1635            continue;
1636        }
1637        let fields: Vec<Field> = edge_type
1638            .arrow_schema
1639            .fields()
1640            .iter()
1641            .map(|f| {
1642                if edge_type.blob_properties.contains(f.name()) {
1643                    blob_field(f.name(), f.is_nullable())
1644                } else {
1645                    f.as_ref().clone()
1646                }
1647            })
1648            .collect();
1649        edge_type.arrow_schema = Arc::new(Schema::new(fields));
1650    }
1651}
1652
1653fn read_schema_ir_from_source(schema_source: &str) -> Result<SchemaIR> {
1654    let schema_ast = parse_schema(schema_source)?;
1655    build_schema_ir(&schema_ast).map_err(|err| OmniError::manifest(err.to_string()))
1656}
1657
1658/// I/O phase of `Omnigraph::init_with_storage`. Split out so the caller
1659/// can pattern-match on the result and run cleanup on error before
1660/// returning the original error.
1661///
1662/// Failpoints fire at the phase boundaries:
1663/// * `init.after_schema_pg_written` — `_schema.pg` is on disk. In strict mode
1664///   this fires in the caller immediately after the atomic ownership claim; in
1665///   force mode it fires here after the explicit overwrite.
1666/// * `init.after_schema_contract_written` — `_schema.pg` + `_schema.ir.json`
1667///   + `__schema_state.json` are on disk.
1668/// * `init.after_coordinator_init` — all schema files plus Lance per-type
1669///   datasets and `__manifest/` are on disk. (The cleanup wrapper can only
1670///   remove the schema files; Lance directories need `delete_prefix` —
1671///   deferred along with `DELETE /graphs/{id}`.)
1672async fn init_storage_phase(
1673    root: &str,
1674    schema_source: &str,
1675    schema_ir: &SchemaIR,
1676    catalog: &Catalog,
1677    storage: &Arc<dyn StorageAdapter>,
1678    write_schema_pg: bool,
1679) -> Result<GraphCoordinator> {
1680    if write_schema_pg {
1681        let schema_path = join_uri(root, SCHEMA_SOURCE_FILENAME);
1682        storage.write_text(&schema_path, schema_source).await?;
1683        crate::failpoints::maybe_fail("init.after_schema_pg_written")?;
1684    }
1685
1686    write_schema_contract(root, storage.as_ref(), schema_ir).await?;
1687    crate::failpoints::maybe_fail("init.after_schema_contract_written")?;
1688
1689    let coordinator = GraphCoordinator::init(root, catalog, Arc::clone(storage)).await?;
1690    crate::failpoints::maybe_fail("init.after_coordinator_init")?;
1691
1692    Ok(coordinator)
1693}
1694
1695/// Best-effort cleanup of init-phase artifacts. Called from
1696/// `init_with_storage` on any error returned by `init_storage_phase`.
1697///
1698/// Removes the three schema files: `_schema.pg`, `_schema.ir.json`,
1699/// `__schema_state.json`. Lance datasets and `__manifest/` are not
1700/// touched here — recursive directory deletion requires a
1701/// `StorageAdapter::delete_prefix` primitive that's deferred along
1702/// with `DELETE /graphs/{id}` (MR-668 PR 2b).
1703///
1704/// Failures to delete are logged via `tracing::warn` and do not mask
1705/// the original init error.
1706async fn best_effort_cleanup_init_artifacts(root: &str, storage: &dyn StorageAdapter) {
1707    for uri in [
1708        schema_source_uri(root),
1709        schema_ir_uri(root),
1710        schema_state_uri(root),
1711    ] {
1712        if let Err(err) = storage.delete(&uri).await {
1713            tracing::warn!(
1714                target: "omnigraph::init::cleanup",
1715                uri = %uri,
1716                error = %err,
1717                "init failed; best-effort cleanup could not delete artifact",
1718            );
1719        }
1720    }
1721}
1722
1723fn schema_table_key(type_kind: SchemaTypeKind, name: &str) -> String {
1724    match type_kind {
1725        SchemaTypeKind::Node => format!("node:{}", name),
1726        SchemaTypeKind::Edge => format!("edge:{}", name),
1727        SchemaTypeKind::Interface => unreachable!("interfaces do not map to tables"),
1728    }
1729}
1730
1731fn schema_for_table_key(catalog: &Catalog, table_key: &str) -> Result<Arc<Schema>> {
1732    if let Some(type_name) = table_key.strip_prefix("node:") {
1733        let node_type: &NodeType = catalog
1734            .node_types
1735            .get(type_name)
1736            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
1737        return Ok(node_type.arrow_schema.clone());
1738    }
1739    if let Some(type_name) = table_key.strip_prefix("edge:") {
1740        let edge_type: &EdgeType = catalog
1741            .edge_types
1742            .get(type_name)
1743            .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)))?;
1744        return Ok(edge_type.arrow_schema.clone());
1745    }
1746    Err(OmniError::manifest(format!(
1747        "invalid table key '{}'",
1748        table_key
1749    )))
1750}
1751
1752fn record_batch_row_to_json(batch: &RecordBatch, row: usize) -> Result<serde_json::Value> {
1753    let mut obj = serde_json::Map::new();
1754    for (i, field) in batch.schema().fields().iter().enumerate() {
1755        obj.insert(
1756            field.name().clone(),
1757            json_value_from_array(batch.column(i).as_ref(), row)?,
1758        );
1759    }
1760    Ok(serde_json::Value::Object(obj))
1761}
1762
1763fn json_value_from_array(array: &dyn Array, row: usize) -> Result<serde_json::Value> {
1764    if array.is_null(row) {
1765        return Ok(serde_json::Value::Null);
1766    }
1767
1768    match array.data_type() {
1769        DataType::Utf8 => Ok(serde_json::Value::String(
1770            array
1771                .as_any()
1772                .downcast_ref::<StringArray>()
1773                .ok_or_else(|| OmniError::Lance("expected StringArray".to_string()))?
1774                .value(row)
1775                .to_string(),
1776        )),
1777        DataType::LargeUtf8 => Ok(serde_json::Value::String(
1778            array
1779                .as_any()
1780                .downcast_ref::<LargeStringArray>()
1781                .ok_or_else(|| OmniError::Lance("expected LargeStringArray".to_string()))?
1782                .value(row)
1783                .to_string(),
1784        )),
1785        DataType::Boolean => Ok(serde_json::Value::Bool(
1786            array
1787                .as_any()
1788                .downcast_ref::<BooleanArray>()
1789                .ok_or_else(|| OmniError::Lance("expected BooleanArray".to_string()))?
1790                .value(row),
1791        )),
1792        DataType::Int32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1793            array
1794                .as_any()
1795                .downcast_ref::<Int32Array>()
1796                .ok_or_else(|| OmniError::Lance("expected Int32Array".to_string()))?
1797                .value(row),
1798        ))),
1799        DataType::Int64 => Ok(serde_json::Value::Number(serde_json::Number::from(
1800            array
1801                .as_any()
1802                .downcast_ref::<Int64Array>()
1803                .ok_or_else(|| OmniError::Lance("expected Int64Array".to_string()))?
1804                .value(row),
1805        ))),
1806        DataType::UInt32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1807            array
1808                .as_any()
1809                .downcast_ref::<UInt32Array>()
1810                .ok_or_else(|| OmniError::Lance("expected UInt32Array".to_string()))?
1811                .value(row),
1812        ))),
1813        DataType::UInt64 => Ok(serde_json::Value::Number(serde_json::Number::from(
1814            array
1815                .as_any()
1816                .downcast_ref::<UInt64Array>()
1817                .ok_or_else(|| OmniError::Lance("expected UInt64Array".to_string()))?
1818                .value(row),
1819        ))),
1820        DataType::Float32 => {
1821            let value = array
1822                .as_any()
1823                .downcast_ref::<Float32Array>()
1824                .ok_or_else(|| OmniError::Lance("expected Float32Array".to_string()))?
1825                .value(row) as f64;
1826            Ok(serde_json::Value::Number(
1827                serde_json::Number::from_f64(value).ok_or_else(|| {
1828                    OmniError::Lance(format!("cannot encode f32 value '{}' as JSON", value))
1829                })?,
1830            ))
1831        }
1832        DataType::Float64 => {
1833            let value = array
1834                .as_any()
1835                .downcast_ref::<Float64Array>()
1836                .ok_or_else(|| OmniError::Lance("expected Float64Array".to_string()))?
1837                .value(row);
1838            Ok(serde_json::Value::Number(
1839                serde_json::Number::from_f64(value).ok_or_else(|| {
1840                    OmniError::Lance(format!("cannot encode f64 value '{}' as JSON", value))
1841                })?,
1842            ))
1843        }
1844        DataType::Date32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1845            array
1846                .as_any()
1847                .downcast_ref::<Date32Array>()
1848                .ok_or_else(|| OmniError::Lance("expected Date32Array".to_string()))?
1849                .value(row),
1850        ))),
1851        DataType::Binary => Ok(serde_json::Value::String(base64::Engine::encode(
1852            &base64::engine::general_purpose::STANDARD,
1853            array
1854                .as_any()
1855                .downcast_ref::<BinaryArray>()
1856                .ok_or_else(|| OmniError::Lance("expected BinaryArray".to_string()))?
1857                .value(row),
1858        ))),
1859        DataType::LargeBinary => Ok(serde_json::Value::String(base64::Engine::encode(
1860            &base64::engine::general_purpose::STANDARD,
1861            array
1862                .as_any()
1863                .downcast_ref::<LargeBinaryArray>()
1864                .ok_or_else(|| OmniError::Lance("expected LargeBinaryArray".to_string()))?
1865                .value(row),
1866        ))),
1867        DataType::List(_) => {
1868            let list = array
1869                .as_any()
1870                .downcast_ref::<ListArray>()
1871                .ok_or_else(|| OmniError::Lance("expected ListArray".to_string()))?;
1872            let values = list.value(row);
1873            let mut out = Vec::with_capacity(values.len());
1874            for idx in 0..values.len() {
1875                out.push(json_value_from_array(values.as_ref(), idx)?);
1876            }
1877            Ok(serde_json::Value::Array(out))
1878        }
1879        DataType::LargeList(_) => {
1880            let list = array
1881                .as_any()
1882                .downcast_ref::<LargeListArray>()
1883                .ok_or_else(|| OmniError::Lance("expected LargeListArray".to_string()))?;
1884            let values = list.value(row);
1885            let mut out = Vec::with_capacity(values.len());
1886            for idx in 0..values.len() {
1887                out.push(json_value_from_array(values.as_ref(), idx)?);
1888            }
1889            Ok(serde_json::Value::Array(out))
1890        }
1891        DataType::FixedSizeList(_, _) => {
1892            let list = array
1893                .as_any()
1894                .downcast_ref::<FixedSizeListArray>()
1895                .ok_or_else(|| OmniError::Lance("expected FixedSizeListArray".to_string()))?;
1896            let values = list.value(row);
1897            let mut out = Vec::with_capacity(values.len());
1898            for idx in 0..values.len() {
1899                out.push(json_value_from_array(values.as_ref(), idx)?);
1900            }
1901            Ok(serde_json::Value::Array(out))
1902        }
1903        DataType::Struct(fields) => {
1904            let struct_array = array
1905                .as_any()
1906                .downcast_ref::<StructArray>()
1907                .ok_or_else(|| OmniError::Lance("expected StructArray".to_string()))?;
1908            let mut obj = serde_json::Map::new();
1909            for (field_idx, field) in fields.iter().enumerate() {
1910                obj.insert(
1911                    field.name().clone(),
1912                    json_value_from_array(struct_array.column(field_idx).as_ref(), row)?,
1913                );
1914            }
1915            Ok(serde_json::Value::Object(obj))
1916        }
1917        _ => {
1918            let value = arrow_cast::display::array_value_to_string(array, row)
1919                .map_err(|e| OmniError::Lance(e.to_string()))?;
1920            Ok(serde_json::Value::String(value))
1921        }
1922    }
1923}
1924
1925#[cfg(test)]
1926mod tests {
1927    use super::*;
1928    use crate::db::manifest::ManifestCoordinator;
1929    use async_trait::async_trait;
1930    use serde_json::Value;
1931    use std::sync::{Arc, Mutex};
1932
1933    use crate::storage::{LocalStorageAdapter, StorageAdapter, join_uri};
1934
1935    const TEST_SCHEMA: &str = r#"
1936node Person {
1937    name: String @key
1938    age: I32?
1939}
1940node Company {
1941    name: String @key
1942}
1943edge Knows: Person -> Person {
1944    since: Date?
1945}
1946edge WorksAt: Person -> Company
1947"#;
1948
1949    #[derive(Debug, Default)]
1950    struct RecordingStorageAdapter {
1951        inner: LocalStorageAdapter,
1952        reads: Mutex<Vec<String>>,
1953        writes: Mutex<Vec<String>>,
1954        exists_checks: Mutex<Vec<String>>,
1955        renames: Mutex<Vec<(String, String)>>,
1956        deletes: Mutex<Vec<String>>,
1957    }
1958
1959    impl RecordingStorageAdapter {
1960        fn reads(&self) -> Vec<String> {
1961            self.reads.lock().unwrap().clone()
1962        }
1963
1964        fn writes(&self) -> Vec<String> {
1965            self.writes.lock().unwrap().clone()
1966        }
1967
1968        fn exists_checks(&self) -> Vec<String> {
1969            self.exists_checks.lock().unwrap().clone()
1970        }
1971    }
1972
1973    #[async_trait]
1974    impl StorageAdapter for RecordingStorageAdapter {
1975        async fn read_text(&self, uri: &str) -> Result<String> {
1976            self.reads.lock().unwrap().push(uri.to_string());
1977            self.inner.read_text(uri).await
1978        }
1979
1980        async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
1981            self.writes.lock().unwrap().push(uri.to_string());
1982            self.inner.write_text(uri, contents).await
1983        }
1984
1985        async fn write_text_if_absent(&self, uri: &str, contents: &str) -> Result<bool> {
1986            self.writes.lock().unwrap().push(uri.to_string());
1987            self.inner.write_text_if_absent(uri, contents).await
1988        }
1989
1990        async fn exists(&self, uri: &str) -> Result<bool> {
1991            self.exists_checks.lock().unwrap().push(uri.to_string());
1992            self.inner.exists(uri).await
1993        }
1994
1995        async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
1996            self.renames
1997                .lock()
1998                .unwrap()
1999                .push((from_uri.to_string(), to_uri.to_string()));
2000            self.inner.rename_text(from_uri, to_uri).await
2001        }
2002
2003        async fn delete(&self, uri: &str) -> Result<()> {
2004            self.deletes.lock().unwrap().push(uri.to_string());
2005            self.inner.delete(uri).await
2006        }
2007
2008        async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
2009            self.inner.list_dir(dir_uri).await
2010        }
2011    }
2012
2013    #[derive(Debug)]
2014    struct InitRaceStorageAdapter {
2015        inner: LocalStorageAdapter,
2016        root: String,
2017        barrier: Arc<tokio::sync::Barrier>,
2018    }
2019
2020    #[async_trait]
2021    impl StorageAdapter for InitRaceStorageAdapter {
2022        async fn read_text(&self, uri: &str) -> Result<String> {
2023            self.inner.read_text(uri).await
2024        }
2025
2026        async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
2027            self.inner.write_text(uri, contents).await
2028        }
2029
2030        async fn write_text_if_absent(&self, uri: &str, contents: &str) -> Result<bool> {
2031            self.inner.write_text_if_absent(uri, contents).await
2032        }
2033
2034        async fn exists(&self, uri: &str) -> Result<bool> {
2035            let exists = self.inner.exists(uri).await?;
2036            if uri == schema_state_uri(&self.root) {
2037                self.barrier.wait().await;
2038            }
2039            Ok(exists)
2040        }
2041
2042        async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
2043            self.inner.rename_text(from_uri, to_uri).await
2044        }
2045
2046        async fn delete(&self, uri: &str) -> Result<()> {
2047            self.inner.delete(uri).await
2048        }
2049
2050        async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
2051            self.inner.list_dir(dir_uri).await
2052        }
2053    }
2054
2055    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2056    async fn concurrent_strict_init_does_not_delete_winning_schema_files() {
2057        let dir = tempfile::tempdir().unwrap();
2058        let uri = dir.path().to_str().unwrap().to_string();
2059        let root = normalize_root_uri(&uri).unwrap();
2060        let storage: Arc<dyn StorageAdapter> = Arc::new(InitRaceStorageAdapter {
2061            inner: LocalStorageAdapter,
2062            root,
2063            barrier: Arc::new(tokio::sync::Barrier::new(2)),
2064        });
2065
2066        let left = Omnigraph::init_with_storage(
2067            &uri,
2068            TEST_SCHEMA,
2069            Arc::clone(&storage),
2070            InitOptions::default(),
2071        );
2072        let right = Omnigraph::init_with_storage(
2073            &uri,
2074            TEST_SCHEMA,
2075            Arc::clone(&storage),
2076            InitOptions::default(),
2077        );
2078        let (left, right) = tokio::join!(left, right);
2079        let ok_count = usize::from(left.is_ok()) + usize::from(right.is_ok());
2080        assert_eq!(ok_count, 1, "exactly one concurrent init should win");
2081
2082        assert!(
2083            dir.path().join("_schema.pg").exists(),
2084            "winning init must leave _schema.pg in place"
2085        );
2086        assert!(
2087            dir.path().join("_schema.ir.json").exists(),
2088            "winning init must leave _schema.ir.json in place"
2089        );
2090        assert!(
2091            dir.path().join("__schema_state.json").exists(),
2092            "winning init must leave __schema_state.json in place"
2093        );
2094    }
2095
2096    #[tokio::test]
2097    async fn test_init_and_open_route_graph_metadata_through_storage_adapter() {
2098        let dir = tempfile::tempdir().unwrap();
2099        let uri = dir.path().to_str().unwrap();
2100        let adapter = Arc::new(RecordingStorageAdapter::default());
2101
2102        Omnigraph::init_with_storage(uri, TEST_SCHEMA, adapter.clone(), InitOptions::default())
2103            .await
2104            .unwrap();
2105        assert!(adapter.writes().contains(&join_uri(uri, "_schema.pg")));
2106        assert!(adapter.writes().contains(&join_uri(uri, "_schema.ir.json")));
2107        assert!(
2108            adapter
2109                .writes()
2110                .contains(&join_uri(uri, "__schema_state.json"))
2111        );
2112
2113        Omnigraph::open_with_storage(uri, adapter.clone())
2114            .await
2115            .unwrap();
2116        assert!(adapter.reads().contains(&join_uri(uri, "_schema.pg")));
2117        assert!(adapter.reads().contains(&join_uri(uri, "_schema.ir.json")));
2118        assert!(
2119            adapter
2120                .reads()
2121                .contains(&join_uri(uri, "__schema_state.json"))
2122        );
2123        assert!(
2124            adapter
2125                .exists_checks()
2126                .contains(&join_uri(uri, "_schema.ir.json"))
2127        );
2128        assert!(
2129            adapter
2130                .exists_checks()
2131                .contains(&join_uri(uri, "__schema_state.json"))
2132        );
2133        assert!(
2134            adapter
2135                .exists_checks()
2136                .contains(&join_uri(uri, "_graph_commits.lance"))
2137        );
2138    }
2139
2140    async fn table_rows_json(db: &Omnigraph, table_key: &str) -> Vec<Value> {
2141        let snapshot = db.snapshot().await;
2142        let ds = snapshot.open(table_key).await.unwrap();
2143        let batches = db.table_store().scan_batches(&ds).await.unwrap();
2144        batches
2145            .into_iter()
2146            .flat_map(|batch| {
2147                (0..batch.num_rows())
2148                    .map(|row| record_batch_row_to_json(&batch, row).unwrap())
2149                    .collect::<Vec<_>>()
2150            })
2151            .collect()
2152    }
2153
2154    async fn seed_person_row(db: &mut Omnigraph, name: &str, age: Option<i32>) {
2155        let (mut ds, full_path, table_branch) = db
2156            .open_for_mutation("node:Person", crate::db::MutationOpKind::Insert)
2157            .await
2158            .unwrap();
2159        let schema: Arc<Schema> = Arc::new(ds.schema().into());
2160        let columns: Vec<Arc<dyn Array>> = schema
2161            .fields()
2162            .iter()
2163            .map(|field| match field.name().as_str() {
2164                "id" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
2165                "name" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
2166                "age" => Arc::new(Int32Array::from(vec![age])) as Arc<dyn Array>,
2167                _ => new_null_array(field.data_type(), 1),
2168            })
2169            .collect();
2170        let batch = RecordBatch::try_new(Arc::clone(&schema), columns).unwrap();
2171        let state = db
2172            .table_store()
2173            .append_batch(&full_path, &mut ds, batch)
2174            .await
2175            .unwrap();
2176        db.commit_updates(&[crate::db::SubTableUpdate {
2177            table_key: "node:Person".to_string(),
2178            table_version: state.version,
2179            table_branch,
2180            row_count: state.row_count,
2181            version_metadata: state.version_metadata,
2182        }])
2183        .await
2184        .unwrap();
2185    }
2186
2187    #[tokio::test]
2188    async fn test_apply_schema_adds_nullable_property_and_preserves_rows() {
2189        let dir = tempfile::tempdir().unwrap();
2190        let uri = dir.path().to_str().unwrap();
2191        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2192        seed_person_row(&mut db, "Alice", Some(30)).await;
2193
2194        let desired = TEST_SCHEMA.replace(
2195            "    age: I32?\n}",
2196            "    age: I32?\n    nickname: String?\n}",
2197        );
2198        let result = db.apply_schema(&desired).await.unwrap();
2199        assert!(result.applied);
2200
2201        let reopened = Omnigraph::open(uri).await.unwrap();
2202        let rows = table_rows_json(&reopened, "node:Person").await;
2203        assert_eq!(rows.len(), 1);
2204        assert_eq!(rows[0]["name"], "Alice");
2205        assert_eq!(rows[0]["age"], 30);
2206        assert!(rows[0]["nickname"].is_null());
2207        assert!(
2208            reopened.catalog().node_types["Person"]
2209                .properties
2210                .contains_key("nickname")
2211        );
2212        assert!(dir.path().join("_schema.pg").exists());
2213    }
2214
2215    #[tokio::test]
2216    async fn test_apply_schema_renames_property_and_preserves_values() {
2217        let dir = tempfile::tempdir().unwrap();
2218        let uri = dir.path().to_str().unwrap();
2219        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2220        seed_person_row(&mut db, "Alice", Some(30)).await;
2221
2222        let desired = TEST_SCHEMA.replace(
2223            "    age: I32?\n}",
2224            "    years: I32? @rename_from(\"age\")\n}",
2225        );
2226        db.apply_schema(&desired).await.unwrap();
2227
2228        let reopened = Omnigraph::open(uri).await.unwrap();
2229        let rows = table_rows_json(&reopened, "node:Person").await;
2230        assert_eq!(rows[0]["name"], "Alice");
2231        assert_eq!(rows[0]["years"], 30);
2232        assert!(rows[0].get("age").is_none());
2233    }
2234
2235    #[tokio::test]
2236    async fn test_apply_schema_renames_type_and_preserves_historical_snapshot() {
2237        let dir = tempfile::tempdir().unwrap();
2238        let uri = dir.path().to_str().unwrap();
2239        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2240        seed_person_row(&mut db, "Alice", Some(30)).await;
2241        let before_version = db.snapshot().await.version();
2242
2243        let desired = TEST_SCHEMA
2244            .replace("node Person {\n", "node Human @rename_from(\"Person\") {\n")
2245            .replace("edge Knows: Person -> Person", "edge Knows: Human -> Human")
2246            .replace(
2247                "edge WorksAt: Person -> Company",
2248                "edge WorksAt: Human -> Company",
2249            );
2250        db.apply_schema(&desired).await.unwrap();
2251
2252        let head = db.snapshot().await;
2253        assert!(head.entry("node:Person").is_none());
2254        assert!(head.entry("node:Human").is_some());
2255        let historical = ManifestCoordinator::snapshot_at(uri, None, before_version)
2256            .await
2257            .unwrap();
2258        assert!(historical.entry("node:Person").is_some());
2259        assert!(historical.entry("node:Human").is_none());
2260    }
2261
2262    #[tokio::test]
2263    async fn test_apply_schema_succeeds_after_load() {
2264        // Historical: schema apply used to be blocked by leftover
2265        // `__run__` branches. The Run state machine was removed in
2266        // MR-771, so a fresh graph never creates a `__run__` branch;
2267        // legacy ones are swept by the v2→v3 manifest migration. This
2268        // asserts the invariant a current graph upholds: publish leaves
2269        // no `__run__` branch behind, so schema apply proceeds.
2270        let dir = tempfile::tempdir().unwrap();
2271        let uri = dir.path().to_str().unwrap();
2272        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2273
2274        crate::loader::load_jsonl(
2275            &mut db,
2276            r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#,
2277            crate::loader::LoadMode::Overwrite,
2278        )
2279        .await
2280        .unwrap();
2281
2282        let all_branches = db.coordinator.read().await.all_branches().await.unwrap();
2283        assert!(
2284            !all_branches.iter().any(|b| b.starts_with("__run__")),
2285            "no __run__ branch should exist after publish, got: {:?}",
2286            all_branches
2287        );
2288
2289        let desired = TEST_SCHEMA.replace(
2290            "    age: I32?\n}",
2291            "    age: I32?\n    nickname: String?\n}",
2292        );
2293        let result = db.apply_schema(&desired).await.unwrap();
2294        assert!(result.applied, "schema apply should have applied");
2295    }
2296
2297    /// Regression (MR-770): a pre-v0.4.0 graph that still carries a stale
2298    /// `__run__*` branch on `__manifest` must not block schema apply. The
2299    /// v2→v3 sweep runs in `Omnigraph::open(ReadWrite)` — before the
2300    /// schema-apply blocking-branch check — so apply succeeds with no
2301    /// intervening publish.
2302    ///
2303    /// Confirmed to fail before the open-time migration landed: the reopened
2304    /// graph still listed `__run__legacy`, and `apply_schema` returned
2305    /// "found non-main branches: __run__legacy".
2306    #[tokio::test]
2307    async fn legacy_run_branch_is_swept_on_open_and_does_not_block_schema_apply() {
2308        let dir = tempfile::tempdir().unwrap();
2309        let uri = dir.path().to_str().unwrap();
2310        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2311
2312        // Synthesize a legacy graph: a stale `__run__` branch on `__manifest`
2313        // plus the manifest stamp rewound to v2 (pre-sweep).
2314        db.branch_create("__run__legacy").await.unwrap();
2315        drop(db);
2316        {
2317            let mut ds = lance::Dataset::open(&format!("{}/__manifest", uri))
2318                .await
2319                .unwrap();
2320            ds.update_schema_metadata([(
2321                "omnigraph:internal_schema_version".to_string(),
2322                Some("2".to_string()),
2323            )])
2324            .await
2325            .unwrap();
2326        }
2327
2328        // Reopen (ReadWrite): the open-time migration must sweep `__run__legacy`
2329        // before any branch-observing code runs.
2330        let db = Omnigraph::open(uri).await.unwrap();
2331        let branches = db.branch_list().await.unwrap();
2332        assert!(
2333            !branches.iter().any(|b| b.starts_with("__run__")),
2334            "open-time migration must sweep legacy __run__ branches; got {branches:?}",
2335        );
2336
2337        // Schema apply must proceed with no intervening publish — the
2338        // blocking-branch check no longer sees `__run__legacy`.
2339        let desired = TEST_SCHEMA.replace(
2340            "    age: I32?\n}",
2341            "    age: I32?\n    nickname: String?\n}",
2342        );
2343        let result = db.apply_schema(&desired).await.unwrap();
2344        assert!(result.applied, "schema apply should have applied");
2345    }
2346
2347    #[tokio::test]
2348    async fn test_apply_schema_adds_index_for_existing_property() {
2349        let dir = tempfile::tempdir().unwrap();
2350        let uri = dir.path().to_str().unwrap();
2351        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2352
2353        let desired = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
2354        db.apply_schema(&desired).await.unwrap();
2355
2356        let snapshot = db.snapshot().await;
2357        let ds = snapshot.open("node:Person").await.unwrap();
2358        assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
2359    }
2360
2361    #[tokio::test]
2362    async fn test_apply_schema_rewrite_preserves_existing_indices() {
2363        let dir = tempfile::tempdir().unwrap();
2364        let uri = dir.path().to_str().unwrap();
2365        let initial_schema = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
2366        let mut db = Omnigraph::init(uri, &initial_schema).await.unwrap();
2367        seed_person_row(&mut db, "Alice", Some(30)).await;
2368
2369        let desired = initial_schema.replace(
2370            "    age: I32?\n}",
2371            "    age: I32?\n    nickname: String?\n}",
2372        );
2373        db.apply_schema(&desired).await.unwrap();
2374
2375        let snapshot = db.snapshot().await;
2376        let ds = snapshot.open("node:Person").await.unwrap();
2377        assert!(db.table_store().has_btree_index(&ds, "id").await.unwrap());
2378        assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
2379    }
2380
2381    #[tokio::test]
2382    async fn test_open_for_mutation_rejects_while_schema_apply_locked() {
2383        let dir = tempfile::tempdir().unwrap();
2384        let uri = dir.path().to_str().unwrap();
2385        let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2386        let mut db = db;
2387        db.coordinator
2388            .write()
2389            .await
2390            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
2391            .await
2392            .unwrap();
2393
2394        let err = db
2395            .open_for_mutation("node:Person", crate::db::MutationOpKind::Insert)
2396            .await
2397            .unwrap_err();
2398        assert!(
2399            err.to_string()
2400                .contains("write is unavailable while schema apply is in progress")
2401        );
2402    }
2403
2404    #[tokio::test]
2405    async fn test_commit_updates_rejects_while_schema_apply_locked() {
2406        let dir = tempfile::tempdir().unwrap();
2407        let uri = dir.path().to_str().unwrap();
2408        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2409        db.coordinator
2410            .write()
2411            .await
2412            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
2413            .await
2414            .unwrap();
2415
2416        let err = db.commit_updates(&[]).await.unwrap_err();
2417        assert!(
2418            err.to_string()
2419                .contains("write commit is unavailable while schema apply is in progress")
2420        );
2421    }
2422
2423    #[tokio::test]
2424    async fn test_branch_list_hides_schema_apply_lock_branch() {
2425        let dir = tempfile::tempdir().unwrap();
2426        let uri = dir.path().to_str().unwrap();
2427        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2428        db.coordinator
2429            .write()
2430            .await
2431            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
2432            .await
2433            .unwrap();
2434
2435        let branches = db.branch_list().await.unwrap();
2436        assert_eq!(branches, vec!["main".to_string()]);
2437    }
2438}