1use std::collections::{BTreeSet, HashMap, HashSet};
2use std::io::Write;
3use std::sync::Arc;
4
5use arc_swap::ArcSwap;
6use arrow_array::{
7 Array, BinaryArray, BooleanArray, Date32Array, FixedSizeListArray, Float32Array, Float64Array,
8 Int32Array, Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray,
9 RecordBatch, StringArray, StructArray, UInt32Array, UInt64Array, new_null_array,
10};
11use arrow_schema::{DataType, Field, Schema};
12use lance::Dataset;
13use lance::blob::{BlobArrayBuilder, blob_field};
14use lance::dataset::BlobFile;
15use lance::dataset::scanner::ColumnOrdering;
16use lance::datatypes::BlobKind;
17use omnigraph_compiler::catalog::{Catalog, EdgeType, NodeType};
18use omnigraph_compiler::schema::parser::parse_schema;
19use omnigraph_compiler::types::ScalarType;
20use omnigraph_compiler::{
21 DropMode, SchemaIR, SchemaMigrationPlan, SchemaMigrationStep, SchemaTypeKind,
22 build_catalog_from_ir, build_schema_ir, plan_schema_migration,
23};
24
25use crate::db::graph_coordinator::{GraphCoordinator, PublishedSnapshot};
26use crate::error::{OmniError, Result};
27use crate::runtime_cache::RuntimeCache;
28use crate::storage::{StorageAdapter, join_uri, normalize_root_uri, storage_for_uri};
29use crate::table_store::TableStore;
30
31mod export;
32mod optimize;
33mod schema_apply;
34mod table_ops;
35
36pub use optimize::{CleanupPolicyOptions, TableCleanupStats, TableOptimizeStats};
37pub use schema_apply::SchemaApplyOptions;
38
39use super::commit_graph::GraphCommit;
40use super::manifest::{
41 ManifestChange, Snapshot, SubTableEntry, TableRegistration, TableTombstone,
42 table_path_for_table_key,
43};
44use super::schema_state::{
45 SCHEMA_SOURCE_FILENAME, load_or_bootstrap_schema_contract, read_accepted_schema_ir,
46 recover_schema_state_files, schema_ir_staging_uri, schema_ir_uri, schema_source_staging_uri,
47 schema_source_uri, schema_state_staging_uri, schema_state_uri, validate_schema_contract,
48 write_schema_contract, write_schema_contract_staging,
49};
50use super::{
51 ReadTarget, ResolvedTarget, SCHEMA_APPLY_LOCK_BRANCH, SnapshotId, is_internal_system_branch,
52 is_schema_apply_lock_branch,
53};
54
/// How a branch merge concluded.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MergeOutcome {
    /// Nothing needed to be merged.
    AlreadyUpToDate,
    /// The target was advanced without creating a merge commit (presumably because it
    /// was an ancestor of the source — confirm against the merge implementation).
    FastForward,
    /// A merge of diverged histories was performed.
    Merged,
}
61
62#[derive(Debug, Clone)]
63pub struct SchemaApplyResult {
64 pub supported: bool,
65 pub applied: bool,
66 pub manifest_version: u64,
67 pub steps: Vec<SchemaMigrationStep>,
68}
69
70pub struct Omnigraph {
76 root_uri: String,
77 storage: Arc<dyn StorageAdapter>,
78 coordinator: Arc<tokio::sync::RwLock<GraphCoordinator>>,
95 table_store: TableStore,
96 runtime_cache: RuntimeCache,
97 catalog: Arc<ArcSwap<Catalog>>,
102 schema_source: Arc<ArcSwap<String>>,
105 write_queue: Arc<crate::db::write_queue::WriteQueueManager>,
110 merge_exclusive: Arc<tokio::sync::Mutex<()>>,
132 policy: Option<Arc<dyn omnigraph_policy::PolicyChecker>>,
148}
149
/// Access mode requested when opening an existing graph.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OpenMode {
    /// Opening runs schema-state and manifest-drift recovery before returning.
    ReadWrite,
    /// Opening skips all recovery and leaves on-disk state untouched.
    ReadOnly,
}
167
/// Knobs for [`Omnigraph::init_with_options`].
#[derive(Debug, Clone, Copy, Default)]
pub struct InitOptions {
    /// When `true`, skip the "already initialized" checks and do not claim the schema
    /// source file up front. Defaults to `false`.
    pub force: bool,
}
186
187impl Omnigraph {
188 pub async fn init(uri: &str, schema_source: &str) -> Result<Self> {
195 Self::init_with_options(uri, schema_source, InitOptions::default()).await
196 }
197
198 pub async fn init_with_options(
203 uri: &str,
204 schema_source: &str,
205 options: InitOptions,
206 ) -> Result<Self> {
207 Self::init_with_storage(uri, schema_source, storage_for_uri(uri)?, options).await
208 }
209
210 pub(crate) async fn init_with_storage(
211 uri: &str,
212 schema_source: &str,
213 storage: Arc<dyn StorageAdapter>,
214 options: InitOptions,
215 ) -> Result<Self> {
216 let root = normalize_root_uri(uri)?;
217
218 if !options.force {
228 for candidate in [
229 schema_source_uri(&root),
230 schema_ir_uri(&root),
231 schema_state_uri(&root),
232 ] {
233 if storage.exists(&candidate).await? {
234 return Err(OmniError::AlreadyInitialized { uri: root.clone() });
235 }
236 }
237 }
238
239 let schema_ir = read_schema_ir_from_source(schema_source)?;
240 let mut catalog = build_catalog_from_ir(&schema_ir)?;
241 fixup_blob_schemas(&mut catalog);
242
243 let schema_pg_claimed = if options.force {
251 false
252 } else {
253 let schema_path = join_uri(&root, SCHEMA_SOURCE_FILENAME);
254 if !storage
255 .write_text_if_absent(&schema_path, schema_source)
256 .await?
257 {
258 return Err(OmniError::AlreadyInitialized { uri: root.clone() });
259 }
260 if let Err(err) = crate::failpoints::maybe_fail("init.after_schema_pg_written") {
261 best_effort_cleanup_init_artifacts(&root, storage.as_ref()).await;
262 return Err(err);
263 }
264 true
265 };
266
267 let coordinator = match init_storage_phase(
282 &root,
283 schema_source,
284 &schema_ir,
285 &catalog,
286 &storage,
287 !schema_pg_claimed,
288 )
289 .await
290 {
291 Ok(coordinator) => coordinator,
292 Err(err) => {
293 if schema_pg_claimed || options.force {
294 best_effort_cleanup_init_artifacts(&root, storage.as_ref()).await;
295 }
296 return Err(err);
297 }
298 };
299
300 Ok(Self {
301 root_uri: root.clone(),
302 storage,
303 coordinator: Arc::new(tokio::sync::RwLock::new(coordinator)),
304 table_store: TableStore::new(&root),
305 runtime_cache: RuntimeCache::default(),
306 catalog: Arc::new(ArcSwap::from_pointee(catalog)),
307 schema_source: Arc::new(ArcSwap::from_pointee(schema_source.to_string())),
308 write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()),
309 merge_exclusive: Arc::new(tokio::sync::Mutex::new(())),
310 policy: None,
311 })
312 }
313
314 pub async fn open(uri: &str) -> Result<Self> {
319 Self::open_with_storage_and_mode(uri, storage_for_uri(uri)?, OpenMode::ReadWrite).await
320 }
321
322 pub async fn open_read_only(uri: &str) -> Result<Self> {
325 Self::open_with_storage_and_mode(uri, storage_for_uri(uri)?, OpenMode::ReadOnly).await
326 }
327
328 pub(crate) async fn open_with_storage(
331 uri: &str,
332 storage: Arc<dyn StorageAdapter>,
333 ) -> Result<Self> {
334 Self::open_with_storage_and_mode(uri, storage, OpenMode::ReadWrite).await
335 }
336
337 pub(crate) async fn open_with_storage_and_mode(
338 uri: &str,
339 storage: Arc<dyn StorageAdapter>,
340 mode: OpenMode,
341 ) -> Result<Self> {
342 let root = normalize_root_uri(uri)?;
343 let mut coordinator = GraphCoordinator::open(&root, Arc::clone(&storage)).await?;
346 if matches!(mode, OpenMode::ReadWrite) {
356 let schema_state_recovery =
357 recover_schema_state_files(&root, Arc::clone(&storage), &coordinator.snapshot())
358 .await?;
359 crate::db::manifest::recover_manifest_drift(
365 &root,
366 Arc::clone(&storage),
367 &mut coordinator,
368 crate::db::manifest::RecoveryMode::Full,
369 schema_state_recovery,
370 )
371 .await?;
372 }
373 let schema_path = schema_source_uri(&root);
375 let schema_source = storage.read_text(&schema_path).await?;
376 let current_source_ir = read_schema_ir_from_source(&schema_source)?;
377 let branches = coordinator.branch_list().await?;
378 let (accepted_ir, _) = load_or_bootstrap_schema_contract(
379 &root,
380 Arc::clone(&storage),
381 &branches,
382 ¤t_source_ir,
383 )
384 .await?;
385 let mut catalog = build_catalog_from_ir(&accepted_ir)?;
386 fixup_blob_schemas(&mut catalog);
387
388 Ok(Self {
389 root_uri: root.clone(),
390 storage,
391 coordinator: Arc::new(tokio::sync::RwLock::new(coordinator)),
392 table_store: TableStore::new(&root),
393 runtime_cache: RuntimeCache::default(),
394 catalog: Arc::new(ArcSwap::from_pointee(catalog)),
395 schema_source: Arc::new(ArcSwap::from_pointee(schema_source)),
396 write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()),
397 merge_exclusive: Arc::new(tokio::sync::Mutex::new(())),
398 policy: None,
399 })
400 }
401
402 pub fn catalog(&self) -> Arc<Catalog> {
406 self.catalog.load_full()
407 }
408
409 pub fn schema_source(&self) -> Arc<String> {
411 self.schema_source.load_full()
412 }
413
414 pub(crate) fn store_catalog(&self, catalog: Catalog) {
418 self.catalog.store(Arc::new(catalog));
419 }
420
421 pub(crate) fn store_schema_source(&self, schema_source: String) {
424 self.schema_source.store(Arc::new(schema_source));
425 }
426
427 pub fn uri(&self) -> &str {
428 &self.root_uri
429 }
430
431 pub fn with_policy(mut self, checker: Arc<dyn omnigraph_policy::PolicyChecker>) -> Self {
442 self.policy = Some(checker);
443 self
444 }
445
446 pub(crate) fn enforce(
458 &self,
459 action: omnigraph_policy::PolicyAction,
460 scope: &omnigraph_policy::ResourceScope,
461 actor: Option<&str>,
462 ) -> Result<()> {
463 let Some(checker) = self.policy.as_ref() else {
464 return Ok(());
465 };
466 let Some(actor) = actor else {
467 return Err(OmniError::Policy(
468 "no actor for engine-layer policy check (policy is configured but the call site \
469 didn't thread an actor through — this is almost certainly a bug, not an \
470 intended bypass)"
471 .to_string(),
472 ));
473 };
474 checker
475 .check(action, scope, actor)
476 .map_err(|err| OmniError::Policy(err.to_string()))
477 }
478
479 pub(crate) async fn ensure_schema_state_valid(&self) -> Result<()> {
480 validate_schema_contract(self.uri(), Arc::clone(&self.storage)).await
481 }
482
483 pub async fn plan_schema(&self, desired_schema_source: &str) -> Result<SchemaMigrationPlan> {
484 self.plan_schema_with_options(desired_schema_source, SchemaApplyOptions::default())
485 .await
486 }
487
488 pub async fn plan_schema_with_options(
489 &self,
490 desired_schema_source: &str,
491 options: SchemaApplyOptions,
492 ) -> Result<SchemaMigrationPlan> {
493 schema_apply::plan_schema(self, desired_schema_source, options).await
494 }
495
496 pub async fn apply_schema(&self, desired_schema_source: &str) -> Result<SchemaApplyResult> {
497 self.apply_schema_as(desired_schema_source, SchemaApplyOptions::default(), None)
498 .await
499 }
500
501 pub async fn apply_schema_with_options(
502 &self,
503 desired_schema_source: &str,
504 options: SchemaApplyOptions,
505 ) -> Result<SchemaApplyResult> {
506 self.apply_schema_as(desired_schema_source, options, None)
507 .await
508 }
509
510 pub async fn apply_schema_as(
521 &self,
522 desired_schema_source: &str,
523 options: SchemaApplyOptions,
524 actor: Option<&str>,
525 ) -> Result<SchemaApplyResult> {
526 schema_apply::apply_schema(self, desired_schema_source, options, actor).await
527 }
528
529 pub(crate) async fn ensure_schema_apply_idle(&self, operation: &str) -> Result<()> {
530 schema_apply::ensure_schema_apply_idle(self, operation).await
531 }
532
533 async fn ensure_schema_apply_not_locked(&self, operation: &str) -> Result<()> {
534 schema_apply::ensure_schema_apply_not_locked(self, operation).await
535 }
536
537 pub(crate) fn table_store(&self) -> &TableStore {
538 &self.table_store
539 }
540
541 pub(crate) fn storage(&self) -> &dyn crate::storage_layer::TableStorage {
550 &self.table_store
551 }
552
553 pub(crate) fn storage_adapter(&self) -> &dyn crate::storage::StorageAdapter {
557 self.storage.as_ref()
558 }
559
560 pub(crate) fn write_queue(&self) -> Arc<crate::db::write_queue::WriteQueueManager> {
568 Arc::clone(&self.write_queue)
569 }
570
571 pub(crate) fn merge_exclusive(&self) -> Arc<tokio::sync::Mutex<()>> {
577 Arc::clone(&self.merge_exclusive)
578 }
579
580 pub(crate) fn root_uri(&self) -> &str {
583 &self.root_uri
584 }
585
586 pub(crate) async fn open_coordinator_for_branch(
587 &self,
588 branch: Option<&str>,
589 ) -> Result<GraphCoordinator> {
590 match branch {
591 Some(branch) => {
592 GraphCoordinator::open_branch(self.uri(), branch, Arc::clone(&self.storage)).await
593 }
594 None => GraphCoordinator::open(self.uri(), Arc::clone(&self.storage)).await,
595 }
596 }
597
598 pub(crate) async fn swap_coordinator_for_branch(
599 &self,
600 branch: Option<&str>,
601 ) -> Result<GraphCoordinator> {
602 let next = self.open_coordinator_for_branch(branch).await?;
603 let mut coord = self.coordinator.write().await;
604 Ok(std::mem::replace(&mut *coord, next))
605 }
606
607 pub(crate) async fn restore_coordinator(&self, coordinator: GraphCoordinator) {
608 *self.coordinator.write().await = coordinator;
609 }
610
611 pub(crate) async fn resolved_branch_target(
612 &self,
613 branch: Option<&str>,
614 ) -> Result<ResolvedTarget> {
615 self.ensure_schema_state_valid().await?;
616 let requested = ReadTarget::Branch(branch.unwrap_or("main").to_string());
617 let normalized = normalize_branch_name(branch.unwrap_or("main"))?;
618 let coord = self.coordinator.read().await;
619 if normalized.as_deref() == coord.current_branch() {
620 let snapshot_id = coord
621 .head_commit_id()
622 .await?
623 .unwrap_or_else(|| SnapshotId::synthetic(coord.current_branch(), coord.version()));
624 return Ok(ResolvedTarget {
625 requested,
626 branch: coord.current_branch().map(str::to_string),
627 snapshot_id,
628 snapshot: coord.snapshot(),
629 });
630 }
631 coord.resolve_target(&requested).await
632 }
633
634 pub(crate) async fn snapshot_for_branch(&self, branch: Option<&str>) -> Result<Snapshot> {
635 self.resolved_branch_target(branch)
636 .await
637 .map(|resolved| resolved.snapshot)
638 }
639
640 pub(crate) async fn version(&self) -> u64 {
641 self.coordinator.read().await.version()
642 }
643
644 pub(crate) async fn snapshot(&self) -> Snapshot {
646 self.coordinator.read().await.snapshot()
647 }
648
649 pub async fn snapshot_of(&self, target: impl Into<ReadTarget>) -> Result<Snapshot> {
650 self.resolved_target(target)
651 .await
652 .map(|resolved| resolved.snapshot)
653 }
654
655 pub async fn version_of(&self, target: impl Into<ReadTarget>) -> Result<u64> {
656 self.snapshot_of(target)
657 .await
658 .map(|snapshot| snapshot.version())
659 }
660
661 pub async fn resolved_branch_of(
662 &self,
663 target: impl Into<ReadTarget>,
664 ) -> Result<Option<String>> {
665 self.resolved_target(target)
666 .await
667 .map(|resolved| resolved.branch)
668 }
669
670 pub async fn sync_branch(&self, branch: &str) -> Result<()> {
672 self.ensure_schema_state_valid().await?;
673 let branch = normalize_branch_name(branch)?;
674 let next = self.open_coordinator_for_branch(branch.as_deref()).await?;
675 *self.coordinator.write().await = next;
676 self.runtime_cache.invalidate_all().await;
677 Ok(())
678 }
679
680 pub async fn refresh(&self) -> Result<()> {
713 {
720 let mut coord = self.coordinator.write().await;
721 coord.refresh().await?;
722 let schema_state_recovery = recover_schema_state_files(
723 &self.root_uri,
724 Arc::clone(&self.storage),
725 &coord.snapshot(),
726 )
727 .await?;
728 crate::db::manifest::recover_manifest_drift(
729 &self.root_uri,
730 Arc::clone(&self.storage),
731 &mut *coord,
732 crate::db::manifest::RecoveryMode::RollForwardOnly,
733 schema_state_recovery,
734 )
735 .await?;
736 } self.reload_schema_if_source_changed().await?;
738 self.runtime_cache.invalidate_all().await;
739 Ok(())
740 }
741
742 async fn reload_schema_if_source_changed(&self) -> Result<()> {
743 let schema_path = schema_source_uri(&self.root_uri);
744 let schema_source = self.storage.read_text(&schema_path).await?;
745 if schema_source == *self.schema_source.load_full() {
746 return Ok(());
747 }
748 let current_source_ir = read_schema_ir_from_source(&schema_source)?;
749 let branches = self.coordinator.read().await.branch_list().await?;
750 let (accepted_ir, _) = load_or_bootstrap_schema_contract(
751 &self.root_uri,
752 Arc::clone(&self.storage),
753 &branches,
754 ¤t_source_ir,
755 )
756 .await?;
757 let mut catalog = build_catalog_from_ir(&accepted_ir)?;
758 fixup_blob_schemas(&mut catalog);
759 self.store_schema_source(schema_source);
760 self.store_catalog(catalog);
761 Ok(())
762 }
763
764 pub(crate) async fn refresh_coordinator_only(&self) -> Result<()> {
772 self.coordinator.write().await.refresh().await?;
773 self.runtime_cache.invalidate_all().await;
774 Ok(())
775 }
776
777 pub async fn resolve_snapshot(&self, branch: &str) -> Result<SnapshotId> {
778 self.ensure_schema_state_valid().await?;
779 self.coordinator
780 .read()
781 .await
782 .resolve_snapshot_id(branch)
783 .await
784 }
785
786 pub(crate) async fn resolved_target(
787 &self,
788 target: impl Into<ReadTarget>,
789 ) -> Result<ResolvedTarget> {
790 self.ensure_schema_state_valid().await?;
791 self.coordinator
792 .read()
793 .await
794 .resolve_target(&target.into())
795 .await
796 }
797
798 pub async fn diff_between(
801 &self,
802 from: impl Into<ReadTarget>,
803 to: impl Into<ReadTarget>,
804 filter: &crate::changes::ChangeFilter,
805 ) -> Result<crate::changes::ChangeSet> {
806 let from_resolved = self.resolved_target(from).await?;
807 let to_resolved = self.resolved_target(to).await?;
808 crate::changes::diff_snapshots(
809 self.uri(),
810 &from_resolved.snapshot,
811 &to_resolved.snapshot,
812 filter,
813 to_resolved.branch.clone().or(from_resolved.branch.clone()),
814 )
815 .await
816 }
817
818 pub async fn diff_commits(
821 &self,
822 from_commit_id: &str,
823 to_commit_id: &str,
824 filter: &crate::changes::ChangeFilter,
825 ) -> Result<crate::changes::ChangeSet> {
826 let coord = self.coordinator.read().await;
827 let from_commit = coord
828 .resolve_commit(&SnapshotId::new(from_commit_id))
829 .await?;
830 let to_commit = coord.resolve_commit(&SnapshotId::new(to_commit_id)).await?;
831 let from_snap = coord
832 .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
833 from_commit.graph_commit_id.clone(),
834 )))
835 .await?;
836 let to_snap = coord
837 .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
838 to_commit.graph_commit_id.clone(),
839 )))
840 .await?;
841 drop(coord);
842 crate::changes::diff_snapshots(
843 self.uri(),
844 &from_snap.snapshot,
845 &to_snap.snapshot,
846 filter,
847 to_snap.branch.clone().or(from_snap.branch.clone()),
848 )
849 .await
850 }
851
852 pub async fn entity_at_target(
853 &self,
854 target: impl Into<ReadTarget>,
855 table_key: &str,
856 id: &str,
857 ) -> Result<Option<serde_json::Value>> {
858 export::entity_at_target(self, target, table_key, id).await
859 }
860
861 pub async fn entity_at(
863 &self,
864 table_key: &str,
865 id: &str,
866 version: u64,
867 ) -> Result<Option<serde_json::Value>> {
868 export::entity_at(self, table_key, id, version).await
869 }
870
871 pub async fn snapshot_at_version(&self, version: u64) -> Result<Snapshot> {
873 self.ensure_schema_state_valid().await?;
874 self.coordinator
875 .read()
876 .await
877 .snapshot_at_version(version)
878 .await
879 }
880
881 pub async fn export_jsonl(
882 &self,
883 branch: &str,
884 type_names: &[String],
885 table_keys: &[String],
886 ) -> Result<String> {
887 export::export_jsonl(self, branch, type_names, table_keys).await
888 }
889
890 pub async fn export_jsonl_to_writer<W: Write>(
891 &self,
892 branch: &str,
893 type_names: &[String],
894 table_keys: &[String],
895 writer: &mut W,
896 ) -> Result<()> {
897 export::export_jsonl_to_writer(self, branch, type_names, table_keys, writer).await
898 }
899
900 pub async fn graph_index(&self) -> Result<Arc<crate::graph_index::GraphIndex>> {
904 table_ops::graph_index(self).await
905 }
906
907 pub(crate) async fn graph_index_for_resolved(
908 &self,
909 resolved: &ResolvedTarget,
910 ) -> Result<Arc<crate::graph_index::GraphIndex>> {
911 table_ops::graph_index_for_resolved(self, resolved).await
912 }
913
914 pub async fn ensure_indices(&self) -> Result<()> {
927 table_ops::ensure_indices(self).await
928 }
929
930 pub async fn ensure_indices_on(&self, branch: &str) -> Result<()> {
931 table_ops::ensure_indices_on(self, branch).await
932 }
933
934 #[cfg(feature = "failpoints")]
935 #[doc(hidden)]
936 pub async fn failpoint_publish_table_head_without_index_rebuild_for_test(
937 &mut self,
938 branch: &str,
939 table_key: &str,
940 table_branch: Option<&str>,
941 ) -> Result<u64> {
942 table_ops::failpoint_publish_table_head_without_index_rebuild_for_test(
943 self,
944 branch,
945 table_key,
946 table_branch,
947 )
948 .await
949 }
950
951 pub async fn optimize(&self) -> Result<Vec<optimize::TableOptimizeStats>> {
954 optimize::optimize_all_tables(self).await
955 }
956
957 pub async fn cleanup(
961 &mut self,
962 options: optimize::CleanupPolicyOptions,
963 ) -> Result<Vec<optimize::TableCleanupStats>> {
964 optimize::cleanup_all_tables(self, options).await
965 }
966
967 pub async fn read_blob(&self, type_name: &str, id: &str, property: &str) -> Result<BlobFile> {
977 self.ensure_schema_state_valid().await?;
978 let catalog = self.catalog();
979 let node_type = catalog
980 .node_types
981 .get(type_name)
982 .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
983 if !node_type.blob_properties.contains(property) {
984 return Err(OmniError::manifest(format!(
985 "property '{}' on type '{}' is not a Blob",
986 property, type_name
987 )));
988 }
989
990 let snapshot = self.snapshot().await;
991 let table_key = format!("node:{}", type_name);
992 let ds = snapshot.open(&table_key).await?;
993
994 let filter_sql = format!("id = '{}'", id.replace('\'', "''"));
995 let row_id = self
996 .table_store
997 .first_row_id_for_filter(&ds, &filter_sql)
998 .await?
999 .ok_or_else(|| {
1000 OmniError::manifest(format!("no {} with id '{}' found", type_name, id))
1001 })?;
1002
1003 let ds = Arc::new(ds);
1005 let mut blobs = ds
1006 .take_blobs(&[row_id], property)
1007 .await
1008 .map_err(|e| OmniError::Lance(e.to_string()))?;
1009
1010 blobs.pop().ok_or_else(|| {
1011 OmniError::manifest(format!(
1012 "blob '{}' on {} '{}' returned no data",
1013 property, type_name, id
1014 ))
1015 })
1016 }
1017
1018 pub(crate) async fn active_branch(&self) -> Option<String> {
1019 self.coordinator
1020 .read()
1021 .await
1022 .current_branch()
1023 .map(str::to_string)
1024 }
1025
1026 async fn ensure_branch_delete_safe(&self, branch: &str, branches: &[String]) -> Result<()> {
1027 let descendants = self
1028 .coordinator
1029 .read()
1030 .await
1031 .branch_descendants(branch)
1032 .await?;
1033 if let Some(descendant) = descendants.first() {
1034 return Err(OmniError::manifest_conflict(format!(
1035 "cannot delete branch '{}' because descendant branch '{}' still depends on it",
1036 branch, descendant
1037 )));
1038 }
1039
1040 for other_branch in branches
1041 .iter()
1042 .filter(|candidate| candidate.as_str() != branch)
1043 {
1044 let snapshot = self
1045 .snapshot_of(ReadTarget::branch(other_branch.as_str()))
1046 .await?;
1047 if snapshot
1048 .entries()
1049 .any(|entry| entry.table_branch.as_deref() == Some(branch))
1050 {
1051 return Err(OmniError::manifest_conflict(format!(
1052 "cannot delete branch '{}' because branch '{}' still depends on it",
1053 branch, other_branch
1054 )));
1055 }
1056 }
1057
1058 Ok(())
1059 }
1060
1061 async fn cleanup_deleted_branch_tables(
1062 &self,
1063 branch: &str,
1064 owned_tables: &[(String, String)],
1065 ) -> Result<()> {
1066 let mut seen_paths = HashSet::new();
1067 let mut cleanup_targets = owned_tables
1068 .iter()
1069 .filter(|(_, table_path)| seen_paths.insert(table_path.clone()))
1070 .cloned()
1071 .collect::<Vec<_>>();
1072 cleanup_targets.sort_by(|left, right| left.0.cmp(&right.0));
1073
1074 for (table_key, table_path) in cleanup_targets {
1075 let dataset_uri = self.table_store.dataset_uri(&table_path);
1076 if let Err(err) = self.table_store.delete_branch(&dataset_uri, branch).await {
1077 return Err(OmniError::manifest_internal(format!(
1078 "branch '{}' was deleted but cleanup failed for {}: {}",
1079 branch, table_key, err
1080 )));
1081 }
1082 }
1083
1084 Ok(())
1085 }
1086
1087 async fn delete_branch_storage_only(&self, branch: &str) -> Result<()> {
1088 let active = self
1089 .coordinator
1090 .read()
1091 .await
1092 .current_branch()
1093 .map(str::to_string);
1094 if active.as_deref() == Some(branch) {
1095 return Err(OmniError::manifest_conflict(format!(
1096 "cannot delete currently active branch '{}'",
1097 branch
1098 )));
1099 }
1100
1101 let branch_snapshot = self.snapshot_of(ReadTarget::branch(branch)).await?;
1102 let owned_tables = branch_snapshot
1103 .entries()
1104 .filter(|entry| entry.table_branch.as_deref() == Some(branch))
1105 .map(|entry| (entry.table_key.clone(), entry.table_path.clone()))
1106 .collect::<Vec<_>>();
1107
1108 self.coordinator.write().await.branch_delete(branch).await?;
1109 self.cleanup_deleted_branch_tables(branch, &owned_tables)
1110 .await
1111 }
1112
1113 pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
1114 normalize_branch_name(branch)
1115 }
1116
1117 pub(crate) async fn head_commit_id_for_branch(
1118 &self,
1119 branch: Option<&str>,
1120 ) -> Result<Option<String>> {
1121 let mut coordinator = self.open_coordinator_for_branch(branch).await?;
1122 coordinator.ensure_commit_graph_initialized().await?;
1123 coordinator
1124 .head_commit_id()
1125 .await
1126 .map(|id| id.map(|snapshot_id| snapshot_id.as_str().to_string()))
1127 }
1128
1129 pub async fn branch_create(&self, name: &str) -> Result<()> {
1130 self.branch_create_as(name, None).await
1131 }
1132
1133 pub async fn branch_create_as(&self, name: &str, actor: Option<&str>) -> Result<()> {
1141 self.enforce(
1142 omnigraph_policy::PolicyAction::BranchCreate,
1143 &omnigraph_policy::ResourceScope::TargetBranch(name.to_string()),
1144 actor,
1145 )?;
1146 self.ensure_schema_state_valid().await?;
1147 self.ensure_schema_apply_idle("branch_create").await?;
1148 ensure_public_branch_ref(name, "branch_create")?;
1149 self.coordinator.write().await.branch_create(name).await
1150 }
1151
1152 pub async fn branch_create_from(&self, from: impl Into<ReadTarget>, name: &str) -> Result<()> {
1153 self.branch_create_from_as(from, name, None).await
1154 }
1155
1156 pub async fn branch_create_from_as(
1168 &self,
1169 from: impl Into<ReadTarget>,
1170 name: &str,
1171 actor: Option<&str>,
1172 ) -> Result<()> {
1173 let target = from.into();
1174 let source_branch = match &target {
1175 ReadTarget::Branch(b) => b.clone(),
1176 _ => "<snapshot>".to_string(),
1177 };
1178 self.enforce(
1179 omnigraph_policy::PolicyAction::BranchCreate,
1180 &omnigraph_policy::ResourceScope::BranchTransition {
1181 source: source_branch,
1182 target: name.to_string(),
1183 },
1184 actor,
1185 )?;
1186 self.ensure_schema_apply_idle("branch_create_from").await?;
1187 self.branch_create_from_impl(target, name, false).await
1188 }
1189
1190 async fn branch_create_from_impl(
1191 &self,
1192 from: impl Into<ReadTarget>,
1193 name: &str,
1194 allow_internal_refs: bool,
1195 ) -> Result<()> {
1196 let target = from.into();
1197 let ReadTarget::Branch(branch_name) = target else {
1198 return Err(OmniError::manifest(
1199 "branch creation from pinned snapshots is not supported yet".to_string(),
1200 ));
1201 };
1202 if !allow_internal_refs {
1203 ensure_public_branch_ref(&branch_name, "branch_create_from")?;
1204 ensure_public_branch_ref(name, "branch_create_from")?;
1205 }
1206 let branch = normalize_branch_name(&branch_name)?;
1207 let mut source_coord = self.open_coordinator_for_branch(branch.as_deref()).await?;
1224 source_coord.branch_create(name).await
1225 }
1226
1227 pub async fn branch_list(&self) -> Result<Vec<String>> {
1228 self.ensure_schema_state_valid().await?;
1229 self.coordinator.read().await.branch_list().await
1230 }
1231
1232 pub async fn branch_delete(&self, name: &str) -> Result<()> {
1233 self.branch_delete_as(name, None).await
1234 }
1235
1236 pub async fn branch_delete_as(&self, name: &str, actor: Option<&str>) -> Result<()> {
1244 self.enforce(
1245 omnigraph_policy::PolicyAction::BranchDelete,
1246 &omnigraph_policy::ResourceScope::TargetBranch(name.to_string()),
1247 actor,
1248 )?;
1249 self.ensure_schema_state_valid().await?;
1250 self.ensure_schema_apply_idle("branch_delete").await?;
1251 ensure_public_branch_ref(name, "branch_delete")?;
1252 self.refresh().await?;
1253 let branch = normalize_branch_name(name)?
1254 .ok_or_else(|| OmniError::manifest("cannot delete branch 'main'".to_string()))?;
1255 let branches = self.coordinator.read().await.branch_list().await?;
1256 if !branches.iter().any(|candidate| candidate == &branch) {
1257 return Err(OmniError::manifest_not_found(format!(
1258 "branch '{}' not found",
1259 branch
1260 )));
1261 }
1262
1263 self.ensure_branch_delete_safe(&branch, &branches).await?;
1264 self.delete_branch_storage_only(&branch).await
1265 }
1266
1267 pub async fn get_commit(&self, commit_id: &str) -> Result<GraphCommit> {
1268 self.ensure_schema_state_valid().await?;
1269 self.coordinator
1270 .read()
1271 .await
1272 .resolve_commit(&SnapshotId::new(commit_id))
1273 .await
1274 }
1275
1276 pub async fn list_commits(&self, branch: Option<&str>) -> Result<Vec<GraphCommit>> {
1277 self.ensure_schema_state_valid().await?;
1278 let branch = match branch {
1279 Some(branch) => normalize_branch_name(branch)?,
1280 None => None,
1281 };
1282 let coordinator = self.open_coordinator_for_branch(branch.as_deref()).await?;
1283 coordinator.list_commits().await
1284 }
1285
1286 pub(crate) async fn open_for_mutation(
1292 &self,
1293 table_key: &str,
1294 op_kind: crate::db::MutationOpKind,
1295 ) -> Result<(Dataset, String, Option<String>)> {
1296 table_ops::open_for_mutation(self, table_key, op_kind).await
1297 }
1298
1299 pub(crate) async fn open_for_mutation_on_branch(
1300 &self,
1301 branch: Option<&str>,
1302 table_key: &str,
1303 op_kind: crate::db::MutationOpKind,
1304 ) -> Result<(Dataset, String, Option<String>)> {
1305 table_ops::open_for_mutation_on_branch(self, branch, table_key, op_kind).await
1306 }
1307
1308 pub(crate) async fn fork_dataset_from_entry_state(
1309 &self,
1310 table_key: &str,
1311 full_path: &str,
1312 source_branch: Option<&str>,
1313 source_version: u64,
1314 active_branch: &str,
1315 ) -> Result<Dataset> {
1316 table_ops::fork_dataset_from_entry_state(
1317 self,
1318 table_key,
1319 full_path,
1320 source_branch,
1321 source_version,
1322 active_branch,
1323 )
1324 .await
1325 }
1326
1327 pub(crate) async fn reopen_for_mutation(
1328 &self,
1329 table_key: &str,
1330 full_path: &str,
1331 table_branch: Option<&str>,
1332 expected_version: u64,
1333 op_kind: crate::db::MutationOpKind,
1334 ) -> Result<Dataset> {
1335 table_ops::reopen_for_mutation(
1336 self,
1337 table_key,
1338 full_path,
1339 table_branch,
1340 expected_version,
1341 op_kind,
1342 )
1343 .await
1344 }
1345
1346 pub(crate) async fn open_dataset_at_state(
1347 &self,
1348 table_path: &str,
1349 table_branch: Option<&str>,
1350 table_version: u64,
1351 ) -> Result<Dataset> {
1352 table_ops::open_dataset_at_state(self, table_path, table_branch, table_version).await
1353 }
1354
1355 pub(crate) async fn build_indices_on_dataset(
1356 &self,
1357 table_key: &str,
1358 ds: &mut Dataset,
1359 ) -> Result<()> {
1360 table_ops::build_indices_on_dataset(self, table_key, ds).await
1361 }
1362
1363 pub(crate) async fn build_indices_on_dataset_for_catalog(
1364 &self,
1365 catalog: &Catalog,
1366 table_key: &str,
1367 ds: &mut Dataset,
1368 ) -> Result<()> {
1369 table_ops::build_indices_on_dataset_for_catalog(self, catalog, table_key, ds).await
1370 }
1371
1372 #[cfg(test)]
1375 pub(crate) async fn commit_updates(
1376 &mut self,
1377 updates: &[crate::db::SubTableUpdate],
1378 ) -> Result<u64> {
1379 table_ops::commit_updates(self, updates).await
1380 }
1381
1382 pub(crate) async fn commit_manifest_updates(
1383 &self,
1384 updates: &[crate::db::SubTableUpdate],
1385 ) -> Result<u64> {
1386 table_ops::commit_manifest_updates(self, updates).await
1387 }
1388
1389 pub(crate) async fn record_merge_commit(
1390 &self,
1391 manifest_version: u64,
1392 parent_commit_id: &str,
1393 merged_parent_commit_id: &str,
1394 actor_id: Option<&str>,
1395 ) -> Result<String> {
1396 table_ops::record_merge_commit(
1397 self,
1398 manifest_version,
1399 parent_commit_id,
1400 merged_parent_commit_id,
1401 actor_id,
1402 )
1403 .await
1404 }
1405
1406 pub(crate) async fn commit_updates_on_branch_with_expected(
1407 &self,
1408 branch: Option<&str>,
1409 updates: &[crate::db::SubTableUpdate],
1410 expected_table_versions: &std::collections::HashMap<String, u64>,
1411 actor_id: Option<&str>,
1412 ) -> Result<u64> {
1413 table_ops::commit_updates_on_branch_with_expected(
1414 self,
1415 branch,
1416 updates,
1417 expected_table_versions,
1418 actor_id,
1419 )
1420 .await
1421 }
1422
1423 pub(crate) async fn ensure_commit_graph_initialized(&self) -> Result<()> {
1424 table_ops::ensure_commit_graph_initialized(self).await
1425 }
1426
1427 pub(crate) async fn invalidate_graph_index(&self) {
1429 table_ops::invalidate_graph_index(self).await
1430 }
1431}
1432
1433pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
1434 let branch = branch.trim();
1435 if branch.is_empty() {
1436 return Err(OmniError::manifest(
1437 "branch name cannot be empty".to_string(),
1438 ));
1439 }
1440 if branch == "main" {
1441 return Ok(None);
1442 }
1443 Ok(Some(branch.to_string()))
1444}
1445
1446pub(crate) fn ensure_public_branch_ref(branch: &str, operation: &str) -> Result<()> {
1447 if super::is_internal_run_branch(branch) {
1448 return Err(OmniError::manifest(format!(
1449 "{} does not allow internal run ref '{}'",
1450 operation, branch
1451 )));
1452 }
1453 if is_internal_system_branch(branch) {
1454 return Err(OmniError::manifest(format!(
1455 "{} does not allow internal system ref '{}'",
1456 operation, branch
1457 )));
1458 }
1459 Ok(())
1460}
1461
1462fn concat_or_empty_batches(schema: Arc<Schema>, batches: Vec<RecordBatch>) -> Result<RecordBatch> {
1463 if batches.is_empty() {
1464 return Ok(RecordBatch::new_empty(schema));
1465 }
1466 if batches.len() == 1 {
1467 return Ok(batches.into_iter().next().unwrap());
1468 }
1469 let batch_schema = batches[0].schema();
1470 arrow_select::concat::concat_batches(&batch_schema, &batches)
1471 .map_err(|e| OmniError::Lance(e.to_string()))
1472}
1473
1474fn blob_properties_for_table_key<'a>(
1475 catalog: &'a Catalog,
1476 table_key: &str,
1477) -> Result<&'a std::collections::HashSet<String>> {
1478 if let Some(type_name) = table_key.strip_prefix("node:") {
1479 return catalog
1480 .node_types
1481 .get(type_name)
1482 .map(|node_type| &node_type.blob_properties)
1483 .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)));
1484 }
1485 if let Some(type_name) = table_key.strip_prefix("edge:") {
1486 return catalog
1487 .edge_types
1488 .get(type_name)
1489 .map(|edge_type| &edge_type.blob_properties)
1490 .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)));
1491 }
1492 Err(OmniError::manifest(format!(
1493 "invalid table key '{}'",
1494 table_key
1495 )))
1496}
1497
1498fn blob_description_is_null(descriptions: &StructArray, row: usize) -> Result<bool> {
1499 if descriptions.is_null(row) {
1500 return Ok(true);
1501 }
1502
1503 let kind = descriptions
1504 .column_by_name("kind")
1505 .and_then(|col| col.as_any().downcast_ref::<UInt32Array>())
1506 .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row) as u8))
1507 .or_else(|| {
1508 descriptions
1509 .column_by_name("kind")
1510 .and_then(|col| col.as_any().downcast_ref::<arrow_array::UInt8Array>())
1511 .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)))
1512 });
1513 let position = descriptions
1514 .column_by_name("position")
1515 .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
1516 .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1517 let size = descriptions
1518 .column_by_name("size")
1519 .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
1520 .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1521 let blob_uri = descriptions
1522 .column_by_name("blob_uri")
1523 .and_then(|col| col.as_any().downcast_ref::<StringArray>())
1524 .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1525
1526 let Some(kind) = kind else {
1527 return Ok(true);
1528 };
1529 let kind = BlobKind::try_from(kind).map_err(|e| OmniError::Lance(e.to_string()))?;
1530 if kind != BlobKind::Inline {
1531 return Ok(false);
1532 }
1533
1534 Ok(position.unwrap_or(0) == 0 && size.unwrap_or(0) == 0 && blob_uri.unwrap_or("").is_empty())
1535}
1536
1537fn fixup_blob_schemas(catalog: &mut Catalog) {
1543 for node_type in catalog.node_types.values_mut() {
1544 if node_type.blob_properties.is_empty() {
1545 continue;
1546 }
1547 let fields: Vec<Field> = node_type
1548 .arrow_schema
1549 .fields()
1550 .iter()
1551 .map(|f| {
1552 if node_type.blob_properties.contains(f.name()) {
1553 blob_field(f.name(), f.is_nullable())
1554 } else {
1555 f.as_ref().clone()
1556 }
1557 })
1558 .collect();
1559 node_type.arrow_schema = Arc::new(Schema::new(fields));
1560 }
1561 for edge_type in catalog.edge_types.values_mut() {
1562 if edge_type.blob_properties.is_empty() {
1563 continue;
1564 }
1565 let fields: Vec<Field> = edge_type
1566 .arrow_schema
1567 .fields()
1568 .iter()
1569 .map(|f| {
1570 if edge_type.blob_properties.contains(f.name()) {
1571 blob_field(f.name(), f.is_nullable())
1572 } else {
1573 f.as_ref().clone()
1574 }
1575 })
1576 .collect();
1577 edge_type.arrow_schema = Arc::new(Schema::new(fields));
1578 }
1579}
1580
1581fn read_schema_ir_from_source(schema_source: &str) -> Result<SchemaIR> {
1582 let schema_ast = parse_schema(schema_source)?;
1583 build_schema_ir(&schema_ast).map_err(|err| OmniError::manifest(err.to_string()))
1584}
1585
1586async fn init_storage_phase(
1601 root: &str,
1602 schema_source: &str,
1603 schema_ir: &SchemaIR,
1604 catalog: &Catalog,
1605 storage: &Arc<dyn StorageAdapter>,
1606 write_schema_pg: bool,
1607) -> Result<GraphCoordinator> {
1608 if write_schema_pg {
1609 let schema_path = join_uri(root, SCHEMA_SOURCE_FILENAME);
1610 storage.write_text(&schema_path, schema_source).await?;
1611 crate::failpoints::maybe_fail("init.after_schema_pg_written")?;
1612 }
1613
1614 write_schema_contract(root, storage.as_ref(), schema_ir).await?;
1615 crate::failpoints::maybe_fail("init.after_schema_contract_written")?;
1616
1617 let coordinator = GraphCoordinator::init(root, catalog, Arc::clone(storage)).await?;
1618 crate::failpoints::maybe_fail("init.after_coordinator_init")?;
1619
1620 Ok(coordinator)
1621}
1622
1623async fn best_effort_cleanup_init_artifacts(root: &str, storage: &dyn StorageAdapter) {
1635 for uri in [
1636 schema_source_uri(root),
1637 schema_ir_uri(root),
1638 schema_state_uri(root),
1639 ] {
1640 if let Err(err) = storage.delete(&uri).await {
1641 tracing::warn!(
1642 target: "omnigraph::init::cleanup",
1643 uri = %uri,
1644 error = %err,
1645 "init failed; best-effort cleanup could not delete artifact",
1646 );
1647 }
1648 }
1649}
1650
1651fn schema_table_key(type_kind: SchemaTypeKind, name: &str) -> String {
1652 match type_kind {
1653 SchemaTypeKind::Node => format!("node:{}", name),
1654 SchemaTypeKind::Edge => format!("edge:{}", name),
1655 SchemaTypeKind::Interface => unreachable!("interfaces do not map to tables"),
1656 }
1657}
1658
1659fn schema_for_table_key(catalog: &Catalog, table_key: &str) -> Result<Arc<Schema>> {
1660 if let Some(type_name) = table_key.strip_prefix("node:") {
1661 let node_type: &NodeType = catalog
1662 .node_types
1663 .get(type_name)
1664 .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
1665 return Ok(node_type.arrow_schema.clone());
1666 }
1667 if let Some(type_name) = table_key.strip_prefix("edge:") {
1668 let edge_type: &EdgeType = catalog
1669 .edge_types
1670 .get(type_name)
1671 .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)))?;
1672 return Ok(edge_type.arrow_schema.clone());
1673 }
1674 Err(OmniError::manifest(format!(
1675 "invalid table key '{}'",
1676 table_key
1677 )))
1678}
1679
1680fn record_batch_row_to_json(batch: &RecordBatch, row: usize) -> Result<serde_json::Value> {
1681 let mut obj = serde_json::Map::new();
1682 for (i, field) in batch.schema().fields().iter().enumerate() {
1683 obj.insert(
1684 field.name().clone(),
1685 json_value_from_array(batch.column(i).as_ref(), row)?,
1686 );
1687 }
1688 Ok(serde_json::Value::Object(obj))
1689}
1690
1691fn json_value_from_array(array: &dyn Array, row: usize) -> Result<serde_json::Value> {
1692 if array.is_null(row) {
1693 return Ok(serde_json::Value::Null);
1694 }
1695
1696 match array.data_type() {
1697 DataType::Utf8 => Ok(serde_json::Value::String(
1698 array
1699 .as_any()
1700 .downcast_ref::<StringArray>()
1701 .ok_or_else(|| OmniError::Lance("expected StringArray".to_string()))?
1702 .value(row)
1703 .to_string(),
1704 )),
1705 DataType::LargeUtf8 => Ok(serde_json::Value::String(
1706 array
1707 .as_any()
1708 .downcast_ref::<LargeStringArray>()
1709 .ok_or_else(|| OmniError::Lance("expected LargeStringArray".to_string()))?
1710 .value(row)
1711 .to_string(),
1712 )),
1713 DataType::Boolean => Ok(serde_json::Value::Bool(
1714 array
1715 .as_any()
1716 .downcast_ref::<BooleanArray>()
1717 .ok_or_else(|| OmniError::Lance("expected BooleanArray".to_string()))?
1718 .value(row),
1719 )),
1720 DataType::Int32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1721 array
1722 .as_any()
1723 .downcast_ref::<Int32Array>()
1724 .ok_or_else(|| OmniError::Lance("expected Int32Array".to_string()))?
1725 .value(row),
1726 ))),
1727 DataType::Int64 => Ok(serde_json::Value::Number(serde_json::Number::from(
1728 array
1729 .as_any()
1730 .downcast_ref::<Int64Array>()
1731 .ok_or_else(|| OmniError::Lance("expected Int64Array".to_string()))?
1732 .value(row),
1733 ))),
1734 DataType::UInt32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1735 array
1736 .as_any()
1737 .downcast_ref::<UInt32Array>()
1738 .ok_or_else(|| OmniError::Lance("expected UInt32Array".to_string()))?
1739 .value(row),
1740 ))),
1741 DataType::UInt64 => Ok(serde_json::Value::Number(serde_json::Number::from(
1742 array
1743 .as_any()
1744 .downcast_ref::<UInt64Array>()
1745 .ok_or_else(|| OmniError::Lance("expected UInt64Array".to_string()))?
1746 .value(row),
1747 ))),
1748 DataType::Float32 => {
1749 let value = array
1750 .as_any()
1751 .downcast_ref::<Float32Array>()
1752 .ok_or_else(|| OmniError::Lance("expected Float32Array".to_string()))?
1753 .value(row) as f64;
1754 Ok(serde_json::Value::Number(
1755 serde_json::Number::from_f64(value).ok_or_else(|| {
1756 OmniError::Lance(format!("cannot encode f32 value '{}' as JSON", value))
1757 })?,
1758 ))
1759 }
1760 DataType::Float64 => {
1761 let value = array
1762 .as_any()
1763 .downcast_ref::<Float64Array>()
1764 .ok_or_else(|| OmniError::Lance("expected Float64Array".to_string()))?
1765 .value(row);
1766 Ok(serde_json::Value::Number(
1767 serde_json::Number::from_f64(value).ok_or_else(|| {
1768 OmniError::Lance(format!("cannot encode f64 value '{}' as JSON", value))
1769 })?,
1770 ))
1771 }
1772 DataType::Date32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1773 array
1774 .as_any()
1775 .downcast_ref::<Date32Array>()
1776 .ok_or_else(|| OmniError::Lance("expected Date32Array".to_string()))?
1777 .value(row),
1778 ))),
1779 DataType::Binary => Ok(serde_json::Value::String(base64::Engine::encode(
1780 &base64::engine::general_purpose::STANDARD,
1781 array
1782 .as_any()
1783 .downcast_ref::<BinaryArray>()
1784 .ok_or_else(|| OmniError::Lance("expected BinaryArray".to_string()))?
1785 .value(row),
1786 ))),
1787 DataType::LargeBinary => Ok(serde_json::Value::String(base64::Engine::encode(
1788 &base64::engine::general_purpose::STANDARD,
1789 array
1790 .as_any()
1791 .downcast_ref::<LargeBinaryArray>()
1792 .ok_or_else(|| OmniError::Lance("expected LargeBinaryArray".to_string()))?
1793 .value(row),
1794 ))),
1795 DataType::List(_) => {
1796 let list = array
1797 .as_any()
1798 .downcast_ref::<ListArray>()
1799 .ok_or_else(|| OmniError::Lance("expected ListArray".to_string()))?;
1800 let values = list.value(row);
1801 let mut out = Vec::with_capacity(values.len());
1802 for idx in 0..values.len() {
1803 out.push(json_value_from_array(values.as_ref(), idx)?);
1804 }
1805 Ok(serde_json::Value::Array(out))
1806 }
1807 DataType::LargeList(_) => {
1808 let list = array
1809 .as_any()
1810 .downcast_ref::<LargeListArray>()
1811 .ok_or_else(|| OmniError::Lance("expected LargeListArray".to_string()))?;
1812 let values = list.value(row);
1813 let mut out = Vec::with_capacity(values.len());
1814 for idx in 0..values.len() {
1815 out.push(json_value_from_array(values.as_ref(), idx)?);
1816 }
1817 Ok(serde_json::Value::Array(out))
1818 }
1819 DataType::FixedSizeList(_, _) => {
1820 let list = array
1821 .as_any()
1822 .downcast_ref::<FixedSizeListArray>()
1823 .ok_or_else(|| OmniError::Lance("expected FixedSizeListArray".to_string()))?;
1824 let values = list.value(row);
1825 let mut out = Vec::with_capacity(values.len());
1826 for idx in 0..values.len() {
1827 out.push(json_value_from_array(values.as_ref(), idx)?);
1828 }
1829 Ok(serde_json::Value::Array(out))
1830 }
1831 DataType::Struct(fields) => {
1832 let struct_array = array
1833 .as_any()
1834 .downcast_ref::<StructArray>()
1835 .ok_or_else(|| OmniError::Lance("expected StructArray".to_string()))?;
1836 let mut obj = serde_json::Map::new();
1837 for (field_idx, field) in fields.iter().enumerate() {
1838 obj.insert(
1839 field.name().clone(),
1840 json_value_from_array(struct_array.column(field_idx).as_ref(), row)?,
1841 );
1842 }
1843 Ok(serde_json::Value::Object(obj))
1844 }
1845 _ => {
1846 let value = arrow_cast::display::array_value_to_string(array, row)
1847 .map_err(|e| OmniError::Lance(e.to_string()))?;
1848 Ok(serde_json::Value::String(value))
1849 }
1850 }
1851}
1852
#[cfg(test)]
mod tests {
    //! Tests for graph init/open, schema application (adding, renaming and
    //! indexing properties/types), and the schema-apply write lock.

    use super::*;
    use crate::db::is_internal_run_branch;
    use crate::db::manifest::ManifestCoordinator;
    use async_trait::async_trait;
    use serde_json::Value;
    use std::sync::{Arc, Mutex};

    use crate::storage::{LocalStorageAdapter, StorageAdapter, join_uri};

    /// Baseline schema shared by most tests. Several tests derive variants via
    /// `str::replace`, so its exact text (including indentation) matters.
    const TEST_SCHEMA: &str = r#"
node Person {
    name: String @key
    age: I32?
}
node Company {
    name: String @key
}
edge Knows: Person -> Person {
    since: Date?
}
edge WorksAt: Person -> Company
"#;

    /// Storage adapter that forwards to `LocalStorageAdapter` and records the
    /// URIs passed to each operation.
    #[derive(Debug, Default)]
    struct RecordingStorageAdapter {
        inner: LocalStorageAdapter,
        reads: Mutex<Vec<String>>,
        writes: Mutex<Vec<String>>,
        exists_checks: Mutex<Vec<String>>,
        renames: Mutex<Vec<(String, String)>>,
        deletes: Mutex<Vec<String>>,
    }

    impl RecordingStorageAdapter {
        fn reads(&self) -> Vec<String> {
            self.reads.lock().unwrap().clone()
        }

        fn writes(&self) -> Vec<String> {
            self.writes.lock().unwrap().clone()
        }

        fn exists_checks(&self) -> Vec<String> {
            self.exists_checks.lock().unwrap().clone()
        }
    }

    #[async_trait]
    impl StorageAdapter for RecordingStorageAdapter {
        async fn read_text(&self, uri: &str) -> Result<String> {
            self.reads.lock().unwrap().push(uri.to_string());
            self.inner.read_text(uri).await
        }

        async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
            self.writes.lock().unwrap().push(uri.to_string());
            self.inner.write_text(uri, contents).await
        }

        async fn write_text_if_absent(&self, uri: &str, contents: &str) -> Result<bool> {
            self.writes.lock().unwrap().push(uri.to_string());
            self.inner.write_text_if_absent(uri, contents).await
        }

        async fn exists(&self, uri: &str) -> Result<bool> {
            self.exists_checks.lock().unwrap().push(uri.to_string());
            self.inner.exists(uri).await
        }

        async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
            self.renames
                .lock()
                .unwrap()
                .push((from_uri.to_string(), to_uri.to_string()));
            self.inner.rename_text(from_uri, to_uri).await
        }

        async fn delete(&self, uri: &str) -> Result<()> {
            self.deletes.lock().unwrap().push(uri.to_string());
            self.inner.delete(uri).await
        }

        async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
            self.inner.list_dir(dir_uri).await
        }
    }

    /// Storage adapter that makes two concurrent callers rendezvous on a
    /// barrier right after checking whether the schema state file exists.
    ///
    /// This forces both inits past that existence check before either
    /// proceeds.
    #[derive(Debug)]
    struct InitRaceStorageAdapter {
        inner: LocalStorageAdapter,
        root: String,
        barrier: Arc<tokio::sync::Barrier>,
    }

    #[async_trait]
    impl StorageAdapter for InitRaceStorageAdapter {
        async fn read_text(&self, uri: &str) -> Result<String> {
            self.inner.read_text(uri).await
        }

        async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
            self.inner.write_text(uri, contents).await
        }

        async fn write_text_if_absent(&self, uri: &str, contents: &str) -> Result<bool> {
            self.inner.write_text_if_absent(uri, contents).await
        }

        async fn exists(&self, uri: &str) -> Result<bool> {
            let exists = self.inner.exists(uri).await?;
            if uri == schema_state_uri(&self.root) {
                self.barrier.wait().await;
            }
            Ok(exists)
        }

        async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
            self.inner.rename_text(from_uri, to_uri).await
        }

        async fn delete(&self, uri: &str) -> Result<()> {
            self.inner.delete(uri).await
        }

        async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
            self.inner.list_dir(dir_uri).await
        }
    }

    /// Two racing inits: exactly one must succeed. The loser's cleanup must not
    /// remove the winner's schema files.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn concurrent_strict_init_does_not_delete_winning_schema_files() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap().to_string();
        let root = normalize_root_uri(&uri).unwrap();
        let storage: Arc<dyn StorageAdapter> = Arc::new(InitRaceStorageAdapter {
            inner: LocalStorageAdapter,
            root,
            barrier: Arc::new(tokio::sync::Barrier::new(2)),
        });

        let left = Omnigraph::init_with_storage(
            &uri,
            TEST_SCHEMA,
            Arc::clone(&storage),
            InitOptions::default(),
        );
        let right = Omnigraph::init_with_storage(
            &uri,
            TEST_SCHEMA,
            Arc::clone(&storage),
            InitOptions::default(),
        );
        let (left, right) = tokio::join!(left, right);
        let ok_count = usize::from(left.is_ok()) + usize::from(right.is_ok());
        assert_eq!(ok_count, 1, "exactly one concurrent init should win");

        assert!(
            dir.path().join("_schema.pg").exists(),
            "winning init must leave _schema.pg in place"
        );
        assert!(
            dir.path().join("_schema.ir.json").exists(),
            "winning init must leave _schema.ir.json in place"
        );
        assert!(
            dir.path().join("__schema_state.json").exists(),
            "winning init must leave __schema_state.json in place"
        );
    }

    /// Init and open must route schema metadata reads, writes and existence
    /// checks through the supplied storage adapter.
    #[tokio::test]
    async fn test_init_and_open_route_graph_metadata_through_storage_adapter() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let adapter = Arc::new(RecordingStorageAdapter::default());

        Omnigraph::init_with_storage(uri, TEST_SCHEMA, adapter.clone(), InitOptions::default())
            .await
            .unwrap();
        assert!(adapter.writes().contains(&join_uri(uri, "_schema.pg")));
        assert!(adapter.writes().contains(&join_uri(uri, "_schema.ir.json")));
        assert!(
            adapter
                .writes()
                .contains(&join_uri(uri, "__schema_state.json"))
        );

        Omnigraph::open_with_storage(uri, adapter.clone())
            .await
            .unwrap();
        assert!(adapter.reads().contains(&join_uri(uri, "_schema.pg")));
        assert!(adapter.reads().contains(&join_uri(uri, "_schema.ir.json")));
        assert!(
            adapter
                .reads()
                .contains(&join_uri(uri, "__schema_state.json"))
        );
        assert!(
            adapter
                .exists_checks()
                .contains(&join_uri(uri, "_schema.ir.json"))
        );
        assert!(
            adapter
                .exists_checks()
                .contains(&join_uri(uri, "__schema_state.json"))
        );
        assert!(
            adapter
                .exists_checks()
                .contains(&join_uri(uri, "_graph_commits.lance"))
        );
    }

    /// Scans every row of `table_key` at the current snapshot and returns the
    /// rows as JSON objects.
    async fn table_rows_json(db: &Omnigraph, table_key: &str) -> Vec<Value> {
        let snapshot = db.snapshot().await;
        let ds = snapshot.open(table_key).await.unwrap();
        let batches = db.table_store().scan_batches(&ds).await.unwrap();
        batches
            .into_iter()
            .flat_map(|batch| {
                (0..batch.num_rows())
                    .map(|row| record_batch_row_to_json(&batch, row).unwrap())
                    .collect::<Vec<_>>()
            })
            .collect()
    }

    /// Appends one `Person` row (id and name both set to `name`) and commits it.
    /// Columns other than id/name/age are filled with nulls.
    async fn seed_person_row(db: &mut Omnigraph, name: &str, age: Option<i32>) {
        let (mut ds, full_path, table_branch) = db
            .open_for_mutation("node:Person", crate::db::MutationOpKind::Insert)
            .await
            .unwrap();
        let schema: Arc<Schema> = Arc::new(ds.schema().into());
        let columns: Vec<Arc<dyn Array>> = schema
            .fields()
            .iter()
            .map(|field| match field.name().as_str() {
                "id" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
                "name" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
                "age" => Arc::new(Int32Array::from(vec![age])) as Arc<dyn Array>,
                _ => new_null_array(field.data_type(), 1),
            })
            .collect();
        let batch = RecordBatch::try_new(Arc::clone(&schema), columns).unwrap();
        let state = db
            .table_store()
            .append_batch(&full_path, &mut ds, batch)
            .await
            .unwrap();
        db.commit_updates(&[crate::db::SubTableUpdate {
            table_key: "node:Person".to_string(),
            table_version: state.version,
            table_branch,
            row_count: state.row_count,
            version_metadata: state.version_metadata,
        }])
        .await
        .unwrap();
    }

    #[tokio::test]
    async fn test_apply_schema_adds_nullable_property_and_preserves_rows() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;

        let desired = TEST_SCHEMA.replace(
            "    age: I32?\n}",
            "    age: I32?\n    nickname: String?\n}",
        );
        let result = db.apply_schema(&desired).await.unwrap();
        assert!(result.applied);

        let reopened = Omnigraph::open(uri).await.unwrap();
        let rows = table_rows_json(&reopened, "node:Person").await;
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0]["name"], "Alice");
        assert_eq!(rows[0]["age"], 30);
        assert!(rows[0]["nickname"].is_null());
        assert!(
            reopened.catalog().node_types["Person"]
                .properties
                .contains_key("nickname")
        );
        assert!(dir.path().join("_schema.pg").exists());
    }

    #[tokio::test]
    async fn test_apply_schema_renames_property_and_preserves_values() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;

        let desired = TEST_SCHEMA.replace(
            "    age: I32?\n}",
            "    years: I32? @rename_from(\"age\")\n}",
        );
        db.apply_schema(&desired).await.unwrap();

        let reopened = Omnigraph::open(uri).await.unwrap();
        let rows = table_rows_json(&reopened, "node:Person").await;
        assert_eq!(rows[0]["name"], "Alice");
        assert_eq!(rows[0]["years"], 30);
        assert!(rows[0].get("age").is_none());
    }

    /// Renaming a type moves the table key at head, while snapshots taken
    /// before the rename still see the old key.
    #[tokio::test]
    async fn test_apply_schema_renames_type_and_preserves_historical_snapshot() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;
        let before_version = db.snapshot().await.version();

        let desired = TEST_SCHEMA
            .replace("node Person {\n", "node Human @rename_from(\"Person\") {\n")
            .replace("edge Knows: Person -> Person", "edge Knows: Human -> Human")
            .replace(
                "edge WorksAt: Person -> Company",
                "edge WorksAt: Human -> Company",
            );
        db.apply_schema(&desired).await.unwrap();

        let head = db.snapshot().await;
        assert!(head.entry("node:Person").is_none());
        assert!(head.entry("node:Human").is_some());
        let historical = ManifestCoordinator::snapshot_at(uri, None, before_version)
            .await
            .unwrap();
        assert!(historical.entry("node:Person").is_some());
        assert!(historical.entry("node:Human").is_none());
    }

    /// After a JSONL load the internal run branch must be gone, and schema
    /// apply must still succeed.
    #[tokio::test]
    async fn test_apply_schema_succeeds_after_load() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        crate::loader::load_jsonl(
            &mut db,
            r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#,
            crate::loader::LoadMode::Overwrite,
        )
        .await
        .unwrap();

        let all_branches = db.coordinator.read().await.all_branches().await.unwrap();
        assert!(
            !all_branches.iter().any(|b| is_internal_run_branch(b)),
            "run branch should be deleted after publish, got: {:?}",
            all_branches
        );

        let desired = TEST_SCHEMA.replace(
            "    age: I32?\n}",
            "    age: I32?\n    nickname: String?\n}",
        );
        let result = db.apply_schema(&desired).await.unwrap();
        assert!(result.applied, "schema apply should have applied");
    }

    #[tokio::test]
    async fn test_apply_schema_adds_index_for_existing_property() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let desired = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
        db.apply_schema(&desired).await.unwrap();

        let snapshot = db.snapshot().await;
        let ds = snapshot.open("node:Person").await.unwrap();
        assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
    }

    /// A schema change that rewrites a table must keep the indices that already
    /// existed on it.
    #[tokio::test]
    async fn test_apply_schema_rewrite_preserves_existing_indices() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let initial_schema = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
        let mut db = Omnigraph::init(uri, &initial_schema).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;

        let desired = initial_schema.replace(
            "    age: I32?\n}",
            "    age: I32?\n    nickname: String?\n}",
        );
        db.apply_schema(&desired).await.unwrap();

        let snapshot = db.snapshot().await;
        let ds = snapshot.open("node:Person").await.unwrap();
        assert!(db.table_store().has_btree_index(&ds, "id").await.unwrap());
        assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
    }

    #[tokio::test]
    async fn test_open_for_mutation_rejects_while_schema_apply_locked() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        // Simulate an in-progress schema apply by creating its lock branch.
        db.coordinator
            .write()
            .await
            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
            .await
            .unwrap();

        let err = db
            .open_for_mutation("node:Person", crate::db::MutationOpKind::Insert)
            .await
            .unwrap_err();
        assert!(
            err.to_string()
                .contains("write is unavailable while schema apply is in progress")
        );
    }

    #[tokio::test]
    async fn test_commit_updates_rejects_while_schema_apply_locked() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        db.coordinator
            .write()
            .await
            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
            .await
            .unwrap();

        let err = db.commit_updates(&[]).await.unwrap_err();
        assert!(
            err.to_string()
                .contains("write commit is unavailable while schema apply is in progress")
        );
    }

    #[tokio::test]
    async fn test_branch_list_hides_schema_apply_lock_branch() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        db.coordinator
            .write()
            .await
            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
            .await
            .unwrap();

        let branches = db.branch_list().await.unwrap();
        assert_eq!(branches, vec!["main".to_string()]);
    }
}