use std::collections::{BTreeSet, HashMap, HashSet};
use std::io::Write;
use std::sync::Arc;

use arrow_array::{
    Array, BinaryArray, BooleanArray, Date32Array, FixedSizeListArray, Float32Array, Float64Array,
    Int32Array, Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray,
    RecordBatch, StringArray, StructArray, UInt32Array, UInt64Array, new_null_array,
};
use arrow_schema::{DataType, Field, Schema};
use lance::Dataset;
use lance::blob::{BlobArrayBuilder, blob_field};
use lance::dataset::BlobFile;
use lance::dataset::scanner::ColumnOrdering;
use lance::datatypes::BlobKind;
use omnigraph_compiler::catalog::{Catalog, EdgeType, NodeType};
use omnigraph_compiler::schema::parser::parse_schema;
use omnigraph_compiler::types::ScalarType;
use omnigraph_compiler::{
    SchemaIR, SchemaMigrationPlan, SchemaMigrationStep, SchemaTypeKind, build_catalog_from_ir,
    build_schema_ir, plan_schema_migration,
};

use crate::db::graph_coordinator::{GraphCoordinator, PublishedSnapshot};
use crate::error::{OmniError, Result};
use crate::runtime_cache::RuntimeCache;
use crate::storage::{StorageAdapter, join_uri, normalize_root_uri, storage_for_uri};
use crate::table_store::TableStore;

mod export;
mod optimize;
mod schema_apply;
mod table_ops;

pub use optimize::{CleanupPolicyOptions, TableCleanupStats, TableOptimizeStats};

use super::commit_graph::GraphCommit;
use super::manifest::{
    ManifestChange, Snapshot, SubTableEntry, TableRegistration, TableTombstone,
    table_path_for_table_key,
};
use super::schema_state::{
    SCHEMA_SOURCE_FILENAME, load_or_bootstrap_schema_contract, read_accepted_schema_ir,
    recover_schema_state_files, schema_ir_staging_uri, schema_ir_uri, schema_source_staging_uri,
    schema_source_uri, schema_state_staging_uri, schema_state_uri, validate_schema_contract,
    write_schema_contract, write_schema_contract_staging,
};
use super::{
    ReadTarget, ResolvedTarget, SCHEMA_APPLY_LOCK_BRANCH, SnapshotId, is_internal_system_branch,
    is_schema_apply_lock_branch,
};

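/// Outcome of merging one branch into another: the target was already up to
/// date, the merge could fast-forward, or a new merge commit was recorded.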
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MergeOutcome {
    AlreadyUpToDate,
    FastForward,
    Merged,
}

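/// Result of [`Omnigraph::apply_schema`]: whether the requested migration was
/// supported, whether it was actually applied, the resulting manifest version,
/// and the migration steps that made up the plan.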
#[derive(Debug, Clone)]
pub struct SchemaApplyResult {
    pub supported: bool,
    pub applied: bool,
    pub manifest_version: u64,
    pub steps: Vec<SchemaMigrationStep>,
}

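/// Handle to a graph database rooted at `root_uri`. It bundles the storage
/// adapter, the branch/commit coordinator, the Lance-backed table store, a
/// runtime cache, and the catalog built from the accepted schema contract.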
pub struct Omnigraph {
    root_uri: String,
    storage: Arc<dyn StorageAdapter>,
    coordinator: GraphCoordinator,
    table_store: TableStore,
    runtime_cache: RuntimeCache,
    catalog: Catalog,
    schema_source: String,
    pub(crate) audit_actor_id: Option<String>,
}

impl Omnigraph {
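    /// Creates a new graph at `uri`: writes the schema source and the accepted
    /// schema contract, then initializes the coordinator.
    ///
    /// Illustrative sketch (the path and schema text below are made up):
    ///
    /// ```ignore
    /// let schema = "node Person {\n    name: String @key\n}\n";
    /// let db = Omnigraph::init("/tmp/people-graph", schema).await?;
    /// assert!(db.catalog().node_types.contains_key("Person"));
    /// ```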
    pub async fn init(uri: &str, schema_source: &str) -> Result<Self> {
        Self::init_with_storage(uri, schema_source, storage_for_uri(uri)?).await
    }

    pub(crate) async fn init_with_storage(
        uri: &str,
        schema_source: &str,
        storage: Arc<dyn StorageAdapter>,
    ) -> Result<Self> {
        let root = normalize_root_uri(uri)?;
        let schema_ir = read_schema_ir_from_source(schema_source)?;
        let mut catalog = build_catalog_from_ir(&schema_ir)?;
        fixup_blob_schemas(&mut catalog);

        let schema_path = join_uri(&root, SCHEMA_SOURCE_FILENAME);
        storage.write_text(&schema_path, schema_source).await?;
        write_schema_contract(&root, storage.as_ref(), &schema_ir).await?;

        let coordinator = GraphCoordinator::init(&root, &catalog, Arc::clone(&storage)).await?;

        Ok(Self {
            root_uri: root.clone(),
            storage,
            coordinator,
            table_store: TableStore::new(&root),
            runtime_cache: RuntimeCache::default(),
            catalog,
            schema_source: schema_source.to_string(),
            audit_actor_id: None,
        })
    }

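    /// Opens an existing graph at `uri`, recovering any staged schema state
    /// files and rebuilding the catalog from the accepted schema contract.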
    pub async fn open(uri: &str) -> Result<Self> {
        Self::open_with_storage(uri, storage_for_uri(uri)?).await
    }

    pub(crate) async fn open_with_storage(
        uri: &str,
        storage: Arc<dyn StorageAdapter>,
    ) -> Result<Self> {
        let root = normalize_root_uri(uri)?;
        let coordinator = GraphCoordinator::open(&root, Arc::clone(&storage)).await?;
        recover_schema_state_files(&root, Arc::clone(&storage), &coordinator.snapshot()).await?;
        let schema_path = schema_source_uri(&root);
        let schema_source = storage.read_text(&schema_path).await?;
        let current_source_ir = read_schema_ir_from_source(&schema_source)?;
        let branches = coordinator.branch_list().await?;
        let (accepted_ir, _) = load_or_bootstrap_schema_contract(
            &root,
            Arc::clone(&storage),
            &branches,
            &current_source_ir,
        )
        .await?;
        let mut catalog = build_catalog_from_ir(&accepted_ir)?;
        fixup_blob_schemas(&mut catalog);

        Ok(Self {
            root_uri: root.clone(),
            storage,
            coordinator,
            table_store: TableStore::new(&root),
            runtime_cache: RuntimeCache::default(),
            catalog,
            schema_source,
            audit_actor_id: None,
        })
    }

    pub fn catalog(&self) -> &Catalog {
        &self.catalog
    }

    pub fn schema_source(&self) -> &str {
        &self.schema_source
    }

    pub fn uri(&self) -> &str {
        &self.root_uri
    }

    pub(crate) async fn ensure_schema_state_valid(&self) -> Result<()> {
        validate_schema_contract(self.uri(), Arc::clone(&self.storage)).await
    }

    pub async fn plan_schema(&self, desired_schema_source: &str) -> Result<SchemaMigrationPlan> {
        schema_apply::plan_schema(self, desired_schema_source).await
    }

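    /// Plans and applies a migration from the accepted schema towards
    /// `desired_schema_source`, delegating to the `schema_apply` module.
    ///
    /// Illustrative sketch (the property added below is made up):
    ///
    /// ```ignore
    /// let desired = db.schema_source().replace("age: I32?", "age: I32?\n    nickname: String?");
    /// let result = db.apply_schema(&desired).await?;
    /// assert!(result.applied);
    /// ```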
    pub async fn apply_schema(&mut self, desired_schema_source: &str) -> Result<SchemaApplyResult> {
        schema_apply::apply_schema(self, desired_schema_source).await
    }

    pub(crate) async fn ensure_schema_apply_idle(&mut self, operation: &str) -> Result<()> {
        schema_apply::ensure_schema_apply_idle(self, operation).await
    }

    async fn ensure_schema_apply_not_locked(&self, operation: &str) -> Result<()> {
        schema_apply::ensure_schema_apply_not_locked(self, operation).await
    }

    pub(crate) fn table_store(&self) -> &TableStore {
        &self.table_store
    }

    pub(crate) fn storage(&self) -> &dyn crate::storage_layer::TableStorage {
        &self.table_store
    }

    pub(crate) async fn open_coordinator_for_branch(
        &self,
        branch: Option<&str>,
    ) -> Result<GraphCoordinator> {
        match branch {
            Some(branch) => {
                GraphCoordinator::open_branch(self.uri(), branch, Arc::clone(&self.storage)).await
            }
            None => GraphCoordinator::open(self.uri(), Arc::clone(&self.storage)).await,
        }
    }

    pub(crate) async fn swap_coordinator_for_branch(
        &mut self,
        branch: Option<&str>,
    ) -> Result<GraphCoordinator> {
        let next = self.open_coordinator_for_branch(branch).await?;
        Ok(std::mem::replace(&mut self.coordinator, next))
    }

    pub(crate) fn restore_coordinator(&mut self, coordinator: GraphCoordinator) {
        self.coordinator = coordinator;
    }

    pub(crate) async fn resolved_branch_target(
        &self,
        branch: Option<&str>,
    ) -> Result<ResolvedTarget> {
        self.ensure_schema_state_valid().await?;
        let requested = ReadTarget::Branch(branch.unwrap_or("main").to_string());
        let normalized = normalize_branch_name(branch.unwrap_or("main"))?;
        if normalized.as_deref() == self.coordinator.current_branch() {
            let snapshot_id = self.coordinator.head_commit_id().await?.unwrap_or_else(|| {
                SnapshotId::synthetic(
                    self.coordinator.current_branch(),
                    self.coordinator.version(),
                )
            });
            return Ok(ResolvedTarget {
                requested,
                branch: self.coordinator.current_branch().map(str::to_string),
                snapshot_id,
                snapshot: self.coordinator.snapshot(),
            });
        }
        self.coordinator.resolve_target(&requested).await
    }

    pub(crate) async fn snapshot_for_branch(&self, branch: Option<&str>) -> Result<Snapshot> {
        self.resolved_branch_target(branch)
            .await
            .map(|resolved| resolved.snapshot)
    }

    pub(crate) fn version(&self) -> u64 {
        self.coordinator.version()
    }

    pub(crate) fn snapshot(&self) -> Snapshot {
        self.coordinator.snapshot()
    }

    pub async fn snapshot_of(&self, target: impl Into<ReadTarget>) -> Result<Snapshot> {
        self.resolved_target(target)
            .await
            .map(|resolved| resolved.snapshot)
    }

    pub async fn version_of(&self, target: impl Into<ReadTarget>) -> Result<u64> {
        self.snapshot_of(target)
            .await
            .map(|snapshot| snapshot.version())
    }

    pub async fn resolved_branch_of(
        &self,
        target: impl Into<ReadTarget>,
    ) -> Result<Option<String>> {
        self.resolved_target(target)
            .await
            .map(|resolved| resolved.branch)
    }

    pub async fn sync_branch(&mut self, branch: &str) -> Result<()> {
        self.ensure_schema_state_valid().await?;
        let branch = normalize_branch_name(branch)?;
        self.coordinator = self.open_coordinator_for_branch(branch.as_deref()).await?;
        self.runtime_cache.invalidate_all().await;
        Ok(())
    }

    pub(crate) async fn refresh(&mut self) -> Result<()> {
        self.coordinator.refresh().await?;
        self.runtime_cache.invalidate_all().await;
        Ok(())
    }

    pub async fn resolve_snapshot(&self, branch: &str) -> Result<SnapshotId> {
        self.ensure_schema_state_valid().await?;
        self.coordinator.resolve_snapshot_id(branch).await
    }

    pub(crate) async fn resolved_target(
        &self,
        target: impl Into<ReadTarget>,
    ) -> Result<ResolvedTarget> {
        self.ensure_schema_state_valid().await?;
        self.coordinator.resolve_target(&target.into()).await
    }

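    /// Computes the change set between two read targets (branches or pinned
    /// snapshots), attributing the diff to the destination's branch when one
    /// is available.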
    pub async fn diff_between(
        &self,
        from: impl Into<ReadTarget>,
        to: impl Into<ReadTarget>,
        filter: &crate::changes::ChangeFilter,
    ) -> Result<crate::changes::ChangeSet> {
        let from_resolved = self.resolved_target(from).await?;
        let to_resolved = self.resolved_target(to).await?;
        crate::changes::diff_snapshots(
            self.uri(),
            &from_resolved.snapshot,
            &to_resolved.snapshot,
            filter,
            to_resolved.branch.clone().or(from_resolved.branch.clone()),
        )
        .await
    }

    pub async fn diff_commits(
        &self,
        from_commit_id: &str,
        to_commit_id: &str,
        filter: &crate::changes::ChangeFilter,
    ) -> Result<crate::changes::ChangeSet> {
        let from_commit = self
            .coordinator
            .resolve_commit(&SnapshotId::new(from_commit_id))
            .await?;
        let to_commit = self
            .coordinator
            .resolve_commit(&SnapshotId::new(to_commit_id))
            .await?;
        let from_snap = self
            .coordinator
            .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
                from_commit.graph_commit_id.clone(),
            )))
            .await?;
        let to_snap = self
            .coordinator
            .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
                to_commit.graph_commit_id.clone(),
            )))
            .await?;
        crate::changes::diff_snapshots(
            self.uri(),
            &from_snap.snapshot,
            &to_snap.snapshot,
            filter,
            to_snap.branch.clone().or(from_snap.branch.clone()),
        )
        .await
    }

    pub async fn entity_at_target(
        &self,
        target: impl Into<ReadTarget>,
        table_key: &str,
        id: &str,
    ) -> Result<Option<serde_json::Value>> {
        export::entity_at_target(self, target, table_key, id).await
    }

    pub async fn entity_at(
        &self,
        table_key: &str,
        id: &str,
        version: u64,
    ) -> Result<Option<serde_json::Value>> {
        export::entity_at(self, table_key, id, version).await
    }

    pub async fn snapshot_at_version(&self, version: u64) -> Result<Snapshot> {
        self.ensure_schema_state_valid().await?;
        self.coordinator.snapshot_at_version(version).await
    }

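    /// Exports the selected type names and table keys on `branch` as a JSONL
    /// string; `export_jsonl_to_writer` below is the streaming equivalent.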
    pub async fn export_jsonl(
        &self,
        branch: &str,
        type_names: &[String],
        table_keys: &[String],
    ) -> Result<String> {
        export::export_jsonl(self, branch, type_names, table_keys).await
    }

    pub async fn export_jsonl_to_writer<W: Write>(
        &self,
        branch: &str,
        type_names: &[String],
        table_keys: &[String],
        writer: &mut W,
    ) -> Result<()> {
        export::export_jsonl_to_writer(self, branch, type_names, table_keys, writer).await
    }

    pub async fn graph_index(&self) -> Result<Arc<crate::graph_index::GraphIndex>> {
        table_ops::graph_index(self).await
    }

    pub(crate) async fn graph_index_for_resolved(
        &self,
        resolved: &ResolvedTarget,
    ) -> Result<Arc<crate::graph_index::GraphIndex>> {
        table_ops::graph_index_for_resolved(self, resolved).await
    }

    pub async fn ensure_indices(&mut self) -> Result<()> {
        table_ops::ensure_indices(self).await
    }

    pub async fn ensure_indices_on(&mut self, branch: &str) -> Result<()> {
        table_ops::ensure_indices_on(self, branch).await
    }

    pub async fn optimize(&mut self) -> Result<Vec<optimize::TableOptimizeStats>> {
        optimize::optimize_all_tables(self).await
    }

    pub async fn cleanup(
        &mut self,
        options: optimize::CleanupPolicyOptions,
    ) -> Result<Vec<optimize::TableCleanupStats>> {
        optimize::cleanup_all_tables(self, options).await
    }

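    /// Reads the blob stored under `property` for the node of type `type_name`
    /// with the given `id`, returning a [`BlobFile`] handle.
    ///
    /// Illustrative sketch (the type, id, and property names are made up):
    ///
    /// ```ignore
    /// let blob = db.read_blob("Document", "doc-1", "contents").await?;
    /// ```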
    pub async fn read_blob(&self, type_name: &str, id: &str, property: &str) -> Result<BlobFile> {
        self.ensure_schema_state_valid().await?;
        let node_type = self
            .catalog
            .node_types
            .get(type_name)
            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
        if !node_type.blob_properties.contains(property) {
            return Err(OmniError::manifest(format!(
                "property '{}' on type '{}' is not a Blob",
                property, type_name
            )));
        }

        let snapshot = self.snapshot();
        let table_key = format!("node:{}", type_name);
        let ds = snapshot.open(&table_key).await?;

        let filter_sql = format!("id = '{}'", id.replace('\'', "''"));
        let row_id = self
            .table_store
            .first_row_id_for_filter(&ds, &filter_sql)
            .await?
            .ok_or_else(|| {
                OmniError::manifest(format!("no {} with id '{}' found", type_name, id))
            })?;

        let ds = Arc::new(ds);
        let mut blobs = ds
            .take_blobs(&[row_id], property)
            .await
            .map_err(|e| OmniError::Lance(e.to_string()))?;

        blobs.pop().ok_or_else(|| {
            OmniError::manifest(format!(
                "blob '{}' on {} '{}' returned no data",
                property, type_name, id
            ))
        })
    }

    pub(crate) fn active_branch(&self) -> Option<&str> {
        self.coordinator.current_branch()
    }

    async fn ensure_branch_delete_safe(&self, branch: &str, branches: &[String]) -> Result<()> {
        let descendants = self.coordinator.branch_descendants(branch).await?;
        if let Some(descendant) = descendants.first() {
            return Err(OmniError::manifest_conflict(format!(
                "cannot delete branch '{}' because descendant branch '{}' still depends on it",
                branch, descendant
            )));
        }

        for other_branch in branches
            .iter()
            .filter(|candidate| candidate.as_str() != branch)
        {
            let snapshot = self
                .snapshot_of(ReadTarget::branch(other_branch.as_str()))
                .await?;
            if snapshot
                .entries()
                .any(|entry| entry.table_branch.as_deref() == Some(branch))
            {
                return Err(OmniError::manifest_conflict(format!(
                    "cannot delete branch '{}' because branch '{}' still depends on it",
                    branch, other_branch
                )));
            }
        }

        Ok(())
    }

    async fn cleanup_deleted_branch_tables(
        &self,
        branch: &str,
        owned_tables: &[(String, String)],
    ) -> Result<()> {
        let mut seen_paths = HashSet::new();
        let mut cleanup_targets = owned_tables
            .iter()
            .filter(|(_, table_path)| seen_paths.insert(table_path.clone()))
            .cloned()
            .collect::<Vec<_>>();
        cleanup_targets.sort_by(|left, right| left.0.cmp(&right.0));

        for (table_key, table_path) in cleanup_targets {
            let dataset_uri = self.table_store.dataset_uri(&table_path);
            if let Err(err) = self.table_store.delete_branch(&dataset_uri, branch).await {
                return Err(OmniError::manifest_internal(format!(
                    "branch '{}' was deleted but cleanup failed for {}: {}",
                    branch, table_key, err
                )));
            }
        }

        Ok(())
    }

    async fn delete_branch_storage_only(&mut self, branch: &str) -> Result<()> {
        if self.coordinator.current_branch() == Some(branch) {
            return Err(OmniError::manifest_conflict(format!(
                "cannot delete currently active branch '{}'",
                branch
            )));
        }

        let branch_snapshot = self.snapshot_of(ReadTarget::branch(branch)).await?;
        let owned_tables = branch_snapshot
            .entries()
            .filter(|entry| entry.table_branch.as_deref() == Some(branch))
            .map(|entry| (entry.table_key.clone(), entry.table_path.clone()))
            .collect::<Vec<_>>();

        self.coordinator.branch_delete(branch).await?;
        self.cleanup_deleted_branch_tables(branch, &owned_tables)
            .await
    }

    pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
        normalize_branch_name(branch)
    }

    pub(crate) async fn head_commit_id_for_branch(
        &self,
        branch: Option<&str>,
    ) -> Result<Option<String>> {
        let mut coordinator = self.open_coordinator_for_branch(branch).await?;
        coordinator.ensure_commit_graph_initialized().await?;
        coordinator
            .head_commit_id()
            .await
            .map(|id| id.map(|snapshot_id| snapshot_id.as_str().to_string()))
    }

    pub async fn branch_create(&mut self, name: &str) -> Result<()> {
        self.ensure_schema_state_valid().await?;
        self.ensure_schema_apply_idle("branch_create").await?;
        ensure_public_branch_ref(name, "branch_create")?;
        self.coordinator.branch_create(name).await
    }

    pub(crate) fn current_audit_actor(&self) -> Option<&str> {
        self.audit_actor_id.as_deref()
    }

    pub async fn branch_create_from(
        &mut self,
        from: impl Into<ReadTarget>,
        name: &str,
    ) -> Result<()> {
        self.ensure_schema_apply_idle("branch_create_from").await?;
        self.branch_create_from_impl(from, name, false).await
    }

    async fn branch_create_from_impl(
        &mut self,
        from: impl Into<ReadTarget>,
        name: &str,
        allow_internal_refs: bool,
    ) -> Result<()> {
        let target = from.into();
        let ReadTarget::Branch(branch_name) = target else {
            return Err(OmniError::manifest(
                "branch creation from pinned snapshots is not supported yet".to_string(),
            ));
        };
        if !allow_internal_refs {
            ensure_public_branch_ref(&branch_name, "branch_create_from")?;
            ensure_public_branch_ref(name, "branch_create_from")?;
        }
        let branch = normalize_branch_name(&branch_name)?;
        let previous = self.swap_coordinator_for_branch(branch.as_deref()).await?;
        let result = self.coordinator.branch_create(name).await;
        self.restore_coordinator(previous);
        result
    }

    pub async fn branch_list(&self) -> Result<Vec<String>> {
        self.ensure_schema_state_valid().await?;
        self.coordinator.branch_list().await
    }

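    /// Deletes branch `name` after verifying that no descendant or dependent
    /// branch still references it, then removes the branch's per-table state.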
    pub async fn branch_delete(&mut self, name: &str) -> Result<()> {
        self.ensure_schema_state_valid().await?;
        self.ensure_schema_apply_idle("branch_delete").await?;
        ensure_public_branch_ref(name, "branch_delete")?;
        self.refresh().await?;
        let branch = normalize_branch_name(name)?
            .ok_or_else(|| OmniError::manifest("cannot delete branch 'main'".to_string()))?;
        let branches = self.coordinator.branch_list().await?;
        if !branches.iter().any(|candidate| candidate == &branch) {
            return Err(OmniError::manifest_not_found(format!(
                "branch '{}' not found",
                branch
            )));
        }

        self.ensure_branch_delete_safe(&branch, &branches).await?;
        self.delete_branch_storage_only(&branch).await
    }

    pub async fn get_commit(&self, commit_id: &str) -> Result<GraphCommit> {
        self.ensure_schema_state_valid().await?;
        self.coordinator
            .resolve_commit(&SnapshotId::new(commit_id))
            .await
    }

    pub async fn list_commits(&self, branch: Option<&str>) -> Result<Vec<GraphCommit>> {
        self.ensure_schema_state_valid().await?;
        let branch = match branch {
            Some(branch) => normalize_branch_name(branch)?,
            None => None,
        };
        let coordinator = self.open_coordinator_for_branch(branch.as_deref()).await?;
        coordinator.list_commits().await
    }

    pub(crate) async fn open_for_mutation(
        &self,
        table_key: &str,
    ) -> Result<(Dataset, String, Option<String>)> {
        table_ops::open_for_mutation(self, table_key).await
    }

    pub(crate) async fn open_for_mutation_on_branch(
        &self,
        branch: Option<&str>,
        table_key: &str,
    ) -> Result<(Dataset, String, Option<String>)> {
        table_ops::open_for_mutation_on_branch(self, branch, table_key).await
    }

    pub(crate) async fn fork_dataset_from_entry_state(
        &self,
        table_key: &str,
        full_path: &str,
        source_branch: Option<&str>,
        source_version: u64,
        active_branch: &str,
    ) -> Result<Dataset> {
        table_ops::fork_dataset_from_entry_state(
            self,
            table_key,
            full_path,
            source_branch,
            source_version,
            active_branch,
        )
        .await
    }

    pub(crate) async fn reopen_for_mutation(
        &self,
        table_key: &str,
        full_path: &str,
        table_branch: Option<&str>,
        expected_version: u64,
    ) -> Result<Dataset> {
        table_ops::reopen_for_mutation(self, table_key, full_path, table_branch, expected_version)
            .await
    }

    pub(crate) async fn open_dataset_at_state(
        &self,
        table_path: &str,
        table_branch: Option<&str>,
        table_version: u64,
    ) -> Result<Dataset> {
        table_ops::open_dataset_at_state(self, table_path, table_branch, table_version).await
    }

    pub(crate) async fn build_indices_on_dataset(
        &self,
        table_key: &str,
        ds: &mut Dataset,
    ) -> Result<()> {
        table_ops::build_indices_on_dataset(self, table_key, ds).await
    }

    pub(crate) async fn build_indices_on_dataset_for_catalog(
        &self,
        catalog: &Catalog,
        table_key: &str,
        ds: &mut Dataset,
    ) -> Result<()> {
        table_ops::build_indices_on_dataset_for_catalog(self, catalog, table_key, ds).await
    }

    #[cfg(test)]
    pub(crate) async fn commit_updates(
        &mut self,
        updates: &[crate::db::SubTableUpdate],
    ) -> Result<u64> {
        table_ops::commit_updates(self, updates).await
    }

    pub(crate) async fn commit_manifest_updates(
        &mut self,
        updates: &[crate::db::SubTableUpdate],
    ) -> Result<u64> {
        table_ops::commit_manifest_updates(self, updates).await
    }

    pub(crate) async fn record_merge_commit(
        &mut self,
        manifest_version: u64,
        parent_commit_id: &str,
        merged_parent_commit_id: &str,
    ) -> Result<String> {
        table_ops::record_merge_commit(
            self,
            manifest_version,
            parent_commit_id,
            merged_parent_commit_id,
        )
        .await
    }

    pub(crate) async fn commit_updates_on_branch_with_expected(
        &mut self,
        branch: Option<&str>,
        updates: &[crate::db::SubTableUpdate],
        expected_table_versions: &std::collections::HashMap<String, u64>,
    ) -> Result<u64> {
        table_ops::commit_updates_on_branch_with_expected(
            self,
            branch,
            updates,
            expected_table_versions,
        )
        .await
    }

    pub(crate) async fn ensure_commit_graph_initialized(&mut self) -> Result<()> {
        table_ops::ensure_commit_graph_initialized(self).await
    }

    pub(crate) async fn invalidate_graph_index(&self) {
        table_ops::invalidate_graph_index(self).await
    }
}

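/// Normalizes a branch name: trims whitespace, rejects empty names, and maps
/// "main" to `None`, since the default branch is represented as the absence of
/// a branch override.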
pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
    let branch = branch.trim();
    if branch.is_empty() {
        return Err(OmniError::manifest(
            "branch name cannot be empty".to_string(),
        ));
    }
    if branch == "main" {
        return Ok(None);
    }
    Ok(Some(branch.to_string()))
}

pub(crate) fn ensure_public_branch_ref(branch: &str, operation: &str) -> Result<()> {
    if super::is_internal_run_branch(branch) {
        return Err(OmniError::manifest(format!(
            "{} does not allow internal run ref '{}'",
            operation, branch
        )));
    }
    if is_internal_system_branch(branch) {
        return Err(OmniError::manifest(format!(
            "{} does not allow internal system ref '{}'",
            operation, branch
        )));
    }
    Ok(())
}

fn concat_or_empty_batches(schema: Arc<Schema>, batches: Vec<RecordBatch>) -> Result<RecordBatch> {
    if batches.is_empty() {
        return Ok(RecordBatch::new_empty(schema));
    }
    if batches.len() == 1 {
        return Ok(batches.into_iter().next().unwrap());
    }
    let batch_schema = batches[0].schema();
    arrow_select::concat::concat_batches(&batch_schema, &batches)
        .map_err(|e| OmniError::Lance(e.to_string()))
}

fn blob_properties_for_table_key<'a>(
    catalog: &'a Catalog,
    table_key: &str,
) -> Result<&'a std::collections::HashSet<String>> {
    if let Some(type_name) = table_key.strip_prefix("node:") {
        return catalog
            .node_types
            .get(type_name)
            .map(|node_type| &node_type.blob_properties)
            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)));
    }
    if let Some(type_name) = table_key.strip_prefix("edge:") {
        return catalog
            .edge_types
            .get(type_name)
            .map(|edge_type| &edge_type.blob_properties)
            .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)));
    }
    Err(OmniError::manifest(format!(
        "invalid table key '{}'",
        table_key
    )))
}

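/// Decides whether a Lance blob description struct represents a null blob: a
/// null row, a missing `kind`, or an inline blob whose position, size, and
/// `blob_uri` are all empty.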
fn blob_description_is_null(descriptions: &StructArray, row: usize) -> Result<bool> {
    if descriptions.is_null(row) {
        return Ok(true);
    }

    let kind = descriptions
        .column_by_name("kind")
        .and_then(|col| col.as_any().downcast_ref::<UInt32Array>())
        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row) as u8))
        .or_else(|| {
            descriptions
                .column_by_name("kind")
                .and_then(|col| col.as_any().downcast_ref::<arrow_array::UInt8Array>())
                .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)))
        });
    let position = descriptions
        .column_by_name("position")
        .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
    let size = descriptions
        .column_by_name("size")
        .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
    let blob_uri = descriptions
        .column_by_name("blob_uri")
        .and_then(|col| col.as_any().downcast_ref::<StringArray>())
        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));

    let Some(kind) = kind else {
        return Ok(true);
    };
    let kind = BlobKind::try_from(kind).map_err(|e| OmniError::Lance(e.to_string()))?;
    if kind != BlobKind::Inline {
        return Ok(false);
    }

    Ok(position.unwrap_or(0) == 0 && size.unwrap_or(0) == 0 && blob_uri.unwrap_or("").is_empty())
}

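/// Rewrites the Arrow schema of every node and edge type so that properties
/// declared as blobs use Lance's blob field encoding rather than the plain
/// field emitted by the schema compiler.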
fn fixup_blob_schemas(catalog: &mut Catalog) {
    for node_type in catalog.node_types.values_mut() {
        if node_type.blob_properties.is_empty() {
            continue;
        }
        let fields: Vec<Field> = node_type
            .arrow_schema
            .fields()
            .iter()
            .map(|f| {
                if node_type.blob_properties.contains(f.name()) {
                    blob_field(f.name(), f.is_nullable())
                } else {
                    f.as_ref().clone()
                }
            })
            .collect();
        node_type.arrow_schema = Arc::new(Schema::new(fields));
    }
    for edge_type in catalog.edge_types.values_mut() {
        if edge_type.blob_properties.is_empty() {
            continue;
        }
        let fields: Vec<Field> = edge_type
            .arrow_schema
            .fields()
            .iter()
            .map(|f| {
                if edge_type.blob_properties.contains(f.name()) {
                    blob_field(f.name(), f.is_nullable())
                } else {
                    f.as_ref().clone()
                }
            })
            .collect();
        edge_type.arrow_schema = Arc::new(Schema::new(fields));
    }
}

fn read_schema_ir_from_source(schema_source: &str) -> Result<SchemaIR> {
    let schema_ast = parse_schema(schema_source)?;
    build_schema_ir(&schema_ast).map_err(|err| OmniError::manifest(err.to_string()))
}

fn schema_table_key(type_kind: SchemaTypeKind, name: &str) -> String {
    match type_kind {
        SchemaTypeKind::Node => format!("node:{}", name),
        SchemaTypeKind::Edge => format!("edge:{}", name),
        SchemaTypeKind::Interface => unreachable!("interfaces do not map to tables"),
    }
}

fn schema_for_table_key(catalog: &Catalog, table_key: &str) -> Result<Arc<Schema>> {
    if let Some(type_name) = table_key.strip_prefix("node:") {
        let node_type: &NodeType = catalog
            .node_types
            .get(type_name)
            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
        return Ok(node_type.arrow_schema.clone());
    }
    if let Some(type_name) = table_key.strip_prefix("edge:") {
        let edge_type: &EdgeType = catalog
            .edge_types
            .get(type_name)
            .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)))?;
        return Ok(edge_type.arrow_schema.clone());
    }
    Err(OmniError::manifest(format!(
        "invalid table key '{}'",
        table_key
    )))
}

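/// Converts one row of a `RecordBatch` into a JSON object keyed by field name.
/// Per-column conversion is handled by `json_value_from_array`, which recurses
/// into lists and structs and falls back to Arrow's display string for any
/// other type.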
fn record_batch_row_to_json(batch: &RecordBatch, row: usize) -> Result<serde_json::Value> {
    let mut obj = serde_json::Map::new();
    for (i, field) in batch.schema().fields().iter().enumerate() {
        obj.insert(
            field.name().clone(),
            json_value_from_array(batch.column(i).as_ref(), row)?,
        );
    }
    Ok(serde_json::Value::Object(obj))
}

fn json_value_from_array(array: &dyn Array, row: usize) -> Result<serde_json::Value> {
    if array.is_null(row) {
        return Ok(serde_json::Value::Null);
    }

    match array.data_type() {
        DataType::Utf8 => Ok(serde_json::Value::String(
            array
                .as_any()
                .downcast_ref::<StringArray>()
                .ok_or_else(|| OmniError::Lance("expected StringArray".to_string()))?
                .value(row)
                .to_string(),
        )),
        DataType::LargeUtf8 => Ok(serde_json::Value::String(
            array
                .as_any()
                .downcast_ref::<LargeStringArray>()
                .ok_or_else(|| OmniError::Lance("expected LargeStringArray".to_string()))?
                .value(row)
                .to_string(),
        )),
        DataType::Boolean => Ok(serde_json::Value::Bool(
            array
                .as_any()
                .downcast_ref::<BooleanArray>()
                .ok_or_else(|| OmniError::Lance("expected BooleanArray".to_string()))?
                .value(row),
        )),
        DataType::Int32 => Ok(serde_json::Value::Number(serde_json::Number::from(
            array
                .as_any()
                .downcast_ref::<Int32Array>()
                .ok_or_else(|| OmniError::Lance("expected Int32Array".to_string()))?
                .value(row),
        ))),
        DataType::Int64 => Ok(serde_json::Value::Number(serde_json::Number::from(
            array
                .as_any()
                .downcast_ref::<Int64Array>()
                .ok_or_else(|| OmniError::Lance("expected Int64Array".to_string()))?
                .value(row),
        ))),
        DataType::UInt32 => Ok(serde_json::Value::Number(serde_json::Number::from(
            array
                .as_any()
                .downcast_ref::<UInt32Array>()
                .ok_or_else(|| OmniError::Lance("expected UInt32Array".to_string()))?
                .value(row),
        ))),
        DataType::UInt64 => Ok(serde_json::Value::Number(serde_json::Number::from(
            array
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| OmniError::Lance("expected UInt64Array".to_string()))?
                .value(row),
        ))),
        DataType::Float32 => {
            let value = array
                .as_any()
                .downcast_ref::<Float32Array>()
                .ok_or_else(|| OmniError::Lance("expected Float32Array".to_string()))?
                .value(row) as f64;
            Ok(serde_json::Value::Number(
                serde_json::Number::from_f64(value).ok_or_else(|| {
                    OmniError::Lance(format!("cannot encode f32 value '{}' as JSON", value))
                })?,
            ))
        }
        DataType::Float64 => {
            let value = array
                .as_any()
                .downcast_ref::<Float64Array>()
                .ok_or_else(|| OmniError::Lance("expected Float64Array".to_string()))?
                .value(row);
            Ok(serde_json::Value::Number(
                serde_json::Number::from_f64(value).ok_or_else(|| {
                    OmniError::Lance(format!("cannot encode f64 value '{}' as JSON", value))
                })?,
            ))
        }
        DataType::Date32 => Ok(serde_json::Value::Number(serde_json::Number::from(
            array
                .as_any()
                .downcast_ref::<Date32Array>()
                .ok_or_else(|| OmniError::Lance("expected Date32Array".to_string()))?
                .value(row),
        ))),
        DataType::Binary => Ok(serde_json::Value::String(base64::Engine::encode(
            &base64::engine::general_purpose::STANDARD,
            array
                .as_any()
                .downcast_ref::<BinaryArray>()
                .ok_or_else(|| OmniError::Lance("expected BinaryArray".to_string()))?
                .value(row),
        ))),
        DataType::LargeBinary => Ok(serde_json::Value::String(base64::Engine::encode(
            &base64::engine::general_purpose::STANDARD,
            array
                .as_any()
                .downcast_ref::<LargeBinaryArray>()
                .ok_or_else(|| OmniError::Lance("expected LargeBinaryArray".to_string()))?
                .value(row),
        ))),
        DataType::List(_) => {
            let list = array
                .as_any()
                .downcast_ref::<ListArray>()
                .ok_or_else(|| OmniError::Lance("expected ListArray".to_string()))?;
            let values = list.value(row);
            let mut out = Vec::with_capacity(values.len());
            for idx in 0..values.len() {
                out.push(json_value_from_array(values.as_ref(), idx)?);
            }
            Ok(serde_json::Value::Array(out))
        }
        DataType::LargeList(_) => {
            let list = array
                .as_any()
                .downcast_ref::<LargeListArray>()
                .ok_or_else(|| OmniError::Lance("expected LargeListArray".to_string()))?;
            let values = list.value(row);
            let mut out = Vec::with_capacity(values.len());
            for idx in 0..values.len() {
                out.push(json_value_from_array(values.as_ref(), idx)?);
            }
            Ok(serde_json::Value::Array(out))
        }
        DataType::FixedSizeList(_, _) => {
            let list = array
                .as_any()
                .downcast_ref::<FixedSizeListArray>()
                .ok_or_else(|| OmniError::Lance("expected FixedSizeListArray".to_string()))?;
            let values = list.value(row);
            let mut out = Vec::with_capacity(values.len());
            for idx in 0..values.len() {
                out.push(json_value_from_array(values.as_ref(), idx)?);
            }
            Ok(serde_json::Value::Array(out))
        }
        DataType::Struct(fields) => {
            let struct_array = array
                .as_any()
                .downcast_ref::<StructArray>()
                .ok_or_else(|| OmniError::Lance("expected StructArray".to_string()))?;
            let mut obj = serde_json::Map::new();
            for (field_idx, field) in fields.iter().enumerate() {
                obj.insert(
                    field.name().clone(),
                    json_value_from_array(struct_array.column(field_idx).as_ref(), row)?,
                );
            }
            Ok(serde_json::Value::Object(obj))
        }
        _ => {
            let value = arrow_cast::display::array_value_to_string(array, row)
                .map_err(|e| OmniError::Lance(e.to_string()))?;
            Ok(serde_json::Value::String(value))
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::is_internal_run_branch;
    use crate::db::manifest::ManifestCoordinator;
    use async_trait::async_trait;
    use serde_json::Value;
    use std::sync::Mutex;

    use crate::storage::{LocalStorageAdapter, StorageAdapter, join_uri};

    const TEST_SCHEMA: &str = r#"
node Person {
    name: String @key
    age: I32?
}
node Company {
    name: String @key
}
edge Knows: Person -> Person {
    since: Date?
}
edge WorksAt: Person -> Company
"#;

    #[derive(Debug, Default)]
    struct RecordingStorageAdapter {
        inner: LocalStorageAdapter,
        reads: Mutex<Vec<String>>,
        writes: Mutex<Vec<String>>,
        exists_checks: Mutex<Vec<String>>,
        renames: Mutex<Vec<(String, String)>>,
        deletes: Mutex<Vec<String>>,
    }

    impl RecordingStorageAdapter {
        fn reads(&self) -> Vec<String> {
            self.reads.lock().unwrap().clone()
        }

        fn writes(&self) -> Vec<String> {
            self.writes.lock().unwrap().clone()
        }

        fn exists_checks(&self) -> Vec<String> {
            self.exists_checks.lock().unwrap().clone()
        }
    }

    #[async_trait]
    impl StorageAdapter for RecordingStorageAdapter {
        async fn read_text(&self, uri: &str) -> Result<String> {
            self.reads.lock().unwrap().push(uri.to_string());
            self.inner.read_text(uri).await
        }

        async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
            self.writes.lock().unwrap().push(uri.to_string());
            self.inner.write_text(uri, contents).await
        }

        async fn exists(&self, uri: &str) -> Result<bool> {
            self.exists_checks.lock().unwrap().push(uri.to_string());
            self.inner.exists(uri).await
        }

        async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
            self.renames
                .lock()
                .unwrap()
                .push((from_uri.to_string(), to_uri.to_string()));
            self.inner.rename_text(from_uri, to_uri).await
        }

        async fn delete(&self, uri: &str) -> Result<()> {
            self.deletes.lock().unwrap().push(uri.to_string());
            self.inner.delete(uri).await
        }
    }

    #[tokio::test]
    async fn test_init_and_open_route_graph_metadata_through_storage_adapter() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let adapter = Arc::new(RecordingStorageAdapter::default());

        Omnigraph::init_with_storage(uri, TEST_SCHEMA, adapter.clone())
            .await
            .unwrap();
        assert!(adapter.writes().contains(&join_uri(uri, "_schema.pg")));
        assert!(adapter.writes().contains(&join_uri(uri, "_schema.ir.json")));
        assert!(
            adapter
                .writes()
                .contains(&join_uri(uri, "__schema_state.json"))
        );

        Omnigraph::open_with_storage(uri, adapter.clone())
            .await
            .unwrap();
        assert!(adapter.reads().contains(&join_uri(uri, "_schema.pg")));
        assert!(adapter.reads().contains(&join_uri(uri, "_schema.ir.json")));
        assert!(
            adapter
                .reads()
                .contains(&join_uri(uri, "__schema_state.json"))
        );
        assert!(
            adapter
                .exists_checks()
                .contains(&join_uri(uri, "_schema.ir.json"))
        );
        assert!(
            adapter
                .exists_checks()
                .contains(&join_uri(uri, "__schema_state.json"))
        );
        assert!(
            adapter
                .exists_checks()
                .contains(&join_uri(uri, "_graph_commits.lance"))
        );
    }

    async fn table_rows_json(db: &Omnigraph, table_key: &str) -> Vec<Value> {
        let snapshot = db.snapshot();
        let ds = snapshot.open(table_key).await.unwrap();
        let batches = db.table_store().scan_batches(&ds).await.unwrap();
        batches
            .into_iter()
            .flat_map(|batch| {
                (0..batch.num_rows())
                    .map(|row| record_batch_row_to_json(&batch, row).unwrap())
                    .collect::<Vec<_>>()
            })
            .collect()
    }

    async fn seed_person_row(db: &mut Omnigraph, name: &str, age: Option<i32>) {
        let (mut ds, full_path, table_branch) = db.open_for_mutation("node:Person").await.unwrap();
        let schema: Arc<Schema> = Arc::new(ds.schema().into());
        let columns: Vec<Arc<dyn Array>> = schema
            .fields()
            .iter()
            .map(|field| match field.name().as_str() {
                "id" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
                "name" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
                "age" => Arc::new(Int32Array::from(vec![age])) as Arc<dyn Array>,
                _ => new_null_array(field.data_type(), 1),
            })
            .collect();
        let batch = RecordBatch::try_new(Arc::clone(&schema), columns).unwrap();
        let state = db
            .table_store()
            .append_batch(&full_path, &mut ds, batch)
            .await
            .unwrap();
        db.commit_updates(&[crate::db::SubTableUpdate {
            table_key: "node:Person".to_string(),
            table_version: state.version,
            table_branch,
            row_count: state.row_count,
            version_metadata: state.version_metadata,
        }])
        .await
        .unwrap();
    }

    #[tokio::test]
    async fn test_apply_schema_adds_nullable_property_and_preserves_rows() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;

        let desired = TEST_SCHEMA.replace(
            "    age: I32?\n}",
            "    age: I32?\n    nickname: String?\n}",
        );
        let result = db.apply_schema(&desired).await.unwrap();
        assert!(result.applied);

        let reopened = Omnigraph::open(uri).await.unwrap();
        let rows = table_rows_json(&reopened, "node:Person").await;
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0]["name"], "Alice");
        assert_eq!(rows[0]["age"], 30);
        assert!(rows[0]["nickname"].is_null());
        assert!(
            reopened.catalog().node_types["Person"]
                .properties
                .contains_key("nickname")
        );
        assert!(dir.path().join("_schema.pg").exists());
    }

    #[tokio::test]
    async fn test_apply_schema_renames_property_and_preserves_values() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;

        let desired = TEST_SCHEMA.replace(
            "    age: I32?\n}",
            "    years: I32? @rename_from(\"age\")\n}",
        );
        db.apply_schema(&desired).await.unwrap();

        let reopened = Omnigraph::open(uri).await.unwrap();
        let rows = table_rows_json(&reopened, "node:Person").await;
        assert_eq!(rows[0]["name"], "Alice");
        assert_eq!(rows[0]["years"], 30);
        assert!(rows[0].get("age").is_none());
    }

    #[tokio::test]
    async fn test_apply_schema_renames_type_and_preserves_historical_snapshot() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;
        let before_version = db.snapshot().version();

        let desired = TEST_SCHEMA
            .replace("node Person {\n", "node Human @rename_from(\"Person\") {\n")
            .replace("edge Knows: Person -> Person", "edge Knows: Human -> Human")
            .replace(
                "edge WorksAt: Person -> Company",
                "edge WorksAt: Human -> Company",
            );
        db.apply_schema(&desired).await.unwrap();

        let head = db.snapshot();
        assert!(head.entry("node:Person").is_none());
        assert!(head.entry("node:Human").is_some());
        let historical = ManifestCoordinator::snapshot_at(uri, None, before_version)
            .await
            .unwrap();
        assert!(historical.entry("node:Person").is_some());
        assert!(historical.entry("node:Human").is_none());
    }

    #[tokio::test]
    async fn test_apply_schema_succeeds_after_load() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        crate::loader::load_jsonl(
            &mut db,
            r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#,
            crate::loader::LoadMode::Overwrite,
        )
        .await
        .unwrap();

        let all_branches = db.coordinator.all_branches().await.unwrap();
        assert!(
            !all_branches.iter().any(|b| is_internal_run_branch(b)),
            "run branch should be deleted after publish, got: {:?}",
            all_branches
        );

        let desired = TEST_SCHEMA.replace(
            "    age: I32?\n}",
            "    age: I32?\n    nickname: String?\n}",
        );
        let result = db.apply_schema(&desired).await.unwrap();
        assert!(result.applied, "schema apply should have applied");
    }

    #[tokio::test]
    async fn test_apply_schema_adds_index_for_existing_property() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let desired = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
        db.apply_schema(&desired).await.unwrap();

        let snapshot = db.snapshot();
        let ds = snapshot.open("node:Person").await.unwrap();
        assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
    }

    #[tokio::test]
    async fn test_apply_schema_rewrite_preserves_existing_indices() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let initial_schema = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
        let mut db = Omnigraph::init(uri, &initial_schema).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;

        let desired = initial_schema.replace(
            "    age: I32?\n}",
            "    age: I32?\n    nickname: String?\n}",
        );
        db.apply_schema(&desired).await.unwrap();

        let snapshot = db.snapshot();
        let ds = snapshot.open("node:Person").await.unwrap();
        assert!(db.table_store().has_btree_index(&ds, "id").await.unwrap());
        assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
    }

    #[tokio::test]
    async fn test_open_for_mutation_rejects_while_schema_apply_locked() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        db.coordinator
            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
            .await
            .unwrap();

        let err = db.open_for_mutation("node:Person").await.unwrap_err();
        assert!(
            err.to_string()
                .contains("write is unavailable while schema apply is in progress")
        );
    }

    #[tokio::test]
    async fn test_commit_updates_rejects_while_schema_apply_locked() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        db.coordinator
            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
            .await
            .unwrap();

        let err = db.commit_updates(&[]).await.unwrap_err();
        assert!(
            err.to_string()
                .contains("write commit is unavailable while schema apply is in progress")
        );
    }

    #[tokio::test]
    async fn test_branch_list_hides_schema_apply_lock_branch() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        db.coordinator
            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
            .await
            .unwrap();

        let branches = db.branch_list().await.unwrap();
        assert_eq!(branches, vec!["main".to_string()]);
    }
}