1use std::collections::{BTreeSet, HashMap, HashSet};
2use std::io::Write;
3use std::sync::Arc;
4
5use arc_swap::ArcSwap;
6use arrow_array::{
7 Array, BinaryArray, BooleanArray, Date32Array, FixedSizeListArray, Float32Array, Float64Array,
8 Int32Array, Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray,
9 RecordBatch, StringArray, StructArray, UInt32Array, UInt64Array, new_null_array,
10};
11use arrow_schema::{DataType, Field, Schema};
12use lance::Dataset;
13use lance::blob::{BlobArrayBuilder, blob_field};
14use lance::dataset::BlobFile;
15use lance::dataset::scanner::ColumnOrdering;
16use lance::datatypes::BlobKind;
17use omnigraph_compiler::catalog::{Catalog, EdgeType, NodeType};
18use omnigraph_compiler::schema::parser::parse_schema;
19use omnigraph_compiler::types::{PropType, ScalarType};
20use omnigraph_compiler::{
21 DropMode, SchemaIR, SchemaMigrationPlan, SchemaMigrationStep, SchemaTypeKind,
22 build_catalog_from_ir, build_schema_ir, plan_schema_migration,
23};
24
25use crate::db::graph_coordinator::{GraphCoordinator, PublishedSnapshot};
26use crate::error::{OmniError, Result};
27use crate::runtime_cache::RuntimeCache;
28use crate::storage::{StorageAdapter, join_uri, normalize_root_uri, storage_for_uri};
29use crate::storage_layer::SnapshotHandle;
30use crate::table_store::TableStore;
31
32mod export;
33mod optimize;
34mod repair;
35mod schema_apply;
36mod table_ops;
37
38pub use optimize::{CleanupPolicyOptions, SkipReason, TableCleanupStats, TableOptimizeStats};
39pub use repair::{
40 RepairAction, RepairClassification, RepairOptions, RepairStats, TableRepairStats,
41};
42pub use schema_apply::SchemaApplyOptions;
43pub use table_ops::PendingIndex;
44
45use super::commit_graph::GraphCommit;
46use super::manifest::{
47 ManifestChange, Snapshot, SubTableEntry, TableRegistration, TableTombstone,
48 table_path_for_table_key,
49};
50use super::schema_state::{
51 SCHEMA_SOURCE_FILENAME, load_or_bootstrap_schema_contract, read_accepted_schema_ir,
52 recover_schema_state_files, schema_ir_staging_uri, schema_ir_uri, schema_source_staging_uri,
53 schema_source_uri, schema_state_staging_uri, schema_state_uri, validate_schema_contract,
54 write_schema_contract, write_schema_contract_staging,
55};
56use super::{
57 ReadTarget, ResolvedTarget, SCHEMA_APPLY_LOCK_BRANCH, SnapshotId, is_internal_system_branch,
58 is_schema_apply_lock_branch,
59};
60
/// How a branch merge concluded.
///
/// Variant meanings follow their names; confirm details against the merge
/// implementation.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MergeOutcome {
    /// Nothing needed to be merged.
    AlreadyUpToDate,
    /// The target was advanced without creating a merge result.
    FastForward,
    /// Divergent histories were combined.
    Merged,
}
67
68#[derive(Debug, Clone)]
69pub struct SchemaApplyResult {
70 pub supported: bool,
71 pub applied: bool,
72 pub manifest_version: u64,
73 pub steps: Vec<SchemaMigrationStep>,
74}
75
76#[derive(Debug, Clone)]
77pub struct SchemaApplyPreview {
78 pub plan: SchemaMigrationPlan,
79 pub catalog: Catalog,
80}
81
/// Handle to an open graph rooted at `root_uri`.
///
/// Replaceable shared state (`catalog`, `schema_source`) lives behind `ArcSwap`,
/// so it can be swapped through `&self`. The active branch's coordinator lives
/// behind a `tokio::sync::RwLock`.
pub struct Omnigraph {
    /// Normalized root URI (output of `normalize_root_uri`).
    root_uri: String,
    /// Storage adapter used for schema files, manifest recovery, and sidecar listing.
    storage: Arc<dyn StorageAdapter>,
    /// Coordinator for the currently active branch. It is replaced wholesale by
    /// `sync_branch`, `swap_coordinator_for_branch`, and `restore_coordinator`.
    coordinator: Arc<tokio::sync::RwLock<GraphCoordinator>>,
    /// Table storage rooted at `root_uri`. Exposed via `storage()` and
    /// `storage_inline_residual()`.
    table_store: TableStore,
    /// Runtime cache. Invalidated on refresh and branch sync.
    runtime_cache: RuntimeCache,
    /// Catalog built from the accepted schema IR.
    catalog: Arc<ArcSwap<Catalog>>,
    /// Raw text of the schema source file as last loaded.
    schema_source: Arc<ArcSwap<String>>,
    /// Keyed write queue. `refresh` acquires the schema-apply serial key on it.
    write_queue: Arc<crate::db::write_queue::WriteQueueManager>,
    /// Mutex handed out by `merge_exclusive()`. Presumably serializes merges;
    /// confirm with callers.
    merge_exclusive: Arc<tokio::sync::Mutex<()>>,
    /// Optional policy checker. When `None`, `enforce` always succeeds.
    policy: Option<Arc<dyn omnigraph_policy::PolicyChecker>>,
    /// Once-initialized cell for the embedding client (see `embedding_cell()`).
    embedding: Arc<tokio::sync::OnceCell<crate::embedding::EmbeddingClient>>,
    /// Optional embedding configuration set via `with_embedding_config`.
    embedding_config: Option<Arc<crate::embedding::EmbeddingConfig>>,
}
173
/// Selects how `Omnigraph::open_with_storage_and_mode` opens a graph.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum OpenMode {
    /// Runs manifest migration and schema-state/manifest drift recovery on open.
    ReadWrite,
    /// Skips migration and recovery on open.
    ReadOnly,
}
191
/// Options for `Omnigraph::init_with_options`.
#[derive(Clone, Copy, Debug, Default)]
pub struct InitOptions {
    /// When `true`, skip the "already initialized" checks and do not claim the
    /// schema source file up front.
    pub force: bool,
}
210
211impl Omnigraph {
212 pub async fn init(uri: &str, schema_source: &str) -> Result<Self> {
219 Self::init_with_options(uri, schema_source, InitOptions::default()).await
220 }
221
222 pub async fn init_with_options(
227 uri: &str,
228 schema_source: &str,
229 options: InitOptions,
230 ) -> Result<Self> {
231 Self::init_with_storage(uri, schema_source, storage_for_uri(uri)?, options).await
232 }
233
    /// Creates a brand-new graph rooted at `uri` using an explicit storage adapter.
    ///
    /// Sequence:
    /// 1. Unless `options.force`, refuse if any schema artifact (source, IR, or
    ///    state file) already exists at the root.
    /// 2. Parse `schema_source` into IR and build the catalog.
    /// 3. Unless `options.force`, claim the root by writing the schema source with
    ///    `write_text_if_absent`. If that write does not happen, return
    ///    `AlreadyInitialized`.
    /// 4. Run `init_storage_phase`. On failure, best-effort remove init artifacts.
    ///
    /// # Errors
    /// `OmniError::AlreadyInitialized` when the root is already claimed (non-force
    /// only). Also propagates errors from URI normalization, schema parsing,
    /// catalog building, storage calls, the failpoint, and the storage phase.
    pub(crate) async fn init_with_storage(
        uri: &str,
        schema_source: &str,
        storage: Arc<dyn StorageAdapter>,
        options: InitOptions,
    ) -> Result<Self> {
        let root = normalize_root_uri(uri)?;

        if !options.force {
            // Fail fast on any existing schema artifact before doing any work.
            for candidate in [
                schema_source_uri(&root),
                schema_ir_uri(&root),
                schema_state_uri(&root),
            ] {
                if storage.exists(&candidate).await? {
                    return Err(OmniError::AlreadyInitialized { uri: root.clone() });
                }
            }
        }

        let schema_ir = read_schema_ir_from_source(schema_source)?;
        let mut catalog = build_catalog_from_ir(&schema_ir)?;
        fixup_blob_schemas(&mut catalog);

        // `true` only when this call itself created the schema source file via
        // the conditional write below. With `force`, nothing is claimed here.
        let schema_pg_claimed = if options.force {
            false
        } else {
            let schema_path = join_uri(&root, SCHEMA_SOURCE_FILENAME);
            // The conditional write acts as the claim on the root. If it reports
            // that nothing was written, another initializer got there first.
            if !storage
                .write_text_if_absent(&schema_path, schema_source)
                .await?
            {
                return Err(OmniError::AlreadyInitialized { uri: root.clone() });
            }
            // Failpoint hook: simulate a crash right after the claim, cleaning
            // up what was written.
            if let Err(err) = crate::failpoints::maybe_fail("init.after_schema_pg_written") {
                best_effort_cleanup_init_artifacts(&root, storage.as_ref()).await;
                return Err(err);
            }
            true
        };

        let coordinator = match init_storage_phase(
            &root,
            schema_source,
            &schema_ir,
            &catalog,
            &storage,
            // NOTE(review): this flag is `true` exactly when `options.force` is
            // set. Presumably it tells the storage phase to write the schema
            // source itself; confirm against `init_storage_phase`.
            !schema_pg_claimed,
        )
        .await
        {
            Ok(coordinator) => coordinator,
            Err(err) => {
                // NOTE(review): given the branches above, this condition is always
                // true. Non-force runs only reach here with `schema_pg_claimed` set.
                if schema_pg_claimed || options.force {
                    best_effort_cleanup_init_artifacts(&root, storage.as_ref()).await;
                }
                return Err(err);
            }
        };

        Ok(Self {
            root_uri: root.clone(),
            storage,
            coordinator: Arc::new(tokio::sync::RwLock::new(coordinator)),
            table_store: TableStore::new(&root),
            runtime_cache: RuntimeCache::default(),
            catalog: Arc::new(ArcSwap::from_pointee(catalog)),
            schema_source: Arc::new(ArcSwap::from_pointee(schema_source.to_string())),
            write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()),
            merge_exclusive: Arc::new(tokio::sync::Mutex::new(())),
            policy: None,
            embedding: Arc::new(tokio::sync::OnceCell::new()),
            embedding_config: None,
        })
    }
339
340 pub async fn open(uri: &str) -> Result<Self> {
345 Self::open_with_storage_and_mode(uri, storage_for_uri(uri)?, OpenMode::ReadWrite).await
346 }
347
348 pub async fn open_read_only(uri: &str) -> Result<Self> {
351 Self::open_with_storage_and_mode(uri, storage_for_uri(uri)?, OpenMode::ReadOnly).await
352 }
353
354 pub(crate) async fn open_with_storage(
357 uri: &str,
358 storage: Arc<dyn StorageAdapter>,
359 ) -> Result<Self> {
360 Self::open_with_storage_and_mode(uri, storage, OpenMode::ReadWrite).await
361 }
362
363 pub(crate) async fn open_with_storage_and_mode(
364 uri: &str,
365 storage: Arc<dyn StorageAdapter>,
366 mode: OpenMode,
367 ) -> Result<Self> {
368 let root = normalize_root_uri(uri)?;
369 if matches!(mode, OpenMode::ReadWrite) {
377 crate::db::manifest::migrate_on_open(&root).await?;
378 }
379 let mut coordinator = GraphCoordinator::open(&root, Arc::clone(&storage)).await?;
382 if matches!(mode, OpenMode::ReadWrite) {
392 let schema_state_recovery =
393 recover_schema_state_files(&root, Arc::clone(&storage), &coordinator.snapshot())
394 .await?;
395 crate::db::manifest::recover_manifest_drift(
402 &root,
403 Arc::clone(&storage),
404 &mut coordinator,
405 crate::db::manifest::RecoveryMode::Full,
406 schema_state_recovery,
407 )
408 .await?;
409 }
410 let schema_path = schema_source_uri(&root);
412 let schema_source = storage.read_text(&schema_path).await?;
413 let current_source_ir = read_schema_ir_from_source(&schema_source)?;
414 let branches = coordinator.branch_list().await?;
415 let (accepted_ir, _) = load_or_bootstrap_schema_contract(
416 &root,
417 Arc::clone(&storage),
418 &branches,
419 ¤t_source_ir,
420 )
421 .await?;
422 let mut catalog = build_catalog_from_ir(&accepted_ir)?;
423 fixup_blob_schemas(&mut catalog);
424
425 Ok(Self {
426 root_uri: root.clone(),
427 storage,
428 coordinator: Arc::new(tokio::sync::RwLock::new(coordinator)),
429 table_store: TableStore::new(&root),
430 runtime_cache: RuntimeCache::default(),
431 catalog: Arc::new(ArcSwap::from_pointee(catalog)),
432 schema_source: Arc::new(ArcSwap::from_pointee(schema_source)),
433 write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()),
434 merge_exclusive: Arc::new(tokio::sync::Mutex::new(())),
435 policy: None,
436 embedding: Arc::new(tokio::sync::OnceCell::new()),
437 embedding_config: None,
438 })
439 }
440
441 pub fn catalog(&self) -> Arc<Catalog> {
445 self.catalog.load_full()
446 }
447
448 pub fn schema_source(&self) -> Arc<String> {
450 self.schema_source.load_full()
451 }
452
453 pub(crate) fn store_catalog(&self, catalog: Catalog) {
457 self.catalog.store(Arc::new(catalog));
458 }
459
460 pub(crate) fn store_schema_source(&self, schema_source: String) {
463 self.schema_source.store(Arc::new(schema_source));
464 }
465
466 pub fn uri(&self) -> &str {
467 &self.root_uri
468 }
469
470 pub fn with_policy(mut self, checker: Arc<dyn omnigraph_policy::PolicyChecker>) -> Self {
481 self.policy = Some(checker);
482 self
483 }
484
485 pub(crate) fn embedding_cell(
489 &self,
490 ) -> &tokio::sync::OnceCell<crate::embedding::EmbeddingClient> {
491 &self.embedding
492 }
493
494 pub fn with_embedding_config(mut self, config: Arc<crate::embedding::EmbeddingConfig>) -> Self {
499 self.embedding_config = Some(config);
500 self
501 }
502
503 pub(crate) fn embedding_config_ref(&self) -> Option<&crate::embedding::EmbeddingConfig> {
505 self.embedding_config.as_deref()
506 }
507
508 pub(crate) fn enforce(
520 &self,
521 action: omnigraph_policy::PolicyAction,
522 scope: &omnigraph_policy::ResourceScope,
523 actor: Option<&str>,
524 ) -> Result<()> {
525 let Some(checker) = self.policy.as_ref() else {
526 return Ok(());
527 };
528 let Some(actor) = actor else {
529 return Err(OmniError::Policy(
530 "no actor for engine-layer policy check (policy is configured but the call site \
531 didn't thread an actor through — this is almost certainly a bug, not an \
532 intended bypass)"
533 .to_string(),
534 ));
535 };
536 checker
537 .check(action, scope, actor)
538 .map_err(|err| OmniError::Policy(err.to_string()))
539 }
540
541 pub(crate) async fn ensure_schema_state_valid(&self) -> Result<()> {
542 validate_schema_contract(self.uri(), Arc::clone(&self.storage)).await
543 }
544
545 pub async fn plan_schema(&self, desired_schema_source: &str) -> Result<SchemaMigrationPlan> {
546 self.plan_schema_with_options(desired_schema_source, SchemaApplyOptions::default())
547 .await
548 }
549
550 pub async fn plan_schema_with_options(
551 &self,
552 desired_schema_source: &str,
553 options: SchemaApplyOptions,
554 ) -> Result<SchemaMigrationPlan> {
555 schema_apply::plan_schema(self, desired_schema_source, options).await
556 }
557
558 pub async fn preview_schema_apply_with_options(
559 &self,
560 desired_schema_source: &str,
561 options: SchemaApplyOptions,
562 ) -> Result<SchemaApplyPreview> {
563 schema_apply::preview_schema_apply(self, desired_schema_source, options).await
564 }
565
566 pub async fn apply_schema(&self, desired_schema_source: &str) -> Result<SchemaApplyResult> {
567 self.apply_schema_as(desired_schema_source, SchemaApplyOptions::default(), None)
568 .await
569 }
570
571 pub async fn apply_schema_with_options(
572 &self,
573 desired_schema_source: &str,
574 options: SchemaApplyOptions,
575 ) -> Result<SchemaApplyResult> {
576 self.apply_schema_as(desired_schema_source, options, None)
577 .await
578 }
579
580 pub async fn apply_schema_as(
591 &self,
592 desired_schema_source: &str,
593 options: SchemaApplyOptions,
594 actor: Option<&str>,
595 ) -> Result<SchemaApplyResult> {
596 self.apply_schema_as_with_catalog_check(desired_schema_source, options, actor, |_| Ok(()))
597 .await
598 }
599
600 pub async fn apply_schema_as_with_catalog_check<F>(
601 &self,
602 desired_schema_source: &str,
603 options: SchemaApplyOptions,
604 actor: Option<&str>,
605 validate_catalog: F,
606 ) -> Result<SchemaApplyResult>
607 where
608 F: FnOnce(&Catalog) -> Result<()>,
609 {
610 schema_apply::apply_schema(
611 self,
612 desired_schema_source,
613 options,
614 actor,
615 validate_catalog,
616 )
617 .await
618 }
619
620 pub(crate) async fn ensure_schema_apply_idle(&self, operation: &str) -> Result<()> {
621 schema_apply::ensure_schema_apply_idle(self, operation).await
622 }
623
624 async fn ensure_schema_apply_not_locked(&self, operation: &str) -> Result<()> {
625 schema_apply::ensure_schema_apply_not_locked(self, operation).await
626 }
627
628 pub(crate) fn storage(&self) -> &dyn crate::storage_layer::TableStorage {
638 &self.table_store
639 }
640
641 pub(crate) fn storage_inline_residual(
650 &self,
651 ) -> &dyn crate::storage_layer::InlineCommitResidual {
652 &self.table_store
653 }
654
655 pub(crate) fn storage_adapter(&self) -> &dyn crate::storage::StorageAdapter {
659 self.storage.as_ref()
660 }
661
662 pub(crate) fn write_queue(&self) -> Arc<crate::db::write_queue::WriteQueueManager> {
670 Arc::clone(&self.write_queue)
671 }
672
673 pub(crate) fn merge_exclusive(&self) -> Arc<tokio::sync::Mutex<()>> {
679 Arc::clone(&self.merge_exclusive)
680 }
681
682 pub(crate) fn root_uri(&self) -> &str {
685 &self.root_uri
686 }
687
688 pub(crate) async fn open_coordinator_for_branch(
689 &self,
690 branch: Option<&str>,
691 ) -> Result<GraphCoordinator> {
692 match branch {
693 Some(branch) => {
694 GraphCoordinator::open_branch(self.uri(), branch, Arc::clone(&self.storage)).await
695 }
696 None => GraphCoordinator::open(self.uri(), Arc::clone(&self.storage)).await,
697 }
698 }
699
700 pub(crate) async fn swap_coordinator_for_branch(
701 &self,
702 branch: Option<&str>,
703 ) -> Result<GraphCoordinator> {
704 let next = self.open_coordinator_for_branch(branch).await?;
705 let mut coord = self.coordinator.write().await;
706 Ok(std::mem::replace(&mut *coord, next))
707 }
708
709 pub(crate) async fn restore_coordinator(&self, coordinator: GraphCoordinator) {
710 *self.coordinator.write().await = coordinator;
711 }
712
713 pub(crate) async fn resolved_branch_target(
714 &self,
715 branch: Option<&str>,
716 ) -> Result<ResolvedTarget> {
717 self.ensure_schema_state_valid().await?;
718 let requested = ReadTarget::Branch(branch.unwrap_or("main").to_string());
719 let normalized = normalize_branch_name(branch.unwrap_or("main"))?;
720 let coord = self.coordinator.read().await;
721 if normalized.as_deref() == coord.current_branch() {
722 let snapshot_id = coord
723 .head_commit_id()
724 .await?
725 .unwrap_or_else(|| SnapshotId::synthetic(coord.current_branch(), coord.version()));
726 return Ok(ResolvedTarget {
727 requested,
728 branch: coord.current_branch().map(str::to_string),
729 snapshot_id,
730 snapshot: coord.snapshot(),
731 });
732 }
733 coord.resolve_target(&requested).await
734 }
735
736 pub(crate) async fn snapshot_for_branch(&self, branch: Option<&str>) -> Result<Snapshot> {
737 self.resolved_branch_target(branch)
738 .await
739 .map(|resolved| resolved.snapshot)
740 }
741
742 pub(crate) async fn fresh_snapshot_for_branch(&self, branch: Option<&str>) -> Result<Snapshot> {
743 self.ensure_schema_state_valid().await?;
744 let requested = ReadTarget::Branch(branch.unwrap_or("main").to_string());
745 let coord = self.coordinator.read().await;
746 coord
747 .resolve_target(&requested)
748 .await
749 .map(|resolved| resolved.snapshot)
750 }
751
752 pub(crate) async fn version(&self) -> u64 {
753 self.coordinator.read().await.version()
754 }
755
756 pub(crate) async fn snapshot(&self) -> Snapshot {
758 self.coordinator.read().await.snapshot()
759 }
760
761 pub async fn snapshot_of(&self, target: impl Into<ReadTarget>) -> Result<Snapshot> {
762 self.resolved_target(target)
763 .await
764 .map(|resolved| resolved.snapshot)
765 }
766
767 pub async fn version_of(&self, target: impl Into<ReadTarget>) -> Result<u64> {
768 self.snapshot_of(target)
769 .await
770 .map(|snapshot| snapshot.version())
771 }
772
773 pub async fn resolved_branch_of(
774 &self,
775 target: impl Into<ReadTarget>,
776 ) -> Result<Option<String>> {
777 self.resolved_target(target)
778 .await
779 .map(|resolved| resolved.branch)
780 }
781
782 pub async fn sync_branch(&self, branch: &str) -> Result<()> {
784 self.ensure_schema_state_valid().await?;
785 let branch = normalize_branch_name(branch)?;
786 let next = self.open_coordinator_for_branch(branch.as_deref()).await?;
787 *self.coordinator.write().await = next;
788 self.runtime_cache.invalidate_all().await;
789 Ok(())
790 }
791
    /// Reloads state from storage: coordinator, pending recovery sidecars,
    /// schema, and runtime cache.
    ///
    /// 1. Under the write-queue entry for `schema_apply_serial_queue_key()`, and
    ///    only when `list_sidecars` reports none pending, refresh the coordinator
    ///    and run `recover_schema_state_files` against the refreshed snapshot.
    ///    The key name suggests this serializes with schema apply; confirm.
    /// 2. Roll forward any pending sidecars.
    /// 3. Reload the catalog if the schema source text changed.
    /// 4. Invalidate the runtime cache.
    pub async fn refresh(&self) -> Result<()> {
        {
            // The guard lives only for this inner block, so it is released before
            // `heal_pending_sidecars_roll_forward` (which receives the queue) runs.
            let _serial = self
                .write_queue
                .acquire(&crate::db::manifest::schema_apply_serial_queue_key())
                .await;
            if crate::db::manifest::list_sidecars(&self.root_uri, self.storage.as_ref())
                .await?
                .is_empty()
            {
                let mut coord = self.coordinator.write().await;
                coord.refresh().await?;
                recover_schema_state_files(
                    &self.root_uri,
                    Arc::clone(&self.storage),
                    &coord.snapshot(),
                )
                .await?;
            }
        }
        // Its returned flag is ignored here; the schema reload below is
        // unconditional (it no-ops when the source text is unchanged).
        crate::db::manifest::heal_pending_sidecars_roll_forward(
            &self.root_uri,
            Arc::clone(&self.storage),
            &self.coordinator,
            &self.write_queue,
        )
        .await?;
        self.reload_schema_if_source_changed().await?;
        self.runtime_cache.invalidate_all().await;
        Ok(())
    }
894
895 pub(crate) async fn heal_pending_recovery_sidecars(&self) -> Result<()> {
909 let processed = crate::db::manifest::heal_pending_sidecars_roll_forward(
910 &self.root_uri,
911 Arc::clone(&self.storage),
912 &self.coordinator,
913 &self.write_queue,
914 )
915 .await?;
916 if processed {
917 self.reload_schema_if_source_changed().await?;
923 self.runtime_cache.invalidate_all().await;
924 }
925 Ok(())
926 }
927
928 async fn reload_schema_if_source_changed(&self) -> Result<()> {
929 let schema_path = schema_source_uri(&self.root_uri);
930 let schema_source = self.storage.read_text(&schema_path).await?;
931 if schema_source == *self.schema_source.load_full() {
932 return Ok(());
933 }
934 let current_source_ir = read_schema_ir_from_source(&schema_source)?;
935 let branches = self.coordinator.read().await.branch_list().await?;
936 let (accepted_ir, _) = load_or_bootstrap_schema_contract(
937 &self.root_uri,
938 Arc::clone(&self.storage),
939 &branches,
940 ¤t_source_ir,
941 )
942 .await?;
943 let mut catalog = build_catalog_from_ir(&accepted_ir)?;
944 fixup_blob_schemas(&mut catalog);
945 self.store_schema_source(schema_source);
946 self.store_catalog(catalog);
947 Ok(())
948 }
949
950 pub(crate) async fn refresh_coordinator_only(&self) -> Result<()> {
958 self.coordinator.write().await.refresh().await?;
959 self.runtime_cache.invalidate_all().await;
960 Ok(())
961 }
962
963 pub async fn resolve_snapshot(&self, branch: &str) -> Result<SnapshotId> {
964 self.ensure_schema_state_valid().await?;
965 self.coordinator
966 .read()
967 .await
968 .resolve_snapshot_id(branch)
969 .await
970 }
971
972 pub(crate) async fn resolved_target(
973 &self,
974 target: impl Into<ReadTarget>,
975 ) -> Result<ResolvedTarget> {
976 self.ensure_schema_state_valid().await?;
977 self.coordinator
978 .read()
979 .await
980 .resolve_target(&target.into())
981 .await
982 }
983
984 pub async fn diff_between(
987 &self,
988 from: impl Into<ReadTarget>,
989 to: impl Into<ReadTarget>,
990 filter: &crate::changes::ChangeFilter,
991 ) -> Result<crate::changes::ChangeSet> {
992 let from_resolved = self.resolved_target(from).await?;
993 let to_resolved = self.resolved_target(to).await?;
994 crate::changes::diff_snapshots(
995 self.uri(),
996 &from_resolved.snapshot,
997 &to_resolved.snapshot,
998 filter,
999 to_resolved.branch.clone().or(from_resolved.branch.clone()),
1000 )
1001 .await
1002 }
1003
1004 pub async fn diff_commits(
1007 &self,
1008 from_commit_id: &str,
1009 to_commit_id: &str,
1010 filter: &crate::changes::ChangeFilter,
1011 ) -> Result<crate::changes::ChangeSet> {
1012 let coord = self.coordinator.read().await;
1013 let from_commit = coord
1014 .resolve_commit(&SnapshotId::new(from_commit_id))
1015 .await?;
1016 let to_commit = coord.resolve_commit(&SnapshotId::new(to_commit_id)).await?;
1017 let from_snap = coord
1018 .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
1019 from_commit.graph_commit_id.clone(),
1020 )))
1021 .await?;
1022 let to_snap = coord
1023 .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
1024 to_commit.graph_commit_id.clone(),
1025 )))
1026 .await?;
1027 drop(coord);
1028 crate::changes::diff_snapshots(
1029 self.uri(),
1030 &from_snap.snapshot,
1031 &to_snap.snapshot,
1032 filter,
1033 to_snap.branch.clone().or(from_snap.branch.clone()),
1034 )
1035 .await
1036 }
1037
1038 pub async fn entity_at_target(
1039 &self,
1040 target: impl Into<ReadTarget>,
1041 table_key: &str,
1042 id: &str,
1043 ) -> Result<Option<serde_json::Value>> {
1044 export::entity_at_target(self, target, table_key, id).await
1045 }
1046
1047 pub async fn entity_at(
1049 &self,
1050 table_key: &str,
1051 id: &str,
1052 version: u64,
1053 ) -> Result<Option<serde_json::Value>> {
1054 export::entity_at(self, table_key, id, version).await
1055 }
1056
1057 pub async fn snapshot_at_version(&self, version: u64) -> Result<Snapshot> {
1059 self.ensure_schema_state_valid().await?;
1060 self.coordinator
1061 .read()
1062 .await
1063 .snapshot_at_version(version)
1064 .await
1065 }
1066
1067 pub async fn export_jsonl(
1068 &self,
1069 branch: &str,
1070 type_names: &[String],
1071 table_keys: &[String],
1072 ) -> Result<String> {
1073 export::export_jsonl(self, branch, type_names, table_keys).await
1074 }
1075
1076 pub async fn export_jsonl_to_writer<W: Write>(
1077 &self,
1078 branch: &str,
1079 type_names: &[String],
1080 table_keys: &[String],
1081 writer: &mut W,
1082 ) -> Result<()> {
1083 export::export_jsonl_to_writer(self, branch, type_names, table_keys, writer).await
1084 }
1085
1086 pub async fn graph_index(&self) -> Result<Arc<crate::graph_index::GraphIndex>> {
1090 table_ops::graph_index(self).await
1091 }
1092
1093 pub(crate) async fn graph_index_for_resolved(
1094 &self,
1095 resolved: &ResolvedTarget,
1096 ) -> Result<Arc<crate::graph_index::GraphIndex>> {
1097 table_ops::graph_index_for_resolved(self, resolved).await
1098 }
1099
1100 pub async fn ensure_indices(&self) -> Result<Vec<PendingIndex>> {
1117 table_ops::ensure_indices(self).await
1118 }
1119
1120 pub async fn ensure_indices_on(&self, branch: &str) -> Result<Vec<PendingIndex>> {
1121 table_ops::ensure_indices_on(self, branch).await
1122 }
1123
1124 #[cfg(feature = "failpoints")]
1125 #[doc(hidden)]
1126 pub async fn failpoint_publish_table_head_without_index_rebuild_for_test(
1127 &mut self,
1128 branch: &str,
1129 table_key: &str,
1130 table_branch: Option<&str>,
1131 ) -> Result<u64> {
1132 table_ops::failpoint_publish_table_head_without_index_rebuild_for_test(
1133 self,
1134 branch,
1135 table_key,
1136 table_branch,
1137 )
1138 .await
1139 }
1140
1141 pub async fn optimize(&self) -> Result<Vec<optimize::TableOptimizeStats>> {
1144 optimize::optimize_all_tables(self).await
1145 }
1146
1147 pub async fn repair(&self, options: repair::RepairOptions) -> Result<repair::RepairStats> {
1151 repair::repair_all_tables(self, options).await
1152 }
1153
1154 pub async fn cleanup(
1158 &mut self,
1159 options: optimize::CleanupPolicyOptions,
1160 ) -> Result<Vec<optimize::TableCleanupStats>> {
1161 optimize::cleanup_all_tables(self, options).await
1162 }
1163
1164 pub async fn read_blob(&self, type_name: &str, id: &str, property: &str) -> Result<BlobFile> {
1174 self.ensure_schema_state_valid().await?;
1175 let catalog = self.catalog();
1176 let node_type = catalog
1177 .node_types
1178 .get(type_name)
1179 .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
1180 if !node_type.blob_properties.contains(property) {
1181 return Err(OmniError::manifest(format!(
1182 "property '{}' on type '{}' is not a Blob",
1183 property, type_name
1184 )));
1185 }
1186
1187 let snapshot = self.snapshot().await;
1188 let table_key = format!("node:{}", type_name);
1189 let handle = self
1190 .storage()
1191 .open_snapshot_at_table(&snapshot, &table_key)
1192 .await?;
1193
1194 let filter_sql = format!("id = '{}'", id.replace('\'', "''"));
1195 let row_id = self
1196 .storage()
1197 .first_row_id_for_filter(&handle, &filter_sql)
1198 .await?
1199 .ok_or_else(|| {
1200 OmniError::manifest(format!("no {} with id '{}' found", type_name, id))
1201 })?;
1202
1203 let ds = handle.into_arc();
1207 let mut blobs = ds
1208 .take_blobs(&[row_id], property)
1209 .await
1210 .map_err(|e| OmniError::Lance(e.to_string()))?;
1211
1212 blobs.pop().ok_or_else(|| {
1213 OmniError::manifest(format!(
1214 "blob '{}' on {} '{}' returned no data",
1215 property, type_name, id
1216 ))
1217 })
1218 }
1219
1220 pub(crate) async fn active_branch(&self) -> Option<String> {
1221 self.coordinator
1222 .read()
1223 .await
1224 .current_branch()
1225 .map(str::to_string)
1226 }
1227
1228 async fn ensure_branch_delete_safe(&self, branch: &str, branches: &[String]) -> Result<()> {
1229 let descendants = self
1230 .coordinator
1231 .read()
1232 .await
1233 .branch_descendants(branch)
1234 .await?;
1235 if let Some(descendant) = descendants.first() {
1236 return Err(OmniError::manifest_conflict(format!(
1237 "cannot delete branch '{}' because descendant branch '{}' still depends on it",
1238 branch, descendant
1239 )));
1240 }
1241
1242 for other_branch in branches
1243 .iter()
1244 .filter(|candidate| candidate.as_str() != branch)
1245 {
1246 let snapshot = self
1247 .snapshot_of(ReadTarget::branch(other_branch.as_str()))
1248 .await?;
1249 if snapshot
1250 .entries()
1251 .any(|entry| entry.table_branch.as_deref() == Some(branch))
1252 {
1253 return Err(OmniError::manifest_conflict(format!(
1254 "cannot delete branch '{}' because branch '{}' still depends on it",
1255 branch, other_branch
1256 )));
1257 }
1258 }
1259
1260 Ok(())
1261 }
1262
1263 async fn cleanup_deleted_branch_tables(&self, branch: &str, owned_tables: &[(String, String)]) {
1271 let mut seen_paths = HashSet::new();
1272 let mut cleanup_targets = owned_tables
1273 .iter()
1274 .filter(|(_, table_path)| seen_paths.insert(table_path.clone()))
1275 .cloned()
1276 .collect::<Vec<_>>();
1277 cleanup_targets.sort_by(|left, right| left.0.cmp(&right.0));
1278
1279 for (table_key, table_path) in cleanup_targets {
1280 let dataset_uri = self.storage().dataset_uri(&table_path);
1281 let outcome = match crate::failpoints::maybe_fail("branch_delete.before_table_cleanup")
1282 {
1283 Ok(()) => {
1284 self.storage()
1285 .force_delete_branch(&dataset_uri, branch)
1286 .await
1287 }
1288 Err(injected) => Err(injected),
1289 };
1290 if let Err(err) = outcome {
1291 tracing::warn!(
1292 target: "omnigraph::branch_delete::cleanup",
1293 branch = %branch,
1294 table = %table_key,
1295 error = %err,
1296 "best-effort fork reclaim failed; cleanup will reconcile the orphan",
1297 );
1298 }
1299 }
1300 }
1301
1302 async fn delete_branch_storage_only(&self, branch: &str) -> Result<()> {
1303 let active = self
1304 .coordinator
1305 .read()
1306 .await
1307 .current_branch()
1308 .map(str::to_string);
1309 if active.as_deref() == Some(branch) {
1310 return Err(OmniError::manifest_conflict(format!(
1311 "cannot delete currently active branch '{}'",
1312 branch
1313 )));
1314 }
1315
1316 let branch_snapshot = self.snapshot_of(ReadTarget::branch(branch)).await?;
1317 let owned_tables = branch_snapshot
1318 .entries()
1319 .filter(|entry| entry.table_branch.as_deref() == Some(branch))
1320 .map(|entry| (entry.table_key.clone(), entry.table_path.clone()))
1321 .collect::<Vec<_>>();
1322
1323 self.coordinator.write().await.branch_delete(branch).await?;
1325 self.cleanup_deleted_branch_tables(branch, &owned_tables)
1327 .await;
1328 Ok(())
1329 }
1330
1331 pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
1332 normalize_branch_name(branch)
1333 }
1334
1335 pub(crate) async fn head_commit_id_for_branch(
1336 &self,
1337 branch: Option<&str>,
1338 ) -> Result<Option<String>> {
1339 let mut coordinator = self.open_coordinator_for_branch(branch).await?;
1340 coordinator.ensure_commit_graph_initialized().await?;
1341 coordinator
1342 .head_commit_id()
1343 .await
1344 .map(|id| id.map(|snapshot_id| snapshot_id.as_str().to_string()))
1345 }
1346
1347 pub async fn branch_create(&self, name: &str) -> Result<()> {
1348 self.branch_create_as(name, None).await
1349 }
1350
1351 pub async fn branch_create_as(&self, name: &str, actor: Option<&str>) -> Result<()> {
1359 self.enforce(
1360 omnigraph_policy::PolicyAction::BranchCreate,
1361 &omnigraph_policy::ResourceScope::TargetBranch(name.to_string()),
1362 actor,
1363 )?;
1364 self.ensure_schema_state_valid().await?;
1365 self.ensure_schema_apply_idle("branch_create").await?;
1366 ensure_public_branch_ref(name, "branch_create")?;
1367 self.coordinator.write().await.branch_create(name).await
1368 }
1369
1370 pub async fn branch_create_from(&self, from: impl Into<ReadTarget>, name: &str) -> Result<()> {
1371 self.branch_create_from_as(from, name, None).await
1372 }
1373
1374 pub async fn branch_create_from_as(
1386 &self,
1387 from: impl Into<ReadTarget>,
1388 name: &str,
1389 actor: Option<&str>,
1390 ) -> Result<()> {
1391 let target = from.into();
1392 let source_branch = match &target {
1393 ReadTarget::Branch(b) => b.clone(),
1394 _ => "<snapshot>".to_string(),
1395 };
1396 self.enforce(
1397 omnigraph_policy::PolicyAction::BranchCreate,
1398 &omnigraph_policy::ResourceScope::BranchTransition {
1399 source: source_branch,
1400 target: name.to_string(),
1401 },
1402 actor,
1403 )?;
1404 self.ensure_schema_apply_idle("branch_create_from").await?;
1405 self.branch_create_from_impl(target, name, false).await
1406 }
1407
1408 async fn branch_create_from_impl(
1409 &self,
1410 from: impl Into<ReadTarget>,
1411 name: &str,
1412 allow_internal_refs: bool,
1413 ) -> Result<()> {
1414 let target = from.into();
1415 let ReadTarget::Branch(branch_name) = target else {
1416 return Err(OmniError::manifest(
1417 "branch creation from pinned snapshots is not supported yet".to_string(),
1418 ));
1419 };
1420 if !allow_internal_refs {
1421 ensure_public_branch_ref(&branch_name, "branch_create_from")?;
1422 ensure_public_branch_ref(name, "branch_create_from")?;
1423 }
1424 let branch = normalize_branch_name(&branch_name)?;
1425 let mut source_coord = self.open_coordinator_for_branch(branch.as_deref()).await?;
1442 source_coord.branch_create(name).await
1443 }
1444
1445 pub async fn branch_list(&self) -> Result<Vec<String>> {
1446 self.ensure_schema_state_valid().await?;
1447 self.coordinator.read().await.branch_list().await
1448 }
1449
1450 pub async fn branch_delete(&self, name: &str) -> Result<()> {
1451 self.branch_delete_as(name, None).await
1452 }
1453
1454 pub async fn branch_delete_as(&self, name: &str, actor: Option<&str>) -> Result<()> {
1462 self.enforce(
1463 omnigraph_policy::PolicyAction::BranchDelete,
1464 &omnigraph_policy::ResourceScope::TargetBranch(name.to_string()),
1465 actor,
1466 )?;
1467 self.ensure_schema_state_valid().await?;
1468 self.ensure_schema_apply_idle("branch_delete").await?;
1469 ensure_public_branch_ref(name, "branch_delete")?;
1470 self.refresh().await?;
1471 let branch = normalize_branch_name(name)?
1472 .ok_or_else(|| OmniError::manifest("cannot delete branch 'main'".to_string()))?;
1473 let branches = self.coordinator.read().await.branch_list().await?;
1474 if !branches.iter().any(|candidate| candidate == &branch) {
1475 return Err(OmniError::manifest_not_found(format!(
1476 "branch '{}' not found",
1477 branch
1478 )));
1479 }
1480
1481 self.ensure_branch_delete_safe(&branch, &branches).await?;
1482 self.delete_branch_storage_only(&branch).await
1483 }
1484
1485 pub async fn get_commit(&self, commit_id: &str) -> Result<GraphCommit> {
1486 self.ensure_schema_state_valid().await?;
1487 self.coordinator
1488 .read()
1489 .await
1490 .resolve_commit(&SnapshotId::new(commit_id))
1491 .await
1492 }
1493
1494 pub async fn list_commits(&self, branch: Option<&str>) -> Result<Vec<GraphCommit>> {
1495 self.ensure_schema_state_valid().await?;
1496 let branch = match branch {
1497 Some(branch) => normalize_branch_name(branch)?,
1498 None => None,
1499 };
1500 let coordinator = self.open_coordinator_for_branch(branch.as_deref()).await?;
1501 coordinator.list_commits().await
1502 }
1503
1504 pub(crate) async fn open_for_mutation(
1510 &self,
1511 table_key: &str,
1512 op_kind: crate::db::MutationOpKind,
1513 ) -> Result<(SnapshotHandle, String, Option<String>)> {
1514 table_ops::open_for_mutation(self, table_key, op_kind).await
1515 }
1516
1517 pub(crate) async fn open_for_mutation_on_branch(
1518 &self,
1519 branch: Option<&str>,
1520 table_key: &str,
1521 op_kind: crate::db::MutationOpKind,
1522 ) -> Result<(SnapshotHandle, String, Option<String>)> {
1523 table_ops::open_for_mutation_on_branch(self, branch, table_key, op_kind).await
1524 }
1525
1526 pub(crate) async fn fork_dataset_from_entry_state(
1534 &self,
1535 table_key: &str,
1536 full_path: &str,
1537 source_branch: Option<&str>,
1538 source_version: u64,
1539 active_branch: &str,
1540 ) -> Result<SnapshotHandle> {
1541 match table_ops::fork_dataset_from_entry_state(
1542 self,
1543 table_key,
1544 full_path,
1545 source_branch,
1546 source_version,
1547 active_branch,
1548 )
1549 .await?
1550 {
1551 crate::storage_layer::ForkOutcome::Created(ds) => Ok(ds),
1552 crate::storage_layer::ForkOutcome::RefAlreadyExists => {
1553 table_ops::reclaim_orphaned_fork_and_refork(
1554 self,
1555 table_key,
1556 full_path,
1557 source_branch,
1558 source_version,
1559 active_branch,
1560 )
1561 .await
1562 }
1563 }
1564 }
1565
1566 pub(crate) async fn reopen_for_mutation(
1567 &self,
1568 table_key: &str,
1569 full_path: &str,
1570 table_branch: Option<&str>,
1571 expected_version: u64,
1572 op_kind: crate::db::MutationOpKind,
1573 ) -> Result<SnapshotHandle> {
1574 table_ops::reopen_for_mutation(
1575 self,
1576 table_key,
1577 full_path,
1578 table_branch,
1579 expected_version,
1580 op_kind,
1581 )
1582 .await
1583 }
1584
1585 pub(crate) async fn open_dataset_at_state(
1586 &self,
1587 table_path: &str,
1588 table_branch: Option<&str>,
1589 table_version: u64,
1590 ) -> Result<SnapshotHandle> {
1591 table_ops::open_dataset_at_state(self, table_path, table_branch, table_version).await
1592 }
1593
1594 pub(crate) async fn build_indices_on_dataset(
1595 &self,
1596 table_key: &str,
1597 ds: &mut SnapshotHandle,
1598 ) -> Result<Vec<PendingIndex>> {
1599 table_ops::build_indices_on_dataset(self, table_key, ds).await
1600 }
1601
1602 #[cfg(test)]
1605 pub(crate) async fn commit_updates(
1606 &mut self,
1607 updates: &[crate::db::SubTableUpdate],
1608 ) -> Result<u64> {
1609 table_ops::commit_updates(self, updates).await
1610 }
1611
1612 pub(crate) async fn commit_manifest_updates(
1613 &self,
1614 updates: &[crate::db::SubTableUpdate],
1615 ) -> Result<u64> {
1616 table_ops::commit_manifest_updates(self, updates).await
1617 }
1618
1619 pub(crate) async fn record_merge_commit(
1620 &self,
1621 manifest_version: u64,
1622 parent_commit_id: &str,
1623 merged_parent_commit_id: &str,
1624 actor_id: Option<&str>,
1625 ) -> Result<String> {
1626 table_ops::record_merge_commit(
1627 self,
1628 manifest_version,
1629 parent_commit_id,
1630 merged_parent_commit_id,
1631 actor_id,
1632 )
1633 .await
1634 }
1635
1636 pub(crate) async fn commit_updates_on_branch_with_expected(
1637 &self,
1638 branch: Option<&str>,
1639 updates: &[crate::db::SubTableUpdate],
1640 expected_table_versions: &std::collections::HashMap<String, u64>,
1641 actor_id: Option<&str>,
1642 ) -> Result<u64> {
1643 table_ops::commit_updates_on_branch_with_expected(
1644 self,
1645 branch,
1646 updates,
1647 expected_table_versions,
1648 actor_id,
1649 )
1650 .await
1651 }
1652
1653 pub(crate) async fn ensure_commit_graph_initialized(&self) -> Result<()> {
1654 table_ops::ensure_commit_graph_initialized(self).await
1655 }
1656
1657 pub(crate) async fn invalidate_graph_index(&self) {
1659 table_ops::invalidate_graph_index(self).await
1660 }
1661}
1662
1663pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
1664 let branch = branch.trim();
1665 if branch.is_empty() {
1666 return Err(OmniError::manifest(
1667 "branch name cannot be empty".to_string(),
1668 ));
1669 }
1670 if branch == "main" {
1671 return Ok(None);
1672 }
1673 Ok(Some(branch.to_string()))
1674}
1675
1676pub(crate) fn ensure_public_branch_ref(branch: &str, operation: &str) -> Result<()> {
1677 if is_internal_system_branch(branch) {
1678 return Err(OmniError::manifest(format!(
1679 "{} does not allow internal system ref '{}'",
1680 operation, branch
1681 )));
1682 }
1683 Ok(())
1684}
1685
1686fn concat_or_empty_batches(schema: Arc<Schema>, batches: Vec<RecordBatch>) -> Result<RecordBatch> {
1687 if batches.is_empty() {
1688 return Ok(RecordBatch::new_empty(schema));
1689 }
1690 if batches.len() == 1 {
1691 return Ok(batches.into_iter().next().unwrap());
1692 }
1693 let batch_schema = batches[0].schema();
1694 arrow_select::concat::concat_batches(&batch_schema, &batches)
1695 .map_err(|e| OmniError::Lance(e.to_string()))
1696}
1697
1698fn blob_properties_for_table_key<'a>(
1699 catalog: &'a Catalog,
1700 table_key: &str,
1701) -> Result<&'a std::collections::HashSet<String>> {
1702 if let Some(type_name) = table_key.strip_prefix("node:") {
1703 return catalog
1704 .node_types
1705 .get(type_name)
1706 .map(|node_type| &node_type.blob_properties)
1707 .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)));
1708 }
1709 if let Some(type_name) = table_key.strip_prefix("edge:") {
1710 return catalog
1711 .edge_types
1712 .get(type_name)
1713 .map(|edge_type| &edge_type.blob_properties)
1714 .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)));
1715 }
1716 Err(OmniError::manifest(format!(
1717 "invalid table key '{}'",
1718 table_key
1719 )))
1720}
1721
1722fn blob_description_is_null(descriptions: &StructArray, row: usize) -> Result<bool> {
1723 if descriptions.is_null(row) {
1724 return Ok(true);
1725 }
1726
1727 let kind = descriptions
1728 .column_by_name("kind")
1729 .and_then(|col| col.as_any().downcast_ref::<UInt32Array>())
1730 .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row) as u8))
1731 .or_else(|| {
1732 descriptions
1733 .column_by_name("kind")
1734 .and_then(|col| col.as_any().downcast_ref::<arrow_array::UInt8Array>())
1735 .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)))
1736 });
1737 let position = descriptions
1738 .column_by_name("position")
1739 .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
1740 .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1741 let size = descriptions
1742 .column_by_name("size")
1743 .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
1744 .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1745 let blob_uri = descriptions
1746 .column_by_name("blob_uri")
1747 .and_then(|col| col.as_any().downcast_ref::<StringArray>())
1748 .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1749
1750 let Some(kind) = kind else {
1751 return Ok(true);
1752 };
1753 let kind = BlobKind::try_from(kind).map_err(|e| OmniError::Lance(e.to_string()))?;
1754 if kind != BlobKind::Inline {
1755 return Ok(false);
1756 }
1757
1758 Ok(position.unwrap_or(0) == 0 && size.unwrap_or(0) == 0 && blob_uri.unwrap_or("").is_empty())
1759}
1760
1761fn fixup_blob_schemas(catalog: &mut Catalog) {
1767 for node_type in catalog.node_types.values_mut() {
1768 if node_type.blob_properties.is_empty() {
1769 continue;
1770 }
1771 let fields: Vec<Field> = node_type
1772 .arrow_schema
1773 .fields()
1774 .iter()
1775 .map(|f| {
1776 if node_type.blob_properties.contains(f.name()) {
1777 blob_field(f.name(), f.is_nullable())
1778 } else {
1779 f.as_ref().clone()
1780 }
1781 })
1782 .collect();
1783 node_type.arrow_schema = Arc::new(Schema::new(fields));
1784 }
1785 for edge_type in catalog.edge_types.values_mut() {
1786 if edge_type.blob_properties.is_empty() {
1787 continue;
1788 }
1789 let fields: Vec<Field> = edge_type
1790 .arrow_schema
1791 .fields()
1792 .iter()
1793 .map(|f| {
1794 if edge_type.blob_properties.contains(f.name()) {
1795 blob_field(f.name(), f.is_nullable())
1796 } else {
1797 f.as_ref().clone()
1798 }
1799 })
1800 .collect();
1801 edge_type.arrow_schema = Arc::new(Schema::new(fields));
1802 }
1803}
1804
1805fn read_schema_ir_from_source(schema_source: &str) -> Result<SchemaIR> {
1806 let schema_ast = parse_schema(schema_source)?;
1807 build_schema_ir(&schema_ast).map_err(|err| OmniError::manifest(err.to_string()))
1808}
1809
1810async fn init_storage_phase(
1825 root: &str,
1826 schema_source: &str,
1827 schema_ir: &SchemaIR,
1828 catalog: &Catalog,
1829 storage: &Arc<dyn StorageAdapter>,
1830 write_schema_pg: bool,
1831) -> Result<GraphCoordinator> {
1832 if write_schema_pg {
1833 let schema_path = join_uri(root, SCHEMA_SOURCE_FILENAME);
1834 storage.write_text(&schema_path, schema_source).await?;
1835 crate::failpoints::maybe_fail("init.after_schema_pg_written")?;
1836 }
1837
1838 write_schema_contract(root, storage.as_ref(), schema_ir).await?;
1839 crate::failpoints::maybe_fail("init.after_schema_contract_written")?;
1840
1841 let coordinator = GraphCoordinator::init(root, catalog, Arc::clone(storage)).await?;
1842 crate::failpoints::maybe_fail("init.after_coordinator_init")?;
1843
1844 Ok(coordinator)
1845}
1846
1847async fn best_effort_cleanup_init_artifacts(root: &str, storage: &dyn StorageAdapter) {
1859 for uri in [
1860 schema_source_uri(root),
1861 schema_ir_uri(root),
1862 schema_state_uri(root),
1863 ] {
1864 if let Err(err) = storage.delete(&uri).await {
1865 tracing::warn!(
1866 target: "omnigraph::init::cleanup",
1867 uri = %uri,
1868 error = %err,
1869 "init failed; best-effort cleanup could not delete artifact",
1870 );
1871 }
1872 }
1873}
1874
1875fn schema_table_key(type_kind: SchemaTypeKind, name: &str) -> String {
1876 match type_kind {
1877 SchemaTypeKind::Node => format!("node:{}", name),
1878 SchemaTypeKind::Edge => format!("edge:{}", name),
1879 SchemaTypeKind::Interface => unreachable!("interfaces do not map to tables"),
1880 }
1881}
1882
1883fn schema_for_table_key(catalog: &Catalog, table_key: &str) -> Result<Arc<Schema>> {
1884 if let Some(type_name) = table_key.strip_prefix("node:") {
1885 let node_type: &NodeType = catalog
1886 .node_types
1887 .get(type_name)
1888 .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
1889 return Ok(node_type.arrow_schema.clone());
1890 }
1891 if let Some(type_name) = table_key.strip_prefix("edge:") {
1892 let edge_type: &EdgeType = catalog
1893 .edge_types
1894 .get(type_name)
1895 .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)))?;
1896 return Ok(edge_type.arrow_schema.clone());
1897 }
1898 Err(OmniError::manifest(format!(
1899 "invalid table key '{}'",
1900 table_key
1901 )))
1902}
1903
1904fn record_batch_row_to_json(batch: &RecordBatch, row: usize) -> Result<serde_json::Value> {
1905 let mut obj = serde_json::Map::new();
1906 for (i, field) in batch.schema().fields().iter().enumerate() {
1907 obj.insert(
1908 field.name().clone(),
1909 json_value_from_array(batch.column(i).as_ref(), row)?,
1910 );
1911 }
1912 Ok(serde_json::Value::Object(obj))
1913}
1914
1915fn json_value_from_array(array: &dyn Array, row: usize) -> Result<serde_json::Value> {
1916 if array.is_null(row) {
1917 return Ok(serde_json::Value::Null);
1918 }
1919
1920 match array.data_type() {
1921 DataType::Utf8 => Ok(serde_json::Value::String(
1922 array
1923 .as_any()
1924 .downcast_ref::<StringArray>()
1925 .ok_or_else(|| OmniError::Lance("expected StringArray".to_string()))?
1926 .value(row)
1927 .to_string(),
1928 )),
1929 DataType::LargeUtf8 => Ok(serde_json::Value::String(
1930 array
1931 .as_any()
1932 .downcast_ref::<LargeStringArray>()
1933 .ok_or_else(|| OmniError::Lance("expected LargeStringArray".to_string()))?
1934 .value(row)
1935 .to_string(),
1936 )),
1937 DataType::Boolean => Ok(serde_json::Value::Bool(
1938 array
1939 .as_any()
1940 .downcast_ref::<BooleanArray>()
1941 .ok_or_else(|| OmniError::Lance("expected BooleanArray".to_string()))?
1942 .value(row),
1943 )),
1944 DataType::Int32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1945 array
1946 .as_any()
1947 .downcast_ref::<Int32Array>()
1948 .ok_or_else(|| OmniError::Lance("expected Int32Array".to_string()))?
1949 .value(row),
1950 ))),
1951 DataType::Int64 => Ok(serde_json::Value::Number(serde_json::Number::from(
1952 array
1953 .as_any()
1954 .downcast_ref::<Int64Array>()
1955 .ok_or_else(|| OmniError::Lance("expected Int64Array".to_string()))?
1956 .value(row),
1957 ))),
1958 DataType::UInt32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1959 array
1960 .as_any()
1961 .downcast_ref::<UInt32Array>()
1962 .ok_or_else(|| OmniError::Lance("expected UInt32Array".to_string()))?
1963 .value(row),
1964 ))),
1965 DataType::UInt64 => Ok(serde_json::Value::Number(serde_json::Number::from(
1966 array
1967 .as_any()
1968 .downcast_ref::<UInt64Array>()
1969 .ok_or_else(|| OmniError::Lance("expected UInt64Array".to_string()))?
1970 .value(row),
1971 ))),
1972 DataType::Float32 => {
1973 let value = array
1974 .as_any()
1975 .downcast_ref::<Float32Array>()
1976 .ok_or_else(|| OmniError::Lance("expected Float32Array".to_string()))?
1977 .value(row) as f64;
1978 Ok(serde_json::Value::Number(
1979 serde_json::Number::from_f64(value).ok_or_else(|| {
1980 OmniError::Lance(format!("cannot encode f32 value '{}' as JSON", value))
1981 })?,
1982 ))
1983 }
1984 DataType::Float64 => {
1985 let value = array
1986 .as_any()
1987 .downcast_ref::<Float64Array>()
1988 .ok_or_else(|| OmniError::Lance("expected Float64Array".to_string()))?
1989 .value(row);
1990 Ok(serde_json::Value::Number(
1991 serde_json::Number::from_f64(value).ok_or_else(|| {
1992 OmniError::Lance(format!("cannot encode f64 value '{}' as JSON", value))
1993 })?,
1994 ))
1995 }
1996 DataType::Date32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1997 array
1998 .as_any()
1999 .downcast_ref::<Date32Array>()
2000 .ok_or_else(|| OmniError::Lance("expected Date32Array".to_string()))?
2001 .value(row),
2002 ))),
2003 DataType::Binary => Ok(serde_json::Value::String(base64::Engine::encode(
2004 &base64::engine::general_purpose::STANDARD,
2005 array
2006 .as_any()
2007 .downcast_ref::<BinaryArray>()
2008 .ok_or_else(|| OmniError::Lance("expected BinaryArray".to_string()))?
2009 .value(row),
2010 ))),
2011 DataType::LargeBinary => Ok(serde_json::Value::String(base64::Engine::encode(
2012 &base64::engine::general_purpose::STANDARD,
2013 array
2014 .as_any()
2015 .downcast_ref::<LargeBinaryArray>()
2016 .ok_or_else(|| OmniError::Lance("expected LargeBinaryArray".to_string()))?
2017 .value(row),
2018 ))),
2019 DataType::List(_) => {
2020 let list = array
2021 .as_any()
2022 .downcast_ref::<ListArray>()
2023 .ok_or_else(|| OmniError::Lance("expected ListArray".to_string()))?;
2024 let values = list.value(row);
2025 let mut out = Vec::with_capacity(values.len());
2026 for idx in 0..values.len() {
2027 out.push(json_value_from_array(values.as_ref(), idx)?);
2028 }
2029 Ok(serde_json::Value::Array(out))
2030 }
2031 DataType::LargeList(_) => {
2032 let list = array
2033 .as_any()
2034 .downcast_ref::<LargeListArray>()
2035 .ok_or_else(|| OmniError::Lance("expected LargeListArray".to_string()))?;
2036 let values = list.value(row);
2037 let mut out = Vec::with_capacity(values.len());
2038 for idx in 0..values.len() {
2039 out.push(json_value_from_array(values.as_ref(), idx)?);
2040 }
2041 Ok(serde_json::Value::Array(out))
2042 }
2043 DataType::FixedSizeList(_, _) => {
2044 let list = array
2045 .as_any()
2046 .downcast_ref::<FixedSizeListArray>()
2047 .ok_or_else(|| OmniError::Lance("expected FixedSizeListArray".to_string()))?;
2048 let values = list.value(row);
2049 let mut out = Vec::with_capacity(values.len());
2050 for idx in 0..values.len() {
2051 out.push(json_value_from_array(values.as_ref(), idx)?);
2052 }
2053 Ok(serde_json::Value::Array(out))
2054 }
2055 DataType::Struct(fields) => {
2056 let struct_array = array
2057 .as_any()
2058 .downcast_ref::<StructArray>()
2059 .ok_or_else(|| OmniError::Lance("expected StructArray".to_string()))?;
2060 let mut obj = serde_json::Map::new();
2061 for (field_idx, field) in fields.iter().enumerate() {
2062 obj.insert(
2063 field.name().clone(),
2064 json_value_from_array(struct_array.column(field_idx).as_ref(), row)?,
2065 );
2066 }
2067 Ok(serde_json::Value::Object(obj))
2068 }
2069 _ => {
2070 let value = arrow_cast::display::array_value_to_string(array, row)
2071 .map_err(|e| OmniError::Lance(e.to_string()))?;
2072 Ok(serde_json::Value::String(value))
2073 }
2074 }
2075}
2076
2077#[cfg(test)]
2078mod tests {
2079 use super::*;
2080 use crate::db::manifest::ManifestCoordinator;
2081 use async_trait::async_trait;
2082 use serde_json::Value;
2083 use std::sync::{Arc, Mutex};
2084
2085 use crate::storage::{ObjectStorageAdapter, StorageAdapter, join_uri};
2086
2087 const TEST_SCHEMA: &str = r#"
2088node Person {
2089 name: String @key
2090 age: I32?
2091}
2092node Company {
2093 name: String @key
2094}
2095edge Knows: Person -> Person {
2096 since: Date?
2097}
2098edge WorksAt: Person -> Company
2099"#;
2100
2101 #[derive(Debug)]
2102 struct RecordingStorageAdapter {
2103 inner: ObjectStorageAdapter,
2104 reads: Mutex<Vec<String>>,
2105 writes: Mutex<Vec<String>>,
2106 exists_checks: Mutex<Vec<String>>,
2107 renames: Mutex<Vec<(String, String)>>,
2108 deletes: Mutex<Vec<String>>,
2109 }
2110
2111 impl Default for RecordingStorageAdapter {
2112 fn default() -> Self {
2113 Self {
2114 inner: ObjectStorageAdapter::local(),
2115 reads: Mutex::default(),
2116 writes: Mutex::default(),
2117 exists_checks: Mutex::default(),
2118 renames: Mutex::default(),
2119 deletes: Mutex::default(),
2120 }
2121 }
2122 }
2123
2124 impl RecordingStorageAdapter {
2125 fn reads(&self) -> Vec<String> {
2126 self.reads.lock().unwrap().clone()
2127 }
2128
2129 fn writes(&self) -> Vec<String> {
2130 self.writes.lock().unwrap().clone()
2131 }
2132
2133 fn exists_checks(&self) -> Vec<String> {
2134 self.exists_checks.lock().unwrap().clone()
2135 }
2136 }
2137
2138 #[async_trait]
2139 impl StorageAdapter for RecordingStorageAdapter {
2140 async fn read_text(&self, uri: &str) -> Result<String> {
2141 self.reads.lock().unwrap().push(uri.to_string());
2142 self.inner.read_text(uri).await
2143 }
2144
2145 async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
2146 self.writes.lock().unwrap().push(uri.to_string());
2147 self.inner.write_text(uri, contents).await
2148 }
2149
2150 async fn write_text_if_absent(&self, uri: &str, contents: &str) -> Result<bool> {
2151 self.writes.lock().unwrap().push(uri.to_string());
2152 self.inner.write_text_if_absent(uri, contents).await
2153 }
2154
2155 async fn exists(&self, uri: &str) -> Result<bool> {
2156 self.exists_checks.lock().unwrap().push(uri.to_string());
2157 self.inner.exists(uri).await
2158 }
2159
2160 async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
2161 self.renames
2162 .lock()
2163 .unwrap()
2164 .push((from_uri.to_string(), to_uri.to_string()));
2165 self.inner.rename_text(from_uri, to_uri).await
2166 }
2167
2168 async fn delete(&self, uri: &str) -> Result<()> {
2169 self.deletes.lock().unwrap().push(uri.to_string());
2170 self.inner.delete(uri).await
2171 }
2172
2173 async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
2174 self.inner.list_dir(dir_uri).await
2175 }
2176
2177 async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)> {
2178 self.inner.read_text_versioned(uri).await
2179 }
2180
2181 async fn write_text_if_match(
2182 &self,
2183 uri: &str,
2184 contents: &str,
2185 expected_version: &str,
2186 ) -> Result<Option<String>> {
2187 self.inner
2188 .write_text_if_match(uri, contents, expected_version)
2189 .await
2190 }
2191
2192 async fn delete_prefix(&self, prefix_uri: &str) -> Result<()> {
2193 self.inner.delete_prefix(prefix_uri).await
2194 }
2195 }
2196
2197 #[derive(Debug)]
2198 struct InitRaceStorageAdapter {
2199 inner: ObjectStorageAdapter,
2200 root: String,
2201 barrier: Arc<tokio::sync::Barrier>,
2202 }
2203
2204 #[async_trait]
2205 impl StorageAdapter for InitRaceStorageAdapter {
2206 async fn read_text(&self, uri: &str) -> Result<String> {
2207 self.inner.read_text(uri).await
2208 }
2209
2210 async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
2211 self.inner.write_text(uri, contents).await
2212 }
2213
2214 async fn write_text_if_absent(&self, uri: &str, contents: &str) -> Result<bool> {
2215 self.inner.write_text_if_absent(uri, contents).await
2216 }
2217
2218 async fn exists(&self, uri: &str) -> Result<bool> {
2219 let exists = self.inner.exists(uri).await?;
2220 if uri == schema_state_uri(&self.root) {
2221 self.barrier.wait().await;
2222 }
2223 Ok(exists)
2224 }
2225
2226 async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
2227 self.inner.rename_text(from_uri, to_uri).await
2228 }
2229
2230 async fn delete(&self, uri: &str) -> Result<()> {
2231 self.inner.delete(uri).await
2232 }
2233
2234 async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
2235 self.inner.list_dir(dir_uri).await
2236 }
2237
2238 async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)> {
2239 self.inner.read_text_versioned(uri).await
2240 }
2241
2242 async fn write_text_if_match(
2243 &self,
2244 uri: &str,
2245 contents: &str,
2246 expected_version: &str,
2247 ) -> Result<Option<String>> {
2248 self.inner
2249 .write_text_if_match(uri, contents, expected_version)
2250 .await
2251 }
2252
2253 async fn delete_prefix(&self, prefix_uri: &str) -> Result<()> {
2254 self.inner.delete_prefix(prefix_uri).await
2255 }
2256 }
2257
2258 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2259 async fn concurrent_strict_init_does_not_delete_winning_schema_files() {
2260 let dir = tempfile::tempdir().unwrap();
2261 let uri = dir.path().to_str().unwrap().to_string();
2262 let root = normalize_root_uri(&uri).unwrap();
2263 let storage: Arc<dyn StorageAdapter> = Arc::new(InitRaceStorageAdapter {
2264 inner: ObjectStorageAdapter::local(),
2265 root,
2266 barrier: Arc::new(tokio::sync::Barrier::new(2)),
2267 });
2268
2269 let left = Omnigraph::init_with_storage(
2270 &uri,
2271 TEST_SCHEMA,
2272 Arc::clone(&storage),
2273 InitOptions::default(),
2274 );
2275 let right = Omnigraph::init_with_storage(
2276 &uri,
2277 TEST_SCHEMA,
2278 Arc::clone(&storage),
2279 InitOptions::default(),
2280 );
2281 let (left, right) = tokio::join!(left, right);
2282 let ok_count = usize::from(left.is_ok()) + usize::from(right.is_ok());
2283 assert_eq!(ok_count, 1, "exactly one concurrent init should win");
2284
2285 assert!(
2286 dir.path().join("_schema.pg").exists(),
2287 "winning init must leave _schema.pg in place"
2288 );
2289 assert!(
2290 dir.path().join("_schema.ir.json").exists(),
2291 "winning init must leave _schema.ir.json in place"
2292 );
2293 assert!(
2294 dir.path().join("__schema_state.json").exists(),
2295 "winning init must leave __schema_state.json in place"
2296 );
2297 }
2298
2299 #[tokio::test]
2300 async fn test_init_and_open_route_graph_metadata_through_storage_adapter() {
2301 let dir = tempfile::tempdir().unwrap();
2302 let uri = dir.path().to_str().unwrap();
2303 let adapter = Arc::new(RecordingStorageAdapter::default());
2304
2305 Omnigraph::init_with_storage(uri, TEST_SCHEMA, adapter.clone(), InitOptions::default())
2306 .await
2307 .unwrap();
2308 assert!(adapter.writes().contains(&join_uri(uri, "_schema.pg")));
2309 assert!(adapter.writes().contains(&join_uri(uri, "_schema.ir.json")));
2310 assert!(
2311 adapter
2312 .writes()
2313 .contains(&join_uri(uri, "__schema_state.json"))
2314 );
2315
2316 Omnigraph::open_with_storage(uri, adapter.clone())
2317 .await
2318 .unwrap();
2319 assert!(adapter.reads().contains(&join_uri(uri, "_schema.pg")));
2320 assert!(adapter.reads().contains(&join_uri(uri, "_schema.ir.json")));
2321 assert!(
2322 adapter
2323 .reads()
2324 .contains(&join_uri(uri, "__schema_state.json"))
2325 );
2326 assert!(
2327 adapter
2328 .exists_checks()
2329 .contains(&join_uri(uri, "_schema.ir.json"))
2330 );
2331 assert!(
2332 adapter
2333 .exists_checks()
2334 .contains(&join_uri(uri, "__schema_state.json"))
2335 );
2336 assert!(
2337 adapter
2338 .exists_checks()
2339 .contains(&join_uri(uri, "_graph_commits.lance"))
2340 );
2341 }
2342
2343 async fn table_rows_json(db: &Omnigraph, table_key: &str) -> Vec<Value> {
2344 let snapshot = db.snapshot().await;
2345 let ds = db
2346 .storage()
2347 .open_snapshot_at_table(&snapshot, table_key)
2348 .await
2349 .unwrap();
2350 let batches = db.storage().scan_batches(&ds).await.unwrap();
2351 batches
2352 .into_iter()
2353 .flat_map(|batch| {
2354 (0..batch.num_rows())
2355 .map(|row| record_batch_row_to_json(&batch, row).unwrap())
2356 .collect::<Vec<_>>()
2357 })
2358 .collect()
2359 }
2360
2361 async fn seed_person_row(db: &mut Omnigraph, name: &str, age: Option<i32>) {
2362 let (ds, full_path, table_branch) = db
2363 .open_for_mutation("node:Person", crate::db::MutationOpKind::Insert)
2364 .await
2365 .unwrap();
2366 let schema: Arc<Schema> = Arc::new(ds.dataset().schema().into());
2367 let columns: Vec<Arc<dyn Array>> = schema
2368 .fields()
2369 .iter()
2370 .map(|field| match field.name().as_str() {
2371 "id" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
2372 "name" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
2373 "age" => Arc::new(Int32Array::from(vec![age])) as Arc<dyn Array>,
2374 _ => new_null_array(field.data_type(), 1),
2375 })
2376 .collect();
2377 let batch = RecordBatch::try_new(Arc::clone(&schema), columns).unwrap();
2378 let staged = db.storage().stage_append(&ds, batch, &[]).await.unwrap();
2379 let committed = db.storage().commit_staged(ds, staged).await.unwrap();
2380 let state = db
2381 .storage()
2382 .table_state(&full_path, &committed)
2383 .await
2384 .unwrap();
2385 db.commit_updates(&[crate::db::SubTableUpdate {
2386 table_key: "node:Person".to_string(),
2387 table_version: state.version,
2388 table_branch,
2389 row_count: state.row_count,
2390 version_metadata: state.version_metadata,
2391 }])
2392 .await
2393 .unwrap();
2394 }
2395
2396 #[tokio::test]
2397 async fn test_apply_schema_adds_nullable_property_and_preserves_rows() {
2398 let dir = tempfile::tempdir().unwrap();
2399 let uri = dir.path().to_str().unwrap();
2400 let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2401 seed_person_row(&mut db, "Alice", Some(30)).await;
2402
2403 let desired = TEST_SCHEMA.replace(
2404 " age: I32?\n}",
2405 " age: I32?\n nickname: String?\n}",
2406 );
2407 let result = db.apply_schema(&desired).await.unwrap();
2408 assert!(result.applied);
2409
2410 let reopened = Omnigraph::open(uri).await.unwrap();
2411 let rows = table_rows_json(&reopened, "node:Person").await;
2412 assert_eq!(rows.len(), 1);
2413 assert_eq!(rows[0]["name"], "Alice");
2414 assert_eq!(rows[0]["age"], 30);
2415 assert!(rows[0]["nickname"].is_null());
2416 assert!(
2417 reopened.catalog().node_types["Person"]
2418 .properties
2419 .contains_key("nickname")
2420 );
2421 assert!(dir.path().join("_schema.pg").exists());
2422 }
2423
2424 #[tokio::test]
2425 async fn test_apply_schema_renames_property_and_preserves_values() {
2426 let dir = tempfile::tempdir().unwrap();
2427 let uri = dir.path().to_str().unwrap();
2428 let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2429 seed_person_row(&mut db, "Alice", Some(30)).await;
2430
2431 let desired = TEST_SCHEMA.replace(
2432 " age: I32?\n}",
2433 " years: I32? @rename_from(\"age\")\n}",
2434 );
2435 db.apply_schema(&desired).await.unwrap();
2436
2437 let reopened = Omnigraph::open(uri).await.unwrap();
2438 let rows = table_rows_json(&reopened, "node:Person").await;
2439 assert_eq!(rows[0]["name"], "Alice");
2440 assert_eq!(rows[0]["years"], 30);
2441 assert!(rows[0].get("age").is_none());
2442 }
2443
2444 #[tokio::test]
2445 async fn test_apply_schema_renames_type_and_preserves_historical_snapshot() {
2446 let dir = tempfile::tempdir().unwrap();
2447 let uri = dir.path().to_str().unwrap();
2448 let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2449 seed_person_row(&mut db, "Alice", Some(30)).await;
2450 let before_version = db.snapshot().await.version();
2451
2452 let desired = TEST_SCHEMA
2453 .replace("node Person {\n", "node Human @rename_from(\"Person\") {\n")
2454 .replace("edge Knows: Person -> Person", "edge Knows: Human -> Human")
2455 .replace(
2456 "edge WorksAt: Person -> Company",
2457 "edge WorksAt: Human -> Company",
2458 );
2459 db.apply_schema(&desired).await.unwrap();
2460
2461 let head = db.snapshot().await;
2462 assert!(head.entry("node:Person").is_none());
2463 assert!(head.entry("node:Human").is_some());
2464 let historical = ManifestCoordinator::snapshot_at(uri, None, before_version)
2465 .await
2466 .unwrap();
2467 assert!(historical.entry("node:Person").is_some());
2468 assert!(historical.entry("node:Human").is_none());
2469 }
2470
2471 #[tokio::test]
2472 async fn test_apply_schema_succeeds_after_load() {
2473 let dir = tempfile::tempdir().unwrap();
2480 let uri = dir.path().to_str().unwrap();
2481 let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2482
2483 crate::loader::load_jsonl(
2484 &mut db,
2485 r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#,
2486 crate::loader::LoadMode::Overwrite,
2487 )
2488 .await
2489 .unwrap();
2490
2491 let all_branches = db.coordinator.read().await.all_branches().await.unwrap();
2492 assert!(
2493 !all_branches.iter().any(|b| b.starts_with("__run__")),
2494 "no __run__ branch should exist after publish, got: {:?}",
2495 all_branches
2496 );
2497
2498 let desired = TEST_SCHEMA.replace(
2499 " age: I32?\n}",
2500 " age: I32?\n nickname: String?\n}",
2501 );
2502 let result = db.apply_schema(&desired).await.unwrap();
2503 assert!(result.applied, "schema apply should have applied");
2504 }
2505
2506 #[tokio::test]
2516 async fn legacy_run_branch_is_swept_on_open_and_does_not_block_schema_apply() {
2517 let dir = tempfile::tempdir().unwrap();
2518 let uri = dir.path().to_str().unwrap();
2519 let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2520
2521 db.branch_create("__run__legacy").await.unwrap();
2524 drop(db);
2525 {
2526 let mut ds = lance::Dataset::open(&format!("{}/__manifest", uri))
2527 .await
2528 .unwrap();
2529 ds.update_schema_metadata([(
2530 "omnigraph:internal_schema_version".to_string(),
2531 Some("2".to_string()),
2532 )])
2533 .await
2534 .unwrap();
2535 }
2536
2537 let db = Omnigraph::open(uri).await.unwrap();
2540 let branches = db.branch_list().await.unwrap();
2541 assert!(
2542 !branches.iter().any(|b| b.starts_with("__run__")),
2543 "open-time migration must sweep legacy __run__ branches; got {branches:?}",
2544 );
2545
2546 let desired = TEST_SCHEMA.replace(
2549 " age: I32?\n}",
2550 " age: I32?\n nickname: String?\n}",
2551 );
2552 let result = db.apply_schema(&desired).await.unwrap();
2553 assert!(result.applied, "schema apply should have applied");
2554 }
2555
2556 #[tokio::test]
2557 async fn test_apply_schema_defers_index_then_reconciler_builds_it() {
2558 let dir = tempfile::tempdir().unwrap();
2563 let uri = dir.path().to_str().unwrap();
2564 let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2565 seed_person_row(&mut db, "Alice", Some(30)).await;
2566
2567 let desired = TEST_SCHEMA.replace("age: I32?", "age: I32? @index");
2568 db.apply_schema(&desired).await.unwrap();
2569
2570 let snapshot = db.snapshot().await;
2572 let ds = db
2573 .storage()
2574 .open_snapshot_at_table(&snapshot, "node:Person")
2575 .await
2576 .unwrap();
2577 assert!(
2578 !db.storage().has_btree_index(&ds, "age").await.unwrap(),
2579 "apply must not build the index inline (deferred to the reconciler)"
2580 );
2581
2582 db.ensure_indices().await.unwrap();
2584 let snapshot = db.snapshot().await;
2585 let ds = db
2586 .storage()
2587 .open_snapshot_at_table(&snapshot, "node:Person")
2588 .await
2589 .unwrap();
2590 assert!(
2591 db.storage().has_btree_index(&ds, "age").await.unwrap(),
2592 "ensure_indices must build the deferred index"
2593 );
2594 }
2595
2596 #[tokio::test]
2597 async fn test_apply_schema_rewrite_defers_index_then_reconciler_restores() {
2598 let dir = tempfile::tempdir().unwrap();
2601 let uri = dir.path().to_str().unwrap();
2602 let initial_schema = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
2603 let mut db = Omnigraph::init(uri, &initial_schema).await.unwrap();
2604 seed_person_row(&mut db, "Alice", Some(30)).await;
2605
2606 let desired = initial_schema.replace(
2607 " age: I32?\n}",
2608 " age: I32?\n nickname: String?\n}",
2609 );
2610 db.apply_schema(&desired).await.unwrap();
2611
2612 db.ensure_indices().await.unwrap();
2614 let snapshot = db.snapshot().await;
2615 let ds = db
2616 .storage()
2617 .open_snapshot_at_table(&snapshot, "node:Person")
2618 .await
2619 .unwrap();
2620 assert!(db.storage().has_btree_index(&ds, "id").await.unwrap());
2621 assert!(db.storage().has_fts_index(&ds, "name").await.unwrap());
2622 }
2623
2624 #[tokio::test]
2625 async fn test_open_for_mutation_rejects_while_schema_apply_locked() {
2626 let dir = tempfile::tempdir().unwrap();
2627 let uri = dir.path().to_str().unwrap();
2628 let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2629 let mut db = db;
2630 db.coordinator
2631 .write()
2632 .await
2633 .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
2634 .await
2635 .unwrap();
2636
2637 let err = db
2638 .open_for_mutation("node:Person", crate::db::MutationOpKind::Insert)
2639 .await
2640 .unwrap_err();
2641 assert!(
2642 err.to_string()
2643 .contains("write is unavailable while schema apply is in progress")
2644 );
2645 }
2646
2647 #[tokio::test]
2648 async fn test_commit_updates_rejects_while_schema_apply_locked() {
2649 let dir = tempfile::tempdir().unwrap();
2650 let uri = dir.path().to_str().unwrap();
2651 let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2652 db.coordinator
2653 .write()
2654 .await
2655 .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
2656 .await
2657 .unwrap();
2658
2659 let err = db.commit_updates(&[]).await.unwrap_err();
2660 assert!(
2661 err.to_string()
2662 .contains("write commit is unavailable while schema apply is in progress")
2663 );
2664 }
2665
2666 #[tokio::test]
2667 async fn test_branch_list_hides_schema_apply_lock_branch() {
2668 let dir = tempfile::tempdir().unwrap();
2669 let uri = dir.path().to_str().unwrap();
2670 let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
2671 db.coordinator
2672 .write()
2673 .await
2674 .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
2675 .await
2676 .unwrap();
2677
2678 let branches = db.branch_list().await.unwrap();
2679 assert_eq!(branches, vec!["main".to_string()]);
2680 }
2681}