Skip to main content

omnigraph/db/
graph_coordinator.rs

1use std::collections::HashMap;
2use std::fmt;
3use std::sync::Arc;
4
5use omnigraph_compiler::catalog::Catalog;
6
7use crate::error::{OmniError, Result};
8use crate::failpoints;
9use crate::storage::{StorageAdapter, join_uri, normalize_root_uri};
10
11use super::commit_graph::{CommitGraph, GraphCommit};
12use super::is_internal_system_branch;
13use super::manifest::{ManifestChange, ManifestCoordinator, Snapshot, SubTableUpdate};
14
/// Directory name of the commit-graph dataset, relative to the graph root.
const GRAPH_COMMITS_DIR: &str = "_graph_commits.lance";
16
/// Opaque identifier of a published graph snapshot.
///
/// Normally this wraps the id of a commit recorded in `_graph_commits.lance`.
/// When no commit graph is available, a synthetic id built from the branch
/// and manifest version is used instead (see [`SnapshotId::synthetic`]).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SnapshotId(String);

impl SnapshotId {
    /// Wraps any string-like value as a snapshot id; no validation is done.
    pub fn new(id: impl Into<String>) -> Self {
        let raw: String = id.into();
        Self(raw)
    }

    /// Borrows the underlying id text.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }

    /// Builds a fallback id of the form `manifest:<branch>:v<version>`.
    ///
    /// A `None` branch is rendered as `main`.
    pub(crate) fn synthetic(branch: Option<&str>, version: u64) -> Self {
        let branch_label = branch.unwrap_or("main");
        Self(format!("manifest:{}:v{}", branch_label, version))
    }
}
36
37impl fmt::Display for SnapshotId {
38    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39        self.0.fmt(f)
40    }
41}
42
43#[derive(Debug, Clone, PartialEq, Eq)]
44pub enum ReadTarget {
45    Branch(String),
46    Snapshot(SnapshotId),
47}
48
49impl ReadTarget {
50    pub fn branch(name: impl Into<String>) -> Self {
51        Self::Branch(name.into())
52    }
53
54    pub fn snapshot(id: impl Into<SnapshotId>) -> Self {
55        Self::Snapshot(id.into())
56    }
57}
58
59impl From<&str> for ReadTarget {
60    fn from(value: &str) -> Self {
61        Self::branch(value)
62    }
63}
64
65impl From<String> for ReadTarget {
66    fn from(value: String) -> Self {
67        Self::Branch(value)
68    }
69}
70
71impl From<SnapshotId> for ReadTarget {
72    fn from(value: SnapshotId) -> Self {
73        Self::Snapshot(value)
74    }
75}
76
77#[derive(Debug, Clone)]
78pub struct ResolvedTarget {
79    pub requested: ReadTarget,
80    pub branch: Option<String>,
81    pub snapshot_id: SnapshotId,
82    pub snapshot: Snapshot,
83}
84
85#[derive(Debug, Clone)]
86pub(crate) struct PublishedSnapshot {
87    pub manifest_version: u64,
88    pub _snapshot_id: SnapshotId,
89}
90
91pub struct GraphCoordinator {
92    root_uri: String,
93    storage: Arc<dyn StorageAdapter>,
94    manifest: ManifestCoordinator,
95    commit_graph: Option<CommitGraph>,
96    bound_branch: Option<String>,
97}
98
99impl GraphCoordinator {
100    pub async fn init(
101        root_uri: &str,
102        catalog: &Catalog,
103        storage: Arc<dyn StorageAdapter>,
104    ) -> Result<Self> {
105        let root = normalize_root_uri(root_uri)?;
106        let manifest = ManifestCoordinator::init(&root, catalog).await?;
107        let commit_graph = Some(CommitGraph::init(&root, manifest.version()).await?);
108        Ok(Self {
109            root_uri: root,
110            storage,
111            manifest,
112            commit_graph,
113            bound_branch: None,
114        })
115    }
116
117    pub async fn open(root_uri: &str, storage: Arc<dyn StorageAdapter>) -> Result<Self> {
118        let root = normalize_root_uri(root_uri)?;
119        let manifest = ManifestCoordinator::open(&root).await?;
120        let commit_graph = if storage.exists(&graph_commits_uri(&root)).await? {
121            Some(CommitGraph::open(&root).await?)
122        } else {
123            None
124        };
125        Ok(Self {
126            root_uri: root,
127            storage,
128            manifest,
129            commit_graph,
130            bound_branch: None,
131        })
132    }
133
134    pub async fn open_branch(
135        root_uri: &str,
136        branch: &str,
137        storage: Arc<dyn StorageAdapter>,
138    ) -> Result<Self> {
139        let branch = normalize_branch_name(branch)?;
140        let Some(branch_name) = branch else {
141            return Self::open(root_uri, storage).await;
142        };
143
144        let root = normalize_root_uri(root_uri)?;
145        let manifest = ManifestCoordinator::open_at_branch(&root, &branch_name).await?;
146        let commit_graph = if storage.exists(&graph_commits_uri(&root)).await? {
147            Some(CommitGraph::open_at_branch(&root, &branch_name).await?)
148        } else {
149            None
150        };
151
152        Ok(Self {
153            root_uri: root,
154            storage,
155            manifest,
156            commit_graph,
157            bound_branch: Some(branch_name),
158        })
159    }
160
161    pub fn root_uri(&self) -> &str {
162        &self.root_uri
163    }
164
165    pub fn version(&self) -> u64 {
166        self.manifest.version()
167    }
168
169    pub fn snapshot(&self) -> Snapshot {
170        self.manifest.snapshot()
171    }
172
173    pub fn current_branch(&self) -> Option<&str> {
174        self.bound_branch.as_deref()
175    }
176
177    pub async fn refresh(&mut self) -> Result<()> {
178        self.manifest.refresh().await?;
179        if let Some(commit_graph) = &mut self.commit_graph {
180            commit_graph.refresh().await?;
181        }
182        Ok(())
183    }
184
185    pub async fn branch_list(&self) -> Result<Vec<String>> {
186        self.manifest.list_branches().await.map(|branches| {
187            branches
188                .into_iter()
189                .filter(|branch| !is_internal_system_branch(branch))
190                .collect()
191        })
192    }
193
194    pub(crate) async fn all_branches(&self) -> Result<Vec<String>> {
195        self.manifest.list_branches().await
196    }
197
198    pub async fn branch_descendants(&self, name: &str) -> Result<Vec<String>> {
199        self.manifest
200            .descendant_branches(name)
201            .await
202            .map(|branches| {
203                branches
204                    .into_iter()
205                    .filter(|branch| !is_internal_system_branch(branch))
206                    .collect()
207            })
208    }
209
210    pub async fn branch_create(&mut self, name: &str) -> Result<()> {
211        let branch = normalize_branch_name(name)?
212            .ok_or_else(|| OmniError::manifest("cannot create branch 'main'".to_string()))?;
213        self.ensure_commit_graph_initialized().await?;
214        self.manifest.create_branch(&branch).await?;
215        failpoints::maybe_fail("branch_create.after_manifest_branch_create")?;
216        if let Some(commit_graph) = &mut self.commit_graph {
217            commit_graph.create_branch(&branch).await?;
218        }
219        Ok(())
220    }
221
222    pub async fn branch_delete(&mut self, name: &str) -> Result<()> {
223        let branch = normalize_branch_name(name)?
224            .ok_or_else(|| OmniError::manifest("cannot delete branch 'main'".to_string()))?;
225        if self.current_branch() == Some(branch.as_str()) {
226            return Err(OmniError::manifest_conflict(format!(
227                "cannot delete currently active branch '{}'",
228                branch
229            )));
230        }
231
232        self.manifest.delete_branch(&branch).await?;
233
234        if let Some(commit_graph) = &mut self.commit_graph {
235            commit_graph.delete_branch(&branch).await?;
236        } else if self
237            .storage
238            .exists(&graph_commits_uri(self.root_uri()))
239            .await?
240        {
241            let mut commit_graph = CommitGraph::open(self.root_uri()).await?;
242            commit_graph.delete_branch(&branch).await?;
243        }
244
245        Ok(())
246    }
247
248    pub async fn snapshot_at_version(&self, version: u64) -> Result<Snapshot> {
249        ManifestCoordinator::snapshot_at(self.root_uri(), self.current_branch(), version).await
250    }
251
252    pub async fn resolve_snapshot_id(&self, branch: &str) -> Result<SnapshotId> {
253        let normalized = normalize_branch_name(branch)?;
254        let other = match normalized.as_deref() {
255            Some(branch) => {
256                GraphCoordinator::open_branch(self.root_uri(), branch, Arc::clone(&self.storage))
257                    .await?
258            }
259            None => GraphCoordinator::open(self.root_uri(), Arc::clone(&self.storage)).await?,
260        };
261
262        Ok(other
263            .head_commit_id()
264            .await?
265            .unwrap_or_else(|| SnapshotId::synthetic(other.current_branch(), other.version())))
266    }
267
268    pub async fn resolve_target(&self, target: &ReadTarget) -> Result<ResolvedTarget> {
269        match target {
270            ReadTarget::Branch(branch) => {
271                let normalized = normalize_branch_name(branch)?;
272                let other = match normalized.as_deref() {
273                    Some(branch) => {
274                        GraphCoordinator::open_branch(
275                            self.root_uri(),
276                            branch,
277                            Arc::clone(&self.storage),
278                        )
279                        .await?
280                    }
281                    None => {
282                        GraphCoordinator::open(self.root_uri(), Arc::clone(&self.storage)).await?
283                    }
284                };
285                let snapshot_id = other.head_commit_id().await?.unwrap_or_else(|| {
286                    SnapshotId::synthetic(other.current_branch(), other.version())
287                });
288                Ok(ResolvedTarget {
289                    requested: target.clone(),
290                    branch: other.bound_branch.clone(),
291                    snapshot_id,
292                    snapshot: other.snapshot(),
293                })
294            }
295            ReadTarget::Snapshot(snapshot_id) => {
296                let commit = self.resolve_commit(snapshot_id).await?;
297                let snapshot = ManifestCoordinator::snapshot_at(
298                    self.root_uri(),
299                    commit.manifest_branch.as_deref(),
300                    commit.manifest_version,
301                )
302                .await?;
303                Ok(ResolvedTarget {
304                    requested: target.clone(),
305                    branch: commit.manifest_branch.clone(),
306                    snapshot_id: snapshot_id.clone(),
307                    snapshot,
308                })
309            }
310        }
311    }
312
313    pub async fn resolve_commit(&self, snapshot_id: &SnapshotId) -> Result<GraphCommit> {
314        if let Some(commit_graph) = &self.commit_graph {
315            if let Some(commit) = commit_graph.get_commit(snapshot_id.as_str()) {
316                return Ok(commit);
317            }
318        }
319
320        for branch in self.manifest.list_branches().await? {
321            let normalized = normalize_branch_name(&branch)?;
322            let Some(commit_graph) = self
323                .open_commit_graph_for_branch(normalized.as_deref())
324                .await?
325            else {
326                break;
327            };
328            if let Some(commit) = commit_graph.get_commit(snapshot_id.as_str()) {
329                return Ok(commit);
330            }
331        }
332
333        Err(OmniError::manifest_not_found(format!(
334            "commit '{}' not found",
335            snapshot_id
336        )))
337    }
338
339    pub(crate) async fn head_commit_id(&self) -> Result<Option<SnapshotId>> {
340        match &self.commit_graph {
341            Some(commit_graph) => commit_graph
342                .head_commit_id()
343                .await
344                .map(|id| id.map(SnapshotId::new)),
345            None => Ok(None),
346        }
347    }
348
349    pub(crate) async fn ensure_commit_graph_initialized(&mut self) -> Result<()> {
350        if self.commit_graph.is_some() {
351            return Ok(());
352        }
353        if !self
354            .storage
355            .exists(&graph_commits_uri(self.root_uri()))
356            .await?
357        {
358            let _ = CommitGraph::init(self.root_uri(), self.manifest.version()).await?;
359        }
360        self.commit_graph = match self.current_branch() {
361            Some(branch) => Some(CommitGraph::open_at_branch(self.root_uri(), branch).await?),
362            None => Some(CommitGraph::open(self.root_uri()).await?),
363        };
364        Ok(())
365    }
366
367    pub(crate) async fn commit_updates_with_actor(
368        &mut self,
369        updates: &[SubTableUpdate],
370        actor_id: Option<&str>,
371    ) -> Result<PublishedSnapshot> {
372        let manifest_version = self.commit_manifest_updates(updates).await?;
373        let snapshot_id = self.record_graph_commit(manifest_version, actor_id).await?;
374        Ok(PublishedSnapshot {
375            manifest_version,
376            _snapshot_id: snapshot_id,
377        })
378    }
379
380    /// Commit with publisher-level OCC fence. The `expected_table_versions` map
381    /// asserts the manifest's current latest non-tombstoned `table_version` for
382    /// each `table_key` matches what the caller observed before writing.
383    /// Mismatches surface as `OmniError::Manifest` with
384    /// `ManifestConflictDetails::ExpectedVersionMismatch`.
385    pub(crate) async fn commit_updates_with_actor_with_expected(
386        &mut self,
387        updates: &[SubTableUpdate],
388        expected_table_versions: &HashMap<String, u64>,
389        actor_id: Option<&str>,
390    ) -> Result<PublishedSnapshot> {
391        let manifest_version = self
392            .commit_manifest_updates_with_expected(updates, expected_table_versions)
393            .await?;
394        let snapshot_id = self.record_graph_commit(manifest_version, actor_id).await?;
395        Ok(PublishedSnapshot {
396            manifest_version,
397            _snapshot_id: snapshot_id,
398        })
399    }
400
401    pub(crate) async fn commit_manifest_updates(
402        &mut self,
403        updates: &[SubTableUpdate],
404    ) -> Result<u64> {
405        let manifest_version = self.manifest.commit(updates).await?;
406        failpoints::maybe_fail("graph_publish.after_manifest_commit")?;
407        Ok(manifest_version)
408    }
409
410    pub(crate) async fn commit_manifest_updates_with_expected(
411        &mut self,
412        updates: &[SubTableUpdate],
413        expected_table_versions: &HashMap<String, u64>,
414    ) -> Result<u64> {
415        let manifest_version = self
416            .manifest
417            .commit_with_expected(updates, expected_table_versions)
418            .await?;
419        failpoints::maybe_fail("graph_publish.after_manifest_commit")?;
420        Ok(manifest_version)
421    }
422
423    pub(crate) async fn commit_manifest_changes(
424        &mut self,
425        changes: &[ManifestChange],
426    ) -> Result<u64> {
427        let manifest_version = self.manifest.commit_changes(changes).await?;
428        failpoints::maybe_fail("graph_publish.after_manifest_commit")?;
429        Ok(manifest_version)
430    }
431
432    pub(crate) async fn commit_changes_with_actor(
433        &mut self,
434        changes: &[ManifestChange],
435        actor_id: Option<&str>,
436    ) -> Result<PublishedSnapshot> {
437        let manifest_version = self.commit_manifest_changes(changes).await?;
438        let snapshot_id = self.record_graph_commit(manifest_version, actor_id).await?;
439        Ok(PublishedSnapshot {
440            manifest_version,
441            _snapshot_id: snapshot_id,
442        })
443    }
444
445    pub(crate) async fn record_graph_commit(
446        &mut self,
447        manifest_version: u64,
448        actor_id: Option<&str>,
449    ) -> Result<SnapshotId> {
450        self.ensure_commit_graph_initialized().await?;
451        let current_branch = self.current_branch().map(str::to_string);
452        let Some(commit_graph) = &mut self.commit_graph else {
453            return Ok(SnapshotId::synthetic(
454                current_branch.as_deref(),
455                manifest_version,
456            ));
457        };
458        failpoints::maybe_fail("graph_publish.before_commit_append")?;
459        let graph_commit_id = commit_graph
460            .append_commit(current_branch.as_deref(), manifest_version, actor_id)
461            .await?;
462        Ok(SnapshotId::new(graph_commit_id))
463    }
464
465    pub(crate) async fn record_merge_commit(
466        &mut self,
467        manifest_version: u64,
468        parent_commit_id: &str,
469        merged_parent_commit_id: &str,
470        actor_id: Option<&str>,
471    ) -> Result<SnapshotId> {
472        self.ensure_commit_graph_initialized().await?;
473        let current_branch = self.current_branch().map(str::to_string);
474        let commit_graph = self.commit_graph.as_mut().ok_or_else(|| {
475            OmniError::manifest("branch merge requires _graph_commits.lance".to_string())
476        })?;
477        failpoints::maybe_fail("graph_publish.before_commit_append")?;
478        let graph_commit_id = commit_graph
479            .append_merge_commit(
480                current_branch.as_deref(),
481                manifest_version,
482                parent_commit_id,
483                merged_parent_commit_id,
484                actor_id,
485            )
486            .await?;
487        Ok(SnapshotId::new(graph_commit_id))
488    }
489
490    async fn open_commit_graph_for_branch(
491        &self,
492        branch: Option<&str>,
493    ) -> Result<Option<CommitGraph>> {
494        if !self
495            .storage
496            .exists(&graph_commits_uri(self.root_uri()))
497            .await?
498        {
499            return Ok(None);
500        }
501        let graph = match branch {
502            Some(branch) => CommitGraph::open_at_branch(self.root_uri(), branch).await?,
503            None => CommitGraph::open(self.root_uri()).await?,
504        };
505        Ok(Some(graph))
506    }
507
508    pub(crate) async fn list_commits(&self) -> Result<Vec<GraphCommit>> {
509        if let Some(commit_graph) = &self.commit_graph {
510            return commit_graph.load_commits().await;
511        }
512        if !self
513            .storage
514            .exists(&graph_commits_uri(self.root_uri()))
515            .await?
516        {
517            return Ok(Vec::new());
518        }
519        let commit_graph = match self.current_branch() {
520            Some(branch) => CommitGraph::open_at_branch(self.root_uri(), branch).await?,
521            None => CommitGraph::open(self.root_uri()).await?,
522        };
523        commit_graph.load_commits().await
524    }
525}
526
527fn graph_commits_uri(root_uri: &str) -> String {
528    join_uri(root_uri, GRAPH_COMMITS_DIR)
529}
530
531fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
532    let branch = branch.trim();
533    if branch.is_empty() {
534        return Err(OmniError::manifest(
535            "branch name cannot be empty".to_string(),
536        ));
537    }
538    if branch == "main" {
539        return Ok(None);
540    }
541    Ok(Some(branch.to_string()))
542}