1use std::collections::{BTreeSet, HashMap, HashSet};
2use std::io::Write;
3use std::sync::Arc;
4
5use arc_swap::ArcSwap;
6use arrow_array::{
7 Array, BinaryArray, BooleanArray, Date32Array, FixedSizeListArray, Float32Array, Float64Array,
8 Int32Array, Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray,
9 RecordBatch, StringArray, StructArray, UInt32Array, UInt64Array, new_null_array,
10};
11use arrow_schema::{DataType, Field, Schema};
12use lance::Dataset;
13use lance::blob::{BlobArrayBuilder, blob_field};
14use lance::dataset::BlobFile;
15use lance::dataset::scanner::ColumnOrdering;
16use lance::datatypes::BlobKind;
17use omnigraph_compiler::catalog::{Catalog, EdgeType, NodeType};
18use omnigraph_compiler::schema::parser::parse_schema;
19use omnigraph_compiler::types::ScalarType;
20use omnigraph_compiler::{
21 DropMode, SchemaIR, SchemaMigrationPlan, SchemaMigrationStep, SchemaTypeKind,
22 build_catalog_from_ir, build_schema_ir, plan_schema_migration,
23};
24
25use crate::db::graph_coordinator::{GraphCoordinator, PublishedSnapshot};
26use crate::error::{OmniError, Result};
27use crate::runtime_cache::RuntimeCache;
28use crate::storage::{StorageAdapter, join_uri, normalize_root_uri, storage_for_uri};
29use crate::table_store::TableStore;
30
31mod export;
32mod optimize;
33mod schema_apply;
34mod table_ops;
35
36pub use optimize::{CleanupPolicyOptions, TableCleanupStats, TableOptimizeStats};
37pub use schema_apply::SchemaApplyOptions;
38
39use super::commit_graph::GraphCommit;
40use super::manifest::{
41 ManifestChange, Snapshot, SubTableEntry, TableRegistration, TableTombstone,
42 table_path_for_table_key,
43};
44use super::schema_state::{
45 SCHEMA_SOURCE_FILENAME, load_or_bootstrap_schema_contract, read_accepted_schema_ir,
46 recover_schema_state_files, schema_ir_staging_uri, schema_ir_uri, schema_source_staging_uri,
47 schema_source_uri, schema_state_staging_uri, schema_state_uri, validate_schema_contract,
48 write_schema_contract, write_schema_contract_staging,
49};
50use super::{
51 ReadTarget, ResolvedTarget, SCHEMA_APPLY_LOCK_BRANCH, SnapshotId, is_internal_system_branch,
52 is_schema_apply_lock_branch,
53};
54
/// Result classification for a branch merge.
///
/// NOTE(review): the merge implementation is not part of this file section; the
/// variant descriptions below are inferred from their names — confirm against it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MergeOutcome {
    /// Presumably: the target already contained the source's changes; nothing written.
    AlreadyUpToDate,
    /// Presumably: the target head was advanced to the source head without a merge commit.
    FastForward,
    /// Presumably: a merge combining both sides was recorded.
    Merged,
}
61
/// Outcome returned by [`Omnigraph::apply_schema`] and its `_with_options` / `_as` variants.
///
/// NOTE(review): the fields are populated in the `schema_apply` module, which is not
/// shown here; the per-field notes below are hedged accordingly.
#[derive(Debug, Clone)]
pub struct SchemaApplyResult {
    /// Presumably: whether the planned migration is supported by the engine.
    pub supported: bool,
    /// Presumably: whether any change was actually applied.
    pub applied: bool,
    /// Presumably: the manifest version after the apply — confirm in `schema_apply`.
    pub manifest_version: u64,
    /// Migration steps associated with this apply.
    pub steps: Vec<SchemaMigrationStep>,
}
69
/// Handle to a graph rooted at a storage URI.
///
/// The coordinator, catalog and schema source sit behind shared interior-mutability
/// wrappers (`RwLock` / `ArcSwap`), so methods taking `&self` can replace them.
pub struct Omnigraph {
    /// Root URI of the graph, as produced by `normalize_root_uri`.
    root_uri: String,
    /// Storage adapter used for schema files and handed to every coordinator.
    storage: Arc<dyn StorageAdapter>,
    /// Coordinator for the currently selected branch; replaced wholesale by
    /// `sync_branch`, `swap_coordinator_for_branch` and `restore_coordinator`.
    coordinator: Arc<tokio::sync::RwLock<GraphCoordinator>>,
    /// Table store rooted at `root_uri`.
    table_store: TableStore,
    /// Runtime cache; invalidated by `refresh`, `sync_branch` and `refresh_coordinator_only`.
    runtime_cache: RuntimeCache,
    /// Catalog built from the schema IR (after `fixup_blob_schemas`); swapped via `store_catalog`.
    catalog: Arc<ArcSwap<Catalog>>,
    /// Schema source text last written or loaded; swapped via `store_schema_source`.
    schema_source: Arc<ArcSwap<String>>,
    /// Write-queue manager, shared out through `write_queue()`.
    write_queue: Arc<crate::db::write_queue::WriteQueueManager>,
    /// Mutex shared out through `merge_exclusive()`; presumably serializes merges —
    /// confirm at the call sites.
    merge_exclusive: Arc<tokio::sync::Mutex<()>>,
    /// Optional policy checker consulted by `enforce`; `None` permits every action.
    policy: Option<Arc<dyn omnigraph_policy::PolicyChecker>>,
}
149
/// How [`Omnigraph`] opens an existing graph.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OpenMode {
    /// Runs schema-state and manifest-drift recovery (`RecoveryMode::Full`) while opening.
    ReadWrite,
    /// Skips the open-time recovery step.
    ReadOnly,
}
167
168impl Omnigraph {
169 pub async fn init(uri: &str, schema_source: &str) -> Result<Self> {
173 Self::init_with_storage(uri, schema_source, storage_for_uri(uri)?).await
174 }
175
176 pub(crate) async fn init_with_storage(
177 uri: &str,
178 schema_source: &str,
179 storage: Arc<dyn StorageAdapter>,
180 ) -> Result<Self> {
181 let root = normalize_root_uri(uri)?;
182 let schema_ir = read_schema_ir_from_source(schema_source)?;
183 let mut catalog = build_catalog_from_ir(&schema_ir)?;
184 fixup_blob_schemas(&mut catalog);
185
186 let schema_path = join_uri(&root, SCHEMA_SOURCE_FILENAME);
188 storage.write_text(&schema_path, schema_source).await?;
189 write_schema_contract(&root, storage.as_ref(), &schema_ir).await?;
190
191 let coordinator = GraphCoordinator::init(&root, &catalog, Arc::clone(&storage)).await?;
193
194 Ok(Self {
195 root_uri: root.clone(),
196 storage,
197 coordinator: Arc::new(tokio::sync::RwLock::new(coordinator)),
198 table_store: TableStore::new(&root),
199 runtime_cache: RuntimeCache::default(),
200 catalog: Arc::new(ArcSwap::from_pointee(catalog)),
201 schema_source: Arc::new(ArcSwap::from_pointee(schema_source.to_string())),
202 write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()),
203 merge_exclusive: Arc::new(tokio::sync::Mutex::new(())),
204 policy: None,
205 })
206 }
207
208 pub async fn open(uri: &str) -> Result<Self> {
213 Self::open_with_storage_and_mode(uri, storage_for_uri(uri)?, OpenMode::ReadWrite).await
214 }
215
216 pub async fn open_read_only(uri: &str) -> Result<Self> {
219 Self::open_with_storage_and_mode(uri, storage_for_uri(uri)?, OpenMode::ReadOnly).await
220 }
221
222 pub(crate) async fn open_with_storage(
225 uri: &str,
226 storage: Arc<dyn StorageAdapter>,
227 ) -> Result<Self> {
228 Self::open_with_storage_and_mode(uri, storage, OpenMode::ReadWrite).await
229 }
230
231 pub(crate) async fn open_with_storage_and_mode(
232 uri: &str,
233 storage: Arc<dyn StorageAdapter>,
234 mode: OpenMode,
235 ) -> Result<Self> {
236 let root = normalize_root_uri(uri)?;
237 let mut coordinator = GraphCoordinator::open(&root, Arc::clone(&storage)).await?;
240 if matches!(mode, OpenMode::ReadWrite) {
250 let schema_state_recovery =
251 recover_schema_state_files(&root, Arc::clone(&storage), &coordinator.snapshot())
252 .await?;
253 crate::db::manifest::recover_manifest_drift(
259 &root,
260 Arc::clone(&storage),
261 &mut coordinator,
262 crate::db::manifest::RecoveryMode::Full,
263 schema_state_recovery,
264 )
265 .await?;
266 }
267 let schema_path = schema_source_uri(&root);
269 let schema_source = storage.read_text(&schema_path).await?;
270 let current_source_ir = read_schema_ir_from_source(&schema_source)?;
271 let branches = coordinator.branch_list().await?;
272 let (accepted_ir, _) = load_or_bootstrap_schema_contract(
273 &root,
274 Arc::clone(&storage),
275 &branches,
276 ¤t_source_ir,
277 )
278 .await?;
279 let mut catalog = build_catalog_from_ir(&accepted_ir)?;
280 fixup_blob_schemas(&mut catalog);
281
282 Ok(Self {
283 root_uri: root.clone(),
284 storage,
285 coordinator: Arc::new(tokio::sync::RwLock::new(coordinator)),
286 table_store: TableStore::new(&root),
287 runtime_cache: RuntimeCache::default(),
288 catalog: Arc::new(ArcSwap::from_pointee(catalog)),
289 schema_source: Arc::new(ArcSwap::from_pointee(schema_source)),
290 write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()),
291 merge_exclusive: Arc::new(tokio::sync::Mutex::new(())),
292 policy: None,
293 })
294 }
295
296 pub fn catalog(&self) -> Arc<Catalog> {
300 self.catalog.load_full()
301 }
302
303 pub fn schema_source(&self) -> Arc<String> {
305 self.schema_source.load_full()
306 }
307
308 pub(crate) fn store_catalog(&self, catalog: Catalog) {
312 self.catalog.store(Arc::new(catalog));
313 }
314
315 pub(crate) fn store_schema_source(&self, schema_source: String) {
318 self.schema_source.store(Arc::new(schema_source));
319 }
320
321 pub fn uri(&self) -> &str {
322 &self.root_uri
323 }
324
325 pub fn with_policy(mut self, checker: Arc<dyn omnigraph_policy::PolicyChecker>) -> Self {
336 self.policy = Some(checker);
337 self
338 }
339
340 pub(crate) fn enforce(
352 &self,
353 action: omnigraph_policy::PolicyAction,
354 scope: &omnigraph_policy::ResourceScope,
355 actor: Option<&str>,
356 ) -> Result<()> {
357 let Some(checker) = self.policy.as_ref() else {
358 return Ok(());
359 };
360 let Some(actor) = actor else {
361 return Err(OmniError::Policy(
362 "no actor for engine-layer policy check (policy is configured but the call site \
363 didn't thread an actor through — this is almost certainly a bug, not an \
364 intended bypass)"
365 .to_string(),
366 ));
367 };
368 checker
369 .check(action, scope, actor)
370 .map_err(|err| OmniError::Policy(err.to_string()))
371 }
372
373 pub(crate) async fn ensure_schema_state_valid(&self) -> Result<()> {
374 validate_schema_contract(self.uri(), Arc::clone(&self.storage)).await
375 }
376
377 pub async fn plan_schema(&self, desired_schema_source: &str) -> Result<SchemaMigrationPlan> {
378 self.plan_schema_with_options(desired_schema_source, SchemaApplyOptions::default())
379 .await
380 }
381
382 pub async fn plan_schema_with_options(
383 &self,
384 desired_schema_source: &str,
385 options: SchemaApplyOptions,
386 ) -> Result<SchemaMigrationPlan> {
387 schema_apply::plan_schema(self, desired_schema_source, options).await
388 }
389
390 pub async fn apply_schema(&self, desired_schema_source: &str) -> Result<SchemaApplyResult> {
391 self.apply_schema_as(desired_schema_source, SchemaApplyOptions::default(), None)
392 .await
393 }
394
395 pub async fn apply_schema_with_options(
396 &self,
397 desired_schema_source: &str,
398 options: SchemaApplyOptions,
399 ) -> Result<SchemaApplyResult> {
400 self.apply_schema_as(desired_schema_source, options, None).await
401 }
402
403 pub async fn apply_schema_as(
414 &self,
415 desired_schema_source: &str,
416 options: SchemaApplyOptions,
417 actor: Option<&str>,
418 ) -> Result<SchemaApplyResult> {
419 schema_apply::apply_schema(self, desired_schema_source, options, actor).await
420 }
421
422 pub(crate) async fn ensure_schema_apply_idle(&self, operation: &str) -> Result<()> {
423 schema_apply::ensure_schema_apply_idle(self, operation).await
424 }
425
426 async fn ensure_schema_apply_not_locked(&self, operation: &str) -> Result<()> {
427 schema_apply::ensure_schema_apply_not_locked(self, operation).await
428 }
429
430 pub(crate) fn table_store(&self) -> &TableStore {
431 &self.table_store
432 }
433
434 pub(crate) fn storage(&self) -> &dyn crate::storage_layer::TableStorage {
443 &self.table_store
444 }
445
446 pub(crate) fn storage_adapter(&self) -> &dyn crate::storage::StorageAdapter {
450 self.storage.as_ref()
451 }
452
453 pub(crate) fn write_queue(&self) -> Arc<crate::db::write_queue::WriteQueueManager> {
461 Arc::clone(&self.write_queue)
462 }
463
464 pub(crate) fn merge_exclusive(&self) -> Arc<tokio::sync::Mutex<()>> {
470 Arc::clone(&self.merge_exclusive)
471 }
472
473 pub(crate) fn root_uri(&self) -> &str {
476 &self.root_uri
477 }
478
479 pub(crate) async fn open_coordinator_for_branch(
480 &self,
481 branch: Option<&str>,
482 ) -> Result<GraphCoordinator> {
483 match branch {
484 Some(branch) => {
485 GraphCoordinator::open_branch(self.uri(), branch, Arc::clone(&self.storage)).await
486 }
487 None => GraphCoordinator::open(self.uri(), Arc::clone(&self.storage)).await,
488 }
489 }
490
491 pub(crate) async fn swap_coordinator_for_branch(
492 &self,
493 branch: Option<&str>,
494 ) -> Result<GraphCoordinator> {
495 let next = self.open_coordinator_for_branch(branch).await?;
496 let mut coord = self.coordinator.write().await;
497 Ok(std::mem::replace(&mut *coord, next))
498 }
499
500 pub(crate) async fn restore_coordinator(&self, coordinator: GraphCoordinator) {
501 *self.coordinator.write().await = coordinator;
502 }
503
504 pub(crate) async fn resolved_branch_target(
505 &self,
506 branch: Option<&str>,
507 ) -> Result<ResolvedTarget> {
508 self.ensure_schema_state_valid().await?;
509 let requested = ReadTarget::Branch(branch.unwrap_or("main").to_string());
510 let normalized = normalize_branch_name(branch.unwrap_or("main"))?;
511 let coord = self.coordinator.read().await;
512 if normalized.as_deref() == coord.current_branch() {
513 let snapshot_id = coord.head_commit_id().await?.unwrap_or_else(|| {
514 SnapshotId::synthetic(coord.current_branch(), coord.version())
515 });
516 return Ok(ResolvedTarget {
517 requested,
518 branch: coord.current_branch().map(str::to_string),
519 snapshot_id,
520 snapshot: coord.snapshot(),
521 });
522 }
523 coord.resolve_target(&requested).await
524 }
525
526 pub(crate) async fn snapshot_for_branch(&self, branch: Option<&str>) -> Result<Snapshot> {
527 self.resolved_branch_target(branch)
528 .await
529 .map(|resolved| resolved.snapshot)
530 }
531
532 pub(crate) async fn version(&self) -> u64 {
533 self.coordinator.read().await.version()
534 }
535
536 pub(crate) async fn snapshot(&self) -> Snapshot {
538 self.coordinator.read().await.snapshot()
539 }
540
541 pub async fn snapshot_of(&self, target: impl Into<ReadTarget>) -> Result<Snapshot> {
542 self.resolved_target(target)
543 .await
544 .map(|resolved| resolved.snapshot)
545 }
546
547 pub async fn version_of(&self, target: impl Into<ReadTarget>) -> Result<u64> {
548 self.snapshot_of(target)
549 .await
550 .map(|snapshot| snapshot.version())
551 }
552
553 pub async fn resolved_branch_of(
554 &self,
555 target: impl Into<ReadTarget>,
556 ) -> Result<Option<String>> {
557 self.resolved_target(target)
558 .await
559 .map(|resolved| resolved.branch)
560 }
561
562 pub async fn sync_branch(&self, branch: &str) -> Result<()> {
564 self.ensure_schema_state_valid().await?;
565 let branch = normalize_branch_name(branch)?;
566 let next = self.open_coordinator_for_branch(branch.as_deref()).await?;
567 *self.coordinator.write().await = next;
568 self.runtime_cache.invalidate_all().await;
569 Ok(())
570 }
571
572 pub async fn refresh(&self) -> Result<()> {
605 {
612 let mut coord = self.coordinator.write().await;
613 coord.refresh().await?;
614 let schema_state_recovery = recover_schema_state_files(
615 &self.root_uri,
616 Arc::clone(&self.storage),
617 &coord.snapshot(),
618 )
619 .await?;
620 crate::db::manifest::recover_manifest_drift(
621 &self.root_uri,
622 Arc::clone(&self.storage),
623 &mut *coord,
624 crate::db::manifest::RecoveryMode::RollForwardOnly,
625 schema_state_recovery,
626 )
627 .await?;
628 } self.reload_schema_if_source_changed().await?;
630 self.runtime_cache.invalidate_all().await;
631 Ok(())
632 }
633
634 async fn reload_schema_if_source_changed(&self) -> Result<()> {
635 let schema_path = schema_source_uri(&self.root_uri);
636 let schema_source = self.storage.read_text(&schema_path).await?;
637 if schema_source == *self.schema_source.load_full() {
638 return Ok(());
639 }
640 let current_source_ir = read_schema_ir_from_source(&schema_source)?;
641 let branches = self.coordinator.read().await.branch_list().await?;
642 let (accepted_ir, _) = load_or_bootstrap_schema_contract(
643 &self.root_uri,
644 Arc::clone(&self.storage),
645 &branches,
646 ¤t_source_ir,
647 )
648 .await?;
649 let mut catalog = build_catalog_from_ir(&accepted_ir)?;
650 fixup_blob_schemas(&mut catalog);
651 self.store_schema_source(schema_source);
652 self.store_catalog(catalog);
653 Ok(())
654 }
655
656 pub(crate) async fn refresh_coordinator_only(&self) -> Result<()> {
664 self.coordinator.write().await.refresh().await?;
665 self.runtime_cache.invalidate_all().await;
666 Ok(())
667 }
668
669 pub async fn resolve_snapshot(&self, branch: &str) -> Result<SnapshotId> {
670 self.ensure_schema_state_valid().await?;
671 self.coordinator.read().await.resolve_snapshot_id(branch).await
672 }
673
674 pub(crate) async fn resolved_target(
675 &self,
676 target: impl Into<ReadTarget>,
677 ) -> Result<ResolvedTarget> {
678 self.ensure_schema_state_valid().await?;
679 self.coordinator.read().await.resolve_target(&target.into()).await
680 }
681
682 pub async fn diff_between(
685 &self,
686 from: impl Into<ReadTarget>,
687 to: impl Into<ReadTarget>,
688 filter: &crate::changes::ChangeFilter,
689 ) -> Result<crate::changes::ChangeSet> {
690 let from_resolved = self.resolved_target(from).await?;
691 let to_resolved = self.resolved_target(to).await?;
692 crate::changes::diff_snapshots(
693 self.uri(),
694 &from_resolved.snapshot,
695 &to_resolved.snapshot,
696 filter,
697 to_resolved.branch.clone().or(from_resolved.branch.clone()),
698 )
699 .await
700 }
701
702 pub async fn diff_commits(
705 &self,
706 from_commit_id: &str,
707 to_commit_id: &str,
708 filter: &crate::changes::ChangeFilter,
709 ) -> Result<crate::changes::ChangeSet> {
710 let coord = self.coordinator.read().await;
711 let from_commit = coord.resolve_commit(&SnapshotId::new(from_commit_id)).await?;
712 let to_commit = coord.resolve_commit(&SnapshotId::new(to_commit_id)).await?;
713 let from_snap = coord
714 .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
715 from_commit.graph_commit_id.clone(),
716 )))
717 .await?;
718 let to_snap = coord
719 .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
720 to_commit.graph_commit_id.clone(),
721 )))
722 .await?;
723 drop(coord);
724 crate::changes::diff_snapshots(
725 self.uri(),
726 &from_snap.snapshot,
727 &to_snap.snapshot,
728 filter,
729 to_snap.branch.clone().or(from_snap.branch.clone()),
730 )
731 .await
732 }
733
734 pub async fn entity_at_target(
735 &self,
736 target: impl Into<ReadTarget>,
737 table_key: &str,
738 id: &str,
739 ) -> Result<Option<serde_json::Value>> {
740 export::entity_at_target(self, target, table_key, id).await
741 }
742
743 pub async fn entity_at(
745 &self,
746 table_key: &str,
747 id: &str,
748 version: u64,
749 ) -> Result<Option<serde_json::Value>> {
750 export::entity_at(self, table_key, id, version).await
751 }
752
753 pub async fn snapshot_at_version(&self, version: u64) -> Result<Snapshot> {
755 self.ensure_schema_state_valid().await?;
756 self.coordinator.read().await.snapshot_at_version(version).await
757 }
758
759 pub async fn export_jsonl(
760 &self,
761 branch: &str,
762 type_names: &[String],
763 table_keys: &[String],
764 ) -> Result<String> {
765 export::export_jsonl(self, branch, type_names, table_keys).await
766 }
767
768 pub async fn export_jsonl_to_writer<W: Write>(
769 &self,
770 branch: &str,
771 type_names: &[String],
772 table_keys: &[String],
773 writer: &mut W,
774 ) -> Result<()> {
775 export::export_jsonl_to_writer(self, branch, type_names, table_keys, writer).await
776 }
777
778 pub async fn graph_index(&self) -> Result<Arc<crate::graph_index::GraphIndex>> {
782 table_ops::graph_index(self).await
783 }
784
785 pub(crate) async fn graph_index_for_resolved(
786 &self,
787 resolved: &ResolvedTarget,
788 ) -> Result<Arc<crate::graph_index::GraphIndex>> {
789 table_ops::graph_index_for_resolved(self, resolved).await
790 }
791
792 pub async fn ensure_indices(&self) -> Result<()> {
805 table_ops::ensure_indices(self).await
806 }
807
808 pub async fn ensure_indices_on(&self, branch: &str) -> Result<()> {
809 table_ops::ensure_indices_on(self, branch).await
810 }
811
812 #[cfg(feature = "failpoints")]
813 #[doc(hidden)]
814 pub async fn failpoint_publish_table_head_without_index_rebuild_for_test(
815 &mut self,
816 branch: &str,
817 table_key: &str,
818 table_branch: Option<&str>,
819 ) -> Result<u64> {
820 table_ops::failpoint_publish_table_head_without_index_rebuild_for_test(
821 self,
822 branch,
823 table_key,
824 table_branch,
825 )
826 .await
827 }
828
829 pub async fn optimize(&self) -> Result<Vec<optimize::TableOptimizeStats>> {
832 optimize::optimize_all_tables(self).await
833 }
834
835 pub async fn cleanup(
839 &mut self,
840 options: optimize::CleanupPolicyOptions,
841 ) -> Result<Vec<optimize::TableCleanupStats>> {
842 optimize::cleanup_all_tables(self, options).await
843 }
844
845 pub async fn read_blob(&self, type_name: &str, id: &str, property: &str) -> Result<BlobFile> {
855 self.ensure_schema_state_valid().await?;
856 let catalog = self.catalog();
857 let node_type = catalog
858 .node_types
859 .get(type_name)
860 .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
861 if !node_type.blob_properties.contains(property) {
862 return Err(OmniError::manifest(format!(
863 "property '{}' on type '{}' is not a Blob",
864 property, type_name
865 )));
866 }
867
868 let snapshot = self.snapshot().await;
869 let table_key = format!("node:{}", type_name);
870 let ds = snapshot.open(&table_key).await?;
871
872 let filter_sql = format!("id = '{}'", id.replace('\'', "''"));
873 let row_id = self
874 .table_store
875 .first_row_id_for_filter(&ds, &filter_sql)
876 .await?
877 .ok_or_else(|| {
878 OmniError::manifest(format!("no {} with id '{}' found", type_name, id))
879 })?;
880
881 let ds = Arc::new(ds);
883 let mut blobs = ds
884 .take_blobs(&[row_id], property)
885 .await
886 .map_err(|e| OmniError::Lance(e.to_string()))?;
887
888 blobs.pop().ok_or_else(|| {
889 OmniError::manifest(format!(
890 "blob '{}' on {} '{}' returned no data",
891 property, type_name, id
892 ))
893 })
894 }
895
896 pub(crate) async fn active_branch(&self) -> Option<String> {
897 self.coordinator.read().await.current_branch().map(str::to_string)
898 }
899
900 async fn ensure_branch_delete_safe(&self, branch: &str, branches: &[String]) -> Result<()> {
901 let descendants = self.coordinator.read().await.branch_descendants(branch).await?;
902 if let Some(descendant) = descendants.first() {
903 return Err(OmniError::manifest_conflict(format!(
904 "cannot delete branch '{}' because descendant branch '{}' still depends on it",
905 branch, descendant
906 )));
907 }
908
909 for other_branch in branches
910 .iter()
911 .filter(|candidate| candidate.as_str() != branch)
912 {
913 let snapshot = self
914 .snapshot_of(ReadTarget::branch(other_branch.as_str()))
915 .await?;
916 if snapshot
917 .entries()
918 .any(|entry| entry.table_branch.as_deref() == Some(branch))
919 {
920 return Err(OmniError::manifest_conflict(format!(
921 "cannot delete branch '{}' because branch '{}' still depends on it",
922 branch, other_branch
923 )));
924 }
925 }
926
927 Ok(())
928 }
929
930 async fn cleanup_deleted_branch_tables(
931 &self,
932 branch: &str,
933 owned_tables: &[(String, String)],
934 ) -> Result<()> {
935 let mut seen_paths = HashSet::new();
936 let mut cleanup_targets = owned_tables
937 .iter()
938 .filter(|(_, table_path)| seen_paths.insert(table_path.clone()))
939 .cloned()
940 .collect::<Vec<_>>();
941 cleanup_targets.sort_by(|left, right| left.0.cmp(&right.0));
942
943 for (table_key, table_path) in cleanup_targets {
944 let dataset_uri = self.table_store.dataset_uri(&table_path);
945 if let Err(err) = self.table_store.delete_branch(&dataset_uri, branch).await {
946 return Err(OmniError::manifest_internal(format!(
947 "branch '{}' was deleted but cleanup failed for {}: {}",
948 branch, table_key, err
949 )));
950 }
951 }
952
953 Ok(())
954 }
955
956 async fn delete_branch_storage_only(&self, branch: &str) -> Result<()> {
957 let active = self.coordinator.read().await.current_branch().map(str::to_string);
958 if active.as_deref() == Some(branch) {
959 return Err(OmniError::manifest_conflict(format!(
960 "cannot delete currently active branch '{}'",
961 branch
962 )));
963 }
964
965 let branch_snapshot = self.snapshot_of(ReadTarget::branch(branch)).await?;
966 let owned_tables = branch_snapshot
967 .entries()
968 .filter(|entry| entry.table_branch.as_deref() == Some(branch))
969 .map(|entry| (entry.table_key.clone(), entry.table_path.clone()))
970 .collect::<Vec<_>>();
971
972 self.coordinator.write().await.branch_delete(branch).await?;
973 self.cleanup_deleted_branch_tables(branch, &owned_tables)
974 .await
975 }
976
977 pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
978 normalize_branch_name(branch)
979 }
980
981 pub(crate) async fn head_commit_id_for_branch(
982 &self,
983 branch: Option<&str>,
984 ) -> Result<Option<String>> {
985 let mut coordinator = self.open_coordinator_for_branch(branch).await?;
986 coordinator.ensure_commit_graph_initialized().await?;
987 coordinator
988 .head_commit_id()
989 .await
990 .map(|id| id.map(|snapshot_id| snapshot_id.as_str().to_string()))
991 }
992
993 pub async fn branch_create(&self, name: &str) -> Result<()> {
994 self.branch_create_as(name, None).await
995 }
996
997 pub async fn branch_create_as(&self, name: &str, actor: Option<&str>) -> Result<()> {
1005 self.enforce(
1006 omnigraph_policy::PolicyAction::BranchCreate,
1007 &omnigraph_policy::ResourceScope::TargetBranch(name.to_string()),
1008 actor,
1009 )?;
1010 self.ensure_schema_state_valid().await?;
1011 self.ensure_schema_apply_idle("branch_create").await?;
1012 ensure_public_branch_ref(name, "branch_create")?;
1013 self.coordinator.write().await.branch_create(name).await
1014 }
1015
1016 pub async fn branch_create_from(
1017 &self,
1018 from: impl Into<ReadTarget>,
1019 name: &str,
1020 ) -> Result<()> {
1021 self.branch_create_from_as(from, name, None).await
1022 }
1023
1024 pub async fn branch_create_from_as(
1036 &self,
1037 from: impl Into<ReadTarget>,
1038 name: &str,
1039 actor: Option<&str>,
1040 ) -> Result<()> {
1041 let target = from.into();
1042 let source_branch = match &target {
1043 ReadTarget::Branch(b) => b.clone(),
1044 _ => "<snapshot>".to_string(),
1045 };
1046 self.enforce(
1047 omnigraph_policy::PolicyAction::BranchCreate,
1048 &omnigraph_policy::ResourceScope::BranchTransition {
1049 source: source_branch,
1050 target: name.to_string(),
1051 },
1052 actor,
1053 )?;
1054 self.ensure_schema_apply_idle("branch_create_from").await?;
1055 self.branch_create_from_impl(target, name, false).await
1056 }
1057
1058 async fn branch_create_from_impl(
1059 &self,
1060 from: impl Into<ReadTarget>,
1061 name: &str,
1062 allow_internal_refs: bool,
1063 ) -> Result<()> {
1064 let target = from.into();
1065 let ReadTarget::Branch(branch_name) = target else {
1066 return Err(OmniError::manifest(
1067 "branch creation from pinned snapshots is not supported yet".to_string(),
1068 ));
1069 };
1070 if !allow_internal_refs {
1071 ensure_public_branch_ref(&branch_name, "branch_create_from")?;
1072 ensure_public_branch_ref(name, "branch_create_from")?;
1073 }
1074 let branch = normalize_branch_name(&branch_name)?;
1075 let mut source_coord = self.open_coordinator_for_branch(branch.as_deref()).await?;
1092 source_coord.branch_create(name).await
1093 }
1094
1095 pub async fn branch_list(&self) -> Result<Vec<String>> {
1096 self.ensure_schema_state_valid().await?;
1097 self.coordinator.read().await.branch_list().await
1098 }
1099
1100 pub async fn branch_delete(&self, name: &str) -> Result<()> {
1101 self.branch_delete_as(name, None).await
1102 }
1103
1104 pub async fn branch_delete_as(&self, name: &str, actor: Option<&str>) -> Result<()> {
1112 self.enforce(
1113 omnigraph_policy::PolicyAction::BranchDelete,
1114 &omnigraph_policy::ResourceScope::TargetBranch(name.to_string()),
1115 actor,
1116 )?;
1117 self.ensure_schema_state_valid().await?;
1118 self.ensure_schema_apply_idle("branch_delete").await?;
1119 ensure_public_branch_ref(name, "branch_delete")?;
1120 self.refresh().await?;
1121 let branch = normalize_branch_name(name)?
1122 .ok_or_else(|| OmniError::manifest("cannot delete branch 'main'".to_string()))?;
1123 let branches = self.coordinator.read().await.branch_list().await?;
1124 if !branches.iter().any(|candidate| candidate == &branch) {
1125 return Err(OmniError::manifest_not_found(format!(
1126 "branch '{}' not found",
1127 branch
1128 )));
1129 }
1130
1131 self.ensure_branch_delete_safe(&branch, &branches).await?;
1132 self.delete_branch_storage_only(&branch).await
1133 }
1134
1135 pub async fn get_commit(&self, commit_id: &str) -> Result<GraphCommit> {
1136 self.ensure_schema_state_valid().await?;
1137 self.coordinator.read().await
1138 .resolve_commit(&SnapshotId::new(commit_id))
1139 .await
1140 }
1141
1142 pub async fn list_commits(&self, branch: Option<&str>) -> Result<Vec<GraphCommit>> {
1143 self.ensure_schema_state_valid().await?;
1144 let branch = match branch {
1145 Some(branch) => normalize_branch_name(branch)?,
1146 None => None,
1147 };
1148 let coordinator = self.open_coordinator_for_branch(branch.as_deref()).await?;
1149 coordinator.list_commits().await
1150 }
1151
1152 pub(crate) async fn open_for_mutation(
1158 &self,
1159 table_key: &str,
1160 op_kind: crate::db::MutationOpKind,
1161 ) -> Result<(Dataset, String, Option<String>)> {
1162 table_ops::open_for_mutation(self, table_key, op_kind).await
1163 }
1164
1165 pub(crate) async fn open_for_mutation_on_branch(
1166 &self,
1167 branch: Option<&str>,
1168 table_key: &str,
1169 op_kind: crate::db::MutationOpKind,
1170 ) -> Result<(Dataset, String, Option<String>)> {
1171 table_ops::open_for_mutation_on_branch(self, branch, table_key, op_kind).await
1172 }
1173
1174 pub(crate) async fn fork_dataset_from_entry_state(
1175 &self,
1176 table_key: &str,
1177 full_path: &str,
1178 source_branch: Option<&str>,
1179 source_version: u64,
1180 active_branch: &str,
1181 ) -> Result<Dataset> {
1182 table_ops::fork_dataset_from_entry_state(
1183 self,
1184 table_key,
1185 full_path,
1186 source_branch,
1187 source_version,
1188 active_branch,
1189 )
1190 .await
1191 }
1192
1193 pub(crate) async fn reopen_for_mutation(
1194 &self,
1195 table_key: &str,
1196 full_path: &str,
1197 table_branch: Option<&str>,
1198 expected_version: u64,
1199 op_kind: crate::db::MutationOpKind,
1200 ) -> Result<Dataset> {
1201 table_ops::reopen_for_mutation(
1202 self,
1203 table_key,
1204 full_path,
1205 table_branch,
1206 expected_version,
1207 op_kind,
1208 )
1209 .await
1210 }
1211
1212 pub(crate) async fn open_dataset_at_state(
1213 &self,
1214 table_path: &str,
1215 table_branch: Option<&str>,
1216 table_version: u64,
1217 ) -> Result<Dataset> {
1218 table_ops::open_dataset_at_state(self, table_path, table_branch, table_version).await
1219 }
1220
1221 pub(crate) async fn build_indices_on_dataset(
1222 &self,
1223 table_key: &str,
1224 ds: &mut Dataset,
1225 ) -> Result<()> {
1226 table_ops::build_indices_on_dataset(self, table_key, ds).await
1227 }
1228
1229 pub(crate) async fn build_indices_on_dataset_for_catalog(
1230 &self,
1231 catalog: &Catalog,
1232 table_key: &str,
1233 ds: &mut Dataset,
1234 ) -> Result<()> {
1235 table_ops::build_indices_on_dataset_for_catalog(self, catalog, table_key, ds).await
1236 }
1237
1238 #[cfg(test)]
1241 pub(crate) async fn commit_updates(
1242 &mut self,
1243 updates: &[crate::db::SubTableUpdate],
1244 ) -> Result<u64> {
1245 table_ops::commit_updates(self, updates).await
1246 }
1247
1248 pub(crate) async fn commit_manifest_updates(
1249 &self,
1250 updates: &[crate::db::SubTableUpdate],
1251 ) -> Result<u64> {
1252 table_ops::commit_manifest_updates(self, updates).await
1253 }
1254
1255 pub(crate) async fn record_merge_commit(
1256 &self,
1257 manifest_version: u64,
1258 parent_commit_id: &str,
1259 merged_parent_commit_id: &str,
1260 actor_id: Option<&str>,
1261 ) -> Result<String> {
1262 table_ops::record_merge_commit(
1263 self,
1264 manifest_version,
1265 parent_commit_id,
1266 merged_parent_commit_id,
1267 actor_id,
1268 )
1269 .await
1270 }
1271
1272 pub(crate) async fn commit_updates_on_branch_with_expected(
1273 &self,
1274 branch: Option<&str>,
1275 updates: &[crate::db::SubTableUpdate],
1276 expected_table_versions: &std::collections::HashMap<String, u64>,
1277 actor_id: Option<&str>,
1278 ) -> Result<u64> {
1279 table_ops::commit_updates_on_branch_with_expected(
1280 self,
1281 branch,
1282 updates,
1283 expected_table_versions,
1284 actor_id,
1285 )
1286 .await
1287 }
1288
1289 pub(crate) async fn ensure_commit_graph_initialized(&self) -> Result<()> {
1290 table_ops::ensure_commit_graph_initialized(self).await
1291 }
1292
1293 pub(crate) async fn invalidate_graph_index(&self) {
1295 table_ops::invalidate_graph_index(self).await
1296 }
1297}
1298
1299pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
1300 let branch = branch.trim();
1301 if branch.is_empty() {
1302 return Err(OmniError::manifest(
1303 "branch name cannot be empty".to_string(),
1304 ));
1305 }
1306 if branch == "main" {
1307 return Ok(None);
1308 }
1309 Ok(Some(branch.to_string()))
1310}
1311
1312pub(crate) fn ensure_public_branch_ref(branch: &str, operation: &str) -> Result<()> {
1313 if super::is_internal_run_branch(branch) {
1314 return Err(OmniError::manifest(format!(
1315 "{} does not allow internal run ref '{}'",
1316 operation, branch
1317 )));
1318 }
1319 if is_internal_system_branch(branch) {
1320 return Err(OmniError::manifest(format!(
1321 "{} does not allow internal system ref '{}'",
1322 operation, branch
1323 )));
1324 }
1325 Ok(())
1326}
1327
1328fn concat_or_empty_batches(schema: Arc<Schema>, batches: Vec<RecordBatch>) -> Result<RecordBatch> {
1329 if batches.is_empty() {
1330 return Ok(RecordBatch::new_empty(schema));
1331 }
1332 if batches.len() == 1 {
1333 return Ok(batches.into_iter().next().unwrap());
1334 }
1335 let batch_schema = batches[0].schema();
1336 arrow_select::concat::concat_batches(&batch_schema, &batches)
1337 .map_err(|e| OmniError::Lance(e.to_string()))
1338}
1339
1340fn blob_properties_for_table_key<'a>(
1341 catalog: &'a Catalog,
1342 table_key: &str,
1343) -> Result<&'a std::collections::HashSet<String>> {
1344 if let Some(type_name) = table_key.strip_prefix("node:") {
1345 return catalog
1346 .node_types
1347 .get(type_name)
1348 .map(|node_type| &node_type.blob_properties)
1349 .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)));
1350 }
1351 if let Some(type_name) = table_key.strip_prefix("edge:") {
1352 return catalog
1353 .edge_types
1354 .get(type_name)
1355 .map(|edge_type| &edge_type.blob_properties)
1356 .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)));
1357 }
1358 Err(OmniError::manifest(format!(
1359 "invalid table key '{}'",
1360 table_key
1361 )))
1362}
1363
1364fn blob_description_is_null(descriptions: &StructArray, row: usize) -> Result<bool> {
1365 if descriptions.is_null(row) {
1366 return Ok(true);
1367 }
1368
1369 let kind = descriptions
1370 .column_by_name("kind")
1371 .and_then(|col| col.as_any().downcast_ref::<UInt32Array>())
1372 .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row) as u8))
1373 .or_else(|| {
1374 descriptions
1375 .column_by_name("kind")
1376 .and_then(|col| col.as_any().downcast_ref::<arrow_array::UInt8Array>())
1377 .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)))
1378 });
1379 let position = descriptions
1380 .column_by_name("position")
1381 .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
1382 .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1383 let size = descriptions
1384 .column_by_name("size")
1385 .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
1386 .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1387 let blob_uri = descriptions
1388 .column_by_name("blob_uri")
1389 .and_then(|col| col.as_any().downcast_ref::<StringArray>())
1390 .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1391
1392 let Some(kind) = kind else {
1393 return Ok(true);
1394 };
1395 let kind = BlobKind::try_from(kind).map_err(|e| OmniError::Lance(e.to_string()))?;
1396 if kind != BlobKind::Inline {
1397 return Ok(false);
1398 }
1399
1400 Ok(position.unwrap_or(0) == 0 && size.unwrap_or(0) == 0 && blob_uri.unwrap_or("").is_empty())
1401}
1402
1403fn fixup_blob_schemas(catalog: &mut Catalog) {
1409 for node_type in catalog.node_types.values_mut() {
1410 if node_type.blob_properties.is_empty() {
1411 continue;
1412 }
1413 let fields: Vec<Field> = node_type
1414 .arrow_schema
1415 .fields()
1416 .iter()
1417 .map(|f| {
1418 if node_type.blob_properties.contains(f.name()) {
1419 blob_field(f.name(), f.is_nullable())
1420 } else {
1421 f.as_ref().clone()
1422 }
1423 })
1424 .collect();
1425 node_type.arrow_schema = Arc::new(Schema::new(fields));
1426 }
1427 for edge_type in catalog.edge_types.values_mut() {
1428 if edge_type.blob_properties.is_empty() {
1429 continue;
1430 }
1431 let fields: Vec<Field> = edge_type
1432 .arrow_schema
1433 .fields()
1434 .iter()
1435 .map(|f| {
1436 if edge_type.blob_properties.contains(f.name()) {
1437 blob_field(f.name(), f.is_nullable())
1438 } else {
1439 f.as_ref().clone()
1440 }
1441 })
1442 .collect();
1443 edge_type.arrow_schema = Arc::new(Schema::new(fields));
1444 }
1445}
1446
1447fn read_schema_ir_from_source(schema_source: &str) -> Result<SchemaIR> {
1448 let schema_ast = parse_schema(schema_source)?;
1449 build_schema_ir(&schema_ast).map_err(|err| OmniError::manifest(err.to_string()))
1450}
1451
1452fn schema_table_key(type_kind: SchemaTypeKind, name: &str) -> String {
1453 match type_kind {
1454 SchemaTypeKind::Node => format!("node:{}", name),
1455 SchemaTypeKind::Edge => format!("edge:{}", name),
1456 SchemaTypeKind::Interface => unreachable!("interfaces do not map to tables"),
1457 }
1458}
1459
1460fn schema_for_table_key(catalog: &Catalog, table_key: &str) -> Result<Arc<Schema>> {
1461 if let Some(type_name) = table_key.strip_prefix("node:") {
1462 let node_type: &NodeType = catalog
1463 .node_types
1464 .get(type_name)
1465 .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
1466 return Ok(node_type.arrow_schema.clone());
1467 }
1468 if let Some(type_name) = table_key.strip_prefix("edge:") {
1469 let edge_type: &EdgeType = catalog
1470 .edge_types
1471 .get(type_name)
1472 .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)))?;
1473 return Ok(edge_type.arrow_schema.clone());
1474 }
1475 Err(OmniError::manifest(format!(
1476 "invalid table key '{}'",
1477 table_key
1478 )))
1479}
1480
1481fn record_batch_row_to_json(batch: &RecordBatch, row: usize) -> Result<serde_json::Value> {
1482 let mut obj = serde_json::Map::new();
1483 for (i, field) in batch.schema().fields().iter().enumerate() {
1484 obj.insert(
1485 field.name().clone(),
1486 json_value_from_array(batch.column(i).as_ref(), row)?,
1487 );
1488 }
1489 Ok(serde_json::Value::Object(obj))
1490}
1491
1492fn json_value_from_array(array: &dyn Array, row: usize) -> Result<serde_json::Value> {
1493 if array.is_null(row) {
1494 return Ok(serde_json::Value::Null);
1495 }
1496
1497 match array.data_type() {
1498 DataType::Utf8 => Ok(serde_json::Value::String(
1499 array
1500 .as_any()
1501 .downcast_ref::<StringArray>()
1502 .ok_or_else(|| OmniError::Lance("expected StringArray".to_string()))?
1503 .value(row)
1504 .to_string(),
1505 )),
1506 DataType::LargeUtf8 => Ok(serde_json::Value::String(
1507 array
1508 .as_any()
1509 .downcast_ref::<LargeStringArray>()
1510 .ok_or_else(|| OmniError::Lance("expected LargeStringArray".to_string()))?
1511 .value(row)
1512 .to_string(),
1513 )),
1514 DataType::Boolean => Ok(serde_json::Value::Bool(
1515 array
1516 .as_any()
1517 .downcast_ref::<BooleanArray>()
1518 .ok_or_else(|| OmniError::Lance("expected BooleanArray".to_string()))?
1519 .value(row),
1520 )),
1521 DataType::Int32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1522 array
1523 .as_any()
1524 .downcast_ref::<Int32Array>()
1525 .ok_or_else(|| OmniError::Lance("expected Int32Array".to_string()))?
1526 .value(row),
1527 ))),
1528 DataType::Int64 => Ok(serde_json::Value::Number(serde_json::Number::from(
1529 array
1530 .as_any()
1531 .downcast_ref::<Int64Array>()
1532 .ok_or_else(|| OmniError::Lance("expected Int64Array".to_string()))?
1533 .value(row),
1534 ))),
1535 DataType::UInt32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1536 array
1537 .as_any()
1538 .downcast_ref::<UInt32Array>()
1539 .ok_or_else(|| OmniError::Lance("expected UInt32Array".to_string()))?
1540 .value(row),
1541 ))),
1542 DataType::UInt64 => Ok(serde_json::Value::Number(serde_json::Number::from(
1543 array
1544 .as_any()
1545 .downcast_ref::<UInt64Array>()
1546 .ok_or_else(|| OmniError::Lance("expected UInt64Array".to_string()))?
1547 .value(row),
1548 ))),
1549 DataType::Float32 => {
1550 let value = array
1551 .as_any()
1552 .downcast_ref::<Float32Array>()
1553 .ok_or_else(|| OmniError::Lance("expected Float32Array".to_string()))?
1554 .value(row) as f64;
1555 Ok(serde_json::Value::Number(
1556 serde_json::Number::from_f64(value).ok_or_else(|| {
1557 OmniError::Lance(format!("cannot encode f32 value '{}' as JSON", value))
1558 })?,
1559 ))
1560 }
1561 DataType::Float64 => {
1562 let value = array
1563 .as_any()
1564 .downcast_ref::<Float64Array>()
1565 .ok_or_else(|| OmniError::Lance("expected Float64Array".to_string()))?
1566 .value(row);
1567 Ok(serde_json::Value::Number(
1568 serde_json::Number::from_f64(value).ok_or_else(|| {
1569 OmniError::Lance(format!("cannot encode f64 value '{}' as JSON", value))
1570 })?,
1571 ))
1572 }
1573 DataType::Date32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1574 array
1575 .as_any()
1576 .downcast_ref::<Date32Array>()
1577 .ok_or_else(|| OmniError::Lance("expected Date32Array".to_string()))?
1578 .value(row),
1579 ))),
1580 DataType::Binary => Ok(serde_json::Value::String(base64::Engine::encode(
1581 &base64::engine::general_purpose::STANDARD,
1582 array
1583 .as_any()
1584 .downcast_ref::<BinaryArray>()
1585 .ok_or_else(|| OmniError::Lance("expected BinaryArray".to_string()))?
1586 .value(row),
1587 ))),
1588 DataType::LargeBinary => Ok(serde_json::Value::String(base64::Engine::encode(
1589 &base64::engine::general_purpose::STANDARD,
1590 array
1591 .as_any()
1592 .downcast_ref::<LargeBinaryArray>()
1593 .ok_or_else(|| OmniError::Lance("expected LargeBinaryArray".to_string()))?
1594 .value(row),
1595 ))),
1596 DataType::List(_) => {
1597 let list = array
1598 .as_any()
1599 .downcast_ref::<ListArray>()
1600 .ok_or_else(|| OmniError::Lance("expected ListArray".to_string()))?;
1601 let values = list.value(row);
1602 let mut out = Vec::with_capacity(values.len());
1603 for idx in 0..values.len() {
1604 out.push(json_value_from_array(values.as_ref(), idx)?);
1605 }
1606 Ok(serde_json::Value::Array(out))
1607 }
1608 DataType::LargeList(_) => {
1609 let list = array
1610 .as_any()
1611 .downcast_ref::<LargeListArray>()
1612 .ok_or_else(|| OmniError::Lance("expected LargeListArray".to_string()))?;
1613 let values = list.value(row);
1614 let mut out = Vec::with_capacity(values.len());
1615 for idx in 0..values.len() {
1616 out.push(json_value_from_array(values.as_ref(), idx)?);
1617 }
1618 Ok(serde_json::Value::Array(out))
1619 }
1620 DataType::FixedSizeList(_, _) => {
1621 let list = array
1622 .as_any()
1623 .downcast_ref::<FixedSizeListArray>()
1624 .ok_or_else(|| OmniError::Lance("expected FixedSizeListArray".to_string()))?;
1625 let values = list.value(row);
1626 let mut out = Vec::with_capacity(values.len());
1627 for idx in 0..values.len() {
1628 out.push(json_value_from_array(values.as_ref(), idx)?);
1629 }
1630 Ok(serde_json::Value::Array(out))
1631 }
1632 DataType::Struct(fields) => {
1633 let struct_array = array
1634 .as_any()
1635 .downcast_ref::<StructArray>()
1636 .ok_or_else(|| OmniError::Lance("expected StructArray".to_string()))?;
1637 let mut obj = serde_json::Map::new();
1638 for (field_idx, field) in fields.iter().enumerate() {
1639 obj.insert(
1640 field.name().clone(),
1641 json_value_from_array(struct_array.column(field_idx).as_ref(), row)?,
1642 );
1643 }
1644 Ok(serde_json::Value::Object(obj))
1645 }
1646 _ => {
1647 let value = arrow_cast::display::array_value_to_string(array, row)
1648 .map_err(|e| OmniError::Lance(e.to_string()))?;
1649 Ok(serde_json::Value::String(value))
1650 }
1651 }
1652}
1653
#[cfg(test)]
mod tests {
    //! Tests for graph init/open routed through a storage adapter, for `apply_schema`
    //! against seeded data, and for the behaviour of writes and branch listing while
    //! the schema-apply lock branch exists.

    use super::*;
    use crate::db::is_internal_run_branch;
    use crate::db::manifest::ManifestCoordinator;
    use async_trait::async_trait;
    use serde_json::Value;
    use std::sync::Mutex;

    use crate::storage::{LocalStorageAdapter, StorageAdapter, join_uri};

    /// Baseline schema for every test: node types `Person` and `Company`, and edge
    /// types `Knows` and `WorksAt`. Several tests derive variants with `.replace(...)`,
    /// so those patterns depend on this exact text.
    const TEST_SCHEMA: &str = r#"
node Person {
 name: String @key
 age: I32?
}
node Company {
 name: String @key
}
edge Knows: Person -> Person {
 since: Date?
}
edge WorksAt: Person -> Company
"#;

    /// Storage adapter that forwards every call to `LocalStorageAdapter`.
    ///
    /// It records the URIs passed to `read_text`, `write_text`, `exists`,
    /// `rename_text` and `delete`. `list_dir` is forwarded but not recorded.
    #[derive(Debug, Default)]
    struct RecordingStorageAdapter {
        inner: LocalStorageAdapter,
        reads: Mutex<Vec<String>>,
        writes: Mutex<Vec<String>>,
        exists_checks: Mutex<Vec<String>>,
        renames: Mutex<Vec<(String, String)>>,
        deletes: Mutex<Vec<String>>,
    }

    impl RecordingStorageAdapter {
        /// Copy of every URI passed to `read_text`, in call order.
        fn reads(&self) -> Vec<String> {
            self.reads.lock().unwrap().clone()
        }

        /// Copy of every URI passed to `write_text`, in call order.
        fn writes(&self) -> Vec<String> {
            self.writes.lock().unwrap().clone()
        }

        /// Copy of every URI passed to `exists`, in call order.
        fn exists_checks(&self) -> Vec<String> {
            self.exists_checks.lock().unwrap().clone()
        }
    }

    #[async_trait]
    impl StorageAdapter for RecordingStorageAdapter {
        // Each method records its argument(s) before delegating, so failed calls are
        // recorded too. The mutex guard is a temporary dropped before the `.await`.
        async fn read_text(&self, uri: &str) -> Result<String> {
            self.reads.lock().unwrap().push(uri.to_string());
            self.inner.read_text(uri).await
        }

        async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
            self.writes.lock().unwrap().push(uri.to_string());
            self.inner.write_text(uri, contents).await
        }

        async fn exists(&self, uri: &str) -> Result<bool> {
            self.exists_checks.lock().unwrap().push(uri.to_string());
            self.inner.exists(uri).await
        }

        async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
            self.renames
                .lock()
                .unwrap()
                .push((from_uri.to_string(), to_uri.to_string()));
            self.inner.rename_text(from_uri, to_uri).await
        }

        async fn delete(&self, uri: &str) -> Result<()> {
            self.deletes.lock().unwrap().push(uri.to_string());
            self.inner.delete(uri).await
        }

        async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
            self.inner.list_dir(dir_uri).await
        }
    }

    /// `init_with_storage` must write, and `open_with_storage` must read and probe,
    /// the schema/metadata files through the supplied adapter.
    #[tokio::test]
    async fn test_init_and_open_route_graph_metadata_through_storage_adapter() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let adapter = Arc::new(RecordingStorageAdapter::default());

        Omnigraph::init_with_storage(uri, TEST_SCHEMA, adapter.clone())
            .await
            .unwrap();
        assert!(adapter.writes().contains(&join_uri(uri, "_schema.pg")));
        assert!(adapter.writes().contains(&join_uri(uri, "_schema.ir.json")));
        assert!(
            adapter
                .writes()
                .contains(&join_uri(uri, "__schema_state.json"))
        );

        Omnigraph::open_with_storage(uri, adapter.clone())
            .await
            .unwrap();
        assert!(adapter.reads().contains(&join_uri(uri, "_schema.pg")));
        assert!(adapter.reads().contains(&join_uri(uri, "_schema.ir.json")));
        assert!(
            adapter
                .reads()
                .contains(&join_uri(uri, "__schema_state.json"))
        );
        assert!(
            adapter
                .exists_checks()
                .contains(&join_uri(uri, "_schema.ir.json"))
        );
        assert!(
            adapter
                .exists_checks()
                .contains(&join_uri(uri, "__schema_state.json"))
        );
        assert!(
            adapter
                .exists_checks()
                .contains(&join_uri(uri, "_graph_commits.lance"))
        );
    }

    /// Scans `table_key` at `db`'s current snapshot and converts every row to JSON.
    async fn table_rows_json(db: &Omnigraph, table_key: &str) -> Vec<Value> {
        let snapshot = db.snapshot().await;
        let ds = snapshot.open(table_key).await.unwrap();
        let batches = db.table_store().scan_batches(&ds).await.unwrap();
        batches
            .into_iter()
            .flat_map(|batch| {
                (0..batch.num_rows())
                    .map(|row| record_batch_row_to_json(&batch, row).unwrap())
                    .collect::<Vec<_>>()
            })
            .collect()
    }

    /// Appends a single `node:Person` row and commits it.
    ///
    /// The row has `id` and `name` set to `name`, `age` set to `age`, and every other
    /// column null.
    async fn seed_person_row(db: &mut Omnigraph, name: &str, age: Option<i32>) {
        let (mut ds, full_path, table_branch) = db
            .open_for_mutation("node:Person", crate::db::MutationOpKind::Insert)
            .await
            .unwrap();
        let schema: Arc<Schema> = Arc::new(ds.schema().into());
        let columns: Vec<Arc<dyn Array>> = schema
            .fields()
            .iter()
            .map(|field| match field.name().as_str() {
                "id" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
                "name" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
                "age" => Arc::new(Int32Array::from(vec![age])) as Arc<dyn Array>,
                _ => new_null_array(field.data_type(), 1),
            })
            .collect();
        let batch = RecordBatch::try_new(Arc::clone(&schema), columns).unwrap();
        let state = db
            .table_store()
            .append_batch(&full_path, &mut ds, batch)
            .await
            .unwrap();
        db.commit_updates(&[crate::db::SubTableUpdate {
            table_key: "node:Person".to_string(),
            table_version: state.version,
            table_branch,
            row_count: state.row_count,
            version_metadata: state.version_metadata,
        }])
        .await
        .unwrap();
    }

    /// Adding a nullable property keeps existing rows, which read back with `null`
    /// for the new column after reopening.
    #[tokio::test]
    async fn test_apply_schema_adds_nullable_property_and_preserves_rows() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;

        let desired = TEST_SCHEMA.replace(
            " age: I32?\n}",
            " age: I32?\n nickname: String?\n}",
        );
        let result = db.apply_schema(&desired).await.unwrap();
        assert!(result.applied);

        let reopened = Omnigraph::open(uri).await.unwrap();
        let rows = table_rows_json(&reopened, "node:Person").await;
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0]["name"], "Alice");
        assert_eq!(rows[0]["age"], 30);
        assert!(rows[0]["nickname"].is_null());
        assert!(
            reopened.catalog().node_types["Person"]
                .properties
                .contains_key("nickname")
        );
        assert!(dir.path().join("_schema.pg").exists());
    }

    /// `@rename_from` on a property carries the old column's values to the new name.
    #[tokio::test]
    async fn test_apply_schema_renames_property_and_preserves_values() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;

        let desired = TEST_SCHEMA.replace(
            " age: I32?\n}",
            " years: I32? @rename_from(\"age\")\n}",
        );
        db.apply_schema(&desired).await.unwrap();

        let reopened = Omnigraph::open(uri).await.unwrap();
        let rows = table_rows_json(&reopened, "node:Person").await;
        assert_eq!(rows[0]["name"], "Alice");
        assert_eq!(rows[0]["years"], 30);
        assert!(rows[0].get("age").is_none());
    }

    /// Renaming a node type swaps the table key at head, while a snapshot taken at the
    /// pre-rename version still exposes the old key.
    #[tokio::test]
    async fn test_apply_schema_renames_type_and_preserves_historical_snapshot() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;
        let before_version = db.snapshot().await.version();

        let desired = TEST_SCHEMA
            .replace("node Person {\n", "node Human @rename_from(\"Person\") {\n")
            .replace("edge Knows: Person -> Person", "edge Knows: Human -> Human")
            .replace(
                "edge WorksAt: Person -> Company",
                "edge WorksAt: Human -> Company",
            );
        db.apply_schema(&desired).await.unwrap();

        let head = db.snapshot().await;
        assert!(head.entry("node:Person").is_none());
        assert!(head.entry("node:Human").is_some());
        let historical = ManifestCoordinator::snapshot_at(uri, None, before_version)
            .await
            .unwrap();
        assert!(historical.entry("node:Person").is_some());
        assert!(historical.entry("node:Human").is_none());
    }

    /// After a JSONL load no internal run branch may remain, and a subsequent schema
    /// apply must succeed.
    #[tokio::test]
    async fn test_apply_schema_succeeds_after_load() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        crate::loader::load_jsonl(
            &mut db,
            r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#,
            crate::loader::LoadMode::Overwrite,
        )
        .await
        .unwrap();

        let all_branches = db.coordinator.read().await.all_branches().await.unwrap();
        assert!(
            !all_branches.iter().any(|b| is_internal_run_branch(b)),
            "run branch should be deleted after publish, got: {:?}",
            all_branches
        );

        let desired = TEST_SCHEMA.replace(
            " age: I32?\n}",
            " age: I32?\n nickname: String?\n}",
        );
        let result = db.apply_schema(&desired).await.unwrap();
        assert!(result.applied, "schema apply should have applied");
    }

    /// Adding `@index` to an existing property builds an FTS index on it.
    #[tokio::test]
    async fn test_apply_schema_adds_index_for_existing_property() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let desired = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
        db.apply_schema(&desired).await.unwrap();

        let snapshot = db.snapshot().await;
        let ds = snapshot.open("node:Person").await.unwrap();
        assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
    }

    /// A schema change that rewrites the table keeps the pre-existing btree (`id`)
    /// and FTS (`name`) indices.
    #[tokio::test]
    async fn test_apply_schema_rewrite_preserves_existing_indices() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let initial_schema = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
        let mut db = Omnigraph::init(uri, &initial_schema).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;

        let desired = initial_schema.replace(
            " age: I32?\n}",
            " age: I32?\n nickname: String?\n}",
        );
        db.apply_schema(&desired).await.unwrap();

        let snapshot = db.snapshot().await;
        let ds = snapshot.open("node:Person").await.unwrap();
        assert!(db.table_store().has_btree_index(&ds, "id").await.unwrap());
        assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
    }

    /// `open_for_mutation` must refuse while the schema-apply lock branch exists.
    #[tokio::test]
    async fn test_open_for_mutation_rejects_while_schema_apply_locked() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        // Simulate an in-flight schema apply by creating its lock branch directly.
        db.coordinator
            .write()
            .await
            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
            .await
            .unwrap();

        let err = db
            .open_for_mutation("node:Person", crate::db::MutationOpKind::Insert)
            .await
            .unwrap_err();
        assert!(
            err.to_string()
                .contains("write is unavailable while schema apply is in progress")
        );
    }

    /// `commit_updates` must refuse while the schema-apply lock branch exists, even
    /// for an empty update list.
    #[tokio::test]
    async fn test_commit_updates_rejects_while_schema_apply_locked() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        db.coordinator
            .write()
            .await
            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
            .await
            .unwrap();

        let err = db.commit_updates(&[]).await.unwrap_err();
        assert!(
            err.to_string()
                .contains("write commit is unavailable while schema apply is in progress")
        );
    }

    /// The schema-apply lock branch must not appear in the public branch list.
    #[tokio::test]
    async fn test_branch_list_hides_schema_apply_lock_branch() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        db.coordinator
            .write()
            .await
            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
            .await
            .unwrap();

        let branches = db.branch_list().await.unwrap();
        assert_eq!(branches, vec!["main".to_string()]);
    }
}
2030}