1use std::collections::{BTreeSet, HashMap, HashSet};
2use std::io::Write;
3use std::sync::Arc;
4
5use arc_swap::ArcSwap;
6use arrow_array::{
7 Array, BinaryArray, BooleanArray, Date32Array, FixedSizeListArray, Float32Array, Float64Array,
8 Int32Array, Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray,
9 RecordBatch, StringArray, StructArray, UInt32Array, UInt64Array, new_null_array,
10};
11use arrow_schema::{DataType, Field, Schema};
12use lance::Dataset;
13use lance::blob::{BlobArrayBuilder, blob_field};
14use lance::dataset::BlobFile;
15use lance::dataset::scanner::ColumnOrdering;
16use lance::datatypes::BlobKind;
17use omnigraph_compiler::catalog::{Catalog, EdgeType, NodeType};
18use omnigraph_compiler::schema::parser::parse_schema;
19use omnigraph_compiler::types::{PropType, ScalarType};
20use omnigraph_compiler::{
21 DropMode, SchemaIR, SchemaMigrationPlan, SchemaMigrationStep, SchemaTypeKind,
22 build_catalog_from_ir, build_schema_ir, plan_schema_migration,
23};
24
25use crate::db::graph_coordinator::{GraphCoordinator, PublishedSnapshot};
26use crate::error::{OmniError, Result};
27use crate::runtime_cache::RuntimeCache;
28use crate::storage::{StorageAdapter, join_uri, normalize_root_uri, storage_for_uri};
29use crate::storage_layer::SnapshotHandle;
30use crate::table_store::TableStore;
31
32mod export;
33mod optimize;
34mod repair;
35mod schema_apply;
36mod table_ops;
37
38pub use optimize::{CleanupPolicyOptions, SkipReason, TableCleanupStats, TableOptimizeStats};
39pub use repair::{
40 RepairAction, RepairClassification, RepairOptions, RepairStats, TableRepairStats,
41};
42pub use schema_apply::SchemaApplyOptions;
43pub use table_ops::PendingIndex;
44
45use super::commit_graph::GraphCommit;
46use super::manifest::{
47 ManifestChange, Snapshot, SubTableEntry, TableRegistration, TableTombstone,
48 table_path_for_table_key,
49};
50use super::schema_state::{
51 SCHEMA_SOURCE_FILENAME, load_or_bootstrap_schema_contract, read_accepted_schema_ir,
52 recover_schema_state_files, schema_ir_staging_uri, schema_ir_uri, schema_source_staging_uri,
53 schema_source_uri, schema_state_staging_uri, schema_state_uri, validate_schema_contract,
54 write_schema_contract, write_schema_contract_staging,
55};
56use super::{
57 ReadTarget, ResolvedTarget, SCHEMA_APPLY_LOCK_BRANCH, SnapshotId, is_internal_system_branch,
58 is_schema_apply_lock_branch,
59};
60
/// Result classification of a branch merge.
///
/// NOTE(review): the variant meanings below are inferred from their names;
/// the merge routine that produces them is not part of this chunk.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MergeOutcome {
    /// Presumably: the target already contained everything from the source.
    AlreadyUpToDate,
    /// Presumably: the target head was advanced to the source head.
    FastForward,
    /// Presumably: a merge commit combining both sides was produced.
    Merged,
}
67
68#[derive(Debug, Clone)]
69pub struct SchemaApplyResult {
70 pub supported: bool,
71 pub applied: bool,
72 pub manifest_version: u64,
73 pub steps: Vec<SchemaMigrationStep>,
74}
75
76#[derive(Debug, Clone)]
77pub struct SchemaApplyPreview {
78 pub plan: SchemaMigrationPlan,
79 pub catalog: Catalog,
80}
81
/// Handle to a single graph rooted at a storage URI.
///
/// Built by the `init*` / `open*` constructors. The coordinator, catalog and
/// schema source sit behind shared interior-mutability wrappers, so most
/// operations only need `&self`.
pub struct Omnigraph {
    /// Root URI as produced by `normalize_root_uri`.
    root_uri: String,
    /// Storage adapter used for schema / manifest files under `root_uri`.
    storage: Arc<dyn StorageAdapter>,
    /// Coordinator for the currently bound branch; replaced by `sync_branch`,
    /// `swap_coordinator_for_branch` and `restore_coordinator`.
    coordinator: Arc<tokio::sync::RwLock<GraphCoordinator>>,
    /// Table store, exposed as `TableStorage` via `storage()` and as
    /// `InlineCommitResidual` via `storage_inline_residual()`.
    table_store: TableStore,
    /// Runtime cache cleared by `invalidate_read_caches`.
    runtime_cache: RuntimeCache,
    /// Lance session plus table-handle cache; attached to branch snapshots by
    /// `resolved_target`, handle cache cleared by `invalidate_read_caches`.
    read_caches: Arc<crate::runtime_cache::ReadCaches>,
    /// Catalog in effect for this handle, replaced atomically by `store_catalog`.
    catalog: Arc<ArcSwap<Catalog>>,
    /// Schema source text last read from storage (or passed to `init`),
    /// replaced by `store_schema_source`.
    schema_source: Arc<ArcSwap<String>>,
    /// Keyed write queue; `refresh` acquires the schema-apply serial key on it.
    write_queue: Arc<crate::db::write_queue::WriteQueueManager>,
    /// Mutex handed out by `merge_exclusive()`.
    /// NOTE(review): presumably serializes merges — confirm at call sites.
    merge_exclusive: Arc<tokio::sync::Mutex<()>>,
    /// Optional policy checker consulted by `enforce`; `None` disables
    /// engine-layer checks.
    policy: Option<Arc<dyn omnigraph_policy::PolicyChecker>>,
    /// Lazily initialized embedding client cell, exposed by `embedding_cell()`.
    embedding: Arc<tokio::sync::OnceCell<crate::embedding::EmbeddingClient>>,
    /// Optional embedding configuration set by `with_embedding_config`.
    embedding_config: Option<Arc<crate::embedding::EmbeddingConfig>>,
}
179
/// How `open_with_storage_and_mode` treats the graph on open.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum OpenMode {
    /// Runs on-open manifest migration plus schema-state and manifest-drift
    /// recovery before reading the schema.
    ReadWrite,
    /// Skips the migration and recovery steps entirely.
    ReadOnly,
}
197
/// Options for `Omnigraph::init_with_options`.
#[derive(Clone, Copy, Debug, Default)]
pub struct InitOptions {
    /// When `true`, skip the "schema files already exist" check and do not
    /// claim the schema source with a conditional write. Defaults to `false`.
    pub force: bool,
}
216
217impl Omnigraph {
218 pub async fn init(uri: &str, schema_source: &str) -> Result<Self> {
225 Self::init_with_options(uri, schema_source, InitOptions::default()).await
226 }
227
228 pub async fn init_with_options(
233 uri: &str,
234 schema_source: &str,
235 options: InitOptions,
236 ) -> Result<Self> {
237 Self::init_with_storage(uri, schema_source, storage_for_uri(uri)?, options).await
238 }
239
    /// Initializes a graph at `uri` through an explicitly supplied storage adapter.
    ///
    /// Steps, in order:
    /// 1. Normalize `uri` to the graph root.
    /// 2. Unless `options.force`, refuse with `AlreadyInitialized` if the schema
    ///    source, schema IR or schema state file already exists.
    /// 3. Parse `schema_source` into schema IR and build the catalog from it.
    /// 4. Unless `options.force`, claim the root by writing the schema source
    ///    with `write_text_if_absent`. If that write reports the file already
    ///    present, fail with `AlreadyInitialized`.
    /// 5. Run `init_storage_phase` to obtain the graph coordinator.
    /// 6. Assemble a handle with fresh caches and queues, no policy and no
    ///    embedding configuration.
    ///
    /// # Errors
    /// Returns `OmniError::AlreadyInitialized` when the root is already in use.
    /// Otherwise propagates storage, schema-parse, catalog-build, failpoint and
    /// storage-phase errors. `best_effort_cleanup_init_artifacts` runs before
    /// returning when the post-claim failpoint fires or the storage phase fails.
    pub(crate) async fn init_with_storage(
        uri: &str,
        schema_source: &str,
        storage: Arc<dyn StorageAdapter>,
        options: InitOptions,
    ) -> Result<Self> {
        let root = normalize_root_uri(uri)?;

        if !options.force {
            // Pre-check for any existing schema artifact. The conditional write
            // further down is what actually claims the root.
            for candidate in [
                schema_source_uri(&root),
                schema_ir_uri(&root),
                schema_state_uri(&root),
            ] {
                if storage.exists(&candidate).await? {
                    return Err(OmniError::AlreadyInitialized { uri: root.clone() });
                }
            }
        }

        let schema_ir = read_schema_ir_from_source(schema_source)?;
        let mut catalog = build_catalog_from_ir(&schema_ir)?;
        fixup_blob_schemas(&mut catalog);

        // `true` once this call has written the schema source file itself.
        // In force mode nothing is claimed here.
        let schema_pg_claimed = if options.force {
            false
        } else {
            let schema_path = join_uri(&root, SCHEMA_SOURCE_FILENAME);
            if !storage
                .write_text_if_absent(&schema_path, schema_source)
                .await?
            {
                // The file appeared after the pre-check (e.g. a concurrent init).
                return Err(OmniError::AlreadyInitialized { uri: root.clone() });
            }
            if let Err(err) = crate::failpoints::maybe_fail("init.after_schema_pg_written") {
                best_effort_cleanup_init_artifacts(&root, storage.as_ref()).await;
                return Err(err);
            }
            true
        };

        // NOTE(review): the last argument presumably tells the storage phase it
        // must write the schema source itself (force mode) — confirm against
        // `init_storage_phase`.
        let coordinator = match init_storage_phase(
            &root,
            schema_source,
            &schema_ir,
            &catalog,
            &storage,
            !schema_pg_claimed,
        )
        .await
        {
            Ok(coordinator) => coordinator,
            Err(err) => {
                // NOTE(review): `schema_pg_claimed` is false only when
                // `options.force` is true, so this condition always holds.
                // Every storage-phase failure triggers cleanup.
                if schema_pg_claimed || options.force {
                    best_effort_cleanup_init_artifacts(&root, storage.as_ref()).await;
                }
                return Err(err);
            }
        };

        Ok(Self {
            root_uri: root.clone(),
            storage,
            coordinator: Arc::new(tokio::sync::RwLock::new(coordinator)),
            table_store: TableStore::new(&root),
            runtime_cache: RuntimeCache::default(),
            read_caches: Arc::new(crate::runtime_cache::ReadCaches {
                session: Arc::new(lance::session::Session::default()),
                handles: Arc::new(crate::runtime_cache::TableHandleCache::default()),
            }),
            catalog: Arc::new(ArcSwap::from_pointee(catalog)),
            schema_source: Arc::new(ArcSwap::from_pointee(schema_source.to_string())),
            write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()),
            merge_exclusive: Arc::new(tokio::sync::Mutex::new(())),
            policy: None,
            embedding: Arc::new(tokio::sync::OnceCell::new()),
            embedding_config: None,
        })
    }
353
354 pub async fn open(uri: &str) -> Result<Self> {
359 Self::open_with_storage_and_mode(uri, storage_for_uri(uri)?, OpenMode::ReadWrite).await
360 }
361
362 pub async fn open_read_only(uri: &str) -> Result<Self> {
365 Self::open_with_storage_and_mode(uri, storage_for_uri(uri)?, OpenMode::ReadOnly).await
366 }
367
368 pub async fn open_with_storage(uri: &str, storage: Arc<dyn StorageAdapter>) -> Result<Self> {
372 Self::open_with_storage_and_mode(uri, storage, OpenMode::ReadWrite).await
373 }
374
375 pub(crate) async fn open_with_storage_and_mode(
376 uri: &str,
377 storage: Arc<dyn StorageAdapter>,
378 mode: OpenMode,
379 ) -> Result<Self> {
380 let root = normalize_root_uri(uri)?;
381 if matches!(mode, OpenMode::ReadWrite) {
389 crate::db::manifest::migrate_on_open(&root).await?;
390 }
391 let mut coordinator = GraphCoordinator::open(&root, Arc::clone(&storage)).await?;
394 if matches!(mode, OpenMode::ReadWrite) {
404 let schema_state_recovery =
405 recover_schema_state_files(&root, Arc::clone(&storage), &coordinator.snapshot())
406 .await?;
407 crate::db::manifest::recover_manifest_drift(
414 &root,
415 Arc::clone(&storage),
416 &mut coordinator,
417 crate::db::manifest::RecoveryMode::Full,
418 schema_state_recovery,
419 )
420 .await?;
421 }
422 let schema_path = schema_source_uri(&root);
424 let schema_source = storage.read_text(&schema_path).await?;
425 let current_source_ir = read_schema_ir_from_source(&schema_source)?;
426 let branches = coordinator.branch_list().await?;
427 let (accepted_ir, _) = load_or_bootstrap_schema_contract(
428 &root,
429 Arc::clone(&storage),
430 &branches,
431 ¤t_source_ir,
432 )
433 .await?;
434 let mut catalog = build_catalog_from_ir(&accepted_ir)?;
435 fixup_blob_schemas(&mut catalog);
436
437 Ok(Self {
438 root_uri: root.clone(),
439 storage,
440 coordinator: Arc::new(tokio::sync::RwLock::new(coordinator)),
441 table_store: TableStore::new(&root),
442 runtime_cache: RuntimeCache::default(),
443 read_caches: Arc::new(crate::runtime_cache::ReadCaches {
448 session: Arc::new(lance::session::Session::default()),
449 handles: Arc::new(crate::runtime_cache::TableHandleCache::default()),
450 }),
451 catalog: Arc::new(ArcSwap::from_pointee(catalog)),
452 schema_source: Arc::new(ArcSwap::from_pointee(schema_source)),
453 write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()),
454 merge_exclusive: Arc::new(tokio::sync::Mutex::new(())),
455 policy: None,
456 embedding: Arc::new(tokio::sync::OnceCell::new()),
457 embedding_config: None,
458 })
459 }
460
461 pub fn catalog(&self) -> Arc<Catalog> {
465 self.catalog.load_full()
466 }
467
468 pub fn schema_source(&self) -> Arc<String> {
470 self.schema_source.load_full()
471 }
472
473 pub(crate) fn store_catalog(&self, catalog: Catalog) {
477 self.catalog.store(Arc::new(catalog));
478 }
479
480 pub(crate) fn store_schema_source(&self, schema_source: String) {
483 self.schema_source.store(Arc::new(schema_source));
484 }
485
486 pub fn uri(&self) -> &str {
487 &self.root_uri
488 }
489
490 pub fn with_policy(mut self, checker: Arc<dyn omnigraph_policy::PolicyChecker>) -> Self {
501 self.policy = Some(checker);
502 self
503 }
504
505 pub(crate) fn embedding_cell(
509 &self,
510 ) -> &tokio::sync::OnceCell<crate::embedding::EmbeddingClient> {
511 &self.embedding
512 }
513
514 pub fn with_embedding_config(mut self, config: Arc<crate::embedding::EmbeddingConfig>) -> Self {
519 self.embedding_config = Some(config);
520 self
521 }
522
523 pub(crate) fn embedding_config_ref(&self) -> Option<&crate::embedding::EmbeddingConfig> {
525 self.embedding_config.as_deref()
526 }
527
528 pub(crate) fn enforce(
540 &self,
541 action: omnigraph_policy::PolicyAction,
542 scope: &omnigraph_policy::ResourceScope,
543 actor: Option<&str>,
544 ) -> Result<()> {
545 let Some(checker) = self.policy.as_ref() else {
546 return Ok(());
547 };
548 let Some(actor) = actor else {
549 return Err(OmniError::Policy(
550 "no actor for engine-layer policy check (policy is configured but the call site \
551 didn't thread an actor through — this is almost certainly a bug, not an \
552 intended bypass)"
553 .to_string(),
554 ));
555 };
556 checker
557 .check(action, scope, actor)
558 .map_err(|err| OmniError::Policy(err.to_string()))
559 }
560
561 pub(crate) async fn ensure_schema_state_valid(&self) -> Result<()> {
562 validate_schema_contract(self.uri(), Arc::clone(&self.storage)).await
569 }
570
571 pub async fn plan_schema(&self, desired_schema_source: &str) -> Result<SchemaMigrationPlan> {
572 self.plan_schema_with_options(desired_schema_source, SchemaApplyOptions::default())
573 .await
574 }
575
576 pub async fn plan_schema_with_options(
577 &self,
578 desired_schema_source: &str,
579 options: SchemaApplyOptions,
580 ) -> Result<SchemaMigrationPlan> {
581 schema_apply::plan_schema(self, desired_schema_source, options).await
582 }
583
584 pub async fn preview_schema_apply_with_options(
585 &self,
586 desired_schema_source: &str,
587 options: SchemaApplyOptions,
588 ) -> Result<SchemaApplyPreview> {
589 schema_apply::preview_schema_apply(self, desired_schema_source, options).await
590 }
591
592 pub async fn apply_schema(&self, desired_schema_source: &str) -> Result<SchemaApplyResult> {
593 self.apply_schema_as(desired_schema_source, SchemaApplyOptions::default(), None)
594 .await
595 }
596
597 pub async fn apply_schema_with_options(
598 &self,
599 desired_schema_source: &str,
600 options: SchemaApplyOptions,
601 ) -> Result<SchemaApplyResult> {
602 self.apply_schema_as(desired_schema_source, options, None)
603 .await
604 }
605
606 pub async fn apply_schema_as(
617 &self,
618 desired_schema_source: &str,
619 options: SchemaApplyOptions,
620 actor: Option<&str>,
621 ) -> Result<SchemaApplyResult> {
622 self.apply_schema_as_with_catalog_check(desired_schema_source, options, actor, |_| Ok(()))
623 .await
624 }
625
626 pub async fn apply_schema_as_with_catalog_check<F>(
627 &self,
628 desired_schema_source: &str,
629 options: SchemaApplyOptions,
630 actor: Option<&str>,
631 validate_catalog: F,
632 ) -> Result<SchemaApplyResult>
633 where
634 F: FnOnce(&Catalog) -> Result<()>,
635 {
636 schema_apply::apply_schema(
637 self,
638 desired_schema_source,
639 options,
640 actor,
641 validate_catalog,
642 )
643 .await
644 }
645
646 pub(crate) async fn ensure_schema_apply_idle(&self, operation: &str) -> Result<()> {
647 schema_apply::ensure_schema_apply_idle(self, operation).await
648 }
649
650 async fn ensure_schema_apply_not_locked(&self, operation: &str) -> Result<()> {
651 schema_apply::ensure_schema_apply_not_locked(self, operation).await
652 }
653
654 pub(crate) fn storage(&self) -> &dyn crate::storage_layer::TableStorage {
664 &self.table_store
665 }
666
667 pub(crate) fn storage_inline_residual(
676 &self,
677 ) -> &dyn crate::storage_layer::InlineCommitResidual {
678 &self.table_store
679 }
680
681 pub(crate) fn storage_adapter(&self) -> &dyn crate::storage::StorageAdapter {
685 self.storage.as_ref()
686 }
687
688 pub(crate) fn write_queue(&self) -> Arc<crate::db::write_queue::WriteQueueManager> {
696 Arc::clone(&self.write_queue)
697 }
698
699 pub(crate) fn merge_exclusive(&self) -> Arc<tokio::sync::Mutex<()>> {
705 Arc::clone(&self.merge_exclusive)
706 }
707
708 pub(crate) fn root_uri(&self) -> &str {
711 &self.root_uri
712 }
713
714 pub(crate) async fn open_coordinator_for_branch(
715 &self,
716 branch: Option<&str>,
717 ) -> Result<GraphCoordinator> {
718 match branch {
719 Some(branch) => {
720 GraphCoordinator::open_branch(self.uri(), branch, Arc::clone(&self.storage)).await
721 }
722 None => GraphCoordinator::open(self.uri(), Arc::clone(&self.storage)).await,
723 }
724 }
725
726 pub(crate) async fn swap_coordinator_for_branch(
727 &self,
728 branch: Option<&str>,
729 ) -> Result<GraphCoordinator> {
730 let next = self.open_coordinator_for_branch(branch).await?;
731 let mut coord = self.coordinator.write().await;
732 Ok(std::mem::replace(&mut *coord, next))
733 }
734
735 pub(crate) async fn restore_coordinator(&self, coordinator: GraphCoordinator) {
736 *self.coordinator.write().await = coordinator;
737 }
738
739 pub(crate) async fn resolved_branch_target(
740 &self,
741 branch: Option<&str>,
742 ) -> Result<ResolvedTarget> {
743 self.ensure_schema_state_valid().await?;
744 let requested = ReadTarget::Branch(branch.unwrap_or("main").to_string());
745 let normalized = normalize_branch_name(branch.unwrap_or("main"))?;
746 let coord = self.coordinator.read().await;
747 if normalized.as_deref() == coord.current_branch() {
748 let snapshot_id = coord.head_commit_id().await?.unwrap_or_else(|| {
749 SnapshotId::synthetic(
750 coord.current_branch(),
751 coord.version(),
752 coord.manifest_incarnation().e_tag.as_deref(),
753 )
754 });
755 return Ok(ResolvedTarget {
756 requested,
757 branch: coord.current_branch().map(str::to_string),
758 snapshot_id,
759 snapshot: coord.snapshot(),
760 });
761 }
762 coord.resolve_target(&requested).await
763 }
764
765 pub(crate) async fn snapshot_for_branch(&self, branch: Option<&str>) -> Result<Snapshot> {
766 self.resolved_branch_target(branch)
767 .await
768 .map(|resolved| resolved.snapshot)
769 }
770
771 pub(crate) async fn fresh_snapshot_for_branch(&self, branch: Option<&str>) -> Result<Snapshot> {
772 self.ensure_schema_state_valid().await?;
773 let requested = ReadTarget::Branch(branch.unwrap_or("main").to_string());
774 let coord = self.coordinator.read().await;
775 coord
776 .resolve_target(&requested)
777 .await
778 .map(|resolved| resolved.snapshot)
779 }
780
781 pub(crate) async fn version(&self) -> u64 {
782 self.coordinator.read().await.version()
783 }
784
785 pub(crate) async fn snapshot(&self) -> Snapshot {
787 self.coordinator.read().await.snapshot()
788 }
789
790 pub async fn snapshot_of(&self, target: impl Into<ReadTarget>) -> Result<Snapshot> {
791 self.resolved_target(target)
792 .await
793 .map(|resolved| resolved.snapshot)
794 }
795
796 pub async fn version_of(&self, target: impl Into<ReadTarget>) -> Result<u64> {
797 self.snapshot_of(target)
798 .await
799 .map(|snapshot| snapshot.version())
800 }
801
802 pub async fn resolved_branch_of(
803 &self,
804 target: impl Into<ReadTarget>,
805 ) -> Result<Option<String>> {
806 self.resolved_target(target)
807 .await
808 .map(|resolved| resolved.branch)
809 }
810
811 pub async fn sync_branch(&self, branch: &str) -> Result<()> {
813 self.ensure_schema_state_valid().await?;
814 let branch = normalize_branch_name(branch)?;
815 let next = self.open_coordinator_for_branch(branch.as_deref()).await?;
816 *self.coordinator.write().await = next;
817 self.invalidate_read_caches().await;
818 Ok(())
819 }
820
821 async fn invalidate_read_caches(&self) {
822 self.runtime_cache.invalidate_all().await;
823 self.read_caches.handles.invalidate_all().await;
824 }
825
826 pub async fn refresh(&self) -> Result<()> {
866 {
885 let _serial = self
900 .write_queue
901 .acquire(&crate::db::manifest::schema_apply_serial_queue_key())
902 .await;
903 if crate::db::manifest::list_sidecars(&self.root_uri, self.storage.as_ref())
904 .await?
905 .is_empty()
906 {
907 let mut coord = self.coordinator.write().await;
908 coord.refresh().await?;
909 recover_schema_state_files(
910 &self.root_uri,
911 Arc::clone(&self.storage),
912 &coord.snapshot(),
913 )
914 .await?;
915 }
916 } crate::db::manifest::heal_pending_sidecars_roll_forward(
918 &self.root_uri,
919 Arc::clone(&self.storage),
920 &self.coordinator,
921 &self.write_queue,
922 )
923 .await?;
924 self.reload_schema_if_source_changed().await?;
925 self.invalidate_read_caches().await;
926 Ok(())
927 }
928
929 pub(crate) async fn heal_pending_recovery_sidecars(&self) -> Result<()> {
943 let processed = crate::db::manifest::heal_pending_sidecars_roll_forward(
944 &self.root_uri,
945 Arc::clone(&self.storage),
946 &self.coordinator,
947 &self.write_queue,
948 )
949 .await?;
950 if processed {
951 self.reload_schema_if_source_changed().await?;
957 self.invalidate_read_caches().await;
958 }
959 Ok(())
960 }
961
962 async fn reload_schema_if_source_changed(&self) -> Result<()> {
963 let schema_path = schema_source_uri(&self.root_uri);
964 let schema_source = self.storage.read_text(&schema_path).await?;
965 if schema_source == *self.schema_source.load_full() {
966 return Ok(());
967 }
968 let current_source_ir = read_schema_ir_from_source(&schema_source)?;
969 let branches = self.coordinator.read().await.branch_list().await?;
970 let (accepted_ir, _) = load_or_bootstrap_schema_contract(
971 &self.root_uri,
972 Arc::clone(&self.storage),
973 &branches,
974 ¤t_source_ir,
975 )
976 .await?;
977 let mut catalog = build_catalog_from_ir(&accepted_ir)?;
978 fixup_blob_schemas(&mut catalog);
979 self.store_schema_source(schema_source);
980 self.store_catalog(catalog);
981 Ok(())
982 }
983
984 pub(crate) async fn refresh_coordinator_only(&self) -> Result<()> {
992 self.coordinator.write().await.refresh().await?;
993 self.invalidate_read_caches().await;
994 Ok(())
995 }
996
997 pub async fn resolve_snapshot(&self, branch: &str) -> Result<SnapshotId> {
998 self.ensure_schema_state_valid().await?;
999 self.coordinator
1000 .read()
1001 .await
1002 .resolve_snapshot_id(branch)
1003 .await
1004 }
1005
1006 pub(crate) async fn resolved_target(
1007 &self,
1008 target: impl Into<ReadTarget>,
1009 ) -> Result<ResolvedTarget> {
1010 self.ensure_schema_state_valid().await?;
1011 let target = target.into();
1012 let mut resolved = self.resolve_target_inner(&target).await?;
1013 if matches!(target, ReadTarget::Branch(_)) {
1020 resolved
1021 .snapshot
1022 .set_read_caches(Arc::clone(&self.read_caches));
1023 }
1024 Ok(resolved)
1025 }
1026
    /// Resolves `target`, keeping the bound coordinator fresh for its own branch.
    ///
    /// For a `ReadTarget::Branch` naming the bound branch:
    /// - Under the read lock, probe the latest manifest incarnation. If it matches
    ///   the held one, answer from the warm coordinator.
    /// - Otherwise take the write lock and re-check. The binding or incarnation
    ///   may have changed while no lock was held. Refresh the manifest only if it
    ///   is still stale. If a refresh happened, clear read caches after the write
    ///   lock is released.
    ///
    /// Other branches and non-branch targets go to
    /// `GraphCoordinator::resolve_target`.
    async fn resolve_target_inner(&self, target: &ReadTarget) -> Result<ResolvedTarget> {
        if let ReadTarget::Branch(branch) = target {
            let normalized = normalize_branch_name(branch)?;
            // Fast path: read lock only.
            {
                let coord = self.coordinator.read().await;
                if normalized.as_deref() != coord.current_branch() {
                    return coord.resolve_target(target).await;
                }
                let held = coord.manifest_incarnation();
                if coord.probe_latest_incarnation().await?.matches(&held) {
                    return Ok(warm_resolved_target(&coord, target));
                }
            }
            // Slow path: the read guard is gone, so re-validate under the write lock.
            let mut coord = self.coordinator.write().await;
            if normalized.as_deref() == coord.current_branch() {
                let held = coord.manifest_incarnation();
                let mut refreshed = false;
                if !coord.probe_latest_incarnation().await?.matches(&held) {
                    coord.refresh_manifest_only().await?;
                    refreshed = true;
                }
                let resolved = warm_resolved_target(&coord, target);
                // Release the write lock before the cache invalidation awaits.
                drop(coord);
                if refreshed {
                    self.invalidate_read_caches().await;
                }
                return Ok(resolved);
            }
            // The handle was rebound to another branch in the meantime.
            return coord.resolve_target(target).await;
        }

        self.coordinator.read().await.resolve_target(target).await
    }
1072
1073 pub async fn diff_between(
1076 &self,
1077 from: impl Into<ReadTarget>,
1078 to: impl Into<ReadTarget>,
1079 filter: &crate::changes::ChangeFilter,
1080 ) -> Result<crate::changes::ChangeSet> {
1081 let from_resolved = self.resolved_target(from).await?;
1082 let to_resolved = self.resolved_target(to).await?;
1083 crate::changes::diff_snapshots(
1084 self.uri(),
1085 &from_resolved.snapshot,
1086 &to_resolved.snapshot,
1087 filter,
1088 to_resolved.branch.clone().or(from_resolved.branch.clone()),
1089 )
1090 .await
1091 }
1092
1093 pub async fn diff_commits(
1096 &self,
1097 from_commit_id: &str,
1098 to_commit_id: &str,
1099 filter: &crate::changes::ChangeFilter,
1100 ) -> Result<crate::changes::ChangeSet> {
1101 let coord = self.coordinator.read().await;
1102 let from_commit = coord
1103 .resolve_commit(&SnapshotId::new(from_commit_id))
1104 .await?;
1105 let to_commit = coord.resolve_commit(&SnapshotId::new(to_commit_id)).await?;
1106 let from_snap = coord
1107 .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
1108 from_commit.graph_commit_id.clone(),
1109 )))
1110 .await?;
1111 let to_snap = coord
1112 .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
1113 to_commit.graph_commit_id.clone(),
1114 )))
1115 .await?;
1116 drop(coord);
1117 crate::changes::diff_snapshots(
1118 self.uri(),
1119 &from_snap.snapshot,
1120 &to_snap.snapshot,
1121 filter,
1122 to_snap.branch.clone().or(from_snap.branch.clone()),
1123 )
1124 .await
1125 }
1126
1127 pub async fn entity_at_target(
1128 &self,
1129 target: impl Into<ReadTarget>,
1130 table_key: &str,
1131 id: &str,
1132 ) -> Result<Option<serde_json::Value>> {
1133 export::entity_at_target(self, target, table_key, id).await
1134 }
1135
1136 pub async fn entity_at(
1138 &self,
1139 table_key: &str,
1140 id: &str,
1141 version: u64,
1142 ) -> Result<Option<serde_json::Value>> {
1143 export::entity_at(self, table_key, id, version).await
1144 }
1145
1146 pub async fn snapshot_at_version(&self, version: u64) -> Result<Snapshot> {
1148 self.ensure_schema_state_valid().await?;
1149 self.coordinator
1150 .read()
1151 .await
1152 .snapshot_at_version(version)
1153 .await
1154 }
1155
1156 pub async fn export_jsonl(
1157 &self,
1158 branch: &str,
1159 type_names: &[String],
1160 table_keys: &[String],
1161 ) -> Result<String> {
1162 export::export_jsonl(self, branch, type_names, table_keys).await
1163 }
1164
1165 pub async fn export_jsonl_to_writer<W: Write>(
1166 &self,
1167 branch: &str,
1168 type_names: &[String],
1169 table_keys: &[String],
1170 writer: &mut W,
1171 ) -> Result<()> {
1172 export::export_jsonl_to_writer(self, branch, type_names, table_keys, writer).await
1173 }
1174
1175 pub async fn graph_index(&self) -> Result<Arc<crate::graph_index::GraphIndex>> {
1179 table_ops::graph_index(self).await
1180 }
1181
1182 pub(crate) async fn graph_index_for_resolved(
1183 &self,
1184 resolved: &ResolvedTarget,
1185 ) -> Result<Arc<crate::graph_index::GraphIndex>> {
1186 table_ops::graph_index_for_resolved(self, resolved).await
1187 }
1188
1189 pub async fn ensure_indices(&self) -> Result<Vec<PendingIndex>> {
1206 table_ops::ensure_indices(self).await
1207 }
1208
1209 pub async fn ensure_indices_on(&self, branch: &str) -> Result<Vec<PendingIndex>> {
1210 table_ops::ensure_indices_on(self, branch).await
1211 }
1212
1213 #[cfg(feature = "failpoints")]
1214 #[doc(hidden)]
1215 pub async fn failpoint_publish_table_head_without_index_rebuild_for_test(
1216 &mut self,
1217 branch: &str,
1218 table_key: &str,
1219 table_branch: Option<&str>,
1220 ) -> Result<u64> {
1221 table_ops::failpoint_publish_table_head_without_index_rebuild_for_test(
1222 self,
1223 branch,
1224 table_key,
1225 table_branch,
1226 )
1227 .await
1228 }
1229
1230 pub async fn optimize(&self) -> Result<Vec<optimize::TableOptimizeStats>> {
1233 optimize::optimize_all_tables(self).await
1234 }
1235
1236 pub async fn repair(&self, options: repair::RepairOptions) -> Result<repair::RepairStats> {
1240 repair::repair_all_tables(self, options).await
1241 }
1242
1243 pub async fn cleanup(
1247 &mut self,
1248 options: optimize::CleanupPolicyOptions,
1249 ) -> Result<Vec<optimize::TableCleanupStats>> {
1250 optimize::cleanup_all_tables(self, options).await
1251 }
1252
1253 pub async fn read_blob(&self, type_name: &str, id: &str, property: &str) -> Result<BlobFile> {
1263 self.ensure_schema_state_valid().await?;
1264 let catalog = self.catalog();
1265 let node_type = catalog
1266 .node_types
1267 .get(type_name)
1268 .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
1269 if !node_type.blob_properties.contains(property) {
1270 return Err(OmniError::manifest(format!(
1271 "property '{}' on type '{}' is not a Blob",
1272 property, type_name
1273 )));
1274 }
1275
1276 let snapshot = self.snapshot().await;
1277 let table_key = format!("node:{}", type_name);
1278 let handle = self
1279 .storage()
1280 .open_snapshot_at_table(&snapshot, &table_key)
1281 .await?;
1282
1283 let filter_sql = format!("id = '{}'", id.replace('\'', "''"));
1284 let row_id = self
1285 .storage()
1286 .first_row_id_for_filter(&handle, &filter_sql)
1287 .await?
1288 .ok_or_else(|| {
1289 OmniError::manifest(format!("no {} with id '{}' found", type_name, id))
1290 })?;
1291
1292 let ds = handle.into_arc();
1296 let mut blobs = ds
1297 .take_blobs(&[row_id], property)
1298 .await
1299 .map_err(|e| OmniError::Lance(e.to_string()))?;
1300
1301 blobs.pop().ok_or_else(|| {
1302 OmniError::manifest(format!(
1303 "blob '{}' on {} '{}' returned no data",
1304 property, type_name, id
1305 ))
1306 })
1307 }
1308
1309 pub(crate) async fn active_branch(&self) -> Option<String> {
1310 self.coordinator
1311 .read()
1312 .await
1313 .current_branch()
1314 .map(str::to_string)
1315 }
1316
1317 async fn ensure_branch_delete_safe(&self, branch: &str, branches: &[String]) -> Result<()> {
1318 let descendants = self
1319 .coordinator
1320 .read()
1321 .await
1322 .branch_descendants(branch)
1323 .await?;
1324 if let Some(descendant) = descendants.first() {
1325 return Err(OmniError::manifest_conflict(format!(
1326 "cannot delete branch '{}' because descendant branch '{}' still depends on it",
1327 branch, descendant
1328 )));
1329 }
1330
1331 for other_branch in branches
1332 .iter()
1333 .filter(|candidate| candidate.as_str() != branch)
1334 {
1335 let snapshot = self
1336 .snapshot_of(ReadTarget::branch(other_branch.as_str()))
1337 .await?;
1338 if snapshot
1339 .entries()
1340 .any(|entry| entry.table_branch.as_deref() == Some(branch))
1341 {
1342 return Err(OmniError::manifest_conflict(format!(
1343 "cannot delete branch '{}' because branch '{}' still depends on it",
1344 branch, other_branch
1345 )));
1346 }
1347 }
1348
1349 Ok(())
1350 }
1351
1352 async fn cleanup_deleted_branch_tables(&self, branch: &str, owned_tables: &[(String, String)]) {
1360 let mut seen_paths = HashSet::new();
1361 let mut cleanup_targets = owned_tables
1362 .iter()
1363 .filter(|(_, table_path)| seen_paths.insert(table_path.clone()))
1364 .cloned()
1365 .collect::<Vec<_>>();
1366 cleanup_targets.sort_by(|left, right| left.0.cmp(&right.0));
1367
1368 for (table_key, table_path) in cleanup_targets {
1369 let dataset_uri = self.storage().dataset_uri(&table_path);
1370 let outcome = match crate::failpoints::maybe_fail("branch_delete.before_table_cleanup")
1371 {
1372 Ok(()) => {
1373 self.storage()
1374 .force_delete_branch(&dataset_uri, branch)
1375 .await
1376 }
1377 Err(injected) => Err(injected),
1378 };
1379 if let Err(err) = outcome {
1380 tracing::warn!(
1381 target: "omnigraph::branch_delete::cleanup",
1382 branch = %branch,
1383 table = %table_key,
1384 error = %err,
1385 "best-effort fork reclaim failed; cleanup will reconcile the orphan",
1386 );
1387 }
1388 }
1389 }
1390
1391 async fn delete_branch_storage_only(&self, branch: &str) -> Result<()> {
1392 let active = self
1393 .coordinator
1394 .read()
1395 .await
1396 .current_branch()
1397 .map(str::to_string);
1398 if active.as_deref() == Some(branch) {
1399 return Err(OmniError::manifest_conflict(format!(
1400 "cannot delete currently active branch '{}'",
1401 branch
1402 )));
1403 }
1404
1405 let branch_snapshot = self.snapshot_of(ReadTarget::branch(branch)).await?;
1406 let owned_tables = branch_snapshot
1407 .entries()
1408 .filter(|entry| entry.table_branch.as_deref() == Some(branch))
1409 .map(|entry| (entry.table_key.clone(), entry.table_path.clone()))
1410 .collect::<Vec<_>>();
1411
1412 self.coordinator.write().await.branch_delete(branch).await?;
1414 self.cleanup_deleted_branch_tables(branch, &owned_tables)
1416 .await;
1417 Ok(())
1418 }
1419
1420 pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
1421 normalize_branch_name(branch)
1422 }
1423
1424 pub(crate) async fn head_commit_id_for_branch(
1425 &self,
1426 branch: Option<&str>,
1427 ) -> Result<Option<String>> {
1428 let mut coordinator = self.open_coordinator_for_branch(branch).await?;
1429 coordinator.ensure_commit_graph_initialized().await?;
1430 coordinator
1431 .head_commit_id()
1432 .await
1433 .map(|id| id.map(|snapshot_id| snapshot_id.as_str().to_string()))
1434 }
1435
1436 pub async fn branch_create(&self, name: &str) -> Result<()> {
1437 self.branch_create_as(name, None).await
1438 }
1439
1440 pub async fn branch_create_as(&self, name: &str, actor: Option<&str>) -> Result<()> {
1448 self.enforce(
1449 omnigraph_policy::PolicyAction::BranchCreate,
1450 &omnigraph_policy::ResourceScope::TargetBranch(name.to_string()),
1451 actor,
1452 )?;
1453 self.ensure_schema_state_valid().await?;
1454 self.ensure_schema_apply_idle("branch_create").await?;
1455 ensure_public_branch_ref(name, "branch_create")?;
1456 self.coordinator.write().await.branch_create(name).await
1457 }
1458
1459 pub async fn branch_create_from(&self, from: impl Into<ReadTarget>, name: &str) -> Result<()> {
1460 self.branch_create_from_as(from, name, None).await
1461 }
1462
1463 pub async fn branch_create_from_as(
1475 &self,
1476 from: impl Into<ReadTarget>,
1477 name: &str,
1478 actor: Option<&str>,
1479 ) -> Result<()> {
1480 let target = from.into();
1481 let source_branch = match &target {
1482 ReadTarget::Branch(b) => b.clone(),
1483 _ => "<snapshot>".to_string(),
1484 };
1485 self.enforce(
1486 omnigraph_policy::PolicyAction::BranchCreate,
1487 &omnigraph_policy::ResourceScope::BranchTransition {
1488 source: source_branch,
1489 target: name.to_string(),
1490 },
1491 actor,
1492 )?;
1493 self.ensure_schema_apply_idle("branch_create_from").await?;
1494 self.branch_create_from_impl(target, name, false).await
1495 }
1496
1497 async fn branch_create_from_impl(
1498 &self,
1499 from: impl Into<ReadTarget>,
1500 name: &str,
1501 allow_internal_refs: bool,
1502 ) -> Result<()> {
1503 let target = from.into();
1504 let ReadTarget::Branch(branch_name) = target else {
1505 return Err(OmniError::manifest(
1506 "branch creation from pinned snapshots is not supported yet".to_string(),
1507 ));
1508 };
1509 if !allow_internal_refs {
1510 ensure_public_branch_ref(&branch_name, "branch_create_from")?;
1511 ensure_public_branch_ref(name, "branch_create_from")?;
1512 }
1513 let branch = normalize_branch_name(&branch_name)?;
1514 let mut source_coord = self.open_coordinator_for_branch(branch.as_deref()).await?;
1531 source_coord.branch_create(name).await
1532 }
1533
1534 pub async fn branch_list(&self) -> Result<Vec<String>> {
1535 self.ensure_schema_state_valid().await?;
1536 self.coordinator.read().await.branch_list().await
1537 }
1538
1539 pub async fn branch_delete(&self, name: &str) -> Result<()> {
1540 self.branch_delete_as(name, None).await
1541 }
1542
1543 pub async fn branch_delete_as(&self, name: &str, actor: Option<&str>) -> Result<()> {
1551 self.enforce(
1552 omnigraph_policy::PolicyAction::BranchDelete,
1553 &omnigraph_policy::ResourceScope::TargetBranch(name.to_string()),
1554 actor,
1555 )?;
1556 self.ensure_schema_state_valid().await?;
1557 self.ensure_schema_apply_idle("branch_delete").await?;
1558 ensure_public_branch_ref(name, "branch_delete")?;
1559 self.refresh().await?;
1560 let branch = normalize_branch_name(name)?
1561 .ok_or_else(|| OmniError::manifest("cannot delete branch 'main'".to_string()))?;
1562 let branches = self.coordinator.read().await.branch_list().await?;
1563 if !branches.iter().any(|candidate| candidate == &branch) {
1564 return Err(OmniError::manifest_not_found(format!(
1565 "branch '{}' not found",
1566 branch
1567 )));
1568 }
1569
1570 self.ensure_branch_delete_safe(&branch, &branches).await?;
1571 self.delete_branch_storage_only(&branch).await
1572 }
1573
1574 pub async fn get_commit(&self, commit_id: &str) -> Result<GraphCommit> {
1575 self.ensure_schema_state_valid().await?;
1576 self.coordinator
1577 .read()
1578 .await
1579 .resolve_commit(&SnapshotId::new(commit_id))
1580 .await
1581 }
1582
1583 pub async fn list_commits(&self, branch: Option<&str>) -> Result<Vec<GraphCommit>> {
1584 self.ensure_schema_state_valid().await?;
1585 let branch = match branch {
1586 Some(branch) => normalize_branch_name(branch)?,
1587 None => None,
1588 };
1589 let coordinator = self.open_coordinator_for_branch(branch.as_deref()).await?;
1590 coordinator.list_commits().await
1591 }
1592
1593 pub(crate) async fn open_for_mutation(
1599 &self,
1600 table_key: &str,
1601 op_kind: crate::db::MutationOpKind,
1602 ) -> Result<(SnapshotHandle, String, Option<String>)> {
1603 table_ops::open_for_mutation(self, table_key, op_kind).await
1604 }
1605
1606 pub(crate) async fn open_for_mutation_on_branch(
1607 &self,
1608 branch: Option<&str>,
1609 table_key: &str,
1610 op_kind: crate::db::MutationOpKind,
1611 ) -> Result<(SnapshotHandle, String, Option<String>)> {
1612 table_ops::open_for_mutation_on_branch(self, branch, table_key, op_kind).await
1613 }
1614
1615 pub(crate) async fn fork_dataset_from_entry_state(
1623 &self,
1624 table_key: &str,
1625 full_path: &str,
1626 source_branch: Option<&str>,
1627 source_version: u64,
1628 active_branch: &str,
1629 ) -> Result<SnapshotHandle> {
1630 match table_ops::fork_dataset_from_entry_state(
1631 self,
1632 table_key,
1633 full_path,
1634 source_branch,
1635 source_version,
1636 active_branch,
1637 )
1638 .await?
1639 {
1640 crate::storage_layer::ForkOutcome::Created(ds) => Ok(ds),
1641 crate::storage_layer::ForkOutcome::RefAlreadyExists => {
1642 table_ops::reclaim_orphaned_fork_and_refork(
1643 self,
1644 table_key,
1645 full_path,
1646 source_branch,
1647 source_version,
1648 active_branch,
1649 )
1650 .await
1651 }
1652 }
1653 }
1654
1655 pub(crate) async fn reopen_for_mutation(
1656 &self,
1657 table_key: &str,
1658 full_path: &str,
1659 table_branch: Option<&str>,
1660 expected_version: u64,
1661 op_kind: crate::db::MutationOpKind,
1662 ) -> Result<SnapshotHandle> {
1663 table_ops::reopen_for_mutation(
1664 self,
1665 table_key,
1666 full_path,
1667 table_branch,
1668 expected_version,
1669 op_kind,
1670 )
1671 .await
1672 }
1673
1674 pub(crate) async fn open_dataset_at_state(
1675 &self,
1676 table_path: &str,
1677 table_branch: Option<&str>,
1678 table_version: u64,
1679 ) -> Result<SnapshotHandle> {
1680 table_ops::open_dataset_at_state(self, table_path, table_branch, table_version).await
1681 }
1682
1683 pub(crate) async fn build_indices_on_dataset(
1684 &self,
1685 table_key: &str,
1686 ds: &mut SnapshotHandle,
1687 ) -> Result<Vec<PendingIndex>> {
1688 table_ops::build_indices_on_dataset(self, table_key, ds).await
1689 }
1690
1691 #[cfg(test)]
1694 pub(crate) async fn commit_updates(
1695 &mut self,
1696 updates: &[crate::db::SubTableUpdate],
1697 ) -> Result<u64> {
1698 table_ops::commit_updates(self, updates).await
1699 }
1700
1701 pub(crate) async fn commit_manifest_updates(
1702 &self,
1703 updates: &[crate::db::SubTableUpdate],
1704 ) -> Result<u64> {
1705 table_ops::commit_manifest_updates(self, updates).await
1706 }
1707
1708 pub(crate) async fn record_merge_commit(
1709 &self,
1710 manifest_version: u64,
1711 parent_commit_id: &str,
1712 merged_parent_commit_id: &str,
1713 actor_id: Option<&str>,
1714 ) -> Result<String> {
1715 table_ops::record_merge_commit(
1716 self,
1717 manifest_version,
1718 parent_commit_id,
1719 merged_parent_commit_id,
1720 actor_id,
1721 )
1722 .await
1723 }
1724
1725 pub(crate) async fn commit_updates_on_branch_with_expected(
1726 &self,
1727 branch: Option<&str>,
1728 updates: &[crate::db::SubTableUpdate],
1729 expected_table_versions: &std::collections::HashMap<String, u64>,
1730 actor_id: Option<&str>,
1731 ) -> Result<u64> {
1732 table_ops::commit_updates_on_branch_with_expected(
1733 self,
1734 branch,
1735 updates,
1736 expected_table_versions,
1737 actor_id,
1738 )
1739 .await
1740 }
1741
1742 pub(crate) async fn ensure_commit_graph_initialized(&self) -> Result<()> {
1743 table_ops::ensure_commit_graph_initialized(self).await
1744 }
1745
1746 pub(crate) async fn invalidate_graph_index(&self) {
1748 table_ops::invalidate_graph_index(self).await
1749 }
1750}
1751
1752pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
1753 let branch = branch.trim();
1754 if branch.is_empty() {
1755 return Err(OmniError::manifest(
1756 "branch name cannot be empty".to_string(),
1757 ));
1758 }
1759 if branch == "main" {
1760 return Ok(None);
1761 }
1762 Ok(Some(branch.to_string()))
1763}
1764
1765fn warm_resolved_target(coord: &GraphCoordinator, requested: &ReadTarget) -> ResolvedTarget {
1771 ResolvedTarget {
1772 requested: requested.clone(),
1773 branch: coord.current_branch().map(str::to_string),
1774 snapshot_id: SnapshotId::synthetic(
1775 coord.current_branch(),
1776 coord.version(),
1777 coord.manifest_incarnation().e_tag.as_deref(),
1778 ),
1779 snapshot: coord.snapshot(),
1780 }
1781}
1782
1783pub(crate) fn ensure_public_branch_ref(branch: &str, operation: &str) -> Result<()> {
1784 if is_internal_system_branch(branch) {
1785 return Err(OmniError::manifest(format!(
1786 "{} does not allow internal system ref '{}'",
1787 operation, branch
1788 )));
1789 }
1790 Ok(())
1791}
1792
1793fn concat_or_empty_batches(schema: Arc<Schema>, batches: Vec<RecordBatch>) -> Result<RecordBatch> {
1794 if batches.is_empty() {
1795 return Ok(RecordBatch::new_empty(schema));
1796 }
1797 if batches.len() == 1 {
1798 return Ok(batches.into_iter().next().unwrap());
1799 }
1800 let batch_schema = batches[0].schema();
1801 arrow_select::concat::concat_batches(&batch_schema, &batches)
1802 .map_err(|e| OmniError::Lance(e.to_string()))
1803}
1804
1805fn blob_properties_for_table_key<'a>(
1806 catalog: &'a Catalog,
1807 table_key: &str,
1808) -> Result<&'a std::collections::HashSet<String>> {
1809 if let Some(type_name) = table_key.strip_prefix("node:") {
1810 return catalog
1811 .node_types
1812 .get(type_name)
1813 .map(|node_type| &node_type.blob_properties)
1814 .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)));
1815 }
1816 if let Some(type_name) = table_key.strip_prefix("edge:") {
1817 return catalog
1818 .edge_types
1819 .get(type_name)
1820 .map(|edge_type| &edge_type.blob_properties)
1821 .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)));
1822 }
1823 Err(OmniError::manifest(format!(
1824 "invalid table key '{}'",
1825 table_key
1826 )))
1827}
1828
1829fn blob_description_is_null(descriptions: &StructArray, row: usize) -> Result<bool> {
1830 if descriptions.is_null(row) {
1831 return Ok(true);
1832 }
1833
1834 let kind = descriptions
1835 .column_by_name("kind")
1836 .and_then(|col| col.as_any().downcast_ref::<UInt32Array>())
1837 .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row) as u8))
1838 .or_else(|| {
1839 descriptions
1840 .column_by_name("kind")
1841 .and_then(|col| col.as_any().downcast_ref::<arrow_array::UInt8Array>())
1842 .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)))
1843 });
1844 let position = descriptions
1845 .column_by_name("position")
1846 .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
1847 .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1848 let size = descriptions
1849 .column_by_name("size")
1850 .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
1851 .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1852 let blob_uri = descriptions
1853 .column_by_name("blob_uri")
1854 .and_then(|col| col.as_any().downcast_ref::<StringArray>())
1855 .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1856
1857 let Some(kind) = kind else {
1858 return Ok(true);
1859 };
1860 let kind = BlobKind::try_from(kind).map_err(|e| OmniError::Lance(e.to_string()))?;
1861 if kind != BlobKind::Inline {
1862 return Ok(false);
1863 }
1864
1865 Ok(position.unwrap_or(0) == 0 && size.unwrap_or(0) == 0 && blob_uri.unwrap_or("").is_empty())
1866}
1867
1868fn fixup_blob_schemas(catalog: &mut Catalog) {
1874 for node_type in catalog.node_types.values_mut() {
1875 if node_type.blob_properties.is_empty() {
1876 continue;
1877 }
1878 let fields: Vec<Field> = node_type
1879 .arrow_schema
1880 .fields()
1881 .iter()
1882 .map(|f| {
1883 if node_type.blob_properties.contains(f.name()) {
1884 blob_field(f.name(), f.is_nullable())
1885 } else {
1886 f.as_ref().clone()
1887 }
1888 })
1889 .collect();
1890 node_type.arrow_schema = Arc::new(Schema::new(fields));
1891 }
1892 for edge_type in catalog.edge_types.values_mut() {
1893 if edge_type.blob_properties.is_empty() {
1894 continue;
1895 }
1896 let fields: Vec<Field> = edge_type
1897 .arrow_schema
1898 .fields()
1899 .iter()
1900 .map(|f| {
1901 if edge_type.blob_properties.contains(f.name()) {
1902 blob_field(f.name(), f.is_nullable())
1903 } else {
1904 f.as_ref().clone()
1905 }
1906 })
1907 .collect();
1908 edge_type.arrow_schema = Arc::new(Schema::new(fields));
1909 }
1910}
1911
1912fn read_schema_ir_from_source(schema_source: &str) -> Result<SchemaIR> {
1913 let schema_ast = parse_schema(schema_source)?;
1914 build_schema_ir(&schema_ast).map_err(|err| OmniError::manifest(err.to_string()))
1915}
1916
1917async fn init_storage_phase(
1932 root: &str,
1933 schema_source: &str,
1934 schema_ir: &SchemaIR,
1935 catalog: &Catalog,
1936 storage: &Arc<dyn StorageAdapter>,
1937 write_schema_pg: bool,
1938) -> Result<GraphCoordinator> {
1939 if write_schema_pg {
1940 let schema_path = join_uri(root, SCHEMA_SOURCE_FILENAME);
1941 storage.write_text(&schema_path, schema_source).await?;
1942 crate::failpoints::maybe_fail("init.after_schema_pg_written")?;
1943 }
1944
1945 write_schema_contract(root, storage.as_ref(), schema_ir).await?;
1946 crate::failpoints::maybe_fail("init.after_schema_contract_written")?;
1947
1948 let coordinator = GraphCoordinator::init(root, catalog, Arc::clone(storage)).await?;
1949 crate::failpoints::maybe_fail("init.after_coordinator_init")?;
1950
1951 Ok(coordinator)
1952}
1953
1954async fn best_effort_cleanup_init_artifacts(root: &str, storage: &dyn StorageAdapter) {
1966 for uri in [
1967 schema_source_uri(root),
1968 schema_ir_uri(root),
1969 schema_state_uri(root),
1970 ] {
1971 if let Err(err) = storage.delete(&uri).await {
1972 tracing::warn!(
1973 target: "omnigraph::init::cleanup",
1974 uri = %uri,
1975 error = %err,
1976 "init failed; best-effort cleanup could not delete artifact",
1977 );
1978 }
1979 }
1980}
1981
1982fn schema_table_key(type_kind: SchemaTypeKind, name: &str) -> String {
1983 match type_kind {
1984 SchemaTypeKind::Node => format!("node:{}", name),
1985 SchemaTypeKind::Edge => format!("edge:{}", name),
1986 SchemaTypeKind::Interface => unreachable!("interfaces do not map to tables"),
1987 }
1988}
1989
1990fn schema_for_table_key(catalog: &Catalog, table_key: &str) -> Result<Arc<Schema>> {
1991 if let Some(type_name) = table_key.strip_prefix("node:") {
1992 let node_type: &NodeType = catalog
1993 .node_types
1994 .get(type_name)
1995 .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
1996 return Ok(node_type.arrow_schema.clone());
1997 }
1998 if let Some(type_name) = table_key.strip_prefix("edge:") {
1999 let edge_type: &EdgeType = catalog
2000 .edge_types
2001 .get(type_name)
2002 .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)))?;
2003 return Ok(edge_type.arrow_schema.clone());
2004 }
2005 Err(OmniError::manifest(format!(
2006 "invalid table key '{}'",
2007 table_key
2008 )))
2009}
2010
2011fn record_batch_row_to_json(batch: &RecordBatch, row: usize) -> Result<serde_json::Value> {
2012 let mut obj = serde_json::Map::new();
2013 for (i, field) in batch.schema().fields().iter().enumerate() {
2014 obj.insert(
2015 field.name().clone(),
2016 json_value_from_array(batch.column(i).as_ref(), row)?,
2017 );
2018 }
2019 Ok(serde_json::Value::Object(obj))
2020}
2021
2022fn json_value_from_array(array: &dyn Array, row: usize) -> Result<serde_json::Value> {
2023 if array.is_null(row) {
2024 return Ok(serde_json::Value::Null);
2025 }
2026
2027 match array.data_type() {
2028 DataType::Utf8 => Ok(serde_json::Value::String(
2029 array
2030 .as_any()
2031 .downcast_ref::<StringArray>()
2032 .ok_or_else(|| OmniError::Lance("expected StringArray".to_string()))?
2033 .value(row)
2034 .to_string(),
2035 )),
2036 DataType::LargeUtf8 => Ok(serde_json::Value::String(
2037 array
2038 .as_any()
2039 .downcast_ref::<LargeStringArray>()
2040 .ok_or_else(|| OmniError::Lance("expected LargeStringArray".to_string()))?
2041 .value(row)
2042 .to_string(),
2043 )),
2044 DataType::Boolean => Ok(serde_json::Value::Bool(
2045 array
2046 .as_any()
2047 .downcast_ref::<BooleanArray>()
2048 .ok_or_else(|| OmniError::Lance("expected BooleanArray".to_string()))?
2049 .value(row),
2050 )),
2051 DataType::Int32 => Ok(serde_json::Value::Number(serde_json::Number::from(
2052 array
2053 .as_any()
2054 .downcast_ref::<Int32Array>()
2055 .ok_or_else(|| OmniError::Lance("expected Int32Array".to_string()))?
2056 .value(row),
2057 ))),
2058 DataType::Int64 => Ok(serde_json::Value::Number(serde_json::Number::from(
2059 array
2060 .as_any()
2061 .downcast_ref::<Int64Array>()
2062 .ok_or_else(|| OmniError::Lance("expected Int64Array".to_string()))?
2063 .value(row),
2064 ))),
2065 DataType::UInt32 => Ok(serde_json::Value::Number(serde_json::Number::from(
2066 array
2067 .as_any()
2068 .downcast_ref::<UInt32Array>()
2069 .ok_or_else(|| OmniError::Lance("expected UInt32Array".to_string()))?
2070 .value(row),
2071 ))),
2072 DataType::UInt64 => Ok(serde_json::Value::Number(serde_json::Number::from(
2073 array
2074 .as_any()
2075 .downcast_ref::<UInt64Array>()
2076 .ok_or_else(|| OmniError::Lance("expected UInt64Array".to_string()))?
2077 .value(row),
2078 ))),
2079 DataType::Float32 => {
2080 let value = array
2081 .as_any()
2082 .downcast_ref::<Float32Array>()
2083 .ok_or_else(|| OmniError::Lance("expected Float32Array".to_string()))?
2084 .value(row) as f64;
2085 Ok(serde_json::Value::Number(
2086 serde_json::Number::from_f64(value).ok_or_else(|| {
2087 OmniError::Lance(format!("cannot encode f32 value '{}' as JSON", value))
2088 })?,
2089 ))
2090 }
2091 DataType::Float64 => {
2092 let value = array
2093 .as_any()
2094 .downcast_ref::<Float64Array>()
2095 .ok_or_else(|| OmniError::Lance("expected Float64Array".to_string()))?
2096 .value(row);
2097 Ok(serde_json::Value::Number(
2098 serde_json::Number::from_f64(value).ok_or_else(|| {
2099 OmniError::Lance(format!("cannot encode f64 value '{}' as JSON", value))
2100 })?,
2101 ))
2102 }
2103 DataType::Date32 => Ok(serde_json::Value::Number(serde_json::Number::from(
2104 array
2105 .as_any()
2106 .downcast_ref::<Date32Array>()
2107 .ok_or_else(|| OmniError::Lance("expected Date32Array".to_string()))?
2108 .value(row),
2109 ))),
2110 DataType::Binary => Ok(serde_json::Value::String(base64::Engine::encode(
2111 &base64::engine::general_purpose::STANDARD,
2112 array
2113 .as_any()
2114 .downcast_ref::<BinaryArray>()
2115 .ok_or_else(|| OmniError::Lance("expected BinaryArray".to_string()))?
2116 .value(row),
2117 ))),
2118 DataType::LargeBinary => Ok(serde_json::Value::String(base64::Engine::encode(
2119 &base64::engine::general_purpose::STANDARD,
2120 array
2121 .as_any()
2122 .downcast_ref::<LargeBinaryArray>()
2123 .ok_or_else(|| OmniError::Lance("expected LargeBinaryArray".to_string()))?
2124 .value(row),
2125 ))),
2126 DataType::List(_) => {
2127 let list = array
2128 .as_any()
2129 .downcast_ref::<ListArray>()
2130 .ok_or_else(|| OmniError::Lance("expected ListArray".to_string()))?;
2131 let values = list.value(row);
2132 let mut out = Vec::with_capacity(values.len());
2133 for idx in 0..values.len() {
2134 out.push(json_value_from_array(values.as_ref(), idx)?);
2135 }
2136 Ok(serde_json::Value::Array(out))
2137 }
2138 DataType::LargeList(_) => {
2139 let list = array
2140 .as_any()
2141 .downcast_ref::<LargeListArray>()
2142 .ok_or_else(|| OmniError::Lance("expected LargeListArray".to_string()))?;
2143 let values = list.value(row);
2144 let mut out = Vec::with_capacity(values.len());
2145 for idx in 0..values.len() {
2146 out.push(json_value_from_array(values.as_ref(), idx)?);
2147 }
2148 Ok(serde_json::Value::Array(out))
2149 }
2150 DataType::FixedSizeList(_, _) => {
2151 let list = array
2152 .as_any()
2153 .downcast_ref::<FixedSizeListArray>()
2154 .ok_or_else(|| OmniError::Lance("expected FixedSizeListArray".to_string()))?;
2155 let values = list.value(row);
2156 let mut out = Vec::with_capacity(values.len());
2157 for idx in 0..values.len() {
2158 out.push(json_value_from_array(values.as_ref(), idx)?);
2159 }
2160 Ok(serde_json::Value::Array(out))
2161 }
2162 DataType::Struct(fields) => {
2163 let struct_array = array
2164 .as_any()
2165 .downcast_ref::<StructArray>()
2166 .ok_or_else(|| OmniError::Lance("expected StructArray".to_string()))?;
2167 let mut obj = serde_json::Map::new();
2168 for (field_idx, field) in fields.iter().enumerate() {
2169 obj.insert(
2170 field.name().clone(),
2171 json_value_from_array(struct_array.column(field_idx).as_ref(), row)?,
2172 );
2173 }
2174 Ok(serde_json::Value::Object(obj))
2175 }
2176 _ => {
2177 let value = arrow_cast::display::array_value_to_string(array, row)
2178 .map_err(|e| OmniError::Lance(e.to_string()))?;
2179 Ok(serde_json::Value::String(value))
2180 }
2181 }
2182}
2183
2184#[cfg(test)]
2185mod tests {
2186 use super::*;
2187 use crate::db::manifest::ManifestCoordinator;
2188 use async_trait::async_trait;
2189 use serde_json::Value;
2190 use std::sync::{Arc, Mutex};
2191
2192 use crate::storage::{ObjectStorageAdapter, StorageAdapter, join_uri};
2193
2194 const TEST_SCHEMA: &str = r#"
2195node Person {
2196 name: String @key
2197 age: I32?
2198}
2199node Company {
2200 name: String @key
2201}
2202edge Knows: Person -> Person {
2203 since: Date?
2204}
2205edge WorksAt: Person -> Company
2206"#;
2207
2208 #[derive(Debug)]
2209 struct RecordingStorageAdapter {
2210 inner: ObjectStorageAdapter,
2211 reads: Mutex<Vec<String>>,
2212 writes: Mutex<Vec<String>>,
2213 exists_checks: Mutex<Vec<String>>,
2214 renames: Mutex<Vec<(String, String)>>,
2215 deletes: Mutex<Vec<String>>,
2216 }
2217
2218 impl Default for RecordingStorageAdapter {
2219 fn default() -> Self {
2220 Self {
2221 inner: ObjectStorageAdapter::local(),
2222 reads: Mutex::default(),
2223 writes: Mutex::default(),
2224 exists_checks: Mutex::default(),
2225 renames: Mutex::default(),
2226 deletes: Mutex::default(),
2227 }
2228 }
2229 }
2230
2231 impl RecordingStorageAdapter {
2232 fn reads(&self) -> Vec<String> {
2233 self.reads.lock().unwrap().clone()
2234 }
2235
2236 fn writes(&self) -> Vec<String> {
2237 self.writes.lock().unwrap().clone()
2238 }
2239
2240 fn exists_checks(&self) -> Vec<String> {
2241 self.exists_checks.lock().unwrap().clone()
2242 }
2243 }
2244
2245 #[async_trait]
2246 impl StorageAdapter for RecordingStorageAdapter {
2247 async fn read_text(&self, uri: &str) -> Result<String> {
2248 self.reads.lock().unwrap().push(uri.to_string());
2249 self.inner.read_text(uri).await
2250 }
2251
2252 async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
2253 self.writes.lock().unwrap().push(uri.to_string());
2254 self.inner.write_text(uri, contents).await
2255 }
2256
2257 async fn write_text_if_absent(&self, uri: &str, contents: &str) -> Result<bool> {
2258 self.writes.lock().unwrap().push(uri.to_string());
2259 self.inner.write_text_if_absent(uri, contents).await
2260 }
2261
2262 async fn exists(&self, uri: &str) -> Result<bool> {
2263 self.exists_checks.lock().unwrap().push(uri.to_string());
2264 self.inner.exists(uri).await
2265 }
2266
2267 async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
2268 self.renames
2269 .lock()
2270 .unwrap()
2271 .push((from_uri.to_string(), to_uri.to_string()));
2272 self.inner.rename_text(from_uri, to_uri).await
2273 }
2274
2275 async fn delete(&self, uri: &str) -> Result<()> {
2276 self.deletes.lock().unwrap().push(uri.to_string());
2277 self.inner.delete(uri).await
2278 }
2279
2280 async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
2281 self.inner.list_dir(dir_uri).await
2282 }
2283
2284 async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)> {
2285 self.inner.read_text_versioned(uri).await
2286 }
2287
2288 async fn write_text_if_match(
2289 &self,
2290 uri: &str,
2291 contents: &str,
2292 expected_version: &str,
2293 ) -> Result<Option<String>> {
2294 self.inner
2295 .write_text_if_match(uri, contents, expected_version)
2296 .await
2297 }
2298
2299 async fn delete_prefix(&self, prefix_uri: &str) -> Result<()> {
2300 self.inner.delete_prefix(prefix_uri).await
2301 }
2302 }
2303
2304 #[derive(Debug)]
2305 struct InitRaceStorageAdapter {
2306 inner: ObjectStorageAdapter,
2307 root: String,
2308 barrier: Arc<tokio::sync::Barrier>,
2309 }
2310
2311 #[async_trait]
2312 impl StorageAdapter for InitRaceStorageAdapter {
2313 async fn read_text(&self, uri: &str) -> Result<String> {
2314 self.inner.read_text(uri).await
2315 }
2316
2317 async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
2318 self.inner.write_text(uri, contents).await
2319 }
2320
2321 async fn write_text_if_absent(&self, uri: &str, contents: &str) -> Result<bool> {
2322 self.inner.write_text_if_absent(uri, contents).await
2323 }
2324
2325 async fn exists(&self, uri: &str) -> Result<bool> {
2326 let exists = self.inner.exists(uri).await?;
2327 if uri == schema_state_uri(&self.root) {
2328 self.barrier.wait().await;
2329 }
2330 Ok(exists)
2331 }
2332
2333 async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
2334 self.inner.rename_text(from_uri, to_uri).await
2335 }
2336
2337 async fn delete(&self, uri: &str) -> Result<()> {
2338 self.inner.delete(uri).await
2339 }
2340
2341 async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
2342 self.inner.list_dir(dir_uri).await
2343 }
2344
2345 async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)> {
2346 self.inner.read_text_versioned(uri).await
2347 }
2348
2349 async fn write_text_if_match(
2350 &self,
2351 uri: &str,
2352 contents: &str,
2353 expected_version: &str,
2354 ) -> Result<Option<String>> {
2355 self.inner
2356 .write_text_if_match(uri, contents, expected_version)
2357 .await
2358 }
2359
2360 async fn delete_prefix(&self, prefix_uri: &str) -> Result<()> {
2361 self.inner.delete_prefix(prefix_uri).await
2362 }
2363 }
2364
2365 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2366 async fn concurrent_strict_init_does_not_delete_winning_schema_files() {
2367 let dir = tempfile::tempdir().unwrap();
2368 let uri = dir.path().to_str().unwrap().to_string();
2369 let root = normalize_root_uri(&uri).unwrap();
2370 let storage: Arc<dyn StorageAdapter> = Arc::new(InitRaceStorageAdapter {
2371 inner: ObjectStorageAdapter::local(),
2372 root,
2373 barrier: Arc::new(tokio::sync::Barrier::new(2)),
2374 });
2375
2376 let left = Omnigraph::init_with_storage(
2377 &uri,
2378 TEST_SCHEMA,
2379 Arc::clone(&storage),
2380 InitOptions::default(),
2381 );
2382 let right = Omnigraph::init_with_storage(
2383 &uri,
2384 TEST_SCHEMA,
2385 Arc::clone(&storage),
2386 InitOptions::default(),
2387 );
2388 let (left, right) = tokio::join!(left, right);
2389 let ok_count = usize::from(left.is_ok()) + usize::from(right.is_ok());
2390 assert_eq!(ok_count, 1, "exactly one concurrent init should win");
2391
2392 assert!(
2393 dir.path().join("_schema.pg").exists(),
2394 "winning init must leave _schema.pg in place"
2395 );
2396 assert!(
2397 dir.path().join("_schema.ir.json").exists(),
2398 "winning init must leave _schema.ir.json in place"
2399 );
2400 assert!(
2401 dir.path().join("__schema_state.json").exists(),
2402 "winning init must leave __schema_state.json in place"
2403 );
2404 }
2405
2406 #[tokio::test]
2407 async fn test_init_and_open_route_graph_metadata_through_storage_adapter() {
2408 let dir = tempfile::tempdir().unwrap();
2409 let uri = dir.path().to_str().unwrap();
2410 let adapter = Arc::new(RecordingStorageAdapter::default());
2411
2412 Omnigraph::init_with_storage(uri, TEST_SCHEMA, adapter.clone(), InitOptions::default())
2413 .await
2414 .unwrap();
2415 assert!(adapter.writes().contains(&join_uri(uri, "_schema.pg")));
2416 assert!(adapter.writes().contains(&join_uri(uri, "_schema.ir.json")));
2417 assert!(
2418 adapter
2419 .writes()
2420 .contains(&join_uri(uri, "__schema_state.json"))
2421 );
2422
2423 Omnigraph::open_with_storage(uri, adapter.clone())
2424 .await
2425 .unwrap();
2426 assert!(adapter.reads().contains(&join_uri(uri, "_schema.pg")));
2427 assert!(adapter.reads().contains(&join_uri(uri, "_schema.ir.json")));
2428 assert!(
2429 adapter
2430 .reads()
2431 .contains(&join_uri(uri, "__schema_state.json"))
2432 );
2433 assert!(
2434 adapter
2435 .exists_checks()
2436 .contains(&join_uri(uri, "_schema.ir.json"))
2437 );
2438 assert!(
2439 adapter
2440 .exists_checks()
2441 .contains(&join_uri(uri, "__schema_state.json"))
2442 );
2443 assert!(
2444 adapter
2445 .exists_checks()
2446 .contains(&join_uri(uri, "_graph_commits.lance"))
2447 );
2448 }
2449
2450 async fn table_rows_json(db: &Omnigraph, table_key: &str) -> Vec<Value> {
2451 let snapshot = db.snapshot().await;
2452 let ds = db
2453 .storage()
2454 .open_snapshot_at_table(&snapshot, table_key)
2455 .await
2456 .unwrap();
2457 let batches = db.storage().scan_batches(&ds).await.unwrap();
2458 batches
2459 .into_iter()
2460 .flat_map(|batch| {
2461 (0..batch.num_rows())
2462 .map(|row| record_batch_row_to_json(&batch, row).unwrap())
2463 .collect::<Vec<_>>()
2464 })
2465 .collect()
2466 }
2467
2468 async fn seed_person_row(db: &mut Omnigraph, name: &str, age: Option<i32>) {
2469 let (ds, full_path, table_branch) = db
2470 .open_for_mutation("node:Person", crate::db::MutationOpKind::Insert)
2471 .await
2472 .unwrap();
2473 let schema: Arc<Schema> = Arc::new(ds.dataset().schema().into());
2474 let columns: Vec<Arc<dyn Array>> = schema
2475 .fields()
2476 .iter()
2477 .map(|field| match field.name().as_str() {
2478 "id" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
2479 "name" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
2480 "age" => Arc::new(Int32Array::from(vec![age])) as Arc<dyn Array>,
2481 _ => new_null_array(field.data_type(), 1),
2482 })
2483 .collect();
2484 let batch = RecordBatch::try_new(Arc::clone(&schema), columns).unwrap();
2485 let staged = db.storage().stage_append(&ds, batch, &[]).await.unwrap();
2486 let committed = db.storage().commit_staged(ds, staged).await.unwrap();
2487 let state = db
2488 .storage()
2489 .table_state(&full_path, &committed)
2490 .await
2491 .unwrap();
2492 db.commit_updates(&[crate::db::SubTableUpdate {
2493 table_key: "node:Person".to_string(),
2494 table_version: state.version,
2495 table_branch,
2496 row_count: state.row_count,
2497 version_metadata: state.version_metadata,
2498 }])
2499 .await
2500 .unwrap();
2501 }
2502
2503 #[tokio::test]
2504 async fn test_apply_schema_adds_nullable_property_and_preserves_rows() {
2505 let dir = tempfile::tempdir().unwrap();
2506 let uri = dir.path().to_str().unwrap();
2507 let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2508 seed_person_row(&mut db, "Alice", Some(30)).await;
2509
2510 let desired = TEST_SCHEMA.replace(
2511 " age: I32?\n}",
2512 " age: I32?\n nickname: String?\n}",
2513 );
2514 let result = db.apply_schema(&desired).await.unwrap();
2515 assert!(result.applied);
2516
2517 let reopened = Omnigraph::open(uri).await.unwrap();
2518 let rows = table_rows_json(&reopened, "node:Person").await;
2519 assert_eq!(rows.len(), 1);
2520 assert_eq!(rows[0]["name"], "Alice");
2521 assert_eq!(rows[0]["age"], 30);
2522 assert!(rows[0]["nickname"].is_null());
2523 assert!(
2524 reopened.catalog().node_types["Person"]
2525 .properties
2526 .contains_key("nickname")
2527 );
2528 assert!(dir.path().join("_schema.pg").exists());
2529 }
2530
2531 #[tokio::test]
2532 async fn test_apply_schema_renames_property_and_preserves_values() {
2533 let dir = tempfile::tempdir().unwrap();
2534 let uri = dir.path().to_str().unwrap();
2535 let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2536 seed_person_row(&mut db, "Alice", Some(30)).await;
2537
2538 let desired = TEST_SCHEMA.replace(
2539 " age: I32?\n}",
2540 " years: I32? @rename_from(\"age\")\n}",
2541 );
2542 db.apply_schema(&desired).await.unwrap();
2543
2544 let reopened = Omnigraph::open(uri).await.unwrap();
2545 let rows = table_rows_json(&reopened, "node:Person").await;
2546 assert_eq!(rows[0]["name"], "Alice");
2547 assert_eq!(rows[0]["years"], 30);
2548 assert!(rows[0].get("age").is_none());
2549 }
2550
2551 #[tokio::test]
2552 async fn test_apply_schema_renames_type_and_preserves_historical_snapshot() {
2553 let dir = tempfile::tempdir().unwrap();
2554 let uri = dir.path().to_str().unwrap();
2555 let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2556 seed_person_row(&mut db, "Alice", Some(30)).await;
2557 let before_version = db.snapshot().await.version();
2558
2559 let desired = TEST_SCHEMA
2560 .replace("node Person {\n", "node Human @rename_from(\"Person\") {\n")
2561 .replace("edge Knows: Person -> Person", "edge Knows: Human -> Human")
2562 .replace(
2563 "edge WorksAt: Person -> Company",
2564 "edge WorksAt: Human -> Company",
2565 );
2566 db.apply_schema(&desired).await.unwrap();
2567
2568 let head = db.snapshot().await;
2569 assert!(head.entry("node:Person").is_none());
2570 assert!(head.entry("node:Human").is_some());
2571 let historical = ManifestCoordinator::snapshot_at(uri, None, before_version)
2572 .await
2573 .unwrap();
2574 assert!(historical.entry("node:Person").is_some());
2575 assert!(historical.entry("node:Human").is_none());
2576 }
2577
2578 #[tokio::test]
2579 async fn test_apply_schema_succeeds_after_load() {
2580 let dir = tempfile::tempdir().unwrap();
2587 let uri = dir.path().to_str().unwrap();
2588 let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2589
2590 crate::loader::load_jsonl(
2591 &mut db,
2592 r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#,
2593 crate::loader::LoadMode::Overwrite,
2594 )
2595 .await
2596 .unwrap();
2597
2598 let all_branches = db.coordinator.read().await.all_branches().await.unwrap();
2599 assert!(
2600 !all_branches.iter().any(|b| b.starts_with("__run__")),
2601 "no __run__ branch should exist after publish, got: {:?}",
2602 all_branches
2603 );
2604
2605 let desired = TEST_SCHEMA.replace(
2606 " age: I32?\n}",
2607 " age: I32?\n nickname: String?\n}",
2608 );
2609 let result = db.apply_schema(&desired).await.unwrap();
2610 assert!(result.applied, "schema apply should have applied");
2611 }
2612
2613 #[tokio::test]
2623 async fn legacy_run_branch_is_swept_on_open_and_does_not_block_schema_apply() {
2624 let dir = tempfile::tempdir().unwrap();
2625 let uri = dir.path().to_str().unwrap();
2626 let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2627
2628 db.branch_create("__run__legacy").await.unwrap();
2631 drop(db);
2632 {
2633 let mut ds = lance::Dataset::open(&format!("{}/__manifest", uri))
2635 .await
2636 .unwrap();
2637 ds.update_schema_metadata([(
2638 "omnigraph:internal_schema_version".to_string(),
2639 Some("2".to_string()),
2640 )])
2641 .await
2642 .unwrap();
2643 }
2644
2645 let db = Omnigraph::open(uri).await.unwrap();
2648 let branches = db.branch_list().await.unwrap();
2649 assert!(
2650 !branches.iter().any(|b| b.starts_with("__run__")),
2651 "open-time migration must sweep legacy __run__ branches; got {branches:?}",
2652 );
2653
2654 let desired = TEST_SCHEMA.replace(
2657 " age: I32?\n}",
2658 " age: I32?\n nickname: String?\n}",
2659 );
2660 let result = db.apply_schema(&desired).await.unwrap();
2661 assert!(result.applied, "schema apply should have applied");
2662 }
2663
2664 #[tokio::test]
2665 async fn test_apply_schema_defers_index_then_reconciler_builds_it() {
2666 let dir = tempfile::tempdir().unwrap();
2671 let uri = dir.path().to_str().unwrap();
2672 let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2673 seed_person_row(&mut db, "Alice", Some(30)).await;
2674
2675 let desired = TEST_SCHEMA.replace("age: I32?", "age: I32? @index");
2676 db.apply_schema(&desired).await.unwrap();
2677
2678 let snapshot = db.snapshot().await;
2680 let ds = db
2681 .storage()
2682 .open_snapshot_at_table(&snapshot, "node:Person")
2683 .await
2684 .unwrap();
2685 assert!(
2686 !db.storage().has_btree_index(&ds, "age").await.unwrap(),
2687 "apply must not build the index inline (deferred to the reconciler)"
2688 );
2689
2690 db.ensure_indices().await.unwrap();
2692 let snapshot = db.snapshot().await;
2693 let ds = db
2694 .storage()
2695 .open_snapshot_at_table(&snapshot, "node:Person")
2696 .await
2697 .unwrap();
2698 assert!(
2699 db.storage().has_btree_index(&ds, "age").await.unwrap(),
2700 "ensure_indices must build the deferred index"
2701 );
2702 }
2703
2704 #[tokio::test]
2705 async fn test_apply_schema_rewrite_defers_index_then_reconciler_restores() {
2706 let dir = tempfile::tempdir().unwrap();
2709 let uri = dir.path().to_str().unwrap();
2710 let initial_schema = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
2711 let mut db = Omnigraph::init(uri, &initial_schema).await.unwrap();
2712 seed_person_row(&mut db, "Alice", Some(30)).await;
2713
2714 let desired = initial_schema.replace(
2715 " age: I32?\n}",
2716 " age: I32?\n nickname: String?\n}",
2717 );
2718 db.apply_schema(&desired).await.unwrap();
2719
2720 db.ensure_indices().await.unwrap();
2722 let snapshot = db.snapshot().await;
2723 let ds = db
2724 .storage()
2725 .open_snapshot_at_table(&snapshot, "node:Person")
2726 .await
2727 .unwrap();
2728 assert!(db.storage().has_btree_index(&ds, "id").await.unwrap());
2729 assert!(db.storage().has_fts_index(&ds, "name").await.unwrap());
2730 }
2731
2732 #[tokio::test]
2733 async fn test_open_for_mutation_rejects_while_schema_apply_locked() {
2734 let dir = tempfile::tempdir().unwrap();
2735 let uri = dir.path().to_str().unwrap();
2736 let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2737 let mut db = db;
2738 db.coordinator
2739 .write()
2740 .await
2741 .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
2742 .await
2743 .unwrap();
2744
2745 let err = db
2746 .open_for_mutation("node:Person", crate::db::MutationOpKind::Insert)
2747 .await
2748 .unwrap_err();
2749 assert!(
2750 err.to_string()
2751 .contains("write is unavailable while schema apply is in progress")
2752 );
2753 }
2754
2755 #[tokio::test]
2756 async fn test_commit_updates_rejects_while_schema_apply_locked() {
2757 let dir = tempfile::tempdir().unwrap();
2758 let uri = dir.path().to_str().unwrap();
2759 let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2760 db.coordinator
2761 .write()
2762 .await
2763 .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
2764 .await
2765 .unwrap();
2766
2767 let err = db.commit_updates(&[]).await.unwrap_err();
2768 assert!(
2769 err.to_string()
2770 .contains("write commit is unavailable while schema apply is in progress")
2771 );
2772 }
2773
2774 #[tokio::test]
2775 async fn test_branch_list_hides_schema_apply_lock_branch() {
2776 let dir = tempfile::tempdir().unwrap();
2777 let uri = dir.path().to_str().unwrap();
2778 let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2779 db.coordinator
2780 .write()
2781 .await
2782 .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
2783 .await
2784 .unwrap();
2785
2786 let branches = db.branch_list().await.unwrap();
2787 assert_eq!(branches, vec!["main".to_string()]);
2788 }
2789}