Skip to main content

omnigraph/db/
omnigraph.rs

1use std::collections::{BTreeSet, HashMap, HashSet};
2use std::io::Write;
3use std::sync::Arc;
4
5use arc_swap::ArcSwap;
6use arrow_array::{
7    Array, BinaryArray, BooleanArray, Date32Array, FixedSizeListArray, Float32Array, Float64Array,
8    Int32Array, Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray,
9    RecordBatch, StringArray, StructArray, UInt32Array, UInt64Array, new_null_array,
10};
11use arrow_schema::{DataType, Field, Schema};
12use lance::Dataset;
13use lance::blob::{BlobArrayBuilder, blob_field};
14use lance::dataset::BlobFile;
15use lance::dataset::scanner::ColumnOrdering;
16use lance::datatypes::BlobKind;
17use omnigraph_compiler::catalog::{Catalog, EdgeType, NodeType};
18use omnigraph_compiler::schema::parser::parse_schema;
19use omnigraph_compiler::types::{PropType, ScalarType};
20use omnigraph_compiler::{
21    DropMode, SchemaIR, SchemaMigrationPlan, SchemaMigrationStep, SchemaTypeKind,
22    build_catalog_from_ir, build_schema_ir, plan_schema_migration,
23};
24
25use crate::db::graph_coordinator::{GraphCoordinator, PublishedSnapshot};
26use crate::error::{OmniError, Result};
27use crate::runtime_cache::RuntimeCache;
28use crate::storage::{StorageAdapter, join_uri, normalize_root_uri, storage_for_uri};
29use crate::storage_layer::SnapshotHandle;
30use crate::table_store::TableStore;
31
32mod export;
33mod optimize;
34mod repair;
35mod schema_apply;
36mod table_ops;
37
38pub use optimize::{CleanupPolicyOptions, SkipReason, TableCleanupStats, TableOptimizeStats};
39pub use repair::{
40    RepairAction, RepairClassification, RepairOptions, RepairStats, TableRepairStats,
41};
42pub use schema_apply::SchemaApplyOptions;
43pub use table_ops::PendingIndex;
44pub(crate) use table_ops::OpenedForMutation;
45
46use super::commit_graph::GraphCommit;
47use super::manifest::{
48    ManifestChange, Snapshot, SubTableEntry, TableRegistration, TableTombstone,
49    table_path_for_table_key,
50};
51use super::schema_state::{
52    SCHEMA_SOURCE_FILENAME, load_or_bootstrap_schema_contract, read_accepted_schema_ir,
53    recover_schema_state_files, schema_ir_staging_uri, schema_ir_uri, schema_source_staging_uri,
54    schema_source_uri, schema_state_staging_uri, schema_state_uri, validate_schema_contract,
55    write_schema_contract, write_schema_contract_staging,
56};
57use super::{
58    ReadTarget, ResolvedTarget, SCHEMA_APPLY_LOCK_BRANCH, SnapshotId, is_internal_system_branch,
59    is_schema_apply_lock_branch,
60};
61
/// How a branch merge concluded.
///
/// NOTE(review): the variant names follow git terminology; the per-variant
/// notes below are inferred from those names — confirm against the merge
/// implementation.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MergeOutcome {
    /// Presumably: the target already contained the source; nothing changed.
    AlreadyUpToDate,
    /// Presumably: the target was advanced to the source without a merge commit.
    FastForward,
    /// Presumably: both sides had diverged and were combined.
    Merged,
}
68
/// Value returned by the `apply_schema*` family of methods on `Omnigraph`.
///
/// NOTE(review): the field notes below are inferred from the field names and
/// types only; confirm against the `schema_apply` module before relying on them.
#[derive(Debug, Clone)]
pub struct SchemaApplyResult {
    /// Presumably whether the requested migration is supported — TODO confirm.
    pub supported: bool,
    /// Presumably whether the migration was actually applied — TODO confirm.
    pub applied: bool,
    /// A manifest version number; presumably the version after the apply —
    /// TODO confirm.
    pub manifest_version: u64,
    /// The migration steps associated with this apply.
    pub steps: Vec<SchemaMigrationStep>,
}
76
/// Value returned by `Omnigraph::preview_schema_apply_with_options`.
///
/// NOTE(review): presumably a dry-run view of a schema apply — confirm against
/// `schema_apply::preview_schema_apply`.
#[derive(Debug, Clone)]
pub struct SchemaApplyPreview {
    /// The migration plan computed for the desired schema.
    pub plan: SchemaMigrationPlan,
    /// A catalog; presumably the one that would result from applying `plan` —
    /// TODO confirm.
    pub catalog: Catalog,
}
82
/// A capture-once write transaction (RFC-013 step 3b).
///
/// Pins the operation's read base ONCE so the per-table opens reuse the pinned
/// version instead of re-resolving / re-validating per table. The schema
/// contract is validated once (when `base` is captured).
///
/// This is NOT a general "no re-resolution" handle — the commit-time OCC
/// re-read, the live-HEAD drift probe, and the fork-authority reads stay fresh
/// (correctness machinery). Step 5 (PublishPlan unification) makes this the
/// non-optional publish carrier and adds session-aware base opens there, gated
/// by an S3 cost test — the warm-session benefit on the single remaining open
/// is an object-store phenomenon, so it earns its own gate rather than riding
/// this PR.
///
/// Threaded as `Option<&WriteTxn>` through the mutate/load write chain
/// (`open_for_mutation_on_branch`, `commit_all`,
/// `commit_updates_on_branch_with_expected`) so a single write validates the
/// schema contract EXACTLY ONCE — at capture.
///
/// * When present, the per-table resolves source the pinned `base` entry
///   instead of calling `resolved_branch_target` / `snapshot_for_branch` /
///   `fresh_snapshot_for_branch` (each of which re-runs
///   `ensure_schema_state_valid`).
/// * When absent (`None` — every non-mutate/load caller), every threaded
///   function behaves byte-identically to before.
///
/// The carrier never removes a version guard or changes which dataset version
/// the per-table open targets: strict ops keep `open_dataset_head_for_write` +
/// `ensure_expected_version`, and the commit-time OCC re-read still opens a
/// fresh manifest snapshot (via `fresh_snapshot_for_branch_unchecked`) — only
/// the redundant schema re-validation is dropped.
pub(crate) struct WriteTxn {
    /// The resolved branch (`None` = main).
    pub(crate) branch: Option<String>,
    /// The pinned base snapshot (per-table location + version + e_tag), captured once.
    pub(crate) base: Snapshot,
}
111
/// Top-level handle to an Omnigraph database.
///
/// An Omnigraph is a Lance-native graph database with git-style branching.
/// It stores typed property graphs as per-type Lance datasets coordinated
/// through a Lance manifest table.
pub struct Omnigraph {
    /// Normalized root URI of the graph (the output of `normalize_root_uri`).
    /// Exposed through [`Omnigraph::uri`].
    root_uri: String,
    /// Storage adapter used for schema-file I/O (existence checks, reads,
    /// conditional writes) and shared with the coordinator at open time.
    storage: Arc<dyn StorageAdapter>,
    /// Coordinator state behind a tokio `RwLock`. PR 2 (MR-686) wraps
    /// this so engine write APIs can be `&self` (the HTTP server's
    /// `AppState` holds `Arc<Omnigraph>` and dispatches concurrent
    /// calls without a global write lock). Reads (`snapshot`, `version`,
    /// `current_branch`, `branch_list`, `resolve_*`, `head_commit_id`,
    /// `list_commits`, …) acquire `.read().await` and parallelize.
    /// Writes (`refresh`, `branch_create`, `branch_delete`, `commit_*`,
    /// `record_*`) acquire `.write().await` and serialize. The atomic
    /// commit invariant — `commit_manifest_updates` followed by
    /// `record_graph_commit` must be atomic — is preserved by the
    /// single `.write()` covering both calls inside
    /// `commit_updates_with_actor_with_expected`. PR 2 Phase 2
    /// converted from `Mutex` to `RwLock` because the bench showed
    /// the Mutex was the dominant serializer for disjoint-table
    /// workloads. Lock acquisition order: always before `runtime_cache`
    /// (when both are needed in one scope).
    coordinator: Arc<tokio::sync::RwLock<GraphCoordinator>>,
    /// Table store constructed from the graph root (`TableStore::new(&root)`).
    table_store: TableStore,
    /// Runtime cache; starts out as `RuntimeCache::default()`. See the
    /// lock-ordering note on `coordinator` and the invalidation note on
    /// `read_caches`.
    runtime_cache: RuntimeCache,
    /// Per-graph read caches: one shared Lance `Session` plus the held-`Dataset`
    /// handle cache, handed to live-Branch-read snapshots (via
    /// `resolved_target`) so table opens reuse handles (0 IO on a warm repeat)
    /// and one session. Invalidated alongside `runtime_cache` on branch switch /
    /// refresh — hygiene only; version-in-key carries correctness.
    read_caches: Arc<crate::runtime_cache::ReadCaches>,
    /// Read-heavy on every query, written only by `apply_schema`. ArcSwap
    /// gives atomic pointer swap with zero-cost reads (`load()` returns a
    /// `Guard<Arc<Catalog>>`), so concurrent queries on different actors
    /// don't contend on a lock to read the catalog.
    catalog: Arc<ArcSwap<Catalog>>,
    /// Read-heavy on schema introspection paths, written only by
    /// `apply_schema`. Same ArcSwap rationale as `catalog`.
    schema_source: Arc<ArcSwap<String>>,
    /// Per-`(table_key, branch)` writer queues — the engine's
    /// write-serialization mechanism (the server holds the engine as a
    /// lockless `Arc<Omnigraph>`). Reachable from engine internals
    /// (mutation finalize, schema_apply, branch_merge, ensure_indices,
    /// delete_where, the fork path, recovery reconciler).
    write_queue: Arc<crate::db::write_queue::WriteQueueManager>,
    /// Process-wide mutex held across the swap → operate → restore window
    /// in `branch_merge_impl`. Two concurrent merges with distinct targets
    /// would otherwise interleave their three separate
    /// `coordinator.write().await` acquisitions, leaving each merge's
    /// inner body running against the other's swapped coord. Pinned by
    /// `concurrent_branch_merges_distinct_targets_do_not_swap_into_each_other`
    /// in `crates/omnigraph-server/tests/server.rs`.
    ///
    /// Cost: serializes ALL concurrent branch merges process-wide.
    /// Acceptable because branch merges are heavy (table rewrites, index
    /// rebuilds), per-(table, branch) queues inside `commit_all` already
    /// serialize the data path, and merges are rare relative to /change
    /// or /ingest. A finer-grained per-target-branch mutex is a follow-up
    /// if telemetry shows merge concurrency matters.
    ///
    /// The deeper fix — refactor `branch_merge_on_current_target` to take
    /// an explicit target coord parameter so `self.coordinator` is never
    /// used as scratch space — is the round-1 shape applied to
    /// `branch_create_from_impl`. Deferred because it requires unwinding
    /// every `self.snapshot()` and `self.ensure_commit_graph_initialized()`
    /// call inside the merge body.
    merge_exclusive: Arc<tokio::sync::Mutex<()>>,
    /// Optional policy checker for engine-layer enforcement (MR-722).
    /// `None` = no enforcement; mutating methods are unconditionally
    /// allowed (this is the embedded/dev default). `Some` = every
    /// mutating method calls `self.enforce(action, scope, actor)` at
    /// entry; denial returns `OmniError::Policy`.
    ///
    /// Per chassis design (see `omnigraph_policy::PolicyChecker`), the
    /// trait surface is deliberately coarse — action × scope × actor.
    /// Per-row / per-type / per-column scope lives at the query layer
    /// (MR-725), which extends the same trait with a different method.
    /// Don't be tempted to add per-row enforcement here.
    ///
    /// Set via `with_policy(checker)` after construction. Today only
    /// `apply_schema_as` consults this field (PR #2 proof-of-concept);
    /// PR #3 fans the `enforce()` call out to the remaining writers.
    policy: Option<Arc<dyn omnigraph_policy::PolicyChecker>>,
    /// Lazily-built, reused-across-queries embedding client. Built on the first
    /// `nearest($v, "string")` that needs server-side embedding (so a graph that
    /// never embeds needs no provider key), then shared by every later query —
    /// avoids the per-query `from_env()` rebuild and keeps the provider HTTP
    /// connection pool warm. `OnceCell` guarantees a single initialization.
    embedding: Arc<tokio::sync::OnceCell<crate::embedding::EmbeddingClient>>,
    /// Optional pre-resolved embedding config (RFC-012 Phase 5), injected from an
    /// applied cluster `providers.embedding` profile via [`Omnigraph::with_embedding_config`].
    /// When set, the embedding cell builds its client from this instead of
    /// `EmbeddingClient::from_env()`; `None` keeps the env fallback.
    embedding_config: Option<Arc<crate::embedding::EmbeddingConfig>>,
}
209
/// Selects whether opening a graph runs the open-time recovery sweep.
///
/// The sweep performs Lance writes (`Dataset::restore`,
/// `ManifestBatchPublisher::publish`). Read-only consumers — NDJSON export,
/// `commit list`, `read`, schema inspection — should not cause writes: they
/// may hold read-only object-store credentials, and silent mutations at open
/// time are surprising. They also do not need recovery, because reads always
/// resolve through the manifest pin, which is the consistent snapshot
/// regardless of any Phase B → Phase C drift on the per-table side.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum OpenMode {
    /// Run the recovery sweep while opening. This is what
    /// [`Omnigraph::open`] uses.
    ReadWrite,
    /// Do not run the recovery sweep. Intended for read-only consumers, which
    /// reach it through [`Omnigraph::open_read_only`].
    ReadOnly,
}
227
/// Init-time options accepted by [`Omnigraph::init_with_options`].
///
/// The single knob, `force`, governs the safety preflight that stops an
/// accidental re-init from overwriting an existing graph's schema metadata:
///
/// * `force: false` (the default) — fail fast with
///   [`OmniError::AlreadyInitialized`] when any of `_schema.pg`,
///   `_schema.ir.json`, or `__schema_state.json` already exists at the
///   target URI.
/// * `force: true` — skip the preflight; existing schema files are
///   overwritten in place.
///
/// Force does NOT purge old Lance datasets or `__manifest/`; reclaiming
/// those still requires deleting the graph directory by hand (or via a
/// future `DELETE /graphs/{id}`).
#[derive(Clone, Copy, Debug, Default)]
pub struct InitOptions {
    /// When `true`, the existing-graph preflight is skipped. Operators set
    /// this only when an overwrite is intended — e.g. `omnigraph init --force`.
    pub force: bool,
}
246
247impl Omnigraph {
248    /// Create a new graph at `uri` from schema source.
249    ///
250    /// Strict mode: errors with [`OmniError::AlreadyInitialized`] if
251    /// `uri` already holds any of the three schema artifacts. To
252    /// overwrite an existing graph deliberately, call
253    /// [`Self::init_with_options`] with `InitOptions { force: true }`.
254    pub async fn init(uri: &str, schema_source: &str) -> Result<Self> {
255        Self::init_with_options(uri, schema_source, InitOptions::default()).await
256    }
257
258    /// Create a new graph at `uri`, with explicit init-time options.
259    ///
260    /// See [`InitOptions`] for the safety contract — by default this
261    /// behaves identically to [`Self::init`].
262    pub async fn init_with_options(
263        uri: &str,
264        schema_source: &str,
265        options: InitOptions,
266    ) -> Result<Self> {
267        Self::init_with_storage(uri, schema_source, storage_for_uri(uri)?, options).await
268    }
269
270    pub(crate) async fn init_with_storage(
271        uri: &str,
272        schema_source: &str,
273        storage: Arc<dyn StorageAdapter>,
274        options: InitOptions,
275    ) -> Result<Self> {
276        let root = normalize_root_uri(uri)?;
277
278        // Preflight: refuse to clobber an existing graph unless the
279        // operator passed `force`. This runs BEFORE any parse or
280        // write so a misdirected `init` against an existing graph
281        // URI cannot reach a code path that overwrites or, on a
282        // later cleanup, deletes the schema files.
283        //
284        // Closes the "init is destructive against existing state"
285        // class: there is no longer a code path where strict-mode
286        // `init` can mutate a populated graph root.
287        if !options.force {
288            for candidate in [
289                schema_source_uri(&root),
290                schema_ir_uri(&root),
291                schema_state_uri(&root),
292            ] {
293                if storage.exists(&candidate).await? {
294                    return Err(OmniError::AlreadyInitialized { uri: root.clone() });
295                }
296            }
297        }
298
299        let schema_ir = read_schema_ir_from_source(schema_source)?;
300        let mut catalog = build_catalog_from_ir(&schema_ir)?;
301        fixup_blob_schemas(&mut catalog);
302
303        // Establish an atomic ownership claim on `_schema.pg` before
304        // writing the remaining init artifacts. A check-then-write preflight
305        // is not enough under concurrent `init` calls: two callers can both
306        // observe an empty root, one can successfully initialize, and the
307        // loser can then fail in Lance `WriteMode::Create`. Only the caller
308        // that atomically created `_schema.pg` may clean up schema artifacts
309        // on later failure.
310        let schema_pg_claimed = if options.force {
311            false
312        } else {
313            let schema_path = join_uri(&root, SCHEMA_SOURCE_FILENAME);
314            if !storage
315                .write_text_if_absent(&schema_path, schema_source)
316                .await?
317            {
318                return Err(OmniError::AlreadyInitialized { uri: root.clone() });
319            }
320            if let Err(err) = crate::failpoints::maybe_fail(crate::failpoints::names::INIT_AFTER_SCHEMA_PG_WRITTEN) {
321                best_effort_cleanup_init_artifacts(&root, storage.as_ref()).await;
322                return Err(err);
323            }
324            true
325        };
326
327        // Run the I/O phase. On any error, best-effort-clean schema
328        // artifacts only when this invocation owns them: strict mode owns
329        // them after the atomic `_schema.pg` claim above; force mode owns
330        // destructive overwrite semantics by explicit operator request.
331        //
332        // Coverage gap: Lance per-type datasets and `__manifest/`
333        // directory created by `GraphCoordinator::init` are NOT cleaned
334        // up here — fully recursive directory deletion requires a
335        // `StorageAdapter::delete_prefix` primitive that's deferred
336        // along with `DELETE /graphs/{id}` (PR 2b in the MR-668 plan
337        // is currently deferred). If `init` fails after coordinator
338        // init succeeds, operators may need to remove the graph
339        // directory manually before retrying `init` on the same URI.
340        // Documented in the PR 2a commit message and `init` rustdoc.
341        let coordinator = match init_storage_phase(
342            &root,
343            schema_source,
344            &schema_ir,
345            &catalog,
346            &storage,
347            !schema_pg_claimed,
348        )
349        .await
350        {
351            Ok(coordinator) => coordinator,
352            Err(err) => {
353                if schema_pg_claimed || options.force {
354                    best_effort_cleanup_init_artifacts(&root, storage.as_ref()).await;
355                }
356                return Err(err);
357            }
358        };
359
360        Ok(Self {
361            root_uri: root.clone(),
362            storage,
363            coordinator: Arc::new(tokio::sync::RwLock::new(coordinator)),
364            table_store: TableStore::new(&root),
365            runtime_cache: RuntimeCache::default(),
366            // One shared Session per graph (LanceDB's one-session-per-connection
367            // model) plus the held-handle cache, created once and reused across
368            // reads. Session::default() caps are lazy (6 GiB index / 1 GiB
369            // metadata); multi-graph cap/sharing is a deferred follow-up.
370            read_caches: Arc::new(crate::runtime_cache::ReadCaches {
371                session: Arc::new(lance::session::Session::default()),
372                handles: Arc::new(crate::runtime_cache::TableHandleCache::default()),
373            }),
374            catalog: Arc::new(ArcSwap::from_pointee(catalog)),
375            schema_source: Arc::new(ArcSwap::from_pointee(schema_source.to_string())),
376            write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()),
377            merge_exclusive: Arc::new(tokio::sync::Mutex::new(())),
378            policy: None,
379            embedding: Arc::new(tokio::sync::OnceCell::new()),
380            embedding_config: None,
381        })
382    }
383
384    /// Open an existing graph (read-write).
385    ///
386    /// Reads `_schema.pg`, parses it, builds the catalog, and opens `__manifest`.
387    /// Runs the open-time recovery sweep before returning — see [`OpenMode`].
388    pub async fn open(uri: &str) -> Result<Self> {
389        Self::open_with_storage_and_mode(uri, storage_for_uri(uri)?, OpenMode::ReadWrite).await
390    }
391
392    /// Open an existing graph for read-only consumers (NDJSON export,
393    /// `commit list`, etc.). Skips the recovery sweep — see [`OpenMode`].
394    pub async fn open_read_only(uri: &str) -> Result<Self> {
395        Self::open_with_storage_and_mode(uri, storage_for_uri(uri)?, OpenMode::ReadOnly).await
396    }
397
398    /// Open with a caller-supplied [`StorageAdapter`]. Used by init/test paths
399    /// and by embedding/test consumers that wrap storage (e.g. a counting
400    /// decorator for IO-budget tests). Defaults to `OpenMode::ReadWrite`.
401    pub async fn open_with_storage(uri: &str, storage: Arc<dyn StorageAdapter>) -> Result<Self> {
402        Self::open_with_storage_and_mode(uri, storage, OpenMode::ReadWrite).await
403    }
404
405    pub(crate) async fn open_with_storage_and_mode(
406        uri: &str,
407        storage: Arc<dyn StorageAdapter>,
408        mode: OpenMode,
409    ) -> Result<Self> {
410        let root = normalize_root_uri(uri)?;
411        // Apply pending internal-schema migrations before the coordinator reads
412        // branch state, so `branch_list` and the schema-apply blocking-branch
413        // checks observe the post-migration graph — notably the v2→v3 sweep of
414        // legacy `__run__*` staging branches (MR-770). ReadWrite only: a
415        // read-only open must not trigger object-store writes, so a read-only
416        // open of an unmigrated legacy graph still lists `__run__*` until its
417        // first read-write open (an accepted, documented limitation).
418        if matches!(mode, OpenMode::ReadWrite) {
419            crate::db::manifest::migrate_on_open(&root).await?;
420        }
421        // Open the coordinator first so the schema-staging recovery sweep can
422        // compare its snapshot against any leftover staging files.
423        let mut coordinator = GraphCoordinator::open(&root, Arc::clone(&storage)).await?;
424        // Both the schema-state recovery sweep AND the manifest-drift
425        // recovery sweep are gated on `OpenMode::ReadWrite`. Read-only
426        // consumers (NDJSON export, `commit list`, schema show) shouldn't
427        // trigger object-store mutations: they may run with read-only
428        // credentials, and silent open-time writes are surprising. Both
429        // sweeps' work is recoverable on the next ReadWrite open, so
430        // skipping under ReadOnly doesn't lose any safety guarantees —
431        // the manifest pin is the consistent snapshot regardless of
432        // drift on the per-table side or leftover schema-staging files.
433        if matches!(mode, OpenMode::ReadWrite) {
434            let schema_state_recovery =
435                recover_schema_state_files(&root, Arc::clone(&storage), &coordinator.snapshot())
436                    .await?;
437            // Recovery sweep: close the Phase B → Phase C residual on
438            // any sidecar left over from a crashed writer. Long-running
439            // processes additionally converge in-process: the staged-
440            // write entry points and `refresh` run the roll-forward-only
441            // heal (`heal_pending_sidecars_roll_forward`); only
442            // rollback-eligible sidecars wait for this open-time sweep.
443            crate::db::manifest::recover_manifest_drift(
444                &root,
445                Arc::clone(&storage),
446                &mut coordinator,
447                crate::db::manifest::RecoveryMode::Full,
448                schema_state_recovery,
449            )
450            .await?;
451        }
452        // Read _schema.pg (post-recovery — may have just been renamed in).
453        let schema_path = schema_source_uri(&root);
454        let schema_source = storage.read_text(&schema_path).await?;
455        let current_source_ir = read_schema_ir_from_source(&schema_source)?;
456        let branches = coordinator.branch_list().await?;
457        let (accepted_ir, _) = load_or_bootstrap_schema_contract(
458            &root,
459            Arc::clone(&storage),
460            &branches,
461            &current_source_ir,
462        )
463        .await?;
464        let mut catalog = build_catalog_from_ir(&accepted_ir)?;
465        fixup_blob_schemas(&mut catalog);
466
467        Ok(Self {
468            root_uri: root.clone(),
469            storage,
470            coordinator: Arc::new(tokio::sync::RwLock::new(coordinator)),
471            table_store: TableStore::new(&root),
472            runtime_cache: RuntimeCache::default(),
473            // One shared Session per graph (LanceDB's one-session-per-connection
474            // model) plus the held-handle cache, created once and reused across
475            // reads. Session::default() caps are lazy (6 GiB index / 1 GiB
476            // metadata); multi-graph cap/sharing is a deferred follow-up.
477            read_caches: Arc::new(crate::runtime_cache::ReadCaches {
478                session: Arc::new(lance::session::Session::default()),
479                handles: Arc::new(crate::runtime_cache::TableHandleCache::default()),
480            }),
481            catalog: Arc::new(ArcSwap::from_pointee(catalog)),
482            schema_source: Arc::new(ArcSwap::from_pointee(schema_source)),
483            write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()),
484            merge_exclusive: Arc::new(tokio::sync::Mutex::new(())),
485            policy: None,
486            embedding: Arc::new(tokio::sync::OnceCell::new()),
487            embedding_config: None,
488        })
489    }
490
491    /// Returns an `Arc<Catalog>` snapshot. Cheap clone of the current
492    /// catalog pointer; callers can hold the returned `Arc` across awaits
493    /// without blocking concurrent `apply_schema`.
494    pub fn catalog(&self) -> Arc<Catalog> {
495        self.catalog.load_full()
496    }
497
498    /// Returns an `Arc<String>` snapshot of the schema source.
499    pub fn schema_source(&self) -> Arc<String> {
500        self.schema_source.load_full()
501    }
502
503    /// Atomically swap the in-memory catalog. Concurrent readers see
504    /// either the old or the new pointer; never a torn state. Used by
505    /// `apply_schema` and `reload_schema_if_source_changed`.
506    pub(crate) fn store_catalog(&self, catalog: Catalog) {
507        self.catalog.store(Arc::new(catalog));
508    }
509
510    /// Atomically swap the in-memory schema source. Same rationale as
511    /// [`store_catalog`](Self::store_catalog).
512    pub(crate) fn store_schema_source(&self, schema_source: String) {
513        self.schema_source.store(Arc::new(schema_source));
514    }
515
516    pub fn uri(&self) -> &str {
517        &self.root_uri
518    }
519
520    /// Install a policy checker for engine-layer enforcement (MR-722).
521    /// Builder-style setter — consumes `self`, returns `Self`. Calling
522    /// this on a `Omnigraph` previously without policy enables
523    /// `enforce()` to fire at every mutating engine method that's been
524    /// wired to call it (currently `apply_schema_as`; PR #3 fans out to
525    /// the remaining writers).
526    ///
527    /// Embedded callers that don't care about authorization should
528    /// just not call this. Server / CLI callers that have loaded a
529    /// `PolicyEngine` from `policy.yaml` pass it here.
530    pub fn with_policy(mut self, checker: Arc<dyn omnigraph_policy::PolicyChecker>) -> Self {
531        self.policy = Some(checker);
532        self
533    }
534
535    /// The lazily-initialized, reused-across-queries embedding client cell
536    /// (see the `embedding` field doc). The query executor resolves the client
537    /// through this on the first `nearest($v, "string")` that needs embedding.
538    pub(crate) fn embedding_cell(
539        &self,
540    ) -> &tokio::sync::OnceCell<crate::embedding::EmbeddingClient> {
541        &self.embedding
542    }
543
544    /// Install a pre-resolved embedding config (RFC-012 Phase 5). Builder-style,
545    /// mirroring [`Omnigraph::with_policy`]: a graph served from a cluster
546    /// embedding provider profile injects it here; an embedded/CLI caller that doesn't
547    /// call this keeps the `EmbeddingClient::from_env()` fallback.
548    pub fn with_embedding_config(mut self, config: Arc<crate::embedding::EmbeddingConfig>) -> Self {
549        self.embedding_config = Some(config);
550        self
551    }
552
553    /// The injected embedding config, if any (see the `embedding_config` field).
554    pub(crate) fn embedding_config_ref(&self) -> Option<&crate::embedding::EmbeddingConfig> {
555        self.embedding_config.as_deref()
556    }
557
558    /// Engine-layer policy enforcement gate (MR-722 chassis core).
559    ///
560    /// * If no policy is installed → no-op (returns `Ok(())`).
561    /// * If policy is installed AND actor is None → denial with a
562    ///   clear "no actor for engine-layer policy check" message.
563    ///   Forces server / CLI / SDK callers to thread an actor through
564    ///   when policy is configured — silent bypass via "I forgot the
565    ///   actor" is exactly the footgun this gate is here to prevent.
566    /// * If policy is installed AND actor is Some → call
567    ///   `PolicyChecker::check(action, scope, actor)`; map denial /
568    ///   internal failure to `OmniError::Policy(...)`.
569    pub(crate) fn enforce(
570        &self,
571        action: omnigraph_policy::PolicyAction,
572        scope: &omnigraph_policy::ResourceScope,
573        actor: Option<&str>,
574    ) -> Result<()> {
575        let Some(checker) = self.policy.as_ref() else {
576            return Ok(());
577        };
578        let Some(actor) = actor else {
579            return Err(OmniError::Policy(
580                "no actor for engine-layer policy check (policy is configured but the call site \
581                 didn't thread an actor through — this is almost certainly a bug, not an \
582                 intended bypass)"
583                    .to_string(),
584            ));
585        };
586        checker
587            .check(action, scope, actor)
588            .map_err(|err| OmniError::Policy(err.to_string()))
589    }
590
591    pub(crate) async fn ensure_schema_state_valid(&self) -> Result<()> {
592        // Full per-call validation is intentional: a long-lived handle must
593        // detect external drift of the schema source, IR, OR state on its next
594        // operation (see lifecycle::long_lived_handle_rejects_schema_* tests). A
595        // source-only fast path would miss IR/state drift when _schema.pg is
596        // unchanged, so the only safe latency win is not calling this twice per
597        // query (finding A removes the redundant caller in exec/query.rs).
598        validate_schema_contract(self.uri(), Arc::clone(&self.storage)).await
599    }
600
601    pub async fn plan_schema(&self, desired_schema_source: &str) -> Result<SchemaMigrationPlan> {
602        self.plan_schema_with_options(desired_schema_source, SchemaApplyOptions::default())
603            .await
604    }
605
606    pub async fn plan_schema_with_options(
607        &self,
608        desired_schema_source: &str,
609        options: SchemaApplyOptions,
610    ) -> Result<SchemaMigrationPlan> {
611        schema_apply::plan_schema(self, desired_schema_source, options).await
612    }
613
614    pub async fn preview_schema_apply_with_options(
615        &self,
616        desired_schema_source: &str,
617        options: SchemaApplyOptions,
618    ) -> Result<SchemaApplyPreview> {
619        schema_apply::preview_schema_apply(self, desired_schema_source, options).await
620    }
621
622    pub async fn apply_schema(&self, desired_schema_source: &str) -> Result<SchemaApplyResult> {
623        self.apply_schema_as(desired_schema_source, SchemaApplyOptions::default(), None)
624            .await
625    }
626
627    pub async fn apply_schema_with_options(
628        &self,
629        desired_schema_source: &str,
630        options: SchemaApplyOptions,
631    ) -> Result<SchemaApplyResult> {
632        self.apply_schema_as(desired_schema_source, options, None)
633            .await
634    }
635
636    /// Apply a schema migration with an explicit actor for engine-layer
637    /// policy enforcement (MR-722). When a `PolicyChecker` is installed
638    /// via [`Self::with_policy`], this method calls `enforce(SchemaApply,
639    /// Branch("main"), actor)` before any apply work happens. Denial
640    /// returns `OmniError::Policy` and leaves the manifest untouched.
641    ///
642    /// The no-actor variants (`apply_schema`, `apply_schema_with_options`)
643    /// pass `None` here. They work fine without a policy; if a policy IS
644    /// installed and actor is None, enforcement intentionally fails to
645    /// prevent silent-bypass-via-forgetting-the-actor footguns.
646    pub async fn apply_schema_as(
647        &self,
648        desired_schema_source: &str,
649        options: SchemaApplyOptions,
650        actor: Option<&str>,
651    ) -> Result<SchemaApplyResult> {
652        self.apply_schema_as_with_catalog_check(desired_schema_source, options, actor, |_| Ok(()))
653            .await
654    }
655
656    pub async fn apply_schema_as_with_catalog_check<F>(
657        &self,
658        desired_schema_source: &str,
659        options: SchemaApplyOptions,
660        actor: Option<&str>,
661        validate_catalog: F,
662    ) -> Result<SchemaApplyResult>
663    where
664        F: FnOnce(&Catalog) -> Result<()>,
665    {
666        schema_apply::apply_schema(
667            self,
668            desired_schema_source,
669            options,
670            actor,
671            validate_catalog,
672        )
673        .await
674    }
675
676    pub(crate) async fn ensure_schema_apply_idle(&self, operation: &str) -> Result<()> {
677        schema_apply::ensure_schema_apply_idle(self, operation).await
678    }
679
680    async fn ensure_schema_apply_not_locked(&self, operation: &str) -> Result<()> {
681        schema_apply::ensure_schema_apply_not_locked(self, operation).await
682    }
683
684    /// Engine-facing trait surface around `TableStore`.
685    ///
686    /// This is the **only** accessor for engine code reaching into the
687    /// storage layer. The trait's signatures use opaque `SnapshotHandle`
688    /// / `StagedHandle` instead of leaking `lance::Dataset` /
689    /// `lance::dataset::transaction::Transaction`, so newly-added engine
690    /// call sites cannot drift the staged-write invariant by mistake
691    /// (the trait's `stage_*` + `commit_staged` pair is the only way to
692    /// land a write).
693    pub(crate) fn storage(&self) -> &dyn crate::storage_layer::TableStorage {
694        &self.table_store
695    }
696
697    /// Inline-commit residual surface (`delete_where`,
698    /// `create_vector_index`) — the writes Lance cannot yet express as a
699    /// stage-then-commit pair. Deliberately separate from [`Self::storage`] so
700    /// the default storage surface is staged-only and a new writer cannot couple
701    /// "write bytes" with "advance HEAD" by reaching for `db.storage()`. Only
702    /// the handful of documented residual call sites (mutation/merge deletes,
703    /// vector-index build) use this accessor. See
704    /// `crate::storage_layer::InlineCommitResidual` for the per-method blocker.
705    pub(crate) fn storage_inline_residual(
706        &self,
707    ) -> &dyn crate::storage_layer::InlineCommitResidual {
708        &self.table_store
709    }
710
711    /// Engine-level access to the object-store adapter (S3 / local fs).
712    /// Used by the recovery sidecar protocol — writers in the engine
713    /// call this to write/delete sidecars at `__recovery/{ulid}.json`.
714    pub(crate) fn storage_adapter(&self) -> &dyn crate::storage::StorageAdapter {
715        self.storage.as_ref()
716    }
717
718    /// Per-`(table_key, branch)` writer queues.
719    ///
720    /// Engine-internal writers (mutation finalize, schema_apply,
721    /// branch_merge, ensure_indices, delete_where) and the future MR-870
722    /// recovery reconciler reach the queue manager via this accessor.
723    /// Returns an `Arc` clone so callers can hold the manager across
724    /// `&mut self` engine API boundaries.
725    pub(crate) fn write_queue(&self) -> Arc<crate::db::write_queue::WriteQueueManager> {
726        Arc::clone(&self.write_queue)
727    }
728
729    /// Engine-internal access to the merge-exclusive mutex. Held across
730    /// the swap → operate → restore window in `branch_merge_impl` so
731    /// concurrent merges with distinct targets don't corrupt
732    /// `self.coordinator` mid-operation. See the field doc on
733    /// `Omnigraph::merge_exclusive` for the full design rationale.
734    pub(crate) fn merge_exclusive(&self) -> Arc<tokio::sync::Mutex<()>> {
735        Arc::clone(&self.merge_exclusive)
736    }
737
738    /// Engine-level access to the graph's normalized root URI. Used by
739    /// the recovery sidecar protocol to compute `__recovery/` paths.
740    pub(crate) fn root_uri(&self) -> &str {
741        &self.root_uri
742    }
743
744    pub(crate) async fn open_coordinator_for_branch(
745        &self,
746        branch: Option<&str>,
747    ) -> Result<GraphCoordinator> {
748        match branch {
749            Some(branch) => {
750                GraphCoordinator::open_branch(self.uri(), branch, Arc::clone(&self.storage)).await
751            }
752            None => GraphCoordinator::open(self.uri(), Arc::clone(&self.storage)).await,
753        }
754    }
755
756    pub(crate) async fn swap_coordinator_for_branch(
757        &self,
758        branch: Option<&str>,
759    ) -> Result<GraphCoordinator> {
760        let next = self.open_coordinator_for_branch(branch).await?;
761        let mut coord = self.coordinator.write().await;
762        Ok(std::mem::replace(&mut *coord, next))
763    }
764
765    pub(crate) async fn restore_coordinator(&self, coordinator: GraphCoordinator) {
766        *self.coordinator.write().await = coordinator;
767    }
768
769    /// Open a capture-once write transaction (RFC-013 step 3b): validate the schema
770    /// contract ONCE and pin the base snapshot. The per-table opens take
771    /// `Option<&WriteTxn>` and, on the bound branch for the non-strict (Insert/Merge)
772    /// path, source the pinned base entry — instead of re-resolving (re-validating the
773    /// schema) per table. Strict ops, the fork path, and the commit-time OCC re-read
774    /// keep their fresh reads (those are correctness machinery — see the handoff doc).
775    ///
776    /// "Once" covers the table-touch hot path captured here (proven by the node-insert
777    /// gate `write_validates_schema_contract_once`); it does NOT yet cover edge endpoint
778    /// / cardinality RI validation (`ensure_node_id_exists`, the loader's RI/cardinality),
779    /// which still resolve through `snapshot_for_branch` and re-validate. Those reads must
780    /// observe LIVE committed state, so unifying them (validate-once + pinned + re-checked
781    /// read-set) is step 4's §7.1 work — threading `txn.base` there would re-introduce the
782    /// stale-read class the #298 cardinality fix removed. A session-aware base open is
783    /// likewise deferred to step 5 (handoff §1d).
784    pub(crate) async fn open_write_txn(&self, branch: Option<&str>) -> Result<WriteTxn> {
785        let resolved = self.resolved_branch_target(branch).await?;
786        Ok(WriteTxn {
787            branch: resolved.branch,
788            base: resolved.snapshot,
789        })
790    }
791
792    pub(crate) async fn resolved_branch_target(
793        &self,
794        branch: Option<&str>,
795    ) -> Result<ResolvedTarget> {
796        self.ensure_schema_state_valid().await?;
797        let requested = ReadTarget::Branch(branch.unwrap_or("main").to_string());
798        let normalized = normalize_branch_name(branch.unwrap_or("main"))?;
799        let coord = self.coordinator.read().await;
800        if normalized.as_deref() == coord.current_branch() {
801            let snapshot_id = coord.head_commit_id().await?.unwrap_or_else(|| {
802                SnapshotId::synthetic(
803                    coord.current_branch(),
804                    coord.version(),
805                    coord.manifest_incarnation().e_tag.as_deref(),
806                )
807            });
808            return Ok(ResolvedTarget {
809                requested,
810                branch: coord.current_branch().map(str::to_string),
811                snapshot_id,
812                snapshot: coord.snapshot(),
813            });
814        }
815        coord.resolve_target(&requested).await
816    }
817
818    pub(crate) async fn snapshot_for_branch(&self, branch: Option<&str>) -> Result<Snapshot> {
819        self.resolved_branch_target(branch)
820            .await
821            .map(|resolved| resolved.snapshot)
822    }
823
824    pub(crate) async fn fresh_snapshot_for_branch(&self, branch: Option<&str>) -> Result<Snapshot> {
825        self.ensure_schema_state_valid().await?;
826        self.fresh_snapshot_for_branch_unchecked(branch).await
827    }
828
829    /// Fresh per-branch manifest snapshot WITHOUT the schema-contract
830    /// re-validation. Identical OCC freshness to [`fresh_snapshot_for_branch`]
831    /// — a fresh manifest re-read from storage, never the warm cache — only the
832    /// redundant `ensure_schema_state_valid` is dropped. Used inside a single
833    /// write once a `WriteTxn` has already validated the contract at capture: the
834    /// commit-time drift re-read needs the live manifest, not a second contract
835    /// read. Callers with no `WriteTxn` MUST use the checked variant.
836    ///
837    /// Reads the manifest directly via `ManifestCoordinator` rather than
838    /// `resolve_target`. The OCC re-read uses only the returned `Snapshot`
839    /// (per-table location + version), which `ManifestCoordinator::open().snapshot()`
840    /// produces identically to `GraphCoordinator::open(...).snapshot()` — but
841    /// `resolve_target` additionally opens the commit graph (an extra
842    /// `_graph_commits.lance` probe) the OCC read never consults. Skipping that
843    /// load is a pure read-cost reduction, not a freshness change. The checked
844    /// `fresh_snapshot_for_branch` delegates here, so its no-`txn` callers
845    /// (commit_all's None arm, optimize, repair, fork reclaim) get the same
846    /// identical `Snapshot` via this lighter manifest-only read; they consume
847    /// only the snapshot and never relied on the commit-graph side load.
848    pub(crate) async fn fresh_snapshot_for_branch_unchecked(
849        &self,
850        branch: Option<&str>,
851    ) -> Result<Snapshot> {
852        let manifest = match branch {
853            Some(branch) => {
854                crate::db::manifest::ManifestCoordinator::open_at_branch(self.uri(), branch).await?
855            }
856            None => crate::db::manifest::ManifestCoordinator::open(self.uri()).await?,
857        };
858        Ok(manifest.snapshot())
859    }
860
861    pub(crate) async fn version(&self) -> u64 {
862        self.coordinator.read().await.version()
863    }
864
865    /// Return an immutable Snapshot from the known manifest state. No storage I/O.
866    pub(crate) async fn snapshot(&self) -> Snapshot {
867        self.coordinator.read().await.snapshot()
868    }
869
870    pub async fn snapshot_of(&self, target: impl Into<ReadTarget>) -> Result<Snapshot> {
871        self.resolved_target(target)
872            .await
873            .map(|resolved| resolved.snapshot)
874    }
875
876    pub async fn version_of(&self, target: impl Into<ReadTarget>) -> Result<u64> {
877        self.snapshot_of(target)
878            .await
879            .map(|snapshot| snapshot.version())
880    }
881
882    pub async fn resolved_branch_of(
883        &self,
884        target: impl Into<ReadTarget>,
885    ) -> Result<Option<String>> {
886        self.resolved_target(target)
887            .await
888            .map(|resolved| resolved.branch)
889    }
890
891    /// Synchronize this handle's write base to the latest head of the named branch.
892    pub async fn sync_branch(&self, branch: &str) -> Result<()> {
893        self.ensure_schema_state_valid().await?;
894        let branch = normalize_branch_name(branch)?;
895        let next = self.open_coordinator_for_branch(branch.as_deref()).await?;
896        *self.coordinator.write().await = next;
897        self.invalidate_read_caches().await;
898        Ok(())
899    }
900
901    async fn invalidate_read_caches(&self) {
902        self.runtime_cache.invalidate_all().await;
903        self.read_caches.handles.invalidate_all().await;
904    }
905
906    /// Re-read the handle-local coordinator state from storage AND run
907    /// in-process recovery. Closes the Phase B → Phase C residual (e.g.
908    /// `MutationStaging::finalize` crash mid-publish in a long-running
909    /// server) without restart.
910    ///
911    /// Composition mirrors `Omnigraph::open_with_storage_and_mode`'s
912    /// recovery sequence, in the same order, with one restriction: the
913    /// manifest-drift heal runs in `RollForwardOnly` mode (rollback /
914    /// abort cases defer to the next ReadWrite open because
915    /// `Dataset::restore` is unsafe under concurrency). Each step:
916    ///
917    /// 1. `coordinator.refresh()` — re-read manifest.
918    /// 2. `recover_schema_state_files` — complete an in-flight
919    ///    schema_apply's staging→final rename if a SchemaApply sidecar
920    ///    is on disk; idempotent + early-returns when no staging files
921    ///    exist. Required BEFORE manifest-drift recovery so a
922    ///    SchemaApply roll-forward doesn't publish the manifest while
923    ///    the staging files remain unrenamed (which would corrupt the
924    ///    graph: data on new schema, catalog on old).
925    /// 3. `heal_pending_sidecars_roll_forward` — close the
926    ///    finalize→publisher residual via roll-forward; defer rollback
927    ///    work to next ReadWrite open. Serializes against live writers
928    ///    by acquiring each sidecar's per-(table_key, branch) write
929    ///    queues, so refresh never rolls forward an in-flight writer's
930    ///    sidecar from under it.
931    /// 4. `runtime_cache.invalidate_all` — drop stale per-snapshot caches.
932    ///
933    /// Steady state cost: one `list_dir` of `__recovery/` (typically
934    /// returns empty → early return for both passes). No additional
935    /// Lance reads.
936    ///
937    /// The staged-write entry points (`load_as`, `mutate_as`) run the
938    /// same heal via
939    /// [`heal_pending_recovery_sidecars`](Self::heal_pending_recovery_sidecars),
940    /// so a long-lived server converges on the next write without an
941    /// explicit refresh. Engine-internal callers that already hold an
942    /// in-flight sidecar (e.g. `schema_apply` mid-write) MUST use
943    /// [`refresh_coordinator_only`](Self::refresh_coordinator_only) to
944    /// avoid the recovery sweep racing their own sidecar.
945    pub async fn refresh(&self) -> Result<()> {
946        // Standalone schema-staging reconcile ONLY when no recovery
947        // sidecar exists (legacy/manual staging residue). When sidecars
948        // exist, the heal below owns the reconcile — per SchemaApply
949        // sidecar, under that sidecar's queue guards — because an
950        // unserialized reconcile can promote a LIVE schema apply's
951        // staging files from under it, and a pre-promoted result would
952        // make the heal's own guarded reconcile see clean staging and
953        // wrongly defer the sidecar. The no-sidecar case cannot race a
954        // live apply: its sidecar is on disk before its staging files.
955        //
956        // Scope the coord write guard to the schema-state section only.
957        // `reload_schema_if_source_changed` (below) acquires
958        // `self.coordinator.read().await` when the on-disk schema source
959        // has drifted from the cached `schema_source`. Tokio's RwLock is
960        // not reentrant, so holding the write across that call deadlocks.
961        // Pinned by `composite_flow_schema_apply_then_branch_ops_no_deadlock_in_refresh`.
962        // The heal also takes the lock itself (queues → coordinator
963        // order), so it must run after this guard is released.
964        {
965            // Hold the schema-apply serialization key across the
966            // list-then-reconcile pair: without it, a live apply can
967            // write its sidecar + staging between the empty check and
968            // the reconcile (the same race, through a smaller window).
969            // Queue before coordinator — the documented lock order.
970            //
971            // Liveness note: with a pending NON-SchemaApply sidecar
972            // (e.g. a Mutation residual), this gate skips the standalone
973            // reconcile and the heal below reconciles only per
974            // SchemaApply sidecar — so pre-sidecar-era orphaned staging
975            // residue waits for the NEXT refresh after the sidecars are
976            // consumed. Convergence holds, one pass late. Do not "fix"
977            // by re-running the reconcile unserialized here: that is
978            // exactly the live-apply race this block exists to close.
979            let _serial = self
980                .write_queue
981                .acquire(&crate::db::manifest::schema_apply_serial_queue_key())
982                .await;
983            if crate::db::manifest::list_sidecars(&self.root_uri, self.storage.as_ref())
984                .await?
985                .is_empty()
986            {
987                let mut coord = self.coordinator.write().await;
988                coord.refresh().await?;
989                recover_schema_state_files(
990                    &self.root_uri,
991                    Arc::clone(&self.storage),
992                    &coord.snapshot(),
993                )
994                .await?;
995            }
996        } // ← guards released before the heal's queue acquisition
997        crate::db::manifest::heal_pending_sidecars_roll_forward(
998            &self.root_uri,
999            Arc::clone(&self.storage),
1000            &self.coordinator,
1001            &self.write_queue,
1002        )
1003        .await?;
1004        self.reload_schema_if_source_changed().await?;
1005        self.invalidate_read_caches().await;
1006        Ok(())
1007    }
1008
1009    /// Write-entry heal: converge any pending recovery sidecars (a
1010    /// previously failed writer's Phase B → Phase C residual) before
1011    /// starting a new staged write, so a long-lived process (the HTTP
1012    /// server, an embedded handle) recovers on its next write instead
1013    /// of wedging every write on the commit-time drift guard until
1014    /// restart. Roll-forward only; rollback-eligible sidecars defer to
1015    /// the next ReadWrite open exactly as [`refresh`](Self::refresh)
1016    /// does.
1017    ///
1018    /// Steady-state cost: one `list_dir` of `__recovery/` (typically
1019    /// empty → immediate return). See
1020    /// `recovery::heal_pending_sidecars_roll_forward` for the
1021    /// concurrency contract (per-table write-queue acquisition).
1022    pub(crate) async fn heal_pending_recovery_sidecars(&self) -> Result<()> {
1023        let processed = crate::db::manifest::heal_pending_sidecars_roll_forward(
1024            &self.root_uri,
1025            Arc::clone(&self.storage),
1026            &self.coordinator,
1027            &self.write_queue,
1028        )
1029        .await?;
1030        if processed {
1031            // A rolled-forward SchemaApply sidecar moved disk + manifest
1032            // to the new schema (staging promoted, registrations
1033            // published); the in-memory catalog must follow or the very
1034            // write that triggered the heal validates against the stale
1035            // schema. Same post-heal step as `refresh`.
1036            self.reload_schema_if_source_changed().await?;
1037            self.invalidate_read_caches().await;
1038        }
1039        Ok(())
1040    }
1041
1042    async fn reload_schema_if_source_changed(&self) -> Result<()> {
1043        let schema_path = schema_source_uri(&self.root_uri);
1044        let schema_source = self.storage.read_text(&schema_path).await?;
1045        if schema_source == *self.schema_source.load_full() {
1046            return Ok(());
1047        }
1048        let current_source_ir = read_schema_ir_from_source(&schema_source)?;
1049        let branches = self.coordinator.read().await.branch_list().await?;
1050        let (accepted_ir, _) = load_or_bootstrap_schema_contract(
1051            &self.root_uri,
1052            Arc::clone(&self.storage),
1053            &branches,
1054            &current_source_ir,
1055        )
1056        .await?;
1057        let mut catalog = build_catalog_from_ir(&accepted_ir)?;
1058        fixup_blob_schemas(&mut catalog);
1059        self.store_schema_source(schema_source);
1060        self.store_catalog(catalog);
1061        Ok(())
1062    }
1063
1064    /// Refresh coordinator state and invalidate the runtime cache WITHOUT
1065    /// running the recovery sweep. Engine-internal callers that hold an
1066    /// in-flight sidecar (e.g. `schema_apply::apply_schema_with_lock`'s
1067    /// internal lease-check refresh) need this variant: running recovery
1068    /// here would observe the caller's own sidecar, classify it as
1069    /// RolledPastExpected, and roll it forward — racing the caller's
1070    /// own publish path.
1071    pub(crate) async fn refresh_coordinator_only(&self) -> Result<()> {
1072        self.coordinator.write().await.refresh().await?;
1073        self.invalidate_read_caches().await;
1074        Ok(())
1075    }
1076
1077    pub async fn resolve_snapshot(&self, branch: &str) -> Result<SnapshotId> {
1078        self.ensure_schema_state_valid().await?;
1079        self.coordinator
1080            .read()
1081            .await
1082            .resolve_snapshot_id(branch)
1083            .await
1084    }
1085
1086    pub(crate) async fn resolved_target(
1087        &self,
1088        target: impl Into<ReadTarget>,
1089    ) -> Result<ResolvedTarget> {
1090        self.ensure_schema_state_valid().await?;
1091        let target = target.into();
1092        let mut resolved = self.resolve_target_inner(&target).await?;
1093        // Attach the read caches (shared Session + held-handle cache) for live
1094        // Branch reads so table opens reuse handles (0 IO on a warm repeat).
1095        // Snapshot-id reads are deliberately NOT cached: they pin a historical
1096        // version `cleanup` may GC, so bypassing the cache sidesteps the
1097        // cleanup-vs-cached-handle edge. Writes never reach here (they use
1098        // `resolved_branch_target`), so they never receive a pinned handle.
1099        if matches!(target, ReadTarget::Branch(_)) {
1100            resolved
1101                .snapshot
1102                .set_read_caches(Arc::clone(&self.read_caches));
1103        }
1104        Ok(resolved)
1105    }
1106
1107    /// Resolve a read target to its snapshot, without attaching read caches.
1108    /// Same-branch reads reuse the warm coordinator, gated by a cheap version
1109    /// probe (invariant 6: strong consistency, never a blind warm read). Reads do
1110    /// not need the commit graph (the manifest version is the visibility
1111    /// authority, invariant 2), so the id is synthetic and no commit-graph scan
1112    /// happens on this path.
1113    async fn resolve_target_inner(&self, target: &ReadTarget) -> Result<ResolvedTarget> {
1114        if let ReadTarget::Branch(branch) = target {
1115            let normalized = normalize_branch_name(branch)?;
1116            {
1117                let coord = self.coordinator.read().await;
1118                if normalized.as_deref() != coord.current_branch() {
1119                    // Different branch: cold resolve (opens that branch).
1120                    return coord.resolve_target(target).await;
1121                }
1122                let held = coord.manifest_incarnation();
1123                if coord.probe_latest_incarnation().await?.matches(&held) {
1124                    return Ok(warm_resolved_target(&coord, target));
1125                }
1126                // Stale: refresh under the write lock below.
1127            }
1128            let mut coord = self.coordinator.write().await;
1129            if normalized.as_deref() == coord.current_branch() {
1130                // Re-check after taking the write lock; another writer may have
1131                // refreshed (tokio RwLock has no read->write upgrade).
1132                let held = coord.manifest_incarnation();
1133                let mut refreshed = false;
1134                if !coord.probe_latest_incarnation().await?.matches(&held) {
1135                    coord.refresh_manifest_only().await?;
1136                    refreshed = true;
1137                }
1138                let resolved = warm_resolved_target(&coord, target);
1139                drop(coord);
1140                if refreshed {
1141                    self.invalidate_read_caches().await;
1142                }
1143                return Ok(resolved);
1144            }
1145            // Branch changed while waiting for the write lock: cold resolve.
1146            return coord.resolve_target(target).await;
1147        }
1148
1149        // Snapshot target: resolve through the commit graph as before.
1150        self.coordinator.read().await.resolve_target(target).await
1151    }
1152
1153    // ─── Change detection ────────────────────────────────────────────────
1154
1155    pub async fn diff_between(
1156        &self,
1157        from: impl Into<ReadTarget>,
1158        to: impl Into<ReadTarget>,
1159        filter: &crate::changes::ChangeFilter,
1160    ) -> Result<crate::changes::ChangeSet> {
1161        let from_resolved = self.resolved_target(from).await?;
1162        let to_resolved = self.resolved_target(to).await?;
1163        crate::changes::diff_snapshots(
1164            self.uri(),
1165            &from_resolved.snapshot,
1166            &to_resolved.snapshot,
1167            filter,
1168            to_resolved.branch.clone().or(from_resolved.branch.clone()),
1169        )
1170        .await
1171    }
1172
1173    /// Diff two graph commits. Resolves each commit to `(manifest_branch, manifest_version)`
1174    /// and creates branch-aware snapshots. Supports cross-branch comparison.
1175    pub async fn diff_commits(
1176        &self,
1177        from_commit_id: &str,
1178        to_commit_id: &str,
1179        filter: &crate::changes::ChangeFilter,
1180    ) -> Result<crate::changes::ChangeSet> {
1181        let coord = self.coordinator.read().await;
1182        let from_commit = coord
1183            .resolve_commit(&SnapshotId::new(from_commit_id))
1184            .await?;
1185        let to_commit = coord.resolve_commit(&SnapshotId::new(to_commit_id)).await?;
1186        let from_snap = coord
1187            .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
1188                from_commit.graph_commit_id.clone(),
1189            )))
1190            .await?;
1191        let to_snap = coord
1192            .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
1193                to_commit.graph_commit_id.clone(),
1194            )))
1195            .await?;
1196        drop(coord);
1197        crate::changes::diff_snapshots(
1198            self.uri(),
1199            &from_snap.snapshot,
1200            &to_snap.snapshot,
1201            filter,
1202            to_snap.branch.clone().or(from_snap.branch.clone()),
1203        )
1204        .await
1205    }
1206
1207    pub async fn entity_at_target(
1208        &self,
1209        target: impl Into<ReadTarget>,
1210        table_key: &str,
1211        id: &str,
1212    ) -> Result<Option<serde_json::Value>> {
1213        export::entity_at_target(self, target, table_key, id).await
1214    }
1215
1216    /// Read one entity at a specific manifest version via time travel (on-demand enrichment).
1217    pub async fn entity_at(
1218        &self,
1219        table_key: &str,
1220        id: &str,
1221        version: u64,
1222    ) -> Result<Option<serde_json::Value>> {
1223        export::entity_at(self, table_key, id, version).await
1224    }
1225
1226    /// Create a Snapshot at any historical manifest version.
1227    pub async fn snapshot_at_version(&self, version: u64) -> Result<Snapshot> {
1228        self.ensure_schema_state_valid().await?;
1229        self.coordinator
1230            .read()
1231            .await
1232            .snapshot_at_version(version)
1233            .await
1234    }
1235
1236    pub async fn export_jsonl(
1237        &self,
1238        branch: &str,
1239        type_names: &[String],
1240        table_keys: &[String],
1241    ) -> Result<String> {
1242        export::export_jsonl(self, branch, type_names, table_keys).await
1243    }
1244
1245    pub async fn export_jsonl_to_writer<W: Write>(
1246        &self,
1247        branch: &str,
1248        type_names: &[String],
1249        table_keys: &[String],
1250        writer: &mut W,
1251    ) -> Result<()> {
1252        export::export_jsonl_to_writer(self, branch, type_names, table_keys, writer).await
1253    }
1254
1255    // ─── Graph index ──────────────────────────────────────────────────────
1256
1257    /// Get or build the graph index for the current snapshot.
1258    pub async fn graph_index(&self) -> Result<Arc<crate::graph_index::GraphIndex>> {
1259        table_ops::graph_index(self).await
1260    }
1261
1262    pub(crate) async fn graph_index_for_resolved(
1263        &self,
1264        resolved: &ResolvedTarget,
1265    ) -> Result<Arc<crate::graph_index::GraphIndex>> {
1266        table_ops::graph_index_for_resolved(self, resolved).await
1267    }
1268
1269    /// Ensure BTree scalar indices exist on key columns.
1270    /// Idempotent — Lance skips if index already exists.
1271    ///
1272    /// Opens sub-tables at their latest version (not snapshot-pinned) because
1273    /// indices must be created on the current head. Any version drift from the
1274    /// snapshot is expected and logged. The resulting versions are committed
1275    /// back to the manifest.
1276    ///
1277    /// On named branches, indexing preserves lazy branching:
1278    /// unbranched subtables keep inheriting `main`, while subtables inherited
1279    /// from an ancestor branch are first forked into the active branch before
1280    /// their index metadata is updated.
1281    /// Returns the declared indexes that could not be materialized on this
1282    /// pass (today: vector columns with no trainable vectors yet). They are
1283    /// deferred, not errors; a later `ensure_indices`/`optimize` builds them
1284    /// once the column is trainable. Reads stay correct (brute-force) meanwhile.
1285    pub async fn ensure_indices(&self) -> Result<Vec<PendingIndex>> {
1286        table_ops::ensure_indices(self).await
1287    }
1288
1289    pub async fn ensure_indices_on(&self, branch: &str) -> Result<Vec<PendingIndex>> {
1290        table_ops::ensure_indices_on(self, branch).await
1291    }
1292
1293    #[cfg(feature = "failpoints")]
1294    #[doc(hidden)]
1295    pub async fn failpoint_publish_table_head_without_index_rebuild_for_test(
1296        &mut self,
1297        branch: &str,
1298        table_key: &str,
1299        table_branch: Option<&str>,
1300    ) -> Result<u64> {
1301        table_ops::failpoint_publish_table_head_without_index_rebuild_for_test(
1302            self,
1303            branch,
1304            table_key,
1305            table_branch,
1306        )
1307        .await
1308    }
1309
1310    /// Compact small Lance fragments into fewer larger ones across every
1311    /// node + edge table on `main`. See [`optimize`] for details.
1312    pub async fn optimize(&self) -> Result<Vec<optimize::TableOptimizeStats>> {
1313        optimize::optimize_all_tables(self).await
1314    }
1315
1316    /// Classify and explicitly repair uncovered manifest/head drift. See
1317    /// [`repair`] for the distinction between safe maintenance drift and
1318    /// suspicious/unverifiable drift.
1319    pub async fn repair(&self, options: repair::RepairOptions) -> Result<repair::RepairStats> {
1320        repair::repair_all_tables(self, options).await
1321    }
1322
1323    /// Remove Lance manifests (and the fragments they uniquely own) per the
1324    /// given [`optimize::CleanupPolicyOptions`]. Destructive to version
1325    /// history. See [`optimize`] for details.
1326    pub async fn cleanup(
1327        &mut self,
1328        options: optimize::CleanupPolicyOptions,
1329    ) -> Result<Vec<optimize::TableCleanupStats>> {
1330        optimize::cleanup_all_tables(self, options).await
1331    }
1332
1333    /// Read a blob from a node by its string ID and property name.
1334    ///
1335    /// Returns a `BlobFile` handle with async `read()`, `seek()`, `tell()`,
1336    /// and metadata accessors (`size()`, `kind()`, `uri()`).
1337    ///
1338    /// ```ignore
1339    /// let blob = db.read_blob("Document", "readme", "content").await?;
1340    /// let bytes = blob.read().await?;
1341    /// ```
1342    pub async fn read_blob(&self, type_name: &str, id: &str, property: &str) -> Result<BlobFile> {
1343        self.ensure_schema_state_valid().await?;
1344        let catalog = self.catalog();
1345        let node_type = catalog
1346            .node_types
1347            .get(type_name)
1348            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
1349        if !node_type.blob_properties.contains(property) {
1350            return Err(OmniError::manifest(format!(
1351                "property '{}' on type '{}' is not a Blob",
1352                property, type_name
1353            )));
1354        }
1355
1356        let snapshot = self.snapshot().await;
1357        let table_key = format!("node:{}", type_name);
1358        let handle = self
1359            .storage()
1360            .open_snapshot_at_table(&snapshot, &table_key)
1361            .await?;
1362
1363        let filter_sql = format!("id = '{}'", id.replace('\'', "''"));
1364        let row_id = self
1365            .storage()
1366            .first_row_id_for_filter(&handle, &filter_sql)
1367            .await?
1368            .ok_or_else(|| {
1369                OmniError::manifest(format!("no {} with id '{}' found", type_name, id))
1370            })?;
1371
1372        // `take_blobs` is a Lance-specific blob accessor not surfaced
1373        // through the `TableStorage` trait — reach the inner `Arc<Dataset>`
1374        // via the `pub(crate)` accessor for this read-only call.
1375        let ds = handle.into_arc();
1376        let mut blobs = ds
1377            .take_blobs(&[row_id], property)
1378            .await
1379            .map_err(|e| OmniError::Lance(e.to_string()))?;
1380
1381        blobs.pop().ok_or_else(|| {
1382            OmniError::manifest(format!(
1383                "blob '{}' on {} '{}' returned no data",
1384                property, type_name, id
1385            ))
1386        })
1387    }
1388
1389    pub(crate) async fn active_branch(&self) -> Option<String> {
1390        self.coordinator
1391            .read()
1392            .await
1393            .current_branch()
1394            .map(str::to_string)
1395    }
1396
1397    async fn ensure_branch_delete_safe(&self, branch: &str, branches: &[String]) -> Result<()> {
1398        let descendants = self
1399            .coordinator
1400            .read()
1401            .await
1402            .branch_descendants(branch)
1403            .await?;
1404        if let Some(descendant) = descendants.first() {
1405            return Err(OmniError::manifest_conflict(format!(
1406                "cannot delete branch '{}' because descendant branch '{}' still depends on it",
1407                branch, descendant
1408            )));
1409        }
1410
1411        for other_branch in branches
1412            .iter()
1413            .filter(|candidate| candidate.as_str() != branch)
1414        {
1415            let snapshot = self
1416                .snapshot_of(ReadTarget::branch(other_branch.as_str()))
1417                .await?;
1418            if snapshot
1419                .entries()
1420                .any(|entry| entry.table_branch.as_deref() == Some(branch))
1421            {
1422                return Err(OmniError::manifest_conflict(format!(
1423                    "cannot delete branch '{}' because branch '{}' still depends on it",
1424                    branch, other_branch
1425                )));
1426            }
1427        }
1428
1429        Ok(())
1430    }
1431
1432    /// Best-effort reclaim of the per-table Lance forks a just-deleted branch
1433    /// owned. Runs AFTER the manifest authority flip, so the branch is already
1434    /// gone and these forks are unreachable orphans. A failure here (transient
1435    /// object-store error, the `branch_delete.before_table_cleanup` failpoint)
1436    /// is logged and swallowed: the `cleanup` reconciler is the guaranteed
1437    /// backstop that converges any leftover orphan. Uses `force_delete_branch`
1438    /// so a partially-reclaimed retry is idempotent.
1439    async fn cleanup_deleted_branch_tables(&self, branch: &str, owned_tables: &[(String, String)]) {
1440        let mut seen_paths = HashSet::new();
1441        let mut cleanup_targets = owned_tables
1442            .iter()
1443            .filter(|(_, table_path)| seen_paths.insert(table_path.clone()))
1444            .cloned()
1445            .collect::<Vec<_>>();
1446        cleanup_targets.sort_by(|left, right| left.0.cmp(&right.0));
1447
1448        for (table_key, table_path) in cleanup_targets {
1449            let dataset_uri = self.storage().dataset_uri(&table_path);
1450            let outcome = match crate::failpoints::maybe_fail(crate::failpoints::names::BRANCH_DELETE_BEFORE_TABLE_CLEANUP)
1451            {
1452                Ok(()) => {
1453                    self.storage()
1454                        .force_delete_branch(&dataset_uri, branch)
1455                        .await
1456                }
1457                Err(injected) => Err(injected),
1458            };
1459            if let Err(err) = outcome {
1460                tracing::warn!(
1461                    target: "omnigraph::branch_delete::cleanup",
1462                    branch = %branch,
1463                    table = %table_key,
1464                    error = %err,
1465                    "best-effort fork reclaim failed; cleanup will reconcile the orphan",
1466                );
1467            }
1468        }
1469    }
1470
1471    async fn delete_branch_storage_only(&self, branch: &str) -> Result<()> {
1472        let active = self
1473            .coordinator
1474            .read()
1475            .await
1476            .current_branch()
1477            .map(str::to_string);
1478        if active.as_deref() == Some(branch) {
1479            return Err(OmniError::manifest_conflict(format!(
1480                "cannot delete currently active branch '{}'",
1481                branch
1482            )));
1483        }
1484
1485        let branch_snapshot = self.snapshot_of(ReadTarget::branch(branch)).await?;
1486        let owned_tables = branch_snapshot
1487            .entries()
1488            .filter(|entry| entry.table_branch.as_deref() == Some(branch))
1489            .map(|entry| (entry.table_key.clone(), entry.table_path.clone()))
1490            .collect::<Vec<_>>();
1491
1492        // Authority flip (+ best-effort commit-graph reclaim) — must succeed.
1493        self.coordinator.write().await.branch_delete(branch).await?;
1494        // Best-effort per-table fork reclaim; cleanup reconciles any leftover.
1495        self.cleanup_deleted_branch_tables(branch, &owned_tables)
1496            .await;
1497        Ok(())
1498    }
1499
1500    pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
1501        normalize_branch_name(branch)
1502    }
1503
1504    pub(crate) async fn head_commit_id_for_branch(
1505        &self,
1506        branch: Option<&str>,
1507    ) -> Result<Option<String>> {
1508        let mut coordinator = self.open_coordinator_for_branch(branch).await?;
1509        coordinator.ensure_commit_graph_initialized().await?;
1510        coordinator
1511            .head_commit_id()
1512            .await
1513            .map(|id| id.map(|snapshot_id| snapshot_id.as_str().to_string()))
1514    }
1515
1516    pub async fn branch_create(&self, name: &str) -> Result<()> {
1517        self.branch_create_as(name, None).await
1518    }
1519
1520    /// Create a branch from the coordinator's currently-open snapshot,
1521    /// with an explicit actor for engine-layer policy enforcement
1522    /// (MR-722 fan-out). Scope is `TargetBranch(name)` — symmetric with
1523    /// `branch_delete_as`: the branch being acted upon is the target.
1524    /// Cedar rules using `target_branch_scope: protected` therefore see
1525    /// the new-branch name and can deny e.g. creating any branch named
1526    /// `main` from a non-privileged actor.
1527    pub async fn branch_create_as(&self, name: &str, actor: Option<&str>) -> Result<()> {
1528        self.enforce(
1529            omnigraph_policy::PolicyAction::BranchCreate,
1530            &omnigraph_policy::ResourceScope::TargetBranch(name.to_string()),
1531            actor,
1532        )?;
1533        self.ensure_schema_state_valid().await?;
1534        self.ensure_schema_apply_idle("branch_create").await?;
1535        ensure_public_branch_ref(name, "branch_create")?;
1536        self.coordinator.write().await.branch_create(name).await
1537    }
1538
1539    pub async fn branch_create_from(&self, from: impl Into<ReadTarget>, name: &str) -> Result<()> {
1540        self.branch_create_from_as(from, name, None).await
1541    }
1542
1543    /// Create a branch from a specific source branch with an explicit
1544    /// actor for engine-layer policy enforcement (MR-722 fan-out).
1545    ///
1546    /// Scope is `BranchTransition { source, target }` — matches the
1547    /// HTTP-layer convention at `server_branch_create`
1548    /// (branch=Some(from), target_branch=Some(name)), so engine and
1549    /// HTTP fire the same Cedar decision. Pinned-snapshot sources
1550    /// (which aren't a branch ref) materialize as the sentinel
1551    /// `<snapshot>` for the policy check; Cedar rules using
1552    /// `branch_scope: any` still match, rules pinning a specific
1553    /// source branch correctly do not.
1554    pub async fn branch_create_from_as(
1555        &self,
1556        from: impl Into<ReadTarget>,
1557        name: &str,
1558        actor: Option<&str>,
1559    ) -> Result<()> {
1560        let target = from.into();
1561        let source_branch = match &target {
1562            ReadTarget::Branch(b) => b.clone(),
1563            _ => "<snapshot>".to_string(),
1564        };
1565        self.enforce(
1566            omnigraph_policy::PolicyAction::BranchCreate,
1567            &omnigraph_policy::ResourceScope::BranchTransition {
1568                source: source_branch,
1569                target: name.to_string(),
1570            },
1571            actor,
1572        )?;
1573        self.ensure_schema_apply_idle("branch_create_from").await?;
1574        self.branch_create_from_impl(target, name, false).await
1575    }
1576
1577    async fn branch_create_from_impl(
1578        &self,
1579        from: impl Into<ReadTarget>,
1580        name: &str,
1581        allow_internal_refs: bool,
1582    ) -> Result<()> {
1583        let target = from.into();
1584        let ReadTarget::Branch(branch_name) = target else {
1585            return Err(OmniError::manifest(
1586                "branch creation from pinned snapshots is not supported yet".to_string(),
1587            ));
1588        };
1589        if !allow_internal_refs {
1590            ensure_public_branch_ref(&branch_name, "branch_create_from")?;
1591            ensure_public_branch_ref(name, "branch_create_from")?;
1592        }
1593        let branch = normalize_branch_name(&branch_name)?;
1594        // Operate on a freshly-opened source coordinator that's owned locally
1595        // — never touch `self.coordinator`. The pre-fix implementation used
1596        // `swap_coordinator_for_branch` + operate + `restore_coordinator` as
1597        // three separate `coordinator.write().await` acquisitions; under
1598        // `&self` concurrency, a second `branch_create_from` could swap
1599        // self.coordinator between this caller's swap and operate steps,
1600        // making the operate run against the wrong source branch and
1601        // forking off the wrong HEAD. Pinned by
1602        // `concurrent_branch_create_from_distinct_parents_does_not_corrupt_coordinator`
1603        // in `crates/omnigraph-server/tests/server.rs`.
1604        //
1605        // `branch_create` mutates only the local coord's commit-graph cache;
1606        // the manifest write is durable on disk regardless of which
1607        // coord-handle issued it. Discarding `source_coord` after the call
1608        // is the right shape — the new branch is reachable from any
1609        // subsequent open of any coord.
1610        let mut source_coord = self.open_coordinator_for_branch(branch.as_deref()).await?;
1611        source_coord.branch_create(name).await
1612    }
1613
1614    pub async fn branch_list(&self) -> Result<Vec<String>> {
1615        self.ensure_schema_state_valid().await?;
1616        self.coordinator.read().await.branch_list().await
1617    }
1618
1619    pub async fn branch_delete(&self, name: &str) -> Result<()> {
1620        self.branch_delete_as(name, None).await
1621    }
1622
1623    /// Delete a branch with an explicit actor for engine-layer policy
1624    /// enforcement (MR-722 fan-out). Scope is `TargetBranch(name)` —
1625    /// matches the HTTP-layer convention at `server_branch_delete`
1626    /// (branch=None, target_branch=Some(name)). Cedar rules using
1627    /// `target_branch_scope: protected` therefore correctly gate
1628    /// deletion of protected branches (e.g. deny BranchDelete against
1629    /// `main`).
1630    pub async fn branch_delete_as(&self, name: &str, actor: Option<&str>) -> Result<()> {
1631        self.enforce(
1632            omnigraph_policy::PolicyAction::BranchDelete,
1633            &omnigraph_policy::ResourceScope::TargetBranch(name.to_string()),
1634            actor,
1635        )?;
1636        self.ensure_schema_state_valid().await?;
1637        self.ensure_schema_apply_idle("branch_delete").await?;
1638        ensure_public_branch_ref(name, "branch_delete")?;
1639        self.refresh().await?;
1640        let branch = normalize_branch_name(name)?
1641            .ok_or_else(|| OmniError::manifest("cannot delete branch 'main'".to_string()))?;
1642        let branches = self.coordinator.read().await.branch_list().await?;
1643        if !branches.iter().any(|candidate| candidate == &branch) {
1644            return Err(OmniError::manifest_not_found(format!(
1645                "branch '{}' not found",
1646                branch
1647            )));
1648        }
1649
1650        self.ensure_branch_delete_safe(&branch, &branches).await?;
1651        self.delete_branch_storage_only(&branch).await
1652    }
1653
1654    pub async fn get_commit(&self, commit_id: &str) -> Result<GraphCommit> {
1655        self.ensure_schema_state_valid().await?;
1656        self.coordinator
1657            .read()
1658            .await
1659            .resolve_commit(&SnapshotId::new(commit_id))
1660            .await
1661    }
1662
1663    pub async fn list_commits(&self, branch: Option<&str>) -> Result<Vec<GraphCommit>> {
1664        self.ensure_schema_state_valid().await?;
1665        let branch = match branch {
1666            Some(branch) => normalize_branch_name(branch)?,
1667            None => None,
1668        };
1669        let coordinator = self.open_coordinator_for_branch(branch.as_deref()).await?;
1670        coordinator.list_commits().await
1671    }
1672
1673    /// Open a sub-table for mutation with version-drift guard.
1674    ///
1675    /// Checks that the dataset's current version matches the snapshot-pinned
1676    /// version. If another writer has advanced the version, returns an error
1677    /// prompting the caller to refresh and retry (optimistic concurrency).
1678    pub(crate) async fn open_for_mutation(
1679        &self,
1680        table_key: &str,
1681        op_kind: crate::db::MutationOpKind,
1682    ) -> Result<OpenedForMutation> {
1683        table_ops::open_for_mutation(self, table_key, op_kind).await
1684    }
1685
1686    pub(crate) async fn open_for_mutation_on_branch(
1687        &self,
1688        branch: Option<&str>,
1689        table_key: &str,
1690        op_kind: crate::db::MutationOpKind,
1691        txn: Option<&crate::db::WriteTxn>,
1692    ) -> Result<OpenedForMutation> {
1693        table_ops::open_for_mutation_on_branch(self, branch, table_key, op_kind, txn).await
1694    }
1695
1696    /// Fork `table_key` onto `active_branch` from the given source state,
1697    /// self-healing a manifest-unreferenced leftover fork if one is in the
1698    /// way. Callers that reach this MUST already hold the per-`(table_key,
1699    /// active_branch)` write queue (so the reclaim cannot race an in-process
1700    /// fork) and must have confirmed via the live manifest that the table is
1701    /// not yet on `active_branch`. Both the first-write fork path
1702    /// (`open_owned_dataset_for_branch_write`) and `branch_merge` satisfy this.
1703    pub(crate) async fn fork_dataset_from_entry_state(
1704        &self,
1705        table_key: &str,
1706        full_path: &str,
1707        source_branch: Option<&str>,
1708        source_version: u64,
1709        active_branch: &str,
1710    ) -> Result<SnapshotHandle> {
1711        match table_ops::fork_dataset_from_entry_state(
1712            self,
1713            table_key,
1714            full_path,
1715            source_branch,
1716            source_version,
1717            active_branch,
1718        )
1719        .await?
1720        {
1721            crate::storage_layer::ForkOutcome::Created(ds) => Ok(ds),
1722            crate::storage_layer::ForkOutcome::RefAlreadyExists => {
1723                table_ops::reclaim_orphaned_fork_and_refork(
1724                    self,
1725                    table_key,
1726                    full_path,
1727                    source_branch,
1728                    source_version,
1729                    active_branch,
1730                )
1731                .await
1732            }
1733        }
1734    }
1735
1736    pub(crate) async fn reopen_for_mutation(
1737        &self,
1738        table_key: &str,
1739        full_path: &str,
1740        table_branch: Option<&str>,
1741        expected_version: u64,
1742        op_kind: crate::db::MutationOpKind,
1743    ) -> Result<SnapshotHandle> {
1744        table_ops::reopen_for_mutation(
1745            self,
1746            table_key,
1747            full_path,
1748            table_branch,
1749            expected_version,
1750            op_kind,
1751        )
1752        .await
1753    }
1754
1755    pub(crate) async fn open_dataset_at_state(
1756        &self,
1757        table_path: &str,
1758        table_branch: Option<&str>,
1759        table_version: u64,
1760    ) -> Result<SnapshotHandle> {
1761        table_ops::open_dataset_at_state(self, table_path, table_branch, table_version).await
1762    }
1763
1764    pub(crate) async fn build_indices_on_dataset(
1765        &self,
1766        table_key: &str,
1767        ds: &mut SnapshotHandle,
1768    ) -> Result<Vec<PendingIndex>> {
1769        table_ops::build_indices_on_dataset(self, table_key, ds).await
1770    }
1771
1772    // Used only by in-tree tests (`#[cfg(test)]`); the runtime path now
1773    // uses `commit_updates_on_branch_with_expected` exclusively.
1774    #[cfg(test)]
1775    pub(crate) async fn commit_updates(
1776        &mut self,
1777        updates: &[crate::db::SubTableUpdate],
1778    ) -> Result<u64> {
1779        table_ops::commit_updates(self, updates).await
1780    }
1781
1782    pub(crate) async fn commit_manifest_updates(
1783        &self,
1784        updates: &[crate::db::SubTableUpdate],
1785    ) -> Result<u64> {
1786        table_ops::commit_manifest_updates(self, updates).await
1787    }
1788
1789    pub(crate) async fn record_merge_commit(
1790        &self,
1791        manifest_version: u64,
1792        parent_commit_id: &str,
1793        merged_parent_commit_id: &str,
1794        actor_id: Option<&str>,
1795    ) -> Result<String> {
1796        table_ops::record_merge_commit(
1797            self,
1798            manifest_version,
1799            parent_commit_id,
1800            merged_parent_commit_id,
1801            actor_id,
1802        )
1803        .await
1804    }
1805
1806    pub(crate) async fn commit_updates_on_branch_with_expected(
1807        &self,
1808        branch: Option<&str>,
1809        updates: &[crate::db::SubTableUpdate],
1810        expected_table_versions: &std::collections::HashMap<String, u64>,
1811        actor_id: Option<&str>,
1812        txn: Option<&crate::db::WriteTxn>,
1813        committed_handles: std::collections::HashMap<String, crate::storage_layer::SnapshotHandle>,
1814    ) -> Result<u64> {
1815        table_ops::commit_updates_on_branch_with_expected(
1816            self,
1817            branch,
1818            updates,
1819            expected_table_versions,
1820            actor_id,
1821            txn,
1822            committed_handles,
1823        )
1824        .await
1825    }
1826
1827    pub(crate) async fn ensure_commit_graph_initialized(&self) -> Result<()> {
1828        table_ops::ensure_commit_graph_initialized(self).await
1829    }
1830
1831    /// Invalidate the cached graph index. Called after edge mutations.
1832    pub(crate) async fn invalidate_graph_index(&self) {
1833        table_ops::invalidate_graph_index(self).await
1834    }
1835}
1836
1837pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
1838    let branch = branch.trim();
1839    if branch.is_empty() {
1840        return Err(OmniError::manifest(
1841            "branch name cannot be empty".to_string(),
1842        ));
1843    }
1844    if branch == "main" {
1845        return Ok(None);
1846    }
1847    Ok(Some(branch.to_string()))
1848}
1849
1850/// Build a `ResolvedTarget` from the warm coordinator without opening the commit
1851/// graph. The live branch snapshot is pinned by the manifest incarnation, so the
1852/// id is synthetic `(branch, version, e_tag when available)`; nothing on the read
1853/// path needs a real commit ULID (only `RuntimeCache` keys on the id, where
1854/// synthetic is consistent).
1855fn warm_resolved_target(coord: &GraphCoordinator, requested: &ReadTarget) -> ResolvedTarget {
1856    ResolvedTarget {
1857        requested: requested.clone(),
1858        branch: coord.current_branch().map(str::to_string),
1859        snapshot_id: SnapshotId::synthetic(
1860            coord.current_branch(),
1861            coord.version(),
1862            coord.manifest_incarnation().e_tag.as_deref(),
1863        ),
1864        snapshot: coord.snapshot(),
1865    }
1866}
1867
1868pub(crate) fn ensure_public_branch_ref(branch: &str, operation: &str) -> Result<()> {
1869    if is_internal_system_branch(branch) {
1870        return Err(OmniError::manifest(format!(
1871            "{} does not allow internal system ref '{}'",
1872            operation, branch
1873        )));
1874    }
1875    Ok(())
1876}
1877
1878fn concat_or_empty_batches(schema: Arc<Schema>, batches: Vec<RecordBatch>) -> Result<RecordBatch> {
1879    if batches.is_empty() {
1880        return Ok(RecordBatch::new_empty(schema));
1881    }
1882    if batches.len() == 1 {
1883        return Ok(batches.into_iter().next().unwrap());
1884    }
1885    let batch_schema = batches[0].schema();
1886    arrow_select::concat::concat_batches(&batch_schema, &batches)
1887        .map_err(|e| OmniError::Lance(e.to_string()))
1888}
1889
1890fn blob_properties_for_table_key<'a>(
1891    catalog: &'a Catalog,
1892    table_key: &str,
1893) -> Result<&'a std::collections::HashSet<String>> {
1894    if let Some(type_name) = table_key.strip_prefix("node:") {
1895        return catalog
1896            .node_types
1897            .get(type_name)
1898            .map(|node_type| &node_type.blob_properties)
1899            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)));
1900    }
1901    if let Some(type_name) = table_key.strip_prefix("edge:") {
1902        return catalog
1903            .edge_types
1904            .get(type_name)
1905            .map(|edge_type| &edge_type.blob_properties)
1906            .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)));
1907    }
1908    Err(OmniError::manifest(format!(
1909        "invalid table key '{}'",
1910        table_key
1911    )))
1912}
1913
1914fn blob_description_is_null(descriptions: &StructArray, row: usize) -> Result<bool> {
1915    if descriptions.is_null(row) {
1916        return Ok(true);
1917    }
1918
1919    let kind = descriptions
1920        .column_by_name("kind")
1921        .and_then(|col| col.as_any().downcast_ref::<UInt32Array>())
1922        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row) as u8))
1923        .or_else(|| {
1924            descriptions
1925                .column_by_name("kind")
1926                .and_then(|col| col.as_any().downcast_ref::<arrow_array::UInt8Array>())
1927                .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)))
1928        });
1929    let position = descriptions
1930        .column_by_name("position")
1931        .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
1932        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1933    let size = descriptions
1934        .column_by_name("size")
1935        .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
1936        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1937    let blob_uri = descriptions
1938        .column_by_name("blob_uri")
1939        .and_then(|col| col.as_any().downcast_ref::<StringArray>())
1940        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1941
1942    let Some(kind) = kind else {
1943        return Ok(true);
1944    };
1945    let kind = BlobKind::try_from(kind).map_err(|e| OmniError::Lance(e.to_string()))?;
1946    if kind != BlobKind::Inline {
1947        return Ok(false);
1948    }
1949
1950    Ok(position.unwrap_or(0) == 0 && size.unwrap_or(0) == 0 && blob_uri.unwrap_or("").is_empty())
1951}
1952
1953/// Replace placeholder `LargeBinary` fields with Lance blob v2 fields.
1954///
1955/// The compiler crate has no Lance dependency, so `ScalarType::Blob` maps to
1956/// `DataType::LargeBinary` as a placeholder. This function replaces those
1957/// fields with the real blob v2 struct type via `lance::blob::blob_field()`.
1958fn fixup_blob_schemas(catalog: &mut Catalog) {
1959    for node_type in catalog.node_types.values_mut() {
1960        if node_type.blob_properties.is_empty() {
1961            continue;
1962        }
1963        let fields: Vec<Field> = node_type
1964            .arrow_schema
1965            .fields()
1966            .iter()
1967            .map(|f| {
1968                if node_type.blob_properties.contains(f.name()) {
1969                    blob_field(f.name(), f.is_nullable())
1970                } else {
1971                    f.as_ref().clone()
1972                }
1973            })
1974            .collect();
1975        node_type.arrow_schema = Arc::new(Schema::new(fields));
1976    }
1977    for edge_type in catalog.edge_types.values_mut() {
1978        if edge_type.blob_properties.is_empty() {
1979            continue;
1980        }
1981        let fields: Vec<Field> = edge_type
1982            .arrow_schema
1983            .fields()
1984            .iter()
1985            .map(|f| {
1986                if edge_type.blob_properties.contains(f.name()) {
1987                    blob_field(f.name(), f.is_nullable())
1988                } else {
1989                    f.as_ref().clone()
1990                }
1991            })
1992            .collect();
1993        edge_type.arrow_schema = Arc::new(Schema::new(fields));
1994    }
1995}
1996
1997fn read_schema_ir_from_source(schema_source: &str) -> Result<SchemaIR> {
1998    let schema_ast = parse_schema(schema_source)?;
1999    build_schema_ir(&schema_ast).map_err(|err| OmniError::manifest(err.to_string()))
2000}
2001
2002/// I/O phase of `Omnigraph::init_with_storage`. Split out so the caller
2003/// can pattern-match on the result and run cleanup on error before
2004/// returning the original error.
2005///
2006/// Failpoints fire at the phase boundaries:
2007/// * `init.after_schema_pg_written` — `_schema.pg` is on disk. In strict mode
2008///   this fires in the caller immediately after the atomic ownership claim; in
2009///   force mode it fires here after the explicit overwrite.
2010/// * `init.after_schema_contract_written` — `_schema.pg` + `_schema.ir.json`
2011///   + `__schema_state.json` are on disk.
2012/// * `init.after_coordinator_init` — all schema files plus Lance per-type
2013///   datasets and `__manifest/` are on disk. (The cleanup wrapper can only
2014///   remove the schema files; Lance directories need `delete_prefix` —
2015///   deferred along with `DELETE /graphs/{id}`.)
2016async fn init_storage_phase(
2017    root: &str,
2018    schema_source: &str,
2019    schema_ir: &SchemaIR,
2020    catalog: &Catalog,
2021    storage: &Arc<dyn StorageAdapter>,
2022    write_schema_pg: bool,
2023) -> Result<GraphCoordinator> {
2024    if write_schema_pg {
2025        let schema_path = join_uri(root, SCHEMA_SOURCE_FILENAME);
2026        storage.write_text(&schema_path, schema_source).await?;
2027        crate::failpoints::maybe_fail(crate::failpoints::names::INIT_AFTER_SCHEMA_PG_WRITTEN)?;
2028    }
2029
2030    write_schema_contract(root, storage.as_ref(), schema_ir).await?;
2031    crate::failpoints::maybe_fail(crate::failpoints::names::INIT_AFTER_SCHEMA_CONTRACT_WRITTEN)?;
2032
2033    let coordinator = GraphCoordinator::init(root, catalog, Arc::clone(storage)).await?;
2034    crate::failpoints::maybe_fail(crate::failpoints::names::INIT_AFTER_COORDINATOR_INIT)?;
2035
2036    Ok(coordinator)
2037}
2038
2039/// Best-effort cleanup of init-phase artifacts. Called from
2040/// `init_with_storage` on any error returned by `init_storage_phase`.
2041///
2042/// Removes the three schema files: `_schema.pg`, `_schema.ir.json`,
2043/// `__schema_state.json`. Lance datasets and `__manifest/` are not
2044/// touched here — recursive directory deletion requires a
2045/// `StorageAdapter::delete_prefix` primitive that's deferred along
2046/// with `DELETE /graphs/{id}` (MR-668 PR 2b).
2047///
2048/// Failures to delete are logged via `tracing::warn` and do not mask
2049/// the original init error.
2050async fn best_effort_cleanup_init_artifacts(root: &str, storage: &dyn StorageAdapter) {
2051    for uri in [
2052        schema_source_uri(root),
2053        schema_ir_uri(root),
2054        schema_state_uri(root),
2055    ] {
2056        if let Err(err) = storage.delete(&uri).await {
2057            tracing::warn!(
2058                target: "omnigraph::init::cleanup",
2059                uri = %uri,
2060                error = %err,
2061                "init failed; best-effort cleanup could not delete artifact",
2062            );
2063        }
2064    }
2065}
2066
2067fn schema_table_key(type_kind: SchemaTypeKind, name: &str) -> String {
2068    match type_kind {
2069        SchemaTypeKind::Node => format!("node:{}", name),
2070        SchemaTypeKind::Edge => format!("edge:{}", name),
2071        SchemaTypeKind::Interface => unreachable!("interfaces do not map to tables"),
2072    }
2073}
2074
2075fn schema_for_table_key(catalog: &Catalog, table_key: &str) -> Result<Arc<Schema>> {
2076    if let Some(type_name) = table_key.strip_prefix("node:") {
2077        let node_type: &NodeType = catalog
2078            .node_types
2079            .get(type_name)
2080            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
2081        return Ok(node_type.arrow_schema.clone());
2082    }
2083    if let Some(type_name) = table_key.strip_prefix("edge:") {
2084        let edge_type: &EdgeType = catalog
2085            .edge_types
2086            .get(type_name)
2087            .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)))?;
2088        return Ok(edge_type.arrow_schema.clone());
2089    }
2090    Err(OmniError::manifest(format!(
2091        "invalid table key '{}'",
2092        table_key
2093    )))
2094}
2095
2096fn record_batch_row_to_json(batch: &RecordBatch, row: usize) -> Result<serde_json::Value> {
2097    let mut obj = serde_json::Map::new();
2098    for (i, field) in batch.schema().fields().iter().enumerate() {
2099        obj.insert(
2100            field.name().clone(),
2101            json_value_from_array(batch.column(i).as_ref(), row)?,
2102        );
2103    }
2104    Ok(serde_json::Value::Object(obj))
2105}
2106
2107fn json_value_from_array(array: &dyn Array, row: usize) -> Result<serde_json::Value> {
2108    if array.is_null(row) {
2109        return Ok(serde_json::Value::Null);
2110    }
2111
2112    match array.data_type() {
2113        DataType::Utf8 => Ok(serde_json::Value::String(
2114            array
2115                .as_any()
2116                .downcast_ref::<StringArray>()
2117                .ok_or_else(|| OmniError::Lance("expected StringArray".to_string()))?
2118                .value(row)
2119                .to_string(),
2120        )),
2121        DataType::LargeUtf8 => Ok(serde_json::Value::String(
2122            array
2123                .as_any()
2124                .downcast_ref::<LargeStringArray>()
2125                .ok_or_else(|| OmniError::Lance("expected LargeStringArray".to_string()))?
2126                .value(row)
2127                .to_string(),
2128        )),
2129        DataType::Boolean => Ok(serde_json::Value::Bool(
2130            array
2131                .as_any()
2132                .downcast_ref::<BooleanArray>()
2133                .ok_or_else(|| OmniError::Lance("expected BooleanArray".to_string()))?
2134                .value(row),
2135        )),
2136        DataType::Int32 => Ok(serde_json::Value::Number(serde_json::Number::from(
2137            array
2138                .as_any()
2139                .downcast_ref::<Int32Array>()
2140                .ok_or_else(|| OmniError::Lance("expected Int32Array".to_string()))?
2141                .value(row),
2142        ))),
2143        DataType::Int64 => Ok(serde_json::Value::Number(serde_json::Number::from(
2144            array
2145                .as_any()
2146                .downcast_ref::<Int64Array>()
2147                .ok_or_else(|| OmniError::Lance("expected Int64Array".to_string()))?
2148                .value(row),
2149        ))),
2150        DataType::UInt32 => Ok(serde_json::Value::Number(serde_json::Number::from(
2151            array
2152                .as_any()
2153                .downcast_ref::<UInt32Array>()
2154                .ok_or_else(|| OmniError::Lance("expected UInt32Array".to_string()))?
2155                .value(row),
2156        ))),
2157        DataType::UInt64 => Ok(serde_json::Value::Number(serde_json::Number::from(
2158            array
2159                .as_any()
2160                .downcast_ref::<UInt64Array>()
2161                .ok_or_else(|| OmniError::Lance("expected UInt64Array".to_string()))?
2162                .value(row),
2163        ))),
2164        DataType::Float32 => {
2165            let value = array
2166                .as_any()
2167                .downcast_ref::<Float32Array>()
2168                .ok_or_else(|| OmniError::Lance("expected Float32Array".to_string()))?
2169                .value(row) as f64;
2170            Ok(serde_json::Value::Number(
2171                serde_json::Number::from_f64(value).ok_or_else(|| {
2172                    OmniError::Lance(format!("cannot encode f32 value '{}' as JSON", value))
2173                })?,
2174            ))
2175        }
2176        DataType::Float64 => {
2177            let value = array
2178                .as_any()
2179                .downcast_ref::<Float64Array>()
2180                .ok_or_else(|| OmniError::Lance("expected Float64Array".to_string()))?
2181                .value(row);
2182            Ok(serde_json::Value::Number(
2183                serde_json::Number::from_f64(value).ok_or_else(|| {
2184                    OmniError::Lance(format!("cannot encode f64 value '{}' as JSON", value))
2185                })?,
2186            ))
2187        }
2188        DataType::Date32 => Ok(serde_json::Value::Number(serde_json::Number::from(
2189            array
2190                .as_any()
2191                .downcast_ref::<Date32Array>()
2192                .ok_or_else(|| OmniError::Lance("expected Date32Array".to_string()))?
2193                .value(row),
2194        ))),
2195        DataType::Binary => Ok(serde_json::Value::String(base64::Engine::encode(
2196            &base64::engine::general_purpose::STANDARD,
2197            array
2198                .as_any()
2199                .downcast_ref::<BinaryArray>()
2200                .ok_or_else(|| OmniError::Lance("expected BinaryArray".to_string()))?
2201                .value(row),
2202        ))),
2203        DataType::LargeBinary => Ok(serde_json::Value::String(base64::Engine::encode(
2204            &base64::engine::general_purpose::STANDARD,
2205            array
2206                .as_any()
2207                .downcast_ref::<LargeBinaryArray>()
2208                .ok_or_else(|| OmniError::Lance("expected LargeBinaryArray".to_string()))?
2209                .value(row),
2210        ))),
2211        DataType::List(_) => {
2212            let list = array
2213                .as_any()
2214                .downcast_ref::<ListArray>()
2215                .ok_or_else(|| OmniError::Lance("expected ListArray".to_string()))?;
2216            let values = list.value(row);
2217            let mut out = Vec::with_capacity(values.len());
2218            for idx in 0..values.len() {
2219                out.push(json_value_from_array(values.as_ref(), idx)?);
2220            }
2221            Ok(serde_json::Value::Array(out))
2222        }
2223        DataType::LargeList(_) => {
2224            let list = array
2225                .as_any()
2226                .downcast_ref::<LargeListArray>()
2227                .ok_or_else(|| OmniError::Lance("expected LargeListArray".to_string()))?;
2228            let values = list.value(row);
2229            let mut out = Vec::with_capacity(values.len());
2230            for idx in 0..values.len() {
2231                out.push(json_value_from_array(values.as_ref(), idx)?);
2232            }
2233            Ok(serde_json::Value::Array(out))
2234        }
2235        DataType::FixedSizeList(_, _) => {
2236            let list = array
2237                .as_any()
2238                .downcast_ref::<FixedSizeListArray>()
2239                .ok_or_else(|| OmniError::Lance("expected FixedSizeListArray".to_string()))?;
2240            let values = list.value(row);
2241            let mut out = Vec::with_capacity(values.len());
2242            for idx in 0..values.len() {
2243                out.push(json_value_from_array(values.as_ref(), idx)?);
2244            }
2245            Ok(serde_json::Value::Array(out))
2246        }
2247        DataType::Struct(fields) => {
2248            let struct_array = array
2249                .as_any()
2250                .downcast_ref::<StructArray>()
2251                .ok_or_else(|| OmniError::Lance("expected StructArray".to_string()))?;
2252            let mut obj = serde_json::Map::new();
2253            for (field_idx, field) in fields.iter().enumerate() {
2254                obj.insert(
2255                    field.name().clone(),
2256                    json_value_from_array(struct_array.column(field_idx).as_ref(), row)?,
2257                );
2258            }
2259            Ok(serde_json::Value::Object(obj))
2260        }
2261        _ => {
2262            let value = arrow_cast::display::array_value_to_string(array, row)
2263                .map_err(|e| OmniError::Lance(e.to_string()))?;
2264            Ok(serde_json::Value::String(value))
2265        }
2266    }
2267}
2268
2269#[cfg(test)]
2270mod tests {
2271    use super::*;
2272    use crate::db::manifest::ManifestCoordinator;
2273    use async_trait::async_trait;
2274    use serde_json::Value;
2275    use std::sync::{Arc, Mutex};
2276
2277    use crate::storage::{ObjectStorageAdapter, StorageAdapter, join_uri};
2278
/// Schema shared by the tests in this module: `Person` and `Company` nodes
/// plus `Knows` and `WorksAt` edges.
///
/// Several tests derive a modified schema with `str::replace` on exact
/// substrings (for example `"    age: I32?\n}"`), so the whitespace in each
/// line below is significant.
const TEST_SCHEMA: &str = concat!(
    "\n",
    "node Person {\n",
    "    name: String @key\n",
    "    age: I32?\n",
    "}\n",
    "node Company {\n",
    "    name: String @key\n",
    "}\n",
    "edge Knows: Person -> Person {\n",
    "    since: Date?\n",
    "}\n",
    "edge WorksAt: Person -> Company\n",
);
2292
2293    #[derive(Debug)]
2294    struct RecordingStorageAdapter {
2295        inner: ObjectStorageAdapter,
2296        reads: Mutex<Vec<String>>,
2297        writes: Mutex<Vec<String>>,
2298        exists_checks: Mutex<Vec<String>>,
2299        renames: Mutex<Vec<(String, String)>>,
2300        deletes: Mutex<Vec<String>>,
2301    }
2302
2303    impl Default for RecordingStorageAdapter {
2304        fn default() -> Self {
2305            Self {
2306                inner: ObjectStorageAdapter::local(),
2307                reads: Mutex::default(),
2308                writes: Mutex::default(),
2309                exists_checks: Mutex::default(),
2310                renames: Mutex::default(),
2311                deletes: Mutex::default(),
2312            }
2313        }
2314    }
2315
2316    impl RecordingStorageAdapter {
2317        fn reads(&self) -> Vec<String> {
2318            self.reads.lock().unwrap().clone()
2319        }
2320
2321        fn writes(&self) -> Vec<String> {
2322            self.writes.lock().unwrap().clone()
2323        }
2324
2325        fn exists_checks(&self) -> Vec<String> {
2326            self.exists_checks.lock().unwrap().clone()
2327        }
2328    }
2329
2330    #[async_trait]
2331    impl StorageAdapter for RecordingStorageAdapter {
2332        async fn read_text(&self, uri: &str) -> Result<String> {
2333            self.reads.lock().unwrap().push(uri.to_string());
2334            self.inner.read_text(uri).await
2335        }
2336
2337        async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
2338            self.writes.lock().unwrap().push(uri.to_string());
2339            self.inner.write_text(uri, contents).await
2340        }
2341
2342        async fn write_text_if_absent(&self, uri: &str, contents: &str) -> Result<bool> {
2343            self.writes.lock().unwrap().push(uri.to_string());
2344            self.inner.write_text_if_absent(uri, contents).await
2345        }
2346
2347        async fn exists(&self, uri: &str) -> Result<bool> {
2348            self.exists_checks.lock().unwrap().push(uri.to_string());
2349            self.inner.exists(uri).await
2350        }
2351
2352        async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
2353            self.renames
2354                .lock()
2355                .unwrap()
2356                .push((from_uri.to_string(), to_uri.to_string()));
2357            self.inner.rename_text(from_uri, to_uri).await
2358        }
2359
2360        async fn delete(&self, uri: &str) -> Result<()> {
2361            self.deletes.lock().unwrap().push(uri.to_string());
2362            self.inner.delete(uri).await
2363        }
2364
2365        async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
2366            self.inner.list_dir(dir_uri).await
2367        }
2368
2369        async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)> {
2370            self.inner.read_text_versioned(uri).await
2371        }
2372
2373        async fn write_text_if_match(
2374            &self,
2375            uri: &str,
2376            contents: &str,
2377            expected_version: &str,
2378        ) -> Result<Option<String>> {
2379            self.inner
2380                .write_text_if_match(uri, contents, expected_version)
2381                .await
2382        }
2383
2384        async fn delete_prefix(&self, prefix_uri: &str) -> Result<()> {
2385            self.inner.delete_prefix(prefix_uri).await
2386        }
2387    }
2388
2389    #[derive(Debug)]
2390    struct InitRaceStorageAdapter {
2391        inner: ObjectStorageAdapter,
2392        root: String,
2393        barrier: Arc<tokio::sync::Barrier>,
2394    }
2395
2396    #[async_trait]
2397    impl StorageAdapter for InitRaceStorageAdapter {
2398        async fn read_text(&self, uri: &str) -> Result<String> {
2399            self.inner.read_text(uri).await
2400        }
2401
2402        async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
2403            self.inner.write_text(uri, contents).await
2404        }
2405
2406        async fn write_text_if_absent(&self, uri: &str, contents: &str) -> Result<bool> {
2407            self.inner.write_text_if_absent(uri, contents).await
2408        }
2409
2410        async fn exists(&self, uri: &str) -> Result<bool> {
2411            let exists = self.inner.exists(uri).await?;
2412            if uri == schema_state_uri(&self.root) {
2413                self.barrier.wait().await;
2414            }
2415            Ok(exists)
2416        }
2417
2418        async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
2419            self.inner.rename_text(from_uri, to_uri).await
2420        }
2421
2422        async fn delete(&self, uri: &str) -> Result<()> {
2423            self.inner.delete(uri).await
2424        }
2425
2426        async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
2427            self.inner.list_dir(dir_uri).await
2428        }
2429
2430        async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)> {
2431            self.inner.read_text_versioned(uri).await
2432        }
2433
2434        async fn write_text_if_match(
2435            &self,
2436            uri: &str,
2437            contents: &str,
2438            expected_version: &str,
2439        ) -> Result<Option<String>> {
2440            self.inner
2441                .write_text_if_match(uri, contents, expected_version)
2442                .await
2443        }
2444
2445        async fn delete_prefix(&self, prefix_uri: &str) -> Result<()> {
2446            self.inner.delete_prefix(prefix_uri).await
2447        }
2448    }
2449
2450    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2451    async fn concurrent_strict_init_does_not_delete_winning_schema_files() {
2452        let dir = tempfile::tempdir().unwrap();
2453        let uri = dir.path().to_str().unwrap().to_string();
2454        let root = normalize_root_uri(&uri).unwrap();
2455        let storage: Arc<dyn StorageAdapter> = Arc::new(InitRaceStorageAdapter {
2456            inner: ObjectStorageAdapter::local(),
2457            root,
2458            barrier: Arc::new(tokio::sync::Barrier::new(2)),
2459        });
2460
2461        let left = Omnigraph::init_with_storage(
2462            &uri,
2463            TEST_SCHEMA,
2464            Arc::clone(&storage),
2465            InitOptions::default(),
2466        );
2467        let right = Omnigraph::init_with_storage(
2468            &uri,
2469            TEST_SCHEMA,
2470            Arc::clone(&storage),
2471            InitOptions::default(),
2472        );
2473        let (left, right) = tokio::join!(left, right);
2474        let ok_count = usize::from(left.is_ok()) + usize::from(right.is_ok());
2475        assert_eq!(ok_count, 1, "exactly one concurrent init should win");
2476
2477        assert!(
2478            dir.path().join("_schema.pg").exists(),
2479            "winning init must leave _schema.pg in place"
2480        );
2481        assert!(
2482            dir.path().join("_schema.ir.json").exists(),
2483            "winning init must leave _schema.ir.json in place"
2484        );
2485        assert!(
2486            dir.path().join("__schema_state.json").exists(),
2487            "winning init must leave __schema_state.json in place"
2488        );
2489    }
2490
2491    #[tokio::test]
2492    async fn test_init_and_open_route_graph_metadata_through_storage_adapter() {
2493        let dir = tempfile::tempdir().unwrap();
2494        let uri = dir.path().to_str().unwrap();
2495        let adapter = Arc::new(RecordingStorageAdapter::default());
2496
2497        Omnigraph::init_with_storage(uri, TEST_SCHEMA, adapter.clone(), InitOptions::default())
2498            .await
2499            .unwrap();
2500        assert!(adapter.writes().contains(&join_uri(uri, "_schema.pg")));
2501        assert!(adapter.writes().contains(&join_uri(uri, "_schema.ir.json")));
2502        assert!(
2503            adapter
2504                .writes()
2505                .contains(&join_uri(uri, "__schema_state.json"))
2506        );
2507
2508        Omnigraph::open_with_storage(uri, adapter.clone())
2509            .await
2510            .unwrap();
2511        assert!(adapter.reads().contains(&join_uri(uri, "_schema.pg")));
2512        assert!(adapter.reads().contains(&join_uri(uri, "_schema.ir.json")));
2513        assert!(
2514            adapter
2515                .reads()
2516                .contains(&join_uri(uri, "__schema_state.json"))
2517        );
2518        assert!(
2519            adapter
2520                .exists_checks()
2521                .contains(&join_uri(uri, "_schema.ir.json"))
2522        );
2523        assert!(
2524            adapter
2525                .exists_checks()
2526                .contains(&join_uri(uri, "__schema_state.json"))
2527        );
2528        assert!(
2529            adapter
2530                .exists_checks()
2531                .contains(&join_uri(uri, "_graph_commits.lance"))
2532        );
2533    }
2534
2535    async fn table_rows_json(db: &Omnigraph, table_key: &str) -> Vec<Value> {
2536        let snapshot = db.snapshot().await;
2537        let ds = db
2538            .storage()
2539            .open_snapshot_at_table(&snapshot, table_key)
2540            .await
2541            .unwrap();
2542        let batches = db.storage().scan_batches(&ds).await.unwrap();
2543        batches
2544            .into_iter()
2545            .flat_map(|batch| {
2546                (0..batch.num_rows())
2547                    .map(|row| record_batch_row_to_json(&batch, row).unwrap())
2548                    .collect::<Vec<_>>()
2549            })
2550            .collect()
2551    }
2552
2553    async fn seed_person_row(db: &mut Omnigraph, name: &str, age: Option<i32>) {
2554        // No-txn entry, so the handle is always `Some` (collapse #1's skip is
2555        // gated on `txn.is_some()`).
2556        let (ds, full_path, table_branch) = db
2557            .open_for_mutation("node:Person", crate::db::MutationOpKind::Insert)
2558            .await
2559            .unwrap()
2560            .require_handle("seed_person_row test");
2561        let schema: Arc<Schema> = Arc::new(ds.dataset().schema().into());
2562        let columns: Vec<Arc<dyn Array>> = schema
2563            .fields()
2564            .iter()
2565            .map(|field| match field.name().as_str() {
2566                "id" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
2567                "name" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
2568                "age" => Arc::new(Int32Array::from(vec![age])) as Arc<dyn Array>,
2569                _ => new_null_array(field.data_type(), 1),
2570            })
2571            .collect();
2572        let batch = RecordBatch::try_new(Arc::clone(&schema), columns).unwrap();
2573        let staged = db.storage().stage_append(&ds, batch, &[]).await.unwrap();
2574        let committed = db.storage().commit_staged(ds, staged).await.unwrap();
2575        let state = db
2576            .storage()
2577            .table_state(&full_path, &committed)
2578            .await
2579            .unwrap();
2580        db.commit_updates(&[crate::db::SubTableUpdate {
2581            table_key: "node:Person".to_string(),
2582            table_version: state.version,
2583            table_branch,
2584            row_count: state.row_count,
2585            version_metadata: state.version_metadata,
2586        }])
2587        .await
2588        .unwrap();
2589    }
2590
2591    #[tokio::test]
2592    async fn test_apply_schema_adds_nullable_property_and_preserves_rows() {
2593        let dir = tempfile::tempdir().unwrap();
2594        let uri = dir.path().to_str().unwrap();
2595        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2596        seed_person_row(&mut db, "Alice", Some(30)).await;
2597
2598        let desired = TEST_SCHEMA.replace(
2599            "    age: I32?\n}",
2600            "    age: I32?\n    nickname: String?\n}",
2601        );
2602        let result = db.apply_schema(&desired).await.unwrap();
2603        assert!(result.applied);
2604
2605        let reopened = Omnigraph::open(uri).await.unwrap();
2606        let rows = table_rows_json(&reopened, "node:Person").await;
2607        assert_eq!(rows.len(), 1);
2608        assert_eq!(rows[0]["name"], "Alice");
2609        assert_eq!(rows[0]["age"], 30);
2610        assert!(rows[0]["nickname"].is_null());
2611        assert!(
2612            reopened.catalog().node_types["Person"]
2613                .properties
2614                .contains_key("nickname")
2615        );
2616        assert!(dir.path().join("_schema.pg").exists());
2617    }
2618
2619    #[tokio::test]
2620    async fn test_apply_schema_renames_property_and_preserves_values() {
2621        let dir = tempfile::tempdir().unwrap();
2622        let uri = dir.path().to_str().unwrap();
2623        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2624        seed_person_row(&mut db, "Alice", Some(30)).await;
2625
2626        let desired = TEST_SCHEMA.replace(
2627            "    age: I32?\n}",
2628            "    years: I32? @rename_from(\"age\")\n}",
2629        );
2630        db.apply_schema(&desired).await.unwrap();
2631
2632        let reopened = Omnigraph::open(uri).await.unwrap();
2633        let rows = table_rows_json(&reopened, "node:Person").await;
2634        assert_eq!(rows[0]["name"], "Alice");
2635        assert_eq!(rows[0]["years"], 30);
2636        assert!(rows[0].get("age").is_none());
2637    }
2638
2639    #[tokio::test]
2640    async fn test_apply_schema_renames_type_and_preserves_historical_snapshot() {
2641        let dir = tempfile::tempdir().unwrap();
2642        let uri = dir.path().to_str().unwrap();
2643        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2644        seed_person_row(&mut db, "Alice", Some(30)).await;
2645        let before_version = db.snapshot().await.version();
2646
2647        let desired = TEST_SCHEMA
2648            .replace("node Person {\n", "node Human @rename_from(\"Person\") {\n")
2649            .replace("edge Knows: Person -> Person", "edge Knows: Human -> Human")
2650            .replace(
2651                "edge WorksAt: Person -> Company",
2652                "edge WorksAt: Human -> Company",
2653            );
2654        db.apply_schema(&desired).await.unwrap();
2655
2656        let head = db.snapshot().await;
2657        assert!(head.entry("node:Person").is_none());
2658        assert!(head.entry("node:Human").is_some());
2659        let historical = ManifestCoordinator::snapshot_at(uri, None, before_version)
2660            .await
2661            .unwrap();
2662        assert!(historical.entry("node:Person").is_some());
2663        assert!(historical.entry("node:Human").is_none());
2664    }
2665
2666    #[tokio::test]
2667    async fn test_apply_schema_succeeds_after_load() {
2668        // Historical: schema apply used to be blocked by leftover
2669        // `__run__` branches. The Run state machine was removed in
2670        // MR-771, so a fresh graph never creates a `__run__` branch;
2671        // legacy ones are swept by the v2→v3 manifest migration. This
2672        // asserts the invariant a current graph upholds: publish leaves
2673        // no `__run__` branch behind, so schema apply proceeds.
2674        let dir = tempfile::tempdir().unwrap();
2675        let uri = dir.path().to_str().unwrap();
2676        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2677
2678        crate::loader::load_jsonl(
2679            &mut db,
2680            r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#,
2681            crate::loader::LoadMode::Overwrite,
2682        )
2683        .await
2684        .unwrap();
2685
2686        let all_branches = db.coordinator.read().await.all_branches().await.unwrap();
2687        assert!(
2688            !all_branches.iter().any(|b| b.starts_with("__run__")),
2689            "no __run__ branch should exist after publish, got: {:?}",
2690            all_branches
2691        );
2692
2693        let desired = TEST_SCHEMA.replace(
2694            "    age: I32?\n}",
2695            "    age: I32?\n    nickname: String?\n}",
2696        );
2697        let result = db.apply_schema(&desired).await.unwrap();
2698        assert!(result.applied, "schema apply should have applied");
2699    }
2700
2701    /// Regression (MR-770): a pre-v0.4.0 graph that still carries a stale
2702    /// `__run__*` branch on `__manifest` must not block schema apply. The
2703    /// v2→v3 sweep runs in `Omnigraph::open(ReadWrite)` — before the
2704    /// schema-apply blocking-branch check — so apply succeeds with no
2705    /// intervening publish.
2706    ///
2707    /// Confirmed to fail before the open-time migration landed: the reopened
2708    /// graph still listed `__run__legacy`, and `apply_schema` returned
2709    /// "found non-main branches: __run__legacy".
2710    #[tokio::test]
2711    async fn legacy_run_branch_is_swept_on_open_and_does_not_block_schema_apply() {
2712        let dir = tempfile::tempdir().unwrap();
2713        let uri = dir.path().to_str().unwrap();
2714        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2715
2716        // Synthesize a legacy graph: a stale `__run__` branch on `__manifest`
2717        // plus the manifest stamp rewound to v2 (pre-sweep).
2718        db.branch_create("__run__legacy").await.unwrap();
2719        drop(db);
2720        {
2721            // forbidden-api-allow: test synthesizes a legacy graph by editing __manifest directly.
2722            let mut ds = lance::Dataset::open(&format!("{}/__manifest", uri))
2723                .await
2724                .unwrap();
2725            ds.update_schema_metadata([(
2726                "omnigraph:internal_schema_version".to_string(),
2727                Some("2".to_string()),
2728            )])
2729            .await
2730            .unwrap();
2731        }
2732
2733        // Reopen (ReadWrite): the open-time migration must sweep `__run__legacy`
2734        // before any branch-observing code runs.
2735        let db = Omnigraph::open(uri).await.unwrap();
2736        let branches = db.branch_list().await.unwrap();
2737        assert!(
2738            !branches.iter().any(|b| b.starts_with("__run__")),
2739            "open-time migration must sweep legacy __run__ branches; got {branches:?}",
2740        );
2741
2742        // Schema apply must proceed with no intervening publish — the
2743        // blocking-branch check no longer sees `__run__legacy`.
2744        let desired = TEST_SCHEMA.replace(
2745            "    age: I32?\n}",
2746            "    age: I32?\n    nickname: String?\n}",
2747        );
2748        let result = db.apply_schema(&desired).await.unwrap();
2749        assert!(result.applied, "schema apply should have applied");
2750    }
2751
2752    #[tokio::test]
2753    async fn test_apply_schema_defers_index_then_reconciler_builds_it() {
2754        // iss-848: schema apply records the @index intent but builds nothing
2755        // inline; a later ensure_indices materializes it once the table has
2756        // rows. (Use `age`, which is unindexed in TEST_SCHEMA — `name @key` is
2757        // already FTS-indexed at seed, so it can't show the deferral.)
2758        let dir = tempfile::tempdir().unwrap();
2759        let uri = dir.path().to_str().unwrap();
2760        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2761        seed_person_row(&mut db, "Alice", Some(30)).await;
2762
2763        let desired = TEST_SCHEMA.replace("age: I32?", "age: I32? @index");
2764        db.apply_schema(&desired).await.unwrap();
2765
2766        // Apply built nothing — the BTREE on `age` is deferred.
2767        let snapshot = db.snapshot().await;
2768        let ds = db
2769            .storage()
2770            .open_snapshot_at_table(&snapshot, "node:Person")
2771            .await
2772            .unwrap();
2773        assert!(
2774            !db.storage().has_btree_index(&ds, "age").await.unwrap(),
2775            "apply must not build the index inline (deferred to the reconciler)"
2776        );
2777
2778        // The reconciler materializes it (Person has a row).
2779        db.ensure_indices().await.unwrap();
2780        let snapshot = db.snapshot().await;
2781        let ds = db
2782            .storage()
2783            .open_snapshot_at_table(&snapshot, "node:Person")
2784            .await
2785            .unwrap();
2786        assert!(
2787            db.storage().has_btree_index(&ds, "age").await.unwrap(),
2788            "ensure_indices must build the deferred index"
2789        );
2790    }
2791
2792    #[tokio::test]
2793    async fn test_apply_schema_rewrite_defers_index_then_reconciler_restores() {
2794        // iss-848: an AddProperty rewrite writes a new dataset version without
2795        // rebuilding indexes inline (deferred); ensure_indices restores them.
2796        let dir = tempfile::tempdir().unwrap();
2797        let uri = dir.path().to_str().unwrap();
2798        let initial_schema = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
2799        let mut db = Omnigraph::init(uri, &initial_schema).await.unwrap();
2800        seed_person_row(&mut db, "Alice", Some(30)).await;
2801
2802        let desired = initial_schema.replace(
2803            "    age: I32?\n}",
2804            "    age: I32?\n    nickname: String?\n}",
2805        );
2806        db.apply_schema(&desired).await.unwrap();
2807
2808        // After the rewrite the reconciler restores index coverage.
2809        db.ensure_indices().await.unwrap();
2810        let snapshot = db.snapshot().await;
2811        let ds = db
2812            .storage()
2813            .open_snapshot_at_table(&snapshot, "node:Person")
2814            .await
2815            .unwrap();
2816        assert!(db.storage().has_btree_index(&ds, "id").await.unwrap());
2817        assert!(db.storage().has_fts_index(&ds, "name").await.unwrap());
2818    }
2819
2820    #[tokio::test]
2821    async fn test_open_for_mutation_rejects_while_schema_apply_locked() {
2822        let dir = tempfile::tempdir().unwrap();
2823        let uri = dir.path().to_str().unwrap();
2824        let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2825        let mut db = db;
2826        db.coordinator
2827            .write()
2828            .await
2829            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
2830            .await
2831            .unwrap();
2832
2833        let err = db
2834            .open_for_mutation("node:Person", crate::db::MutationOpKind::Insert)
2835            .await
2836            .unwrap_err();
2837        assert!(
2838            err.to_string()
2839                .contains("write is unavailable while schema apply is in progress")
2840        );
2841    }
2842
2843    #[tokio::test]
2844    async fn test_commit_updates_rejects_while_schema_apply_locked() {
2845        let dir = tempfile::tempdir().unwrap();
2846        let uri = dir.path().to_str().unwrap();
2847        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2848        db.coordinator
2849            .write()
2850            .await
2851            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
2852            .await
2853            .unwrap();
2854
2855        let err = db.commit_updates(&[]).await.unwrap_err();
2856        assert!(
2857            err.to_string()
2858                .contains("write commit is unavailable while schema apply is in progress")
2859        );
2860    }
2861
2862    #[tokio::test]
2863    async fn test_branch_list_hides_schema_apply_lock_branch() {
2864        let dir = tempfile::tempdir().unwrap();
2865        let uri = dir.path().to_str().unwrap();
2866        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2867        db.coordinator
2868            .write()
2869            .await
2870            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
2871            .await
2872            .unwrap();
2873
2874        let branches = db.branch_list().await.unwrap();
2875        assert_eq!(branches, vec!["main".to_string()]);
2876    }
2877}