1use std::collections::{BTreeSet, HashMap, HashSet};
2use std::io::Write;
3use std::sync::Arc;
4
5use arc_swap::ArcSwap;
6use arrow_array::{
7 Array, BinaryArray, BooleanArray, Date32Array, FixedSizeListArray, Float32Array, Float64Array,
8 Int32Array, Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray,
9 RecordBatch, StringArray, StructArray, UInt32Array, UInt64Array, new_null_array,
10};
11use arrow_schema::{DataType, Field, Schema};
12use lance::Dataset;
13use lance::blob::{BlobArrayBuilder, blob_field};
14use lance::dataset::BlobFile;
15use lance::dataset::scanner::ColumnOrdering;
16use lance::datatypes::BlobKind;
17use omnigraph_compiler::catalog::{Catalog, EdgeType, NodeType};
18use omnigraph_compiler::schema::parser::parse_schema;
19use omnigraph_compiler::types::ScalarType;
20use omnigraph_compiler::{
21 DropMode, SchemaIR, SchemaMigrationPlan, SchemaMigrationStep, SchemaTypeKind,
22 build_catalog_from_ir, build_schema_ir, plan_schema_migration,
23};
24
25use crate::db::graph_coordinator::{GraphCoordinator, PublishedSnapshot};
26use crate::error::{OmniError, Result};
27use crate::runtime_cache::RuntimeCache;
28use crate::storage::{StorageAdapter, join_uri, normalize_root_uri, storage_for_uri};
29use crate::table_store::TableStore;
30
31mod export;
32mod optimize;
33mod schema_apply;
34mod table_ops;
35
36pub use optimize::{CleanupPolicyOptions, SkipReason, TableCleanupStats, TableOptimizeStats};
37pub use schema_apply::SchemaApplyOptions;
38
39use super::commit_graph::GraphCommit;
40use super::manifest::{
41 ManifestChange, Snapshot, SubTableEntry, TableRegistration, TableTombstone,
42 table_path_for_table_key,
43};
44use super::schema_state::{
45 SCHEMA_SOURCE_FILENAME, load_or_bootstrap_schema_contract, read_accepted_schema_ir,
46 recover_schema_state_files, schema_ir_staging_uri, schema_ir_uri, schema_source_staging_uri,
47 schema_source_uri, schema_state_staging_uri, schema_state_uri, validate_schema_contract,
48 write_schema_contract, write_schema_contract_staging,
49};
50use super::{
51 ReadTarget, ResolvedTarget, SCHEMA_APPLY_LOCK_BRANCH, SnapshotId, is_internal_system_branch,
52 is_schema_apply_lock_branch,
53};
54
/// How a branch merge concluded.
///
/// The precise conditions for each variant are decided by the merge implementation
/// elsewhere in the crate; the descriptions below follow the variant names.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MergeOutcome {
    /// The target was already up to date; nothing was applied.
    AlreadyUpToDate,
    /// The target head was advanced without creating a merge commit.
    FastForward,
    /// The histories were combined into a merge.
    Merged,
}
61
62#[derive(Debug, Clone)]
63pub struct SchemaApplyResult {
64 pub supported: bool,
65 pub applied: bool,
66 pub manifest_version: u64,
67 pub steps: Vec<SchemaMigrationStep>,
68}
69
70#[derive(Debug, Clone)]
71pub struct SchemaApplyPreview {
72 pub plan: SchemaMigrationPlan,
73 pub catalog: Catalog,
74}
75
/// Handle to a graph stored under a single root URI.
///
/// Field declaration order also determines drop order; keep it stable.
pub struct Omnigraph {
    /// Normalized root URI of the graph (see `normalize_root_uri`).
    root_uri: String,
    /// Storage adapter used for schema/state files under `root_uri`.
    storage: Arc<dyn StorageAdapter>,
    /// Coordinator for the currently selected branch; replaced wholesale by
    /// `sync_branch` / `swap_coordinator_for_branch` / `restore_coordinator`.
    coordinator: Arc<tokio::sync::RwLock<GraphCoordinator>>,
    /// Table store rooted at `root_uri`.
    table_store: TableStore,
    /// Runtime cache; invalidated on refresh and branch sync.
    runtime_cache: RuntimeCache,
    /// Current catalog, swapped atomically when the schema is reloaded.
    catalog: Arc<ArcSwap<Catalog>>,
    /// Schema source text last loaded alongside `catalog`.
    schema_source: Arc<ArcSwap<String>>,
    /// Shared write-queue manager (NOTE(review): ordering semantics live in `write_queue`).
    write_queue: Arc<crate::db::write_queue::WriteQueueManager>,
    /// Shared mutex handed out by `merge_exclusive()` (NOTE(review): presumably serializes
    /// merges — confirm at call sites).
    merge_exclusive: Arc<tokio::sync::Mutex<()>>,
    /// Optional policy checker; when `None`, `enforce` permits every action.
    policy: Option<Arc<dyn omnigraph_policy::PolicyChecker>>,
}
155
/// Access mode used when opening an existing graph.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum OpenMode {
    /// Open with recovery of schema state files and manifest drift enabled.
    ReadWrite,
    /// Open without running any recovery on the stored state.
    ReadOnly,
}
173
/// Knobs for creating a new graph.
#[derive(Clone, Copy, Debug, Default)]
pub struct InitOptions {
    /// Skip the "already initialized" checks and initialize over an existing root.
    pub force: bool,
}
192
193impl Omnigraph {
194 pub async fn init(uri: &str, schema_source: &str) -> Result<Self> {
201 Self::init_with_options(uri, schema_source, InitOptions::default()).await
202 }
203
204 pub async fn init_with_options(
209 uri: &str,
210 schema_source: &str,
211 options: InitOptions,
212 ) -> Result<Self> {
213 Self::init_with_storage(uri, schema_source, storage_for_uri(uri)?, options).await
214 }
215
216 pub(crate) async fn init_with_storage(
217 uri: &str,
218 schema_source: &str,
219 storage: Arc<dyn StorageAdapter>,
220 options: InitOptions,
221 ) -> Result<Self> {
222 let root = normalize_root_uri(uri)?;
223
224 if !options.force {
234 for candidate in [
235 schema_source_uri(&root),
236 schema_ir_uri(&root),
237 schema_state_uri(&root),
238 ] {
239 if storage.exists(&candidate).await? {
240 return Err(OmniError::AlreadyInitialized { uri: root.clone() });
241 }
242 }
243 }
244
245 let schema_ir = read_schema_ir_from_source(schema_source)?;
246 let mut catalog = build_catalog_from_ir(&schema_ir)?;
247 fixup_blob_schemas(&mut catalog);
248
249 let schema_pg_claimed = if options.force {
257 false
258 } else {
259 let schema_path = join_uri(&root, SCHEMA_SOURCE_FILENAME);
260 if !storage
261 .write_text_if_absent(&schema_path, schema_source)
262 .await?
263 {
264 return Err(OmniError::AlreadyInitialized { uri: root.clone() });
265 }
266 if let Err(err) = crate::failpoints::maybe_fail("init.after_schema_pg_written") {
267 best_effort_cleanup_init_artifacts(&root, storage.as_ref()).await;
268 return Err(err);
269 }
270 true
271 };
272
273 let coordinator = match init_storage_phase(
288 &root,
289 schema_source,
290 &schema_ir,
291 &catalog,
292 &storage,
293 !schema_pg_claimed,
294 )
295 .await
296 {
297 Ok(coordinator) => coordinator,
298 Err(err) => {
299 if schema_pg_claimed || options.force {
300 best_effort_cleanup_init_artifacts(&root, storage.as_ref()).await;
301 }
302 return Err(err);
303 }
304 };
305
306 Ok(Self {
307 root_uri: root.clone(),
308 storage,
309 coordinator: Arc::new(tokio::sync::RwLock::new(coordinator)),
310 table_store: TableStore::new(&root),
311 runtime_cache: RuntimeCache::default(),
312 catalog: Arc::new(ArcSwap::from_pointee(catalog)),
313 schema_source: Arc::new(ArcSwap::from_pointee(schema_source.to_string())),
314 write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()),
315 merge_exclusive: Arc::new(tokio::sync::Mutex::new(())),
316 policy: None,
317 })
318 }
319
320 pub async fn open(uri: &str) -> Result<Self> {
325 Self::open_with_storage_and_mode(uri, storage_for_uri(uri)?, OpenMode::ReadWrite).await
326 }
327
328 pub async fn open_read_only(uri: &str) -> Result<Self> {
331 Self::open_with_storage_and_mode(uri, storage_for_uri(uri)?, OpenMode::ReadOnly).await
332 }
333
334 pub(crate) async fn open_with_storage(
337 uri: &str,
338 storage: Arc<dyn StorageAdapter>,
339 ) -> Result<Self> {
340 Self::open_with_storage_and_mode(uri, storage, OpenMode::ReadWrite).await
341 }
342
343 pub(crate) async fn open_with_storage_and_mode(
344 uri: &str,
345 storage: Arc<dyn StorageAdapter>,
346 mode: OpenMode,
347 ) -> Result<Self> {
348 let root = normalize_root_uri(uri)?;
349 let mut coordinator = GraphCoordinator::open(&root, Arc::clone(&storage)).await?;
352 if matches!(mode, OpenMode::ReadWrite) {
362 let schema_state_recovery =
363 recover_schema_state_files(&root, Arc::clone(&storage), &coordinator.snapshot())
364 .await?;
365 crate::db::manifest::recover_manifest_drift(
371 &root,
372 Arc::clone(&storage),
373 &mut coordinator,
374 crate::db::manifest::RecoveryMode::Full,
375 schema_state_recovery,
376 )
377 .await?;
378 }
379 let schema_path = schema_source_uri(&root);
381 let schema_source = storage.read_text(&schema_path).await?;
382 let current_source_ir = read_schema_ir_from_source(&schema_source)?;
383 let branches = coordinator.branch_list().await?;
384 let (accepted_ir, _) = load_or_bootstrap_schema_contract(
385 &root,
386 Arc::clone(&storage),
387 &branches,
388 ¤t_source_ir,
389 )
390 .await?;
391 let mut catalog = build_catalog_from_ir(&accepted_ir)?;
392 fixup_blob_schemas(&mut catalog);
393
394 Ok(Self {
395 root_uri: root.clone(),
396 storage,
397 coordinator: Arc::new(tokio::sync::RwLock::new(coordinator)),
398 table_store: TableStore::new(&root),
399 runtime_cache: RuntimeCache::default(),
400 catalog: Arc::new(ArcSwap::from_pointee(catalog)),
401 schema_source: Arc::new(ArcSwap::from_pointee(schema_source)),
402 write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()),
403 merge_exclusive: Arc::new(tokio::sync::Mutex::new(())),
404 policy: None,
405 })
406 }
407
408 pub fn catalog(&self) -> Arc<Catalog> {
412 self.catalog.load_full()
413 }
414
415 pub fn schema_source(&self) -> Arc<String> {
417 self.schema_source.load_full()
418 }
419
420 pub(crate) fn store_catalog(&self, catalog: Catalog) {
424 self.catalog.store(Arc::new(catalog));
425 }
426
427 pub(crate) fn store_schema_source(&self, schema_source: String) {
430 self.schema_source.store(Arc::new(schema_source));
431 }
432
433 pub fn uri(&self) -> &str {
434 &self.root_uri
435 }
436
437 pub fn with_policy(mut self, checker: Arc<dyn omnigraph_policy::PolicyChecker>) -> Self {
448 self.policy = Some(checker);
449 self
450 }
451
452 pub(crate) fn enforce(
464 &self,
465 action: omnigraph_policy::PolicyAction,
466 scope: &omnigraph_policy::ResourceScope,
467 actor: Option<&str>,
468 ) -> Result<()> {
469 let Some(checker) = self.policy.as_ref() else {
470 return Ok(());
471 };
472 let Some(actor) = actor else {
473 return Err(OmniError::Policy(
474 "no actor for engine-layer policy check (policy is configured but the call site \
475 didn't thread an actor through — this is almost certainly a bug, not an \
476 intended bypass)"
477 .to_string(),
478 ));
479 };
480 checker
481 .check(action, scope, actor)
482 .map_err(|err| OmniError::Policy(err.to_string()))
483 }
484
485 pub(crate) async fn ensure_schema_state_valid(&self) -> Result<()> {
486 validate_schema_contract(self.uri(), Arc::clone(&self.storage)).await
487 }
488
489 pub async fn plan_schema(&self, desired_schema_source: &str) -> Result<SchemaMigrationPlan> {
490 self.plan_schema_with_options(desired_schema_source, SchemaApplyOptions::default())
491 .await
492 }
493
494 pub async fn plan_schema_with_options(
495 &self,
496 desired_schema_source: &str,
497 options: SchemaApplyOptions,
498 ) -> Result<SchemaMigrationPlan> {
499 schema_apply::plan_schema(self, desired_schema_source, options).await
500 }
501
502 pub async fn preview_schema_apply_with_options(
503 &self,
504 desired_schema_source: &str,
505 options: SchemaApplyOptions,
506 ) -> Result<SchemaApplyPreview> {
507 schema_apply::preview_schema_apply(self, desired_schema_source, options).await
508 }
509
510 pub async fn apply_schema(&self, desired_schema_source: &str) -> Result<SchemaApplyResult> {
511 self.apply_schema_as(desired_schema_source, SchemaApplyOptions::default(), None)
512 .await
513 }
514
515 pub async fn apply_schema_with_options(
516 &self,
517 desired_schema_source: &str,
518 options: SchemaApplyOptions,
519 ) -> Result<SchemaApplyResult> {
520 self.apply_schema_as(desired_schema_source, options, None)
521 .await
522 }
523
524 pub async fn apply_schema_as(
535 &self,
536 desired_schema_source: &str,
537 options: SchemaApplyOptions,
538 actor: Option<&str>,
539 ) -> Result<SchemaApplyResult> {
540 self.apply_schema_as_with_catalog_check(desired_schema_source, options, actor, |_| Ok(()))
541 .await
542 }
543
544 pub async fn apply_schema_as_with_catalog_check<F>(
545 &self,
546 desired_schema_source: &str,
547 options: SchemaApplyOptions,
548 actor: Option<&str>,
549 validate_catalog: F,
550 ) -> Result<SchemaApplyResult>
551 where
552 F: FnOnce(&Catalog) -> Result<()>,
553 {
554 schema_apply::apply_schema(
555 self,
556 desired_schema_source,
557 options,
558 actor,
559 validate_catalog,
560 )
561 .await
562 }
563
564 pub(crate) async fn ensure_schema_apply_idle(&self, operation: &str) -> Result<()> {
565 schema_apply::ensure_schema_apply_idle(self, operation).await
566 }
567
568 async fn ensure_schema_apply_not_locked(&self, operation: &str) -> Result<()> {
569 schema_apply::ensure_schema_apply_not_locked(self, operation).await
570 }
571
572 pub(crate) fn table_store(&self) -> &TableStore {
573 &self.table_store
574 }
575
576 pub(crate) fn storage(&self) -> &dyn crate::storage_layer::TableStorage {
585 &self.table_store
586 }
587
588 pub(crate) fn storage_adapter(&self) -> &dyn crate::storage::StorageAdapter {
592 self.storage.as_ref()
593 }
594
595 pub(crate) fn write_queue(&self) -> Arc<crate::db::write_queue::WriteQueueManager> {
603 Arc::clone(&self.write_queue)
604 }
605
606 pub(crate) fn merge_exclusive(&self) -> Arc<tokio::sync::Mutex<()>> {
612 Arc::clone(&self.merge_exclusive)
613 }
614
615 pub(crate) fn root_uri(&self) -> &str {
618 &self.root_uri
619 }
620
621 pub(crate) async fn open_coordinator_for_branch(
622 &self,
623 branch: Option<&str>,
624 ) -> Result<GraphCoordinator> {
625 match branch {
626 Some(branch) => {
627 GraphCoordinator::open_branch(self.uri(), branch, Arc::clone(&self.storage)).await
628 }
629 None => GraphCoordinator::open(self.uri(), Arc::clone(&self.storage)).await,
630 }
631 }
632
633 pub(crate) async fn swap_coordinator_for_branch(
634 &self,
635 branch: Option<&str>,
636 ) -> Result<GraphCoordinator> {
637 let next = self.open_coordinator_for_branch(branch).await?;
638 let mut coord = self.coordinator.write().await;
639 Ok(std::mem::replace(&mut *coord, next))
640 }
641
642 pub(crate) async fn restore_coordinator(&self, coordinator: GraphCoordinator) {
643 *self.coordinator.write().await = coordinator;
644 }
645
646 pub(crate) async fn resolved_branch_target(
647 &self,
648 branch: Option<&str>,
649 ) -> Result<ResolvedTarget> {
650 self.ensure_schema_state_valid().await?;
651 let requested = ReadTarget::Branch(branch.unwrap_or("main").to_string());
652 let normalized = normalize_branch_name(branch.unwrap_or("main"))?;
653 let coord = self.coordinator.read().await;
654 if normalized.as_deref() == coord.current_branch() {
655 let snapshot_id = coord
656 .head_commit_id()
657 .await?
658 .unwrap_or_else(|| SnapshotId::synthetic(coord.current_branch(), coord.version()));
659 return Ok(ResolvedTarget {
660 requested,
661 branch: coord.current_branch().map(str::to_string),
662 snapshot_id,
663 snapshot: coord.snapshot(),
664 });
665 }
666 coord.resolve_target(&requested).await
667 }
668
669 pub(crate) async fn snapshot_for_branch(&self, branch: Option<&str>) -> Result<Snapshot> {
670 self.resolved_branch_target(branch)
671 .await
672 .map(|resolved| resolved.snapshot)
673 }
674
675 pub(crate) async fn version(&self) -> u64 {
676 self.coordinator.read().await.version()
677 }
678
679 pub(crate) async fn snapshot(&self) -> Snapshot {
681 self.coordinator.read().await.snapshot()
682 }
683
684 pub async fn snapshot_of(&self, target: impl Into<ReadTarget>) -> Result<Snapshot> {
685 self.resolved_target(target)
686 .await
687 .map(|resolved| resolved.snapshot)
688 }
689
690 pub async fn version_of(&self, target: impl Into<ReadTarget>) -> Result<u64> {
691 self.snapshot_of(target)
692 .await
693 .map(|snapshot| snapshot.version())
694 }
695
696 pub async fn resolved_branch_of(
697 &self,
698 target: impl Into<ReadTarget>,
699 ) -> Result<Option<String>> {
700 self.resolved_target(target)
701 .await
702 .map(|resolved| resolved.branch)
703 }
704
705 pub async fn sync_branch(&self, branch: &str) -> Result<()> {
707 self.ensure_schema_state_valid().await?;
708 let branch = normalize_branch_name(branch)?;
709 let next = self.open_coordinator_for_branch(branch.as_deref()).await?;
710 *self.coordinator.write().await = next;
711 self.runtime_cache.invalidate_all().await;
712 Ok(())
713 }
714
715 pub async fn refresh(&self) -> Result<()> {
748 {
755 let mut coord = self.coordinator.write().await;
756 coord.refresh().await?;
757 let schema_state_recovery = recover_schema_state_files(
758 &self.root_uri,
759 Arc::clone(&self.storage),
760 &coord.snapshot(),
761 )
762 .await?;
763 crate::db::manifest::recover_manifest_drift(
764 &self.root_uri,
765 Arc::clone(&self.storage),
766 &mut *coord,
767 crate::db::manifest::RecoveryMode::RollForwardOnly,
768 schema_state_recovery,
769 )
770 .await?;
771 } self.reload_schema_if_source_changed().await?;
773 self.runtime_cache.invalidate_all().await;
774 Ok(())
775 }
776
777 async fn reload_schema_if_source_changed(&self) -> Result<()> {
778 let schema_path = schema_source_uri(&self.root_uri);
779 let schema_source = self.storage.read_text(&schema_path).await?;
780 if schema_source == *self.schema_source.load_full() {
781 return Ok(());
782 }
783 let current_source_ir = read_schema_ir_from_source(&schema_source)?;
784 let branches = self.coordinator.read().await.branch_list().await?;
785 let (accepted_ir, _) = load_or_bootstrap_schema_contract(
786 &self.root_uri,
787 Arc::clone(&self.storage),
788 &branches,
789 ¤t_source_ir,
790 )
791 .await?;
792 let mut catalog = build_catalog_from_ir(&accepted_ir)?;
793 fixup_blob_schemas(&mut catalog);
794 self.store_schema_source(schema_source);
795 self.store_catalog(catalog);
796 Ok(())
797 }
798
799 pub(crate) async fn refresh_coordinator_only(&self) -> Result<()> {
807 self.coordinator.write().await.refresh().await?;
808 self.runtime_cache.invalidate_all().await;
809 Ok(())
810 }
811
812 pub async fn resolve_snapshot(&self, branch: &str) -> Result<SnapshotId> {
813 self.ensure_schema_state_valid().await?;
814 self.coordinator
815 .read()
816 .await
817 .resolve_snapshot_id(branch)
818 .await
819 }
820
821 pub(crate) async fn resolved_target(
822 &self,
823 target: impl Into<ReadTarget>,
824 ) -> Result<ResolvedTarget> {
825 self.ensure_schema_state_valid().await?;
826 self.coordinator
827 .read()
828 .await
829 .resolve_target(&target.into())
830 .await
831 }
832
833 pub async fn diff_between(
836 &self,
837 from: impl Into<ReadTarget>,
838 to: impl Into<ReadTarget>,
839 filter: &crate::changes::ChangeFilter,
840 ) -> Result<crate::changes::ChangeSet> {
841 let from_resolved = self.resolved_target(from).await?;
842 let to_resolved = self.resolved_target(to).await?;
843 crate::changes::diff_snapshots(
844 self.uri(),
845 &from_resolved.snapshot,
846 &to_resolved.snapshot,
847 filter,
848 to_resolved.branch.clone().or(from_resolved.branch.clone()),
849 )
850 .await
851 }
852
853 pub async fn diff_commits(
856 &self,
857 from_commit_id: &str,
858 to_commit_id: &str,
859 filter: &crate::changes::ChangeFilter,
860 ) -> Result<crate::changes::ChangeSet> {
861 let coord = self.coordinator.read().await;
862 let from_commit = coord
863 .resolve_commit(&SnapshotId::new(from_commit_id))
864 .await?;
865 let to_commit = coord.resolve_commit(&SnapshotId::new(to_commit_id)).await?;
866 let from_snap = coord
867 .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
868 from_commit.graph_commit_id.clone(),
869 )))
870 .await?;
871 let to_snap = coord
872 .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
873 to_commit.graph_commit_id.clone(),
874 )))
875 .await?;
876 drop(coord);
877 crate::changes::diff_snapshots(
878 self.uri(),
879 &from_snap.snapshot,
880 &to_snap.snapshot,
881 filter,
882 to_snap.branch.clone().or(from_snap.branch.clone()),
883 )
884 .await
885 }
886
887 pub async fn entity_at_target(
888 &self,
889 target: impl Into<ReadTarget>,
890 table_key: &str,
891 id: &str,
892 ) -> Result<Option<serde_json::Value>> {
893 export::entity_at_target(self, target, table_key, id).await
894 }
895
896 pub async fn entity_at(
898 &self,
899 table_key: &str,
900 id: &str,
901 version: u64,
902 ) -> Result<Option<serde_json::Value>> {
903 export::entity_at(self, table_key, id, version).await
904 }
905
906 pub async fn snapshot_at_version(&self, version: u64) -> Result<Snapshot> {
908 self.ensure_schema_state_valid().await?;
909 self.coordinator
910 .read()
911 .await
912 .snapshot_at_version(version)
913 .await
914 }
915
916 pub async fn export_jsonl(
917 &self,
918 branch: &str,
919 type_names: &[String],
920 table_keys: &[String],
921 ) -> Result<String> {
922 export::export_jsonl(self, branch, type_names, table_keys).await
923 }
924
925 pub async fn export_jsonl_to_writer<W: Write>(
926 &self,
927 branch: &str,
928 type_names: &[String],
929 table_keys: &[String],
930 writer: &mut W,
931 ) -> Result<()> {
932 export::export_jsonl_to_writer(self, branch, type_names, table_keys, writer).await
933 }
934
935 pub async fn graph_index(&self) -> Result<Arc<crate::graph_index::GraphIndex>> {
939 table_ops::graph_index(self).await
940 }
941
942 pub(crate) async fn graph_index_for_resolved(
943 &self,
944 resolved: &ResolvedTarget,
945 ) -> Result<Arc<crate::graph_index::GraphIndex>> {
946 table_ops::graph_index_for_resolved(self, resolved).await
947 }
948
949 pub async fn ensure_indices(&self) -> Result<()> {
962 table_ops::ensure_indices(self).await
963 }
964
965 pub async fn ensure_indices_on(&self, branch: &str) -> Result<()> {
966 table_ops::ensure_indices_on(self, branch).await
967 }
968
969 #[cfg(feature = "failpoints")]
970 #[doc(hidden)]
971 pub async fn failpoint_publish_table_head_without_index_rebuild_for_test(
972 &mut self,
973 branch: &str,
974 table_key: &str,
975 table_branch: Option<&str>,
976 ) -> Result<u64> {
977 table_ops::failpoint_publish_table_head_without_index_rebuild_for_test(
978 self,
979 branch,
980 table_key,
981 table_branch,
982 )
983 .await
984 }
985
986 pub async fn optimize(&self) -> Result<Vec<optimize::TableOptimizeStats>> {
989 optimize::optimize_all_tables(self).await
990 }
991
992 pub async fn cleanup(
996 &mut self,
997 options: optimize::CleanupPolicyOptions,
998 ) -> Result<Vec<optimize::TableCleanupStats>> {
999 optimize::cleanup_all_tables(self, options).await
1000 }
1001
1002 pub async fn read_blob(&self, type_name: &str, id: &str, property: &str) -> Result<BlobFile> {
1012 self.ensure_schema_state_valid().await?;
1013 let catalog = self.catalog();
1014 let node_type = catalog
1015 .node_types
1016 .get(type_name)
1017 .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
1018 if !node_type.blob_properties.contains(property) {
1019 return Err(OmniError::manifest(format!(
1020 "property '{}' on type '{}' is not a Blob",
1021 property, type_name
1022 )));
1023 }
1024
1025 let snapshot = self.snapshot().await;
1026 let table_key = format!("node:{}", type_name);
1027 let ds = snapshot.open(&table_key).await?;
1028
1029 let filter_sql = format!("id = '{}'", id.replace('\'', "''"));
1030 let row_id = self
1031 .table_store
1032 .first_row_id_for_filter(&ds, &filter_sql)
1033 .await?
1034 .ok_or_else(|| {
1035 OmniError::manifest(format!("no {} with id '{}' found", type_name, id))
1036 })?;
1037
1038 let ds = Arc::new(ds);
1040 let mut blobs = ds
1041 .take_blobs(&[row_id], property)
1042 .await
1043 .map_err(|e| OmniError::Lance(e.to_string()))?;
1044
1045 blobs.pop().ok_or_else(|| {
1046 OmniError::manifest(format!(
1047 "blob '{}' on {} '{}' returned no data",
1048 property, type_name, id
1049 ))
1050 })
1051 }
1052
1053 pub(crate) async fn active_branch(&self) -> Option<String> {
1054 self.coordinator
1055 .read()
1056 .await
1057 .current_branch()
1058 .map(str::to_string)
1059 }
1060
1061 async fn ensure_branch_delete_safe(&self, branch: &str, branches: &[String]) -> Result<()> {
1062 let descendants = self
1063 .coordinator
1064 .read()
1065 .await
1066 .branch_descendants(branch)
1067 .await?;
1068 if let Some(descendant) = descendants.first() {
1069 return Err(OmniError::manifest_conflict(format!(
1070 "cannot delete branch '{}' because descendant branch '{}' still depends on it",
1071 branch, descendant
1072 )));
1073 }
1074
1075 for other_branch in branches
1076 .iter()
1077 .filter(|candidate| candidate.as_str() != branch)
1078 {
1079 let snapshot = self
1080 .snapshot_of(ReadTarget::branch(other_branch.as_str()))
1081 .await?;
1082 if snapshot
1083 .entries()
1084 .any(|entry| entry.table_branch.as_deref() == Some(branch))
1085 {
1086 return Err(OmniError::manifest_conflict(format!(
1087 "cannot delete branch '{}' because branch '{}' still depends on it",
1088 branch, other_branch
1089 )));
1090 }
1091 }
1092
1093 Ok(())
1094 }
1095
1096 async fn cleanup_deleted_branch_tables(&self, branch: &str, owned_tables: &[(String, String)]) {
1104 let mut seen_paths = HashSet::new();
1105 let mut cleanup_targets = owned_tables
1106 .iter()
1107 .filter(|(_, table_path)| seen_paths.insert(table_path.clone()))
1108 .cloned()
1109 .collect::<Vec<_>>();
1110 cleanup_targets.sort_by(|left, right| left.0.cmp(&right.0));
1111
1112 for (table_key, table_path) in cleanup_targets {
1113 let dataset_uri = self.table_store.dataset_uri(&table_path);
1114 let outcome = match crate::failpoints::maybe_fail("branch_delete.before_table_cleanup")
1115 {
1116 Ok(()) => self.table_store.force_delete_branch(&dataset_uri, branch).await,
1117 Err(injected) => Err(injected),
1118 };
1119 if let Err(err) = outcome {
1120 tracing::warn!(
1121 target: "omnigraph::branch_delete::cleanup",
1122 branch = %branch,
1123 table = %table_key,
1124 error = %err,
1125 "best-effort fork reclaim failed; cleanup will reconcile the orphan",
1126 );
1127 }
1128 }
1129 }
1130
1131 async fn delete_branch_storage_only(&self, branch: &str) -> Result<()> {
1132 let active = self
1133 .coordinator
1134 .read()
1135 .await
1136 .current_branch()
1137 .map(str::to_string);
1138 if active.as_deref() == Some(branch) {
1139 return Err(OmniError::manifest_conflict(format!(
1140 "cannot delete currently active branch '{}'",
1141 branch
1142 )));
1143 }
1144
1145 let branch_snapshot = self.snapshot_of(ReadTarget::branch(branch)).await?;
1146 let owned_tables = branch_snapshot
1147 .entries()
1148 .filter(|entry| entry.table_branch.as_deref() == Some(branch))
1149 .map(|entry| (entry.table_key.clone(), entry.table_path.clone()))
1150 .collect::<Vec<_>>();
1151
1152 self.coordinator.write().await.branch_delete(branch).await?;
1154 self.cleanup_deleted_branch_tables(branch, &owned_tables)
1156 .await;
1157 Ok(())
1158 }
1159
1160 pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
1161 normalize_branch_name(branch)
1162 }
1163
1164 pub(crate) async fn head_commit_id_for_branch(
1165 &self,
1166 branch: Option<&str>,
1167 ) -> Result<Option<String>> {
1168 let mut coordinator = self.open_coordinator_for_branch(branch).await?;
1169 coordinator.ensure_commit_graph_initialized().await?;
1170 coordinator
1171 .head_commit_id()
1172 .await
1173 .map(|id| id.map(|snapshot_id| snapshot_id.as_str().to_string()))
1174 }
1175
1176 pub async fn branch_create(&self, name: &str) -> Result<()> {
1177 self.branch_create_as(name, None).await
1178 }
1179
1180 pub async fn branch_create_as(&self, name: &str, actor: Option<&str>) -> Result<()> {
1188 self.enforce(
1189 omnigraph_policy::PolicyAction::BranchCreate,
1190 &omnigraph_policy::ResourceScope::TargetBranch(name.to_string()),
1191 actor,
1192 )?;
1193 self.ensure_schema_state_valid().await?;
1194 self.ensure_schema_apply_idle("branch_create").await?;
1195 ensure_public_branch_ref(name, "branch_create")?;
1196 self.coordinator.write().await.branch_create(name).await
1197 }
1198
1199 pub async fn branch_create_from(&self, from: impl Into<ReadTarget>, name: &str) -> Result<()> {
1200 self.branch_create_from_as(from, name, None).await
1201 }
1202
1203 pub async fn branch_create_from_as(
1215 &self,
1216 from: impl Into<ReadTarget>,
1217 name: &str,
1218 actor: Option<&str>,
1219 ) -> Result<()> {
1220 let target = from.into();
1221 let source_branch = match &target {
1222 ReadTarget::Branch(b) => b.clone(),
1223 _ => "<snapshot>".to_string(),
1224 };
1225 self.enforce(
1226 omnigraph_policy::PolicyAction::BranchCreate,
1227 &omnigraph_policy::ResourceScope::BranchTransition {
1228 source: source_branch,
1229 target: name.to_string(),
1230 },
1231 actor,
1232 )?;
1233 self.ensure_schema_apply_idle("branch_create_from").await?;
1234 self.branch_create_from_impl(target, name, false).await
1235 }
1236
1237 async fn branch_create_from_impl(
1238 &self,
1239 from: impl Into<ReadTarget>,
1240 name: &str,
1241 allow_internal_refs: bool,
1242 ) -> Result<()> {
1243 let target = from.into();
1244 let ReadTarget::Branch(branch_name) = target else {
1245 return Err(OmniError::manifest(
1246 "branch creation from pinned snapshots is not supported yet".to_string(),
1247 ));
1248 };
1249 if !allow_internal_refs {
1250 ensure_public_branch_ref(&branch_name, "branch_create_from")?;
1251 ensure_public_branch_ref(name, "branch_create_from")?;
1252 }
1253 let branch = normalize_branch_name(&branch_name)?;
1254 let mut source_coord = self.open_coordinator_for_branch(branch.as_deref()).await?;
1271 source_coord.branch_create(name).await
1272 }
1273
1274 pub async fn branch_list(&self) -> Result<Vec<String>> {
1275 self.ensure_schema_state_valid().await?;
1276 self.coordinator.read().await.branch_list().await
1277 }
1278
1279 pub async fn branch_delete(&self, name: &str) -> Result<()> {
1280 self.branch_delete_as(name, None).await
1281 }
1282
1283 pub async fn branch_delete_as(&self, name: &str, actor: Option<&str>) -> Result<()> {
1291 self.enforce(
1292 omnigraph_policy::PolicyAction::BranchDelete,
1293 &omnigraph_policy::ResourceScope::TargetBranch(name.to_string()),
1294 actor,
1295 )?;
1296 self.ensure_schema_state_valid().await?;
1297 self.ensure_schema_apply_idle("branch_delete").await?;
1298 ensure_public_branch_ref(name, "branch_delete")?;
1299 self.refresh().await?;
1300 let branch = normalize_branch_name(name)?
1301 .ok_or_else(|| OmniError::manifest("cannot delete branch 'main'".to_string()))?;
1302 let branches = self.coordinator.read().await.branch_list().await?;
1303 if !branches.iter().any(|candidate| candidate == &branch) {
1304 return Err(OmniError::manifest_not_found(format!(
1305 "branch '{}' not found",
1306 branch
1307 )));
1308 }
1309
1310 self.ensure_branch_delete_safe(&branch, &branches).await?;
1311 self.delete_branch_storage_only(&branch).await
1312 }
1313
1314 pub async fn get_commit(&self, commit_id: &str) -> Result<GraphCommit> {
1315 self.ensure_schema_state_valid().await?;
1316 self.coordinator
1317 .read()
1318 .await
1319 .resolve_commit(&SnapshotId::new(commit_id))
1320 .await
1321 }
1322
1323 pub async fn list_commits(&self, branch: Option<&str>) -> Result<Vec<GraphCommit>> {
1324 self.ensure_schema_state_valid().await?;
1325 let branch = match branch {
1326 Some(branch) => normalize_branch_name(branch)?,
1327 None => None,
1328 };
1329 let coordinator = self.open_coordinator_for_branch(branch.as_deref()).await?;
1330 coordinator.list_commits().await
1331 }
1332
1333 pub(crate) async fn open_for_mutation(
1339 &self,
1340 table_key: &str,
1341 op_kind: crate::db::MutationOpKind,
1342 ) -> Result<(Dataset, String, Option<String>)> {
1343 table_ops::open_for_mutation(self, table_key, op_kind).await
1344 }
1345
1346 pub(crate) async fn open_for_mutation_on_branch(
1347 &self,
1348 branch: Option<&str>,
1349 table_key: &str,
1350 op_kind: crate::db::MutationOpKind,
1351 ) -> Result<(Dataset, String, Option<String>)> {
1352 table_ops::open_for_mutation_on_branch(self, branch, table_key, op_kind).await
1353 }
1354
1355 pub(crate) async fn fork_dataset_from_entry_state(
1356 &self,
1357 table_key: &str,
1358 full_path: &str,
1359 source_branch: Option<&str>,
1360 source_version: u64,
1361 active_branch: &str,
1362 ) -> Result<Dataset> {
1363 table_ops::fork_dataset_from_entry_state(
1364 self,
1365 table_key,
1366 full_path,
1367 source_branch,
1368 source_version,
1369 active_branch,
1370 )
1371 .await
1372 }
1373
1374 pub(crate) async fn reopen_for_mutation(
1375 &self,
1376 table_key: &str,
1377 full_path: &str,
1378 table_branch: Option<&str>,
1379 expected_version: u64,
1380 op_kind: crate::db::MutationOpKind,
1381 ) -> Result<Dataset> {
1382 table_ops::reopen_for_mutation(
1383 self,
1384 table_key,
1385 full_path,
1386 table_branch,
1387 expected_version,
1388 op_kind,
1389 )
1390 .await
1391 }
1392
1393 pub(crate) async fn open_dataset_at_state(
1394 &self,
1395 table_path: &str,
1396 table_branch: Option<&str>,
1397 table_version: u64,
1398 ) -> Result<Dataset> {
1399 table_ops::open_dataset_at_state(self, table_path, table_branch, table_version).await
1400 }
1401
1402 pub(crate) async fn build_indices_on_dataset(
1403 &self,
1404 table_key: &str,
1405 ds: &mut Dataset,
1406 ) -> Result<()> {
1407 table_ops::build_indices_on_dataset(self, table_key, ds).await
1408 }
1409
1410 pub(crate) async fn build_indices_on_dataset_for_catalog(
1411 &self,
1412 catalog: &Catalog,
1413 table_key: &str,
1414 ds: &mut Dataset,
1415 ) -> Result<()> {
1416 table_ops::build_indices_on_dataset_for_catalog(self, catalog, table_key, ds).await
1417 }
1418
1419 #[cfg(test)]
1422 pub(crate) async fn commit_updates(
1423 &mut self,
1424 updates: &[crate::db::SubTableUpdate],
1425 ) -> Result<u64> {
1426 table_ops::commit_updates(self, updates).await
1427 }
1428
1429 pub(crate) async fn commit_manifest_updates(
1430 &self,
1431 updates: &[crate::db::SubTableUpdate],
1432 ) -> Result<u64> {
1433 table_ops::commit_manifest_updates(self, updates).await
1434 }
1435
1436 pub(crate) async fn record_merge_commit(
1437 &self,
1438 manifest_version: u64,
1439 parent_commit_id: &str,
1440 merged_parent_commit_id: &str,
1441 actor_id: Option<&str>,
1442 ) -> Result<String> {
1443 table_ops::record_merge_commit(
1444 self,
1445 manifest_version,
1446 parent_commit_id,
1447 merged_parent_commit_id,
1448 actor_id,
1449 )
1450 .await
1451 }
1452
1453 pub(crate) async fn commit_updates_on_branch_with_expected(
1454 &self,
1455 branch: Option<&str>,
1456 updates: &[crate::db::SubTableUpdate],
1457 expected_table_versions: &std::collections::HashMap<String, u64>,
1458 actor_id: Option<&str>,
1459 ) -> Result<u64> {
1460 table_ops::commit_updates_on_branch_with_expected(
1461 self,
1462 branch,
1463 updates,
1464 expected_table_versions,
1465 actor_id,
1466 )
1467 .await
1468 }
1469
1470 pub(crate) async fn ensure_commit_graph_initialized(&self) -> Result<()> {
1471 table_ops::ensure_commit_graph_initialized(self).await
1472 }
1473
1474 pub(crate) async fn invalidate_graph_index(&self) {
1476 table_ops::invalidate_graph_index(self).await
1477 }
1478}
1479
1480pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
1481 let branch = branch.trim();
1482 if branch.is_empty() {
1483 return Err(OmniError::manifest(
1484 "branch name cannot be empty".to_string(),
1485 ));
1486 }
1487 if branch == "main" {
1488 return Ok(None);
1489 }
1490 Ok(Some(branch.to_string()))
1491}
1492
1493pub(crate) fn ensure_public_branch_ref(branch: &str, operation: &str) -> Result<()> {
1494 if super::is_internal_run_branch(branch) {
1495 return Err(OmniError::manifest(format!(
1496 "{} does not allow internal run ref '{}'",
1497 operation, branch
1498 )));
1499 }
1500 if is_internal_system_branch(branch) {
1501 return Err(OmniError::manifest(format!(
1502 "{} does not allow internal system ref '{}'",
1503 operation, branch
1504 )));
1505 }
1506 Ok(())
1507}
1508
1509fn concat_or_empty_batches(schema: Arc<Schema>, batches: Vec<RecordBatch>) -> Result<RecordBatch> {
1510 if batches.is_empty() {
1511 return Ok(RecordBatch::new_empty(schema));
1512 }
1513 if batches.len() == 1 {
1514 return Ok(batches.into_iter().next().unwrap());
1515 }
1516 let batch_schema = batches[0].schema();
1517 arrow_select::concat::concat_batches(&batch_schema, &batches)
1518 .map_err(|e| OmniError::Lance(e.to_string()))
1519}
1520
1521fn blob_properties_for_table_key<'a>(
1522 catalog: &'a Catalog,
1523 table_key: &str,
1524) -> Result<&'a std::collections::HashSet<String>> {
1525 if let Some(type_name) = table_key.strip_prefix("node:") {
1526 return catalog
1527 .node_types
1528 .get(type_name)
1529 .map(|node_type| &node_type.blob_properties)
1530 .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)));
1531 }
1532 if let Some(type_name) = table_key.strip_prefix("edge:") {
1533 return catalog
1534 .edge_types
1535 .get(type_name)
1536 .map(|edge_type| &edge_type.blob_properties)
1537 .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)));
1538 }
1539 Err(OmniError::manifest(format!(
1540 "invalid table key '{}'",
1541 table_key
1542 )))
1543}
1544
1545fn blob_description_is_null(descriptions: &StructArray, row: usize) -> Result<bool> {
1546 if descriptions.is_null(row) {
1547 return Ok(true);
1548 }
1549
1550 let kind = descriptions
1551 .column_by_name("kind")
1552 .and_then(|col| col.as_any().downcast_ref::<UInt32Array>())
1553 .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row) as u8))
1554 .or_else(|| {
1555 descriptions
1556 .column_by_name("kind")
1557 .and_then(|col| col.as_any().downcast_ref::<arrow_array::UInt8Array>())
1558 .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)))
1559 });
1560 let position = descriptions
1561 .column_by_name("position")
1562 .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
1563 .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1564 let size = descriptions
1565 .column_by_name("size")
1566 .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
1567 .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1568 let blob_uri = descriptions
1569 .column_by_name("blob_uri")
1570 .and_then(|col| col.as_any().downcast_ref::<StringArray>())
1571 .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1572
1573 let Some(kind) = kind else {
1574 return Ok(true);
1575 };
1576 let kind = BlobKind::try_from(kind).map_err(|e| OmniError::Lance(e.to_string()))?;
1577 if kind != BlobKind::Inline {
1578 return Ok(false);
1579 }
1580
1581 Ok(position.unwrap_or(0) == 0 && size.unwrap_or(0) == 0 && blob_uri.unwrap_or("").is_empty())
1582}
1583
1584fn fixup_blob_schemas(catalog: &mut Catalog) {
1590 for node_type in catalog.node_types.values_mut() {
1591 if node_type.blob_properties.is_empty() {
1592 continue;
1593 }
1594 let fields: Vec<Field> = node_type
1595 .arrow_schema
1596 .fields()
1597 .iter()
1598 .map(|f| {
1599 if node_type.blob_properties.contains(f.name()) {
1600 blob_field(f.name(), f.is_nullable())
1601 } else {
1602 f.as_ref().clone()
1603 }
1604 })
1605 .collect();
1606 node_type.arrow_schema = Arc::new(Schema::new(fields));
1607 }
1608 for edge_type in catalog.edge_types.values_mut() {
1609 if edge_type.blob_properties.is_empty() {
1610 continue;
1611 }
1612 let fields: Vec<Field> = edge_type
1613 .arrow_schema
1614 .fields()
1615 .iter()
1616 .map(|f| {
1617 if edge_type.blob_properties.contains(f.name()) {
1618 blob_field(f.name(), f.is_nullable())
1619 } else {
1620 f.as_ref().clone()
1621 }
1622 })
1623 .collect();
1624 edge_type.arrow_schema = Arc::new(Schema::new(fields));
1625 }
1626}
1627
1628fn read_schema_ir_from_source(schema_source: &str) -> Result<SchemaIR> {
1629 let schema_ast = parse_schema(schema_source)?;
1630 build_schema_ir(&schema_ast).map_err(|err| OmniError::manifest(err.to_string()))
1631}
1632
1633async fn init_storage_phase(
1648 root: &str,
1649 schema_source: &str,
1650 schema_ir: &SchemaIR,
1651 catalog: &Catalog,
1652 storage: &Arc<dyn StorageAdapter>,
1653 write_schema_pg: bool,
1654) -> Result<GraphCoordinator> {
1655 if write_schema_pg {
1656 let schema_path = join_uri(root, SCHEMA_SOURCE_FILENAME);
1657 storage.write_text(&schema_path, schema_source).await?;
1658 crate::failpoints::maybe_fail("init.after_schema_pg_written")?;
1659 }
1660
1661 write_schema_contract(root, storage.as_ref(), schema_ir).await?;
1662 crate::failpoints::maybe_fail("init.after_schema_contract_written")?;
1663
1664 let coordinator = GraphCoordinator::init(root, catalog, Arc::clone(storage)).await?;
1665 crate::failpoints::maybe_fail("init.after_coordinator_init")?;
1666
1667 Ok(coordinator)
1668}
1669
1670async fn best_effort_cleanup_init_artifacts(root: &str, storage: &dyn StorageAdapter) {
1682 for uri in [
1683 schema_source_uri(root),
1684 schema_ir_uri(root),
1685 schema_state_uri(root),
1686 ] {
1687 if let Err(err) = storage.delete(&uri).await {
1688 tracing::warn!(
1689 target: "omnigraph::init::cleanup",
1690 uri = %uri,
1691 error = %err,
1692 "init failed; best-effort cleanup could not delete artifact",
1693 );
1694 }
1695 }
1696}
1697
1698fn schema_table_key(type_kind: SchemaTypeKind, name: &str) -> String {
1699 match type_kind {
1700 SchemaTypeKind::Node => format!("node:{}", name),
1701 SchemaTypeKind::Edge => format!("edge:{}", name),
1702 SchemaTypeKind::Interface => unreachable!("interfaces do not map to tables"),
1703 }
1704}
1705
1706fn schema_for_table_key(catalog: &Catalog, table_key: &str) -> Result<Arc<Schema>> {
1707 if let Some(type_name) = table_key.strip_prefix("node:") {
1708 let node_type: &NodeType = catalog
1709 .node_types
1710 .get(type_name)
1711 .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
1712 return Ok(node_type.arrow_schema.clone());
1713 }
1714 if let Some(type_name) = table_key.strip_prefix("edge:") {
1715 let edge_type: &EdgeType = catalog
1716 .edge_types
1717 .get(type_name)
1718 .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)))?;
1719 return Ok(edge_type.arrow_schema.clone());
1720 }
1721 Err(OmniError::manifest(format!(
1722 "invalid table key '{}'",
1723 table_key
1724 )))
1725}
1726
1727fn record_batch_row_to_json(batch: &RecordBatch, row: usize) -> Result<serde_json::Value> {
1728 let mut obj = serde_json::Map::new();
1729 for (i, field) in batch.schema().fields().iter().enumerate() {
1730 obj.insert(
1731 field.name().clone(),
1732 json_value_from_array(batch.column(i).as_ref(), row)?,
1733 );
1734 }
1735 Ok(serde_json::Value::Object(obj))
1736}
1737
1738fn json_value_from_array(array: &dyn Array, row: usize) -> Result<serde_json::Value> {
1739 if array.is_null(row) {
1740 return Ok(serde_json::Value::Null);
1741 }
1742
1743 match array.data_type() {
1744 DataType::Utf8 => Ok(serde_json::Value::String(
1745 array
1746 .as_any()
1747 .downcast_ref::<StringArray>()
1748 .ok_or_else(|| OmniError::Lance("expected StringArray".to_string()))?
1749 .value(row)
1750 .to_string(),
1751 )),
1752 DataType::LargeUtf8 => Ok(serde_json::Value::String(
1753 array
1754 .as_any()
1755 .downcast_ref::<LargeStringArray>()
1756 .ok_or_else(|| OmniError::Lance("expected LargeStringArray".to_string()))?
1757 .value(row)
1758 .to_string(),
1759 )),
1760 DataType::Boolean => Ok(serde_json::Value::Bool(
1761 array
1762 .as_any()
1763 .downcast_ref::<BooleanArray>()
1764 .ok_or_else(|| OmniError::Lance("expected BooleanArray".to_string()))?
1765 .value(row),
1766 )),
1767 DataType::Int32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1768 array
1769 .as_any()
1770 .downcast_ref::<Int32Array>()
1771 .ok_or_else(|| OmniError::Lance("expected Int32Array".to_string()))?
1772 .value(row),
1773 ))),
1774 DataType::Int64 => Ok(serde_json::Value::Number(serde_json::Number::from(
1775 array
1776 .as_any()
1777 .downcast_ref::<Int64Array>()
1778 .ok_or_else(|| OmniError::Lance("expected Int64Array".to_string()))?
1779 .value(row),
1780 ))),
1781 DataType::UInt32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1782 array
1783 .as_any()
1784 .downcast_ref::<UInt32Array>()
1785 .ok_or_else(|| OmniError::Lance("expected UInt32Array".to_string()))?
1786 .value(row),
1787 ))),
1788 DataType::UInt64 => Ok(serde_json::Value::Number(serde_json::Number::from(
1789 array
1790 .as_any()
1791 .downcast_ref::<UInt64Array>()
1792 .ok_or_else(|| OmniError::Lance("expected UInt64Array".to_string()))?
1793 .value(row),
1794 ))),
1795 DataType::Float32 => {
1796 let value = array
1797 .as_any()
1798 .downcast_ref::<Float32Array>()
1799 .ok_or_else(|| OmniError::Lance("expected Float32Array".to_string()))?
1800 .value(row) as f64;
1801 Ok(serde_json::Value::Number(
1802 serde_json::Number::from_f64(value).ok_or_else(|| {
1803 OmniError::Lance(format!("cannot encode f32 value '{}' as JSON", value))
1804 })?,
1805 ))
1806 }
1807 DataType::Float64 => {
1808 let value = array
1809 .as_any()
1810 .downcast_ref::<Float64Array>()
1811 .ok_or_else(|| OmniError::Lance("expected Float64Array".to_string()))?
1812 .value(row);
1813 Ok(serde_json::Value::Number(
1814 serde_json::Number::from_f64(value).ok_or_else(|| {
1815 OmniError::Lance(format!("cannot encode f64 value '{}' as JSON", value))
1816 })?,
1817 ))
1818 }
1819 DataType::Date32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1820 array
1821 .as_any()
1822 .downcast_ref::<Date32Array>()
1823 .ok_or_else(|| OmniError::Lance("expected Date32Array".to_string()))?
1824 .value(row),
1825 ))),
1826 DataType::Binary => Ok(serde_json::Value::String(base64::Engine::encode(
1827 &base64::engine::general_purpose::STANDARD,
1828 array
1829 .as_any()
1830 .downcast_ref::<BinaryArray>()
1831 .ok_or_else(|| OmniError::Lance("expected BinaryArray".to_string()))?
1832 .value(row),
1833 ))),
1834 DataType::LargeBinary => Ok(serde_json::Value::String(base64::Engine::encode(
1835 &base64::engine::general_purpose::STANDARD,
1836 array
1837 .as_any()
1838 .downcast_ref::<LargeBinaryArray>()
1839 .ok_or_else(|| OmniError::Lance("expected LargeBinaryArray".to_string()))?
1840 .value(row),
1841 ))),
1842 DataType::List(_) => {
1843 let list = array
1844 .as_any()
1845 .downcast_ref::<ListArray>()
1846 .ok_or_else(|| OmniError::Lance("expected ListArray".to_string()))?;
1847 let values = list.value(row);
1848 let mut out = Vec::with_capacity(values.len());
1849 for idx in 0..values.len() {
1850 out.push(json_value_from_array(values.as_ref(), idx)?);
1851 }
1852 Ok(serde_json::Value::Array(out))
1853 }
1854 DataType::LargeList(_) => {
1855 let list = array
1856 .as_any()
1857 .downcast_ref::<LargeListArray>()
1858 .ok_or_else(|| OmniError::Lance("expected LargeListArray".to_string()))?;
1859 let values = list.value(row);
1860 let mut out = Vec::with_capacity(values.len());
1861 for idx in 0..values.len() {
1862 out.push(json_value_from_array(values.as_ref(), idx)?);
1863 }
1864 Ok(serde_json::Value::Array(out))
1865 }
1866 DataType::FixedSizeList(_, _) => {
1867 let list = array
1868 .as_any()
1869 .downcast_ref::<FixedSizeListArray>()
1870 .ok_or_else(|| OmniError::Lance("expected FixedSizeListArray".to_string()))?;
1871 let values = list.value(row);
1872 let mut out = Vec::with_capacity(values.len());
1873 for idx in 0..values.len() {
1874 out.push(json_value_from_array(values.as_ref(), idx)?);
1875 }
1876 Ok(serde_json::Value::Array(out))
1877 }
1878 DataType::Struct(fields) => {
1879 let struct_array = array
1880 .as_any()
1881 .downcast_ref::<StructArray>()
1882 .ok_or_else(|| OmniError::Lance("expected StructArray".to_string()))?;
1883 let mut obj = serde_json::Map::new();
1884 for (field_idx, field) in fields.iter().enumerate() {
1885 obj.insert(
1886 field.name().clone(),
1887 json_value_from_array(struct_array.column(field_idx).as_ref(), row)?,
1888 );
1889 }
1890 Ok(serde_json::Value::Object(obj))
1891 }
1892 _ => {
1893 let value = arrow_cast::display::array_value_to_string(array, row)
1894 .map_err(|e| OmniError::Lance(e.to_string()))?;
1895 Ok(serde_json::Value::String(value))
1896 }
1897 }
1898}
1899
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::is_internal_run_branch;
    use crate::db::manifest::ManifestCoordinator;
    use async_trait::async_trait;
    use serde_json::Value;
    use std::sync::{Arc, Mutex};

    use crate::storage::{LocalStorageAdapter, StorageAdapter, join_uri};

    /// Two node types and two edge types shared by the tests below.
    ///
    /// Several tests derive variants with `str::replace`, so the whitespace of
    /// the `age` line and the closing brace after it must match their patterns.
    const TEST_SCHEMA: &str = r#"
node Person {
    name: String @key
    age: I32?
}
node Company {
    name: String @key
}
edge Knows: Person -> Person {
    since: Date?
}
edge WorksAt: Person -> Company
"#;

    /// Storage adapter that forwards every call to a `LocalStorageAdapter` and
    /// records the URIs passed to reads, writes, existence checks, renames and
    /// deletes (`list_dir` is forwarded without being recorded).
    #[derive(Debug, Default)]
    struct RecordingStorageAdapter {
        inner: LocalStorageAdapter,
        reads: Mutex<Vec<String>>,
        writes: Mutex<Vec<String>>,
        exists_checks: Mutex<Vec<String>>,
        renames: Mutex<Vec<(String, String)>>,
        deletes: Mutex<Vec<String>>,
    }

    impl RecordingStorageAdapter {
        /// Snapshot of the URIs read so far.
        fn reads(&self) -> Vec<String> {
            self.reads.lock().unwrap().clone()
        }

        /// Snapshot of the URIs written so far (both write methods).
        fn writes(&self) -> Vec<String> {
            self.writes.lock().unwrap().clone()
        }

        /// Snapshot of the URIs checked with `exists` so far.
        fn exists_checks(&self) -> Vec<String> {
            self.exists_checks.lock().unwrap().clone()
        }
    }

    #[async_trait]
    impl StorageAdapter for RecordingStorageAdapter {
        async fn read_text(&self, uri: &str) -> Result<String> {
            self.reads.lock().unwrap().push(uri.to_string());
            self.inner.read_text(uri).await
        }

        async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
            self.writes.lock().unwrap().push(uri.to_string());
            self.inner.write_text(uri, contents).await
        }

        async fn write_text_if_absent(&self, uri: &str, contents: &str) -> Result<bool> {
            self.writes.lock().unwrap().push(uri.to_string());
            self.inner.write_text_if_absent(uri, contents).await
        }

        async fn exists(&self, uri: &str) -> Result<bool> {
            self.exists_checks.lock().unwrap().push(uri.to_string());
            self.inner.exists(uri).await
        }

        async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
            self.renames
                .lock()
                .unwrap()
                .push((from_uri.to_string(), to_uri.to_string()));
            self.inner.rename_text(from_uri, to_uri).await
        }

        async fn delete(&self, uri: &str) -> Result<()> {
            self.deletes.lock().unwrap().push(uri.to_string());
            self.inner.delete(uri).await
        }

        async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
            self.inner.list_dir(dir_uri).await
        }
    }

    /// Storage adapter that forwards to a `LocalStorageAdapter`. After checking
    /// whether the schema-state file under `root` exists, it waits on a shared
    /// barrier, so two concurrent callers both finish that check before either
    /// proceeds.
    #[derive(Debug)]
    struct InitRaceStorageAdapter {
        inner: LocalStorageAdapter,
        root: String,
        barrier: Arc<tokio::sync::Barrier>,
    }

    #[async_trait]
    impl StorageAdapter for InitRaceStorageAdapter {
        async fn read_text(&self, uri: &str) -> Result<String> {
            self.inner.read_text(uri).await
        }

        async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
            self.inner.write_text(uri, contents).await
        }

        async fn write_text_if_absent(&self, uri: &str, contents: &str) -> Result<bool> {
            self.inner.write_text_if_absent(uri, contents).await
        }

        async fn exists(&self, uri: &str) -> Result<bool> {
            let exists = self.inner.exists(uri).await?;
            if uri == schema_state_uri(&self.root) {
                self.barrier.wait().await;
            }
            Ok(exists)
        }

        async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
            self.inner.rename_text(from_uri, to_uri).await
        }

        async fn delete(&self, uri: &str) -> Result<()> {
            self.inner.delete(uri).await
        }

        async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
            self.inner.list_dir(dir_uri).await
        }
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn concurrent_strict_init_does_not_delete_winning_schema_files() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap().to_string();
        let root = normalize_root_uri(&uri).unwrap();
        let storage: Arc<dyn StorageAdapter> = Arc::new(InitRaceStorageAdapter {
            inner: LocalStorageAdapter,
            root,
            barrier: Arc::new(tokio::sync::Barrier::new(2)),
        });

        let left = Omnigraph::init_with_storage(
            &uri,
            TEST_SCHEMA,
            Arc::clone(&storage),
            InitOptions::default(),
        );
        let right = Omnigraph::init_with_storage(
            &uri,
            TEST_SCHEMA,
            Arc::clone(&storage),
            InitOptions::default(),
        );
        let (left, right) = tokio::join!(left, right);
        let ok_count = usize::from(left.is_ok()) + usize::from(right.is_ok());
        assert_eq!(ok_count, 1, "exactly one concurrent init should win");

        assert!(
            dir.path().join("_schema.pg").exists(),
            "winning init must leave _schema.pg in place"
        );
        assert!(
            dir.path().join("_schema.ir.json").exists(),
            "winning init must leave _schema.ir.json in place"
        );
        assert!(
            dir.path().join("__schema_state.json").exists(),
            "winning init must leave __schema_state.json in place"
        );
    }

    #[tokio::test]
    async fn test_init_and_open_route_graph_metadata_through_storage_adapter() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let adapter = Arc::new(RecordingStorageAdapter::default());

        Omnigraph::init_with_storage(uri, TEST_SCHEMA, adapter.clone(), InitOptions::default())
            .await
            .unwrap();
        let writes = adapter.writes();
        assert!(writes.contains(&join_uri(uri, "_schema.pg")));
        assert!(writes.contains(&join_uri(uri, "_schema.ir.json")));
        assert!(writes.contains(&join_uri(uri, "__schema_state.json")));

        Omnigraph::open_with_storage(uri, adapter.clone())
            .await
            .unwrap();
        let reads = adapter.reads();
        assert!(reads.contains(&join_uri(uri, "_schema.pg")));
        assert!(reads.contains(&join_uri(uri, "_schema.ir.json")));
        assert!(reads.contains(&join_uri(uri, "__schema_state.json")));

        let exists_checks = adapter.exists_checks();
        assert!(exists_checks.contains(&join_uri(uri, "_schema.ir.json")));
        assert!(exists_checks.contains(&join_uri(uri, "__schema_state.json")));
        assert!(exists_checks.contains(&join_uri(uri, "_graph_commits.lance")));
    }

    /// Reads every row of `table_key` at the current snapshot as JSON objects.
    async fn table_rows_json(db: &Omnigraph, table_key: &str) -> Vec<Value> {
        let snapshot = db.snapshot().await;
        let ds = snapshot.open(table_key).await.unwrap();
        let batches = db.table_store().scan_batches(&ds).await.unwrap();
        batches
            .into_iter()
            .flat_map(|batch| {
                (0..batch.num_rows())
                    .map(|row| record_batch_row_to_json(&batch, row).unwrap())
                    .collect::<Vec<_>>()
            })
            .collect()
    }

    /// Appends one `Person` row (`id` and `name` both set to `name`) and commits
    /// it. Any other column is filled with nulls.
    async fn seed_person_row(db: &mut Omnigraph, name: &str, age: Option<i32>) {
        let (mut ds, full_path, table_branch) = db
            .open_for_mutation("node:Person", crate::db::MutationOpKind::Insert)
            .await
            .unwrap();
        let schema: Arc<Schema> = Arc::new(ds.schema().into());
        let columns: Vec<Arc<dyn Array>> = schema
            .fields()
            .iter()
            .map(|field| match field.name().as_str() {
                "id" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
                "name" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
                "age" => Arc::new(Int32Array::from(vec![age])) as Arc<dyn Array>,
                _ => new_null_array(field.data_type(), 1),
            })
            .collect();
        let batch = RecordBatch::try_new(Arc::clone(&schema), columns).unwrap();
        let state = db
            .table_store()
            .append_batch(&full_path, &mut ds, batch)
            .await
            .unwrap();
        db.commit_updates(&[crate::db::SubTableUpdate {
            table_key: "node:Person".to_string(),
            table_version: state.version,
            table_branch,
            row_count: state.row_count,
            version_metadata: state.version_metadata,
        }])
        .await
        .unwrap();
    }

    #[tokio::test]
    async fn test_apply_schema_adds_nullable_property_and_preserves_rows() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;

        let desired = TEST_SCHEMA.replace(
            "    age: I32?\n}",
            "    age: I32?\n    nickname: String?\n}",
        );
        let result = db.apply_schema(&desired).await.unwrap();
        assert!(result.applied);

        let reopened = Omnigraph::open(uri).await.unwrap();
        let rows = table_rows_json(&reopened, "node:Person").await;
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0]["name"], "Alice");
        assert_eq!(rows[0]["age"], 30);
        assert!(rows[0]["nickname"].is_null());
        assert!(
            reopened.catalog().node_types["Person"]
                .properties
                .contains_key("nickname")
        );
        assert!(dir.path().join("_schema.pg").exists());
    }

    #[tokio::test]
    async fn test_apply_schema_renames_property_and_preserves_values() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;

        let desired = TEST_SCHEMA.replace(
            "    age: I32?\n}",
            "    years: I32? @rename_from(\"age\")\n}",
        );
        db.apply_schema(&desired).await.unwrap();

        let reopened = Omnigraph::open(uri).await.unwrap();
        let rows = table_rows_json(&reopened, "node:Person").await;
        // Check the row count first so a lost row fails with a clear message
        // instead of an index-out-of-bounds panic.
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0]["name"], "Alice");
        assert_eq!(rows[0]["years"], 30);
        assert!(rows[0].get("age").is_none());
    }

    #[tokio::test]
    async fn test_apply_schema_renames_type_and_preserves_historical_snapshot() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;
        let before_version = db.snapshot().await.version();

        let desired = TEST_SCHEMA
            .replace("node Person {\n", "node Human @rename_from(\"Person\") {\n")
            .replace("edge Knows: Person -> Person", "edge Knows: Human -> Human")
            .replace(
                "edge WorksAt: Person -> Company",
                "edge WorksAt: Human -> Company",
            );
        db.apply_schema(&desired).await.unwrap();

        let head = db.snapshot().await;
        assert!(head.entry("node:Person").is_none());
        assert!(head.entry("node:Human").is_some());
        let historical = ManifestCoordinator::snapshot_at(uri, None, before_version)
            .await
            .unwrap();
        assert!(historical.entry("node:Person").is_some());
        assert!(historical.entry("node:Human").is_none());
    }

    #[tokio::test]
    async fn test_apply_schema_succeeds_after_load() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        crate::loader::load_jsonl(
            &mut db,
            r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#,
            crate::loader::LoadMode::Overwrite,
        )
        .await
        .unwrap();

        let all_branches = db.coordinator.read().await.all_branches().await.unwrap();
        assert!(
            !all_branches.iter().any(|b| is_internal_run_branch(b)),
            "run branch should be deleted after publish, got: {:?}",
            all_branches
        );

        let desired = TEST_SCHEMA.replace(
            "    age: I32?\n}",
            "    age: I32?\n    nickname: String?\n}",
        );
        let result = db.apply_schema(&desired).await.unwrap();
        assert!(result.applied, "schema apply should have applied");
    }

    #[tokio::test]
    async fn test_apply_schema_adds_index_for_existing_property() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let desired = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
        db.apply_schema(&desired).await.unwrap();

        let snapshot = db.snapshot().await;
        let ds = snapshot.open("node:Person").await.unwrap();
        assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
    }

    #[tokio::test]
    async fn test_apply_schema_rewrite_preserves_existing_indices() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let initial_schema = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
        let mut db = Omnigraph::init(uri, &initial_schema).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;

        let desired = initial_schema.replace(
            "    age: I32?\n}",
            "    age: I32?\n    nickname: String?\n}",
        );
        db.apply_schema(&desired).await.unwrap();

        let snapshot = db.snapshot().await;
        let ds = snapshot.open("node:Person").await.unwrap();
        assert!(db.table_store().has_btree_index(&ds, "id").await.unwrap());
        assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
    }

    #[tokio::test]
    async fn test_open_for_mutation_rejects_while_schema_apply_locked() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        db.coordinator
            .write()
            .await
            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
            .await
            .unwrap();

        let err = db
            .open_for_mutation("node:Person", crate::db::MutationOpKind::Insert)
            .await
            .unwrap_err();
        assert!(
            err.to_string()
                .contains("write is unavailable while schema apply is in progress")
        );
    }

    #[tokio::test]
    async fn test_commit_updates_rejects_while_schema_apply_locked() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        db.coordinator
            .write()
            .await
            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
            .await
            .unwrap();

        let err = db.commit_updates(&[]).await.unwrap_err();
        assert!(
            err.to_string()
                .contains("write commit is unavailable while schema apply is in progress")
        );
    }

    #[tokio::test]
    async fn test_branch_list_hides_schema_apply_lock_branch() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        db.coordinator
            .write()
            .await
            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
            .await
            .unwrap();

        let branches = db.branch_list().await.unwrap();
        assert_eq!(branches, vec!["main".to_string()]);
    }
}