1use std::collections::{BTreeSet, HashMap, HashSet};
2use std::io::Write;
3use std::sync::Arc;
4
5use arrow_array::{
6 Array, BinaryArray, BooleanArray, Date32Array, FixedSizeListArray, Float32Array, Float64Array,
7 Int32Array, Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray,
8 RecordBatch, StringArray, StructArray, UInt32Array, UInt64Array, new_null_array,
9};
10use arrow_schema::{DataType, Field, Schema};
11use lance::Dataset;
12use lance::blob::{BlobArrayBuilder, blob_field};
13use lance::dataset::BlobFile;
14use lance::dataset::scanner::ColumnOrdering;
15use lance::datatypes::BlobKind;
16use omnigraph_compiler::catalog::{Catalog, EdgeType, NodeType};
17use omnigraph_compiler::schema::parser::parse_schema;
18use omnigraph_compiler::types::ScalarType;
19use omnigraph_compiler::{
20 SchemaIR, SchemaMigrationPlan, SchemaMigrationStep, SchemaTypeKind, build_catalog_from_ir,
21 build_schema_ir, plan_schema_migration,
22};
23
24use crate::db::graph_coordinator::{GraphCoordinator, PublishedSnapshot};
25use crate::db::run_registry::{RunRecord, RunStatus};
26use crate::error::{ManifestErrorKind, OmniError, Result};
27use crate::runtime_cache::RuntimeCache;
28use crate::storage::{StorageAdapter, join_uri, normalize_root_uri, storage_for_uri};
29use crate::table_store::TableStore;
30
31mod export;
32mod optimize;
33mod schema_apply;
34mod table_ops;
35
36pub use optimize::{CleanupPolicyOptions, TableCleanupStats, TableOptimizeStats};
37
38use super::commit_graph::GraphCommit;
39use super::manifest::{
40 ManifestChange, Snapshot, SubTableEntry, TableRegistration, TableTombstone,
41 table_path_for_table_key,
42};
43use super::schema_state::{
44 SCHEMA_SOURCE_FILENAME, load_or_bootstrap_schema_contract, read_accepted_schema_ir,
45 validate_schema_contract, write_schema_contract,
46};
47use super::{
48 ReadTarget, ResolvedTarget, RunId, SCHEMA_APPLY_LOCK_BRANCH, SnapshotId,
49 is_internal_system_branch, is_schema_apply_lock_branch,
50};
51
/// Classification of what a branch merge operation did.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MergeOutcome {
    /// The destination already contained everything from the source.
    AlreadyUpToDate,
    /// The destination was advanced without combining divergent histories.
    FastForward,
    /// Divergent histories were combined.
    Merged,
}
58
/// Result returned by [`Omnigraph::apply_schema`].
#[derive(Debug, Clone)]
pub struct SchemaApplyResult {
    /// Whether the requested migration is supported at all.
    pub supported: bool,
    /// Whether the migration was actually applied.
    pub applied: bool,
    /// Manifest version after the operation.
    pub manifest_version: u64,
    /// The individual migration steps that make up the plan.
    pub steps: Vec<SchemaMigrationStep>,
}
66
/// Handle to an omnigraph database rooted at a storage URI.
pub struct Omnigraph {
    // Normalized root URI (see `normalize_root_uri`).
    root_uri: String,
    // Backend used for all reads/writes under `root_uri`.
    storage: Arc<dyn StorageAdapter>,
    // Manifest/branch coordinator for the currently active branch.
    coordinator: GraphCoordinator,
    // Table (Lance dataset) operations rooted at `root_uri`.
    table_store: TableStore,
    // Cache invalidated wholesale on `sync_branch`/`refresh`.
    runtime_cache: RuntimeCache,
    // Catalog built from the accepted schema IR.
    catalog: Catalog,
    // Raw schema source text as read/written at open/init.
    schema_source: String,
    // Optional actor id attached to audited operations.
    pub(crate) audit_actor_id: Option<String>,
}
82
83impl Omnigraph {
84 pub async fn init(uri: &str, schema_source: &str) -> Result<Self> {
88 Self::init_with_storage(uri, schema_source, storage_for_uri(uri)?).await
89 }
90
    /// Initialize a new graph rooted at `uri` using the supplied storage adapter.
    ///
    /// The schema source and schema contract are persisted before the
    /// coordinator is initialized.
    pub(crate) async fn init_with_storage(
        uri: &str,
        schema_source: &str,
        storage: Arc<dyn StorageAdapter>,
    ) -> Result<Self> {
        let root = normalize_root_uri(uri)?;
        // Parse the schema up front and build the runtime catalog from its IR.
        let schema_ir = read_schema_ir_from_source(schema_source)?;
        let mut catalog = build_catalog_from_ir(&schema_ir)?;
        fixup_blob_schemas(&mut catalog);

        // Persist the raw schema text and the accepted contract.
        let schema_path = join_uri(&root, SCHEMA_SOURCE_FILENAME);
        storage.write_text(&schema_path, schema_source).await?;
        write_schema_contract(&root, storage.as_ref(), &schema_ir).await?;

        let coordinator = GraphCoordinator::init(&root, &catalog, Arc::clone(&storage)).await?;

        Ok(Self {
            root_uri: root.clone(),
            storage,
            coordinator,
            table_store: TableStore::new(&root),
            runtime_cache: RuntimeCache::default(),
            catalog,
            schema_source: schema_source.to_string(),
            audit_actor_id: None,
        })
    }
120
121 pub async fn open(uri: &str) -> Result<Self> {
125 Self::open_with_storage(uri, storage_for_uri(uri)?).await
126 }
127
128 pub(crate) async fn open_with_storage(
129 uri: &str,
130 storage: Arc<dyn StorageAdapter>,
131 ) -> Result<Self> {
132 let root = normalize_root_uri(uri)?;
133 let schema_path = join_uri(&root, SCHEMA_SOURCE_FILENAME);
135 let schema_source = storage.read_text(&schema_path).await?;
136 let current_source_ir = read_schema_ir_from_source(&schema_source)?;
137 let coordinator = GraphCoordinator::open(&root, Arc::clone(&storage)).await?;
138 let branches = coordinator.branch_list().await?;
139 let (accepted_ir, _) = load_or_bootstrap_schema_contract(
140 &root,
141 Arc::clone(&storage),
142 &branches,
143 ¤t_source_ir,
144 )
145 .await?;
146 let mut catalog = build_catalog_from_ir(&accepted_ir)?;
147 fixup_blob_schemas(&mut catalog);
148
149 Ok(Self {
150 root_uri: root.clone(),
151 storage,
152 coordinator,
153 table_store: TableStore::new(&root),
154 runtime_cache: RuntimeCache::default(),
155 catalog,
156 schema_source,
157 audit_actor_id: None,
158 })
159 }
160
    /// Runtime catalog built from the accepted schema IR.
    pub fn catalog(&self) -> &Catalog {
        &self.catalog
    }

    /// Raw schema source text as stored at init/open time.
    pub fn schema_source(&self) -> &str {
        &self.schema_source
    }

    /// Normalized root URI of this graph.
    pub fn uri(&self) -> &str {
        &self.root_uri
    }
172
    /// Validate the persisted schema contract for this graph's root URI.
    pub(crate) async fn ensure_schema_state_valid(&self) -> Result<()> {
        validate_schema_contract(self.uri(), Arc::clone(&self.storage)).await
    }

    /// Compute a migration plan toward `desired_schema_source`
    /// (delegates to `schema_apply::plan_schema`).
    pub async fn plan_schema(&self, desired_schema_source: &str) -> Result<SchemaMigrationPlan> {
        schema_apply::plan_schema(self, desired_schema_source).await
    }

    /// Apply a schema migration (delegates to `schema_apply::apply_schema`).
    pub async fn apply_schema(&mut self, desired_schema_source: &str) -> Result<SchemaApplyResult> {
        schema_apply::apply_schema(self, desired_schema_source).await
    }

    /// Guard `operation` against a concurrently running schema apply
    /// (delegates to `schema_apply::ensure_schema_apply_idle`).
    pub(crate) async fn ensure_schema_apply_idle(&mut self, operation: &str) -> Result<()> {
        schema_apply::ensure_schema_apply_idle(self, operation).await
    }

    /// Guard `operation` against a held schema-apply lock
    /// (delegates to `schema_apply::ensure_schema_apply_not_locked`).
    async fn ensure_schema_apply_not_locked(&self, operation: &str) -> Result<()> {
        schema_apply::ensure_schema_apply_not_locked(self, operation).await
    }

    /// Table store used for dataset-level operations.
    pub(crate) fn table_store(&self) -> &TableStore {
        &self.table_store
    }
196
197 pub(crate) async fn open_coordinator_for_branch(
198 &self,
199 branch: Option<&str>,
200 ) -> Result<GraphCoordinator> {
201 match branch {
202 Some(branch) => {
203 GraphCoordinator::open_branch(self.uri(), branch, Arc::clone(&self.storage)).await
204 }
205 None => GraphCoordinator::open(self.uri(), Arc::clone(&self.storage)).await,
206 }
207 }
208
    /// Install a coordinator for `branch`, returning the previous one so the
    /// caller can put it back with `restore_coordinator`.
    pub(crate) async fn swap_coordinator_for_branch(
        &mut self,
        branch: Option<&str>,
    ) -> Result<GraphCoordinator> {
        let next = self.open_coordinator_for_branch(branch).await?;
        Ok(std::mem::replace(&mut self.coordinator, next))
    }

    /// Reinstall a coordinator previously returned by `swap_coordinator_for_branch`.
    pub(crate) fn restore_coordinator(&mut self, coordinator: GraphCoordinator) {
        self.coordinator = coordinator;
    }
220
    /// Resolve a read target for `branch` (defaulting to "main").
    ///
    /// Fast path: when the requested branch is the coordinator's current
    /// branch, the answer is built from in-memory state — synthesizing a
    /// snapshot id when there is no head commit yet — instead of going
    /// through `resolve_target`.
    pub(crate) async fn resolved_branch_target(
        &self,
        branch: Option<&str>,
    ) -> Result<ResolvedTarget> {
        self.ensure_schema_state_valid().await?;
        let requested = ReadTarget::Branch(branch.unwrap_or("main").to_string());
        let normalized = normalize_branch_name(branch.unwrap_or("main"))?;
        if normalized.as_deref() == self.coordinator.current_branch() {
            let snapshot_id = self.coordinator.head_commit_id().await?.unwrap_or_else(|| {
                SnapshotId::synthetic(
                    self.coordinator.current_branch(),
                    self.coordinator.version(),
                )
            });
            return Ok(ResolvedTarget {
                requested,
                branch: self.coordinator.current_branch().map(str::to_string),
                snapshot_id,
                snapshot: self.coordinator.snapshot(),
            });
        }
        self.coordinator.resolve_target(&requested).await
    }
244
    /// Snapshot for `branch`, using the current-branch fast path of
    /// `resolved_branch_target`.
    pub(crate) async fn snapshot_for_branch(&self, branch: Option<&str>) -> Result<Snapshot> {
        self.resolved_branch_target(branch)
            .await
            .map(|resolved| resolved.snapshot)
    }

    /// Manifest version of the active coordinator.
    pub(crate) fn version(&self) -> u64 {
        self.coordinator.version()
    }

    /// In-memory snapshot of the active coordinator.
    pub(crate) fn snapshot(&self) -> Snapshot {
        self.coordinator.snapshot()
    }
259
    /// Snapshot at an arbitrary read target.
    pub async fn snapshot_of(&self, target: impl Into<ReadTarget>) -> Result<Snapshot> {
        self.resolved_target(target)
            .await
            .map(|resolved| resolved.snapshot)
    }

    /// Manifest version at an arbitrary read target.
    pub async fn version_of(&self, target: impl Into<ReadTarget>) -> Result<u64> {
        self.snapshot_of(target)
            .await
            .map(|snapshot| snapshot.version())
    }

    /// Branch name a read target resolves to, if any.
    pub async fn resolved_branch_of(
        &self,
        target: impl Into<ReadTarget>,
    ) -> Result<Option<String>> {
        self.resolved_target(target)
            .await
            .map(|resolved| resolved.branch)
    }
280
    /// Switch the active coordinator to `branch` and drop all runtime caches.
    pub async fn sync_branch(&mut self, branch: &str) -> Result<()> {
        self.ensure_schema_state_valid().await?;
        let branch = normalize_branch_name(branch)?;
        self.coordinator = self.open_coordinator_for_branch(branch.as_deref()).await?;
        self.runtime_cache.invalidate_all().await;
        Ok(())
    }

    /// Refresh the active coordinator's state and drop all runtime caches.
    pub(crate) async fn refresh(&mut self) -> Result<()> {
        self.coordinator.refresh().await?;
        self.runtime_cache.invalidate_all().await;
        Ok(())
    }

    /// Resolve the snapshot id for `branch` via the active coordinator.
    pub async fn resolve_snapshot(&self, branch: &str) -> Result<SnapshotId> {
        self.ensure_schema_state_valid().await?;
        self.coordinator.resolve_snapshot_id(branch).await
    }

    /// Resolve an arbitrary read target after validating the schema contract.
    pub(crate) async fn resolved_target(
        &self,
        target: impl Into<ReadTarget>,
    ) -> Result<ResolvedTarget> {
        self.ensure_schema_state_valid().await?;
        self.coordinator.resolve_target(&target.into()).await
    }
309
    /// Diff the graph state between two read targets.
    pub async fn diff_between(
        &self,
        from: impl Into<ReadTarget>,
        to: impl Into<ReadTarget>,
        filter: &crate::changes::ChangeFilter,
    ) -> Result<crate::changes::ChangeSet> {
        let from_resolved = self.resolved_target(from).await?;
        let to_resolved = self.resolved_target(to).await?;
        crate::changes::diff_snapshots(
            self.uri(),
            &from_resolved.snapshot,
            &to_resolved.snapshot,
            filter,
            // Prefer the destination's branch; fall back to the source's.
            to_resolved.branch.clone().or(from_resolved.branch.clone()),
        )
        .await
    }
329
    /// Diff two commits by id.
    ///
    /// Each commit id is resolved to its commit record, then each record's
    /// `graph_commit_id` is resolved to a snapshot before diffing.
    pub async fn diff_commits(
        &self,
        from_commit_id: &str,
        to_commit_id: &str,
        filter: &crate::changes::ChangeFilter,
    ) -> Result<crate::changes::ChangeSet> {
        let from_commit = self
            .coordinator
            .resolve_commit(&SnapshotId::new(from_commit_id))
            .await?;
        let to_commit = self
            .coordinator
            .resolve_commit(&SnapshotId::new(to_commit_id))
            .await?;
        let from_snap = self
            .coordinator
            .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
                from_commit.graph_commit_id.clone(),
            )))
            .await?;
        let to_snap = self
            .coordinator
            .resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
                to_commit.graph_commit_id.clone(),
            )))
            .await?;
        crate::changes::diff_snapshots(
            self.uri(),
            &from_snap.snapshot,
            &to_snap.snapshot,
            filter,
            // Same branch-labeling preference as `diff_between`.
            to_snap.branch.clone().or(from_snap.branch.clone()),
        )
        .await
    }
367
    /// Look up one entity as JSON at an arbitrary read target
    /// (delegates to `export::entity_at_target`).
    pub async fn entity_at_target(
        &self,
        target: impl Into<ReadTarget>,
        table_key: &str,
        id: &str,
    ) -> Result<Option<serde_json::Value>> {
        export::entity_at_target(self, target, table_key, id).await
    }

    /// Look up one entity as JSON at a specific manifest version
    /// (delegates to `export::entity_at`).
    pub async fn entity_at(
        &self,
        table_key: &str,
        id: &str,
        version: u64,
    ) -> Result<Option<serde_json::Value>> {
        export::entity_at(self, table_key, id, version).await
    }

    /// Snapshot of the graph at an exact manifest version.
    pub async fn snapshot_at_version(&self, version: u64) -> Result<Snapshot> {
        self.ensure_schema_state_valid().await?;
        self.coordinator.snapshot_at_version(version).await
    }

    /// Export selected types/tables on `branch` as one JSONL string.
    pub async fn export_jsonl(
        &self,
        branch: &str,
        type_names: &[String],
        table_keys: &[String],
    ) -> Result<String> {
        export::export_jsonl(self, branch, type_names, table_keys).await
    }

    /// Stream the JSONL export into `writer` instead of buffering a `String`.
    pub async fn export_jsonl_to_writer<W: Write>(
        &self,
        branch: &str,
        type_names: &[String],
        table_keys: &[String],
        writer: &mut W,
    ) -> Result<()> {
        export::export_jsonl_to_writer(self, branch, type_names, table_keys, writer).await
    }
411
    /// Graph index for the current state (delegates to `table_ops::graph_index`).
    pub async fn graph_index(&self) -> Result<Arc<crate::graph_index::GraphIndex>> {
        table_ops::graph_index(self).await
    }

    /// Graph index for an already-resolved read target.
    pub(crate) async fn graph_index_for_resolved(
        &self,
        resolved: &ResolvedTarget,
    ) -> Result<Arc<crate::graph_index::GraphIndex>> {
        table_ops::graph_index_for_resolved(self, resolved).await
    }

    /// Ensure indices exist on the active branch (delegates to `table_ops`).
    pub async fn ensure_indices(&mut self) -> Result<()> {
        table_ops::ensure_indices(self).await
    }

    /// Ensure indices exist on `branch` (delegates to `table_ops`).
    pub async fn ensure_indices_on(&mut self, branch: &str) -> Result<()> {
        table_ops::ensure_indices_on(self, branch).await
    }

    /// Optimize every table; returns per-table stats.
    pub async fn optimize(&mut self) -> Result<Vec<optimize::TableOptimizeStats>> {
        optimize::optimize_all_tables(self).await
    }

    /// Clean up tables according to `options`; returns per-table stats.
    pub async fn cleanup(
        &mut self,
        options: optimize::CleanupPolicyOptions,
    ) -> Result<Vec<optimize::TableCleanupStats>> {
        optimize::cleanup_all_tables(self, options).await
    }
461
    /// Fetch the blob stored at `property` for the node `id` of `type_name`.
    ///
    /// # Errors
    /// Fails when the node type is unknown, the property is not declared as a
    /// Blob in the catalog, no row with the id exists, or the blob fetch
    /// returns no data.
    pub async fn read_blob(&self, type_name: &str, id: &str, property: &str) -> Result<BlobFile> {
        self.ensure_schema_state_valid().await?;
        let node_type = self
            .catalog
            .node_types
            .get(type_name)
            .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
        if !node_type.blob_properties.contains(property) {
            return Err(OmniError::manifest(format!(
                "property '{}' on type '{}' is not a Blob",
                property, type_name
            )));
        }

        let snapshot = self.snapshot();
        let table_key = format!("node:{}", type_name);
        let ds = snapshot.open(&table_key).await?;

        // Escape single quotes for the SQL-style filter expression.
        let filter_sql = format!("id = '{}'", id.replace('\'', "''"));
        let row_id = self
            .table_store
            .first_row_id_for_filter(&ds, &filter_sql)
            .await?
            .ok_or_else(|| {
                OmniError::manifest(format!("no {} with id '{}' found", type_name, id))
            })?;

        // Wrap in Arc before calling take_blobs on the dataset.
        let ds = Arc::new(ds);
        let mut blobs = ds
            .take_blobs(&[row_id], property)
            .await
            .map_err(|e| OmniError::Lance(e.to_string()))?;

        // A single row id was requested, so the last element is the result.
        blobs.pop().ok_or_else(|| {
            OmniError::manifest(format!(
                "blob '{}' on {} '{}' returned no data",
                property, type_name, id
            ))
        })
    }

    /// Branch the coordinator is currently on (`None` matches
    /// `normalize_branch_name`'s mapping of "main" to `None`).
    pub(crate) fn active_branch(&self) -> Option<&str> {
        self.coordinator.current_branch()
    }
516
    /// Verify it is safe to delete `branch`.
    ///
    /// Rejects the deletion when, in order: a descendant branch exists, a
    /// Running run targets the branch, or another branch's snapshot still
    /// references this branch as a table branch.
    async fn ensure_branch_delete_safe(&self, branch: &str, branches: &[String]) -> Result<()> {
        let descendants = self.coordinator.branch_descendants(branch).await?;
        if let Some(descendant) = descendants.first() {
            return Err(OmniError::manifest_conflict(format!(
                "cannot delete branch '{}' because descendant branch '{}' still depends on it",
                branch, descendant
            )));
        }

        for run in self.list_runs().await? {
            if run.target_branch == branch && matches!(run.status, RunStatus::Running) {
                return Err(OmniError::manifest_conflict(format!(
                    "cannot delete branch '{}' while run '{}' targeting it is {}",
                    branch,
                    run.run_id,
                    run.status.as_str()
                )));
            }
        }

        // Any other branch whose manifest entries point at this branch's
        // table data would be broken by the deletion.
        for other_branch in branches
            .iter()
            .filter(|candidate| candidate.as_str() != branch)
        {
            let snapshot = self
                .snapshot_of(ReadTarget::branch(other_branch.as_str()))
                .await?;
            if snapshot
                .entries()
                .any(|entry| entry.table_branch.as_deref() == Some(branch))
            {
                return Err(OmniError::manifest_conflict(format!(
                    "cannot delete branch '{}' because branch '{}' still depends on it",
                    branch, other_branch
                )));
            }
        }

        Ok(())
    }
557
558 async fn cleanup_deleted_branch_tables(
559 &self,
560 branch: &str,
561 owned_tables: &[(String, String)],
562 ) -> Result<()> {
563 let mut seen_paths = HashSet::new();
564 let mut cleanup_targets = owned_tables
565 .iter()
566 .filter(|(_, table_path)| seen_paths.insert(table_path.clone()))
567 .cloned()
568 .collect::<Vec<_>>();
569 cleanup_targets.sort_by(|left, right| left.0.cmp(&right.0));
570
571 for (table_key, table_path) in cleanup_targets {
572 let dataset_uri = self.table_store.dataset_uri(&table_path);
573 if let Err(err) = self.table_store.delete_branch(&dataset_uri, branch).await {
574 return Err(OmniError::manifest_internal(format!(
575 "branch '{}' was deleted but cleanup failed for {}: {}",
576 branch, table_key, err
577 )));
578 }
579 }
580
581 Ok(())
582 }
583
    /// Delete `branch` from the coordinator and clean up the table branches
    /// it owned. Refuses to delete the currently active branch. Performs no
    /// other safety checks (see `ensure_branch_delete_safe`).
    async fn delete_branch_storage_only(&mut self, branch: &str) -> Result<()> {
        if self.coordinator.current_branch() == Some(branch) {
            return Err(OmniError::manifest_conflict(format!(
                "cannot delete currently active branch '{}'",
                branch
            )));
        }

        // Collect the tables whose table_branch matches before deleting the
        // branch itself.
        let branch_snapshot = self.snapshot_of(ReadTarget::branch(branch)).await?;
        let owned_tables = branch_snapshot
            .entries()
            .filter(|entry| entry.table_branch.as_deref() == Some(branch))
            .map(|entry| (entry.table_key.clone(), entry.table_path.clone()))
            .collect::<Vec<_>>();

        self.coordinator.branch_delete(branch).await?;
        self.cleanup_deleted_branch_tables(branch, &owned_tables)
            .await
    }
603
    /// Delete run branches left behind by terminal runs (Published, Aborted,
    /// Failed) that targeted `branch`. Only branches that still exist are
    /// deleted, and NotFound errors during deletion are ignored.
    async fn cleanup_terminal_run_branches_for_target(&mut self, branch: &str) -> Result<()> {
        let terminal_run_branches = self
            .list_runs()
            .await?
            .into_iter()
            .filter(|run| {
                run.target_branch == branch
                    && matches!(
                        run.status,
                        RunStatus::Published | RunStatus::Aborted | RunStatus::Failed
                    )
            })
            .map(|run| run.run_branch)
            .collect::<Vec<_>>();

        let live_branches: HashSet<String> =
            self.coordinator.all_branches().await?.into_iter().collect();

        for run_branch in terminal_run_branches {
            if !live_branches.contains(&run_branch) {
                continue;
            }
            match self.delete_branch_storage_only(&run_branch).await {
                Ok(()) => {}
                // Branch already gone; nothing to clean up.
                Err(OmniError::Manifest(err)) if err.kind == ManifestErrorKind::NotFound => {}
                Err(err) => return Err(err),
            }
        }

        Ok(())
    }
635
    /// Associated-function form of the module-level `normalize_branch_name`.
    pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
        normalize_branch_name(branch)
    }

    /// Head commit id of `branch`, initializing the commit graph on a freshly
    /// opened coordinator if necessary.
    pub(crate) async fn head_commit_id_for_branch(
        &self,
        branch: Option<&str>,
    ) -> Result<Option<String>> {
        let mut coordinator = self.open_coordinator_for_branch(branch).await?;
        coordinator.ensure_commit_graph_initialized().await?;
        coordinator
            .head_commit_id()
            .await
            .map(|id| id.map(|snapshot_id| snapshot_id.as_str().to_string()))
    }
651
    /// Create branch `name` off the active coordinator's state.
    pub async fn branch_create(&mut self, name: &str) -> Result<()> {
        self.ensure_schema_state_valid().await?;
        self.ensure_schema_apply_idle("branch_create").await?;
        ensure_public_branch_ref(name, "branch_create")?;
        self.coordinator.branch_create(name).await
    }

    /// Actor id used to attribute audited operations, if set.
    pub(crate) fn current_audit_actor(&self) -> Option<&str> {
        self.audit_actor_id.as_deref()
    }
662
    /// Create branch `name` from an existing branch target.
    pub async fn branch_create_from(
        &mut self,
        from: impl Into<ReadTarget>,
        name: &str,
    ) -> Result<()> {
        self.ensure_schema_apply_idle("branch_create_from").await?;
        self.branch_create_from_impl(from, name, false).await
    }

    /// Shared implementation for branch creation from a target.
    ///
    /// `allow_internal_refs` skips the public-branch-name checks (used by
    /// `begin_run_as` for internal run branches).
    async fn branch_create_from_impl(
        &mut self,
        from: impl Into<ReadTarget>,
        name: &str,
        allow_internal_refs: bool,
    ) -> Result<()> {
        let target = from.into();
        // Only branch targets are supported; pinned snapshots are rejected.
        let ReadTarget::Branch(branch_name) = target else {
            return Err(OmniError::manifest(
                "branch creation from pinned snapshots is not supported yet".to_string(),
            ));
        };
        if !allow_internal_refs {
            ensure_public_branch_ref(&branch_name, "branch_create_from")?;
            ensure_public_branch_ref(name, "branch_create_from")?;
        }
        let branch = normalize_branch_name(&branch_name)?;
        // Swap to the source branch, create, and always restore the previous
        // coordinator before propagating the result.
        let previous = self.swap_coordinator_for_branch(branch.as_deref()).await?;
        let result = self.coordinator.branch_create(name).await;
        self.restore_coordinator(previous);
        result
    }
694
    /// List branches known to the active coordinator.
    pub async fn branch_list(&self) -> Result<Vec<String>> {
        self.ensure_schema_state_valid().await?;
        self.coordinator.branch_list().await
    }

    /// Delete branch `name` after safety checks and run-branch cleanup.
    pub async fn branch_delete(&mut self, name: &str) -> Result<()> {
        self.ensure_schema_state_valid().await?;
        self.ensure_schema_apply_idle("branch_delete").await?;
        ensure_public_branch_ref(name, "branch_delete")?;
        self.refresh().await?;
        // normalize_branch_name maps "main" to None, so "main" is unremovable.
        let branch = normalize_branch_name(name)?
            .ok_or_else(|| OmniError::manifest("cannot delete branch 'main'".to_string()))?;
        let branches = self.coordinator.branch_list().await?;
        if !branches.iter().any(|candidate| candidate == &branch) {
            return Err(OmniError::manifest_not_found(format!(
                "branch '{}' not found",
                branch
            )));
        }

        self.ensure_branch_delete_safe(&branch, &branches).await?;
        self.cleanup_terminal_run_branches_for_target(&branch)
            .await?;
        self.delete_branch_storage_only(&branch).await
    }

    /// Head snapshot id of `branch`, read via a freshly opened coordinator.
    pub(crate) async fn latest_branch_snapshot_id(&self, branch: &str) -> Result<SnapshotId> {
        let normalized = normalize_branch_name(branch)?;
        let fresh = self
            .open_coordinator_for_branch(normalized.as_deref())
            .await?;
        fresh.resolve_snapshot_id(branch).await
    }
728
    /// Begin a run against `target_branch` with no explicit actor.
    pub async fn begin_run(
        &mut self,
        target_branch: &str,
        operation_hash: Option<&str>,
    ) -> Result<RunRecord> {
        self.begin_run_as(target_branch, operation_hash, None).await
    }

    /// Begin a run: capture a base snapshot of `target_branch`, create the
    /// internal run branch from it, and persist the run record.
    pub async fn begin_run_as(
        &mut self,
        target_branch: &str,
        operation_hash: Option<&str>,
        actor_id: Option<&str>,
    ) -> Result<RunRecord> {
        self.ensure_schema_state_valid().await?;
        self.ensure_schema_apply_idle("begin_run").await?;
        ensure_public_branch_ref(target_branch, "begin_run")?;
        let target_branch =
            normalize_branch_name(target_branch)?.unwrap_or_else(|| "main".to_string());
        // Use a freshly opened coordinator so the base snapshot reflects the
        // latest state of the target branch.
        let fresh = self
            .open_coordinator_for_branch(Self::normalize_branch_name(&target_branch)?.as_deref())
            .await?;
        let base_snapshot_id = fresh.resolve_snapshot_id(&target_branch).await?;
        let base_manifest_version = fresh.version();
        let record = RunRecord::new(
            target_branch.clone(),
            base_snapshot_id.as_str(),
            base_manifest_version,
            operation_hash.map(str::to_string),
            // Explicit actor wins; otherwise fall back to the ambient audit actor.
            actor_id
                .map(str::to_string)
                .or_else(|| self.current_audit_actor().map(str::to_string)),
        )?;

        // Run branches are internal refs, so skip the public-name checks.
        self.branch_create_from_impl(
            ReadTarget::branch(target_branch.clone()),
            &record.run_branch,
            true,
        )
        .await?;
        self.coordinator.append_run_record(&record).await?;
        Ok(record)
    }
772
    /// Fetch a run record by id.
    pub async fn get_run(&self, run_id: &RunId) -> Result<RunRecord> {
        self.ensure_schema_state_valid().await?;
        self.coordinator.get_run(run_id).await
    }

    /// List all run records.
    pub async fn list_runs(&self) -> Result<Vec<RunRecord>> {
        self.ensure_schema_state_valid().await?;
        self.coordinator.list_runs().await
    }

    /// Fetch a commit record by id.
    pub async fn get_commit(&self, commit_id: &str) -> Result<GraphCommit> {
        self.ensure_schema_state_valid().await?;
        self.coordinator
            .resolve_commit(&SnapshotId::new(commit_id))
            .await
    }

    /// List commits on `branch` (the active coordinator's branch when `None`).
    pub async fn list_commits(&self, branch: Option<&str>) -> Result<Vec<GraphCommit>> {
        self.ensure_schema_state_valid().await?;
        let branch = match branch {
            Some(branch) => normalize_branch_name(branch)?,
            None => None,
        };
        let coordinator = self.open_coordinator_for_branch(branch.as_deref()).await?;
        coordinator.list_commits().await
    }
799
    /// Abort a run. Running and Failed runs can be aborted; Published or
    /// already-Aborted runs yield a conflict error.
    pub async fn abort_run(&mut self, run_id: &RunId) -> Result<RunRecord> {
        self.ensure_schema_state_valid().await?;
        let run = self.get_run(run_id).await?;
        match run.status {
            RunStatus::Running | RunStatus::Failed => {
                self.terminate_run(&run, RunStatus::Aborted, None).await
            }
            RunStatus::Published => Err(OmniError::manifest_conflict(format!(
                "run '{}' is already published",
                run_id
            ))),
            RunStatus::Aborted => Err(OmniError::manifest_conflict(format!(
                "run '{}' is already aborted",
                run_id
            ))),
        }
    }

    /// Mark a run Failed. Idempotent when the run is already Failed.
    pub async fn fail_run(&mut self, run_id: &RunId) -> Result<RunRecord> {
        self.ensure_schema_state_valid().await?;
        let run = self.get_run(run_id).await?;
        match run.status {
            RunStatus::Running => self.terminate_run(&run, RunStatus::Failed, None).await,
            RunStatus::Failed => Ok(run),
            RunStatus::Published => Err(OmniError::manifest_conflict(format!(
                "run '{}' is already published",
                run_id
            ))),
            RunStatus::Aborted => Err(OmniError::manifest_conflict(format!(
                "run '{}' is already aborted",
                run_id
            ))),
        }
    }

    /// Persist a terminal status for `run` and best-effort delete its run branch.
    ///
    /// The branch-deletion result is intentionally discarded so that the
    /// status transition never fails because cleanup did.
    async fn terminate_run(
        &mut self,
        run: &RunRecord,
        status: RunStatus,
        published_snapshot_id: Option<String>,
    ) -> Result<RunRecord> {
        let updated = run.with_status(status, published_snapshot_id)?;
        self.coordinator.append_run_record(&updated).await?;
        let _ = self.delete_branch_storage_only(&updated.run_branch).await;
        Ok(updated)
    }
850
851 pub async fn publish_run(&mut self, run_id: &RunId) -> Result<SnapshotId> {
852 self.publish_run_as(run_id, None).await
853 }
854
855 pub async fn publish_run_as(
856 &mut self,
857 run_id: &RunId,
858 actor_id: Option<&str>,
859 ) -> Result<SnapshotId> {
860 self.ensure_schema_state_valid().await?;
861 self.ensure_schema_apply_idle("publish_run").await?;
862 let run = self.get_run(run_id).await?;
863 match run.status {
864 RunStatus::Running => {}
865 RunStatus::Published => {
866 return run
867 .published_snapshot_id
868 .clone()
869 .map(SnapshotId::new)
870 .ok_or_else(|| {
871 OmniError::manifest(format!(
872 "run '{}' is published but missing published snapshot id",
873 run_id
874 ))
875 });
876 }
877 RunStatus::Failed | RunStatus::Aborted => {
878 return Err(OmniError::manifest_conflict(format!(
879 "run '{}' is not publishable from status '{}'",
880 run_id,
881 run.status.as_str()
882 )));
883 }
884 }
885
886 let publish_actor = actor_id
887 .map(str::to_string)
888 .or_else(|| run.actor_id.clone());
889 let current_target_snapshot_id = self.resolve_snapshot(&run.target_branch).await?;
890 let previous_actor = self.audit_actor_id.clone();
891 self.audit_actor_id = publish_actor.clone();
892 let publish_result = if current_target_snapshot_id.as_str() == run.base_snapshot_id {
893 let run_for_promotion = run.clone();
894 self.sync_branch(&run_for_promotion.target_branch).await?;
895 self.promote_run_snapshot_to_target(&run_for_promotion)
896 .await
897 } else {
898 let run_branch = run.run_branch.clone();
899 let target_branch = run.target_branch.clone();
900 self.branch_merge_internal(&run_branch, &target_branch)
901 .await?;
902 self.reify_internal_run_refs(&target_branch, &run_branch)
903 .await
904 };
905 self.audit_actor_id = previous_actor;
906 publish_result?;
907 let published_snapshot_id = self.resolve_snapshot(&run.target_branch).await?;
908 self.terminate_run(
909 &run,
910 RunStatus::Published,
911 Some(published_snapshot_id.as_str().to_string()),
912 )
913 .await?;
914 Ok(published_snapshot_id)
915 }
916
917 async fn promote_run_snapshot_to_target(&mut self, run: &RunRecord) -> Result<()> {
918 let target_snapshot = self
919 .snapshot_of(ReadTarget::branch(run.target_branch.as_str()))
920 .await?;
921 let run_snapshot = self
922 .snapshot_of(ReadTarget::branch(run.run_branch.as_str()))
923 .await?;
924 let mut table_keys = std::collections::BTreeSet::new();
925 for entry in target_snapshot.entries() {
926 table_keys.insert(entry.table_key.clone());
927 }
928 for entry in run_snapshot.entries() {
929 table_keys.insert(entry.table_key.clone());
930 }
931
932 let mut updates = Vec::new();
933 let mut changed_edge_tables = false;
934 let target_branch = normalize_branch_name(&run.target_branch)?;
935
936 for table_key in table_keys {
937 let target_entry = target_snapshot.entry(&table_key);
938 let run_entry = run_snapshot.entry(&table_key);
939 if same_manifest_state(target_entry, run_entry) {
940 continue;
941 }
942 let Some(_run_entry) = run_entry else {
943 return Err(OmniError::manifest(format!(
944 "run '{}' removed table '{}' which publish_run does not support",
945 run.run_id, table_key
946 )));
947 };
948
949 let source_ds = run_snapshot.open(&table_key).await?;
950 let batch = self.batch_for_table_rewrite(&source_ds, &table_key).await?;
951
952 let (mut target_ds, full_path, table_branch) = self
953 .open_for_mutation_on_branch(target_branch.as_deref(), &table_key)
954 .await?;
955 let state = self
956 .table_store()
957 .overwrite_batch(&full_path, &mut target_ds, batch)
958 .await?;
959 updates.push(crate::db::SubTableUpdate {
960 table_key: table_key.clone(),
961 table_version: state.version,
962 table_branch,
963 row_count: state.row_count,
964 version_metadata: state.version_metadata,
965 });
966 if table_key.starts_with("edge:") {
967 changed_edge_tables = true;
968 }
969 }
970
971 if !updates.is_empty() {
972 self.commit_updates_on_branch(target_branch.as_deref(), &updates)
973 .await?;
974 if changed_edge_tables {
975 self.invalidate_graph_index().await;
976 }
977 }
978
979 Ok(())
980 }
981
    /// Rewrite tables on `target_branch` whose manifest entries still point at
    /// the internal run branch, so the published state no longer references it.
    async fn reify_internal_run_refs(
        &mut self,
        target_branch: &str,
        run_branch: &str,
    ) -> Result<()> {
        let target_snapshot = self.snapshot_of(ReadTarget::branch(target_branch)).await?;
        let mut updates = Vec::new();
        let mut changed_edge_tables = false;
        let target_branch = normalize_branch_name(target_branch)?;

        for entry in target_snapshot.entries() {
            // Only tables whose data still lives on the run branch need rewriting.
            if entry.table_branch.as_deref() != Some(run_branch) {
                continue;
            }

            let source_ds = target_snapshot.open(&entry.table_key).await?;
            let batch = self
                .batch_for_table_rewrite(&source_ds, &entry.table_key)
                .await?;

            let (mut target_ds, full_path, table_branch) = self
                .open_for_mutation_on_branch(target_branch.as_deref(), &entry.table_key)
                .await?;
            let state = self
                .table_store()
                .overwrite_batch(&full_path, &mut target_ds, batch)
                .await?;
            updates.push(crate::db::SubTableUpdate {
                table_key: entry.table_key.clone(),
                table_version: state.version,
                table_branch,
                row_count: state.row_count,
                version_metadata: state.version_metadata,
            });
            if entry.table_key.starts_with("edge:") {
                changed_edge_tables = true;
            }
        }

        if !updates.is_empty() {
            self.commit_updates_on_branch(target_branch.as_deref(), &updates)
                .await?;
            // Edge-table changes invalidate the cached graph index.
            if changed_edge_tables {
                self.invalidate_graph_index().await;
            }
        }

        Ok(())
    }
1031
1032 pub(crate) async fn open_for_mutation(
1038 &self,
1039 table_key: &str,
1040 ) -> Result<(Dataset, String, Option<String>)> {
1041 table_ops::open_for_mutation(self, table_key).await
1042 }
1043
1044 pub(crate) async fn open_for_mutation_on_branch(
1045 &self,
1046 branch: Option<&str>,
1047 table_key: &str,
1048 ) -> Result<(Dataset, String, Option<String>)> {
1049 table_ops::open_for_mutation_on_branch(self, branch, table_key).await
1050 }
1051
1052 pub(crate) async fn fork_dataset_from_entry_state(
1053 &self,
1054 table_key: &str,
1055 full_path: &str,
1056 source_branch: Option<&str>,
1057 source_version: u64,
1058 active_branch: &str,
1059 ) -> Result<Dataset> {
1060 table_ops::fork_dataset_from_entry_state(
1061 self,
1062 table_key,
1063 full_path,
1064 source_branch,
1065 source_version,
1066 active_branch,
1067 )
1068 .await
1069 }
1070
1071 pub(crate) async fn reopen_for_mutation(
1072 &self,
1073 table_key: &str,
1074 full_path: &str,
1075 table_branch: Option<&str>,
1076 expected_version: u64,
1077 ) -> Result<Dataset> {
1078 table_ops::reopen_for_mutation(self, table_key, full_path, table_branch, expected_version)
1079 .await
1080 }
1081
1082 pub(crate) async fn open_dataset_at_state(
1083 &self,
1084 table_path: &str,
1085 table_branch: Option<&str>,
1086 table_version: u64,
1087 ) -> Result<Dataset> {
1088 table_ops::open_dataset_at_state(self, table_path, table_branch, table_version).await
1089 }
1090
1091 pub(crate) async fn build_indices_on_dataset(
1092 &self,
1093 table_key: &str,
1094 ds: &mut Dataset,
1095 ) -> Result<()> {
1096 table_ops::build_indices_on_dataset(self, table_key, ds).await
1097 }
1098
    /// Same as [`Self::build_indices_on_dataset`], but resolves index
    /// definitions from an explicitly supplied `catalog` rather than the
    /// instance's own.
    ///
    /// Thin wrapper delegating to
    /// [`table_ops::build_indices_on_dataset_for_catalog`].
    pub(crate) async fn build_indices_on_dataset_for_catalog(
        &self,
        catalog: &Catalog,
        table_key: &str,
        ds: &mut Dataset,
    ) -> Result<()> {
        table_ops::build_indices_on_dataset_for_catalog(self, catalog, table_key, ds).await
    }
1107
    /// Commits a batch of sub-table updates on the current branch and
    /// returns the resulting manifest version.
    ///
    /// Thin wrapper delegating to [`table_ops::commit_updates`].
    pub(crate) async fn commit_updates(
        &mut self,
        updates: &[crate::db::SubTableUpdate],
    ) -> Result<u64> {
        table_ops::commit_updates(self, updates).await
    }
1114
    /// Commits manifest-level sub-table updates and returns the new manifest
    /// version.
    ///
    /// Thin wrapper delegating to [`table_ops::commit_manifest_updates`].
    pub(crate) async fn commit_manifest_updates(
        &mut self,
        updates: &[crate::db::SubTableUpdate],
    ) -> Result<u64> {
        table_ops::commit_manifest_updates(self, updates).await
    }
1121
    /// Records a merge commit over `manifest_version` with the two parent
    /// commit ids, returning the new commit's id.
    ///
    /// Thin wrapper delegating to [`table_ops::record_merge_commit`].
    pub(crate) async fn record_merge_commit(
        &mut self,
        manifest_version: u64,
        parent_commit_id: &str,
        merged_parent_commit_id: &str,
    ) -> Result<String> {
        table_ops::record_merge_commit(
            self,
            manifest_version,
            parent_commit_id,
            merged_parent_commit_id,
        )
        .await
    }
1136
    /// Commits a batch of sub-table updates on an explicit `branch`
    /// (`None` selects the default branch), returning the new manifest
    /// version.
    ///
    /// Thin wrapper delegating to [`table_ops::commit_updates_on_branch`].
    pub(crate) async fn commit_updates_on_branch(
        &mut self,
        branch: Option<&str>,
        updates: &[crate::db::SubTableUpdate],
    ) -> Result<u64> {
        table_ops::commit_updates_on_branch(self, branch, updates).await
    }
1144
    /// Ensures the commit-graph storage has been initialized before any
    /// operation that depends on it.
    ///
    /// Thin wrapper delegating to [`table_ops::ensure_commit_graph_initialized`].
    pub(crate) async fn ensure_commit_graph_initialized(&mut self) -> Result<()> {
        table_ops::ensure_commit_graph_initialized(self).await
    }
1148
    /// Drops any cached graph index so the next reader rebuilds it; called
    /// after commits that changed edge tables. Infallible by design.
    ///
    /// Thin wrapper delegating to [`table_ops::invalidate_graph_index`].
    pub(crate) async fn invalidate_graph_index(&self) {
        table_ops::invalidate_graph_index(self).await
    }
1153
    /// Materializes the full contents of `source_ds` as a single
    /// [`RecordBatch`] shaped for a schema-migration rewrite of `table_key`.
    ///
    /// Thin wrapper delegating to [`schema_apply::batch_for_table_rewrite`].
    async fn batch_for_table_rewrite(
        &self,
        source_ds: &Dataset,
        table_key: &str,
    ) -> Result<RecordBatch> {
        schema_apply::batch_for_table_rewrite(self, source_ds, table_key).await
    }
1161}
1162
1163pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
1164 let branch = branch.trim();
1165 if branch.is_empty() {
1166 return Err(OmniError::manifest(
1167 "branch name cannot be empty".to_string(),
1168 ));
1169 }
1170 if branch == "main" {
1171 return Ok(None);
1172 }
1173 Ok(Some(branch.to_string()))
1174}
1175
1176fn ensure_public_branch_ref(branch: &str, operation: &str) -> Result<()> {
1177 if super::is_internal_run_branch(branch) {
1178 return Err(OmniError::manifest(format!(
1179 "{} does not allow internal run ref '{}'",
1180 operation, branch
1181 )));
1182 }
1183 if is_internal_system_branch(branch) {
1184 return Err(OmniError::manifest(format!(
1185 "{} does not allow internal system ref '{}'",
1186 operation, branch
1187 )));
1188 }
1189 Ok(())
1190}
1191
1192fn same_manifest_state(
1193 left: Option<&crate::db::SubTableEntry>,
1194 right: Option<&crate::db::SubTableEntry>,
1195) -> bool {
1196 match (left, right) {
1197 (None, None) => true,
1198 (Some(left), Some(right)) => {
1199 left.table_path == right.table_path
1200 && left.table_version == right.table_version
1201 && left.table_branch == right.table_branch
1202 && left.row_count == right.row_count
1203 }
1204 _ => false,
1205 }
1206}
1207
1208fn concat_or_empty_batches(schema: Arc<Schema>, batches: Vec<RecordBatch>) -> Result<RecordBatch> {
1209 if batches.is_empty() {
1210 return Ok(RecordBatch::new_empty(schema));
1211 }
1212 if batches.len() == 1 {
1213 return Ok(batches.into_iter().next().unwrap());
1214 }
1215 let batch_schema = batches[0].schema();
1216 arrow_select::concat::concat_batches(&batch_schema, &batches)
1217 .map_err(|e| OmniError::Lance(e.to_string()))
1218}
1219
1220fn blob_properties_for_table_key<'a>(
1221 catalog: &'a Catalog,
1222 table_key: &str,
1223) -> Result<&'a std::collections::HashSet<String>> {
1224 if let Some(type_name) = table_key.strip_prefix("node:") {
1225 return catalog
1226 .node_types
1227 .get(type_name)
1228 .map(|node_type| &node_type.blob_properties)
1229 .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)));
1230 }
1231 if let Some(type_name) = table_key.strip_prefix("edge:") {
1232 return catalog
1233 .edge_types
1234 .get(type_name)
1235 .map(|edge_type| &edge_type.blob_properties)
1236 .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)));
1237 }
1238 Err(OmniError::manifest(format!(
1239 "invalid table key '{}'",
1240 table_key
1241 )))
1242}
1243
/// Decides whether a blob-description struct row represents a logically-null
/// blob value.
///
/// A row counts as null when: the struct entry itself is null; no `kind`
/// value can be read; or the kind is `Inline` with zero/absent `position`
/// and `size` and an empty/absent `blob_uri`. Any non-inline kind with a
/// readable `kind` is non-null.
fn blob_description_is_null(descriptions: &StructArray, row: usize) -> Result<bool> {
    if descriptions.is_null(row) {
        return Ok(true);
    }

    // `kind` may be encoded as UInt32 or UInt8; try UInt32 first and fall
    // back to UInt8.
    // NOTE(review): the UInt32 path narrows with `as u8`, silently
    // truncating values > 255 — assumes kind discriminants fit in one byte;
    // confirm against BlobKind's variants.
    let kind = descriptions
        .column_by_name("kind")
        .and_then(|col| col.as_any().downcast_ref::<UInt32Array>())
        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row) as u8))
        .or_else(|| {
            descriptions
                .column_by_name("kind")
                .and_then(|col| col.as_any().downcast_ref::<arrow_array::UInt8Array>())
                .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)))
        });
    // Each field read yields None when the column is missing, has an
    // unexpected type, or holds a null at `row`.
    let position = descriptions
        .column_by_name("position")
        .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
    let size = descriptions
        .column_by_name("size")
        .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));
    let blob_uri = descriptions
        .column_by_name("blob_uri")
        .and_then(|col| col.as_any().downcast_ref::<StringArray>())
        .and_then(|arr| (!arr.is_null(row)).then(|| arr.value(row)));

    // An unreadable kind means we cannot interpret the row: treat as null.
    let Some(kind) = kind else {
        return Ok(true);
    };
    let kind = BlobKind::try_from(kind).map_err(|e| OmniError::Lance(e.to_string()))?;
    if kind != BlobKind::Inline {
        return Ok(false);
    }

    // Inline blob with no position, no size, and an empty URI carries no data.
    Ok(position.unwrap_or(0) == 0 && size.unwrap_or(0) == 0 && blob_uri.unwrap_or("").is_empty())
}
1282
1283fn fixup_blob_schemas(catalog: &mut Catalog) {
1289 for node_type in catalog.node_types.values_mut() {
1290 if node_type.blob_properties.is_empty() {
1291 continue;
1292 }
1293 let fields: Vec<Field> = node_type
1294 .arrow_schema
1295 .fields()
1296 .iter()
1297 .map(|f| {
1298 if node_type.blob_properties.contains(f.name()) {
1299 blob_field(f.name(), f.is_nullable())
1300 } else {
1301 f.as_ref().clone()
1302 }
1303 })
1304 .collect();
1305 node_type.arrow_schema = Arc::new(Schema::new(fields));
1306 }
1307 for edge_type in catalog.edge_types.values_mut() {
1308 if edge_type.blob_properties.is_empty() {
1309 continue;
1310 }
1311 let fields: Vec<Field> = edge_type
1312 .arrow_schema
1313 .fields()
1314 .iter()
1315 .map(|f| {
1316 if edge_type.blob_properties.contains(f.name()) {
1317 blob_field(f.name(), f.is_nullable())
1318 } else {
1319 f.as_ref().clone()
1320 }
1321 })
1322 .collect();
1323 edge_type.arrow_schema = Arc::new(Schema::new(fields));
1324 }
1325}
1326
1327fn read_schema_ir_from_source(schema_source: &str) -> Result<SchemaIR> {
1328 let schema_ast = parse_schema(schema_source)?;
1329 build_schema_ir(&schema_ast).map_err(|err| OmniError::manifest(err.to_string()))
1330}
1331
1332fn schema_table_key(type_kind: SchemaTypeKind, name: &str) -> String {
1333 match type_kind {
1334 SchemaTypeKind::Node => format!("node:{}", name),
1335 SchemaTypeKind::Edge => format!("edge:{}", name),
1336 SchemaTypeKind::Interface => unreachable!("interfaces do not map to tables"),
1337 }
1338}
1339
1340fn schema_for_table_key(catalog: &Catalog, table_key: &str) -> Result<Arc<Schema>> {
1341 if let Some(type_name) = table_key.strip_prefix("node:") {
1342 let node_type: &NodeType = catalog
1343 .node_types
1344 .get(type_name)
1345 .ok_or_else(|| OmniError::manifest(format!("unknown node type '{}'", type_name)))?;
1346 return Ok(node_type.arrow_schema.clone());
1347 }
1348 if let Some(type_name) = table_key.strip_prefix("edge:") {
1349 let edge_type: &EdgeType = catalog
1350 .edge_types
1351 .get(type_name)
1352 .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", type_name)))?;
1353 return Ok(edge_type.arrow_schema.clone());
1354 }
1355 Err(OmniError::manifest(format!(
1356 "invalid table key '{}'",
1357 table_key
1358 )))
1359}
1360
1361fn record_batch_row_to_json(batch: &RecordBatch, row: usize) -> Result<serde_json::Value> {
1362 let mut obj = serde_json::Map::new();
1363 for (i, field) in batch.schema().fields().iter().enumerate() {
1364 obj.insert(
1365 field.name().clone(),
1366 json_value_from_array(batch.column(i).as_ref(), row)?,
1367 );
1368 }
1369 Ok(serde_json::Value::Object(obj))
1370}
1371
1372fn json_value_from_array(array: &dyn Array, row: usize) -> Result<serde_json::Value> {
1373 if array.is_null(row) {
1374 return Ok(serde_json::Value::Null);
1375 }
1376
1377 match array.data_type() {
1378 DataType::Utf8 => Ok(serde_json::Value::String(
1379 array
1380 .as_any()
1381 .downcast_ref::<StringArray>()
1382 .ok_or_else(|| OmniError::Lance("expected StringArray".to_string()))?
1383 .value(row)
1384 .to_string(),
1385 )),
1386 DataType::LargeUtf8 => Ok(serde_json::Value::String(
1387 array
1388 .as_any()
1389 .downcast_ref::<LargeStringArray>()
1390 .ok_or_else(|| OmniError::Lance("expected LargeStringArray".to_string()))?
1391 .value(row)
1392 .to_string(),
1393 )),
1394 DataType::Boolean => Ok(serde_json::Value::Bool(
1395 array
1396 .as_any()
1397 .downcast_ref::<BooleanArray>()
1398 .ok_or_else(|| OmniError::Lance("expected BooleanArray".to_string()))?
1399 .value(row),
1400 )),
1401 DataType::Int32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1402 array
1403 .as_any()
1404 .downcast_ref::<Int32Array>()
1405 .ok_or_else(|| OmniError::Lance("expected Int32Array".to_string()))?
1406 .value(row),
1407 ))),
1408 DataType::Int64 => Ok(serde_json::Value::Number(serde_json::Number::from(
1409 array
1410 .as_any()
1411 .downcast_ref::<Int64Array>()
1412 .ok_or_else(|| OmniError::Lance("expected Int64Array".to_string()))?
1413 .value(row),
1414 ))),
1415 DataType::UInt32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1416 array
1417 .as_any()
1418 .downcast_ref::<UInt32Array>()
1419 .ok_or_else(|| OmniError::Lance("expected UInt32Array".to_string()))?
1420 .value(row),
1421 ))),
1422 DataType::UInt64 => Ok(serde_json::Value::Number(serde_json::Number::from(
1423 array
1424 .as_any()
1425 .downcast_ref::<UInt64Array>()
1426 .ok_or_else(|| OmniError::Lance("expected UInt64Array".to_string()))?
1427 .value(row),
1428 ))),
1429 DataType::Float32 => {
1430 let value = array
1431 .as_any()
1432 .downcast_ref::<Float32Array>()
1433 .ok_or_else(|| OmniError::Lance("expected Float32Array".to_string()))?
1434 .value(row) as f64;
1435 Ok(serde_json::Value::Number(
1436 serde_json::Number::from_f64(value).ok_or_else(|| {
1437 OmniError::Lance(format!("cannot encode f32 value '{}' as JSON", value))
1438 })?,
1439 ))
1440 }
1441 DataType::Float64 => {
1442 let value = array
1443 .as_any()
1444 .downcast_ref::<Float64Array>()
1445 .ok_or_else(|| OmniError::Lance("expected Float64Array".to_string()))?
1446 .value(row);
1447 Ok(serde_json::Value::Number(
1448 serde_json::Number::from_f64(value).ok_or_else(|| {
1449 OmniError::Lance(format!("cannot encode f64 value '{}' as JSON", value))
1450 })?,
1451 ))
1452 }
1453 DataType::Date32 => Ok(serde_json::Value::Number(serde_json::Number::from(
1454 array
1455 .as_any()
1456 .downcast_ref::<Date32Array>()
1457 .ok_or_else(|| OmniError::Lance("expected Date32Array".to_string()))?
1458 .value(row),
1459 ))),
1460 DataType::Binary => Ok(serde_json::Value::String(base64::Engine::encode(
1461 &base64::engine::general_purpose::STANDARD,
1462 array
1463 .as_any()
1464 .downcast_ref::<BinaryArray>()
1465 .ok_or_else(|| OmniError::Lance("expected BinaryArray".to_string()))?
1466 .value(row),
1467 ))),
1468 DataType::LargeBinary => Ok(serde_json::Value::String(base64::Engine::encode(
1469 &base64::engine::general_purpose::STANDARD,
1470 array
1471 .as_any()
1472 .downcast_ref::<LargeBinaryArray>()
1473 .ok_or_else(|| OmniError::Lance("expected LargeBinaryArray".to_string()))?
1474 .value(row),
1475 ))),
1476 DataType::List(_) => {
1477 let list = array
1478 .as_any()
1479 .downcast_ref::<ListArray>()
1480 .ok_or_else(|| OmniError::Lance("expected ListArray".to_string()))?;
1481 let values = list.value(row);
1482 let mut out = Vec::with_capacity(values.len());
1483 for idx in 0..values.len() {
1484 out.push(json_value_from_array(values.as_ref(), idx)?);
1485 }
1486 Ok(serde_json::Value::Array(out))
1487 }
1488 DataType::LargeList(_) => {
1489 let list = array
1490 .as_any()
1491 .downcast_ref::<LargeListArray>()
1492 .ok_or_else(|| OmniError::Lance("expected LargeListArray".to_string()))?;
1493 let values = list.value(row);
1494 let mut out = Vec::with_capacity(values.len());
1495 for idx in 0..values.len() {
1496 out.push(json_value_from_array(values.as_ref(), idx)?);
1497 }
1498 Ok(serde_json::Value::Array(out))
1499 }
1500 DataType::FixedSizeList(_, _) => {
1501 let list = array
1502 .as_any()
1503 .downcast_ref::<FixedSizeListArray>()
1504 .ok_or_else(|| OmniError::Lance("expected FixedSizeListArray".to_string()))?;
1505 let values = list.value(row);
1506 let mut out = Vec::with_capacity(values.len());
1507 for idx in 0..values.len() {
1508 out.push(json_value_from_array(values.as_ref(), idx)?);
1509 }
1510 Ok(serde_json::Value::Array(out))
1511 }
1512 DataType::Struct(fields) => {
1513 let struct_array = array
1514 .as_any()
1515 .downcast_ref::<StructArray>()
1516 .ok_or_else(|| OmniError::Lance("expected StructArray".to_string()))?;
1517 let mut obj = serde_json::Map::new();
1518 for (field_idx, field) in fields.iter().enumerate() {
1519 obj.insert(
1520 field.name().clone(),
1521 json_value_from_array(struct_array.column(field_idx).as_ref(), row)?,
1522 );
1523 }
1524 Ok(serde_json::Value::Object(obj))
1525 }
1526 _ => {
1527 let value = arrow_cast::display::array_value_to_string(array, row)
1528 .map_err(|e| OmniError::Lance(e.to_string()))?;
1529 Ok(serde_json::Value::String(value))
1530 }
1531 }
1532}
1533
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::is_internal_run_branch;
    use crate::db::manifest::ManifestCoordinator;
    use async_trait::async_trait;
    use serde_json::Value;
    use std::sync::Mutex;

    use crate::storage::{LocalStorageAdapter, StorageAdapter, join_uri};

    // Schema shared by the tests: two node types keyed by `name`, plus two
    // edge types between them. Several tests derive variants of it via
    // `str::replace`, so the exact whitespace inside the literal matters.
    const TEST_SCHEMA: &str = r#"
node Person {
 name: String @key
 age: I32?
}
node Company {
 name: String @key
}
edge Knows: Person -> Person {
 since: Date?
}
edge WorksAt: Person -> Company
"#;

    // Wraps `LocalStorageAdapter` and records every read/write/exists call
    // so tests can assert which URIs were routed through the adapter.
    #[derive(Debug, Default)]
    struct RecordingStorageAdapter {
        inner: LocalStorageAdapter,
        reads: Mutex<Vec<String>>,
        writes: Mutex<Vec<String>>,
        exists_checks: Mutex<Vec<String>>,
    }

    impl RecordingStorageAdapter {
        // URIs passed to `read_text`, in call order.
        fn reads(&self) -> Vec<String> {
            self.reads.lock().unwrap().clone()
        }

        // URIs passed to `write_text`, in call order.
        fn writes(&self) -> Vec<String> {
            self.writes.lock().unwrap().clone()
        }

        // URIs passed to `exists`, in call order.
        fn exists_checks(&self) -> Vec<String> {
            self.exists_checks.lock().unwrap().clone()
        }
    }

    #[async_trait]
    impl StorageAdapter for RecordingStorageAdapter {
        async fn read_text(&self, uri: &str) -> Result<String> {
            self.reads.lock().unwrap().push(uri.to_string());
            self.inner.read_text(uri).await
        }

        async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
            self.writes.lock().unwrap().push(uri.to_string());
            self.inner.write_text(uri, contents).await
        }

        async fn exists(&self, uri: &str) -> Result<bool> {
            self.exists_checks.lock().unwrap().push(uri.to_string());
            self.inner.exists(uri).await
        }
    }

    // init/open must route all schema and graph-metadata I/O through the
    // injected storage adapter (verified via the recorded URIs).
    #[tokio::test]
    async fn test_init_and_open_route_graph_metadata_through_storage_adapter() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let adapter = Arc::new(RecordingStorageAdapter::default());

        Omnigraph::init_with_storage(uri, TEST_SCHEMA, adapter.clone())
            .await
            .unwrap();
        assert!(adapter.writes().contains(&join_uri(uri, "_schema.pg")));
        assert!(adapter.writes().contains(&join_uri(uri, "_schema.ir.json")));
        assert!(
            adapter
                .writes()
                .contains(&join_uri(uri, "__schema_state.json"))
        );

        Omnigraph::open_with_storage(uri, adapter.clone())
            .await
            .unwrap();
        assert!(adapter.reads().contains(&join_uri(uri, "_schema.pg")));
        assert!(adapter.reads().contains(&join_uri(uri, "_schema.ir.json")));
        assert!(
            adapter
                .reads()
                .contains(&join_uri(uri, "__schema_state.json"))
        );
        assert!(
            adapter
                .exists_checks()
                .contains(&join_uri(uri, "_schema.ir.json"))
        );
        assert!(
            adapter
                .exists_checks()
                .contains(&join_uri(uri, "__schema_state.json"))
        );
        assert!(
            adapter
                .exists_checks()
                .contains(&join_uri(uri, "_graph_commits.lance"))
        );
    }

    // Helper: scans all rows of `table_key` at the current snapshot and
    // returns them as JSON objects (via `record_batch_row_to_json`).
    async fn table_rows_json(db: &Omnigraph, table_key: &str) -> Vec<Value> {
        let snapshot = db.snapshot();
        let ds = snapshot.open(table_key).await.unwrap();
        let batches = db.table_store().scan_batches(&ds).await.unwrap();
        batches
            .into_iter()
            .flat_map(|batch| {
                (0..batch.num_rows())
                    .map(|row| record_batch_row_to_json(&batch, row).unwrap())
                    .collect::<Vec<_>>()
            })
            .collect()
    }

    // Helper: appends one Person row (id/name = `name`, optional `age`,
    // nulls elsewhere) and commits the resulting table update.
    async fn seed_person_row(db: &mut Omnigraph, name: &str, age: Option<i32>) {
        let (mut ds, full_path, table_branch) = db.open_for_mutation("node:Person").await.unwrap();
        let schema: Arc<Schema> = Arc::new(ds.schema().into());
        let columns: Vec<Arc<dyn Array>> = schema
            .fields()
            .iter()
            .map(|field| match field.name().as_str() {
                "id" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
                "name" => Arc::new(StringArray::from(vec![name])) as Arc<dyn Array>,
                "age" => Arc::new(Int32Array::from(vec![age])) as Arc<dyn Array>,
                _ => new_null_array(field.data_type(), 1),
            })
            .collect();
        let batch = RecordBatch::try_new(Arc::clone(&schema), columns).unwrap();
        let state = db
            .table_store()
            .append_batch(&full_path, &mut ds, batch)
            .await
            .unwrap();
        db.commit_updates(&[crate::db::SubTableUpdate {
            table_key: "node:Person".to_string(),
            table_version: state.version,
            table_branch,
            row_count: state.row_count,
            version_metadata: state.version_metadata,
        }])
        .await
        .unwrap();
    }

    // Adding a nullable property migrates the table in place: existing rows
    // survive with the new column null, and the schema file is persisted.
    #[tokio::test]
    async fn test_apply_schema_adds_nullable_property_and_preserves_rows() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;

        let desired = TEST_SCHEMA.replace(
            " age: I32?\n}",
            " age: I32?\n nickname: String?\n}",
        );
        let result = db.apply_schema(&desired).await.unwrap();
        assert!(result.applied);

        let reopened = Omnigraph::open(uri).await.unwrap();
        let rows = table_rows_json(&reopened, "node:Person").await;
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0]["name"], "Alice");
        assert_eq!(rows[0]["age"], 30);
        assert!(rows[0]["nickname"].is_null());
        assert!(
            reopened.catalog().node_types["Person"]
                .properties
                .contains_key("nickname")
        );
        assert!(dir.path().join("_schema.pg").exists());
    }

    // A property rename (@rename_from) must carry existing values over to
    // the new column name and drop the old one.
    #[tokio::test]
    async fn test_apply_schema_renames_property_and_preserves_values() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;

        let desired = TEST_SCHEMA.replace(
            " age: I32?\n}",
            " years: I32? @rename_from(\"age\")\n}",
        );
        db.apply_schema(&desired).await.unwrap();

        let reopened = Omnigraph::open(uri).await.unwrap();
        let rows = table_rows_json(&reopened, "node:Person").await;
        assert_eq!(rows[0]["name"], "Alice");
        assert_eq!(rows[0]["years"], 30);
        assert!(rows[0].get("age").is_none());
    }

    // A type rename moves the table key at head, while a snapshot taken
    // before the migration still resolves the old key.
    #[tokio::test]
    async fn test_apply_schema_renames_type_and_preserves_historical_snapshot() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;
        let before_version = db.snapshot().version();

        let desired = TEST_SCHEMA
            .replace("node Person {\n", "node Human @rename_from(\"Person\") {\n")
            .replace("edge Knows: Person -> Person", "edge Knows: Human -> Human")
            .replace(
                "edge WorksAt: Person -> Company",
                "edge WorksAt: Human -> Company",
            );
        db.apply_schema(&desired).await.unwrap();

        let head = db.snapshot();
        assert!(head.entry("node:Person").is_none());
        assert!(head.entry("node:Human").is_some());
        let historical = ManifestCoordinator::snapshot_at(uri, None, before_version)
            .await
            .unwrap();
        assert!(historical.entry("node:Person").is_some());
        assert!(historical.entry("node:Human").is_none());
    }

    // Regression (MR-674): a bulk load must clean up its internal run branch
    // on publish so a subsequent schema apply is not blocked by it.
    #[tokio::test]
    async fn test_apply_schema_succeeds_after_load() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        crate::loader::load_jsonl(
            &mut db,
            r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#,
            crate::loader::LoadMode::Overwrite,
        )
        .await
        .unwrap();

        let all_branches = db.coordinator.all_branches().await.unwrap();
        assert!(
            !all_branches.iter().any(|b| is_internal_run_branch(b)),
            "MR-674: run branch should be deleted after publish, got: {:?}",
            all_branches
        );

        let desired = TEST_SCHEMA.replace(
            " age: I32?\n}",
            " age: I32?\n nickname: String?\n}",
        );
        let result = db.apply_schema(&desired).await.unwrap();
        assert!(result.applied, "schema apply should have applied");
    }

    // Adding @index to an existing property must create an FTS index on the
    // already-populated column.
    #[tokio::test]
    async fn test_apply_schema_adds_index_for_existing_property() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

        let desired = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
        db.apply_schema(&desired).await.unwrap();

        let snapshot = db.snapshot();
        let ds = snapshot.open("node:Person").await.unwrap();
        assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
    }

    // A migration that rewrites a table must rebuild both the implicit
    // btree index on `id` and the declared FTS index on `name`.
    #[tokio::test]
    async fn test_apply_schema_rewrite_preserves_existing_indices() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let initial_schema = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
        let mut db = Omnigraph::init(uri, &initial_schema).await.unwrap();
        seed_person_row(&mut db, "Alice", Some(30)).await;

        let desired = initial_schema.replace(
            " age: I32?\n}",
            " age: I32?\n nickname: String?\n}",
        );
        db.apply_schema(&desired).await.unwrap();

        let snapshot = db.snapshot();
        let ds = snapshot.open("node:Person").await.unwrap();
        assert!(db.table_store().has_btree_index(&ds, "id").await.unwrap());
        assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
    }

    // While the schema-apply lock branch exists, opening a table for
    // mutation must fail with the dedicated error message.
    #[tokio::test]
    async fn test_open_for_mutation_rejects_while_schema_apply_locked() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        let mut db = db;
        db.coordinator
            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
            .await
            .unwrap();

        let err = db.open_for_mutation("node:Person").await.unwrap_err();
        assert!(
            err.to_string()
                .contains("write is unavailable while schema apply is in progress")
        );
    }

    // Likewise, committing updates must be rejected while the schema-apply
    // lock branch exists.
    #[tokio::test]
    async fn test_commit_updates_rejects_while_schema_apply_locked() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        db.coordinator
            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
            .await
            .unwrap();

        let err = db.commit_updates(&[]).await.unwrap_err();
        assert!(
            err.to_string()
                .contains("write commit is unavailable while schema apply is in progress")
        );
    }

    // The schema-apply lock branch is internal and must never show up in
    // the public branch listing.
    #[tokio::test]
    async fn test_branch_list_hides_schema_apply_lock_branch() {
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
        db.coordinator
            .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
            .await
            .unwrap();

        let branches = db.branch_list().await.unwrap();
        assert_eq!(branches, vec!["main".to_string()]);
    }
}