Skip to main content

omnigraph/db/
omnigraph.rs

1use std::collections::{BTreeSet, HashMap, HashSet};
2use std::io::Write;
3use std::sync::Arc;
4
5use arc_swap::ArcSwap;
6use arrow_array::{
7    Array, BinaryArray, BooleanArray, Date32Array, FixedSizeListArray, Float32Array, Float64Array,
8    Int32Array, Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray,
9    RecordBatch, StringArray, StructArray, UInt32Array, UInt64Array, new_null_array,
10};
11use arrow_schema::{DataType, Field, Schema};
12use lance::Dataset;
13use lance::blob::{BlobArrayBuilder, blob_field};
14use lance::dataset::BlobFile;
15use lance::dataset::scanner::ColumnOrdering;
16use lance::datatypes::BlobKind;
17use omnigraph_compiler::catalog::{Catalog, EdgeType, NodeType};
18use omnigraph_compiler::schema::parser::parse_schema;
19use omnigraph_compiler::types::{PropType, ScalarType};
20use omnigraph_compiler::{
21    DropMode, SchemaIR, SchemaMigrationPlan, SchemaMigrationStep, SchemaTypeKind,
22    build_catalog_from_ir, build_schema_ir, plan_schema_migration,
23};
24
25use crate::db::graph_coordinator::{GraphCoordinator, PublishedSnapshot};
26use crate::error::{OmniError, Result};
27use crate::runtime_cache::RuntimeCache;
28use crate::storage::{StorageAdapter, join_uri, normalize_root_uri, storage_for_uri};
29use crate::storage_layer::SnapshotHandle;
30use crate::table_store::TableStore;
31
32mod export;
33mod optimize;
34mod repair;
35mod schema_apply;
36mod table_ops;
37
38pub use optimize::{CleanupPolicyOptions, SkipReason, TableCleanupStats, TableOptimizeStats};
39pub use repair::{
40    RepairAction, RepairClassification, RepairOptions, RepairStats, TableRepairStats,
41};
42pub use schema_apply::SchemaApplyOptions;
43pub use table_ops::PendingIndex;
44
45use super::commit_graph::GraphCommit;
46use super::manifest::{
47    ManifestChange, Snapshot, SubTableEntry, TableRegistration, TableTombstone,
48    table_path_for_table_key,
49};
50use super::schema_state::{
51    SCHEMA_SOURCE_FILENAME, load_or_bootstrap_schema_contract, read_accepted_schema_ir,
52    recover_schema_state_files, schema_ir_staging_uri, schema_ir_uri, schema_source_staging_uri,
53    schema_source_uri, schema_state_staging_uri, schema_state_uri, validate_schema_contract,
54    write_schema_contract, write_schema_contract_staging,
55};
56use super::{
57    ReadTarget, ResolvedTarget, SCHEMA_APPLY_LOCK_BRANCH, SnapshotId, is_internal_system_branch,
58    is_schema_apply_lock_branch,
59};
60
/// How a branch merge concluded.
///
/// NOTE(review): the merge implementation is not visible from this part of
/// the file; the per-variant descriptions below follow the usual git meaning
/// of these names and should be confirmed against `branch_merge_impl`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MergeOutcome {
    /// Presumably: the target already contained everything from the source,
    /// so nothing was changed.
    AlreadyUpToDate,
    /// Presumably: the target was advanced directly to the source head
    /// without creating a merge commit.
    FastForward,
    /// Presumably: source and target had diverged and were combined.
    Merged,
}
67
/// Value returned by the `Omnigraph::apply_schema*` family of methods.
///
/// NOTE(review): the fields are populated inside the `schema_apply` module,
/// which is not visible here; the field notes below are inferred from the
/// names and types only — confirm against that module.
#[derive(Debug, Clone)]
pub struct SchemaApplyResult {
    /// Presumably whether the requested migration is one the engine can perform.
    pub supported: bool,
    /// Presumably whether the migration was actually carried out.
    pub applied: bool,
    /// Manifest version associated with this result — TODO confirm whether it
    /// is the version before or after the apply.
    pub manifest_version: u64,
    /// Migration steps associated with this apply.
    pub steps: Vec<SchemaMigrationStep>,
}
75
/// Value returned by `Omnigraph::preview_schema_apply_with_options`: the
/// migration plan together with a catalog.
#[derive(Debug, Clone)]
pub struct SchemaApplyPreview {
    /// The computed schema migration plan.
    pub plan: SchemaMigrationPlan,
    /// Catalog accompanying the plan — presumably the catalog as it would look
    /// after the plan is applied; confirm against `schema_apply`.
    pub catalog: Catalog,
}
81
/// Top-level handle to an Omnigraph database.
///
/// An Omnigraph is a Lance-native graph database with git-style branching.
/// It stores typed property graphs as per-type Lance datasets coordinated
/// through a Lance manifest table.
pub struct Omnigraph {
    /// Normalized root URI of the graph (the output of `normalize_root_uri`
    /// at init/open time). Exposed via `uri()` and `root_uri()`.
    root_uri: String,
    /// Object-store adapter used for schema files and recovery sidecars.
    /// Exposed to engine code via `storage_adapter()`.
    storage: Arc<dyn StorageAdapter>,
    /// Coordinator state behind a tokio `RwLock`. PR 2 (MR-686) wraps
    /// this so engine write APIs can be `&self` (the HTTP server's
    /// `AppState` holds `Arc<Omnigraph>` and dispatches concurrent
    /// calls without a global write lock). Reads (`snapshot`, `version`,
    /// `current_branch`, `branch_list`, `resolve_*`, `head_commit_id`,
    /// `list_commits`, …) acquire `.read().await` and parallelize.
    /// Writes (`refresh`, `branch_create`, `branch_delete`, `commit_*`,
    /// `record_*`) acquire `.write().await` and serialize. The atomic
    /// commit invariant — `commit_manifest_updates` followed by
    /// `record_graph_commit` must be atomic — is preserved by the
    /// single `.write()` covering both calls inside
    /// `commit_updates_with_actor_with_expected`. PR 2 Phase 2
    /// converted from `Mutex` to `RwLock` because the bench showed
    /// the Mutex was the dominant serializer for disjoint-table
    /// workloads. Lock acquisition order: always before `runtime_cache`
    /// (when both are needed in one scope).
    coordinator: Arc<tokio::sync::RwLock<GraphCoordinator>>,
    /// Table-level storage rooted at `root_uri`. Engine code reaches it only
    /// through the `storage()` and `storage_inline_residual()` trait-object
    /// accessors.
    table_store: TableStore,
    /// Runtime cache, default-constructed at init/open.
    /// NOTE(review): what it caches is not visible in this part of the file.
    runtime_cache: RuntimeCache,
    /// Read-heavy on every query, written only by `apply_schema`. ArcSwap
    /// gives atomic pointer swap with zero-cost reads (`load()` returns a
    /// `Guard<Arc<Catalog>>`), so concurrent queries on different actors
    /// don't contend on a lock to read the catalog.
    catalog: Arc<ArcSwap<Catalog>>,
    /// Read-heavy on schema introspection paths, written only by
    /// `apply_schema`. Same ArcSwap rationale as `catalog`.
    schema_source: Arc<ArcSwap<String>>,
    /// Per-`(table_key, branch)` writer queues — the engine's
    /// write-serialization mechanism (the server holds the engine as a
    /// lockless `Arc<Omnigraph>`). Reachable from engine internals
    /// (mutation finalize, schema_apply, branch_merge, ensure_indices,
    /// delete_where, the fork path, recovery reconciler).
    write_queue: Arc<crate::db::write_queue::WriteQueueManager>,
    /// Process-wide mutex held across the swap → operate → restore window
    /// in `branch_merge_impl`. Two concurrent merges with distinct targets
    /// would otherwise interleave their three separate
    /// `coordinator.write().await` acquisitions, leaving each merge's
    /// inner body running against the other's swapped coord. Pinned by
    /// `concurrent_branch_merges_distinct_targets_do_not_swap_into_each_other`
    /// in `crates/omnigraph-server/tests/server.rs`.
    ///
    /// Cost: serializes ALL concurrent branch merges process-wide.
    /// Acceptable because branch merges are heavy (table rewrites, index
    /// rebuilds), per-(table, branch) queues inside `commit_all` already
    /// serialize the data path, and merges are rare relative to /change
    /// or /ingest. A finer-grained per-target-branch mutex is a follow-up
    /// if telemetry shows merge concurrency matters.
    ///
    /// The deeper fix — refactor `branch_merge_on_current_target` to take
    /// an explicit target coord parameter so `self.coordinator` is never
    /// used as scratch space — is the round-1 shape applied to
    /// `branch_create_from_impl`. Deferred because it requires unwinding
    /// every `self.snapshot()` and `self.ensure_commit_graph_initialized()`
    /// call inside the merge body.
    merge_exclusive: Arc<tokio::sync::Mutex<()>>,
    /// Optional policy checker for engine-layer enforcement (MR-722).
    /// `None` = no enforcement; mutating methods are unconditionally
    /// allowed (this is the embedded/dev default). `Some` = every
    /// mutating method calls `self.enforce(action, scope, actor)` at
    /// entry; denial returns `OmniError::Policy`.
    ///
    /// Per chassis design (see `omnigraph_policy::PolicyChecker`), the
    /// trait surface is deliberately coarse — action × scope × actor.
    /// Per-row / per-type / per-column scope lives at the query layer
    /// (MR-725), which extends the same trait with a different method.
    /// Don't be tempted to add per-row enforcement here.
    ///
    /// Set via `with_policy(checker)` after construction. Today only
    /// `apply_schema_as` consults this field (PR #2 proof-of-concept);
    /// PR #3 fans the `enforce()` call out to the remaining writers.
    policy: Option<Arc<dyn omnigraph_policy::PolicyChecker>>,
    /// Lazily-built, reused-across-queries embedding client. Built on the first
    /// `nearest($v, "string")` that needs server-side embedding (so a graph that
    /// never embeds needs no provider key), then shared by every later query —
    /// avoids the per-query `from_env()` rebuild and keeps the provider HTTP
    /// connection pool warm. `OnceCell` guarantees a single initialization.
    embedding: Arc<tokio::sync::OnceCell<crate::embedding::EmbeddingClient>>,
    /// Optional pre-resolved embedding config (RFC-012 Phase 5), injected from an
    /// applied cluster `providers.embedding` profile via [`Omnigraph::with_embedding_config`].
    /// When set, the embedding cell builds its client from this instead of
    /// `EmbeddingClient::from_env()`; `None` keeps the env fallback.
    embedding_config: Option<Arc<crate::embedding::EmbeddingConfig>>,
}
173
/// Selects whether opening a graph performs the open-time recovery sweep.
///
/// The sweep issues Lance writes (`Dataset::restore`,
/// `ManifestBatchPublisher::publish`). Consumers that only read — NDJSON
/// export, `commit list`, `read`, schema inspection — should not cause
/// writes: they may hold read-only object-store credentials, and mutations
/// triggered silently by an open are surprising. Skipping the sweep is safe
/// for them because reads always resolve through the manifest pin, which is
/// the consistent snapshot regardless of any Phase B → Phase C drift on the
/// per-table side.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OpenMode {
    /// Perform the recovery sweep while opening. This is what
    /// [`Omnigraph::open`] uses.
    ReadWrite,
    /// Do not perform the recovery sweep. This is what
    /// [`Omnigraph::open_read_only`] uses, for read-only consumers.
    ReadOnly,
}
191
/// Options for [`Omnigraph::init_with_options`].
///
/// `force` controls the safety preflight that prevents an accidental
/// re-init from overwriting an existing graph's schema metadata.
///
/// * `force: false` (the `Default`): init fails fast with
///   [`OmniError::AlreadyInitialized`] if any of `_schema.pg`,
///   `_schema.ir.json`, or `__schema_state.json` already exists at the
///   target URI.
/// * `force: true`: the preflight is skipped and existing schema files are
///   overwritten in place. Force does NOT purge old Lance datasets or
///   `__manifest/`; reclaiming those still requires deleting the graph
///   directory by hand (or via a future `DELETE /graphs/{id}`).
///
/// Derives `PartialEq`/`Eq` like the other small public value types in this
/// module (`OpenMode`, `MergeOutcome`) so option values can be compared.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct InitOptions {
    /// Skip the existing-graph preflight. Operators set this when
    /// they actually mean to overwrite — e.g. `omnigraph init --force`.
    pub force: bool,
}
210
211impl Omnigraph {
212    /// Create a new graph at `uri` from schema source.
213    ///
214    /// Strict mode: errors with [`OmniError::AlreadyInitialized`] if
215    /// `uri` already holds any of the three schema artifacts. To
216    /// overwrite an existing graph deliberately, call
217    /// [`Self::init_with_options`] with `InitOptions { force: true }`.
218    pub async fn init(uri: &str, schema_source: &str) -> Result<Self> {
219        Self::init_with_options(uri, schema_source, InitOptions::default()).await
220    }
221
222    /// Create a new graph at `uri`, with explicit init-time options.
223    ///
224    /// See [`InitOptions`] for the safety contract — by default this
225    /// behaves identically to [`Self::init`].
226    pub async fn init_with_options(
227        uri: &str,
228        schema_source: &str,
229        options: InitOptions,
230    ) -> Result<Self> {
231        Self::init_with_storage(uri, schema_source, storage_for_uri(uri)?, options).await
232    }
233
234    pub(crate) async fn init_with_storage(
235        uri: &str,
236        schema_source: &str,
237        storage: Arc<dyn StorageAdapter>,
238        options: InitOptions,
239    ) -> Result<Self> {
240        let root = normalize_root_uri(uri)?;
241
242        // Preflight: refuse to clobber an existing graph unless the
243        // operator passed `force`. This runs BEFORE any parse or
244        // write so a misdirected `init` against an existing graph
245        // URI cannot reach a code path that overwrites or, on a
246        // later cleanup, deletes the schema files.
247        //
248        // Closes the "init is destructive against existing state"
249        // class: there is no longer a code path where strict-mode
250        // `init` can mutate a populated graph root.
251        if !options.force {
252            for candidate in [
253                schema_source_uri(&root),
254                schema_ir_uri(&root),
255                schema_state_uri(&root),
256            ] {
257                if storage.exists(&candidate).await? {
258                    return Err(OmniError::AlreadyInitialized { uri: root.clone() });
259                }
260            }
261        }
262
263        let schema_ir = read_schema_ir_from_source(schema_source)?;
264        let mut catalog = build_catalog_from_ir(&schema_ir)?;
265        fixup_blob_schemas(&mut catalog);
266
267        // Establish an atomic ownership claim on `_schema.pg` before
268        // writing the remaining init artifacts. A check-then-write preflight
269        // is not enough under concurrent `init` calls: two callers can both
270        // observe an empty root, one can successfully initialize, and the
271        // loser can then fail in Lance `WriteMode::Create`. Only the caller
272        // that atomically created `_schema.pg` may clean up schema artifacts
273        // on later failure.
274        let schema_pg_claimed = if options.force {
275            false
276        } else {
277            let schema_path = join_uri(&root, SCHEMA_SOURCE_FILENAME);
278            if !storage
279                .write_text_if_absent(&schema_path, schema_source)
280                .await?
281            {
282                return Err(OmniError::AlreadyInitialized { uri: root.clone() });
283            }
284            if let Err(err) = crate::failpoints::maybe_fail("init.after_schema_pg_written") {
285                best_effort_cleanup_init_artifacts(&root, storage.as_ref()).await;
286                return Err(err);
287            }
288            true
289        };
290
291        // Run the I/O phase. On any error, best-effort-clean schema
292        // artifacts only when this invocation owns them: strict mode owns
293        // them after the atomic `_schema.pg` claim above; force mode owns
294        // destructive overwrite semantics by explicit operator request.
295        //
296        // Coverage gap: Lance per-type datasets and `__manifest/`
297        // directory created by `GraphCoordinator::init` are NOT cleaned
298        // up here — fully recursive directory deletion requires a
299        // `StorageAdapter::delete_prefix` primitive that's deferred
300        // along with `DELETE /graphs/{id}` (PR 2b in the MR-668 plan
301        // is currently deferred). If `init` fails after coordinator
302        // init succeeds, operators may need to remove the graph
303        // directory manually before retrying `init` on the same URI.
304        // Documented in the PR 2a commit message and `init` rustdoc.
305        let coordinator = match init_storage_phase(
306            &root,
307            schema_source,
308            &schema_ir,
309            &catalog,
310            &storage,
311            !schema_pg_claimed,
312        )
313        .await
314        {
315            Ok(coordinator) => coordinator,
316            Err(err) => {
317                if schema_pg_claimed || options.force {
318                    best_effort_cleanup_init_artifacts(&root, storage.as_ref()).await;
319                }
320                return Err(err);
321            }
322        };
323
324        Ok(Self {
325            root_uri: root.clone(),
326            storage,
327            coordinator: Arc::new(tokio::sync::RwLock::new(coordinator)),
328            table_store: TableStore::new(&root),
329            runtime_cache: RuntimeCache::default(),
330            catalog: Arc::new(ArcSwap::from_pointee(catalog)),
331            schema_source: Arc::new(ArcSwap::from_pointee(schema_source.to_string())),
332            write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()),
333            merge_exclusive: Arc::new(tokio::sync::Mutex::new(())),
334            policy: None,
335            embedding: Arc::new(tokio::sync::OnceCell::new()),
336            embedding_config: None,
337        })
338    }
339
340    /// Open an existing graph (read-write).
341    ///
342    /// Reads `_schema.pg`, parses it, builds the catalog, and opens `__manifest`.
343    /// Runs the open-time recovery sweep before returning — see [`OpenMode`].
344    pub async fn open(uri: &str) -> Result<Self> {
345        Self::open_with_storage_and_mode(uri, storage_for_uri(uri)?, OpenMode::ReadWrite).await
346    }
347
348    /// Open an existing graph for read-only consumers (NDJSON export,
349    /// `commit list`, etc.). Skips the recovery sweep — see [`OpenMode`].
350    pub async fn open_read_only(uri: &str) -> Result<Self> {
351        Self::open_with_storage_and_mode(uri, storage_for_uri(uri)?, OpenMode::ReadOnly).await
352    }
353
354    /// `open_with_storage` retained for existing callers (init/test paths).
355    /// Defaults to `OpenMode::ReadWrite`.
356    pub(crate) async fn open_with_storage(
357        uri: &str,
358        storage: Arc<dyn StorageAdapter>,
359    ) -> Result<Self> {
360        Self::open_with_storage_and_mode(uri, storage, OpenMode::ReadWrite).await
361    }
362
363    pub(crate) async fn open_with_storage_and_mode(
364        uri: &str,
365        storage: Arc<dyn StorageAdapter>,
366        mode: OpenMode,
367    ) -> Result<Self> {
368        let root = normalize_root_uri(uri)?;
369        // Apply pending internal-schema migrations before the coordinator reads
370        // branch state, so `branch_list` and the schema-apply blocking-branch
371        // checks observe the post-migration graph — notably the v2→v3 sweep of
372        // legacy `__run__*` staging branches (MR-770). ReadWrite only: a
373        // read-only open must not trigger object-store writes, so a read-only
374        // open of an unmigrated legacy graph still lists `__run__*` until its
375        // first read-write open (an accepted, documented limitation).
376        if matches!(mode, OpenMode::ReadWrite) {
377            crate::db::manifest::migrate_on_open(&root).await?;
378        }
379        // Open the coordinator first so the schema-staging recovery sweep can
380        // compare its snapshot against any leftover staging files.
381        let mut coordinator = GraphCoordinator::open(&root, Arc::clone(&storage)).await?;
382        // Both the schema-state recovery sweep AND the manifest-drift
383        // recovery sweep are gated on `OpenMode::ReadWrite`. Read-only
384        // consumers (NDJSON export, `commit list`, schema show) shouldn't
385        // trigger object-store mutations: they may run with read-only
386        // credentials, and silent open-time writes are surprising. Both
387        // sweeps' work is recoverable on the next ReadWrite open, so
388        // skipping under ReadOnly doesn't lose any safety guarantees —
389        // the manifest pin is the consistent snapshot regardless of
390        // drift on the per-table side or leftover schema-staging files.
391        if matches!(mode, OpenMode::ReadWrite) {
392            let schema_state_recovery =
393                recover_schema_state_files(&root, Arc::clone(&storage), &coordinator.snapshot())
394                    .await?;
395            // Recovery sweep: close the Phase B → Phase C residual on
396            // any sidecar left over from a crashed writer. Long-running
397            // processes additionally converge in-process: the staged-
398            // write entry points and `refresh` run the roll-forward-only
399            // heal (`heal_pending_sidecars_roll_forward`); only
400            // rollback-eligible sidecars wait for this open-time sweep.
401            crate::db::manifest::recover_manifest_drift(
402                &root,
403                Arc::clone(&storage),
404                &mut coordinator,
405                crate::db::manifest::RecoveryMode::Full,
406                schema_state_recovery,
407            )
408            .await?;
409        }
410        // Read _schema.pg (post-recovery — may have just been renamed in).
411        let schema_path = schema_source_uri(&root);
412        let schema_source = storage.read_text(&schema_path).await?;
413        let current_source_ir = read_schema_ir_from_source(&schema_source)?;
414        let branches = coordinator.branch_list().await?;
415        let (accepted_ir, _) = load_or_bootstrap_schema_contract(
416            &root,
417            Arc::clone(&storage),
418            &branches,
419            &current_source_ir,
420        )
421        .await?;
422        let mut catalog = build_catalog_from_ir(&accepted_ir)?;
423        fixup_blob_schemas(&mut catalog);
424
425        Ok(Self {
426            root_uri: root.clone(),
427            storage,
428            coordinator: Arc::new(tokio::sync::RwLock::new(coordinator)),
429            table_store: TableStore::new(&root),
430            runtime_cache: RuntimeCache::default(),
431            catalog: Arc::new(ArcSwap::from_pointee(catalog)),
432            schema_source: Arc::new(ArcSwap::from_pointee(schema_source)),
433            write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()),
434            merge_exclusive: Arc::new(tokio::sync::Mutex::new(())),
435            policy: None,
436            embedding: Arc::new(tokio::sync::OnceCell::new()),
437            embedding_config: None,
438        })
439    }
440
441    /// Returns an `Arc<Catalog>` snapshot. Cheap clone of the current
442    /// catalog pointer; callers can hold the returned `Arc` across awaits
443    /// without blocking concurrent `apply_schema`.
444    pub fn catalog(&self) -> Arc<Catalog> {
445        self.catalog.load_full()
446    }
447
448    /// Returns an `Arc<String>` snapshot of the schema source.
449    pub fn schema_source(&self) -> Arc<String> {
450        self.schema_source.load_full()
451    }
452
453    /// Atomically swap the in-memory catalog. Concurrent readers see
454    /// either the old or the new pointer; never a torn state. Used by
455    /// `apply_schema` and `reload_schema_if_source_changed`.
456    pub(crate) fn store_catalog(&self, catalog: Catalog) {
457        self.catalog.store(Arc::new(catalog));
458    }
459
460    /// Atomically swap the in-memory schema source. Same rationale as
461    /// [`store_catalog`](Self::store_catalog).
462    pub(crate) fn store_schema_source(&self, schema_source: String) {
463        self.schema_source.store(Arc::new(schema_source));
464    }
465
466    pub fn uri(&self) -> &str {
467        &self.root_uri
468    }
469
470    /// Install a policy checker for engine-layer enforcement (MR-722).
471    /// Builder-style setter — consumes `self`, returns `Self`. Calling
472    /// this on a `Omnigraph` previously without policy enables
473    /// `enforce()` to fire at every mutating engine method that's been
474    /// wired to call it (currently `apply_schema_as`; PR #3 fans out to
475    /// the remaining writers).
476    ///
477    /// Embedded callers that don't care about authorization should
478    /// just not call this. Server / CLI callers that have loaded a
479    /// `PolicyEngine` from `policy.yaml` pass it here.
480    pub fn with_policy(mut self, checker: Arc<dyn omnigraph_policy::PolicyChecker>) -> Self {
481        self.policy = Some(checker);
482        self
483    }
484
485    /// The lazily-initialized, reused-across-queries embedding client cell
486    /// (see the `embedding` field doc). The query executor resolves the client
487    /// through this on the first `nearest($v, "string")` that needs embedding.
488    pub(crate) fn embedding_cell(
489        &self,
490    ) -> &tokio::sync::OnceCell<crate::embedding::EmbeddingClient> {
491        &self.embedding
492    }
493
494    /// Install a pre-resolved embedding config (RFC-012 Phase 5). Builder-style,
495    /// mirroring [`Omnigraph::with_policy`]: a graph served from a cluster
496    /// embedding provider profile injects it here; an embedded/CLI caller that doesn't
497    /// call this keeps the `EmbeddingClient::from_env()` fallback.
498    pub fn with_embedding_config(mut self, config: Arc<crate::embedding::EmbeddingConfig>) -> Self {
499        self.embedding_config = Some(config);
500        self
501    }
502
503    /// The injected embedding config, if any (see the `embedding_config` field).
504    pub(crate) fn embedding_config_ref(&self) -> Option<&crate::embedding::EmbeddingConfig> {
505        self.embedding_config.as_deref()
506    }
507
508    /// Engine-layer policy enforcement gate (MR-722 chassis core).
509    ///
510    /// * If no policy is installed → no-op (returns `Ok(())`).
511    /// * If policy is installed AND actor is None → denial with a
512    ///   clear "no actor for engine-layer policy check" message.
513    ///   Forces server / CLI / SDK callers to thread an actor through
514    ///   when policy is configured — silent bypass via "I forgot the
515    ///   actor" is exactly the footgun this gate is here to prevent.
516    /// * If policy is installed AND actor is Some → call
517    ///   `PolicyChecker::check(action, scope, actor)`; map denial /
518    ///   internal failure to `OmniError::Policy(...)`.
519    pub(crate) fn enforce(
520        &self,
521        action: omnigraph_policy::PolicyAction,
522        scope: &omnigraph_policy::ResourceScope,
523        actor: Option<&str>,
524    ) -> Result<()> {
525        let Some(checker) = self.policy.as_ref() else {
526            return Ok(());
527        };
528        let Some(actor) = actor else {
529            return Err(OmniError::Policy(
530                "no actor for engine-layer policy check (policy is configured but the call site \
531                 didn't thread an actor through — this is almost certainly a bug, not an \
532                 intended bypass)"
533                    .to_string(),
534            ));
535        };
536        checker
537            .check(action, scope, actor)
538            .map_err(|err| OmniError::Policy(err.to_string()))
539    }
540
541    pub(crate) async fn ensure_schema_state_valid(&self) -> Result<()> {
542        validate_schema_contract(self.uri(), Arc::clone(&self.storage)).await
543    }
544
545    pub async fn plan_schema(&self, desired_schema_source: &str) -> Result<SchemaMigrationPlan> {
546        self.plan_schema_with_options(desired_schema_source, SchemaApplyOptions::default())
547            .await
548    }
549
550    pub async fn plan_schema_with_options(
551        &self,
552        desired_schema_source: &str,
553        options: SchemaApplyOptions,
554    ) -> Result<SchemaMigrationPlan> {
555        schema_apply::plan_schema(self, desired_schema_source, options).await
556    }
557
558    pub async fn preview_schema_apply_with_options(
559        &self,
560        desired_schema_source: &str,
561        options: SchemaApplyOptions,
562    ) -> Result<SchemaApplyPreview> {
563        schema_apply::preview_schema_apply(self, desired_schema_source, options).await
564    }
565
566    pub async fn apply_schema(&self, desired_schema_source: &str) -> Result<SchemaApplyResult> {
567        self.apply_schema_as(desired_schema_source, SchemaApplyOptions::default(), None)
568            .await
569    }
570
571    pub async fn apply_schema_with_options(
572        &self,
573        desired_schema_source: &str,
574        options: SchemaApplyOptions,
575    ) -> Result<SchemaApplyResult> {
576        self.apply_schema_as(desired_schema_source, options, None)
577            .await
578    }
579
580    /// Apply a schema migration with an explicit actor for engine-layer
581    /// policy enforcement (MR-722). When a `PolicyChecker` is installed
582    /// via [`Self::with_policy`], this method calls `enforce(SchemaApply,
583    /// Branch("main"), actor)` before any apply work happens. Denial
584    /// returns `OmniError::Policy` and leaves the manifest untouched.
585    ///
586    /// The no-actor variants (`apply_schema`, `apply_schema_with_options`)
587    /// pass `None` here. They work fine without a policy; if a policy IS
588    /// installed and actor is None, enforcement intentionally fails to
589    /// prevent silent-bypass-via-forgetting-the-actor footguns.
590    pub async fn apply_schema_as(
591        &self,
592        desired_schema_source: &str,
593        options: SchemaApplyOptions,
594        actor: Option<&str>,
595    ) -> Result<SchemaApplyResult> {
596        self.apply_schema_as_with_catalog_check(desired_schema_source, options, actor, |_| Ok(()))
597            .await
598    }
599
600    pub async fn apply_schema_as_with_catalog_check<F>(
601        &self,
602        desired_schema_source: &str,
603        options: SchemaApplyOptions,
604        actor: Option<&str>,
605        validate_catalog: F,
606    ) -> Result<SchemaApplyResult>
607    where
608        F: FnOnce(&Catalog) -> Result<()>,
609    {
610        schema_apply::apply_schema(
611            self,
612            desired_schema_source,
613            options,
614            actor,
615            validate_catalog,
616        )
617        .await
618    }
619
620    pub(crate) async fn ensure_schema_apply_idle(&self, operation: &str) -> Result<()> {
621        schema_apply::ensure_schema_apply_idle(self, operation).await
622    }
623
624    async fn ensure_schema_apply_not_locked(&self, operation: &str) -> Result<()> {
625        schema_apply::ensure_schema_apply_not_locked(self, operation).await
626    }
627
628    /// Engine-facing trait surface around `TableStore`.
629    ///
630    /// This is the **only** accessor for engine code reaching into the
631    /// storage layer. The trait's signatures use opaque `SnapshotHandle`
632    /// / `StagedHandle` instead of leaking `lance::Dataset` /
633    /// `lance::dataset::transaction::Transaction`, so newly-added engine
634    /// call sites cannot drift the staged-write invariant by mistake
635    /// (the trait's `stage_*` + `commit_staged` pair is the only way to
636    /// land a write).
637    pub(crate) fn storage(&self) -> &dyn crate::storage_layer::TableStorage {
638        &self.table_store
639    }
640
641    /// Inline-commit residual surface (`delete_where`,
642    /// `create_vector_index`) — the writes Lance cannot yet express as a
643    /// stage-then-commit pair. Deliberately separate from [`Self::storage`] so
644    /// the default storage surface is staged-only and a new writer cannot couple
645    /// "write bytes" with "advance HEAD" by reaching for `db.storage()`. Only
646    /// the handful of documented residual call sites (mutation/merge deletes,
647    /// vector-index build) use this accessor. See
648    /// `crate::storage_layer::InlineCommitResidual` for the per-method blocker.
649    pub(crate) fn storage_inline_residual(
650        &self,
651    ) -> &dyn crate::storage_layer::InlineCommitResidual {
652        &self.table_store
653    }
654
655    /// Engine-level access to the object-store adapter (S3 / local fs).
656    /// Used by the recovery sidecar protocol — writers in the engine
657    /// call this to write/delete sidecars at `__recovery/{ulid}.json`.
658    pub(crate) fn storage_adapter(&self) -> &dyn crate::storage::StorageAdapter {
659        self.storage.as_ref()
660    }
661
662    /// Per-`(table_key, branch)` writer queues.
663    ///
664    /// Engine-internal writers (mutation finalize, schema_apply,
665    /// branch_merge, ensure_indices, delete_where) and the future MR-870
666    /// recovery reconciler reach the queue manager via this accessor.
667    /// Returns an `Arc` clone so callers can hold the manager across
668    /// `&mut self` engine API boundaries.
669    pub(crate) fn write_queue(&self) -> Arc<crate::db::write_queue::WriteQueueManager> {
670        Arc::clone(&self.write_queue)
671    }
672
673    /// Engine-internal access to the merge-exclusive mutex. Held across
674    /// the swap → operate → restore window in `branch_merge_impl` so
675    /// concurrent merges with distinct targets don't corrupt
676    /// `self.coordinator` mid-operation. See the field doc on
677    /// `Omnigraph::merge_exclusive` for the full design rationale.
678    pub(crate) fn merge_exclusive(&self) -> Arc<tokio::sync::Mutex<()>> {
679        Arc::clone(&self.merge_exclusive)
680    }
681
682    /// Engine-level access to the graph's normalized root URI. Used by
683    /// the recovery sidecar protocol to compute `__recovery/` paths.
684    pub(crate) fn root_uri(&self) -> &str {
685        &self.root_uri
686    }
687
688    pub(crate) async fn open_coordinator_for_branch(
689        &self,
690        branch: Option<&str>,
691    ) -> Result<GraphCoordinator> {
692        match branch {
693            Some(branch) => {
694                GraphCoordinator::open_branch(self.uri(), branch, Arc::clone(&self.storage)).await
695            }
696            None => GraphCoordinator::open(self.uri(), Arc::clone(&self.storage)).await,
697        }
698    }
699
700    pub(crate) async fn swap_coordinator_for_branch(
701        &self,
702        branch: Option<&str>,
703    ) -> Result<GraphCoordinator> {
704        let next = self.open_coordinator_for_branch(branch).await?;
705        let mut coord = self.coordinator.write().await;
706        Ok(std::mem::replace(&mut *coord, next))
707    }
708
709    pub(crate) async fn restore_coordinator(&self, coordinator: GraphCoordinator) {
710        *self.coordinator.write().await = coordinator;
711    }
712
713    pub(crate) async fn resolved_branch_target(
714        &self,
715        branch: Option<&str>,
716    ) -> Result<ResolvedTarget> {
717        self.ensure_schema_state_valid().await?;
718        let requested = ReadTarget::Branch(branch.unwrap_or("main").to_string());
719        let normalized = normalize_branch_name(branch.unwrap_or("main"))?;
720        let coord = self.coordinator.read().await;
721        if normalized.as_deref() == coord.current_branch() {
722            let snapshot_id = coord
723                .head_commit_id()
724                .await?
725                .unwrap_or_else(|| SnapshotId::synthetic(coord.current_branch(), coord.version()));
726            return Ok(ResolvedTarget {
727                requested,
728                branch: coord.current_branch().map(str::to_string),
729                snapshot_id,
730                snapshot: coord.snapshot(),
731            });
732        }
733        coord.resolve_target(&requested).await
734    }
735
736    pub(crate) async fn snapshot_for_branch(&self, branch: Option<&str>) -> Result<Snapshot> {
737        self.resolved_branch_target(branch)
738            .await
739            .map(|resolved| resolved.snapshot)
740    }
741
742    pub(crate) async fn fresh_snapshot_for_branch(&self, branch: Option<&str>) -> Result<Snapshot> {
743        self.ensure_schema_state_valid().await?;
744        let requested = ReadTarget::Branch(branch.unwrap_or("main").to_string());
745        let coord = self.coordinator.read().await;
746        coord
747            .resolve_target(&requested)
748            .await
749            .map(|resolved| resolved.snapshot)
750    }
751
752    pub(crate) async fn version(&self) -> u64 {
753        self.coordinator.read().await.version()
754    }
755
756    /// Return an immutable Snapshot from the known manifest state. No storage I/O.
757    pub(crate) async fn snapshot(&self) -> Snapshot {
758        self.coordinator.read().await.snapshot()
759    }
760
761    pub async fn snapshot_of(&self, target: impl Into<ReadTarget>) -> Result<Snapshot> {
762        self.resolved_target(target)
763            .await
764            .map(|resolved| resolved.snapshot)
765    }
766
767    pub async fn version_of(&self, target: impl Into<ReadTarget>) -> Result<u64> {
768        self.snapshot_of(target)
769            .await
770            .map(|snapshot| snapshot.version())
771    }
772
773    pub async fn resolved_branch_of(
774        &self,
775        target: impl Into<ReadTarget>,
776    ) -> Result<Option<String>> {
777        self.resolved_target(target)
778            .await
779            .map(|resolved| resolved.branch)
780    }
781
782    /// Synchronize this handle's write base to the latest head of the named branch.
783    pub async fn sync_branch(&self, branch: &str) -> Result<()> {
784        self.ensure_schema_state_valid().await?;
785        let branch = normalize_branch_name(branch)?;
786        let next = self.open_coordinator_for_branch(branch.as_deref()).await?;
787        *self.coordinator.write().await = next;
788        self.runtime_cache.invalidate_all().await;
789        Ok(())
790    }
791
    /// Re-read the handle-local coordinator state from storage AND run
    /// in-process recovery. Closes the Phase B → Phase C residual (e.g.
    /// `MutationStaging::finalize` crash mid-publish in a long-running
    /// server) without restart.
    ///
    /// Composition mirrors `Omnigraph::open_with_storage_and_mode`'s
    /// recovery sequence, in the same order, with one restriction: the
    /// manifest-drift heal runs in `RollForwardOnly` mode (rollback /
    /// abort cases defer to the next ReadWrite open because
    /// `Dataset::restore` is unsafe under concurrency). Each step:
    ///
    /// 1. `coordinator.refresh()` — re-read manifest. In this function the
    ///    standalone refresh (and step 2) run only when `list_sidecars`
    ///    returns empty; otherwise the heal in step 3 owns the work.
    /// 2. `recover_schema_state_files` — complete an in-flight
    ///    schema_apply's staging→final rename if a SchemaApply sidecar
    ///    is on disk; idempotent + early-returns when no staging files
    ///    exist. Required BEFORE manifest-drift recovery so a
    ///    SchemaApply roll-forward doesn't publish the manifest while
    ///    the staging files remain unrenamed (which would corrupt the
    ///    graph: data on new schema, catalog on old).
    /// 3. `heal_pending_sidecars_roll_forward` — close the
    ///    finalize→publisher residual via roll-forward; defer rollback
    ///    work to next ReadWrite open. Serializes against live writers
    ///    by acquiring each sidecar's per-(table_key, branch) write
    ///    queues, so refresh never rolls forward an in-flight writer's
    ///    sidecar from under it.
    /// 4. `reload_schema_if_source_changed` — rebuild the in-memory
    ///    catalog when the on-disk schema source differs from the cached
    ///    one.
    /// 5. `runtime_cache.invalidate_all` — drop stale per-snapshot caches.
    ///
    /// Steady state cost: one `list_dir` of `__recovery/` (typically
    /// returns empty → early return for both passes). No additional
    /// Lance reads.
    ///
    /// The staged-write entry points (`load_as`, `mutate_as`) run the
    /// same heal via
    /// [`heal_pending_recovery_sidecars`](Self::heal_pending_recovery_sidecars),
    /// so a long-lived server converges on the next write without an
    /// explicit refresh. Engine-internal callers that already hold an
    /// in-flight sidecar (e.g. `schema_apply` mid-write) MUST use
    /// [`refresh_coordinator_only`](Self::refresh_coordinator_only) to
    /// avoid the recovery sweep racing their own sidecar.
    ///
    /// # Errors
    ///
    /// Propagates the first error from listing sidecars, refreshing the
    /// coordinator, recovering schema state files, the roll-forward heal,
    /// or the schema reload. The runtime cache is only invalidated when
    /// every earlier step succeeded.
    pub async fn refresh(&self) -> Result<()> {
        // Standalone schema-staging reconcile ONLY when no recovery
        // sidecar exists (legacy/manual staging residue). When sidecars
        // exist, the heal below owns the reconcile — per SchemaApply
        // sidecar, under that sidecar's queue guards — because an
        // unserialized reconcile can promote a LIVE schema apply's
        // staging files from under it, and a pre-promoted result would
        // make the heal's own guarded reconcile see clean staging and
        // wrongly defer the sidecar. The no-sidecar case cannot race a
        // live apply: its sidecar is on disk before its staging files.
        //
        // Scope the coord write guard to the schema-state section only.
        // `reload_schema_if_source_changed` (below) acquires
        // `self.coordinator.read().await` when the on-disk schema source
        // has drifted from the cached `schema_source`. Tokio's RwLock is
        // not reentrant, so holding the write across that call deadlocks.
        // Pinned by `composite_flow_schema_apply_then_branch_ops_no_deadlock_in_refresh`.
        // The heal also takes the lock itself (queues → coordinator
        // order), so it must run after this guard is released.
        {
            // Hold the schema-apply serialization key across the
            // list-then-reconcile pair: without it, a live apply can
            // write its sidecar + staging between the empty check and
            // the reconcile (the same race, through a smaller window).
            // Queue before coordinator — the documented lock order.
            //
            // Liveness note: with a pending NON-SchemaApply sidecar
            // (e.g. a Mutation residual), this gate skips the standalone
            // reconcile and the heal below reconciles only per
            // SchemaApply sidecar — so pre-sidecar-era orphaned staging
            // residue waits for the NEXT refresh after the sidecars are
            // consumed. Convergence holds, one pass late. Do not "fix"
            // by re-running the reconcile unserialized here: that is
            // exactly the live-apply race this block exists to close.
            let _serial = self
                .write_queue
                .acquire(&crate::db::manifest::schema_apply_serial_queue_key())
                .await;
            if crate::db::manifest::list_sidecars(&self.root_uri, self.storage.as_ref())
                .await?
                .is_empty()
            {
                // No sidecars: refresh the manifest, then reconcile schema
                // staging against the refreshed snapshot, all under the
                // coordinator write guard.
                let mut coord = self.coordinator.write().await;
                coord.refresh().await?;
                recover_schema_state_files(
                    &self.root_uri,
                    Arc::clone(&self.storage),
                    &coord.snapshot(),
                )
                .await?;
            }
        } // ← guards released before the heal's queue acquisition
        // The heal's return value is not needed here: the schema reload and
        // cache invalidation below run unconditionally.
        crate::db::manifest::heal_pending_sidecars_roll_forward(
            &self.root_uri,
            Arc::clone(&self.storage),
            &self.coordinator,
            &self.write_queue,
        )
        .await?;
        self.reload_schema_if_source_changed().await?;
        self.runtime_cache.invalidate_all().await;
        Ok(())
    }
894
895    /// Write-entry heal: converge any pending recovery sidecars (a
896    /// previously failed writer's Phase B → Phase C residual) before
897    /// starting a new staged write, so a long-lived process (the HTTP
898    /// server, an embedded handle) recovers on its next write instead
899    /// of wedging every write on the commit-time drift guard until
900    /// restart. Roll-forward only; rollback-eligible sidecars defer to
901    /// the next ReadWrite open exactly as [`refresh`](Self::refresh)
902    /// does.
903    ///
904    /// Steady-state cost: one `list_dir` of `__recovery/` (typically
905    /// empty → immediate return). See
906    /// `recovery::heal_pending_sidecars_roll_forward` for the
907    /// concurrency contract (per-table write-queue acquisition).
908    pub(crate) async fn heal_pending_recovery_sidecars(&self) -> Result<()> {
909        let processed = crate::db::manifest::heal_pending_sidecars_roll_forward(
910            &self.root_uri,
911            Arc::clone(&self.storage),
912            &self.coordinator,
913            &self.write_queue,
914        )
915        .await?;
916        if processed {
917            // A rolled-forward SchemaApply sidecar moved disk + manifest
918            // to the new schema (staging promoted, registrations
919            // published); the in-memory catalog must follow or the very
920            // write that triggered the heal validates against the stale
921            // schema. Same post-heal step as `refresh`.
922            self.reload_schema_if_source_changed().await?;
923            self.runtime_cache.invalidate_all().await;
924        }
925        Ok(())
926    }
927
928    async fn reload_schema_if_source_changed(&self) -> Result<()> {
929        let schema_path = schema_source_uri(&self.root_uri);
930        let schema_source = self.storage.read_text(&schema_path).await?;
931        if schema_source == *self.schema_source.load_full() {
932            return Ok(());
933        }
934        let current_source_ir = read_schema_ir_from_source(&schema_source)?;
935        let branches = self.coordinator.read().await.branch_list().await?;
936        let (accepted_ir, _) = load_or_bootstrap_schema_contract(
937            &self.root_uri,
938            Arc::clone(&self.storage),
939            &branches,
940            &current_source_ir,
941        )
942        .await?;
943        let mut catalog = build_catalog_from_ir(&accepted_ir)?;
944        fixup_blob_schemas(&mut catalog);
945        self.store_schema_source(schema_source);
946        self.store_catalog(catalog);
947        Ok(())
948    }
949
950    /// Refresh coordinator state and invalidate the runtime cache WITHOUT
951    /// running the recovery sweep. Engine-internal callers that hold an
952    /// in-flight sidecar (e.g. `schema_apply::apply_schema_with_lock`'s
953    /// internal lease-check refresh) need this variant: running recovery
954    /// here would observe the caller's own sidecar, classify it as
955    /// RolledPastExpected, and roll it forward — racing the caller's
956    /// own publish path.
957    pub(crate) async fn refresh_coordinator_only(&self) -> Result<()> {
958        self.coordinator.write().await.refresh().await?;
959        self.runtime_cache.invalidate_all().await;
960        Ok(())
961    }
962
963    pub async fn resolve_snapshot(&self, branch: &str) -> Result<SnapshotId> {
964        self.ensure_schema_state_valid().await?;
965        self.coordinator
966            .read()
967            .await
968            .resolve_snapshot_id(branch)
969            .await
970    }
971
972    pub(crate) async fn resolved_target(
973        &self,
974        target: impl Into<ReadTarget>,
975    ) -> Result<ResolvedTarget> {
976        self.ensure_schema_state_valid().await?;
977        self.coordinator
978            .read()
979            .await
980            .resolve_target(&target.into())
981            .await
982    }
983
984    // ─── Change detection ────────────────────────────────────────────────
985
986    pub async fn diff_between(
987        &self,
988        from: impl Into<ReadTarget>,
989        to: impl Into<ReadTarget>,
990        filter: &crate::changes::ChangeFilter,
991    ) -> Result<crate::changes::ChangeSet> {
992        let from_resolved = self.resolved_target(from).await?;
993        let to_resolved = self.resolved_target(to).await?;
994        crate::changes::diff_snapshots(
995            self.uri(),
996            &from_resolved.snapshot,
997            &to_resolved.snapshot,
998            filter,
999            to_resolved.branch.clone().or(from_resolved.branch.clone()),
1000        )
1001        .await
1002    }
1003
1004    /// Diff two graph commits. Resolves each commit to `(manifest_branch, manifest_version)`
1005    /// and creates branch-aware snapshots. Supports cross-branch comparison.
1006    pub async fn diff_commits(
1007        &self,
1008        from_commit_id: &str,
1009        to_commit_id: &str,
1010        filter: &crate::changes::ChangeFilter,
1011    ) -> Result<crate::changes::ChangeSet> {
1012        let coord = self.coordinator.read().await;
1013        let from_commit = coord
1014            .resolve_commit(&SnapshotId::new(from_commit_id))
1015            .await?;
1016        let to_commit = coord.resolve_commit(&SnapshotId::new(to_commit_id)).await?;
1017        let from_snap = coord
1018            .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
1019                from_commit.graph_commit_id.clone(),
1020            )))
1021            .await?;
1022        let to_snap = coord
1023            .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
1024                to_commit.graph_commit_id.clone(),
1025            )))
1026            .await?;
1027        drop(coord);
1028        crate::changes::diff_snapshots(
1029            self.uri(),
1030            &from_snap.snapshot,
1031            &to_snap.snapshot,
1032            filter,
1033            to_snap.branch.clone().or(from_snap.branch.clone()),
1034        )
1035        .await
1036    }
1037
1038    pub async fn entity_at_target(
1039        &self,
1040        target: impl Into<ReadTarget>,
1041        table_key: &str,
1042        id: &str,
1043    ) -> Result<Option<serde_json::Value>> {
1044        export::entity_at_target(self, target, table_key, id).await
1045    }
1046
1047    /// Read one entity at a specific manifest version via time travel (on-demand enrichment).
1048    pub async fn entity_at(
1049        &self,
1050        table_key: &str,
1051        id: &str,
1052        version: u64,
1053    ) -> Result<Option<serde_json::Value>> {
1054        export::entity_at(self, table_key, id, version).await
1055    }
1056
1057    /// Create a Snapshot at any historical manifest version.
1058    pub async fn snapshot_at_version(&self, version: u64) -> Result<Snapshot> {
1059        self.ensure_schema_state_valid().await?;
1060        self.coordinator
1061            .read()
1062            .await
1063            .snapshot_at_version(version)
1064            .await
1065    }
1066
1067    pub async fn export_jsonl(
1068        &self,
1069        branch: &str,
1070        type_names: &[String],
1071        table_keys: &[String],
1072    ) -> Result<String> {
1073        export::export_jsonl(self, branch, type_names, table_keys).await
1074    }
1075
1076    pub async fn export_jsonl_to_writer<W: Write>(
1077        &self,
1078        branch: &str,
1079        type_names: &[String],
1080        table_keys: &[String],
1081        writer: &mut W,
1082    ) -> Result<()> {
1083        export::export_jsonl_to_writer(self, branch, type_names, table_keys, writer).await
1084    }
1085
1086    // ─── Graph index ──────────────────────────────────────────────────────
1087
1088    /// Get or build the graph index for the current snapshot.
1089    pub async fn graph_index(&self) -> Result<Arc<crate::graph_index::GraphIndex>> {
1090        table_ops::graph_index(self).await
1091    }
1092
1093    pub(crate) async fn graph_index_for_resolved(
1094        &self,
1095        resolved: &ResolvedTarget,
1096    ) -> Result<Arc<crate::graph_index::GraphIndex>> {
1097        table_ops::graph_index_for_resolved(self, resolved).await
1098    }
1099
1100    /// Ensure BTree scalar indices exist on key columns.
1101    /// Idempotent — Lance skips if index already exists.
1102    ///
1103    /// Opens sub-tables at their latest version (not snapshot-pinned) because
1104    /// indices must be created on the current head. Any version drift from the
1105    /// snapshot is expected and logged. The resulting versions are committed
1106    /// back to the manifest.
1107    ///
1108    /// On named branches, indexing preserves lazy branching:
1109    /// unbranched subtables keep inheriting `main`, while subtables inherited
1110    /// from an ancestor branch are first forked into the active branch before
1111    /// their index metadata is updated.
1112    /// Returns the declared indexes that could not be materialized on this
1113    /// pass (today: vector columns with no trainable vectors yet). They are
1114    /// deferred, not errors; a later `ensure_indices`/`optimize` builds them
1115    /// once the column is trainable. Reads stay correct (brute-force) meanwhile.
1116    pub async fn ensure_indices(&self) -> Result<Vec<PendingIndex>> {
1117        table_ops::ensure_indices(self).await
1118    }
1119
1120    pub async fn ensure_indices_on(&self, branch: &str) -> Result<Vec<PendingIndex>> {
1121        table_ops::ensure_indices_on(self, branch).await
1122    }
1123
1124    #[cfg(feature = "failpoints")]
1125    #[doc(hidden)]
1126    pub async fn failpoint_publish_table_head_without_index_rebuild_for_test(
1127        &mut self,
1128        branch: &str,
1129        table_key: &str,
1130        table_branch: Option<&str>,
1131    ) -> Result<u64> {
1132        table_ops::failpoint_publish_table_head_without_index_rebuild_for_test(
1133            self,
1134            branch,
1135            table_key,
1136            table_branch,
1137        )
1138        .await
1139    }
1140
1141    /// Compact small Lance fragments into fewer larger ones across every
1142    /// node + edge table on `main`. See [`optimize`] for details.
1143    pub async fn optimize(&self) -> Result<Vec<optimize::TableOptimizeStats>> {
1144        optimize::optimize_all_tables(self).await
1145    }
1146
1147    /// Classify and explicitly repair uncovered manifest/head drift. See
1148    /// [`repair`] for the distinction between safe maintenance drift and
1149    /// suspicious/unverifiable drift.
1150    pub async fn repair(&self, options: repair::RepairOptions) -> Result<repair::RepairStats> {
1151        repair::repair_all_tables(self, options).await
1152    }
1153
1154    /// Remove Lance manifests (and the fragments they uniquely own) per the
1155    /// given [`optimize::CleanupPolicyOptions`]. Destructive to version
1156    /// history. See [`optimize`] for details.
1157    pub async fn cleanup(
1158        &mut self,
1159        options: optimize::CleanupPolicyOptions,
1160    ) -> Result<Vec<optimize::TableCleanupStats>> {
1161        optimize::cleanup_all_tables(self, options).await
1162    }
1163
1164    /// Read a blob from a node by its string ID and property name.
1165    ///
1166    /// Returns a `BlobFile` handle with async `read()`, `seek()`, `tell()`,
1167    /// and metadata accessors (`size()`, `kind()`, `uri()`).
1168    ///
1169    /// ```ignore
1170    /// let blob = db.read_blob("Document", "readme", "content").await?;
1171    /// let bytes = blob.read().await?;
1172    /// ```
1173    pub async fn read_blob(&self, type_name: &str, id: &str, property: &str) -> Result<BlobFile> {
1174        self.ensure_schema_state_valid().await?;
1175        let catalog = self.catalog();
1176        let node_type = catalog
1177            .node_types
1178            .get(type_name)
1179            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
1180        if !node_type.blob_properties.contains(property) {
1181            return Err(OmniError::manifest(format!(
1182                "property '{}' on type '{}' is not a Blob",
1183                property, type_name
1184            )));
1185        }
1186
1187        let snapshot = self.snapshot().await;
1188        let table_key = format!("node:{}", type_name);
1189        let handle = self
1190            .storage()
1191            .open_snapshot_at_table(&snapshot, &table_key)
1192            .await?;
1193
1194        let filter_sql = format!("id = '{}'", id.replace('\'', "''"));
1195        let row_id = self
1196            .storage()
1197            .first_row_id_for_filter(&handle, &filter_sql)
1198            .await?
1199            .ok_or_else(|| {
1200                OmniError::manifest(format!("no {} with id '{}' found", type_name, id))
1201            })?;
1202
1203        // `take_blobs` is a Lance-specific blob accessor not surfaced
1204        // through the `TableStorage` trait — reach the inner `Arc<Dataset>`
1205        // via the `pub(crate)` accessor for this read-only call.
1206        let ds = handle.into_arc();
1207        let mut blobs = ds
1208            .take_blobs(&[row_id], property)
1209            .await
1210            .map_err(|e| OmniError::Lance(e.to_string()))?;
1211
1212        blobs.pop().ok_or_else(|| {
1213            OmniError::manifest(format!(
1214                "blob '{}' on {} '{}' returned no data",
1215                property, type_name, id
1216            ))
1217        })
1218    }
1219
1220    pub(crate) async fn active_branch(&self) -> Option<String> {
1221        self.coordinator
1222            .read()
1223            .await
1224            .current_branch()
1225            .map(str::to_string)
1226    }
1227
1228    async fn ensure_branch_delete_safe(&self, branch: &str, branches: &[String]) -> Result<()> {
1229        let descendants = self
1230            .coordinator
1231            .read()
1232            .await
1233            .branch_descendants(branch)
1234            .await?;
1235        if let Some(descendant) = descendants.first() {
1236            return Err(OmniError::manifest_conflict(format!(
1237                "cannot delete branch '{}' because descendant branch '{}' still depends on it",
1238                branch, descendant
1239            )));
1240        }
1241
1242        for other_branch in branches
1243            .iter()
1244            .filter(|candidate| candidate.as_str() != branch)
1245        {
1246            let snapshot = self
1247                .snapshot_of(ReadTarget::branch(other_branch.as_str()))
1248                .await?;
1249            if snapshot
1250                .entries()
1251                .any(|entry| entry.table_branch.as_deref() == Some(branch))
1252            {
1253                return Err(OmniError::manifest_conflict(format!(
1254                    "cannot delete branch '{}' because branch '{}' still depends on it",
1255                    branch, other_branch
1256                )));
1257            }
1258        }
1259
1260        Ok(())
1261    }
1262
1263    /// Best-effort reclaim of the per-table Lance forks a just-deleted branch
1264    /// owned. Runs AFTER the manifest authority flip, so the branch is already
1265    /// gone and these forks are unreachable orphans. A failure here (transient
1266    /// object-store error, the `branch_delete.before_table_cleanup` failpoint)
1267    /// is logged and swallowed: the `cleanup` reconciler is the guaranteed
1268    /// backstop that converges any leftover orphan. Uses `force_delete_branch`
1269    /// so a partially-reclaimed retry is idempotent.
1270    async fn cleanup_deleted_branch_tables(&self, branch: &str, owned_tables: &[(String, String)]) {
1271        let mut seen_paths = HashSet::new();
1272        let mut cleanup_targets = owned_tables
1273            .iter()
1274            .filter(|(_, table_path)| seen_paths.insert(table_path.clone()))
1275            .cloned()
1276            .collect::<Vec<_>>();
1277        cleanup_targets.sort_by(|left, right| left.0.cmp(&right.0));
1278
1279        for (table_key, table_path) in cleanup_targets {
1280            let dataset_uri = self.storage().dataset_uri(&table_path);
1281            let outcome = match crate::failpoints::maybe_fail("branch_delete.before_table_cleanup")
1282            {
1283                Ok(()) => {
1284                    self.storage()
1285                        .force_delete_branch(&dataset_uri, branch)
1286                        .await
1287                }
1288                Err(injected) => Err(injected),
1289            };
1290            if let Err(err) = outcome {
1291                tracing::warn!(
1292                    target: "omnigraph::branch_delete::cleanup",
1293                    branch = %branch,
1294                    table = %table_key,
1295                    error = %err,
1296                    "best-effort fork reclaim failed; cleanup will reconcile the orphan",
1297                );
1298            }
1299        }
1300    }
1301
1302    async fn delete_branch_storage_only(&self, branch: &str) -> Result<()> {
1303        let active = self
1304            .coordinator
1305            .read()
1306            .await
1307            .current_branch()
1308            .map(str::to_string);
1309        if active.as_deref() == Some(branch) {
1310            return Err(OmniError::manifest_conflict(format!(
1311                "cannot delete currently active branch '{}'",
1312                branch
1313            )));
1314        }
1315
1316        let branch_snapshot = self.snapshot_of(ReadTarget::branch(branch)).await?;
1317        let owned_tables = branch_snapshot
1318            .entries()
1319            .filter(|entry| entry.table_branch.as_deref() == Some(branch))
1320            .map(|entry| (entry.table_key.clone(), entry.table_path.clone()))
1321            .collect::<Vec<_>>();
1322
1323        // Authority flip (+ best-effort commit-graph reclaim) — must succeed.
1324        self.coordinator.write().await.branch_delete(branch).await?;
1325        // Best-effort per-table fork reclaim; cleanup reconciles any leftover.
1326        self.cleanup_deleted_branch_tables(branch, &owned_tables)
1327            .await;
1328        Ok(())
1329    }
1330
1331    pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
1332        normalize_branch_name(branch)
1333    }
1334
1335    pub(crate) async fn head_commit_id_for_branch(
1336        &self,
1337        branch: Option<&str>,
1338    ) -> Result<Option<String>> {
1339        let mut coordinator = self.open_coordinator_for_branch(branch).await?;
1340        coordinator.ensure_commit_graph_initialized().await?;
1341        coordinator
1342            .head_commit_id()
1343            .await
1344            .map(|id| id.map(|snapshot_id| snapshot_id.as_str().to_string()))
1345    }
1346
1347    pub async fn branch_create(&self, name: &str) -> Result<()> {
1348        self.branch_create_as(name, None).await
1349    }
1350
1351    /// Create a branch from the coordinator's currently-open snapshot,
1352    /// with an explicit actor for engine-layer policy enforcement
1353    /// (MR-722 fan-out). Scope is `TargetBranch(name)` — symmetric with
1354    /// `branch_delete_as`: the branch being acted upon is the target.
1355    /// Cedar rules using `target_branch_scope: protected` therefore see
1356    /// the new-branch name and can deny e.g. creating any branch named
1357    /// `main` from a non-privileged actor.
1358    pub async fn branch_create_as(&self, name: &str, actor: Option<&str>) -> Result<()> {
1359        self.enforce(
1360            omnigraph_policy::PolicyAction::BranchCreate,
1361            &omnigraph_policy::ResourceScope::TargetBranch(name.to_string()),
1362            actor,
1363        )?;
1364        self.ensure_schema_state_valid().await?;
1365        self.ensure_schema_apply_idle("branch_create").await?;
1366        ensure_public_branch_ref(name, "branch_create")?;
1367        self.coordinator.write().await.branch_create(name).await
1368    }
1369
1370    pub async fn branch_create_from(&self, from: impl Into<ReadTarget>, name: &str) -> Result<()> {
1371        self.branch_create_from_as(from, name, None).await
1372    }
1373
1374    /// Create a branch from a specific source branch with an explicit
1375    /// actor for engine-layer policy enforcement (MR-722 fan-out).
1376    ///
1377    /// Scope is `BranchTransition { source, target }` — matches the
1378    /// HTTP-layer convention at `server_branch_create`
1379    /// (branch=Some(from), target_branch=Some(name)), so engine and
1380    /// HTTP fire the same Cedar decision. Pinned-snapshot sources
1381    /// (which aren't a branch ref) materialize as the sentinel
1382    /// `<snapshot>` for the policy check; Cedar rules using
1383    /// `branch_scope: any` still match, rules pinning a specific
1384    /// source branch correctly do not.
1385    pub async fn branch_create_from_as(
1386        &self,
1387        from: impl Into<ReadTarget>,
1388        name: &str,
1389        actor: Option<&str>,
1390    ) -> Result<()> {
1391        let target = from.into();
1392        let source_branch = match &target {
1393            ReadTarget::Branch(b) => b.clone(),
1394            _ => "<snapshot>".to_string(),
1395        };
1396        self.enforce(
1397            omnigraph_policy::PolicyAction::BranchCreate,
1398            &omnigraph_policy::ResourceScope::BranchTransition {
1399                source: source_branch,
1400                target: name.to_string(),
1401            },
1402            actor,
1403        )?;
1404        self.ensure_schema_apply_idle("branch_create_from").await?;
1405        self.branch_create_from_impl(target, name, false).await
1406    }
1407
1408    async fn branch_create_from_impl(
1409        &self,
1410        from: impl Into<ReadTarget>,
1411        name: &str,
1412        allow_internal_refs: bool,
1413    ) -> Result<()> {
1414        let target = from.into();
1415        let ReadTarget::Branch(branch_name) = target else {
1416            return Err(OmniError::manifest(
1417                "branch creation from pinned snapshots is not supported yet".to_string(),
1418            ));
1419        };
1420        if !allow_internal_refs {
1421            ensure_public_branch_ref(&branch_name, "branch_create_from")?;
1422            ensure_public_branch_ref(name, "branch_create_from")?;
1423        }
1424        let branch = normalize_branch_name(&branch_name)?;
1425        // Operate on a freshly-opened source coordinator that's owned locally
1426        // — never touch `self.coordinator`. The pre-fix implementation used
1427        // `swap_coordinator_for_branch` + operate + `restore_coordinator` as
1428        // three separate `coordinator.write().await` acquisitions; under
1429        // `&self` concurrency, a second `branch_create_from` could swap
1430        // self.coordinator between this caller's swap and operate steps,
1431        // making the operate run against the wrong source branch and
1432        // forking off the wrong HEAD. Pinned by
1433        // `concurrent_branch_create_from_distinct_parents_does_not_corrupt_coordinator`
1434        // in `crates/omnigraph-server/tests/server.rs`.
1435        //
1436        // `branch_create` mutates only the local coord's commit-graph cache;
1437        // the manifest write is durable on disk regardless of which
1438        // coord-handle issued it. Discarding `source_coord` after the call
1439        // is the right shape — the new branch is reachable from any
1440        // subsequent open of any coord.
1441        let mut source_coord = self.open_coordinator_for_branch(branch.as_deref()).await?;
1442        source_coord.branch_create(name).await
1443    }
1444
1445    pub async fn branch_list(&self) -> Result<Vec<String>> {
1446        self.ensure_schema_state_valid().await?;
1447        self.coordinator.read().await.branch_list().await
1448    }
1449
    /// Delete branch `name` without an explicit actor.
    ///
    /// Convenience wrapper: forwards to `branch_delete_as` with
    /// `actor = None`.
    pub async fn branch_delete(&self, name: &str) -> Result<()> {
        self.branch_delete_as(name, None).await
    }
1453
1454    /// Delete a branch with an explicit actor for engine-layer policy
1455    /// enforcement (MR-722 fan-out). Scope is `TargetBranch(name)` —
1456    /// matches the HTTP-layer convention at `server_branch_delete`
1457    /// (branch=None, target_branch=Some(name)). Cedar rules using
1458    /// `target_branch_scope: protected` therefore correctly gate
1459    /// deletion of protected branches (e.g. deny BranchDelete against
1460    /// `main`).
1461    pub async fn branch_delete_as(&self, name: &str, actor: Option<&str>) -> Result<()> {
1462        self.enforce(
1463            omnigraph_policy::PolicyAction::BranchDelete,
1464            &omnigraph_policy::ResourceScope::TargetBranch(name.to_string()),
1465            actor,
1466        )?;
1467        self.ensure_schema_state_valid().await?;
1468        self.ensure_schema_apply_idle("branch_delete").await?;
1469        ensure_public_branch_ref(name, "branch_delete")?;
1470        self.refresh().await?;
1471        let branch = normalize_branch_name(name)?
1472            .ok_or_else(|| OmniError::manifest("cannot delete branch 'main'".to_string()))?;
1473        let branches = self.coordinator.read().await.branch_list().await?;
1474        if !branches.iter().any(|candidate| candidate == &branch) {
1475            return Err(OmniError::manifest_not_found(format!(
1476                "branch '{}' not found",
1477                branch
1478            )));
1479        }
1480
1481        self.ensure_branch_delete_safe(&branch, &branches).await?;
1482        self.delete_branch_storage_only(&branch).await
1483    }
1484
1485    pub async fn get_commit(&self, commit_id: &str) -> Result<GraphCommit> {
1486        self.ensure_schema_state_valid().await?;
1487        self.coordinator
1488            .read()
1489            .await
1490            .resolve_commit(&SnapshotId::new(commit_id))
1491            .await
1492    }
1493
1494    pub async fn list_commits(&self, branch: Option<&str>) -> Result<Vec<GraphCommit>> {
1495        self.ensure_schema_state_valid().await?;
1496        let branch = match branch {
1497            Some(branch) => normalize_branch_name(branch)?,
1498            None => None,
1499        };
1500        let coordinator = self.open_coordinator_for_branch(branch.as_deref()).await?;
1501        coordinator.list_commits().await
1502    }
1503
    /// Open a sub-table for mutation with version-drift guard.
    ///
    /// Checks that the dataset's current version matches the snapshot-pinned
    /// version. If another writer has advanced the version, returns an error
    /// prompting the caller to refresh and retry (optimistic concurrency).
    ///
    /// Thin wrapper: the work is done by `table_ops::open_for_mutation`,
    /// called with `self`, `table_key`, and `op_kind`.
    // NOTE(review): the `String` / `Option<String>` tuple members are
    // produced by `table_ops`; presumably the table path and table branch —
    // confirm there.
    pub(crate) async fn open_for_mutation(
        &self,
        table_key: &str,
        op_kind: crate::db::MutationOpKind,
    ) -> Result<(SnapshotHandle, String, Option<String>)> {
        table_ops::open_for_mutation(self, table_key, op_kind).await
    }
1516
    /// Branch-explicit variant of `open_for_mutation`.
    ///
    /// Thin wrapper: forwards `self`, `branch`, `table_key`, and `op_kind`
    /// to `table_ops::open_for_mutation_on_branch` and returns its result
    /// unchanged.
    pub(crate) async fn open_for_mutation_on_branch(
        &self,
        branch: Option<&str>,
        table_key: &str,
        op_kind: crate::db::MutationOpKind,
    ) -> Result<(SnapshotHandle, String, Option<String>)> {
        table_ops::open_for_mutation_on_branch(self, branch, table_key, op_kind).await
    }
1525
1526    /// Fork `table_key` onto `active_branch` from the given source state,
1527    /// self-healing a manifest-unreferenced leftover fork if one is in the
1528    /// way. Callers that reach this MUST already hold the per-`(table_key,
1529    /// active_branch)` write queue (so the reclaim cannot race an in-process
1530    /// fork) and must have confirmed via the live manifest that the table is
1531    /// not yet on `active_branch`. Both the first-write fork path
1532    /// (`open_owned_dataset_for_branch_write`) and `branch_merge` satisfy this.
1533    pub(crate) async fn fork_dataset_from_entry_state(
1534        &self,
1535        table_key: &str,
1536        full_path: &str,
1537        source_branch: Option<&str>,
1538        source_version: u64,
1539        active_branch: &str,
1540    ) -> Result<SnapshotHandle> {
1541        match table_ops::fork_dataset_from_entry_state(
1542            self,
1543            table_key,
1544            full_path,
1545            source_branch,
1546            source_version,
1547            active_branch,
1548        )
1549        .await?
1550        {
1551            crate::storage_layer::ForkOutcome::Created(ds) => Ok(ds),
1552            crate::storage_layer::ForkOutcome::RefAlreadyExists => {
1553                table_ops::reclaim_orphaned_fork_and_refork(
1554                    self,
1555                    table_key,
1556                    full_path,
1557                    source_branch,
1558                    source_version,
1559                    active_branch,
1560                )
1561                .await
1562            }
1563        }
1564    }
1565
    /// Thin wrapper around `table_ops::reopen_for_mutation`; every argument
    /// is forwarded unchanged with `self` prepended.
    // NOTE(review): presumably re-opens the table at `full_path` on
    // `table_branch` and checks it is still at `expected_version` — confirm
    // against `table_ops`.
    pub(crate) async fn reopen_for_mutation(
        &self,
        table_key: &str,
        full_path: &str,
        table_branch: Option<&str>,
        expected_version: u64,
        op_kind: crate::db::MutationOpKind,
    ) -> Result<SnapshotHandle> {
        table_ops::reopen_for_mutation(
            self,
            table_key,
            full_path,
            table_branch,
            expected_version,
            op_kind,
        )
        .await
    }
1584
    /// Thin wrapper around `table_ops::open_dataset_at_state`; forwards
    /// `table_path`, `table_branch`, and `table_version` with `self`
    /// prepended and returns the resulting `SnapshotHandle`.
    pub(crate) async fn open_dataset_at_state(
        &self,
        table_path: &str,
        table_branch: Option<&str>,
        table_version: u64,
    ) -> Result<SnapshotHandle> {
        table_ops::open_dataset_at_state(self, table_path, table_branch, table_version).await
    }
1593
    /// Thin wrapper around `table_ops::build_indices_on_dataset`.
    ///
    /// `ds` is passed through as a mutable handle; the returned
    /// `PendingIndex` list comes straight from `table_ops`.
    pub(crate) async fn build_indices_on_dataset(
        &self,
        table_key: &str,
        ds: &mut SnapshotHandle,
    ) -> Result<Vec<PendingIndex>> {
        table_ops::build_indices_on_dataset(self, table_key, ds).await
    }
1601
    /// Test-only wrapper around `table_ops::commit_updates`; forwards
    /// `updates` with `self` prepended and returns the `u64` it produces.
    // Used only by in-tree tests (`#[cfg(test)]`); the runtime path now
    // uses `commit_updates_on_branch_with_expected` exclusively.
    #[cfg(test)]
    pub(crate) async fn commit_updates(
        &mut self,
        updates: &[crate::db::SubTableUpdate],
    ) -> Result<u64> {
        table_ops::commit_updates(self, updates).await
    }
1611
    /// Thin wrapper around `table_ops::commit_manifest_updates`; forwards
    /// `updates` with `self` prepended.
    // NOTE(review): the returned `u64` is presumably the new manifest
    // version — confirm against `table_ops`.
    pub(crate) async fn commit_manifest_updates(
        &self,
        updates: &[crate::db::SubTableUpdate],
    ) -> Result<u64> {
        table_ops::commit_manifest_updates(self, updates).await
    }
1618
    /// Thin wrapper around `table_ops::record_merge_commit`; every argument
    /// is forwarded unchanged with `self` prepended.
    // NOTE(review): the returned `String` is presumably the id of the
    // recorded merge commit — confirm against `table_ops`.
    pub(crate) async fn record_merge_commit(
        &self,
        manifest_version: u64,
        parent_commit_id: &str,
        merged_parent_commit_id: &str,
        actor_id: Option<&str>,
    ) -> Result<String> {
        table_ops::record_merge_commit(
            self,
            manifest_version,
            parent_commit_id,
            merged_parent_commit_id,
            actor_id,
        )
        .await
    }
1635
    /// Thin wrapper around
    /// `table_ops::commit_updates_on_branch_with_expected`; every argument
    /// is forwarded unchanged with `self` prepended.
    // NOTE(review): `expected_table_versions` presumably maps table keys to
    // the versions the caller observed, for an optimistic-concurrency check
    // — confirm against `table_ops`.
    pub(crate) async fn commit_updates_on_branch_with_expected(
        &self,
        branch: Option<&str>,
        updates: &[crate::db::SubTableUpdate],
        expected_table_versions: &std::collections::HashMap<String, u64>,
        actor_id: Option<&str>,
    ) -> Result<u64> {
        table_ops::commit_updates_on_branch_with_expected(
            self,
            branch,
            updates,
            expected_table_versions,
            actor_id,
        )
        .await
    }
1652
    /// Thin wrapper around `table_ops::ensure_commit_graph_initialized`,
    /// called with `self` as its only argument.
    pub(crate) async fn ensure_commit_graph_initialized(&self) -> Result<()> {
        table_ops::ensure_commit_graph_initialized(self).await
    }
1656
    /// Invalidate the cached graph index. Called after edge mutations.
    ///
    /// Thin wrapper: the invalidation itself is performed by
    /// `table_ops::invalidate_graph_index`.
    pub(crate) async fn invalidate_graph_index(&self) {
        table_ops::invalidate_graph_index(self).await
    }
1661}
1662
1663pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
1664    let branch = branch.trim();
1665    if branch.is_empty() {
1666        return Err(OmniError::manifest(
1667            "branch name cannot be empty".to_string(),
1668        ));
1669    }
1670    if branch == "main" {
1671        return Ok(None);
1672    }
1673    Ok(Some(branch.to_string()))
1674}
1675
1676pub(crate) fn ensure_public_branch_ref(branch: &str, operation: &str) -> Result<()> {
1677    if is_internal_system_branch(branch) {
1678        return Err(OmniError::manifest(format!(
1679            "{} does not allow internal system ref '{}'",
1680            operation, branch
1681        )));
1682    }
1683    Ok(())
1684}
1685
1686fn concat_or_empty_batches(schema: Arc<Schema>, batches: Vec<RecordBatch>) -> Result<RecordBatch> {
1687    if batches.is_empty() {
1688        return Ok(RecordBatch::new_empty(schema));
1689    }
1690    if batches.len() == 1 {
1691        return Ok(batches.into_iter().next().unwrap());
1692    }
1693    let batch_schema = batches[0].schema();
1694    arrow_select::concat::concat_batches(&batch_schema, &batches)
1695        .map_err(|e| OmniError::Lance(e.to_string()))
1696}
1697
1698fn blob_properties_for_table_key<'a>(
1699    catalog: &'a Catalog,
1700    table_key: &str,
1701) -> Result<&'a std::collections::HashSet<String>> {
1702    if let Some(type_name) = table_key.strip_prefix("node:") {
1703        return catalog
1704            .node_types
1705            .get(type_name)
1706            .map(|node_type| &node_type.blob_properties)
1707            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)));
1708    }
1709    if let Some(type_name) = table_key.strip_prefix("edge:") {
1710        return catalog
1711            .edge_types
1712            .get(type_name)
1713            .map(|edge_type| &edge_type.blob_properties)
1714            .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)));
1715    }
1716    Err(OmniError::manifest(format!(
1717        "invalid table key '{}'",
1718        table_key
1719    )))
1720}
1721
1722fn blob_description_is_null(descriptions: &StructArray, row: usize) -> Result<bool> {
1723    if descriptions.is_null(row) {
1724        return Ok(true);
1725    }
1726
1727    let kind = descriptions
1728        .column_by_name("kind")
1729        .and_then(|col| col.as_any().downcast_ref::<UInt32Array>())
1730        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row) as u8))
1731        .or_else(|| {
1732            descriptions
1733                .column_by_name("kind")
1734                .and_then(|col| col.as_any().downcast_ref::<arrow_array::UInt8Array>())
1735                .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)))
1736        });
1737    let position = descriptions
1738        .column_by_name("position")
1739        .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
1740        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1741    let size = descriptions
1742        .column_by_name("size")
1743        .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
1744        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1745    let blob_uri = descriptions
1746        .column_by_name("blob_uri")
1747        .and_then(|col| col.as_any().downcast_ref::<StringArray>())
1748        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1749
1750    let Some(kind) = kind else {
1751        return Ok(true);
1752    };
1753    let kind = BlobKind::try_from(kind).map_err(|e| OmniError::Lance(e.to_string()))?;
1754    if kind != BlobKind::Inline {
1755        return Ok(false);
1756    }
1757
1758    Ok(position.unwrap_or(0) == 0 && size.unwrap_or(0) == 0 && blob_uri.unwrap_or("").is_empty())
1759}
1760
1761/// Replace placeholder `LargeBinary` fields with Lance blob v2 fields.
1762///
1763/// The compiler crate has no Lance dependency, so `ScalarType::Blob` maps to
1764/// `DataType::LargeBinary` as a placeholder. This function replaces those
1765/// fields with the real blob v2 struct type via `lance::blob::blob_field()`.
1766fn fixup_blob_schemas(catalog: &mut Catalog) {
1767    for node_type in catalog.node_types.values_mut() {
1768        if node_type.blob_properties.is_empty() {
1769            continue;
1770        }
1771        let fields: Vec<Field> = node_type
1772            .arrow_schema
1773            .fields()
1774            .iter()
1775            .map(|f| {
1776                if node_type.blob_properties.contains(f.name()) {
1777                    blob_field(f.name(), f.is_nullable())
1778                } else {
1779                    f.as_ref().clone()
1780                }
1781            })
1782            .collect();
1783        node_type.arrow_schema = Arc::new(Schema::new(fields));
1784    }
1785    for edge_type in catalog.edge_types.values_mut() {
1786        if edge_type.blob_properties.is_empty() {
1787            continue;
1788        }
1789        let fields: Vec<Field> = edge_type
1790            .arrow_schema
1791            .fields()
1792            .iter()
1793            .map(|f| {
1794                if edge_type.blob_properties.contains(f.name()) {
1795                    blob_field(f.name(), f.is_nullable())
1796                } else {
1797                    f.as_ref().clone()
1798                }
1799            })
1800            .collect();
1801        edge_type.arrow_schema = Arc::new(Schema::new(fields));
1802    }
1803}
1804
1805fn read_schema_ir_from_source(schema_source: &str) -> Result<SchemaIR> {
1806    let schema_ast = parse_schema(schema_source)?;
1807    build_schema_ir(&schema_ast).map_err(|err| OmniError::manifest(err.to_string()))
1808}
1809
1810/// I/O phase of `Omnigraph::init_with_storage`. Split out so the caller
1811/// can pattern-match on the result and run cleanup on error before
1812/// returning the original error.
1813///
1814/// Failpoints fire at the phase boundaries:
1815/// * `init.after_schema_pg_written` — `_schema.pg` is on disk. In strict mode
1816///   this fires in the caller immediately after the atomic ownership claim; in
1817///   force mode it fires here after the explicit overwrite.
1818/// * `init.after_schema_contract_written` — `_schema.pg` + `_schema.ir.json`
1819///   + `__schema_state.json` are on disk.
1820/// * `init.after_coordinator_init` — all schema files plus Lance per-type
1821///   datasets and `__manifest/` are on disk. (The cleanup wrapper can only
1822///   remove the schema files; Lance directories need `delete_prefix` —
1823///   deferred along with `DELETE /graphs/{id}`.)
1824async fn init_storage_phase(
1825    root: &str,
1826    schema_source: &str,
1827    schema_ir: &SchemaIR,
1828    catalog: &Catalog,
1829    storage: &Arc<dyn StorageAdapter>,
1830    write_schema_pg: bool,
1831) -> Result<GraphCoordinator> {
1832    if write_schema_pg {
1833        let schema_path = join_uri(root, SCHEMA_SOURCE_FILENAME);
1834        storage.write_text(&schema_path, schema_source).await?;
1835        crate::failpoints::maybe_fail("init.after_schema_pg_written")?;
1836    }
1837
1838    write_schema_contract(root, storage.as_ref(), schema_ir).await?;
1839    crate::failpoints::maybe_fail("init.after_schema_contract_written")?;
1840
1841    let coordinator = GraphCoordinator::init(root, catalog, Arc::clone(storage)).await?;
1842    crate::failpoints::maybe_fail("init.after_coordinator_init")?;
1843
1844    Ok(coordinator)
1845}
1846
1847/// Best-effort cleanup of init-phase artifacts. Called from
1848/// `init_with_storage` on any error returned by `init_storage_phase`.
1849///
1850/// Removes the three schema files: `_schema.pg`, `_schema.ir.json`,
1851/// `__schema_state.json`. Lance datasets and `__manifest/` are not
1852/// touched here — recursive directory deletion requires a
1853/// `StorageAdapter::delete_prefix` primitive that's deferred along
1854/// with `DELETE /graphs/{id}` (MR-668 PR 2b).
1855///
1856/// Failures to delete are logged via `tracing::warn` and do not mask
1857/// the original init error.
1858async fn best_effort_cleanup_init_artifacts(root: &str, storage: &dyn StorageAdapter) {
1859    for uri in [
1860        schema_source_uri(root),
1861        schema_ir_uri(root),
1862        schema_state_uri(root),
1863    ] {
1864        if let Err(err) = storage.delete(&uri).await {
1865            tracing::warn!(
1866                target: "omnigraph::init::cleanup",
1867                uri = %uri,
1868                error = %err,
1869                "init failed; best-effort cleanup could not delete artifact",
1870            );
1871        }
1872    }
1873}
1874
1875fn schema_table_key(type_kind: SchemaTypeKind, name: &str) -> String {
1876    match type_kind {
1877        SchemaTypeKind::Node => format!("node:{}", name),
1878        SchemaTypeKind::Edge => format!("edge:{}", name),
1879        SchemaTypeKind::Interface => unreachable!("interfaces do not map to tables"),
1880    }
1881}
1882
1883fn schema_for_table_key(catalog: &Catalog, table_key: &str) -> Result<Arc<Schema>> {
1884    if let Some(type_name) = table_key.strip_prefix("node:") {
1885        let node_type: &NodeType = catalog
1886            .node_types
1887            .get(type_name)
1888            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
1889        return Ok(node_type.arrow_schema.clone());
1890    }
1891    if let Some(type_name) = table_key.strip_prefix("edge:") {
1892        let edge_type: &EdgeType = catalog
1893            .edge_types
1894            .get(type_name)
1895            .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)))?;
1896        return Ok(edge_type.arrow_schema.clone());
1897    }
1898    Err(OmniError::manifest(format!(
1899        "invalid table key '{}'",
1900        table_key
1901    )))
1902}
1903
1904fn record_batch_row_to_json(batch: &RecordBatch, row: usize) -> Result<serde_json::Value> {
1905    let mut obj = serde_json::Map::new();
1906    for (i, field) in batch.schema().fields().iter().enumerate() {
1907        obj.insert(
1908            field.name().clone(),
1909            json_value_from_array(batch.column(i).as_ref(), row)?,
1910        );
1911    }
1912    Ok(serde_json::Value::Object(obj))
1913}
1914
1915fn json_value_from_array(array: &dyn Array, row: usize) -> Result<serde_json::Value> {
1916    if array.is_null(row) {
1917        return Ok(serde_json::Value::Null);
1918    }
1919
1920    match array.data_type() {
1921        DataType::Utf8 => Ok(serde_json::Value::String(
1922            array
1923                .as_any()
1924                .downcast_ref::<StringArray>()
1925                .ok_or_else(|| OmniError::Lance("expected StringArray".to_string()))?
1926                .value(row)
1927                .to_string(),
1928        )),
1929        DataType::LargeUtf8 => Ok(serde_json::Value::String(
1930            array
1931                .as_any()
1932                .downcast_ref::<LargeStringArray>()
1933                .ok_or_else(|| OmniError::Lance("expected LargeStringArray".to_string()))?
1934                .value(row)
1935                .to_string(),
1936        )),
1937        DataType::Boolean => Ok(serde_json::Value::Bool(
1938            array
1939                .as_any()
1940                .downcast_ref::<BooleanArray>()
1941                .ok_or_else(|| OmniError::Lance("expected BooleanArray".to_string()))?
1942                .value(row),
1943        )),
1944        DataType::Int32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1945            array
1946                .as_any()
1947                .downcast_ref::<Int32Array>()
1948                .ok_or_else(|| OmniError::Lance("expected Int32Array".to_string()))?
1949                .value(row),
1950        ))),
1951        DataType::Int64 => Ok(serde_json::Value::Number(serde_json::Number::from(
1952            array
1953                .as_any()
1954                .downcast_ref::<Int64Array>()
1955                .ok_or_else(|| OmniError::Lance("expected Int64Array".to_string()))?
1956                .value(row),
1957        ))),
1958        DataType::UInt32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1959            array
1960                .as_any()
1961                .downcast_ref::<UInt32Array>()
1962                .ok_or_else(|| OmniError::Lance("expected UInt32Array".to_string()))?
1963                .value(row),
1964        ))),
1965        DataType::UInt64 => Ok(serde_json::Value::Number(serde_json::Number::from(
1966            array
1967                .as_any()
1968                .downcast_ref::<UInt64Array>()
1969                .ok_or_else(|| OmniError::Lance("expected UInt64Array".to_string()))?
1970                .value(row),
1971        ))),
1972        DataType::Float32 => {
1973            let value = array
1974                .as_any()
1975                .downcast_ref::<Float32Array>()
1976                .ok_or_else(|| OmniError::Lance("expected Float32Array".to_string()))?
1977                .value(row) as f64;
1978            Ok(serde_json::Value::Number(
1979                serde_json::Number::from_f64(value).ok_or_else(|| {
1980                    OmniError::Lance(format!("cannot encode f32 value '{}' as JSON", value))
1981                })?,
1982            ))
1983        }
1984        DataType::Float64 => {
1985            let value = array
1986                .as_any()
1987                .downcast_ref::<Float64Array>()
1988                .ok_or_else(|| OmniError::Lance("expected Float64Array".to_string()))?
1989                .value(row);
1990            Ok(serde_json::Value::Number(
1991                serde_json::Number::from_f64(value).ok_or_else(|| {
1992                    OmniError::Lance(format!("cannot encode f64 value '{}' as JSON", value))
1993                })?,
1994            ))
1995        }
1996        DataType::Date32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1997            array
1998                .as_any()
1999                .downcast_ref::<Date32Array>()
2000                .ok_or_else(|| OmniError::Lance("expected Date32Array".to_string()))?
2001                .value(row),
2002        ))),
2003        DataType::Binary => Ok(serde_json::Value::String(base64::Engine::encode(
2004            &base64::engine::general_purpose::STANDARD,
2005            array
2006                .as_any()
2007                .downcast_ref::<BinaryArray>()
2008                .ok_or_else(|| OmniError::Lance("expected BinaryArray".to_string()))?
2009                .value(row),
2010        ))),
2011        DataType::LargeBinary => Ok(serde_json::Value::String(base64::Engine::encode(
2012            &base64::engine::general_purpose::STANDARD,
2013            array
2014                .as_any()
2015                .downcast_ref::<LargeBinaryArray>()
2016                .ok_or_else(|| OmniError::Lance("expected LargeBinaryArray".to_string()))?
2017                .value(row),
2018        ))),
2019        DataType::List(_) => {
2020            let list = array
2021                .as_any()
2022                .downcast_ref::<ListArray>()
2023                .ok_or_else(|| OmniError::Lance("expected ListArray".to_string()))?;
2024            let values = list.value(row);
2025            let mut out = Vec::with_capacity(values.len());
2026            for idx in 0..values.len() {
2027                out.push(json_value_from_array(values.as_ref(), idx)?);
2028            }
2029            Ok(serde_json::Value::Array(out))
2030        }
2031        DataType::LargeList(_) => {
2032            let list = array
2033                .as_any()
2034                .downcast_ref::<LargeListArray>()
2035                .ok_or_else(|| OmniError::Lance("expected LargeListArray".to_string()))?;
2036            let values = list.value(row);
2037            let mut out = Vec::with_capacity(values.len());
2038            for idx in 0..values.len() {
2039                out.push(json_value_from_array(values.as_ref(), idx)?);
2040            }
2041            Ok(serde_json::Value::Array(out))
2042        }
2043        DataType::FixedSizeList(_, _) => {
2044            let list = array
2045                .as_any()
2046                .downcast_ref::<FixedSizeListArray>()
2047                .ok_or_else(|| OmniError::Lance("expected FixedSizeListArray".to_string()))?;
2048            let values = list.value(row);
2049            let mut out = Vec::with_capacity(values.len());
2050            for idx in 0..values.len() {
2051                out.push(json_value_from_array(values.as_ref(), idx)?);
2052            }
2053            Ok(serde_json::Value::Array(out))
2054        }
2055        DataType::Struct(fields) => {
2056            let struct_array = array
2057                .as_any()
2058                .downcast_ref::<StructArray>()
2059                .ok_or_else(|| OmniError::Lance("expected StructArray".to_string()))?;
2060            let mut obj = serde_json::Map::new();
2061            for (field_idx, field) in fields.iter().enumerate() {
2062                obj.insert(
2063                    field.name().clone(),
2064                    json_value_from_array(struct_array.column(field_idx).as_ref(), row)?,
2065                );
2066            }
2067            Ok(serde_json::Value::Object(obj))
2068        }
2069        _ => {
2070            let value = arrow_cast::display::array_value_to_string(array, row)
2071                .map_err(|e| OmniError::Lance(e.to_string()))?;
2072            Ok(serde_json::Value::String(value))
2073        }
2074    }
2075}
2076
2077#[cfg(test)]
2078mod tests {
2079    use super::*;
2080    use crate::db::manifest::ManifestCoordinator;
2081    use async_trait::async_trait;
2082    use serde_json::Value;
2083    use std::sync::{Arc, Mutex};
2084
2085    use crate::storage::{ObjectStorageAdapter, StorageAdapter, join_uri};
2086
    /// Schema source fixture for this test module: two node types
    /// (`Person`, `Company`) and two edge types (`Knows`, `WorksAt`).
    const TEST_SCHEMA: &str = r#"
node Person {
    name: String @key
    age: I32?
}
node Company {
    name: String @key
}
edge Knows: Person -> Person {
    since: Date?
}
edge WorksAt: Person -> Company
"#;
2100
    /// Test double that wraps an `ObjectStorageAdapter` and keeps a log of
    /// the URIs passed to selected `StorageAdapter` calls.
    #[derive(Debug)]
    struct RecordingStorageAdapter {
        // Real adapter that every call is delegated to.
        inner: ObjectStorageAdapter,
        // URIs passed to `read_text`.
        reads: Mutex<Vec<String>>,
        // URIs passed to `write_text` and `write_text_if_absent`.
        writes: Mutex<Vec<String>>,
        // URIs passed to `exists`.
        exists_checks: Mutex<Vec<String>>,
        // `(from, to)` URI pairs passed to `rename_text`.
        renames: Mutex<Vec<(String, String)>>,
        // URIs passed to `delete`.
        deletes: Mutex<Vec<String>>,
    }
2110
2111    impl Default for RecordingStorageAdapter {
2112        fn default() -> Self {
2113            Self {
2114                inner: ObjectStorageAdapter::local(),
2115                reads: Mutex::default(),
2116                writes: Mutex::default(),
2117                exists_checks: Mutex::default(),
2118                renames: Mutex::default(),
2119                deletes: Mutex::default(),
2120            }
2121        }
2122    }
2123
2124    impl RecordingStorageAdapter {
2125        fn reads(&self) -> Vec<String> {
2126            self.reads.lock().unwrap().clone()
2127        }
2128
2129        fn writes(&self) -> Vec<String> {
2130            self.writes.lock().unwrap().clone()
2131        }
2132
2133        fn exists_checks(&self) -> Vec<String> {
2134            self.exists_checks.lock().unwrap().clone()
2135        }
2136    }
2137
    /// `StorageAdapter` implementation that delegates every call to
    /// `inner`. `read_text`, `write_text`, `write_text_if_absent`, `exists`,
    /// `rename_text`, and `delete` first append their URI argument(s) to the
    /// matching log; the remaining methods are plain pass-throughs.
    ///
    /// Each log push is a single statement, so the mutex guard is a
    /// temporary that is released before the delegated call is awaited.
    #[async_trait]
    impl StorageAdapter for RecordingStorageAdapter {
        // Recorded in `reads`.
        async fn read_text(&self, uri: &str) -> Result<String> {
            self.reads.lock().unwrap().push(uri.to_string());
            self.inner.read_text(uri).await
        }

        // Recorded in `writes`.
        async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
            self.writes.lock().unwrap().push(uri.to_string());
            self.inner.write_text(uri, contents).await
        }

        // Also recorded in `writes`, whether or not the write takes effect.
        async fn write_text_if_absent(&self, uri: &str, contents: &str) -> Result<bool> {
            self.writes.lock().unwrap().push(uri.to_string());
            self.inner.write_text_if_absent(uri, contents).await
        }

        // Recorded in `exists_checks`.
        async fn exists(&self, uri: &str) -> Result<bool> {
            self.exists_checks.lock().unwrap().push(uri.to_string());
            self.inner.exists(uri).await
        }

        // Recorded in `renames` as a `(from, to)` pair.
        async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
            self.renames
                .lock()
                .unwrap()
                .push((from_uri.to_string(), to_uri.to_string()));
            self.inner.rename_text(from_uri, to_uri).await
        }

        // Recorded in `deletes`.
        async fn delete(&self, uri: &str) -> Result<()> {
            self.deletes.lock().unwrap().push(uri.to_string());
            self.inner.delete(uri).await
        }

        // Pass-through; not recorded.
        async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
            self.inner.list_dir(dir_uri).await
        }

        // Pass-through; not recorded.
        async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)> {
            self.inner.read_text_versioned(uri).await
        }

        // Pass-through; not recorded.
        async fn write_text_if_match(
            &self,
            uri: &str,
            contents: &str,
            expected_version: &str,
        ) -> Result<Option<String>> {
            self.inner
                .write_text_if_match(uri, contents, expected_version)
                .await
        }

        // Pass-through; not recorded.
        async fn delete_prefix(&self, prefix_uri: &str) -> Result<()> {
            self.inner.delete_prefix(prefix_uri).await
        }
    }
2196
2197    #[derive(Debug)]
2198    struct InitRaceStorageAdapter {
2199        inner: ObjectStorageAdapter,
2200        root: String,
2201        barrier: Arc<tokio::sync::Barrier>,
2202    }
2203
2204    #[async_trait]
2205    impl StorageAdapter for InitRaceStorageAdapter {
2206        async fn read_text(&self, uri: &str) -> Result<String> {
2207            self.inner.read_text(uri).await
2208        }
2209
2210        async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
2211            self.inner.write_text(uri, contents).await
2212        }
2213
2214        async fn write_text_if_absent(&self, uri: &str, contents: &str) -> Result<bool> {
2215            self.inner.write_text_if_absent(uri, contents).await
2216        }
2217
2218        async fn exists(&self, uri: &str) -> Result<bool> {
2219            let exists = self.inner.exists(uri).await?;
2220            if uri == schema_state_uri(&self.root) {
2221                self.barrier.wait().await;
2222            }
2223            Ok(exists)
2224        }
2225
2226        async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
2227            self.inner.rename_text(from_uri, to_uri).await
2228        }
2229
2230        async fn delete(&self, uri: &str) -> Result<()> {
2231            self.inner.delete(uri).await
2232        }
2233
2234        async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
2235            self.inner.list_dir(dir_uri).await
2236        }
2237
2238        async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)> {
2239            self.inner.read_text_versioned(uri).await
2240        }
2241
2242        async fn write_text_if_match(
2243            &self,
2244            uri: &str,
2245            contents: &str,
2246            expected_version: &str,
2247        ) -> Result<Option<String>> {
2248            self.inner
2249                .write_text_if_match(uri, contents, expected_version)
2250                .await
2251        }
2252
2253        async fn delete_prefix(&self, prefix_uri: &str) -> Result<()> {
2254            self.inner.delete_prefix(prefix_uri).await
2255        }
2256    }
2257
2258    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2259    async fn concurrent_strict_init_does_not_delete_winning_schema_files() {
2260        let dir = tempfile::tempdir().unwrap();
2261        let uri = dir.path().to_str().unwrap().to_string();
2262        let root = normalize_root_uri(&uri).unwrap();
2263        let storage: Arc<dyn StorageAdapter> = Arc::new(InitRaceStorageAdapter {
2264            inner: ObjectStorageAdapter::local(),
2265            root,
2266            barrier: Arc::new(tokio::sync::Barrier::new(2)),
2267        });
2268
2269        let left = Omnigraph::init_with_storage(
2270            &uri,
2271            TEST_SCHEMA,
2272            Arc::clone(&storage),
2273            InitOptions::default(),
2274        );
2275        let right = Omnigraph::init_with_storage(
2276            &uri,
2277            TEST_SCHEMA,
2278            Arc::clone(&storage),
2279            InitOptions::default(),
2280        );
2281        let (left, right) = tokio::join!(left, right);
2282        let ok_count = usize::from(left.is_ok()) + usize::from(right.is_ok());
2283        assert_eq!(ok_count, 1, "exactly one concurrent init should win");
2284
2285        assert!(
2286            dir.path().join("_schema.pg").exists(),
2287            "winning init must leave _schema.pg in place"
2288        );
2289        assert!(
2290            dir.path().join("_schema.ir.json").exists(),
2291            "winning init must leave _schema.ir.json in place"
2292        );
2293        assert!(
2294            dir.path().join("__schema_state.json").exists(),
2295            "winning init must leave __schema_state.json in place"
2296        );
2297    }
2298
2299    #[tokio::test]
2300    async fn test_init_and_open_route_graph_metadata_through_storage_adapter() {
2301        let dir = tempfile::tempdir().unwrap();
2302        let uri = dir.path().to_str().unwrap();
2303        let adapter = Arc::new(RecordingStorageAdapter::default());
2304
2305        Omnigraph::init_with_storage(uri, TEST_SCHEMA, adapter.clone(), InitOptions::default())
2306            .await
2307            .unwrap();
2308        assert!(adapter.writes().contains(&join_uri(uri, "_schema.pg")));
2309        assert!(adapter.writes().contains(&join_uri(uri, "_schema.ir.json")));
2310        assert!(
2311            adapter
2312                .writes()
2313                .contains(&join_uri(uri, "__schema_state.json"))
2314        );
2315
2316        Omnigraph::open_with_storage(uri, adapter.clone())
2317            .await
2318            .unwrap();
2319        assert!(adapter.reads().contains(&join_uri(uri, "_schema.pg")));
2320        assert!(adapter.reads().contains(&join_uri(uri, "_schema.ir.json")));
2321        assert!(
2322            adapter
2323                .reads()
2324                .contains(&join_uri(uri, "__schema_state.json"))
2325        );
2326        assert!(
2327            adapter
2328                .exists_checks()
2329                .contains(&join_uri(uri, "_schema.ir.json"))
2330        );
2331        assert!(
2332            adapter
2333                .exists_checks()
2334                .contains(&join_uri(uri, "__schema_state.json"))
2335        );
2336        assert!(
2337            adapter
2338                .exists_checks()
2339                .contains(&join_uri(uri, "_graph_commits.lance"))
2340        );
2341    }
2342
2343    async fn table_rows_json(db: &Omnigraph, table_key: &str) -> Vec<Value> {
2344        let snapshot = db.snapshot().await;
2345        let ds = db
2346            .storage()
2347            .open_snapshot_at_table(&snapshot, table_key)
2348            .await
2349            .unwrap();
2350        let batches = db.storage().scan_batches(&ds).await.unwrap();
2351        batches
2352            .into_iter()
2353            .flat_map(|batch| {
2354                (0..batch.num_rows())
2355                    .map(|row| record_batch_row_to_json(&batch, row).unwrap())
2356                    .collect::<Vec<_>>()
2357            })
2358            .collect()
2359    }
2360
2361    async fn seed_person_row(db: &mut Omnigraph, name: &str, age: Option<i32>) {
2362        let (ds, full_path, table_branch) = db
2363            .open_for_mutation("node:Person", crate::db::MutationOpKind::Insert)
2364            .await
2365            .unwrap();
2366        let schema: Arc<Schema> = Arc::new(ds.dataset().schema().into());
2367        let columns: Vec<Arc<dyn Array>> = schema
2368            .fields()
2369            .iter()
2370            .map(|field| match field.name().as_str() {
2371                "id" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
2372                "name" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
2373                "age" => Arc::new(Int32Array::from(vec![age])) as Arc<dyn Array>,
2374                _ => new_null_array(field.data_type(), 1),
2375            })
2376            .collect();
2377        let batch = RecordBatch::try_new(Arc::clone(&schema), columns).unwrap();
2378        let staged = db.storage().stage_append(&ds, batch, &[]).await.unwrap();
2379        let committed = db.storage().commit_staged(ds, staged).await.unwrap();
2380        let state = db
2381            .storage()
2382            .table_state(&full_path, &committed)
2383            .await
2384            .unwrap();
2385        db.commit_updates(&[crate::db::SubTableUpdate {
2386            table_key: "node:Person".to_string(),
2387            table_version: state.version,
2388            table_branch,
2389            row_count: state.row_count,
2390            version_metadata: state.version_metadata,
2391        }])
2392        .await
2393        .unwrap();
2394    }
2395
2396    #[tokio::test]
2397    async fn test_apply_schema_adds_nullable_property_and_preserves_rows() {
2398        let dir = tempfile::tempdir().unwrap();
2399        let uri = dir.path().to_str().unwrap();
2400        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2401        seed_person_row(&mut db, "Alice", Some(30)).await;
2402
2403        let desired = TEST_SCHEMA.replace(
2404            "    age: I32?\n}",
2405            "    age: I32?\n    nickname: String?\n}",
2406        );
2407        let result = db.apply_schema(&desired).await.unwrap();
2408        assert!(result.applied);
2409
2410        let reopened = Omnigraph::open(uri).await.unwrap();
2411        let rows = table_rows_json(&reopened, "node:Person").await;
2412        assert_eq!(rows.len(), 1);
2413        assert_eq!(rows[0]["name"], "Alice");
2414        assert_eq!(rows[0]["age"], 30);
2415        assert!(rows[0]["nickname"].is_null());
2416        assert!(
2417            reopened.catalog().node_types["Person"]
2418                .properties
2419                .contains_key("nickname")
2420        );
2421        assert!(dir.path().join("_schema.pg").exists());
2422    }
2423
2424    #[tokio::test]
2425    async fn test_apply_schema_renames_property_and_preserves_values() {
2426        let dir = tempfile::tempdir().unwrap();
2427        let uri = dir.path().to_str().unwrap();
2428        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2429        seed_person_row(&mut db, "Alice", Some(30)).await;
2430
2431        let desired = TEST_SCHEMA.replace(
2432            "    age: I32?\n}",
2433            "    years: I32? @rename_from(\"age\")\n}",
2434        );
2435        db.apply_schema(&desired).await.unwrap();
2436
2437        let reopened = Omnigraph::open(uri).await.unwrap();
2438        let rows = table_rows_json(&reopened, "node:Person").await;
2439        assert_eq!(rows[0]["name"], "Alice");
2440        assert_eq!(rows[0]["years"], 30);
2441        assert!(rows[0].get("age").is_none());
2442    }
2443
2444    #[tokio::test]
2445    async fn test_apply_schema_renames_type_and_preserves_historical_snapshot() {
2446        let dir = tempfile::tempdir().unwrap();
2447        let uri = dir.path().to_str().unwrap();
2448        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2449        seed_person_row(&mut db, "Alice", Some(30)).await;
2450        let before_version = db.snapshot().await.version();
2451
2452        let desired = TEST_SCHEMA
2453            .replace("node Person {\n", "node Human @rename_from(\"Person\") {\n")
2454            .replace("edge Knows: Person -> Person", "edge Knows: Human -> Human")
2455            .replace(
2456                "edge WorksAt: Person -> Company",
2457                "edge WorksAt: Human -> Company",
2458            );
2459        db.apply_schema(&desired).await.unwrap();
2460
2461        let head = db.snapshot().await;
2462        assert!(head.entry("node:Person").is_none());
2463        assert!(head.entry("node:Human").is_some());
2464        let historical = ManifestCoordinator::snapshot_at(uri, None, before_version)
2465            .await
2466            .unwrap();
2467        assert!(historical.entry("node:Person").is_some());
2468        assert!(historical.entry("node:Human").is_none());
2469    }
2470
2471    #[tokio::test]
2472    async fn test_apply_schema_succeeds_after_load() {
2473        // Historical: schema apply used to be blocked by leftover
2474        // `__run__` branches. The Run state machine was removed in
2475        // MR-771, so a fresh graph never creates a `__run__` branch;
2476        // legacy ones are swept by the v2→v3 manifest migration. This
2477        // asserts the invariant a current graph upholds: publish leaves
2478        // no `__run__` branch behind, so schema apply proceeds.
2479        let dir = tempfile::tempdir().unwrap();
2480        let uri = dir.path().to_str().unwrap();
2481        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2482
2483        crate::loader::load_jsonl(
2484            &mut db,
2485            r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#,
2486            crate::loader::LoadMode::Overwrite,
2487        )
2488        .await
2489        .unwrap();
2490
2491        let all_branches = db.coordinator.read().await.all_branches().await.unwrap();
2492        assert!(
2493            !all_branches.iter().any(|b| b.starts_with("__run__")),
2494            "no __run__ branch should exist after publish, got: {:?}",
2495            all_branches
2496        );
2497
2498        let desired = TEST_SCHEMA.replace(
2499            "    age: I32?\n}",
2500            "    age: I32?\n    nickname: String?\n}",
2501        );
2502        let result = db.apply_schema(&desired).await.unwrap();
2503        assert!(result.applied, "schema apply should have applied");
2504    }
2505
2506    /// Regression (MR-770): a pre-v0.4.0 graph that still carries a stale
2507    /// `__run__*` branch on `__manifest` must not block schema apply. The
2508    /// v2→v3 sweep runs in `Omnigraph::open(ReadWrite)` — before the
2509    /// schema-apply blocking-branch check — so apply succeeds with no
2510    /// intervening publish.
2511    ///
2512    /// Confirmed to fail before the open-time migration landed: the reopened
2513    /// graph still listed `__run__legacy`, and `apply_schema` returned
2514    /// "found non-main branches: __run__legacy".
2515    #[tokio::test]
2516    async fn legacy_run_branch_is_swept_on_open_and_does_not_block_schema_apply() {
2517        let dir = tempfile::tempdir().unwrap();
2518        let uri = dir.path().to_str().unwrap();
2519        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2520
2521        // Synthesize a legacy graph: a stale `__run__` branch on `__manifest`
2522        // plus the manifest stamp rewound to v2 (pre-sweep).
2523        db.branch_create("__run__legacy").await.unwrap();
2524        drop(db);
2525        {
2526            let mut ds = lance::Dataset::open(&format!("{}/__manifest", uri))
2527                .await
2528                .unwrap();
2529            ds.update_schema_metadata([(
2530                "omnigraph:internal_schema_version".to_string(),
2531                Some("2".to_string()),
2532            )])
2533            .await
2534            .unwrap();
2535        }
2536
2537        // Reopen (ReadWrite): the open-time migration must sweep `__run__legacy`
2538        // before any branch-observing code runs.
2539        let db = Omnigraph::open(uri).await.unwrap();
2540        let branches = db.branch_list().await.unwrap();
2541        assert!(
2542            !branches.iter().any(|b| b.starts_with("__run__")),
2543            "open-time migration must sweep legacy __run__ branches; got {branches:?}",
2544        );
2545
2546        // Schema apply must proceed with no intervening publish — the
2547        // blocking-branch check no longer sees `__run__legacy`.
2548        let desired = TEST_SCHEMA.replace(
2549            "    age: I32?\n}",
2550            "    age: I32?\n    nickname: String?\n}",
2551        );
2552        let result = db.apply_schema(&desired).await.unwrap();
2553        assert!(result.applied, "schema apply should have applied");
2554    }
2555
2556    #[tokio::test]
2557    async fn test_apply_schema_defers_index_then_reconciler_builds_it() {
2558        // iss-848: schema apply records the @index intent but builds nothing
2559        // inline; a later ensure_indices materializes it once the table has
2560        // rows. (Use `age`, which is unindexed in TEST_SCHEMA — `name @key` is
2561        // already FTS-indexed at seed, so it can't show the deferral.)
2562        let dir = tempfile::tempdir().unwrap();
2563        let uri = dir.path().to_str().unwrap();
2564        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2565        seed_person_row(&mut db, "Alice", Some(30)).await;
2566
2567        let desired = TEST_SCHEMA.replace("age: I32?", "age: I32? @index");
2568        db.apply_schema(&desired).await.unwrap();
2569
2570        // Apply built nothing — the BTREE on `age` is deferred.
2571        let snapshot = db.snapshot().await;
2572        let ds = db
2573            .storage()
2574            .open_snapshot_at_table(&snapshot, "node:Person")
2575            .await
2576            .unwrap();
2577        assert!(
2578            !db.storage().has_btree_index(&ds, "age").await.unwrap(),
2579            "apply must not build the index inline (deferred to the reconciler)"
2580        );
2581
2582        // The reconciler materializes it (Person has a row).
2583        db.ensure_indices().await.unwrap();
2584        let snapshot = db.snapshot().await;
2585        let ds = db
2586            .storage()
2587            .open_snapshot_at_table(&snapshot, "node:Person")
2588            .await
2589            .unwrap();
2590        assert!(
2591            db.storage().has_btree_index(&ds, "age").await.unwrap(),
2592            "ensure_indices must build the deferred index"
2593        );
2594    }
2595
2596    #[tokio::test]
2597    async fn test_apply_schema_rewrite_defers_index_then_reconciler_restores() {
2598        // iss-848: an AddProperty rewrite writes a new dataset version without
2599        // rebuilding indexes inline (deferred); ensure_indices restores them.
2600        let dir = tempfile::tempdir().unwrap();
2601        let uri = dir.path().to_str().unwrap();
2602        let initial_schema = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
2603        let mut db = Omnigraph::init(uri, &initial_schema).await.unwrap();
2604        seed_person_row(&mut db, "Alice", Some(30)).await;
2605
2606        let desired = initial_schema.replace(
2607            "    age: I32?\n}",
2608            "    age: I32?\n    nickname: String?\n}",
2609        );
2610        db.apply_schema(&desired).await.unwrap();
2611
2612        // After the rewrite the reconciler restores index coverage.
2613        db.ensure_indices().await.unwrap();
2614        let snapshot = db.snapshot().await;
2615        let ds = db
2616            .storage()
2617            .open_snapshot_at_table(&snapshot, "node:Person")
2618            .await
2619            .unwrap();
2620        assert!(db.storage().has_btree_index(&ds, "id").await.unwrap());
2621        assert!(db.storage().has_fts_index(&ds, "name").await.unwrap());
2622    }
2623
2624    #[tokio::test]
2625    async fn test_open_for_mutation_rejects_while_schema_apply_locked() {
2626        let dir = tempfile::tempdir().unwrap();
2627        let uri = dir.path().to_str().unwrap();
2628        let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2629        let mut db = db;
2630        db.coordinator
2631            .write()
2632            .await
2633            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
2634            .await
2635            .unwrap();
2636
2637        let err = db
2638            .open_for_mutation("node:Person", crate::db::MutationOpKind::Insert)
2639            .await
2640            .unwrap_err();
2641        assert!(
2642            err.to_string()
2643                .contains("write is unavailable while schema apply is in progress")
2644        );
2645    }
2646
2647    #[tokio::test]
2648    async fn test_commit_updates_rejects_while_schema_apply_locked() {
2649        let dir = tempfile::tempdir().unwrap();
2650        let uri = dir.path().to_str().unwrap();
2651        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2652        db.coordinator
2653            .write()
2654            .await
2655            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
2656            .await
2657            .unwrap();
2658
2659        let err = db.commit_updates(&[]).await.unwrap_err();
2660        assert!(
2661            err.to_string()
2662                .contains("write commit is unavailable while schema apply is in progress")
2663        );
2664    }
2665
2666    #[tokio::test]
2667    async fn test_branch_list_hides_schema_apply_lock_branch() {
2668        let dir = tempfile::tempdir().unwrap();
2669        let uri = dir.path().to_str().unwrap();
2670        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2671        db.coordinator
2672            .write()
2673            .await
2674            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
2675            .await
2676            .unwrap();
2677
2678        let branches = db.branch_list().await.unwrap();
2679        assert_eq!(branches, vec!["main".to_string()]);
2680    }
2681}