1use std::collections::{BTreeSet, HashMap, HashSet};
2use std::io::Write;
3use std::sync::Arc;
4
5use arc_swap::ArcSwap;
6use arrow_array::{
7 Array, BinaryArray, BooleanArray, Date32Array, FixedSizeListArray, Float32Array, Float64Array,
8 Int32Array, Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray,
9 RecordBatch, StringArray, StructArray, UInt32Array, UInt64Array, new_null_array,
10};
11use arrow_schema::{DataType, Field, Schema};
12use lance::Dataset;
13use lance::blob::{BlobArrayBuilder, blob_field};
14use lance::dataset::BlobFile;
15use lance::dataset::scanner::ColumnOrdering;
16use lance::datatypes::BlobKind;
17use omnigraph_compiler::catalog::{Catalog, EdgeType, NodeType};
18use omnigraph_compiler::schema::parser::parse_schema;
19use omnigraph_compiler::types::ScalarType;
20use omnigraph_compiler::{
21 DropMode, SchemaIR, SchemaMigrationPlan, SchemaMigrationStep, SchemaTypeKind,
22 build_catalog_from_ir, build_schema_ir, plan_schema_migration,
23};
24
25use crate::db::graph_coordinator::{GraphCoordinator, PublishedSnapshot};
26use crate::error::{OmniError, Result};
27use crate::runtime_cache::RuntimeCache;
28use crate::storage::{StorageAdapter, join_uri, normalize_root_uri, storage_for_uri};
29use crate::table_store::TableStore;
30
31mod export;
32mod optimize;
33mod repair;
34mod schema_apply;
35mod table_ops;
36
37pub use optimize::{CleanupPolicyOptions, SkipReason, TableCleanupStats, TableOptimizeStats};
38pub use repair::{
39 RepairAction, RepairClassification, RepairOptions, RepairStats, TableRepairStats,
40};
41pub use schema_apply::SchemaApplyOptions;
42
43use super::commit_graph::GraphCommit;
44use super::manifest::{
45 ManifestChange, Snapshot, SubTableEntry, TableRegistration, TableTombstone,
46 table_path_for_table_key,
47};
48use super::schema_state::{
49 SCHEMA_SOURCE_FILENAME, load_or_bootstrap_schema_contract, read_accepted_schema_ir,
50 recover_schema_state_files, schema_ir_staging_uri, schema_ir_uri, schema_source_staging_uri,
51 schema_source_uri, schema_state_staging_uri, schema_state_uri, validate_schema_contract,
52 write_schema_contract, write_schema_contract_staging,
53};
54use super::{
55 ReadTarget, ResolvedTarget, SCHEMA_APPLY_LOCK_BRANCH, SnapshotId, is_internal_system_branch,
56 is_schema_apply_lock_branch,
57};
58
/// Outcome reported by a branch merge.
///
/// NOTE(review): the merge implementation is not visible here; the per-variant
/// descriptions below are inferred from the variant names — confirm against it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MergeOutcome {
    /// Presumably: the target already contained the source; nothing was written.
    AlreadyUpToDate,
    /// Presumably: the target head was advanced to the source head without a merge commit.
    FastForward,
    /// Presumably: a true merge was performed and recorded.
    Merged,
}
65
/// Result of a schema apply request.
///
/// NOTE(review): field semantics are inferred from names; the producing code lives in
/// `schema_apply` and is not visible here — confirm.
#[derive(Debug, Clone)]
pub struct SchemaApplyResult {
    /// Presumably whether the planned migration is one the engine can execute.
    pub supported: bool,
    /// Presumably whether any change was actually written.
    pub applied: bool,
    /// Manifest version associated with the result (assumed: version after the apply).
    pub manifest_version: u64,
    /// Migration steps that make up the plan.
    pub steps: Vec<SchemaMigrationStep>,
}
73
/// Dry-run view of a schema apply: the migration plan plus the catalog it would produce.
#[derive(Debug, Clone)]
pub struct SchemaApplyPreview {
    /// The computed migration plan.
    pub plan: SchemaMigrationPlan,
    /// Catalog for the desired schema (assumed: what the graph would use after applying).
    pub catalog: Catalog,
}
79
/// Handle to a graph stored under a single root URI.
///
/// Holds the coordinator for the currently active branch, the published catalog and
/// schema source (hot-swappable via `ArcSwap`), and the shared machinery used by writes.
pub struct Omnigraph {
    /// Root URI as returned by `normalize_root_uri` during init/open.
    root_uri: String,
    /// Storage adapter used for schema files and handed to every `GraphCoordinator`.
    storage: Arc<dyn StorageAdapter>,
    /// Coordinator for the active branch; replaced wholesale by `sync_branch` and
    /// `swap_coordinator_for_branch`.
    coordinator: Arc<tokio::sync::RwLock<GraphCoordinator>>,
    /// Table-level storage built from `root_uri`.
    table_store: TableStore,
    /// Runtime cache; invalidated on `refresh`, `sync_branch` and `refresh_coordinator_only`.
    runtime_cache: RuntimeCache,
    /// Currently published catalog; readers take a snapshot with `load_full`.
    catalog: Arc<ArcSwap<Catalog>>,
    /// Schema source text the current catalog was loaded from; compared against the
    /// on-disk file by `reload_schema_if_source_changed`.
    schema_source: Arc<ArcSwap<String>>,
    /// Shared write-queue manager (handed out via `write_queue()`).
    write_queue: Arc<crate::db::write_queue::WriteQueueManager>,
    /// Mutex handed out via `merge_exclusive()`; presumably serializes merges — confirm.
    merge_exclusive: Arc<tokio::sync::Mutex<()>>,
    /// Optional policy checker; when `None`, `enforce` allows every action.
    policy: Option<Arc<dyn omnigraph_policy::PolicyChecker>>,
}
159
/// How `open_with_storage_and_mode` opens an existing graph.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OpenMode {
    /// Runs manifest migration plus schema-state and manifest-drift recovery on open.
    ReadWrite,
    /// Skips migration and recovery; the graph is opened as found.
    ReadOnly,
}
177
/// Options for `Omnigraph::init_with_options`.
#[derive(Debug, Clone, Copy, Default)]
pub struct InitOptions {
    /// When `true`, init skips the "already initialized" checks and does not claim the
    /// schema source file exclusively before writing storage.
    pub force: bool,
}
196
197impl Omnigraph {
198 pub async fn init(uri: &str, schema_source: &str) -> Result<Self> {
205 Self::init_with_options(uri, schema_source, InitOptions::default()).await
206 }
207
208 pub async fn init_with_options(
213 uri: &str,
214 schema_source: &str,
215 options: InitOptions,
216 ) -> Result<Self> {
217 Self::init_with_storage(uri, schema_source, storage_for_uri(uri)?, options).await
218 }
219
220 pub(crate) async fn init_with_storage(
221 uri: &str,
222 schema_source: &str,
223 storage: Arc<dyn StorageAdapter>,
224 options: InitOptions,
225 ) -> Result<Self> {
226 let root = normalize_root_uri(uri)?;
227
228 if !options.force {
238 for candidate in [
239 schema_source_uri(&root),
240 schema_ir_uri(&root),
241 schema_state_uri(&root),
242 ] {
243 if storage.exists(&candidate).await? {
244 return Err(OmniError::AlreadyInitialized { uri: root.clone() });
245 }
246 }
247 }
248
249 let schema_ir = read_schema_ir_from_source(schema_source)?;
250 let mut catalog = build_catalog_from_ir(&schema_ir)?;
251 fixup_blob_schemas(&mut catalog);
252
253 let schema_pg_claimed = if options.force {
261 false
262 } else {
263 let schema_path = join_uri(&root, SCHEMA_SOURCE_FILENAME);
264 if !storage
265 .write_text_if_absent(&schema_path, schema_source)
266 .await?
267 {
268 return Err(OmniError::AlreadyInitialized { uri: root.clone() });
269 }
270 if let Err(err) = crate::failpoints::maybe_fail("init.after_schema_pg_written") {
271 best_effort_cleanup_init_artifacts(&root, storage.as_ref()).await;
272 return Err(err);
273 }
274 true
275 };
276
277 let coordinator = match init_storage_phase(
292 &root,
293 schema_source,
294 &schema_ir,
295 &catalog,
296 &storage,
297 !schema_pg_claimed,
298 )
299 .await
300 {
301 Ok(coordinator) => coordinator,
302 Err(err) => {
303 if schema_pg_claimed || options.force {
304 best_effort_cleanup_init_artifacts(&root, storage.as_ref()).await;
305 }
306 return Err(err);
307 }
308 };
309
310 Ok(Self {
311 root_uri: root.clone(),
312 storage,
313 coordinator: Arc::new(tokio::sync::RwLock::new(coordinator)),
314 table_store: TableStore::new(&root),
315 runtime_cache: RuntimeCache::default(),
316 catalog: Arc::new(ArcSwap::from_pointee(catalog)),
317 schema_source: Arc::new(ArcSwap::from_pointee(schema_source.to_string())),
318 write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()),
319 merge_exclusive: Arc::new(tokio::sync::Mutex::new(())),
320 policy: None,
321 })
322 }
323
324 pub async fn open(uri: &str) -> Result<Self> {
329 Self::open_with_storage_and_mode(uri, storage_for_uri(uri)?, OpenMode::ReadWrite).await
330 }
331
332 pub async fn open_read_only(uri: &str) -> Result<Self> {
335 Self::open_with_storage_and_mode(uri, storage_for_uri(uri)?, OpenMode::ReadOnly).await
336 }
337
338 pub(crate) async fn open_with_storage(
341 uri: &str,
342 storage: Arc<dyn StorageAdapter>,
343 ) -> Result<Self> {
344 Self::open_with_storage_and_mode(uri, storage, OpenMode::ReadWrite).await
345 }
346
347 pub(crate) async fn open_with_storage_and_mode(
348 uri: &str,
349 storage: Arc<dyn StorageAdapter>,
350 mode: OpenMode,
351 ) -> Result<Self> {
352 let root = normalize_root_uri(uri)?;
353 if matches!(mode, OpenMode::ReadWrite) {
361 crate::db::manifest::migrate_on_open(&root).await?;
362 }
363 let mut coordinator = GraphCoordinator::open(&root, Arc::clone(&storage)).await?;
366 if matches!(mode, OpenMode::ReadWrite) {
376 let schema_state_recovery =
377 recover_schema_state_files(&root, Arc::clone(&storage), &coordinator.snapshot())
378 .await?;
379 crate::db::manifest::recover_manifest_drift(
385 &root,
386 Arc::clone(&storage),
387 &mut coordinator,
388 crate::db::manifest::RecoveryMode::Full,
389 schema_state_recovery,
390 )
391 .await?;
392 }
393 let schema_path = schema_source_uri(&root);
395 let schema_source = storage.read_text(&schema_path).await?;
396 let current_source_ir = read_schema_ir_from_source(&schema_source)?;
397 let branches = coordinator.branch_list().await?;
398 let (accepted_ir, _) = load_or_bootstrap_schema_contract(
399 &root,
400 Arc::clone(&storage),
401 &branches,
402 ¤t_source_ir,
403 )
404 .await?;
405 let mut catalog = build_catalog_from_ir(&accepted_ir)?;
406 fixup_blob_schemas(&mut catalog);
407
408 Ok(Self {
409 root_uri: root.clone(),
410 storage,
411 coordinator: Arc::new(tokio::sync::RwLock::new(coordinator)),
412 table_store: TableStore::new(&root),
413 runtime_cache: RuntimeCache::default(),
414 catalog: Arc::new(ArcSwap::from_pointee(catalog)),
415 schema_source: Arc::new(ArcSwap::from_pointee(schema_source)),
416 write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()),
417 merge_exclusive: Arc::new(tokio::sync::Mutex::new(())),
418 policy: None,
419 })
420 }
421
422 pub fn catalog(&self) -> Arc<Catalog> {
426 self.catalog.load_full()
427 }
428
429 pub fn schema_source(&self) -> Arc<String> {
431 self.schema_source.load_full()
432 }
433
434 pub(crate) fn store_catalog(&self, catalog: Catalog) {
438 self.catalog.store(Arc::new(catalog));
439 }
440
441 pub(crate) fn store_schema_source(&self, schema_source: String) {
444 self.schema_source.store(Arc::new(schema_source));
445 }
446
447 pub fn uri(&self) -> &str {
448 &self.root_uri
449 }
450
451 pub fn with_policy(mut self, checker: Arc<dyn omnigraph_policy::PolicyChecker>) -> Self {
462 self.policy = Some(checker);
463 self
464 }
465
466 pub(crate) fn enforce(
478 &self,
479 action: omnigraph_policy::PolicyAction,
480 scope: &omnigraph_policy::ResourceScope,
481 actor: Option<&str>,
482 ) -> Result<()> {
483 let Some(checker) = self.policy.as_ref() else {
484 return Ok(());
485 };
486 let Some(actor) = actor else {
487 return Err(OmniError::Policy(
488 "no actor for engine-layer policy check (policy is configured but the call site \
489 didn't thread an actor through — this is almost certainly a bug, not an \
490 intended bypass)"
491 .to_string(),
492 ));
493 };
494 checker
495 .check(action, scope, actor)
496 .map_err(|err| OmniError::Policy(err.to_string()))
497 }
498
499 pub(crate) async fn ensure_schema_state_valid(&self) -> Result<()> {
500 validate_schema_contract(self.uri(), Arc::clone(&self.storage)).await
501 }
502
503 pub async fn plan_schema(&self, desired_schema_source: &str) -> Result<SchemaMigrationPlan> {
504 self.plan_schema_with_options(desired_schema_source, SchemaApplyOptions::default())
505 .await
506 }
507
508 pub async fn plan_schema_with_options(
509 &self,
510 desired_schema_source: &str,
511 options: SchemaApplyOptions,
512 ) -> Result<SchemaMigrationPlan> {
513 schema_apply::plan_schema(self, desired_schema_source, options).await
514 }
515
516 pub async fn preview_schema_apply_with_options(
517 &self,
518 desired_schema_source: &str,
519 options: SchemaApplyOptions,
520 ) -> Result<SchemaApplyPreview> {
521 schema_apply::preview_schema_apply(self, desired_schema_source, options).await
522 }
523
524 pub async fn apply_schema(&self, desired_schema_source: &str) -> Result<SchemaApplyResult> {
525 self.apply_schema_as(desired_schema_source, SchemaApplyOptions::default(), None)
526 .await
527 }
528
529 pub async fn apply_schema_with_options(
530 &self,
531 desired_schema_source: &str,
532 options: SchemaApplyOptions,
533 ) -> Result<SchemaApplyResult> {
534 self.apply_schema_as(desired_schema_source, options, None)
535 .await
536 }
537
538 pub async fn apply_schema_as(
549 &self,
550 desired_schema_source: &str,
551 options: SchemaApplyOptions,
552 actor: Option<&str>,
553 ) -> Result<SchemaApplyResult> {
554 self.apply_schema_as_with_catalog_check(desired_schema_source, options, actor, |_| Ok(()))
555 .await
556 }
557
558 pub async fn apply_schema_as_with_catalog_check<F>(
559 &self,
560 desired_schema_source: &str,
561 options: SchemaApplyOptions,
562 actor: Option<&str>,
563 validate_catalog: F,
564 ) -> Result<SchemaApplyResult>
565 where
566 F: FnOnce(&Catalog) -> Result<()>,
567 {
568 schema_apply::apply_schema(
569 self,
570 desired_schema_source,
571 options,
572 actor,
573 validate_catalog,
574 )
575 .await
576 }
577
578 pub(crate) async fn ensure_schema_apply_idle(&self, operation: &str) -> Result<()> {
579 schema_apply::ensure_schema_apply_idle(self, operation).await
580 }
581
582 async fn ensure_schema_apply_not_locked(&self, operation: &str) -> Result<()> {
583 schema_apply::ensure_schema_apply_not_locked(self, operation).await
584 }
585
586 pub(crate) fn table_store(&self) -> &TableStore {
587 &self.table_store
588 }
589
590 pub(crate) fn storage(&self) -> &dyn crate::storage_layer::TableStorage {
599 &self.table_store
600 }
601
602 pub(crate) fn storage_adapter(&self) -> &dyn crate::storage::StorageAdapter {
606 self.storage.as_ref()
607 }
608
609 pub(crate) fn write_queue(&self) -> Arc<crate::db::write_queue::WriteQueueManager> {
617 Arc::clone(&self.write_queue)
618 }
619
620 pub(crate) fn merge_exclusive(&self) -> Arc<tokio::sync::Mutex<()>> {
626 Arc::clone(&self.merge_exclusive)
627 }
628
629 pub(crate) fn root_uri(&self) -> &str {
632 &self.root_uri
633 }
634
635 pub(crate) async fn open_coordinator_for_branch(
636 &self,
637 branch: Option<&str>,
638 ) -> Result<GraphCoordinator> {
639 match branch {
640 Some(branch) => {
641 GraphCoordinator::open_branch(self.uri(), branch, Arc::clone(&self.storage)).await
642 }
643 None => GraphCoordinator::open(self.uri(), Arc::clone(&self.storage)).await,
644 }
645 }
646
647 pub(crate) async fn swap_coordinator_for_branch(
648 &self,
649 branch: Option<&str>,
650 ) -> Result<GraphCoordinator> {
651 let next = self.open_coordinator_for_branch(branch).await?;
652 let mut coord = self.coordinator.write().await;
653 Ok(std::mem::replace(&mut *coord, next))
654 }
655
656 pub(crate) async fn restore_coordinator(&self, coordinator: GraphCoordinator) {
657 *self.coordinator.write().await = coordinator;
658 }
659
660 pub(crate) async fn resolved_branch_target(
661 &self,
662 branch: Option<&str>,
663 ) -> Result<ResolvedTarget> {
664 self.ensure_schema_state_valid().await?;
665 let requested = ReadTarget::Branch(branch.unwrap_or("main").to_string());
666 let normalized = normalize_branch_name(branch.unwrap_or("main"))?;
667 let coord = self.coordinator.read().await;
668 if normalized.as_deref() == coord.current_branch() {
669 let snapshot_id = coord
670 .head_commit_id()
671 .await?
672 .unwrap_or_else(|| SnapshotId::synthetic(coord.current_branch(), coord.version()));
673 return Ok(ResolvedTarget {
674 requested,
675 branch: coord.current_branch().map(str::to_string),
676 snapshot_id,
677 snapshot: coord.snapshot(),
678 });
679 }
680 coord.resolve_target(&requested).await
681 }
682
683 pub(crate) async fn snapshot_for_branch(&self, branch: Option<&str>) -> Result<Snapshot> {
684 self.resolved_branch_target(branch)
685 .await
686 .map(|resolved| resolved.snapshot)
687 }
688
689 pub(crate) async fn fresh_snapshot_for_branch(&self, branch: Option<&str>) -> Result<Snapshot> {
690 self.ensure_schema_state_valid().await?;
691 let requested = ReadTarget::Branch(branch.unwrap_or("main").to_string());
692 let coord = self.coordinator.read().await;
693 coord
694 .resolve_target(&requested)
695 .await
696 .map(|resolved| resolved.snapshot)
697 }
698
699 pub(crate) async fn version(&self) -> u64 {
700 self.coordinator.read().await.version()
701 }
702
703 pub(crate) async fn snapshot(&self) -> Snapshot {
705 self.coordinator.read().await.snapshot()
706 }
707
708 pub async fn snapshot_of(&self, target: impl Into<ReadTarget>) -> Result<Snapshot> {
709 self.resolved_target(target)
710 .await
711 .map(|resolved| resolved.snapshot)
712 }
713
714 pub async fn version_of(&self, target: impl Into<ReadTarget>) -> Result<u64> {
715 self.snapshot_of(target)
716 .await
717 .map(|snapshot| snapshot.version())
718 }
719
720 pub async fn resolved_branch_of(
721 &self,
722 target: impl Into<ReadTarget>,
723 ) -> Result<Option<String>> {
724 self.resolved_target(target)
725 .await
726 .map(|resolved| resolved.branch)
727 }
728
729 pub async fn sync_branch(&self, branch: &str) -> Result<()> {
731 self.ensure_schema_state_valid().await?;
732 let branch = normalize_branch_name(branch)?;
733 let next = self.open_coordinator_for_branch(branch.as_deref()).await?;
734 *self.coordinator.write().await = next;
735 self.runtime_cache.invalidate_all().await;
736 Ok(())
737 }
738
739 pub async fn refresh(&self) -> Result<()> {
772 {
779 let mut coord = self.coordinator.write().await;
780 coord.refresh().await?;
781 let schema_state_recovery = recover_schema_state_files(
782 &self.root_uri,
783 Arc::clone(&self.storage),
784 &coord.snapshot(),
785 )
786 .await?;
787 crate::db::manifest::recover_manifest_drift(
788 &self.root_uri,
789 Arc::clone(&self.storage),
790 &mut *coord,
791 crate::db::manifest::RecoveryMode::RollForwardOnly,
792 schema_state_recovery,
793 )
794 .await?;
795 } self.reload_schema_if_source_changed().await?;
797 self.runtime_cache.invalidate_all().await;
798 Ok(())
799 }
800
801 async fn reload_schema_if_source_changed(&self) -> Result<()> {
802 let schema_path = schema_source_uri(&self.root_uri);
803 let schema_source = self.storage.read_text(&schema_path).await?;
804 if schema_source == *self.schema_source.load_full() {
805 return Ok(());
806 }
807 let current_source_ir = read_schema_ir_from_source(&schema_source)?;
808 let branches = self.coordinator.read().await.branch_list().await?;
809 let (accepted_ir, _) = load_or_bootstrap_schema_contract(
810 &self.root_uri,
811 Arc::clone(&self.storage),
812 &branches,
813 ¤t_source_ir,
814 )
815 .await?;
816 let mut catalog = build_catalog_from_ir(&accepted_ir)?;
817 fixup_blob_schemas(&mut catalog);
818 self.store_schema_source(schema_source);
819 self.store_catalog(catalog);
820 Ok(())
821 }
822
823 pub(crate) async fn refresh_coordinator_only(&self) -> Result<()> {
831 self.coordinator.write().await.refresh().await?;
832 self.runtime_cache.invalidate_all().await;
833 Ok(())
834 }
835
836 pub async fn resolve_snapshot(&self, branch: &str) -> Result<SnapshotId> {
837 self.ensure_schema_state_valid().await?;
838 self.coordinator
839 .read()
840 .await
841 .resolve_snapshot_id(branch)
842 .await
843 }
844
845 pub(crate) async fn resolved_target(
846 &self,
847 target: impl Into<ReadTarget>,
848 ) -> Result<ResolvedTarget> {
849 self.ensure_schema_state_valid().await?;
850 self.coordinator
851 .read()
852 .await
853 .resolve_target(&target.into())
854 .await
855 }
856
857 pub async fn diff_between(
860 &self,
861 from: impl Into<ReadTarget>,
862 to: impl Into<ReadTarget>,
863 filter: &crate::changes::ChangeFilter,
864 ) -> Result<crate::changes::ChangeSet> {
865 let from_resolved = self.resolved_target(from).await?;
866 let to_resolved = self.resolved_target(to).await?;
867 crate::changes::diff_snapshots(
868 self.uri(),
869 &from_resolved.snapshot,
870 &to_resolved.snapshot,
871 filter,
872 to_resolved.branch.clone().or(from_resolved.branch.clone()),
873 )
874 .await
875 }
876
877 pub async fn diff_commits(
880 &self,
881 from_commit_id: &str,
882 to_commit_id: &str,
883 filter: &crate::changes::ChangeFilter,
884 ) -> Result<crate::changes::ChangeSet> {
885 let coord = self.coordinator.read().await;
886 let from_commit = coord
887 .resolve_commit(&SnapshotId::new(from_commit_id))
888 .await?;
889 let to_commit = coord.resolve_commit(&SnapshotId::new(to_commit_id)).await?;
890 let from_snap = coord
891 .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
892 from_commit.graph_commit_id.clone(),
893 )))
894 .await?;
895 let to_snap = coord
896 .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
897 to_commit.graph_commit_id.clone(),
898 )))
899 .await?;
900 drop(coord);
901 crate::changes::diff_snapshots(
902 self.uri(),
903 &from_snap.snapshot,
904 &to_snap.snapshot,
905 filter,
906 to_snap.branch.clone().or(from_snap.branch.clone()),
907 )
908 .await
909 }
910
911 pub async fn entity_at_target(
912 &self,
913 target: impl Into<ReadTarget>,
914 table_key: &str,
915 id: &str,
916 ) -> Result<Option<serde_json::Value>> {
917 export::entity_at_target(self, target, table_key, id).await
918 }
919
920 pub async fn entity_at(
922 &self,
923 table_key: &str,
924 id: &str,
925 version: u64,
926 ) -> Result<Option<serde_json::Value>> {
927 export::entity_at(self, table_key, id, version).await
928 }
929
930 pub async fn snapshot_at_version(&self, version: u64) -> Result<Snapshot> {
932 self.ensure_schema_state_valid().await?;
933 self.coordinator
934 .read()
935 .await
936 .snapshot_at_version(version)
937 .await
938 }
939
940 pub async fn export_jsonl(
941 &self,
942 branch: &str,
943 type_names: &[String],
944 table_keys: &[String],
945 ) -> Result<String> {
946 export::export_jsonl(self, branch, type_names, table_keys).await
947 }
948
949 pub async fn export_jsonl_to_writer<W: Write>(
950 &self,
951 branch: &str,
952 type_names: &[String],
953 table_keys: &[String],
954 writer: &mut W,
955 ) -> Result<()> {
956 export::export_jsonl_to_writer(self, branch, type_names, table_keys, writer).await
957 }
958
959 pub async fn graph_index(&self) -> Result<Arc<crate::graph_index::GraphIndex>> {
963 table_ops::graph_index(self).await
964 }
965
966 pub(crate) async fn graph_index_for_resolved(
967 &self,
968 resolved: &ResolvedTarget,
969 ) -> Result<Arc<crate::graph_index::GraphIndex>> {
970 table_ops::graph_index_for_resolved(self, resolved).await
971 }
972
973 pub async fn ensure_indices(&self) -> Result<()> {
986 table_ops::ensure_indices(self).await
987 }
988
989 pub async fn ensure_indices_on(&self, branch: &str) -> Result<()> {
990 table_ops::ensure_indices_on(self, branch).await
991 }
992
993 #[cfg(feature = "failpoints")]
994 #[doc(hidden)]
995 pub async fn failpoint_publish_table_head_without_index_rebuild_for_test(
996 &mut self,
997 branch: &str,
998 table_key: &str,
999 table_branch: Option<&str>,
1000 ) -> Result<u64> {
1001 table_ops::failpoint_publish_table_head_without_index_rebuild_for_test(
1002 self,
1003 branch,
1004 table_key,
1005 table_branch,
1006 )
1007 .await
1008 }
1009
1010 pub async fn optimize(&self) -> Result<Vec<optimize::TableOptimizeStats>> {
1013 optimize::optimize_all_tables(self).await
1014 }
1015
1016 pub async fn repair(&self, options: repair::RepairOptions) -> Result<repair::RepairStats> {
1020 repair::repair_all_tables(self, options).await
1021 }
1022
1023 pub async fn cleanup(
1027 &mut self,
1028 options: optimize::CleanupPolicyOptions,
1029 ) -> Result<Vec<optimize::TableCleanupStats>> {
1030 optimize::cleanup_all_tables(self, options).await
1031 }
1032
1033 pub async fn read_blob(&self, type_name: &str, id: &str, property: &str) -> Result<BlobFile> {
1043 self.ensure_schema_state_valid().await?;
1044 let catalog = self.catalog();
1045 let node_type = catalog
1046 .node_types
1047 .get(type_name)
1048 .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
1049 if !node_type.blob_properties.contains(property) {
1050 return Err(OmniError::manifest(format!(
1051 "property '{}' on type '{}' is not a Blob",
1052 property, type_name
1053 )));
1054 }
1055
1056 let snapshot = self.snapshot().await;
1057 let table_key = format!("node:{}", type_name);
1058 let ds = snapshot.open(&table_key).await?;
1059
1060 let filter_sql = format!("id = '{}'", id.replace('\'', "''"));
1061 let row_id = self
1062 .table_store
1063 .first_row_id_for_filter(&ds, &filter_sql)
1064 .await?
1065 .ok_or_else(|| {
1066 OmniError::manifest(format!("no {} with id '{}' found", type_name, id))
1067 })?;
1068
1069 let ds = Arc::new(ds);
1071 let mut blobs = ds
1072 .take_blobs(&[row_id], property)
1073 .await
1074 .map_err(|e| OmniError::Lance(e.to_string()))?;
1075
1076 blobs.pop().ok_or_else(|| {
1077 OmniError::manifest(format!(
1078 "blob '{}' on {} '{}' returned no data",
1079 property, type_name, id
1080 ))
1081 })
1082 }
1083
1084 pub(crate) async fn active_branch(&self) -> Option<String> {
1085 self.coordinator
1086 .read()
1087 .await
1088 .current_branch()
1089 .map(str::to_string)
1090 }
1091
1092 async fn ensure_branch_delete_safe(&self, branch: &str, branches: &[String]) -> Result<()> {
1093 let descendants = self
1094 .coordinator
1095 .read()
1096 .await
1097 .branch_descendants(branch)
1098 .await?;
1099 if let Some(descendant) = descendants.first() {
1100 return Err(OmniError::manifest_conflict(format!(
1101 "cannot delete branch '{}' because descendant branch '{}' still depends on it",
1102 branch, descendant
1103 )));
1104 }
1105
1106 for other_branch in branches
1107 .iter()
1108 .filter(|candidate| candidate.as_str() != branch)
1109 {
1110 let snapshot = self
1111 .snapshot_of(ReadTarget::branch(other_branch.as_str()))
1112 .await?;
1113 if snapshot
1114 .entries()
1115 .any(|entry| entry.table_branch.as_deref() == Some(branch))
1116 {
1117 return Err(OmniError::manifest_conflict(format!(
1118 "cannot delete branch '{}' because branch '{}' still depends on it",
1119 branch, other_branch
1120 )));
1121 }
1122 }
1123
1124 Ok(())
1125 }
1126
1127 async fn cleanup_deleted_branch_tables(&self, branch: &str, owned_tables: &[(String, String)]) {
1135 let mut seen_paths = HashSet::new();
1136 let mut cleanup_targets = owned_tables
1137 .iter()
1138 .filter(|(_, table_path)| seen_paths.insert(table_path.clone()))
1139 .cloned()
1140 .collect::<Vec<_>>();
1141 cleanup_targets.sort_by(|left, right| left.0.cmp(&right.0));
1142
1143 for (table_key, table_path) in cleanup_targets {
1144 let dataset_uri = self.table_store.dataset_uri(&table_path);
1145 let outcome = match crate::failpoints::maybe_fail("branch_delete.before_table_cleanup")
1146 {
1147 Ok(()) => self.table_store.force_delete_branch(&dataset_uri, branch).await,
1148 Err(injected) => Err(injected),
1149 };
1150 if let Err(err) = outcome {
1151 tracing::warn!(
1152 target: "omnigraph::branch_delete::cleanup",
1153 branch = %branch,
1154 table = %table_key,
1155 error = %err,
1156 "best-effort fork reclaim failed; cleanup will reconcile the orphan",
1157 );
1158 }
1159 }
1160 }
1161
1162 async fn delete_branch_storage_only(&self, branch: &str) -> Result<()> {
1163 let active = self
1164 .coordinator
1165 .read()
1166 .await
1167 .current_branch()
1168 .map(str::to_string);
1169 if active.as_deref() == Some(branch) {
1170 return Err(OmniError::manifest_conflict(format!(
1171 "cannot delete currently active branch '{}'",
1172 branch
1173 )));
1174 }
1175
1176 let branch_snapshot = self.snapshot_of(ReadTarget::branch(branch)).await?;
1177 let owned_tables = branch_snapshot
1178 .entries()
1179 .filter(|entry| entry.table_branch.as_deref() == Some(branch))
1180 .map(|entry| (entry.table_key.clone(), entry.table_path.clone()))
1181 .collect::<Vec<_>>();
1182
1183 self.coordinator.write().await.branch_delete(branch).await?;
1185 self.cleanup_deleted_branch_tables(branch, &owned_tables)
1187 .await;
1188 Ok(())
1189 }
1190
1191 pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
1192 normalize_branch_name(branch)
1193 }
1194
1195 pub(crate) async fn head_commit_id_for_branch(
1196 &self,
1197 branch: Option<&str>,
1198 ) -> Result<Option<String>> {
1199 let mut coordinator = self.open_coordinator_for_branch(branch).await?;
1200 coordinator.ensure_commit_graph_initialized().await?;
1201 coordinator
1202 .head_commit_id()
1203 .await
1204 .map(|id| id.map(|snapshot_id| snapshot_id.as_str().to_string()))
1205 }
1206
1207 pub async fn branch_create(&self, name: &str) -> Result<()> {
1208 self.branch_create_as(name, None).await
1209 }
1210
1211 pub async fn branch_create_as(&self, name: &str, actor: Option<&str>) -> Result<()> {
1219 self.enforce(
1220 omnigraph_policy::PolicyAction::BranchCreate,
1221 &omnigraph_policy::ResourceScope::TargetBranch(name.to_string()),
1222 actor,
1223 )?;
1224 self.ensure_schema_state_valid().await?;
1225 self.ensure_schema_apply_idle("branch_create").await?;
1226 ensure_public_branch_ref(name, "branch_create")?;
1227 self.coordinator.write().await.branch_create(name).await
1228 }
1229
1230 pub async fn branch_create_from(&self, from: impl Into<ReadTarget>, name: &str) -> Result<()> {
1231 self.branch_create_from_as(from, name, None).await
1232 }
1233
1234 pub async fn branch_create_from_as(
1246 &self,
1247 from: impl Into<ReadTarget>,
1248 name: &str,
1249 actor: Option<&str>,
1250 ) -> Result<()> {
1251 let target = from.into();
1252 let source_branch = match &target {
1253 ReadTarget::Branch(b) => b.clone(),
1254 _ => "<snapshot>".to_string(),
1255 };
1256 self.enforce(
1257 omnigraph_policy::PolicyAction::BranchCreate,
1258 &omnigraph_policy::ResourceScope::BranchTransition {
1259 source: source_branch,
1260 target: name.to_string(),
1261 },
1262 actor,
1263 )?;
1264 self.ensure_schema_apply_idle("branch_create_from").await?;
1265 self.branch_create_from_impl(target, name, false).await
1266 }
1267
1268 async fn branch_create_from_impl(
1269 &self,
1270 from: impl Into<ReadTarget>,
1271 name: &str,
1272 allow_internal_refs: bool,
1273 ) -> Result<()> {
1274 let target = from.into();
1275 let ReadTarget::Branch(branch_name) = target else {
1276 return Err(OmniError::manifest(
1277 "branch creation from pinned snapshots is not supported yet".to_string(),
1278 ));
1279 };
1280 if !allow_internal_refs {
1281 ensure_public_branch_ref(&branch_name, "branch_create_from")?;
1282 ensure_public_branch_ref(name, "branch_create_from")?;
1283 }
1284 let branch = normalize_branch_name(&branch_name)?;
1285 let mut source_coord = self.open_coordinator_for_branch(branch.as_deref()).await?;
1302 source_coord.branch_create(name).await
1303 }
1304
1305 pub async fn branch_list(&self) -> Result<Vec<String>> {
1306 self.ensure_schema_state_valid().await?;
1307 self.coordinator.read().await.branch_list().await
1308 }
1309
1310 pub async fn branch_delete(&self, name: &str) -> Result<()> {
1311 self.branch_delete_as(name, None).await
1312 }
1313
1314 pub async fn branch_delete_as(&self, name: &str, actor: Option<&str>) -> Result<()> {
1322 self.enforce(
1323 omnigraph_policy::PolicyAction::BranchDelete,
1324 &omnigraph_policy::ResourceScope::TargetBranch(name.to_string()),
1325 actor,
1326 )?;
1327 self.ensure_schema_state_valid().await?;
1328 self.ensure_schema_apply_idle("branch_delete").await?;
1329 ensure_public_branch_ref(name, "branch_delete")?;
1330 self.refresh().await?;
1331 let branch = normalize_branch_name(name)?
1332 .ok_or_else(|| OmniError::manifest("cannot delete branch 'main'".to_string()))?;
1333 let branches = self.coordinator.read().await.branch_list().await?;
1334 if !branches.iter().any(|candidate| candidate == &branch) {
1335 return Err(OmniError::manifest_not_found(format!(
1336 "branch '{}' not found",
1337 branch
1338 )));
1339 }
1340
1341 self.ensure_branch_delete_safe(&branch, &branches).await?;
1342 self.delete_branch_storage_only(&branch).await
1343 }
1344
1345 pub async fn get_commit(&self, commit_id: &str) -> Result<GraphCommit> {
1346 self.ensure_schema_state_valid().await?;
1347 self.coordinator
1348 .read()
1349 .await
1350 .resolve_commit(&SnapshotId::new(commit_id))
1351 .await
1352 }
1353
1354 pub async fn list_commits(&self, branch: Option<&str>) -> Result<Vec<GraphCommit>> {
1355 self.ensure_schema_state_valid().await?;
1356 let branch = match branch {
1357 Some(branch) => normalize_branch_name(branch)?,
1358 None => None,
1359 };
1360 let coordinator = self.open_coordinator_for_branch(branch.as_deref()).await?;
1361 coordinator.list_commits().await
1362 }
1363
1364 pub(crate) async fn open_for_mutation(
1370 &self,
1371 table_key: &str,
1372 op_kind: crate::db::MutationOpKind,
1373 ) -> Result<(Dataset, String, Option<String>)> {
1374 table_ops::open_for_mutation(self, table_key, op_kind).await
1375 }
1376
1377 pub(crate) async fn open_for_mutation_on_branch(
1378 &self,
1379 branch: Option<&str>,
1380 table_key: &str,
1381 op_kind: crate::db::MutationOpKind,
1382 ) -> Result<(Dataset, String, Option<String>)> {
1383 table_ops::open_for_mutation_on_branch(self, branch, table_key, op_kind).await
1384 }
1385
1386 pub(crate) async fn fork_dataset_from_entry_state(
1387 &self,
1388 table_key: &str,
1389 full_path: &str,
1390 source_branch: Option<&str>,
1391 source_version: u64,
1392 active_branch: &str,
1393 ) -> Result<Dataset> {
1394 table_ops::fork_dataset_from_entry_state(
1395 self,
1396 table_key,
1397 full_path,
1398 source_branch,
1399 source_version,
1400 active_branch,
1401 )
1402 .await
1403 }
1404
1405 pub(crate) async fn reopen_for_mutation(
1406 &self,
1407 table_key: &str,
1408 full_path: &str,
1409 table_branch: Option<&str>,
1410 expected_version: u64,
1411 op_kind: crate::db::MutationOpKind,
1412 ) -> Result<Dataset> {
1413 table_ops::reopen_for_mutation(
1414 self,
1415 table_key,
1416 full_path,
1417 table_branch,
1418 expected_version,
1419 op_kind,
1420 )
1421 .await
1422 }
1423
1424 pub(crate) async fn open_dataset_at_state(
1425 &self,
1426 table_path: &str,
1427 table_branch: Option<&str>,
1428 table_version: u64,
1429 ) -> Result<Dataset> {
1430 table_ops::open_dataset_at_state(self, table_path, table_branch, table_version).await
1431 }
1432
1433 pub(crate) async fn build_indices_on_dataset(
1434 &self,
1435 table_key: &str,
1436 ds: &mut Dataset,
1437 ) -> Result<()> {
1438 table_ops::build_indices_on_dataset(self, table_key, ds).await
1439 }
1440
1441 pub(crate) async fn build_indices_on_dataset_for_catalog(
1442 &self,
1443 catalog: &Catalog,
1444 table_key: &str,
1445 ds: &mut Dataset,
1446 ) -> Result<()> {
1447 table_ops::build_indices_on_dataset_for_catalog(self, catalog, table_key, ds).await
1448 }
1449
1450 #[cfg(test)]
1453 pub(crate) async fn commit_updates(
1454 &mut self,
1455 updates: &[crate::db::SubTableUpdate],
1456 ) -> Result<u64> {
1457 table_ops::commit_updates(self, updates).await
1458 }
1459
1460 pub(crate) async fn commit_manifest_updates(
1461 &self,
1462 updates: &[crate::db::SubTableUpdate],
1463 ) -> Result<u64> {
1464 table_ops::commit_manifest_updates(self, updates).await
1465 }
1466
1467 pub(crate) async fn record_merge_commit(
1468 &self,
1469 manifest_version: u64,
1470 parent_commit_id: &str,
1471 merged_parent_commit_id: &str,
1472 actor_id: Option<&str>,
1473 ) -> Result<String> {
1474 table_ops::record_merge_commit(
1475 self,
1476 manifest_version,
1477 parent_commit_id,
1478 merged_parent_commit_id,
1479 actor_id,
1480 )
1481 .await
1482 }
1483
1484 pub(crate) async fn commit_updates_on_branch_with_expected(
1485 &self,
1486 branch: Option<&str>,
1487 updates: &[crate::db::SubTableUpdate],
1488 expected_table_versions: &std::collections::HashMap<String, u64>,
1489 actor_id: Option<&str>,
1490 ) -> Result<u64> {
1491 table_ops::commit_updates_on_branch_with_expected(
1492 self,
1493 branch,
1494 updates,
1495 expected_table_versions,
1496 actor_id,
1497 )
1498 .await
1499 }
1500
1501 pub(crate) async fn ensure_commit_graph_initialized(&self) -> Result<()> {
1502 table_ops::ensure_commit_graph_initialized(self).await
1503 }
1504
1505 pub(crate) async fn invalidate_graph_index(&self) {
1507 table_ops::invalidate_graph_index(self).await
1508 }
1509}
1510
1511pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
1512 let branch = branch.trim();
1513 if branch.is_empty() {
1514 return Err(OmniError::manifest(
1515 "branch name cannot be empty".to_string(),
1516 ));
1517 }
1518 if branch == "main" {
1519 return Ok(None);
1520 }
1521 Ok(Some(branch.to_string()))
1522}
1523
1524pub(crate) fn ensure_public_branch_ref(branch: &str, operation: &str) -> Result<()> {
1525 if is_internal_system_branch(branch) {
1526 return Err(OmniError::manifest(format!(
1527 "{} does not allow internal system ref '{}'",
1528 operation, branch
1529 )));
1530 }
1531 Ok(())
1532}
1533
1534fn concat_or_empty_batches(schema: Arc<Schema>, batches: Vec<RecordBatch>) -> Result<RecordBatch> {
1535 if batches.is_empty() {
1536 return Ok(RecordBatch::new_empty(schema));
1537 }
1538 if batches.len() == 1 {
1539 return Ok(batches.into_iter().next().unwrap());
1540 }
1541 let batch_schema = batches[0].schema();
1542 arrow_select::concat::concat_batches(&batch_schema, &batches)
1543 .map_err(|e| OmniError::Lance(e.to_string()))
1544}
1545
1546fn blob_properties_for_table_key<'a>(
1547 catalog: &'a Catalog,
1548 table_key: &str,
1549) -> Result<&'a std::collections::HashSet<String>> {
1550 if let Some(type_name) = table_key.strip_prefix("node:") {
1551 return catalog
1552 .node_types
1553 .get(type_name)
1554 .map(|node_type| &node_type.blob_properties)
1555 .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)));
1556 }
1557 if let Some(type_name) = table_key.strip_prefix("edge:") {
1558 return catalog
1559 .edge_types
1560 .get(type_name)
1561 .map(|edge_type| &edge_type.blob_properties)
1562 .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)));
1563 }
1564 Err(OmniError::manifest(format!(
1565 "invalid table key '{}'",
1566 table_key
1567 )))
1568}
1569
1570fn blob_description_is_null(descriptions: &StructArray, row: usize) -> Result<bool> {
1571 if descriptions.is_null(row) {
1572 return Ok(true);
1573 }
1574
1575 let kind = descriptions
1576 .column_by_name("kind")
1577 .and_then(|col| col.as_any().downcast_ref::<UInt32Array>())
1578 .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row) as u8))
1579 .or_else(|| {
1580 descriptions
1581 .column_by_name("kind")
1582 .and_then(|col| col.as_any().downcast_ref::<arrow_array::UInt8Array>())
1583 .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)))
1584 });
1585 let position = descriptions
1586 .column_by_name("position")
1587 .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
1588 .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1589 let size = descriptions
1590 .column_by_name("size")
1591 .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
1592 .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1593 let blob_uri = descriptions
1594 .column_by_name("blob_uri")
1595 .and_then(|col| col.as_any().downcast_ref::<StringArray>())
1596 .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1597
1598 let Some(kind) = kind else {
1599 return Ok(true);
1600 };
1601 let kind = BlobKind::try_from(kind).map_err(|e| OmniError::Lance(e.to_string()))?;
1602 if kind != BlobKind::Inline {
1603 return Ok(false);
1604 }
1605
1606 Ok(position.unwrap_or(0) == 0 && size.unwrap_or(0) == 0 && blob_uri.unwrap_or("").is_empty())
1607}
1608
1609fn fixup_blob_schemas(catalog: &mut Catalog) {
1615 for node_type in catalog.node_types.values_mut() {
1616 if node_type.blob_properties.is_empty() {
1617 continue;
1618 }
1619 let fields: Vec<Field> = node_type
1620 .arrow_schema
1621 .fields()
1622 .iter()
1623 .map(|f| {
1624 if node_type.blob_properties.contains(f.name()) {
1625 blob_field(f.name(), f.is_nullable())
1626 } else {
1627 f.as_ref().clone()
1628 }
1629 })
1630 .collect();
1631 node_type.arrow_schema = Arc::new(Schema::new(fields));
1632 }
1633 for edge_type in catalog.edge_types.values_mut() {
1634 if edge_type.blob_properties.is_empty() {
1635 continue;
1636 }
1637 let fields: Vec<Field> = edge_type
1638 .arrow_schema
1639 .fields()
1640 .iter()
1641 .map(|f| {
1642 if edge_type.blob_properties.contains(f.name()) {
1643 blob_field(f.name(), f.is_nullable())
1644 } else {
1645 f.as_ref().clone()
1646 }
1647 })
1648 .collect();
1649 edge_type.arrow_schema = Arc::new(Schema::new(fields));
1650 }
1651}
1652
1653fn read_schema_ir_from_source(schema_source: &str) -> Result<SchemaIR> {
1654 let schema_ast = parse_schema(schema_source)?;
1655 build_schema_ir(&schema_ast).map_err(|err| OmniError::manifest(err.to_string()))
1656}
1657
1658async fn init_storage_phase(
1673 root: &str,
1674 schema_source: &str,
1675 schema_ir: &SchemaIR,
1676 catalog: &Catalog,
1677 storage: &Arc<dyn StorageAdapter>,
1678 write_schema_pg: bool,
1679) -> Result<GraphCoordinator> {
1680 if write_schema_pg {
1681 let schema_path = join_uri(root, SCHEMA_SOURCE_FILENAME);
1682 storage.write_text(&schema_path, schema_source).await?;
1683 crate::failpoints::maybe_fail("init.after_schema_pg_written")?;
1684 }
1685
1686 write_schema_contract(root, storage.as_ref(), schema_ir).await?;
1687 crate::failpoints::maybe_fail("init.after_schema_contract_written")?;
1688
1689 let coordinator = GraphCoordinator::init(root, catalog, Arc::clone(storage)).await?;
1690 crate::failpoints::maybe_fail("init.after_coordinator_init")?;
1691
1692 Ok(coordinator)
1693}
1694
1695async fn best_effort_cleanup_init_artifacts(root: &str, storage: &dyn StorageAdapter) {
1707 for uri in [
1708 schema_source_uri(root),
1709 schema_ir_uri(root),
1710 schema_state_uri(root),
1711 ] {
1712 if let Err(err) = storage.delete(&uri).await {
1713 tracing::warn!(
1714 target: "omnigraph::init::cleanup",
1715 uri = %uri,
1716 error = %err,
1717 "init failed; best-effort cleanup could not delete artifact",
1718 );
1719 }
1720 }
1721}
1722
1723fn schema_table_key(type_kind: SchemaTypeKind, name: &str) -> String {
1724 match type_kind {
1725 SchemaTypeKind::Node => format!("node:{}", name),
1726 SchemaTypeKind::Edge => format!("edge:{}", name),
1727 SchemaTypeKind::Interface => unreachable!("interfaces do not map to tables"),
1728 }
1729}
1730
1731fn schema_for_table_key(catalog: &Catalog, table_key: &str) -> Result<Arc<Schema>> {
1732 if let Some(type_name) = table_key.strip_prefix("node:") {
1733 let node_type: &NodeType = catalog
1734 .node_types
1735 .get(type_name)
1736 .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
1737 return Ok(node_type.arrow_schema.clone());
1738 }
1739 if let Some(type_name) = table_key.strip_prefix("edge:") {
1740 let edge_type: &EdgeType = catalog
1741 .edge_types
1742 .get(type_name)
1743 .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)))?;
1744 return Ok(edge_type.arrow_schema.clone());
1745 }
1746 Err(OmniError::manifest(format!(
1747 "invalid table key '{}'",
1748 table_key
1749 )))
1750}
1751
1752fn record_batch_row_to_json(batch: &RecordBatch, row: usize) -> Result<serde_json::Value> {
1753 let mut obj = serde_json::Map::new();
1754 for (i, field) in batch.schema().fields().iter().enumerate() {
1755 obj.insert(
1756 field.name().clone(),
1757 json_value_from_array(batch.column(i).as_ref(), row)?,
1758 );
1759 }
1760 Ok(serde_json::Value::Object(obj))
1761}
1762
1763fn json_value_from_array(array: &dyn Array, row: usize) -> Result<serde_json::Value> {
1764 if array.is_null(row) {
1765 return Ok(serde_json::Value::Null);
1766 }
1767
1768 match array.data_type() {
1769 DataType::Utf8 => Ok(serde_json::Value::String(
1770 array
1771 .as_any()
1772 .downcast_ref::<StringArray>()
1773 .ok_or_else(|| OmniError::Lance("expected StringArray".to_string()))?
1774 .value(row)
1775 .to_string(),
1776 )),
1777 DataType::LargeUtf8 => Ok(serde_json::Value::String(
1778 array
1779 .as_any()
1780 .downcast_ref::<LargeStringArray>()
1781 .ok_or_else(|| OmniError::Lance("expected LargeStringArray".to_string()))?
1782 .value(row)
1783 .to_string(),
1784 )),
1785 DataType::Boolean => Ok(serde_json::Value::Bool(
1786 array
1787 .as_any()
1788 .downcast_ref::<BooleanArray>()
1789 .ok_or_else(|| OmniError::Lance("expected BooleanArray".to_string()))?
1790 .value(row),
1791 )),
1792 DataType::Int32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1793 array
1794 .as_any()
1795 .downcast_ref::<Int32Array>()
1796 .ok_or_else(|| OmniError::Lance("expected Int32Array".to_string()))?
1797 .value(row),
1798 ))),
1799 DataType::Int64 => Ok(serde_json::Value::Number(serde_json::Number::from(
1800 array
1801 .as_any()
1802 .downcast_ref::<Int64Array>()
1803 .ok_or_else(|| OmniError::Lance("expected Int64Array".to_string()))?
1804 .value(row),
1805 ))),
1806 DataType::UInt32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1807 array
1808 .as_any()
1809 .downcast_ref::<UInt32Array>()
1810 .ok_or_else(|| OmniError::Lance("expected UInt32Array".to_string()))?
1811 .value(row),
1812 ))),
1813 DataType::UInt64 => Ok(serde_json::Value::Number(serde_json::Number::from(
1814 array
1815 .as_any()
1816 .downcast_ref::<UInt64Array>()
1817 .ok_or_else(|| OmniError::Lance("expected UInt64Array".to_string()))?
1818 .value(row),
1819 ))),
1820 DataType::Float32 => {
1821 let value = array
1822 .as_any()
1823 .downcast_ref::<Float32Array>()
1824 .ok_or_else(|| OmniError::Lance("expected Float32Array".to_string()))?
1825 .value(row) as f64;
1826 Ok(serde_json::Value::Number(
1827 serde_json::Number::from_f64(value).ok_or_else(|| {
1828 OmniError::Lance(format!("cannot encode f32 value '{}' as JSON", value))
1829 })?,
1830 ))
1831 }
1832 DataType::Float64 => {
1833 let value = array
1834 .as_any()
1835 .downcast_ref::<Float64Array>()
1836 .ok_or_else(|| OmniError::Lance("expected Float64Array".to_string()))?
1837 .value(row);
1838 Ok(serde_json::Value::Number(
1839 serde_json::Number::from_f64(value).ok_or_else(|| {
1840 OmniError::Lance(format!("cannot encode f64 value '{}' as JSON", value))
1841 })?,
1842 ))
1843 }
1844 DataType::Date32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1845 array
1846 .as_any()
1847 .downcast_ref::<Date32Array>()
1848 .ok_or_else(|| OmniError::Lance("expected Date32Array".to_string()))?
1849 .value(row),
1850 ))),
1851 DataType::Binary => Ok(serde_json::Value::String(base64::Engine::encode(
1852 &base64::engine::general_purpose::STANDARD,
1853 array
1854 .as_any()
1855 .downcast_ref::<BinaryArray>()
1856 .ok_or_else(|| OmniError::Lance("expected BinaryArray".to_string()))?
1857 .value(row),
1858 ))),
1859 DataType::LargeBinary => Ok(serde_json::Value::String(base64::Engine::encode(
1860 &base64::engine::general_purpose::STANDARD,
1861 array
1862 .as_any()
1863 .downcast_ref::<LargeBinaryArray>()
1864 .ok_or_else(|| OmniError::Lance("expected LargeBinaryArray".to_string()))?
1865 .value(row),
1866 ))),
1867 DataType::List(_) => {
1868 let list = array
1869 .as_any()
1870 .downcast_ref::<ListArray>()
1871 .ok_or_else(|| OmniError::Lance("expected ListArray".to_string()))?;
1872 let values = list.value(row);
1873 let mut out = Vec::with_capacity(values.len());
1874 for idx in 0..values.len() {
1875 out.push(json_value_from_array(values.as_ref(), idx)?);
1876 }
1877 Ok(serde_json::Value::Array(out))
1878 }
1879 DataType::LargeList(_) => {
1880 let list = array
1881 .as_any()
1882 .downcast_ref::<LargeListArray>()
1883 .ok_or_else(|| OmniError::Lance("expected LargeListArray".to_string()))?;
1884 let values = list.value(row);
1885 let mut out = Vec::with_capacity(values.len());
1886 for idx in 0..values.len() {
1887 out.push(json_value_from_array(values.as_ref(), idx)?);
1888 }
1889 Ok(serde_json::Value::Array(out))
1890 }
1891 DataType::FixedSizeList(_, _) => {
1892 let list = array
1893 .as_any()
1894 .downcast_ref::<FixedSizeListArray>()
1895 .ok_or_else(|| OmniError::Lance("expected FixedSizeListArray".to_string()))?;
1896 let values = list.value(row);
1897 let mut out = Vec::with_capacity(values.len());
1898 for idx in 0..values.len() {
1899 out.push(json_value_from_array(values.as_ref(), idx)?);
1900 }
1901 Ok(serde_json::Value::Array(out))
1902 }
1903 DataType::Struct(fields) => {
1904 let struct_array = array
1905 .as_any()
1906 .downcast_ref::<StructArray>()
1907 .ok_or_else(|| OmniError::Lance("expected StructArray".to_string()))?;
1908 let mut obj = serde_json::Map::new();
1909 for (field_idx, field) in fields.iter().enumerate() {
1910 obj.insert(
1911 field.name().clone(),
1912 json_value_from_array(struct_array.column(field_idx).as_ref(), row)?,
1913 );
1914 }
1915 Ok(serde_json::Value::Object(obj))
1916 }
1917 _ => {
1918 let value = arrow_cast::display::array_value_to_string(array, row)
1919 .map_err(|e| OmniError::Lance(e.to_string()))?;
1920 Ok(serde_json::Value::String(value))
1921 }
1922 }
1923}
1924
#[cfg(test)]
mod tests {
    // Tests for graph init/open storage routing, commit helpers and schema
    // apply. Every test creates its graph under a fresh `tempfile::tempdir()`.
    use super::*;
    use crate::db::manifest::ManifestCoordinator;
    use async_trait::async_trait;
    use serde_json::Value;
    use std::sync::{Arc, Mutex};

    use crate::storage::{LocalStorageAdapter, StorageAdapter, join_uri};

    /// Baseline schema: `Person` (key `name`, optional `age`), `Company`
    /// (key `name`), `Knows` (Person -> Person, optional `since`) and
    /// `WorksAt` (Person -> Company). Several tests derive a desired schema
    /// from it with `str::replace`, so the exact text below matters.
    const TEST_SCHEMA: &str = r#"
node Person {
    name: String @key
    age: I32?
}
node Company {
    name: String @key
}
edge Knows: Person -> Person {
    since: Date?
}
edge WorksAt: Person -> Company
"#;

    /// Storage adapter that forwards every call to `LocalStorageAdapter` and
    /// records the URIs passed to reads, writes (including
    /// `write_text_if_absent`), existence checks, renames and deletes.
    /// `list_dir` is forwarded without being recorded.
    #[derive(Debug, Default)]
    struct RecordingStorageAdapter {
        inner: LocalStorageAdapter,
        reads: Mutex<Vec<String>>,
        writes: Mutex<Vec<String>>,
        exists_checks: Mutex<Vec<String>>,
        renames: Mutex<Vec<(String, String)>>,
        deletes: Mutex<Vec<String>>,
    }

    impl RecordingStorageAdapter {
        /// Copy of every URI read so far, in call order.
        fn reads(&self) -> Vec<String> {
            self.reads.lock().unwrap().clone()
        }

        /// Copy of every URI written so far, in call order.
        fn writes(&self) -> Vec<String> {
            self.writes.lock().unwrap().clone()
        }

        /// Copy of every URI passed to `exists` so far, in call order.
        fn exists_checks(&self) -> Vec<String> {
            self.exists_checks.lock().unwrap().clone()
        }
    }

    // Each method records its URI(s) first (the std mutex guard is dropped at
    // the end of that statement, before any await) and then delegates.
    #[async_trait]
    impl StorageAdapter for RecordingStorageAdapter {
        async fn read_text(&self, uri: &str) -> Result<String> {
            self.reads.lock().unwrap().push(uri.to_string());
            self.inner.read_text(uri).await
        }

        async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
            self.writes.lock().unwrap().push(uri.to_string());
            self.inner.write_text(uri, contents).await
        }

        async fn write_text_if_absent(&self, uri: &str, contents: &str) -> Result<bool> {
            self.writes.lock().unwrap().push(uri.to_string());
            self.inner.write_text_if_absent(uri, contents).await
        }

        async fn exists(&self, uri: &str) -> Result<bool> {
            self.exists_checks.lock().unwrap().push(uri.to_string());
            self.inner.exists(uri).await
        }

        async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
            self.renames
                .lock()
                .unwrap()
                .push((from_uri.to_string(), to_uri.to_string()));
            self.inner.rename_text(from_uri, to_uri).await
        }

        async fn delete(&self, uri: &str) -> Result<()> {
            self.deletes.lock().unwrap().push(uri.to_string());
            self.inner.delete(uri).await
        }

        async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
            self.inner.list_dir(dir_uri).await
        }
    }

    /// Storage adapter that delegates to `LocalStorageAdapter`, except that
    /// after checking whether `root`'s schema-state file exists it waits on a
    /// shared barrier. With a barrier of 2, two concurrent inits therefore both
    /// complete that existence check before either one continues.
    #[derive(Debug)]
    struct InitRaceStorageAdapter {
        inner: LocalStorageAdapter,
        root: String,
        barrier: Arc<tokio::sync::Barrier>,
    }

    #[async_trait]
    impl StorageAdapter for InitRaceStorageAdapter {
        async fn read_text(&self, uri: &str) -> Result<String> {
            self.inner.read_text(uri).await
        }

        async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
            self.inner.write_text(uri, contents).await
        }

        async fn write_text_if_absent(&self, uri: &str, contents: &str) -> Result<bool> {
            self.inner.write_text_if_absent(uri, contents).await
        }

        async fn exists(&self, uri: &str) -> Result<bool> {
            // The existence result is computed before the barrier, so both
            // racing callers act on what they saw before either proceeded.
            let exists = self.inner.exists(uri).await?;
            if uri == schema_state_uri(&self.root) {
                self.barrier.wait().await;
            }
            Ok(exists)
        }

        async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
            self.inner.rename_text(from_uri, to_uri).await
        }

        async fn delete(&self, uri: &str) -> Result<()> {
            self.inner.delete(uri).await
        }

        async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
            self.inner.list_dir(dir_uri).await
        }
    }

    /// Two inits race on the same root through `InitRaceStorageAdapter`.
    /// Exactly one must succeed, and the loser must not delete the winner's
    /// schema files.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn concurrent_strict_init_does_not_delete_winning_schema_files() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap().to_string();
        let root = normalize_root_uri(&uri).unwrap();
        let storage: Arc<dyn StorageAdapter> = Arc::new(InitRaceStorageAdapter {
            inner: LocalStorageAdapter,
            root,
            barrier: Arc::new(tokio::sync::Barrier::new(2)),
        });

        let left = Omnigraph::init_with_storage(
            &uri,
            TEST_SCHEMA,
            Arc::clone(&storage),
            InitOptions::default(),
        );
        let right = Omnigraph::init_with_storage(
            &uri,
            TEST_SCHEMA,
            Arc::clone(&storage),
            InitOptions::default(),
        );
        let (left, right) = tokio::join!(left, right);
        let ok_count = usize::from(left.is_ok()) + usize::from(right.is_ok());
        assert_eq!(ok_count, 1, "exactly one concurrent init should win");

        assert!(
            dir.path().join("_schema.pg").exists(),
            "winning init must leave _schema.pg in place"
        );
        assert!(
            dir.path().join("_schema.ir.json").exists(),
            "winning init must leave _schema.ir.json in place"
        );
        assert!(
            dir.path().join("__schema_state.json").exists(),
            "winning init must leave __schema_state.json in place"
        );
    }

    /// Init must write, and open must read and existence-check, the graph
    /// metadata files through the supplied storage adapter.
    #[tokio::test]
    async fn test_init_and_open_route_graph_metadata_through_storage_adapter() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let adapter = Arc::new(RecordingStorageAdapter::default());

        Omnigraph::init_with_storage(uri, TEST_SCHEMA, adapter.clone(), InitOptions::default())
            .await
            .unwrap();
        assert!(adapter.writes().contains(&join_uri(uri, "_schema.pg")));
        assert!(adapter.writes().contains(&join_uri(uri, "_schema.ir.json")));
        assert!(
            adapter
                .writes()
                .contains(&join_uri(uri, "__schema_state.json"))
        );

        Omnigraph::open_with_storage(uri, adapter.clone())
            .await
            .unwrap();
        assert!(adapter.reads().contains(&join_uri(uri, "_schema.pg")));
        assert!(adapter.reads().contains(&join_uri(uri, "_schema.ir.json")));
        assert!(
            adapter
                .reads()
                .contains(&join_uri(uri, "__schema_state.json"))
        );
        assert!(
            adapter
                .exists_checks()
                .contains(&join_uri(uri, "_schema.ir.json"))
        );
        assert!(
            adapter
                .exists_checks()
                .contains(&join_uri(uri, "__schema_state.json"))
        );
        assert!(
            adapter
                .exists_checks()
                .contains(&join_uri(uri, "_graph_commits.lance"))
        );
    }

    /// Scans `table_key` at the current snapshot and returns every row as a
    /// JSON object (via `record_batch_row_to_json`). Panics on any error.
    async fn table_rows_json(db: &Omnigraph, table_key: &str) -> Vec<Value> {
        let snapshot = db.snapshot().await;
        let ds = snapshot.open(table_key).await.unwrap();
        let batches = db.table_store().scan_batches(&ds).await.unwrap();
        batches
            .into_iter()
            .flat_map(|batch| {
                (0..batch.num_rows())
                    .map(|row| record_batch_row_to_json(&batch, row).unwrap())
                    .collect::<Vec<_>>()
            })
            .collect()
    }

    /// Appends one `Person` row (`id` and `name` both set to `name`, `age` as
    /// given, every other column null) and commits it through
    /// `commit_updates`. Panics on any error.
    async fn seed_person_row(db: &mut Omnigraph, name: &str, age: Option<i32>) {
        let (mut ds, full_path, table_branch) = db
            .open_for_mutation("node:Person", crate::db::MutationOpKind::Insert)
            .await
            .unwrap();
        // Build the batch against the dataset's actual schema so any extra
        // columns are filled with typed nulls.
        let schema: Arc<Schema> = Arc::new(ds.schema().into());
        let columns: Vec<Arc<dyn Array>> = schema
            .fields()
            .iter()
            .map(|field| match field.name().as_str() {
                "id" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
                "name" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
                "age" => Arc::new(Int32Array::from(vec![age])) as Arc<dyn Array>,
                _ => new_null_array(field.data_type(), 1),
            })
            .collect();
        let batch = RecordBatch::try_new(Arc::clone(&schema), columns).unwrap();
        let state = db
            .table_store()
            .append_batch(&full_path, &mut ds, batch)
            .await
            .unwrap();
        db.commit_updates(&[crate::db::SubTableUpdate {
            table_key: "node:Person".to_string(),
            table_version: state.version,
            table_branch,
            row_count: state.row_count,
            version_metadata: state.version_metadata,
        }])
        .await
        .unwrap();
    }

    /// Adding a nullable property keeps existing rows; the new column reads
    /// back as null after reopening.
    #[tokio::test]
    async fn test_apply_schema_adds_nullable_property_and_preserves_rows() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;

        let desired = TEST_SCHEMA.replace(
            "    age: I32?\n}",
            "    age: I32?\n    nickname: String?\n}",
        );
        let result = db.apply_schema(&desired).await.unwrap();
        assert!(result.applied);

        let reopened = Omnigraph::open(uri).await.unwrap();
        let rows = table_rows_json(&reopened, "node:Person").await;
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0]["name"], "Alice");
        assert_eq!(rows[0]["age"], 30);
        assert!(rows[0]["nickname"].is_null());
        assert!(
            reopened.catalog().node_types["Person"]
                .properties
                .contains_key("nickname")
        );
        assert!(dir.path().join("_schema.pg").exists());
    }

    /// `@rename_from` on a property moves existing values to the new name and
    /// the old column disappears.
    #[tokio::test]
    async fn test_apply_schema_renames_property_and_preserves_values() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;

        let desired = TEST_SCHEMA.replace(
            "    age: I32?\n}",
            "    years: I32? @rename_from(\"age\")\n}",
        );
        db.apply_schema(&desired).await.unwrap();

        let reopened = Omnigraph::open(uri).await.unwrap();
        let rows = table_rows_json(&reopened, "node:Person").await;
        assert_eq!(rows[0]["name"], "Alice");
        assert_eq!(rows[0]["years"], 30);
        assert!(rows[0].get("age").is_none());
    }

    /// Renaming a node type swaps the table entry at head, while the manifest
    /// snapshot from before the apply still shows the old table key.
    #[tokio::test]
    async fn test_apply_schema_renames_type_and_preserves_historical_snapshot() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;
        let before_version = db.snapshot().await.version();

        let desired = TEST_SCHEMA
            .replace("node Person {\n", "node Human @rename_from(\"Person\") {\n")
            .replace("edge Knows: Person -> Person", "edge Knows: Human -> Human")
            .replace(
                "edge WorksAt: Person -> Company",
                "edge WorksAt: Human -> Company",
            );
        db.apply_schema(&desired).await.unwrap();

        let head = db.snapshot().await;
        assert!(head.entry("node:Person").is_none());
        assert!(head.entry("node:Human").is_some());
        let historical = ManifestCoordinator::snapshot_at(uri, None, before_version)
            .await
            .unwrap();
        assert!(historical.entry("node:Person").is_some());
        assert!(historical.entry("node:Human").is_none());
    }

    /// After a JSONL load no `__run__` branch remains, and a subsequent schema
    /// apply succeeds.
    #[tokio::test]
    async fn test_apply_schema_succeeds_after_load() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        crate::loader::load_jsonl(
            &mut db,
            r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#,
            crate::loader::LoadMode::Overwrite,
        )
        .await
        .unwrap();

        let all_branches = db.coordinator.read().await.all_branches().await.unwrap();
        assert!(
            !all_branches.iter().any(|b| b.starts_with("__run__")),
            "no __run__ branch should exist after publish, got: {:?}",
            all_branches
        );

        let desired = TEST_SCHEMA.replace(
            "    age: I32?\n}",
            "    age: I32?\n    nickname: String?\n}",
        );
        let result = db.apply_schema(&desired).await.unwrap();
        assert!(result.applied, "schema apply should have applied");
    }

    /// A leftover `__run__legacy` branch, with the manifest's
    /// `omnigraph:internal_schema_version` metadata set to "2", is swept when
    /// the graph is reopened and does not block a later schema apply.
    #[tokio::test]
    async fn legacy_run_branch_is_swept_on_open_and_does_not_block_schema_apply() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        db.branch_create("__run__legacy").await.unwrap();
        drop(db);
        {
            // Rewrite the manifest dataset's schema metadata directly
            // (presumably to mimic an older on-disk layout that triggers the
            // open-time migration — TODO confirm).
            let mut ds = lance::Dataset::open(&format!("{}/__manifest", uri))
                .await
                .unwrap();
            ds.update_schema_metadata([(
                "omnigraph:internal_schema_version".to_string(),
                Some("2".to_string()),
            )])
            .await
            .unwrap();
        }

        let db = Omnigraph::open(uri).await.unwrap();
        let branches = db.branch_list().await.unwrap();
        assert!(
            !branches.iter().any(|b| b.starts_with("__run__")),
            "open-time migration must sweep legacy __run__ branches; got {branches:?}",
        );

        let desired = TEST_SCHEMA.replace(
            "    age: I32?\n}",
            "    age: I32?\n    nickname: String?\n}",
        );
        let result = db.apply_schema(&desired).await.unwrap();
        assert!(result.applied, "schema apply should have applied");
    }

    /// Adding `@index` to an existing property builds an FTS index on it.
    #[tokio::test]
    async fn test_apply_schema_adds_index_for_existing_property() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let desired = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
        db.apply_schema(&desired).await.unwrap();

        let snapshot = db.snapshot().await;
        let ds = snapshot.open("node:Person").await.unwrap();
        assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
    }

    /// A schema apply that adds a column keeps the existing BTree index on
    /// `id` and the FTS index on `name`.
    #[tokio::test]
    async fn test_apply_schema_rewrite_preserves_existing_indices() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let initial_schema = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
        let mut db = Omnigraph::init(uri, &initial_schema).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;

        let desired = initial_schema.replace(
            "    age: I32?\n}",
            "    age: I32?\n    nickname: String?\n}",
        );
        db.apply_schema(&desired).await.unwrap();

        let snapshot = db.snapshot().await;
        let ds = snapshot.open("node:Person").await.unwrap();
        assert!(db.table_store().has_btree_index(&ds, "id").await.unwrap());
        assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
    }

    /// While the schema-apply lock branch exists, `open_for_mutation` fails.
    #[tokio::test]
    async fn test_open_for_mutation_rejects_while_schema_apply_locked() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        let mut db = db;
        db.coordinator
            .write()
            .await
            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
            .await
            .unwrap();

        let err = db
            .open_for_mutation("node:Person", crate::db::MutationOpKind::Insert)
            .await
            .unwrap_err();
        assert!(
            err.to_string()
                .contains("write is unavailable while schema apply is in progress")
        );
    }

    /// While the schema-apply lock branch exists, `commit_updates` fails even
    /// for an empty update list.
    #[tokio::test]
    async fn test_commit_updates_rejects_while_schema_apply_locked() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        db.coordinator
            .write()
            .await
            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
            .await
            .unwrap();

        let err = db.commit_updates(&[]).await.unwrap_err();
        assert!(
            err.to_string()
                .contains("write commit is unavailable while schema apply is in progress")
        );
    }

    /// `branch_list` does not expose the schema-apply lock branch.
    #[tokio::test]
    async fn test_branch_list_hides_schema_apply_lock_branch() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        db.coordinator
            .write()
            .await
            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
            .await
            .unwrap();

        let branches = db.branch_list().await.unwrap();
        assert_eq!(branches, vec!["main".to_string()]);
    }
}