Skip to main content

omnigraph/db/
graph_coordinator.rs

1use std::collections::HashMap;
2use std::fmt;
3use std::sync::Arc;
4
5use omnigraph_compiler::catalog::Catalog;
6
7use crate::error::{OmniError, Result};
8use crate::failpoints;
9use crate::storage::{StorageAdapter, join_uri, normalize_root_uri};
10
11use super::commit_graph::{CommitGraph, GraphCommit};
12use super::is_internal_system_branch;
13use super::manifest::{ManifestChange, ManifestCoordinator, Snapshot, SubTableUpdate};
14
/// Directory name of the commit-graph dataset, joined onto the graph root URI
/// by `graph_commits_uri`.
const GRAPH_COMMITS_DIR: &str = "_graph_commits.lance";
16
/// Identifier of a published graph snapshot.
///
/// Either a commit id taken from the commit graph, or a synthetic id of the
/// form `manifest:<branch>:v<version>` used when no commit graph is available.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SnapshotId(String);

impl SnapshotId {
    /// Wraps an existing identifier string.
    pub fn new(id: impl Into<String>) -> Self {
        Self(id.into())
    }

    /// Borrows the identifier as a string slice.
    pub fn as_str(&self) -> &str {
        &self.0
    }

    /// Builds a synthetic id for `branch` at manifest `version`.
    ///
    /// `None` denotes `main`, so `synthetic(None, 3)` is `"manifest:main:v3"`.
    pub(crate) fn synthetic(branch: Option<&str>, version: u64) -> Self {
        let branch = branch.unwrap_or("main");
        Self(format!("manifest:{}:v{}", branch, version))
    }
}

impl fmt::Display for SnapshotId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Fully qualified so resolution does not depend on which fmt traits are
        // imported. `str`'s Display honours width/fill/alignment flags.
        fmt::Display::fmt(self.as_str(), f)
    }
}
42
43#[derive(Debug, Clone, PartialEq, Eq)]
44pub enum ReadTarget {
45    Branch(String),
46    Snapshot(SnapshotId),
47}
48
49impl ReadTarget {
50    pub fn branch(name: impl Into<String>) -> Self {
51        Self::Branch(name.into())
52    }
53
54    pub fn snapshot(id: impl Into<SnapshotId>) -> Self {
55        Self::Snapshot(id.into())
56    }
57}
58
59impl From<&str> for ReadTarget {
60    fn from(value: &str) -> Self {
61        Self::branch(value)
62    }
63}
64
65impl From<String> for ReadTarget {
66    fn from(value: String) -> Self {
67        Self::Branch(value)
68    }
69}
70
71impl From<SnapshotId> for ReadTarget {
72    fn from(value: SnapshotId) -> Self {
73        Self::Snapshot(value)
74    }
75}
76
/// Result of resolving a [`ReadTarget`] to a concrete manifest snapshot.
#[derive(Debug, Clone)]
pub struct ResolvedTarget {
    /// The target exactly as the caller supplied it.
    pub requested: ReadTarget,
    /// Branch the snapshot belongs to; `None` denotes `main`.
    pub branch: Option<String>,
    /// Id of the resolved snapshot: a commit-graph id, or a synthetic
    /// `manifest:<branch>:v<version>` id when no commit graph exists.
    pub snapshot_id: SnapshotId,
    /// The manifest snapshot at the resolved point.
    pub snapshot: Snapshot,
}
84
/// Outcome of a publish: the manifest version that was committed plus the
/// snapshot id recorded for it in the commit graph.
#[derive(Debug, Clone)]
pub(crate) struct PublishedSnapshot {
    /// Manifest version produced by the commit.
    pub manifest_version: u64,
    /// Snapshot id recorded for the commit. The leading underscore presumably
    /// silences dead-code warnings while no caller reads it — confirm.
    pub _snapshot_id: SnapshotId,
}
90
/// Coordinates the manifest (the authority for branches and table versions)
/// with the commit graph stored in `_graph_commits.lance`, which is derived
/// state kept in step with it.
pub struct GraphCoordinator {
    /// Normalized root URI of the graph.
    root_uri: String,
    /// Storage backend; used in this file for existence checks on the
    /// commit-graph dataset.
    storage: Arc<dyn StorageAdapter>,
    /// Manifest coordinator opened at the bound branch.
    manifest: ManifestCoordinator,
    /// Commit graph opened at the bound branch; `None` when the commit-graph
    /// dataset did not exist at open time and has not been initialized since.
    commit_graph: Option<CommitGraph>,
    /// Branch this coordinator is bound to; `None` means `main`.
    bound_branch: Option<String>,
}
98
99impl GraphCoordinator {
100    pub async fn init(
101        root_uri: &str,
102        catalog: &Catalog,
103        storage: Arc<dyn StorageAdapter>,
104    ) -> Result<Self> {
105        let root = normalize_root_uri(root_uri)?;
106        let manifest = ManifestCoordinator::init(&root, catalog).await?;
107        let commit_graph = Some(CommitGraph::init(&root, manifest.version()).await?);
108        Ok(Self {
109            root_uri: root,
110            storage,
111            manifest,
112            commit_graph,
113            bound_branch: None,
114        })
115    }
116
117    pub async fn open(root_uri: &str, storage: Arc<dyn StorageAdapter>) -> Result<Self> {
118        let root = normalize_root_uri(root_uri)?;
119        let manifest = ManifestCoordinator::open(&root).await?;
120        let commit_graph = if storage.exists(&graph_commits_uri(&root)).await? {
121            Some(CommitGraph::open(&root).await?)
122        } else {
123            None
124        };
125        Ok(Self {
126            root_uri: root,
127            storage,
128            manifest,
129            commit_graph,
130            bound_branch: None,
131        })
132    }
133
134    pub async fn open_branch(
135        root_uri: &str,
136        branch: &str,
137        storage: Arc<dyn StorageAdapter>,
138    ) -> Result<Self> {
139        let branch = normalize_branch_name(branch)?;
140        let Some(branch_name) = branch else {
141            return Self::open(root_uri, storage).await;
142        };
143
144        let root = normalize_root_uri(root_uri)?;
145        let manifest = ManifestCoordinator::open_at_branch(&root, &branch_name).await?;
146        let commit_graph = if storage.exists(&graph_commits_uri(&root)).await? {
147            Some(CommitGraph::open_at_branch(&root, &branch_name).await?)
148        } else {
149            None
150        };
151
152        Ok(Self {
153            root_uri: root,
154            storage,
155            manifest,
156            commit_graph,
157            bound_branch: Some(branch_name),
158        })
159    }
160
161    pub fn root_uri(&self) -> &str {
162        &self.root_uri
163    }
164
165    pub fn version(&self) -> u64 {
166        self.manifest.version()
167    }
168
169    pub fn snapshot(&self) -> Snapshot {
170        self.manifest.snapshot()
171    }
172
173    pub fn current_branch(&self) -> Option<&str> {
174        self.bound_branch.as_deref()
175    }
176
177    pub async fn refresh(&mut self) -> Result<()> {
178        self.manifest.refresh().await?;
179        if let Some(commit_graph) = &mut self.commit_graph {
180            commit_graph.refresh().await?;
181        }
182        Ok(())
183    }
184
185    pub async fn branch_list(&self) -> Result<Vec<String>> {
186        self.manifest.list_branches().await.map(|branches| {
187            branches
188                .into_iter()
189                .filter(|branch| !is_internal_system_branch(branch))
190                .collect()
191        })
192    }
193
194    pub(crate) async fn all_branches(&self) -> Result<Vec<String>> {
195        self.manifest.list_branches().await
196    }
197
198    pub async fn branch_descendants(&self, name: &str) -> Result<Vec<String>> {
199        self.manifest
200            .descendant_branches(name)
201            .await
202            .map(|branches| {
203                branches
204                    .into_iter()
205                    .filter(|branch| !is_internal_system_branch(branch))
206                    .collect()
207            })
208    }
209
210    pub async fn branch_create(&mut self, name: &str) -> Result<()> {
211        let branch = normalize_branch_name(name)?
212            .ok_or_else(|| OmniError::manifest("cannot create branch 'main'".to_string()))?;
213        self.ensure_commit_graph_initialized().await?;
214
215        // Manifest authority flip first.
216        self.manifest.create_branch(&branch).await?;
217
218        // Derived commit-graph branch. If anything after the authority flip
219        // fails, roll back the manifest branch so the branch never half-exists
220        // (a manifest branch with no commit-graph branch breaks the next write).
221        if let Err(err) = self.create_commit_graph_branch(&branch).await {
222            if let Err(rollback_err) = self.manifest.delete_branch(&branch).await {
223                tracing::warn!(
224                    target: "omnigraph::branch_create",
225                    branch = %branch,
226                    error = %rollback_err,
227                    "rollback of manifest branch failed after commit-graph create failure",
228                );
229            }
230            return Err(err);
231        }
232        Ok(())
233    }
234
235    /// Create the derived commit-graph branch for `branch`, healing a zombie ref
236    /// left by an incomplete prior delete. The manifest branch was just created
237    /// fresh, so any existing commit-graph branch with this name is provably
238    /// orphaned and is force-dropped before recreating.
239    async fn create_commit_graph_branch(&mut self, branch: &str) -> Result<()> {
240        failpoints::maybe_fail("branch_create.after_manifest_branch_create")?;
241        let Some(commit_graph) = &mut self.commit_graph else {
242            return Ok(());
243        };
244        if commit_graph
245            .list_branches()
246            .await?
247            .iter()
248            .any(|existing| existing == branch)
249        {
250            commit_graph.force_delete_branch(branch).await?;
251        }
252        commit_graph.create_branch(branch).await
253    }
254
255    pub async fn branch_delete(&mut self, name: &str) -> Result<()> {
256        let branch = normalize_branch_name(name)?
257            .ok_or_else(|| OmniError::manifest("cannot delete branch 'main'".to_string()))?;
258        if self.current_branch() == Some(branch.as_str()) {
259            return Err(OmniError::manifest_conflict(format!(
260                "cannot delete currently active branch '{}'",
261                branch
262            )));
263        }
264
265        // Manifest authority flip — the single atomic op that makes the branch
266        // cease to exist. Must succeed; everything after is derived state
267        // reclaimed best-effort.
268        self.manifest.delete_branch(&branch).await?;
269
270        // Commit-graph branch is derived state. Reclaim best-effort with the
271        // idempotent force variant: a failure here (or a missing dataset) is
272        // reconciled by `cleanup` and must not fail the delete after the
273        // authority already flipped.
274        if let Err(err) = self.reclaim_commit_graph_branch(&branch).await {
275            tracing::warn!(
276                target: "omnigraph::branch_delete::cleanup",
277                branch = %branch,
278                error = %err,
279                "best-effort commit-graph branch reclaim failed; cleanup will reconcile",
280            );
281        }
282
283        Ok(())
284    }
285
286    /// Best-effort, idempotent reclaim of the commit-graph branch `branch`.
287    /// Tolerates an absent commit-graph dataset (a graph that never committed).
288    async fn reclaim_commit_graph_branch(&mut self, branch: &str) -> Result<()> {
289        failpoints::maybe_fail("branch_delete.before_commit_graph_reclaim")?;
290        if let Some(commit_graph) = &mut self.commit_graph {
291            commit_graph.force_delete_branch(branch).await
292        } else if self
293            .storage
294            .exists(&graph_commits_uri(self.root_uri()))
295            .await?
296        {
297            let mut commit_graph = CommitGraph::open(self.root_uri()).await?;
298            commit_graph.force_delete_branch(branch).await
299        } else {
300            Ok(())
301        }
302    }
303
304    pub async fn snapshot_at_version(&self, version: u64) -> Result<Snapshot> {
305        ManifestCoordinator::snapshot_at(self.root_uri(), self.current_branch(), version).await
306    }
307
308    pub async fn resolve_snapshot_id(&self, branch: &str) -> Result<SnapshotId> {
309        let normalized = normalize_branch_name(branch)?;
310        let other = match normalized.as_deref() {
311            Some(branch) => {
312                GraphCoordinator::open_branch(self.root_uri(), branch, Arc::clone(&self.storage))
313                    .await?
314            }
315            None => GraphCoordinator::open(self.root_uri(), Arc::clone(&self.storage)).await?,
316        };
317
318        Ok(other
319            .head_commit_id()
320            .await?
321            .unwrap_or_else(|| SnapshotId::synthetic(other.current_branch(), other.version())))
322    }
323
324    pub async fn resolve_target(&self, target: &ReadTarget) -> Result<ResolvedTarget> {
325        match target {
326            ReadTarget::Branch(branch) => {
327                let normalized = normalize_branch_name(branch)?;
328                let other = match normalized.as_deref() {
329                    Some(branch) => {
330                        GraphCoordinator::open_branch(
331                            self.root_uri(),
332                            branch,
333                            Arc::clone(&self.storage),
334                        )
335                        .await?
336                    }
337                    None => {
338                        GraphCoordinator::open(self.root_uri(), Arc::clone(&self.storage)).await?
339                    }
340                };
341                let snapshot_id = other.head_commit_id().await?.unwrap_or_else(|| {
342                    SnapshotId::synthetic(other.current_branch(), other.version())
343                });
344                Ok(ResolvedTarget {
345                    requested: target.clone(),
346                    branch: other.bound_branch.clone(),
347                    snapshot_id,
348                    snapshot: other.snapshot(),
349                })
350            }
351            ReadTarget::Snapshot(snapshot_id) => {
352                let commit = self.resolve_commit(snapshot_id).await?;
353                let snapshot = ManifestCoordinator::snapshot_at(
354                    self.root_uri(),
355                    commit.manifest_branch.as_deref(),
356                    commit.manifest_version,
357                )
358                .await?;
359                Ok(ResolvedTarget {
360                    requested: target.clone(),
361                    branch: commit.manifest_branch.clone(),
362                    snapshot_id: snapshot_id.clone(),
363                    snapshot,
364                })
365            }
366        }
367    }
368
369    pub async fn resolve_commit(&self, snapshot_id: &SnapshotId) -> Result<GraphCommit> {
370        if let Some(commit_graph) = &self.commit_graph {
371            if let Some(commit) = commit_graph.get_commit(snapshot_id.as_str()) {
372                return Ok(commit);
373            }
374        }
375
376        for branch in self.manifest.list_branches().await? {
377            let normalized = normalize_branch_name(&branch)?;
378            let Some(commit_graph) = self
379                .open_commit_graph_for_branch(normalized.as_deref())
380                .await?
381            else {
382                break;
383            };
384            if let Some(commit) = commit_graph.get_commit(snapshot_id.as_str()) {
385                return Ok(commit);
386            }
387        }
388
389        Err(OmniError::manifest_not_found(format!(
390            "commit '{}' not found",
391            snapshot_id
392        )))
393    }
394
395    pub(crate) async fn head_commit_id(&self) -> Result<Option<SnapshotId>> {
396        match &self.commit_graph {
397            Some(commit_graph) => commit_graph
398                .head_commit_id()
399                .await
400                .map(|id| id.map(SnapshotId::new)),
401            None => Ok(None),
402        }
403    }
404
405    pub(crate) async fn ensure_commit_graph_initialized(&mut self) -> Result<()> {
406        if self.commit_graph.is_some() {
407            return Ok(());
408        }
409        if !self
410            .storage
411            .exists(&graph_commits_uri(self.root_uri()))
412            .await?
413        {
414            let _ = CommitGraph::init(self.root_uri(), self.manifest.version()).await?;
415        }
416        self.commit_graph = match self.current_branch() {
417            Some(branch) => Some(CommitGraph::open_at_branch(self.root_uri(), branch).await?),
418            None => Some(CommitGraph::open(self.root_uri()).await?),
419        };
420        Ok(())
421    }
422
423    pub(crate) async fn commit_updates_with_actor(
424        &mut self,
425        updates: &[SubTableUpdate],
426        actor_id: Option<&str>,
427    ) -> Result<PublishedSnapshot> {
428        let manifest_version = self.commit_manifest_updates(updates).await?;
429        let snapshot_id = self.record_graph_commit(manifest_version, actor_id).await?;
430        Ok(PublishedSnapshot {
431            manifest_version,
432            _snapshot_id: snapshot_id,
433        })
434    }
435
436    /// Commit with publisher-level OCC fence. The `expected_table_versions` map
437    /// asserts the manifest's current latest non-tombstoned `table_version` for
438    /// each `table_key` matches what the caller observed before writing.
439    /// Mismatches surface as `OmniError::Manifest` with
440    /// `ManifestConflictDetails::ExpectedVersionMismatch`.
441    pub(crate) async fn commit_updates_with_actor_with_expected(
442        &mut self,
443        updates: &[SubTableUpdate],
444        expected_table_versions: &HashMap<String, u64>,
445        actor_id: Option<&str>,
446    ) -> Result<PublishedSnapshot> {
447        let manifest_version = self
448            .commit_manifest_updates_with_expected(updates, expected_table_versions)
449            .await?;
450        let snapshot_id = self.record_graph_commit(manifest_version, actor_id).await?;
451        Ok(PublishedSnapshot {
452            manifest_version,
453            _snapshot_id: snapshot_id,
454        })
455    }
456
457    pub(crate) async fn commit_manifest_updates(
458        &mut self,
459        updates: &[SubTableUpdate],
460    ) -> Result<u64> {
461        let manifest_version = self.manifest.commit(updates).await?;
462        failpoints::maybe_fail("graph_publish.after_manifest_commit")?;
463        Ok(manifest_version)
464    }
465
466    pub(crate) async fn commit_manifest_updates_with_expected(
467        &mut self,
468        updates: &[SubTableUpdate],
469        expected_table_versions: &HashMap<String, u64>,
470    ) -> Result<u64> {
471        let manifest_version = self
472            .manifest
473            .commit_with_expected(updates, expected_table_versions)
474            .await?;
475        failpoints::maybe_fail("graph_publish.after_manifest_commit")?;
476        Ok(manifest_version)
477    }
478
479    pub(crate) async fn commit_manifest_changes(
480        &mut self,
481        changes: &[ManifestChange],
482    ) -> Result<u64> {
483        let manifest_version = self.manifest.commit_changes(changes).await?;
484        failpoints::maybe_fail("graph_publish.after_manifest_commit")?;
485        Ok(manifest_version)
486    }
487
488    pub(crate) async fn commit_changes_with_actor(
489        &mut self,
490        changes: &[ManifestChange],
491        actor_id: Option<&str>,
492    ) -> Result<PublishedSnapshot> {
493        let manifest_version = self.commit_manifest_changes(changes).await?;
494        let snapshot_id = self.record_graph_commit(manifest_version, actor_id).await?;
495        Ok(PublishedSnapshot {
496            manifest_version,
497            _snapshot_id: snapshot_id,
498        })
499    }
500
501    pub(crate) async fn record_graph_commit(
502        &mut self,
503        manifest_version: u64,
504        actor_id: Option<&str>,
505    ) -> Result<SnapshotId> {
506        self.ensure_commit_graph_initialized().await?;
507        let current_branch = self.current_branch().map(str::to_string);
508        let Some(commit_graph) = &mut self.commit_graph else {
509            return Ok(SnapshotId::synthetic(
510                current_branch.as_deref(),
511                manifest_version,
512            ));
513        };
514        failpoints::maybe_fail("graph_publish.before_commit_append")?;
515        let graph_commit_id = commit_graph
516            .append_commit(current_branch.as_deref(), manifest_version, actor_id)
517            .await?;
518        Ok(SnapshotId::new(graph_commit_id))
519    }
520
521    pub(crate) async fn record_merge_commit(
522        &mut self,
523        manifest_version: u64,
524        parent_commit_id: &str,
525        merged_parent_commit_id: &str,
526        actor_id: Option<&str>,
527    ) -> Result<SnapshotId> {
528        self.ensure_commit_graph_initialized().await?;
529        let current_branch = self.current_branch().map(str::to_string);
530        let commit_graph = self.commit_graph.as_mut().ok_or_else(|| {
531            OmniError::manifest("branch merge requires _graph_commits.lance".to_string())
532        })?;
533        failpoints::maybe_fail("graph_publish.before_commit_append")?;
534        let graph_commit_id = commit_graph
535            .append_merge_commit(
536                current_branch.as_deref(),
537                manifest_version,
538                parent_commit_id,
539                merged_parent_commit_id,
540                actor_id,
541            )
542            .await?;
543        Ok(SnapshotId::new(graph_commit_id))
544    }
545
546    async fn open_commit_graph_for_branch(
547        &self,
548        branch: Option<&str>,
549    ) -> Result<Option<CommitGraph>> {
550        if !self
551            .storage
552            .exists(&graph_commits_uri(self.root_uri()))
553            .await?
554        {
555            return Ok(None);
556        }
557        let graph = match branch {
558            Some(branch) => CommitGraph::open_at_branch(self.root_uri(), branch).await?,
559            None => CommitGraph::open(self.root_uri()).await?,
560        };
561        Ok(Some(graph))
562    }
563
564    pub(crate) async fn list_commits(&self) -> Result<Vec<GraphCommit>> {
565        if let Some(commit_graph) = &self.commit_graph {
566            return commit_graph.load_commits().await;
567        }
568        if !self
569            .storage
570            .exists(&graph_commits_uri(self.root_uri()))
571            .await?
572        {
573            return Ok(Vec::new());
574        }
575        let commit_graph = match self.current_branch() {
576            Some(branch) => CommitGraph::open_at_branch(self.root_uri(), branch).await?,
577            None => CommitGraph::open(self.root_uri()).await?,
578        };
579        commit_graph.load_commits().await
580    }
581}
582
583fn graph_commits_uri(root_uri: &str) -> String {
584    join_uri(root_uri, GRAPH_COMMITS_DIR)
585}
586
587fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
588    let branch = branch.trim();
589    if branch.is_empty() {
590        return Err(OmniError::manifest(
591            "branch name cannot be empty".to_string(),
592        ));
593    }
594    if branch == "main" {
595        return Ok(None);
596    }
597    Ok(Some(branch.to_string()))
598}