Skip to main content

omnigraph/db/
omnigraph.rs

1use std::collections::{BTreeSet, HashMap, HashSet};
2use std::io::Write;
3use std::sync::Arc;
4
5use arc_swap::ArcSwap;
6use arrow_array::{
7    Array, BinaryArray, BooleanArray, Date32Array, FixedSizeListArray, Float32Array, Float64Array,
8    Int32Array, Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray,
9    RecordBatch, StringArray, StructArray, UInt32Array, UInt64Array, new_null_array,
10};
11use arrow_schema::{DataType, Field, Schema};
12use lance::Dataset;
13use lance::blob::{BlobArrayBuilder, blob_field};
14use lance::dataset::BlobFile;
15use lance::dataset::scanner::ColumnOrdering;
16use lance::datatypes::BlobKind;
17use omnigraph_compiler::catalog::{Catalog, EdgeType, NodeType};
18use omnigraph_compiler::schema::parser::parse_schema;
19use omnigraph_compiler::types::{PropType, ScalarType};
20use omnigraph_compiler::{
21    DropMode, SchemaIR, SchemaMigrationPlan, SchemaMigrationStep, SchemaTypeKind,
22    build_catalog_from_ir, build_schema_ir, plan_schema_migration,
23};
24
25use crate::db::graph_coordinator::{GraphCoordinator, PublishedSnapshot};
26use crate::error::{OmniError, Result};
27use crate::runtime_cache::RuntimeCache;
28use crate::storage::{StorageAdapter, join_uri, normalize_root_uri, storage_for_uri};
29use crate::storage_layer::SnapshotHandle;
30use crate::table_store::TableStore;
31
32mod export;
33mod optimize;
34mod repair;
35mod schema_apply;
36mod table_ops;
37
38pub use optimize::{CleanupPolicyOptions, SkipReason, TableCleanupStats, TableOptimizeStats};
39pub use repair::{
40    RepairAction, RepairClassification, RepairOptions, RepairStats, TableRepairStats,
41};
42pub use schema_apply::SchemaApplyOptions;
43pub use table_ops::PendingIndex;
44
45use super::commit_graph::GraphCommit;
46use super::manifest::{
47    ManifestChange, Snapshot, SubTableEntry, TableRegistration, TableTombstone,
48    table_path_for_table_key,
49};
50use super::schema_state::{
51    SCHEMA_SOURCE_FILENAME, load_or_bootstrap_schema_contract, read_accepted_schema_ir,
52    recover_schema_state_files, schema_ir_staging_uri, schema_ir_uri, schema_source_staging_uri,
53    schema_source_uri, schema_state_staging_uri, schema_state_uri, validate_schema_contract,
54    write_schema_contract, write_schema_contract_staging,
55};
56use super::{
57    ReadTarget, ResolvedTarget, SCHEMA_APPLY_LOCK_BRANCH, SnapshotId, is_internal_system_branch,
58    is_schema_apply_lock_branch,
59};
60
/// Outcome reported by a branch merge.
///
/// The variant names follow git terminology (the graph uses git-style
/// branching); NOTE(review): the exact conditions for each variant are decided
/// by the merge implementation, which is not part of this file section.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MergeOutcome {
    /// No merge work was needed.
    AlreadyUpToDate,
    /// The target was advanced without a merge (fast-forward).
    FastForward,
    /// A merge of diverged histories was performed.
    Merged,
}
67
68#[derive(Debug, Clone)]
69pub struct SchemaApplyResult {
70    pub supported: bool,
71    pub applied: bool,
72    pub manifest_version: u64,
73    pub steps: Vec<SchemaMigrationStep>,
74}
75
76#[derive(Debug, Clone)]
77pub struct SchemaApplyPreview {
78    pub plan: SchemaMigrationPlan,
79    pub catalog: Catalog,
80}
81
/// Top-level handle to an Omnigraph database.
///
/// An Omnigraph is a Lance-native graph database with git-style branching.
/// It stores typed property graphs as per-type Lance datasets coordinated
/// through a Lance manifest table.
///
/// Construct one with [`Omnigraph::init`] / [`Omnigraph::open`] /
/// [`Omnigraph::open_read_only`]. A freshly constructed handle has no policy
/// checker and no injected embedding config; install those with
/// [`Omnigraph::with_policy`] / [`Omnigraph::with_embedding_config`].
pub struct Omnigraph {
    /// Normalized root URI of the graph (the output of `normalize_root_uri`
    /// in the constructors); returned by [`Omnigraph::uri`].
    root_uri: String,
    /// Object-store adapter (S3 / local fs) used for schema files and the
    /// recovery sidecar protocol.
    storage: Arc<dyn StorageAdapter>,
    /// Coordinator state behind a tokio `RwLock`. PR 2 (MR-686) wraps
    /// this so engine write APIs can be `&self` (the HTTP server's
    /// `AppState` holds `Arc<Omnigraph>` and dispatches concurrent
    /// calls without a global write lock). Reads (`snapshot`, `version`,
    /// `current_branch`, `branch_list`, `resolve_*`, `head_commit_id`,
    /// `list_commits`, …) acquire `.read().await` and parallelize.
    /// Writes (`refresh`, `branch_create`, `branch_delete`, `commit_*`,
    /// `record_*`) acquire `.write().await` and serialize. The atomic
    /// commit invariant — `commit_manifest_updates` followed by
    /// `record_graph_commit` must be atomic — is preserved by the
    /// single `.write()` covering both calls inside
    /// `commit_updates_with_actor_with_expected`. PR 2 Phase 2
    /// converted from `Mutex` to `RwLock` because the bench showed
    /// the Mutex was the dominant serializer for disjoint-table
    /// workloads. Lock acquisition order: always before `runtime_cache`
    /// (when both are needed in one scope).
    coordinator: Arc<tokio::sync::RwLock<GraphCoordinator>>,
    /// Per-table Lance storage rooted at `root_uri`. Engine code reaches it
    /// only through `storage()` (staged writes) and
    /// `storage_inline_residual()` (documented inline-commit residuals).
    table_store: TableStore,
    /// Runtime cache; invalidated alongside `read_caches` on branch switch /
    /// refresh. When the coordinator lock is also needed, take that first.
    runtime_cache: RuntimeCache,
    /// Per-graph read caches: one shared Lance `Session` plus the held-`Dataset`
    /// handle cache, handed to live-Branch-read snapshots (via
    /// `resolved_target`) so table opens reuse handles (0 IO on a warm repeat)
    /// and one session. Invalidated alongside `runtime_cache` on branch switch /
    /// refresh — hygiene only; the version in the cache key is what carries
    /// correctness.
    read_caches: Arc<crate::runtime_cache::ReadCaches>,
    /// Read on every query, written only by `apply_schema`. `ArcSwap`
    /// gives an atomic pointer swap with cheap, lock-free reads (`load()`
    /// returns a `Guard<Arc<Catalog>>`), so concurrent queries on different
    /// actors don't contend on a lock to read the catalog.
    catalog: Arc<ArcSwap<Catalog>>,
    /// Read on schema introspection paths, written only by
    /// `apply_schema`. Same `ArcSwap` rationale as `catalog`.
    schema_source: Arc<ArcSwap<String>>,
    /// Per-`(table_key, branch)` writer queues — the engine's
    /// write-serialization mechanism (the server holds the engine as a
    /// lockless `Arc<Omnigraph>`). Reachable from engine internals
    /// (mutation finalize, schema_apply, branch_merge, ensure_indices,
    /// delete_where, the fork path, recovery reconciler).
    write_queue: Arc<crate::db::write_queue::WriteQueueManager>,
    /// Process-wide mutex held across the swap → operate → restore window
    /// in `branch_merge_impl`. Two concurrent merges with distinct targets
    /// would otherwise interleave their three separate
    /// `coordinator.write().await` acquisitions, leaving each merge's
    /// inner body running against the other's swapped coordinator. Pinned by
    /// `concurrent_branch_merges_distinct_targets_do_not_swap_into_each_other`
    /// in `crates/omnigraph-server/tests/server.rs`.
    ///
    /// Cost: serializes ALL concurrent branch merges process-wide.
    /// Acceptable because branch merges are heavy (table rewrites, index
    /// rebuilds), per-(table, branch) queues inside `commit_all` already
    /// serialize the data path, and merges are rare relative to /change
    /// or /ingest. A finer-grained per-target-branch mutex is a follow-up
    /// if telemetry shows merge concurrency matters.
    ///
    /// The deeper fix — refactor `branch_merge_on_current_target` to take
    /// an explicit target coordinator parameter so `self.coordinator` is never
    /// used as scratch space — is the round-1 shape applied to
    /// `branch_create_from_impl`. Deferred because it requires unwinding
    /// every `self.snapshot()` and `self.ensure_commit_graph_initialized()`
    /// call inside the merge body.
    merge_exclusive: Arc<tokio::sync::Mutex<()>>,
    /// Optional policy checker for engine-layer enforcement (MR-722).
    /// `None` = no enforcement; mutating methods are unconditionally
    /// allowed (this is the embedded/dev default). `Some` = every
    /// mutating method calls `self.enforce(action, scope, actor)` at
    /// entry; denial returns `OmniError::Policy`.
    ///
    /// Per the chassis design (see `omnigraph_policy::PolicyChecker`), the
    /// trait surface is deliberately coarse — action × scope × actor.
    /// Per-row / per-type / per-column scope lives at the query layer
    /// (MR-725), which extends the same trait with a different method.
    /// Don't be tempted to add per-row enforcement here.
    ///
    /// Set via `with_policy(checker)` after construction. Today only
    /// `apply_schema_as` consults this field (PR #2 proof-of-concept);
    /// PR #3 fans the `enforce()` call out to the remaining writers.
    policy: Option<Arc<dyn omnigraph_policy::PolicyChecker>>,
    /// Lazily built embedding client, reused across queries. Built on the first
    /// `nearest($v, "string")` that needs server-side embedding (so a graph that
    /// never embeds needs no provider key), then shared by every later query —
    /// this avoids a per-query `from_env()` rebuild and keeps the provider HTTP
    /// connection pool warm. `OnceCell` guarantees a single initialization.
    embedding: Arc<tokio::sync::OnceCell<crate::embedding::EmbeddingClient>>,
    /// Optional pre-resolved embedding config (RFC-012 Phase 5), injected from an
    /// applied cluster `providers.embedding` profile via [`Omnigraph::with_embedding_config`].
    /// When set, the embedding cell builds its client from this instead of
    /// `EmbeddingClient::from_env()`; `None` keeps the env fallback.
    embedding_config: Option<Arc<crate::embedding::EmbeddingConfig>>,
}
179
/// Selects whether opening a graph performs the open-time recovery sweep.
///
/// Recovery writes through Lance (`Dataset::restore`,
/// `ManifestBatchPublisher::publish`). Read-only consumers — NDJSON export,
/// `commit list`, `read`, schema inspection — must not cause writes: they may
/// hold read-only object-store credentials, and mutations on open are
/// surprising. They do not need recovery either, because every read resolves
/// through the manifest pin, which is a consistent snapshot regardless of any
/// Phase B → Phase C drift on the per-table side.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OpenMode {
    /// Perform the recovery sweep on open; this is what [`Omnigraph::open`] uses.
    ReadWrite,
    /// Never perform the recovery sweep; this is what
    /// [`Omnigraph::open_read_only`] uses.
    ReadOnly,
}
197
/// Init-time settings for [`Omnigraph::init_with_options`].
///
/// The only setting, `force`, governs the safety preflight that stops an
/// accidental re-init from overwriting an existing graph's schema metadata.
///
/// * `force: false` (the default) fails fast with
///   [`OmniError::AlreadyInitialized`] when `_schema.pg`, `_schema.ir.json`,
///   or `__schema_state.json` already exists at the target URI.
/// * `force: true` skips the preflight and overwrites existing schema files
///   in place.
///
/// Force does NOT purge old Lance datasets or `__manifest/`. Reclaiming those
/// still means deleting the graph directory by hand (or, in the future, via
/// `DELETE /graphs/{id}`).
#[derive(Clone, Copy, Debug, Default)]
pub struct InitOptions {
    /// Bypass the existing-graph preflight. Set it only when an overwrite is
    /// really intended — e.g. `omnigraph init --force`.
    pub force: bool,
}
216
217impl Omnigraph {
218    /// Create a new graph at `uri` from schema source.
219    ///
220    /// Strict mode: errors with [`OmniError::AlreadyInitialized`] if
221    /// `uri` already holds any of the three schema artifacts. To
222    /// overwrite an existing graph deliberately, call
223    /// [`Self::init_with_options`] with `InitOptions { force: true }`.
224    pub async fn init(uri: &str, schema_source: &str) -> Result<Self> {
225        Self::init_with_options(uri, schema_source, InitOptions::default()).await
226    }
227
228    /// Create a new graph at `uri`, with explicit init-time options.
229    ///
230    /// See [`InitOptions`] for the safety contract — by default this
231    /// behaves identically to [`Self::init`].
232    pub async fn init_with_options(
233        uri: &str,
234        schema_source: &str,
235        options: InitOptions,
236    ) -> Result<Self> {
237        Self::init_with_storage(uri, schema_source, storage_for_uri(uri)?, options).await
238    }
239
240    pub(crate) async fn init_with_storage(
241        uri: &str,
242        schema_source: &str,
243        storage: Arc<dyn StorageAdapter>,
244        options: InitOptions,
245    ) -> Result<Self> {
246        let root = normalize_root_uri(uri)?;
247
248        // Preflight: refuse to clobber an existing graph unless the
249        // operator passed `force`. This runs BEFORE any parse or
250        // write so a misdirected `init` against an existing graph
251        // URI cannot reach a code path that overwrites or, on a
252        // later cleanup, deletes the schema files.
253        //
254        // Closes the "init is destructive against existing state"
255        // class: there is no longer a code path where strict-mode
256        // `init` can mutate a populated graph root.
257        if !options.force {
258            for candidate in [
259                schema_source_uri(&root),
260                schema_ir_uri(&root),
261                schema_state_uri(&root),
262            ] {
263                if storage.exists(&candidate).await? {
264                    return Err(OmniError::AlreadyInitialized { uri: root.clone() });
265                }
266            }
267        }
268
269        let schema_ir = read_schema_ir_from_source(schema_source)?;
270        let mut catalog = build_catalog_from_ir(&schema_ir)?;
271        fixup_blob_schemas(&mut catalog);
272
273        // Establish an atomic ownership claim on `_schema.pg` before
274        // writing the remaining init artifacts. A check-then-write preflight
275        // is not enough under concurrent `init` calls: two callers can both
276        // observe an empty root, one can successfully initialize, and the
277        // loser can then fail in Lance `WriteMode::Create`. Only the caller
278        // that atomically created `_schema.pg` may clean up schema artifacts
279        // on later failure.
280        let schema_pg_claimed = if options.force {
281            false
282        } else {
283            let schema_path = join_uri(&root, SCHEMA_SOURCE_FILENAME);
284            if !storage
285                .write_text_if_absent(&schema_path, schema_source)
286                .await?
287            {
288                return Err(OmniError::AlreadyInitialized { uri: root.clone() });
289            }
290            if let Err(err) = crate::failpoints::maybe_fail("init.after_schema_pg_written") {
291                best_effort_cleanup_init_artifacts(&root, storage.as_ref()).await;
292                return Err(err);
293            }
294            true
295        };
296
297        // Run the I/O phase. On any error, best-effort-clean schema
298        // artifacts only when this invocation owns them: strict mode owns
299        // them after the atomic `_schema.pg` claim above; force mode owns
300        // destructive overwrite semantics by explicit operator request.
301        //
302        // Coverage gap: Lance per-type datasets and `__manifest/`
303        // directory created by `GraphCoordinator::init` are NOT cleaned
304        // up here — fully recursive directory deletion requires a
305        // `StorageAdapter::delete_prefix` primitive that's deferred
306        // along with `DELETE /graphs/{id}` (PR 2b in the MR-668 plan
307        // is currently deferred). If `init` fails after coordinator
308        // init succeeds, operators may need to remove the graph
309        // directory manually before retrying `init` on the same URI.
310        // Documented in the PR 2a commit message and `init` rustdoc.
311        let coordinator = match init_storage_phase(
312            &root,
313            schema_source,
314            &schema_ir,
315            &catalog,
316            &storage,
317            !schema_pg_claimed,
318        )
319        .await
320        {
321            Ok(coordinator) => coordinator,
322            Err(err) => {
323                if schema_pg_claimed || options.force {
324                    best_effort_cleanup_init_artifacts(&root, storage.as_ref()).await;
325                }
326                return Err(err);
327            }
328        };
329
330        Ok(Self {
331            root_uri: root.clone(),
332            storage,
333            coordinator: Arc::new(tokio::sync::RwLock::new(coordinator)),
334            table_store: TableStore::new(&root),
335            runtime_cache: RuntimeCache::default(),
336            // One shared Session per graph (LanceDB's one-session-per-connection
337            // model) plus the held-handle cache, created once and reused across
338            // reads. Session::default() caps are lazy (6 GiB index / 1 GiB
339            // metadata); multi-graph cap/sharing is a deferred follow-up.
340            read_caches: Arc::new(crate::runtime_cache::ReadCaches {
341                session: Arc::new(lance::session::Session::default()),
342                handles: Arc::new(crate::runtime_cache::TableHandleCache::default()),
343            }),
344            catalog: Arc::new(ArcSwap::from_pointee(catalog)),
345            schema_source: Arc::new(ArcSwap::from_pointee(schema_source.to_string())),
346            write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()),
347            merge_exclusive: Arc::new(tokio::sync::Mutex::new(())),
348            policy: None,
349            embedding: Arc::new(tokio::sync::OnceCell::new()),
350            embedding_config: None,
351        })
352    }
353
354    /// Open an existing graph (read-write).
355    ///
356    /// Reads `_schema.pg`, parses it, builds the catalog, and opens `__manifest`.
357    /// Runs the open-time recovery sweep before returning — see [`OpenMode`].
358    pub async fn open(uri: &str) -> Result<Self> {
359        Self::open_with_storage_and_mode(uri, storage_for_uri(uri)?, OpenMode::ReadWrite).await
360    }
361
362    /// Open an existing graph for read-only consumers (NDJSON export,
363    /// `commit list`, etc.). Skips the recovery sweep — see [`OpenMode`].
364    pub async fn open_read_only(uri: &str) -> Result<Self> {
365        Self::open_with_storage_and_mode(uri, storage_for_uri(uri)?, OpenMode::ReadOnly).await
366    }
367
368    /// Open with a caller-supplied [`StorageAdapter`]. Used by init/test paths
369    /// and by embedding/test consumers that wrap storage (e.g. a counting
370    /// decorator for IO-budget tests). Defaults to `OpenMode::ReadWrite`.
371    pub async fn open_with_storage(uri: &str, storage: Arc<dyn StorageAdapter>) -> Result<Self> {
372        Self::open_with_storage_and_mode(uri, storage, OpenMode::ReadWrite).await
373    }
374
375    pub(crate) async fn open_with_storage_and_mode(
376        uri: &str,
377        storage: Arc<dyn StorageAdapter>,
378        mode: OpenMode,
379    ) -> Result<Self> {
380        let root = normalize_root_uri(uri)?;
381        // Apply pending internal-schema migrations before the coordinator reads
382        // branch state, so `branch_list` and the schema-apply blocking-branch
383        // checks observe the post-migration graph — notably the v2→v3 sweep of
384        // legacy `__run__*` staging branches (MR-770). ReadWrite only: a
385        // read-only open must not trigger object-store writes, so a read-only
386        // open of an unmigrated legacy graph still lists `__run__*` until its
387        // first read-write open (an accepted, documented limitation).
388        if matches!(mode, OpenMode::ReadWrite) {
389            crate::db::manifest::migrate_on_open(&root).await?;
390        }
391        // Open the coordinator first so the schema-staging recovery sweep can
392        // compare its snapshot against any leftover staging files.
393        let mut coordinator = GraphCoordinator::open(&root, Arc::clone(&storage)).await?;
394        // Both the schema-state recovery sweep AND the manifest-drift
395        // recovery sweep are gated on `OpenMode::ReadWrite`. Read-only
396        // consumers (NDJSON export, `commit list`, schema show) shouldn't
397        // trigger object-store mutations: they may run with read-only
398        // credentials, and silent open-time writes are surprising. Both
399        // sweeps' work is recoverable on the next ReadWrite open, so
400        // skipping under ReadOnly doesn't lose any safety guarantees —
401        // the manifest pin is the consistent snapshot regardless of
402        // drift on the per-table side or leftover schema-staging files.
403        if matches!(mode, OpenMode::ReadWrite) {
404            let schema_state_recovery =
405                recover_schema_state_files(&root, Arc::clone(&storage), &coordinator.snapshot())
406                    .await?;
407            // Recovery sweep: close the Phase B → Phase C residual on
408            // any sidecar left over from a crashed writer. Long-running
409            // processes additionally converge in-process: the staged-
410            // write entry points and `refresh` run the roll-forward-only
411            // heal (`heal_pending_sidecars_roll_forward`); only
412            // rollback-eligible sidecars wait for this open-time sweep.
413            crate::db::manifest::recover_manifest_drift(
414                &root,
415                Arc::clone(&storage),
416                &mut coordinator,
417                crate::db::manifest::RecoveryMode::Full,
418                schema_state_recovery,
419            )
420            .await?;
421        }
422        // Read _schema.pg (post-recovery — may have just been renamed in).
423        let schema_path = schema_source_uri(&root);
424        let schema_source = storage.read_text(&schema_path).await?;
425        let current_source_ir = read_schema_ir_from_source(&schema_source)?;
426        let branches = coordinator.branch_list().await?;
427        let (accepted_ir, _) = load_or_bootstrap_schema_contract(
428            &root,
429            Arc::clone(&storage),
430            &branches,
431            &current_source_ir,
432        )
433        .await?;
434        let mut catalog = build_catalog_from_ir(&accepted_ir)?;
435        fixup_blob_schemas(&mut catalog);
436
437        Ok(Self {
438            root_uri: root.clone(),
439            storage,
440            coordinator: Arc::new(tokio::sync::RwLock::new(coordinator)),
441            table_store: TableStore::new(&root),
442            runtime_cache: RuntimeCache::default(),
443            // One shared Session per graph (LanceDB's one-session-per-connection
444            // model) plus the held-handle cache, created once and reused across
445            // reads. Session::default() caps are lazy (6 GiB index / 1 GiB
446            // metadata); multi-graph cap/sharing is a deferred follow-up.
447            read_caches: Arc::new(crate::runtime_cache::ReadCaches {
448                session: Arc::new(lance::session::Session::default()),
449                handles: Arc::new(crate::runtime_cache::TableHandleCache::default()),
450            }),
451            catalog: Arc::new(ArcSwap::from_pointee(catalog)),
452            schema_source: Arc::new(ArcSwap::from_pointee(schema_source)),
453            write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()),
454            merge_exclusive: Arc::new(tokio::sync::Mutex::new(())),
455            policy: None,
456            embedding: Arc::new(tokio::sync::OnceCell::new()),
457            embedding_config: None,
458        })
459    }
460
461    /// Returns an `Arc<Catalog>` snapshot. Cheap clone of the current
462    /// catalog pointer; callers can hold the returned `Arc` across awaits
463    /// without blocking concurrent `apply_schema`.
464    pub fn catalog(&self) -> Arc<Catalog> {
465        self.catalog.load_full()
466    }
467
468    /// Returns an `Arc<String>` snapshot of the schema source.
469    pub fn schema_source(&self) -> Arc<String> {
470        self.schema_source.load_full()
471    }
472
473    /// Atomically swap the in-memory catalog. Concurrent readers see
474    /// either the old or the new pointer; never a torn state. Used by
475    /// `apply_schema` and `reload_schema_if_source_changed`.
476    pub(crate) fn store_catalog(&self, catalog: Catalog) {
477        self.catalog.store(Arc::new(catalog));
478    }
479
480    /// Atomically swap the in-memory schema source. Same rationale as
481    /// [`store_catalog`](Self::store_catalog).
482    pub(crate) fn store_schema_source(&self, schema_source: String) {
483        self.schema_source.store(Arc::new(schema_source));
484    }
485
486    pub fn uri(&self) -> &str {
487        &self.root_uri
488    }
489
490    /// Install a policy checker for engine-layer enforcement (MR-722).
491    /// Builder-style setter — consumes `self`, returns `Self`. Calling
492    /// this on a `Omnigraph` previously without policy enables
493    /// `enforce()` to fire at every mutating engine method that's been
494    /// wired to call it (currently `apply_schema_as`; PR #3 fans out to
495    /// the remaining writers).
496    ///
497    /// Embedded callers that don't care about authorization should
498    /// just not call this. Server / CLI callers that have loaded a
499    /// `PolicyEngine` from `policy.yaml` pass it here.
500    pub fn with_policy(mut self, checker: Arc<dyn omnigraph_policy::PolicyChecker>) -> Self {
501        self.policy = Some(checker);
502        self
503    }
504
505    /// The lazily-initialized, reused-across-queries embedding client cell
506    /// (see the `embedding` field doc). The query executor resolves the client
507    /// through this on the first `nearest($v, "string")` that needs embedding.
508    pub(crate) fn embedding_cell(
509        &self,
510    ) -> &tokio::sync::OnceCell<crate::embedding::EmbeddingClient> {
511        &self.embedding
512    }
513
514    /// Install a pre-resolved embedding config (RFC-012 Phase 5). Builder-style,
515    /// mirroring [`Omnigraph::with_policy`]: a graph served from a cluster
516    /// embedding provider profile injects it here; an embedded/CLI caller that doesn't
517    /// call this keeps the `EmbeddingClient::from_env()` fallback.
518    pub fn with_embedding_config(mut self, config: Arc<crate::embedding::EmbeddingConfig>) -> Self {
519        self.embedding_config = Some(config);
520        self
521    }
522
523    /// The injected embedding config, if any (see the `embedding_config` field).
524    pub(crate) fn embedding_config_ref(&self) -> Option<&crate::embedding::EmbeddingConfig> {
525        self.embedding_config.as_deref()
526    }
527
528    /// Engine-layer policy enforcement gate (MR-722 chassis core).
529    ///
530    /// * If no policy is installed → no-op (returns `Ok(())`).
531    /// * If policy is installed AND actor is None → denial with a
532    ///   clear "no actor for engine-layer policy check" message.
533    ///   Forces server / CLI / SDK callers to thread an actor through
534    ///   when policy is configured — silent bypass via "I forgot the
535    ///   actor" is exactly the footgun this gate is here to prevent.
536    /// * If policy is installed AND actor is Some → call
537    ///   `PolicyChecker::check(action, scope, actor)`; map denial /
538    ///   internal failure to `OmniError::Policy(...)`.
539    pub(crate) fn enforce(
540        &self,
541        action: omnigraph_policy::PolicyAction,
542        scope: &omnigraph_policy::ResourceScope,
543        actor: Option<&str>,
544    ) -> Result<()> {
545        let Some(checker) = self.policy.as_ref() else {
546            return Ok(());
547        };
548        let Some(actor) = actor else {
549            return Err(OmniError::Policy(
550                "no actor for engine-layer policy check (policy is configured but the call site \
551                 didn't thread an actor through — this is almost certainly a bug, not an \
552                 intended bypass)"
553                    .to_string(),
554            ));
555        };
556        checker
557            .check(action, scope, actor)
558            .map_err(|err| OmniError::Policy(err.to_string()))
559    }
560
561    pub(crate) async fn ensure_schema_state_valid(&self) -> Result<()> {
562        // Full per-call validation is intentional: a long-lived handle must
563        // detect external drift of the schema source, IR, OR state on its next
564        // operation (see lifecycle::long_lived_handle_rejects_schema_* tests). A
565        // source-only fast path would miss IR/state drift when _schema.pg is
566        // unchanged, so the only safe latency win is not calling this twice per
567        // query (finding A removes the redundant caller in exec/query.rs).
568        validate_schema_contract(self.uri(), Arc::clone(&self.storage)).await
569    }
570
571    pub async fn plan_schema(&self, desired_schema_source: &str) -> Result<SchemaMigrationPlan> {
572        self.plan_schema_with_options(desired_schema_source, SchemaApplyOptions::default())
573            .await
574    }
575
576    pub async fn plan_schema_with_options(
577        &self,
578        desired_schema_source: &str,
579        options: SchemaApplyOptions,
580    ) -> Result<SchemaMigrationPlan> {
581        schema_apply::plan_schema(self, desired_schema_source, options).await
582    }
583
584    pub async fn preview_schema_apply_with_options(
585        &self,
586        desired_schema_source: &str,
587        options: SchemaApplyOptions,
588    ) -> Result<SchemaApplyPreview> {
589        schema_apply::preview_schema_apply(self, desired_schema_source, options).await
590    }
591
592    pub async fn apply_schema(&self, desired_schema_source: &str) -> Result<SchemaApplyResult> {
593        self.apply_schema_as(desired_schema_source, SchemaApplyOptions::default(), None)
594            .await
595    }
596
597    pub async fn apply_schema_with_options(
598        &self,
599        desired_schema_source: &str,
600        options: SchemaApplyOptions,
601    ) -> Result<SchemaApplyResult> {
602        self.apply_schema_as(desired_schema_source, options, None)
603            .await
604    }
605
606    /// Apply a schema migration with an explicit actor for engine-layer
607    /// policy enforcement (MR-722). When a `PolicyChecker` is installed
608    /// via [`Self::with_policy`], this method calls `enforce(SchemaApply,
609    /// Branch("main"), actor)` before any apply work happens. Denial
610    /// returns `OmniError::Policy` and leaves the manifest untouched.
611    ///
612    /// The no-actor variants (`apply_schema`, `apply_schema_with_options`)
613    /// pass `None` here. They work fine without a policy; if a policy IS
614    /// installed and actor is None, enforcement intentionally fails to
615    /// prevent silent-bypass-via-forgetting-the-actor footguns.
616    pub async fn apply_schema_as(
617        &self,
618        desired_schema_source: &str,
619        options: SchemaApplyOptions,
620        actor: Option<&str>,
621    ) -> Result<SchemaApplyResult> {
622        self.apply_schema_as_with_catalog_check(desired_schema_source, options, actor, |_| Ok(()))
623            .await
624    }
625
626    pub async fn apply_schema_as_with_catalog_check<F>(
627        &self,
628        desired_schema_source: &str,
629        options: SchemaApplyOptions,
630        actor: Option<&str>,
631        validate_catalog: F,
632    ) -> Result<SchemaApplyResult>
633    where
634        F: FnOnce(&Catalog) -> Result<()>,
635    {
636        schema_apply::apply_schema(
637            self,
638            desired_schema_source,
639            options,
640            actor,
641            validate_catalog,
642        )
643        .await
644    }
645
646    pub(crate) async fn ensure_schema_apply_idle(&self, operation: &str) -> Result<()> {
647        schema_apply::ensure_schema_apply_idle(self, operation).await
648    }
649
650    async fn ensure_schema_apply_not_locked(&self, operation: &str) -> Result<()> {
651        schema_apply::ensure_schema_apply_not_locked(self, operation).await
652    }
653
654    /// Engine-facing trait surface around `TableStore`.
655    ///
656    /// This is the **only** accessor for engine code reaching into the
657    /// storage layer. The trait's signatures use opaque `SnapshotHandle`
658    /// / `StagedHandle` instead of leaking `lance::Dataset` /
659    /// `lance::dataset::transaction::Transaction`, so newly-added engine
660    /// call sites cannot drift the staged-write invariant by mistake
661    /// (the trait's `stage_*` + `commit_staged` pair is the only way to
662    /// land a write).
663    pub(crate) fn storage(&self) -> &dyn crate::storage_layer::TableStorage {
664        &self.table_store
665    }
666
667    /// Inline-commit residual surface (`delete_where`,
668    /// `create_vector_index`) — the writes Lance cannot yet express as a
669    /// stage-then-commit pair. Deliberately separate from [`Self::storage`] so
670    /// the default storage surface is staged-only and a new writer cannot couple
671    /// "write bytes" with "advance HEAD" by reaching for `db.storage()`. Only
672    /// the handful of documented residual call sites (mutation/merge deletes,
673    /// vector-index build) use this accessor. See
674    /// `crate::storage_layer::InlineCommitResidual` for the per-method blocker.
675    pub(crate) fn storage_inline_residual(
676        &self,
677    ) -> &dyn crate::storage_layer::InlineCommitResidual {
678        &self.table_store
679    }
680
681    /// Engine-level access to the object-store adapter (S3 / local fs).
682    /// Used by the recovery sidecar protocol — writers in the engine
683    /// call this to write/delete sidecars at `__recovery/{ulid}.json`.
684    pub(crate) fn storage_adapter(&self) -> &dyn crate::storage::StorageAdapter {
685        self.storage.as_ref()
686    }
687
688    /// Per-`(table_key, branch)` writer queues.
689    ///
690    /// Engine-internal writers (mutation finalize, schema_apply,
691    /// branch_merge, ensure_indices, delete_where) and the future MR-870
692    /// recovery reconciler reach the queue manager via this accessor.
693    /// Returns an `Arc` clone so callers can hold the manager across
694    /// `&mut self` engine API boundaries.
695    pub(crate) fn write_queue(&self) -> Arc<crate::db::write_queue::WriteQueueManager> {
696        Arc::clone(&self.write_queue)
697    }
698
699    /// Engine-internal access to the merge-exclusive mutex. Held across
700    /// the swap → operate → restore window in `branch_merge_impl` so
701    /// concurrent merges with distinct targets don't corrupt
702    /// `self.coordinator` mid-operation. See the field doc on
703    /// `Omnigraph::merge_exclusive` for the full design rationale.
704    pub(crate) fn merge_exclusive(&self) -> Arc<tokio::sync::Mutex<()>> {
705        Arc::clone(&self.merge_exclusive)
706    }
707
708    /// Engine-level access to the graph's normalized root URI. Used by
709    /// the recovery sidecar protocol to compute `__recovery/` paths.
710    pub(crate) fn root_uri(&self) -> &str {
711        &self.root_uri
712    }
713
714    pub(crate) async fn open_coordinator_for_branch(
715        &self,
716        branch: Option<&str>,
717    ) -> Result<GraphCoordinator> {
718        match branch {
719            Some(branch) => {
720                GraphCoordinator::open_branch(self.uri(), branch, Arc::clone(&self.storage)).await
721            }
722            None => GraphCoordinator::open(self.uri(), Arc::clone(&self.storage)).await,
723        }
724    }
725
726    pub(crate) async fn swap_coordinator_for_branch(
727        &self,
728        branch: Option<&str>,
729    ) -> Result<GraphCoordinator> {
730        let next = self.open_coordinator_for_branch(branch).await?;
731        let mut coord = self.coordinator.write().await;
732        Ok(std::mem::replace(&mut *coord, next))
733    }
734
735    pub(crate) async fn restore_coordinator(&self, coordinator: GraphCoordinator) {
736        *self.coordinator.write().await = coordinator;
737    }
738
739    pub(crate) async fn resolved_branch_target(
740        &self,
741        branch: Option<&str>,
742    ) -> Result<ResolvedTarget> {
743        self.ensure_schema_state_valid().await?;
744        let requested = ReadTarget::Branch(branch.unwrap_or("main").to_string());
745        let normalized = normalize_branch_name(branch.unwrap_or("main"))?;
746        let coord = self.coordinator.read().await;
747        if normalized.as_deref() == coord.current_branch() {
748            let snapshot_id = coord.head_commit_id().await?.unwrap_or_else(|| {
749                SnapshotId::synthetic(
750                    coord.current_branch(),
751                    coord.version(),
752                    coord.manifest_incarnation().e_tag.as_deref(),
753                )
754            });
755            return Ok(ResolvedTarget {
756                requested,
757                branch: coord.current_branch().map(str::to_string),
758                snapshot_id,
759                snapshot: coord.snapshot(),
760            });
761        }
762        coord.resolve_target(&requested).await
763    }
764
765    pub(crate) async fn snapshot_for_branch(&self, branch: Option<&str>) -> Result<Snapshot> {
766        self.resolved_branch_target(branch)
767            .await
768            .map(|resolved| resolved.snapshot)
769    }
770
771    pub(crate) async fn fresh_snapshot_for_branch(&self, branch: Option<&str>) -> Result<Snapshot> {
772        self.ensure_schema_state_valid().await?;
773        let requested = ReadTarget::Branch(branch.unwrap_or("main").to_string());
774        let coord = self.coordinator.read().await;
775        coord
776            .resolve_target(&requested)
777            .await
778            .map(|resolved| resolved.snapshot)
779    }
780
781    pub(crate) async fn version(&self) -> u64 {
782        self.coordinator.read().await.version()
783    }
784
785    /// Return an immutable Snapshot from the known manifest state. No storage I/O.
786    pub(crate) async fn snapshot(&self) -> Snapshot {
787        self.coordinator.read().await.snapshot()
788    }
789
790    pub async fn snapshot_of(&self, target: impl Into<ReadTarget>) -> Result<Snapshot> {
791        self.resolved_target(target)
792            .await
793            .map(|resolved| resolved.snapshot)
794    }
795
796    pub async fn version_of(&self, target: impl Into<ReadTarget>) -> Result<u64> {
797        self.snapshot_of(target)
798            .await
799            .map(|snapshot| snapshot.version())
800    }
801
802    pub async fn resolved_branch_of(
803        &self,
804        target: impl Into<ReadTarget>,
805    ) -> Result<Option<String>> {
806        self.resolved_target(target)
807            .await
808            .map(|resolved| resolved.branch)
809    }
810
811    /// Synchronize this handle's write base to the latest head of the named branch.
812    pub async fn sync_branch(&self, branch: &str) -> Result<()> {
813        self.ensure_schema_state_valid().await?;
814        let branch = normalize_branch_name(branch)?;
815        let next = self.open_coordinator_for_branch(branch.as_deref()).await?;
816        *self.coordinator.write().await = next;
817        self.invalidate_read_caches().await;
818        Ok(())
819    }
820
821    async fn invalidate_read_caches(&self) {
822        self.runtime_cache.invalidate_all().await;
823        self.read_caches.handles.invalidate_all().await;
824    }
825
    /// Re-read the handle-local coordinator state from storage AND run
    /// in-process recovery. This closes the Phase B → Phase C residual
    /// (e.g. a `MutationStaging::finalize` crash mid-publish in a
    /// long-running server) without a restart.
    ///
    /// Composition mirrors `Omnigraph::open_with_storage_and_mode`'s
    /// recovery sequence, in the same order, with one restriction: the
    /// manifest-drift heal runs in `RollForwardOnly` mode. Rollback and
    /// abort cases defer to the next ReadWrite open because
    /// `Dataset::restore` is unsafe under concurrency. Steps, as executed:
    ///
    /// 1. While holding the schema-apply serialization queue key, list the
    ///    recovery sidecars. Only when NONE are pending:
    ///    `coordinator.refresh()` (re-read the manifest), then
    ///    `recover_schema_state_files`. The latter completes an in-flight
    ///    schema_apply's staging→final rename, and is idempotent with an
    ///    early return when no staging files exist. This reconcile must
    ///    precede manifest-drift recovery: otherwise a SchemaApply
    ///    roll-forward could publish the manifest while the staging files
    ///    remain unrenamed (data on the new schema, catalog on the old).
    ///    When sidecars ARE pending, this whole step is skipped and the
    ///    heal in step 2 owns the reconcile, per SchemaApply sidecar, under
    ///    that sidecar's queue guards.
    ///    NOTE(review): on the sidecar path, `coord.refresh()` is not
    ///    called here. Confirm that `heal_pending_sidecars_roll_forward`
    ///    refreshes the coordinator even when it defers every sidecar.
    /// 2. `heal_pending_sidecars_roll_forward` closes the
    ///    finalize→publisher residual via roll-forward and defers rollback
    ///    work to the next ReadWrite open. It serializes against live
    ///    writers by acquiring each sidecar's per-(table_key, branch) write
    ///    queues, so refresh never rolls forward an in-flight writer's
    ///    sidecar from under it. Its return value is not used here.
    /// 3. `reload_schema_if_source_changed` brings the in-memory catalog in
    ///    line with the on-disk schema source.
    /// 4. `invalidate_read_caches` drops the runtime cache and the held
    ///    table-handle read cache.
    ///
    /// Steady-state cost: a `list_dir` of `__recovery/` in step 1 and
    /// another inside the heal (both typically empty, so both return
    /// early), plus the schema-source text read in step 3. No additional
    /// Lance reads.
    ///
    /// The staged-write entry points (`load_as`, `mutate_as`) run the
    /// same heal via
    /// [`heal_pending_recovery_sidecars`](Self::heal_pending_recovery_sidecars),
    /// so a long-lived server converges on the next write without an
    /// explicit refresh. Engine-internal callers that already hold an
    /// in-flight sidecar (e.g. `schema_apply` mid-write) MUST use
    /// [`refresh_coordinator_only`](Self::refresh_coordinator_only) to
    /// avoid the recovery sweep racing their own sidecar.
    pub async fn refresh(&self) -> Result<()> {
        // Standalone schema-staging reconcile ONLY when no recovery
        // sidecar exists (legacy/manual staging residue). When sidecars
        // exist, the heal below owns the reconcile: per SchemaApply
        // sidecar, under that sidecar's queue guards. An unserialized
        // reconcile can promote a LIVE schema apply's staging files from
        // under it. A pre-promoted result would also make the heal's own
        // guarded reconcile see clean staging and wrongly defer the
        // sidecar. The no-sidecar case cannot race a live apply, because
        // the apply writes its sidecar before its staging files.
        //
        // Scope the coord write guard to the schema-state section only.
        // `reload_schema_if_source_changed` (below) acquires
        // `self.coordinator.read().await` when the on-disk schema source
        // has drifted from the cached `schema_source`. Tokio's RwLock is
        // not reentrant, so holding the write across that call deadlocks.
        // Pinned by `composite_flow_schema_apply_then_branch_ops_no_deadlock_in_refresh`.
        // The heal also takes the lock itself (queues → coordinator
        // order), so it must run after this guard is released.
        {
            // Hold the schema-apply serialization key across the
            // list-then-reconcile pair. Without it, a live apply can
            // write its sidecar + staging between the empty check and
            // the reconcile (the same race, through a smaller window).
            // Queue before coordinator is the documented lock order.
            //
            // Liveness note: with a pending NON-SchemaApply sidecar
            // (e.g. a Mutation residual), this gate skips the standalone
            // reconcile, and the heal below reconciles only per
            // SchemaApply sidecar. Pre-sidecar-era orphaned staging
            // residue therefore waits for the NEXT refresh after the
            // sidecars are consumed: convergence holds, one pass late.
            // Do not "fix" this by re-running the reconcile unserialized
            // here; that is exactly the live-apply race this block exists
            // to close.
            let _serial = self
                .write_queue
                .acquire(&crate::db::manifest::schema_apply_serial_queue_key())
                .await;
            if crate::db::manifest::list_sidecars(&self.root_uri, self.storage.as_ref())
                .await?
                .is_empty()
            {
                // No sidecars: re-read the manifest, then finish any
                // leftover staging→final schema rename against it.
                let mut coord = self.coordinator.write().await;
                coord.refresh().await?;
                recover_schema_state_files(
                    &self.root_uri,
                    Arc::clone(&self.storage),
                    &coord.snapshot(),
                )
                .await?;
            }
        } // ← guards released before the heal's queue acquisition
        // Roll-forward-only heal; it acquires the per-table queues and the
        // coordinator lock itself.
        crate::db::manifest::heal_pending_sidecars_roll_forward(
            &self.root_uri,
            Arc::clone(&self.storage),
            &self.coordinator,
            &self.write_queue,
        )
        .await?;
        // Runs with no coordinator guard held (see the deadlock note above).
        self.reload_schema_if_source_changed().await?;
        self.invalidate_read_caches().await;
        Ok(())
    }
928
929    /// Write-entry heal: converge any pending recovery sidecars (a
930    /// previously failed writer's Phase B → Phase C residual) before
931    /// starting a new staged write, so a long-lived process (the HTTP
932    /// server, an embedded handle) recovers on its next write instead
933    /// of wedging every write on the commit-time drift guard until
934    /// restart. Roll-forward only; rollback-eligible sidecars defer to
935    /// the next ReadWrite open exactly as [`refresh`](Self::refresh)
936    /// does.
937    ///
938    /// Steady-state cost: one `list_dir` of `__recovery/` (typically
939    /// empty → immediate return). See
940    /// `recovery::heal_pending_sidecars_roll_forward` for the
941    /// concurrency contract (per-table write-queue acquisition).
942    pub(crate) async fn heal_pending_recovery_sidecars(&self) -> Result<()> {
943        let processed = crate::db::manifest::heal_pending_sidecars_roll_forward(
944            &self.root_uri,
945            Arc::clone(&self.storage),
946            &self.coordinator,
947            &self.write_queue,
948        )
949        .await?;
950        if processed {
951            // A rolled-forward SchemaApply sidecar moved disk + manifest
952            // to the new schema (staging promoted, registrations
953            // published); the in-memory catalog must follow or the very
954            // write that triggered the heal validates against the stale
955            // schema. Same post-heal step as `refresh`.
956            self.reload_schema_if_source_changed().await?;
957            self.invalidate_read_caches().await;
958        }
959        Ok(())
960    }
961
962    async fn reload_schema_if_source_changed(&self) -> Result<()> {
963        let schema_path = schema_source_uri(&self.root_uri);
964        let schema_source = self.storage.read_text(&schema_path).await?;
965        if schema_source == *self.schema_source.load_full() {
966            return Ok(());
967        }
968        let current_source_ir = read_schema_ir_from_source(&schema_source)?;
969        let branches = self.coordinator.read().await.branch_list().await?;
970        let (accepted_ir, _) = load_or_bootstrap_schema_contract(
971            &self.root_uri,
972            Arc::clone(&self.storage),
973            &branches,
974            &current_source_ir,
975        )
976        .await?;
977        let mut catalog = build_catalog_from_ir(&accepted_ir)?;
978        fixup_blob_schemas(&mut catalog);
979        self.store_schema_source(schema_source);
980        self.store_catalog(catalog);
981        Ok(())
982    }
983
984    /// Refresh coordinator state and invalidate the runtime cache WITHOUT
985    /// running the recovery sweep. Engine-internal callers that hold an
986    /// in-flight sidecar (e.g. `schema_apply::apply_schema_with_lock`'s
987    /// internal lease-check refresh) need this variant: running recovery
988    /// here would observe the caller's own sidecar, classify it as
989    /// RolledPastExpected, and roll it forward — racing the caller's
990    /// own publish path.
991    pub(crate) async fn refresh_coordinator_only(&self) -> Result<()> {
992        self.coordinator.write().await.refresh().await?;
993        self.invalidate_read_caches().await;
994        Ok(())
995    }
996
997    pub async fn resolve_snapshot(&self, branch: &str) -> Result<SnapshotId> {
998        self.ensure_schema_state_valid().await?;
999        self.coordinator
1000            .read()
1001            .await
1002            .resolve_snapshot_id(branch)
1003            .await
1004    }
1005
1006    pub(crate) async fn resolved_target(
1007        &self,
1008        target: impl Into<ReadTarget>,
1009    ) -> Result<ResolvedTarget> {
1010        self.ensure_schema_state_valid().await?;
1011        let target = target.into();
1012        let mut resolved = self.resolve_target_inner(&target).await?;
1013        // Attach the read caches (shared Session + held-handle cache) for live
1014        // Branch reads so table opens reuse handles (0 IO on a warm repeat).
1015        // Snapshot-id reads are deliberately NOT cached: they pin a historical
1016        // version `cleanup` may GC, so bypassing the cache sidesteps the
1017        // cleanup-vs-cached-handle edge. Writes never reach here (they use
1018        // `resolved_branch_target`), so they never receive a pinned handle.
1019        if matches!(target, ReadTarget::Branch(_)) {
1020            resolved
1021                .snapshot
1022                .set_read_caches(Arc::clone(&self.read_caches));
1023        }
1024        Ok(resolved)
1025    }
1026
    /// Resolve a read target to its snapshot, without attaching read caches.
    ///
    /// Same-branch reads reuse the warm coordinator, gated by a cheap
    /// incarnation probe (invariant 6: strong consistency, never a blind
    /// warm read). Reads do not need the commit graph (the manifest version
    /// is the visibility authority, invariant 2), so the id is synthetic
    /// and no commit-graph scan happens on this path.
    ///
    /// Branch-target flow:
    /// 1. Under the read lock: a different branch is resolved cold. For the
    ///    same branch, a probe that matches the held incarnation is served
    ///    warm.
    /// 2. Otherwise take the write lock and re-check both the branch and
    ///    the probe. Refresh only the manifest if it is still stale, and
    ///    invalidate read caches only when a refresh actually happened.
    ///
    /// Snapshot targets always resolve through the coordinator under the
    /// read lock.
    async fn resolve_target_inner(&self, target: &ReadTarget) -> Result<ResolvedTarget> {
        if let ReadTarget::Branch(branch) = target {
            let normalized = normalize_branch_name(branch)?;
            {
                let coord = self.coordinator.read().await;
                if normalized.as_deref() != coord.current_branch() {
                    // Different branch: cold resolve (opens that branch).
                    return coord.resolve_target(target).await;
                }
                let held = coord.manifest_incarnation();
                if coord.probe_latest_incarnation().await?.matches(&held) {
                    // Held manifest is still the latest: serve warm.
                    return Ok(warm_resolved_target(&coord, target));
                }
                // Stale: refresh under the write lock below.
            }
            let mut coord = self.coordinator.write().await;
            if normalized.as_deref() == coord.current_branch() {
                // Re-check after taking the write lock; another writer may have
                // refreshed (tokio RwLock has no read->write upgrade).
                let held = coord.manifest_incarnation();
                let mut refreshed = false;
                if !coord.probe_latest_incarnation().await?.matches(&held) {
                    coord.refresh_manifest_only().await?;
                    refreshed = true;
                }
                let resolved = warm_resolved_target(&coord, target);
                // Release the write guard before the async cache
                // invalidation so the coordinator lock is not held across it.
                drop(coord);
                if refreshed {
                    self.invalidate_read_caches().await;
                }
                return Ok(resolved);
            }
            // Branch changed while waiting for the write lock: cold resolve.
            // NOTE(review): this cold resolve runs while still holding the
            // write guard.
            return coord.resolve_target(target).await;
        }

        // Snapshot target: resolve through the commit graph as before.
        self.coordinator.read().await.resolve_target(target).await
    }
1072
1073    // ─── Change detection ────────────────────────────────────────────────
1074
1075    pub async fn diff_between(
1076        &self,
1077        from: impl Into<ReadTarget>,
1078        to: impl Into<ReadTarget>,
1079        filter: &crate::changes::ChangeFilter,
1080    ) -> Result<crate::changes::ChangeSet> {
1081        let from_resolved = self.resolved_target(from).await?;
1082        let to_resolved = self.resolved_target(to).await?;
1083        crate::changes::diff_snapshots(
1084            self.uri(),
1085            &from_resolved.snapshot,
1086            &to_resolved.snapshot,
1087            filter,
1088            to_resolved.branch.clone().or(from_resolved.branch.clone()),
1089        )
1090        .await
1091    }
1092
1093    /// Diff two graph commits. Resolves each commit to `(manifest_branch, manifest_version)`
1094    /// and creates branch-aware snapshots. Supports cross-branch comparison.
1095    pub async fn diff_commits(
1096        &self,
1097        from_commit_id: &str,
1098        to_commit_id: &str,
1099        filter: &crate::changes::ChangeFilter,
1100    ) -> Result<crate::changes::ChangeSet> {
1101        let coord = self.coordinator.read().await;
1102        let from_commit = coord
1103            .resolve_commit(&SnapshotId::new(from_commit_id))
1104            .await?;
1105        let to_commit = coord.resolve_commit(&SnapshotId::new(to_commit_id)).await?;
1106        let from_snap = coord
1107            .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
1108                from_commit.graph_commit_id.clone(),
1109            )))
1110            .await?;
1111        let to_snap = coord
1112            .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
1113                to_commit.graph_commit_id.clone(),
1114            )))
1115            .await?;
1116        drop(coord);
1117        crate::changes::diff_snapshots(
1118            self.uri(),
1119            &from_snap.snapshot,
1120            &to_snap.snapshot,
1121            filter,
1122            to_snap.branch.clone().or(from_snap.branch.clone()),
1123        )
1124        .await
1125    }
1126
1127    pub async fn entity_at_target(
1128        &self,
1129        target: impl Into<ReadTarget>,
1130        table_key: &str,
1131        id: &str,
1132    ) -> Result<Option<serde_json::Value>> {
1133        export::entity_at_target(self, target, table_key, id).await
1134    }
1135
1136    /// Read one entity at a specific manifest version via time travel (on-demand enrichment).
1137    pub async fn entity_at(
1138        &self,
1139        table_key: &str,
1140        id: &str,
1141        version: u64,
1142    ) -> Result<Option<serde_json::Value>> {
1143        export::entity_at(self, table_key, id, version).await
1144    }
1145
1146    /// Create a Snapshot at any historical manifest version.
1147    pub async fn snapshot_at_version(&self, version: u64) -> Result<Snapshot> {
1148        self.ensure_schema_state_valid().await?;
1149        self.coordinator
1150            .read()
1151            .await
1152            .snapshot_at_version(version)
1153            .await
1154    }
1155
1156    pub async fn export_jsonl(
1157        &self,
1158        branch: &str,
1159        type_names: &[String],
1160        table_keys: &[String],
1161    ) -> Result<String> {
1162        export::export_jsonl(self, branch, type_names, table_keys).await
1163    }
1164
1165    pub async fn export_jsonl_to_writer<W: Write>(
1166        &self,
1167        branch: &str,
1168        type_names: &[String],
1169        table_keys: &[String],
1170        writer: &mut W,
1171    ) -> Result<()> {
1172        export::export_jsonl_to_writer(self, branch, type_names, table_keys, writer).await
1173    }
1174
1175    // ─── Graph index ──────────────────────────────────────────────────────
1176
1177    /// Get or build the graph index for the current snapshot.
1178    pub async fn graph_index(&self) -> Result<Arc<crate::graph_index::GraphIndex>> {
1179        table_ops::graph_index(self).await
1180    }
1181
1182    pub(crate) async fn graph_index_for_resolved(
1183        &self,
1184        resolved: &ResolvedTarget,
1185    ) -> Result<Arc<crate::graph_index::GraphIndex>> {
1186        table_ops::graph_index_for_resolved(self, resolved).await
1187    }
1188
1189    /// Ensure BTree scalar indices exist on key columns.
1190    /// Idempotent — Lance skips if index already exists.
1191    ///
1192    /// Opens sub-tables at their latest version (not snapshot-pinned) because
1193    /// indices must be created on the current head. Any version drift from the
1194    /// snapshot is expected and logged. The resulting versions are committed
1195    /// back to the manifest.
1196    ///
1197    /// On named branches, indexing preserves lazy branching:
1198    /// unbranched subtables keep inheriting `main`, while subtables inherited
1199    /// from an ancestor branch are first forked into the active branch before
1200    /// their index metadata is updated.
1201    /// Returns the declared indexes that could not be materialized on this
1202    /// pass (today: vector columns with no trainable vectors yet). They are
1203    /// deferred, not errors; a later `ensure_indices`/`optimize` builds them
1204    /// once the column is trainable. Reads stay correct (brute-force) meanwhile.
1205    pub async fn ensure_indices(&self) -> Result<Vec<PendingIndex>> {
1206        table_ops::ensure_indices(self).await
1207    }
1208
1209    pub async fn ensure_indices_on(&self, branch: &str) -> Result<Vec<PendingIndex>> {
1210        table_ops::ensure_indices_on(self, branch).await
1211    }
1212
1213    #[cfg(feature = "failpoints")]
1214    #[doc(hidden)]
1215    pub async fn failpoint_publish_table_head_without_index_rebuild_for_test(
1216        &mut self,
1217        branch: &str,
1218        table_key: &str,
1219        table_branch: Option<&str>,
1220    ) -> Result<u64> {
1221        table_ops::failpoint_publish_table_head_without_index_rebuild_for_test(
1222            self,
1223            branch,
1224            table_key,
1225            table_branch,
1226        )
1227        .await
1228    }
1229
1230    /// Compact small Lance fragments into fewer larger ones across every
1231    /// node + edge table on `main`. See [`optimize`] for details.
1232    pub async fn optimize(&self) -> Result<Vec<optimize::TableOptimizeStats>> {
1233        optimize::optimize_all_tables(self).await
1234    }
1235
1236    /// Classify and explicitly repair uncovered manifest/head drift. See
1237    /// [`repair`] for the distinction between safe maintenance drift and
1238    /// suspicious/unverifiable drift.
1239    pub async fn repair(&self, options: repair::RepairOptions) -> Result<repair::RepairStats> {
1240        repair::repair_all_tables(self, options).await
1241    }
1242
1243    /// Remove Lance manifests (and the fragments they uniquely own) per the
1244    /// given [`optimize::CleanupPolicyOptions`]. Destructive to version
1245    /// history. See [`optimize`] for details.
1246    pub async fn cleanup(
1247        &mut self,
1248        options: optimize::CleanupPolicyOptions,
1249    ) -> Result<Vec<optimize::TableCleanupStats>> {
1250        optimize::cleanup_all_tables(self, options).await
1251    }
1252
1253    /// Read a blob from a node by its string ID and property name.
1254    ///
1255    /// Returns a `BlobFile` handle with async `read()`, `seek()`, `tell()`,
1256    /// and metadata accessors (`size()`, `kind()`, `uri()`).
1257    ///
1258    /// ```ignore
1259    /// let blob = db.read_blob("Document", "readme", "content").await?;
1260    /// let bytes = blob.read().await?;
1261    /// ```
1262    pub async fn read_blob(&self, type_name: &str, id: &str, property: &str) -> Result<BlobFile> {
1263        self.ensure_schema_state_valid().await?;
1264        let catalog = self.catalog();
1265        let node_type = catalog
1266            .node_types
1267            .get(type_name)
1268            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
1269        if !node_type.blob_properties.contains(property) {
1270            return Err(OmniError::manifest(format!(
1271                "property '{}' on type '{}' is not a Blob",
1272                property, type_name
1273            )));
1274        }
1275
1276        let snapshot = self.snapshot().await;
1277        let table_key = format!("node:{}", type_name);
1278        let handle = self
1279            .storage()
1280            .open_snapshot_at_table(&snapshot, &table_key)
1281            .await?;
1282
1283        let filter_sql = format!("id = '{}'", id.replace('\'', "''"));
1284        let row_id = self
1285            .storage()
1286            .first_row_id_for_filter(&handle, &filter_sql)
1287            .await?
1288            .ok_or_else(|| {
1289                OmniError::manifest(format!("no {} with id '{}' found", type_name, id))
1290            })?;
1291
1292        // `take_blobs` is a Lance-specific blob accessor not surfaced
1293        // through the `TableStorage` trait — reach the inner `Arc<Dataset>`
1294        // via the `pub(crate)` accessor for this read-only call.
1295        let ds = handle.into_arc();
1296        let mut blobs = ds
1297            .take_blobs(&[row_id], property)
1298            .await
1299            .map_err(|e| OmniError::Lance(e.to_string()))?;
1300
1301        blobs.pop().ok_or_else(|| {
1302            OmniError::manifest(format!(
1303                "blob '{}' on {} '{}' returned no data",
1304                property, type_name, id
1305            ))
1306        })
1307    }
1308
1309    pub(crate) async fn active_branch(&self) -> Option<String> {
1310        self.coordinator
1311            .read()
1312            .await
1313            .current_branch()
1314            .map(str::to_string)
1315    }
1316
1317    async fn ensure_branch_delete_safe(&self, branch: &str, branches: &[String]) -> Result<()> {
1318        let descendants = self
1319            .coordinator
1320            .read()
1321            .await
1322            .branch_descendants(branch)
1323            .await?;
1324        if let Some(descendant) = descendants.first() {
1325            return Err(OmniError::manifest_conflict(format!(
1326                "cannot delete branch '{}' because descendant branch '{}' still depends on it",
1327                branch, descendant
1328            )));
1329        }
1330
1331        for other_branch in branches
1332            .iter()
1333            .filter(|candidate| candidate.as_str() != branch)
1334        {
1335            let snapshot = self
1336                .snapshot_of(ReadTarget::branch(other_branch.as_str()))
1337                .await?;
1338            if snapshot
1339                .entries()
1340                .any(|entry| entry.table_branch.as_deref() == Some(branch))
1341            {
1342                return Err(OmniError::manifest_conflict(format!(
1343                    "cannot delete branch '{}' because branch '{}' still depends on it",
1344                    branch, other_branch
1345                )));
1346            }
1347        }
1348
1349        Ok(())
1350    }
1351
1352    /// Best-effort reclaim of the per-table Lance forks a just-deleted branch
1353    /// owned. Runs AFTER the manifest authority flip, so the branch is already
1354    /// gone and these forks are unreachable orphans. A failure here (transient
1355    /// object-store error, the `branch_delete.before_table_cleanup` failpoint)
1356    /// is logged and swallowed: the `cleanup` reconciler is the guaranteed
1357    /// backstop that converges any leftover orphan. Uses `force_delete_branch`
1358    /// so a partially-reclaimed retry is idempotent.
1359    async fn cleanup_deleted_branch_tables(&self, branch: &str, owned_tables: &[(String, String)]) {
1360        let mut seen_paths = HashSet::new();
1361        let mut cleanup_targets = owned_tables
1362            .iter()
1363            .filter(|(_, table_path)| seen_paths.insert(table_path.clone()))
1364            .cloned()
1365            .collect::<Vec<_>>();
1366        cleanup_targets.sort_by(|left, right| left.0.cmp(&right.0));
1367
1368        for (table_key, table_path) in cleanup_targets {
1369            let dataset_uri = self.storage().dataset_uri(&table_path);
1370            let outcome = match crate::failpoints::maybe_fail("branch_delete.before_table_cleanup")
1371            {
1372                Ok(()) => {
1373                    self.storage()
1374                        .force_delete_branch(&dataset_uri, branch)
1375                        .await
1376                }
1377                Err(injected) => Err(injected),
1378            };
1379            if let Err(err) = outcome {
1380                tracing::warn!(
1381                    target: "omnigraph::branch_delete::cleanup",
1382                    branch = %branch,
1383                    table = %table_key,
1384                    error = %err,
1385                    "best-effort fork reclaim failed; cleanup will reconcile the orphan",
1386                );
1387            }
1388        }
1389    }
1390
1391    async fn delete_branch_storage_only(&self, branch: &str) -> Result<()> {
1392        let active = self
1393            .coordinator
1394            .read()
1395            .await
1396            .current_branch()
1397            .map(str::to_string);
1398        if active.as_deref() == Some(branch) {
1399            return Err(OmniError::manifest_conflict(format!(
1400                "cannot delete currently active branch '{}'",
1401                branch
1402            )));
1403        }
1404
1405        let branch_snapshot = self.snapshot_of(ReadTarget::branch(branch)).await?;
1406        let owned_tables = branch_snapshot
1407            .entries()
1408            .filter(|entry| entry.table_branch.as_deref() == Some(branch))
1409            .map(|entry| (entry.table_key.clone(), entry.table_path.clone()))
1410            .collect::<Vec<_>>();
1411
1412        // Authority flip (+ best-effort commit-graph reclaim) — must succeed.
1413        self.coordinator.write().await.branch_delete(branch).await?;
1414        // Best-effort per-table fork reclaim; cleanup reconciles any leftover.
1415        self.cleanup_deleted_branch_tables(branch, &owned_tables)
1416            .await;
1417        Ok(())
1418    }
1419
1420    pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
1421        normalize_branch_name(branch)
1422    }
1423
1424    pub(crate) async fn head_commit_id_for_branch(
1425        &self,
1426        branch: Option<&str>,
1427    ) -> Result<Option<String>> {
1428        let mut coordinator = self.open_coordinator_for_branch(branch).await?;
1429        coordinator.ensure_commit_graph_initialized().await?;
1430        coordinator
1431            .head_commit_id()
1432            .await
1433            .map(|id| id.map(|snapshot_id| snapshot_id.as_str().to_string()))
1434    }
1435
1436    pub async fn branch_create(&self, name: &str) -> Result<()> {
1437        self.branch_create_as(name, None).await
1438    }
1439
1440    /// Create a branch from the coordinator's currently-open snapshot,
1441    /// with an explicit actor for engine-layer policy enforcement
1442    /// (MR-722 fan-out). Scope is `TargetBranch(name)` — symmetric with
1443    /// `branch_delete_as`: the branch being acted upon is the target.
1444    /// Cedar rules using `target_branch_scope: protected` therefore see
1445    /// the new-branch name and can deny e.g. creating any branch named
1446    /// `main` from a non-privileged actor.
1447    pub async fn branch_create_as(&self, name: &str, actor: Option<&str>) -> Result<()> {
1448        self.enforce(
1449            omnigraph_policy::PolicyAction::BranchCreate,
1450            &omnigraph_policy::ResourceScope::TargetBranch(name.to_string()),
1451            actor,
1452        )?;
1453        self.ensure_schema_state_valid().await?;
1454        self.ensure_schema_apply_idle("branch_create").await?;
1455        ensure_public_branch_ref(name, "branch_create")?;
1456        self.coordinator.write().await.branch_create(name).await
1457    }
1458
1459    pub async fn branch_create_from(&self, from: impl Into<ReadTarget>, name: &str) -> Result<()> {
1460        self.branch_create_from_as(from, name, None).await
1461    }
1462
1463    /// Create a branch from a specific source branch with an explicit
1464    /// actor for engine-layer policy enforcement (MR-722 fan-out).
1465    ///
1466    /// Scope is `BranchTransition { source, target }` — matches the
1467    /// HTTP-layer convention at `server_branch_create`
1468    /// (branch=Some(from), target_branch=Some(name)), so engine and
1469    /// HTTP fire the same Cedar decision. Pinned-snapshot sources
1470    /// (which aren't a branch ref) materialize as the sentinel
1471    /// `<snapshot>` for the policy check; Cedar rules using
1472    /// `branch_scope: any` still match, rules pinning a specific
1473    /// source branch correctly do not.
1474    pub async fn branch_create_from_as(
1475        &self,
1476        from: impl Into<ReadTarget>,
1477        name: &str,
1478        actor: Option<&str>,
1479    ) -> Result<()> {
1480        let target = from.into();
1481        let source_branch = match &target {
1482            ReadTarget::Branch(b) => b.clone(),
1483            _ => "<snapshot>".to_string(),
1484        };
1485        self.enforce(
1486            omnigraph_policy::PolicyAction::BranchCreate,
1487            &omnigraph_policy::ResourceScope::BranchTransition {
1488                source: source_branch,
1489                target: name.to_string(),
1490            },
1491            actor,
1492        )?;
1493        self.ensure_schema_apply_idle("branch_create_from").await?;
1494        self.branch_create_from_impl(target, name, false).await
1495    }
1496
1497    async fn branch_create_from_impl(
1498        &self,
1499        from: impl Into<ReadTarget>,
1500        name: &str,
1501        allow_internal_refs: bool,
1502    ) -> Result<()> {
1503        let target = from.into();
1504        let ReadTarget::Branch(branch_name) = target else {
1505            return Err(OmniError::manifest(
1506                "branch creation from pinned snapshots is not supported yet".to_string(),
1507            ));
1508        };
1509        if !allow_internal_refs {
1510            ensure_public_branch_ref(&branch_name, "branch_create_from")?;
1511            ensure_public_branch_ref(name, "branch_create_from")?;
1512        }
1513        let branch = normalize_branch_name(&branch_name)?;
1514        // Operate on a freshly-opened source coordinator that's owned locally
1515        // — never touch `self.coordinator`. The pre-fix implementation used
1516        // `swap_coordinator_for_branch` + operate + `restore_coordinator` as
1517        // three separate `coordinator.write().await` acquisitions; under
1518        // `&self` concurrency, a second `branch_create_from` could swap
1519        // self.coordinator between this caller's swap and operate steps,
1520        // making the operate run against the wrong source branch and
1521        // forking off the wrong HEAD. Pinned by
1522        // `concurrent_branch_create_from_distinct_parents_does_not_corrupt_coordinator`
1523        // in `crates/omnigraph-server/tests/server.rs`.
1524        //
1525        // `branch_create` mutates only the local coord's commit-graph cache;
1526        // the manifest write is durable on disk regardless of which
1527        // coord-handle issued it. Discarding `source_coord` after the call
1528        // is the right shape — the new branch is reachable from any
1529        // subsequent open of any coord.
1530        let mut source_coord = self.open_coordinator_for_branch(branch.as_deref()).await?;
1531        source_coord.branch_create(name).await
1532    }
1533
1534    pub async fn branch_list(&self) -> Result<Vec<String>> {
1535        self.ensure_schema_state_valid().await?;
1536        self.coordinator.read().await.branch_list().await
1537    }
1538
1539    pub async fn branch_delete(&self, name: &str) -> Result<()> {
1540        self.branch_delete_as(name, None).await
1541    }
1542
1543    /// Delete a branch with an explicit actor for engine-layer policy
1544    /// enforcement (MR-722 fan-out). Scope is `TargetBranch(name)` —
1545    /// matches the HTTP-layer convention at `server_branch_delete`
1546    /// (branch=None, target_branch=Some(name)). Cedar rules using
1547    /// `target_branch_scope: protected` therefore correctly gate
1548    /// deletion of protected branches (e.g. deny BranchDelete against
1549    /// `main`).
1550    pub async fn branch_delete_as(&self, name: &str, actor: Option<&str>) -> Result<()> {
1551        self.enforce(
1552            omnigraph_policy::PolicyAction::BranchDelete,
1553            &omnigraph_policy::ResourceScope::TargetBranch(name.to_string()),
1554            actor,
1555        )?;
1556        self.ensure_schema_state_valid().await?;
1557        self.ensure_schema_apply_idle("branch_delete").await?;
1558        ensure_public_branch_ref(name, "branch_delete")?;
1559        self.refresh().await?;
1560        let branch = normalize_branch_name(name)?
1561            .ok_or_else(|| OmniError::manifest("cannot delete branch 'main'".to_string()))?;
1562        let branches = self.coordinator.read().await.branch_list().await?;
1563        if !branches.iter().any(|candidate| candidate == &branch) {
1564            return Err(OmniError::manifest_not_found(format!(
1565                "branch '{}' not found",
1566                branch
1567            )));
1568        }
1569
1570        self.ensure_branch_delete_safe(&branch, &branches).await?;
1571        self.delete_branch_storage_only(&branch).await
1572    }
1573
1574    pub async fn get_commit(&self, commit_id: &str) -> Result<GraphCommit> {
1575        self.ensure_schema_state_valid().await?;
1576        self.coordinator
1577            .read()
1578            .await
1579            .resolve_commit(&SnapshotId::new(commit_id))
1580            .await
1581    }
1582
1583    pub async fn list_commits(&self, branch: Option<&str>) -> Result<Vec<GraphCommit>> {
1584        self.ensure_schema_state_valid().await?;
1585        let branch = match branch {
1586            Some(branch) => normalize_branch_name(branch)?,
1587            None => None,
1588        };
1589        let coordinator = self.open_coordinator_for_branch(branch.as_deref()).await?;
1590        coordinator.list_commits().await
1591    }
1592
1593    /// Open a sub-table for mutation with version-drift guard.
1594    ///
1595    /// Checks that the dataset's current version matches the snapshot-pinned
1596    /// version. If another writer has advanced the version, returns an error
1597    /// prompting the caller to refresh and retry (optimistic concurrency).
1598    pub(crate) async fn open_for_mutation(
1599        &self,
1600        table_key: &str,
1601        op_kind: crate::db::MutationOpKind,
1602    ) -> Result<(SnapshotHandle, String, Option<String>)> {
1603        table_ops::open_for_mutation(self, table_key, op_kind).await
1604    }
1605
1606    pub(crate) async fn open_for_mutation_on_branch(
1607        &self,
1608        branch: Option<&str>,
1609        table_key: &str,
1610        op_kind: crate::db::MutationOpKind,
1611    ) -> Result<(SnapshotHandle, String, Option<String>)> {
1612        table_ops::open_for_mutation_on_branch(self, branch, table_key, op_kind).await
1613    }
1614
1615    /// Fork `table_key` onto `active_branch` from the given source state,
1616    /// self-healing a manifest-unreferenced leftover fork if one is in the
1617    /// way. Callers that reach this MUST already hold the per-`(table_key,
1618    /// active_branch)` write queue (so the reclaim cannot race an in-process
1619    /// fork) and must have confirmed via the live manifest that the table is
1620    /// not yet on `active_branch`. Both the first-write fork path
1621    /// (`open_owned_dataset_for_branch_write`) and `branch_merge` satisfy this.
1622    pub(crate) async fn fork_dataset_from_entry_state(
1623        &self,
1624        table_key: &str,
1625        full_path: &str,
1626        source_branch: Option<&str>,
1627        source_version: u64,
1628        active_branch: &str,
1629    ) -> Result<SnapshotHandle> {
1630        match table_ops::fork_dataset_from_entry_state(
1631            self,
1632            table_key,
1633            full_path,
1634            source_branch,
1635            source_version,
1636            active_branch,
1637        )
1638        .await?
1639        {
1640            crate::storage_layer::ForkOutcome::Created(ds) => Ok(ds),
1641            crate::storage_layer::ForkOutcome::RefAlreadyExists => {
1642                table_ops::reclaim_orphaned_fork_and_refork(
1643                    self,
1644                    table_key,
1645                    full_path,
1646                    source_branch,
1647                    source_version,
1648                    active_branch,
1649                )
1650                .await
1651            }
1652        }
1653    }
1654
1655    pub(crate) async fn reopen_for_mutation(
1656        &self,
1657        table_key: &str,
1658        full_path: &str,
1659        table_branch: Option<&str>,
1660        expected_version: u64,
1661        op_kind: crate::db::MutationOpKind,
1662    ) -> Result<SnapshotHandle> {
1663        table_ops::reopen_for_mutation(
1664            self,
1665            table_key,
1666            full_path,
1667            table_branch,
1668            expected_version,
1669            op_kind,
1670        )
1671        .await
1672    }
1673
1674    pub(crate) async fn open_dataset_at_state(
1675        &self,
1676        table_path: &str,
1677        table_branch: Option<&str>,
1678        table_version: u64,
1679    ) -> Result<SnapshotHandle> {
1680        table_ops::open_dataset_at_state(self, table_path, table_branch, table_version).await
1681    }
1682
1683    pub(crate) async fn build_indices_on_dataset(
1684        &self,
1685        table_key: &str,
1686        ds: &mut SnapshotHandle,
1687    ) -> Result<Vec<PendingIndex>> {
1688        table_ops::build_indices_on_dataset(self, table_key, ds).await
1689    }
1690
1691    // Used only by in-tree tests (`#[cfg(test)]`); the runtime path now
1692    // uses `commit_updates_on_branch_with_expected` exclusively.
1693    #[cfg(test)]
1694    pub(crate) async fn commit_updates(
1695        &mut self,
1696        updates: &[crate::db::SubTableUpdate],
1697    ) -> Result<u64> {
1698        table_ops::commit_updates(self, updates).await
1699    }
1700
1701    pub(crate) async fn commit_manifest_updates(
1702        &self,
1703        updates: &[crate::db::SubTableUpdate],
1704    ) -> Result<u64> {
1705        table_ops::commit_manifest_updates(self, updates).await
1706    }
1707
1708    pub(crate) async fn record_merge_commit(
1709        &self,
1710        manifest_version: u64,
1711        parent_commit_id: &str,
1712        merged_parent_commit_id: &str,
1713        actor_id: Option<&str>,
1714    ) -> Result<String> {
1715        table_ops::record_merge_commit(
1716            self,
1717            manifest_version,
1718            parent_commit_id,
1719            merged_parent_commit_id,
1720            actor_id,
1721        )
1722        .await
1723    }
1724
1725    pub(crate) async fn commit_updates_on_branch_with_expected(
1726        &self,
1727        branch: Option<&str>,
1728        updates: &[crate::db::SubTableUpdate],
1729        expected_table_versions: &std::collections::HashMap<String, u64>,
1730        actor_id: Option<&str>,
1731    ) -> Result<u64> {
1732        table_ops::commit_updates_on_branch_with_expected(
1733            self,
1734            branch,
1735            updates,
1736            expected_table_versions,
1737            actor_id,
1738        )
1739        .await
1740    }
1741
1742    pub(crate) async fn ensure_commit_graph_initialized(&self) -> Result<()> {
1743        table_ops::ensure_commit_graph_initialized(self).await
1744    }
1745
1746    /// Invalidate the cached graph index. Called after edge mutations.
1747    pub(crate) async fn invalidate_graph_index(&self) {
1748        table_ops::invalidate_graph_index(self).await
1749    }
1750}
1751
1752pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
1753    let branch = branch.trim();
1754    if branch.is_empty() {
1755        return Err(OmniError::manifest(
1756            "branch name cannot be empty".to_string(),
1757        ));
1758    }
1759    if branch == "main" {
1760        return Ok(None);
1761    }
1762    Ok(Some(branch.to_string()))
1763}
1764
1765/// Build a `ResolvedTarget` from the warm coordinator without opening the commit
1766/// graph. The live branch snapshot is pinned by the manifest incarnation, so the
1767/// id is synthetic `(branch, version, e_tag when available)`; nothing on the read
1768/// path needs a real commit ULID (only `RuntimeCache` keys on the id, where
1769/// synthetic is consistent).
1770fn warm_resolved_target(coord: &GraphCoordinator, requested: &ReadTarget) -> ResolvedTarget {
1771    ResolvedTarget {
1772        requested: requested.clone(),
1773        branch: coord.current_branch().map(str::to_string),
1774        snapshot_id: SnapshotId::synthetic(
1775            coord.current_branch(),
1776            coord.version(),
1777            coord.manifest_incarnation().e_tag.as_deref(),
1778        ),
1779        snapshot: coord.snapshot(),
1780    }
1781}
1782
1783pub(crate) fn ensure_public_branch_ref(branch: &str, operation: &str) -> Result<()> {
1784    if is_internal_system_branch(branch) {
1785        return Err(OmniError::manifest(format!(
1786            "{} does not allow internal system ref '{}'",
1787            operation, branch
1788        )));
1789    }
1790    Ok(())
1791}
1792
1793fn concat_or_empty_batches(schema: Arc<Schema>, batches: Vec<RecordBatch>) -> Result<RecordBatch> {
1794    if batches.is_empty() {
1795        return Ok(RecordBatch::new_empty(schema));
1796    }
1797    if batches.len() == 1 {
1798        return Ok(batches.into_iter().next().unwrap());
1799    }
1800    let batch_schema = batches[0].schema();
1801    arrow_select::concat::concat_batches(&batch_schema, &batches)
1802        .map_err(|e| OmniError::Lance(e.to_string()))
1803}
1804
1805fn blob_properties_for_table_key<'a>(
1806    catalog: &'a Catalog,
1807    table_key: &str,
1808) -> Result<&'a std::collections::HashSet<String>> {
1809    if let Some(type_name) = table_key.strip_prefix("node:") {
1810        return catalog
1811            .node_types
1812            .get(type_name)
1813            .map(|node_type| &node_type.blob_properties)
1814            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)));
1815    }
1816    if let Some(type_name) = table_key.strip_prefix("edge:") {
1817        return catalog
1818            .edge_types
1819            .get(type_name)
1820            .map(|edge_type| &edge_type.blob_properties)
1821            .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)));
1822    }
1823    Err(OmniError::manifest(format!(
1824        "invalid table key '{}'",
1825        table_key
1826    )))
1827}
1828
1829fn blob_description_is_null(descriptions: &StructArray, row: usize) -> Result<bool> {
1830    if descriptions.is_null(row) {
1831        return Ok(true);
1832    }
1833
1834    let kind = descriptions
1835        .column_by_name("kind")
1836        .and_then(|col| col.as_any().downcast_ref::<UInt32Array>())
1837        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row) as u8))
1838        .or_else(|| {
1839            descriptions
1840                .column_by_name("kind")
1841                .and_then(|col| col.as_any().downcast_ref::<arrow_array::UInt8Array>())
1842                .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)))
1843        });
1844    let position = descriptions
1845        .column_by_name("position")
1846        .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
1847        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1848    let size = descriptions
1849        .column_by_name("size")
1850        .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
1851        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1852    let blob_uri = descriptions
1853        .column_by_name("blob_uri")
1854        .and_then(|col| col.as_any().downcast_ref::<StringArray>())
1855        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1856
1857    let Some(kind) = kind else {
1858        return Ok(true);
1859    };
1860    let kind = BlobKind::try_from(kind).map_err(|e| OmniError::Lance(e.to_string()))?;
1861    if kind != BlobKind::Inline {
1862        return Ok(false);
1863    }
1864
1865    Ok(position.unwrap_or(0) == 0 && size.unwrap_or(0) == 0 && blob_uri.unwrap_or("").is_empty())
1866}
1867
1868/// Replace placeholder `LargeBinary` fields with Lance blob v2 fields.
1869///
1870/// The compiler crate has no Lance dependency, so `ScalarType::Blob` maps to
1871/// `DataType::LargeBinary` as a placeholder. This function replaces those
1872/// fields with the real blob v2 struct type via `lance::blob::blob_field()`.
1873fn fixup_blob_schemas(catalog: &mut Catalog) {
1874    for node_type in catalog.node_types.values_mut() {
1875        if node_type.blob_properties.is_empty() {
1876            continue;
1877        }
1878        let fields: Vec<Field> = node_type
1879            .arrow_schema
1880            .fields()
1881            .iter()
1882            .map(|f| {
1883                if node_type.blob_properties.contains(f.name()) {
1884                    blob_field(f.name(), f.is_nullable())
1885                } else {
1886                    f.as_ref().clone()
1887                }
1888            })
1889            .collect();
1890        node_type.arrow_schema = Arc::new(Schema::new(fields));
1891    }
1892    for edge_type in catalog.edge_types.values_mut() {
1893        if edge_type.blob_properties.is_empty() {
1894            continue;
1895        }
1896        let fields: Vec<Field> = edge_type
1897            .arrow_schema
1898            .fields()
1899            .iter()
1900            .map(|f| {
1901                if edge_type.blob_properties.contains(f.name()) {
1902                    blob_field(f.name(), f.is_nullable())
1903                } else {
1904                    f.as_ref().clone()
1905                }
1906            })
1907            .collect();
1908        edge_type.arrow_schema = Arc::new(Schema::new(fields));
1909    }
1910}
1911
1912fn read_schema_ir_from_source(schema_source: &str) -> Result<SchemaIR> {
1913    let schema_ast = parse_schema(schema_source)?;
1914    build_schema_ir(&schema_ast).map_err(|err| OmniError::manifest(err.to_string()))
1915}
1916
1917/// I/O phase of `Omnigraph::init_with_storage`. Split out so the caller
1918/// can pattern-match on the result and run cleanup on error before
1919/// returning the original error.
1920///
1921/// Failpoints fire at the phase boundaries:
1922/// * `init.after_schema_pg_written` — `_schema.pg` is on disk. In strict mode
1923///   this fires in the caller immediately after the atomic ownership claim; in
1924///   force mode it fires here after the explicit overwrite.
1925/// * `init.after_schema_contract_written` — `_schema.pg` + `_schema.ir.json`
1926///   + `__schema_state.json` are on disk.
1927/// * `init.after_coordinator_init` — all schema files plus Lance per-type
1928///   datasets and `__manifest/` are on disk. (The cleanup wrapper can only
1929///   remove the schema files; Lance directories need `delete_prefix` —
1930///   deferred along with `DELETE /graphs/{id}`.)
1931async fn init_storage_phase(
1932    root: &str,
1933    schema_source: &str,
1934    schema_ir: &SchemaIR,
1935    catalog: &Catalog,
1936    storage: &Arc<dyn StorageAdapter>,
1937    write_schema_pg: bool,
1938) -> Result<GraphCoordinator> {
1939    if write_schema_pg {
1940        let schema_path = join_uri(root, SCHEMA_SOURCE_FILENAME);
1941        storage.write_text(&schema_path, schema_source).await?;
1942        crate::failpoints::maybe_fail("init.after_schema_pg_written")?;
1943    }
1944
1945    write_schema_contract(root, storage.as_ref(), schema_ir).await?;
1946    crate::failpoints::maybe_fail("init.after_schema_contract_written")?;
1947
1948    let coordinator = GraphCoordinator::init(root, catalog, Arc::clone(storage)).await?;
1949    crate::failpoints::maybe_fail("init.after_coordinator_init")?;
1950
1951    Ok(coordinator)
1952}
1953
1954/// Best-effort cleanup of init-phase artifacts. Called from
1955/// `init_with_storage` on any error returned by `init_storage_phase`.
1956///
1957/// Removes the three schema files: `_schema.pg`, `_schema.ir.json`,
1958/// `__schema_state.json`. Lance datasets and `__manifest/` are not
1959/// touched here — recursive directory deletion requires a
1960/// `StorageAdapter::delete_prefix` primitive that's deferred along
1961/// with `DELETE /graphs/{id}` (MR-668 PR 2b).
1962///
1963/// Failures to delete are logged via `tracing::warn` and do not mask
1964/// the original init error.
1965async fn best_effort_cleanup_init_artifacts(root: &str, storage: &dyn StorageAdapter) {
1966    for uri in [
1967        schema_source_uri(root),
1968        schema_ir_uri(root),
1969        schema_state_uri(root),
1970    ] {
1971        if let Err(err) = storage.delete(&uri).await {
1972            tracing::warn!(
1973                target: "omnigraph::init::cleanup",
1974                uri = %uri,
1975                error = %err,
1976                "init failed; best-effort cleanup could not delete artifact",
1977            );
1978        }
1979    }
1980}
1981
1982fn schema_table_key(type_kind: SchemaTypeKind, name: &str) -> String {
1983    match type_kind {
1984        SchemaTypeKind::Node => format!("node:{}", name),
1985        SchemaTypeKind::Edge => format!("edge:{}", name),
1986        SchemaTypeKind::Interface => unreachable!("interfaces do not map to tables"),
1987    }
1988}
1989
1990fn schema_for_table_key(catalog: &Catalog, table_key: &str) -> Result<Arc<Schema>> {
1991    if let Some(type_name) = table_key.strip_prefix("node:") {
1992        let node_type: &NodeType = catalog
1993            .node_types
1994            .get(type_name)
1995            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
1996        return Ok(node_type.arrow_schema.clone());
1997    }
1998    if let Some(type_name) = table_key.strip_prefix("edge:") {
1999        let edge_type: &EdgeType = catalog
2000            .edge_types
2001            .get(type_name)
2002            .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)))?;
2003        return Ok(edge_type.arrow_schema.clone());
2004    }
2005    Err(OmniError::manifest(format!(
2006        "invalid table key '{}'",
2007        table_key
2008    )))
2009}
2010
2011fn record_batch_row_to_json(batch: &RecordBatch, row: usize) -> Result<serde_json::Value> {
2012    let mut obj = serde_json::Map::new();
2013    for (i, field) in batch.schema().fields().iter().enumerate() {
2014        obj.insert(
2015            field.name().clone(),
2016            json_value_from_array(batch.column(i).as_ref(), row)?,
2017        );
2018    }
2019    Ok(serde_json::Value::Object(obj))
2020}
2021
2022fn json_value_from_array(array: &dyn Array, row: usize) -> Result<serde_json::Value> {
2023    if array.is_null(row) {
2024        return Ok(serde_json::Value::Null);
2025    }
2026
2027    match array.data_type() {
2028        DataType::Utf8 => Ok(serde_json::Value::String(
2029            array
2030                .as_any()
2031                .downcast_ref::<StringArray>()
2032                .ok_or_else(|| OmniError::Lance("expected StringArray".to_string()))?
2033                .value(row)
2034                .to_string(),
2035        )),
2036        DataType::LargeUtf8 => Ok(serde_json::Value::String(
2037            array
2038                .as_any()
2039                .downcast_ref::<LargeStringArray>()
2040                .ok_or_else(|| OmniError::Lance("expected LargeStringArray".to_string()))?
2041                .value(row)
2042                .to_string(),
2043        )),
2044        DataType::Boolean => Ok(serde_json::Value::Bool(
2045            array
2046                .as_any()
2047                .downcast_ref::<BooleanArray>()
2048                .ok_or_else(|| OmniError::Lance("expected BooleanArray".to_string()))?
2049                .value(row),
2050        )),
2051        DataType::Int32 => Ok(serde_json::Value::Number(serde_json::Number::from(
2052            array
2053                .as_any()
2054                .downcast_ref::<Int32Array>()
2055                .ok_or_else(|| OmniError::Lance("expected Int32Array".to_string()))?
2056                .value(row),
2057        ))),
2058        DataType::Int64 => Ok(serde_json::Value::Number(serde_json::Number::from(
2059            array
2060                .as_any()
2061                .downcast_ref::<Int64Array>()
2062                .ok_or_else(|| OmniError::Lance("expected Int64Array".to_string()))?
2063                .value(row),
2064        ))),
2065        DataType::UInt32 => Ok(serde_json::Value::Number(serde_json::Number::from(
2066            array
2067                .as_any()
2068                .downcast_ref::<UInt32Array>()
2069                .ok_or_else(|| OmniError::Lance("expected UInt32Array".to_string()))?
2070                .value(row),
2071        ))),
2072        DataType::UInt64 => Ok(serde_json::Value::Number(serde_json::Number::from(
2073            array
2074                .as_any()
2075                .downcast_ref::<UInt64Array>()
2076                .ok_or_else(|| OmniError::Lance("expected UInt64Array".to_string()))?
2077                .value(row),
2078        ))),
2079        DataType::Float32 => {
2080            let value = array
2081                .as_any()
2082                .downcast_ref::<Float32Array>()
2083                .ok_or_else(|| OmniError::Lance("expected Float32Array".to_string()))?
2084                .value(row) as f64;
2085            Ok(serde_json::Value::Number(
2086                serde_json::Number::from_f64(value).ok_or_else(|| {
2087                    OmniError::Lance(format!("cannot encode f32 value '{}' as JSON", value))
2088                })?,
2089            ))
2090        }
2091        DataType::Float64 => {
2092            let value = array
2093                .as_any()
2094                .downcast_ref::<Float64Array>()
2095                .ok_or_else(|| OmniError::Lance("expected Float64Array".to_string()))?
2096                .value(row);
2097            Ok(serde_json::Value::Number(
2098                serde_json::Number::from_f64(value).ok_or_else(|| {
2099                    OmniError::Lance(format!("cannot encode f64 value '{}' as JSON", value))
2100                })?,
2101            ))
2102        }
2103        DataType::Date32 => Ok(serde_json::Value::Number(serde_json::Number::from(
2104            array
2105                .as_any()
2106                .downcast_ref::<Date32Array>()
2107                .ok_or_else(|| OmniError::Lance("expected Date32Array".to_string()))?
2108                .value(row),
2109        ))),
2110        DataType::Binary => Ok(serde_json::Value::String(base64::Engine::encode(
2111            &base64::engine::general_purpose::STANDARD,
2112            array
2113                .as_any()
2114                .downcast_ref::<BinaryArray>()
2115                .ok_or_else(|| OmniError::Lance("expected BinaryArray".to_string()))?
2116                .value(row),
2117        ))),
2118        DataType::LargeBinary => Ok(serde_json::Value::String(base64::Engine::encode(
2119            &base64::engine::general_purpose::STANDARD,
2120            array
2121                .as_any()
2122                .downcast_ref::<LargeBinaryArray>()
2123                .ok_or_else(|| OmniError::Lance("expected LargeBinaryArray".to_string()))?
2124                .value(row),
2125        ))),
2126        DataType::List(_) => {
2127            let list = array
2128                .as_any()
2129                .downcast_ref::<ListArray>()
2130                .ok_or_else(|| OmniError::Lance("expected ListArray".to_string()))?;
2131            let values = list.value(row);
2132            let mut out = Vec::with_capacity(values.len());
2133            for idx in 0..values.len() {
2134                out.push(json_value_from_array(values.as_ref(), idx)?);
2135            }
2136            Ok(serde_json::Value::Array(out))
2137        }
2138        DataType::LargeList(_) => {
2139            let list = array
2140                .as_any()
2141                .downcast_ref::<LargeListArray>()
2142                .ok_or_else(|| OmniError::Lance("expected LargeListArray".to_string()))?;
2143            let values = list.value(row);
2144            let mut out = Vec::with_capacity(values.len());
2145            for idx in 0..values.len() {
2146                out.push(json_value_from_array(values.as_ref(), idx)?);
2147            }
2148            Ok(serde_json::Value::Array(out))
2149        }
2150        DataType::FixedSizeList(_, _) => {
2151            let list = array
2152                .as_any()
2153                .downcast_ref::<FixedSizeListArray>()
2154                .ok_or_else(|| OmniError::Lance("expected FixedSizeListArray".to_string()))?;
2155            let values = list.value(row);
2156            let mut out = Vec::with_capacity(values.len());
2157            for idx in 0..values.len() {
2158                out.push(json_value_from_array(values.as_ref(), idx)?);
2159            }
2160            Ok(serde_json::Value::Array(out))
2161        }
2162        DataType::Struct(fields) => {
2163            let struct_array = array
2164                .as_any()
2165                .downcast_ref::<StructArray>()
2166                .ok_or_else(|| OmniError::Lance("expected StructArray".to_string()))?;
2167            let mut obj = serde_json::Map::new();
2168            for (field_idx, field) in fields.iter().enumerate() {
2169                obj.insert(
2170                    field.name().clone(),
2171                    json_value_from_array(struct_array.column(field_idx).as_ref(), row)?,
2172                );
2173            }
2174            Ok(serde_json::Value::Object(obj))
2175        }
2176        _ => {
2177            let value = arrow_cast::display::array_value_to_string(array, row)
2178                .map_err(|e| OmniError::Lance(e.to_string()))?;
2179            Ok(serde_json::Value::String(value))
2180        }
2181    }
2182}
2183
#[cfg(test)]
mod tests {
    //! Tests for graph init/open routing through a `StorageAdapter`, schema
    //! apply (property/type renames, deferred index builds), and the
    //! schema-apply write lock, each run against a temporary on-disk graph.

    use super::*;
    use crate::db::manifest::ManifestCoordinator;
    use async_trait::async_trait;
    use serde_json::Value;
    use std::sync::{Arc, Mutex};

    use crate::storage::{ObjectStorageAdapter, StorageAdapter, join_uri};

    /// Baseline schema shared by most tests: `Person` (keyed `name`, optional
    /// `age`), `Company` (keyed `name`), a `Knows` edge with an optional
    /// `since` date, and a property-less `WorksAt` edge.
    const TEST_SCHEMA: &str = r#"
node Person {
    name: String @key
    age: I32?
}
node Company {
    name: String @key
}
edge Knows: Person -> Person {
    since: Date?
}
edge WorksAt: Person -> Company
"#;

    /// Storage adapter that forwards every call to a local
    /// `ObjectStorageAdapter` while logging the URIs it was asked to touch, so
    /// tests can assert that graph metadata is routed through the adapter.
    ///
    /// Reads (`read_text`, `read_text_versioned`), writes (`write_text`,
    /// `write_text_if_absent`, `write_text_if_match`), existence checks,
    /// renames, and deletes are recorded; `list_dir` and `delete_prefix` are
    /// forwarded without being logged.
    #[derive(Debug)]
    struct RecordingStorageAdapter {
        inner: ObjectStorageAdapter,
        reads: Mutex<Vec<String>>,
        writes: Mutex<Vec<String>>,
        exists_checks: Mutex<Vec<String>>,
        // Recorded (and visible through `Debug`) but not yet asserted on by
        // any test in this module.
        renames: Mutex<Vec<(String, String)>>,
        deletes: Mutex<Vec<String>>,
    }

    impl Default for RecordingStorageAdapter {
        fn default() -> Self {
            Self {
                inner: ObjectStorageAdapter::local(),
                reads: Mutex::default(),
                writes: Mutex::default(),
                exists_checks: Mutex::default(),
                renames: Mutex::default(),
                deletes: Mutex::default(),
            }
        }
    }

    impl RecordingStorageAdapter {
        /// Snapshot of every URI passed to a read method so far.
        fn reads(&self) -> Vec<String> {
            self.reads.lock().unwrap().clone()
        }

        /// Snapshot of every URI passed to a write method so far.
        fn writes(&self) -> Vec<String> {
            self.writes.lock().unwrap().clone()
        }

        /// Snapshot of every URI passed to `exists` so far.
        fn exists_checks(&self) -> Vec<String> {
            self.exists_checks.lock().unwrap().clone()
        }
    }

    // Each recording statement locks, pushes, and drops its `MutexGuard`
    // temporary at the end of the statement, so no std lock is held across
    // the subsequent `.await`.
    #[async_trait]
    impl StorageAdapter for RecordingStorageAdapter {
        async fn read_text(&self, uri: &str) -> Result<String> {
            self.reads.lock().unwrap().push(uri.to_string());
            self.inner.read_text(uri).await
        }

        async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
            self.writes.lock().unwrap().push(uri.to_string());
            self.inner.write_text(uri, contents).await
        }

        async fn write_text_if_absent(&self, uri: &str, contents: &str) -> Result<bool> {
            self.writes.lock().unwrap().push(uri.to_string());
            self.inner.write_text_if_absent(uri, contents).await
        }

        async fn exists(&self, uri: &str) -> Result<bool> {
            self.exists_checks.lock().unwrap().push(uri.to_string());
            self.inner.exists(uri).await
        }

        async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
            self.renames
                .lock()
                .unwrap()
                .push((from_uri.to_string(), to_uri.to_string()));
            self.inner.rename_text(from_uri, to_uri).await
        }

        async fn delete(&self, uri: &str) -> Result<()> {
            self.deletes.lock().unwrap().push(uri.to_string());
            self.inner.delete(uri).await
        }

        async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
            self.inner.list_dir(dir_uri).await
        }

        async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)> {
            // A versioned read is still a metadata read; log it so routing
            // assertions don't depend on which read flavour the caller picks.
            self.reads.lock().unwrap().push(uri.to_string());
            self.inner.read_text_versioned(uri).await
        }

        async fn write_text_if_match(
            &self,
            uri: &str,
            contents: &str,
            expected_version: &str,
        ) -> Result<Option<String>> {
            // Logged regardless of outcome, mirroring `write_text_if_absent`.
            self.writes.lock().unwrap().push(uri.to_string());
            self.inner
                .write_text_if_match(uri, contents, expected_version)
                .await
        }

        async fn delete_prefix(&self, prefix_uri: &str) -> Result<()> {
            self.inner.delete_prefix(prefix_uri).await
        }
    }

    /// Storage adapter that forces two concurrent inits to interleave: every
    /// `exists` probe of the schema-state URI under `root` performs the real
    /// check and then waits on `barrier`, so both callers have completed that
    /// probe before either continues.
    ///
    /// NOTE(review): `barrier` is sized for two and every matching probe waits
    /// for a partner. If the two init paths ever probe this URI a different
    /// number of times, the unmatched probe blocks and the test hangs rather
    /// than fails — keep in mind when changing init's probe sequence.
    #[derive(Debug)]
    struct InitRaceStorageAdapter {
        inner: ObjectStorageAdapter,
        root: String,
        barrier: Arc<tokio::sync::Barrier>,
    }

    #[async_trait]
    impl StorageAdapter for InitRaceStorageAdapter {
        async fn read_text(&self, uri: &str) -> Result<String> {
            self.inner.read_text(uri).await
        }

        async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
            self.inner.write_text(uri, contents).await
        }

        async fn write_text_if_absent(&self, uri: &str, contents: &str) -> Result<bool> {
            self.inner.write_text_if_absent(uri, contents).await
        }

        async fn exists(&self, uri: &str) -> Result<bool> {
            // Check first, then rendezvous: the result each caller sees is the
            // state *before* either racer has been released to write.
            let exists = self.inner.exists(uri).await?;
            if uri == schema_state_uri(&self.root) {
                self.barrier.wait().await;
            }
            Ok(exists)
        }

        async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
            self.inner.rename_text(from_uri, to_uri).await
        }

        async fn delete(&self, uri: &str) -> Result<()> {
            self.inner.delete(uri).await
        }

        async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
            self.inner.list_dir(dir_uri).await
        }

        async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)> {
            self.inner.read_text_versioned(uri).await
        }

        async fn write_text_if_match(
            &self,
            uri: &str,
            contents: &str,
            expected_version: &str,
        ) -> Result<Option<String>> {
            self.inner
                .write_text_if_match(uri, contents, expected_version)
                .await
        }

        async fn delete_prefix(&self, prefix_uri: &str) -> Result<()> {
            self.inner.delete_prefix(prefix_uri).await
        }
    }

    /// Two concurrent strict inits on the same root: exactly one must succeed,
    /// and the loser must not delete the schema files the winner wrote.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn concurrent_strict_init_does_not_delete_winning_schema_files() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap().to_string();
        let root = normalize_root_uri(&uri).unwrap();
        let storage: Arc<dyn StorageAdapter> = Arc::new(InitRaceStorageAdapter {
            inner: ObjectStorageAdapter::local(),
            root,
            barrier: Arc::new(tokio::sync::Barrier::new(2)),
        });

        let left = Omnigraph::init_with_storage(
            &uri,
            TEST_SCHEMA,
            Arc::clone(&storage),
            InitOptions::default(),
        );
        let right = Omnigraph::init_with_storage(
            &uri,
            TEST_SCHEMA,
            Arc::clone(&storage),
            InitOptions::default(),
        );
        let (left, right) = tokio::join!(left, right);
        let ok_count = usize::from(left.is_ok()) + usize::from(right.is_ok());
        assert_eq!(ok_count, 1, "exactly one concurrent init should win");

        assert!(
            dir.path().join("_schema.pg").exists(),
            "winning init must leave _schema.pg in place"
        );
        assert!(
            dir.path().join("_schema.ir.json").exists(),
            "winning init must leave _schema.ir.json in place"
        );
        assert!(
            dir.path().join("__schema_state.json").exists(),
            "winning init must leave __schema_state.json in place"
        );
    }

    /// Init must write, and open must read and probe, the graph's metadata
    /// files through the supplied adapter rather than bypassing it.
    #[tokio::test]
    async fn test_init_and_open_route_graph_metadata_through_storage_adapter() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let adapter = Arc::new(RecordingStorageAdapter::default());

        Omnigraph::init_with_storage(uri, TEST_SCHEMA, adapter.clone(), InitOptions::default())
            .await
            .unwrap();
        let writes = adapter.writes();
        assert!(writes.contains(&join_uri(uri, "_schema.pg")));
        assert!(writes.contains(&join_uri(uri, "_schema.ir.json")));
        assert!(writes.contains(&join_uri(uri, "__schema_state.json")));

        Omnigraph::open_with_storage(uri, adapter.clone())
            .await
            .unwrap();
        let reads = adapter.reads();
        assert!(reads.contains(&join_uri(uri, "_schema.pg")));
        assert!(reads.contains(&join_uri(uri, "_schema.ir.json")));
        assert!(reads.contains(&join_uri(uri, "__schema_state.json")));

        let exists_checks = adapter.exists_checks();
        assert!(exists_checks.contains(&join_uri(uri, "_schema.ir.json")));
        assert!(exists_checks.contains(&join_uri(uri, "__schema_state.json")));
        assert!(exists_checks.contains(&join_uri(uri, "_graph_commits.lance")));
    }

    /// Returns every row of `table_key` at `db`'s current snapshot, each
    /// encoded with `record_batch_row_to_json`. Panics on any storage or
    /// encoding error.
    async fn table_rows_json(db: &Omnigraph, table_key: &str) -> Vec<Value> {
        let snapshot = db.snapshot().await;
        let ds = db
            .storage()
            .open_snapshot_at_table(&snapshot, table_key)
            .await
            .unwrap();
        let batches = db.storage().scan_batches(&ds).await.unwrap();
        batches
            .into_iter()
            .flat_map(|batch| {
                // `move` hands the batch to the row iterator so rows can be
                // yielded lazily instead of collected per batch.
                (0..batch.num_rows())
                    .map(move |row| record_batch_row_to_json(&batch, row).unwrap())
            })
            .collect()
    }

    /// Appends one `Person` row (`id` and `name` both set to `name`, plus
    /// `age`) and publishes it with `commit_updates`. Any other column in the
    /// dataset's schema is filled with a single null.
    async fn seed_person_row(db: &mut Omnigraph, name: &str, age: Option<i32>) {
        let (ds, full_path, table_branch) = db
            .open_for_mutation("node:Person", crate::db::MutationOpKind::Insert)
            .await
            .unwrap();
        let schema: Arc<Schema> = Arc::new(ds.dataset().schema().into());
        let columns: Vec<Arc<dyn Array>> = schema
            .fields()
            .iter()
            .map(|field| match field.name().as_str() {
                "id" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
                "name" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
                "age" => Arc::new(Int32Array::from(vec![age])) as Arc<dyn Array>,
                _ => new_null_array(field.data_type(), 1),
            })
            .collect();
        let batch = RecordBatch::try_new(Arc::clone(&schema), columns).unwrap();
        let staged = db.storage().stage_append(&ds, batch, &[]).await.unwrap();
        let committed = db.storage().commit_staged(ds, staged).await.unwrap();
        let state = db
            .storage()
            .table_state(&full_path, &committed)
            .await
            .unwrap();
        db.commit_updates(&[crate::db::SubTableUpdate {
            table_key: "node:Person".to_string(),
            table_version: state.version,
            table_branch,
            row_count: state.row_count,
            version_metadata: state.version_metadata,
        }])
        .await
        .unwrap();
    }

    #[tokio::test]
    async fn test_apply_schema_adds_nullable_property_and_preserves_rows() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;

        let desired = TEST_SCHEMA.replace(
            "    age: I32?\n}",
            "    age: I32?\n    nickname: String?\n}",
        );
        let result = db.apply_schema(&desired).await.unwrap();
        assert!(result.applied);

        let reopened = Omnigraph::open(uri).await.unwrap();
        let rows = table_rows_json(&reopened, "node:Person").await;
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0]["name"], "Alice");
        assert_eq!(rows[0]["age"], 30);
        assert!(rows[0]["nickname"].is_null());
        assert!(
            reopened.catalog().node_types["Person"]
                .properties
                .contains_key("nickname")
        );
        assert!(dir.path().join("_schema.pg").exists());
    }

    #[tokio::test]
    async fn test_apply_schema_renames_property_and_preserves_values() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;

        let desired = TEST_SCHEMA.replace(
            "    age: I32?\n}",
            "    years: I32? @rename_from(\"age\")\n}",
        );
        db.apply_schema(&desired).await.unwrap();

        let reopened = Omnigraph::open(uri).await.unwrap();
        let rows = table_rows_json(&reopened, "node:Person").await;
        // Guard the row count first so a lost row fails with a clear message
        // instead of an index-out-of-bounds panic below.
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0]["name"], "Alice");
        assert_eq!(rows[0]["years"], 30);
        assert!(rows[0].get("age").is_none());
    }

    #[tokio::test]
    async fn test_apply_schema_renames_type_and_preserves_historical_snapshot() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;
        let before_version = db.snapshot().await.version();

        let desired = TEST_SCHEMA
            .replace("node Person {\n", "node Human @rename_from(\"Person\") {\n")
            .replace("edge Knows: Person -> Person", "edge Knows: Human -> Human")
            .replace(
                "edge WorksAt: Person -> Company",
                "edge WorksAt: Human -> Company",
            );
        db.apply_schema(&desired).await.unwrap();

        // Head sees only the new table key; the pre-apply manifest version
        // still resolves the old one.
        let head = db.snapshot().await;
        assert!(head.entry("node:Person").is_none());
        assert!(head.entry("node:Human").is_some());
        let historical = ManifestCoordinator::snapshot_at(uri, None, before_version)
            .await
            .unwrap();
        assert!(historical.entry("node:Person").is_some());
        assert!(historical.entry("node:Human").is_none());
    }

    #[tokio::test]
    async fn test_apply_schema_succeeds_after_load() {
        // Historical: schema apply used to be blocked by leftover
        // `__run__` branches. The Run state machine was removed in
        // MR-771, so a fresh graph never creates a `__run__` branch;
        // legacy ones are swept by the v2→v3 manifest migration. This
        // asserts the invariant a current graph upholds: publish leaves
        // no `__run__` branch behind, so schema apply proceeds.
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        crate::loader::load_jsonl(
            &mut db,
            r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#,
            crate::loader::LoadMode::Overwrite,
        )
        .await
        .unwrap();

        let all_branches = db.coordinator.read().await.all_branches().await.unwrap();
        assert!(
            !all_branches.iter().any(|b| b.starts_with("__run__")),
            "no __run__ branch should exist after publish, got: {:?}",
            all_branches
        );

        let desired = TEST_SCHEMA.replace(
            "    age: I32?\n}",
            "    age: I32?\n    nickname: String?\n}",
        );
        let result = db.apply_schema(&desired).await.unwrap();
        assert!(result.applied, "schema apply should have applied");
    }

    /// Regression (MR-770): a pre-v0.4.0 graph that still carries a stale
    /// `__run__*` branch on `__manifest` must not block schema apply. The
    /// v2→v3 sweep runs in `Omnigraph::open(ReadWrite)` — before the
    /// schema-apply blocking-branch check — so apply succeeds with no
    /// intervening publish.
    ///
    /// Confirmed to fail before the open-time migration landed: the reopened
    /// graph still listed `__run__legacy`, and `apply_schema` returned
    /// "found non-main branches: __run__legacy".
    #[tokio::test]
    async fn legacy_run_branch_is_swept_on_open_and_does_not_block_schema_apply() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        // Synthesize a legacy graph: a stale `__run__` branch on `__manifest`
        // plus the manifest stamp rewound to v2 (pre-sweep).
        db.branch_create("__run__legacy").await.unwrap();
        drop(db);
        {
            // forbidden-api-allow: test synthesizes a legacy graph by editing __manifest directly.
            let mut ds = lance::Dataset::open(&format!("{}/__manifest", uri))
                .await
                .unwrap();
            ds.update_schema_metadata([(
                "omnigraph:internal_schema_version".to_string(),
                Some("2".to_string()),
            )])
            .await
            .unwrap();
        }

        // Reopen (ReadWrite): the open-time migration must sweep `__run__legacy`
        // before any branch-observing code runs.
        let db = Omnigraph::open(uri).await.unwrap();
        let branches = db.branch_list().await.unwrap();
        assert!(
            !branches.iter().any(|b| b.starts_with("__run__")),
            "open-time migration must sweep legacy __run__ branches; got {branches:?}",
        );

        // Schema apply must proceed with no intervening publish — the
        // blocking-branch check no longer sees `__run__legacy`.
        let desired = TEST_SCHEMA.replace(
            "    age: I32?\n}",
            "    age: I32?\n    nickname: String?\n}",
        );
        let result = db.apply_schema(&desired).await.unwrap();
        assert!(result.applied, "schema apply should have applied");
    }

    #[tokio::test]
    async fn test_apply_schema_defers_index_then_reconciler_builds_it() {
        // iss-848: schema apply records the @index intent but builds nothing
        // inline; a later ensure_indices materializes it once the table has
        // rows. (Use `age`, which is unindexed in TEST_SCHEMA — `name @key` is
        // already FTS-indexed at seed, so it can't show the deferral.)
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;

        let desired = TEST_SCHEMA.replace("age: I32?", "age: I32? @index");
        db.apply_schema(&desired).await.unwrap();

        // Apply built nothing — the BTREE on `age` is deferred.
        let snapshot = db.snapshot().await;
        let ds = db
            .storage()
            .open_snapshot_at_table(&snapshot, "node:Person")
            .await
            .unwrap();
        assert!(
            !db.storage().has_btree_index(&ds, "age").await.unwrap(),
            "apply must not build the index inline (deferred to the reconciler)"
        );

        // The reconciler materializes it (Person has a row).
        db.ensure_indices().await.unwrap();
        let snapshot = db.snapshot().await;
        let ds = db
            .storage()
            .open_snapshot_at_table(&snapshot, "node:Person")
            .await
            .unwrap();
        assert!(
            db.storage().has_btree_index(&ds, "age").await.unwrap(),
            "ensure_indices must build the deferred index"
        );
    }

    #[tokio::test]
    async fn test_apply_schema_rewrite_defers_index_then_reconciler_restores() {
        // iss-848: an AddProperty rewrite writes a new dataset version without
        // rebuilding indexes inline (deferred); ensure_indices restores them.
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let initial_schema = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
        let mut db = Omnigraph::init(uri, &initial_schema).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;

        let desired = initial_schema.replace(
            "    age: I32?\n}",
            "    age: I32?\n    nickname: String?\n}",
        );
        db.apply_schema(&desired).await.unwrap();

        // After the rewrite the reconciler restores index coverage.
        db.ensure_indices().await.unwrap();
        let snapshot = db.snapshot().await;
        let ds = db
            .storage()
            .open_snapshot_at_table(&snapshot, "node:Person")
            .await
            .unwrap();
        assert!(db.storage().has_btree_index(&ds, "id").await.unwrap());
        assert!(db.storage().has_fts_index(&ds, "name").await.unwrap());
    }

    /// While the schema-apply lock branch exists, opening a table for mutation
    /// must be refused.
    #[tokio::test]
    async fn test_open_for_mutation_rejects_while_schema_apply_locked() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        db.coordinator
            .write()
            .await
            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
            .await
            .unwrap();

        let err = db
            .open_for_mutation("node:Person", crate::db::MutationOpKind::Insert)
            .await
            .unwrap_err();
        assert!(
            err.to_string()
                .contains("write is unavailable while schema apply is in progress")
        );
    }

    /// While the schema-apply lock branch exists, publishing updates must be
    /// refused, even for an empty update set.
    #[tokio::test]
    async fn test_commit_updates_rejects_while_schema_apply_locked() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        db.coordinator
            .write()
            .await
            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
            .await
            .unwrap();

        let err = db.commit_updates(&[]).await.unwrap_err();
        assert!(
            err.to_string()
                .contains("write commit is unavailable while schema apply is in progress")
        );
    }

    /// The internal schema-apply lock branch must not appear in the public
    /// branch list.
    #[tokio::test]
    async fn test_branch_list_hides_schema_apply_lock_branch() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        db.coordinator
            .write()
            .await
            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
            .await
            .unwrap();

        let branches = db.branch_list().await.unwrap();
        assert_eq!(branches, vec!["main".to_string()]);
    }
}