Skip to main content

omnigraph/db/
omnigraph.rs

1use std::collections::{BTreeSet, HashMap, HashSet};
2use std::io::Write;
3use std::sync::Arc;
4
5use arc_swap::ArcSwap;
6use arrow_array::{
7    Array, BinaryArray, BooleanArray, Date32Array, FixedSizeListArray, Float32Array, Float64Array,
8    Int32Array, Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray,
9    RecordBatch, StringArray, StructArray, UInt32Array, UInt64Array, new_null_array,
10};
11use arrow_schema::{DataType, Field, Schema};
12use lance::Dataset;
13use lance::blob::{BlobArrayBuilder, blob_field};
14use lance::dataset::BlobFile;
15use lance::dataset::scanner::ColumnOrdering;
16use lance::datatypes::BlobKind;
17use omnigraph_compiler::catalog::{Catalog, EdgeType, NodeType};
18use omnigraph_compiler::schema::parser::parse_schema;
19use omnigraph_compiler::types::ScalarType;
20use omnigraph_compiler::{
21    DropMode, SchemaIR, SchemaMigrationPlan, SchemaMigrationStep, SchemaTypeKind,
22    build_catalog_from_ir, build_schema_ir, plan_schema_migration,
23};
24
25use crate::db::graph_coordinator::{GraphCoordinator, PublishedSnapshot};
26use crate::error::{OmniError, Result};
27use crate::runtime_cache::RuntimeCache;
28use crate::storage::{StorageAdapter, join_uri, normalize_root_uri, storage_for_uri};
29use crate::table_store::TableStore;
30
31mod export;
32mod optimize;
33mod schema_apply;
34mod table_ops;
35
36pub use optimize::{CleanupPolicyOptions, TableCleanupStats, TableOptimizeStats};
37pub use schema_apply::SchemaApplyOptions;
38
39use super::commit_graph::GraphCommit;
40use super::manifest::{
41    ManifestChange, Snapshot, SubTableEntry, TableRegistration, TableTombstone,
42    table_path_for_table_key,
43};
44use super::schema_state::{
45    SCHEMA_SOURCE_FILENAME, load_or_bootstrap_schema_contract, read_accepted_schema_ir,
46    recover_schema_state_files, schema_ir_staging_uri, schema_ir_uri, schema_source_staging_uri,
47    schema_source_uri, schema_state_staging_uri, schema_state_uri, validate_schema_contract,
48    write_schema_contract, write_schema_contract_staging,
49};
50use super::{
51    ReadTarget, ResolvedTarget, SCHEMA_APPLY_LOCK_BRANCH, SnapshotId, is_internal_system_branch,
52    is_schema_apply_lock_branch,
53};
54
/// Outcome reported for a merge operation.
///
/// NOTE(review): the per-variant descriptions below are inferred from the
/// variant names (git-style merge terminology); confirm against the merge
/// implementation before relying on them.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MergeOutcome {
    /// Presumably: there was nothing to merge.
    AlreadyUpToDate,
    /// Presumably: the target was advanced without a real merge.
    FastForward,
    /// Presumably: a real merge was performed.
    Merged,
}
61
/// Result returned by the `apply_schema*` family of methods on
/// [`Omnigraph`].
///
/// NOTE(review): field semantics are set by the `schema_apply` module, which
/// is not visible here; the descriptions below are inferred from the field
/// names and types and should be confirmed there.
#[derive(Debug, Clone)]
pub struct SchemaApplyResult {
    /// Presumably whether the planned migration is supported — TODO confirm.
    pub supported: bool,
    /// Presumably whether the migration was actually applied — TODO confirm.
    pub applied: bool,
    /// A manifest version number; presumably the version after the apply —
    /// TODO confirm.
    pub manifest_version: u64,
    /// Migration steps associated with this result.
    pub steps: Vec<SchemaMigrationStep>,
}
69
/// Top-level handle to an Omnigraph database.
///
/// An Omnigraph is a Lance-native graph database with git-style branching.
/// It stores typed property graphs as per-type Lance datasets coordinated
/// through a Lance manifest table.
pub struct Omnigraph {
    /// Normalized root URI of the graph (the output of
    /// `normalize_root_uri` at init/open time).
    root_uri: String,
    /// Object-store adapter used for schema files and other non-table
    /// artifacts. Exposed to the engine via `storage_adapter()`.
    storage: Arc<dyn StorageAdapter>,
    /// Coordinator state behind a tokio `RwLock`. PR 2 (MR-686) wraps
    /// this so engine write APIs can be `&self` (the HTTP server's
    /// `AppState` holds `Arc<Omnigraph>` and dispatches concurrent
    /// calls without a global write lock). Reads (`snapshot`, `version`,
    /// `current_branch`, `branch_list`, `resolve_*`, `head_commit_id`,
    /// `list_commits`, …) acquire `.read().await` and parallelize.
    /// Writes (`refresh`, `branch_create`, `branch_delete`, `commit_*`,
    /// `record_*`) acquire `.write().await` and serialize. The atomic
    /// commit invariant — `commit_manifest_updates` followed by
    /// `record_graph_commit` must be atomic — is preserved by the
    /// single `.write()` covering both calls inside
    /// `commit_updates_with_actor_with_expected`. PR 2 Phase 2
    /// converted from `Mutex` to `RwLock` because the bench showed
    /// the Mutex was the dominant serializer for disjoint-table
    /// workloads. Lock acquisition order: always before `runtime_cache`
    /// (when both are needed in one scope).
    coordinator: Arc<tokio::sync::RwLock<GraphCoordinator>>,
    /// Table-level store rooted at `root_uri`. Also served through the
    /// `TableStorage` trait by the `storage()` accessor.
    table_store: TableStore,
    /// Handle-local runtime cache; invalidated wholesale whenever the
    /// coordinator is re-pointed (see `sync_branch`).
    runtime_cache: RuntimeCache,
    /// Read-heavy on every query, written only by `apply_schema`. ArcSwap
    /// gives atomic pointer swap with zero-cost reads (`load()` returns a
    /// `Guard<Arc<Catalog>>`), so concurrent queries on different actors
    /// don't contend on a lock to read the catalog.
    catalog: Arc<ArcSwap<Catalog>>,
    /// Read-heavy on schema introspection paths, written only by
    /// `apply_schema`. Same ArcSwap rationale as `catalog`.
    schema_source: Arc<ArcSwap<String>>,
    /// Per-`(table_key, branch)` writer queues. Reachable from engine
    /// internals (mutation finalize, schema_apply, branch_merge,
    /// ensure_indices, delete_where) and from future MR-870 recovery
    /// reconciler. PR 1b adds the field; callers acquire in commits 4+.
    write_queue: Arc<crate::db::write_queue::WriteQueueManager>,
    /// Process-wide mutex held across the swap → operate → restore window
    /// in `branch_merge_impl`. Two concurrent merges with distinct targets
    /// would otherwise interleave their three separate
    /// `coordinator.write().await` acquisitions, leaving each merge's
    /// inner body running against the other's swapped coord. Pinned by
    /// `concurrent_branch_merges_distinct_targets_do_not_swap_into_each_other`
    /// in `crates/omnigraph-server/tests/server.rs`.
    ///
    /// Cost: serializes ALL concurrent branch merges process-wide.
    /// Acceptable because branch merges are heavy (table rewrites, index
    /// rebuilds), per-(table, branch) queues inside `commit_all` already
    /// serialize the data path, and merges are rare relative to /change
    /// or /ingest. A finer-grained per-target-branch mutex is a follow-up
    /// if telemetry shows merge concurrency matters.
    ///
    /// The deeper fix — refactor `branch_merge_on_current_target` to take
    /// an explicit target coord parameter so `self.coordinator` is never
    /// used as scratch space — is the round-1 shape applied to
    /// `branch_create_from_impl`. Deferred because it requires unwinding
    /// every `self.snapshot()` and `self.ensure_commit_graph_initialized()`
    /// call inside the merge body.
    merge_exclusive: Arc<tokio::sync::Mutex<()>>,
    /// Optional policy checker for engine-layer enforcement (MR-722).
    /// `None` = no enforcement; mutating methods are unconditionally
    /// allowed (this is the embedded/dev default). `Some` = every
    /// mutating method calls `self.enforce(action, scope, actor)` at
    /// entry; denial returns `OmniError::Policy`.
    ///
    /// Per chassis design (see `omnigraph_policy::PolicyChecker`), the
    /// trait surface is deliberately coarse — action × scope × actor.
    /// Per-row / per-type / per-column scope lives at the query layer
    /// (MR-725), which extends the same trait with a different method.
    /// Don't be tempted to add per-row enforcement here.
    ///
    /// Set via `with_policy(checker)` after construction. Today only
    /// `apply_schema_as` consults this field (PR #2 proof-of-concept);
    /// PR #3 fans the `enforce()` call out to the remaining writers.
    policy: Option<Arc<dyn omnigraph_policy::PolicyChecker>>,
}
149
/// Whether [`Omnigraph::open`] runs the open-time recovery sweep.
///
/// Recovery requires Lance writes (`Dataset::restore`, `ManifestBatchPublisher::publish`).
/// Read-only consumers — NDJSON export, `commit list`, `read`, schema
/// inspection — should not trigger writes (they may run with read-only
/// object-store credentials, and silent open-time mutations are
/// surprising). They also don't need recovery: reads always resolve
/// through the manifest pin, which is the consistent snapshot regardless
/// of any Phase B → Phase C drift on the per-table side.
///
/// In `open_with_storage_and_mode` this mode gates both the schema-state
/// recovery sweep and the manifest-drift recovery sweep.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OpenMode {
    /// Run the recovery sweep on open. Default for `Omnigraph::open`.
    ReadWrite,
    /// Skip the recovery sweep. Use for read-only consumers via
    /// [`Omnigraph::open_read_only`].
    ReadOnly,
}
167
/// Options for [`Omnigraph::init_with_options`].
///
/// `force` controls the safety preflight that prevents an
/// accidental re-init from overwriting an existing graph's schema
/// metadata. Default behavior (`force: false`) fails fast with
/// [`OmniError::AlreadyInitialized`] if any of `_schema.pg`,
/// `_schema.ir.json`, or `__schema_state.json` already exists at
/// the target URI. With `force: true` the preflight is skipped —
/// existing schema files are overwritten in place. Force does NOT
/// purge old Lance datasets or `__manifest/`; reclaiming those
/// still requires deleting the graph directory by hand (or via a
/// future `DELETE /graphs/{id}`).
///
/// `force: true` also skips the atomic create-if-absent claim on
/// `_schema.pg` that strict mode performs before writing the remaining
/// init artifacts.
#[derive(Debug, Clone, Copy, Default)]
pub struct InitOptions {
    /// Skip the existing-graph preflight. Operators set this when
    /// they actually mean to overwrite — e.g. `omnigraph init --force`.
    /// Defaults to `false` (strict mode) via `Default`.
    pub force: bool,
}
186
187impl Omnigraph {
188    /// Create a new graph at `uri` from schema source.
189    ///
190    /// Strict mode: errors with [`OmniError::AlreadyInitialized`] if
191    /// `uri` already holds any of the three schema artifacts. To
192    /// overwrite an existing graph deliberately, call
193    /// [`Self::init_with_options`] with `InitOptions { force: true }`.
194    pub async fn init(uri: &str, schema_source: &str) -> Result<Self> {
195        Self::init_with_options(uri, schema_source, InitOptions::default()).await
196    }
197
198    /// Create a new graph at `uri`, with explicit init-time options.
199    ///
200    /// See [`InitOptions`] for the safety contract — by default this
201    /// behaves identically to [`Self::init`].
202    pub async fn init_with_options(
203        uri: &str,
204        schema_source: &str,
205        options: InitOptions,
206    ) -> Result<Self> {
207        Self::init_with_storage(uri, schema_source, storage_for_uri(uri)?, options).await
208    }
209
    /// Initialize a new graph at `uri` against an explicit storage adapter.
    ///
    /// Sequence, in order:
    /// 1. Normalize `uri` into the graph root.
    /// 2. Unless `options.force`, preflight: fail with
    ///    `OmniError::AlreadyInitialized` if any schema artifact exists.
    /// 3. Parse `schema_source` into IR and build the catalog (with blob
    ///    schema fixups).
    /// 4. Unless `options.force`, atomically claim `_schema.pg` via
    ///    `write_text_if_absent`; losing the race also yields
    ///    `AlreadyInitialized`.
    /// 5. Run `init_storage_phase`; on failure, best-effort-clean schema
    ///    artifacts only if this invocation owns them (claimed or forced).
    /// 6. Assemble the handle with fresh caches, queues, and no policy.
    pub(crate) async fn init_with_storage(
        uri: &str,
        schema_source: &str,
        storage: Arc<dyn StorageAdapter>,
        options: InitOptions,
    ) -> Result<Self> {
        let root = normalize_root_uri(uri)?;

        // Preflight: refuse to clobber an existing graph unless the
        // operator passed `force`. This runs BEFORE any parse or
        // write so a misdirected `init` against an existing graph
        // URI cannot reach a code path that overwrites or, on a
        // later cleanup, deletes the schema files.
        //
        // Closes the "init is destructive against existing state"
        // class: there is no longer a code path where strict-mode
        // `init` can mutate a populated graph root.
        if !options.force {
            for candidate in [
                schema_source_uri(&root),
                schema_ir_uri(&root),
                schema_state_uri(&root),
            ] {
                if storage.exists(&candidate).await? {
                    return Err(OmniError::AlreadyInitialized { uri: root.clone() });
                }
            }
        }

        // Parse/validate before any write so a bad schema leaves storage
        // untouched.
        let schema_ir = read_schema_ir_from_source(schema_source)?;
        let mut catalog = build_catalog_from_ir(&schema_ir)?;
        fixup_blob_schemas(&mut catalog);

        // Establish an atomic ownership claim on `_schema.pg` before
        // writing the remaining init artifacts. A check-then-write preflight
        // is not enough under concurrent `init` calls: two callers can both
        // observe an empty root, one can successfully initialize, and the
        // loser can then fail in Lance `WriteMode::Create`. Only the caller
        // that atomically created `_schema.pg` may clean up schema artifacts
        // on later failure.
        let schema_pg_claimed = if options.force {
            false
        } else {
            let schema_path = join_uri(&root, SCHEMA_SOURCE_FILENAME);
            if !storage
                .write_text_if_absent(&schema_path, schema_source)
                .await?
            {
                return Err(OmniError::AlreadyInitialized { uri: root.clone() });
            }
            // Test-only failure injection point: we already own `_schema.pg`
            // here, so clean up before surfacing the injected error.
            if let Err(err) = crate::failpoints::maybe_fail("init.after_schema_pg_written") {
                best_effort_cleanup_init_artifacts(&root, storage.as_ref()).await;
                return Err(err);
            }
            true
        };

        // Run the I/O phase. On any error, best-effort-clean schema
        // artifacts only when this invocation owns them: strict mode owns
        // them after the atomic `_schema.pg` claim above; force mode owns
        // destructive overwrite semantics by explicit operator request.
        //
        // Coverage gap: Lance per-type datasets and `__manifest/`
        // directory created by `GraphCoordinator::init` are NOT cleaned
        // up here — fully recursive directory deletion requires a
        // `StorageAdapter::delete_prefix` primitive that's deferred
        // along with `DELETE /graphs/{id}` (PR 2b in the MR-668 plan
        // is currently deferred). If `init` fails after coordinator
        // init succeeds, operators may need to remove the graph
        // directory manually before retrying `init` on the same URI.
        // Documented in the PR 2a commit message and `init` rustdoc.
        //
        // NOTE(review): the final argument is `true` only when `_schema.pg`
        // was NOT already written by the claim above; presumably it tells
        // the storage phase to write the schema source itself — confirm
        // against `init_storage_phase`.
        let coordinator = match init_storage_phase(
            &root,
            schema_source,
            &schema_ir,
            &catalog,
            &storage,
            !schema_pg_claimed,
        )
        .await
        {
            Ok(coordinator) => coordinator,
            Err(err) => {
                if schema_pg_claimed || options.force {
                    best_effort_cleanup_init_artifacts(&root, storage.as_ref()).await;
                }
                return Err(err);
            }
        };

        Ok(Self {
            root_uri: root.clone(),
            storage,
            coordinator: Arc::new(tokio::sync::RwLock::new(coordinator)),
            table_store: TableStore::new(&root),
            runtime_cache: RuntimeCache::default(),
            catalog: Arc::new(ArcSwap::from_pointee(catalog)),
            schema_source: Arc::new(ArcSwap::from_pointee(schema_source.to_string())),
            write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()),
            merge_exclusive: Arc::new(tokio::sync::Mutex::new(())),
            policy: None,
        })
    }
313
314    /// Open an existing graph (read-write).
315    ///
316    /// Reads `_schema.pg`, parses it, builds the catalog, and opens `__manifest`.
317    /// Runs the open-time recovery sweep before returning — see [`OpenMode`].
318    pub async fn open(uri: &str) -> Result<Self> {
319        Self::open_with_storage_and_mode(uri, storage_for_uri(uri)?, OpenMode::ReadWrite).await
320    }
321
322    /// Open an existing graph for read-only consumers (NDJSON export,
323    /// `commit list`, etc.). Skips the recovery sweep — see [`OpenMode`].
324    pub async fn open_read_only(uri: &str) -> Result<Self> {
325        Self::open_with_storage_and_mode(uri, storage_for_uri(uri)?, OpenMode::ReadOnly).await
326    }
327
328    /// `open_with_storage` retained for existing callers (init/test paths).
329    /// Defaults to `OpenMode::ReadWrite`.
330    pub(crate) async fn open_with_storage(
331        uri: &str,
332        storage: Arc<dyn StorageAdapter>,
333    ) -> Result<Self> {
334        Self::open_with_storage_and_mode(uri, storage, OpenMode::ReadWrite).await
335    }
336
    /// Open an existing graph against an explicit storage adapter and
    /// [`OpenMode`].
    ///
    /// Sequence, in order:
    /// 1. Normalize `uri` and open the `GraphCoordinator`.
    /// 2. In `OpenMode::ReadWrite` only: run schema-state recovery, then
    ///    manifest-drift recovery in `RecoveryMode::Full`.
    /// 3. Read and parse the schema source (after recovery, since recovery
    ///    may have renamed a staged file into place).
    /// 4. Load or bootstrap the accepted schema contract and build the
    ///    catalog from the *accepted* IR (not the freshly parsed source IR).
    /// 5. Assemble the handle with fresh caches, queues, and no policy.
    pub(crate) async fn open_with_storage_and_mode(
        uri: &str,
        storage: Arc<dyn StorageAdapter>,
        mode: OpenMode,
    ) -> Result<Self> {
        let root = normalize_root_uri(uri)?;
        // Open the coordinator first so the schema-staging recovery sweep can
        // compare its snapshot against any leftover staging files.
        let mut coordinator = GraphCoordinator::open(&root, Arc::clone(&storage)).await?;
        // Both the schema-state recovery sweep AND the manifest-drift
        // recovery sweep are gated on `OpenMode::ReadWrite`. Read-only
        // consumers (NDJSON export, `commit list`, schema show) shouldn't
        // trigger object-store mutations: they may run with read-only
        // credentials, and silent open-time writes are surprising. Both
        // sweeps' work is recoverable on the next ReadWrite open, so
        // skipping under ReadOnly doesn't lose any safety guarantees —
        // the manifest pin is the consistent snapshot regardless of
        // drift on the per-table side or leftover schema-staging files.
        if matches!(mode, OpenMode::ReadWrite) {
            // Schema-state recovery runs first; its result is handed to the
            // manifest-drift sweep below.
            let schema_state_recovery =
                recover_schema_state_files(&root, Arc::clone(&storage), &coordinator.snapshot())
                    .await?;
            // Recovery sweep: close the Phase B → Phase C residual on
            // any sidecar left over from a crashed writer. Continuous
            // in-process recovery for long-running servers (no restart
            // required between Phase B failure and recovery) is a
            // separate background-reconciler effort.
            crate::db::manifest::recover_manifest_drift(
                &root,
                Arc::clone(&storage),
                &mut coordinator,
                crate::db::manifest::RecoveryMode::Full,
                schema_state_recovery,
            )
            .await?;
        }
        // Read _schema.pg (post-recovery — may have just been renamed in).
        let schema_path = schema_source_uri(&root);
        let schema_source = storage.read_text(&schema_path).await?;
        let current_source_ir = read_schema_ir_from_source(&schema_source)?;
        let branches = coordinator.branch_list().await?;
        // The second tuple element of the contract load is intentionally
        // unused here.
        let (accepted_ir, _) = load_or_bootstrap_schema_contract(
            &root,
            Arc::clone(&storage),
            &branches,
            &current_source_ir,
        )
        .await?;
        let mut catalog = build_catalog_from_ir(&accepted_ir)?;
        fixup_blob_schemas(&mut catalog);

        Ok(Self {
            root_uri: root.clone(),
            storage,
            coordinator: Arc::new(tokio::sync::RwLock::new(coordinator)),
            table_store: TableStore::new(&root),
            runtime_cache: RuntimeCache::default(),
            catalog: Arc::new(ArcSwap::from_pointee(catalog)),
            schema_source: Arc::new(ArcSwap::from_pointee(schema_source)),
            write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()),
            merge_exclusive: Arc::new(tokio::sync::Mutex::new(())),
            policy: None,
        })
    }
401
    /// Returns an `Arc<Catalog>` snapshot. Cheap clone of the current
    /// catalog pointer; callers can hold the returned `Arc` across awaits
    /// without blocking concurrent `apply_schema`.
    ///
    /// The snapshot is not updated by a later `store_catalog`; call this
    /// again to observe a newer catalog.
    pub fn catalog(&self) -> Arc<Catalog> {
        self.catalog.load_full()
    }
408
    /// Returns an `Arc<String>` snapshot of the schema source.
    ///
    /// Like [`catalog`](Self::catalog), the returned `Arc` is a
    /// point-in-time pointer: a later `store_schema_source` does not change
    /// what an already-returned snapshot contains.
    pub fn schema_source(&self) -> Arc<String> {
        self.schema_source.load_full()
    }
413
414    /// Atomically swap the in-memory catalog. Concurrent readers see
415    /// either the old or the new pointer; never a torn state. Used by
416    /// `apply_schema` and `reload_schema_if_source_changed`.
417    pub(crate) fn store_catalog(&self, catalog: Catalog) {
418        self.catalog.store(Arc::new(catalog));
419    }
420
421    /// Atomically swap the in-memory schema source. Same rationale as
422    /// [`store_catalog`](Self::store_catalog).
423    pub(crate) fn store_schema_source(&self, schema_source: String) {
424        self.schema_source.store(Arc::new(schema_source));
425    }
426
427    pub fn uri(&self) -> &str {
428        &self.root_uri
429    }
430
    /// Install a policy checker for engine-layer enforcement (MR-722).
    /// Builder-style setter — consumes `self`, returns `Self`. Calling
    /// this on an `Omnigraph` that previously had no policy enables
    /// `enforce()` to fire at every mutating engine method that's been
    /// wired to call it (currently `apply_schema_as`; PR #3 fans out to
    /// the remaining writers). Calling it again replaces the previously
    /// installed checker.
    ///
    /// Embedded callers that don't care about authorization should
    /// just not call this. Server / CLI callers that have loaded a
    /// `PolicyEngine` from `policy.yaml` pass it here.
    pub fn with_policy(mut self, checker: Arc<dyn omnigraph_policy::PolicyChecker>) -> Self {
        self.policy = Some(checker);
        self
    }
445
446    /// Engine-layer policy enforcement gate (MR-722 chassis core).
447    ///
448    /// * If no policy is installed → no-op (returns `Ok(())`).
449    /// * If policy is installed AND actor is None → denial with a
450    ///   clear "no actor for engine-layer policy check" message.
451    ///   Forces server / CLI / SDK callers to thread an actor through
452    ///   when policy is configured — silent bypass via "I forgot the
453    ///   actor" is exactly the footgun this gate is here to prevent.
454    /// * If policy is installed AND actor is Some → call
455    ///   `PolicyChecker::check(action, scope, actor)`; map denial /
456    ///   internal failure to `OmniError::Policy(...)`.
457    pub(crate) fn enforce(
458        &self,
459        action: omnigraph_policy::PolicyAction,
460        scope: &omnigraph_policy::ResourceScope,
461        actor: Option<&str>,
462    ) -> Result<()> {
463        let Some(checker) = self.policy.as_ref() else {
464            return Ok(());
465        };
466        let Some(actor) = actor else {
467            return Err(OmniError::Policy(
468                "no actor for engine-layer policy check (policy is configured but the call site \
469                 didn't thread an actor through — this is almost certainly a bug, not an \
470                 intended bypass)"
471                    .to_string(),
472            ));
473        };
474        checker
475            .check(action, scope, actor)
476            .map_err(|err| OmniError::Policy(err.to_string()))
477    }
478
479    pub(crate) async fn ensure_schema_state_valid(&self) -> Result<()> {
480        validate_schema_contract(self.uri(), Arc::clone(&self.storage)).await
481    }
482
483    pub async fn plan_schema(&self, desired_schema_source: &str) -> Result<SchemaMigrationPlan> {
484        self.plan_schema_with_options(desired_schema_source, SchemaApplyOptions::default())
485            .await
486    }
487
    /// Plan a migration to `desired_schema_source` with explicit options.
    ///
    /// Thin public entry point: the work is done by
    /// `schema_apply::plan_schema`, whose result is returned unchanged.
    pub async fn plan_schema_with_options(
        &self,
        desired_schema_source: &str,
        options: SchemaApplyOptions,
    ) -> Result<SchemaMigrationPlan> {
        schema_apply::plan_schema(self, desired_schema_source, options).await
    }
495
496    pub async fn apply_schema(&self, desired_schema_source: &str) -> Result<SchemaApplyResult> {
497        self.apply_schema_as(desired_schema_source, SchemaApplyOptions::default(), None)
498            .await
499    }
500
501    pub async fn apply_schema_with_options(
502        &self,
503        desired_schema_source: &str,
504        options: SchemaApplyOptions,
505    ) -> Result<SchemaApplyResult> {
506        self.apply_schema_as(desired_schema_source, options, None)
507            .await
508    }
509
    /// Apply a schema migration with an explicit actor for engine-layer
    /// policy enforcement (MR-722). When a `PolicyChecker` is installed
    /// via [`Self::with_policy`], this method calls `enforce(SchemaApply,
    /// Branch("main"), actor)` before any apply work happens. Denial
    /// returns `OmniError::Policy` and leaves the manifest untouched.
    ///
    /// The no-actor variants (`apply_schema`, `apply_schema_with_options`)
    /// pass `None` here. They work fine without a policy; if a policy IS
    /// installed and actor is None, enforcement intentionally fails to
    /// prevent silent-bypass-via-forgetting-the-actor footguns.
    ///
    /// This method itself only forwards to `schema_apply::apply_schema`;
    /// the enforcement call described above lives in that module.
    pub async fn apply_schema_as(
        &self,
        desired_schema_source: &str,
        options: SchemaApplyOptions,
        actor: Option<&str>,
    ) -> Result<SchemaApplyResult> {
        schema_apply::apply_schema(self, desired_schema_source, options, actor).await
    }
528
    /// Forwards to `schema_apply::ensure_schema_apply_idle` for this handle.
    ///
    /// NOTE(review): presumably errors when a schema apply is in progress,
    /// with `operation` naming the caller for the error message — confirm
    /// against the `schema_apply` module.
    pub(crate) async fn ensure_schema_apply_idle(&self, operation: &str) -> Result<()> {
        schema_apply::ensure_schema_apply_idle(self, operation).await
    }
532
    /// Forwards to `schema_apply::ensure_schema_apply_not_locked` for this
    /// handle.
    ///
    /// NOTE(review): presumably errors when the schema-apply lock is held,
    /// with `operation` naming the caller for the error message — confirm
    /// against the `schema_apply` module.
    async fn ensure_schema_apply_not_locked(&self, operation: &str) -> Result<()> {
        schema_apply::ensure_schema_apply_not_locked(self, operation).await
    }
536
    /// Borrow the concrete `TableStore` owned by this handle.
    ///
    /// Returns the struct itself, giving access to its inherent methods.
    pub(crate) fn table_store(&self) -> &TableStore {
        &self.table_store
    }
540
    /// Engine-facing trait surface around `TableStore`.
    ///
    /// This is the canonical accessor for newly-written engine code. The
    /// trait's signatures use opaque `SnapshotHandle` / `StagedHandle`
    /// instead of leaking `lance::Dataset` /
    /// `lance::dataset::transaction::Transaction`. Existing call sites
    /// that still use `db.table_store.X(...)` (the inherent struct
    /// methods) are migrated incrementally.
    ///
    /// Naming caveat: despite the name, this returns a view of the
    /// `table_store` field, NOT the `storage` field (the object-store
    /// adapter). Use `storage_adapter()` for the latter.
    pub(crate) fn storage(&self) -> &dyn crate::storage_layer::TableStorage {
        &self.table_store
    }
552
553    /// Engine-level access to the object-store adapter (S3 / local fs).
554    /// Used by the recovery sidecar protocol — writers in the engine
555    /// call this to write/delete sidecars at `__recovery/{ulid}.json`.
556    pub(crate) fn storage_adapter(&self) -> &dyn crate::storage::StorageAdapter {
557        self.storage.as_ref()
558    }
559
560    /// Per-`(table_key, branch)` writer queues.
561    ///
562    /// Engine-internal writers (mutation finalize, schema_apply,
563    /// branch_merge, ensure_indices, delete_where) and the future MR-870
564    /// recovery reconciler reach the queue manager via this accessor.
565    /// Returns an `Arc` clone so callers can hold the manager across
566    /// `&mut self` engine API boundaries.
567    pub(crate) fn write_queue(&self) -> Arc<crate::db::write_queue::WriteQueueManager> {
568        Arc::clone(&self.write_queue)
569    }
570
571    /// Engine-internal access to the merge-exclusive mutex. Held across
572    /// the swap → operate → restore window in `branch_merge_impl` so
573    /// concurrent merges with distinct targets don't corrupt
574    /// `self.coordinator` mid-operation. See the field doc on
575    /// `Omnigraph::merge_exclusive` for the full design rationale.
576    pub(crate) fn merge_exclusive(&self) -> Arc<tokio::sync::Mutex<()>> {
577        Arc::clone(&self.merge_exclusive)
578    }
579
580    /// Engine-level access to the graph's normalized root URI. Used by
581    /// the recovery sidecar protocol to compute `__recovery/` paths.
582    pub(crate) fn root_uri(&self) -> &str {
583        &self.root_uri
584    }
585
586    pub(crate) async fn open_coordinator_for_branch(
587        &self,
588        branch: Option<&str>,
589    ) -> Result<GraphCoordinator> {
590        match branch {
591            Some(branch) => {
592                GraphCoordinator::open_branch(self.uri(), branch, Arc::clone(&self.storage)).await
593            }
594            None => GraphCoordinator::open(self.uri(), Arc::clone(&self.storage)).await,
595        }
596    }
597
598    pub(crate) async fn swap_coordinator_for_branch(
599        &self,
600        branch: Option<&str>,
601    ) -> Result<GraphCoordinator> {
602        let next = self.open_coordinator_for_branch(branch).await?;
603        let mut coord = self.coordinator.write().await;
604        Ok(std::mem::replace(&mut *coord, next))
605    }
606
607    pub(crate) async fn restore_coordinator(&self, coordinator: GraphCoordinator) {
608        *self.coordinator.write().await = coordinator;
609    }
610
611    pub(crate) async fn resolved_branch_target(
612        &self,
613        branch: Option<&str>,
614    ) -> Result<ResolvedTarget> {
615        self.ensure_schema_state_valid().await?;
616        let requested = ReadTarget::Branch(branch.unwrap_or("main").to_string());
617        let normalized = normalize_branch_name(branch.unwrap_or("main"))?;
618        let coord = self.coordinator.read().await;
619        if normalized.as_deref() == coord.current_branch() {
620            let snapshot_id = coord
621                .head_commit_id()
622                .await?
623                .unwrap_or_else(|| SnapshotId::synthetic(coord.current_branch(), coord.version()));
624            return Ok(ResolvedTarget {
625                requested,
626                branch: coord.current_branch().map(str::to_string),
627                snapshot_id,
628                snapshot: coord.snapshot(),
629            });
630        }
631        coord.resolve_target(&requested).await
632    }
633
634    pub(crate) async fn snapshot_for_branch(&self, branch: Option<&str>) -> Result<Snapshot> {
635        self.resolved_branch_target(branch)
636            .await
637            .map(|resolved| resolved.snapshot)
638    }
639
640    pub(crate) async fn version(&self) -> u64 {
641        self.coordinator.read().await.version()
642    }
643
644    /// Return an immutable Snapshot from the known manifest state. No storage I/O.
645    pub(crate) async fn snapshot(&self) -> Snapshot {
646        self.coordinator.read().await.snapshot()
647    }
648
649    pub async fn snapshot_of(&self, target: impl Into<ReadTarget>) -> Result<Snapshot> {
650        self.resolved_target(target)
651            .await
652            .map(|resolved| resolved.snapshot)
653    }
654
655    pub async fn version_of(&self, target: impl Into<ReadTarget>) -> Result<u64> {
656        self.snapshot_of(target)
657            .await
658            .map(|snapshot| snapshot.version())
659    }
660
661    pub async fn resolved_branch_of(
662        &self,
663        target: impl Into<ReadTarget>,
664    ) -> Result<Option<String>> {
665        self.resolved_target(target)
666            .await
667            .map(|resolved| resolved.branch)
668    }
669
670    /// Synchronize this handle's write base to the latest head of the named branch.
671    pub async fn sync_branch(&self, branch: &str) -> Result<()> {
672        self.ensure_schema_state_valid().await?;
673        let branch = normalize_branch_name(branch)?;
674        let next = self.open_coordinator_for_branch(branch.as_deref()).await?;
675        *self.coordinator.write().await = next;
676        self.runtime_cache.invalidate_all().await;
677        Ok(())
678    }
679
680    /// Re-read the handle-local coordinator state from storage AND run
681    /// in-process recovery. Closes the Phase B → Phase C residual (e.g.
682    /// `MutationStaging::finalize` crash mid-publish in a long-running
683    /// server) without restart.
684    ///
685    /// Composition mirrors `Omnigraph::open_with_storage_and_mode`'s
686    /// recovery sequence, in the same order, with one restriction: the
687    /// manifest-drift sweep runs in `RollForwardOnly` mode (rollback /
688    /// abort cases defer to the next ReadWrite open because
689    /// `Dataset::restore` is unsafe under concurrency). Each step:
690    ///
691    /// 1. `coordinator.refresh()` — re-read manifest.
692    /// 2. `recover_schema_state_files` — complete an in-flight
693    ///    schema_apply's staging→final rename if a SchemaApply sidecar
694    ///    is on disk; idempotent + early-returns when no staging files
695    ///    exist. Required BEFORE manifest-drift recovery so a
696    ///    SchemaApply roll-forward doesn't publish the manifest while
697    ///    the staging files remain unrenamed (which would corrupt the
698    ///    graph: data on new schema, catalog on old).
699    /// 3. `recover_manifest_drift(... RollForwardOnly)` — close the
700    ///    finalize→publisher residual via roll-forward; defer rollback
701    ///    work to next ReadWrite open.
702    /// 4. `runtime_cache.invalidate_all` — drop stale per-snapshot caches.
703    ///
704    /// Steady state cost: one `list_dir` of `__recovery/` (typically
705    /// returns empty → early return for both passes). No additional
706    /// Lance reads.
707    ///
708    /// Engine-internal callers that already hold an in-flight sidecar
709    /// (e.g. `schema_apply` mid-write) MUST use
710    /// [`refresh_coordinator_only`](Self::refresh_coordinator_only) to
711    /// avoid the recovery sweep racing their own sidecar.
712    pub async fn refresh(&self) -> Result<()> {
713        // Scope the coord write guard to the recovery section only.
714        // `reload_schema_if_source_changed` (below) acquires
715        // `self.coordinator.read().await` when the on-disk schema source
716        // has drifted from the cached `schema_source`. Tokio's RwLock is
717        // not reentrant, so holding the write across that call deadlocks.
718        // Pinned by `composite_flow_schema_apply_then_branch_ops_no_deadlock_in_refresh`.
719        {
720            let mut coord = self.coordinator.write().await;
721            coord.refresh().await?;
722            let schema_state_recovery = recover_schema_state_files(
723                &self.root_uri,
724                Arc::clone(&self.storage),
725                &coord.snapshot(),
726            )
727            .await?;
728            crate::db::manifest::recover_manifest_drift(
729                &self.root_uri,
730                Arc::clone(&self.storage),
731                &mut *coord,
732                crate::db::manifest::RecoveryMode::RollForwardOnly,
733                schema_state_recovery,
734            )
735            .await?;
736        } // ← write guard released before reload's read acquisition
737        self.reload_schema_if_source_changed().await?;
738        self.runtime_cache.invalidate_all().await;
739        Ok(())
740    }
741
742    async fn reload_schema_if_source_changed(&self) -> Result<()> {
743        let schema_path = schema_source_uri(&self.root_uri);
744        let schema_source = self.storage.read_text(&schema_path).await?;
745        if schema_source == *self.schema_source.load_full() {
746            return Ok(());
747        }
748        let current_source_ir = read_schema_ir_from_source(&schema_source)?;
749        let branches = self.coordinator.read().await.branch_list().await?;
750        let (accepted_ir, _) = load_or_bootstrap_schema_contract(
751            &self.root_uri,
752            Arc::clone(&self.storage),
753            &branches,
754            &current_source_ir,
755        )
756        .await?;
757        let mut catalog = build_catalog_from_ir(&accepted_ir)?;
758        fixup_blob_schemas(&mut catalog);
759        self.store_schema_source(schema_source);
760        self.store_catalog(catalog);
761        Ok(())
762    }
763
764    /// Refresh coordinator state and invalidate the runtime cache WITHOUT
765    /// running the recovery sweep. Engine-internal callers that hold an
766    /// in-flight sidecar (e.g. `schema_apply::apply_schema_with_lock`'s
767    /// internal lease-check refresh) need this variant: running recovery
768    /// here would observe the caller's own sidecar, classify it as
769    /// RolledPastExpected, and roll it forward — racing the caller's
770    /// own publish path.
771    pub(crate) async fn refresh_coordinator_only(&self) -> Result<()> {
772        self.coordinator.write().await.refresh().await?;
773        self.runtime_cache.invalidate_all().await;
774        Ok(())
775    }
776
777    pub async fn resolve_snapshot(&self, branch: &str) -> Result<SnapshotId> {
778        self.ensure_schema_state_valid().await?;
779        self.coordinator
780            .read()
781            .await
782            .resolve_snapshot_id(branch)
783            .await
784    }
785
786    pub(crate) async fn resolved_target(
787        &self,
788        target: impl Into<ReadTarget>,
789    ) -> Result<ResolvedTarget> {
790        self.ensure_schema_state_valid().await?;
791        self.coordinator
792            .read()
793            .await
794            .resolve_target(&target.into())
795            .await
796    }
797
798    // ─── Change detection ────────────────────────────────────────────────
799
800    pub async fn diff_between(
801        &self,
802        from: impl Into<ReadTarget>,
803        to: impl Into<ReadTarget>,
804        filter: &crate::changes::ChangeFilter,
805    ) -> Result<crate::changes::ChangeSet> {
806        let from_resolved = self.resolved_target(from).await?;
807        let to_resolved = self.resolved_target(to).await?;
808        crate::changes::diff_snapshots(
809            self.uri(),
810            &from_resolved.snapshot,
811            &to_resolved.snapshot,
812            filter,
813            to_resolved.branch.clone().or(from_resolved.branch.clone()),
814        )
815        .await
816    }
817
818    /// Diff two graph commits. Resolves each commit to `(manifest_branch, manifest_version)`
819    /// and creates branch-aware snapshots. Supports cross-branch comparison.
820    pub async fn diff_commits(
821        &self,
822        from_commit_id: &str,
823        to_commit_id: &str,
824        filter: &crate::changes::ChangeFilter,
825    ) -> Result<crate::changes::ChangeSet> {
826        let coord = self.coordinator.read().await;
827        let from_commit = coord
828            .resolve_commit(&SnapshotId::new(from_commit_id))
829            .await?;
830        let to_commit = coord.resolve_commit(&SnapshotId::new(to_commit_id)).await?;
831        let from_snap = coord
832            .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
833                from_commit.graph_commit_id.clone(),
834            )))
835            .await?;
836        let to_snap = coord
837            .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
838                to_commit.graph_commit_id.clone(),
839            )))
840            .await?;
841        drop(coord);
842        crate::changes::diff_snapshots(
843            self.uri(),
844            &from_snap.snapshot,
845            &to_snap.snapshot,
846            filter,
847            to_snap.branch.clone().or(from_snap.branch.clone()),
848        )
849        .await
850    }
851
852    pub async fn entity_at_target(
853        &self,
854        target: impl Into<ReadTarget>,
855        table_key: &str,
856        id: &str,
857    ) -> Result<Option<serde_json::Value>> {
858        export::entity_at_target(self, target, table_key, id).await
859    }
860
861    /// Read one entity at a specific manifest version via time travel (on-demand enrichment).
862    pub async fn entity_at(
863        &self,
864        table_key: &str,
865        id: &str,
866        version: u64,
867    ) -> Result<Option<serde_json::Value>> {
868        export::entity_at(self, table_key, id, version).await
869    }
870
871    /// Create a Snapshot at any historical manifest version.
872    pub async fn snapshot_at_version(&self, version: u64) -> Result<Snapshot> {
873        self.ensure_schema_state_valid().await?;
874        self.coordinator
875            .read()
876            .await
877            .snapshot_at_version(version)
878            .await
879    }
880
881    pub async fn export_jsonl(
882        &self,
883        branch: &str,
884        type_names: &[String],
885        table_keys: &[String],
886    ) -> Result<String> {
887        export::export_jsonl(self, branch, type_names, table_keys).await
888    }
889
890    pub async fn export_jsonl_to_writer<W: Write>(
891        &self,
892        branch: &str,
893        type_names: &[String],
894        table_keys: &[String],
895        writer: &mut W,
896    ) -> Result<()> {
897        export::export_jsonl_to_writer(self, branch, type_names, table_keys, writer).await
898    }
899
900    // ─── Graph index ──────────────────────────────────────────────────────
901
902    /// Get or build the graph index for the current snapshot.
903    pub async fn graph_index(&self) -> Result<Arc<crate::graph_index::GraphIndex>> {
904        table_ops::graph_index(self).await
905    }
906
907    pub(crate) async fn graph_index_for_resolved(
908        &self,
909        resolved: &ResolvedTarget,
910    ) -> Result<Arc<crate::graph_index::GraphIndex>> {
911        table_ops::graph_index_for_resolved(self, resolved).await
912    }
913
914    /// Ensure BTree scalar indices exist on key columns.
915    /// Idempotent — Lance skips if index already exists.
916    ///
917    /// Opens sub-tables at their latest version (not snapshot-pinned) because
918    /// indices must be created on the current head. Any version drift from the
919    /// snapshot is expected and logged. The resulting versions are committed
920    /// back to the manifest.
921    ///
922    /// On named branches, indexing preserves lazy branching:
923    /// unbranched subtables keep inheriting `main`, while subtables inherited
924    /// from an ancestor branch are first forked into the active branch before
925    /// their index metadata is updated.
926    pub async fn ensure_indices(&self) -> Result<()> {
927        table_ops::ensure_indices(self).await
928    }
929
930    pub async fn ensure_indices_on(&self, branch: &str) -> Result<()> {
931        table_ops::ensure_indices_on(self, branch).await
932    }
933
934    #[cfg(feature = "failpoints")]
935    #[doc(hidden)]
936    pub async fn failpoint_publish_table_head_without_index_rebuild_for_test(
937        &mut self,
938        branch: &str,
939        table_key: &str,
940        table_branch: Option<&str>,
941    ) -> Result<u64> {
942        table_ops::failpoint_publish_table_head_without_index_rebuild_for_test(
943            self,
944            branch,
945            table_key,
946            table_branch,
947        )
948        .await
949    }
950
951    /// Compact small Lance fragments into fewer larger ones across every
952    /// node + edge table on `main`. See [`optimize`] for details.
953    pub async fn optimize(&self) -> Result<Vec<optimize::TableOptimizeStats>> {
954        optimize::optimize_all_tables(self).await
955    }
956
957    /// Remove Lance manifests (and the fragments they uniquely own) per the
958    /// given [`optimize::CleanupPolicyOptions`]. Destructive to version
959    /// history. See [`optimize`] for details.
960    pub async fn cleanup(
961        &mut self,
962        options: optimize::CleanupPolicyOptions,
963    ) -> Result<Vec<optimize::TableCleanupStats>> {
964        optimize::cleanup_all_tables(self, options).await
965    }
966
967    /// Read a blob from a node by its string ID and property name.
968    ///
969    /// Returns a `BlobFile` handle with async `read()`, `seek()`, `tell()`,
970    /// and metadata accessors (`size()`, `kind()`, `uri()`).
971    ///
972    /// ```ignore
973    /// let blob = db.read_blob("Document", "readme", "content").await?;
974    /// let bytes = blob.read().await?;
975    /// ```
976    pub async fn read_blob(&self, type_name: &str, id: &str, property: &str) -> Result<BlobFile> {
977        self.ensure_schema_state_valid().await?;
978        let catalog = self.catalog();
979        let node_type = catalog
980            .node_types
981            .get(type_name)
982            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
983        if !node_type.blob_properties.contains(property) {
984            return Err(OmniError::manifest(format!(
985                "property '{}' on type '{}' is not a Blob",
986                property, type_name
987            )));
988        }
989
990        let snapshot = self.snapshot().await;
991        let table_key = format!("node:{}", type_name);
992        let ds = snapshot.open(&table_key).await?;
993
994        let filter_sql = format!("id = '{}'", id.replace('\'', "''"));
995        let row_id = self
996            .table_store
997            .first_row_id_for_filter(&ds, &filter_sql)
998            .await?
999            .ok_or_else(|| {
1000                OmniError::manifest(format!("no {} with id '{}' found", type_name, id))
1001            })?;
1002
1003        // Use take_blobs to get the BlobFile handle
1004        let ds = Arc::new(ds);
1005        let mut blobs = ds
1006            .take_blobs(&[row_id], property)
1007            .await
1008            .map_err(|e| OmniError::Lance(e.to_string()))?;
1009
1010        blobs.pop().ok_or_else(|| {
1011            OmniError::manifest(format!(
1012                "blob '{}' on {} '{}' returned no data",
1013                property, type_name, id
1014            ))
1015        })
1016    }
1017
1018    pub(crate) async fn active_branch(&self) -> Option<String> {
1019        self.coordinator
1020            .read()
1021            .await
1022            .current_branch()
1023            .map(str::to_string)
1024    }
1025
1026    async fn ensure_branch_delete_safe(&self, branch: &str, branches: &[String]) -> Result<()> {
1027        let descendants = self
1028            .coordinator
1029            .read()
1030            .await
1031            .branch_descendants(branch)
1032            .await?;
1033        if let Some(descendant) = descendants.first() {
1034            return Err(OmniError::manifest_conflict(format!(
1035                "cannot delete branch '{}' because descendant branch '{}' still depends on it",
1036                branch, descendant
1037            )));
1038        }
1039
1040        for other_branch in branches
1041            .iter()
1042            .filter(|candidate| candidate.as_str() != branch)
1043        {
1044            let snapshot = self
1045                .snapshot_of(ReadTarget::branch(other_branch.as_str()))
1046                .await?;
1047            if snapshot
1048                .entries()
1049                .any(|entry| entry.table_branch.as_deref() == Some(branch))
1050            {
1051                return Err(OmniError::manifest_conflict(format!(
1052                    "cannot delete branch '{}' because branch '{}' still depends on it",
1053                    branch, other_branch
1054                )));
1055            }
1056        }
1057
1058        Ok(())
1059    }
1060
1061    async fn cleanup_deleted_branch_tables(
1062        &self,
1063        branch: &str,
1064        owned_tables: &[(String, String)],
1065    ) -> Result<()> {
1066        let mut seen_paths = HashSet::new();
1067        let mut cleanup_targets = owned_tables
1068            .iter()
1069            .filter(|(_, table_path)| seen_paths.insert(table_path.clone()))
1070            .cloned()
1071            .collect::<Vec<_>>();
1072        cleanup_targets.sort_by(|left, right| left.0.cmp(&right.0));
1073
1074        for (table_key, table_path) in cleanup_targets {
1075            let dataset_uri = self.table_store.dataset_uri(&table_path);
1076            if let Err(err) = self.table_store.delete_branch(&dataset_uri, branch).await {
1077                return Err(OmniError::manifest_internal(format!(
1078                    "branch '{}' was deleted but cleanup failed for {}: {}",
1079                    branch, table_key, err
1080                )));
1081            }
1082        }
1083
1084        Ok(())
1085    }
1086
1087    async fn delete_branch_storage_only(&self, branch: &str) -> Result<()> {
1088        let active = self
1089            .coordinator
1090            .read()
1091            .await
1092            .current_branch()
1093            .map(str::to_string);
1094        if active.as_deref() == Some(branch) {
1095            return Err(OmniError::manifest_conflict(format!(
1096                "cannot delete currently active branch '{}'",
1097                branch
1098            )));
1099        }
1100
1101        let branch_snapshot = self.snapshot_of(ReadTarget::branch(branch)).await?;
1102        let owned_tables = branch_snapshot
1103            .entries()
1104            .filter(|entry| entry.table_branch.as_deref() == Some(branch))
1105            .map(|entry| (entry.table_key.clone(), entry.table_path.clone()))
1106            .collect::<Vec<_>>();
1107
1108        self.coordinator.write().await.branch_delete(branch).await?;
1109        self.cleanup_deleted_branch_tables(branch, &owned_tables)
1110            .await
1111    }
1112
1113    pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
1114        normalize_branch_name(branch)
1115    }
1116
1117    pub(crate) async fn head_commit_id_for_branch(
1118        &self,
1119        branch: Option<&str>,
1120    ) -> Result<Option<String>> {
1121        let mut coordinator = self.open_coordinator_for_branch(branch).await?;
1122        coordinator.ensure_commit_graph_initialized().await?;
1123        coordinator
1124            .head_commit_id()
1125            .await
1126            .map(|id| id.map(|snapshot_id| snapshot_id.as_str().to_string()))
1127    }
1128
1129    pub async fn branch_create(&self, name: &str) -> Result<()> {
1130        self.branch_create_as(name, None).await
1131    }
1132
1133    /// Create a branch from the coordinator's currently-open snapshot,
1134    /// with an explicit actor for engine-layer policy enforcement
1135    /// (MR-722 fan-out). Scope is `TargetBranch(name)` — symmetric with
1136    /// `branch_delete_as`: the branch being acted upon is the target.
1137    /// Cedar rules using `target_branch_scope: protected` therefore see
1138    /// the new-branch name and can deny e.g. creating any branch named
1139    /// `main` from a non-privileged actor.
1140    pub async fn branch_create_as(&self, name: &str, actor: Option<&str>) -> Result<()> {
1141        self.enforce(
1142            omnigraph_policy::PolicyAction::BranchCreate,
1143            &omnigraph_policy::ResourceScope::TargetBranch(name.to_string()),
1144            actor,
1145        )?;
1146        self.ensure_schema_state_valid().await?;
1147        self.ensure_schema_apply_idle("branch_create").await?;
1148        ensure_public_branch_ref(name, "branch_create")?;
1149        self.coordinator.write().await.branch_create(name).await
1150    }
1151
1152    pub async fn branch_create_from(&self, from: impl Into<ReadTarget>, name: &str) -> Result<()> {
1153        self.branch_create_from_as(from, name, None).await
1154    }
1155
1156    /// Create a branch from a specific source branch with an explicit
1157    /// actor for engine-layer policy enforcement (MR-722 fan-out).
1158    ///
1159    /// Scope is `BranchTransition { source, target }` — matches the
1160    /// HTTP-layer convention at `server_branch_create`
1161    /// (branch=Some(from), target_branch=Some(name)), so engine and
1162    /// HTTP fire the same Cedar decision. Pinned-snapshot sources
1163    /// (which aren't a branch ref) materialize as the sentinel
1164    /// `<snapshot>` for the policy check; Cedar rules using
1165    /// `branch_scope: any` still match, rules pinning a specific
1166    /// source branch correctly do not.
1167    pub async fn branch_create_from_as(
1168        &self,
1169        from: impl Into<ReadTarget>,
1170        name: &str,
1171        actor: Option<&str>,
1172    ) -> Result<()> {
1173        let target = from.into();
1174        let source_branch = match &target {
1175            ReadTarget::Branch(b) => b.clone(),
1176            _ => "<snapshot>".to_string(),
1177        };
1178        self.enforce(
1179            omnigraph_policy::PolicyAction::BranchCreate,
1180            &omnigraph_policy::ResourceScope::BranchTransition {
1181                source: source_branch,
1182                target: name.to_string(),
1183            },
1184            actor,
1185        )?;
1186        self.ensure_schema_apply_idle("branch_create_from").await?;
1187        self.branch_create_from_impl(target, name, false).await
1188    }
1189
1190    async fn branch_create_from_impl(
1191        &self,
1192        from: impl Into<ReadTarget>,
1193        name: &str,
1194        allow_internal_refs: bool,
1195    ) -> Result<()> {
1196        let target = from.into();
1197        let ReadTarget::Branch(branch_name) = target else {
1198            return Err(OmniError::manifest(
1199                "branch creation from pinned snapshots is not supported yet".to_string(),
1200            ));
1201        };
1202        if !allow_internal_refs {
1203            ensure_public_branch_ref(&branch_name, "branch_create_from")?;
1204            ensure_public_branch_ref(name, "branch_create_from")?;
1205        }
1206        let branch = normalize_branch_name(&branch_name)?;
1207        // Operate on a freshly-opened source coordinator that's owned locally
1208        // — never touch `self.coordinator`. The pre-fix implementation used
1209        // `swap_coordinator_for_branch` + operate + `restore_coordinator` as
1210        // three separate `coordinator.write().await` acquisitions; under
1211        // `&self` concurrency, a second `branch_create_from` could swap
1212        // self.coordinator between this caller's swap and operate steps,
1213        // making the operate run against the wrong source branch and
1214        // forking off the wrong HEAD. Pinned by
1215        // `concurrent_branch_create_from_distinct_parents_does_not_corrupt_coordinator`
1216        // in `crates/omnigraph-server/tests/server.rs`.
1217        //
1218        // `branch_create` mutates only the local coord's commit-graph cache;
1219        // the manifest write is durable on disk regardless of which
1220        // coord-handle issued it. Discarding `source_coord` after the call
1221        // is the right shape — the new branch is reachable from any
1222        // subsequent open of any coord.
1223        let mut source_coord = self.open_coordinator_for_branch(branch.as_deref()).await?;
1224        source_coord.branch_create(name).await
1225    }
1226
1227    pub async fn branch_list(&self) -> Result<Vec<String>> {
1228        self.ensure_schema_state_valid().await?;
1229        self.coordinator.read().await.branch_list().await
1230    }
1231
1232    pub async fn branch_delete(&self, name: &str) -> Result<()> {
1233        self.branch_delete_as(name, None).await
1234    }
1235
1236    /// Delete a branch with an explicit actor for engine-layer policy
1237    /// enforcement (MR-722 fan-out). Scope is `TargetBranch(name)` —
1238    /// matches the HTTP-layer convention at `server_branch_delete`
1239    /// (branch=None, target_branch=Some(name)). Cedar rules using
1240    /// `target_branch_scope: protected` therefore correctly gate
1241    /// deletion of protected branches (e.g. deny BranchDelete against
1242    /// `main`).
1243    pub async fn branch_delete_as(&self, name: &str, actor: Option<&str>) -> Result<()> {
1244        self.enforce(
1245            omnigraph_policy::PolicyAction::BranchDelete,
1246            &omnigraph_policy::ResourceScope::TargetBranch(name.to_string()),
1247            actor,
1248        )?;
1249        self.ensure_schema_state_valid().await?;
1250        self.ensure_schema_apply_idle("branch_delete").await?;
1251        ensure_public_branch_ref(name, "branch_delete")?;
1252        self.refresh().await?;
1253        let branch = normalize_branch_name(name)?
1254            .ok_or_else(|| OmniError::manifest("cannot delete branch 'main'".to_string()))?;
1255        let branches = self.coordinator.read().await.branch_list().await?;
1256        if !branches.iter().any(|candidate| candidate == &branch) {
1257            return Err(OmniError::manifest_not_found(format!(
1258                "branch '{}' not found",
1259                branch
1260            )));
1261        }
1262
1263        self.ensure_branch_delete_safe(&branch, &branches).await?;
1264        self.delete_branch_storage_only(&branch).await
1265    }
1266
1267    pub async fn get_commit(&self, commit_id: &str) -> Result<GraphCommit> {
1268        self.ensure_schema_state_valid().await?;
1269        self.coordinator
1270            .read()
1271            .await
1272            .resolve_commit(&SnapshotId::new(commit_id))
1273            .await
1274    }
1275
1276    pub async fn list_commits(&self, branch: Option<&str>) -> Result<Vec<GraphCommit>> {
1277        self.ensure_schema_state_valid().await?;
1278        let branch = match branch {
1279            Some(branch) => normalize_branch_name(branch)?,
1280            None => None,
1281        };
1282        let coordinator = self.open_coordinator_for_branch(branch.as_deref()).await?;
1283        coordinator.list_commits().await
1284    }
1285
1286    /// Open a sub-table for mutation with version-drift guard.
1287    ///
1288    /// Checks that the dataset's current version matches the snapshot-pinned
1289    /// version. If another writer has advanced the version, returns an error
1290    /// prompting the caller to refresh and retry (optimistic concurrency).
1291    pub(crate) async fn open_for_mutation(
1292        &self,
1293        table_key: &str,
1294        op_kind: crate::db::MutationOpKind,
1295    ) -> Result<(Dataset, String, Option<String>)> {
1296        table_ops::open_for_mutation(self, table_key, op_kind).await
1297    }
1298
1299    pub(crate) async fn open_for_mutation_on_branch(
1300        &self,
1301        branch: Option<&str>,
1302        table_key: &str,
1303        op_kind: crate::db::MutationOpKind,
1304    ) -> Result<(Dataset, String, Option<String>)> {
1305        table_ops::open_for_mutation_on_branch(self, branch, table_key, op_kind).await
1306    }
1307
1308    pub(crate) async fn fork_dataset_from_entry_state(
1309        &self,
1310        table_key: &str,
1311        full_path: &str,
1312        source_branch: Option<&str>,
1313        source_version: u64,
1314        active_branch: &str,
1315    ) -> Result<Dataset> {
1316        table_ops::fork_dataset_from_entry_state(
1317            self,
1318            table_key,
1319            full_path,
1320            source_branch,
1321            source_version,
1322            active_branch,
1323        )
1324        .await
1325    }
1326
1327    pub(crate) async fn reopen_for_mutation(
1328        &self,
1329        table_key: &str,
1330        full_path: &str,
1331        table_branch: Option<&str>,
1332        expected_version: u64,
1333        op_kind: crate::db::MutationOpKind,
1334    ) -> Result<Dataset> {
1335        table_ops::reopen_for_mutation(
1336            self,
1337            table_key,
1338            full_path,
1339            table_branch,
1340            expected_version,
1341            op_kind,
1342        )
1343        .await
1344    }
1345
1346    pub(crate) async fn open_dataset_at_state(
1347        &self,
1348        table_path: &str,
1349        table_branch: Option<&str>,
1350        table_version: u64,
1351    ) -> Result<Dataset> {
1352        table_ops::open_dataset_at_state(self, table_path, table_branch, table_version).await
1353    }
1354
1355    pub(crate) async fn build_indices_on_dataset(
1356        &self,
1357        table_key: &str,
1358        ds: &mut Dataset,
1359    ) -> Result<()> {
1360        table_ops::build_indices_on_dataset(self, table_key, ds).await
1361    }
1362
1363    pub(crate) async fn build_indices_on_dataset_for_catalog(
1364        &self,
1365        catalog: &Catalog,
1366        table_key: &str,
1367        ds: &mut Dataset,
1368    ) -> Result<()> {
1369        table_ops::build_indices_on_dataset_for_catalog(self, catalog, table_key, ds).await
1370    }
1371
1372    // Used only by in-tree tests (`#[cfg(test)]`); the runtime path now
1373    // uses `commit_updates_on_branch_with_expected` exclusively.
1374    #[cfg(test)]
1375    pub(crate) async fn commit_updates(
1376        &mut self,
1377        updates: &[crate::db::SubTableUpdate],
1378    ) -> Result<u64> {
1379        table_ops::commit_updates(self, updates).await
1380    }
1381
1382    pub(crate) async fn commit_manifest_updates(
1383        &self,
1384        updates: &[crate::db::SubTableUpdate],
1385    ) -> Result<u64> {
1386        table_ops::commit_manifest_updates(self, updates).await
1387    }
1388
1389    pub(crate) async fn record_merge_commit(
1390        &self,
1391        manifest_version: u64,
1392        parent_commit_id: &str,
1393        merged_parent_commit_id: &str,
1394        actor_id: Option<&str>,
1395    ) -> Result<String> {
1396        table_ops::record_merge_commit(
1397            self,
1398            manifest_version,
1399            parent_commit_id,
1400            merged_parent_commit_id,
1401            actor_id,
1402        )
1403        .await
1404    }
1405
1406    pub(crate) async fn commit_updates_on_branch_with_expected(
1407        &self,
1408        branch: Option<&str>,
1409        updates: &[crate::db::SubTableUpdate],
1410        expected_table_versions: &std::collections::HashMap<String, u64>,
1411        actor_id: Option<&str>,
1412    ) -> Result<u64> {
1413        table_ops::commit_updates_on_branch_with_expected(
1414            self,
1415            branch,
1416            updates,
1417            expected_table_versions,
1418            actor_id,
1419        )
1420        .await
1421    }
1422
1423    pub(crate) async fn ensure_commit_graph_initialized(&self) -> Result<()> {
1424        table_ops::ensure_commit_graph_initialized(self).await
1425    }
1426
1427    /// Invalidate the cached graph index. Called after edge mutations.
1428    pub(crate) async fn invalidate_graph_index(&self) {
1429        table_ops::invalidate_graph_index(self).await
1430    }
1431}
1432
1433pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
1434    let branch = branch.trim();
1435    if branch.is_empty() {
1436        return Err(OmniError::manifest(
1437            "branch name cannot be empty".to_string(),
1438        ));
1439    }
1440    if branch == "main" {
1441        return Ok(None);
1442    }
1443    Ok(Some(branch.to_string()))
1444}
1445
1446pub(crate) fn ensure_public_branch_ref(branch: &str, operation: &str) -> Result<()> {
1447    if super::is_internal_run_branch(branch) {
1448        return Err(OmniError::manifest(format!(
1449            "{} does not allow internal run ref '{}'",
1450            operation, branch
1451        )));
1452    }
1453    if is_internal_system_branch(branch) {
1454        return Err(OmniError::manifest(format!(
1455            "{} does not allow internal system ref '{}'",
1456            operation, branch
1457        )));
1458    }
1459    Ok(())
1460}
1461
1462fn concat_or_empty_batches(schema: Arc<Schema>, batches: Vec<RecordBatch>) -> Result<RecordBatch> {
1463    if batches.is_empty() {
1464        return Ok(RecordBatch::new_empty(schema));
1465    }
1466    if batches.len() == 1 {
1467        return Ok(batches.into_iter().next().unwrap());
1468    }
1469    let batch_schema = batches[0].schema();
1470    arrow_select::concat::concat_batches(&batch_schema, &batches)
1471        .map_err(|e| OmniError::Lance(e.to_string()))
1472}
1473
1474fn blob_properties_for_table_key<'a>(
1475    catalog: &'a Catalog,
1476    table_key: &str,
1477) -> Result<&'a std::collections::HashSet<String>> {
1478    if let Some(type_name) = table_key.strip_prefix("node:") {
1479        return catalog
1480            .node_types
1481            .get(type_name)
1482            .map(|node_type| &node_type.blob_properties)
1483            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)));
1484    }
1485    if let Some(type_name) = table_key.strip_prefix("edge:") {
1486        return catalog
1487            .edge_types
1488            .get(type_name)
1489            .map(|edge_type| &edge_type.blob_properties)
1490            .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)));
1491    }
1492    Err(OmniError::manifest(format!(
1493        "invalid table key '{}'",
1494        table_key
1495    )))
1496}
1497
1498fn blob_description_is_null(descriptions: &StructArray, row: usize) -> Result<bool> {
1499    if descriptions.is_null(row) {
1500        return Ok(true);
1501    }
1502
1503    let kind = descriptions
1504        .column_by_name("kind")
1505        .and_then(|col| col.as_any().downcast_ref::<UInt32Array>())
1506        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row) as u8))
1507        .or_else(|| {
1508            descriptions
1509                .column_by_name("kind")
1510                .and_then(|col| col.as_any().downcast_ref::<arrow_array::UInt8Array>())
1511                .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)))
1512        });
1513    let position = descriptions
1514        .column_by_name("position")
1515        .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
1516        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1517    let size = descriptions
1518        .column_by_name("size")
1519        .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
1520        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1521    let blob_uri = descriptions
1522        .column_by_name("blob_uri")
1523        .and_then(|col| col.as_any().downcast_ref::<StringArray>())
1524        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1525
1526    let Some(kind) = kind else {
1527        return Ok(true);
1528    };
1529    let kind = BlobKind::try_from(kind).map_err(|e| OmniError::Lance(e.to_string()))?;
1530    if kind != BlobKind::Inline {
1531        return Ok(false);
1532    }
1533
1534    Ok(position.unwrap_or(0) == 0 && size.unwrap_or(0) == 0 && blob_uri.unwrap_or("").is_empty())
1535}
1536
1537/// Replace placeholder `LargeBinary` fields with Lance blob v2 fields.
1538///
1539/// The compiler crate has no Lance dependency, so `ScalarType::Blob` maps to
1540/// `DataType::LargeBinary` as a placeholder. This function replaces those
1541/// fields with the real blob v2 struct type via `lance::blob::blob_field()`.
1542fn fixup_blob_schemas(catalog: &mut Catalog) {
1543    for node_type in catalog.node_types.values_mut() {
1544        if node_type.blob_properties.is_empty() {
1545            continue;
1546        }
1547        let fields: Vec<Field> = node_type
1548            .arrow_schema
1549            .fields()
1550            .iter()
1551            .map(|f| {
1552                if node_type.blob_properties.contains(f.name()) {
1553                    blob_field(f.name(), f.is_nullable())
1554                } else {
1555                    f.as_ref().clone()
1556                }
1557            })
1558            .collect();
1559        node_type.arrow_schema = Arc::new(Schema::new(fields));
1560    }
1561    for edge_type in catalog.edge_types.values_mut() {
1562        if edge_type.blob_properties.is_empty() {
1563            continue;
1564        }
1565        let fields: Vec<Field> = edge_type
1566            .arrow_schema
1567            .fields()
1568            .iter()
1569            .map(|f| {
1570                if edge_type.blob_properties.contains(f.name()) {
1571                    blob_field(f.name(), f.is_nullable())
1572                } else {
1573                    f.as_ref().clone()
1574                }
1575            })
1576            .collect();
1577        edge_type.arrow_schema = Arc::new(Schema::new(fields));
1578    }
1579}
1580
1581fn read_schema_ir_from_source(schema_source: &str) -> Result<SchemaIR> {
1582    let schema_ast = parse_schema(schema_source)?;
1583    build_schema_ir(&schema_ast).map_err(|err| OmniError::manifest(err.to_string()))
1584}
1585
/// I/O phase of `Omnigraph::init_with_storage`. Split out so the caller
/// can pattern-match on the result and run cleanup on error before
/// returning the original error.
///
/// Failpoints fire at the phase boundaries:
/// * `init.after_schema_pg_written` — `_schema.pg` is on disk. In strict mode
///   this fires in the caller immediately after the atomic ownership claim; in
///   force mode it fires here after the explicit overwrite.
/// * `init.after_schema_contract_written` — `_schema.pg` + `_schema.ir.json`
///   + `__schema_state.json` are on disk.
/// * `init.after_coordinator_init` — all schema files plus Lance per-type
///   datasets and `__manifest/` are on disk. (The cleanup wrapper can only
///   remove the schema files; Lance directories need `delete_prefix` —
///   deferred along with `DELETE /graphs/{id}`.)
///
/// Arguments:
/// * `root` — graph root URI that all artifact paths are joined onto.
/// * `schema_source` — schema text written to `SCHEMA_SOURCE_FILENAME` when
///   `write_schema_pg` is set.
/// * `schema_ir` — IR handed to `write_schema_contract`.
/// * `catalog` — catalog handed to `GraphCoordinator::init`.
/// * `storage` — adapter used for every write; a clone of the `Arc` is
///   handed to the coordinator.
/// * `write_schema_pg` — when `false`, the schema source write (and its
///   failpoint) is skipped here.
///
/// Returns the initialized coordinator. Any storage, contract-write,
/// coordinator or failpoint error is returned immediately via `?`; no
/// cleanup happens in this function.
async fn init_storage_phase(
    root: &str,
    schema_source: &str,
    schema_ir: &SchemaIR,
    catalog: &Catalog,
    storage: &Arc<dyn StorageAdapter>,
    write_schema_pg: bool,
) -> Result<GraphCoordinator> {
    // Phase 1 (optional): write the schema source file.
    if write_schema_pg {
        let schema_path = join_uri(root, SCHEMA_SOURCE_FILENAME);
        storage.write_text(&schema_path, schema_source).await?;
        crate::failpoints::maybe_fail("init.after_schema_pg_written")?;
    }

    // Phase 2: write the schema contract files.
    write_schema_contract(root, storage.as_ref(), schema_ir).await?;
    crate::failpoints::maybe_fail("init.after_schema_contract_written")?;

    // Phase 3: initialize the coordinator.
    let coordinator = GraphCoordinator::init(root, catalog, Arc::clone(storage)).await?;
    crate::failpoints::maybe_fail("init.after_coordinator_init")?;

    Ok(coordinator)
}
1622
1623/// Best-effort cleanup of init-phase artifacts. Called from
1624/// `init_with_storage` on any error returned by `init_storage_phase`.
1625///
1626/// Removes the three schema files: `_schema.pg`, `_schema.ir.json`,
1627/// `__schema_state.json`. Lance datasets and `__manifest/` are not
1628/// touched here — recursive directory deletion requires a
1629/// `StorageAdapter::delete_prefix` primitive that's deferred along
1630/// with `DELETE /graphs/{id}` (MR-668 PR 2b).
1631///
1632/// Failures to delete are logged via `tracing::warn` and do not mask
1633/// the original init error.
1634async fn best_effort_cleanup_init_artifacts(root: &str, storage: &dyn StorageAdapter) {
1635    for uri in [
1636        schema_source_uri(root),
1637        schema_ir_uri(root),
1638        schema_state_uri(root),
1639    ] {
1640        if let Err(err) = storage.delete(&uri).await {
1641            tracing::warn!(
1642                target: "omnigraph::init::cleanup",
1643                uri = %uri,
1644                error = %err,
1645                "init failed; best-effort cleanup could not delete artifact",
1646            );
1647        }
1648    }
1649}
1650
1651fn schema_table_key(type_kind: SchemaTypeKind, name: &str) -> String {
1652    match type_kind {
1653        SchemaTypeKind::Node => format!("node:{}", name),
1654        SchemaTypeKind::Edge => format!("edge:{}", name),
1655        SchemaTypeKind::Interface => unreachable!("interfaces do not map to tables"),
1656    }
1657}
1658
1659fn schema_for_table_key(catalog: &Catalog, table_key: &str) -> Result<Arc<Schema>> {
1660    if let Some(type_name) = table_key.strip_prefix("node:") {
1661        let node_type: &NodeType = catalog
1662            .node_types
1663            .get(type_name)
1664            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
1665        return Ok(node_type.arrow_schema.clone());
1666    }
1667    if let Some(type_name) = table_key.strip_prefix("edge:") {
1668        let edge_type: &EdgeType = catalog
1669            .edge_types
1670            .get(type_name)
1671            .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)))?;
1672        return Ok(edge_type.arrow_schema.clone());
1673    }
1674    Err(OmniError::manifest(format!(
1675        "invalid table key '{}'",
1676        table_key
1677    )))
1678}
1679
1680fn record_batch_row_to_json(batch: &RecordBatch, row: usize) -> Result<serde_json::Value> {
1681    let mut obj = serde_json::Map::new();
1682    for (i, field) in batch.schema().fields().iter().enumerate() {
1683        obj.insert(
1684            field.name().clone(),
1685            json_value_from_array(batch.column(i).as_ref(), row)?,
1686        );
1687    }
1688    Ok(serde_json::Value::Object(obj))
1689}
1690
1691fn json_value_from_array(array: &dyn Array, row: usize) -> Result<serde_json::Value> {
1692    if array.is_null(row) {
1693        return Ok(serde_json::Value::Null);
1694    }
1695
1696    match array.data_type() {
1697        DataType::Utf8 => Ok(serde_json::Value::String(
1698            array
1699                .as_any()
1700                .downcast_ref::<StringArray>()
1701                .ok_or_else(|| OmniError::Lance("expected StringArray".to_string()))?
1702                .value(row)
1703                .to_string(),
1704        )),
1705        DataType::LargeUtf8 => Ok(serde_json::Value::String(
1706            array
1707                .as_any()
1708                .downcast_ref::<LargeStringArray>()
1709                .ok_or_else(|| OmniError::Lance("expected LargeStringArray".to_string()))?
1710                .value(row)
1711                .to_string(),
1712        )),
1713        DataType::Boolean => Ok(serde_json::Value::Bool(
1714            array
1715                .as_any()
1716                .downcast_ref::<BooleanArray>()
1717                .ok_or_else(|| OmniError::Lance("expected BooleanArray".to_string()))?
1718                .value(row),
1719        )),
1720        DataType::Int32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1721            array
1722                .as_any()
1723                .downcast_ref::<Int32Array>()
1724                .ok_or_else(|| OmniError::Lance("expected Int32Array".to_string()))?
1725                .value(row),
1726        ))),
1727        DataType::Int64 => Ok(serde_json::Value::Number(serde_json::Number::from(
1728            array
1729                .as_any()
1730                .downcast_ref::<Int64Array>()
1731                .ok_or_else(|| OmniError::Lance("expected Int64Array".to_string()))?
1732                .value(row),
1733        ))),
1734        DataType::UInt32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1735            array
1736                .as_any()
1737                .downcast_ref::<UInt32Array>()
1738                .ok_or_else(|| OmniError::Lance("expected UInt32Array".to_string()))?
1739                .value(row),
1740        ))),
1741        DataType::UInt64 => Ok(serde_json::Value::Number(serde_json::Number::from(
1742            array
1743                .as_any()
1744                .downcast_ref::<UInt64Array>()
1745                .ok_or_else(|| OmniError::Lance("expected UInt64Array".to_string()))?
1746                .value(row),
1747        ))),
1748        DataType::Float32 => {
1749            let value = array
1750                .as_any()
1751                .downcast_ref::<Float32Array>()
1752                .ok_or_else(|| OmniError::Lance("expected Float32Array".to_string()))?
1753                .value(row) as f64;
1754            Ok(serde_json::Value::Number(
1755                serde_json::Number::from_f64(value).ok_or_else(|| {
1756                    OmniError::Lance(format!("cannot encode f32 value '{}' as JSON", value))
1757                })?,
1758            ))
1759        }
1760        DataType::Float64 => {
1761            let value = array
1762                .as_any()
1763                .downcast_ref::<Float64Array>()
1764                .ok_or_else(|| OmniError::Lance("expected Float64Array".to_string()))?
1765                .value(row);
1766            Ok(serde_json::Value::Number(
1767                serde_json::Number::from_f64(value).ok_or_else(|| {
1768                    OmniError::Lance(format!("cannot encode f64 value '{}' as JSON", value))
1769                })?,
1770            ))
1771        }
1772        DataType::Date32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1773            array
1774                .as_any()
1775                .downcast_ref::<Date32Array>()
1776                .ok_or_else(|| OmniError::Lance("expected Date32Array".to_string()))?
1777                .value(row),
1778        ))),
1779        DataType::Binary => Ok(serde_json::Value::String(base64::Engine::encode(
1780            &base64::engine::general_purpose::STANDARD,
1781            array
1782                .as_any()
1783                .downcast_ref::<BinaryArray>()
1784                .ok_or_else(|| OmniError::Lance("expected BinaryArray".to_string()))?
1785                .value(row),
1786        ))),
1787        DataType::LargeBinary => Ok(serde_json::Value::String(base64::Engine::encode(
1788            &base64::engine::general_purpose::STANDARD,
1789            array
1790                .as_any()
1791                .downcast_ref::<LargeBinaryArray>()
1792                .ok_or_else(|| OmniError::Lance("expected LargeBinaryArray".to_string()))?
1793                .value(row),
1794        ))),
1795        DataType::List(_) => {
1796            let list = array
1797                .as_any()
1798                .downcast_ref::<ListArray>()
1799                .ok_or_else(|| OmniError::Lance("expected ListArray".to_string()))?;
1800            let values = list.value(row);
1801            let mut out = Vec::with_capacity(values.len());
1802            for idx in 0..values.len() {
1803                out.push(json_value_from_array(values.as_ref(), idx)?);
1804            }
1805            Ok(serde_json::Value::Array(out))
1806        }
1807        DataType::LargeList(_) => {
1808            let list = array
1809                .as_any()
1810                .downcast_ref::<LargeListArray>()
1811                .ok_or_else(|| OmniError::Lance("expected LargeListArray".to_string()))?;
1812            let values = list.value(row);
1813            let mut out = Vec::with_capacity(values.len());
1814            for idx in 0..values.len() {
1815                out.push(json_value_from_array(values.as_ref(), idx)?);
1816            }
1817            Ok(serde_json::Value::Array(out))
1818        }
1819        DataType::FixedSizeList(_, _) => {
1820            let list = array
1821                .as_any()
1822                .downcast_ref::<FixedSizeListArray>()
1823                .ok_or_else(|| OmniError::Lance("expected FixedSizeListArray".to_string()))?;
1824            let values = list.value(row);
1825            let mut out = Vec::with_capacity(values.len());
1826            for idx in 0..values.len() {
1827                out.push(json_value_from_array(values.as_ref(), idx)?);
1828            }
1829            Ok(serde_json::Value::Array(out))
1830        }
1831        DataType::Struct(fields) => {
1832            let struct_array = array
1833                .as_any()
1834                .downcast_ref::<StructArray>()
1835                .ok_or_else(|| OmniError::Lance("expected StructArray".to_string()))?;
1836            let mut obj = serde_json::Map::new();
1837            for (field_idx, field) in fields.iter().enumerate() {
1838                obj.insert(
1839                    field.name().clone(),
1840                    json_value_from_array(struct_array.column(field_idx).as_ref(), row)?,
1841                );
1842            }
1843            Ok(serde_json::Value::Object(obj))
1844        }
1845        _ => {
1846            let value = arrow_cast::display::array_value_to_string(array, row)
1847                .map_err(|e| OmniError::Lance(e.to_string()))?;
1848            Ok(serde_json::Value::String(value))
1849        }
1850    }
1851}
1852
1853#[cfg(test)]
1854mod tests {
1855    use super::*;
1856    use crate::db::is_internal_run_branch;
1857    use crate::db::manifest::ManifestCoordinator;
1858    use async_trait::async_trait;
1859    use serde_json::Value;
1860    use std::sync::{Arc, Mutex};
1861
1862    use crate::storage::{LocalStorageAdapter, StorageAdapter, join_uri};
1863
    // Baseline schema shared by the tests in this module: node types
    // `Person` and `Company`, edge types `Knows` and `WorksAt`. Several tests
    // derive variants with `str::replace` on exact substrings (including the
    // four-space indentation), so the whitespace of this literal matters.
    const TEST_SCHEMA: &str = r#"
node Person {
    name: String @key
    age: I32?
}
node Company {
    name: String @key
}
edge Knows: Person -> Person {
    since: Date?
}
edge WorksAt: Person -> Company
"#;
1877
    /// Test double that wraps a `LocalStorageAdapter` and records the URIs
    /// passed to its storage operations, so tests can assert which paths
    /// were touched.
    #[derive(Debug, Default)]
    struct RecordingStorageAdapter {
        /// Real adapter that performs the I/O.
        inner: LocalStorageAdapter,
        /// URIs passed to `read_text`, in call order.
        reads: Mutex<Vec<String>>,
        /// URIs passed to `write_text` and `write_text_if_absent`, in call order.
        writes: Mutex<Vec<String>>,
        /// URIs passed to `exists`, in call order.
        exists_checks: Mutex<Vec<String>>,
        /// `(from, to)` URI pairs passed to `rename_text`, in call order.
        renames: Mutex<Vec<(String, String)>>,
        /// URIs passed to `delete`, in call order.
        deletes: Mutex<Vec<String>>,
    }
1887
1888    impl RecordingStorageAdapter {
1889        fn reads(&self) -> Vec<String> {
1890            self.reads.lock().unwrap().clone()
1891        }
1892
1893        fn writes(&self) -> Vec<String> {
1894            self.writes.lock().unwrap().clone()
1895        }
1896
1897        fn exists_checks(&self) -> Vec<String> {
1898            self.exists_checks.lock().unwrap().clone()
1899        }
1900    }
1901
    // Every method records its argument(s) first and then delegates to the
    // wrapped `LocalStorageAdapter`, so a call is recorded even when the
    // underlying operation fails. `list_dir` is the only method that is not
    // recorded.
    #[async_trait]
    impl StorageAdapter for RecordingStorageAdapter {
        async fn read_text(&self, uri: &str) -> Result<String> {
            self.reads.lock().unwrap().push(uri.to_string());
            self.inner.read_text(uri).await
        }

        async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
            self.writes.lock().unwrap().push(uri.to_string());
            self.inner.write_text(uri, contents).await
        }

        // Conditional writes share the `writes` log with plain writes, and are
        // logged whether or not the write actually takes place.
        async fn write_text_if_absent(&self, uri: &str, contents: &str) -> Result<bool> {
            self.writes.lock().unwrap().push(uri.to_string());
            self.inner.write_text_if_absent(uri, contents).await
        }

        async fn exists(&self, uri: &str) -> Result<bool> {
            self.exists_checks.lock().unwrap().push(uri.to_string());
            self.inner.exists(uri).await
        }

        async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
            self.renames
                .lock()
                .unwrap()
                .push((from_uri.to_string(), to_uri.to_string()));
            self.inner.rename_text(from_uri, to_uri).await
        }

        async fn delete(&self, uri: &str) -> Result<()> {
            self.deletes.lock().unwrap().push(uri.to_string());
            self.inner.delete(uri).await
        }

        // Pass-through; directory listings are not recorded.
        async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
            self.inner.list_dir(dir_uri).await
        }
    }
1941
    /// Test double that wraps a `LocalStorageAdapter` and holds a shared
    /// barrier that its `exists` implementation waits on for one specific
    /// URI, letting a test line up concurrent callers at that point.
    #[derive(Debug)]
    struct InitRaceStorageAdapter {
        /// Real adapter that performs the I/O.
        inner: LocalStorageAdapter,
        /// Graph root; used to compute the schema state URI that triggers the barrier.
        root: String,
        /// Barrier awaited inside `exists` for the schema state URI.
        barrier: Arc<tokio::sync::Barrier>,
    }
1948
    // All methods delegate straight to the wrapped adapter; only `exists`
    // adds behavior (a barrier wait for the schema state URI).
    #[async_trait]
    impl StorageAdapter for InitRaceStorageAdapter {
        async fn read_text(&self, uri: &str) -> Result<String> {
            self.inner.read_text(uri).await
        }

        async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
            self.inner.write_text(uri, contents).await
        }

        async fn write_text_if_absent(&self, uri: &str, contents: &str) -> Result<bool> {
            self.inner.write_text_if_absent(uri, contents).await
        }

        async fn exists(&self, uri: &str) -> Result<bool> {
            // The real check runs first; the caller then waits on the barrier
            // (only for the schema state URI) before seeing the answer. The
            // returned value is the one observed before the wait.
            let exists = self.inner.exists(uri).await?;
            if uri == schema_state_uri(&self.root) {
                self.barrier.wait().await;
            }
            Ok(exists)
        }

        async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
            self.inner.rename_text(from_uri, to_uri).await
        }

        async fn delete(&self, uri: &str) -> Result<()> {
            self.inner.delete(uri).await
        }

        async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
            self.inner.list_dir(dir_uri).await
        }
    }
1983
    /// Two default-option inits race against the same root through an
    /// `InitRaceStorageAdapter` with a two-party barrier. Exactly one must
    /// succeed, and all three schema files must still exist afterwards,
    /// i.e. the losing init must not have removed the winner's files.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn concurrent_strict_init_does_not_delete_winning_schema_files() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap().to_string();
        let root = normalize_root_uri(&uri).unwrap();
        // Both init calls share this adapter, and therefore the same barrier.
        let storage: Arc<dyn StorageAdapter> = Arc::new(InitRaceStorageAdapter {
            inner: LocalStorageAdapter,
            root,
            barrier: Arc::new(tokio::sync::Barrier::new(2)),
        });

        let left = Omnigraph::init_with_storage(
            &uri,
            TEST_SCHEMA,
            Arc::clone(&storage),
            InitOptions::default(),
        );
        let right = Omnigraph::init_with_storage(
            &uri,
            TEST_SCHEMA,
            Arc::clone(&storage),
            InitOptions::default(),
        );
        // Drive both futures concurrently.
        let (left, right) = tokio::join!(left, right);
        let ok_count = usize::from(left.is_ok()) + usize::from(right.is_ok());
        assert_eq!(ok_count, 1, "exactly one concurrent init should win");

        assert!(
            dir.path().join("_schema.pg").exists(),
            "winning init must leave _schema.pg in place"
        );
        assert!(
            dir.path().join("_schema.ir.json").exists(),
            "winning init must leave _schema.ir.json in place"
        );
        assert!(
            dir.path().join("__schema_state.json").exists(),
            "winning init must leave __schema_state.json in place"
        );
    }
2024
    /// Init and open must go through the supplied `StorageAdapter` for
    /// graph metadata: the recording adapter has to see writes of the three
    /// schema files during init, and reads plus existence checks for them
    /// (and an existence check for `_graph_commits.lance`) during open.
    #[tokio::test]
    async fn test_init_and_open_route_graph_metadata_through_storage_adapter() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let adapter = Arc::new(RecordingStorageAdapter::default());

        // Init: all three schema files are written via the adapter.
        Omnigraph::init_with_storage(uri, TEST_SCHEMA, adapter.clone(), InitOptions::default())
            .await
            .unwrap();
        assert!(adapter.writes().contains(&join_uri(uri, "_schema.pg")));
        assert!(adapter.writes().contains(&join_uri(uri, "_schema.ir.json")));
        assert!(
            adapter
                .writes()
                .contains(&join_uri(uri, "__schema_state.json"))
        );

        // Open: the same files are read back via the adapter.
        Omnigraph::open_with_storage(uri, adapter.clone())
            .await
            .unwrap();
        assert!(adapter.reads().contains(&join_uri(uri, "_schema.pg")));
        assert!(adapter.reads().contains(&join_uri(uri, "_schema.ir.json")));
        assert!(
            adapter
                .reads()
                .contains(&join_uri(uri, "__schema_state.json"))
        );
        // The existence log is cumulative across init and open.
        assert!(
            adapter
                .exists_checks()
                .contains(&join_uri(uri, "_schema.ir.json"))
        );
        assert!(
            adapter
                .exists_checks()
                .contains(&join_uri(uri, "__schema_state.json"))
        );
        assert!(
            adapter
                .exists_checks()
                .contains(&join_uri(uri, "_graph_commits.lance"))
        );
    }
2068
2069    async fn table_rows_json(db: &Omnigraph, table_key: &str) -> Vec<Value> {
2070        let snapshot = db.snapshot().await;
2071        let ds = snapshot.open(table_key).await.unwrap();
2072        let batches = db.table_store().scan_batches(&ds).await.unwrap();
2073        batches
2074            .into_iter()
2075            .flat_map(|batch| {
2076                (0..batch.num_rows())
2077                    .map(|row| record_batch_row_to_json(&batch, row).unwrap())
2078                    .collect::<Vec<_>>()
2079            })
2080            .collect()
2081    }
2082
    /// Append one `Person` row to `node:Person` and commit it (test helper).
    ///
    /// `name` is used for both the `id` and `name` columns, `age` fills the
    /// `age` column, and every other column in the dataset schema gets a
    /// single null. Panics on any failure.
    async fn seed_person_row(db: &mut Omnigraph, name: &str, age: Option<i32>) {
        let (mut ds, full_path, table_branch) = db
            .open_for_mutation("node:Person", crate::db::MutationOpKind::Insert)
            .await
            .unwrap();
        // Build the batch against the dataset's own schema so the column
        // order and types match what is stored.
        let schema: Arc<Schema> = Arc::new(ds.schema().into());
        let columns: Vec<Arc<dyn Array>> = schema
            .fields()
            .iter()
            .map(|field| match field.name().as_str() {
                "id" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
                "name" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
                "age" => Arc::new(Int32Array::from(vec![age])) as Arc<dyn Array>,
                _ => new_null_array(field.data_type(), 1),
            })
            .collect();
        let batch = RecordBatch::try_new(Arc::clone(&schema), columns).unwrap();
        let state = db
            .table_store()
            .append_batch(&full_path, &mut ds, batch)
            .await
            .unwrap();
        // Commit the table state returned by the append.
        db.commit_updates(&[crate::db::SubTableUpdate {
            table_key: "node:Person".to_string(),
            table_version: state.version,
            table_branch,
            row_count: state.row_count,
            version_metadata: state.version_metadata,
        }])
        .await
        .unwrap();
    }
2115
    /// Adding a nullable `nickname` property to `Person` must apply, keep
    /// the existing row intact, expose the new column as null, register the
    /// property in the reopened catalog, and leave `_schema.pg` on disk.
    #[tokio::test]
    async fn test_apply_schema_adds_nullable_property_and_preserves_rows() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;

        // Only `Person` contains `age: I32?` followed by a closing brace, so
        // the replacement adds `nickname` to that type alone.
        let desired = TEST_SCHEMA.replace(
            "    age: I32?\n}",
            "    age: I32?\n    nickname: String?\n}",
        );
        let result = db.apply_schema(&desired).await.unwrap();
        assert!(result.applied);

        // Verify through a fresh handle rather than the one that applied the change.
        let reopened = Omnigraph::open(uri).await.unwrap();
        let rows = table_rows_json(&reopened, "node:Person").await;
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0]["name"], "Alice");
        assert_eq!(rows[0]["age"], 30);
        assert!(rows[0]["nickname"].is_null());
        assert!(
            reopened.catalog().node_types["Person"]
                .properties
                .contains_key("nickname")
        );
        assert!(dir.path().join("_schema.pg").exists());
    }
2143
2144    #[tokio::test]
2145    async fn test_apply_schema_renames_property_and_preserves_values() {
2146        let dir = tempfile::tempdir().unwrap();
2147        let uri = dir.path().to_str().unwrap();
2148        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2149        seed_person_row(&mut db, "Alice", Some(30)).await;
2150
2151        let desired = TEST_SCHEMA.replace(
2152            "    age: I32?\n}",
2153            "    years: I32? @rename_from(\"age\")\n}",
2154        );
2155        db.apply_schema(&desired).await.unwrap();
2156
2157        let reopened = Omnigraph::open(uri).await.unwrap();
2158        let rows = table_rows_json(&reopened, "node:Person").await;
2159        assert_eq!(rows[0]["name"], "Alice");
2160        assert_eq!(rows[0]["years"], 30);
2161        assert!(rows[0].get("age").is_none());
2162    }
2163
    /// Renaming node type `Person` to `Human` must switch the head snapshot
    /// to the `node:Human` table key while a snapshot taken at the
    /// pre-rename version still lists `node:Person` (and not `node:Human`).
    #[tokio::test]
    async fn test_apply_schema_renames_type_and_preserves_historical_snapshot() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;
        // Remember the version before the rename for the historical lookup.
        let before_version = db.snapshot().await.version();

        // Rename the type and update both edge declarations that reference it.
        let desired = TEST_SCHEMA
            .replace("node Person {\n", "node Human @rename_from(\"Person\") {\n")
            .replace("edge Knows: Person -> Person", "edge Knows: Human -> Human")
            .replace(
                "edge WorksAt: Person -> Company",
                "edge WorksAt: Human -> Company",
            );
        db.apply_schema(&desired).await.unwrap();

        let head = db.snapshot().await;
        assert!(head.entry("node:Person").is_none());
        assert!(head.entry("node:Human").is_some());
        let historical = ManifestCoordinator::snapshot_at(uri, None, before_version)
            .await
            .unwrap();
        assert!(historical.entry("node:Person").is_some());
        assert!(historical.entry("node:Human").is_none());
    }
2190
    /// After a JSONL load, no internal run branch may remain and a
    /// subsequent schema apply must succeed and report `applied`.
    #[tokio::test]
    async fn test_apply_schema_succeeds_after_load() {
        // Historical: schema apply used to be blocked by leftover
        // `__run__` branches. A defense-in-depth filter now skips
        // internal system branches, and run branches were made
        // ephemeral on every terminal state — so in practice no
        // `__run__` branch survives publish. The filter still guards
        // the invariant.
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        crate::loader::load_jsonl(
            &mut db,
            r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#,
            crate::loader::LoadMode::Overwrite,
        )
        .await
        .unwrap();

        // Precondition: the load left no run branch behind.
        let all_branches = db.coordinator.read().await.all_branches().await.unwrap();
        assert!(
            !all_branches.iter().any(|b| is_internal_run_branch(b)),
            "run branch should be deleted after publish, got: {:?}",
            all_branches
        );

        let desired = TEST_SCHEMA.replace(
            "    age: I32?\n}",
            "    age: I32?\n    nickname: String?\n}",
        );
        let result = db.apply_schema(&desired).await.unwrap();
        assert!(result.applied, "schema apply should have applied");
    }
2225
    /// Adding `@index` to an existing property must result in an FTS index
    /// on that column after the schema is applied.
    #[tokio::test]
    async fn test_apply_schema_adds_index_for_existing_property() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        // `str::replace` rewrites every occurrence, so both `Person.name` and
        // `Company.name` gain `@index`; only `Person` is checked below.
        let desired = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
        db.apply_schema(&desired).await.unwrap();

        let snapshot = db.snapshot().await;
        let ds = snapshot.open("node:Person").await.unwrap();
        assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
    }
2239
    /// Starting from a schema that already indexes `name`, adding a new
    /// property to `Person` must leave the btree index on `id` and the FTS
    /// index on `name` in place.
    #[tokio::test]
    async fn test_apply_schema_rewrite_preserves_existing_indices() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        // Initial schema: baseline plus `@index` on every `name` key.
        let initial_schema = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
        let mut db = Omnigraph::init(uri, &initial_schema).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;

        let desired = initial_schema.replace(
            "    age: I32?\n}",
            "    age: I32?\n    nickname: String?\n}",
        );
        db.apply_schema(&desired).await.unwrap();

        let snapshot = db.snapshot().await;
        let ds = snapshot.open("node:Person").await.unwrap();
        assert!(db.table_store().has_btree_index(&ds, "id").await.unwrap());
        assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
    }
2259
2260    #[tokio::test]
2261    async fn test_open_for_mutation_rejects_while_schema_apply_locked() {
2262        let dir = tempfile::tempdir().unwrap();
2263        let uri = dir.path().to_str().unwrap();
2264        let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2265        let mut db = db;
2266        db.coordinator
2267            .write()
2268            .await
2269            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
2270            .await
2271            .unwrap();
2272
2273        let err = db
2274            .open_for_mutation("node:Person", crate::db::MutationOpKind::Insert)
2275            .await
2276            .unwrap_err();
2277        assert!(
2278            err.to_string()
2279                .contains("write is unavailable while schema apply is in progress")
2280        );
2281    }
2282
2283    #[tokio::test]
2284    async fn test_commit_updates_rejects_while_schema_apply_locked() {
2285        let dir = tempfile::tempdir().unwrap();
2286        let uri = dir.path().to_str().unwrap();
2287        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2288        db.coordinator
2289            .write()
2290            .await
2291            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
2292            .await
2293            .unwrap();
2294
2295        let err = db.commit_updates(&[]).await.unwrap_err();
2296        assert!(
2297            err.to_string()
2298                .contains("write commit is unavailable while schema apply is in progress")
2299        );
2300    }
2301
2302    #[tokio::test]
2303    async fn test_branch_list_hides_schema_apply_lock_branch() {
2304        let dir = tempfile::tempdir().unwrap();
2305        let uri = dir.path().to_str().unwrap();
2306        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2307        db.coordinator
2308            .write()
2309            .await
2310            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
2311            .await
2312            .unwrap();
2313
2314        let branches = db.branch_list().await.unwrap();
2315        assert_eq!(branches, vec!["main".to_string()]);
2316    }
2317}