1use std::collections::{BTreeSet, HashMap, HashSet};
2use std::io::Write;
3use std::sync::Arc;
4
5use arc_swap::ArcSwap;
6use arrow_array::{
7 Array, BinaryArray, BooleanArray, Date32Array, FixedSizeListArray, Float32Array, Float64Array,
8 Int32Array, Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray,
9 RecordBatch, StringArray, StructArray, UInt32Array, UInt64Array, new_null_array,
10};
11use arrow_schema::{DataType, Field, Schema};
12use lance::Dataset;
13use lance::blob::{BlobArrayBuilder, blob_field};
14use lance::dataset::BlobFile;
15use lance::dataset::scanner::ColumnOrdering;
16use lance::datatypes::BlobKind;
17use omnigraph_compiler::catalog::{Catalog, EdgeType, NodeType};
18use omnigraph_compiler::schema::parser::parse_schema;
19use omnigraph_compiler::types::{PropType, ScalarType};
20use omnigraph_compiler::{
21 DropMode, SchemaIR, SchemaMigrationPlan, SchemaMigrationStep, SchemaTypeKind,
22 build_catalog_from_ir, build_schema_ir, plan_schema_migration,
23};
24
25use crate::db::graph_coordinator::{GraphCoordinator, PublishedSnapshot};
26use crate::error::{OmniError, Result};
27use crate::runtime_cache::RuntimeCache;
28use crate::storage::{StorageAdapter, join_uri, normalize_root_uri, storage_for_uri};
29use crate::storage_layer::SnapshotHandle;
30use crate::table_store::TableStore;
31
32mod export;
33mod optimize;
34mod repair;
35mod schema_apply;
36mod table_ops;
37
38pub use optimize::{CleanupPolicyOptions, SkipReason, TableCleanupStats, TableOptimizeStats};
39pub use repair::{
40 RepairAction, RepairClassification, RepairOptions, RepairStats, TableRepairStats,
41};
42pub use schema_apply::SchemaApplyOptions;
43pub use table_ops::PendingIndex;
44pub(crate) use table_ops::OpenedForMutation;
45
46use super::commit_graph::GraphCommit;
47use super::manifest::{
48 ManifestChange, Snapshot, SubTableEntry, TableRegistration, TableTombstone,
49 table_path_for_table_key,
50};
51use super::schema_state::{
52 SCHEMA_SOURCE_FILENAME, load_or_bootstrap_schema_contract, read_accepted_schema_ir,
53 recover_schema_state_files, schema_ir_staging_uri, schema_ir_uri, schema_source_staging_uri,
54 schema_source_uri, schema_state_staging_uri, schema_state_uri, validate_schema_contract,
55 write_schema_contract, write_schema_contract_staging,
56};
57use super::{
58 ReadTarget, ResolvedTarget, SCHEMA_APPLY_LOCK_BRANCH, SnapshotId, is_internal_system_branch,
59 is_schema_apply_lock_branch,
60};
61
/// Classification of what a merge operation did.
///
/// NOTE(review): the per-variant descriptions are inferred from the variant names;
/// confirm against the merge implementation.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MergeOutcome {
    /// Presumably: there was nothing to merge.
    AlreadyUpToDate,
    /// Presumably: the target could simply be advanced to the source head.
    FastForward,
    /// Presumably: a real merge was performed.
    Merged,
}
68
69#[derive(Debug, Clone)]
70pub struct SchemaApplyResult {
71 pub supported: bool,
72 pub applied: bool,
73 pub manifest_version: u64,
74 pub steps: Vec<SchemaMigrationStep>,
75}
76
77#[derive(Debug, Clone)]
78pub struct SchemaApplyPreview {
79 pub plan: SchemaMigrationPlan,
80 pub catalog: Catalog,
81}
82
83pub(crate) struct WriteTxn {
106 pub(crate) branch: Option<String>,
108 pub(crate) base: Snapshot,
110}
111
112pub struct Omnigraph {
118 root_uri: String,
119 storage: Arc<dyn StorageAdapter>,
120 coordinator: Arc<tokio::sync::RwLock<GraphCoordinator>>,
137 table_store: TableStore,
138 runtime_cache: RuntimeCache,
139 read_caches: Arc<crate::runtime_cache::ReadCaches>,
145 catalog: Arc<ArcSwap<Catalog>>,
150 schema_source: Arc<ArcSwap<String>>,
153 write_queue: Arc<crate::db::write_queue::WriteQueueManager>,
159 merge_exclusive: Arc<tokio::sync::Mutex<()>>,
181 policy: Option<Arc<dyn omnigraph_policy::PolicyChecker>>,
197 embedding: Arc<tokio::sync::OnceCell<crate::embedding::EmbeddingClient>>,
203 embedding_config: Option<Arc<crate::embedding::EmbeddingConfig>>,
208}
209
/// How an existing graph is opened.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum OpenMode {
    /// Open for reading and writing; the open path runs on-open migration and
    /// schema-state / manifest recovery.
    ReadWrite,
    /// Open for reading only; the open path skips on-open migration and recovery.
    ReadOnly,
}
227
/// Options for `Omnigraph::init_with_options`.
#[derive(Clone, Copy, Debug, Default)]
pub struct InitOptions {
    /// When `true`, initialization skips the "already initialized" checks and does
    /// not claim the schema source file with a write-if-absent. Defaults to `false`.
    pub force: bool,
}
246
247impl Omnigraph {
248 pub async fn init(uri: &str, schema_source: &str) -> Result<Self> {
255 Self::init_with_options(uri, schema_source, InitOptions::default()).await
256 }
257
258 pub async fn init_with_options(
263 uri: &str,
264 schema_source: &str,
265 options: InitOptions,
266 ) -> Result<Self> {
267 Self::init_with_storage(uri, schema_source, storage_for_uri(uri)?, options).await
268 }
269
270 pub(crate) async fn init_with_storage(
271 uri: &str,
272 schema_source: &str,
273 storage: Arc<dyn StorageAdapter>,
274 options: InitOptions,
275 ) -> Result<Self> {
276 let root = normalize_root_uri(uri)?;
277
278 if !options.force {
288 for candidate in [
289 schema_source_uri(&root),
290 schema_ir_uri(&root),
291 schema_state_uri(&root),
292 ] {
293 if storage.exists(&candidate).await? {
294 return Err(OmniError::AlreadyInitialized { uri: root.clone() });
295 }
296 }
297 }
298
299 let schema_ir = read_schema_ir_from_source(schema_source)?;
300 let mut catalog = build_catalog_from_ir(&schema_ir)?;
301 fixup_blob_schemas(&mut catalog);
302
303 let schema_pg_claimed = if options.force {
311 false
312 } else {
313 let schema_path = join_uri(&root, SCHEMA_SOURCE_FILENAME);
314 if !storage
315 .write_text_if_absent(&schema_path, schema_source)
316 .await?
317 {
318 return Err(OmniError::AlreadyInitialized { uri: root.clone() });
319 }
320 if let Err(err) = crate::failpoints::maybe_fail(crate::failpoints::names::INIT_AFTER_SCHEMA_PG_WRITTEN) {
321 best_effort_cleanup_init_artifacts(&root, storage.as_ref()).await;
322 return Err(err);
323 }
324 true
325 };
326
327 let coordinator = match init_storage_phase(
342 &root,
343 schema_source,
344 &schema_ir,
345 &catalog,
346 &storage,
347 !schema_pg_claimed,
348 )
349 .await
350 {
351 Ok(coordinator) => coordinator,
352 Err(err) => {
353 if schema_pg_claimed || options.force {
354 best_effort_cleanup_init_artifacts(&root, storage.as_ref()).await;
355 }
356 return Err(err);
357 }
358 };
359
360 Ok(Self {
361 root_uri: root.clone(),
362 storage,
363 coordinator: Arc::new(tokio::sync::RwLock::new(coordinator)),
364 table_store: TableStore::new(&root),
365 runtime_cache: RuntimeCache::default(),
366 read_caches: Arc::new(crate::runtime_cache::ReadCaches {
371 session: Arc::new(lance::session::Session::default()),
372 handles: Arc::new(crate::runtime_cache::TableHandleCache::default()),
373 }),
374 catalog: Arc::new(ArcSwap::from_pointee(catalog)),
375 schema_source: Arc::new(ArcSwap::from_pointee(schema_source.to_string())),
376 write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()),
377 merge_exclusive: Arc::new(tokio::sync::Mutex::new(())),
378 policy: None,
379 embedding: Arc::new(tokio::sync::OnceCell::new()),
380 embedding_config: None,
381 })
382 }
383
384 pub async fn open(uri: &str) -> Result<Self> {
389 Self::open_with_storage_and_mode(uri, storage_for_uri(uri)?, OpenMode::ReadWrite).await
390 }
391
392 pub async fn open_read_only(uri: &str) -> Result<Self> {
395 Self::open_with_storage_and_mode(uri, storage_for_uri(uri)?, OpenMode::ReadOnly).await
396 }
397
398 pub async fn open_with_storage(uri: &str, storage: Arc<dyn StorageAdapter>) -> Result<Self> {
402 Self::open_with_storage_and_mode(uri, storage, OpenMode::ReadWrite).await
403 }
404
405 pub(crate) async fn open_with_storage_and_mode(
406 uri: &str,
407 storage: Arc<dyn StorageAdapter>,
408 mode: OpenMode,
409 ) -> Result<Self> {
410 let root = normalize_root_uri(uri)?;
411 if matches!(mode, OpenMode::ReadWrite) {
419 crate::db::manifest::migrate_on_open(&root).await?;
420 }
421 let mut coordinator = GraphCoordinator::open(&root, Arc::clone(&storage)).await?;
424 if matches!(mode, OpenMode::ReadWrite) {
434 let schema_state_recovery =
435 recover_schema_state_files(&root, Arc::clone(&storage), &coordinator.snapshot())
436 .await?;
437 crate::db::manifest::recover_manifest_drift(
444 &root,
445 Arc::clone(&storage),
446 &mut coordinator,
447 crate::db::manifest::RecoveryMode::Full,
448 schema_state_recovery,
449 )
450 .await?;
451 }
452 let schema_path = schema_source_uri(&root);
454 let schema_source = storage.read_text(&schema_path).await?;
455 let current_source_ir = read_schema_ir_from_source(&schema_source)?;
456 let branches = coordinator.branch_list().await?;
457 let (accepted_ir, _) = load_or_bootstrap_schema_contract(
458 &root,
459 Arc::clone(&storage),
460 &branches,
461 ¤t_source_ir,
462 )
463 .await?;
464 let mut catalog = build_catalog_from_ir(&accepted_ir)?;
465 fixup_blob_schemas(&mut catalog);
466
467 Ok(Self {
468 root_uri: root.clone(),
469 storage,
470 coordinator: Arc::new(tokio::sync::RwLock::new(coordinator)),
471 table_store: TableStore::new(&root),
472 runtime_cache: RuntimeCache::default(),
473 read_caches: Arc::new(crate::runtime_cache::ReadCaches {
478 session: Arc::new(lance::session::Session::default()),
479 handles: Arc::new(crate::runtime_cache::TableHandleCache::default()),
480 }),
481 catalog: Arc::new(ArcSwap::from_pointee(catalog)),
482 schema_source: Arc::new(ArcSwap::from_pointee(schema_source)),
483 write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()),
484 merge_exclusive: Arc::new(tokio::sync::Mutex::new(())),
485 policy: None,
486 embedding: Arc::new(tokio::sync::OnceCell::new()),
487 embedding_config: None,
488 })
489 }
490
491 pub fn catalog(&self) -> Arc<Catalog> {
495 self.catalog.load_full()
496 }
497
498 pub fn schema_source(&self) -> Arc<String> {
500 self.schema_source.load_full()
501 }
502
503 pub(crate) fn store_catalog(&self, catalog: Catalog) {
507 self.catalog.store(Arc::new(catalog));
508 }
509
510 pub(crate) fn store_schema_source(&self, schema_source: String) {
513 self.schema_source.store(Arc::new(schema_source));
514 }
515
516 pub fn uri(&self) -> &str {
517 &self.root_uri
518 }
519
520 pub fn with_policy(mut self, checker: Arc<dyn omnigraph_policy::PolicyChecker>) -> Self {
531 self.policy = Some(checker);
532 self
533 }
534
535 pub(crate) fn embedding_cell(
539 &self,
540 ) -> &tokio::sync::OnceCell<crate::embedding::EmbeddingClient> {
541 &self.embedding
542 }
543
544 pub fn with_embedding_config(mut self, config: Arc<crate::embedding::EmbeddingConfig>) -> Self {
549 self.embedding_config = Some(config);
550 self
551 }
552
553 pub(crate) fn embedding_config_ref(&self) -> Option<&crate::embedding::EmbeddingConfig> {
555 self.embedding_config.as_deref()
556 }
557
558 pub(crate) fn enforce(
570 &self,
571 action: omnigraph_policy::PolicyAction,
572 scope: &omnigraph_policy::ResourceScope,
573 actor: Option<&str>,
574 ) -> Result<()> {
575 let Some(checker) = self.policy.as_ref() else {
576 return Ok(());
577 };
578 let Some(actor) = actor else {
579 return Err(OmniError::Policy(
580 "no actor for engine-layer policy check (policy is configured but the call site \
581 didn't thread an actor through — this is almost certainly a bug, not an \
582 intended bypass)"
583 .to_string(),
584 ));
585 };
586 checker
587 .check(action, scope, actor)
588 .map_err(|err| OmniError::Policy(err.to_string()))
589 }
590
591 pub(crate) async fn ensure_schema_state_valid(&self) -> Result<()> {
592 validate_schema_contract(self.uri(), Arc::clone(&self.storage)).await
599 }
600
601 pub async fn plan_schema(&self, desired_schema_source: &str) -> Result<SchemaMigrationPlan> {
602 self.plan_schema_with_options(desired_schema_source, SchemaApplyOptions::default())
603 .await
604 }
605
606 pub async fn plan_schema_with_options(
607 &self,
608 desired_schema_source: &str,
609 options: SchemaApplyOptions,
610 ) -> Result<SchemaMigrationPlan> {
611 schema_apply::plan_schema(self, desired_schema_source, options).await
612 }
613
614 pub async fn preview_schema_apply_with_options(
615 &self,
616 desired_schema_source: &str,
617 options: SchemaApplyOptions,
618 ) -> Result<SchemaApplyPreview> {
619 schema_apply::preview_schema_apply(self, desired_schema_source, options).await
620 }
621
622 pub async fn apply_schema(&self, desired_schema_source: &str) -> Result<SchemaApplyResult> {
623 self.apply_schema_as(desired_schema_source, SchemaApplyOptions::default(), None)
624 .await
625 }
626
627 pub async fn apply_schema_with_options(
628 &self,
629 desired_schema_source: &str,
630 options: SchemaApplyOptions,
631 ) -> Result<SchemaApplyResult> {
632 self.apply_schema_as(desired_schema_source, options, None)
633 .await
634 }
635
636 pub async fn apply_schema_as(
647 &self,
648 desired_schema_source: &str,
649 options: SchemaApplyOptions,
650 actor: Option<&str>,
651 ) -> Result<SchemaApplyResult> {
652 self.apply_schema_as_with_catalog_check(desired_schema_source, options, actor, |_| Ok(()))
653 .await
654 }
655
656 pub async fn apply_schema_as_with_catalog_check<F>(
657 &self,
658 desired_schema_source: &str,
659 options: SchemaApplyOptions,
660 actor: Option<&str>,
661 validate_catalog: F,
662 ) -> Result<SchemaApplyResult>
663 where
664 F: FnOnce(&Catalog) -> Result<()>,
665 {
666 schema_apply::apply_schema(
667 self,
668 desired_schema_source,
669 options,
670 actor,
671 validate_catalog,
672 )
673 .await
674 }
675
676 pub(crate) async fn ensure_schema_apply_idle(&self, operation: &str) -> Result<()> {
677 schema_apply::ensure_schema_apply_idle(self, operation).await
678 }
679
680 async fn ensure_schema_apply_not_locked(&self, operation: &str) -> Result<()> {
681 schema_apply::ensure_schema_apply_not_locked(self, operation).await
682 }
683
684 pub(crate) fn storage(&self) -> &dyn crate::storage_layer::TableStorage {
694 &self.table_store
695 }
696
697 pub(crate) fn storage_inline_residual(
706 &self,
707 ) -> &dyn crate::storage_layer::InlineCommitResidual {
708 &self.table_store
709 }
710
711 pub(crate) fn storage_adapter(&self) -> &dyn crate::storage::StorageAdapter {
715 self.storage.as_ref()
716 }
717
718 pub(crate) fn write_queue(&self) -> Arc<crate::db::write_queue::WriteQueueManager> {
726 Arc::clone(&self.write_queue)
727 }
728
729 pub(crate) fn merge_exclusive(&self) -> Arc<tokio::sync::Mutex<()>> {
735 Arc::clone(&self.merge_exclusive)
736 }
737
738 pub(crate) fn root_uri(&self) -> &str {
741 &self.root_uri
742 }
743
744 pub(crate) async fn open_coordinator_for_branch(
745 &self,
746 branch: Option<&str>,
747 ) -> Result<GraphCoordinator> {
748 match branch {
749 Some(branch) => {
750 GraphCoordinator::open_branch(self.uri(), branch, Arc::clone(&self.storage)).await
751 }
752 None => GraphCoordinator::open(self.uri(), Arc::clone(&self.storage)).await,
753 }
754 }
755
756 pub(crate) async fn swap_coordinator_for_branch(
757 &self,
758 branch: Option<&str>,
759 ) -> Result<GraphCoordinator> {
760 let next = self.open_coordinator_for_branch(branch).await?;
761 let mut coord = self.coordinator.write().await;
762 Ok(std::mem::replace(&mut *coord, next))
763 }
764
765 pub(crate) async fn restore_coordinator(&self, coordinator: GraphCoordinator) {
766 *self.coordinator.write().await = coordinator;
767 }
768
769 pub(crate) async fn open_write_txn(&self, branch: Option<&str>) -> Result<WriteTxn> {
785 let resolved = self.resolved_branch_target(branch).await?;
786 Ok(WriteTxn {
787 branch: resolved.branch,
788 base: resolved.snapshot,
789 })
790 }
791
792 pub(crate) async fn resolved_branch_target(
793 &self,
794 branch: Option<&str>,
795 ) -> Result<ResolvedTarget> {
796 self.ensure_schema_state_valid().await?;
797 let requested = ReadTarget::Branch(branch.unwrap_or("main").to_string());
798 let normalized = normalize_branch_name(branch.unwrap_or("main"))?;
799 let coord = self.coordinator.read().await;
800 if normalized.as_deref() == coord.current_branch() {
801 let snapshot_id = coord.head_commit_id().await?.unwrap_or_else(|| {
802 SnapshotId::synthetic(
803 coord.current_branch(),
804 coord.version(),
805 coord.manifest_incarnation().e_tag.as_deref(),
806 )
807 });
808 return Ok(ResolvedTarget {
809 requested,
810 branch: coord.current_branch().map(str::to_string),
811 snapshot_id,
812 snapshot: coord.snapshot(),
813 });
814 }
815 coord.resolve_target(&requested).await
816 }
817
818 pub(crate) async fn snapshot_for_branch(&self, branch: Option<&str>) -> Result<Snapshot> {
819 self.resolved_branch_target(branch)
820 .await
821 .map(|resolved| resolved.snapshot)
822 }
823
824 pub(crate) async fn fresh_snapshot_for_branch(&self, branch: Option<&str>) -> Result<Snapshot> {
825 self.ensure_schema_state_valid().await?;
826 self.fresh_snapshot_for_branch_unchecked(branch).await
827 }
828
829 pub(crate) async fn fresh_snapshot_for_branch_unchecked(
849 &self,
850 branch: Option<&str>,
851 ) -> Result<Snapshot> {
852 let manifest = match branch {
853 Some(branch) => {
854 crate::db::manifest::ManifestCoordinator::open_at_branch(self.uri(), branch).await?
855 }
856 None => crate::db::manifest::ManifestCoordinator::open(self.uri()).await?,
857 };
858 Ok(manifest.snapshot())
859 }
860
861 pub(crate) async fn version(&self) -> u64 {
862 self.coordinator.read().await.version()
863 }
864
865 pub(crate) async fn snapshot(&self) -> Snapshot {
867 self.coordinator.read().await.snapshot()
868 }
869
870 pub async fn snapshot_of(&self, target: impl Into<ReadTarget>) -> Result<Snapshot> {
871 self.resolved_target(target)
872 .await
873 .map(|resolved| resolved.snapshot)
874 }
875
876 pub async fn version_of(&self, target: impl Into<ReadTarget>) -> Result<u64> {
877 self.snapshot_of(target)
878 .await
879 .map(|snapshot| snapshot.version())
880 }
881
882 pub async fn resolved_branch_of(
883 &self,
884 target: impl Into<ReadTarget>,
885 ) -> Result<Option<String>> {
886 self.resolved_target(target)
887 .await
888 .map(|resolved| resolved.branch)
889 }
890
891 pub async fn sync_branch(&self, branch: &str) -> Result<()> {
893 self.ensure_schema_state_valid().await?;
894 let branch = normalize_branch_name(branch)?;
895 let next = self.open_coordinator_for_branch(branch.as_deref()).await?;
896 *self.coordinator.write().await = next;
897 self.invalidate_read_caches().await;
898 Ok(())
899 }
900
901 async fn invalidate_read_caches(&self) {
902 self.runtime_cache.invalidate_all().await;
903 self.read_caches.handles.invalidate_all().await;
904 }
905
906 pub async fn refresh(&self) -> Result<()> {
946 {
965 let _serial = self
980 .write_queue
981 .acquire(&crate::db::manifest::schema_apply_serial_queue_key())
982 .await;
983 if crate::db::manifest::list_sidecars(&self.root_uri, self.storage.as_ref())
984 .await?
985 .is_empty()
986 {
987 let mut coord = self.coordinator.write().await;
988 coord.refresh().await?;
989 recover_schema_state_files(
990 &self.root_uri,
991 Arc::clone(&self.storage),
992 &coord.snapshot(),
993 )
994 .await?;
995 }
996 } crate::db::manifest::heal_pending_sidecars_roll_forward(
998 &self.root_uri,
999 Arc::clone(&self.storage),
1000 &self.coordinator,
1001 &self.write_queue,
1002 )
1003 .await?;
1004 self.reload_schema_if_source_changed().await?;
1005 self.invalidate_read_caches().await;
1006 Ok(())
1007 }
1008
1009 pub(crate) async fn heal_pending_recovery_sidecars(&self) -> Result<()> {
1023 let processed = crate::db::manifest::heal_pending_sidecars_roll_forward(
1024 &self.root_uri,
1025 Arc::clone(&self.storage),
1026 &self.coordinator,
1027 &self.write_queue,
1028 )
1029 .await?;
1030 if processed {
1031 self.reload_schema_if_source_changed().await?;
1037 self.invalidate_read_caches().await;
1038 }
1039 Ok(())
1040 }
1041
1042 async fn reload_schema_if_source_changed(&self) -> Result<()> {
1043 let schema_path = schema_source_uri(&self.root_uri);
1044 let schema_source = self.storage.read_text(&schema_path).await?;
1045 if schema_source == *self.schema_source.load_full() {
1046 return Ok(());
1047 }
1048 let current_source_ir = read_schema_ir_from_source(&schema_source)?;
1049 let branches = self.coordinator.read().await.branch_list().await?;
1050 let (accepted_ir, _) = load_or_bootstrap_schema_contract(
1051 &self.root_uri,
1052 Arc::clone(&self.storage),
1053 &branches,
1054 ¤t_source_ir,
1055 )
1056 .await?;
1057 let mut catalog = build_catalog_from_ir(&accepted_ir)?;
1058 fixup_blob_schemas(&mut catalog);
1059 self.store_schema_source(schema_source);
1060 self.store_catalog(catalog);
1061 Ok(())
1062 }
1063
1064 pub(crate) async fn refresh_coordinator_only(&self) -> Result<()> {
1072 self.coordinator.write().await.refresh().await?;
1073 self.invalidate_read_caches().await;
1074 Ok(())
1075 }
1076
1077 pub async fn resolve_snapshot(&self, branch: &str) -> Result<SnapshotId> {
1078 self.ensure_schema_state_valid().await?;
1079 self.coordinator
1080 .read()
1081 .await
1082 .resolve_snapshot_id(branch)
1083 .await
1084 }
1085
1086 pub(crate) async fn resolved_target(
1087 &self,
1088 target: impl Into<ReadTarget>,
1089 ) -> Result<ResolvedTarget> {
1090 self.ensure_schema_state_valid().await?;
1091 let target = target.into();
1092 let mut resolved = self.resolve_target_inner(&target).await?;
1093 if matches!(target, ReadTarget::Branch(_)) {
1100 resolved
1101 .snapshot
1102 .set_read_caches(Arc::clone(&self.read_caches));
1103 }
1104 Ok(resolved)
1105 }
1106
1107 async fn resolve_target_inner(&self, target: &ReadTarget) -> Result<ResolvedTarget> {
1114 if let ReadTarget::Branch(branch) = target {
1115 let normalized = normalize_branch_name(branch)?;
1116 {
1117 let coord = self.coordinator.read().await;
1118 if normalized.as_deref() != coord.current_branch() {
1119 return coord.resolve_target(target).await;
1121 }
1122 let held = coord.manifest_incarnation();
1123 if coord.probe_latest_incarnation().await?.matches(&held) {
1124 return Ok(warm_resolved_target(&coord, target));
1125 }
1126 }
1128 let mut coord = self.coordinator.write().await;
1129 if normalized.as_deref() == coord.current_branch() {
1130 let held = coord.manifest_incarnation();
1133 let mut refreshed = false;
1134 if !coord.probe_latest_incarnation().await?.matches(&held) {
1135 coord.refresh_manifest_only().await?;
1136 refreshed = true;
1137 }
1138 let resolved = warm_resolved_target(&coord, target);
1139 drop(coord);
1140 if refreshed {
1141 self.invalidate_read_caches().await;
1142 }
1143 return Ok(resolved);
1144 }
1145 return coord.resolve_target(target).await;
1147 }
1148
1149 self.coordinator.read().await.resolve_target(target).await
1151 }
1152
1153 pub async fn diff_between(
1156 &self,
1157 from: impl Into<ReadTarget>,
1158 to: impl Into<ReadTarget>,
1159 filter: &crate::changes::ChangeFilter,
1160 ) -> Result<crate::changes::ChangeSet> {
1161 let from_resolved = self.resolved_target(from).await?;
1162 let to_resolved = self.resolved_target(to).await?;
1163 crate::changes::diff_snapshots(
1164 self.uri(),
1165 &from_resolved.snapshot,
1166 &to_resolved.snapshot,
1167 filter,
1168 to_resolved.branch.clone().or(from_resolved.branch.clone()),
1169 )
1170 .await
1171 }
1172
1173 pub async fn diff_commits(
1176 &self,
1177 from_commit_id: &str,
1178 to_commit_id: &str,
1179 filter: &crate::changes::ChangeFilter,
1180 ) -> Result<crate::changes::ChangeSet> {
1181 let coord = self.coordinator.read().await;
1182 let from_commit = coord
1183 .resolve_commit(&SnapshotId::new(from_commit_id))
1184 .await?;
1185 let to_commit = coord.resolve_commit(&SnapshotId::new(to_commit_id)).await?;
1186 let from_snap = coord
1187 .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
1188 from_commit.graph_commit_id.clone(),
1189 )))
1190 .await?;
1191 let to_snap = coord
1192 .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
1193 to_commit.graph_commit_id.clone(),
1194 )))
1195 .await?;
1196 drop(coord);
1197 crate::changes::diff_snapshots(
1198 self.uri(),
1199 &from_snap.snapshot,
1200 &to_snap.snapshot,
1201 filter,
1202 to_snap.branch.clone().or(from_snap.branch.clone()),
1203 )
1204 .await
1205 }
1206
1207 pub async fn entity_at_target(
1208 &self,
1209 target: impl Into<ReadTarget>,
1210 table_key: &str,
1211 id: &str,
1212 ) -> Result<Option<serde_json::Value>> {
1213 export::entity_at_target(self, target, table_key, id).await
1214 }
1215
1216 pub async fn entity_at(
1218 &self,
1219 table_key: &str,
1220 id: &str,
1221 version: u64,
1222 ) -> Result<Option<serde_json::Value>> {
1223 export::entity_at(self, table_key, id, version).await
1224 }
1225
1226 pub async fn snapshot_at_version(&self, version: u64) -> Result<Snapshot> {
1228 self.ensure_schema_state_valid().await?;
1229 self.coordinator
1230 .read()
1231 .await
1232 .snapshot_at_version(version)
1233 .await
1234 }
1235
1236 pub async fn export_jsonl(
1237 &self,
1238 branch: &str,
1239 type_names: &[String],
1240 table_keys: &[String],
1241 ) -> Result<String> {
1242 export::export_jsonl(self, branch, type_names, table_keys).await
1243 }
1244
1245 pub async fn export_jsonl_to_writer<W: Write>(
1246 &self,
1247 branch: &str,
1248 type_names: &[String],
1249 table_keys: &[String],
1250 writer: &mut W,
1251 ) -> Result<()> {
1252 export::export_jsonl_to_writer(self, branch, type_names, table_keys, writer).await
1253 }
1254
1255 pub async fn graph_index(&self) -> Result<Arc<crate::graph_index::GraphIndex>> {
1259 table_ops::graph_index(self).await
1260 }
1261
1262 pub(crate) async fn graph_index_for_resolved(
1263 &self,
1264 resolved: &ResolvedTarget,
1265 ) -> Result<Arc<crate::graph_index::GraphIndex>> {
1266 table_ops::graph_index_for_resolved(self, resolved).await
1267 }
1268
1269 pub async fn ensure_indices(&self) -> Result<Vec<PendingIndex>> {
1286 table_ops::ensure_indices(self).await
1287 }
1288
1289 pub async fn ensure_indices_on(&self, branch: &str) -> Result<Vec<PendingIndex>> {
1290 table_ops::ensure_indices_on(self, branch).await
1291 }
1292
/// Test-only hook, compiled only with the `failpoints` feature and hidden from docs.
///
/// Delegates to the `table_ops` function of the same name and returns its `u64` result.
#[cfg(feature = "failpoints")]
#[doc(hidden)]
pub async fn failpoint_publish_table_head_without_index_rebuild_for_test(
    &mut self,
    branch: &str,
    table_key: &str,
    table_branch: Option<&str>,
) -> Result<u64> {
    table_ops::failpoint_publish_table_head_without_index_rebuild_for_test(
        self,
        branch,
        table_key,
        table_branch,
    )
    .await
}
1309
1310 pub async fn optimize(&self) -> Result<Vec<optimize::TableOptimizeStats>> {
1313 optimize::optimize_all_tables(self).await
1314 }
1315
1316 pub async fn repair(&self, options: repair::RepairOptions) -> Result<repair::RepairStats> {
1320 repair::repair_all_tables(self, options).await
1321 }
1322
1323 pub async fn cleanup(
1327 &mut self,
1328 options: optimize::CleanupPolicyOptions,
1329 ) -> Result<Vec<optimize::TableCleanupStats>> {
1330 optimize::cleanup_all_tables(self, options).await
1331 }
1332
1333 pub async fn read_blob(&self, type_name: &str, id: &str, property: &str) -> Result<BlobFile> {
1343 self.ensure_schema_state_valid().await?;
1344 let catalog = self.catalog();
1345 let node_type = catalog
1346 .node_types
1347 .get(type_name)
1348 .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
1349 if !node_type.blob_properties.contains(property) {
1350 return Err(OmniError::manifest(format!(
1351 "property '{}' on type '{}' is not a Blob",
1352 property, type_name
1353 )));
1354 }
1355
1356 let snapshot = self.snapshot().await;
1357 let table_key = format!("node:{}", type_name);
1358 let handle = self
1359 .storage()
1360 .open_snapshot_at_table(&snapshot, &table_key)
1361 .await?;
1362
1363 let filter_sql = format!("id = '{}'", id.replace('\'', "''"));
1364 let row_id = self
1365 .storage()
1366 .first_row_id_for_filter(&handle, &filter_sql)
1367 .await?
1368 .ok_or_else(|| {
1369 OmniError::manifest(format!("no {} with id '{}' found", type_name, id))
1370 })?;
1371
1372 let ds = handle.into_arc();
1376 let mut blobs = ds
1377 .take_blobs(&[row_id], property)
1378 .await
1379 .map_err(|e| OmniError::Lance(e.to_string()))?;
1380
1381 blobs.pop().ok_or_else(|| {
1382 OmniError::manifest(format!(
1383 "blob '{}' on {} '{}' returned no data",
1384 property, type_name, id
1385 ))
1386 })
1387 }
1388
1389 pub(crate) async fn active_branch(&self) -> Option<String> {
1390 self.coordinator
1391 .read()
1392 .await
1393 .current_branch()
1394 .map(str::to_string)
1395 }
1396
1397 async fn ensure_branch_delete_safe(&self, branch: &str, branches: &[String]) -> Result<()> {
1398 let descendants = self
1399 .coordinator
1400 .read()
1401 .await
1402 .branch_descendants(branch)
1403 .await?;
1404 if let Some(descendant) = descendants.first() {
1405 return Err(OmniError::manifest_conflict(format!(
1406 "cannot delete branch '{}' because descendant branch '{}' still depends on it",
1407 branch, descendant
1408 )));
1409 }
1410
1411 for other_branch in branches
1412 .iter()
1413 .filter(|candidate| candidate.as_str() != branch)
1414 {
1415 let snapshot = self
1416 .snapshot_of(ReadTarget::branch(other_branch.as_str()))
1417 .await?;
1418 if snapshot
1419 .entries()
1420 .any(|entry| entry.table_branch.as_deref() == Some(branch))
1421 {
1422 return Err(OmniError::manifest_conflict(format!(
1423 "cannot delete branch '{}' because branch '{}' still depends on it",
1424 branch, other_branch
1425 )));
1426 }
1427 }
1428
1429 Ok(())
1430 }
1431
1432 async fn cleanup_deleted_branch_tables(&self, branch: &str, owned_tables: &[(String, String)]) {
1440 let mut seen_paths = HashSet::new();
1441 let mut cleanup_targets = owned_tables
1442 .iter()
1443 .filter(|(_, table_path)| seen_paths.insert(table_path.clone()))
1444 .cloned()
1445 .collect::<Vec<_>>();
1446 cleanup_targets.sort_by(|left, right| left.0.cmp(&right.0));
1447
1448 for (table_key, table_path) in cleanup_targets {
1449 let dataset_uri = self.storage().dataset_uri(&table_path);
1450 let outcome = match crate::failpoints::maybe_fail(crate::failpoints::names::BRANCH_DELETE_BEFORE_TABLE_CLEANUP)
1451 {
1452 Ok(()) => {
1453 self.storage()
1454 .force_delete_branch(&dataset_uri, branch)
1455 .await
1456 }
1457 Err(injected) => Err(injected),
1458 };
1459 if let Err(err) = outcome {
1460 tracing::warn!(
1461 target: "omnigraph::branch_delete::cleanup",
1462 branch = %branch,
1463 table = %table_key,
1464 error = %err,
1465 "best-effort fork reclaim failed; cleanup will reconcile the orphan",
1466 );
1467 }
1468 }
1469 }
1470
1471 async fn delete_branch_storage_only(&self, branch: &str) -> Result<()> {
1472 let active = self
1473 .coordinator
1474 .read()
1475 .await
1476 .current_branch()
1477 .map(str::to_string);
1478 if active.as_deref() == Some(branch) {
1479 return Err(OmniError::manifest_conflict(format!(
1480 "cannot delete currently active branch '{}'",
1481 branch
1482 )));
1483 }
1484
1485 let branch_snapshot = self.snapshot_of(ReadTarget::branch(branch)).await?;
1486 let owned_tables = branch_snapshot
1487 .entries()
1488 .filter(|entry| entry.table_branch.as_deref() == Some(branch))
1489 .map(|entry| (entry.table_key.clone(), entry.table_path.clone()))
1490 .collect::<Vec<_>>();
1491
1492 self.coordinator.write().await.branch_delete(branch).await?;
1494 self.cleanup_deleted_branch_tables(branch, &owned_tables)
1496 .await;
1497 Ok(())
1498 }
1499
1500 pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
1501 normalize_branch_name(branch)
1502 }
1503
1504 pub(crate) async fn head_commit_id_for_branch(
1505 &self,
1506 branch: Option<&str>,
1507 ) -> Result<Option<String>> {
1508 let mut coordinator = self.open_coordinator_for_branch(branch).await?;
1509 coordinator.ensure_commit_graph_initialized().await?;
1510 coordinator
1511 .head_commit_id()
1512 .await
1513 .map(|id| id.map(|snapshot_id| snapshot_id.as_str().to_string()))
1514 }
1515
1516 pub async fn branch_create(&self, name: &str) -> Result<()> {
1517 self.branch_create_as(name, None).await
1518 }
1519
1520 pub async fn branch_create_as(&self, name: &str, actor: Option<&str>) -> Result<()> {
1528 self.enforce(
1529 omnigraph_policy::PolicyAction::BranchCreate,
1530 &omnigraph_policy::ResourceScope::TargetBranch(name.to_string()),
1531 actor,
1532 )?;
1533 self.ensure_schema_state_valid().await?;
1534 self.ensure_schema_apply_idle("branch_create").await?;
1535 ensure_public_branch_ref(name, "branch_create")?;
1536 self.coordinator.write().await.branch_create(name).await
1537 }
1538
1539 pub async fn branch_create_from(&self, from: impl Into<ReadTarget>, name: &str) -> Result<()> {
1540 self.branch_create_from_as(from, name, None).await
1541 }
1542
1543 pub async fn branch_create_from_as(
1555 &self,
1556 from: impl Into<ReadTarget>,
1557 name: &str,
1558 actor: Option<&str>,
1559 ) -> Result<()> {
1560 let target = from.into();
1561 let source_branch = match &target {
1562 ReadTarget::Branch(b) => b.clone(),
1563 _ => "<snapshot>".to_string(),
1564 };
1565 self.enforce(
1566 omnigraph_policy::PolicyAction::BranchCreate,
1567 &omnigraph_policy::ResourceScope::BranchTransition {
1568 source: source_branch,
1569 target: name.to_string(),
1570 },
1571 actor,
1572 )?;
1573 self.ensure_schema_apply_idle("branch_create_from").await?;
1574 self.branch_create_from_impl(target, name, false).await
1575 }
1576
1577 async fn branch_create_from_impl(
1578 &self,
1579 from: impl Into<ReadTarget>,
1580 name: &str,
1581 allow_internal_refs: bool,
1582 ) -> Result<()> {
1583 let target = from.into();
1584 let ReadTarget::Branch(branch_name) = target else {
1585 return Err(OmniError::manifest(
1586 "branch creation from pinned snapshots is not supported yet".to_string(),
1587 ));
1588 };
1589 if !allow_internal_refs {
1590 ensure_public_branch_ref(&branch_name, "branch_create_from")?;
1591 ensure_public_branch_ref(name, "branch_create_from")?;
1592 }
1593 let branch = normalize_branch_name(&branch_name)?;
1594 let mut source_coord = self.open_coordinator_for_branch(branch.as_deref()).await?;
1611 source_coord.branch_create(name).await
1612 }
1613
1614 pub async fn branch_list(&self) -> Result<Vec<String>> {
1615 self.ensure_schema_state_valid().await?;
1616 self.coordinator.read().await.branch_list().await
1617 }
1618
    /// Deletes branch `name` without an acting identity.
    ///
    /// Shorthand for `branch_delete_as(name, None)`.
    pub async fn branch_delete(&self, name: &str) -> Result<()> {
        self.branch_delete_as(name, None).await
    }
1622
1623 pub async fn branch_delete_as(&self, name: &str, actor: Option<&str>) -> Result<()> {
1631 self.enforce(
1632 omnigraph_policy::PolicyAction::BranchDelete,
1633 &omnigraph_policy::ResourceScope::TargetBranch(name.to_string()),
1634 actor,
1635 )?;
1636 self.ensure_schema_state_valid().await?;
1637 self.ensure_schema_apply_idle("branch_delete").await?;
1638 ensure_public_branch_ref(name, "branch_delete")?;
1639 self.refresh().await?;
1640 let branch = normalize_branch_name(name)?
1641 .ok_or_else(|| OmniError::manifest("cannot delete branch 'main'".to_string()))?;
1642 let branches = self.coordinator.read().await.branch_list().await?;
1643 if !branches.iter().any(|candidate| candidate == &branch) {
1644 return Err(OmniError::manifest_not_found(format!(
1645 "branch '{}' not found",
1646 branch
1647 )));
1648 }
1649
1650 self.ensure_branch_delete_safe(&branch, &branches).await?;
1651 self.delete_branch_storage_only(&branch).await
1652 }
1653
1654 pub async fn get_commit(&self, commit_id: &str) -> Result<GraphCommit> {
1655 self.ensure_schema_state_valid().await?;
1656 self.coordinator
1657 .read()
1658 .await
1659 .resolve_commit(&SnapshotId::new(commit_id))
1660 .await
1661 }
1662
1663 pub async fn list_commits(&self, branch: Option<&str>) -> Result<Vec<GraphCommit>> {
1664 self.ensure_schema_state_valid().await?;
1665 let branch = match branch {
1666 Some(branch) => normalize_branch_name(branch)?,
1667 None => None,
1668 };
1669 let coordinator = self.open_coordinator_for_branch(branch.as_deref()).await?;
1670 coordinator.list_commits().await
1671 }
1672
    /// Thin forwarder to `table_ops::open_for_mutation`.
    ///
    /// Passes `self`, `table_key` and `op_kind` through unchanged and returns
    /// the helper's result as-is.
    pub(crate) async fn open_for_mutation(
        &self,
        table_key: &str,
        op_kind: crate::db::MutationOpKind,
    ) -> Result<OpenedForMutation> {
        table_ops::open_for_mutation(self, table_key, op_kind).await
    }
1685
    /// Thin forwarder to `table_ops::open_for_mutation_on_branch`.
    ///
    /// Passes `self`, `branch`, `table_key`, `op_kind` and the optional write
    /// transaction `txn` through unchanged and returns the helper's result
    /// as-is.
    pub(crate) async fn open_for_mutation_on_branch(
        &self,
        branch: Option<&str>,
        table_key: &str,
        op_kind: crate::db::MutationOpKind,
        txn: Option<&crate::db::WriteTxn>,
    ) -> Result<OpenedForMutation> {
        table_ops::open_for_mutation_on_branch(self, branch, table_key, op_kind, txn).await
    }
1695
1696 pub(crate) async fn fork_dataset_from_entry_state(
1704 &self,
1705 table_key: &str,
1706 full_path: &str,
1707 source_branch: Option<&str>,
1708 source_version: u64,
1709 active_branch: &str,
1710 ) -> Result<SnapshotHandle> {
1711 match table_ops::fork_dataset_from_entry_state(
1712 self,
1713 table_key,
1714 full_path,
1715 source_branch,
1716 source_version,
1717 active_branch,
1718 )
1719 .await?
1720 {
1721 crate::storage_layer::ForkOutcome::Created(ds) => Ok(ds),
1722 crate::storage_layer::ForkOutcome::RefAlreadyExists => {
1723 table_ops::reclaim_orphaned_fork_and_refork(
1724 self,
1725 table_key,
1726 full_path,
1727 source_branch,
1728 source_version,
1729 active_branch,
1730 )
1731 .await
1732 }
1733 }
1734 }
1735
    /// Thin forwarder to `table_ops::reopen_for_mutation`.
    ///
    /// Passes `self`, `table_key`, `full_path`, `table_branch`,
    /// `expected_version` and `op_kind` through unchanged and returns the
    /// helper's result as-is.
    pub(crate) async fn reopen_for_mutation(
        &self,
        table_key: &str,
        full_path: &str,
        table_branch: Option<&str>,
        expected_version: u64,
        op_kind: crate::db::MutationOpKind,
    ) -> Result<SnapshotHandle> {
        table_ops::reopen_for_mutation(
            self,
            table_key,
            full_path,
            table_branch,
            expected_version,
            op_kind,
        )
        .await
    }
1754
    /// Thin forwarder to `table_ops::open_dataset_at_state`.
    ///
    /// Passes `self`, `table_path`, `table_branch` and `table_version` through
    /// unchanged and returns the helper's result as-is.
    pub(crate) async fn open_dataset_at_state(
        &self,
        table_path: &str,
        table_branch: Option<&str>,
        table_version: u64,
    ) -> Result<SnapshotHandle> {
        table_ops::open_dataset_at_state(self, table_path, table_branch, table_version).await
    }
1763
    /// Thin forwarder to `table_ops::build_indices_on_dataset`.
    ///
    /// Passes `self`, `table_key` and the mutable handle `ds` through and
    /// returns the helper's list of `PendingIndex` values as-is.
    pub(crate) async fn build_indices_on_dataset(
        &self,
        table_key: &str,
        ds: &mut SnapshotHandle,
    ) -> Result<Vec<PendingIndex>> {
        table_ops::build_indices_on_dataset(self, table_key, ds).await
    }
1771
    /// Test-only forwarder to `table_ops::commit_updates`.
    ///
    /// Compiled only under `cfg(test)`. Passes `self` (mutably) and `updates`
    /// through and returns the helper's `u64` result as-is.
    #[cfg(test)]
    pub(crate) async fn commit_updates(
        &mut self,
        updates: &[crate::db::SubTableUpdate],
    ) -> Result<u64> {
        table_ops::commit_updates(self, updates).await
    }
1781
    /// Thin forwarder to `table_ops::commit_manifest_updates`.
    ///
    /// Passes `self` and `updates` through and returns the helper's `u64`
    /// result as-is.
    pub(crate) async fn commit_manifest_updates(
        &self,
        updates: &[crate::db::SubTableUpdate],
    ) -> Result<u64> {
        table_ops::commit_manifest_updates(self, updates).await
    }
1788
    /// Thin forwarder to `table_ops::record_merge_commit`.
    ///
    /// Passes `self`, `manifest_version`, both parent commit ids and the
    /// optional `actor_id` through unchanged and returns the helper's
    /// `String` result as-is.
    pub(crate) async fn record_merge_commit(
        &self,
        manifest_version: u64,
        parent_commit_id: &str,
        merged_parent_commit_id: &str,
        actor_id: Option<&str>,
    ) -> Result<String> {
        table_ops::record_merge_commit(
            self,
            manifest_version,
            parent_commit_id,
            merged_parent_commit_id,
            actor_id,
        )
        .await
    }
1805
    /// Thin forwarder to `table_ops::commit_updates_on_branch_with_expected`.
    ///
    /// Every argument is passed through unchanged; `committed_handles` is
    /// moved into the helper. The helper's `u64` result is returned as-is.
    pub(crate) async fn commit_updates_on_branch_with_expected(
        &self,
        branch: Option<&str>,
        updates: &[crate::db::SubTableUpdate],
        expected_table_versions: &std::collections::HashMap<String, u64>,
        actor_id: Option<&str>,
        txn: Option<&crate::db::WriteTxn>,
        committed_handles: std::collections::HashMap<String, crate::storage_layer::SnapshotHandle>,
    ) -> Result<u64> {
        table_ops::commit_updates_on_branch_with_expected(
            self,
            branch,
            updates,
            expected_table_versions,
            actor_id,
            txn,
            committed_handles,
        )
        .await
    }
1826
    /// Thin forwarder to `table_ops::ensure_commit_graph_initialized`.
    pub(crate) async fn ensure_commit_graph_initialized(&self) -> Result<()> {
        table_ops::ensure_commit_graph_initialized(self).await
    }
1830
    /// Thin forwarder to `table_ops::invalidate_graph_index`; infallible from
    /// the caller's point of view (no `Result` is returned).
    pub(crate) async fn invalidate_graph_index(&self) {
        table_ops::invalidate_graph_index(self).await
    }
1835}
1836
1837pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
1838 let branch = branch.trim();
1839 if branch.is_empty() {
1840 return Err(OmniError::manifest(
1841 "branch name cannot be empty".to_string(),
1842 ));
1843 }
1844 if branch == "main" {
1845 return Ok(None);
1846 }
1847 Ok(Some(branch.to_string()))
1848}
1849
1850fn warm_resolved_target(coord: &GraphCoordinator, requested: &ReadTarget) -> ResolvedTarget {
1856 ResolvedTarget {
1857 requested: requested.clone(),
1858 branch: coord.current_branch().map(str::to_string),
1859 snapshot_id: SnapshotId::synthetic(
1860 coord.current_branch(),
1861 coord.version(),
1862 coord.manifest_incarnation().e_tag.as_deref(),
1863 ),
1864 snapshot: coord.snapshot(),
1865 }
1866}
1867
1868pub(crate) fn ensure_public_branch_ref(branch: &str, operation: &str) -> Result<()> {
1869 if is_internal_system_branch(branch) {
1870 return Err(OmniError::manifest(format!(
1871 "{} does not allow internal system ref '{}'",
1872 operation, branch
1873 )));
1874 }
1875 Ok(())
1876}
1877
1878fn concat_or_empty_batches(schema: Arc<Schema>, batches: Vec<RecordBatch>) -> Result<RecordBatch> {
1879 if batches.is_empty() {
1880 return Ok(RecordBatch::new_empty(schema));
1881 }
1882 if batches.len() == 1 {
1883 return Ok(batches.into_iter().next().unwrap());
1884 }
1885 let batch_schema = batches[0].schema();
1886 arrow_select::concat::concat_batches(&batch_schema, &batches)
1887 .map_err(|e| OmniError::Lance(e.to_string()))
1888}
1889
1890fn blob_properties_for_table_key<'a>(
1891 catalog: &'a Catalog,
1892 table_key: &str,
1893) -> Result<&'a std::collections::HashSet<String>> {
1894 if let Some(type_name) = table_key.strip_prefix("node:") {
1895 return catalog
1896 .node_types
1897 .get(type_name)
1898 .map(|node_type| &node_type.blob_properties)
1899 .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)));
1900 }
1901 if let Some(type_name) = table_key.strip_prefix("edge:") {
1902 return catalog
1903 .edge_types
1904 .get(type_name)
1905 .map(|edge_type| &edge_type.blob_properties)
1906 .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)));
1907 }
1908 Err(OmniError::manifest(format!(
1909 "invalid table key '{}'",
1910 table_key
1911 )))
1912}
1913
1914fn blob_description_is_null(descriptions: &StructArray, row: usize) -> Result<bool> {
1915 if descriptions.is_null(row) {
1916 return Ok(true);
1917 }
1918
1919 let kind = descriptions
1920 .column_by_name("kind")
1921 .and_then(|col| col.as_any().downcast_ref::<UInt32Array>())
1922 .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row) as u8))
1923 .or_else(|| {
1924 descriptions
1925 .column_by_name("kind")
1926 .and_then(|col| col.as_any().downcast_ref::<arrow_array::UInt8Array>())
1927 .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)))
1928 });
1929 let position = descriptions
1930 .column_by_name("position")
1931 .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
1932 .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1933 let size = descriptions
1934 .column_by_name("size")
1935 .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
1936 .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1937 let blob_uri = descriptions
1938 .column_by_name("blob_uri")
1939 .and_then(|col| col.as_any().downcast_ref::<StringArray>())
1940 .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1941
1942 let Some(kind) = kind else {
1943 return Ok(true);
1944 };
1945 let kind = BlobKind::try_from(kind).map_err(|e| OmniError::Lance(e.to_string()))?;
1946 if kind != BlobKind::Inline {
1947 return Ok(false);
1948 }
1949
1950 Ok(position.unwrap_or(0) == 0 && size.unwrap_or(0) == 0 && blob_uri.unwrap_or("").is_empty())
1951}
1952
1953fn fixup_blob_schemas(catalog: &mut Catalog) {
1959 for node_type in catalog.node_types.values_mut() {
1960 if node_type.blob_properties.is_empty() {
1961 continue;
1962 }
1963 let fields: Vec<Field> = node_type
1964 .arrow_schema
1965 .fields()
1966 .iter()
1967 .map(|f| {
1968 if node_type.blob_properties.contains(f.name()) {
1969 blob_field(f.name(), f.is_nullable())
1970 } else {
1971 f.as_ref().clone()
1972 }
1973 })
1974 .collect();
1975 node_type.arrow_schema = Arc::new(Schema::new(fields));
1976 }
1977 for edge_type in catalog.edge_types.values_mut() {
1978 if edge_type.blob_properties.is_empty() {
1979 continue;
1980 }
1981 let fields: Vec<Field> = edge_type
1982 .arrow_schema
1983 .fields()
1984 .iter()
1985 .map(|f| {
1986 if edge_type.blob_properties.contains(f.name()) {
1987 blob_field(f.name(), f.is_nullable())
1988 } else {
1989 f.as_ref().clone()
1990 }
1991 })
1992 .collect();
1993 edge_type.arrow_schema = Arc::new(Schema::new(fields));
1994 }
1995}
1996
1997fn read_schema_ir_from_source(schema_source: &str) -> Result<SchemaIR> {
1998 let schema_ast = parse_schema(schema_source)?;
1999 build_schema_ir(&schema_ast).map_err(|err| OmniError::manifest(err.to_string()))
2000}
2001
/// Runs the storage-writing portion of graph initialization and returns the
/// freshly initialized coordinator.
///
/// Steps, in order:
/// 1. When `write_schema_pg` is true, write `schema_source` to the
///    `SCHEMA_SOURCE_FILENAME` object under `root`.
/// 2. Write the schema contract derived from `schema_ir`.
/// 3. Call `GraphCoordinator::init` with `root`, `catalog` and a clone of
///    the `storage` handle.
///
/// A `failpoints::maybe_fail` check follows each step (presumably for
/// fault-injection tests — confirm against the `failpoints` module). Any
/// error is propagated immediately; this function performs no cleanup of
/// artifacts already written.
async fn init_storage_phase(
    root: &str,
    schema_source: &str,
    schema_ir: &SchemaIR,
    catalog: &Catalog,
    storage: &Arc<dyn StorageAdapter>,
    write_schema_pg: bool,
) -> Result<GraphCoordinator> {
    // Step 1 (optional): persist the human-readable schema source.
    if write_schema_pg {
        let schema_path = join_uri(root, SCHEMA_SOURCE_FILENAME);
        storage.write_text(&schema_path, schema_source).await?;
        crate::failpoints::maybe_fail(crate::failpoints::names::INIT_AFTER_SCHEMA_PG_WRITTEN)?;
    }

    // Step 2: persist the schema contract.
    write_schema_contract(root, storage.as_ref(), schema_ir).await?;
    crate::failpoints::maybe_fail(crate::failpoints::names::INIT_AFTER_SCHEMA_CONTRACT_WRITTEN)?;

    // Step 3: initialize the coordinator on top of the written schema files.
    let coordinator = GraphCoordinator::init(root, catalog, Arc::clone(storage)).await?;
    crate::failpoints::maybe_fail(crate::failpoints::names::INIT_AFTER_COORDINATOR_INIT)?;

    Ok(coordinator)
}
2038
2039async fn best_effort_cleanup_init_artifacts(root: &str, storage: &dyn StorageAdapter) {
2051 for uri in [
2052 schema_source_uri(root),
2053 schema_ir_uri(root),
2054 schema_state_uri(root),
2055 ] {
2056 if let Err(err) = storage.delete(&uri).await {
2057 tracing::warn!(
2058 target: "omnigraph::init::cleanup",
2059 uri = %uri,
2060 error = %err,
2061 "init failed; best-effort cleanup could not delete artifact",
2062 );
2063 }
2064 }
2065}
2066
2067fn schema_table_key(type_kind: SchemaTypeKind, name: &str) -> String {
2068 match type_kind {
2069 SchemaTypeKind::Node => format!("node:{}", name),
2070 SchemaTypeKind::Edge => format!("edge:{}", name),
2071 SchemaTypeKind::Interface => unreachable!("interfaces do not map to tables"),
2072 }
2073}
2074
2075fn schema_for_table_key(catalog: &Catalog, table_key: &str) -> Result<Arc<Schema>> {
2076 if let Some(type_name) = table_key.strip_prefix("node:") {
2077 let node_type: &NodeType = catalog
2078 .node_types
2079 .get(type_name)
2080 .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
2081 return Ok(node_type.arrow_schema.clone());
2082 }
2083 if let Some(type_name) = table_key.strip_prefix("edge:") {
2084 let edge_type: &EdgeType = catalog
2085 .edge_types
2086 .get(type_name)
2087 .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)))?;
2088 return Ok(edge_type.arrow_schema.clone());
2089 }
2090 Err(OmniError::manifest(format!(
2091 "invalid table key '{}'",
2092 table_key
2093 )))
2094}
2095
2096fn record_batch_row_to_json(batch: &RecordBatch, row: usize) -> Result<serde_json::Value> {
2097 let mut obj = serde_json::Map::new();
2098 for (i, field) in batch.schema().fields().iter().enumerate() {
2099 obj.insert(
2100 field.name().clone(),
2101 json_value_from_array(batch.column(i).as_ref(), row)?,
2102 );
2103 }
2104 Ok(serde_json::Value::Object(obj))
2105}
2106
2107fn json_value_from_array(array: &dyn Array, row: usize) -> Result<serde_json::Value> {
2108 if array.is_null(row) {
2109 return Ok(serde_json::Value::Null);
2110 }
2111
2112 match array.data_type() {
2113 DataType::Utf8 => Ok(serde_json::Value::String(
2114 array
2115 .as_any()
2116 .downcast_ref::<StringArray>()
2117 .ok_or_else(|| OmniError::Lance("expected StringArray".to_string()))?
2118 .value(row)
2119 .to_string(),
2120 )),
2121 DataType::LargeUtf8 => Ok(serde_json::Value::String(
2122 array
2123 .as_any()
2124 .downcast_ref::<LargeStringArray>()
2125 .ok_or_else(|| OmniError::Lance("expected LargeStringArray".to_string()))?
2126 .value(row)
2127 .to_string(),
2128 )),
2129 DataType::Boolean => Ok(serde_json::Value::Bool(
2130 array
2131 .as_any()
2132 .downcast_ref::<BooleanArray>()
2133 .ok_or_else(|| OmniError::Lance("expected BooleanArray".to_string()))?
2134 .value(row),
2135 )),
2136 DataType::Int32 => Ok(serde_json::Value::Number(serde_json::Number::from(
2137 array
2138 .as_any()
2139 .downcast_ref::<Int32Array>()
2140 .ok_or_else(|| OmniError::Lance("expected Int32Array".to_string()))?
2141 .value(row),
2142 ))),
2143 DataType::Int64 => Ok(serde_json::Value::Number(serde_json::Number::from(
2144 array
2145 .as_any()
2146 .downcast_ref::<Int64Array>()
2147 .ok_or_else(|| OmniError::Lance("expected Int64Array".to_string()))?
2148 .value(row),
2149 ))),
2150 DataType::UInt32 => Ok(serde_json::Value::Number(serde_json::Number::from(
2151 array
2152 .as_any()
2153 .downcast_ref::<UInt32Array>()
2154 .ok_or_else(|| OmniError::Lance("expected UInt32Array".to_string()))?
2155 .value(row),
2156 ))),
2157 DataType::UInt64 => Ok(serde_json::Value::Number(serde_json::Number::from(
2158 array
2159 .as_any()
2160 .downcast_ref::<UInt64Array>()
2161 .ok_or_else(|| OmniError::Lance("expected UInt64Array".to_string()))?
2162 .value(row),
2163 ))),
2164 DataType::Float32 => {
2165 let value = array
2166 .as_any()
2167 .downcast_ref::<Float32Array>()
2168 .ok_or_else(|| OmniError::Lance("expected Float32Array".to_string()))?
2169 .value(row) as f64;
2170 Ok(serde_json::Value::Number(
2171 serde_json::Number::from_f64(value).ok_or_else(|| {
2172 OmniError::Lance(format!("cannot encode f32 value '{}' as JSON", value))
2173 })?,
2174 ))
2175 }
2176 DataType::Float64 => {
2177 let value = array
2178 .as_any()
2179 .downcast_ref::<Float64Array>()
2180 .ok_or_else(|| OmniError::Lance("expected Float64Array".to_string()))?
2181 .value(row);
2182 Ok(serde_json::Value::Number(
2183 serde_json::Number::from_f64(value).ok_or_else(|| {
2184 OmniError::Lance(format!("cannot encode f64 value '{}' as JSON", value))
2185 })?,
2186 ))
2187 }
2188 DataType::Date32 => Ok(serde_json::Value::Number(serde_json::Number::from(
2189 array
2190 .as_any()
2191 .downcast_ref::<Date32Array>()
2192 .ok_or_else(|| OmniError::Lance("expected Date32Array".to_string()))?
2193 .value(row),
2194 ))),
2195 DataType::Binary => Ok(serde_json::Value::String(base64::Engine::encode(
2196 &base64::engine::general_purpose::STANDARD,
2197 array
2198 .as_any()
2199 .downcast_ref::<BinaryArray>()
2200 .ok_or_else(|| OmniError::Lance("expected BinaryArray".to_string()))?
2201 .value(row),
2202 ))),
2203 DataType::LargeBinary => Ok(serde_json::Value::String(base64::Engine::encode(
2204 &base64::engine::general_purpose::STANDARD,
2205 array
2206 .as_any()
2207 .downcast_ref::<LargeBinaryArray>()
2208 .ok_or_else(|| OmniError::Lance("expected LargeBinaryArray".to_string()))?
2209 .value(row),
2210 ))),
2211 DataType::List(_) => {
2212 let list = array
2213 .as_any()
2214 .downcast_ref::<ListArray>()
2215 .ok_or_else(|| OmniError::Lance("expected ListArray".to_string()))?;
2216 let values = list.value(row);
2217 let mut out = Vec::with_capacity(values.len());
2218 for idx in 0..values.len() {
2219 out.push(json_value_from_array(values.as_ref(), idx)?);
2220 }
2221 Ok(serde_json::Value::Array(out))
2222 }
2223 DataType::LargeList(_) => {
2224 let list = array
2225 .as_any()
2226 .downcast_ref::<LargeListArray>()
2227 .ok_or_else(|| OmniError::Lance("expected LargeListArray".to_string()))?;
2228 let values = list.value(row);
2229 let mut out = Vec::with_capacity(values.len());
2230 for idx in 0..values.len() {
2231 out.push(json_value_from_array(values.as_ref(), idx)?);
2232 }
2233 Ok(serde_json::Value::Array(out))
2234 }
2235 DataType::FixedSizeList(_, _) => {
2236 let list = array
2237 .as_any()
2238 .downcast_ref::<FixedSizeListArray>()
2239 .ok_or_else(|| OmniError::Lance("expected FixedSizeListArray".to_string()))?;
2240 let values = list.value(row);
2241 let mut out = Vec::with_capacity(values.len());
2242 for idx in 0..values.len() {
2243 out.push(json_value_from_array(values.as_ref(), idx)?);
2244 }
2245 Ok(serde_json::Value::Array(out))
2246 }
2247 DataType::Struct(fields) => {
2248 let struct_array = array
2249 .as_any()
2250 .downcast_ref::<StructArray>()
2251 .ok_or_else(|| OmniError::Lance("expected StructArray".to_string()))?;
2252 let mut obj = serde_json::Map::new();
2253 for (field_idx, field) in fields.iter().enumerate() {
2254 obj.insert(
2255 field.name().clone(),
2256 json_value_from_array(struct_array.column(field_idx).as_ref(), row)?,
2257 );
2258 }
2259 Ok(serde_json::Value::Object(obj))
2260 }
2261 _ => {
2262 let value = arrow_cast::display::array_value_to_string(array, row)
2263 .map_err(|e| OmniError::Lance(e.to_string()))?;
2264 Ok(serde_json::Value::String(value))
2265 }
2266 }
2267}
2268
2269#[cfg(test)]
2270mod tests {
2271 use super::*;
2272 use crate::db::manifest::ManifestCoordinator;
2273 use async_trait::async_trait;
2274 use serde_json::Value;
2275 use std::sync::{Arc, Mutex};
2276
2277 use crate::storage::{ObjectStorageAdapter, StorageAdapter, join_uri};
2278
2279 const TEST_SCHEMA: &str = r#"
2280node Person {
2281 name: String @key
2282 age: I32?
2283}
2284node Company {
2285 name: String @key
2286}
2287edge Knows: Person -> Person {
2288 since: Date?
2289}
2290edge WorksAt: Person -> Company
2291"#;
2292
    /// Test double that wraps an `ObjectStorageAdapter` and keeps a log of
    /// the URIs passed to selected storage operations.
    #[derive(Debug)]
    struct RecordingStorageAdapter {
        // Real adapter every call is forwarded to.
        inner: ObjectStorageAdapter,
        // URIs passed to `read_text`.
        reads: Mutex<Vec<String>>,
        // URIs passed to `write_text` and `write_text_if_absent`.
        writes: Mutex<Vec<String>>,
        // URIs passed to `exists`.
        exists_checks: Mutex<Vec<String>>,
        // `(from, to)` pairs passed to `rename_text`.
        renames: Mutex<Vec<(String, String)>>,
        // URIs passed to `delete`.
        deletes: Mutex<Vec<String>>,
    }
2302
2303 impl Default for RecordingStorageAdapter {
2304 fn default() -> Self {
2305 Self {
2306 inner: ObjectStorageAdapter::local(),
2307 reads: Mutex::default(),
2308 writes: Mutex::default(),
2309 exists_checks: Mutex::default(),
2310 renames: Mutex::default(),
2311 deletes: Mutex::default(),
2312 }
2313 }
2314 }
2315
2316 impl RecordingStorageAdapter {
2317 fn reads(&self) -> Vec<String> {
2318 self.reads.lock().unwrap().clone()
2319 }
2320
2321 fn writes(&self) -> Vec<String> {
2322 self.writes.lock().unwrap().clone()
2323 }
2324
2325 fn exists_checks(&self) -> Vec<String> {
2326 self.exists_checks.lock().unwrap().clone()
2327 }
2328 }
2329
    /// Forwards every operation to `inner`. `read_text`, `write_text`,
    /// `write_text_if_absent`, `exists`, `rename_text` and `delete` record
    /// their URI argument(s) first; the remaining methods forward without
    /// recording.
    #[async_trait]
    impl StorageAdapter for RecordingStorageAdapter {
        async fn read_text(&self, uri: &str) -> Result<String> {
            self.reads.lock().unwrap().push(uri.to_string());
            self.inner.read_text(uri).await
        }

        async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
            self.writes.lock().unwrap().push(uri.to_string());
            self.inner.write_text(uri, contents).await
        }

        // Recorded in the same `writes` log as `write_text`.
        async fn write_text_if_absent(&self, uri: &str, contents: &str) -> Result<bool> {
            self.writes.lock().unwrap().push(uri.to_string());
            self.inner.write_text_if_absent(uri, contents).await
        }

        async fn exists(&self, uri: &str) -> Result<bool> {
            self.exists_checks.lock().unwrap().push(uri.to_string());
            self.inner.exists(uri).await
        }

        async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
            self.renames
                .lock()
                .unwrap()
                .push((from_uri.to_string(), to_uri.to_string()));
            self.inner.rename_text(from_uri, to_uri).await
        }

        async fn delete(&self, uri: &str) -> Result<()> {
            self.deletes.lock().unwrap().push(uri.to_string());
            self.inner.delete(uri).await
        }

        // The methods below are pure pass-throughs (nothing is recorded).
        async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
            self.inner.list_dir(dir_uri).await
        }

        async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)> {
            self.inner.read_text_versioned(uri).await
        }

        async fn write_text_if_match(
            &self,
            uri: &str,
            contents: &str,
            expected_version: &str,
        ) -> Result<Option<String>> {
            self.inner
                .write_text_if_match(uri, contents, expected_version)
                .await
        }

        async fn delete_prefix(&self, prefix_uri: &str) -> Result<()> {
            self.inner.delete_prefix(prefix_uri).await
        }
    }
2388
    /// Test double that wraps an `ObjectStorageAdapter` and makes callers
    /// rendezvous on a barrier whenever they check for the schema-state file
    /// under `root`.
    #[derive(Debug)]
    struct InitRaceStorageAdapter {
        // Real adapter every call is forwarded to.
        inner: ObjectStorageAdapter,
        // Graph root whose schema-state URI triggers the barrier wait.
        root: String,
        // Barrier waited on inside `exists` for the schema-state URI.
        barrier: Arc<tokio::sync::Barrier>,
    }
2395
    /// Forwards every operation to `inner`. Only `exists` adds behavior: after
    /// the real existence check for the schema-state URI it waits on the
    /// shared barrier before returning.
    #[async_trait]
    impl StorageAdapter for InitRaceStorageAdapter {
        async fn read_text(&self, uri: &str) -> Result<String> {
            self.inner.read_text(uri).await
        }

        async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
            self.inner.write_text(uri, contents).await
        }

        async fn write_text_if_absent(&self, uri: &str, contents: &str) -> Result<bool> {
            self.inner.write_text_if_absent(uri, contents).await
        }

        async fn exists(&self, uri: &str) -> Result<bool> {
            // The real check runs first, so the answer reflects the state
            // *before* the rendezvous.
            let exists = self.inner.exists(uri).await?;
            if uri == schema_state_uri(&self.root) {
                self.barrier.wait().await;
            }
            Ok(exists)
        }

        async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
            self.inner.rename_text(from_uri, to_uri).await
        }

        async fn delete(&self, uri: &str) -> Result<()> {
            self.inner.delete(uri).await
        }

        async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
            self.inner.list_dir(dir_uri).await
        }

        async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)> {
            self.inner.read_text_versioned(uri).await
        }

        async fn write_text_if_match(
            &self,
            uri: &str,
            contents: &str,
            expected_version: &str,
        ) -> Result<Option<String>> {
            self.inner
                .write_text_if_match(uri, contents, expected_version)
                .await
        }

        async fn delete_prefix(&self, prefix_uri: &str) -> Result<()> {
            self.inner.delete_prefix(prefix_uri).await
        }
    }
2449
    /// Two inits of the same root run concurrently through an
    /// `InitRaceStorageAdapter` with a two-party barrier. Exactly one must
    /// succeed, and all three schema files must still exist afterwards.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn concurrent_strict_init_does_not_delete_winning_schema_files() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap().to_string();
        let root = normalize_root_uri(&uri).unwrap();
        // Barrier size 2 matches the two init futures created below.
        let storage: Arc<dyn StorageAdapter> = Arc::new(InitRaceStorageAdapter {
            inner: ObjectStorageAdapter::local(),
            root,
            barrier: Arc::new(tokio::sync::Barrier::new(2)),
        });

        let left = Omnigraph::init_with_storage(
            &uri,
            TEST_SCHEMA,
            Arc::clone(&storage),
            InitOptions::default(),
        );
        let right = Omnigraph::init_with_storage(
            &uri,
            TEST_SCHEMA,
            Arc::clone(&storage),
            InitOptions::default(),
        );
        // Both futures are polled together so they can meet at the barrier.
        let (left, right) = tokio::join!(left, right);
        let ok_count = usize::from(left.is_ok()) + usize::from(right.is_ok());
        assert_eq!(ok_count, 1, "exactly one concurrent init should win");

        assert!(
            dir.path().join("_schema.pg").exists(),
            "winning init must leave _schema.pg in place"
        );
        assert!(
            dir.path().join("_schema.ir.json").exists(),
            "winning init must leave _schema.ir.json in place"
        );
        assert!(
            dir.path().join("__schema_state.json").exists(),
            "winning init must leave __schema_state.json in place"
        );
    }
2490
    /// Verifies, via a `RecordingStorageAdapter`, that init writes and open
    /// reads the schema files through the supplied storage adapter, and that
    /// the expected existence checks are issued.
    #[tokio::test]
    async fn test_init_and_open_route_graph_metadata_through_storage_adapter() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let adapter = Arc::new(RecordingStorageAdapter::default());

        // Init: all three schema artifacts must have been written via the adapter.
        Omnigraph::init_with_storage(uri, TEST_SCHEMA, adapter.clone(), InitOptions::default())
            .await
            .unwrap();
        assert!(adapter.writes().contains(&join_uri(uri, "_schema.pg")));
        assert!(adapter.writes().contains(&join_uri(uri, "_schema.ir.json")));
        assert!(
            adapter
                .writes()
                .contains(&join_uri(uri, "__schema_state.json"))
        );

        // Open: the same artifacts must have been read via the adapter.
        Omnigraph::open_with_storage(uri, adapter.clone())
            .await
            .unwrap();
        assert!(adapter.reads().contains(&join_uri(uri, "_schema.pg")));
        assert!(adapter.reads().contains(&join_uri(uri, "_schema.ir.json")));
        assert!(
            adapter
                .reads()
                .contains(&join_uri(uri, "__schema_state.json"))
        );
        // The log is cumulative, so these checks may stem from init or open.
        assert!(
            adapter
                .exists_checks()
                .contains(&join_uri(uri, "_schema.ir.json"))
        );
        assert!(
            adapter
                .exists_checks()
                .contains(&join_uri(uri, "__schema_state.json"))
        );
        assert!(
            adapter
                .exists_checks()
                .contains(&join_uri(uri, "_graph_commits.lance"))
        );
    }
2534
2535 async fn table_rows_json(db: &Omnigraph, table_key: &str) -> Vec<Value> {
2536 let snapshot = db.snapshot().await;
2537 let ds = db
2538 .storage()
2539 .open_snapshot_at_table(&snapshot, table_key)
2540 .await
2541 .unwrap();
2542 let batches = db.storage().scan_batches(&ds).await.unwrap();
2543 batches
2544 .into_iter()
2545 .flat_map(|batch| {
2546 (0..batch.num_rows())
2547 .map(|row| record_batch_row_to_json(&batch, row).unwrap())
2548 .collect::<Vec<_>>()
2549 })
2550 .collect()
2551 }
2552
    /// Test helper: appends one row to `node:Person` and commits the new
    /// table state to the manifest. Panics on any error.
    ///
    /// Both the `id` and `name` columns receive `name`; `age` receives `age`;
    /// every other column in the dataset schema is filled with a single null.
    async fn seed_person_row(db: &mut Omnigraph, name: &str, age: Option<i32>) {
        let (ds, full_path, table_branch) = db
            .open_for_mutation("node:Person", crate::db::MutationOpKind::Insert)
            .await
            .unwrap()
            .require_handle("seed_person_row test");
        // Build the batch against the dataset's own schema so column order matches.
        let schema: Arc<Schema> = Arc::new(ds.dataset().schema().into());
        let columns: Vec<Arc<dyn Array>> = schema
            .fields()
            .iter()
            .map(|field| match field.name().as_str() {
                "id" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
                "name" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
                "age" => Arc::new(Int32Array::from(vec![age])) as Arc<dyn Array>,
                _ => new_null_array(field.data_type(), 1),
            })
            .collect();
        let batch = RecordBatch::try_new(Arc::clone(&schema), columns).unwrap();
        // Stage, then commit the append on the table itself.
        let staged = db.storage().stage_append(&ds, batch, &[]).await.unwrap();
        let committed = db.storage().commit_staged(ds, staged).await.unwrap();
        let state = db
            .storage()
            .table_state(&full_path, &committed)
            .await
            .unwrap();
        // Record the table's new version / row count via `commit_updates`.
        db.commit_updates(&[crate::db::SubTableUpdate {
            table_key: "node:Person".to_string(),
            table_version: state.version,
            table_branch,
            row_count: state.row_count,
            version_metadata: state.version_metadata,
        }])
        .await
        .unwrap();
    }
2590
    /// Adding a nullable `nickname` property to `Person` must apply, keep the
    /// existing row intact, expose the new column as null, update the
    /// catalog, and leave `_schema.pg` on disk.
    #[tokio::test]
    async fn test_apply_schema_adds_nullable_property_and_preserves_rows() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;

        // Desired schema = TEST_SCHEMA with `nickname: String?` added after `age`.
        let desired = TEST_SCHEMA.replace(
            " age: I32?\n}",
            " age: I32?\n nickname: String?\n}",
        );
        let result = db.apply_schema(&desired).await.unwrap();
        assert!(result.applied);

        // Reopen from disk so the assertions see persisted state.
        let reopened = Omnigraph::open(uri).await.unwrap();
        let rows = table_rows_json(&reopened, "node:Person").await;
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0]["name"], "Alice");
        assert_eq!(rows[0]["age"], 30);
        assert!(rows[0]["nickname"].is_null());
        assert!(
            reopened.catalog().node_types["Person"]
                .properties
                .contains_key("nickname")
        );
        assert!(dir.path().join("_schema.pg").exists());
    }
2618
    /// Renaming `age` to `years` via `@rename_from` must carry the stored
    /// value over to the new column and remove the old column name.
    #[tokio::test]
    async fn test_apply_schema_renames_property_and_preserves_values() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;

        // Desired schema = TEST_SCHEMA with `age` replaced by a renamed `years`.
        let desired = TEST_SCHEMA.replace(
            " age: I32?\n}",
            " years: I32? @rename_from(\"age\")\n}",
        );
        db.apply_schema(&desired).await.unwrap();

        // Reopen from disk so the assertions see persisted state.
        let reopened = Omnigraph::open(uri).await.unwrap();
        let rows = table_rows_json(&reopened, "node:Person").await;
        assert_eq!(rows[0]["name"], "Alice");
        assert_eq!(rows[0]["years"], 30);
        assert!(rows[0].get("age").is_none());
    }
2638
    /// Renaming node type `Person` to `Human` must swap the table key at
    /// head while the snapshot taken before the rename still lists
    /// `node:Person` and not `node:Human`.
    #[tokio::test]
    async fn test_apply_schema_renames_type_and_preserves_historical_snapshot() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;
        // Remember the pre-rename version for the historical lookup below.
        let before_version = db.snapshot().await.version();

        // Rename the node type and update both edges that reference it.
        let desired = TEST_SCHEMA
            .replace("node Person {\n", "node Human @rename_from(\"Person\") {\n")
            .replace("edge Knows: Person -> Person", "edge Knows: Human -> Human")
            .replace(
                "edge WorksAt: Person -> Company",
                "edge WorksAt: Human -> Company",
            );
        db.apply_schema(&desired).await.unwrap();

        let head = db.snapshot().await;
        assert!(head.entry("node:Person").is_none());
        assert!(head.entry("node:Human").is_some());
        let historical = ManifestCoordinator::snapshot_at(uri, None, before_version)
            .await
            .unwrap();
        assert!(historical.entry("node:Person").is_some());
        assert!(historical.entry("node:Human").is_none());
    }
2665
2666 #[tokio::test]
2667 async fn test_apply_schema_succeeds_after_load() {
2668 let dir = tempfile::tempdir().unwrap();
2675 let uri = dir.path().to_str().unwrap();
2676 let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2677
2678 crate::loader::load_jsonl(
2679 &mut db,
2680 r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#,
2681 crate::loader::LoadMode::Overwrite,
2682 )
2683 .await
2684 .unwrap();
2685
2686 let all_branches = db.coordinator.read().await.all_branches().await.unwrap();
2687 assert!(
2688 !all_branches.iter().any(|b| b.starts_with("__run__")),
2689 "no __run__ branch should exist after publish, got: {:?}",
2690 all_branches
2691 );
2692
2693 let desired = TEST_SCHEMA.replace(
2694 " age: I32?\n}",
2695 " age: I32?\n nickname: String?\n}",
2696 );
2697 let result = db.apply_schema(&desired).await.unwrap();
2698 assert!(result.applied, "schema apply should have applied");
2699 }
2700
2701 #[tokio::test]
2711 async fn legacy_run_branch_is_swept_on_open_and_does_not_block_schema_apply() {
2712 let dir = tempfile::tempdir().unwrap();
2713 let uri = dir.path().to_str().unwrap();
2714 let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2715
2716 db.branch_create("__run__legacy").await.unwrap();
2719 drop(db);
2720 {
2721 let mut ds = lance::Dataset::open(&format!("{}/__manifest", uri))
2723 .await
2724 .unwrap();
2725 ds.update_schema_metadata([(
2726 "omnigraph:internal_schema_version".to_string(),
2727 Some("2".to_string()),
2728 )])
2729 .await
2730 .unwrap();
2731 }
2732
2733 let db = Omnigraph::open(uri).await.unwrap();
2736 let branches = db.branch_list().await.unwrap();
2737 assert!(
2738 !branches.iter().any(|b| b.starts_with("__run__")),
2739 "open-time migration must sweep legacy __run__ branches; got {branches:?}",
2740 );
2741
2742 let desired = TEST_SCHEMA.replace(
2745 " age: I32?\n}",
2746 " age: I32?\n nickname: String?\n}",
2747 );
2748 let result = db.apply_schema(&desired).await.unwrap();
2749 assert!(result.applied, "schema apply should have applied");
2750 }
2751
2752 #[tokio::test]
2753 async fn test_apply_schema_defers_index_then_reconciler_builds_it() {
2754 let dir = tempfile::tempdir().unwrap();
2759 let uri = dir.path().to_str().unwrap();
2760 let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2761 seed_person_row(&mut db, "Alice", Some(30)).await;
2762
2763 let desired = TEST_SCHEMA.replace("age: I32?", "age: I32? @index");
2764 db.apply_schema(&desired).await.unwrap();
2765
2766 let snapshot = db.snapshot().await;
2768 let ds = db
2769 .storage()
2770 .open_snapshot_at_table(&snapshot, "node:Person")
2771 .await
2772 .unwrap();
2773 assert!(
2774 !db.storage().has_btree_index(&ds, "age").await.unwrap(),
2775 "apply must not build the index inline (deferred to the reconciler)"
2776 );
2777
2778 db.ensure_indices().await.unwrap();
2780 let snapshot = db.snapshot().await;
2781 let ds = db
2782 .storage()
2783 .open_snapshot_at_table(&snapshot, "node:Person")
2784 .await
2785 .unwrap();
2786 assert!(
2787 db.storage().has_btree_index(&ds, "age").await.unwrap(),
2788 "ensure_indices must build the deferred index"
2789 );
2790 }
2791
2792 #[tokio::test]
2793 async fn test_apply_schema_rewrite_defers_index_then_reconciler_restores() {
2794 let dir = tempfile::tempdir().unwrap();
2797 let uri = dir.path().to_str().unwrap();
2798 let initial_schema = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
2799 let mut db = Omnigraph::init(uri, &initial_schema).await.unwrap();
2800 seed_person_row(&mut db, "Alice", Some(30)).await;
2801
2802 let desired = initial_schema.replace(
2803 " age: I32?\n}",
2804 " age: I32?\n nickname: String?\n}",
2805 );
2806 db.apply_schema(&desired).await.unwrap();
2807
2808 db.ensure_indices().await.unwrap();
2810 let snapshot = db.snapshot().await;
2811 let ds = db
2812 .storage()
2813 .open_snapshot_at_table(&snapshot, "node:Person")
2814 .await
2815 .unwrap();
2816 assert!(db.storage().has_btree_index(&ds, "id").await.unwrap());
2817 assert!(db.storage().has_fts_index(&ds, "name").await.unwrap());
2818 }
2819
2820 #[tokio::test]
2821 async fn test_open_for_mutation_rejects_while_schema_apply_locked() {
2822 let dir = tempfile::tempdir().unwrap();
2823 let uri = dir.path().to_str().unwrap();
2824 let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2825 let mut db = db;
2826 db.coordinator
2827 .write()
2828 .await
2829 .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
2830 .await
2831 .unwrap();
2832
2833 let err = db
2834 .open_for_mutation("node:Person", crate::db::MutationOpKind::Insert)
2835 .await
2836 .unwrap_err();
2837 assert!(
2838 err.to_string()
2839 .contains("write is unavailable while schema apply is in progress")
2840 );
2841 }
2842
2843 #[tokio::test]
2844 async fn test_commit_updates_rejects_while_schema_apply_locked() {
2845 let dir = tempfile::tempdir().unwrap();
2846 let uri = dir.path().to_str().unwrap();
2847 let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2848 db.coordinator
2849 .write()
2850 .await
2851 .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
2852 .await
2853 .unwrap();
2854
2855 let err = db.commit_updates(&[]).await.unwrap_err();
2856 assert!(
2857 err.to_string()
2858 .contains("write commit is unavailable while schema apply is in progress")
2859 );
2860 }
2861
2862 #[tokio::test]
2863 async fn test_branch_list_hides_schema_apply_lock_branch() {
2864 let dir = tempfile::tempdir().unwrap();
2865 let uri = dir.path().to_str().unwrap();
2866 let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2867 db.coordinator
2868 .write()
2869 .await
2870 .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
2871 .await
2872 .unwrap();
2873
2874 let branches = db.branch_list().await.unwrap();
2875 assert_eq!(branches, vec!["main".to_string()]);
2876 }
2877}