// omnigraph/db/omnigraph.rs
1use std::collections::{BTreeSet, HashMap, HashSet};
2use std::io::Write;
3use std::sync::Arc;
4
5use arrow_array::{
6    Array, BinaryArray, BooleanArray, Date32Array, FixedSizeListArray, Float32Array, Float64Array,
7    Int32Array, Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray,
8    RecordBatch, StringArray, StructArray, UInt32Array, UInt64Array, new_null_array,
9};
10use arrow_schema::{DataType, Field, Schema};
11use lance::Dataset;
12use lance::blob::{BlobArrayBuilder, blob_field};
13use lance::dataset::BlobFile;
14use lance::dataset::scanner::ColumnOrdering;
15use lance::datatypes::BlobKind;
16use omnigraph_compiler::catalog::{Catalog, EdgeType, NodeType};
17use omnigraph_compiler::schema::parser::parse_schema;
18use omnigraph_compiler::types::ScalarType;
19use omnigraph_compiler::{
20    SchemaIR, SchemaMigrationPlan, SchemaMigrationStep, SchemaTypeKind, build_catalog_from_ir,
21    build_schema_ir, plan_schema_migration,
22};
23
24use crate::db::graph_coordinator::{GraphCoordinator, PublishedSnapshot};
25use crate::db::run_registry::{RunRecord, RunStatus};
26use crate::error::{ManifestErrorKind, OmniError, Result};
27use crate::runtime_cache::RuntimeCache;
28use crate::storage::{StorageAdapter, join_uri, normalize_root_uri, storage_for_uri};
29use crate::table_store::TableStore;
30
31mod export;
32mod schema_apply;
33mod table_ops;
34
35use super::commit_graph::GraphCommit;
36use super::manifest::{
37    ManifestChange, Snapshot, SubTableEntry, TableRegistration, TableTombstone,
38    table_path_for_table_key,
39};
40use super::schema_state::{
41    SCHEMA_SOURCE_FILENAME, load_or_bootstrap_schema_contract, read_accepted_schema_ir,
42    validate_schema_contract, write_schema_contract,
43};
44use super::{
45    ReadTarget, ResolvedTarget, RunId, SCHEMA_APPLY_LOCK_BRANCH, SnapshotId,
46    is_internal_system_branch, is_schema_apply_lock_branch,
47};
48
/// Result of merging one branch into another.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MergeOutcome {
    /// Target already contained the source head; nothing changed.
    AlreadyUpToDate,
    /// Target advanced to the source head without a merge commit.
    FastForward,
    /// A real merge was performed.
    Merged,
}
55
/// Outcome of [`Omnigraph::apply_schema`].
#[derive(Debug, Clone)]
pub struct SchemaApplyResult {
    /// Whether the planned migration is supported by this engine.
    pub supported: bool,
    /// Whether the migration was actually applied.
    pub applied: bool,
    /// Manifest version observed by the apply operation.
    pub manifest_version: u64,
    /// The individual steps that made up the migration plan.
    pub steps: Vec<SchemaMigrationStep>,
}
63
/// Top-level handle to an Omnigraph database.
///
/// An Omnigraph is a Lance-native graph database with git-style branching.
/// It stores typed property graphs as per-type Lance datasets coordinated
/// through a Lance manifest table.
pub struct Omnigraph {
    /// Normalized root URI of the repository.
    root_uri: String,
    /// Storage backend used for all I/O under `root_uri`.
    storage: Arc<dyn StorageAdapter>,
    /// Manifest/branch coordinator for this handle's active branch.
    coordinator: GraphCoordinator,
    /// Helper for opening and mutating per-type Lance datasets.
    table_store: TableStore,
    /// Process-local caches; invalidated on `refresh`/`sync_branch`.
    runtime_cache: RuntimeCache,
    /// Catalog built from the accepted schema IR.
    catalog: Catalog,
    /// Raw `_schema.pg` source this handle was created/opened with.
    schema_source: String,
    /// Optional actor id attached to audited mutations.
    pub(crate) audit_actor_id: Option<String>,
}
79
80impl Omnigraph {
    /// Create a new repo at `uri` from schema source.
    ///
    /// Creates `_schema.pg`, per-type Lance datasets, and `__manifest`.
    ///
    /// # Errors
    /// Fails if `uri` has no matching storage adapter or initialization fails.
    pub async fn init(uri: &str, schema_source: &str) -> Result<Self> {
        Self::init_with_storage(uri, schema_source, storage_for_uri(uri)?).await
    }
87
    /// Like [`Omnigraph::init`], but with an explicit storage adapter
    /// (used by callers that pre-construct storage, e.g. for testing).
    pub(crate) async fn init_with_storage(
        uri: &str,
        schema_source: &str,
        storage: Arc<dyn StorageAdapter>,
    ) -> Result<Self> {
        let root = normalize_root_uri(uri)?;
        // Parse + build the catalog up front so a bad schema fails before any writes.
        let schema_ir = read_schema_ir_from_source(schema_source)?;
        let mut catalog = build_catalog_from_ir(&schema_ir)?;
        fixup_blob_schemas(&mut catalog);

        // Write _schema.pg and the accepted schema contract.
        let schema_path = join_uri(&root, SCHEMA_SOURCE_FILENAME);
        storage.write_text(&schema_path, schema_source).await?;
        write_schema_contract(&root, storage.as_ref(), &schema_ir).await?;

        // Create manifest + per-type datasets
        let coordinator = GraphCoordinator::init(&root, &catalog, Arc::clone(&storage)).await?;

        Ok(Self {
            root_uri: root.clone(),
            storage,
            coordinator,
            table_store: TableStore::new(&root),
            runtime_cache: RuntimeCache::default(),
            catalog,
            schema_source: schema_source.to_string(),
            audit_actor_id: None,
        })
    }
117
    /// Open an existing repo.
    ///
    /// Reads `_schema.pg`, parses it, builds the catalog, and opens `__manifest`.
    ///
    /// # Errors
    /// Fails if `uri` has no matching storage adapter or the repo cannot be read.
    pub async fn open(uri: &str) -> Result<Self> {
        Self::open_with_storage(uri, storage_for_uri(uri)?).await
    }
124
    /// Like [`Omnigraph::open`], but with an explicit storage adapter.
    ///
    /// Note: the catalog is built from the *accepted* schema contract (which
    /// may be bootstrapped from the current source), not from `_schema.pg`
    /// directly.
    pub(crate) async fn open_with_storage(
        uri: &str,
        storage: Arc<dyn StorageAdapter>,
    ) -> Result<Self> {
        let root = normalize_root_uri(uri)?;
        // Read _schema.pg
        let schema_path = join_uri(&root, SCHEMA_SOURCE_FILENAME);
        let schema_source = storage.read_text(&schema_path).await?;
        let current_source_ir = read_schema_ir_from_source(&schema_source)?;
        let coordinator = GraphCoordinator::open(&root, Arc::clone(&storage)).await?;
        let branches = coordinator.branch_list().await?;
        // Load the accepted contract, bootstrapping from the source IR when absent.
        let (accepted_ir, _) = load_or_bootstrap_schema_contract(
            &root,
            Arc::clone(&storage),
            &branches,
            &current_source_ir,
        )
        .await?;
        let mut catalog = build_catalog_from_ir(&accepted_ir)?;
        fixup_blob_schemas(&mut catalog);

        Ok(Self {
            root_uri: root.clone(),
            storage,
            coordinator,
            table_store: TableStore::new(&root),
            runtime_cache: RuntimeCache::default(),
            catalog,
            schema_source,
            audit_actor_id: None,
        })
    }
157
    /// Catalog derived from the accepted schema contract.
    pub fn catalog(&self) -> &Catalog {
        &self.catalog
    }

    /// Raw `_schema.pg` source this handle was created/opened with.
    pub fn schema_source(&self) -> &str {
        &self.schema_source
    }

    /// Normalized root URI of the repository.
    pub fn uri(&self) -> &str {
        &self.root_uri
    }

    /// Fail fast when the persisted schema contract does not validate.
    pub(crate) async fn ensure_schema_state_valid(&self) -> Result<()> {
        validate_schema_contract(self.uri(), Arc::clone(&self.storage)).await
    }
173
    /// Plan (without applying) a migration from the current schema to
    /// `desired_schema_source`.
    pub async fn plan_schema(&self, desired_schema_source: &str) -> Result<SchemaMigrationPlan> {
        schema_apply::plan_schema(self, desired_schema_source).await
    }

    /// Plan and, when supported, apply a migration to `desired_schema_source`.
    pub async fn apply_schema(&mut self, desired_schema_source: &str) -> Result<SchemaApplyResult> {
        schema_apply::apply_schema(self, desired_schema_source).await
    }

    /// Reject `operation` while a schema apply is in progress.
    pub(crate) async fn ensure_schema_apply_idle(&mut self, operation: &str) -> Result<()> {
        schema_apply::ensure_schema_apply_idle(self, operation).await
    }

    /// Reject `operation` while the schema-apply lock branch is held.
    async fn ensure_schema_apply_not_locked(&self, operation: &str) -> Result<()> {
        schema_apply::ensure_schema_apply_not_locked(self, operation).await
    }

    /// Accessor for the per-type Lance dataset store.
    pub(crate) fn table_store(&self) -> &TableStore {
        &self.table_store
    }
193
194    pub(crate) async fn open_coordinator_for_branch(
195        &self,
196        branch: Option<&str>,
197    ) -> Result<GraphCoordinator> {
198        match branch {
199            Some(branch) => {
200                GraphCoordinator::open_branch(self.uri(), branch, Arc::clone(&self.storage)).await
201            }
202            None => GraphCoordinator::open(self.uri(), Arc::clone(&self.storage)).await,
203        }
204    }
205
    /// Temporarily switch this handle's coordinator to `branch`.
    ///
    /// Returns the previous coordinator so the caller can restore it with
    /// [`Omnigraph::restore_coordinator`] once the branch-scoped work is done.
    pub(crate) async fn swap_coordinator_for_branch(
        &mut self,
        branch: Option<&str>,
    ) -> Result<GraphCoordinator> {
        let next = self.open_coordinator_for_branch(branch).await?;
        Ok(std::mem::replace(&mut self.coordinator, next))
    }

    /// Restore a coordinator previously returned by `swap_coordinator_for_branch`.
    pub(crate) fn restore_coordinator(&mut self, coordinator: GraphCoordinator) {
        self.coordinator = coordinator;
    }
217
    /// Resolve `branch` (default `"main"`) to a concrete target.
    ///
    /// Fast path: when the requested branch is this handle's active branch,
    /// reuse in-memory coordinator state instead of re-resolving through
    /// storage. Otherwise defer to the coordinator's generic resolver.
    pub(crate) async fn resolved_branch_target(
        &self,
        branch: Option<&str>,
    ) -> Result<ResolvedTarget> {
        self.ensure_schema_state_valid().await?;
        let requested = ReadTarget::Branch(branch.unwrap_or("main").to_string());
        let normalized = normalize_branch_name(branch.unwrap_or("main"))?;
        if normalized.as_deref() == self.coordinator.current_branch() {
            // No head commit yet: synthesize an id from branch + manifest version.
            let snapshot_id = self.coordinator.head_commit_id().await?.unwrap_or_else(|| {
                SnapshotId::synthetic(
                    self.coordinator.current_branch(),
                    self.coordinator.version(),
                )
            });
            return Ok(ResolvedTarget {
                requested,
                branch: self.coordinator.current_branch().map(str::to_string),
                snapshot_id,
                snapshot: self.coordinator.snapshot(),
            });
        }
        self.coordinator.resolve_target(&requested).await
    }
241
    /// Snapshot for `branch` (default `"main"`), via `resolved_branch_target`.
    pub(crate) async fn snapshot_for_branch(&self, branch: Option<&str>) -> Result<Snapshot> {
        self.resolved_branch_target(branch)
            .await
            .map(|resolved| resolved.snapshot)
    }

    /// Manifest version of this handle's active coordinator state.
    pub(crate) fn version(&self) -> u64 {
        self.coordinator.version()
    }

    /// Return an immutable Snapshot from the known manifest state. No storage I/O.
    pub(crate) fn snapshot(&self) -> Snapshot {
        self.coordinator.snapshot()
    }

    /// Snapshot for an arbitrary read target (branch or pinned snapshot).
    pub async fn snapshot_of(&self, target: impl Into<ReadTarget>) -> Result<Snapshot> {
        self.resolved_target(target)
            .await
            .map(|resolved| resolved.snapshot)
    }

    /// Manifest version of an arbitrary read target.
    pub async fn version_of(&self, target: impl Into<ReadTarget>) -> Result<u64> {
        self.snapshot_of(target)
            .await
            .map(|snapshot| snapshot.version())
    }

    /// Branch an arbitrary read target resolves to, if any.
    pub async fn resolved_branch_of(
        &self,
        target: impl Into<ReadTarget>,
    ) -> Result<Option<String>> {
        self.resolved_target(target)
            .await
            .map(|resolved| resolved.branch)
    }
277
    /// Synchronize this handle's write base to the latest head of the named branch.
    ///
    /// Re-opens the coordinator for `branch` and drops all runtime caches.
    pub async fn sync_branch(&mut self, branch: &str) -> Result<()> {
        self.ensure_schema_state_valid().await?;
        let branch = normalize_branch_name(branch)?;
        self.coordinator = self.open_coordinator_for_branch(branch.as_deref()).await?;
        self.runtime_cache.invalidate_all().await;
        Ok(())
    }

    /// Re-read the handle-local coordinator state from storage.
    pub(crate) async fn refresh(&mut self) -> Result<()> {
        self.coordinator.refresh().await?;
        self.runtime_cache.invalidate_all().await;
        Ok(())
    }

    /// Resolve `branch` to its current snapshot id.
    pub async fn resolve_snapshot(&self, branch: &str) -> Result<SnapshotId> {
        self.ensure_schema_state_valid().await?;
        self.coordinator.resolve_snapshot_id(branch).await
    }

    /// Resolve an arbitrary read target after validating schema state.
    pub(crate) async fn resolved_target(
        &self,
        target: impl Into<ReadTarget>,
    ) -> Result<ResolvedTarget> {
        self.ensure_schema_state_valid().await?;
        self.coordinator.resolve_target(&target.into()).await
    }
306
307    // ─── Change detection ────────────────────────────────────────────────
308
309    pub async fn diff_between(
310        &self,
311        from: impl Into<ReadTarget>,
312        to: impl Into<ReadTarget>,
313        filter: &crate::changes::ChangeFilter,
314    ) -> Result<crate::changes::ChangeSet> {
315        let from_resolved = self.resolved_target(from).await?;
316        let to_resolved = self.resolved_target(to).await?;
317        crate::changes::diff_snapshots(
318            self.uri(),
319            &from_resolved.snapshot,
320            &to_resolved.snapshot,
321            filter,
322            to_resolved.branch.clone().or(from_resolved.branch.clone()),
323        )
324        .await
325    }
326
327    /// Diff two graph commits. Resolves each commit to `(manifest_branch, manifest_version)`
328    /// and creates branch-aware snapshots. Supports cross-branch comparison.
329    pub async fn diff_commits(
330        &self,
331        from_commit_id: &str,
332        to_commit_id: &str,
333        filter: &crate::changes::ChangeFilter,
334    ) -> Result<crate::changes::ChangeSet> {
335        let from_commit = self
336            .coordinator
337            .resolve_commit(&SnapshotId::new(from_commit_id))
338            .await?;
339        let to_commit = self
340            .coordinator
341            .resolve_commit(&SnapshotId::new(to_commit_id))
342            .await?;
343        let from_snap = self
344            .coordinator
345            .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
346                from_commit.graph_commit_id.clone(),
347            )))
348            .await?;
349        let to_snap = self
350            .coordinator
351            .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
352                to_commit.graph_commit_id.clone(),
353            )))
354            .await?;
355        crate::changes::diff_snapshots(
356            self.uri(),
357            &from_snap.snapshot,
358            &to_snap.snapshot,
359            filter,
360            to_snap.branch.clone().or(from_snap.branch.clone()),
361        )
362        .await
363    }
364
    /// Read one entity (as JSON) from `table_key` at an arbitrary read target.
    pub async fn entity_at_target(
        &self,
        target: impl Into<ReadTarget>,
        table_key: &str,
        id: &str,
    ) -> Result<Option<serde_json::Value>> {
        export::entity_at_target(self, target, table_key, id).await
    }

    /// Read one entity at a specific manifest version via time travel (on-demand enrichment).
    pub async fn entity_at(
        &self,
        table_key: &str,
        id: &str,
        version: u64,
    ) -> Result<Option<serde_json::Value>> {
        export::entity_at(self, table_key, id, version).await
    }

    /// Create a Snapshot at any historical manifest version.
    pub async fn snapshot_at_version(&self, version: u64) -> Result<Snapshot> {
        self.ensure_schema_state_valid().await?;
        self.coordinator.snapshot_at_version(version).await
    }
389
    /// Export the requested types/tables on `branch` as a JSONL string.
    pub async fn export_jsonl(
        &self,
        branch: &str,
        type_names: &[String],
        table_keys: &[String],
    ) -> Result<String> {
        export::export_jsonl(self, branch, type_names, table_keys).await
    }

    /// Streaming variant of [`Omnigraph::export_jsonl`] writing into `writer`.
    pub async fn export_jsonl_to_writer<W: Write>(
        &self,
        branch: &str,
        type_names: &[String],
        table_keys: &[String],
        writer: &mut W,
    ) -> Result<()> {
        export::export_jsonl_to_writer(self, branch, type_names, table_keys, writer).await
    }
408
409    // ─── Graph index ──────────────────────────────────────────────────────
410
    /// Get or build the graph index for the current snapshot.
    pub async fn graph_index(&self) -> Result<Arc<crate::graph_index::GraphIndex>> {
        table_ops::graph_index(self).await
    }

    /// Get or build the graph index for an already-resolved target.
    pub(crate) async fn graph_index_for_resolved(
        &self,
        resolved: &ResolvedTarget,
    ) -> Result<Arc<crate::graph_index::GraphIndex>> {
        table_ops::graph_index_for_resolved(self, resolved).await
    }
422
    /// Ensure BTree scalar indices exist on key columns.
    /// Idempotent — Lance skips if index already exists.
    ///
    /// Opens sub-tables at their latest version (not snapshot-pinned) because
    /// indices must be created on the current head. Any version drift from the
    /// snapshot is expected and logged. The resulting versions are committed
    /// back to the manifest.
    ///
    /// On named branches, indexing preserves lazy branching:
    /// unbranched subtables keep inheriting `main`, while subtables inherited
    /// from an ancestor branch are first forked into the active branch before
    /// their index metadata is updated.
    pub async fn ensure_indices(&mut self) -> Result<()> {
        table_ops::ensure_indices(self).await
    }

    /// Like [`Omnigraph::ensure_indices`], but for an explicit branch.
    pub async fn ensure_indices_on(&mut self, branch: &str) -> Result<()> {
        table_ops::ensure_indices_on(self, branch).await
    }
442
    /// Read a blob from a node by its string ID and property name.
    ///
    /// Returns a `BlobFile` handle with async `read()`, `seek()`, `tell()`,
    /// and metadata accessors (`size()`, `kind()`, `uri()`).
    ///
    /// # Errors
    /// Fails when `type_name` is unknown, `property` is not declared as a
    /// Blob, or no row with the given `id` exists.
    ///
    /// ```ignore
    /// let blob = db.read_blob("Document", "readme", "content").await?;
    /// let bytes = blob.read().await?;
    /// ```
    pub async fn read_blob(&self, type_name: &str, id: &str, property: &str) -> Result<BlobFile> {
        self.ensure_schema_state_valid().await?;
        // Validate type + property against the catalog before touching storage.
        let node_type = self
            .catalog
            .node_types
            .get(type_name)
            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
        if !node_type.blob_properties.contains(property) {
            return Err(OmniError::manifest(format!(
                "property '{}' on type '{}' is not a Blob",
                property, type_name
            )));
        }

        let snapshot = self.snapshot();
        let table_key = format!("node:{}", type_name);
        let ds = snapshot.open(&table_key).await?;

        // SQL-escape single quotes in the id before embedding it in the filter.
        let filter_sql = format!("id = '{}'", id.replace('\'', "''"));
        let row_id = self
            .table_store
            .first_row_id_for_filter(&ds, &filter_sql)
            .await?
            .ok_or_else(|| {
                OmniError::manifest(format!("no {} with id '{}' found", type_name, id))
            })?;

        // Use take_blobs to get the BlobFile handle
        let ds = Arc::new(ds);
        let mut blobs = ds
            .take_blobs(&[row_id], property)
            .await
            .map_err(|e| OmniError::Lance(e.to_string()))?;

        // A single row id was requested, so at most one blob comes back.
        blobs.pop().ok_or_else(|| {
            OmniError::manifest(format!(
                "blob '{}' on {} '{}' returned no data",
                property, type_name, id
            ))
        })
    }
493
    /// Branch this handle currently targets (`None` denotes the default branch).
    pub(crate) fn active_branch(&self) -> Option<&str> {
        self.coordinator.current_branch()
    }
497
    /// Verify that deleting `branch` cannot strand other state.
    ///
    /// Rejects deletion when (1) a descendant branch still depends on it,
    /// (2) a run targeting it is still running or failed, or (3) any other
    /// branch's snapshot has a table entry that points at it.
    async fn ensure_branch_delete_safe(&self, branch: &str, branches: &[String]) -> Result<()> {
        let descendants = self.coordinator.branch_descendants(branch).await?;
        if let Some(descendant) = descendants.first() {
            return Err(OmniError::manifest_conflict(format!(
                "cannot delete branch '{}' because descendant branch '{}' still depends on it",
                branch, descendant
            )));
        }

        // Running/failed runs still need their target branch.
        for run in self.list_runs().await? {
            if run.target_branch == branch
                && matches!(run.status, RunStatus::Running | RunStatus::Failed)
            {
                return Err(OmniError::manifest_conflict(format!(
                    "cannot delete branch '{}' while run '{}' targeting it is {}",
                    branch,
                    run.run_id,
                    run.status.as_str()
                )));
            }
        }

        // Lazy branching: another branch may still reference tables owned here.
        for other_branch in branches
            .iter()
            .filter(|candidate| candidate.as_str() != branch)
        {
            let snapshot = self
                .snapshot_of(ReadTarget::branch(other_branch.as_str()))
                .await?;
            if snapshot
                .entries()
                .any(|entry| entry.table_branch.as_deref() == Some(branch))
            {
                return Err(OmniError::manifest_conflict(format!(
                    "cannot delete branch '{}' because branch '{}' still depends on it",
                    branch, other_branch
                )));
            }
        }

        Ok(())
    }
540
    /// Delete the per-table branch refs left behind after a branch deletion.
    ///
    /// `owned_tables` is `(table_key, table_path)` pairs; duplicates by path
    /// are dropped (first occurrence wins) and the rest are processed in
    /// table-key order for determinism.
    async fn cleanup_deleted_branch_tables(
        &self,
        branch: &str,
        owned_tables: &[(String, String)],
    ) -> Result<()> {
        let mut seen_paths = HashSet::new();
        // Dedup by table_path — several keys may map to the same dataset path.
        let mut cleanup_targets = owned_tables
            .iter()
            .filter(|(_, table_path)| seen_paths.insert(table_path.clone()))
            .cloned()
            .collect::<Vec<_>>();
        cleanup_targets.sort_by(|left, right| left.0.cmp(&right.0));

        for (table_key, table_path) in cleanup_targets {
            let dataset_uri = self.table_store.dataset_uri(&table_path);
            if let Err(err) = self.table_store.delete_branch(&dataset_uri, branch).await {
                // The manifest-level delete already happened; surface partial cleanup.
                return Err(OmniError::manifest_internal(format!(
                    "branch '{}' was deleted but cleanup failed for {}: {}",
                    branch, table_key, err
                )));
            }
        }

        Ok(())
    }
566
    /// Delete `branch` from the manifest and clean up its owned table refs,
    /// without the safety checks performed by [`Omnigraph::branch_delete`].
    async fn delete_branch_storage_only(&mut self, branch: &str) -> Result<()> {
        if self.coordinator.current_branch() == Some(branch) {
            return Err(OmniError::manifest_conflict(format!(
                "cannot delete currently active branch '{}'",
                branch
            )));
        }

        // Capture the tables owned by this branch before the manifest entry goes away.
        let branch_snapshot = self.snapshot_of(ReadTarget::branch(branch)).await?;
        let owned_tables = branch_snapshot
            .entries()
            .filter(|entry| entry.table_branch.as_deref() == Some(branch))
            .map(|entry| (entry.table_key.clone(), entry.table_path.clone()))
            .collect::<Vec<_>>();

        self.coordinator.branch_delete(branch).await?;
        self.cleanup_deleted_branch_tables(branch, &owned_tables)
            .await
    }
586
    /// Garbage-collect run branches of published/aborted runs that targeted `branch`.
    ///
    /// A run branch that is already gone (`NotFound`) is silently skipped.
    async fn cleanup_terminal_run_branches_for_target(&mut self, branch: &str) -> Result<()> {
        let terminal_run_branches = self
            .list_runs()
            .await?
            .into_iter()
            .filter(|run| {
                run.target_branch == branch
                    && matches!(run.status, RunStatus::Published | RunStatus::Aborted)
            })
            .map(|run| run.run_branch)
            .collect::<Vec<_>>();

        for run_branch in terminal_run_branches {
            match self.delete_branch_storage_only(&run_branch).await {
                Ok(()) => {}
                // Already cleaned up by a previous attempt — not an error.
                Err(OmniError::Manifest(err)) if err.kind == ManifestErrorKind::NotFound => {}
                Err(err) => return Err(err),
            }
        }

        Ok(())
    }
609
    /// Associated-function wrapper over the module-level `normalize_branch_name`.
    pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
        normalize_branch_name(branch)
    }

    /// Head graph-commit id for `branch` (or the default branch when `None`),
    /// initializing the commit graph on first use.
    pub(crate) async fn head_commit_id_for_branch(
        &self,
        branch: Option<&str>,
    ) -> Result<Option<String>> {
        let mut coordinator = self.open_coordinator_for_branch(branch).await?;
        coordinator.ensure_commit_graph_initialized().await?;
        coordinator
            .head_commit_id()
            .await
            .map(|id| id.map(|snapshot_id| snapshot_id.as_str().to_string()))
    }
625
    /// Create a new branch `name` from this handle's active branch head.
    pub async fn branch_create(&mut self, name: &str) -> Result<()> {
        self.ensure_schema_state_valid().await?;
        self.ensure_schema_apply_idle("branch_create").await?;
        // Reject internal/system branch names for user-facing creation.
        ensure_public_branch_ref(name, "branch_create")?;
        self.coordinator.branch_create(name).await
    }

    /// Actor id recorded on audited operations, if configured.
    pub(crate) fn current_audit_actor(&self) -> Option<&str> {
        self.audit_actor_id.as_deref()
    }
636
    /// Create branch `name` from an explicit source target (branches only).
    pub async fn branch_create_from(
        &mut self,
        from: impl Into<ReadTarget>,
        name: &str,
    ) -> Result<()> {
        self.ensure_schema_apply_idle("branch_create_from").await?;
        self.branch_create_from_impl(from, name, false).await
    }

    /// Shared implementation for branch creation from a source target.
    ///
    /// `allow_internal_refs` lets internal callers (e.g. run management)
    /// create/use system branch names that public entry points reject.
    async fn branch_create_from_impl(
        &mut self,
        from: impl Into<ReadTarget>,
        name: &str,
        allow_internal_refs: bool,
    ) -> Result<()> {
        let target = from.into();
        let ReadTarget::Branch(branch_name) = target else {
            return Err(OmniError::manifest(
                "branch creation from pinned snapshots is not supported yet".to_string(),
            ));
        };
        if !allow_internal_refs {
            ensure_public_branch_ref(&branch_name, "branch_create_from")?;
            ensure_public_branch_ref(name, "branch_create_from")?;
        }
        // Temporarily point the coordinator at the source branch, create,
        // and always restore the previous coordinator regardless of outcome.
        let branch = normalize_branch_name(&branch_name)?;
        let previous = self.swap_coordinator_for_branch(branch.as_deref()).await?;
        let result = self.coordinator.branch_create(name).await;
        self.restore_coordinator(previous);
        result
    }
668
    /// List all branch names known to the manifest.
    pub async fn branch_list(&self) -> Result<Vec<String>> {
        self.ensure_schema_state_valid().await?;
        self.coordinator.branch_list().await
    }

    /// Delete branch `name` after safety checks; also garbage-collects
    /// terminal run branches that targeted it.
    pub async fn branch_delete(&mut self, name: &str) -> Result<()> {
        self.ensure_schema_state_valid().await?;
        self.ensure_schema_apply_idle("branch_delete").await?;
        ensure_public_branch_ref(name, "branch_delete")?;
        self.refresh().await?;
        // normalize_branch_name yields None for the default branch, which may not be deleted.
        let branch = normalize_branch_name(name)?
            .ok_or_else(|| OmniError::manifest("cannot delete branch 'main'".to_string()))?;
        let branches = self.coordinator.branch_list().await?;
        if !branches.iter().any(|candidate| candidate == &branch) {
            return Err(OmniError::manifest_not_found(format!(
                "branch '{}' not found",
                branch
            )));
        }

        self.ensure_branch_delete_safe(&branch, &branches).await?;
        self.cleanup_terminal_run_branches_for_target(&branch)
            .await?;
        self.delete_branch_storage_only(&branch).await
    }
694
    /// Resolve the latest snapshot id of `branch` through a freshly opened
    /// coordinator (bypassing this handle's possibly-stale state).
    pub(crate) async fn latest_branch_snapshot_id(&self, branch: &str) -> Result<SnapshotId> {
        let normalized = normalize_branch_name(branch)?;
        let fresh = self
            .open_coordinator_for_branch(normalized.as_deref())
            .await?;
        fresh.resolve_snapshot_id(branch).await
    }

    /// Begin a run against `target_branch` with no explicit actor.
    pub async fn begin_run(
        &mut self,
        target_branch: &str,
        operation_hash: Option<&str>,
    ) -> Result<RunRecord> {
        self.begin_run_as(target_branch, operation_hash, None).await
    }
710
    /// Begin a run against `target_branch`, recording `actor_id` (or this
    /// handle's audit actor) and creating an isolated run branch from the
    /// target's current head.
    pub async fn begin_run_as(
        &mut self,
        target_branch: &str,
        operation_hash: Option<&str>,
        actor_id: Option<&str>,
    ) -> Result<RunRecord> {
        self.ensure_schema_state_valid().await?;
        self.ensure_schema_apply_idle("begin_run").await?;
        ensure_public_branch_ref(target_branch, "begin_run")?;
        let target_branch =
            normalize_branch_name(target_branch)?.unwrap_or_else(|| "main".to_string());
        // NOTE(review): `target_branch` is normalized again here even though it
        // already was above — presumably harmless, but worth confirming.
        let fresh = self
            .open_coordinator_for_branch(Self::normalize_branch_name(&target_branch)?.as_deref())
            .await?;
        let base_snapshot_id = fresh.resolve_snapshot_id(&target_branch).await?;
        let base_manifest_version = fresh.version();
        let record = RunRecord::new(
            target_branch.clone(),
            base_snapshot_id.as_str(),
            base_manifest_version,
            operation_hash.map(str::to_string),
            actor_id
                .map(str::to_string)
                .or_else(|| self.current_audit_actor().map(str::to_string)),
        )?;

        // Run branches are internal refs, so bypass the public-name check.
        self.branch_create_from_impl(
            ReadTarget::branch(target_branch.clone()),
            &record.run_branch,
            true,
        )
        .await?;
        self.coordinator.append_run_record(&record).await?;
        Ok(record)
    }
746
    /// Fetch the run record for `run_id`.
    pub async fn get_run(&self, run_id: &RunId) -> Result<RunRecord> {
        self.ensure_schema_state_valid().await?;
        self.coordinator.get_run(run_id).await
    }

    /// List all run records known to the manifest.
    pub async fn list_runs(&self) -> Result<Vec<RunRecord>> {
        self.ensure_schema_state_valid().await?;
        self.coordinator.list_runs().await
    }

    /// Resolve a graph commit by id.
    pub async fn get_commit(&self, commit_id: &str) -> Result<GraphCommit> {
        self.ensure_schema_state_valid().await?;
        self.coordinator
            .resolve_commit(&SnapshotId::new(commit_id))
            .await
    }

    /// List commits on `branch` (or the default branch when `None`).
    pub async fn list_commits(&self, branch: Option<&str>) -> Result<Vec<GraphCommit>> {
        self.ensure_schema_state_valid().await?;
        let branch = match branch {
            Some(branch) => normalize_branch_name(branch)?,
            None => None,
        };
        let coordinator = self.open_coordinator_for_branch(branch.as_deref()).await?;
        coordinator.list_commits().await
    }
773
774    pub async fn abort_run(&mut self, run_id: &RunId) -> Result<RunRecord> {
775        self.ensure_schema_state_valid().await?;
776        let run = self.get_run(run_id).await?;
777        match run.status {
778            RunStatus::Running | RunStatus::Failed => {
779                let updated = run.with_status(RunStatus::Aborted, None)?;
780                self.coordinator.append_run_record(&updated).await?;
781                Ok(updated)
782            }
783            RunStatus::Published => Err(OmniError::manifest_conflict(format!(
784                "run '{}' is already published",
785                run_id
786            ))),
787            RunStatus::Aborted => Err(OmniError::manifest_conflict(format!(
788                "run '{}' is already aborted",
789                run_id
790            ))),
791        }
792    }
793
    /// Mark a running run as failed.
    ///
    /// Idempotent for already-failed runs; rejects published/aborted runs.
    pub async fn fail_run(&mut self, run_id: &RunId) -> Result<RunRecord> {
        self.ensure_schema_state_valid().await?;
        let run = self.get_run(run_id).await?;
        match run.status {
            RunStatus::Running => {
                let updated = run.with_status(RunStatus::Failed, None)?;
                self.coordinator.append_run_record(&updated).await?;
                Ok(updated)
            }
            // Already failed: return the existing record unchanged.
            RunStatus::Failed => Ok(run),
            RunStatus::Published => Err(OmniError::manifest_conflict(format!(
                "run '{}' is already published",
                run_id
            ))),
            RunStatus::Aborted => Err(OmniError::manifest_conflict(format!(
                "run '{}' is already aborted",
                run_id
            ))),
        }
    }
814
    /// Publish a run with no explicit actor override.
    pub async fn publish_run(&mut self, run_id: &RunId) -> Result<SnapshotId> {
        self.publish_run_as(run_id, None).await
    }
818
819    pub async fn publish_run_as(
820        &mut self,
821        run_id: &RunId,
822        actor_id: Option<&str>,
823    ) -> Result<SnapshotId> {
824        self.ensure_schema_state_valid().await?;
825        self.ensure_schema_apply_idle("publish_run").await?;
826        let run = self.get_run(run_id).await?;
827        match run.status {
828            RunStatus::Running => {}
829            RunStatus::Published => {
830                return run
831                    .published_snapshot_id
832                    .clone()
833                    .map(SnapshotId::new)
834                    .ok_or_else(|| {
835                        OmniError::manifest(format!(
836                            "run '{}' is published but missing published snapshot id",
837                            run_id
838                        ))
839                    });
840            }
841            RunStatus::Failed | RunStatus::Aborted => {
842                return Err(OmniError::manifest_conflict(format!(
843                    "run '{}' is not publishable from status '{}'",
844                    run_id,
845                    run.status.as_str()
846                )));
847            }
848        }
849
850        let publish_actor = actor_id
851            .map(str::to_string)
852            .or_else(|| run.actor_id.clone());
853        let current_target_snapshot_id = self.resolve_snapshot(&run.target_branch).await?;
854        let previous_actor = self.audit_actor_id.clone();
855        self.audit_actor_id = publish_actor.clone();
856        let publish_result = if current_target_snapshot_id.as_str() == run.base_snapshot_id {
857            let run_for_promotion = run.clone();
858            self.sync_branch(&run_for_promotion.target_branch).await?;
859            self.promote_run_snapshot_to_target(&run_for_promotion)
860                .await
861        } else {
862            let run_branch = run.run_branch.clone();
863            let target_branch = run.target_branch.clone();
864            self.branch_merge_internal(&run_branch, &target_branch)
865                .await?;
866            self.reify_internal_run_refs(&target_branch, &run_branch)
867                .await
868        };
869        self.audit_actor_id = previous_actor;
870        publish_result?;
871        let published_snapshot_id = self.resolve_snapshot(&run.target_branch).await?;
872        let updated = run.with_status(
873            RunStatus::Published,
874            Some(published_snapshot_id.as_str().to_string()),
875        )?;
876        self.coordinator.append_run_record(&updated).await?;
877        Ok(published_snapshot_id)
878    }
879
880    async fn promote_run_snapshot_to_target(&mut self, run: &RunRecord) -> Result<()> {
881        let target_snapshot = self
882            .snapshot_of(ReadTarget::branch(run.target_branch.as_str()))
883            .await?;
884        let run_snapshot = self
885            .snapshot_of(ReadTarget::branch(run.run_branch.as_str()))
886            .await?;
887        let mut table_keys = std::collections::BTreeSet::new();
888        for entry in target_snapshot.entries() {
889            table_keys.insert(entry.table_key.clone());
890        }
891        for entry in run_snapshot.entries() {
892            table_keys.insert(entry.table_key.clone());
893        }
894
895        let mut updates = Vec::new();
896        let mut changed_edge_tables = false;
897        let target_branch = normalize_branch_name(&run.target_branch)?;
898
899        for table_key in table_keys {
900            let target_entry = target_snapshot.entry(&table_key);
901            let run_entry = run_snapshot.entry(&table_key);
902            if same_manifest_state(target_entry, run_entry) {
903                continue;
904            }
905            let Some(_run_entry) = run_entry else {
906                return Err(OmniError::manifest(format!(
907                    "run '{}' removed table '{}' which publish_run does not support",
908                    run.run_id, table_key
909                )));
910            };
911
912            let source_ds = run_snapshot.open(&table_key).await?;
913            let batch = self.batch_for_table_rewrite(&source_ds, &table_key).await?;
914
915            let (mut target_ds, full_path, table_branch) = self
916                .open_for_mutation_on_branch(target_branch.as_deref(), &table_key)
917                .await?;
918            let state = self
919                .table_store()
920                .overwrite_batch(&full_path, &mut target_ds, batch)
921                .await?;
922            updates.push(crate::db::SubTableUpdate {
923                table_key: table_key.clone(),
924                table_version: state.version,
925                table_branch,
926                row_count: state.row_count,
927                version_metadata: state.version_metadata,
928            });
929            if table_key.starts_with("edge:") {
930                changed_edge_tables = true;
931            }
932        }
933
934        if !updates.is_empty() {
935            self.commit_updates_on_branch(target_branch.as_deref(), &updates)
936                .await?;
937            if changed_edge_tables {
938                self.invalidate_graph_index().await;
939            }
940        }
941
942        Ok(())
943    }
944
945    async fn reify_internal_run_refs(
946        &mut self,
947        target_branch: &str,
948        run_branch: &str,
949    ) -> Result<()> {
950        let target_snapshot = self.snapshot_of(ReadTarget::branch(target_branch)).await?;
951        let mut updates = Vec::new();
952        let mut changed_edge_tables = false;
953        let target_branch = normalize_branch_name(target_branch)?;
954
955        for entry in target_snapshot.entries() {
956            if entry.table_branch.as_deref() != Some(run_branch) {
957                continue;
958            }
959
960            let source_ds = target_snapshot.open(&entry.table_key).await?;
961            let batch = self
962                .batch_for_table_rewrite(&source_ds, &entry.table_key)
963                .await?;
964
965            let (mut target_ds, full_path, table_branch) = self
966                .open_for_mutation_on_branch(target_branch.as_deref(), &entry.table_key)
967                .await?;
968            let state = self
969                .table_store()
970                .overwrite_batch(&full_path, &mut target_ds, batch)
971                .await?;
972            updates.push(crate::db::SubTableUpdate {
973                table_key: entry.table_key.clone(),
974                table_version: state.version,
975                table_branch,
976                row_count: state.row_count,
977                version_metadata: state.version_metadata,
978            });
979            if entry.table_key.starts_with("edge:") {
980                changed_edge_tables = true;
981            }
982        }
983
984        if !updates.is_empty() {
985            self.commit_updates_on_branch(target_branch.as_deref(), &updates)
986                .await?;
987            if changed_edge_tables {
988                self.invalidate_graph_index().await;
989            }
990        }
991
992        Ok(())
993    }
994
    /// Open a sub-table for mutation with version-drift guard.
    ///
    /// Checks that the dataset's current version matches the snapshot-pinned
    /// version. If another writer has advanced the version, returns an error
    /// prompting the caller to refresh and retry (optimistic concurrency).
    ///
    /// Returns `(dataset, full table path, optional table branch)`; delegates
    /// to [`table_ops::open_for_mutation`].
    pub(crate) async fn open_for_mutation(
        &self,
        table_key: &str,
    ) -> Result<(Dataset, String, Option<String>)> {
        table_ops::open_for_mutation(self, table_key).await
    }
1006
    /// Like [`Self::open_for_mutation`], but resolves the table on the given
    /// branch (`None` selects the default branch — see `normalize_branch_name`,
    /// which maps "main" to `None`). Delegates to
    /// [`table_ops::open_for_mutation_on_branch`].
    pub(crate) async fn open_for_mutation_on_branch(
        &self,
        branch: Option<&str>,
        table_key: &str,
    ) -> Result<(Dataset, String, Option<String>)> {
        table_ops::open_for_mutation_on_branch(self, branch, table_key).await
    }
1014
    /// Fork a dataset for `active_branch` from the state pinned by a manifest
    /// entry (`source_branch` at `source_version`). Delegates to
    /// [`table_ops::fork_dataset_from_entry_state`].
    pub(crate) async fn fork_dataset_from_entry_state(
        &self,
        table_key: &str,
        full_path: &str,
        source_branch: Option<&str>,
        source_version: u64,
        active_branch: &str,
    ) -> Result<Dataset> {
        table_ops::fork_dataset_from_entry_state(
            self,
            table_key,
            full_path,
            source_branch,
            source_version,
            active_branch,
        )
        .await
    }
1033
    /// Re-open an already-resolved table for mutation, verifying it is still
    /// at `expected_version` (optimistic-concurrency re-check). Delegates to
    /// [`table_ops::reopen_for_mutation`].
    pub(crate) async fn reopen_for_mutation(
        &self,
        table_key: &str,
        full_path: &str,
        table_branch: Option<&str>,
        expected_version: u64,
    ) -> Result<Dataset> {
        table_ops::reopen_for_mutation(self, table_key, full_path, table_branch, expected_version)
            .await
    }
1044
    /// Open a dataset read-only at an exact pinned state (branch + version).
    /// Delegates to [`table_ops::open_dataset_at_state`].
    pub(crate) async fn open_dataset_at_state(
        &self,
        table_path: &str,
        table_branch: Option<&str>,
        table_version: u64,
    ) -> Result<Dataset> {
        table_ops::open_dataset_at_state(self, table_path, table_branch, table_version).await
    }
1053
    /// Build the configured indices for `table_key` on `ds`, using this
    /// handle's current catalog. Delegates to
    /// [`table_ops::build_indices_on_dataset`].
    pub(crate) async fn build_indices_on_dataset(
        &self,
        table_key: &str,
        ds: &mut Dataset,
    ) -> Result<()> {
        table_ops::build_indices_on_dataset(self, table_key, ds).await
    }
1061
    /// Like [`Self::build_indices_on_dataset`], but against an explicit
    /// `catalog` rather than the handle's own (used e.g. during schema
    /// migration when the target catalog differs). Delegates to
    /// [`table_ops::build_indices_on_dataset_for_catalog`].
    pub(crate) async fn build_indices_on_dataset_for_catalog(
        &self,
        catalog: &Catalog,
        table_key: &str,
        ds: &mut Dataset,
    ) -> Result<()> {
        table_ops::build_indices_on_dataset_for_catalog(self, catalog, table_key, ds).await
    }
1070
    /// Commit a batch of sub-table updates to the current branch's manifest.
    /// Returns the new manifest version. Delegates to
    /// [`table_ops::commit_updates`].
    pub(crate) async fn commit_updates(
        &mut self,
        updates: &[crate::db::SubTableUpdate],
    ) -> Result<u64> {
        table_ops::commit_updates(self, updates).await
    }
1077
    /// Commit sub-table updates at the manifest level only (see
    /// [`table_ops::commit_manifest_updates`] for how this differs from
    /// [`Self::commit_updates`]). Returns the new manifest version.
    pub(crate) async fn commit_manifest_updates(
        &mut self,
        updates: &[crate::db::SubTableUpdate],
    ) -> Result<u64> {
        table_ops::commit_manifest_updates(self, updates).await
    }
1084
    /// Record a two-parent (merge) commit in the commit graph for the given
    /// manifest version. Returns the new commit id. Delegates to
    /// [`table_ops::record_merge_commit`].
    pub(crate) async fn record_merge_commit(
        &mut self,
        manifest_version: u64,
        parent_commit_id: &str,
        merged_parent_commit_id: &str,
    ) -> Result<String> {
        table_ops::record_merge_commit(
            self,
            manifest_version,
            parent_commit_id,
            merged_parent_commit_id,
        )
        .await
    }
1099
    /// Commit a batch of sub-table updates to the manifest of the given
    /// branch (`None` selects the default branch). Returns the new manifest
    /// version. Delegates to [`table_ops::commit_updates_on_branch`].
    pub(crate) async fn commit_updates_on_branch(
        &mut self,
        branch: Option<&str>,
        updates: &[crate::db::SubTableUpdate],
    ) -> Result<u64> {
        table_ops::commit_updates_on_branch(self, branch, updates).await
    }
1107
    /// Lazily create the commit-graph storage if it does not exist yet.
    /// Delegates to [`table_ops::ensure_commit_graph_initialized`].
    pub(crate) async fn ensure_commit_graph_initialized(&mut self) -> Result<()> {
        table_ops::ensure_commit_graph_initialized(self).await
    }
1111
    /// Invalidate the cached graph index. Called after edge mutations so
    /// subsequent traversals rebuild from fresh edge data. Delegates to
    /// [`table_ops::invalidate_graph_index`].
    pub(crate) async fn invalidate_graph_index(&self) {
        table_ops::invalidate_graph_index(self).await
    }
1116
    /// Materialize the full contents of `source_ds` as a single
    /// `RecordBatch` suitable for rewriting the table elsewhere (used by the
    /// publish/promotion paths above). Delegates to
    /// [`schema_apply::batch_for_table_rewrite`].
    async fn batch_for_table_rewrite(
        &self,
        source_ds: &Dataset,
        table_key: &str,
    ) -> Result<RecordBatch> {
        schema_apply::batch_for_table_rewrite(self, source_ds, table_key).await
    }
1124}
1125
1126pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
1127    let branch = branch.trim();
1128    if branch.is_empty() {
1129        return Err(OmniError::manifest(
1130            "branch name cannot be empty".to_string(),
1131        ));
1132    }
1133    if branch == "main" {
1134        return Ok(None);
1135    }
1136    Ok(Some(branch.to_string()))
1137}
1138
1139fn ensure_public_branch_ref(branch: &str, operation: &str) -> Result<()> {
1140    if super::is_internal_run_branch(branch) {
1141        return Err(OmniError::manifest(format!(
1142            "{} does not allow internal run ref '{}'",
1143            operation, branch
1144        )));
1145    }
1146    if is_internal_system_branch(branch) {
1147        return Err(OmniError::manifest(format!(
1148            "{} does not allow internal system ref '{}'",
1149            operation, branch
1150        )));
1151    }
1152    Ok(())
1153}
1154
1155fn same_manifest_state(
1156    left: Option<&crate::db::SubTableEntry>,
1157    right: Option<&crate::db::SubTableEntry>,
1158) -> bool {
1159    match (left, right) {
1160        (None, None) => true,
1161        (Some(left), Some(right)) => {
1162            left.table_path == right.table_path
1163                && left.table_version == right.table_version
1164                && left.table_branch == right.table_branch
1165                && left.row_count == right.row_count
1166        }
1167        _ => false,
1168    }
1169}
1170
1171fn concat_or_empty_batches(schema: Arc<Schema>, batches: Vec<RecordBatch>) -> Result<RecordBatch> {
1172    if batches.is_empty() {
1173        return Ok(RecordBatch::new_empty(schema));
1174    }
1175    if batches.len() == 1 {
1176        return Ok(batches.into_iter().next().unwrap());
1177    }
1178    let batch_schema = batches[0].schema();
1179    arrow_select::concat::concat_batches(&batch_schema, &batches)
1180        .map_err(|e| OmniError::Lance(e.to_string()))
1181}
1182
1183fn blob_properties_for_table_key<'a>(
1184    catalog: &'a Catalog,
1185    table_key: &str,
1186) -> Result<&'a std::collections::HashSet<String>> {
1187    if let Some(type_name) = table_key.strip_prefix("node:") {
1188        return catalog
1189            .node_types
1190            .get(type_name)
1191            .map(|node_type| &node_type.blob_properties)
1192            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)));
1193    }
1194    if let Some(type_name) = table_key.strip_prefix("edge:") {
1195        return catalog
1196            .edge_types
1197            .get(type_name)
1198            .map(|edge_type| &edge_type.blob_properties)
1199            .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)));
1200    }
1201    Err(OmniError::manifest(format!(
1202        "invalid table key '{}'",
1203        table_key
1204    )))
1205}
1206
/// Decide whether one row of a blob-descriptor `StructArray` represents a
/// null/absent blob.
///
/// A row counts as null when: the struct value itself is null; the `kind`
/// child cannot be read (missing column, wrong type, or null value); or the
/// descriptor is `BlobKind::Inline` with zero `position`, zero `size`, and an
/// empty `blob_uri` (the all-default sentinel). Any non-`Inline` descriptor
/// is never considered null.
fn blob_description_is_null(descriptions: &StructArray, row: usize) -> Result<bool> {
    if descriptions.is_null(row) {
        return Ok(true);
    }

    // `kind` may be encoded as UInt32 or UInt8; try the UInt32 downcast
    // first, then fall back to UInt8. The `as u8` narrowing assumes kind
    // discriminants fit in a byte — TODO confirm against BlobKind's repr.
    let kind = descriptions
        .column_by_name("kind")
        .and_then(|col| col.as_any().downcast_ref::<UInt32Array>())
        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row) as u8))
        .or_else(|| {
            descriptions
                .column_by_name("kind")
                .and_then(|col| col.as_any().downcast_ref::<arrow_array::UInt8Array>())
                .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)))
        });
    // Each field reads as None when the column is missing, has an unexpected
    // type, or holds a null at this row.
    let position = descriptions
        .column_by_name("position")
        .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
    let size = descriptions
        .column_by_name("size")
        .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
    let blob_uri = descriptions
        .column_by_name("blob_uri")
        .and_then(|col| col.as_any().downcast_ref::<StringArray>())
        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));

    // Unreadable kind: treat the whole descriptor as null.
    let Some(kind) = kind else {
        return Ok(true);
    };
    let kind = BlobKind::try_from(kind).map_err(|e| OmniError::Lance(e.to_string()))?;
    if kind != BlobKind::Inline {
        return Ok(false);
    }

    // Inline descriptor: null only when every field is its default.
    Ok(position.unwrap_or(0) == 0 && size.unwrap_or(0) == 0 && blob_uri.unwrap_or("").is_empty())
}
1245
1246/// Replace placeholder `LargeBinary` fields with Lance blob v2 fields.
1247///
1248/// The compiler crate has no Lance dependency, so `ScalarType::Blob` maps to
1249/// `DataType::LargeBinary` as a placeholder. This function replaces those
1250/// fields with the real blob v2 struct type via `lance::blob::blob_field()`.
1251fn fixup_blob_schemas(catalog: &mut Catalog) {
1252    for node_type in catalog.node_types.values_mut() {
1253        if node_type.blob_properties.is_empty() {
1254            continue;
1255        }
1256        let fields: Vec<Field> = node_type
1257            .arrow_schema
1258            .fields()
1259            .iter()
1260            .map(|f| {
1261                if node_type.blob_properties.contains(f.name()) {
1262                    blob_field(f.name(), f.is_nullable())
1263                } else {
1264                    f.as_ref().clone()
1265                }
1266            })
1267            .collect();
1268        node_type.arrow_schema = Arc::new(Schema::new(fields));
1269    }
1270    for edge_type in catalog.edge_types.values_mut() {
1271        if edge_type.blob_properties.is_empty() {
1272            continue;
1273        }
1274        let fields: Vec<Field> = edge_type
1275            .arrow_schema
1276            .fields()
1277            .iter()
1278            .map(|f| {
1279                if edge_type.blob_properties.contains(f.name()) {
1280                    blob_field(f.name(), f.is_nullable())
1281                } else {
1282                    f.as_ref().clone()
1283                }
1284            })
1285            .collect();
1286        edge_type.arrow_schema = Arc::new(Schema::new(fields));
1287    }
1288}
1289
/// Parse a schema source string and lower it to the schema IR, mapping the
/// IR-build error into a manifest error (parse errors propagate via `?`).
fn read_schema_ir_from_source(schema_source: &str) -> Result<SchemaIR> {
    let schema_ast = parse_schema(schema_source)?;
    build_schema_ir(&schema_ast).map_err(|err| OmniError::manifest(err.to_string()))
}
1294
1295fn schema_table_key(type_kind: SchemaTypeKind, name: &str) -> String {
1296    match type_kind {
1297        SchemaTypeKind::Node => format!("node:{}", name),
1298        SchemaTypeKind::Edge => format!("edge:{}", name),
1299        SchemaTypeKind::Interface => unreachable!("interfaces do not map to tables"),
1300    }
1301}
1302
1303fn schema_for_table_key(catalog: &Catalog, table_key: &str) -> Result<Arc<Schema>> {
1304    if let Some(type_name) = table_key.strip_prefix("node:") {
1305        let node_type: &NodeType = catalog
1306            .node_types
1307            .get(type_name)
1308            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
1309        return Ok(node_type.arrow_schema.clone());
1310    }
1311    if let Some(type_name) = table_key.strip_prefix("edge:") {
1312        let edge_type: &EdgeType = catalog
1313            .edge_types
1314            .get(type_name)
1315            .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)))?;
1316        return Ok(edge_type.arrow_schema.clone());
1317    }
1318    Err(OmniError::manifest(format!(
1319        "invalid table key '{}'",
1320        table_key
1321    )))
1322}
1323
1324fn record_batch_row_to_json(batch: &RecordBatch, row: usize) -> Result<serde_json::Value> {
1325    let mut obj = serde_json::Map::new();
1326    for (i, field) in batch.schema().fields().iter().enumerate() {
1327        obj.insert(
1328            field.name().clone(),
1329            json_value_from_array(batch.column(i).as_ref(), row)?,
1330        );
1331    }
1332    Ok(serde_json::Value::Object(obj))
1333}
1334
/// Convert a single cell (`array[row]`) into a `serde_json::Value`.
///
/// Nulls map to JSON null. Strings/booleans/integers map directly; floats go
/// through `Number::from_f64` and error on non-finite values. Dates are
/// emitted as their raw day count (i32). Binary data is base64-encoded with
/// the standard alphabet. List, large-list, fixed-size-list, and struct types
/// recurse per element/field. Any other Arrow type falls back to Arrow's
/// display formatting rendered as a JSON string.
fn json_value_from_array(array: &dyn Array, row: usize) -> Result<serde_json::Value> {
    if array.is_null(row) {
        return Ok(serde_json::Value::Null);
    }

    match array.data_type() {
        DataType::Utf8 => Ok(serde_json::Value::String(
            array
                .as_any()
                .downcast_ref::<StringArray>()
                .ok_or_else(|| OmniError::Lance("expected StringArray".to_string()))?
                .value(row)
                .to_string(),
        )),
        DataType::LargeUtf8 => Ok(serde_json::Value::String(
            array
                .as_any()
                .downcast_ref::<LargeStringArray>()
                .ok_or_else(|| OmniError::Lance("expected LargeStringArray".to_string()))?
                .value(row)
                .to_string(),
        )),
        DataType::Boolean => Ok(serde_json::Value::Bool(
            array
                .as_any()
                .downcast_ref::<BooleanArray>()
                .ok_or_else(|| OmniError::Lance("expected BooleanArray".to_string()))?
                .value(row),
        )),
        DataType::Int32 => Ok(serde_json::Value::Number(serde_json::Number::from(
            array
                .as_any()
                .downcast_ref::<Int32Array>()
                .ok_or_else(|| OmniError::Lance("expected Int32Array".to_string()))?
                .value(row),
        ))),
        DataType::Int64 => Ok(serde_json::Value::Number(serde_json::Number::from(
            array
                .as_any()
                .downcast_ref::<Int64Array>()
                .ok_or_else(|| OmniError::Lance("expected Int64Array".to_string()))?
                .value(row),
        ))),
        DataType::UInt32 => Ok(serde_json::Value::Number(serde_json::Number::from(
            array
                .as_any()
                .downcast_ref::<UInt32Array>()
                .ok_or_else(|| OmniError::Lance("expected UInt32Array".to_string()))?
                .value(row),
        ))),
        DataType::UInt64 => Ok(serde_json::Value::Number(serde_json::Number::from(
            array
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| OmniError::Lance("expected UInt64Array".to_string()))?
                .value(row),
        ))),
        // f32 is widened to f64; NaN/inf cannot be represented in JSON and
        // surface as errors from `from_f64`.
        DataType::Float32 => {
            let value = array
                .as_any()
                .downcast_ref::<Float32Array>()
                .ok_or_else(|| OmniError::Lance("expected Float32Array".to_string()))?
                .value(row) as f64;
            Ok(serde_json::Value::Number(
                serde_json::Number::from_f64(value).ok_or_else(|| {
                    OmniError::Lance(format!("cannot encode f32 value '{}' as JSON", value))
                })?,
            ))
        }
        DataType::Float64 => {
            let value = array
                .as_any()
                .downcast_ref::<Float64Array>()
                .ok_or_else(|| OmniError::Lance("expected Float64Array".to_string()))?
                .value(row);
            Ok(serde_json::Value::Number(
                serde_json::Number::from_f64(value).ok_or_else(|| {
                    OmniError::Lance(format!("cannot encode f64 value '{}' as JSON", value))
                })?,
            ))
        }
        // Emitted as the raw Date32 day count, not a formatted date string.
        DataType::Date32 => Ok(serde_json::Value::Number(serde_json::Number::from(
            array
                .as_any()
                .downcast_ref::<Date32Array>()
                .ok_or_else(|| OmniError::Lance("expected Date32Array".to_string()))?
                .value(row),
        ))),
        DataType::Binary => Ok(serde_json::Value::String(base64::Engine::encode(
            &base64::engine::general_purpose::STANDARD,
            array
                .as_any()
                .downcast_ref::<BinaryArray>()
                .ok_or_else(|| OmniError::Lance("expected BinaryArray".to_string()))?
                .value(row),
        ))),
        DataType::LargeBinary => Ok(serde_json::Value::String(base64::Engine::encode(
            &base64::engine::general_purpose::STANDARD,
            array
                .as_any()
                .downcast_ref::<LargeBinaryArray>()
                .ok_or_else(|| OmniError::Lance("expected LargeBinaryArray".to_string()))?
                .value(row),
        ))),
        // The three list variants below recurse into the row's child slice;
        // `list.value(row)` yields only this row's elements, so indices start
        // at 0.
        DataType::List(_) => {
            let list = array
                .as_any()
                .downcast_ref::<ListArray>()
                .ok_or_else(|| OmniError::Lance("expected ListArray".to_string()))?;
            let values = list.value(row);
            let mut out = Vec::with_capacity(values.len());
            for idx in 0..values.len() {
                out.push(json_value_from_array(values.as_ref(), idx)?);
            }
            Ok(serde_json::Value::Array(out))
        }
        DataType::LargeList(_) => {
            let list = array
                .as_any()
                .downcast_ref::<LargeListArray>()
                .ok_or_else(|| OmniError::Lance("expected LargeListArray".to_string()))?;
            let values = list.value(row);
            let mut out = Vec::with_capacity(values.len());
            for idx in 0..values.len() {
                out.push(json_value_from_array(values.as_ref(), idx)?);
            }
            Ok(serde_json::Value::Array(out))
        }
        DataType::FixedSizeList(_, _) => {
            let list = array
                .as_any()
                .downcast_ref::<FixedSizeListArray>()
                .ok_or_else(|| OmniError::Lance("expected FixedSizeListArray".to_string()))?;
            let values = list.value(row);
            let mut out = Vec::with_capacity(values.len());
            for idx in 0..values.len() {
                out.push(json_value_from_array(values.as_ref(), idx)?);
            }
            Ok(serde_json::Value::Array(out))
        }
        DataType::Struct(fields) => {
            let struct_array = array
                .as_any()
                .downcast_ref::<StructArray>()
                .ok_or_else(|| OmniError::Lance("expected StructArray".to_string()))?;
            let mut obj = serde_json::Map::new();
            for (field_idx, field) in fields.iter().enumerate() {
                obj.insert(
                    field.name().clone(),
                    json_value_from_array(struct_array.column(field_idx).as_ref(), row)?,
                );
            }
            Ok(serde_json::Value::Object(obj))
        }
        // Catch-all: render via Arrow's display formatting as a string.
        _ => {
            let value = arrow_cast::display::array_value_to_string(array, row)
                .map_err(|e| OmniError::Lance(e.to_string()))?;
            Ok(serde_json::Value::String(value))
        }
    }
}
1496
1497#[cfg(test)]
1498mod tests {
1499    use super::*;
1500    use crate::db::manifest::ManifestCoordinator;
1501    use async_trait::async_trait;
1502    use omnigraph_compiler::{SchemaMigrationStep, SchemaTypeKind};
1503    use serde_json::Value;
1504    use std::fs;
1505    use std::sync::Mutex;
1506
1507    use crate::storage::{LocalStorageAdapter, StorageAdapter, join_uri};
1508
    /// Minimal schema shared by the tests below: two node types (Person and
    /// Company, each keyed by `name`), an edge with an optional property
    /// (Knows), and a property-less edge (WorksAt).
    const TEST_SCHEMA: &str = r#"
node Person {
    name: String @key
    age: I32?
}
node Company {
    name: String @key
}
edge Knows: Person -> Person {
    since: Date?
}
edge WorksAt: Person -> Company
"#;
1522
    /// Storage adapter that delegates to a [`LocalStorageAdapter`] while
    /// recording every URI passed to read/write/exists, so tests can assert
    /// which metadata files were touched through the adapter.
    #[derive(Debug, Default)]
    struct RecordingStorageAdapter {
        // Real adapter the recorded calls are forwarded to.
        inner: LocalStorageAdapter,
        // URIs seen by `read_text`, in call order.
        reads: Mutex<Vec<String>>,
        // URIs seen by `write_text`, in call order.
        writes: Mutex<Vec<String>>,
        // URIs seen by `exists`, in call order.
        exists_checks: Mutex<Vec<String>>,
    }
1530
    impl RecordingStorageAdapter {
        /// Snapshot of URIs passed to `read_text` so far.
        fn reads(&self) -> Vec<String> {
            self.reads.lock().unwrap().clone()
        }

        /// Snapshot of URIs passed to `write_text` so far.
        fn writes(&self) -> Vec<String> {
            self.writes.lock().unwrap().clone()
        }

        /// Snapshot of URIs passed to `exists` so far.
        fn exists_checks(&self) -> Vec<String> {
            self.exists_checks.lock().unwrap().clone()
        }
    }
1544
    // Each trait method records the URI, then forwards to the inner local
    // adapter so behavior matches real storage.
    #[async_trait]
    impl StorageAdapter for RecordingStorageAdapter {
        async fn read_text(&self, uri: &str) -> Result<String> {
            self.reads.lock().unwrap().push(uri.to_string());
            self.inner.read_text(uri).await
        }

        async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
            self.writes.lock().unwrap().push(uri.to_string());
            self.inner.write_text(uri, contents).await
        }

        async fn exists(&self, uri: &str) -> Result<bool> {
            self.exists_checks.lock().unwrap().push(uri.to_string());
            self.inner.exists(uri).await
        }
    }
1562
    /// `init` must write the three schema metadata files, register a manifest
    /// entry per node/edge type, and build a catalog matching TEST_SCHEMA.
    #[tokio::test]
    async fn test_init_creates_repo() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();

        let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        // Schema file written
        assert!(dir.path().join("_schema.pg").exists());
        assert!(dir.path().join("_schema.ir.json").exists());
        assert!(dir.path().join("__schema_state.json").exists());

        // Manifest created with correct entries
        let snap = db.snapshot();
        assert!(snap.entry("node:Person").is_some());
        assert!(snap.entry("node:Company").is_some());
        assert!(snap.entry("edge:Knows").is_some());
        assert!(snap.entry("edge:WorksAt").is_some());

        // Catalog is correct
        assert_eq!(db.catalog().node_types.len(), 2);
        assert_eq!(db.catalog().edge_types.len(), 2);
        assert_eq!(
            db.catalog().node_types["Person"].key_property(),
            Some("name")
        );
    }
1590
    /// Re-opening a previously initialized repo must rebuild the same catalog
    /// and manifest entries from the persisted state.
    #[tokio::test]
    async fn test_open_reads_existing_repo() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();

        Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        // Re-open
        let db = Omnigraph::open(uri).await.unwrap();
        assert_eq!(db.catalog().node_types.len(), 2);
        assert_eq!(db.catalog().edge_types.len(), 2);
        let snap = db.snapshot();
        assert!(snap.entry("node:Person").is_some());
        assert!(snap.entry("edge:Knows").is_some());
    }
1606
    /// Both `init_with_storage` and `open_with_storage` must perform all
    /// graph-metadata I/O (schema files, schema state, commit-graph existence
    /// check) through the injected storage adapter, not direct filesystem
    /// access — verified via the recording adapter's call logs.
    #[tokio::test]
    async fn test_init_and_open_route_graph_metadata_through_storage_adapter() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let adapter = Arc::new(RecordingStorageAdapter::default());

        // init: metadata files must be written via the adapter.
        Omnigraph::init_with_storage(uri, TEST_SCHEMA, adapter.clone())
            .await
            .unwrap();
        assert!(adapter.writes().contains(&join_uri(uri, "_schema.pg")));
        assert!(adapter.writes().contains(&join_uri(uri, "_schema.ir.json")));
        assert!(
            adapter
                .writes()
                .contains(&join_uri(uri, "__schema_state.json"))
        );

        // open: metadata files must be read / probed via the adapter.
        Omnigraph::open_with_storage(uri, adapter.clone())
            .await
            .unwrap();
        assert!(adapter.reads().contains(&join_uri(uri, "_schema.pg")));
        assert!(adapter.reads().contains(&join_uri(uri, "_schema.ir.json")));
        assert!(
            adapter
                .reads()
                .contains(&join_uri(uri, "__schema_state.json"))
        );
        assert!(
            adapter
                .exists_checks()
                .contains(&join_uri(uri, "_schema.ir.json"))
        );
        assert!(
            adapter
                .exists_checks()
                .contains(&join_uri(uri, "__schema_state.json"))
        );
        assert!(
            adapter
                .exists_checks()
                .contains(&join_uri(uri, "_graph_commits.lance"))
        );
    }
1650
1651    #[tokio::test]
1652    async fn test_open_bootstraps_legacy_schema_state_for_main_only_repo() {
1653        let dir = tempfile::tempdir().unwrap();
1654        let uri = dir.path().to_str().unwrap();
1655        Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1656
1657        fs::remove_file(dir.path().join("_schema.ir.json")).unwrap();
1658        fs::remove_file(dir.path().join("__schema_state.json")).unwrap();
1659
1660        let db = Omnigraph::open(uri).await.unwrap();
1661        assert_eq!(db.catalog().node_types.len(), 2);
1662        assert!(dir.path().join("_schema.ir.json").exists());
1663        assert!(dir.path().join("__schema_state.json").exists());
1664    }
1665
1666    #[tokio::test]
1667    async fn test_open_rejects_legacy_repo_with_public_branch() {
1668        let dir = tempfile::tempdir().unwrap();
1669        let uri = dir.path().to_str().unwrap();
1670        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1671        db.branch_create("feature").await.unwrap();
1672
1673        fs::remove_file(dir.path().join("_schema.ir.json")).unwrap();
1674        fs::remove_file(dir.path().join("__schema_state.json")).unwrap();
1675
1676        let err = match Omnigraph::open(uri).await {
1677            Ok(_) => panic!("expected legacy repo with public branch to fail schema bootstrap"),
1678            Err(err) => err,
1679        };
1680        let message = err.to_string();
1681        assert!(message.contains("public branches block schema evolution entirely"));
1682    }
1683
1684    #[tokio::test]
1685    async fn test_long_lived_handle_rejects_schema_source_drift() {
1686        let dir = tempfile::tempdir().unwrap();
1687        let uri = dir.path().to_str().unwrap();
1688        let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1689
1690        let drifted = TEST_SCHEMA.replace("age: I32?", "age: I64?");
1691        fs::write(dir.path().join("_schema.pg"), drifted).unwrap();
1692
1693        let err = match db.snapshot_of(ReadTarget::branch("main")).await {
1694            Ok(_) => panic!("expected schema source drift to be rejected"),
1695            Err(err) => err,
1696        };
1697        assert!(
1698            err.to_string()
1699                .contains("current _schema.pg no longer matches the accepted compiled schema")
1700        );
1701    }
1702
1703    #[tokio::test]
1704    async fn test_long_lived_handle_rejects_schema_ir_drift() {
1705        let dir = tempfile::tempdir().unwrap();
1706        let uri = dir.path().to_str().unwrap();
1707        let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1708
1709        fs::write(dir.path().join("_schema.ir.json"), "{not valid json").unwrap();
1710
1711        let err = match db.snapshot_of(ReadTarget::branch("main")).await {
1712            Ok(_) => panic!("expected schema IR drift to be rejected"),
1713            Err(err) => err,
1714        };
1715        assert!(
1716            err.to_string()
1717                .contains("accepted compiled schema contract in _schema.ir.json is invalid")
1718        );
1719    }
1720
1721    #[tokio::test]
1722    async fn test_long_lived_handle_rejects_ir_and_source_updates_without_state_update() {
1723        let dir = tempfile::tempdir().unwrap();
1724        let uri = dir.path().to_str().unwrap();
1725        let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1726
1727        let drifted = TEST_SCHEMA.replace("age: I32?", "age: I64?");
1728        let drifted_ir = read_schema_ir_from_source(&drifted).unwrap();
1729        let drifted_ir_json = omnigraph_compiler::schema_ir_pretty_json(&drifted_ir).unwrap();
1730        fs::write(dir.path().join("_schema.pg"), drifted).unwrap();
1731        fs::write(dir.path().join("_schema.ir.json"), drifted_ir_json).unwrap();
1732
1733        let err = match db.snapshot_of(ReadTarget::branch("main")).await {
1734            Ok(_) => panic!("expected schema state mismatch to be rejected"),
1735            Err(err) => err,
1736        };
1737        assert!(
1738            err.to_string()
1739                .contains("accepted compiled schema does not match the recorded schema state")
1740        );
1741    }
1742
1743    #[tokio::test]
1744    async fn test_comment_only_schema_edit_keeps_schema_state_valid() {
1745        let dir = tempfile::tempdir().unwrap();
1746        let uri = dir.path().to_str().unwrap();
1747        let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1748
1749        let commented = format!("// comment-only drift\n{}", TEST_SCHEMA);
1750        fs::write(dir.path().join("_schema.pg"), commented).unwrap();
1751
1752        let snapshot = db.snapshot_of(ReadTarget::branch("main")).await.unwrap();
1753        assert!(snapshot.entry("node:Person").is_some());
1754    }
1755
1756    #[tokio::test]
1757    async fn test_plan_schema_reports_supported_additive_change() {
1758        let dir = tempfile::tempdir().unwrap();
1759        let uri = dir.path().to_str().unwrap();
1760        let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1761
1762        let desired = TEST_SCHEMA.replace(
1763            "    age: I32?\n}",
1764            "    age: I32?\n    nickname: String?\n}",
1765        );
1766
1767        let plan = db.plan_schema(&desired).await.unwrap();
1768        assert!(plan.supported);
1769        assert!(plan.steps.iter().any(|step| matches!(
1770            step,
1771            SchemaMigrationStep::AddProperty {
1772                type_kind: SchemaTypeKind::Node,
1773                type_name,
1774                property_name,
1775                ..
1776            } if type_name == "Person" && property_name == "nickname"
1777        )));
1778    }
1779
1780    #[tokio::test]
1781    async fn test_plan_schema_rejects_when_schema_contract_has_drifted() {
1782        let dir = tempfile::tempdir().unwrap();
1783        let uri = dir.path().to_str().unwrap();
1784        let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1785
1786        let drifted = TEST_SCHEMA.replace("age: I32?", "age: I64?");
1787        fs::write(dir.path().join("_schema.pg"), drifted).unwrap();
1788
1789        let err = db.plan_schema(TEST_SCHEMA).await.unwrap_err();
1790        assert!(
1791            err.to_string()
1792                .contains("current _schema.pg no longer matches the accepted compiled schema")
1793        );
1794    }
1795
1796    async fn table_rows_json(db: &Omnigraph, table_key: &str) -> Vec<Value> {
1797        let snapshot = db.snapshot();
1798        let ds = snapshot.open(table_key).await.unwrap();
1799        let batches = db.table_store().scan_batches(&ds).await.unwrap();
1800        batches
1801            .into_iter()
1802            .flat_map(|batch| {
1803                (0..batch.num_rows())
1804                    .map(|row| record_batch_row_to_json(&batch, row).unwrap())
1805                    .collect::<Vec<_>>()
1806            })
1807            .collect()
1808    }
1809
    // Append one Person row (`id` and `name` both set to `name`) to the
    // node:Person table and commit the resulting table version to the graph.
    async fn seed_person_row(db: &mut Omnigraph, name: &str, age: Option<i32>) {
        let (mut ds, full_path, table_branch) = db.open_for_mutation("node:Person").await.unwrap();
        let schema: Arc<Schema> = Arc::new(ds.schema().into());
        // Build one column per dataset field; fields this helper does not
        // recognize are filled with nulls so the batch always matches the
        // dataset schema exactly.
        let columns: Vec<Arc<dyn Array>> = schema
            .fields()
            .iter()
            .map(|field| match field.name().as_str() {
                "id" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
                "name" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
                "age" => Arc::new(Int32Array::from(vec![age])) as Arc<dyn Array>,
                _ => new_null_array(field.data_type(), 1),
            })
            .collect();
        let batch = RecordBatch::try_new(Arc::clone(&schema), columns).unwrap();
        let state = db
            .table_store()
            .append_batch(&full_path, &mut ds, batch)
            .await
            .unwrap();
        // Record the append in the graph commit; the update must echo the
        // version, row count, and metadata reported by the append exactly.
        db.commit_updates(&[crate::db::SubTableUpdate {
            table_key: "node:Person".to_string(),
            table_version: state.version,
            table_branch,
            row_count: state.row_count,
            version_metadata: state.version_metadata,
        }])
        .await
        .unwrap();
    }
1839
1840    #[tokio::test]
1841    async fn test_apply_schema_noop_returns_not_applied() {
1842        let dir = tempfile::tempdir().unwrap();
1843        let uri = dir.path().to_str().unwrap();
1844        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1845
1846        let result = db.apply_schema(TEST_SCHEMA).await.unwrap();
1847        assert!(result.supported);
1848        assert!(!result.applied);
1849        assert!(result.steps.is_empty());
1850    }
1851
1852    #[tokio::test]
1853    async fn test_apply_schema_adds_nullable_property_and_preserves_rows() {
1854        let dir = tempfile::tempdir().unwrap();
1855        let uri = dir.path().to_str().unwrap();
1856        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1857        seed_person_row(&mut db, "Alice", Some(30)).await;
1858
1859        let desired = TEST_SCHEMA.replace(
1860            "    age: I32?\n}",
1861            "    age: I32?\n    nickname: String?\n}",
1862        );
1863        let result = db.apply_schema(&desired).await.unwrap();
1864        assert!(result.applied);
1865
1866        let reopened = Omnigraph::open(uri).await.unwrap();
1867        let rows = table_rows_json(&reopened, "node:Person").await;
1868        assert_eq!(rows.len(), 1);
1869        assert_eq!(rows[0]["name"], "Alice");
1870        assert_eq!(rows[0]["age"], 30);
1871        assert!(rows[0]["nickname"].is_null());
1872        assert!(
1873            reopened.catalog().node_types["Person"]
1874                .properties
1875                .contains_key("nickname")
1876        );
1877        assert!(dir.path().join("_schema.pg").exists());
1878    }
1879
1880    #[tokio::test]
1881    async fn test_apply_schema_renames_property_and_preserves_values() {
1882        let dir = tempfile::tempdir().unwrap();
1883        let uri = dir.path().to_str().unwrap();
1884        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1885        seed_person_row(&mut db, "Alice", Some(30)).await;
1886
1887        let desired = TEST_SCHEMA.replace(
1888            "    age: I32?\n}",
1889            "    years: I32? @rename_from(\"age\")\n}",
1890        );
1891        db.apply_schema(&desired).await.unwrap();
1892
1893        let reopened = Omnigraph::open(uri).await.unwrap();
1894        let rows = table_rows_json(&reopened, "node:Person").await;
1895        assert_eq!(rows[0]["name"], "Alice");
1896        assert_eq!(rows[0]["years"], 30);
1897        assert!(rows[0].get("age").is_none());
1898    }
1899
1900    #[tokio::test]
1901    async fn test_apply_schema_renames_type_and_preserves_historical_snapshot() {
1902        let dir = tempfile::tempdir().unwrap();
1903        let uri = dir.path().to_str().unwrap();
1904        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1905        seed_person_row(&mut db, "Alice", Some(30)).await;
1906        let before_version = db.snapshot().version();
1907
1908        let desired = TEST_SCHEMA
1909            .replace("node Person {\n", "node Human @rename_from(\"Person\") {\n")
1910            .replace("edge Knows: Person -> Person", "edge Knows: Human -> Human")
1911            .replace(
1912                "edge WorksAt: Person -> Company",
1913                "edge WorksAt: Human -> Company",
1914            );
1915        db.apply_schema(&desired).await.unwrap();
1916
1917        let head = db.snapshot();
1918        assert!(head.entry("node:Person").is_none());
1919        assert!(head.entry("node:Human").is_some());
1920        let historical = ManifestCoordinator::snapshot_at(uri, None, before_version)
1921            .await
1922            .unwrap();
1923        assert!(historical.entry("node:Person").is_some());
1924        assert!(historical.entry("node:Human").is_none());
1925    }
1926
1927    #[tokio::test]
1928    async fn test_apply_schema_rejects_when_non_main_branch_exists() {
1929        let dir = tempfile::tempdir().unwrap();
1930        let uri = dir.path().to_str().unwrap();
1931        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1932        db.branch_create("feature").await.unwrap();
1933
1934        let desired = TEST_SCHEMA.replace(
1935            "    age: I32?\n}",
1936            "    age: I32?\n    nickname: String?\n}",
1937        );
1938        let err = db.apply_schema(&desired).await.unwrap_err();
1939        assert!(
1940            err.to_string()
1941                .contains("schema apply requires a repo with only main")
1942        );
1943    }
1944
1945    #[tokio::test]
1946    async fn test_apply_schema_adds_index_for_existing_property() {
1947        let dir = tempfile::tempdir().unwrap();
1948        let uri = dir.path().to_str().unwrap();
1949        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1950
1951        let desired = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
1952        db.apply_schema(&desired).await.unwrap();
1953
1954        let snapshot = db.snapshot();
1955        let ds = snapshot.open("node:Person").await.unwrap();
1956        assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
1957    }
1958
1959    #[tokio::test]
1960    async fn test_apply_schema_rewrite_preserves_existing_indices() {
1961        let dir = tempfile::tempdir().unwrap();
1962        let uri = dir.path().to_str().unwrap();
1963        let initial_schema = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
1964        let mut db = Omnigraph::init(uri, &initial_schema).await.unwrap();
1965        seed_person_row(&mut db, "Alice", Some(30)).await;
1966
1967        let desired = initial_schema.replace(
1968            "    age: I32?\n}",
1969            "    age: I32?\n    nickname: String?\n}",
1970        );
1971        db.apply_schema(&desired).await.unwrap();
1972
1973        let snapshot = db.snapshot();
1974        let ds = snapshot.open("node:Person").await.unwrap();
1975        assert!(db.table_store().has_btree_index(&ds, "id").await.unwrap());
1976        assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
1977    }
1978
1979    #[tokio::test]
1980    async fn test_open_for_mutation_rejects_while_schema_apply_locked() {
1981        let dir = tempfile::tempdir().unwrap();
1982        let uri = dir.path().to_str().unwrap();
1983        let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1984        let mut db = db;
1985        db.coordinator
1986            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
1987            .await
1988            .unwrap();
1989
1990        let err = db.open_for_mutation("node:Person").await.unwrap_err();
1991        assert!(
1992            err.to_string()
1993                .contains("write is unavailable while schema apply is in progress")
1994        );
1995    }
1996
1997    #[tokio::test]
1998    async fn test_commit_updates_rejects_while_schema_apply_locked() {
1999        let dir = tempfile::tempdir().unwrap();
2000        let uri = dir.path().to_str().unwrap();
2001        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2002        db.coordinator
2003            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
2004            .await
2005            .unwrap();
2006
2007        let err = db.commit_updates(&[]).await.unwrap_err();
2008        assert!(
2009            err.to_string()
2010                .contains("write commit is unavailable while schema apply is in progress")
2011        );
2012    }
2013
2014    #[tokio::test]
2015    async fn test_branch_list_hides_schema_apply_lock_branch() {
2016        let dir = tempfile::tempdir().unwrap();
2017        let uri = dir.path().to_str().unwrap();
2018        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2019        db.coordinator
2020            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
2021            .await
2022            .unwrap();
2023
2024        let branches = db.branch_list().await.unwrap();
2025        assert_eq!(branches, vec!["main".to_string()]);
2026    }
2027
2028    #[tokio::test]
2029    async fn test_apply_schema_unsupported_plan_does_not_advance_manifest() {
2030        let dir = tempfile::tempdir().unwrap();
2031        let uri = dir.path().to_str().unwrap();
2032        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2033        let before_version = db.snapshot().version();
2034
2035        let desired = TEST_SCHEMA.replace("age: I32?", "age: I64?");
2036        let err = db.apply_schema(&desired).await.unwrap_err();
2037        assert!(err.to_string().contains("changing property type"));
2038        assert_eq!(db.snapshot().version(), before_version);
2039    }
2040
2041    #[tokio::test]
2042    async fn test_open_nonexistent_fails() {
2043        let result = Omnigraph::open("/tmp/nonexistent_omnigraph_test_xyz").await;
2044        assert!(result.is_err());
2045    }
2046
2047    #[tokio::test]
2048    async fn test_snapshot_version_is_pinned() {
2049        let dir = tempfile::tempdir().unwrap();
2050        let uri = dir.path().to_str().unwrap();
2051
2052        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2053
2054        // Take snapshot before any writes
2055        let snap1 = db.snapshot();
2056        let v1 = snap1.version();
2057
2058        // Load data — advances manifest version
2059        crate::loader::load_jsonl(
2060            &mut db,
2061            r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#,
2062            crate::loader::LoadMode::Overwrite,
2063        )
2064        .await
2065        .unwrap();
2066
2067        // Snapshot from handle sees new version
2068        let snap2 = db.snapshot();
2069        assert!(snap2.version() > v1);
2070
2071        // But the old snapshot is still pinned
2072        assert_eq!(snap1.version(), v1);
2073    }
2074}