1use std::collections::{BTreeSet, HashMap, HashSet};
2use std::io::Write;
3use std::sync::Arc;
4
5use arrow_array::{
6 Array, BinaryArray, BooleanArray, Date32Array, FixedSizeListArray, Float32Array, Float64Array,
7 Int32Array, Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray,
8 RecordBatch, StringArray, StructArray, UInt32Array, UInt64Array, new_null_array,
9};
10use arrow_schema::{DataType, Field, Schema};
11use lance::Dataset;
12use lance::blob::{BlobArrayBuilder, blob_field};
13use lance::dataset::BlobFile;
14use lance::dataset::scanner::ColumnOrdering;
15use lance::datatypes::BlobKind;
16use omnigraph_compiler::catalog::{Catalog, EdgeType, NodeType};
17use omnigraph_compiler::schema::parser::parse_schema;
18use omnigraph_compiler::types::ScalarType;
19use omnigraph_compiler::{
20 SchemaIR, SchemaMigrationPlan, SchemaMigrationStep, SchemaTypeKind, build_catalog_from_ir,
21 build_schema_ir, plan_schema_migration,
22};
23
24use crate::db::graph_coordinator::{GraphCoordinator, PublishedSnapshot};
25use crate::db::run_registry::{RunRecord, RunStatus};
26use crate::error::{ManifestErrorKind, OmniError, Result};
27use crate::runtime_cache::RuntimeCache;
28use crate::storage::{StorageAdapter, join_uri, normalize_root_uri, storage_for_uri};
29use crate::table_store::TableStore;
30
31mod export;
32mod schema_apply;
33mod table_ops;
34
35use super::commit_graph::GraphCommit;
36use super::manifest::{
37 ManifestChange, Snapshot, SubTableEntry, TableRegistration, TableTombstone,
38 table_path_for_table_key,
39};
40use super::schema_state::{
41 SCHEMA_SOURCE_FILENAME, load_or_bootstrap_schema_contract, read_accepted_schema_ir,
42 validate_schema_contract, write_schema_contract,
43};
44use super::{
45 ReadTarget, ResolvedTarget, RunId, SCHEMA_APPLY_LOCK_BRANCH, SnapshotId,
46 is_internal_system_branch, is_schema_apply_lock_branch,
47};
48
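/// Outcome reported when one branch is merged into another: the target was
/// already up to date, it could be fast-forwarded, or a full merge was
/// performed.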
49#[derive(Debug, Clone, Copy, PartialEq, Eq)]
50pub enum MergeOutcome {
51 AlreadyUpToDate,
52 FastForward,
53 Merged,
54}
55
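/// Result of [`Omnigraph::apply_schema`]: whether the migration plan is
/// supported, whether it was actually applied (a no-op plan is reported as
/// supported but not applied), the manifest version after the operation, and
/// the individual migration steps.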
56#[derive(Debug, Clone)]
57pub struct SchemaApplyResult {
58 pub supported: bool,
59 pub applied: bool,
60 pub manifest_version: u64,
61 pub steps: Vec<SchemaMigrationStep>,
62}
63
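/// Handle to a graph repository rooted at `root_uri`.
///
/// It bundles the storage adapter, the branch-aware graph coordinator, the
/// Lance-backed table store, a runtime cache, and the compiled schema catalog
/// built from the accepted schema contract.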
64pub struct Omnigraph {
70 root_uri: String,
71 storage: Arc<dyn StorageAdapter>,
72 coordinator: GraphCoordinator,
73 table_store: TableStore,
74 runtime_cache: RuntimeCache,
75 catalog: Catalog,
76 schema_source: String,
77 pub(crate) audit_actor_id: Option<String>,
78}
79
80impl Omnigraph {
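    /// Create a new graph repository at `uri` from a schema source string.
    ///
    /// The schema source and the compiled schema contract are written to
    /// storage and the graph coordinator is initialized.
    ///
    /// A minimal usage sketch (not compiled here; it assumes the crate
    /// re-exports `Omnigraph` at its root and that `schema.pg` holds a valid
    /// schema):
    ///
    /// ```ignore
    /// let schema_source = std::fs::read_to_string("schema.pg")?;
    /// let db = Omnigraph::init("/tmp/my_graph", &schema_source).await?;
    /// println!("initialized graph at {}", db.uri());
    /// ```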
81 pub async fn init(uri: &str, schema_source: &str) -> Result<Self> {
85 Self::init_with_storage(uri, schema_source, storage_for_uri(uri)?).await
86 }
87
88 pub(crate) async fn init_with_storage(
89 uri: &str,
90 schema_source: &str,
91 storage: Arc<dyn StorageAdapter>,
92 ) -> Result<Self> {
93 let root = normalize_root_uri(uri)?;
94 let schema_ir = read_schema_ir_from_source(schema_source)?;
95 let mut catalog = build_catalog_from_ir(&schema_ir)?;
96 fixup_blob_schemas(&mut catalog);
97
98 let schema_path = join_uri(&root, SCHEMA_SOURCE_FILENAME);
100 storage.write_text(&schema_path, schema_source).await?;
101 write_schema_contract(&root, storage.as_ref(), &schema_ir).await?;
102
103 let coordinator = GraphCoordinator::init(&root, &catalog, Arc::clone(&storage)).await?;
105
106 Ok(Self {
107 root_uri: root.clone(),
108 storage,
109 coordinator,
110 table_store: TableStore::new(&root),
111 runtime_cache: RuntimeCache::default(),
112 catalog,
113 schema_source: schema_source.to_string(),
114 audit_actor_id: None,
115 })
116 }
117
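    /// Open an existing graph repository at `uri`.
    ///
    /// The stored schema source is read back, the accepted schema contract is
    /// loaded (or bootstrapped for legacy repositories that only have `main`),
    /// and the catalog is rebuilt from that contract.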
118 pub async fn open(uri: &str) -> Result<Self> {
122 Self::open_with_storage(uri, storage_for_uri(uri)?).await
123 }
124
125 pub(crate) async fn open_with_storage(
126 uri: &str,
127 storage: Arc<dyn StorageAdapter>,
128 ) -> Result<Self> {
129 let root = normalize_root_uri(uri)?;
130 let schema_path = join_uri(&root, SCHEMA_SOURCE_FILENAME);
132 let schema_source = storage.read_text(&schema_path).await?;
133 let current_source_ir = read_schema_ir_from_source(&schema_source)?;
134 let coordinator = GraphCoordinator::open(&root, Arc::clone(&storage)).await?;
135 let branches = coordinator.branch_list().await?;
        let (accepted_ir, _) = load_or_bootstrap_schema_contract(
            &root,
            Arc::clone(&storage),
            &branches,
            &current_source_ir,
        )
        .await?;
143 let mut catalog = build_catalog_from_ir(&accepted_ir)?;
144 fixup_blob_schemas(&mut catalog);
145
146 Ok(Self {
147 root_uri: root.clone(),
148 storage,
149 coordinator,
150 table_store: TableStore::new(&root),
151 runtime_cache: RuntimeCache::default(),
152 catalog,
153 schema_source,
154 audit_actor_id: None,
155 })
156 }
157
158 pub fn catalog(&self) -> &Catalog {
159 &self.catalog
160 }
161
162 pub fn schema_source(&self) -> &str {
163 &self.schema_source
164 }
165
166 pub fn uri(&self) -> &str {
167 &self.root_uri
168 }
169
170 pub(crate) async fn ensure_schema_state_valid(&self) -> Result<()> {
171 validate_schema_contract(self.uri(), Arc::clone(&self.storage)).await
172 }
173
174 pub async fn plan_schema(&self, desired_schema_source: &str) -> Result<SchemaMigrationPlan> {
175 schema_apply::plan_schema(self, desired_schema_source).await
176 }
177
178 pub async fn apply_schema(&mut self, desired_schema_source: &str) -> Result<SchemaApplyResult> {
179 schema_apply::apply_schema(self, desired_schema_source).await
180 }
181
182 pub(crate) async fn ensure_schema_apply_idle(&mut self, operation: &str) -> Result<()> {
183 schema_apply::ensure_schema_apply_idle(self, operation).await
184 }
185
186 async fn ensure_schema_apply_not_locked(&self, operation: &str) -> Result<()> {
187 schema_apply::ensure_schema_apply_not_locked(self, operation).await
188 }
189
190 pub(crate) fn table_store(&self) -> &TableStore {
191 &self.table_store
192 }
193
194 pub(crate) async fn open_coordinator_for_branch(
195 &self,
196 branch: Option<&str>,
197 ) -> Result<GraphCoordinator> {
198 match branch {
199 Some(branch) => {
200 GraphCoordinator::open_branch(self.uri(), branch, Arc::clone(&self.storage)).await
201 }
202 None => GraphCoordinator::open(self.uri(), Arc::clone(&self.storage)).await,
203 }
204 }
205
206 pub(crate) async fn swap_coordinator_for_branch(
207 &mut self,
208 branch: Option<&str>,
209 ) -> Result<GraphCoordinator> {
210 let next = self.open_coordinator_for_branch(branch).await?;
211 Ok(std::mem::replace(&mut self.coordinator, next))
212 }
213
214 pub(crate) fn restore_coordinator(&mut self, coordinator: GraphCoordinator) {
215 self.coordinator = coordinator;
216 }
217
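    // Resolves a branch name to a concrete snapshot. When the requested branch
    // is the coordinator's current branch, the already-loaded snapshot is used
    // directly (with a synthetic snapshot id if there is no head commit yet);
    // otherwise the target is resolved through the coordinator.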
218 pub(crate) async fn resolved_branch_target(
219 &self,
220 branch: Option<&str>,
221 ) -> Result<ResolvedTarget> {
222 self.ensure_schema_state_valid().await?;
223 let requested = ReadTarget::Branch(branch.unwrap_or("main").to_string());
224 let normalized = normalize_branch_name(branch.unwrap_or("main"))?;
225 if normalized.as_deref() == self.coordinator.current_branch() {
226 let snapshot_id = self.coordinator.head_commit_id().await?.unwrap_or_else(|| {
227 SnapshotId::synthetic(
228 self.coordinator.current_branch(),
229 self.coordinator.version(),
230 )
231 });
232 return Ok(ResolvedTarget {
233 requested,
234 branch: self.coordinator.current_branch().map(str::to_string),
235 snapshot_id,
236 snapshot: self.coordinator.snapshot(),
237 });
238 }
239 self.coordinator.resolve_target(&requested).await
240 }
241
242 pub(crate) async fn snapshot_for_branch(&self, branch: Option<&str>) -> Result<Snapshot> {
243 self.resolved_branch_target(branch)
244 .await
245 .map(|resolved| resolved.snapshot)
246 }
247
248 pub(crate) fn version(&self) -> u64 {
249 self.coordinator.version()
250 }
251
252 pub(crate) fn snapshot(&self) -> Snapshot {
254 self.coordinator.snapshot()
255 }
256
257 pub async fn snapshot_of(&self, target: impl Into<ReadTarget>) -> Result<Snapshot> {
258 self.resolved_target(target)
259 .await
260 .map(|resolved| resolved.snapshot)
261 }
262
263 pub async fn version_of(&self, target: impl Into<ReadTarget>) -> Result<u64> {
264 self.snapshot_of(target)
265 .await
266 .map(|snapshot| snapshot.version())
267 }
268
269 pub async fn resolved_branch_of(
270 &self,
271 target: impl Into<ReadTarget>,
272 ) -> Result<Option<String>> {
273 self.resolved_target(target)
274 .await
275 .map(|resolved| resolved.branch)
276 }
277
278 pub async fn sync_branch(&mut self, branch: &str) -> Result<()> {
280 self.ensure_schema_state_valid().await?;
281 let branch = normalize_branch_name(branch)?;
282 self.coordinator = self.open_coordinator_for_branch(branch.as_deref()).await?;
283 self.runtime_cache.invalidate_all().await;
284 Ok(())
285 }
286
287 pub(crate) async fn refresh(&mut self) -> Result<()> {
289 self.coordinator.refresh().await?;
290 self.runtime_cache.invalidate_all().await;
291 Ok(())
292 }
293
294 pub async fn resolve_snapshot(&self, branch: &str) -> Result<SnapshotId> {
295 self.ensure_schema_state_valid().await?;
296 self.coordinator.resolve_snapshot_id(branch).await
297 }
298
299 pub(crate) async fn resolved_target(
300 &self,
301 target: impl Into<ReadTarget>,
302 ) -> Result<ResolvedTarget> {
303 self.ensure_schema_state_valid().await?;
304 self.coordinator.resolve_target(&target.into()).await
305 }
306
307 pub async fn diff_between(
310 &self,
311 from: impl Into<ReadTarget>,
312 to: impl Into<ReadTarget>,
313 filter: &crate::changes::ChangeFilter,
314 ) -> Result<crate::changes::ChangeSet> {
315 let from_resolved = self.resolved_target(from).await?;
316 let to_resolved = self.resolved_target(to).await?;
317 crate::changes::diff_snapshots(
318 self.uri(),
319 &from_resolved.snapshot,
320 &to_resolved.snapshot,
321 filter,
322 to_resolved.branch.clone().or(from_resolved.branch.clone()),
323 )
324 .await
325 }
326
327 pub async fn diff_commits(
330 &self,
331 from_commit_id: &str,
332 to_commit_id: &str,
333 filter: &crate::changes::ChangeFilter,
334 ) -> Result<crate::changes::ChangeSet> {
335 let from_commit = self
336 .coordinator
337 .resolve_commit(&SnapshotId::new(from_commit_id))
338 .await?;
339 let to_commit = self
340 .coordinator
341 .resolve_commit(&SnapshotId::new(to_commit_id))
342 .await?;
343 let from_snap = self
344 .coordinator
345 .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
346 from_commit.graph_commit_id.clone(),
347 )))
348 .await?;
349 let to_snap = self
350 .coordinator
351 .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
352 to_commit.graph_commit_id.clone(),
353 )))
354 .await?;
355 crate::changes::diff_snapshots(
356 self.uri(),
357 &from_snap.snapshot,
358 &to_snap.snapshot,
359 filter,
360 to_snap.branch.clone().or(from_snap.branch.clone()),
361 )
362 .await
363 }
364
365 pub async fn entity_at_target(
366 &self,
367 target: impl Into<ReadTarget>,
368 table_key: &str,
369 id: &str,
370 ) -> Result<Option<serde_json::Value>> {
371 export::entity_at_target(self, target, table_key, id).await
372 }
373
374 pub async fn entity_at(
376 &self,
377 table_key: &str,
378 id: &str,
379 version: u64,
380 ) -> Result<Option<serde_json::Value>> {
381 export::entity_at(self, table_key, id, version).await
382 }
383
384 pub async fn snapshot_at_version(&self, version: u64) -> Result<Snapshot> {
386 self.ensure_schema_state_valid().await?;
387 self.coordinator.snapshot_at_version(version).await
388 }
389
390 pub async fn export_jsonl(
391 &self,
392 branch: &str,
393 type_names: &[String],
394 table_keys: &[String],
395 ) -> Result<String> {
396 export::export_jsonl(self, branch, type_names, table_keys).await
397 }
398
399 pub async fn export_jsonl_to_writer<W: Write>(
400 &self,
401 branch: &str,
402 type_names: &[String],
403 table_keys: &[String],
404 writer: &mut W,
405 ) -> Result<()> {
406 export::export_jsonl_to_writer(self, branch, type_names, table_keys, writer).await
407 }
408
409 pub async fn graph_index(&self) -> Result<Arc<crate::graph_index::GraphIndex>> {
413 table_ops::graph_index(self).await
414 }
415
416 pub(crate) async fn graph_index_for_resolved(
417 &self,
418 resolved: &ResolvedTarget,
419 ) -> Result<Arc<crate::graph_index::GraphIndex>> {
420 table_ops::graph_index_for_resolved(self, resolved).await
421 }
422
423 pub async fn ensure_indices(&mut self) -> Result<()> {
436 table_ops::ensure_indices(self).await
437 }
438
439 pub async fn ensure_indices_on(&mut self, branch: &str) -> Result<()> {
440 table_ops::ensure_indices_on(self, branch).await
441 }
442
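    /// Read a blob property for a single node identified by `id`.
    ///
    /// The property must be declared as a blob on the node type; the row is
    /// located by its id and the payload is fetched through Lance's
    /// `take_blobs`.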
443 pub async fn read_blob(&self, type_name: &str, id: &str, property: &str) -> Result<BlobFile> {
453 self.ensure_schema_state_valid().await?;
454 let node_type = self
455 .catalog
456 .node_types
457 .get(type_name)
458 .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
459 if !node_type.blob_properties.contains(property) {
460 return Err(OmniError::manifest(format!(
461 "property '{}' on type '{}' is not a Blob",
462 property, type_name
463 )));
464 }
465
466 let snapshot = self.snapshot();
467 let table_key = format!("node:{}", type_name);
468 let ds = snapshot.open(&table_key).await?;
469
470 let filter_sql = format!("id = '{}'", id.replace('\'', "''"));
471 let row_id = self
472 .table_store
473 .first_row_id_for_filter(&ds, &filter_sql)
474 .await?
475 .ok_or_else(|| {
476 OmniError::manifest(format!("no {} with id '{}' found", type_name, id))
477 })?;
478
479 let ds = Arc::new(ds);
481 let mut blobs = ds
482 .take_blobs(&[row_id], property)
483 .await
484 .map_err(|e| OmniError::Lance(e.to_string()))?;
485
486 blobs.pop().ok_or_else(|| {
487 OmniError::manifest(format!(
488 "blob '{}' on {} '{}' returned no data",
489 property, type_name, id
490 ))
491 })
492 }
493
494 pub(crate) fn active_branch(&self) -> Option<&str> {
495 self.coordinator.current_branch()
496 }
497
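    // Refuses to delete a branch that still has descendant branches, is the
    // target of a running or failed run, or still backs table entries in any
    // other branch's snapshot.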
498 async fn ensure_branch_delete_safe(&self, branch: &str, branches: &[String]) -> Result<()> {
499 let descendants = self.coordinator.branch_descendants(branch).await?;
500 if let Some(descendant) = descendants.first() {
501 return Err(OmniError::manifest_conflict(format!(
502 "cannot delete branch '{}' because descendant branch '{}' still depends on it",
503 branch, descendant
504 )));
505 }
506
507 for run in self.list_runs().await? {
508 if run.target_branch == branch
509 && matches!(run.status, RunStatus::Running | RunStatus::Failed)
510 {
511 return Err(OmniError::manifest_conflict(format!(
512 "cannot delete branch '{}' while run '{}' targeting it is {}",
513 branch,
514 run.run_id,
515 run.status.as_str()
516 )));
517 }
518 }
519
520 for other_branch in branches
521 .iter()
522 .filter(|candidate| candidate.as_str() != branch)
523 {
524 let snapshot = self
525 .snapshot_of(ReadTarget::branch(other_branch.as_str()))
526 .await?;
527 if snapshot
528 .entries()
529 .any(|entry| entry.table_branch.as_deref() == Some(branch))
530 {
531 return Err(OmniError::manifest_conflict(format!(
532 "cannot delete branch '{}' because branch '{}' still depends on it",
533 branch, other_branch
534 )));
535 }
536 }
537
538 Ok(())
539 }
540
541 async fn cleanup_deleted_branch_tables(
542 &self,
543 branch: &str,
544 owned_tables: &[(String, String)],
545 ) -> Result<()> {
546 let mut seen_paths = HashSet::new();
547 let mut cleanup_targets = owned_tables
548 .iter()
549 .filter(|(_, table_path)| seen_paths.insert(table_path.clone()))
550 .cloned()
551 .collect::<Vec<_>>();
552 cleanup_targets.sort_by(|left, right| left.0.cmp(&right.0));
553
554 for (table_key, table_path) in cleanup_targets {
555 let dataset_uri = self.table_store.dataset_uri(&table_path);
556 if let Err(err) = self.table_store.delete_branch(&dataset_uri, branch).await {
557 return Err(OmniError::manifest_internal(format!(
558 "branch '{}' was deleted but cleanup failed for {}: {}",
559 branch, table_key, err
560 )));
561 }
562 }
563
564 Ok(())
565 }
566
567 async fn delete_branch_storage_only(&mut self, branch: &str) -> Result<()> {
568 if self.coordinator.current_branch() == Some(branch) {
569 return Err(OmniError::manifest_conflict(format!(
570 "cannot delete currently active branch '{}'",
571 branch
572 )));
573 }
574
575 let branch_snapshot = self.snapshot_of(ReadTarget::branch(branch)).await?;
576 let owned_tables = branch_snapshot
577 .entries()
578 .filter(|entry| entry.table_branch.as_deref() == Some(branch))
579 .map(|entry| (entry.table_key.clone(), entry.table_path.clone()))
580 .collect::<Vec<_>>();
581
582 self.coordinator.branch_delete(branch).await?;
583 self.cleanup_deleted_branch_tables(branch, &owned_tables)
584 .await
585 }
586
587 async fn cleanup_terminal_run_branches_for_target(&mut self, branch: &str) -> Result<()> {
588 let terminal_run_branches = self
589 .list_runs()
590 .await?
591 .into_iter()
592 .filter(|run| {
593 run.target_branch == branch
594 && matches!(run.status, RunStatus::Published | RunStatus::Aborted)
595 })
596 .map(|run| run.run_branch)
597 .collect::<Vec<_>>();
598
599 for run_branch in terminal_run_branches {
600 match self.delete_branch_storage_only(&run_branch).await {
601 Ok(()) => {}
602 Err(OmniError::Manifest(err)) if err.kind == ManifestErrorKind::NotFound => {}
603 Err(err) => return Err(err),
604 }
605 }
606
607 Ok(())
608 }
609
610 pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
611 normalize_branch_name(branch)
612 }
613
614 pub(crate) async fn head_commit_id_for_branch(
615 &self,
616 branch: Option<&str>,
617 ) -> Result<Option<String>> {
618 let mut coordinator = self.open_coordinator_for_branch(branch).await?;
619 coordinator.ensure_commit_graph_initialized().await?;
620 coordinator
621 .head_commit_id()
622 .await
623 .map(|id| id.map(|snapshot_id| snapshot_id.as_str().to_string()))
624 }
625
626 pub async fn branch_create(&mut self, name: &str) -> Result<()> {
627 self.ensure_schema_state_valid().await?;
628 self.ensure_schema_apply_idle("branch_create").await?;
629 ensure_public_branch_ref(name, "branch_create")?;
630 self.coordinator.branch_create(name).await
631 }
632
633 pub(crate) fn current_audit_actor(&self) -> Option<&str> {
634 self.audit_actor_id.as_deref()
635 }
636
637 pub async fn branch_create_from(
638 &mut self,
639 from: impl Into<ReadTarget>,
640 name: &str,
641 ) -> Result<()> {
642 self.ensure_schema_apply_idle("branch_create_from").await?;
643 self.branch_create_from_impl(from, name, false).await
644 }
645
646 async fn branch_create_from_impl(
647 &mut self,
648 from: impl Into<ReadTarget>,
649 name: &str,
650 allow_internal_refs: bool,
651 ) -> Result<()> {
652 let target = from.into();
653 let ReadTarget::Branch(branch_name) = target else {
654 return Err(OmniError::manifest(
655 "branch creation from pinned snapshots is not supported yet".to_string(),
656 ));
657 };
658 if !allow_internal_refs {
659 ensure_public_branch_ref(&branch_name, "branch_create_from")?;
660 ensure_public_branch_ref(name, "branch_create_from")?;
661 }
662 let branch = normalize_branch_name(&branch_name)?;
663 let previous = self.swap_coordinator_for_branch(branch.as_deref()).await?;
664 let result = self.coordinator.branch_create(name).await;
665 self.restore_coordinator(previous);
666 result
667 }
668
669 pub async fn branch_list(&self) -> Result<Vec<String>> {
670 self.ensure_schema_state_valid().await?;
671 self.coordinator.branch_list().await
672 }
673
674 pub async fn branch_delete(&mut self, name: &str) -> Result<()> {
675 self.ensure_schema_state_valid().await?;
676 self.ensure_schema_apply_idle("branch_delete").await?;
677 ensure_public_branch_ref(name, "branch_delete")?;
678 self.refresh().await?;
679 let branch = normalize_branch_name(name)?
680 .ok_or_else(|| OmniError::manifest("cannot delete branch 'main'".to_string()))?;
681 let branches = self.coordinator.branch_list().await?;
682 if !branches.iter().any(|candidate| candidate == &branch) {
683 return Err(OmniError::manifest_not_found(format!(
684 "branch '{}' not found",
685 branch
686 )));
687 }
688
689 self.ensure_branch_delete_safe(&branch, &branches).await?;
690 self.cleanup_terminal_run_branches_for_target(&branch)
691 .await?;
692 self.delete_branch_storage_only(&branch).await
693 }
694
695 pub(crate) async fn latest_branch_snapshot_id(&self, branch: &str) -> Result<SnapshotId> {
696 let normalized = normalize_branch_name(branch)?;
697 let fresh = self
698 .open_coordinator_for_branch(normalized.as_deref())
699 .await?;
700 fresh.resolve_snapshot_id(branch).await
701 }
702
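    /// Start a new run against `target_branch`.
    ///
    /// The run gets its own internal run branch forked from the target's
    /// current snapshot, and a `RunRecord` capturing the base snapshot id and
    /// manifest version is appended to the run registry.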
703 pub async fn begin_run(
704 &mut self,
705 target_branch: &str,
706 operation_hash: Option<&str>,
707 ) -> Result<RunRecord> {
708 self.begin_run_as(target_branch, operation_hash, None).await
709 }
710
711 pub async fn begin_run_as(
712 &mut self,
713 target_branch: &str,
714 operation_hash: Option<&str>,
715 actor_id: Option<&str>,
716 ) -> Result<RunRecord> {
717 self.ensure_schema_state_valid().await?;
718 self.ensure_schema_apply_idle("begin_run").await?;
719 ensure_public_branch_ref(target_branch, "begin_run")?;
720 let target_branch =
721 normalize_branch_name(target_branch)?.unwrap_or_else(|| "main".to_string());
722 let fresh = self
723 .open_coordinator_for_branch(Self::normalize_branch_name(&target_branch)?.as_deref())
724 .await?;
725 let base_snapshot_id = fresh.resolve_snapshot_id(&target_branch).await?;
726 let base_manifest_version = fresh.version();
727 let record = RunRecord::new(
728 target_branch.clone(),
729 base_snapshot_id.as_str(),
730 base_manifest_version,
731 operation_hash.map(str::to_string),
732 actor_id
733 .map(str::to_string)
734 .or_else(|| self.current_audit_actor().map(str::to_string)),
735 )?;
736
737 self.branch_create_from_impl(
738 ReadTarget::branch(target_branch.clone()),
739 &record.run_branch,
740 true,
741 )
742 .await?;
743 self.coordinator.append_run_record(&record).await?;
744 Ok(record)
745 }
746
747 pub async fn get_run(&self, run_id: &RunId) -> Result<RunRecord> {
748 self.ensure_schema_state_valid().await?;
749 self.coordinator.get_run(run_id).await
750 }
751
752 pub async fn list_runs(&self) -> Result<Vec<RunRecord>> {
753 self.ensure_schema_state_valid().await?;
754 self.coordinator.list_runs().await
755 }
756
757 pub async fn get_commit(&self, commit_id: &str) -> Result<GraphCommit> {
758 self.ensure_schema_state_valid().await?;
759 self.coordinator
760 .resolve_commit(&SnapshotId::new(commit_id))
761 .await
762 }
763
764 pub async fn list_commits(&self, branch: Option<&str>) -> Result<Vec<GraphCommit>> {
765 self.ensure_schema_state_valid().await?;
766 let branch = match branch {
767 Some(branch) => normalize_branch_name(branch)?,
768 None => None,
769 };
770 let coordinator = self.open_coordinator_for_branch(branch.as_deref()).await?;
771 coordinator.list_commits().await
772 }
773
774 pub async fn abort_run(&mut self, run_id: &RunId) -> Result<RunRecord> {
775 self.ensure_schema_state_valid().await?;
776 let run = self.get_run(run_id).await?;
777 match run.status {
778 RunStatus::Running | RunStatus::Failed => {
779 let updated = run.with_status(RunStatus::Aborted, None)?;
780 self.coordinator.append_run_record(&updated).await?;
781 Ok(updated)
782 }
783 RunStatus::Published => Err(OmniError::manifest_conflict(format!(
784 "run '{}' is already published",
785 run_id
786 ))),
787 RunStatus::Aborted => Err(OmniError::manifest_conflict(format!(
788 "run '{}' is already aborted",
789 run_id
790 ))),
791 }
792 }
793
794 pub async fn fail_run(&mut self, run_id: &RunId) -> Result<RunRecord> {
795 self.ensure_schema_state_valid().await?;
796 let run = self.get_run(run_id).await?;
797 match run.status {
798 RunStatus::Running => {
799 let updated = run.with_status(RunStatus::Failed, None)?;
800 self.coordinator.append_run_record(&updated).await?;
801 Ok(updated)
802 }
803 RunStatus::Failed => Ok(run),
804 RunStatus::Published => Err(OmniError::manifest_conflict(format!(
805 "run '{}' is already published",
806 run_id
807 ))),
808 RunStatus::Aborted => Err(OmniError::manifest_conflict(format!(
809 "run '{}' is already aborted",
810 run_id
811 ))),
812 }
813 }
814
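    /// Publish a run's changes onto its target branch.
    ///
    /// If the target branch has not moved since the run began, the run's table
    /// state is promoted directly; otherwise the run branch is merged into the
    /// target and any table entries still pointing at the internal run branch
    /// are rewritten onto the target. Publishing an already-published run
    /// returns its recorded snapshot id; failed or aborted runs are rejected.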
815 pub async fn publish_run(&mut self, run_id: &RunId) -> Result<SnapshotId> {
816 self.publish_run_as(run_id, None).await
817 }
818
819 pub async fn publish_run_as(
820 &mut self,
821 run_id: &RunId,
822 actor_id: Option<&str>,
823 ) -> Result<SnapshotId> {
824 self.ensure_schema_state_valid().await?;
825 self.ensure_schema_apply_idle("publish_run").await?;
826 let run = self.get_run(run_id).await?;
827 match run.status {
828 RunStatus::Running => {}
829 RunStatus::Published => {
830 return run
831 .published_snapshot_id
832 .clone()
833 .map(SnapshotId::new)
834 .ok_or_else(|| {
835 OmniError::manifest(format!(
836 "run '{}' is published but missing published snapshot id",
837 run_id
838 ))
839 });
840 }
841 RunStatus::Failed | RunStatus::Aborted => {
842 return Err(OmniError::manifest_conflict(format!(
843 "run '{}' is not publishable from status '{}'",
844 run_id,
845 run.status.as_str()
846 )));
847 }
848 }
849
850 let publish_actor = actor_id
851 .map(str::to_string)
852 .or_else(|| run.actor_id.clone());
853 let current_target_snapshot_id = self.resolve_snapshot(&run.target_branch).await?;
854 let previous_actor = self.audit_actor_id.clone();
855 self.audit_actor_id = publish_actor.clone();
856 let publish_result = if current_target_snapshot_id.as_str() == run.base_snapshot_id {
857 let run_for_promotion = run.clone();
858 self.sync_branch(&run_for_promotion.target_branch).await?;
859 self.promote_run_snapshot_to_target(&run_for_promotion)
860 .await
861 } else {
862 let run_branch = run.run_branch.clone();
863 let target_branch = run.target_branch.clone();
864 self.branch_merge_internal(&run_branch, &target_branch)
865 .await?;
866 self.reify_internal_run_refs(&target_branch, &run_branch)
867 .await
868 };
869 self.audit_actor_id = previous_actor;
870 publish_result?;
871 let published_snapshot_id = self.resolve_snapshot(&run.target_branch).await?;
872 let updated = run.with_status(
873 RunStatus::Published,
874 Some(published_snapshot_id.as_str().to_string()),
875 )?;
876 self.coordinator.append_run_record(&updated).await?;
877 Ok(published_snapshot_id)
878 }
879
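    // Copies table state from the run branch onto the target branch, skipping
    // tables whose manifest entries are already identical, and invalidates the
    // graph index when any edge table changed.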
880 async fn promote_run_snapshot_to_target(&mut self, run: &RunRecord) -> Result<()> {
881 let target_snapshot = self
882 .snapshot_of(ReadTarget::branch(run.target_branch.as_str()))
883 .await?;
884 let run_snapshot = self
885 .snapshot_of(ReadTarget::branch(run.run_branch.as_str()))
886 .await?;
        let mut table_keys = BTreeSet::new();
888 for entry in target_snapshot.entries() {
889 table_keys.insert(entry.table_key.clone());
890 }
891 for entry in run_snapshot.entries() {
892 table_keys.insert(entry.table_key.clone());
893 }
894
895 let mut updates = Vec::new();
896 let mut changed_edge_tables = false;
897 let target_branch = normalize_branch_name(&run.target_branch)?;
898
899 for table_key in table_keys {
900 let target_entry = target_snapshot.entry(&table_key);
901 let run_entry = run_snapshot.entry(&table_key);
902 if same_manifest_state(target_entry, run_entry) {
903 continue;
904 }
905 let Some(_run_entry) = run_entry else {
906 return Err(OmniError::manifest(format!(
907 "run '{}' removed table '{}' which publish_run does not support",
908 run.run_id, table_key
909 )));
910 };
911
912 let source_ds = run_snapshot.open(&table_key).await?;
913 let batch = self.batch_for_table_rewrite(&source_ds, &table_key).await?;
914
915 let (mut target_ds, full_path, table_branch) = self
916 .open_for_mutation_on_branch(target_branch.as_deref(), &table_key)
917 .await?;
918 let state = self
919 .table_store()
920 .overwrite_batch(&full_path, &mut target_ds, batch)
921 .await?;
922 updates.push(crate::db::SubTableUpdate {
923 table_key: table_key.clone(),
924 table_version: state.version,
925 table_branch,
926 row_count: state.row_count,
927 version_metadata: state.version_metadata,
928 });
929 if table_key.starts_with("edge:") {
930 changed_edge_tables = true;
931 }
932 }
933
934 if !updates.is_empty() {
935 self.commit_updates_on_branch(target_branch.as_deref(), &updates)
936 .await?;
937 if changed_edge_tables {
938 self.invalidate_graph_index().await;
939 }
940 }
941
942 Ok(())
943 }
944
945 async fn reify_internal_run_refs(
946 &mut self,
947 target_branch: &str,
948 run_branch: &str,
949 ) -> Result<()> {
950 let target_snapshot = self.snapshot_of(ReadTarget::branch(target_branch)).await?;
951 let mut updates = Vec::new();
952 let mut changed_edge_tables = false;
953 let target_branch = normalize_branch_name(target_branch)?;
954
955 for entry in target_snapshot.entries() {
956 if entry.table_branch.as_deref() != Some(run_branch) {
957 continue;
958 }
959
960 let source_ds = target_snapshot.open(&entry.table_key).await?;
961 let batch = self
962 .batch_for_table_rewrite(&source_ds, &entry.table_key)
963 .await?;
964
965 let (mut target_ds, full_path, table_branch) = self
966 .open_for_mutation_on_branch(target_branch.as_deref(), &entry.table_key)
967 .await?;
968 let state = self
969 .table_store()
970 .overwrite_batch(&full_path, &mut target_ds, batch)
971 .await?;
972 updates.push(crate::db::SubTableUpdate {
973 table_key: entry.table_key.clone(),
974 table_version: state.version,
975 table_branch,
976 row_count: state.row_count,
977 version_metadata: state.version_metadata,
978 });
979 if entry.table_key.starts_with("edge:") {
980 changed_edge_tables = true;
981 }
982 }
983
984 if !updates.is_empty() {
985 self.commit_updates_on_branch(target_branch.as_deref(), &updates)
986 .await?;
987 if changed_edge_tables {
988 self.invalidate_graph_index().await;
989 }
990 }
991
992 Ok(())
993 }
994
995 pub(crate) async fn open_for_mutation(
1001 &self,
1002 table_key: &str,
1003 ) -> Result<(Dataset, String, Option<String>)> {
1004 table_ops::open_for_mutation(self, table_key).await
1005 }
1006
1007 pub(crate) async fn open_for_mutation_on_branch(
1008 &self,
1009 branch: Option<&str>,
1010 table_key: &str,
1011 ) -> Result<(Dataset, String, Option<String>)> {
1012 table_ops::open_for_mutation_on_branch(self, branch, table_key).await
1013 }
1014
1015 pub(crate) async fn fork_dataset_from_entry_state(
1016 &self,
1017 table_key: &str,
1018 full_path: &str,
1019 source_branch: Option<&str>,
1020 source_version: u64,
1021 active_branch: &str,
1022 ) -> Result<Dataset> {
1023 table_ops::fork_dataset_from_entry_state(
1024 self,
1025 table_key,
1026 full_path,
1027 source_branch,
1028 source_version,
1029 active_branch,
1030 )
1031 .await
1032 }
1033
1034 pub(crate) async fn reopen_for_mutation(
1035 &self,
1036 table_key: &str,
1037 full_path: &str,
1038 table_branch: Option<&str>,
1039 expected_version: u64,
1040 ) -> Result<Dataset> {
1041 table_ops::reopen_for_mutation(self, table_key, full_path, table_branch, expected_version)
1042 .await
1043 }
1044
1045 pub(crate) async fn open_dataset_at_state(
1046 &self,
1047 table_path: &str,
1048 table_branch: Option<&str>,
1049 table_version: u64,
1050 ) -> Result<Dataset> {
1051 table_ops::open_dataset_at_state(self, table_path, table_branch, table_version).await
1052 }
1053
1054 pub(crate) async fn build_indices_on_dataset(
1055 &self,
1056 table_key: &str,
1057 ds: &mut Dataset,
1058 ) -> Result<()> {
1059 table_ops::build_indices_on_dataset(self, table_key, ds).await
1060 }
1061
1062 pub(crate) async fn build_indices_on_dataset_for_catalog(
1063 &self,
1064 catalog: &Catalog,
1065 table_key: &str,
1066 ds: &mut Dataset,
1067 ) -> Result<()> {
1068 table_ops::build_indices_on_dataset_for_catalog(self, catalog, table_key, ds).await
1069 }
1070
1071 pub(crate) async fn commit_updates(
1072 &mut self,
1073 updates: &[crate::db::SubTableUpdate],
1074 ) -> Result<u64> {
1075 table_ops::commit_updates(self, updates).await
1076 }
1077
1078 pub(crate) async fn commit_manifest_updates(
1079 &mut self,
1080 updates: &[crate::db::SubTableUpdate],
1081 ) -> Result<u64> {
1082 table_ops::commit_manifest_updates(self, updates).await
1083 }
1084
1085 pub(crate) async fn record_merge_commit(
1086 &mut self,
1087 manifest_version: u64,
1088 parent_commit_id: &str,
1089 merged_parent_commit_id: &str,
1090 ) -> Result<String> {
1091 table_ops::record_merge_commit(
1092 self,
1093 manifest_version,
1094 parent_commit_id,
1095 merged_parent_commit_id,
1096 )
1097 .await
1098 }
1099
1100 pub(crate) async fn commit_updates_on_branch(
1101 &mut self,
1102 branch: Option<&str>,
1103 updates: &[crate::db::SubTableUpdate],
1104 ) -> Result<u64> {
1105 table_ops::commit_updates_on_branch(self, branch, updates).await
1106 }
1107
1108 pub(crate) async fn ensure_commit_graph_initialized(&mut self) -> Result<()> {
1109 table_ops::ensure_commit_graph_initialized(self).await
1110 }
1111
1112 pub(crate) async fn invalidate_graph_index(&self) {
1114 table_ops::invalidate_graph_index(self).await
1115 }
1116
1117 async fn batch_for_table_rewrite(
1118 &self,
1119 source_ds: &Dataset,
1120 table_key: &str,
1121 ) -> Result<RecordBatch> {
1122 schema_apply::batch_for_table_rewrite(self, source_ds, table_key).await
1123 }
1124}
1125
1126pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
1127 let branch = branch.trim();
1128 if branch.is_empty() {
1129 return Err(OmniError::manifest(
1130 "branch name cannot be empty".to_string(),
1131 ));
1132 }
1133 if branch == "main" {
1134 return Ok(None);
1135 }
1136 Ok(Some(branch.to_string()))
1137}
1138
1139fn ensure_public_branch_ref(branch: &str, operation: &str) -> Result<()> {
1140 if super::is_internal_run_branch(branch) {
1141 return Err(OmniError::manifest(format!(
1142 "{} does not allow internal run ref '{}'",
1143 operation, branch
1144 )));
1145 }
1146 if is_internal_system_branch(branch) {
1147 return Err(OmniError::manifest(format!(
1148 "{} does not allow internal system ref '{}'",
1149 operation, branch
1150 )));
1151 }
1152 Ok(())
1153}
1154
1155fn same_manifest_state(
1156 left: Option<&crate::db::SubTableEntry>,
1157 right: Option<&crate::db::SubTableEntry>,
1158) -> bool {
1159 match (left, right) {
1160 (None, None) => true,
1161 (Some(left), Some(right)) => {
1162 left.table_path == right.table_path
1163 && left.table_version == right.table_version
1164 && left.table_branch == right.table_branch
1165 && left.row_count == right.row_count
1166 }
1167 _ => false,
1168 }
1169}
1170
1171fn concat_or_empty_batches(schema: Arc<Schema>, batches: Vec<RecordBatch>) -> Result<RecordBatch> {
1172 if batches.is_empty() {
1173 return Ok(RecordBatch::new_empty(schema));
1174 }
1175 if batches.len() == 1 {
1176 return Ok(batches.into_iter().next().unwrap());
1177 }
1178 let batch_schema = batches[0].schema();
1179 arrow_select::concat::concat_batches(&batch_schema, &batches)
1180 .map_err(|e| OmniError::Lance(e.to_string()))
1181}
1182
1183fn blob_properties_for_table_key<'a>(
1184 catalog: &'a Catalog,
1185 table_key: &str,
1186) -> Result<&'a std::collections::HashSet<String>> {
1187 if let Some(type_name) = table_key.strip_prefix("node:") {
1188 return catalog
1189 .node_types
1190 .get(type_name)
1191 .map(|node_type| &node_type.blob_properties)
1192 .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)));
1193 }
1194 if let Some(type_name) = table_key.strip_prefix("edge:") {
1195 return catalog
1196 .edge_types
1197 .get(type_name)
1198 .map(|edge_type| &edge_type.blob_properties)
1199 .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)));
1200 }
1201 Err(OmniError::manifest(format!(
1202 "invalid table key '{}'",
1203 table_key
1204 )))
1205}
1206
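// A blob descriptor row is treated as null when the struct itself is null,
// the kind column is missing, or it describes an inline blob with zero
// position, zero size, and an empty blob_uri.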
1207fn blob_description_is_null(descriptions: &StructArray, row: usize) -> Result<bool> {
1208 if descriptions.is_null(row) {
1209 return Ok(true);
1210 }
1211
1212 let kind = descriptions
1213 .column_by_name("kind")
1214 .and_then(|col| col.as_any().downcast_ref::<UInt32Array>())
1215 .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row) as u8))
1216 .or_else(|| {
1217 descriptions
1218 .column_by_name("kind")
1219 .and_then(|col| col.as_any().downcast_ref::<arrow_array::UInt8Array>())
1220 .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)))
1221 });
1222 let position = descriptions
1223 .column_by_name("position")
1224 .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
1225 .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1226 let size = descriptions
1227 .column_by_name("size")
1228 .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
1229 .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1230 let blob_uri = descriptions
1231 .column_by_name("blob_uri")
1232 .and_then(|col| col.as_any().downcast_ref::<StringArray>())
1233 .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
1234
1235 let Some(kind) = kind else {
1236 return Ok(true);
1237 };
1238 let kind = BlobKind::try_from(kind).map_err(|e| OmniError::Lance(e.to_string()))?;
1239 if kind != BlobKind::Inline {
1240 return Ok(false);
1241 }
1242
1243 Ok(position.unwrap_or(0) == 0 && size.unwrap_or(0) == 0 && blob_uri.unwrap_or("").is_empty())
1244}
1245
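// Rewrites the catalog's Arrow schemas so that every property declared as a
// blob uses the Lance `blob_field` encoding.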
1246fn fixup_blob_schemas(catalog: &mut Catalog) {
1252 for node_type in catalog.node_types.values_mut() {
1253 if node_type.blob_properties.is_empty() {
1254 continue;
1255 }
1256 let fields: Vec<Field> = node_type
1257 .arrow_schema
1258 .fields()
1259 .iter()
1260 .map(|f| {
1261 if node_type.blob_properties.contains(f.name()) {
1262 blob_field(f.name(), f.is_nullable())
1263 } else {
1264 f.as_ref().clone()
1265 }
1266 })
1267 .collect();
1268 node_type.arrow_schema = Arc::new(Schema::new(fields));
1269 }
1270 for edge_type in catalog.edge_types.values_mut() {
1271 if edge_type.blob_properties.is_empty() {
1272 continue;
1273 }
1274 let fields: Vec<Field> = edge_type
1275 .arrow_schema
1276 .fields()
1277 .iter()
1278 .map(|f| {
1279 if edge_type.blob_properties.contains(f.name()) {
1280 blob_field(f.name(), f.is_nullable())
1281 } else {
1282 f.as_ref().clone()
1283 }
1284 })
1285 .collect();
1286 edge_type.arrow_schema = Arc::new(Schema::new(fields));
1287 }
1288}
1289
1290fn read_schema_ir_from_source(schema_source: &str) -> Result<SchemaIR> {
1291 let schema_ast = parse_schema(schema_source)?;
1292 build_schema_ir(&schema_ast).map_err(|err| OmniError::manifest(err.to_string()))
1293}
1294
1295fn schema_table_key(type_kind: SchemaTypeKind, name: &str) -> String {
1296 match type_kind {
1297 SchemaTypeKind::Node => format!("node:{}", name),
1298 SchemaTypeKind::Edge => format!("edge:{}", name),
1299 SchemaTypeKind::Interface => unreachable!("interfaces do not map to tables"),
1300 }
1301}
1302
1303fn schema_for_table_key(catalog: &Catalog, table_key: &str) -> Result<Arc<Schema>> {
1304 if let Some(type_name) = table_key.strip_prefix("node:") {
1305 let node_type: &NodeType = catalog
1306 .node_types
1307 .get(type_name)
1308 .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
1309 return Ok(node_type.arrow_schema.clone());
1310 }
1311 if let Some(type_name) = table_key.strip_prefix("edge:") {
1312 let edge_type: &EdgeType = catalog
1313 .edge_types
1314 .get(type_name)
1315 .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)))?;
1316 return Ok(edge_type.arrow_schema.clone());
1317 }
1318 Err(OmniError::manifest(format!(
1319 "invalid table key '{}'",
1320 table_key
1321 )))
1322}
1323
1324fn record_batch_row_to_json(batch: &RecordBatch, row: usize) -> Result<serde_json::Value> {
1325 let mut obj = serde_json::Map::new();
1326 for (i, field) in batch.schema().fields().iter().enumerate() {
1327 obj.insert(
1328 field.name().clone(),
1329 json_value_from_array(batch.column(i).as_ref(), row)?,
1330 );
1331 }
1332 Ok(serde_json::Value::Object(obj))
1333}
1334
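// Converts a single Arrow array value to JSON: strings, booleans, and numbers
// map directly, binary values are base64-encoded, lists and structs recurse,
// and any other type falls back to Arrow's display formatting.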
1335fn json_value_from_array(array: &dyn Array, row: usize) -> Result<serde_json::Value> {
1336 if array.is_null(row) {
1337 return Ok(serde_json::Value::Null);
1338 }
1339
1340 match array.data_type() {
1341 DataType::Utf8 => Ok(serde_json::Value::String(
1342 array
1343 .as_any()
1344 .downcast_ref::<StringArray>()
1345 .ok_or_else(|| OmniError::Lance("expected StringArray".to_string()))?
1346 .value(row)
1347 .to_string(),
1348 )),
1349 DataType::LargeUtf8 => Ok(serde_json::Value::String(
1350 array
1351 .as_any()
1352 .downcast_ref::<LargeStringArray>()
1353 .ok_or_else(|| OmniError::Lance("expected LargeStringArray".to_string()))?
1354 .value(row)
1355 .to_string(),
1356 )),
1357 DataType::Boolean => Ok(serde_json::Value::Bool(
1358 array
1359 .as_any()
1360 .downcast_ref::<BooleanArray>()
1361 .ok_or_else(|| OmniError::Lance("expected BooleanArray".to_string()))?
1362 .value(row),
1363 )),
1364 DataType::Int32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1365 array
1366 .as_any()
1367 .downcast_ref::<Int32Array>()
1368 .ok_or_else(|| OmniError::Lance("expected Int32Array".to_string()))?
1369 .value(row),
1370 ))),
1371 DataType::Int64 => Ok(serde_json::Value::Number(serde_json::Number::from(
1372 array
1373 .as_any()
1374 .downcast_ref::<Int64Array>()
1375 .ok_or_else(|| OmniError::Lance("expected Int64Array".to_string()))?
1376 .value(row),
1377 ))),
1378 DataType::UInt32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1379 array
1380 .as_any()
1381 .downcast_ref::<UInt32Array>()
1382 .ok_or_else(|| OmniError::Lance("expected UInt32Array".to_string()))?
1383 .value(row),
1384 ))),
1385 DataType::UInt64 => Ok(serde_json::Value::Number(serde_json::Number::from(
1386 array
1387 .as_any()
1388 .downcast_ref::<UInt64Array>()
1389 .ok_or_else(|| OmniError::Lance("expected UInt64Array".to_string()))?
1390 .value(row),
1391 ))),
1392 DataType::Float32 => {
1393 let value = array
1394 .as_any()
1395 .downcast_ref::<Float32Array>()
1396 .ok_or_else(|| OmniError::Lance("expected Float32Array".to_string()))?
1397 .value(row) as f64;
1398 Ok(serde_json::Value::Number(
1399 serde_json::Number::from_f64(value).ok_or_else(|| {
1400 OmniError::Lance(format!("cannot encode f32 value '{}' as JSON", value))
1401 })?,
1402 ))
1403 }
1404 DataType::Float64 => {
1405 let value = array
1406 .as_any()
1407 .downcast_ref::<Float64Array>()
1408 .ok_or_else(|| OmniError::Lance("expected Float64Array".to_string()))?
1409 .value(row);
1410 Ok(serde_json::Value::Number(
1411 serde_json::Number::from_f64(value).ok_or_else(|| {
1412 OmniError::Lance(format!("cannot encode f64 value '{}' as JSON", value))
1413 })?,
1414 ))
1415 }
1416 DataType::Date32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1417 array
1418 .as_any()
1419 .downcast_ref::<Date32Array>()
1420 .ok_or_else(|| OmniError::Lance("expected Date32Array".to_string()))?
1421 .value(row),
1422 ))),
1423 DataType::Binary => Ok(serde_json::Value::String(base64::Engine::encode(
1424 &base64::engine::general_purpose::STANDARD,
1425 array
1426 .as_any()
1427 .downcast_ref::<BinaryArray>()
1428 .ok_or_else(|| OmniError::Lance("expected BinaryArray".to_string()))?
1429 .value(row),
1430 ))),
1431 DataType::LargeBinary => Ok(serde_json::Value::String(base64::Engine::encode(
1432 &base64::engine::general_purpose::STANDARD,
1433 array
1434 .as_any()
1435 .downcast_ref::<LargeBinaryArray>()
1436 .ok_or_else(|| OmniError::Lance("expected LargeBinaryArray".to_string()))?
1437 .value(row),
1438 ))),
1439 DataType::List(_) => {
1440 let list = array
1441 .as_any()
1442 .downcast_ref::<ListArray>()
1443 .ok_or_else(|| OmniError::Lance("expected ListArray".to_string()))?;
1444 let values = list.value(row);
1445 let mut out = Vec::with_capacity(values.len());
1446 for idx in 0..values.len() {
1447 out.push(json_value_from_array(values.as_ref(), idx)?);
1448 }
1449 Ok(serde_json::Value::Array(out))
1450 }
1451 DataType::LargeList(_) => {
1452 let list = array
1453 .as_any()
1454 .downcast_ref::<LargeListArray>()
1455 .ok_or_else(|| OmniError::Lance("expected LargeListArray".to_string()))?;
1456 let values = list.value(row);
1457 let mut out = Vec::with_capacity(values.len());
1458 for idx in 0..values.len() {
1459 out.push(json_value_from_array(values.as_ref(), idx)?);
1460 }
1461 Ok(serde_json::Value::Array(out))
1462 }
1463 DataType::FixedSizeList(_, _) => {
1464 let list = array
1465 .as_any()
1466 .downcast_ref::<FixedSizeListArray>()
1467 .ok_or_else(|| OmniError::Lance("expected FixedSizeListArray".to_string()))?;
1468 let values = list.value(row);
1469 let mut out = Vec::with_capacity(values.len());
1470 for idx in 0..values.len() {
1471 out.push(json_value_from_array(values.as_ref(), idx)?);
1472 }
1473 Ok(serde_json::Value::Array(out))
1474 }
1475 DataType::Struct(fields) => {
1476 let struct_array = array
1477 .as_any()
1478 .downcast_ref::<StructArray>()
1479 .ok_or_else(|| OmniError::Lance("expected StructArray".to_string()))?;
1480 let mut obj = serde_json::Map::new();
1481 for (field_idx, field) in fields.iter().enumerate() {
1482 obj.insert(
1483 field.name().clone(),
1484 json_value_from_array(struct_array.column(field_idx).as_ref(), row)?,
1485 );
1486 }
1487 Ok(serde_json::Value::Object(obj))
1488 }
1489 _ => {
1490 let value = arrow_cast::display::array_value_to_string(array, row)
1491 .map_err(|e| OmniError::Lance(e.to_string()))?;
1492 Ok(serde_json::Value::String(value))
1493 }
1494 }
1495}
1496
1497#[cfg(test)]
1498mod tests {
1499 use super::*;
1500 use crate::db::manifest::ManifestCoordinator;
1501 use async_trait::async_trait;
1502 use omnigraph_compiler::{SchemaMigrationStep, SchemaTypeKind};
1503 use serde_json::Value;
1504 use std::fs;
1505 use std::sync::Mutex;
1506
1507 use crate::storage::{LocalStorageAdapter, StorageAdapter, join_uri};
1508
1509 const TEST_SCHEMA: &str = r#"
1510node Person {
1511 name: String @key
1512 age: I32?
1513}
1514node Company {
1515 name: String @key
1516}
1517edge Knows: Person -> Person {
1518 since: Date?
1519}
1520edge WorksAt: Person -> Company
1521"#;
1522
1523 #[derive(Debug, Default)]
1524 struct RecordingStorageAdapter {
1525 inner: LocalStorageAdapter,
1526 reads: Mutex<Vec<String>>,
1527 writes: Mutex<Vec<String>>,
1528 exists_checks: Mutex<Vec<String>>,
1529 }
1530
1531 impl RecordingStorageAdapter {
1532 fn reads(&self) -> Vec<String> {
1533 self.reads.lock().unwrap().clone()
1534 }
1535
1536 fn writes(&self) -> Vec<String> {
1537 self.writes.lock().unwrap().clone()
1538 }
1539
1540 fn exists_checks(&self) -> Vec<String> {
1541 self.exists_checks.lock().unwrap().clone()
1542 }
1543 }
1544
1545 #[async_trait]
1546 impl StorageAdapter for RecordingStorageAdapter {
1547 async fn read_text(&self, uri: &str) -> Result<String> {
1548 self.reads.lock().unwrap().push(uri.to_string());
1549 self.inner.read_text(uri).await
1550 }
1551
1552 async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
1553 self.writes.lock().unwrap().push(uri.to_string());
1554 self.inner.write_text(uri, contents).await
1555 }
1556
1557 async fn exists(&self, uri: &str) -> Result<bool> {
1558 self.exists_checks.lock().unwrap().push(uri.to_string());
1559 self.inner.exists(uri).await
1560 }
1561 }
1562
1563 #[tokio::test]
1564 async fn test_init_creates_repo() {
1565 let dir = tempfile::tempdir().unwrap();
1566 let uri = dir.path().to_str().unwrap();
1567
1568 let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1569
1570 assert!(dir.path().join("_schema.pg").exists());
1572 assert!(dir.path().join("_schema.ir.json").exists());
1573 assert!(dir.path().join("__schema_state.json").exists());
1574
1575 let snap = db.snapshot();
1577 assert!(snap.entry("node:Person").is_some());
1578 assert!(snap.entry("node:Company").is_some());
1579 assert!(snap.entry("edge:Knows").is_some());
1580 assert!(snap.entry("edge:WorksAt").is_some());
1581
1582 assert_eq!(db.catalog().node_types.len(), 2);
1584 assert_eq!(db.catalog().edge_types.len(), 2);
1585 assert_eq!(
1586 db.catalog().node_types["Person"].key_property(),
1587 Some("name")
1588 );
1589 }
1590
1591 #[tokio::test]
1592 async fn test_open_reads_existing_repo() {
1593 let dir = tempfile::tempdir().unwrap();
1594 let uri = dir.path().to_str().unwrap();
1595
1596 Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1597
1598 let db = Omnigraph::open(uri).await.unwrap();
1600 assert_eq!(db.catalog().node_types.len(), 2);
1601 assert_eq!(db.catalog().edge_types.len(), 2);
1602 let snap = db.snapshot();
1603 assert!(snap.entry("node:Person").is_some());
1604 assert!(snap.entry("edge:Knows").is_some());
1605 }
1606
1607 #[tokio::test]
1608 async fn test_init_and_open_route_graph_metadata_through_storage_adapter() {
1609 let dir = tempfile::tempdir().unwrap();
1610 let uri = dir.path().to_str().unwrap();
1611 let adapter = Arc::new(RecordingStorageAdapter::default());
1612
1613 Omnigraph::init_with_storage(uri, TEST_SCHEMA, adapter.clone())
1614 .await
1615 .unwrap();
1616 assert!(adapter.writes().contains(&join_uri(uri, "_schema.pg")));
1617 assert!(adapter.writes().contains(&join_uri(uri, "_schema.ir.json")));
1618 assert!(
1619 adapter
1620 .writes()
1621 .contains(&join_uri(uri, "__schema_state.json"))
1622 );
1623
1624 Omnigraph::open_with_storage(uri, adapter.clone())
1625 .await
1626 .unwrap();
1627 assert!(adapter.reads().contains(&join_uri(uri, "_schema.pg")));
1628 assert!(adapter.reads().contains(&join_uri(uri, "_schema.ir.json")));
1629 assert!(
1630 adapter
1631 .reads()
1632 .contains(&join_uri(uri, "__schema_state.json"))
1633 );
1634 assert!(
1635 adapter
1636 .exists_checks()
1637 .contains(&join_uri(uri, "_schema.ir.json"))
1638 );
1639 assert!(
1640 adapter
1641 .exists_checks()
1642 .contains(&join_uri(uri, "__schema_state.json"))
1643 );
1644 assert!(
1645 adapter
1646 .exists_checks()
1647 .contains(&join_uri(uri, "_graph_commits.lance"))
1648 );
1649 }
1650
1651 #[tokio::test]
1652 async fn test_open_bootstraps_legacy_schema_state_for_main_only_repo() {
1653 let dir = tempfile::tempdir().unwrap();
1654 let uri = dir.path().to_str().unwrap();
1655 Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1656
1657 fs::remove_file(dir.path().join("_schema.ir.json")).unwrap();
1658 fs::remove_file(dir.path().join("__schema_state.json")).unwrap();
1659
1660 let db = Omnigraph::open(uri).await.unwrap();
1661 assert_eq!(db.catalog().node_types.len(), 2);
1662 assert!(dir.path().join("_schema.ir.json").exists());
1663 assert!(dir.path().join("__schema_state.json").exists());
1664 }
1665
1666 #[tokio::test]
1667 async fn test_open_rejects_legacy_repo_with_public_branch() {
1668 let dir = tempfile::tempdir().unwrap();
1669 let uri = dir.path().to_str().unwrap();
1670 let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1671 db.branch_create("feature").await.unwrap();
1672
1673 fs::remove_file(dir.path().join("_schema.ir.json")).unwrap();
1674 fs::remove_file(dir.path().join("__schema_state.json")).unwrap();
1675
1676 let err = match Omnigraph::open(uri).await {
1677 Ok(_) => panic!("expected legacy repo with public branch to fail schema bootstrap"),
1678 Err(err) => err,
1679 };
1680 let message = err.to_string();
1681 assert!(message.contains("public branches block schema evolution entirely"));
1682 }
1683
1684 #[tokio::test]
1685 async fn test_long_lived_handle_rejects_schema_source_drift() {
1686 let dir = tempfile::tempdir().unwrap();
1687 let uri = dir.path().to_str().unwrap();
1688 let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1689
1690 let drifted = TEST_SCHEMA.replace("age: I32?", "age: I64?");
1691 fs::write(dir.path().join("_schema.pg"), drifted).unwrap();
1692
1693 let err = match db.snapshot_of(ReadTarget::branch("main")).await {
1694 Ok(_) => panic!("expected schema source drift to be rejected"),
1695 Err(err) => err,
1696 };
1697 assert!(
1698 err.to_string()
1699 .contains("current _schema.pg no longer matches the accepted compiled schema")
1700 );
1701 }
1702
1703 #[tokio::test]
1704 async fn test_long_lived_handle_rejects_schema_ir_drift() {
1705 let dir = tempfile::tempdir().unwrap();
1706 let uri = dir.path().to_str().unwrap();
1707 let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1708
1709 fs::write(dir.path().join("_schema.ir.json"), "{not valid json").unwrap();
1710
1711 let err = match db.snapshot_of(ReadTarget::branch("main")).await {
1712 Ok(_) => panic!("expected schema IR drift to be rejected"),
1713 Err(err) => err,
1714 };
1715 assert!(
1716 err.to_string()
1717 .contains("accepted compiled schema contract in _schema.ir.json is invalid")
1718 );
1719 }
1720
1721 #[tokio::test]
1722 async fn test_long_lived_handle_rejects_ir_and_source_updates_without_state_update() {
1723 let dir = tempfile::tempdir().unwrap();
1724 let uri = dir.path().to_str().unwrap();
1725 let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1726
1727 let drifted = TEST_SCHEMA.replace("age: I32?", "age: I64?");
1728 let drifted_ir = read_schema_ir_from_source(&drifted).unwrap();
1729 let drifted_ir_json = omnigraph_compiler::schema_ir_pretty_json(&drifted_ir).unwrap();
1730 fs::write(dir.path().join("_schema.pg"), drifted).unwrap();
1731 fs::write(dir.path().join("_schema.ir.json"), drifted_ir_json).unwrap();
1732
1733 let err = match db.snapshot_of(ReadTarget::branch("main")).await {
1734 Ok(_) => panic!("expected schema state mismatch to be rejected"),
1735 Err(err) => err,
1736 };
1737 assert!(
1738 err.to_string()
1739 .contains("accepted compiled schema does not match the recorded schema state")
1740 );
1741 }
1742
1743 #[tokio::test]
1744 async fn test_comment_only_schema_edit_keeps_schema_state_valid() {
1745 let dir = tempfile::tempdir().unwrap();
1746 let uri = dir.path().to_str().unwrap();
1747 let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1748
1749 let commented = format!("// comment-only drift\n{}", TEST_SCHEMA);
1750 fs::write(dir.path().join("_schema.pg"), commented).unwrap();
1751
1752 let snapshot = db.snapshot_of(ReadTarget::branch("main")).await.unwrap();
1753 assert!(snapshot.entry("node:Person").is_some());
1754 }
1755
1756 #[tokio::test]
1757 async fn test_plan_schema_reports_supported_additive_change() {
1758 let dir = tempfile::tempdir().unwrap();
1759 let uri = dir.path().to_str().unwrap();
1760 let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1761
1762 let desired = TEST_SCHEMA.replace(
1763 " age: I32?\n}",
1764 " age: I32?\n nickname: String?\n}",
1765 );
1766
1767 let plan = db.plan_schema(&desired).await.unwrap();
1768 assert!(plan.supported);
1769 assert!(plan.steps.iter().any(|step| matches!(
1770 step,
1771 SchemaMigrationStep::AddProperty {
1772 type_kind: SchemaTypeKind::Node,
1773 type_name,
1774 property_name,
1775 ..
1776 } if type_name == "Person" && property_name == "nickname"
1777 )));
1778 }
1779
1780 #[tokio::test]
1781 async fn test_plan_schema_rejects_when_schema_contract_has_drifted() {
1782 let dir = tempfile::tempdir().unwrap();
1783 let uri = dir.path().to_str().unwrap();
1784 let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
1785
1786 let drifted = TEST_SCHEMA.replace("age: I32?", "age: I64?");
1787 fs::write(dir.path().join("_schema.pg"), drifted).unwrap();
1788
1789 let err = db.plan_schema(TEST_SCHEMA).await.unwrap_err();
1790 assert!(
1791 err.to_string()
1792 .contains("current _schema.pg no longer matches the accepted compiled schema")
1793 );
1794 }
1795
1796 async fn table_rows_json(db: &Omnigraph, table_key: &str) -> Vec<Value> {
1797 let snapshot = db.snapshot();
1798 let ds = snapshot.open(table_key).await.unwrap();
1799 let batches = db.table_store().scan_batches(&ds).await.unwrap();
1800 batches
1801 .into_iter()
1802 .flat_map(|batch| {
1803 (0..batch.num_rows())
1804 .map(|row| record_batch_row_to_json(&batch, row).unwrap())
1805 .collect::<Vec<_>>()
1806 })
1807 .collect()
1808 }
1809
1810 async fn seed_person_row(db: &mut Omnigraph, name: &str, age: Option<i32>) {
1811 let (mut ds, full_path, table_branch) = db.open_for_mutation("node:Person").await.unwrap();
1812 let schema: Arc<Schema> = Arc::new(ds.schema().into());
1813 let columns: Vec<Arc<dyn Array>> = schema
1814 .fields()
1815 .iter()
1816 .map(|field| match field.name().as_str() {
1817 "id" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
1818 "name" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
1819 "age" => Arc::new(Int32Array::from(vec![age])) as Arc<dyn Array>,
1820 _ => new_null_array(field.data_type(), 1),
1821 })
1822 .collect();
1823 let batch = RecordBatch::try_new(Arc::clone(&schema), columns).unwrap();
1824 let state = db
1825 .table_store()
1826 .append_batch(&full_path, &mut ds, batch)
1827 .await
1828 .unwrap();
1829 db.commit_updates(&[crate::db::SubTableUpdate {
1830 table_key: "node:Person".to_string(),
1831 table_version: state.version,
1832 table_branch,
1833 row_count: state.row_count,
1834 version_metadata: state.version_metadata,
1835 }])
1836 .await
1837 .unwrap();
1838 }
1839
    #[tokio::test]
    async fn test_apply_schema_noop_returns_not_applied() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let result = db.apply_schema(TEST_SCHEMA).await.unwrap();
        assert!(result.supported);
        assert!(!result.applied);
        assert!(result.steps.is_empty());
    }

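    // An additive nullable property migrates existing rows in place: old values survive and the new column reads as null.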
    #[tokio::test]
    async fn test_apply_schema_adds_nullable_property_and_preserves_rows() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;

        let desired = TEST_SCHEMA.replace(
            " age: I32?\n}",
            " age: I32?\n nickname: String?\n}",
        );
        let result = db.apply_schema(&desired).await.unwrap();
        assert!(result.applied);

        let reopened = Omnigraph::open(uri).await.unwrap();
        let rows = table_rows_json(&reopened, "node:Person").await;
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0]["name"], "Alice");
        assert_eq!(rows[0]["age"], 30);
        assert!(rows[0]["nickname"].is_null());
        assert!(
            reopened.catalog().node_types["Person"]
                .properties
                .contains_key("nickname")
        );
        assert!(dir.path().join("_schema.pg").exists());
    }

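    // A property rename keeps the existing values readable under the new name only.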
    #[tokio::test]
    async fn test_apply_schema_renames_property_and_preserves_values() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;

        let desired = TEST_SCHEMA.replace(
            " age: I32?\n}",
            " years: I32? @rename_from(\"age\")\n}",
        );
        db.apply_schema(&desired).await.unwrap();

        let reopened = Omnigraph::open(uri).await.unwrap();
        let rows = table_rows_json(&reopened, "node:Person").await;
        assert_eq!(rows[0]["name"], "Alice");
        assert_eq!(rows[0]["years"], 30);
        assert!(rows[0].get("age").is_none());
    }

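    // A type rename rewrites the head manifest, while older snapshots still resolve the old table key.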
    #[tokio::test]
    async fn test_apply_schema_renames_type_and_preserves_historical_snapshot() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;
        let before_version = db.snapshot().version();

        let desired = TEST_SCHEMA
            .replace("node Person {\n", "node Human @rename_from(\"Person\") {\n")
            .replace("edge Knows: Person -> Person", "edge Knows: Human -> Human")
            .replace(
                "edge WorksAt: Person -> Company",
                "edge WorksAt: Human -> Company",
            );
        db.apply_schema(&desired).await.unwrap();

        let head = db.snapshot();
        assert!(head.entry("node:Person").is_none());
        assert!(head.entry("node:Human").is_some());
        let historical = ManifestCoordinator::snapshot_at(uri, None, before_version)
            .await
            .unwrap();
        assert!(historical.entry("node:Person").is_some());
        assert!(historical.entry("node:Human").is_none());
    }

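    // Schema apply is only allowed when main is the sole user-visible branch.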
    #[tokio::test]
    async fn test_apply_schema_rejects_when_non_main_branch_exists() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        db.branch_create("feature").await.unwrap();

        let desired = TEST_SCHEMA.replace(
            " age: I32?\n}",
            " age: I32?\n nickname: String?\n}",
        );
        let err = db.apply_schema(&desired).await.unwrap_err();
        assert!(
            err.to_string()
                .contains("schema apply requires a repo with only main")
        );
    }

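    // Adding @index to the existing `name` property should produce a full-text index on that column.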
    #[tokio::test]
    async fn test_apply_schema_adds_index_for_existing_property() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let desired = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
        db.apply_schema(&desired).await.unwrap();

        let snapshot = db.snapshot();
        let ds = snapshot.open("node:Person").await.unwrap();
        assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
    }

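    // An unrelated additive change must not drop indices that already exist on the rewritten table.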
    #[tokio::test]
    async fn test_apply_schema_rewrite_preserves_existing_indices() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let initial_schema = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
        let mut db = Omnigraph::init(uri, &initial_schema).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;

        let desired = initial_schema.replace(
            " age: I32?\n}",
            " age: I32?\n nickname: String?\n}",
        );
        db.apply_schema(&desired).await.unwrap();

        let snapshot = db.snapshot();
        let ds = snapshot.open("node:Person").await.unwrap();
        assert!(db.table_store().has_btree_index(&ds, "id").await.unwrap());
        assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
    }

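    // Creating the schema-apply lock branch directly stands in for an in-progress apply; writes must be rejected.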
    #[tokio::test]
    async fn test_open_for_mutation_rejects_while_schema_apply_locked() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        db.coordinator
            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
            .await
            .unwrap();

        let err = db.open_for_mutation("node:Person").await.unwrap_err();
        assert!(
            err.to_string()
                .contains("write is unavailable while schema apply is in progress")
        );
    }

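    // Commits are likewise rejected while the schema-apply lock branch exists.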
    #[tokio::test]
    async fn test_commit_updates_rejects_while_schema_apply_locked() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        db.coordinator
            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
            .await
            .unwrap();

        let err = db.commit_updates(&[]).await.unwrap_err();
        assert!(
            err.to_string()
                .contains("write commit is unavailable while schema apply is in progress")
        );
    }

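    // The internal schema-apply lock branch must not appear in the user-facing branch list.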
    #[tokio::test]
    async fn test_branch_list_hides_schema_apply_lock_branch() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        db.coordinator
            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
            .await
            .unwrap();

        let branches = db.branch_list().await.unwrap();
        assert_eq!(branches, vec!["main".to_string()]);
    }

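    // An unsupported plan (a property type change) must fail without advancing the manifest version.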
    #[tokio::test]
    async fn test_apply_schema_unsupported_plan_does_not_advance_manifest() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        let before_version = db.snapshot().version();

        let desired = TEST_SCHEMA.replace("age: I32?", "age: I64?");
        let err = db.apply_schema(&desired).await.unwrap_err();
        assert!(err.to_string().contains("changing property type"));
        assert_eq!(db.snapshot().version(), before_version);
    }

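    // Opening a path that was never initialized must fail.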
    #[tokio::test]
    async fn test_open_nonexistent_fails() {
        let result = Omnigraph::open("/tmp/nonexistent_omnigraph_test_xyz").await;
        assert!(result.is_err());
    }

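    // A snapshot handle stays pinned to the version it was taken at, even after new data is loaded.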
    #[tokio::test]
    async fn test_snapshot_version_is_pinned() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();

        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let snap1 = db.snapshot();
        let v1 = snap1.version();

        crate::loader::load_jsonl(
            &mut db,
            r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#,
            crate::loader::LoadMode::Overwrite,
        )
        .await
        .unwrap();

        let snap2 = db.snapshot();
        assert!(snap2.version() > v1);

        assert_eq!(snap1.version(), v1);
    }
}