Skip to main content

omnigraph/db/
omnigraph.rs

1use std::collections::{BTreeSet, HashMap, HashSet};
2use std::io::Write;
3use std::sync::Arc;
4
5use arc_swap::ArcSwap;
6use arrow_array::{
7    Array, BinaryArray, BooleanArray, Date32Array, FixedSizeListArray, Float32Array, Float64Array,
8    Int32Array, Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray,
9    RecordBatch, StringArray, StructArray, UInt32Array, UInt64Array, new_null_array,
10};
11use arrow_schema::{DataType, Field, Schema};
12use lance::Dataset;
13use lance::blob::{BlobArrayBuilder, blob_field};
14use lance::dataset::BlobFile;
15use lance::dataset::scanner::ColumnOrdering;
16use lance::datatypes::BlobKind;
17use omnigraph_compiler::catalog::{Catalog, EdgeType, NodeType};
18use omnigraph_compiler::schema::parser::parse_schema;
19use omnigraph_compiler::types::ScalarType;
20use omnigraph_compiler::{
21    DropMode, SchemaIR, SchemaMigrationPlan, SchemaMigrationStep, SchemaTypeKind,
22    build_catalog_from_ir, build_schema_ir, plan_schema_migration,
23};
24
25use crate::db::graph_coordinator::{GraphCoordinator, PublishedSnapshot};
26use crate::error::{OmniError, Result};
27use crate::runtime_cache::RuntimeCache;
28use crate::storage::{StorageAdapter, join_uri, normalize_root_uri, storage_for_uri};
29use crate::table_store::TableStore;
30
31mod export;
32mod optimize;
33mod schema_apply;
34mod table_ops;
35
36pub use optimize::{CleanupPolicyOptions, TableCleanupStats, TableOptimizeStats};
37pub use schema_apply::SchemaApplyOptions;
38
39use super::commit_graph::GraphCommit;
40use super::manifest::{
41    ManifestChange, Snapshot, SubTableEntry, TableRegistration, TableTombstone,
42    table_path_for_table_key,
43};
44use super::schema_state::{
45    SCHEMA_SOURCE_FILENAME, load_or_bootstrap_schema_contract, read_accepted_schema_ir,
46    recover_schema_state_files, schema_ir_staging_uri, schema_ir_uri, schema_source_staging_uri,
47    schema_source_uri, schema_state_staging_uri, schema_state_uri, validate_schema_contract,
48    write_schema_contract, write_schema_contract_staging,
49};
50use super::{
51    ReadTarget, ResolvedTarget, SCHEMA_APPLY_LOCK_BRANCH, SnapshotId, is_internal_system_branch,
52    is_schema_apply_lock_branch,
53};
54
/// Result category reported by a branch merge.
///
/// NOTE(review): the variant meanings below are inferred from the
/// git-style names only; confirm against the merge implementation.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum MergeOutcome {
    /// Presumably: the target already contained everything from the source.
    AlreadyUpToDate,
    /// Presumably: the target head was advanced without a merge commit.
    FastForward,
    /// Presumably: a real merge of diverged histories was performed.
    Merged,
}
61
62#[derive(Debug, Clone)]
63pub struct SchemaApplyResult {
64    pub supported: bool,
65    pub applied: bool,
66    pub manifest_version: u64,
67    pub steps: Vec<SchemaMigrationStep>,
68}
69
70/// Top-level handle to an Omnigraph database.
71///
72/// An Omnigraph is a Lance-native graph database with git-style branching.
73/// It stores typed property graphs as per-type Lance datasets coordinated
74/// through a Lance manifest table.
75pub struct Omnigraph {
76    root_uri: String,
77    storage: Arc<dyn StorageAdapter>,
78    /// Coordinator state behind a tokio `RwLock`. PR 2 (MR-686) wraps
79    /// this so engine write APIs can be `&self` (the HTTP server's
80    /// `AppState` holds `Arc<Omnigraph>` and dispatches concurrent
81    /// calls without a global write lock). Reads (`snapshot`, `version`,
82    /// `current_branch`, `branch_list`, `resolve_*`, `head_commit_id`,
83    /// `list_commits`, …) acquire `.read().await` and parallelize.
84    /// Writes (`refresh`, `branch_create`, `branch_delete`, `commit_*`,
85    /// `record_*`) acquire `.write().await` and serialize. The atomic
86    /// commit invariant — `commit_manifest_updates` followed by
87    /// `record_graph_commit` must be atomic — is preserved by the
88    /// single `.write()` covering both calls inside
89    /// `commit_updates_with_actor_with_expected`. PR 2 Phase 2
90    /// converted from `Mutex` to `RwLock` because the bench showed
91    /// the Mutex was the dominant serializer for disjoint-table
92    /// workloads. Lock acquisition order: always before `runtime_cache`
93    /// (when both are needed in one scope).
94    coordinator: Arc<tokio::sync::RwLock<GraphCoordinator>>,
95    table_store: TableStore,
96    runtime_cache: RuntimeCache,
97    /// Read-heavy on every query, written only by `apply_schema`. ArcSwap
98    /// gives atomic pointer swap with zero-cost reads (`load()` returns a
99    /// `Guard<Arc<Catalog>>`), so concurrent queries on different actors
100    /// don't contend on a lock to read the catalog.
101    catalog: Arc<ArcSwap<Catalog>>,
102    /// Read-heavy on schema introspection paths, written only by
103    /// `apply_schema`. Same ArcSwap rationale as `catalog`.
104    schema_source: Arc<ArcSwap<String>>,
105    /// Per-`(table_key, branch)` writer queues. Reachable from engine
106    /// internals (mutation finalize, schema_apply, branch_merge,
107    /// ensure_indices, delete_where) and from future MR-870 recovery
108    /// reconciler. PR 1b adds the field; callers acquire in commits 4+.
109    write_queue: Arc<crate::db::write_queue::WriteQueueManager>,
110    /// Process-wide mutex held across the swap → operate → restore window
111    /// in `branch_merge_impl`. Two concurrent merges with distinct targets
112    /// would otherwise interleave their three separate
113    /// `coordinator.write().await` acquisitions, leaving each merge's
114    /// inner body running against the other's swapped coord. Pinned by
115    /// `concurrent_branch_merges_distinct_targets_do_not_swap_into_each_other`
116    /// in `crates/omnigraph-server/tests/server.rs`.
117    ///
118    /// Cost: serializes ALL concurrent branch merges process-wide.
119    /// Acceptable because branch merges are heavy (table rewrites, index
120    /// rebuilds), per-(table, branch) queues inside `commit_all` already
121    /// serialize the data path, and merges are rare relative to /change
122    /// or /ingest. A finer-grained per-target-branch mutex is a follow-up
123    /// if telemetry shows merge concurrency matters.
124    ///
125    /// The deeper fix — refactor `branch_merge_on_current_target` to take
126    /// an explicit target coord parameter so `self.coordinator` is never
127    /// used as scratch space — is the round-1 shape applied to
128    /// `branch_create_from_impl`. Deferred because it requires unwinding
129    /// every `self.snapshot()` and `self.ensure_commit_graph_initialized()`
130    /// call inside the merge body.
131    merge_exclusive: Arc<tokio::sync::Mutex<()>>,
132    /// Optional policy checker for engine-layer enforcement (MR-722).
133    /// `None` = no enforcement; mutating methods are unconditionally
134    /// allowed (this is the embedded/dev default). `Some` = every
135    /// mutating method calls `self.enforce(action, scope, actor)` at
136    /// entry; denial returns `OmniError::Policy`.
137    ///
138    /// Per chassis design (see `omnigraph_policy::PolicyChecker`), the
139    /// trait surface is deliberately coarse — action × scope × actor.
140    /// Per-row / per-type / per-column scope lives at the query layer
141    /// (MR-725), which extends the same trait with a different method.
142    /// Don't be tempted to add per-row enforcement here.
143    ///
144    /// Set via `with_policy(checker)` after construction. Today only
145    /// `apply_schema_as` consults this field (PR #2 proof-of-concept);
146    /// PR #3 fans the `enforce()` call out to the remaining writers.
147    policy: Option<Arc<dyn omnigraph_policy::PolicyChecker>>,
148}
149
/// Selects whether opening a repo runs the open-time recovery sweep.
///
/// Recovery performs Lance writes (`Dataset::restore`,
/// `ManifestBatchPublisher::publish`). Read-only consumers — NDJSON
/// export, `commit list`, `read`, schema inspection — should not cause
/// writes: they may hold read-only object-store credentials, and silent
/// mutations at open time are surprising. They do not need recovery
/// either, because reads always resolve through the manifest pin, which
/// is the consistent snapshot regardless of any Phase B → Phase C drift
/// on the per-table side.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum OpenMode {
    /// Run the recovery sweep while opening. This is what
    /// `Omnigraph::open` uses.
    ReadWrite,
    /// Do not run the recovery sweep. Intended for read-only consumers
    /// going through `Omnigraph::open_read_only`.
    ReadOnly,
}
167
168impl Omnigraph {
169    /// Create a new repo at `uri` from schema source.
170    ///
171    /// Creates `_schema.pg`, per-type Lance datasets, and `__manifest`.
172    pub async fn init(uri: &str, schema_source: &str) -> Result<Self> {
173        Self::init_with_storage(uri, schema_source, storage_for_uri(uri)?).await
174    }
175
176    pub(crate) async fn init_with_storage(
177        uri: &str,
178        schema_source: &str,
179        storage: Arc<dyn StorageAdapter>,
180    ) -> Result<Self> {
181        let root = normalize_root_uri(uri)?;
182        let schema_ir = read_schema_ir_from_source(schema_source)?;
183        let mut catalog = build_catalog_from_ir(&schema_ir)?;
184        fixup_blob_schemas(&mut catalog);
185
186        // Write _schema.pg
187        let schema_path = join_uri(&root, SCHEMA_SOURCE_FILENAME);
188        storage.write_text(&schema_path, schema_source).await?;
189        write_schema_contract(&root, storage.as_ref(), &schema_ir).await?;
190
191        // Create manifest + per-type datasets
192        let coordinator = GraphCoordinator::init(&root, &catalog, Arc::clone(&storage)).await?;
193
194        Ok(Self {
195            root_uri: root.clone(),
196            storage,
197            coordinator: Arc::new(tokio::sync::RwLock::new(coordinator)),
198            table_store: TableStore::new(&root),
199            runtime_cache: RuntimeCache::default(),
200            catalog: Arc::new(ArcSwap::from_pointee(catalog)),
201            schema_source: Arc::new(ArcSwap::from_pointee(schema_source.to_string())),
202            write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()),
203            merge_exclusive: Arc::new(tokio::sync::Mutex::new(())),
204            policy: None,
205        })
206    }
207
208    /// Open an existing repo (read-write).
209    ///
210    /// Reads `_schema.pg`, parses it, builds the catalog, and opens `__manifest`.
211    /// Runs the open-time recovery sweep before returning — see [`OpenMode`].
212    pub async fn open(uri: &str) -> Result<Self> {
213        Self::open_with_storage_and_mode(uri, storage_for_uri(uri)?, OpenMode::ReadWrite).await
214    }
215
216    /// Open an existing repo for read-only consumers (NDJSON export,
217    /// `commit list`, etc.). Skips the recovery sweep — see [`OpenMode`].
218    pub async fn open_read_only(uri: &str) -> Result<Self> {
219        Self::open_with_storage_and_mode(uri, storage_for_uri(uri)?, OpenMode::ReadOnly).await
220    }
221
222    /// `open_with_storage` retained for existing callers (init/test paths).
223    /// Defaults to `OpenMode::ReadWrite`.
224    pub(crate) async fn open_with_storage(
225        uri: &str,
226        storage: Arc<dyn StorageAdapter>,
227    ) -> Result<Self> {
228        Self::open_with_storage_and_mode(uri, storage, OpenMode::ReadWrite).await
229    }
230
231    pub(crate) async fn open_with_storage_and_mode(
232        uri: &str,
233        storage: Arc<dyn StorageAdapter>,
234        mode: OpenMode,
235    ) -> Result<Self> {
236        let root = normalize_root_uri(uri)?;
237        // Open the coordinator first so the schema-staging recovery sweep can
238        // compare its snapshot against any leftover staging files.
239        let mut coordinator = GraphCoordinator::open(&root, Arc::clone(&storage)).await?;
240        // Both the schema-state recovery sweep AND the manifest-drift
241        // recovery sweep are gated on `OpenMode::ReadWrite`. Read-only
242        // consumers (NDJSON export, `commit list`, schema show) shouldn't
243        // trigger object-store mutations: they may run with read-only
244        // credentials, and silent open-time writes are surprising. Both
245        // sweeps' work is recoverable on the next ReadWrite open, so
246        // skipping under ReadOnly doesn't lose any safety guarantees —
247        // the manifest pin is the consistent snapshot regardless of
248        // drift on the per-table side or leftover schema-staging files.
249        if matches!(mode, OpenMode::ReadWrite) {
250            let schema_state_recovery =
251                recover_schema_state_files(&root, Arc::clone(&storage), &coordinator.snapshot())
252                    .await?;
253            // Recovery sweep: close the Phase B → Phase C residual on
254            // any sidecar left over from a crashed writer. Continuous
255            // in-process recovery for long-running servers (no restart
256            // required between Phase B failure and recovery) is a
257            // separate background-reconciler effort.
258            crate::db::manifest::recover_manifest_drift(
259                &root,
260                Arc::clone(&storage),
261                &mut coordinator,
262                crate::db::manifest::RecoveryMode::Full,
263                schema_state_recovery,
264            )
265            .await?;
266        }
267        // Read _schema.pg (post-recovery — may have just been renamed in).
268        let schema_path = schema_source_uri(&root);
269        let schema_source = storage.read_text(&schema_path).await?;
270        let current_source_ir = read_schema_ir_from_source(&schema_source)?;
271        let branches = coordinator.branch_list().await?;
272        let (accepted_ir, _) = load_or_bootstrap_schema_contract(
273            &root,
274            Arc::clone(&storage),
275            &branches,
276            &current_source_ir,
277        )
278        .await?;
279        let mut catalog = build_catalog_from_ir(&accepted_ir)?;
280        fixup_blob_schemas(&mut catalog);
281
282        Ok(Self {
283            root_uri: root.clone(),
284            storage,
285            coordinator: Arc::new(tokio::sync::RwLock::new(coordinator)),
286            table_store: TableStore::new(&root),
287            runtime_cache: RuntimeCache::default(),
288            catalog: Arc::new(ArcSwap::from_pointee(catalog)),
289            schema_source: Arc::new(ArcSwap::from_pointee(schema_source)),
290            write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()),
291            merge_exclusive: Arc::new(tokio::sync::Mutex::new(())),
292            policy: None,
293        })
294    }
295
296    /// Returns an `Arc<Catalog>` snapshot. Cheap clone of the current
297    /// catalog pointer; callers can hold the returned `Arc` across awaits
298    /// without blocking concurrent `apply_schema`.
299    pub fn catalog(&self) -> Arc<Catalog> {
300        self.catalog.load_full()
301    }
302
303    /// Returns an `Arc<String>` snapshot of the schema source.
304    pub fn schema_source(&self) -> Arc<String> {
305        self.schema_source.load_full()
306    }
307
308    /// Atomically swap the in-memory catalog. Concurrent readers see
309    /// either the old or the new pointer; never a torn state. Used by
310    /// `apply_schema` and `reload_schema_if_source_changed`.
311    pub(crate) fn store_catalog(&self, catalog: Catalog) {
312        self.catalog.store(Arc::new(catalog));
313    }
314
315    /// Atomically swap the in-memory schema source. Same rationale as
316    /// [`store_catalog`](Self::store_catalog).
317    pub(crate) fn store_schema_source(&self, schema_source: String) {
318        self.schema_source.store(Arc::new(schema_source));
319    }
320
321    pub fn uri(&self) -> &str {
322        &self.root_uri
323    }
324
325    /// Install a policy checker for engine-layer enforcement (MR-722).
326    /// Builder-style setter — consumes `self`, returns `Self`. Calling
327    /// this on a `Omnigraph` previously without policy enables
328    /// `enforce()` to fire at every mutating engine method that's been
329    /// wired to call it (currently `apply_schema_as`; PR #3 fans out to
330    /// the remaining writers).
331    ///
332    /// Embedded callers that don't care about authorization should
333    /// just not call this. Server / CLI callers that have loaded a
334    /// `PolicyEngine` from `policy.yaml` pass it here.
335    pub fn with_policy(mut self, checker: Arc<dyn omnigraph_policy::PolicyChecker>) -> Self {
336        self.policy = Some(checker);
337        self
338    }
339
340    /// Engine-layer policy enforcement gate (MR-722 chassis core).
341    ///
342    /// * If no policy is installed → no-op (returns `Ok(())`).
343    /// * If policy is installed AND actor is None → denial with a
344    ///   clear "no actor for engine-layer policy check" message.
345    ///   Forces server / CLI / SDK callers to thread an actor through
346    ///   when policy is configured — silent bypass via "I forgot the
347    ///   actor" is exactly the footgun this gate is here to prevent.
348    /// * If policy is installed AND actor is Some → call
349    ///   `PolicyChecker::check(action, scope, actor)`; map denial /
350    ///   internal failure to `OmniError::Policy(...)`.
351    pub(crate) fn enforce(
352        &self,
353        action: omnigraph_policy::PolicyAction,
354        scope: &omnigraph_policy::ResourceScope,
355        actor: Option<&str>,
356    ) -> Result<()> {
357        let Some(checker) = self.policy.as_ref() else {
358            return Ok(());
359        };
360        let Some(actor) = actor else {
361            return Err(OmniError::Policy(
362                "no actor for engine-layer policy check (policy is configured but the call site \
363                 didn't thread an actor through — this is almost certainly a bug, not an \
364                 intended bypass)"
365                    .to_string(),
366            ));
367        };
368        checker
369            .check(action, scope, actor)
370            .map_err(|err| OmniError::Policy(err.to_string()))
371    }
372
373    pub(crate) async fn ensure_schema_state_valid(&self) -> Result<()> {
374        validate_schema_contract(self.uri(), Arc::clone(&self.storage)).await
375    }
376
377    pub async fn plan_schema(&self, desired_schema_source: &str) -> Result<SchemaMigrationPlan> {
378        self.plan_schema_with_options(desired_schema_source, SchemaApplyOptions::default())
379            .await
380    }
381
382    pub async fn plan_schema_with_options(
383        &self,
384        desired_schema_source: &str,
385        options: SchemaApplyOptions,
386    ) -> Result<SchemaMigrationPlan> {
387        schema_apply::plan_schema(self, desired_schema_source, options).await
388    }
389
390    pub async fn apply_schema(&self, desired_schema_source: &str) -> Result<SchemaApplyResult> {
391        self.apply_schema_as(desired_schema_source, SchemaApplyOptions::default(), None)
392            .await
393    }
394
395    pub async fn apply_schema_with_options(
396        &self,
397        desired_schema_source: &str,
398        options: SchemaApplyOptions,
399    ) -> Result<SchemaApplyResult> {
400        self.apply_schema_as(desired_schema_source, options, None).await
401    }
402
403    /// Apply a schema migration with an explicit actor for engine-layer
404    /// policy enforcement (MR-722). When a `PolicyChecker` is installed
405    /// via [`Self::with_policy`], this method calls `enforce(SchemaApply,
406    /// Branch("main"), actor)` before any apply work happens. Denial
407    /// returns `OmniError::Policy` and leaves the manifest untouched.
408    ///
409    /// The no-actor variants (`apply_schema`, `apply_schema_with_options`)
410    /// pass `None` here. They work fine without a policy; if a policy IS
411    /// installed and actor is None, enforcement intentionally fails to
412    /// prevent silent-bypass-via-forgetting-the-actor footguns.
413    pub async fn apply_schema_as(
414        &self,
415        desired_schema_source: &str,
416        options: SchemaApplyOptions,
417        actor: Option<&str>,
418    ) -> Result<SchemaApplyResult> {
419        schema_apply::apply_schema(self, desired_schema_source, options, actor).await
420    }
421
422    pub(crate) async fn ensure_schema_apply_idle(&self, operation: &str) -> Result<()> {
423        schema_apply::ensure_schema_apply_idle(self, operation).await
424    }
425
426    async fn ensure_schema_apply_not_locked(&self, operation: &str) -> Result<()> {
427        schema_apply::ensure_schema_apply_not_locked(self, operation).await
428    }
429
430    pub(crate) fn table_store(&self) -> &TableStore {
431        &self.table_store
432    }
433
434    /// Engine-facing trait surface around `TableStore`.
435    ///
436    /// This is the canonical accessor for newly-written engine code. The
437    /// trait's signatures use opaque `SnapshotHandle` / `StagedHandle`
438    /// instead of leaking `lance::Dataset` /
439    /// `lance::dataset::transaction::Transaction`. Existing call sites
440    /// that still use `db.table_store.X(...)` (the inherent struct
441    /// methods) are migrated incrementally.
442    pub(crate) fn storage(&self) -> &dyn crate::storage_layer::TableStorage {
443        &self.table_store
444    }
445
446    /// Engine-level access to the object-store adapter (S3 / local fs).
447    /// Used by the recovery sidecar protocol — writers in the engine
448    /// call this to write/delete sidecars at `__recovery/{ulid}.json`.
449    pub(crate) fn storage_adapter(&self) -> &dyn crate::storage::StorageAdapter {
450        self.storage.as_ref()
451    }
452
453    /// Per-`(table_key, branch)` writer queues.
454    ///
455    /// Engine-internal writers (mutation finalize, schema_apply,
456    /// branch_merge, ensure_indices, delete_where) and the future MR-870
457    /// recovery reconciler reach the queue manager via this accessor.
458    /// Returns an `Arc` clone so callers can hold the manager across
459    /// `&mut self` engine API boundaries.
460    pub(crate) fn write_queue(&self) -> Arc<crate::db::write_queue::WriteQueueManager> {
461        Arc::clone(&self.write_queue)
462    }
463
464    /// Engine-internal access to the merge-exclusive mutex. Held across
465    /// the swap → operate → restore window in `branch_merge_impl` so
466    /// concurrent merges with distinct targets don't corrupt
467    /// `self.coordinator` mid-operation. See the field doc on
468    /// `Omnigraph::merge_exclusive` for the full design rationale.
469    pub(crate) fn merge_exclusive(&self) -> Arc<tokio::sync::Mutex<()>> {
470        Arc::clone(&self.merge_exclusive)
471    }
472
473    /// Engine-level access to the repo's normalized root URI. Used by
474    /// the recovery sidecar protocol to compute `__recovery/` paths.
475    pub(crate) fn root_uri(&self) -> &str {
476        &self.root_uri
477    }
478
479    pub(crate) async fn open_coordinator_for_branch(
480        &self,
481        branch: Option<&str>,
482    ) -> Result<GraphCoordinator> {
483        match branch {
484            Some(branch) => {
485                GraphCoordinator::open_branch(self.uri(), branch, Arc::clone(&self.storage)).await
486            }
487            None => GraphCoordinator::open(self.uri(), Arc::clone(&self.storage)).await,
488        }
489    }
490
491    pub(crate) async fn swap_coordinator_for_branch(
492        &self,
493        branch: Option<&str>,
494    ) -> Result<GraphCoordinator> {
495        let next = self.open_coordinator_for_branch(branch).await?;
496        let mut coord = self.coordinator.write().await;
497        Ok(std::mem::replace(&mut *coord, next))
498    }
499
500    pub(crate) async fn restore_coordinator(&self, coordinator: GraphCoordinator) {
501        *self.coordinator.write().await = coordinator;
502    }
503
504    pub(crate) async fn resolved_branch_target(
505        &self,
506        branch: Option<&str>,
507    ) -> Result<ResolvedTarget> {
508        self.ensure_schema_state_valid().await?;
509        let requested = ReadTarget::Branch(branch.unwrap_or("main").to_string());
510        let normalized = normalize_branch_name(branch.unwrap_or("main"))?;
511        let coord = self.coordinator.read().await;
512        if normalized.as_deref() == coord.current_branch() {
513            let snapshot_id = coord.head_commit_id().await?.unwrap_or_else(|| {
514                SnapshotId::synthetic(coord.current_branch(), coord.version())
515            });
516            return Ok(ResolvedTarget {
517                requested,
518                branch: coord.current_branch().map(str::to_string),
519                snapshot_id,
520                snapshot: coord.snapshot(),
521            });
522        }
523        coord.resolve_target(&requested).await
524    }
525
526    pub(crate) async fn snapshot_for_branch(&self, branch: Option<&str>) -> Result<Snapshot> {
527        self.resolved_branch_target(branch)
528            .await
529            .map(|resolved| resolved.snapshot)
530    }
531
532    pub(crate) async fn version(&self) -> u64 {
533        self.coordinator.read().await.version()
534    }
535
536    /// Return an immutable Snapshot from the known manifest state. No storage I/O.
537    pub(crate) async fn snapshot(&self) -> Snapshot {
538        self.coordinator.read().await.snapshot()
539    }
540
541    pub async fn snapshot_of(&self, target: impl Into<ReadTarget>) -> Result<Snapshot> {
542        self.resolved_target(target)
543            .await
544            .map(|resolved| resolved.snapshot)
545    }
546
547    pub async fn version_of(&self, target: impl Into<ReadTarget>) -> Result<u64> {
548        self.snapshot_of(target)
549            .await
550            .map(|snapshot| snapshot.version())
551    }
552
553    pub async fn resolved_branch_of(
554        &self,
555        target: impl Into<ReadTarget>,
556    ) -> Result<Option<String>> {
557        self.resolved_target(target)
558            .await
559            .map(|resolved| resolved.branch)
560    }
561
562    /// Synchronize this handle's write base to the latest head of the named branch.
563    pub async fn sync_branch(&self, branch: &str) -> Result<()> {
564        self.ensure_schema_state_valid().await?;
565        let branch = normalize_branch_name(branch)?;
566        let next = self.open_coordinator_for_branch(branch.as_deref()).await?;
567        *self.coordinator.write().await = next;
568        self.runtime_cache.invalidate_all().await;
569        Ok(())
570    }
571
572    /// Re-read the handle-local coordinator state from storage AND run
573    /// in-process recovery. Closes the Phase B → Phase C residual (e.g.
574    /// `MutationStaging::finalize` crash mid-publish in a long-running
575    /// server) without restart.
576    ///
577    /// Composition mirrors `Omnigraph::open_with_storage_and_mode`'s
578    /// recovery sequence, in the same order, with one restriction: the
579    /// manifest-drift sweep runs in `RollForwardOnly` mode (rollback /
580    /// abort cases defer to the next ReadWrite open because
581    /// `Dataset::restore` is unsafe under concurrency). Each step:
582    ///
583    /// 1. `coordinator.refresh()` — re-read manifest.
584    /// 2. `recover_schema_state_files` — complete an in-flight
585    ///    schema_apply's staging→final rename if a SchemaApply sidecar
586    ///    is on disk; idempotent + early-returns when no staging files
587    ///    exist. Required BEFORE manifest-drift recovery so a
588    ///    SchemaApply roll-forward doesn't publish the manifest while
589    ///    the staging files remain unrenamed (which would corrupt the
590    ///    repo: data on new schema, catalog on old).
591    /// 3. `recover_manifest_drift(... RollForwardOnly)` — close the
592    ///    finalize→publisher residual via roll-forward; defer rollback
593    ///    work to next ReadWrite open.
594    /// 4. `runtime_cache.invalidate_all` — drop stale per-snapshot caches.
595    ///
596    /// Steady state cost: one `list_dir` of `__recovery/` (typically
597    /// returns empty → early return for both passes). No additional
598    /// Lance reads.
599    ///
600    /// Engine-internal callers that already hold an in-flight sidecar
601    /// (e.g. `schema_apply` mid-write) MUST use
602    /// [`refresh_coordinator_only`](Self::refresh_coordinator_only) to
603    /// avoid the recovery sweep racing their own sidecar.
604    pub async fn refresh(&self) -> Result<()> {
605        // Scope the coord write guard to the recovery section only.
606        // `reload_schema_if_source_changed` (below) acquires
607        // `self.coordinator.read().await` when the on-disk schema source
608        // has drifted from the cached `schema_source`. Tokio's RwLock is
609        // not reentrant, so holding the write across that call deadlocks.
610        // Pinned by `composite_flow_schema_apply_then_branch_ops_no_deadlock_in_refresh`.
611        {
612            let mut coord = self.coordinator.write().await;
613            coord.refresh().await?;
614            let schema_state_recovery = recover_schema_state_files(
615                &self.root_uri,
616                Arc::clone(&self.storage),
617                &coord.snapshot(),
618            )
619            .await?;
620            crate::db::manifest::recover_manifest_drift(
621                &self.root_uri,
622                Arc::clone(&self.storage),
623                &mut *coord,
624                crate::db::manifest::RecoveryMode::RollForwardOnly,
625                schema_state_recovery,
626            )
627            .await?;
628        } // ← write guard released before reload's read acquisition
629        self.reload_schema_if_source_changed().await?;
630        self.runtime_cache.invalidate_all().await;
631        Ok(())
632    }
633
634    async fn reload_schema_if_source_changed(&self) -> Result<()> {
635        let schema_path = schema_source_uri(&self.root_uri);
636        let schema_source = self.storage.read_text(&schema_path).await?;
637        if schema_source == *self.schema_source.load_full() {
638            return Ok(());
639        }
640        let current_source_ir = read_schema_ir_from_source(&schema_source)?;
641        let branches = self.coordinator.read().await.branch_list().await?;
642        let (accepted_ir, _) = load_or_bootstrap_schema_contract(
643            &self.root_uri,
644            Arc::clone(&self.storage),
645            &branches,
646            &current_source_ir,
647        )
648        .await?;
649        let mut catalog = build_catalog_from_ir(&accepted_ir)?;
650        fixup_blob_schemas(&mut catalog);
651        self.store_schema_source(schema_source);
652        self.store_catalog(catalog);
653        Ok(())
654    }
655
656    /// Refresh coordinator state and invalidate the runtime cache WITHOUT
657    /// running the recovery sweep. Engine-internal callers that hold an
658    /// in-flight sidecar (e.g. `schema_apply::apply_schema_with_lock`'s
659    /// internal lease-check refresh) need this variant: running recovery
660    /// here would observe the caller's own sidecar, classify it as
661    /// RolledPastExpected, and roll it forward — racing the caller's
662    /// own publish path.
663    pub(crate) async fn refresh_coordinator_only(&self) -> Result<()> {
664        self.coordinator.write().await.refresh().await?;
665        self.runtime_cache.invalidate_all().await;
666        Ok(())
667    }
668
669    pub async fn resolve_snapshot(&self, branch: &str) -> Result<SnapshotId> {
670        self.ensure_schema_state_valid().await?;
671        self.coordinator.read().await.resolve_snapshot_id(branch).await
672    }
673
674    pub(crate) async fn resolved_target(
675        &self,
676        target: impl Into<ReadTarget>,
677    ) -> Result<ResolvedTarget> {
678        self.ensure_schema_state_valid().await?;
679        self.coordinator.read().await.resolve_target(&target.into()).await
680    }
681
682    // ─── Change detection ────────────────────────────────────────────────
683
684    pub async fn diff_between(
685        &self,
686        from: impl Into<ReadTarget>,
687        to: impl Into<ReadTarget>,
688        filter: &crate::changes::ChangeFilter,
689    ) -> Result<crate::changes::ChangeSet> {
690        let from_resolved = self.resolved_target(from).await?;
691        let to_resolved = self.resolved_target(to).await?;
692        crate::changes::diff_snapshots(
693            self.uri(),
694            &from_resolved.snapshot,
695            &to_resolved.snapshot,
696            filter,
697            to_resolved.branch.clone().or(from_resolved.branch.clone()),
698        )
699        .await
700    }
701
702    /// Diff two graph commits. Resolves each commit to `(manifest_branch, manifest_version)`
703    /// and creates branch-aware snapshots. Supports cross-branch comparison.
704    pub async fn diff_commits(
705        &self,
706        from_commit_id: &str,
707        to_commit_id: &str,
708        filter: &crate::changes::ChangeFilter,
709    ) -> Result<crate::changes::ChangeSet> {
710        let coord = self.coordinator.read().await;
711        let from_commit = coord.resolve_commit(&SnapshotId::new(from_commit_id)).await?;
712        let to_commit = coord.resolve_commit(&SnapshotId::new(to_commit_id)).await?;
713        let from_snap = coord
714            .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
715                from_commit.graph_commit_id.clone(),
716            )))
717            .await?;
718        let to_snap = coord
719            .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
720                to_commit.graph_commit_id.clone(),
721            )))
722            .await?;
723        drop(coord);
724        crate::changes::diff_snapshots(
725            self.uri(),
726            &from_snap.snapshot,
727            &to_snap.snapshot,
728            filter,
729            to_snap.branch.clone().or(from_snap.branch.clone()),
730        )
731        .await
732    }
733
734    pub async fn entity_at_target(
735        &self,
736        target: impl Into<ReadTarget>,
737        table_key: &str,
738        id: &str,
739    ) -> Result<Option<serde_json::Value>> {
740        export::entity_at_target(self, target, table_key, id).await
741    }
742
743    /// Read one entity at a specific manifest version via time travel (on-demand enrichment).
744    pub async fn entity_at(
745        &self,
746        table_key: &str,
747        id: &str,
748        version: u64,
749    ) -> Result<Option<serde_json::Value>> {
750        export::entity_at(self, table_key, id, version).await
751    }
752
753    /// Create a Snapshot at any historical manifest version.
754    pub async fn snapshot_at_version(&self, version: u64) -> Result<Snapshot> {
755        self.ensure_schema_state_valid().await?;
756        self.coordinator.read().await.snapshot_at_version(version).await
757    }
758
759    pub async fn export_jsonl(
760        &self,
761        branch: &str,
762        type_names: &[String],
763        table_keys: &[String],
764    ) -> Result<String> {
765        export::export_jsonl(self, branch, type_names, table_keys).await
766    }
767
768    pub async fn export_jsonl_to_writer<W: Write>(
769        &self,
770        branch: &str,
771        type_names: &[String],
772        table_keys: &[String],
773        writer: &mut W,
774    ) -> Result<()> {
775        export::export_jsonl_to_writer(self, branch, type_names, table_keys, writer).await
776    }
777
778    // ─── Graph index ──────────────────────────────────────────────────────
779
780    /// Get or build the graph index for the current snapshot.
781    pub async fn graph_index(&self) -> Result<Arc<crate::graph_index::GraphIndex>> {
782        table_ops::graph_index(self).await
783    }
784
785    pub(crate) async fn graph_index_for_resolved(
786        &self,
787        resolved: &ResolvedTarget,
788    ) -> Result<Arc<crate::graph_index::GraphIndex>> {
789        table_ops::graph_index_for_resolved(self, resolved).await
790    }
791
792    /// Ensure BTree scalar indices exist on key columns.
793    /// Idempotent — Lance skips if index already exists.
794    ///
795    /// Opens sub-tables at their latest version (not snapshot-pinned) because
796    /// indices must be created on the current head. Any version drift from the
797    /// snapshot is expected and logged. The resulting versions are committed
798    /// back to the manifest.
799    ///
800    /// On named branches, indexing preserves lazy branching:
801    /// unbranched subtables keep inheriting `main`, while subtables inherited
802    /// from an ancestor branch are first forked into the active branch before
803    /// their index metadata is updated.
804    pub async fn ensure_indices(&self) -> Result<()> {
805        table_ops::ensure_indices(self).await
806    }
807
808    pub async fn ensure_indices_on(&self, branch: &str) -> Result<()> {
809        table_ops::ensure_indices_on(self, branch).await
810    }
811
812    #[cfg(feature = "failpoints")]
813    #[doc(hidden)]
814    pub async fn failpoint_publish_table_head_without_index_rebuild_for_test(
815        &mut self,
816        branch: &str,
817        table_key: &str,
818        table_branch: Option<&str>,
819    ) -> Result<u64> {
820        table_ops::failpoint_publish_table_head_without_index_rebuild_for_test(
821            self,
822            branch,
823            table_key,
824            table_branch,
825        )
826        .await
827    }
828
829    /// Compact small Lance fragments into fewer larger ones across every
830    /// node + edge table on `main`. See [`optimize`] for details.
831    pub async fn optimize(&self) -> Result<Vec<optimize::TableOptimizeStats>> {
832        optimize::optimize_all_tables(self).await
833    }
834
835    /// Remove Lance manifests (and the fragments they uniquely own) per the
836    /// given [`optimize::CleanupPolicyOptions`]. Destructive to version
837    /// history. See [`optimize`] for details.
838    pub async fn cleanup(
839        &mut self,
840        options: optimize::CleanupPolicyOptions,
841    ) -> Result<Vec<optimize::TableCleanupStats>> {
842        optimize::cleanup_all_tables(self, options).await
843    }
844
845    /// Read a blob from a node by its string ID and property name.
846    ///
847    /// Returns a `BlobFile` handle with async `read()`, `seek()`, `tell()`,
848    /// and metadata accessors (`size()`, `kind()`, `uri()`).
849    ///
850    /// ```ignore
851    /// let blob = db.read_blob("Document", "readme", "content").await?;
852    /// let bytes = blob.read().await?;
853    /// ```
854    pub async fn read_blob(&self, type_name: &str, id: &str, property: &str) -> Result<BlobFile> {
855        self.ensure_schema_state_valid().await?;
856        let catalog = self.catalog();
857        let node_type = catalog
858            .node_types
859            .get(type_name)
860            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
861        if !node_type.blob_properties.contains(property) {
862            return Err(OmniError::manifest(format!(
863                "property '{}' on type '{}' is not a Blob",
864                property, type_name
865            )));
866        }
867
868        let snapshot = self.snapshot().await;
869        let table_key = format!("node:{}", type_name);
870        let ds = snapshot.open(&table_key).await?;
871
872        let filter_sql = format!("id = '{}'", id.replace('\'', "''"));
873        let row_id = self
874            .table_store
875            .first_row_id_for_filter(&ds, &filter_sql)
876            .await?
877            .ok_or_else(|| {
878                OmniError::manifest(format!("no {} with id '{}' found", type_name, id))
879            })?;
880
881        // Use take_blobs to get the BlobFile handle
882        let ds = Arc::new(ds);
883        let mut blobs = ds
884            .take_blobs(&[row_id], property)
885            .await
886            .map_err(|e| OmniError::Lance(e.to_string()))?;
887
888        blobs.pop().ok_or_else(|| {
889            OmniError::manifest(format!(
890                "blob '{}' on {} '{}' returned no data",
891                property, type_name, id
892            ))
893        })
894    }
895
896    pub(crate) async fn active_branch(&self) -> Option<String> {
897        self.coordinator.read().await.current_branch().map(str::to_string)
898    }
899
900    async fn ensure_branch_delete_safe(&self, branch: &str, branches: &[String]) -> Result<()> {
901        let descendants = self.coordinator.read().await.branch_descendants(branch).await?;
902        if let Some(descendant) = descendants.first() {
903            return Err(OmniError::manifest_conflict(format!(
904                "cannot delete branch '{}' because descendant branch '{}' still depends on it",
905                branch, descendant
906            )));
907        }
908
909        for other_branch in branches
910            .iter()
911            .filter(|candidate| candidate.as_str() != branch)
912        {
913            let snapshot = self
914                .snapshot_of(ReadTarget::branch(other_branch.as_str()))
915                .await?;
916            if snapshot
917                .entries()
918                .any(|entry| entry.table_branch.as_deref() == Some(branch))
919            {
920                return Err(OmniError::manifest_conflict(format!(
921                    "cannot delete branch '{}' because branch '{}' still depends on it",
922                    branch, other_branch
923                )));
924            }
925        }
926
927        Ok(())
928    }
929
930    async fn cleanup_deleted_branch_tables(
931        &self,
932        branch: &str,
933        owned_tables: &[(String, String)],
934    ) -> Result<()> {
935        let mut seen_paths = HashSet::new();
936        let mut cleanup_targets = owned_tables
937            .iter()
938            .filter(|(_, table_path)| seen_paths.insert(table_path.clone()))
939            .cloned()
940            .collect::<Vec<_>>();
941        cleanup_targets.sort_by(|left, right| left.0.cmp(&right.0));
942
943        for (table_key, table_path) in cleanup_targets {
944            let dataset_uri = self.table_store.dataset_uri(&table_path);
945            if let Err(err) = self.table_store.delete_branch(&dataset_uri, branch).await {
946                return Err(OmniError::manifest_internal(format!(
947                    "branch '{}' was deleted but cleanup failed for {}: {}",
948                    branch, table_key, err
949                )));
950            }
951        }
952
953        Ok(())
954    }
955
956    async fn delete_branch_storage_only(&self, branch: &str) -> Result<()> {
957        let active = self.coordinator.read().await.current_branch().map(str::to_string);
958        if active.as_deref() == Some(branch) {
959            return Err(OmniError::manifest_conflict(format!(
960                "cannot delete currently active branch '{}'",
961                branch
962            )));
963        }
964
965        let branch_snapshot = self.snapshot_of(ReadTarget::branch(branch)).await?;
966        let owned_tables = branch_snapshot
967            .entries()
968            .filter(|entry| entry.table_branch.as_deref() == Some(branch))
969            .map(|entry| (entry.table_key.clone(), entry.table_path.clone()))
970            .collect::<Vec<_>>();
971
972        self.coordinator.write().await.branch_delete(branch).await?;
973        self.cleanup_deleted_branch_tables(branch, &owned_tables)
974            .await
975    }
976
977    pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
978        normalize_branch_name(branch)
979    }
980
981    pub(crate) async fn head_commit_id_for_branch(
982        &self,
983        branch: Option<&str>,
984    ) -> Result<Option<String>> {
985        let mut coordinator = self.open_coordinator_for_branch(branch).await?;
986        coordinator.ensure_commit_graph_initialized().await?;
987        coordinator
988            .head_commit_id()
989            .await
990            .map(|id| id.map(|snapshot_id| snapshot_id.as_str().to_string()))
991    }
992
993    pub async fn branch_create(&self, name: &str) -> Result<()> {
994        self.branch_create_as(name, None).await
995    }
996
997    /// Create a branch from the coordinator's currently-open snapshot,
998    /// with an explicit actor for engine-layer policy enforcement
999    /// (MR-722 fan-out). Scope is `TargetBranch(name)` — symmetric with
1000    /// `branch_delete_as`: the branch being acted upon is the target.
1001    /// Cedar rules using `target_branch_scope: protected` therefore see
1002    /// the new-branch name and can deny e.g. creating any branch named
1003    /// `main` from a non-privileged actor.
1004    pub async fn branch_create_as(&self, name: &str, actor: Option<&str>) -> Result<()> {
1005        self.enforce(
1006            omnigraph_policy::PolicyAction::BranchCreate,
1007            &omnigraph_policy::ResourceScope::TargetBranch(name.to_string()),
1008            actor,
1009        )?;
1010        self.ensure_schema_state_valid().await?;
1011        self.ensure_schema_apply_idle("branch_create").await?;
1012        ensure_public_branch_ref(name, "branch_create")?;
1013        self.coordinator.write().await.branch_create(name).await
1014    }
1015
1016    pub async fn branch_create_from(
1017        &self,
1018        from: impl Into<ReadTarget>,
1019        name: &str,
1020    ) -> Result<()> {
1021        self.branch_create_from_as(from, name, None).await
1022    }
1023
1024    /// Create a branch from a specific source branch with an explicit
1025    /// actor for engine-layer policy enforcement (MR-722 fan-out).
1026    ///
1027    /// Scope is `BranchTransition { source, target }` — matches the
1028    /// HTTP-layer convention at `server_branch_create`
1029    /// (branch=Some(from), target_branch=Some(name)), so engine and
1030    /// HTTP fire the same Cedar decision. Pinned-snapshot sources
1031    /// (which aren't a branch ref) materialize as the sentinel
1032    /// `<snapshot>` for the policy check; Cedar rules using
1033    /// `branch_scope: any` still match, rules pinning a specific
1034    /// source branch correctly do not.
1035    pub async fn branch_create_from_as(
1036        &self,
1037        from: impl Into<ReadTarget>,
1038        name: &str,
1039        actor: Option<&str>,
1040    ) -> Result<()> {
1041        let target = from.into();
1042        let source_branch = match &target {
1043            ReadTarget::Branch(b) => b.clone(),
1044            _ => "<snapshot>".to_string(),
1045        };
1046        self.enforce(
1047            omnigraph_policy::PolicyAction::BranchCreate,
1048            &omnigraph_policy::ResourceScope::BranchTransition {
1049                source: source_branch,
1050                target: name.to_string(),
1051            },
1052            actor,
1053        )?;
1054        self.ensure_schema_apply_idle("branch_create_from").await?;
1055        self.branch_create_from_impl(target, name, false).await
1056    }
1057
1058    async fn branch_create_from_impl(
1059        &self,
1060        from: impl Into<ReadTarget>,
1061        name: &str,
1062        allow_internal_refs: bool,
1063    ) -> Result<()> {
1064        let target = from.into();
1065        let ReadTarget::Branch(branch_name) = target else {
1066            return Err(OmniError::manifest(
1067                "branch creation from pinned snapshots is not supported yet".to_string(),
1068            ));
1069        };
1070        if !allow_internal_refs {
1071            ensure_public_branch_ref(&branch_name, "branch_create_from")?;
1072            ensure_public_branch_ref(name, "branch_create_from")?;
1073        }
1074        let branch = normalize_branch_name(&branch_name)?;
1075        // Operate on a freshly-opened source coordinator that's owned locally
1076        // — never touch `self.coordinator`. The pre-fix implementation used
1077        // `swap_coordinator_for_branch` + operate + `restore_coordinator` as
1078        // three separate `coordinator.write().await` acquisitions; under
1079        // `&self` concurrency, a second `branch_create_from` could swap
1080        // self.coordinator between this caller's swap and operate steps,
1081        // making the operate run against the wrong source branch and
1082        // forking off the wrong HEAD. Pinned by
1083        // `concurrent_branch_create_from_distinct_parents_does_not_corrupt_coordinator`
1084        // in `crates/omnigraph-server/tests/server.rs`.
1085        //
1086        // `branch_create` mutates only the local coord's commit-graph cache;
1087        // the manifest write is durable on disk regardless of which
1088        // coord-handle issued it. Discarding `source_coord` after the call
1089        // is the right shape — the new branch is reachable from any
1090        // subsequent open of any coord.
1091        let mut source_coord = self.open_coordinator_for_branch(branch.as_deref()).await?;
1092        source_coord.branch_create(name).await
1093    }
1094
1095    pub async fn branch_list(&self) -> Result<Vec<String>> {
1096        self.ensure_schema_state_valid().await?;
1097        self.coordinator.read().await.branch_list().await
1098    }
1099
1100    pub async fn branch_delete(&self, name: &str) -> Result<()> {
1101        self.branch_delete_as(name, None).await
1102    }
1103
1104    /// Delete a branch with an explicit actor for engine-layer policy
1105    /// enforcement (MR-722 fan-out). Scope is `TargetBranch(name)` —
1106    /// matches the HTTP-layer convention at `server_branch_delete`
1107    /// (branch=None, target_branch=Some(name)). Cedar rules using
1108    /// `target_branch_scope: protected` therefore correctly gate
1109    /// deletion of protected branches (e.g. deny BranchDelete against
1110    /// `main`).
1111    pub async fn branch_delete_as(&self, name: &str, actor: Option<&str>) -> Result<()> {
1112        self.enforce(
1113            omnigraph_policy::PolicyAction::BranchDelete,
1114            &omnigraph_policy::ResourceScope::TargetBranch(name.to_string()),
1115            actor,
1116        )?;
1117        self.ensure_schema_state_valid().await?;
1118        self.ensure_schema_apply_idle("branch_delete").await?;
1119        ensure_public_branch_ref(name, "branch_delete")?;
1120        self.refresh().await?;
1121        let branch = normalize_branch_name(name)?
1122            .ok_or_else(|| OmniError::manifest("cannot delete branch 'main'".to_string()))?;
1123        let branches = self.coordinator.read().await.branch_list().await?;
1124        if !branches.iter().any(|candidate| candidate == &branch) {
1125            return Err(OmniError::manifest_not_found(format!(
1126                "branch '{}' not found",
1127                branch
1128            )));
1129        }
1130
1131        self.ensure_branch_delete_safe(&branch, &branches).await?;
1132        self.delete_branch_storage_only(&branch).await
1133    }
1134
1135    pub async fn get_commit(&self, commit_id: &str) -> Result<GraphCommit> {
1136        self.ensure_schema_state_valid().await?;
1137        self.coordinator.read().await
1138            .resolve_commit(&SnapshotId::new(commit_id))
1139            .await
1140    }
1141
1142    pub async fn list_commits(&self, branch: Option<&str>) -> Result<Vec<GraphCommit>> {
1143        self.ensure_schema_state_valid().await?;
1144        let branch = match branch {
1145            Some(branch) => normalize_branch_name(branch)?,
1146            None => None,
1147        };
1148        let coordinator = self.open_coordinator_for_branch(branch.as_deref()).await?;
1149        coordinator.list_commits().await
1150    }
1151
1152    /// Open a sub-table for mutation with version-drift guard.
1153    ///
1154    /// Checks that the dataset's current version matches the snapshot-pinned
1155    /// version. If another writer has advanced the version, returns an error
1156    /// prompting the caller to refresh and retry (optimistic concurrency).
1157    pub(crate) async fn open_for_mutation(
1158        &self,
1159        table_key: &str,
1160        op_kind: crate::db::MutationOpKind,
1161    ) -> Result<(Dataset, String, Option<String>)> {
1162        table_ops::open_for_mutation(self, table_key, op_kind).await
1163    }
1164
1165    pub(crate) async fn open_for_mutation_on_branch(
1166        &self,
1167        branch: Option<&str>,
1168        table_key: &str,
1169        op_kind: crate::db::MutationOpKind,
1170    ) -> Result<(Dataset, String, Option<String>)> {
1171        table_ops::open_for_mutation_on_branch(self, branch, table_key, op_kind).await
1172    }
1173
1174    pub(crate) async fn fork_dataset_from_entry_state(
1175        &self,
1176        table_key: &str,
1177        full_path: &str,
1178        source_branch: Option<&str>,
1179        source_version: u64,
1180        active_branch: &str,
1181    ) -> Result<Dataset> {
1182        table_ops::fork_dataset_from_entry_state(
1183            self,
1184            table_key,
1185            full_path,
1186            source_branch,
1187            source_version,
1188            active_branch,
1189        )
1190        .await
1191    }
1192
1193    pub(crate) async fn reopen_for_mutation(
1194        &self,
1195        table_key: &str,
1196        full_path: &str,
1197        table_branch: Option<&str>,
1198        expected_version: u64,
1199        op_kind: crate::db::MutationOpKind,
1200    ) -> Result<Dataset> {
1201        table_ops::reopen_for_mutation(
1202            self,
1203            table_key,
1204            full_path,
1205            table_branch,
1206            expected_version,
1207            op_kind,
1208        )
1209        .await
1210    }
1211
1212    pub(crate) async fn open_dataset_at_state(
1213        &self,
1214        table_path: &str,
1215        table_branch: Option<&str>,
1216        table_version: u64,
1217    ) -> Result<Dataset> {
1218        table_ops::open_dataset_at_state(self, table_path, table_branch, table_version).await
1219    }
1220
1221    pub(crate) async fn build_indices_on_dataset(
1222        &self,
1223        table_key: &str,
1224        ds: &mut Dataset,
1225    ) -> Result<()> {
1226        table_ops::build_indices_on_dataset(self, table_key, ds).await
1227    }
1228
1229    pub(crate) async fn build_indices_on_dataset_for_catalog(
1230        &self,
1231        catalog: &Catalog,
1232        table_key: &str,
1233        ds: &mut Dataset,
1234    ) -> Result<()> {
1235        table_ops::build_indices_on_dataset_for_catalog(self, catalog, table_key, ds).await
1236    }
1237
1238    // Used only by in-tree tests (`#[cfg(test)]`); the runtime path now
1239    // uses `commit_updates_on_branch_with_expected` exclusively.
1240    #[cfg(test)]
1241    pub(crate) async fn commit_updates(
1242        &mut self,
1243        updates: &[crate::db::SubTableUpdate],
1244    ) -> Result<u64> {
1245        table_ops::commit_updates(self, updates).await
1246    }
1247
1248    pub(crate) async fn commit_manifest_updates(
1249        &self,
1250        updates: &[crate::db::SubTableUpdate],
1251    ) -> Result<u64> {
1252        table_ops::commit_manifest_updates(self, updates).await
1253    }
1254
1255    pub(crate) async fn record_merge_commit(
1256        &self,
1257        manifest_version: u64,
1258        parent_commit_id: &str,
1259        merged_parent_commit_id: &str,
1260        actor_id: Option<&str>,
1261    ) -> Result<String> {
1262        table_ops::record_merge_commit(
1263            self,
1264            manifest_version,
1265            parent_commit_id,
1266            merged_parent_commit_id,
1267            actor_id,
1268        )
1269        .await
1270    }
1271
1272    pub(crate) async fn commit_updates_on_branch_with_expected(
1273        &self,
1274        branch: Option<&str>,
1275        updates: &[crate::db::SubTableUpdate],
1276        expected_table_versions: &std::collections::HashMap<String, u64>,
1277        actor_id: Option<&str>,
1278    ) -> Result<u64> {
1279        table_ops::commit_updates_on_branch_with_expected(
1280            self,
1281            branch,
1282            updates,
1283            expected_table_versions,
1284            actor_id,
1285        )
1286        .await
1287    }
1288
1289    pub(crate) async fn ensure_commit_graph_initialized(&self) -> Result<()> {
1290        table_ops::ensure_commit_graph_initialized(self).await
1291    }
1292
1293    /// Invalidate the cached graph index. Called after edge mutations.
1294    pub(crate) async fn invalidate_graph_index(&self) {
1295        table_ops::invalidate_graph_index(self).await
1296    }
1297}
1298
1299pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
1300    let branch = branch.trim();
1301    if branch.is_empty() {
1302        return Err(OmniError::manifest(
1303            "branch name cannot be empty".to_string(),
1304        ));
1305    }
1306    if branch == "main" {
1307        return Ok(None);
1308    }
1309    Ok(Some(branch.to_string()))
1310}
1311
1312pub(crate) fn ensure_public_branch_ref(branch: &str, operation: &str) -> Result<()> {
1313    if super::is_internal_run_branch(branch) {
1314        return Err(OmniError::manifest(format!(
1315            "{} does not allow internal run ref '{}'",
1316            operation, branch
1317        )));
1318    }
1319    if is_internal_system_branch(branch) {
1320        return Err(OmniError::manifest(format!(
1321            "{} does not allow internal system ref '{}'",
1322            operation, branch
1323        )));
1324    }
1325    Ok(())
1326}
1327
1328fn concat_or_empty_batches(schema: Arc<Schema>, batches: Vec<RecordBatch>) -> Result<RecordBatch> {
1329    if batches.is_empty() {
1330        return Ok(RecordBatch::new_empty(schema));
1331    }
1332    if batches.len() == 1 {
1333        return Ok(batches.into_iter().next().unwrap());
1334    }
1335    let batch_schema = batches[0].schema();
1336    arrow_select::concat::concat_batches(&batch_schema, &batches)
1337        .map_err(|e| OmniError::Lance(e.to_string()))
1338}
1339
1340fn blob_properties_for_table_key<'a>(
1341    catalog: &'a Catalog,
1342    table_key: &str,
1343) -> Result<&'a std::collections::HashSet<String>> {
1344    if let Some(type_name) = table_key.strip_prefix("node:") {
1345        return catalog
1346            .node_types
1347            .get(type_name)
1348            .map(|node_type| &node_type.blob_properties)
1349            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)));
1350    }
1351    if let Some(type_name) = table_key.strip_prefix("edge:") {
1352        return catalog
1353            .edge_types
1354            .get(type_name)
1355            .map(|edge_type| &edge_type.blob_properties)
1356            .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)));
1357    }
1358    Err(OmniError::manifest(format!(
1359        "invalid table key '{}'",
1360        table_key
1361    )))
1362}
1363
1364fn blob_description_is_null(descriptions: &StructArray, row: usize) -> Result<bool> {
1365    if descriptions.is_null(row) {
1366        return Ok(true);
1367    }
1368
1369    let kind = descriptions
1370        .column_by_name("kind")
1371        .and_then(|col| col.as_any().downcast_ref::<UInt32Array>())
1372        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row) as u8))
1373        .or_else(|| {
1374            descriptions
1375                .column_by_name("kind")
1376                .and_then(|col| col.as_any().downcast_ref::<arrow_array::UInt8Array>())
1377                .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)))
1378        });
1379    let position = descriptions
1380        .column_by_name("position")
1381        .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
1382        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1383    let size = descriptions
1384        .column_by_name("size")
1385        .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
1386        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1387    let blob_uri = descriptions
1388        .column_by_name("blob_uri")
1389        .and_then(|col| col.as_any().downcast_ref::<StringArray>())
1390        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1391
1392    let Some(kind) = kind else {
1393        return Ok(true);
1394    };
1395    let kind = BlobKind::try_from(kind).map_err(|e| OmniError::Lance(e.to_string()))?;
1396    if kind != BlobKind::Inline {
1397        return Ok(false);
1398    }
1399
1400    Ok(position.unwrap_or(0) == 0 && size.unwrap_or(0) == 0 && blob_uri.unwrap_or("").is_empty())
1401}
1402
1403/// Replace placeholder `LargeBinary` fields with Lance blob v2 fields.
1404///
1405/// The compiler crate has no Lance dependency, so `ScalarType::Blob` maps to
1406/// `DataType::LargeBinary` as a placeholder. This function replaces those
1407/// fields with the real blob v2 struct type via `lance::blob::blob_field()`.
1408fn fixup_blob_schemas(catalog: &mut Catalog) {
1409    for node_type in catalog.node_types.values_mut() {
1410        if node_type.blob_properties.is_empty() {
1411            continue;
1412        }
1413        let fields: Vec<Field> = node_type
1414            .arrow_schema
1415            .fields()
1416            .iter()
1417            .map(|f| {
1418                if node_type.blob_properties.contains(f.name()) {
1419                    blob_field(f.name(), f.is_nullable())
1420                } else {
1421                    f.as_ref().clone()
1422                }
1423            })
1424            .collect();
1425        node_type.arrow_schema = Arc::new(Schema::new(fields));
1426    }
1427    for edge_type in catalog.edge_types.values_mut() {
1428        if edge_type.blob_properties.is_empty() {
1429            continue;
1430        }
1431        let fields: Vec<Field> = edge_type
1432            .arrow_schema
1433            .fields()
1434            .iter()
1435            .map(|f| {
1436                if edge_type.blob_properties.contains(f.name()) {
1437                    blob_field(f.name(), f.is_nullable())
1438                } else {
1439                    f.as_ref().clone()
1440                }
1441            })
1442            .collect();
1443        edge_type.arrow_schema = Arc::new(Schema::new(fields));
1444    }
1445}
1446
1447fn read_schema_ir_from_source(schema_source: &str) -> Result<SchemaIR> {
1448    let schema_ast = parse_schema(schema_source)?;
1449    build_schema_ir(&schema_ast).map_err(|err| OmniError::manifest(err.to_string()))
1450}
1451
1452fn schema_table_key(type_kind: SchemaTypeKind, name: &str) -> String {
1453    match type_kind {
1454        SchemaTypeKind::Node => format!("node:{}", name),
1455        SchemaTypeKind::Edge => format!("edge:{}", name),
1456        SchemaTypeKind::Interface => unreachable!("interfaces do not map to tables"),
1457    }
1458}
1459
1460fn schema_for_table_key(catalog: &Catalog, table_key: &str) -> Result<Arc<Schema>> {
1461    if let Some(type_name) = table_key.strip_prefix("node:") {
1462        let node_type: &NodeType = catalog
1463            .node_types
1464            .get(type_name)
1465            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
1466        return Ok(node_type.arrow_schema.clone());
1467    }
1468    if let Some(type_name) = table_key.strip_prefix("edge:") {
1469        let edge_type: &EdgeType = catalog
1470            .edge_types
1471            .get(type_name)
1472            .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)))?;
1473        return Ok(edge_type.arrow_schema.clone());
1474    }
1475    Err(OmniError::manifest(format!(
1476        "invalid table key '{}'",
1477        table_key
1478    )))
1479}
1480
1481fn record_batch_row_to_json(batch: &RecordBatch, row: usize) -> Result<serde_json::Value> {
1482    let mut obj = serde_json::Map::new();
1483    for (i, field) in batch.schema().fields().iter().enumerate() {
1484        obj.insert(
1485            field.name().clone(),
1486            json_value_from_array(batch.column(i).as_ref(), row)?,
1487        );
1488    }
1489    Ok(serde_json::Value::Object(obj))
1490}
1491
1492fn json_value_from_array(array: &dyn Array, row: usize) -> Result<serde_json::Value> {
1493    if array.is_null(row) {
1494        return Ok(serde_json::Value::Null);
1495    }
1496
1497    match array.data_type() {
1498        DataType::Utf8 => Ok(serde_json::Value::String(
1499            array
1500                .as_any()
1501                .downcast_ref::<StringArray>()
1502                .ok_or_else(|| OmniError::Lance("expected StringArray".to_string()))?
1503                .value(row)
1504                .to_string(),
1505        )),
1506        DataType::LargeUtf8 => Ok(serde_json::Value::String(
1507            array
1508                .as_any()
1509                .downcast_ref::<LargeStringArray>()
1510                .ok_or_else(|| OmniError::Lance("expected LargeStringArray".to_string()))?
1511                .value(row)
1512                .to_string(),
1513        )),
1514        DataType::Boolean => Ok(serde_json::Value::Bool(
1515            array
1516                .as_any()
1517                .downcast_ref::<BooleanArray>()
1518                .ok_or_else(|| OmniError::Lance("expected BooleanArray".to_string()))?
1519                .value(row),
1520        )),
1521        DataType::Int32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1522            array
1523                .as_any()
1524                .downcast_ref::<Int32Array>()
1525                .ok_or_else(|| OmniError::Lance("expected Int32Array".to_string()))?
1526                .value(row),
1527        ))),
1528        DataType::Int64 => Ok(serde_json::Value::Number(serde_json::Number::from(
1529            array
1530                .as_any()
1531                .downcast_ref::<Int64Array>()
1532                .ok_or_else(|| OmniError::Lance("expected Int64Array".to_string()))?
1533                .value(row),
1534        ))),
1535        DataType::UInt32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1536            array
1537                .as_any()
1538                .downcast_ref::<UInt32Array>()
1539                .ok_or_else(|| OmniError::Lance("expected UInt32Array".to_string()))?
1540                .value(row),
1541        ))),
1542        DataType::UInt64 => Ok(serde_json::Value::Number(serde_json::Number::from(
1543            array
1544                .as_any()
1545                .downcast_ref::<UInt64Array>()
1546                .ok_or_else(|| OmniError::Lance("expected UInt64Array".to_string()))?
1547                .value(row),
1548        ))),
1549        DataType::Float32 => {
1550            let value = array
1551                .as_any()
1552                .downcast_ref::<Float32Array>()
1553                .ok_or_else(|| OmniError::Lance("expected Float32Array".to_string()))?
1554                .value(row) as f64;
1555            Ok(serde_json::Value::Number(
1556                serde_json::Number::from_f64(value).ok_or_else(|| {
1557                    OmniError::Lance(format!("cannot encode f32 value '{}' as JSON", value))
1558                })?,
1559            ))
1560        }
1561        DataType::Float64 => {
1562            let value = array
1563                .as_any()
1564                .downcast_ref::<Float64Array>()
1565                .ok_or_else(|| OmniError::Lance("expected Float64Array".to_string()))?
1566                .value(row);
1567            Ok(serde_json::Value::Number(
1568                serde_json::Number::from_f64(value).ok_or_else(|| {
1569                    OmniError::Lance(format!("cannot encode f64 value '{}' as JSON", value))
1570                })?,
1571            ))
1572        }
1573        DataType::Date32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1574            array
1575                .as_any()
1576                .downcast_ref::<Date32Array>()
1577                .ok_or_else(|| OmniError::Lance("expected Date32Array".to_string()))?
1578                .value(row),
1579        ))),
1580        DataType::Binary => Ok(serde_json::Value::String(base64::Engine::encode(
1581            &base64::engine::general_purpose::STANDARD,
1582            array
1583                .as_any()
1584                .downcast_ref::<BinaryArray>()
1585                .ok_or_else(|| OmniError::Lance("expected BinaryArray".to_string()))?
1586                .value(row),
1587        ))),
1588        DataType::LargeBinary => Ok(serde_json::Value::String(base64::Engine::encode(
1589            &base64::engine::general_purpose::STANDARD,
1590            array
1591                .as_any()
1592                .downcast_ref::<LargeBinaryArray>()
1593                .ok_or_else(|| OmniError::Lance("expected LargeBinaryArray".to_string()))?
1594                .value(row),
1595        ))),
1596        DataType::List(_) => {
1597            let list = array
1598                .as_any()
1599                .downcast_ref::<ListArray>()
1600                .ok_or_else(|| OmniError::Lance("expected ListArray".to_string()))?;
1601            let values = list.value(row);
1602            let mut out = Vec::with_capacity(values.len());
1603            for idx in 0..values.len() {
1604                out.push(json_value_from_array(values.as_ref(), idx)?);
1605            }
1606            Ok(serde_json::Value::Array(out))
1607        }
1608        DataType::LargeList(_) => {
1609            let list = array
1610                .as_any()
1611                .downcast_ref::<LargeListArray>()
1612                .ok_or_else(|| OmniError::Lance("expected LargeListArray".to_string()))?;
1613            let values = list.value(row);
1614            let mut out = Vec::with_capacity(values.len());
1615            for idx in 0..values.len() {
1616                out.push(json_value_from_array(values.as_ref(), idx)?);
1617            }
1618            Ok(serde_json::Value::Array(out))
1619        }
1620        DataType::FixedSizeList(_, _) => {
1621            let list = array
1622                .as_any()
1623                .downcast_ref::<FixedSizeListArray>()
1624                .ok_or_else(|| OmniError::Lance("expected FixedSizeListArray".to_string()))?;
1625            let values = list.value(row);
1626            let mut out = Vec::with_capacity(values.len());
1627            for idx in 0..values.len() {
1628                out.push(json_value_from_array(values.as_ref(), idx)?);
1629            }
1630            Ok(serde_json::Value::Array(out))
1631        }
1632        DataType::Struct(fields) => {
1633            let struct_array = array
1634                .as_any()
1635                .downcast_ref::<StructArray>()
1636                .ok_or_else(|| OmniError::Lance("expected StructArray".to_string()))?;
1637            let mut obj = serde_json::Map::new();
1638            for (field_idx, field) in fields.iter().enumerate() {
1639                obj.insert(
1640                    field.name().clone(),
1641                    json_value_from_array(struct_array.column(field_idx).as_ref(), row)?,
1642                );
1643            }
1644            Ok(serde_json::Value::Object(obj))
1645        }
1646        _ => {
1647            let value = arrow_cast::display::array_value_to_string(array, row)
1648                .map_err(|e| OmniError::Lance(e.to_string()))?;
1649            Ok(serde_json::Value::String(value))
1650        }
1651    }
1652}
1653
1654#[cfg(test)]
1655mod tests {
1656    use super::*;
1657    use crate::db::is_internal_run_branch;
1658    use crate::db::manifest::ManifestCoordinator;
1659    use async_trait::async_trait;
1660    use serde_json::Value;
1661    use std::sync::Mutex;
1662
1663    use crate::storage::{LocalStorageAdapter, StorageAdapter, join_uri};
1664
1665    const TEST_SCHEMA: &str = r#"
1666node Person {
1667    name: String @key
1668    age: I32?
1669}
1670node Company {
1671    name: String @key
1672}
1673edge Knows: Person -> Person {
1674    since: Date?
1675}
1676edge WorksAt: Person -> Company
1677"#;
1678
1679    #[derive(Debug, Default)]
1680    struct RecordingStorageAdapter {
1681        inner: LocalStorageAdapter,
1682        reads: Mutex<Vec<String>>,
1683        writes: Mutex<Vec<String>>,
1684        exists_checks: Mutex<Vec<String>>,
1685        renames: Mutex<Vec<(String, String)>>,
1686        deletes: Mutex<Vec<String>>,
1687    }
1688
1689    impl RecordingStorageAdapter {
1690        fn reads(&self) -> Vec<String> {
1691            self.reads.lock().unwrap().clone()
1692        }
1693
1694        fn writes(&self) -> Vec<String> {
1695            self.writes.lock().unwrap().clone()
1696        }
1697
1698        fn exists_checks(&self) -> Vec<String> {
1699            self.exists_checks.lock().unwrap().clone()
1700        }
1701    }
1702
1703    #[async_trait]
1704    impl StorageAdapter for RecordingStorageAdapter {
1705        async fn read_text(&self, uri: &str) -> Result<String> {
1706            self.reads.lock().unwrap().push(uri.to_string());
1707            self.inner.read_text(uri).await
1708        }
1709
1710        async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
1711            self.writes.lock().unwrap().push(uri.to_string());
1712            self.inner.write_text(uri, contents).await
1713        }
1714
1715        async fn exists(&self, uri: &str) -> Result<bool> {
1716            self.exists_checks.lock().unwrap().push(uri.to_string());
1717            self.inner.exists(uri).await
1718        }
1719
1720        async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
1721            self.renames
1722                .lock()
1723                .unwrap()
1724                .push((from_uri.to_string(), to_uri.to_string()));
1725            self.inner.rename_text(from_uri, to_uri).await
1726        }
1727
1728        async fn delete(&self, uri: &str) -> Result<()> {
1729            self.deletes.lock().unwrap().push(uri.to_string());
1730            self.inner.delete(uri).await
1731        }
1732
1733        async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
1734            self.inner.list_dir(dir_uri).await
1735        }
1736    }
1737
1738    #[tokio::test]
1739    async fn test_init_and_open_route_graph_metadata_through_storage_adapter() {
1740        let dir = tempfile::tempdir().unwrap();
1741        let uri = dir.path().to_str().unwrap();
1742        let adapter = Arc::new(RecordingStorageAdapter::default());
1743
1744        Omnigraph::init_with_storage(uri, TEST_SCHEMA, adapter.clone())
1745            .await
1746            .unwrap();
1747        assert!(adapter.writes().contains(&join_uri(uri, "_schema.pg")));
1748        assert!(adapter.writes().contains(&join_uri(uri, "_schema.ir.json")));
1749        assert!(
1750            adapter
1751                .writes()
1752                .contains(&join_uri(uri, "__schema_state.json"))
1753        );
1754
1755        Omnigraph::open_with_storage(uri, adapter.clone())
1756            .await
1757            .unwrap();
1758        assert!(adapter.reads().contains(&join_uri(uri, "_schema.pg")));
1759        assert!(adapter.reads().contains(&join_uri(uri, "_schema.ir.json")));
1760        assert!(
1761            adapter
1762                .reads()
1763                .contains(&join_uri(uri, "__schema_state.json"))
1764        );
1765        assert!(
1766            adapter
1767                .exists_checks()
1768                .contains(&join_uri(uri, "_schema.ir.json"))
1769        );
1770        assert!(
1771            adapter
1772                .exists_checks()
1773                .contains(&join_uri(uri, "__schema_state.json"))
1774        );
1775        assert!(
1776            adapter
1777                .exists_checks()
1778                .contains(&join_uri(uri, "_graph_commits.lance"))
1779        );
1780    }
1781
1782    async fn table_rows_json(db: &Omnigraph, table_key: &str) -> Vec<Value> {
1783        let snapshot = db.snapshot().await;
1784        let ds = snapshot.open(table_key).await.unwrap();
1785        let batches = db.table_store().scan_batches(&ds).await.unwrap();
1786        batches
1787            .into_iter()
1788            .flat_map(|batch| {
1789                (0..batch.num_rows())
1790                    .map(|row| record_batch_row_to_json(&batch, row).unwrap())
1791                    .collect::<Vec<_>>()
1792            })
1793            .collect()
1794    }
1795
1796    async fn seed_person_row(db: &mut Omnigraph, name: &str, age: Option<i32>) {
1797        let (mut ds, full_path, table_branch) = db
1798            .open_for_mutation("node:Person", crate::db::MutationOpKind::Insert)
1799            .await
1800            .unwrap();
1801        let schema: Arc<Schema> = Arc::new(ds.schema().into());
1802        let columns: Vec<Arc<dyn Array>> = schema
1803            .fields()
1804            .iter()
1805            .map(|field| match field.name().as_str() {
1806                "id" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
1807                "name" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
1808                "age" => Arc::new(Int32Array::from(vec![age])) as Arc<dyn Array>,
1809                _ => new_null_array(field.data_type(), 1),
1810            })
1811            .collect();
1812        let batch = RecordBatch::try_new(Arc::clone(&schema), columns).unwrap();
1813        let state = db
1814            .table_store()
1815            .append_batch(&full_path, &mut ds, batch)
1816            .await
1817            .unwrap();
1818        db.commit_updates(&[crate::db::SubTableUpdate {
1819            table_key: "node:Person".to_string(),
1820            table_version: state.version,
1821            table_branch,
1822            row_count: state.row_count,
1823            version_metadata: state.version_metadata,
1824        }])
1825        .await
1826        .unwrap();
1827    }
1828
1829    #[tokio::test]
1830    async fn test_apply_schema_adds_nullable_property_and_preserves_rows() {
1831        let dir = tempfile::tempdir().unwrap();
1832        let uri = dir.path().to_str().unwrap();
1833        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1834        seed_person_row(&mut db, "Alice", Some(30)).await;
1835
1836        let desired = TEST_SCHEMA.replace(
1837            "    age: I32?\n}",
1838            "    age: I32?\n    nickname: String?\n}",
1839        );
1840        let result = db.apply_schema(&desired).await.unwrap();
1841        assert!(result.applied);
1842
1843        let reopened = Omnigraph::open(uri).await.unwrap();
1844        let rows = table_rows_json(&reopened, "node:Person").await;
1845        assert_eq!(rows.len(), 1);
1846        assert_eq!(rows[0]["name"], "Alice");
1847        assert_eq!(rows[0]["age"], 30);
1848        assert!(rows[0]["nickname"].is_null());
1849        assert!(
1850            reopened.catalog().node_types["Person"]
1851                .properties
1852                .contains_key("nickname")
1853        );
1854        assert!(dir.path().join("_schema.pg").exists());
1855    }
1856
1857    #[tokio::test]
1858    async fn test_apply_schema_renames_property_and_preserves_values() {
1859        let dir = tempfile::tempdir().unwrap();
1860        let uri = dir.path().to_str().unwrap();
1861        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1862        seed_person_row(&mut db, "Alice", Some(30)).await;
1863
1864        let desired = TEST_SCHEMA.replace(
1865            "    age: I32?\n}",
1866            "    years: I32? @rename_from(\"age\")\n}",
1867        );
1868        db.apply_schema(&desired).await.unwrap();
1869
1870        let reopened = Omnigraph::open(uri).await.unwrap();
1871        let rows = table_rows_json(&reopened, "node:Person").await;
1872        assert_eq!(rows[0]["name"], "Alice");
1873        assert_eq!(rows[0]["years"], 30);
1874        assert!(rows[0].get("age").is_none());
1875    }
1876
1877    #[tokio::test]
1878    async fn test_apply_schema_renames_type_and_preserves_historical_snapshot() {
1879        let dir = tempfile::tempdir().unwrap();
1880        let uri = dir.path().to_str().unwrap();
1881        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1882        seed_person_row(&mut db, "Alice", Some(30)).await;
1883        let before_version = db.snapshot().await.version();
1884
1885        let desired = TEST_SCHEMA
1886            .replace("node Person {\n", "node Human @rename_from(\"Person\") {\n")
1887            .replace("edge Knows: Person -> Person", "edge Knows: Human -> Human")
1888            .replace(
1889                "edge WorksAt: Person -> Company",
1890                "edge WorksAt: Human -> Company",
1891            );
1892        db.apply_schema(&desired).await.unwrap();
1893
1894        let head = db.snapshot().await;
1895        assert!(head.entry("node:Person").is_none());
1896        assert!(head.entry("node:Human").is_some());
1897        let historical = ManifestCoordinator::snapshot_at(uri, None, before_version)
1898            .await
1899            .unwrap();
1900        assert!(historical.entry("node:Person").is_some());
1901        assert!(historical.entry("node:Human").is_none());
1902    }
1903
1904    #[tokio::test]
1905    async fn test_apply_schema_succeeds_after_load() {
1906        // Historical: schema apply used to be blocked by leftover
1907        // `__run__` branches. A defense-in-depth filter now skips
1908        // internal system branches, and run branches were made
1909        // ephemeral on every terminal state — so in practice no
1910        // `__run__` branch survives publish. The filter still guards
1911        // the invariant.
1912        let dir = tempfile::tempdir().unwrap();
1913        let uri = dir.path().to_str().unwrap();
1914        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1915
1916        crate::loader::load_jsonl(
1917            &mut db,
1918            r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#,
1919            crate::loader::LoadMode::Overwrite,
1920        )
1921        .await
1922        .unwrap();
1923
1924        let all_branches = db.coordinator.read().await.all_branches().await.unwrap();
1925        assert!(
1926            !all_branches.iter().any(|b| is_internal_run_branch(b)),
1927            "run branch should be deleted after publish, got: {:?}",
1928            all_branches
1929        );
1930
1931        let desired = TEST_SCHEMA.replace(
1932            "    age: I32?\n}",
1933            "    age: I32?\n    nickname: String?\n}",
1934        );
1935        let result = db.apply_schema(&desired).await.unwrap();
1936        assert!(result.applied, "schema apply should have applied");
1937    }
1938
1939    #[tokio::test]
1940    async fn test_apply_schema_adds_index_for_existing_property() {
1941        let dir = tempfile::tempdir().unwrap();
1942        let uri = dir.path().to_str().unwrap();
1943        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1944
1945        let desired = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
1946        db.apply_schema(&desired).await.unwrap();
1947
1948        let snapshot = db.snapshot().await;
1949        let ds = snapshot.open("node:Person").await.unwrap();
1950        assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
1951    }
1952
1953    #[tokio::test]
1954    async fn test_apply_schema_rewrite_preserves_existing_indices() {
1955        let dir = tempfile::tempdir().unwrap();
1956        let uri = dir.path().to_str().unwrap();
1957        let initial_schema = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
1958        let mut db = Omnigraph::init(uri, &initial_schema).await.unwrap();
1959        seed_person_row(&mut db, "Alice", Some(30)).await;
1960
1961        let desired = initial_schema.replace(
1962            "    age: I32?\n}",
1963            "    age: I32?\n    nickname: String?\n}",
1964        );
1965        db.apply_schema(&desired).await.unwrap();
1966
1967        let snapshot = db.snapshot().await;
1968        let ds = snapshot.open("node:Person").await.unwrap();
1969        assert!(db.table_store().has_btree_index(&ds, "id").await.unwrap());
1970        assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
1971    }
1972
1973    #[tokio::test]
1974    async fn test_open_for_mutation_rejects_while_schema_apply_locked() {
1975        let dir = tempfile::tempdir().unwrap();
1976        let uri = dir.path().to_str().unwrap();
1977        let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1978        let mut db = db;
1979        db.coordinator
1980            .write()
1981            .await
1982            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
1983            .await
1984            .unwrap();
1985
1986        let err = db
1987            .open_for_mutation("node:Person", crate::db::MutationOpKind::Insert)
1988            .await
1989            .unwrap_err();
1990        assert!(
1991            err.to_string()
1992                .contains("write is unavailable while schema apply is in progress")
1993        );
1994    }
1995
1996    #[tokio::test]
1997    async fn test_commit_updates_rejects_while_schema_apply_locked() {
1998        let dir = tempfile::tempdir().unwrap();
1999        let uri = dir.path().to_str().unwrap();
2000        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2001        db.coordinator
2002            .write()
2003            .await
2004            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
2005            .await
2006            .unwrap();
2007
2008        let err = db.commit_updates(&[]).await.unwrap_err();
2009        assert!(
2010            err.to_string()
2011                .contains("write commit is unavailable while schema apply is in progress")
2012        );
2013    }
2014
2015    #[tokio::test]
2016    async fn test_branch_list_hides_schema_apply_lock_branch() {
2017        let dir = tempfile::tempdir().unwrap();
2018        let uri = dir.path().to_str().unwrap();
2019        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2020        db.coordinator
2021            .write()
2022            .await
2023            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
2024            .await
2025            .unwrap();
2026
2027        let branches = db.branch_list().await.unwrap();
2028        assert_eq!(branches, vec!["main".to_string()]);
2029    }
2030}