1use std::collections::HashMap;
2use std::fmt;
3use std::sync::Arc;
4
5use omnigraph_compiler::catalog::Catalog;
6
7use crate::error::{OmniError, Result};
8use crate::failpoints;
9use crate::storage::{StorageAdapter, join_uri, normalize_root_uri};
10
11use super::commit_graph::{CommitGraph, GraphCommit};
12use super::is_internal_system_branch;
13use super::manifest::{
14 ManifestChange, ManifestCoordinator, ManifestIncarnation, Snapshot, SubTableUpdate,
15};
16
/// Directory name of the commit-graph dataset; `graph_commits_uri` joins it
/// onto the graph's root URI.
const GRAPH_COMMITS_DIR: &str = "_graph_commits.lance";
18
/// Opaque identifier of a graph snapshot, stored as a plain string.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SnapshotId(String);

impl SnapshotId {
    /// Wraps any string-like value as a snapshot id; the value is not validated.
    pub fn new(id: impl Into<String>) -> Self {
        let raw: String = id.into();
        Self(raw)
    }

    /// Borrows the identifier text.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }

    /// Builds a fallback id from manifest coordinates.
    ///
    /// A missing `branch` is rendered as `main`. The id has the form
    /// `manifest:<branch>:v<version>`, with `:etag:<e_tag>` appended when an
    /// e-tag is supplied.
    pub(crate) fn synthetic(branch: Option<&str>, version: u64, e_tag: Option<&str>) -> Self {
        let branch_label = branch.unwrap_or("main");
        let id = if let Some(tag) = e_tag {
            format!("manifest:{}:v{}:etag:{}", branch_label, version, tag)
        } else {
            format!("manifest:{}:v{}", branch_label, version)
        };
        Self(id)
    }
}

impl fmt::Display for SnapshotId {
    /// Formats the id exactly like the wrapped string, honouring width and
    /// alignment flags of the formatter.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(self.as_str(), f)
    }
}
45
46#[derive(Debug, Clone, PartialEq, Eq)]
47pub enum ReadTarget {
48 Branch(String),
49 Snapshot(SnapshotId),
50}
51
52impl ReadTarget {
53 pub fn branch(name: impl Into<String>) -> Self {
54 Self::Branch(name.into())
55 }
56
57 pub fn snapshot(id: impl Into<SnapshotId>) -> Self {
58 Self::Snapshot(id.into())
59 }
60}
61
62impl From<&str> for ReadTarget {
63 fn from(value: &str) -> Self {
64 Self::branch(value)
65 }
66}
67
68impl From<String> for ReadTarget {
69 fn from(value: String) -> Self {
70 Self::Branch(value)
71 }
72}
73
74impl From<SnapshotId> for ReadTarget {
75 fn from(value: SnapshotId) -> Self {
76 Self::Snapshot(value)
77 }
78}
79
80#[derive(Debug, Clone)]
81pub struct ResolvedTarget {
82 pub requested: ReadTarget,
83 pub branch: Option<String>,
84 pub snapshot_id: SnapshotId,
85 pub snapshot: Snapshot,
86}
87
88#[derive(Debug, Clone)]
89pub(crate) struct PublishedSnapshot {
90 pub manifest_version: u64,
91 pub _snapshot_id: SnapshotId,
92}
93
94pub struct GraphCoordinator {
95 root_uri: String,
96 storage: Arc<dyn StorageAdapter>,
97 manifest: ManifestCoordinator,
98 commit_graph: Option<CommitGraph>,
99 bound_branch: Option<String>,
100}
101
102impl GraphCoordinator {
103 pub async fn init(
104 root_uri: &str,
105 catalog: &Catalog,
106 storage: Arc<dyn StorageAdapter>,
107 ) -> Result<Self> {
108 let root = normalize_root_uri(root_uri)?;
109 let manifest = ManifestCoordinator::init(&root, catalog).await?;
110 let commit_graph = Some(CommitGraph::init(&root, manifest.version()).await?);
111 Ok(Self {
112 root_uri: root,
113 storage,
114 manifest,
115 commit_graph,
116 bound_branch: None,
117 })
118 }
119
120 pub async fn open(root_uri: &str, storage: Arc<dyn StorageAdapter>) -> Result<Self> {
121 let root = normalize_root_uri(root_uri)?;
122 let manifest = ManifestCoordinator::open(&root).await?;
123 let commit_graph = if storage.exists(&graph_commits_uri(&root)).await? {
124 Some(CommitGraph::open(&root).await?)
125 } else {
126 None
127 };
128 Ok(Self {
129 root_uri: root,
130 storage,
131 manifest,
132 commit_graph,
133 bound_branch: None,
134 })
135 }
136
137 pub async fn open_branch(
138 root_uri: &str,
139 branch: &str,
140 storage: Arc<dyn StorageAdapter>,
141 ) -> Result<Self> {
142 let branch = normalize_branch_name(branch)?;
143 let Some(branch_name) = branch else {
144 return Self::open(root_uri, storage).await;
145 };
146
147 let root = normalize_root_uri(root_uri)?;
148 let manifest = ManifestCoordinator::open_at_branch(&root, &branch_name).await?;
149 let commit_graph = if storage.exists(&graph_commits_uri(&root)).await? {
150 Some(CommitGraph::open_at_branch(&root, &branch_name).await?)
151 } else {
152 None
153 };
154
155 Ok(Self {
156 root_uri: root,
157 storage,
158 manifest,
159 commit_graph,
160 bound_branch: Some(branch_name),
161 })
162 }
163
164 pub fn root_uri(&self) -> &str {
165 &self.root_uri
166 }
167
168 pub fn version(&self) -> u64 {
169 self.manifest.version()
170 }
171
172 pub(crate) fn manifest_incarnation(&self) -> ManifestIncarnation {
173 self.manifest.incarnation()
174 }
175
176 pub fn snapshot(&self) -> Snapshot {
177 self.manifest.snapshot()
178 }
179
180 pub fn current_branch(&self) -> Option<&str> {
181 self.bound_branch.as_deref()
182 }
183
184 pub async fn refresh(&mut self) -> Result<()> {
185 self.manifest.refresh().await?;
186 if let Some(commit_graph) = &mut self.commit_graph {
187 commit_graph.refresh().await?;
188 }
189 Ok(())
190 }
191
192 pub(crate) async fn probe_latest_incarnation(&self) -> Result<ManifestIncarnation> {
193 crate::instrumentation::record_probe();
194 self.manifest.probe_latest_incarnation().await
195 }
196
197 pub async fn refresh_manifest_only(&mut self) -> Result<()> {
202 self.manifest.refresh().await
203 }
204
205 pub async fn branch_list(&self) -> Result<Vec<String>> {
206 self.manifest.list_branches().await.map(|branches| {
207 branches
208 .into_iter()
209 .filter(|branch| !is_internal_system_branch(branch))
210 .collect()
211 })
212 }
213
214 pub(crate) async fn all_branches(&self) -> Result<Vec<String>> {
215 self.manifest.list_branches().await
216 }
217
218 pub async fn branch_descendants(&self, name: &str) -> Result<Vec<String>> {
219 self.manifest
220 .descendant_branches(name)
221 .await
222 .map(|branches| {
223 branches
224 .into_iter()
225 .filter(|branch| !is_internal_system_branch(branch))
226 .collect()
227 })
228 }
229
230 pub async fn branch_create(&mut self, name: &str) -> Result<()> {
231 let branch = normalize_branch_name(name)?
232 .ok_or_else(|| OmniError::manifest("cannot create branch 'main'".to_string()))?;
233 self.ensure_commit_graph_initialized().await?;
234
235 self.manifest.create_branch(&branch).await?;
237
238 if let Err(err) = self.create_commit_graph_branch(&branch).await {
242 if let Err(rollback_err) = self.manifest.delete_branch(&branch).await {
243 tracing::warn!(
244 target: "omnigraph::branch_create",
245 branch = %branch,
246 error = %rollback_err,
247 "rollback of manifest branch failed after commit-graph create failure",
248 );
249 }
250 return Err(err);
251 }
252 Ok(())
253 }
254
255 async fn create_commit_graph_branch(&mut self, branch: &str) -> Result<()> {
260 failpoints::maybe_fail("branch_create.after_manifest_branch_create")?;
261 let Some(commit_graph) = &mut self.commit_graph else {
262 return Ok(());
263 };
264 if commit_graph
265 .list_branches()
266 .await?
267 .iter()
268 .any(|existing| existing == branch)
269 {
270 commit_graph.force_delete_branch(branch).await?;
271 }
272 commit_graph.create_branch(branch).await
273 }
274
275 pub async fn branch_delete(&mut self, name: &str) -> Result<()> {
276 let branch = normalize_branch_name(name)?
277 .ok_or_else(|| OmniError::manifest("cannot delete branch 'main'".to_string()))?;
278 if self.current_branch() == Some(branch.as_str()) {
279 return Err(OmniError::manifest_conflict(format!(
280 "cannot delete currently active branch '{}'",
281 branch
282 )));
283 }
284
285 self.manifest.delete_branch(&branch).await?;
289
290 if let Err(err) = self.reclaim_commit_graph_branch(&branch).await {
295 tracing::warn!(
296 target: "omnigraph::branch_delete::cleanup",
297 branch = %branch,
298 error = %err,
299 "best-effort commit-graph branch reclaim failed; cleanup will reconcile",
300 );
301 }
302
303 Ok(())
304 }
305
306 async fn reclaim_commit_graph_branch(&mut self, branch: &str) -> Result<()> {
309 failpoints::maybe_fail("branch_delete.before_commit_graph_reclaim")?;
310 if let Some(commit_graph) = &mut self.commit_graph {
311 commit_graph.force_delete_branch(branch).await
312 } else if self
313 .storage
314 .exists(&graph_commits_uri(self.root_uri()))
315 .await?
316 {
317 let mut commit_graph = CommitGraph::open(self.root_uri()).await?;
318 commit_graph.force_delete_branch(branch).await
319 } else {
320 Ok(())
321 }
322 }
323
324 pub async fn snapshot_at_version(&self, version: u64) -> Result<Snapshot> {
325 ManifestCoordinator::snapshot_at(self.root_uri(), self.current_branch(), version).await
326 }
327
328 pub async fn resolve_snapshot_id(&self, branch: &str) -> Result<SnapshotId> {
329 let normalized = normalize_branch_name(branch)?;
330 let other = match normalized.as_deref() {
331 Some(branch) => {
332 GraphCoordinator::open_branch(self.root_uri(), branch, Arc::clone(&self.storage))
333 .await?
334 }
335 None => GraphCoordinator::open(self.root_uri(), Arc::clone(&self.storage)).await?,
336 };
337
338 Ok(other.head_commit_id().await?.unwrap_or_else(|| {
339 SnapshotId::synthetic(
340 other.current_branch(),
341 other.version(),
342 other.manifest_incarnation().e_tag.as_deref(),
343 )
344 }))
345 }
346
347 pub async fn resolve_target(&self, target: &ReadTarget) -> Result<ResolvedTarget> {
348 match target {
349 ReadTarget::Branch(branch) => {
350 let normalized = normalize_branch_name(branch)?;
351 let other = match normalized.as_deref() {
352 Some(branch) => {
353 GraphCoordinator::open_branch(
354 self.root_uri(),
355 branch,
356 Arc::clone(&self.storage),
357 )
358 .await?
359 }
360 None => {
361 GraphCoordinator::open(self.root_uri(), Arc::clone(&self.storage)).await?
362 }
363 };
364 let snapshot_id = other.head_commit_id().await?.unwrap_or_else(|| {
365 SnapshotId::synthetic(
366 other.current_branch(),
367 other.version(),
368 other.manifest_incarnation().e_tag.as_deref(),
369 )
370 });
371 Ok(ResolvedTarget {
372 requested: target.clone(),
373 branch: other.bound_branch.clone(),
374 snapshot_id,
375 snapshot: other.snapshot(),
376 })
377 }
378 ReadTarget::Snapshot(snapshot_id) => {
379 let commit = self.resolve_commit(snapshot_id).await?;
380 let snapshot = ManifestCoordinator::snapshot_at(
381 self.root_uri(),
382 commit.manifest_branch.as_deref(),
383 commit.manifest_version,
384 )
385 .await?;
386 Ok(ResolvedTarget {
387 requested: target.clone(),
388 branch: commit.manifest_branch.clone(),
389 snapshot_id: snapshot_id.clone(),
390 snapshot,
391 })
392 }
393 }
394 }
395
396 pub async fn resolve_commit(&self, snapshot_id: &SnapshotId) -> Result<GraphCommit> {
397 if let Some(commit_graph) = &self.commit_graph {
398 if let Some(commit) = commit_graph.get_commit(snapshot_id.as_str()) {
399 return Ok(commit);
400 }
401 }
402
403 for branch in self.manifest.list_branches().await? {
404 let normalized = normalize_branch_name(&branch)?;
405 let Some(commit_graph) = self
406 .open_commit_graph_for_branch(normalized.as_deref())
407 .await?
408 else {
409 break;
410 };
411 if let Some(commit) = commit_graph.get_commit(snapshot_id.as_str()) {
412 return Ok(commit);
413 }
414 }
415
416 Err(OmniError::manifest_not_found(format!(
417 "commit '{}' not found",
418 snapshot_id
419 )))
420 }
421
422 pub(crate) async fn head_commit_id(&self) -> Result<Option<SnapshotId>> {
423 match &self.commit_graph {
424 Some(commit_graph) => commit_graph
425 .head_commit_id()
426 .await
427 .map(|id| id.map(SnapshotId::new)),
428 None => Ok(None),
429 }
430 }
431
432 pub(crate) async fn ensure_commit_graph_initialized(&mut self) -> Result<()> {
433 if self.commit_graph.is_some() {
434 return Ok(());
435 }
436 if !self
437 .storage
438 .exists(&graph_commits_uri(self.root_uri()))
439 .await?
440 {
441 let _ = CommitGraph::init(self.root_uri(), self.manifest.version()).await?;
442 }
443 self.commit_graph = match self.current_branch() {
444 Some(branch) => Some(CommitGraph::open_at_branch(self.root_uri(), branch).await?),
445 None => Some(CommitGraph::open(self.root_uri()).await?),
446 };
447 Ok(())
448 }
449
450 pub(crate) async fn commit_updates_with_actor(
451 &mut self,
452 updates: &[SubTableUpdate],
453 actor_id: Option<&str>,
454 ) -> Result<PublishedSnapshot> {
455 let manifest_version = self.commit_manifest_updates(updates).await?;
456 let snapshot_id = self.record_graph_commit(manifest_version, actor_id).await?;
457 Ok(PublishedSnapshot {
458 manifest_version,
459 _snapshot_id: snapshot_id,
460 })
461 }
462
463 pub(crate) async fn commit_updates_with_actor_with_expected(
469 &mut self,
470 updates: &[SubTableUpdate],
471 expected_table_versions: &HashMap<String, u64>,
472 actor_id: Option<&str>,
473 ) -> Result<PublishedSnapshot> {
474 let manifest_version = self
475 .commit_manifest_updates_with_expected(updates, expected_table_versions)
476 .await?;
477 let snapshot_id = self.record_graph_commit(manifest_version, actor_id).await?;
478 Ok(PublishedSnapshot {
479 manifest_version,
480 _snapshot_id: snapshot_id,
481 })
482 }
483
484 pub(crate) async fn commit_manifest_updates(
485 &mut self,
486 updates: &[SubTableUpdate],
487 ) -> Result<u64> {
488 let manifest_version = self.manifest.commit(updates).await?;
489 failpoints::maybe_fail("graph_publish.after_manifest_commit")?;
490 Ok(manifest_version)
491 }
492
493 pub(crate) async fn commit_manifest_updates_with_expected(
494 &mut self,
495 updates: &[SubTableUpdate],
496 expected_table_versions: &HashMap<String, u64>,
497 ) -> Result<u64> {
498 let manifest_version = self
499 .manifest
500 .commit_with_expected(updates, expected_table_versions)
501 .await?;
502 failpoints::maybe_fail("graph_publish.after_manifest_commit")?;
503 Ok(manifest_version)
504 }
505
506 pub(crate) async fn commit_manifest_changes(
507 &mut self,
508 changes: &[ManifestChange],
509 ) -> Result<u64> {
510 let manifest_version = self.manifest.commit_changes(changes).await?;
511 failpoints::maybe_fail("graph_publish.after_manifest_commit")?;
512 Ok(manifest_version)
513 }
514
515 pub(crate) async fn commit_changes_with_actor(
516 &mut self,
517 changes: &[ManifestChange],
518 actor_id: Option<&str>,
519 ) -> Result<PublishedSnapshot> {
520 let manifest_version = self.commit_manifest_changes(changes).await?;
521 let snapshot_id = self.record_graph_commit(manifest_version, actor_id).await?;
522 Ok(PublishedSnapshot {
523 manifest_version,
524 _snapshot_id: snapshot_id,
525 })
526 }
527
528 pub(crate) async fn record_graph_commit(
529 &mut self,
530 manifest_version: u64,
531 actor_id: Option<&str>,
532 ) -> Result<SnapshotId> {
533 self.ensure_commit_graph_initialized().await?;
534 let current_branch = self.current_branch().map(str::to_string);
535 let Some(commit_graph) = &mut self.commit_graph else {
536 return Ok(SnapshotId::synthetic(
537 current_branch.as_deref(),
538 manifest_version,
539 self.manifest_incarnation().e_tag.as_deref(),
540 ));
541 };
542 failpoints::maybe_fail("graph_publish.before_commit_append")?;
543 commit_graph.refresh().await?;
556 let graph_commit_id = commit_graph
557 .append_commit(current_branch.as_deref(), manifest_version, actor_id)
558 .await?;
559 Ok(SnapshotId::new(graph_commit_id))
560 }
561
562 pub(crate) async fn record_merge_commit(
563 &mut self,
564 manifest_version: u64,
565 parent_commit_id: &str,
566 merged_parent_commit_id: &str,
567 actor_id: Option<&str>,
568 ) -> Result<SnapshotId> {
569 self.ensure_commit_graph_initialized().await?;
570 let current_branch = self.current_branch().map(str::to_string);
571 let commit_graph = self.commit_graph.as_mut().ok_or_else(|| {
572 OmniError::manifest("branch merge requires _graph_commits.lance".to_string())
573 })?;
574 failpoints::maybe_fail("graph_publish.before_commit_append")?;
575 let graph_commit_id = commit_graph
576 .append_merge_commit(
577 current_branch.as_deref(),
578 manifest_version,
579 parent_commit_id,
580 merged_parent_commit_id,
581 actor_id,
582 )
583 .await?;
584 Ok(SnapshotId::new(graph_commit_id))
585 }
586
587 async fn open_commit_graph_for_branch(
588 &self,
589 branch: Option<&str>,
590 ) -> Result<Option<CommitGraph>> {
591 if !self
592 .storage
593 .exists(&graph_commits_uri(self.root_uri()))
594 .await?
595 {
596 return Ok(None);
597 }
598 let graph = match branch {
599 Some(branch) => CommitGraph::open_at_branch(self.root_uri(), branch).await?,
600 None => CommitGraph::open(self.root_uri()).await?,
601 };
602 Ok(Some(graph))
603 }
604
605 pub(crate) async fn list_commits(&self) -> Result<Vec<GraphCommit>> {
606 if let Some(commit_graph) = &self.commit_graph {
607 return commit_graph.load_commits().await;
608 }
609 if !self
610 .storage
611 .exists(&graph_commits_uri(self.root_uri()))
612 .await?
613 {
614 return Ok(Vec::new());
615 }
616 let commit_graph = match self.current_branch() {
617 Some(branch) => CommitGraph::open_at_branch(self.root_uri(), branch).await?,
618 None => CommitGraph::open(self.root_uri()).await?,
619 };
620 commit_graph.load_commits().await
621 }
622}
623
624fn graph_commits_uri(root_uri: &str) -> String {
625 join_uri(root_uri, GRAPH_COMMITS_DIR)
626}
627
628fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
629 let branch = branch.trim();
630 if branch.is_empty() {
631 return Err(OmniError::manifest(
632 "branch name cannot be empty".to_string(),
633 ));
634 }
635 if branch == "main" {
636 return Ok(None);
637 }
638 Ok(Some(branch.to_string()))
639}