Skip to main content

omnigraph/db/
graph_coordinator.rs

1use std::collections::HashMap;
2use std::fmt;
3use std::sync::Arc;
4
5use omnigraph_compiler::catalog::Catalog;
6
7use crate::error::{OmniError, Result};
8use crate::failpoints;
9use crate::storage::{StorageAdapter, join_uri, normalize_root_uri};
10
11use super::commit_graph::{CommitGraph, GraphCommit};
12use super::is_internal_system_branch;
13use super::manifest::{
14    ManifestChange, ManifestCoordinator, ManifestIncarnation, Snapshot, SubTableUpdate,
15};
16
/// Directory name, joined directly onto the graph root (see
/// `graph_commits_uri`), of the dataset that stores the commit graph.
const GRAPH_COMMITS_DIR: &str = "_graph_commits.lance";
18
/// Opaque identifier for a point-in-time graph snapshot.
///
/// Usually this wraps a commit-graph commit id; when no commit graph is
/// available a manifest-derived id is built instead (see
/// [`SnapshotId::synthetic`]).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SnapshotId(String);

impl SnapshotId {
    /// Wrap an arbitrary id string.
    pub fn new(id: impl Into<String>) -> Self {
        let owned: String = id.into();
        Self(owned)
    }

    /// Borrow the id as a string slice.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }

    /// Build a manifest-derived id: `manifest:<branch>:v<version>`, with an
    /// `:etag:<e_tag>` suffix when an e-tag is known. A `None` branch is
    /// rendered as `main`.
    pub(crate) fn synthetic(branch: Option<&str>, version: u64, e_tag: Option<&str>) -> Self {
        let branch_label = branch.unwrap_or("main");
        if let Some(tag) = e_tag {
            Self(format!("manifest:{}:v{}:etag:{}", branch_label, version, tag))
        } else {
            Self(format!("manifest:{}:v{}", branch_label, version))
        }
    }
}

impl fmt::Display for SnapshotId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Delegate to `str`'s Display so width/alignment flags are honoured.
        fmt::Display::fmt(self.as_str(), f)
    }
}
45
46#[derive(Debug, Clone, PartialEq, Eq)]
47pub enum ReadTarget {
48    Branch(String),
49    Snapshot(SnapshotId),
50}
51
52impl ReadTarget {
53    pub fn branch(name: impl Into<String>) -> Self {
54        Self::Branch(name.into())
55    }
56
57    pub fn snapshot(id: impl Into<SnapshotId>) -> Self {
58        Self::Snapshot(id.into())
59    }
60}
61
62impl From<&str> for ReadTarget {
63    fn from(value: &str) -> Self {
64        Self::branch(value)
65    }
66}
67
68impl From<String> for ReadTarget {
69    fn from(value: String) -> Self {
70        Self::Branch(value)
71    }
72}
73
74impl From<SnapshotId> for ReadTarget {
75    fn from(value: SnapshotId) -> Self {
76        Self::Snapshot(value)
77    }
78}
79
80#[derive(Debug, Clone)]
81pub struct ResolvedTarget {
82    pub requested: ReadTarget,
83    pub branch: Option<String>,
84    pub snapshot_id: SnapshotId,
85    pub snapshot: Snapshot,
86}
87
88#[derive(Debug, Clone)]
89pub(crate) struct PublishedSnapshot {
90    pub manifest_version: u64,
91    pub _snapshot_id: SnapshotId,
92}
93
94pub struct GraphCoordinator {
95    root_uri: String,
96    storage: Arc<dyn StorageAdapter>,
97    manifest: ManifestCoordinator,
98    commit_graph: Option<CommitGraph>,
99    bound_branch: Option<String>,
100}
101
102impl GraphCoordinator {
103    pub async fn init(
104        root_uri: &str,
105        catalog: &Catalog,
106        storage: Arc<dyn StorageAdapter>,
107    ) -> Result<Self> {
108        let root = normalize_root_uri(root_uri)?;
109        let manifest = ManifestCoordinator::init(&root, catalog).await?;
110        let commit_graph = Some(CommitGraph::init(&root, manifest.version()).await?);
111        Ok(Self {
112            root_uri: root,
113            storage,
114            manifest,
115            commit_graph,
116            bound_branch: None,
117        })
118    }
119
120    pub async fn open(root_uri: &str, storage: Arc<dyn StorageAdapter>) -> Result<Self> {
121        let root = normalize_root_uri(root_uri)?;
122        let manifest = ManifestCoordinator::open(&root).await?;
123        let commit_graph = if storage.exists(&graph_commits_uri(&root)).await? {
124            Some(CommitGraph::open(&root).await?)
125        } else {
126            None
127        };
128        Ok(Self {
129            root_uri: root,
130            storage,
131            manifest,
132            commit_graph,
133            bound_branch: None,
134        })
135    }
136
137    pub async fn open_branch(
138        root_uri: &str,
139        branch: &str,
140        storage: Arc<dyn StorageAdapter>,
141    ) -> Result<Self> {
142        let branch = normalize_branch_name(branch)?;
143        let Some(branch_name) = branch else {
144            return Self::open(root_uri, storage).await;
145        };
146
147        let root = normalize_root_uri(root_uri)?;
148        let manifest = ManifestCoordinator::open_at_branch(&root, &branch_name).await?;
149        let commit_graph = if storage.exists(&graph_commits_uri(&root)).await? {
150            Some(CommitGraph::open_at_branch(&root, &branch_name).await?)
151        } else {
152            None
153        };
154
155        Ok(Self {
156            root_uri: root,
157            storage,
158            manifest,
159            commit_graph,
160            bound_branch: Some(branch_name),
161        })
162    }
163
164    pub fn root_uri(&self) -> &str {
165        &self.root_uri
166    }
167
168    pub fn version(&self) -> u64 {
169        self.manifest.version()
170    }
171
172    pub(crate) fn manifest_incarnation(&self) -> ManifestIncarnation {
173        self.manifest.incarnation()
174    }
175
176    pub fn snapshot(&self) -> Snapshot {
177        self.manifest.snapshot()
178    }
179
180    pub fn current_branch(&self) -> Option<&str> {
181        self.bound_branch.as_deref()
182    }
183
184    pub async fn refresh(&mut self) -> Result<()> {
185        self.manifest.refresh().await?;
186        if let Some(commit_graph) = &mut self.commit_graph {
187            commit_graph.refresh().await?;
188        }
189        Ok(())
190    }
191
192    pub(crate) async fn probe_latest_incarnation(&self) -> Result<ManifestIncarnation> {
193        crate::instrumentation::record_probe();
194        self.manifest.probe_latest_incarnation().await
195    }
196
197    /// Refresh only the manifest (not the commit graph). The read path uses this
198    /// on a stale same-branch probe: a read pins its snapshot by manifest version
199    /// and never needs the commit graph, so a full `refresh` (which also scans
200    /// the commit graph) would be wasted IO.
201    pub async fn refresh_manifest_only(&mut self) -> Result<()> {
202        self.manifest.refresh().await
203    }
204
205    pub async fn branch_list(&self) -> Result<Vec<String>> {
206        self.manifest.list_branches().await.map(|branches| {
207            branches
208                .into_iter()
209                .filter(|branch| !is_internal_system_branch(branch))
210                .collect()
211        })
212    }
213
214    pub(crate) async fn all_branches(&self) -> Result<Vec<String>> {
215        self.manifest.list_branches().await
216    }
217
218    pub async fn branch_descendants(&self, name: &str) -> Result<Vec<String>> {
219        self.manifest
220            .descendant_branches(name)
221            .await
222            .map(|branches| {
223                branches
224                    .into_iter()
225                    .filter(|branch| !is_internal_system_branch(branch))
226                    .collect()
227            })
228    }
229
230    pub async fn branch_create(&mut self, name: &str) -> Result<()> {
231        let branch = normalize_branch_name(name)?
232            .ok_or_else(|| OmniError::manifest("cannot create branch 'main'".to_string()))?;
233        self.ensure_commit_graph_initialized().await?;
234
235        // Manifest authority flip first.
236        self.manifest.create_branch(&branch).await?;
237
238        // Derived commit-graph branch. If anything after the authority flip
239        // fails, roll back the manifest branch so the branch never half-exists
240        // (a manifest branch with no commit-graph branch breaks the next write).
241        if let Err(err) = self.create_commit_graph_branch(&branch).await {
242            if let Err(rollback_err) = self.manifest.delete_branch(&branch).await {
243                tracing::warn!(
244                    target: "omnigraph::branch_create",
245                    branch = %branch,
246                    error = %rollback_err,
247                    "rollback of manifest branch failed after commit-graph create failure",
248                );
249            }
250            return Err(err);
251        }
252        Ok(())
253    }
254
255    /// Create the derived commit-graph branch for `branch`, healing a zombie ref
256    /// left by an incomplete prior delete. The manifest branch was just created
257    /// fresh, so any existing commit-graph branch with this name is provably
258    /// orphaned and is force-dropped before recreating.
259    async fn create_commit_graph_branch(&mut self, branch: &str) -> Result<()> {
260        failpoints::maybe_fail("branch_create.after_manifest_branch_create")?;
261        let Some(commit_graph) = &mut self.commit_graph else {
262            return Ok(());
263        };
264        if commit_graph
265            .list_branches()
266            .await?
267            .iter()
268            .any(|existing| existing == branch)
269        {
270            commit_graph.force_delete_branch(branch).await?;
271        }
272        commit_graph.create_branch(branch).await
273    }
274
275    pub async fn branch_delete(&mut self, name: &str) -> Result<()> {
276        let branch = normalize_branch_name(name)?
277            .ok_or_else(|| OmniError::manifest("cannot delete branch 'main'".to_string()))?;
278        if self.current_branch() == Some(branch.as_str()) {
279            return Err(OmniError::manifest_conflict(format!(
280                "cannot delete currently active branch '{}'",
281                branch
282            )));
283        }
284
285        // Manifest authority flip — the single atomic op that makes the branch
286        // cease to exist. Must succeed; everything after is derived state
287        // reclaimed best-effort.
288        self.manifest.delete_branch(&branch).await?;
289
290        // Commit-graph branch is derived state. Reclaim best-effort with the
291        // idempotent force variant: a failure here (or a missing dataset) is
292        // reconciled by `cleanup` and must not fail the delete after the
293        // authority already flipped.
294        if let Err(err) = self.reclaim_commit_graph_branch(&branch).await {
295            tracing::warn!(
296                target: "omnigraph::branch_delete::cleanup",
297                branch = %branch,
298                error = %err,
299                "best-effort commit-graph branch reclaim failed; cleanup will reconcile",
300            );
301        }
302
303        Ok(())
304    }
305
306    /// Best-effort, idempotent reclaim of the commit-graph branch `branch`.
307    /// Tolerates an absent commit-graph dataset (a graph that never committed).
308    async fn reclaim_commit_graph_branch(&mut self, branch: &str) -> Result<()> {
309        failpoints::maybe_fail("branch_delete.before_commit_graph_reclaim")?;
310        if let Some(commit_graph) = &mut self.commit_graph {
311            commit_graph.force_delete_branch(branch).await
312        } else if self
313            .storage
314            .exists(&graph_commits_uri(self.root_uri()))
315            .await?
316        {
317            let mut commit_graph = CommitGraph::open(self.root_uri()).await?;
318            commit_graph.force_delete_branch(branch).await
319        } else {
320            Ok(())
321        }
322    }
323
324    pub async fn snapshot_at_version(&self, version: u64) -> Result<Snapshot> {
325        ManifestCoordinator::snapshot_at(self.root_uri(), self.current_branch(), version).await
326    }
327
328    pub async fn resolve_snapshot_id(&self, branch: &str) -> Result<SnapshotId> {
329        let normalized = normalize_branch_name(branch)?;
330        let other = match normalized.as_deref() {
331            Some(branch) => {
332                GraphCoordinator::open_branch(self.root_uri(), branch, Arc::clone(&self.storage))
333                    .await?
334            }
335            None => GraphCoordinator::open(self.root_uri(), Arc::clone(&self.storage)).await?,
336        };
337
338        Ok(other.head_commit_id().await?.unwrap_or_else(|| {
339            SnapshotId::synthetic(
340                other.current_branch(),
341                other.version(),
342                other.manifest_incarnation().e_tag.as_deref(),
343            )
344        }))
345    }
346
347    pub async fn resolve_target(&self, target: &ReadTarget) -> Result<ResolvedTarget> {
348        match target {
349            ReadTarget::Branch(branch) => {
350                let normalized = normalize_branch_name(branch)?;
351                let other = match normalized.as_deref() {
352                    Some(branch) => {
353                        GraphCoordinator::open_branch(
354                            self.root_uri(),
355                            branch,
356                            Arc::clone(&self.storage),
357                        )
358                        .await?
359                    }
360                    None => {
361                        GraphCoordinator::open(self.root_uri(), Arc::clone(&self.storage)).await?
362                    }
363                };
364                let snapshot_id = other.head_commit_id().await?.unwrap_or_else(|| {
365                    SnapshotId::synthetic(
366                        other.current_branch(),
367                        other.version(),
368                        other.manifest_incarnation().e_tag.as_deref(),
369                    )
370                });
371                Ok(ResolvedTarget {
372                    requested: target.clone(),
373                    branch: other.bound_branch.clone(),
374                    snapshot_id,
375                    snapshot: other.snapshot(),
376                })
377            }
378            ReadTarget::Snapshot(snapshot_id) => {
379                let commit = self.resolve_commit(snapshot_id).await?;
380                let snapshot = ManifestCoordinator::snapshot_at(
381                    self.root_uri(),
382                    commit.manifest_branch.as_deref(),
383                    commit.manifest_version,
384                )
385                .await?;
386                Ok(ResolvedTarget {
387                    requested: target.clone(),
388                    branch: commit.manifest_branch.clone(),
389                    snapshot_id: snapshot_id.clone(),
390                    snapshot,
391                })
392            }
393        }
394    }
395
396    pub async fn resolve_commit(&self, snapshot_id: &SnapshotId) -> Result<GraphCommit> {
397        if let Some(commit_graph) = &self.commit_graph {
398            if let Some(commit) = commit_graph.get_commit(snapshot_id.as_str()) {
399                return Ok(commit);
400            }
401        }
402
403        for branch in self.manifest.list_branches().await? {
404            let normalized = normalize_branch_name(&branch)?;
405            let Some(commit_graph) = self
406                .open_commit_graph_for_branch(normalized.as_deref())
407                .await?
408            else {
409                break;
410            };
411            if let Some(commit) = commit_graph.get_commit(snapshot_id.as_str()) {
412                return Ok(commit);
413            }
414        }
415
416        Err(OmniError::manifest_not_found(format!(
417            "commit '{}' not found",
418            snapshot_id
419        )))
420    }
421
422    pub(crate) async fn head_commit_id(&self) -> Result<Option<SnapshotId>> {
423        match &self.commit_graph {
424            Some(commit_graph) => commit_graph
425                .head_commit_id()
426                .await
427                .map(|id| id.map(SnapshotId::new)),
428            None => Ok(None),
429        }
430    }
431
432    pub(crate) async fn ensure_commit_graph_initialized(&mut self) -> Result<()> {
433        if self.commit_graph.is_some() {
434            return Ok(());
435        }
436        if !self
437            .storage
438            .exists(&graph_commits_uri(self.root_uri()))
439            .await?
440        {
441            let _ = CommitGraph::init(self.root_uri(), self.manifest.version()).await?;
442        }
443        self.commit_graph = match self.current_branch() {
444            Some(branch) => Some(CommitGraph::open_at_branch(self.root_uri(), branch).await?),
445            None => Some(CommitGraph::open(self.root_uri()).await?),
446        };
447        Ok(())
448    }
449
450    pub(crate) async fn commit_updates_with_actor(
451        &mut self,
452        updates: &[SubTableUpdate],
453        actor_id: Option<&str>,
454    ) -> Result<PublishedSnapshot> {
455        let manifest_version = self.commit_manifest_updates(updates).await?;
456        let snapshot_id = self.record_graph_commit(manifest_version, actor_id).await?;
457        Ok(PublishedSnapshot {
458            manifest_version,
459            _snapshot_id: snapshot_id,
460        })
461    }
462
463    /// Commit with publisher-level OCC fence. The `expected_table_versions` map
464    /// asserts the manifest's current latest non-tombstoned `table_version` for
465    /// each `table_key` matches what the caller observed before writing.
466    /// Mismatches surface as `OmniError::Manifest` with
467    /// `ManifestConflictDetails::ExpectedVersionMismatch`.
468    pub(crate) async fn commit_updates_with_actor_with_expected(
469        &mut self,
470        updates: &[SubTableUpdate],
471        expected_table_versions: &HashMap<String, u64>,
472        actor_id: Option<&str>,
473    ) -> Result<PublishedSnapshot> {
474        let manifest_version = self
475            .commit_manifest_updates_with_expected(updates, expected_table_versions)
476            .await?;
477        let snapshot_id = self.record_graph_commit(manifest_version, actor_id).await?;
478        Ok(PublishedSnapshot {
479            manifest_version,
480            _snapshot_id: snapshot_id,
481        })
482    }
483
484    pub(crate) async fn commit_manifest_updates(
485        &mut self,
486        updates: &[SubTableUpdate],
487    ) -> Result<u64> {
488        let manifest_version = self.manifest.commit(updates).await?;
489        failpoints::maybe_fail("graph_publish.after_manifest_commit")?;
490        Ok(manifest_version)
491    }
492
493    pub(crate) async fn commit_manifest_updates_with_expected(
494        &mut self,
495        updates: &[SubTableUpdate],
496        expected_table_versions: &HashMap<String, u64>,
497    ) -> Result<u64> {
498        let manifest_version = self
499            .manifest
500            .commit_with_expected(updates, expected_table_versions)
501            .await?;
502        failpoints::maybe_fail("graph_publish.after_manifest_commit")?;
503        Ok(manifest_version)
504    }
505
506    pub(crate) async fn commit_manifest_changes(
507        &mut self,
508        changes: &[ManifestChange],
509    ) -> Result<u64> {
510        let manifest_version = self.manifest.commit_changes(changes).await?;
511        failpoints::maybe_fail("graph_publish.after_manifest_commit")?;
512        Ok(manifest_version)
513    }
514
515    pub(crate) async fn commit_changes_with_actor(
516        &mut self,
517        changes: &[ManifestChange],
518        actor_id: Option<&str>,
519    ) -> Result<PublishedSnapshot> {
520        let manifest_version = self.commit_manifest_changes(changes).await?;
521        let snapshot_id = self.record_graph_commit(manifest_version, actor_id).await?;
522        Ok(PublishedSnapshot {
523            manifest_version,
524            _snapshot_id: snapshot_id,
525        })
526    }
527
528    pub(crate) async fn record_graph_commit(
529        &mut self,
530        manifest_version: u64,
531        actor_id: Option<&str>,
532    ) -> Result<SnapshotId> {
533        self.ensure_commit_graph_initialized().await?;
534        let current_branch = self.current_branch().map(str::to_string);
535        let Some(commit_graph) = &mut self.commit_graph else {
536            return Ok(SnapshotId::synthetic(
537                current_branch.as_deref(),
538                manifest_version,
539                self.manifest_incarnation().e_tag.as_deref(),
540            ));
541        };
542        failpoints::maybe_fail("graph_publish.before_commit_append")?;
543        // Refresh the commit-graph head from storage before selecting the
544        // parent. `append_commit` parents the new commit on the IN-MEMORY head
545        // (`head_commit_id`, zero storage read), but the manifest was just
546        // committed against a freshly rebased pin (`commit_all` opens a fresh
547        // coordinator) while THIS coordinator's cached head may be stale because
548        // an external writer advanced the branch. Without this refresh a
549        // same-branch write after an external commit appends off the stale head
550        // and FORKS the commit DAG (the new commit and the external commit
551        // sharing a parent). Refreshing makes the parent the true current head;
552        // the just-committed manifest version has no commit-graph row yet, so the
553        // fresh head is exactly the prior commit. (record_merge_commit is
554        // unaffected — it passes explicit parents, never the cached head.)
555        commit_graph.refresh().await?;
556        let graph_commit_id = commit_graph
557            .append_commit(current_branch.as_deref(), manifest_version, actor_id)
558            .await?;
559        Ok(SnapshotId::new(graph_commit_id))
560    }
561
562    pub(crate) async fn record_merge_commit(
563        &mut self,
564        manifest_version: u64,
565        parent_commit_id: &str,
566        merged_parent_commit_id: &str,
567        actor_id: Option<&str>,
568    ) -> Result<SnapshotId> {
569        self.ensure_commit_graph_initialized().await?;
570        let current_branch = self.current_branch().map(str::to_string);
571        let commit_graph = self.commit_graph.as_mut().ok_or_else(|| {
572            OmniError::manifest("branch merge requires _graph_commits.lance".to_string())
573        })?;
574        failpoints::maybe_fail("graph_publish.before_commit_append")?;
575        let graph_commit_id = commit_graph
576            .append_merge_commit(
577                current_branch.as_deref(),
578                manifest_version,
579                parent_commit_id,
580                merged_parent_commit_id,
581                actor_id,
582            )
583            .await?;
584        Ok(SnapshotId::new(graph_commit_id))
585    }
586
587    async fn open_commit_graph_for_branch(
588        &self,
589        branch: Option<&str>,
590    ) -> Result<Option<CommitGraph>> {
591        if !self
592            .storage
593            .exists(&graph_commits_uri(self.root_uri()))
594            .await?
595        {
596            return Ok(None);
597        }
598        let graph = match branch {
599            Some(branch) => CommitGraph::open_at_branch(self.root_uri(), branch).await?,
600            None => CommitGraph::open(self.root_uri()).await?,
601        };
602        Ok(Some(graph))
603    }
604
605    pub(crate) async fn list_commits(&self) -> Result<Vec<GraphCommit>> {
606        if let Some(commit_graph) = &self.commit_graph {
607            return commit_graph.load_commits().await;
608        }
609        if !self
610            .storage
611            .exists(&graph_commits_uri(self.root_uri()))
612            .await?
613        {
614            return Ok(Vec::new());
615        }
616        let commit_graph = match self.current_branch() {
617            Some(branch) => CommitGraph::open_at_branch(self.root_uri(), branch).await?,
618            None => CommitGraph::open(self.root_uri()).await?,
619        };
620        commit_graph.load_commits().await
621    }
622}
623
624fn graph_commits_uri(root_uri: &str) -> String {
625    join_uri(root_uri, GRAPH_COMMITS_DIR)
626}
627
628fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
629    let branch = branch.trim();
630    if branch.is_empty() {
631        return Err(OmniError::manifest(
632            "branch name cannot be empty".to_string(),
633        ));
634    }
635    if branch == "main" {
636        return Ok(None);
637    }
638    Ok(Some(branch.to_string()))
639}