1use std::collections::{BTreeSet, HashMap, HashSet};
2use std::io::Write;
3use std::sync::Arc;
4
5use arc_swap::ArcSwap;
6use arrow_array::{
7 Array, BinaryArray, BooleanArray, Date32Array, FixedSizeListArray, Float32Array, Float64Array,
8 Int32Array, Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray,
9 RecordBatch, StringArray, StructArray, UInt32Array, UInt64Array, new_null_array,
10};
11use arrow_schema::{DataType, Field, Schema};
12use lance::Dataset;
13use lance::blob::{BlobArrayBuilder, blob_field};
14use lance::dataset::BlobFile;
15use lance::dataset::scanner::ColumnOrdering;
16use lance::datatypes::BlobKind;
17use omnigraph_compiler::catalog::{Catalog, EdgeType, NodeType};
18use omnigraph_compiler::schema::parser::parse_schema;
19use omnigraph_compiler::types::ScalarType;
20use omnigraph_compiler::{
21 SchemaIR, SchemaMigrationPlan, SchemaMigrationStep, SchemaTypeKind, build_catalog_from_ir,
22 build_schema_ir, plan_schema_migration,
23};
24
25use crate::db::graph_coordinator::{GraphCoordinator, PublishedSnapshot};
26use crate::error::{OmniError, Result};
27use crate::runtime_cache::RuntimeCache;
28use crate::storage::{StorageAdapter, join_uri, normalize_root_uri, storage_for_uri};
29use crate::table_store::TableStore;
30
31mod export;
32mod optimize;
33mod schema_apply;
34mod table_ops;
35
36pub use optimize::{CleanupPolicyOptions, TableCleanupStats, TableOptimizeStats};
37
38use super::commit_graph::GraphCommit;
39use super::manifest::{
40 ManifestChange, Snapshot, SubTableEntry, TableRegistration, TableTombstone,
41 table_path_for_table_key,
42};
43use super::schema_state::{
44 SCHEMA_SOURCE_FILENAME, load_or_bootstrap_schema_contract, read_accepted_schema_ir,
45 recover_schema_state_files, schema_ir_staging_uri, schema_ir_uri, schema_source_staging_uri,
46 schema_source_uri, schema_state_staging_uri, schema_state_uri, validate_schema_contract,
47 write_schema_contract, write_schema_contract_staging,
48};
49use super::{
50 ReadTarget, ResolvedTarget, SCHEMA_APPLY_LOCK_BRANCH, SnapshotId, is_internal_system_branch,
51 is_schema_apply_lock_branch,
52};
53
/// Outcome reported by merge operations.
///
/// NOTE(review): the merge implementation lives outside this file; the
/// variant descriptions below follow conventional fast-forward/merge naming —
/// confirm against the merge code.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MergeOutcome {
    /// Nothing to do: the target already contained the source changes.
    AlreadyUpToDate,
    /// The target head was advanced without creating a merge commit.
    FastForward,
    /// A merge commit was created.
    Merged,
}
60
/// Result of [`Omnigraph::apply_schema`].
#[derive(Debug, Clone)]
pub struct SchemaApplyResult {
    // Whether the requested migration is supported by the engine
    // (presumably false leaves `applied == false`; see `schema_apply`).
    pub supported: bool,
    // Whether the migration was actually applied.
    pub applied: bool,
    // Manifest version observed by the apply attempt.
    pub manifest_version: u64,
    // The individual migration steps that were planned.
    pub steps: Vec<SchemaMigrationStep>,
}
68
/// Handle to an omnigraph database rooted at a single storage URI.
pub struct Omnigraph {
    // Normalized root URI of the graph (see `normalize_root_uri`).
    root_uri: String,
    // Backend used for manifest / schema file I/O.
    storage: Arc<dyn StorageAdapter>,
    // Branch/commit/manifest coordinator; RwLock allows concurrent readers.
    coordinator: Arc<tokio::sync::RwLock<GraphCoordinator>>,
    // Lance table access layered on top of `root_uri`.
    table_store: TableStore,
    // Cache of derived runtime state; invalidated on refresh/branch switch.
    runtime_cache: RuntimeCache,
    // Currently accepted catalog; swapped atomically on schema reload.
    catalog: Arc<ArcSwap<Catalog>>,
    // Raw schema source text backing `catalog`; swapped alongside it.
    schema_source: Arc<ArcSwap<String>>,
    // Shared write-queue manager handed out to mutation paths.
    write_queue: Arc<crate::db::write_queue::WriteQueueManager>,
    // Held to ensure at most one merge runs at a time on this handle.
    merge_exclusive: Arc<tokio::sync::Mutex<()>>,
}
132
/// Controls whether opening a graph may perform recovery writes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OpenMode {
    /// Run schema-state recovery and full manifest-drift recovery on open.
    ReadWrite,
    /// Skip all recovery steps on open.
    ReadOnly,
}
150
151impl Omnigraph {
    /// Create a brand-new graph at `uri` from a schema source string, using
    /// the storage backend inferred from the URI scheme.
    pub async fn init(uri: &str, schema_source: &str) -> Result<Self> {
        Self::init_with_storage(uri, schema_source, storage_for_uri(uri)?).await
    }

    /// Create a brand-new graph at `uri` on an explicit storage adapter.
    ///
    /// Parses the schema, builds the catalog, persists the schema source and
    /// the schema contract, then initializes coordinator state.
    pub(crate) async fn init_with_storage(
        uri: &str,
        schema_source: &str,
        storage: Arc<dyn StorageAdapter>,
    ) -> Result<Self> {
        let root = normalize_root_uri(uri)?;
        let schema_ir = read_schema_ir_from_source(schema_source)?;
        let mut catalog = build_catalog_from_ir(&schema_ir)?;
        // Blob-typed properties need Lance blob-encoded Arrow fields.
        fixup_blob_schemas(&mut catalog);

        let schema_path = join_uri(&root, SCHEMA_SOURCE_FILENAME);
        storage.write_text(&schema_path, schema_source).await?;
        write_schema_contract(&root, storage.as_ref(), &schema_ir).await?;

        let coordinator = GraphCoordinator::init(&root, &catalog, Arc::clone(&storage)).await?;

        Ok(Self {
            root_uri: root.clone(),
            storage,
            coordinator: Arc::new(tokio::sync::RwLock::new(coordinator)),
            table_store: TableStore::new(&root),
            runtime_cache: RuntimeCache::default(),
            catalog: Arc::new(ArcSwap::from_pointee(catalog)),
            schema_source: Arc::new(ArcSwap::from_pointee(schema_source.to_string())),
            write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()),
            merge_exclusive: Arc::new(tokio::sync::Mutex::new(())),
        })
    }

    /// Open an existing graph at `uri` for reading and writing.
    pub async fn open(uri: &str) -> Result<Self> {
        Self::open_with_storage_and_mode(uri, storage_for_uri(uri)?, OpenMode::ReadWrite).await
    }

    /// Open an existing graph at `uri` without performing recovery writes.
    pub async fn open_read_only(uri: &str) -> Result<Self> {
        Self::open_with_storage_and_mode(uri, storage_for_uri(uri)?, OpenMode::ReadOnly).await
    }

    /// Open an existing graph on an explicit storage adapter (read-write).
    pub(crate) async fn open_with_storage(
        uri: &str,
        storage: Arc<dyn StorageAdapter>,
    ) -> Result<Self> {
        Self::open_with_storage_and_mode(uri, storage, OpenMode::ReadWrite).await
    }
212
213 pub(crate) async fn open_with_storage_and_mode(
214 uri: &str,
215 storage: Arc<dyn StorageAdapter>,
216 mode: OpenMode,
217 ) -> Result<Self> {
218 let root = normalize_root_uri(uri)?;
219 let mut coordinator = GraphCoordinator::open(&root, Arc::clone(&storage)).await?;
222 if matches!(mode, OpenMode::ReadWrite) {
232 let schema_state_recovery =
233 recover_schema_state_files(&root, Arc::clone(&storage), &coordinator.snapshot())
234 .await?;
235 crate::db::manifest::recover_manifest_drift(
241 &root,
242 Arc::clone(&storage),
243 &mut coordinator,
244 crate::db::manifest::RecoveryMode::Full,
245 schema_state_recovery,
246 )
247 .await?;
248 }
249 let schema_path = schema_source_uri(&root);
251 let schema_source = storage.read_text(&schema_path).await?;
252 let current_source_ir = read_schema_ir_from_source(&schema_source)?;
253 let branches = coordinator.branch_list().await?;
254 let (accepted_ir, _) = load_or_bootstrap_schema_contract(
255 &root,
256 Arc::clone(&storage),
257 &branches,
258 ¤t_source_ir,
259 )
260 .await?;
261 let mut catalog = build_catalog_from_ir(&accepted_ir)?;
262 fixup_blob_schemas(&mut catalog);
263
264 Ok(Self {
265 root_uri: root.clone(),
266 storage,
267 coordinator: Arc::new(tokio::sync::RwLock::new(coordinator)),
268 table_store: TableStore::new(&root),
269 runtime_cache: RuntimeCache::default(),
270 catalog: Arc::new(ArcSwap::from_pointee(catalog)),
271 schema_source: Arc::new(ArcSwap::from_pointee(schema_source)),
272 write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()),
273 merge_exclusive: Arc::new(tokio::sync::Mutex::new(())),
274 })
275 }
276
    /// Current accepted catalog (atomic load; cheap to call often).
    pub fn catalog(&self) -> Arc<Catalog> {
        self.catalog.load_full()
    }

    /// Raw schema source text backing the current catalog.
    pub fn schema_source(&self) -> Arc<String> {
        self.schema_source.load_full()
    }

    /// Atomically replace the in-memory catalog.
    pub(crate) fn store_catalog(&self, catalog: Catalog) {
        self.catalog.store(Arc::new(catalog));
    }

    /// Atomically replace the in-memory schema source text.
    pub(crate) fn store_schema_source(&self, schema_source: String) {
        self.schema_source.store(Arc::new(schema_source));
    }

    /// Normalized root URI of this graph.
    pub fn uri(&self) -> &str {
        &self.root_uri
    }

    /// Verify that the on-disk schema contract is valid before serving reads
    /// or accepting writes.
    pub(crate) async fn ensure_schema_state_valid(&self) -> Result<()> {
        validate_schema_contract(self.uri(), Arc::clone(&self.storage)).await
    }

    /// Compute the migration plan from the current schema to
    /// `desired_schema_source` without applying it.
    pub async fn plan_schema(&self, desired_schema_source: &str) -> Result<SchemaMigrationPlan> {
        schema_apply::plan_schema(self, desired_schema_source).await
    }

    /// Apply a schema migration towards `desired_schema_source`.
    pub async fn apply_schema(&self, desired_schema_source: &str) -> Result<SchemaApplyResult> {
        schema_apply::apply_schema(self, desired_schema_source).await
    }

    /// Fail `operation` if a schema apply is currently in progress.
    pub(crate) async fn ensure_schema_apply_idle(&self, operation: &str) -> Result<()> {
        schema_apply::ensure_schema_apply_idle(self, operation).await
    }

    /// Fail `operation` if the schema-apply lock is held.
    async fn ensure_schema_apply_not_locked(&self, operation: &str) -> Result<()> {
        schema_apply::ensure_schema_apply_not_locked(self, operation).await
    }

    /// Direct access to the underlying table store.
    pub(crate) fn table_store(&self) -> &TableStore {
        &self.table_store
    }

    /// Table-level storage interface (the table store doubles as the
    /// `TableStorage` implementation).
    pub(crate) fn storage(&self) -> &dyn crate::storage_layer::TableStorage {
        &self.table_store
    }

    /// Raw file/object storage adapter used for manifest and schema files.
    pub(crate) fn storage_adapter(&self) -> &dyn crate::storage::StorageAdapter {
        self.storage.as_ref()
    }

    /// Shared write-queue manager handle.
    pub(crate) fn write_queue(&self) -> Arc<crate::db::write_queue::WriteQueueManager> {
        Arc::clone(&self.write_queue)
    }

    /// Mutex that serializes merges on this graph handle.
    pub(crate) fn merge_exclusive(&self) -> Arc<tokio::sync::Mutex<()>> {
        Arc::clone(&self.merge_exclusive)
    }

    /// Same value as [`Omnigraph::uri`], exposed for internal callers.
    pub(crate) fn root_uri(&self) -> &str {
        &self.root_uri
    }

    /// Open a fresh coordinator pinned to `branch` (`None` = main).
    pub(crate) async fn open_coordinator_for_branch(
        &self,
        branch: Option<&str>,
    ) -> Result<GraphCoordinator> {
        match branch {
            Some(branch) => {
                GraphCoordinator::open_branch(self.uri(), branch, Arc::clone(&self.storage)).await
            }
            None => GraphCoordinator::open(self.uri(), Arc::clone(&self.storage)).await,
        }
    }

    /// Swap the active coordinator for one opened on `branch`, returning the
    /// previous coordinator so the caller can restore it later.
    pub(crate) async fn swap_coordinator_for_branch(
        &self,
        branch: Option<&str>,
    ) -> Result<GraphCoordinator> {
        let next = self.open_coordinator_for_branch(branch).await?;
        let mut coord = self.coordinator.write().await;
        Ok(std::mem::replace(&mut *coord, next))
    }

    /// Put back a coordinator previously taken via
    /// [`Omnigraph::swap_coordinator_for_branch`].
    pub(crate) async fn restore_coordinator(&self, coordinator: GraphCoordinator) {
        *self.coordinator.write().await = coordinator;
    }

    /// Resolve a branch name to a concrete target, taking a fast path when
    /// the request matches the currently active coordinator branch.
    pub(crate) async fn resolved_branch_target(
        &self,
        branch: Option<&str>,
    ) -> Result<ResolvedTarget> {
        self.ensure_schema_state_valid().await?;
        let requested = ReadTarget::Branch(branch.unwrap_or("main").to_string());
        let normalized = normalize_branch_name(branch.unwrap_or("main"))?;
        let coord = self.coordinator.read().await;
        if normalized.as_deref() == coord.current_branch() {
            // Fast path: serve from the already-open coordinator. When no
            // head commit id exists yet, fabricate a synthetic snapshot id.
            let snapshot_id = coord.head_commit_id().await?.unwrap_or_else(|| {
                SnapshotId::synthetic(coord.current_branch(), coord.version())
            });
            return Ok(ResolvedTarget {
                requested,
                branch: coord.current_branch().map(str::to_string),
                snapshot_id,
                snapshot: coord.snapshot(),
            });
        }
        coord.resolve_target(&requested).await
    }

    /// Manifest snapshot for `branch` (`None` = main).
    pub(crate) async fn snapshot_for_branch(&self, branch: Option<&str>) -> Result<Snapshot> {
        self.resolved_branch_target(branch)
            .await
            .map(|resolved| resolved.snapshot)
    }

    /// Manifest version of the active coordinator.
    pub(crate) async fn version(&self) -> u64 {
        self.coordinator.read().await.version()
    }

    /// Manifest snapshot of the active coordinator.
    pub(crate) async fn snapshot(&self) -> Snapshot {
        self.coordinator.read().await.snapshot()
    }

    /// Manifest snapshot of an arbitrary read target.
    pub async fn snapshot_of(&self, target: impl Into<ReadTarget>) -> Result<Snapshot> {
        self.resolved_target(target)
            .await
            .map(|resolved| resolved.snapshot)
    }

    /// Manifest version of an arbitrary read target.
    pub async fn version_of(&self, target: impl Into<ReadTarget>) -> Result<u64> {
        self.snapshot_of(target)
            .await
            .map(|snapshot| snapshot.version())
    }

    /// Branch name a read target resolved to, if it resolved to a branch.
    pub async fn resolved_branch_of(
        &self,
        target: impl Into<ReadTarget>,
    ) -> Result<Option<String>> {
        self.resolved_target(target)
            .await
            .map(|resolved| resolved.branch)
    }

    /// Point this handle at `branch`, replacing the active coordinator and
    /// invalidating all runtime caches.
    pub async fn sync_branch(&self, branch: &str) -> Result<()> {
        self.ensure_schema_state_valid().await?;
        let branch = normalize_branch_name(branch)?;
        let next = self.open_coordinator_for_branch(branch.as_deref()).await?;
        *self.coordinator.write().await = next;
        self.runtime_cache.invalidate_all().await;
        Ok(())
    }
467
468 pub async fn refresh(&self) -> Result<()> {
501 {
508 let mut coord = self.coordinator.write().await;
509 coord.refresh().await?;
510 let schema_state_recovery = recover_schema_state_files(
511 &self.root_uri,
512 Arc::clone(&self.storage),
513 &coord.snapshot(),
514 )
515 .await?;
516 crate::db::manifest::recover_manifest_drift(
517 &self.root_uri,
518 Arc::clone(&self.storage),
519 &mut *coord,
520 crate::db::manifest::RecoveryMode::RollForwardOnly,
521 schema_state_recovery,
522 )
523 .await?;
524 } self.reload_schema_if_source_changed().await?;
526 self.runtime_cache.invalidate_all().await;
527 Ok(())
528 }
529
530 async fn reload_schema_if_source_changed(&self) -> Result<()> {
531 let schema_path = schema_source_uri(&self.root_uri);
532 let schema_source = self.storage.read_text(&schema_path).await?;
533 if schema_source == *self.schema_source.load_full() {
534 return Ok(());
535 }
536 let current_source_ir = read_schema_ir_from_source(&schema_source)?;
537 let branches = self.coordinator.read().await.branch_list().await?;
538 let (accepted_ir, _) = load_or_bootstrap_schema_contract(
539 &self.root_uri,
540 Arc::clone(&self.storage),
541 &branches,
542 ¤t_source_ir,
543 )
544 .await?;
545 let mut catalog = build_catalog_from_ir(&accepted_ir)?;
546 fixup_blob_schemas(&mut catalog);
547 self.store_schema_source(schema_source);
548 self.store_catalog(catalog);
549 Ok(())
550 }
551
    /// Refresh only the coordinator (no schema-state recovery or schema
    /// reload), then invalidate runtime caches.
    pub(crate) async fn refresh_coordinator_only(&self) -> Result<()> {
        self.coordinator.write().await.refresh().await?;
        self.runtime_cache.invalidate_all().await;
        Ok(())
    }

    /// Resolve `branch` to its snapshot id.
    pub async fn resolve_snapshot(&self, branch: &str) -> Result<SnapshotId> {
        self.ensure_schema_state_valid().await?;
        self.coordinator.read().await.resolve_snapshot_id(branch).await
    }

    /// Resolve any read target (branch or pinned snapshot) through the
    /// active coordinator.
    pub(crate) async fn resolved_target(
        &self,
        target: impl Into<ReadTarget>,
    ) -> Result<ResolvedTarget> {
        self.ensure_schema_state_valid().await?;
        self.coordinator.read().await.resolve_target(&target.into()).await
    }

    /// Change set between two read targets, filtered by `filter`.
    pub async fn diff_between(
        &self,
        from: impl Into<ReadTarget>,
        to: impl Into<ReadTarget>,
        filter: &crate::changes::ChangeFilter,
    ) -> Result<crate::changes::ChangeSet> {
        let from_resolved = self.resolved_target(from).await?;
        let to_resolved = self.resolved_target(to).await?;
        crate::changes::diff_snapshots(
            self.uri(),
            &from_resolved.snapshot,
            &to_resolved.snapshot,
            filter,
            // Prefer the destination branch, falling back to the source one.
            to_resolved.branch.clone().or(from_resolved.branch.clone()),
        )
        .await
    }

    /// Change set between two commits identified by id.
    ///
    /// Each commit id is resolved to its graph commit, whose
    /// `graph_commit_id` is then resolved to a concrete snapshot.
    pub async fn diff_commits(
        &self,
        from_commit_id: &str,
        to_commit_id: &str,
        filter: &crate::changes::ChangeFilter,
    ) -> Result<crate::changes::ChangeSet> {
        let coord = self.coordinator.read().await;
        let from_commit = coord.resolve_commit(&SnapshotId::new(from_commit_id)).await?;
        let to_commit = coord.resolve_commit(&SnapshotId::new(to_commit_id)).await?;
        let from_snap = coord
            .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
                from_commit.graph_commit_id.clone(),
            )))
            .await?;
        let to_snap = coord
            .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
                to_commit.graph_commit_id.clone(),
            )))
            .await?;
        // Release the coordinator read lock before the (potentially slow)
        // diff work.
        drop(coord);
        crate::changes::diff_snapshots(
            self.uri(),
            &from_snap.snapshot,
            &to_snap.snapshot,
            filter,
            to_snap.branch.clone().or(from_snap.branch.clone()),
        )
        .await
    }

    /// JSON representation of one entity at an arbitrary read target, or
    /// `None` when it does not exist there.
    pub async fn entity_at_target(
        &self,
        target: impl Into<ReadTarget>,
        table_key: &str,
        id: &str,
    ) -> Result<Option<serde_json::Value>> {
        export::entity_at_target(self, target, table_key, id).await
    }

    /// JSON representation of one entity at a specific manifest version.
    pub async fn entity_at(
        &self,
        table_key: &str,
        id: &str,
        version: u64,
    ) -> Result<Option<serde_json::Value>> {
        export::entity_at(self, table_key, id, version).await
    }

    /// Manifest snapshot as of manifest version `version`.
    pub async fn snapshot_at_version(&self, version: u64) -> Result<Snapshot> {
        self.ensure_schema_state_valid().await?;
        self.coordinator.read().await.snapshot_at_version(version).await
    }

    /// Export selected types/tables on `branch` as a JSONL string.
    pub async fn export_jsonl(
        &self,
        branch: &str,
        type_names: &[String],
        table_keys: &[String],
    ) -> Result<String> {
        export::export_jsonl(self, branch, type_names, table_keys).await
    }

    /// Stream the JSONL export into `writer` instead of buffering it.
    pub async fn export_jsonl_to_writer<W: Write>(
        &self,
        branch: &str,
        type_names: &[String],
        table_keys: &[String],
        writer: &mut W,
    ) -> Result<()> {
        export::export_jsonl_to_writer(self, branch, type_names, table_keys, writer).await
    }

    /// Graph index for the active branch (cached; see `table_ops`).
    pub async fn graph_index(&self) -> Result<Arc<crate::graph_index::GraphIndex>> {
        table_ops::graph_index(self).await
    }

    /// Graph index for an already-resolved target.
    pub(crate) async fn graph_index_for_resolved(
        &self,
        resolved: &ResolvedTarget,
    ) -> Result<Arc<crate::graph_index::GraphIndex>> {
        table_ops::graph_index_for_resolved(self, resolved).await
    }

    /// Ensure secondary indices exist on the active branch.
    pub async fn ensure_indices(&self) -> Result<()> {
        table_ops::ensure_indices(self).await
    }

    /// Ensure secondary indices exist on `branch`.
    pub async fn ensure_indices_on(&self, branch: &str) -> Result<()> {
        table_ops::ensure_indices_on(self, branch).await
    }

    /// Test-only failpoint hook: publishes a table head while skipping the
    /// index rebuild, to exercise recovery paths.
    #[cfg(feature = "failpoints")]
    #[doc(hidden)]
    pub async fn failpoint_publish_table_head_without_index_rebuild_for_test(
        &mut self,
        branch: &str,
        table_key: &str,
        table_branch: Option<&str>,
    ) -> Result<u64> {
        table_ops::failpoint_publish_table_head_without_index_rebuild_for_test(
            self,
            branch,
            table_key,
            table_branch,
        )
        .await
    }

    /// Compact/optimize every table, returning per-table statistics.
    pub async fn optimize(&self) -> Result<Vec<optimize::TableOptimizeStats>> {
        optimize::optimize_all_tables(self).await
    }

    /// Remove stale table data according to `options`, returning per-table
    /// statistics.
    pub async fn cleanup(
        &mut self,
        options: optimize::CleanupPolicyOptions,
    ) -> Result<Vec<optimize::TableCleanupStats>> {
        optimize::cleanup_all_tables(self, options).await
    }
740
    /// Read the blob stored in `property` of the `type_name` node with the
    /// given `id`, from the active snapshot.
    ///
    /// # Errors
    /// Fails if the type is unknown, the property is not declared as a Blob,
    /// no row matches `id`, or the blob fetch returns no data.
    pub async fn read_blob(&self, type_name: &str, id: &str, property: &str) -> Result<BlobFile> {
        self.ensure_schema_state_valid().await?;
        let catalog = self.catalog();
        let node_type = catalog
            .node_types
            .get(type_name)
            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
        if !node_type.blob_properties.contains(property) {
            return Err(OmniError::manifest(format!(
                "property '{}' on type '{}' is not a Blob",
                property, type_name
            )));
        }

        let snapshot = self.snapshot().await;
        let table_key = format!("node:{}", type_name);
        let ds = snapshot.open(&table_key).await?;

        // SQL-escape single quotes in the id before embedding it in the
        // scan filter.
        let filter_sql = format!("id = '{}'", id.replace('\'', "''"));
        let row_id = self
            .table_store
            .first_row_id_for_filter(&ds, &filter_sql)
            .await?
            .ok_or_else(|| {
                OmniError::manifest(format!("no {} with id '{}' found", type_name, id))
            })?;

        let ds = Arc::new(ds);
        let mut blobs = ds
            .take_blobs(&[row_id], property)
            .await
            .map_err(|e| OmniError::Lance(e.to_string()))?;

        // A single row id was requested, so at most one blob comes back.
        blobs.pop().ok_or_else(|| {
            OmniError::manifest(format!(
                "blob '{}' on {} '{}' returned no data",
                property, type_name, id
            ))
        })
    }

    /// Name of the currently active branch (`None` = main).
    pub(crate) async fn active_branch(&self) -> Option<String> {
        self.coordinator.read().await.current_branch().map(str::to_string)
    }

    /// Reject deletion of `branch` when other branches still depend on it,
    /// either as an ancestor (descendant branches) or via table entries that
    /// reference it as their table branch.
    async fn ensure_branch_delete_safe(&self, branch: &str, branches: &[String]) -> Result<()> {
        let descendants = self.coordinator.read().await.branch_descendants(branch).await?;
        if let Some(descendant) = descendants.first() {
            return Err(OmniError::manifest_conflict(format!(
                "cannot delete branch '{}' because descendant branch '{}' still depends on it",
                branch, descendant
            )));
        }

        for other_branch in branches
            .iter()
            .filter(|candidate| candidate.as_str() != branch)
        {
            let snapshot = self
                .snapshot_of(ReadTarget::branch(other_branch.as_str()))
                .await?;
            if snapshot
                .entries()
                .any(|entry| entry.table_branch.as_deref() == Some(branch))
            {
                return Err(OmniError::manifest_conflict(format!(
                    "cannot delete branch '{}' because branch '{}' still depends on it",
                    branch, other_branch
                )));
            }
        }

        Ok(())
    }

    /// Delete per-table branch data left behind after a branch deletion.
    ///
    /// Tables are de-duplicated by path and processed in table-key order so
    /// repeated cleanup attempts behave deterministically.
    async fn cleanup_deleted_branch_tables(
        &self,
        branch: &str,
        owned_tables: &[(String, String)],
    ) -> Result<()> {
        let mut seen_paths = HashSet::new();
        let mut cleanup_targets = owned_tables
            .iter()
            .filter(|(_, table_path)| seen_paths.insert(table_path.clone()))
            .cloned()
            .collect::<Vec<_>>();
        cleanup_targets.sort_by(|left, right| left.0.cmp(&right.0));

        for (table_key, table_path) in cleanup_targets {
            let dataset_uri = self.table_store.dataset_uri(&table_path);
            if let Err(err) = self.table_store.delete_branch(&dataset_uri, branch).await {
                // The branch ref is already gone at this point, so surface
                // the partial cleanup explicitly.
                return Err(OmniError::manifest_internal(format!(
                    "branch '{}' was deleted but cleanup failed for {}: {}",
                    branch, table_key, err
                )));
            }
        }

        Ok(())
    }

    /// Delete `branch` from the coordinator and clean up the table data it
    /// owned. Refuses to delete the currently active branch.
    async fn delete_branch_storage_only(&self, branch: &str) -> Result<()> {
        let active = self.coordinator.read().await.current_branch().map(str::to_string);
        if active.as_deref() == Some(branch) {
            return Err(OmniError::manifest_conflict(format!(
                "cannot delete currently active branch '{}'",
                branch
            )));
        }

        // Collect the tables owned by this branch *before* deleting the
        // branch ref, so they can still be resolved.
        let branch_snapshot = self.snapshot_of(ReadTarget::branch(branch)).await?;
        let owned_tables = branch_snapshot
            .entries()
            .filter(|entry| entry.table_branch.as_deref() == Some(branch))
            .map(|entry| (entry.table_key.clone(), entry.table_path.clone()))
            .collect::<Vec<_>>();

        self.coordinator.write().await.branch_delete(branch).await?;
        self.cleanup_deleted_branch_tables(branch, &owned_tables)
            .await
    }

    /// Associated-function alias for the free [`normalize_branch_name`].
    pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
        normalize_branch_name(branch)
    }
876
    /// Head commit id of `branch` (`None` = main), initializing the commit
    /// graph on a freshly opened coordinator if needed.
    pub(crate) async fn head_commit_id_for_branch(
        &self,
        branch: Option<&str>,
    ) -> Result<Option<String>> {
        let mut coordinator = self.open_coordinator_for_branch(branch).await?;
        coordinator.ensure_commit_graph_initialized().await?;
        coordinator
            .head_commit_id()
            .await
            .map(|id| id.map(|snapshot_id| snapshot_id.as_str().to_string()))
    }

    /// Create branch `name` off the currently active branch.
    pub async fn branch_create(&self, name: &str) -> Result<()> {
        self.ensure_schema_state_valid().await?;
        self.ensure_schema_apply_idle("branch_create").await?;
        ensure_public_branch_ref(name, "branch_create")?;
        self.coordinator.write().await.branch_create(name).await
    }

    /// Create branch `name` off an arbitrary branch target.
    pub async fn branch_create_from(
        &self,
        from: impl Into<ReadTarget>,
        name: &str,
    ) -> Result<()> {
        self.ensure_schema_apply_idle("branch_create_from").await?;
        self.branch_create_from_impl(from, name, false).await
    }

    /// Shared branch-creation implementation; `allow_internal_refs` lets
    /// internal callers bypass the public-ref checks.
    async fn branch_create_from_impl(
        &self,
        from: impl Into<ReadTarget>,
        name: &str,
        allow_internal_refs: bool,
    ) -> Result<()> {
        let target = from.into();
        // Only branch sources are supported; pinned snapshots are rejected.
        let ReadTarget::Branch(branch_name) = target else {
            return Err(OmniError::manifest(
                "branch creation from pinned snapshots is not supported yet".to_string(),
            ));
        };
        if !allow_internal_refs {
            ensure_public_branch_ref(&branch_name, "branch_create_from")?;
            ensure_public_branch_ref(name, "branch_create_from")?;
        }
        let branch = normalize_branch_name(&branch_name)?;
        let mut source_coord = self.open_coordinator_for_branch(branch.as_deref()).await?;
        source_coord.branch_create(name).await
    }

    /// Names of all branches.
    pub async fn branch_list(&self) -> Result<Vec<String>> {
        self.ensure_schema_state_valid().await?;
        self.coordinator.read().await.branch_list().await
    }

    /// Delete branch `name` and clean up the table data it owned.
    ///
    /// Refuses to delete 'main', internal refs, unknown branches, the
    /// active branch, or branches other branches still depend on.
    pub async fn branch_delete(&self, name: &str) -> Result<()> {
        self.ensure_schema_state_valid().await?;
        self.ensure_schema_apply_idle("branch_delete").await?;
        ensure_public_branch_ref(name, "branch_delete")?;
        // Refresh first so the dependency checks below see current state.
        self.refresh().await?;
        let branch = normalize_branch_name(name)?
            .ok_or_else(|| OmniError::manifest("cannot delete branch 'main'".to_string()))?;
        let branches = self.coordinator.read().await.branch_list().await?;
        if !branches.iter().any(|candidate| candidate == &branch) {
            return Err(OmniError::manifest_not_found(format!(
                "branch '{}' not found",
                branch
            )));
        }

        self.ensure_branch_delete_safe(&branch, &branches).await?;
        self.delete_branch_storage_only(&branch).await
    }

    /// Look up a commit by id.
    pub async fn get_commit(&self, commit_id: &str) -> Result<GraphCommit> {
        self.ensure_schema_state_valid().await?;
        self.coordinator.read().await
            .resolve_commit(&SnapshotId::new(commit_id))
            .await
    }

    /// List commits on `branch` (`None` = main).
    pub async fn list_commits(&self, branch: Option<&str>) -> Result<Vec<GraphCommit>> {
        self.ensure_schema_state_valid().await?;
        let branch = match branch {
            Some(branch) => normalize_branch_name(branch)?,
            None => None,
        };
        let coordinator = self.open_coordinator_for_branch(branch.as_deref()).await?;
        coordinator.list_commits().await
    }
982
    /// Open `table_key` for mutation on the active branch; returns the
    /// dataset, its table path, and its table branch (see `table_ops`).
    pub(crate) async fn open_for_mutation(
        &self,
        table_key: &str,
        op_kind: crate::db::MutationOpKind,
    ) -> Result<(Dataset, String, Option<String>)> {
        table_ops::open_for_mutation(self, table_key, op_kind).await
    }

    /// Open `table_key` for mutation on an explicit branch.
    pub(crate) async fn open_for_mutation_on_branch(
        &self,
        branch: Option<&str>,
        table_key: &str,
        op_kind: crate::db::MutationOpKind,
    ) -> Result<(Dataset, String, Option<String>)> {
        table_ops::open_for_mutation_on_branch(self, branch, table_key, op_kind).await
    }

    /// Fork a dataset from a specific (branch, version) entry state onto
    /// `active_branch` (see `table_ops::fork_dataset_from_entry_state`).
    pub(crate) async fn fork_dataset_from_entry_state(
        &self,
        table_key: &str,
        full_path: &str,
        source_branch: Option<&str>,
        source_version: u64,
        active_branch: &str,
    ) -> Result<Dataset> {
        table_ops::fork_dataset_from_entry_state(
            self,
            table_key,
            full_path,
            source_branch,
            source_version,
            active_branch,
        )
        .await
    }

    /// Re-open a table for mutation, checking it is still at
    /// `expected_version` (see `table_ops::reopen_for_mutation`).
    pub(crate) async fn reopen_for_mutation(
        &self,
        table_key: &str,
        full_path: &str,
        table_branch: Option<&str>,
        expected_version: u64,
        op_kind: crate::db::MutationOpKind,
    ) -> Result<Dataset> {
        table_ops::reopen_for_mutation(
            self,
            table_key,
            full_path,
            table_branch,
            expected_version,
            op_kind,
        )
        .await
    }

    /// Open a dataset pinned to a specific (branch, version) state.
    pub(crate) async fn open_dataset_at_state(
        &self,
        table_path: &str,
        table_branch: Option<&str>,
        table_version: u64,
    ) -> Result<Dataset> {
        table_ops::open_dataset_at_state(self, table_path, table_branch, table_version).await
    }

    /// Build the secondary indices for `table_key` directly on `ds`.
    pub(crate) async fn build_indices_on_dataset(
        &self,
        table_key: &str,
        ds: &mut Dataset,
    ) -> Result<()> {
        table_ops::build_indices_on_dataset(self, table_key, ds).await
    }

    /// Same as [`Omnigraph::build_indices_on_dataset`] but against an
    /// explicit catalog rather than the current one.
    pub(crate) async fn build_indices_on_dataset_for_catalog(
        &self,
        catalog: &Catalog,
        table_key: &str,
        ds: &mut Dataset,
    ) -> Result<()> {
        table_ops::build_indices_on_dataset_for_catalog(self, catalog, table_key, ds).await
    }

    /// Test-only convenience: commit a batch of sub-table updates.
    #[cfg(test)]
    pub(crate) async fn commit_updates(
        &mut self,
        updates: &[crate::db::SubTableUpdate],
    ) -> Result<u64> {
        table_ops::commit_updates(self, updates).await
    }

    /// Commit sub-table updates into the manifest, returning the new
    /// manifest version.
    pub(crate) async fn commit_manifest_updates(
        &self,
        updates: &[crate::db::SubTableUpdate],
    ) -> Result<u64> {
        table_ops::commit_manifest_updates(self, updates).await
    }

    /// Record a merge commit (two parents) for an already-committed
    /// manifest version; returns the new commit id.
    pub(crate) async fn record_merge_commit(
        &self,
        manifest_version: u64,
        parent_commit_id: &str,
        merged_parent_commit_id: &str,
        actor_id: Option<&str>,
    ) -> Result<String> {
        table_ops::record_merge_commit(
            self,
            manifest_version,
            parent_commit_id,
            merged_parent_commit_id,
            actor_id,
        )
        .await
    }

    /// Commit updates on `branch`, failing if any table moved past its
    /// expected version (optimistic concurrency check).
    pub(crate) async fn commit_updates_on_branch_with_expected(
        &self,
        branch: Option<&str>,
        updates: &[crate::db::SubTableUpdate],
        expected_table_versions: &std::collections::HashMap<String, u64>,
        actor_id: Option<&str>,
    ) -> Result<u64> {
        table_ops::commit_updates_on_branch_with_expected(
            self,
            branch,
            updates,
            expected_table_versions,
            actor_id,
        )
        .await
    }

    /// Initialize the commit graph on the active coordinator if needed.
    pub(crate) async fn ensure_commit_graph_initialized(&self) -> Result<()> {
        table_ops::ensure_commit_graph_initialized(self).await
    }

    /// Drop any cached graph index so it is rebuilt on next access.
    pub(crate) async fn invalidate_graph_index(&self) {
        table_ops::invalidate_graph_index(self).await
    }
1128}
1129
1130pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
1131 let branch = branch.trim();
1132 if branch.is_empty() {
1133 return Err(OmniError::manifest(
1134 "branch name cannot be empty".to_string(),
1135 ));
1136 }
1137 if branch == "main" {
1138 return Ok(None);
1139 }
1140 Ok(Some(branch.to_string()))
1141}
1142
1143pub(crate) fn ensure_public_branch_ref(branch: &str, operation: &str) -> Result<()> {
1144 if super::is_internal_run_branch(branch) {
1145 return Err(OmniError::manifest(format!(
1146 "{} does not allow internal run ref '{}'",
1147 operation, branch
1148 )));
1149 }
1150 if is_internal_system_branch(branch) {
1151 return Err(OmniError::manifest(format!(
1152 "{} does not allow internal system ref '{}'",
1153 operation, branch
1154 )));
1155 }
1156 Ok(())
1157}
1158
1159fn concat_or_empty_batches(schema: Arc<Schema>, batches: Vec<RecordBatch>) -> Result<RecordBatch> {
1160 if batches.is_empty() {
1161 return Ok(RecordBatch::new_empty(schema));
1162 }
1163 if batches.len() == 1 {
1164 return Ok(batches.into_iter().next().unwrap());
1165 }
1166 let batch_schema = batches[0].schema();
1167 arrow_select::concat::concat_batches(&batch_schema, &batches)
1168 .map_err(|e| OmniError::Lance(e.to_string()))
1169}
1170
1171fn blob_properties_for_table_key<'a>(
1172 catalog: &'a Catalog,
1173 table_key: &str,
1174) -> Result<&'a std::collections::HashSet<String>> {
1175 if let Some(type_name) = table_key.strip_prefix("node:") {
1176 return catalog
1177 .node_types
1178 .get(type_name)
1179 .map(|node_type| &node_type.blob_properties)
1180 .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)));
1181 }
1182 if let Some(type_name) = table_key.strip_prefix("edge:") {
1183 return catalog
1184 .edge_types
1185 .get(type_name)
1186 .map(|edge_type| &edge_type.blob_properties)
1187 .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)));
1188 }
1189 Err(OmniError::manifest(format!(
1190 "invalid table key '{}'",
1191 table_key
1192 )))
1193}
1194
/// Decide whether a blob-description struct row represents a logically-null
/// blob value.
///
/// A row counts as null when the description struct itself is null, when no
/// `kind` can be decoded, or when it describes an `Inline` blob with zero
/// position, zero size, and an empty `blob_uri`. Non-inline kinds are never
/// treated as null here.
fn blob_description_is_null(descriptions: &StructArray, row: usize) -> Result<bool> {
    if descriptions.is_null(row) {
        return Ok(true);
    }

    // `kind` is decoded from either a UInt32 or a UInt8 column — the wider
    // type is tried first. NOTE(review): presumably the physical type varies
    // by writer version; confirm against the blob writer.
    let kind = descriptions
        .column_by_name("kind")
        .and_then(|col| col.as_any().downcast_ref::<UInt32Array>())
        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row) as u8))
        .or_else(|| {
            descriptions
                .column_by_name("kind")
                .and_then(|col| col.as_any().downcast_ref::<arrow_array::UInt8Array>())
                .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)))
        });
    // Each component is `None` when the column is missing, mistyped, or
    // null at this row.
    let position = descriptions
        .column_by_name("position")
        .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
    let size = descriptions
        .column_by_name("size")
        .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
    let blob_uri = descriptions
        .column_by_name("blob_uri")
        .and_then(|col| col.as_any().downcast_ref::<StringArray>())
        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));

    let Some(kind) = kind else {
        return Ok(true);
    };
    let kind = BlobKind::try_from(kind).map_err(|e| OmniError::Lance(e.to_string()))?;
    if kind != BlobKind::Inline {
        return Ok(false);
    }

    // An inline blob with no position, no size, and no URI is a null value.
    Ok(position.unwrap_or(0) == 0 && size.unwrap_or(0) == 0 && blob_uri.unwrap_or("").is_empty())
}
1233
1234fn fixup_blob_schemas(catalog: &mut Catalog) {
1240 for node_type in catalog.node_types.values_mut() {
1241 if node_type.blob_properties.is_empty() {
1242 continue;
1243 }
1244 let fields: Vec<Field> = node_type
1245 .arrow_schema
1246 .fields()
1247 .iter()
1248 .map(|f| {
1249 if node_type.blob_properties.contains(f.name()) {
1250 blob_field(f.name(), f.is_nullable())
1251 } else {
1252 f.as_ref().clone()
1253 }
1254 })
1255 .collect();
1256 node_type.arrow_schema = Arc::new(Schema::new(fields));
1257 }
1258 for edge_type in catalog.edge_types.values_mut() {
1259 if edge_type.blob_properties.is_empty() {
1260 continue;
1261 }
1262 let fields: Vec<Field> = edge_type
1263 .arrow_schema
1264 .fields()
1265 .iter()
1266 .map(|f| {
1267 if edge_type.blob_properties.contains(f.name()) {
1268 blob_field(f.name(), f.is_nullable())
1269 } else {
1270 f.as_ref().clone()
1271 }
1272 })
1273 .collect();
1274 edge_type.arrow_schema = Arc::new(Schema::new(fields));
1275 }
1276}
1277
1278fn read_schema_ir_from_source(schema_source: &str) -> Result<SchemaIR> {
1279 let schema_ast = parse_schema(schema_source)?;
1280 build_schema_ir(&schema_ast).map_err(|err| OmniError::manifest(err.to_string()))
1281}
1282
1283fn schema_table_key(type_kind: SchemaTypeKind, name: &str) -> String {
1284 match type_kind {
1285 SchemaTypeKind::Node => format!("node:{}", name),
1286 SchemaTypeKind::Edge => format!("edge:{}", name),
1287 SchemaTypeKind::Interface => unreachable!("interfaces do not map to tables"),
1288 }
1289}
1290
1291fn schema_for_table_key(catalog: &Catalog, table_key: &str) -> Result<Arc<Schema>> {
1292 if let Some(type_name) = table_key.strip_prefix("node:") {
1293 let node_type: &NodeType = catalog
1294 .node_types
1295 .get(type_name)
1296 .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
1297 return Ok(node_type.arrow_schema.clone());
1298 }
1299 if let Some(type_name) = table_key.strip_prefix("edge:") {
1300 let edge_type: &EdgeType = catalog
1301 .edge_types
1302 .get(type_name)
1303 .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)))?;
1304 return Ok(edge_type.arrow_schema.clone());
1305 }
1306 Err(OmniError::manifest(format!(
1307 "invalid table key '{}'",
1308 table_key
1309 )))
1310}
1311
1312fn record_batch_row_to_json(batch: &RecordBatch, row: usize) -> Result<serde_json::Value> {
1313 let mut obj = serde_json::Map::new();
1314 for (i, field) in batch.schema().fields().iter().enumerate() {
1315 obj.insert(
1316 field.name().clone(),
1317 json_value_from_array(batch.column(i).as_ref(), row)?,
1318 );
1319 }
1320 Ok(serde_json::Value::Object(obj))
1321}
1322
1323fn json_value_from_array(array: &dyn Array, row: usize) -> Result<serde_json::Value> {
1324 if array.is_null(row) {
1325 return Ok(serde_json::Value::Null);
1326 }
1327
1328 match array.data_type() {
1329 DataType::Utf8 => Ok(serde_json::Value::String(
1330 array
1331 .as_any()
1332 .downcast_ref::<StringArray>()
1333 .ok_or_else(|| OmniError::Lance("expected StringArray".to_string()))?
1334 .value(row)
1335 .to_string(),
1336 )),
1337 DataType::LargeUtf8 => Ok(serde_json::Value::String(
1338 array
1339 .as_any()
1340 .downcast_ref::<LargeStringArray>()
1341 .ok_or_else(|| OmniError::Lance("expected LargeStringArray".to_string()))?
1342 .value(row)
1343 .to_string(),
1344 )),
1345 DataType::Boolean => Ok(serde_json::Value::Bool(
1346 array
1347 .as_any()
1348 .downcast_ref::<BooleanArray>()
1349 .ok_or_else(|| OmniError::Lance("expected BooleanArray".to_string()))?
1350 .value(row),
1351 )),
1352 DataType::Int32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1353 array
1354 .as_any()
1355 .downcast_ref::<Int32Array>()
1356 .ok_or_else(|| OmniError::Lance("expected Int32Array".to_string()))?
1357 .value(row),
1358 ))),
1359 DataType::Int64 => Ok(serde_json::Value::Number(serde_json::Number::from(
1360 array
1361 .as_any()
1362 .downcast_ref::<Int64Array>()
1363 .ok_or_else(|| OmniError::Lance("expected Int64Array".to_string()))?
1364 .value(row),
1365 ))),
1366 DataType::UInt32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1367 array
1368 .as_any()
1369 .downcast_ref::<UInt32Array>()
1370 .ok_or_else(|| OmniError::Lance("expected UInt32Array".to_string()))?
1371 .value(row),
1372 ))),
1373 DataType::UInt64 => Ok(serde_json::Value::Number(serde_json::Number::from(
1374 array
1375 .as_any()
1376 .downcast_ref::<UInt64Array>()
1377 .ok_or_else(|| OmniError::Lance("expected UInt64Array".to_string()))?
1378 .value(row),
1379 ))),
1380 DataType::Float32 => {
1381 let value = array
1382 .as_any()
1383 .downcast_ref::<Float32Array>()
1384 .ok_or_else(|| OmniError::Lance("expected Float32Array".to_string()))?
1385 .value(row) as f64;
1386 Ok(serde_json::Value::Number(
1387 serde_json::Number::from_f64(value).ok_or_else(|| {
1388 OmniError::Lance(format!("cannot encode f32 value '{}' as JSON", value))
1389 })?,
1390 ))
1391 }
1392 DataType::Float64 => {
1393 let value = array
1394 .as_any()
1395 .downcast_ref::<Float64Array>()
1396 .ok_or_else(|| OmniError::Lance("expected Float64Array".to_string()))?
1397 .value(row);
1398 Ok(serde_json::Value::Number(
1399 serde_json::Number::from_f64(value).ok_or_else(|| {
1400 OmniError::Lance(format!("cannot encode f64 value '{}' as JSON", value))
1401 })?,
1402 ))
1403 }
1404 DataType::Date32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1405 array
1406 .as_any()
1407 .downcast_ref::<Date32Array>()
1408 .ok_or_else(|| OmniError::Lance("expected Date32Array".to_string()))?
1409 .value(row),
1410 ))),
1411 DataType::Binary => Ok(serde_json::Value::String(base64::Engine::encode(
1412 &base64::engine::general_purpose::STANDARD,
1413 array
1414 .as_any()
1415 .downcast_ref::<BinaryArray>()
1416 .ok_or_else(|| OmniError::Lance("expected BinaryArray".to_string()))?
1417 .value(row),
1418 ))),
1419 DataType::LargeBinary => Ok(serde_json::Value::String(base64::Engine::encode(
1420 &base64::engine::general_purpose::STANDARD,
1421 array
1422 .as_any()
1423 .downcast_ref::<LargeBinaryArray>()
1424 .ok_or_else(|| OmniError::Lance("expected LargeBinaryArray".to_string()))?
1425 .value(row),
1426 ))),
1427 DataType::List(_) => {
1428 let list = array
1429 .as_any()
1430 .downcast_ref::<ListArray>()
1431 .ok_or_else(|| OmniError::Lance("expected ListArray".to_string()))?;
1432 let values = list.value(row);
1433 let mut out = Vec::with_capacity(values.len());
1434 for idx in 0..values.len() {
1435 out.push(json_value_from_array(values.as_ref(), idx)?);
1436 }
1437 Ok(serde_json::Value::Array(out))
1438 }
1439 DataType::LargeList(_) => {
1440 let list = array
1441 .as_any()
1442 .downcast_ref::<LargeListArray>()
1443 .ok_or_else(|| OmniError::Lance("expected LargeListArray".to_string()))?;
1444 let values = list.value(row);
1445 let mut out = Vec::with_capacity(values.len());
1446 for idx in 0..values.len() {
1447 out.push(json_value_from_array(values.as_ref(), idx)?);
1448 }
1449 Ok(serde_json::Value::Array(out))
1450 }
1451 DataType::FixedSizeList(_, _) => {
1452 let list = array
1453 .as_any()
1454 .downcast_ref::<FixedSizeListArray>()
1455 .ok_or_else(|| OmniError::Lance("expected FixedSizeListArray".to_string()))?;
1456 let values = list.value(row);
1457 let mut out = Vec::with_capacity(values.len());
1458 for idx in 0..values.len() {
1459 out.push(json_value_from_array(values.as_ref(), idx)?);
1460 }
1461 Ok(serde_json::Value::Array(out))
1462 }
1463 DataType::Struct(fields) => {
1464 let struct_array = array
1465 .as_any()
1466 .downcast_ref::<StructArray>()
1467 .ok_or_else(|| OmniError::Lance("expected StructArray".to_string()))?;
1468 let mut obj = serde_json::Map::new();
1469 for (field_idx, field) in fields.iter().enumerate() {
1470 obj.insert(
1471 field.name().clone(),
1472 json_value_from_array(struct_array.column(field_idx).as_ref(), row)?,
1473 );
1474 }
1475 Ok(serde_json::Value::Object(obj))
1476 }
1477 _ => {
1478 let value = arrow_cast::display::array_value_to_string(array, row)
1479 .map_err(|e| OmniError::Lance(e.to_string()))?;
1480 Ok(serde_json::Value::String(value))
1481 }
1482 }
1483}
1484
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::is_internal_run_branch;
    use crate::db::manifest::ManifestCoordinator;
    use async_trait::async_trait;
    use serde_json::Value;
    use std::sync::Mutex;

    use crate::storage::{LocalStorageAdapter, StorageAdapter, join_uri};

    // Shared fixture schema: two node types and two edge types, covering
    // @key properties, a nullable property, and edge endpoints. Several
    // tests derive "desired" schemas by string-replacing exact substrings
    // of this literal, so its whitespace is load-bearing.
    const TEST_SCHEMA: &str = r#"
node Person {
 name: String @key
 age: I32?
}
node Company {
 name: String @key
}
edge Knows: Person -> Person {
 since: Date?
}
edge WorksAt: Person -> Company
"#;

    // Storage adapter that delegates every operation to a
    // `LocalStorageAdapter` while recording the URIs touched, so tests can
    // assert that graph metadata I/O goes through the adapter abstraction.
    #[derive(Debug, Default)]
    struct RecordingStorageAdapter {
        inner: LocalStorageAdapter,
        // URIs passed to read_text, in call order.
        reads: Mutex<Vec<String>>,
        // URIs passed to write_text.
        writes: Mutex<Vec<String>>,
        // URIs passed to exists.
        exists_checks: Mutex<Vec<String>>,
        // (from, to) pairs passed to rename_text.
        renames: Mutex<Vec<(String, String)>>,
        // URIs passed to delete.
        deletes: Mutex<Vec<String>>,
    }

    impl RecordingStorageAdapter {
        // Snapshot of the recorded read URIs.
        fn reads(&self) -> Vec<String> {
            self.reads.lock().unwrap().clone()
        }

        // Snapshot of the recorded write URIs.
        fn writes(&self) -> Vec<String> {
            self.writes.lock().unwrap().clone()
        }

        // Snapshot of the recorded existence-check URIs.
        fn exists_checks(&self) -> Vec<String> {
            self.exists_checks.lock().unwrap().clone()
        }
    }

    #[async_trait]
    impl StorageAdapter for RecordingStorageAdapter {
        async fn read_text(&self, uri: &str) -> Result<String> {
            // Record, then delegate.
            self.reads.lock().unwrap().push(uri.to_string());
            self.inner.read_text(uri).await
        }

        async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
            self.writes.lock().unwrap().push(uri.to_string());
            self.inner.write_text(uri, contents).await
        }

        async fn exists(&self, uri: &str) -> Result<bool> {
            self.exists_checks.lock().unwrap().push(uri.to_string());
            self.inner.exists(uri).await
        }

        async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
            self.renames
                .lock()
                .unwrap()
                .push((from_uri.to_string(), to_uri.to_string()));
            self.inner.rename_text(from_uri, to_uri).await
        }

        async fn delete(&self, uri: &str) -> Result<()> {
            self.deletes.lock().unwrap().push(uri.to_string());
            self.inner.delete(uri).await
        }

        async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
            // Intentionally not recorded; tests only assert on the ops above.
            self.inner.list_dir(dir_uri).await
        }
    }

    // Verifies that both init and open route schema/graph metadata access
    // (_schema.pg, _schema.ir.json, __schema_state.json, _graph_commits.lance)
    // through the injected storage adapter rather than bypassing it.
    #[tokio::test]
    async fn test_init_and_open_route_graph_metadata_through_storage_adapter() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let adapter = Arc::new(RecordingStorageAdapter::default());

        Omnigraph::init_with_storage(uri, TEST_SCHEMA, adapter.clone())
            .await
            .unwrap();
        // Init must write schema source, IR, and state via the adapter.
        assert!(adapter.writes().contains(&join_uri(uri, "_schema.pg")));
        assert!(adapter.writes().contains(&join_uri(uri, "_schema.ir.json")));
        assert!(
            adapter
                .writes()
                .contains(&join_uri(uri, "__schema_state.json"))
        );

        Omnigraph::open_with_storage(uri, adapter.clone())
            .await
            .unwrap();
        // Open must read the same files back via the adapter, including
        // existence checks for the IR, state, and commit-log files.
        assert!(adapter.reads().contains(&join_uri(uri, "_schema.pg")));
        assert!(adapter.reads().contains(&join_uri(uri, "_schema.ir.json")));
        assert!(
            adapter
                .reads()
                .contains(&join_uri(uri, "__schema_state.json"))
        );
        assert!(
            adapter
                .exists_checks()
                .contains(&join_uri(uri, "_schema.ir.json"))
        );
        assert!(
            adapter
                .exists_checks()
                .contains(&join_uri(uri, "__schema_state.json"))
        );
        assert!(
            adapter
                .exists_checks()
                .contains(&join_uri(uri, "_graph_commits.lance"))
        );
    }

    // Test helper: scan every row of `table_key` at the current snapshot
    // and return each row as a JSON object (via record_batch_row_to_json).
    async fn table_rows_json(db: &Omnigraph, table_key: &str) -> Vec<Value> {
        let snapshot = db.snapshot().await;
        let ds = snapshot.open(table_key).await.unwrap();
        let batches = db.table_store().scan_batches(&ds).await.unwrap();
        batches
            .into_iter()
            .flat_map(|batch| {
                (0..batch.num_rows())
                    .map(|row| record_batch_row_to_json(&batch, row).unwrap())
                    .collect::<Vec<_>>()
            })
            .collect()
    }

    // Test helper: insert a single Person row through the full mutation
    // path (open_for_mutation -> append_batch -> commit_updates). `name`
    // doubles as the id; any schema column other than id/name/age is
    // filled with nulls.
    async fn seed_person_row(db: &mut Omnigraph, name: &str, age: Option<i32>) {
        let (mut ds, full_path, table_branch) = db
            .open_for_mutation("node:Person", crate::db::MutationOpKind::Insert)
            .await
            .unwrap();
        let schema: Arc<Schema> = Arc::new(ds.schema().into());
        let columns: Vec<Arc<dyn Array>> = schema
            .fields()
            .iter()
            .map(|field| match field.name().as_str() {
                "id" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
                "name" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
                "age" => Arc::new(Int32Array::from(vec![age])) as Arc<dyn Array>,
                _ => new_null_array(field.data_type(), 1),
            })
            .collect();
        let batch = RecordBatch::try_new(Arc::clone(&schema), columns).unwrap();
        let state = db
            .table_store()
            .append_batch(&full_path, &mut ds, batch)
            .await
            .unwrap();
        db.commit_updates(&[crate::db::SubTableUpdate {
            table_key: "node:Person".to_string(),
            table_version: state.version,
            table_branch,
            row_count: state.row_count,
            version_metadata: state.version_metadata,
        }])
        .await
        .unwrap();
    }

    // Adding a nullable property via apply_schema must keep existing rows
    // intact (new column reads as null) and persist the schema to disk.
    #[tokio::test]
    async fn test_apply_schema_adds_nullable_property_and_preserves_rows() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;

        let desired = TEST_SCHEMA.replace(
            " age: I32?\n}",
            " age: I32?\n nickname: String?\n}",
        );
        let result = db.apply_schema(&desired).await.unwrap();
        assert!(result.applied);

        // Reopen from disk to confirm the change survived persistence.
        let reopened = Omnigraph::open(uri).await.unwrap();
        let rows = table_rows_json(&reopened, "node:Person").await;
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0]["name"], "Alice");
        assert_eq!(rows[0]["age"], 30);
        assert!(rows[0]["nickname"].is_null());
        assert!(
            reopened.catalog().node_types["Person"]
                .properties
                .contains_key("nickname")
        );
        assert!(dir.path().join("_schema.pg").exists());
    }

    // @rename_from on a property must carry the old column's values into
    // the new column and drop the old name.
    #[tokio::test]
    async fn test_apply_schema_renames_property_and_preserves_values() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;

        let desired = TEST_SCHEMA.replace(
            " age: I32?\n}",
            " years: I32? @rename_from(\"age\")\n}",
        );
        db.apply_schema(&desired).await.unwrap();

        let reopened = Omnigraph::open(uri).await.unwrap();
        let rows = table_rows_json(&reopened, "node:Person").await;
        assert_eq!(rows[0]["name"], "Alice");
        assert_eq!(rows[0]["years"], 30);
        assert!(rows[0].get("age").is_none());
    }

    // Renaming a node type must switch the head snapshot to the new table
    // key while a time-travel snapshot at the pre-rename version still
    // resolves the old key.
    #[tokio::test]
    async fn test_apply_schema_renames_type_and_preserves_historical_snapshot() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;
        let before_version = db.snapshot().await.version();

        // Rename Person -> Human and update all edge endpoint references.
        let desired = TEST_SCHEMA
            .replace("node Person {\n", "node Human @rename_from(\"Person\") {\n")
            .replace("edge Knows: Person -> Person", "edge Knows: Human -> Human")
            .replace(
                "edge WorksAt: Person -> Company",
                "edge WorksAt: Human -> Company",
            );
        db.apply_schema(&desired).await.unwrap();

        let head = db.snapshot().await;
        assert!(head.entry("node:Person").is_none());
        assert!(head.entry("node:Human").is_some());
        let historical = ManifestCoordinator::snapshot_at(uri, None, before_version)
            .await
            .unwrap();
        assert!(historical.entry("node:Person").is_some());
        assert!(historical.entry("node:Human").is_none());
    }

    // Regression guard: apply_schema must still succeed after a bulk load,
    // which exercises (and should clean up) internal run branches.
    #[tokio::test]
    async fn test_apply_schema_succeeds_after_load() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        crate::loader::load_jsonl(
            &mut db,
            r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#,
            crate::loader::LoadMode::Overwrite,
        )
        .await
        .unwrap();

        // The loader's temporary run branch must be gone after publish.
        let all_branches = db.coordinator.read().await.all_branches().await.unwrap();
        assert!(
            !all_branches.iter().any(|b| is_internal_run_branch(b)),
            "run branch should be deleted after publish, got: {:?}",
            all_branches
        );

        let desired = TEST_SCHEMA.replace(
            " age: I32?\n}",
            " age: I32?\n nickname: String?\n}",
        );
        let result = db.apply_schema(&desired).await.unwrap();
        assert!(result.applied, "schema apply should have applied");
    }

    // Adding @index to an existing property must build an FTS index on it.
    #[tokio::test]
    async fn test_apply_schema_adds_index_for_existing_property() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let desired = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
        db.apply_schema(&desired).await.unwrap();

        let snapshot = db.snapshot().await;
        let ds = snapshot.open("node:Person").await.unwrap();
        assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
    }

    // A table-rewriting schema migration must re-create both the implicit
    // id btree index and the explicit FTS index, not drop them.
    #[tokio::test]
    async fn test_apply_schema_rewrite_preserves_existing_indices() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let initial_schema = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
        let mut db = Omnigraph::init(uri, &initial_schema).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;

        let desired = initial_schema.replace(
            " age: I32?\n}",
            " age: I32?\n nickname: String?\n}",
        );
        db.apply_schema(&desired).await.unwrap();

        let snapshot = db.snapshot().await;
        let ds = snapshot.open("node:Person").await.unwrap();
        assert!(db.table_store().has_btree_index(&ds, "id").await.unwrap());
        assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
    }

    // While the schema-apply lock branch exists, open_for_mutation must be
    // rejected with a descriptive error.
    #[tokio::test]
    async fn test_open_for_mutation_rejects_while_schema_apply_locked() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        let mut db = db;
        // Simulate an in-progress schema apply by creating the lock branch.
        db.coordinator
            .write()
            .await
            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
            .await
            .unwrap();

        let err = db
            .open_for_mutation("node:Person", crate::db::MutationOpKind::Insert)
            .await
            .unwrap_err();
        assert!(
            err.to_string()
                .contains("write is unavailable while schema apply is in progress")
        );
    }

    // Likewise, commit_updates must be rejected while the lock branch exists.
    #[tokio::test]
    async fn test_commit_updates_rejects_while_schema_apply_locked() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        db.coordinator
            .write()
            .await
            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
            .await
            .unwrap();

        let err = db.commit_updates(&[]).await.unwrap_err();
        assert!(
            err.to_string()
                .contains("write commit is unavailable while schema apply is in progress")
        );
    }

    // The internal schema-apply lock branch must be hidden from the
    // user-facing branch listing.
    #[tokio::test]
    async fn test_branch_list_hides_schema_apply_lock_branch() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        db.coordinator
            .write()
            .await
            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
            .await
            .unwrap();

        let branches = db.branch_list().await.unwrap();
        assert_eq!(branches, vec!["main".to_string()]);
    }
}