Skip to main content

omnigraph/db/
omnigraph.rs

1use std::collections::{BTreeSet, HashMap, HashSet};
2use std::io::Write;
3use std::sync::Arc;
4
5use arc_swap::ArcSwap;
6use arrow_array::{
7    Array, BinaryArray, BooleanArray, Date32Array, FixedSizeListArray, Float32Array, Float64Array,
8    Int32Array, Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray,
9    RecordBatch, StringArray, StructArray, UInt32Array, UInt64Array, new_null_array,
10};
11use arrow_schema::{DataType, Field, Schema};
12use lance::Dataset;
13use lance::blob::{BlobArrayBuilder, blob_field};
14use lance::dataset::BlobFile;
15use lance::dataset::scanner::ColumnOrdering;
16use lance::datatypes::BlobKind;
17use omnigraph_compiler::catalog::{Catalog, EdgeType, NodeType};
18use omnigraph_compiler::schema::parser::parse_schema;
19use omnigraph_compiler::types::ScalarType;
20use omnigraph_compiler::{
21    DropMode, SchemaIR, SchemaMigrationPlan, SchemaMigrationStep, SchemaTypeKind,
22    build_catalog_from_ir, build_schema_ir, plan_schema_migration,
23};
24
25use crate::db::graph_coordinator::{GraphCoordinator, PublishedSnapshot};
26use crate::error::{OmniError, Result};
27use crate::runtime_cache::RuntimeCache;
28use crate::storage::{StorageAdapter, join_uri, normalize_root_uri, storage_for_uri};
29use crate::table_store::TableStore;
30
31mod export;
32mod optimize;
33mod schema_apply;
34mod table_ops;
35
36pub use optimize::{CleanupPolicyOptions, SkipReason, TableCleanupStats, TableOptimizeStats};
37pub use schema_apply::SchemaApplyOptions;
38
39use super::commit_graph::GraphCommit;
40use super::manifest::{
41    ManifestChange, Snapshot, SubTableEntry, TableRegistration, TableTombstone,
42    table_path_for_table_key,
43};
44use super::schema_state::{
45    SCHEMA_SOURCE_FILENAME, load_or_bootstrap_schema_contract, read_accepted_schema_ir,
46    recover_schema_state_files, schema_ir_staging_uri, schema_ir_uri, schema_source_staging_uri,
47    schema_source_uri, schema_state_staging_uri, schema_state_uri, validate_schema_contract,
48    write_schema_contract, write_schema_contract_staging,
49};
50use super::{
51    ReadTarget, ResolvedTarget, SCHEMA_APPLY_LOCK_BRANCH, SnapshotId, is_internal_system_branch,
52    is_schema_apply_lock_branch,
53};
54
/// How a merge completed.
///
/// NOTE(review): the per-variant meanings below are inferred from the variant
/// names only; confirm against the branch-merge implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MergeOutcome {
    /// Presumably: the target already contained everything from the source,
    /// so nothing had to be written.
    AlreadyUpToDate,
    /// Presumably: the target was advanced to the source head without
    /// combining divergent histories.
    FastForward,
    /// Presumably: divergent histories were combined into a new merge result.
    Merged,
}
61
/// Summary returned by the `apply_schema*` family of methods on [`Omnigraph`].
#[derive(Debug, Clone)]
pub struct SchemaApplyResult {
    /// NOTE(review): presumably whether the requested migration is one the
    /// engine can perform; confirm in `schema_apply`.
    pub supported: bool,
    /// NOTE(review): presumably whether changes were actually committed
    /// (e.g. `false` for a no-op apply); confirm in `schema_apply`.
    pub applied: bool,
    /// Manifest version reported by the apply (assumed to be the version after
    /// the apply completed — TODO confirm).
    pub manifest_version: u64,
    /// Migration steps associated with this apply.
    pub steps: Vec<SchemaMigrationStep>,
}
69
/// Preview of a schema apply, returned by
/// [`Omnigraph::preview_schema_apply_with_options`].
#[derive(Debug, Clone)]
pub struct SchemaApplyPreview {
    /// Migration plan computed for the desired schema source.
    pub plan: SchemaMigrationPlan,
    /// Catalog built for the desired schema (presumably what the graph's
    /// catalog would become if the apply ran — confirm in `schema_apply`).
    pub catalog: Catalog,
}
75
/// Top-level handle to an Omnigraph database.
///
/// An Omnigraph is a Lance-native graph database with git-style branching.
/// It stores typed property graphs as per-type Lance datasets coordinated
/// through a Lance manifest table.
pub struct Omnigraph {
    /// Normalized root URI of the graph (the result of `normalize_root_uri`
    /// applied to the URI passed to init/open).
    root_uri: String,
    /// Object-store adapter for the graph root. A clone of this `Arc` is
    /// handed to every `GraphCoordinator` this handle opens.
    storage: Arc<dyn StorageAdapter>,
    /// Coordinator state behind a tokio `RwLock`. PR 2 (MR-686) wraps
    /// this so engine write APIs can be `&self` (the HTTP server's
    /// `AppState` holds `Arc<Omnigraph>` and dispatches concurrent
    /// calls without a global write lock). Reads (`snapshot`, `version`,
    /// `current_branch`, `branch_list`, `resolve_*`, `head_commit_id`,
    /// `list_commits`, …) acquire `.read().await` and parallelize.
    /// Writes (`refresh`, `branch_create`, `branch_delete`, `commit_*`,
    /// `record_*`) acquire `.write().await` and serialize. The atomic
    /// commit invariant — `commit_manifest_updates` followed by
    /// `record_graph_commit` must be atomic — is preserved by the
    /// single `.write()` covering both calls inside
    /// `commit_updates_with_actor_with_expected`. PR 2 Phase 2
    /// converted from `Mutex` to `RwLock` because the bench showed
    /// the Mutex was the dominant serializer for disjoint-table
    /// workloads. Lock acquisition order: always before `runtime_cache`
    /// (when both are needed in one scope).
    coordinator: Arc<tokio::sync::RwLock<GraphCoordinator>>,
    /// Per-table dataset store rooted at `root_uri`. Also exposed through
    /// the `TableStorage` trait via `Omnigraph::storage()`.
    table_store: TableStore,
    /// Per-snapshot runtime caches; cleared wholesale (e.g. by
    /// `sync_branch`) whenever the coordinator is replaced.
    runtime_cache: RuntimeCache,
    /// Read-heavy on every query, written only by `apply_schema`. ArcSwap
    /// gives atomic pointer swap with zero-cost reads (`load()` returns a
    /// `Guard<Arc<Catalog>>`), so concurrent queries on different actors
    /// don't contend on a lock to read the catalog.
    catalog: Arc<ArcSwap<Catalog>>,
    /// Read-heavy on schema introspection paths, written only by
    /// `apply_schema`. Same ArcSwap rationale as `catalog`.
    schema_source: Arc<ArcSwap<String>>,
    /// Per-`(table_key, branch)` writer queues. Reachable from engine
    /// internals (mutation finalize, schema_apply, branch_merge,
    /// ensure_indices, delete_where) and from future MR-870 recovery
    /// reconciler. PR 1b adds the field; callers acquire in commits 4+.
    write_queue: Arc<crate::db::write_queue::WriteQueueManager>,
    /// Process-wide mutex held across the swap → operate → restore window
    /// in `branch_merge_impl`. Two concurrent merges with distinct targets
    /// would otherwise interleave their three separate
    /// `coordinator.write().await` acquisitions, leaving each merge's
    /// inner body running against the other's swapped coord. Pinned by
    /// `concurrent_branch_merges_distinct_targets_do_not_swap_into_each_other`
    /// in `crates/omnigraph-server/tests/server.rs`.
    ///
    /// Cost: serializes ALL concurrent branch merges process-wide.
    /// Acceptable because branch merges are heavy (table rewrites, index
    /// rebuilds), per-(table, branch) queues inside `commit_all` already
    /// serialize the data path, and merges are rare relative to /change
    /// or /ingest. A finer-grained per-target-branch mutex is a follow-up
    /// if telemetry shows merge concurrency matters.
    ///
    /// The deeper fix — refactor `branch_merge_on_current_target` to take
    /// an explicit target coord parameter so `self.coordinator` is never
    /// used as scratch space — is the round-1 shape applied to
    /// `branch_create_from_impl`. Deferred because it requires unwinding
    /// every `self.snapshot()` and `self.ensure_commit_graph_initialized()`
    /// call inside the merge body.
    merge_exclusive: Arc<tokio::sync::Mutex<()>>,
    /// Optional policy checker for engine-layer enforcement (MR-722).
    /// `None` = no enforcement; mutating methods are unconditionally
    /// allowed (this is the embedded/dev default). `Some` = every
    /// mutating method calls `self.enforce(action, scope, actor)` at
    /// entry; denial returns `OmniError::Policy`.
    ///
    /// Per chassis design (see `omnigraph_policy::PolicyChecker`), the
    /// trait surface is deliberately coarse — action × scope × actor.
    /// Per-row / per-type / per-column scope lives at the query layer
    /// (MR-725), which extends the same trait with a different method.
    /// Don't be tempted to add per-row enforcement here.
    ///
    /// Set via `with_policy(checker)` after construction. Today only
    /// `apply_schema_as` consults this field (PR #2 proof-of-concept);
    /// PR #3 fans the `enforce()` call out to the remaining writers.
    policy: Option<Arc<dyn omnigraph_policy::PolicyChecker>>,
}
155
/// Whether [`Omnigraph::open`] runs the open-time recovery sweep.
///
/// Recovery requires Lance writes (`Dataset::restore`, `ManifestBatchPublisher::publish`).
/// Read-only consumers — NDJSON export, `commit list`, `read`, schema
/// inspection — should not trigger writes (they may run with read-only
/// object-store credentials, and silent open-time mutations are
/// surprising). They also don't need recovery: reads always resolve
/// through the manifest pin, which is the consistent snapshot regardless
/// of any Phase B → Phase C drift on the per-table side.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OpenMode {
    /// Run the recovery sweep on open. Default for `Omnigraph::open`.
    /// Covers both the schema-state sweep and the manifest-drift sweep.
    ReadWrite,
    /// Skip the recovery sweep. Use for read-only consumers via
    /// [`Omnigraph::open_read_only`]. Skipped recovery work is picked up
    /// by the next `ReadWrite` open.
    ReadOnly,
}
173
/// Options for [`Omnigraph::init_with_options`].
///
/// `force` controls the safety preflight that prevents an
/// accidental re-init from overwriting an existing graph's schema
/// metadata. Default behavior (`force: false`) fails fast with
/// [`OmniError::AlreadyInitialized`] if any of `_schema.pg`,
/// `_schema.ir.json`, or `__schema_state.json` already exists at
/// the target URI. In that strict mode, `init` also claims `_schema.pg`
/// atomically, so two concurrent inits against the same URI cannot both
/// succeed. With `force: true` the preflight is skipped —
/// existing schema files are overwritten in place. Force does NOT
/// purge old Lance datasets or `__manifest/`; reclaiming those
/// still requires deleting the graph directory by hand (or via a
/// future `DELETE /graphs/{id}`).
///
/// `InitOptions::default()` is strict mode (`force: false`).
#[derive(Debug, Clone, Copy, Default)]
pub struct InitOptions {
    /// Skip the existing-graph preflight. Operators set this when
    /// they actually mean to overwrite — e.g. `omnigraph init --force`.
    pub force: bool,
}
192
193impl Omnigraph {
194    /// Create a new graph at `uri` from schema source.
195    ///
196    /// Strict mode: errors with [`OmniError::AlreadyInitialized`] if
197    /// `uri` already holds any of the three schema artifacts. To
198    /// overwrite an existing graph deliberately, call
199    /// [`Self::init_with_options`] with `InitOptions { force: true }`.
200    pub async fn init(uri: &str, schema_source: &str) -> Result<Self> {
201        Self::init_with_options(uri, schema_source, InitOptions::default()).await
202    }
203
204    /// Create a new graph at `uri`, with explicit init-time options.
205    ///
206    /// See [`InitOptions`] for the safety contract — by default this
207    /// behaves identically to [`Self::init`].
208    pub async fn init_with_options(
209        uri: &str,
210        schema_source: &str,
211        options: InitOptions,
212    ) -> Result<Self> {
213        Self::init_with_storage(uri, schema_source, storage_for_uri(uri)?, options).await
214    }
215
    /// Shared implementation behind [`Self::init`] and
    /// [`Self::init_with_options`], taking an explicit storage adapter.
    ///
    /// Sequence:
    /// 1. Unless `options.force`, fail with [`OmniError::AlreadyInitialized`]
    ///    if any of the three schema artifacts already exists at the root.
    /// 2. Parse `schema_source` into schema IR and build the catalog.
    /// 3. In strict mode, atomically claim `_schema.pg` with
    ///    `write_text_if_absent`. Losing that race also yields
    ///    `AlreadyInitialized`.
    /// 4. Run `init_storage_phase`. On failure, best-effort-clean the schema
    ///    artifacts only if this call owns them (it claimed `_schema.pg`, or
    ///    `force` was requested).
    ///
    /// # Errors
    /// Returns `AlreadyInitialized` as described above. Also returns any
    /// schema parse or catalog-build error, any storage error, and any error
    /// from `init_storage_phase`.
    pub(crate) async fn init_with_storage(
        uri: &str,
        schema_source: &str,
        storage: Arc<dyn StorageAdapter>,
        options: InitOptions,
    ) -> Result<Self> {
        let root = normalize_root_uri(uri)?;

        // Preflight: refuse to clobber an existing graph unless the
        // operator passed `force`. This runs BEFORE any parse or
        // write so a misdirected `init` against an existing graph
        // URI cannot reach a code path that overwrites or, on a
        // later cleanup, deletes the schema files.
        //
        // Closes the "init is destructive against existing state"
        // class: there is no longer a code path where strict-mode
        // `init` can mutate a populated graph root.
        if !options.force {
            for candidate in [
                schema_source_uri(&root),
                schema_ir_uri(&root),
                schema_state_uri(&root),
            ] {
                if storage.exists(&candidate).await? {
                    return Err(OmniError::AlreadyInitialized { uri: root.clone() });
                }
            }
        }

        let schema_ir = read_schema_ir_from_source(schema_source)?;
        let mut catalog = build_catalog_from_ir(&schema_ir)?;
        fixup_blob_schemas(&mut catalog);

        // Establish an atomic ownership claim on `_schema.pg` before
        // writing the remaining init artifacts. A check-then-write preflight
        // is not enough under concurrent `init` calls: two callers can both
        // observe an empty root, one can successfully initialize, and the
        // loser can then fail in Lance `WriteMode::Create`. Only the caller
        // that atomically created `_schema.pg` may clean up schema artifacts
        // on later failure.
        let schema_pg_claimed = if options.force {
            false
        } else {
            let schema_path = join_uri(&root, SCHEMA_SOURCE_FILENAME);
            if !storage
                .write_text_if_absent(&schema_path, schema_source)
                .await?
            {
                return Err(OmniError::AlreadyInitialized { uri: root.clone() });
            }
            // Test hook: simulate a crash right after the claim. This call
            // owns `_schema.pg` here, so it is safe to clean up.
            if let Err(err) = crate::failpoints::maybe_fail("init.after_schema_pg_written") {
                best_effort_cleanup_init_artifacts(&root, storage.as_ref()).await;
                return Err(err);
            }
            true
        };

        // Run the I/O phase. On any error, best-effort-clean schema
        // artifacts only when this invocation owns them: strict mode owns
        // them after the atomic `_schema.pg` claim above; force mode owns
        // destructive overwrite semantics by explicit operator request.
        //
        // Coverage gap: Lance per-type datasets and `__manifest/`
        // directory created by `GraphCoordinator::init` are NOT cleaned
        // up here — fully recursive directory deletion requires a
        // `StorageAdapter::delete_prefix` primitive that's deferred
        // along with `DELETE /graphs/{id}` (PR 2b in the MR-668 plan
        // is currently deferred). If `init` fails after coordinator
        // init succeeds, operators may need to remove the graph
        // directory manually before retrying `init` on the same URI.
        // Documented in the PR 2a commit message and `init` rustdoc.
        //
        // The last argument is `true` exactly when `_schema.pg` was NOT
        // already written by the claim above (i.e. in force mode);
        // presumably it tells the storage phase to write it — confirm in
        // `init_storage_phase`.
        let coordinator = match init_storage_phase(
            &root,
            schema_source,
            &schema_ir,
            &catalog,
            &storage,
            !schema_pg_claimed,
        )
        .await
        {
            Ok(coordinator) => coordinator,
            Err(err) => {
                if schema_pg_claimed || options.force {
                    best_effort_cleanup_init_artifacts(&root, storage.as_ref()).await;
                }
                return Err(err);
            }
        };

        Ok(Self {
            root_uri: root.clone(),
            storage,
            coordinator: Arc::new(tokio::sync::RwLock::new(coordinator)),
            table_store: TableStore::new(&root),
            runtime_cache: RuntimeCache::default(),
            catalog: Arc::new(ArcSwap::from_pointee(catalog)),
            schema_source: Arc::new(ArcSwap::from_pointee(schema_source.to_string())),
            write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()),
            merge_exclusive: Arc::new(tokio::sync::Mutex::new(())),
            policy: None,
        })
    }
319
320    /// Open an existing graph (read-write).
321    ///
322    /// Reads `_schema.pg`, parses it, builds the catalog, and opens `__manifest`.
323    /// Runs the open-time recovery sweep before returning — see [`OpenMode`].
324    pub async fn open(uri: &str) -> Result<Self> {
325        Self::open_with_storage_and_mode(uri, storage_for_uri(uri)?, OpenMode::ReadWrite).await
326    }
327
328    /// Open an existing graph for read-only consumers (NDJSON export,
329    /// `commit list`, etc.). Skips the recovery sweep — see [`OpenMode`].
330    pub async fn open_read_only(uri: &str) -> Result<Self> {
331        Self::open_with_storage_and_mode(uri, storage_for_uri(uri)?, OpenMode::ReadOnly).await
332    }
333
334    /// `open_with_storage` retained for existing callers (init/test paths).
335    /// Defaults to `OpenMode::ReadWrite`.
336    pub(crate) async fn open_with_storage(
337        uri: &str,
338        storage: Arc<dyn StorageAdapter>,
339    ) -> Result<Self> {
340        Self::open_with_storage_and_mode(uri, storage, OpenMode::ReadWrite).await
341    }
342
    /// Open an existing graph at `uri` with an explicit storage adapter and
    /// [`OpenMode`].
    ///
    /// Opens the coordinator first. Under [`OpenMode::ReadWrite`] it then runs
    /// the schema-state recovery sweep, followed by the manifest-drift recovery
    /// sweep in `RecoveryMode::Full`. Only after recovery does it read
    /// `_schema.pg`, so a schema file that recovery just renamed into place is
    /// the one that gets loaded. The catalog is built from the accepted schema
    /// IR returned by `load_or_bootstrap_schema_contract`, not directly from
    /// the source text.
    ///
    /// # Errors
    /// Propagates coordinator-open, recovery, storage-read, schema-parse,
    /// schema-contract, and catalog-build errors.
    pub(crate) async fn open_with_storage_and_mode(
        uri: &str,
        storage: Arc<dyn StorageAdapter>,
        mode: OpenMode,
    ) -> Result<Self> {
        let root = normalize_root_uri(uri)?;
        // Open the coordinator first so the schema-staging recovery sweep can
        // compare its snapshot against any leftover staging files.
        let mut coordinator = GraphCoordinator::open(&root, Arc::clone(&storage)).await?;
        // Both the schema-state recovery sweep AND the manifest-drift
        // recovery sweep are gated on `OpenMode::ReadWrite`. Read-only
        // consumers (NDJSON export, `commit list`, schema show) shouldn't
        // trigger object-store mutations: they may run with read-only
        // credentials, and silent open-time writes are surprising. Both
        // sweeps' work is recoverable on the next ReadWrite open, so
        // skipping under ReadOnly doesn't lose any safety guarantees —
        // the manifest pin is the consistent snapshot regardless of
        // drift on the per-table side or leftover schema-staging files.
        if matches!(mode, OpenMode::ReadWrite) {
            // The schema-state sweep must run first; its outcome is handed
            // to the manifest-drift sweep below.
            let schema_state_recovery =
                recover_schema_state_files(&root, Arc::clone(&storage), &coordinator.snapshot())
                    .await?;
            // Recovery sweep: close the Phase B → Phase C residual on
            // any sidecar left over from a crashed writer. Continuous
            // in-process recovery for long-running servers (no restart
            // required between Phase B failure and recovery) is a
            // separate background-reconciler effort.
            crate::db::manifest::recover_manifest_drift(
                &root,
                Arc::clone(&storage),
                &mut coordinator,
                crate::db::manifest::RecoveryMode::Full,
                schema_state_recovery,
            )
            .await?;
        }
        // Read _schema.pg (post-recovery — may have just been renamed in).
        let schema_path = schema_source_uri(&root);
        let schema_source = storage.read_text(&schema_path).await?;
        let current_source_ir = read_schema_ir_from_source(&schema_source)?;
        let branches = coordinator.branch_list().await?;
        let (accepted_ir, _) = load_or_bootstrap_schema_contract(
            &root,
            Arc::clone(&storage),
            &branches,
            &current_source_ir,
        )
        .await?;
        // The catalog comes from the *accepted* IR, not straight from the
        // source text read above.
        let mut catalog = build_catalog_from_ir(&accepted_ir)?;
        fixup_blob_schemas(&mut catalog);

        Ok(Self {
            root_uri: root.clone(),
            storage,
            coordinator: Arc::new(tokio::sync::RwLock::new(coordinator)),
            table_store: TableStore::new(&root),
            runtime_cache: RuntimeCache::default(),
            catalog: Arc::new(ArcSwap::from_pointee(catalog)),
            schema_source: Arc::new(ArcSwap::from_pointee(schema_source)),
            write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()),
            merge_exclusive: Arc::new(tokio::sync::Mutex::new(())),
            policy: None,
        })
    }
407
408    /// Returns an `Arc<Catalog>` snapshot. Cheap clone of the current
409    /// catalog pointer; callers can hold the returned `Arc` across awaits
410    /// without blocking concurrent `apply_schema`.
411    pub fn catalog(&self) -> Arc<Catalog> {
412        self.catalog.load_full()
413    }
414
415    /// Returns an `Arc<String>` snapshot of the schema source.
416    pub fn schema_source(&self) -> Arc<String> {
417        self.schema_source.load_full()
418    }
419
420    /// Atomically swap the in-memory catalog. Concurrent readers see
421    /// either the old or the new pointer; never a torn state. Used by
422    /// `apply_schema` and `reload_schema_if_source_changed`.
423    pub(crate) fn store_catalog(&self, catalog: Catalog) {
424        self.catalog.store(Arc::new(catalog));
425    }
426
427    /// Atomically swap the in-memory schema source. Same rationale as
428    /// [`store_catalog`](Self::store_catalog).
429    pub(crate) fn store_schema_source(&self, schema_source: String) {
430        self.schema_source.store(Arc::new(schema_source));
431    }
432
433    pub fn uri(&self) -> &str {
434        &self.root_uri
435    }
436
437    /// Install a policy checker for engine-layer enforcement (MR-722).
438    /// Builder-style setter — consumes `self`, returns `Self`. Calling
439    /// this on a `Omnigraph` previously without policy enables
440    /// `enforce()` to fire at every mutating engine method that's been
441    /// wired to call it (currently `apply_schema_as`; PR #3 fans out to
442    /// the remaining writers).
443    ///
444    /// Embedded callers that don't care about authorization should
445    /// just not call this. Server / CLI callers that have loaded a
446    /// `PolicyEngine` from `policy.yaml` pass it here.
447    pub fn with_policy(mut self, checker: Arc<dyn omnigraph_policy::PolicyChecker>) -> Self {
448        self.policy = Some(checker);
449        self
450    }
451
452    /// Engine-layer policy enforcement gate (MR-722 chassis core).
453    ///
454    /// * If no policy is installed → no-op (returns `Ok(())`).
455    /// * If policy is installed AND actor is None → denial with a
456    ///   clear "no actor for engine-layer policy check" message.
457    ///   Forces server / CLI / SDK callers to thread an actor through
458    ///   when policy is configured — silent bypass via "I forgot the
459    ///   actor" is exactly the footgun this gate is here to prevent.
460    /// * If policy is installed AND actor is Some → call
461    ///   `PolicyChecker::check(action, scope, actor)`; map denial /
462    ///   internal failure to `OmniError::Policy(...)`.
463    pub(crate) fn enforce(
464        &self,
465        action: omnigraph_policy::PolicyAction,
466        scope: &omnigraph_policy::ResourceScope,
467        actor: Option<&str>,
468    ) -> Result<()> {
469        let Some(checker) = self.policy.as_ref() else {
470            return Ok(());
471        };
472        let Some(actor) = actor else {
473            return Err(OmniError::Policy(
474                "no actor for engine-layer policy check (policy is configured but the call site \
475                 didn't thread an actor through — this is almost certainly a bug, not an \
476                 intended bypass)"
477                    .to_string(),
478            ));
479        };
480        checker
481            .check(action, scope, actor)
482            .map_err(|err| OmniError::Policy(err.to_string()))
483    }
484
485    pub(crate) async fn ensure_schema_state_valid(&self) -> Result<()> {
486        validate_schema_contract(self.uri(), Arc::clone(&self.storage)).await
487    }
488
489    pub async fn plan_schema(&self, desired_schema_source: &str) -> Result<SchemaMigrationPlan> {
490        self.plan_schema_with_options(desired_schema_source, SchemaApplyOptions::default())
491            .await
492    }
493
494    pub async fn plan_schema_with_options(
495        &self,
496        desired_schema_source: &str,
497        options: SchemaApplyOptions,
498    ) -> Result<SchemaMigrationPlan> {
499        schema_apply::plan_schema(self, desired_schema_source, options).await
500    }
501
502    pub async fn preview_schema_apply_with_options(
503        &self,
504        desired_schema_source: &str,
505        options: SchemaApplyOptions,
506    ) -> Result<SchemaApplyPreview> {
507        schema_apply::preview_schema_apply(self, desired_schema_source, options).await
508    }
509
510    pub async fn apply_schema(&self, desired_schema_source: &str) -> Result<SchemaApplyResult> {
511        self.apply_schema_as(desired_schema_source, SchemaApplyOptions::default(), None)
512            .await
513    }
514
515    pub async fn apply_schema_with_options(
516        &self,
517        desired_schema_source: &str,
518        options: SchemaApplyOptions,
519    ) -> Result<SchemaApplyResult> {
520        self.apply_schema_as(desired_schema_source, options, None)
521            .await
522    }
523
524    /// Apply a schema migration with an explicit actor for engine-layer
525    /// policy enforcement (MR-722). When a `PolicyChecker` is installed
526    /// via [`Self::with_policy`], this method calls `enforce(SchemaApply,
527    /// Branch("main"), actor)` before any apply work happens. Denial
528    /// returns `OmniError::Policy` and leaves the manifest untouched.
529    ///
530    /// The no-actor variants (`apply_schema`, `apply_schema_with_options`)
531    /// pass `None` here. They work fine without a policy; if a policy IS
532    /// installed and actor is None, enforcement intentionally fails to
533    /// prevent silent-bypass-via-forgetting-the-actor footguns.
534    pub async fn apply_schema_as(
535        &self,
536        desired_schema_source: &str,
537        options: SchemaApplyOptions,
538        actor: Option<&str>,
539    ) -> Result<SchemaApplyResult> {
540        self.apply_schema_as_with_catalog_check(desired_schema_source, options, actor, |_| Ok(()))
541            .await
542    }
543
544    pub async fn apply_schema_as_with_catalog_check<F>(
545        &self,
546        desired_schema_source: &str,
547        options: SchemaApplyOptions,
548        actor: Option<&str>,
549        validate_catalog: F,
550    ) -> Result<SchemaApplyResult>
551    where
552        F: FnOnce(&Catalog) -> Result<()>,
553    {
554        schema_apply::apply_schema(
555            self,
556            desired_schema_source,
557            options,
558            actor,
559            validate_catalog,
560        )
561        .await
562    }
563
564    pub(crate) async fn ensure_schema_apply_idle(&self, operation: &str) -> Result<()> {
565        schema_apply::ensure_schema_apply_idle(self, operation).await
566    }
567
568    async fn ensure_schema_apply_not_locked(&self, operation: &str) -> Result<()> {
569        schema_apply::ensure_schema_apply_not_locked(self, operation).await
570    }
571
572    pub(crate) fn table_store(&self) -> &TableStore {
573        &self.table_store
574    }
575
576    /// Engine-facing trait surface around `TableStore`.
577    ///
578    /// This is the canonical accessor for newly-written engine code. The
579    /// trait's signatures use opaque `SnapshotHandle` / `StagedHandle`
580    /// instead of leaking `lance::Dataset` /
581    /// `lance::dataset::transaction::Transaction`. Existing call sites
582    /// that still use `db.table_store.X(...)` (the inherent struct
583    /// methods) are migrated incrementally.
584    pub(crate) fn storage(&self) -> &dyn crate::storage_layer::TableStorage {
585        &self.table_store
586    }
587
588    /// Engine-level access to the object-store adapter (S3 / local fs).
589    /// Used by the recovery sidecar protocol — writers in the engine
590    /// call this to write/delete sidecars at `__recovery/{ulid}.json`.
591    pub(crate) fn storage_adapter(&self) -> &dyn crate::storage::StorageAdapter {
592        self.storage.as_ref()
593    }
594
595    /// Per-`(table_key, branch)` writer queues.
596    ///
597    /// Engine-internal writers (mutation finalize, schema_apply,
598    /// branch_merge, ensure_indices, delete_where) and the future MR-870
599    /// recovery reconciler reach the queue manager via this accessor.
600    /// Returns an `Arc` clone so callers can hold the manager across
601    /// `&mut self` engine API boundaries.
602    pub(crate) fn write_queue(&self) -> Arc<crate::db::write_queue::WriteQueueManager> {
603        Arc::clone(&self.write_queue)
604    }
605
606    /// Engine-internal access to the merge-exclusive mutex. Held across
607    /// the swap → operate → restore window in `branch_merge_impl` so
608    /// concurrent merges with distinct targets don't corrupt
609    /// `self.coordinator` mid-operation. See the field doc on
610    /// `Omnigraph::merge_exclusive` for the full design rationale.
611    pub(crate) fn merge_exclusive(&self) -> Arc<tokio::sync::Mutex<()>> {
612        Arc::clone(&self.merge_exclusive)
613    }
614
615    /// Engine-level access to the graph's normalized root URI. Used by
616    /// the recovery sidecar protocol to compute `__recovery/` paths.
617    pub(crate) fn root_uri(&self) -> &str {
618        &self.root_uri
619    }
620
621    pub(crate) async fn open_coordinator_for_branch(
622        &self,
623        branch: Option<&str>,
624    ) -> Result<GraphCoordinator> {
625        match branch {
626            Some(branch) => {
627                GraphCoordinator::open_branch(self.uri(), branch, Arc::clone(&self.storage)).await
628            }
629            None => GraphCoordinator::open(self.uri(), Arc::clone(&self.storage)).await,
630        }
631    }
632
633    pub(crate) async fn swap_coordinator_for_branch(
634        &self,
635        branch: Option<&str>,
636    ) -> Result<GraphCoordinator> {
637        let next = self.open_coordinator_for_branch(branch).await?;
638        let mut coord = self.coordinator.write().await;
639        Ok(std::mem::replace(&mut *coord, next))
640    }
641
642    pub(crate) async fn restore_coordinator(&self, coordinator: GraphCoordinator) {
643        *self.coordinator.write().await = coordinator;
644    }
645
646    pub(crate) async fn resolved_branch_target(
647        &self,
648        branch: Option<&str>,
649    ) -> Result<ResolvedTarget> {
650        self.ensure_schema_state_valid().await?;
651        let requested = ReadTarget::Branch(branch.unwrap_or("main").to_string());
652        let normalized = normalize_branch_name(branch.unwrap_or("main"))?;
653        let coord = self.coordinator.read().await;
654        if normalized.as_deref() == coord.current_branch() {
655            let snapshot_id = coord
656                .head_commit_id()
657                .await?
658                .unwrap_or_else(|| SnapshotId::synthetic(coord.current_branch(), coord.version()));
659            return Ok(ResolvedTarget {
660                requested,
661                branch: coord.current_branch().map(str::to_string),
662                snapshot_id,
663                snapshot: coord.snapshot(),
664            });
665        }
666        coord.resolve_target(&requested).await
667    }
668
669    pub(crate) async fn snapshot_for_branch(&self, branch: Option<&str>) -> Result<Snapshot> {
670        self.resolved_branch_target(branch)
671            .await
672            .map(|resolved| resolved.snapshot)
673    }
674
675    pub(crate) async fn version(&self) -> u64 {
676        self.coordinator.read().await.version()
677    }
678
679    /// Return an immutable Snapshot from the known manifest state. No storage I/O.
680    pub(crate) async fn snapshot(&self) -> Snapshot {
681        self.coordinator.read().await.snapshot()
682    }
683
684    pub async fn snapshot_of(&self, target: impl Into<ReadTarget>) -> Result<Snapshot> {
685        self.resolved_target(target)
686            .await
687            .map(|resolved| resolved.snapshot)
688    }
689
690    pub async fn version_of(&self, target: impl Into<ReadTarget>) -> Result<u64> {
691        self.snapshot_of(target)
692            .await
693            .map(|snapshot| snapshot.version())
694    }
695
696    pub async fn resolved_branch_of(
697        &self,
698        target: impl Into<ReadTarget>,
699    ) -> Result<Option<String>> {
700        self.resolved_target(target)
701            .await
702            .map(|resolved| resolved.branch)
703    }
704
705    /// Synchronize this handle's write base to the latest head of the named branch.
706    pub async fn sync_branch(&self, branch: &str) -> Result<()> {
707        self.ensure_schema_state_valid().await?;
708        let branch = normalize_branch_name(branch)?;
709        let next = self.open_coordinator_for_branch(branch.as_deref()).await?;
710        *self.coordinator.write().await = next;
711        self.runtime_cache.invalidate_all().await;
712        Ok(())
713    }
714
    /// Re-read the handle-local coordinator state from storage AND run
    /// in-process recovery. Closes the Phase B → Phase C residual (e.g.
    /// `MutationStaging::finalize` crash mid-publish in a long-running
    /// server) without restart.
    ///
    /// Composition mirrors `Omnigraph::open_with_storage_and_mode`'s
    /// recovery sequence, in the same order, with one restriction: the
    /// manifest-drift sweep runs in `RollForwardOnly` mode (rollback /
    /// abort cases defer to the next ReadWrite open because
    /// `Dataset::restore` is unsafe under concurrency). Each step:
    ///
    /// 1. `coordinator.refresh()` — re-read manifest.
    /// 2. `recover_schema_state_files` — complete an in-flight
    ///    schema_apply's staging→final rename if a SchemaApply sidecar
    ///    is on disk; idempotent + early-returns when no staging files
    ///    exist. Required BEFORE manifest-drift recovery so a
    ///    SchemaApply roll-forward doesn't publish the manifest while
    ///    the staging files remain unrenamed (which would corrupt the
    ///    graph: data on new schema, catalog on old).
    /// 3. `recover_manifest_drift(... RollForwardOnly)` — close the
    ///    finalize→publisher residual via roll-forward; defer rollback
    ///    work to next ReadWrite open.
    /// 4. `reload_schema_if_source_changed` — after the coordinator write
    ///    guard is released, reload the cached schema source and catalog
    ///    if the on-disk schema source text changed.
    /// 5. `runtime_cache.invalidate_all` — drop stale per-snapshot caches.
    ///
    /// Steps 1–3 run under a single coordinator write guard. Any failing
    /// step returns its error immediately via `?`; later steps (including
    /// the cache invalidation) are then skipped.
    ///
    /// Steady state cost: one `list_dir` of `__recovery/` (typically
    /// returns empty → early return for both passes). No additional
    /// Lance reads.
    ///
    /// Engine-internal callers that already hold an in-flight sidecar
    /// (e.g. `schema_apply` mid-write) MUST use
    /// [`refresh_coordinator_only`](Self::refresh_coordinator_only) to
    /// avoid the recovery sweep racing their own sidecar.
    pub async fn refresh(&self) -> Result<()> {
        // Scope the coord write guard to the recovery section only.
        // `reload_schema_if_source_changed` (below) acquires
        // `self.coordinator.read().await` when the on-disk schema source
        // has drifted from the cached `schema_source`. Tokio's RwLock is
        // not reentrant, so holding the write across that call deadlocks.
        // Pinned by `composite_flow_schema_apply_then_branch_ops_no_deadlock_in_refresh`.
        {
            let mut coord = self.coordinator.write().await;
            // Step 1: re-read the manifest so recovery sees current state.
            coord.refresh().await?;
            // Step 2: finish any staged schema-file renames; the returned
            // value is handed to drift recovery below.
            let schema_state_recovery = recover_schema_state_files(
                &self.root_uri,
                Arc::clone(&self.storage),
                &coord.snapshot(),
            )
            .await?;
            // Step 3: roll-forward-only drift recovery against the same
            // (still write-locked) coordinator.
            crate::db::manifest::recover_manifest_drift(
                &self.root_uri,
                Arc::clone(&self.storage),
                &mut *coord,
                crate::db::manifest::RecoveryMode::RollForwardOnly,
                schema_state_recovery,
            )
            .await?;
        } // ← write guard released before reload's read acquisition
        self.reload_schema_if_source_changed().await?;
        self.runtime_cache.invalidate_all().await;
        Ok(())
    }
776
777    async fn reload_schema_if_source_changed(&self) -> Result<()> {
778        let schema_path = schema_source_uri(&self.root_uri);
779        let schema_source = self.storage.read_text(&schema_path).await?;
780        if schema_source == *self.schema_source.load_full() {
781            return Ok(());
782        }
783        let current_source_ir = read_schema_ir_from_source(&schema_source)?;
784        let branches = self.coordinator.read().await.branch_list().await?;
785        let (accepted_ir, _) = load_or_bootstrap_schema_contract(
786            &self.root_uri,
787            Arc::clone(&self.storage),
788            &branches,
789            &current_source_ir,
790        )
791        .await?;
792        let mut catalog = build_catalog_from_ir(&accepted_ir)?;
793        fixup_blob_schemas(&mut catalog);
794        self.store_schema_source(schema_source);
795        self.store_catalog(catalog);
796        Ok(())
797    }
798
799    /// Refresh coordinator state and invalidate the runtime cache WITHOUT
800    /// running the recovery sweep. Engine-internal callers that hold an
801    /// in-flight sidecar (e.g. `schema_apply::apply_schema_with_lock`'s
802    /// internal lease-check refresh) need this variant: running recovery
803    /// here would observe the caller's own sidecar, classify it as
804    /// RolledPastExpected, and roll it forward — racing the caller's
805    /// own publish path.
806    pub(crate) async fn refresh_coordinator_only(&self) -> Result<()> {
807        self.coordinator.write().await.refresh().await?;
808        self.runtime_cache.invalidate_all().await;
809        Ok(())
810    }
811
812    pub async fn resolve_snapshot(&self, branch: &str) -> Result<SnapshotId> {
813        self.ensure_schema_state_valid().await?;
814        self.coordinator
815            .read()
816            .await
817            .resolve_snapshot_id(branch)
818            .await
819    }
820
821    pub(crate) async fn resolved_target(
822        &self,
823        target: impl Into<ReadTarget>,
824    ) -> Result<ResolvedTarget> {
825        self.ensure_schema_state_valid().await?;
826        self.coordinator
827            .read()
828            .await
829            .resolve_target(&target.into())
830            .await
831    }
832
833    // ─── Change detection ────────────────────────────────────────────────
834
835    pub async fn diff_between(
836        &self,
837        from: impl Into<ReadTarget>,
838        to: impl Into<ReadTarget>,
839        filter: &crate::changes::ChangeFilter,
840    ) -> Result<crate::changes::ChangeSet> {
841        let from_resolved = self.resolved_target(from).await?;
842        let to_resolved = self.resolved_target(to).await?;
843        crate::changes::diff_snapshots(
844            self.uri(),
845            &from_resolved.snapshot,
846            &to_resolved.snapshot,
847            filter,
848            to_resolved.branch.clone().or(from_resolved.branch.clone()),
849        )
850        .await
851    }
852
853    /// Diff two graph commits. Resolves each commit to `(manifest_branch, manifest_version)`
854    /// and creates branch-aware snapshots. Supports cross-branch comparison.
855    pub async fn diff_commits(
856        &self,
857        from_commit_id: &str,
858        to_commit_id: &str,
859        filter: &crate::changes::ChangeFilter,
860    ) -> Result<crate::changes::ChangeSet> {
861        let coord = self.coordinator.read().await;
862        let from_commit = coord
863            .resolve_commit(&SnapshotId::new(from_commit_id))
864            .await?;
865        let to_commit = coord.resolve_commit(&SnapshotId::new(to_commit_id)).await?;
866        let from_snap = coord
867            .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
868                from_commit.graph_commit_id.clone(),
869            )))
870            .await?;
871        let to_snap = coord
872            .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
873                to_commit.graph_commit_id.clone(),
874            )))
875            .await?;
876        drop(coord);
877        crate::changes::diff_snapshots(
878            self.uri(),
879            &from_snap.snapshot,
880            &to_snap.snapshot,
881            filter,
882            to_snap.branch.clone().or(from_snap.branch.clone()),
883        )
884        .await
885    }
886
887    pub async fn entity_at_target(
888        &self,
889        target: impl Into<ReadTarget>,
890        table_key: &str,
891        id: &str,
892    ) -> Result<Option<serde_json::Value>> {
893        export::entity_at_target(self, target, table_key, id).await
894    }
895
896    /// Read one entity at a specific manifest version via time travel (on-demand enrichment).
897    pub async fn entity_at(
898        &self,
899        table_key: &str,
900        id: &str,
901        version: u64,
902    ) -> Result<Option<serde_json::Value>> {
903        export::entity_at(self, table_key, id, version).await
904    }
905
906    /// Create a Snapshot at any historical manifest version.
907    pub async fn snapshot_at_version(&self, version: u64) -> Result<Snapshot> {
908        self.ensure_schema_state_valid().await?;
909        self.coordinator
910            .read()
911            .await
912            .snapshot_at_version(version)
913            .await
914    }
915
916    pub async fn export_jsonl(
917        &self,
918        branch: &str,
919        type_names: &[String],
920        table_keys: &[String],
921    ) -> Result<String> {
922        export::export_jsonl(self, branch, type_names, table_keys).await
923    }
924
925    pub async fn export_jsonl_to_writer<W: Write>(
926        &self,
927        branch: &str,
928        type_names: &[String],
929        table_keys: &[String],
930        writer: &mut W,
931    ) -> Result<()> {
932        export::export_jsonl_to_writer(self, branch, type_names, table_keys, writer).await
933    }
934
935    // ─── Graph index ──────────────────────────────────────────────────────
936
937    /// Get or build the graph index for the current snapshot.
938    pub async fn graph_index(&self) -> Result<Arc<crate::graph_index::GraphIndex>> {
939        table_ops::graph_index(self).await
940    }
941
942    pub(crate) async fn graph_index_for_resolved(
943        &self,
944        resolved: &ResolvedTarget,
945    ) -> Result<Arc<crate::graph_index::GraphIndex>> {
946        table_ops::graph_index_for_resolved(self, resolved).await
947    }
948
949    /// Ensure BTree scalar indices exist on key columns.
950    /// Idempotent — Lance skips if index already exists.
951    ///
952    /// Opens sub-tables at their latest version (not snapshot-pinned) because
953    /// indices must be created on the current head. Any version drift from the
954    /// snapshot is expected and logged. The resulting versions are committed
955    /// back to the manifest.
956    ///
957    /// On named branches, indexing preserves lazy branching:
958    /// unbranched subtables keep inheriting `main`, while subtables inherited
959    /// from an ancestor branch are first forked into the active branch before
960    /// their index metadata is updated.
961    pub async fn ensure_indices(&self) -> Result<()> {
962        table_ops::ensure_indices(self).await
963    }
964
965    pub async fn ensure_indices_on(&self, branch: &str) -> Result<()> {
966        table_ops::ensure_indices_on(self, branch).await
967    }
968
969    #[cfg(feature = "failpoints")]
970    #[doc(hidden)]
971    pub async fn failpoint_publish_table_head_without_index_rebuild_for_test(
972        &mut self,
973        branch: &str,
974        table_key: &str,
975        table_branch: Option<&str>,
976    ) -> Result<u64> {
977        table_ops::failpoint_publish_table_head_without_index_rebuild_for_test(
978            self,
979            branch,
980            table_key,
981            table_branch,
982        )
983        .await
984    }
985
986    /// Compact small Lance fragments into fewer larger ones across every
987    /// node + edge table on `main`. See [`optimize`] for details.
988    pub async fn optimize(&self) -> Result<Vec<optimize::TableOptimizeStats>> {
989        optimize::optimize_all_tables(self).await
990    }
991
992    /// Remove Lance manifests (and the fragments they uniquely own) per the
993    /// given [`optimize::CleanupPolicyOptions`]. Destructive to version
994    /// history. See [`optimize`] for details.
995    pub async fn cleanup(
996        &mut self,
997        options: optimize::CleanupPolicyOptions,
998    ) -> Result<Vec<optimize::TableCleanupStats>> {
999        optimize::cleanup_all_tables(self, options).await
1000    }
1001
1002    /// Read a blob from a node by its string ID and property name.
1003    ///
1004    /// Returns a `BlobFile` handle with async `read()`, `seek()`, `tell()`,
1005    /// and metadata accessors (`size()`, `kind()`, `uri()`).
1006    ///
1007    /// ```ignore
1008    /// let blob = db.read_blob("Document", "readme", "content").await?;
1009    /// let bytes = blob.read().await?;
1010    /// ```
1011    pub async fn read_blob(&self, type_name: &str, id: &str, property: &str) -> Result<BlobFile> {
1012        self.ensure_schema_state_valid().await?;
1013        let catalog = self.catalog();
1014        let node_type = catalog
1015            .node_types
1016            .get(type_name)
1017            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
1018        if !node_type.blob_properties.contains(property) {
1019            return Err(OmniError::manifest(format!(
1020                "property '{}' on type '{}' is not a Blob",
1021                property, type_name
1022            )));
1023        }
1024
1025        let snapshot = self.snapshot().await;
1026        let table_key = format!("node:{}", type_name);
1027        let ds = snapshot.open(&table_key).await?;
1028
1029        let filter_sql = format!("id = '{}'", id.replace('\'', "''"));
1030        let row_id = self
1031            .table_store
1032            .first_row_id_for_filter(&ds, &filter_sql)
1033            .await?
1034            .ok_or_else(|| {
1035                OmniError::manifest(format!("no {} with id '{}' found", type_name, id))
1036            })?;
1037
1038        // Use take_blobs to get the BlobFile handle
1039        let ds = Arc::new(ds);
1040        let mut blobs = ds
1041            .take_blobs(&[row_id], property)
1042            .await
1043            .map_err(|e| OmniError::Lance(e.to_string()))?;
1044
1045        blobs.pop().ok_or_else(|| {
1046            OmniError::manifest(format!(
1047                "blob '{}' on {} '{}' returned no data",
1048                property, type_name, id
1049            ))
1050        })
1051    }
1052
1053    pub(crate) async fn active_branch(&self) -> Option<String> {
1054        self.coordinator
1055            .read()
1056            .await
1057            .current_branch()
1058            .map(str::to_string)
1059    }
1060
1061    async fn ensure_branch_delete_safe(&self, branch: &str, branches: &[String]) -> Result<()> {
1062        let descendants = self
1063            .coordinator
1064            .read()
1065            .await
1066            .branch_descendants(branch)
1067            .await?;
1068        if let Some(descendant) = descendants.first() {
1069            return Err(OmniError::manifest_conflict(format!(
1070                "cannot delete branch '{}' because descendant branch '{}' still depends on it",
1071                branch, descendant
1072            )));
1073        }
1074
1075        for other_branch in branches
1076            .iter()
1077            .filter(|candidate| candidate.as_str() != branch)
1078        {
1079            let snapshot = self
1080                .snapshot_of(ReadTarget::branch(other_branch.as_str()))
1081                .await?;
1082            if snapshot
1083                .entries()
1084                .any(|entry| entry.table_branch.as_deref() == Some(branch))
1085            {
1086                return Err(OmniError::manifest_conflict(format!(
1087                    "cannot delete branch '{}' because branch '{}' still depends on it",
1088                    branch, other_branch
1089                )));
1090            }
1091        }
1092
1093        Ok(())
1094    }
1095
1096    /// Best-effort reclaim of the per-table Lance forks a just-deleted branch
1097    /// owned. Runs AFTER the manifest authority flip, so the branch is already
1098    /// gone and these forks are unreachable orphans. A failure here (transient
1099    /// object-store error, the `branch_delete.before_table_cleanup` failpoint)
1100    /// is logged and swallowed: the `cleanup` reconciler is the guaranteed
1101    /// backstop that converges any leftover orphan. Uses `force_delete_branch`
1102    /// so a partially-reclaimed retry is idempotent.
1103    async fn cleanup_deleted_branch_tables(&self, branch: &str, owned_tables: &[(String, String)]) {
1104        let mut seen_paths = HashSet::new();
1105        let mut cleanup_targets = owned_tables
1106            .iter()
1107            .filter(|(_, table_path)| seen_paths.insert(table_path.clone()))
1108            .cloned()
1109            .collect::<Vec<_>>();
1110        cleanup_targets.sort_by(|left, right| left.0.cmp(&right.0));
1111
1112        for (table_key, table_path) in cleanup_targets {
1113            let dataset_uri = self.table_store.dataset_uri(&table_path);
1114            let outcome = match crate::failpoints::maybe_fail("branch_delete.before_table_cleanup")
1115            {
1116                Ok(()) => self.table_store.force_delete_branch(&dataset_uri, branch).await,
1117                Err(injected) => Err(injected),
1118            };
1119            if let Err(err) = outcome {
1120                tracing::warn!(
1121                    target: "omnigraph::branch_delete::cleanup",
1122                    branch = %branch,
1123                    table = %table_key,
1124                    error = %err,
1125                    "best-effort fork reclaim failed; cleanup will reconcile the orphan",
1126                );
1127            }
1128        }
1129    }
1130
1131    async fn delete_branch_storage_only(&self, branch: &str) -> Result<()> {
1132        let active = self
1133            .coordinator
1134            .read()
1135            .await
1136            .current_branch()
1137            .map(str::to_string);
1138        if active.as_deref() == Some(branch) {
1139            return Err(OmniError::manifest_conflict(format!(
1140                "cannot delete currently active branch '{}'",
1141                branch
1142            )));
1143        }
1144
1145        let branch_snapshot = self.snapshot_of(ReadTarget::branch(branch)).await?;
1146        let owned_tables = branch_snapshot
1147            .entries()
1148            .filter(|entry| entry.table_branch.as_deref() == Some(branch))
1149            .map(|entry| (entry.table_key.clone(), entry.table_path.clone()))
1150            .collect::<Vec<_>>();
1151
1152        // Authority flip (+ best-effort commit-graph reclaim) — must succeed.
1153        self.coordinator.write().await.branch_delete(branch).await?;
1154        // Best-effort per-table fork reclaim; cleanup reconciles any leftover.
1155        self.cleanup_deleted_branch_tables(branch, &owned_tables)
1156            .await;
1157        Ok(())
1158    }
1159
1160    pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
1161        normalize_branch_name(branch)
1162    }
1163
1164    pub(crate) async fn head_commit_id_for_branch(
1165        &self,
1166        branch: Option<&str>,
1167    ) -> Result<Option<String>> {
1168        let mut coordinator = self.open_coordinator_for_branch(branch).await?;
1169        coordinator.ensure_commit_graph_initialized().await?;
1170        coordinator
1171            .head_commit_id()
1172            .await
1173            .map(|id| id.map(|snapshot_id| snapshot_id.as_str().to_string()))
1174    }
1175
1176    pub async fn branch_create(&self, name: &str) -> Result<()> {
1177        self.branch_create_as(name, None).await
1178    }
1179
1180    /// Create a branch from the coordinator's currently-open snapshot,
1181    /// with an explicit actor for engine-layer policy enforcement
1182    /// (MR-722 fan-out). Scope is `TargetBranch(name)` — symmetric with
1183    /// `branch_delete_as`: the branch being acted upon is the target.
1184    /// Cedar rules using `target_branch_scope: protected` therefore see
1185    /// the new-branch name and can deny e.g. creating any branch named
1186    /// `main` from a non-privileged actor.
1187    pub async fn branch_create_as(&self, name: &str, actor: Option<&str>) -> Result<()> {
1188        self.enforce(
1189            omnigraph_policy::PolicyAction::BranchCreate,
1190            &omnigraph_policy::ResourceScope::TargetBranch(name.to_string()),
1191            actor,
1192        )?;
1193        self.ensure_schema_state_valid().await?;
1194        self.ensure_schema_apply_idle("branch_create").await?;
1195        ensure_public_branch_ref(name, "branch_create")?;
1196        self.coordinator.write().await.branch_create(name).await
1197    }
1198
1199    pub async fn branch_create_from(&self, from: impl Into<ReadTarget>, name: &str) -> Result<()> {
1200        self.branch_create_from_as(from, name, None).await
1201    }
1202
1203    /// Create a branch from a specific source branch with an explicit
1204    /// actor for engine-layer policy enforcement (MR-722 fan-out).
1205    ///
1206    /// Scope is `BranchTransition { source, target }` — matches the
1207    /// HTTP-layer convention at `server_branch_create`
1208    /// (branch=Some(from), target_branch=Some(name)), so engine and
1209    /// HTTP fire the same Cedar decision. Pinned-snapshot sources
1210    /// (which aren't a branch ref) materialize as the sentinel
1211    /// `<snapshot>` for the policy check; Cedar rules using
1212    /// `branch_scope: any` still match, rules pinning a specific
1213    /// source branch correctly do not.
1214    pub async fn branch_create_from_as(
1215        &self,
1216        from: impl Into<ReadTarget>,
1217        name: &str,
1218        actor: Option<&str>,
1219    ) -> Result<()> {
1220        let target = from.into();
1221        let source_branch = match &target {
1222            ReadTarget::Branch(b) => b.clone(),
1223            _ => "<snapshot>".to_string(),
1224        };
1225        self.enforce(
1226            omnigraph_policy::PolicyAction::BranchCreate,
1227            &omnigraph_policy::ResourceScope::BranchTransition {
1228                source: source_branch,
1229                target: name.to_string(),
1230            },
1231            actor,
1232        )?;
1233        self.ensure_schema_apply_idle("branch_create_from").await?;
1234        self.branch_create_from_impl(target, name, false).await
1235    }
1236
1237    async fn branch_create_from_impl(
1238        &self,
1239        from: impl Into<ReadTarget>,
1240        name: &str,
1241        allow_internal_refs: bool,
1242    ) -> Result<()> {
1243        let target = from.into();
1244        let ReadTarget::Branch(branch_name) = target else {
1245            return Err(OmniError::manifest(
1246                "branch creation from pinned snapshots is not supported yet".to_string(),
1247            ));
1248        };
1249        if !allow_internal_refs {
1250            ensure_public_branch_ref(&branch_name, "branch_create_from")?;
1251            ensure_public_branch_ref(name, "branch_create_from")?;
1252        }
1253        let branch = normalize_branch_name(&branch_name)?;
1254        // Operate on a freshly-opened source coordinator that's owned locally
1255        // — never touch `self.coordinator`. The pre-fix implementation used
1256        // `swap_coordinator_for_branch` + operate + `restore_coordinator` as
1257        // three separate `coordinator.write().await` acquisitions; under
1258        // `&self` concurrency, a second `branch_create_from` could swap
1259        // self.coordinator between this caller's swap and operate steps,
1260        // making the operate run against the wrong source branch and
1261        // forking off the wrong HEAD. Pinned by
1262        // `concurrent_branch_create_from_distinct_parents_does_not_corrupt_coordinator`
1263        // in `crates/omnigraph-server/tests/server.rs`.
1264        //
1265        // `branch_create` mutates only the local coord's commit-graph cache;
1266        // the manifest write is durable on disk regardless of which
1267        // coord-handle issued it. Discarding `source_coord` after the call
1268        // is the right shape — the new branch is reachable from any
1269        // subsequent open of any coord.
1270        let mut source_coord = self.open_coordinator_for_branch(branch.as_deref()).await?;
1271        source_coord.branch_create(name).await
1272    }
1273
1274    pub async fn branch_list(&self) -> Result<Vec<String>> {
1275        self.ensure_schema_state_valid().await?;
1276        self.coordinator.read().await.branch_list().await
1277    }
1278
1279    pub async fn branch_delete(&self, name: &str) -> Result<()> {
1280        self.branch_delete_as(name, None).await
1281    }
1282
1283    /// Delete a branch with an explicit actor for engine-layer policy
1284    /// enforcement (MR-722 fan-out). Scope is `TargetBranch(name)` —
1285    /// matches the HTTP-layer convention at `server_branch_delete`
1286    /// (branch=None, target_branch=Some(name)). Cedar rules using
1287    /// `target_branch_scope: protected` therefore correctly gate
1288    /// deletion of protected branches (e.g. deny BranchDelete against
1289    /// `main`).
1290    pub async fn branch_delete_as(&self, name: &str, actor: Option<&str>) -> Result<()> {
1291        self.enforce(
1292            omnigraph_policy::PolicyAction::BranchDelete,
1293            &omnigraph_policy::ResourceScope::TargetBranch(name.to_string()),
1294            actor,
1295        )?;
1296        self.ensure_schema_state_valid().await?;
1297        self.ensure_schema_apply_idle("branch_delete").await?;
1298        ensure_public_branch_ref(name, "branch_delete")?;
1299        self.refresh().await?;
1300        let branch = normalize_branch_name(name)?
1301            .ok_or_else(|| OmniError::manifest("cannot delete branch 'main'".to_string()))?;
1302        let branches = self.coordinator.read().await.branch_list().await?;
1303        if !branches.iter().any(|candidate| candidate == &branch) {
1304            return Err(OmniError::manifest_not_found(format!(
1305                "branch '{}' not found",
1306                branch
1307            )));
1308        }
1309
1310        self.ensure_branch_delete_safe(&branch, &branches).await?;
1311        self.delete_branch_storage_only(&branch).await
1312    }
1313
1314    pub async fn get_commit(&self, commit_id: &str) -> Result<GraphCommit> {
1315        self.ensure_schema_state_valid().await?;
1316        self.coordinator
1317            .read()
1318            .await
1319            .resolve_commit(&SnapshotId::new(commit_id))
1320            .await
1321    }
1322
1323    pub async fn list_commits(&self, branch: Option<&str>) -> Result<Vec<GraphCommit>> {
1324        self.ensure_schema_state_valid().await?;
1325        let branch = match branch {
1326            Some(branch) => normalize_branch_name(branch)?,
1327            None => None,
1328        };
1329        let coordinator = self.open_coordinator_for_branch(branch.as_deref()).await?;
1330        coordinator.list_commits().await
1331    }
1332
1333    /// Open a sub-table for mutation with version-drift guard.
1334    ///
1335    /// Checks that the dataset's current version matches the snapshot-pinned
1336    /// version. If another writer has advanced the version, returns an error
1337    /// prompting the caller to refresh and retry (optimistic concurrency).
1338    pub(crate) async fn open_for_mutation(
1339        &self,
1340        table_key: &str,
1341        op_kind: crate::db::MutationOpKind,
1342    ) -> Result<(Dataset, String, Option<String>)> {
1343        table_ops::open_for_mutation(self, table_key, op_kind).await
1344    }
1345
1346    pub(crate) async fn open_for_mutation_on_branch(
1347        &self,
1348        branch: Option<&str>,
1349        table_key: &str,
1350        op_kind: crate::db::MutationOpKind,
1351    ) -> Result<(Dataset, String, Option<String>)> {
1352        table_ops::open_for_mutation_on_branch(self, branch, table_key, op_kind).await
1353    }
1354
1355    pub(crate) async fn fork_dataset_from_entry_state(
1356        &self,
1357        table_key: &str,
1358        full_path: &str,
1359        source_branch: Option<&str>,
1360        source_version: u64,
1361        active_branch: &str,
1362    ) -> Result<Dataset> {
1363        table_ops::fork_dataset_from_entry_state(
1364            self,
1365            table_key,
1366            full_path,
1367            source_branch,
1368            source_version,
1369            active_branch,
1370        )
1371        .await
1372    }
1373
1374    pub(crate) async fn reopen_for_mutation(
1375        &self,
1376        table_key: &str,
1377        full_path: &str,
1378        table_branch: Option<&str>,
1379        expected_version: u64,
1380        op_kind: crate::db::MutationOpKind,
1381    ) -> Result<Dataset> {
1382        table_ops::reopen_for_mutation(
1383            self,
1384            table_key,
1385            full_path,
1386            table_branch,
1387            expected_version,
1388            op_kind,
1389        )
1390        .await
1391    }
1392
1393    pub(crate) async fn open_dataset_at_state(
1394        &self,
1395        table_path: &str,
1396        table_branch: Option<&str>,
1397        table_version: u64,
1398    ) -> Result<Dataset> {
1399        table_ops::open_dataset_at_state(self, table_path, table_branch, table_version).await
1400    }
1401
1402    pub(crate) async fn build_indices_on_dataset(
1403        &self,
1404        table_key: &str,
1405        ds: &mut Dataset,
1406    ) -> Result<()> {
1407        table_ops::build_indices_on_dataset(self, table_key, ds).await
1408    }
1409
1410    pub(crate) async fn build_indices_on_dataset_for_catalog(
1411        &self,
1412        catalog: &Catalog,
1413        table_key: &str,
1414        ds: &mut Dataset,
1415    ) -> Result<()> {
1416        table_ops::build_indices_on_dataset_for_catalog(self, catalog, table_key, ds).await
1417    }
1418
1419    // Used only by in-tree tests (`#[cfg(test)]`); the runtime path now
1420    // uses `commit_updates_on_branch_with_expected` exclusively.
1421    #[cfg(test)]
1422    pub(crate) async fn commit_updates(
1423        &mut self,
1424        updates: &[crate::db::SubTableUpdate],
1425    ) -> Result<u64> {
1426        table_ops::commit_updates(self, updates).await
1427    }
1428
1429    pub(crate) async fn commit_manifest_updates(
1430        &self,
1431        updates: &[crate::db::SubTableUpdate],
1432    ) -> Result<u64> {
1433        table_ops::commit_manifest_updates(self, updates).await
1434    }
1435
1436    pub(crate) async fn record_merge_commit(
1437        &self,
1438        manifest_version: u64,
1439        parent_commit_id: &str,
1440        merged_parent_commit_id: &str,
1441        actor_id: Option<&str>,
1442    ) -> Result<String> {
1443        table_ops::record_merge_commit(
1444            self,
1445            manifest_version,
1446            parent_commit_id,
1447            merged_parent_commit_id,
1448            actor_id,
1449        )
1450        .await
1451    }
1452
1453    pub(crate) async fn commit_updates_on_branch_with_expected(
1454        &self,
1455        branch: Option<&str>,
1456        updates: &[crate::db::SubTableUpdate],
1457        expected_table_versions: &std::collections::HashMap<String, u64>,
1458        actor_id: Option<&str>,
1459    ) -> Result<u64> {
1460        table_ops::commit_updates_on_branch_with_expected(
1461            self,
1462            branch,
1463            updates,
1464            expected_table_versions,
1465            actor_id,
1466        )
1467        .await
1468    }
1469
1470    pub(crate) async fn ensure_commit_graph_initialized(&self) -> Result<()> {
1471        table_ops::ensure_commit_graph_initialized(self).await
1472    }
1473
1474    /// Invalidate the cached graph index. Called after edge mutations.
1475    pub(crate) async fn invalidate_graph_index(&self) {
1476        table_ops::invalidate_graph_index(self).await
1477    }
1478}
1479
1480pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
1481    let branch = branch.trim();
1482    if branch.is_empty() {
1483        return Err(OmniError::manifest(
1484            "branch name cannot be empty".to_string(),
1485        ));
1486    }
1487    if branch == "main" {
1488        return Ok(None);
1489    }
1490    Ok(Some(branch.to_string()))
1491}
1492
1493pub(crate) fn ensure_public_branch_ref(branch: &str, operation: &str) -> Result<()> {
1494    if super::is_internal_run_branch(branch) {
1495        return Err(OmniError::manifest(format!(
1496            "{} does not allow internal run ref '{}'",
1497            operation, branch
1498        )));
1499    }
1500    if is_internal_system_branch(branch) {
1501        return Err(OmniError::manifest(format!(
1502            "{} does not allow internal system ref '{}'",
1503            operation, branch
1504        )));
1505    }
1506    Ok(())
1507}
1508
1509fn concat_or_empty_batches(schema: Arc<Schema>, batches: Vec<RecordBatch>) -> Result<RecordBatch> {
1510    if batches.is_empty() {
1511        return Ok(RecordBatch::new_empty(schema));
1512    }
1513    if batches.len() == 1 {
1514        return Ok(batches.into_iter().next().unwrap());
1515    }
1516    let batch_schema = batches[0].schema();
1517    arrow_select::concat::concat_batches(&batch_schema, &batches)
1518        .map_err(|e| OmniError::Lance(e.to_string()))
1519}
1520
1521fn blob_properties_for_table_key<'a>(
1522    catalog: &'a Catalog,
1523    table_key: &str,
1524) -> Result<&'a std::collections::HashSet<String>> {
1525    if let Some(type_name) = table_key.strip_prefix("node:") {
1526        return catalog
1527            .node_types
1528            .get(type_name)
1529            .map(|node_type| &node_type.blob_properties)
1530            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)));
1531    }
1532    if let Some(type_name) = table_key.strip_prefix("edge:") {
1533        return catalog
1534            .edge_types
1535            .get(type_name)
1536            .map(|edge_type| &edge_type.blob_properties)
1537            .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)));
1538    }
1539    Err(OmniError::manifest(format!(
1540        "invalid table key '{}'",
1541        table_key
1542    )))
1543}
1544
1545fn blob_description_is_null(descriptions: &StructArray, row: usize) -> Result<bool> {
1546    if descriptions.is_null(row) {
1547        return Ok(true);
1548    }
1549
1550    let kind = descriptions
1551        .column_by_name("kind")
1552        .and_then(|col| col.as_any().downcast_ref::<UInt32Array>())
1553        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row) as u8))
1554        .or_else(|| {
1555            descriptions
1556                .column_by_name("kind")
1557                .and_then(|col| col.as_any().downcast_ref::<arrow_array::UInt8Array>())
1558                .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)))
1559        });
1560    let position = descriptions
1561        .column_by_name("position")
1562        .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
1563        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1564    let size = descriptions
1565        .column_by_name("size")
1566        .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
1567        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1568    let blob_uri = descriptions
1569        .column_by_name("blob_uri")
1570        .and_then(|col| col.as_any().downcast_ref::<StringArray>())
1571        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1572
1573    let Some(kind) = kind else {
1574        return Ok(true);
1575    };
1576    let kind = BlobKind::try_from(kind).map_err(|e| OmniError::Lance(e.to_string()))?;
1577    if kind != BlobKind::Inline {
1578        return Ok(false);
1579    }
1580
1581    Ok(position.unwrap_or(0) == 0 && size.unwrap_or(0) == 0 && blob_uri.unwrap_or("").is_empty())
1582}
1583
1584/// Replace placeholder `LargeBinary` fields with Lance blob v2 fields.
1585///
1586/// The compiler crate has no Lance dependency, so `ScalarType::Blob` maps to
1587/// `DataType::LargeBinary` as a placeholder. This function replaces those
1588/// fields with the real blob v2 struct type via `lance::blob::blob_field()`.
1589fn fixup_blob_schemas(catalog: &mut Catalog) {
1590    for node_type in catalog.node_types.values_mut() {
1591        if node_type.blob_properties.is_empty() {
1592            continue;
1593        }
1594        let fields: Vec<Field> = node_type
1595            .arrow_schema
1596            .fields()
1597            .iter()
1598            .map(|f| {
1599                if node_type.blob_properties.contains(f.name()) {
1600                    blob_field(f.name(), f.is_nullable())
1601                } else {
1602                    f.as_ref().clone()
1603                }
1604            })
1605            .collect();
1606        node_type.arrow_schema = Arc::new(Schema::new(fields));
1607    }
1608    for edge_type in catalog.edge_types.values_mut() {
1609        if edge_type.blob_properties.is_empty() {
1610            continue;
1611        }
1612        let fields: Vec<Field> = edge_type
1613            .arrow_schema
1614            .fields()
1615            .iter()
1616            .map(|f| {
1617                if edge_type.blob_properties.contains(f.name()) {
1618                    blob_field(f.name(), f.is_nullable())
1619                } else {
1620                    f.as_ref().clone()
1621                }
1622            })
1623            .collect();
1624        edge_type.arrow_schema = Arc::new(Schema::new(fields));
1625    }
1626}
1627
1628fn read_schema_ir_from_source(schema_source: &str) -> Result<SchemaIR> {
1629    let schema_ast = parse_schema(schema_source)?;
1630    build_schema_ir(&schema_ast).map_err(|err| OmniError::manifest(err.to_string()))
1631}
1632
1633/// I/O phase of `Omnigraph::init_with_storage`. Split out so the caller
1634/// can pattern-match on the result and run cleanup on error before
1635/// returning the original error.
1636///
1637/// Failpoints fire at the phase boundaries:
1638/// * `init.after_schema_pg_written` — `_schema.pg` is on disk. In strict mode
1639///   this fires in the caller immediately after the atomic ownership claim; in
1640///   force mode it fires here after the explicit overwrite.
1641/// * `init.after_schema_contract_written` — `_schema.pg` + `_schema.ir.json`
1642///   + `__schema_state.json` are on disk.
1643/// * `init.after_coordinator_init` — all schema files plus Lance per-type
1644///   datasets and `__manifest/` are on disk. (The cleanup wrapper can only
1645///   remove the schema files; Lance directories need `delete_prefix` —
1646///   deferred along with `DELETE /graphs/{id}`.)
1647async fn init_storage_phase(
1648    root: &str,
1649    schema_source: &str,
1650    schema_ir: &SchemaIR,
1651    catalog: &Catalog,
1652    storage: &Arc<dyn StorageAdapter>,
1653    write_schema_pg: bool,
1654) -> Result<GraphCoordinator> {
1655    if write_schema_pg {
1656        let schema_path = join_uri(root, SCHEMA_SOURCE_FILENAME);
1657        storage.write_text(&schema_path, schema_source).await?;
1658        crate::failpoints::maybe_fail("init.after_schema_pg_written")?;
1659    }
1660
1661    write_schema_contract(root, storage.as_ref(), schema_ir).await?;
1662    crate::failpoints::maybe_fail("init.after_schema_contract_written")?;
1663
1664    let coordinator = GraphCoordinator::init(root, catalog, Arc::clone(storage)).await?;
1665    crate::failpoints::maybe_fail("init.after_coordinator_init")?;
1666
1667    Ok(coordinator)
1668}
1669
1670/// Best-effort cleanup of init-phase artifacts. Called from
1671/// `init_with_storage` on any error returned by `init_storage_phase`.
1672///
1673/// Removes the three schema files: `_schema.pg`, `_schema.ir.json`,
1674/// `__schema_state.json`. Lance datasets and `__manifest/` are not
1675/// touched here — recursive directory deletion requires a
1676/// `StorageAdapter::delete_prefix` primitive that's deferred along
1677/// with `DELETE /graphs/{id}` (MR-668 PR 2b).
1678///
1679/// Failures to delete are logged via `tracing::warn` and do not mask
1680/// the original init error.
1681async fn best_effort_cleanup_init_artifacts(root: &str, storage: &dyn StorageAdapter) {
1682    for uri in [
1683        schema_source_uri(root),
1684        schema_ir_uri(root),
1685        schema_state_uri(root),
1686    ] {
1687        if let Err(err) = storage.delete(&uri).await {
1688            tracing::warn!(
1689                target: "omnigraph::init::cleanup",
1690                uri = %uri,
1691                error = %err,
1692                "init failed; best-effort cleanup could not delete artifact",
1693            );
1694        }
1695    }
1696}
1697
1698fn schema_table_key(type_kind: SchemaTypeKind, name: &str) -> String {
1699    match type_kind {
1700        SchemaTypeKind::Node => format!("node:{}", name),
1701        SchemaTypeKind::Edge => format!("edge:{}", name),
1702        SchemaTypeKind::Interface => unreachable!("interfaces do not map to tables"),
1703    }
1704}
1705
1706fn schema_for_table_key(catalog: &Catalog, table_key: &str) -> Result<Arc<Schema>> {
1707    if let Some(type_name) = table_key.strip_prefix("node:") {
1708        let node_type: &NodeType = catalog
1709            .node_types
1710            .get(type_name)
1711            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
1712        return Ok(node_type.arrow_schema.clone());
1713    }
1714    if let Some(type_name) = table_key.strip_prefix("edge:") {
1715        let edge_type: &EdgeType = catalog
1716            .edge_types
1717            .get(type_name)
1718            .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)))?;
1719        return Ok(edge_type.arrow_schema.clone());
1720    }
1721    Err(OmniError::manifest(format!(
1722        "invalid table key '{}'",
1723        table_key
1724    )))
1725}
1726
1727fn record_batch_row_to_json(batch: &RecordBatch, row: usize) -> Result<serde_json::Value> {
1728    let mut obj = serde_json::Map::new();
1729    for (i, field) in batch.schema().fields().iter().enumerate() {
1730        obj.insert(
1731            field.name().clone(),
1732            json_value_from_array(batch.column(i).as_ref(), row)?,
1733        );
1734    }
1735    Ok(serde_json::Value::Object(obj))
1736}
1737
1738fn json_value_from_array(array: &dyn Array, row: usize) -> Result<serde_json::Value> {
1739    if array.is_null(row) {
1740        return Ok(serde_json::Value::Null);
1741    }
1742
1743    match array.data_type() {
1744        DataType::Utf8 => Ok(serde_json::Value::String(
1745            array
1746                .as_any()
1747                .downcast_ref::<StringArray>()
1748                .ok_or_else(|| OmniError::Lance("expected StringArray".to_string()))?
1749                .value(row)
1750                .to_string(),
1751        )),
1752        DataType::LargeUtf8 => Ok(serde_json::Value::String(
1753            array
1754                .as_any()
1755                .downcast_ref::<LargeStringArray>()
1756                .ok_or_else(|| OmniError::Lance("expected LargeStringArray".to_string()))?
1757                .value(row)
1758                .to_string(),
1759        )),
1760        DataType::Boolean => Ok(serde_json::Value::Bool(
1761            array
1762                .as_any()
1763                .downcast_ref::<BooleanArray>()
1764                .ok_or_else(|| OmniError::Lance("expected BooleanArray".to_string()))?
1765                .value(row),
1766        )),
1767        DataType::Int32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1768            array
1769                .as_any()
1770                .downcast_ref::<Int32Array>()
1771                .ok_or_else(|| OmniError::Lance("expected Int32Array".to_string()))?
1772                .value(row),
1773        ))),
1774        DataType::Int64 => Ok(serde_json::Value::Number(serde_json::Number::from(
1775            array
1776                .as_any()
1777                .downcast_ref::<Int64Array>()
1778                .ok_or_else(|| OmniError::Lance("expected Int64Array".to_string()))?
1779                .value(row),
1780        ))),
1781        DataType::UInt32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1782            array
1783                .as_any()
1784                .downcast_ref::<UInt32Array>()
1785                .ok_or_else(|| OmniError::Lance("expected UInt32Array".to_string()))?
1786                .value(row),
1787        ))),
1788        DataType::UInt64 => Ok(serde_json::Value::Number(serde_json::Number::from(
1789            array
1790                .as_any()
1791                .downcast_ref::<UInt64Array>()
1792                .ok_or_else(|| OmniError::Lance("expected UInt64Array".to_string()))?
1793                .value(row),
1794        ))),
1795        DataType::Float32 => {
1796            let value = array
1797                .as_any()
1798                .downcast_ref::<Float32Array>()
1799                .ok_or_else(|| OmniError::Lance("expected Float32Array".to_string()))?
1800                .value(row) as f64;
1801            Ok(serde_json::Value::Number(
1802                serde_json::Number::from_f64(value).ok_or_else(|| {
1803                    OmniError::Lance(format!("cannot encode f32 value '{}' as JSON", value))
1804                })?,
1805            ))
1806        }
1807        DataType::Float64 => {
1808            let value = array
1809                .as_any()
1810                .downcast_ref::<Float64Array>()
1811                .ok_or_else(|| OmniError::Lance("expected Float64Array".to_string()))?
1812                .value(row);
1813            Ok(serde_json::Value::Number(
1814                serde_json::Number::from_f64(value).ok_or_else(|| {
1815                    OmniError::Lance(format!("cannot encode f64 value '{}' as JSON", value))
1816                })?,
1817            ))
1818        }
1819        DataType::Date32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1820            array
1821                .as_any()
1822                .downcast_ref::<Date32Array>()
1823                .ok_or_else(|| OmniError::Lance("expected Date32Array".to_string()))?
1824                .value(row),
1825        ))),
1826        DataType::Binary => Ok(serde_json::Value::String(base64::Engine::encode(
1827            &base64::engine::general_purpose::STANDARD,
1828            array
1829                .as_any()
1830                .downcast_ref::<BinaryArray>()
1831                .ok_or_else(|| OmniError::Lance("expected BinaryArray".to_string()))?
1832                .value(row),
1833        ))),
1834        DataType::LargeBinary => Ok(serde_json::Value::String(base64::Engine::encode(
1835            &base64::engine::general_purpose::STANDARD,
1836            array
1837                .as_any()
1838                .downcast_ref::<LargeBinaryArray>()
1839                .ok_or_else(|| OmniError::Lance("expected LargeBinaryArray".to_string()))?
1840                .value(row),
1841        ))),
1842        DataType::List(_) => {
1843            let list = array
1844                .as_any()
1845                .downcast_ref::<ListArray>()
1846                .ok_or_else(|| OmniError::Lance("expected ListArray".to_string()))?;
1847            let values = list.value(row);
1848            let mut out = Vec::with_capacity(values.len());
1849            for idx in 0..values.len() {
1850                out.push(json_value_from_array(values.as_ref(), idx)?);
1851            }
1852            Ok(serde_json::Value::Array(out))
1853        }
1854        DataType::LargeList(_) => {
1855            let list = array
1856                .as_any()
1857                .downcast_ref::<LargeListArray>()
1858                .ok_or_else(|| OmniError::Lance("expected LargeListArray".to_string()))?;
1859            let values = list.value(row);
1860            let mut out = Vec::with_capacity(values.len());
1861            for idx in 0..values.len() {
1862                out.push(json_value_from_array(values.as_ref(), idx)?);
1863            }
1864            Ok(serde_json::Value::Array(out))
1865        }
1866        DataType::FixedSizeList(_, _) => {
1867            let list = array
1868                .as_any()
1869                .downcast_ref::<FixedSizeListArray>()
1870                .ok_or_else(|| OmniError::Lance("expected FixedSizeListArray".to_string()))?;
1871            let values = list.value(row);
1872            let mut out = Vec::with_capacity(values.len());
1873            for idx in 0..values.len() {
1874                out.push(json_value_from_array(values.as_ref(), idx)?);
1875            }
1876            Ok(serde_json::Value::Array(out))
1877        }
1878        DataType::Struct(fields) => {
1879            let struct_array = array
1880                .as_any()
1881                .downcast_ref::<StructArray>()
1882                .ok_or_else(|| OmniError::Lance("expected StructArray".to_string()))?;
1883            let mut obj = serde_json::Map::new();
1884            for (field_idx, field) in fields.iter().enumerate() {
1885                obj.insert(
1886                    field.name().clone(),
1887                    json_value_from_array(struct_array.column(field_idx).as_ref(), row)?,
1888                );
1889            }
1890            Ok(serde_json::Value::Object(obj))
1891        }
1892        _ => {
1893            let value = arrow_cast::display::array_value_to_string(array, row)
1894                .map_err(|e| OmniError::Lance(e.to_string()))?;
1895            Ok(serde_json::Value::String(value))
1896        }
1897    }
1898}
1899
1900#[cfg(test)]
1901mod tests {
1902    use super::*;
1903    use crate::db::is_internal_run_branch;
1904    use crate::db::manifest::ManifestCoordinator;
1905    use async_trait::async_trait;
1906    use serde_json::Value;
1907    use std::sync::{Arc, Mutex};
1908
1909    use crate::storage::{LocalStorageAdapter, StorageAdapter, join_uri};
1910
1911    const TEST_SCHEMA: &str = r#"
1912node Person {
1913    name: String @key
1914    age: I32?
1915}
1916node Company {
1917    name: String @key
1918}
1919edge Knows: Person -> Person {
1920    since: Date?
1921}
1922edge WorksAt: Person -> Company
1923"#;
1924
1925    #[derive(Debug, Default)]
1926    struct RecordingStorageAdapter {
1927        inner: LocalStorageAdapter,
1928        reads: Mutex<Vec<String>>,
1929        writes: Mutex<Vec<String>>,
1930        exists_checks: Mutex<Vec<String>>,
1931        renames: Mutex<Vec<(String, String)>>,
1932        deletes: Mutex<Vec<String>>,
1933    }
1934
1935    impl RecordingStorageAdapter {
1936        fn reads(&self) -> Vec<String> {
1937            self.reads.lock().unwrap().clone()
1938        }
1939
1940        fn writes(&self) -> Vec<String> {
1941            self.writes.lock().unwrap().clone()
1942        }
1943
1944        fn exists_checks(&self) -> Vec<String> {
1945            self.exists_checks.lock().unwrap().clone()
1946        }
1947    }
1948
1949    #[async_trait]
1950    impl StorageAdapter for RecordingStorageAdapter {
1951        async fn read_text(&self, uri: &str) -> Result<String> {
1952            self.reads.lock().unwrap().push(uri.to_string());
1953            self.inner.read_text(uri).await
1954        }
1955
1956        async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
1957            self.writes.lock().unwrap().push(uri.to_string());
1958            self.inner.write_text(uri, contents).await
1959        }
1960
1961        async fn write_text_if_absent(&self, uri: &str, contents: &str) -> Result<bool> {
1962            self.writes.lock().unwrap().push(uri.to_string());
1963            self.inner.write_text_if_absent(uri, contents).await
1964        }
1965
1966        async fn exists(&self, uri: &str) -> Result<bool> {
1967            self.exists_checks.lock().unwrap().push(uri.to_string());
1968            self.inner.exists(uri).await
1969        }
1970
1971        async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
1972            self.renames
1973                .lock()
1974                .unwrap()
1975                .push((from_uri.to_string(), to_uri.to_string()));
1976            self.inner.rename_text(from_uri, to_uri).await
1977        }
1978
1979        async fn delete(&self, uri: &str) -> Result<()> {
1980            self.deletes.lock().unwrap().push(uri.to_string());
1981            self.inner.delete(uri).await
1982        }
1983
1984        async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
1985            self.inner.list_dir(dir_uri).await
1986        }
1987    }
1988
1989    #[derive(Debug)]
1990    struct InitRaceStorageAdapter {
1991        inner: LocalStorageAdapter,
1992        root: String,
1993        barrier: Arc<tokio::sync::Barrier>,
1994    }
1995
1996    #[async_trait]
1997    impl StorageAdapter for InitRaceStorageAdapter {
1998        async fn read_text(&self, uri: &str) -> Result<String> {
1999            self.inner.read_text(uri).await
2000        }
2001
2002        async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
2003            self.inner.write_text(uri, contents).await
2004        }
2005
2006        async fn write_text_if_absent(&self, uri: &str, contents: &str) -> Result<bool> {
2007            self.inner.write_text_if_absent(uri, contents).await
2008        }
2009
2010        async fn exists(&self, uri: &str) -> Result<bool> {
2011            let exists = self.inner.exists(uri).await?;
2012            if uri == schema_state_uri(&self.root) {
2013                self.barrier.wait().await;
2014            }
2015            Ok(exists)
2016        }
2017
2018        async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
2019            self.inner.rename_text(from_uri, to_uri).await
2020        }
2021
2022        async fn delete(&self, uri: &str) -> Result<()> {
2023            self.inner.delete(uri).await
2024        }
2025
2026        async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
2027            self.inner.list_dir(dir_uri).await
2028        }
2029    }
2030
2031    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2032    async fn concurrent_strict_init_does_not_delete_winning_schema_files() {
2033        let dir = tempfile::tempdir().unwrap();
2034        let uri = dir.path().to_str().unwrap().to_string();
2035        let root = normalize_root_uri(&uri).unwrap();
2036        let storage: Arc<dyn StorageAdapter> = Arc::new(InitRaceStorageAdapter {
2037            inner: LocalStorageAdapter,
2038            root,
2039            barrier: Arc::new(tokio::sync::Barrier::new(2)),
2040        });
2041
2042        let left = Omnigraph::init_with_storage(
2043            &uri,
2044            TEST_SCHEMA,
2045            Arc::clone(&storage),
2046            InitOptions::default(),
2047        );
2048        let right = Omnigraph::init_with_storage(
2049            &uri,
2050            TEST_SCHEMA,
2051            Arc::clone(&storage),
2052            InitOptions::default(),
2053        );
2054        let (left, right) = tokio::join!(left, right);
2055        let ok_count = usize::from(left.is_ok()) + usize::from(right.is_ok());
2056        assert_eq!(ok_count, 1, "exactly one concurrent init should win");
2057
2058        assert!(
2059            dir.path().join("_schema.pg").exists(),
2060            "winning init must leave _schema.pg in place"
2061        );
2062        assert!(
2063            dir.path().join("_schema.ir.json").exists(),
2064            "winning init must leave _schema.ir.json in place"
2065        );
2066        assert!(
2067            dir.path().join("__schema_state.json").exists(),
2068            "winning init must leave __schema_state.json in place"
2069        );
2070    }
2071
2072    #[tokio::test]
2073    async fn test_init_and_open_route_graph_metadata_through_storage_adapter() {
2074        let dir = tempfile::tempdir().unwrap();
2075        let uri = dir.path().to_str().unwrap();
2076        let adapter = Arc::new(RecordingStorageAdapter::default());
2077
2078        Omnigraph::init_with_storage(uri, TEST_SCHEMA, adapter.clone(), InitOptions::default())
2079            .await
2080            .unwrap();
2081        assert!(adapter.writes().contains(&join_uri(uri, "_schema.pg")));
2082        assert!(adapter.writes().contains(&join_uri(uri, "_schema.ir.json")));
2083        assert!(
2084            adapter
2085                .writes()
2086                .contains(&join_uri(uri, "__schema_state.json"))
2087        );
2088
2089        Omnigraph::open_with_storage(uri, adapter.clone())
2090            .await
2091            .unwrap();
2092        assert!(adapter.reads().contains(&join_uri(uri, "_schema.pg")));
2093        assert!(adapter.reads().contains(&join_uri(uri, "_schema.ir.json")));
2094        assert!(
2095            adapter
2096                .reads()
2097                .contains(&join_uri(uri, "__schema_state.json"))
2098        );
2099        assert!(
2100            adapter
2101                .exists_checks()
2102                .contains(&join_uri(uri, "_schema.ir.json"))
2103        );
2104        assert!(
2105            adapter
2106                .exists_checks()
2107                .contains(&join_uri(uri, "__schema_state.json"))
2108        );
2109        assert!(
2110            adapter
2111                .exists_checks()
2112                .contains(&join_uri(uri, "_graph_commits.lance"))
2113        );
2114    }
2115
2116    async fn table_rows_json(db: &Omnigraph, table_key: &str) -> Vec<Value> {
2117        let snapshot = db.snapshot().await;
2118        let ds = snapshot.open(table_key).await.unwrap();
2119        let batches = db.table_store().scan_batches(&ds).await.unwrap();
2120        batches
2121            .into_iter()
2122            .flat_map(|batch| {
2123                (0..batch.num_rows())
2124                    .map(|row| record_batch_row_to_json(&batch, row).unwrap())
2125                    .collect::<Vec<_>>()
2126            })
2127            .collect()
2128    }
2129
2130    async fn seed_person_row(db: &mut Omnigraph, name: &str, age: Option<i32>) {
2131        let (mut ds, full_path, table_branch) = db
2132            .open_for_mutation("node:Person", crate::db::MutationOpKind::Insert)
2133            .await
2134            .unwrap();
2135        let schema: Arc<Schema> = Arc::new(ds.schema().into());
2136        let columns: Vec<Arc<dyn Array>> = schema
2137            .fields()
2138            .iter()
2139            .map(|field| match field.name().as_str() {
2140                "id" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
2141                "name" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
2142                "age" => Arc::new(Int32Array::from(vec![age])) as Arc<dyn Array>,
2143                _ => new_null_array(field.data_type(), 1),
2144            })
2145            .collect();
2146        let batch = RecordBatch::try_new(Arc::clone(&schema), columns).unwrap();
2147        let state = db
2148            .table_store()
2149            .append_batch(&full_path, &mut ds, batch)
2150            .await
2151            .unwrap();
2152        db.commit_updates(&[crate::db::SubTableUpdate {
2153            table_key: "node:Person".to_string(),
2154            table_version: state.version,
2155            table_branch,
2156            row_count: state.row_count,
2157            version_metadata: state.version_metadata,
2158        }])
2159        .await
2160        .unwrap();
2161    }
2162
2163    #[tokio::test]
2164    async fn test_apply_schema_adds_nullable_property_and_preserves_rows() {
2165        let dir = tempfile::tempdir().unwrap();
2166        let uri = dir.path().to_str().unwrap();
2167        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2168        seed_person_row(&mut db, "Alice", Some(30)).await;
2169
2170        let desired = TEST_SCHEMA.replace(
2171            "    age: I32?\n}",
2172            "    age: I32?\n    nickname: String?\n}",
2173        );
2174        let result = db.apply_schema(&desired).await.unwrap();
2175        assert!(result.applied);
2176
2177        let reopened = Omnigraph::open(uri).await.unwrap();
2178        let rows = table_rows_json(&reopened, "node:Person").await;
2179        assert_eq!(rows.len(), 1);
2180        assert_eq!(rows[0]["name"], "Alice");
2181        assert_eq!(rows[0]["age"], 30);
2182        assert!(rows[0]["nickname"].is_null());
2183        assert!(
2184            reopened.catalog().node_types["Person"]
2185                .properties
2186                .contains_key("nickname")
2187        );
2188        assert!(dir.path().join("_schema.pg").exists());
2189    }
2190
2191    #[tokio::test]
2192    async fn test_apply_schema_renames_property_and_preserves_values() {
2193        let dir = tempfile::tempdir().unwrap();
2194        let uri = dir.path().to_str().unwrap();
2195        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2196        seed_person_row(&mut db, "Alice", Some(30)).await;
2197
2198        let desired = TEST_SCHEMA.replace(
2199            "    age: I32?\n}",
2200            "    years: I32? @rename_from(\"age\")\n}",
2201        );
2202        db.apply_schema(&desired).await.unwrap();
2203
2204        let reopened = Omnigraph::open(uri).await.unwrap();
2205        let rows = table_rows_json(&reopened, "node:Person").await;
2206        assert_eq!(rows[0]["name"], "Alice");
2207        assert_eq!(rows[0]["years"], 30);
2208        assert!(rows[0].get("age").is_none());
2209    }
2210
2211    #[tokio::test]
2212    async fn test_apply_schema_renames_type_and_preserves_historical_snapshot() {
2213        let dir = tempfile::tempdir().unwrap();
2214        let uri = dir.path().to_str().unwrap();
2215        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2216        seed_person_row(&mut db, "Alice", Some(30)).await;
2217        let before_version = db.snapshot().await.version();
2218
2219        let desired = TEST_SCHEMA
2220            .replace("node Person {\n", "node Human @rename_from(\"Person\") {\n")
2221            .replace("edge Knows: Person -> Person", "edge Knows: Human -> Human")
2222            .replace(
2223                "edge WorksAt: Person -> Company",
2224                "edge WorksAt: Human -> Company",
2225            );
2226        db.apply_schema(&desired).await.unwrap();
2227
2228        let head = db.snapshot().await;
2229        assert!(head.entry("node:Person").is_none());
2230        assert!(head.entry("node:Human").is_some());
2231        let historical = ManifestCoordinator::snapshot_at(uri, None, before_version)
2232            .await
2233            .unwrap();
2234        assert!(historical.entry("node:Person").is_some());
2235        assert!(historical.entry("node:Human").is_none());
2236    }
2237
2238    #[tokio::test]
2239    async fn test_apply_schema_succeeds_after_load() {
2240        // Historical: schema apply used to be blocked by leftover
2241        // `__run__` branches. A defense-in-depth filter now skips
2242        // internal system branches, and run branches were made
2243        // ephemeral on every terminal state — so in practice no
2244        // `__run__` branch survives publish. The filter still guards
2245        // the invariant.
2246        let dir = tempfile::tempdir().unwrap();
2247        let uri = dir.path().to_str().unwrap();
2248        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2249
2250        crate::loader::load_jsonl(
2251            &mut db,
2252            r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#,
2253            crate::loader::LoadMode::Overwrite,
2254        )
2255        .await
2256        .unwrap();
2257
2258        let all_branches = db.coordinator.read().await.all_branches().await.unwrap();
2259        assert!(
2260            !all_branches.iter().any(|b| is_internal_run_branch(b)),
2261            "run branch should be deleted after publish, got: {:?}",
2262            all_branches
2263        );
2264
2265        let desired = TEST_SCHEMA.replace(
2266            "    age: I32?\n}",
2267            "    age: I32?\n    nickname: String?\n}",
2268        );
2269        let result = db.apply_schema(&desired).await.unwrap();
2270        assert!(result.applied, "schema apply should have applied");
2271    }
2272
2273    #[tokio::test]
2274    async fn test_apply_schema_adds_index_for_existing_property() {
2275        let dir = tempfile::tempdir().unwrap();
2276        let uri = dir.path().to_str().unwrap();
2277        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2278
2279        let desired = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
2280        db.apply_schema(&desired).await.unwrap();
2281
2282        let snapshot = db.snapshot().await;
2283        let ds = snapshot.open("node:Person").await.unwrap();
2284        assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
2285    }
2286
2287    #[tokio::test]
2288    async fn test_apply_schema_rewrite_preserves_existing_indices() {
2289        let dir = tempfile::tempdir().unwrap();
2290        let uri = dir.path().to_str().unwrap();
2291        let initial_schema = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
2292        let mut db = Omnigraph::init(uri, &initial_schema).await.unwrap();
2293        seed_person_row(&mut db, "Alice", Some(30)).await;
2294
2295        let desired = initial_schema.replace(
2296            "    age: I32?\n}",
2297            "    age: I32?\n    nickname: String?\n}",
2298        );
2299        db.apply_schema(&desired).await.unwrap();
2300
2301        let snapshot = db.snapshot().await;
2302        let ds = snapshot.open("node:Person").await.unwrap();
2303        assert!(db.table_store().has_btree_index(&ds, "id").await.unwrap());
2304        assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
2305    }
2306
2307    #[tokio::test]
2308    async fn test_open_for_mutation_rejects_while_schema_apply_locked() {
2309        let dir = tempfile::tempdir().unwrap();
2310        let uri = dir.path().to_str().unwrap();
2311        let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2312        let mut db = db;
2313        db.coordinator
2314            .write()
2315            .await
2316            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
2317            .await
2318            .unwrap();
2319
2320        let err = db
2321            .open_for_mutation("node:Person", crate::db::MutationOpKind::Insert)
2322            .await
2323            .unwrap_err();
2324        assert!(
2325            err.to_string()
2326                .contains("write is unavailable while schema apply is in progress")
2327        );
2328    }
2329
2330    #[tokio::test]
2331    async fn test_commit_updates_rejects_while_schema_apply_locked() {
2332        let dir = tempfile::tempdir().unwrap();
2333        let uri = dir.path().to_str().unwrap();
2334        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2335        db.coordinator
2336            .write()
2337            .await
2338            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
2339            .await
2340            .unwrap();
2341
2342        let err = db.commit_updates(&[]).await.unwrap_err();
2343        assert!(
2344            err.to_string()
2345                .contains("write commit is unavailable while schema apply is in progress")
2346        );
2347    }
2348
2349    #[tokio::test]
2350    async fn test_branch_list_hides_schema_apply_lock_branch() {
2351        let dir = tempfile::tempdir().unwrap();
2352        let uri = dir.path().to_str().unwrap();
2353        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2354        db.coordinator
2355            .write()
2356            .await
2357            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
2358            .await
2359            .unwrap();
2360
2361        let branches = db.branch_list().await.unwrap();
2362        assert_eq!(branches, vec!["main".to_string()]);
2363    }
2364}