Skip to main content

omnigraph/db/
graph_coordinator.rs

1use std::fmt;
2use std::sync::Arc;
3
4use omnigraph_compiler::catalog::Catalog;
5
6use crate::error::{OmniError, Result};
7use crate::failpoints;
8use crate::storage::{StorageAdapter, join_uri, normalize_root_uri};
9
10use super::commit_graph::{CommitGraph, GraphCommit};
11use super::is_internal_system_branch;
12use super::manifest::{ManifestChange, ManifestCoordinator, Snapshot, SubTableUpdate};
13use super::run_registry::{RunId, RunRecord, RunRegistry, graph_runs_uri};
14
/// Name of the commit-graph directory; `graph_commits_uri` joins it onto the
/// graph's root URI to locate the commit graph in storage.
const GRAPH_COMMITS_DIR: &str = "_graph_commits.lance";
16
/// Opaque identifier of a graph snapshot.
///
/// The wrapped string is either an id handed in through [`SnapshotId::new`]
/// (for example a commit id from the commit graph) or a placeholder of the
/// form `manifest:<branch>:v<version>` built by [`SnapshotId::synthetic`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SnapshotId(String);

impl SnapshotId {
    /// Wraps `id` as a snapshot identifier; the value is stored verbatim.
    pub fn new(id: impl Into<String>) -> Self {
        Self(id.into())
    }

    /// Borrows the identifier as a string slice.
    pub fn as_str(&self) -> &str {
        &self.0
    }

    /// Builds a placeholder id of the form `manifest:<branch>:v<version>`.
    ///
    /// A `branch` of `None` is rendered as `main`.
    pub(crate) fn synthetic(branch: Option<&str>, version: u64) -> Self {
        let branch = branch.unwrap_or("main");
        Self(format!("manifest:{branch}:v{version}"))
    }
}

impl fmt::Display for SnapshotId {
    /// Writes the raw identifier, honouring the formatter's width/fill flags.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Name the trait explicitly: `String` implements several `fmt` traits
        // and none of them is in scope through `use std::fmt;` alone, so an
        // unqualified `self.0.fmt(f)` cannot be resolved by method lookup.
        fmt::Display::fmt(&self.0, f)
    }
}
42
43#[derive(Debug, Clone, PartialEq, Eq)]
44pub enum ReadTarget {
45    Branch(String),
46    Snapshot(SnapshotId),
47}
48
49impl ReadTarget {
50    pub fn branch(name: impl Into<String>) -> Self {
51        Self::Branch(name.into())
52    }
53
54    pub fn snapshot(id: impl Into<SnapshotId>) -> Self {
55        Self::Snapshot(id.into())
56    }
57}
58
59impl From<&str> for ReadTarget {
60    fn from(value: &str) -> Self {
61        Self::branch(value)
62    }
63}
64
65impl From<String> for ReadTarget {
66    fn from(value: String) -> Self {
67        Self::Branch(value)
68    }
69}
70
71impl From<SnapshotId> for ReadTarget {
72    fn from(value: SnapshotId) -> Self {
73        Self::Snapshot(value)
74    }
75}
76
77#[derive(Debug, Clone)]
78pub struct ResolvedTarget {
79    pub requested: ReadTarget,
80    pub branch: Option<String>,
81    pub snapshot_id: SnapshotId,
82    pub snapshot: Snapshot,
83}
84
85#[derive(Debug, Clone)]
86pub(crate) struct PublishedSnapshot {
87    pub manifest_version: u64,
88    pub _snapshot_id: SnapshotId,
89}
90
/// Ties together the manifest, the optional commit graph and the optional run
/// registry that live under one graph root URI.
pub struct GraphCoordinator {
    /// Normalized root URI of the graph.
    root_uri: String,
    /// Storage backend, used here to probe whether datasets exist.
    storage: Arc<dyn StorageAdapter>,
    /// Manifest handle; always present.
    manifest: ManifestCoordinator,
    /// Commit graph handle; `None` until it is found or lazily initialised.
    commit_graph: Option<CommitGraph>,
    /// Run registry handle; `None` until it is found or lazily initialised.
    run_registry: Option<RunRegistry>,
    /// Branch this coordinator was opened on; `None` when opened on `main`.
    bound_branch: Option<String>,
}
99
100impl GraphCoordinator {
101    pub async fn init(
102        root_uri: &str,
103        catalog: &Catalog,
104        storage: Arc<dyn StorageAdapter>,
105    ) -> Result<Self> {
106        let root = normalize_root_uri(root_uri)?;
107        let manifest = ManifestCoordinator::init(&root, catalog).await?;
108        let commit_graph = Some(CommitGraph::init(&root, manifest.version()).await?);
109        Ok(Self {
110            root_uri: root,
111            storage,
112            manifest,
113            commit_graph,
114            run_registry: None,
115            bound_branch: None,
116        })
117    }
118
119    pub async fn open(root_uri: &str, storage: Arc<dyn StorageAdapter>) -> Result<Self> {
120        let root = normalize_root_uri(root_uri)?;
121        let manifest = ManifestCoordinator::open(&root).await?;
122        let commit_graph = if storage.exists(&graph_commits_uri(&root)).await? {
123            Some(CommitGraph::open(&root).await?)
124        } else {
125            None
126        };
127        let run_registry = if storage.exists(&graph_runs_uri(&root)).await? {
128            Some(RunRegistry::open(&root).await?)
129        } else {
130            None
131        };
132        Ok(Self {
133            root_uri: root,
134            storage,
135            manifest,
136            commit_graph,
137            run_registry,
138            bound_branch: None,
139        })
140    }
141
142    pub async fn open_branch(
143        root_uri: &str,
144        branch: &str,
145        storage: Arc<dyn StorageAdapter>,
146    ) -> Result<Self> {
147        let branch = normalize_branch_name(branch)?;
148        let Some(branch_name) = branch else {
149            return Self::open(root_uri, storage).await;
150        };
151
152        let root = normalize_root_uri(root_uri)?;
153        let manifest = ManifestCoordinator::open_at_branch(&root, &branch_name).await?;
154        let commit_graph = if storage.exists(&graph_commits_uri(&root)).await? {
155            Some(CommitGraph::open_at_branch(&root, &branch_name).await?)
156        } else {
157            None
158        };
159        let run_registry = if storage.exists(&graph_runs_uri(&root)).await? {
160            Some(RunRegistry::open(&root).await?)
161        } else {
162            None
163        };
164
165        Ok(Self {
166            root_uri: root,
167            storage,
168            manifest,
169            commit_graph,
170            run_registry,
171            bound_branch: Some(branch_name),
172        })
173    }
174
175    pub fn root_uri(&self) -> &str {
176        &self.root_uri
177    }
178
179    pub fn version(&self) -> u64 {
180        self.manifest.version()
181    }
182
183    pub fn snapshot(&self) -> Snapshot {
184        self.manifest.snapshot()
185    }
186
187    pub fn current_branch(&self) -> Option<&str> {
188        self.bound_branch.as_deref()
189    }
190
191    pub async fn refresh(&mut self) -> Result<()> {
192        self.manifest.refresh().await?;
193        if let Some(commit_graph) = &mut self.commit_graph {
194            commit_graph.refresh().await?;
195        }
196        if let Some(run_registry) = &mut self.run_registry {
197            let root_uri = self.root_uri.clone();
198            run_registry.refresh(&root_uri).await?;
199        }
200        Ok(())
201    }
202
203    pub async fn branch_list(&self) -> Result<Vec<String>> {
204        self.manifest.list_branches().await.map(|branches| {
205            branches
206                .into_iter()
207                .filter(|branch| !is_internal_system_branch(branch))
208                .collect()
209        })
210    }
211
212    pub(crate) async fn all_branches(&self) -> Result<Vec<String>> {
213        self.manifest.list_branches().await
214    }
215
216    pub async fn branch_descendants(&self, name: &str) -> Result<Vec<String>> {
217        self.manifest
218            .descendant_branches(name)
219            .await
220            .map(|branches| {
221                branches
222                    .into_iter()
223                    .filter(|branch| !is_internal_system_branch(branch))
224                    .collect()
225            })
226    }
227
228    pub async fn branch_create(&mut self, name: &str) -> Result<()> {
229        let branch = normalize_branch_name(name)?
230            .ok_or_else(|| OmniError::manifest("cannot create branch 'main'".to_string()))?;
231        self.ensure_commit_graph_initialized().await?;
232        self.manifest.create_branch(&branch).await?;
233        failpoints::maybe_fail("branch_create.after_manifest_branch_create")?;
234        if let Some(commit_graph) = &mut self.commit_graph {
235            commit_graph.create_branch(&branch).await?;
236        }
237        Ok(())
238    }
239
240    pub async fn branch_delete(&mut self, name: &str) -> Result<()> {
241        let branch = normalize_branch_name(name)?
242            .ok_or_else(|| OmniError::manifest("cannot delete branch 'main'".to_string()))?;
243        if self.current_branch() == Some(branch.as_str()) {
244            return Err(OmniError::manifest_conflict(format!(
245                "cannot delete currently active branch '{}'",
246                branch
247            )));
248        }
249
250        self.manifest.delete_branch(&branch).await?;
251
252        if let Some(commit_graph) = &mut self.commit_graph {
253            commit_graph.delete_branch(&branch).await?;
254        } else if self
255            .storage
256            .exists(&graph_commits_uri(self.root_uri()))
257            .await?
258        {
259            let mut commit_graph = CommitGraph::open(self.root_uri()).await?;
260            commit_graph.delete_branch(&branch).await?;
261        }
262
263        Ok(())
264    }
265
266    pub async fn snapshot_at_version(&self, version: u64) -> Result<Snapshot> {
267        ManifestCoordinator::snapshot_at(self.root_uri(), self.current_branch(), version).await
268    }
269
270    pub async fn resolve_snapshot_id(&self, branch: &str) -> Result<SnapshotId> {
271        let normalized = normalize_branch_name(branch)?;
272        let other = match normalized.as_deref() {
273            Some(branch) => {
274                GraphCoordinator::open_branch(self.root_uri(), branch, Arc::clone(&self.storage))
275                    .await?
276            }
277            None => GraphCoordinator::open(self.root_uri(), Arc::clone(&self.storage)).await?,
278        };
279
280        Ok(other
281            .head_commit_id()
282            .await?
283            .unwrap_or_else(|| SnapshotId::synthetic(other.current_branch(), other.version())))
284    }
285
286    pub async fn resolve_target(&self, target: &ReadTarget) -> Result<ResolvedTarget> {
287        match target {
288            ReadTarget::Branch(branch) => {
289                let normalized = normalize_branch_name(branch)?;
290                let other = match normalized.as_deref() {
291                    Some(branch) => {
292                        GraphCoordinator::open_branch(
293                            self.root_uri(),
294                            branch,
295                            Arc::clone(&self.storage),
296                        )
297                        .await?
298                    }
299                    None => {
300                        GraphCoordinator::open(self.root_uri(), Arc::clone(&self.storage)).await?
301                    }
302                };
303                let snapshot_id = other.head_commit_id().await?.unwrap_or_else(|| {
304                    SnapshotId::synthetic(other.current_branch(), other.version())
305                });
306                Ok(ResolvedTarget {
307                    requested: target.clone(),
308                    branch: other.bound_branch.clone(),
309                    snapshot_id,
310                    snapshot: other.snapshot(),
311                })
312            }
313            ReadTarget::Snapshot(snapshot_id) => {
314                let commit = self.resolve_commit(snapshot_id).await?;
315                let snapshot = ManifestCoordinator::snapshot_at(
316                    self.root_uri(),
317                    commit.manifest_branch.as_deref(),
318                    commit.manifest_version,
319                )
320                .await?;
321                Ok(ResolvedTarget {
322                    requested: target.clone(),
323                    branch: commit.manifest_branch.clone(),
324                    snapshot_id: snapshot_id.clone(),
325                    snapshot,
326                })
327            }
328        }
329    }
330
331    pub async fn resolve_commit(&self, snapshot_id: &SnapshotId) -> Result<GraphCommit> {
332        if let Some(commit_graph) = &self.commit_graph {
333            if let Some(commit) = commit_graph.get_commit(snapshot_id.as_str()) {
334                return Ok(commit);
335            }
336        }
337
338        for branch in self.manifest.list_branches().await? {
339            let normalized = normalize_branch_name(&branch)?;
340            let Some(commit_graph) = self
341                .open_commit_graph_for_branch(normalized.as_deref())
342                .await?
343            else {
344                break;
345            };
346            if let Some(commit) = commit_graph.get_commit(snapshot_id.as_str()) {
347                return Ok(commit);
348            }
349        }
350
351        Err(OmniError::manifest_not_found(format!(
352            "commit '{}' not found",
353            snapshot_id
354        )))
355    }
356
357    pub(crate) async fn head_commit_id(&self) -> Result<Option<SnapshotId>> {
358        match &self.commit_graph {
359            Some(commit_graph) => commit_graph
360                .head_commit_id()
361                .await
362                .map(|id| id.map(SnapshotId::new)),
363            None => Ok(None),
364        }
365    }
366
367    pub(crate) async fn ensure_commit_graph_initialized(&mut self) -> Result<()> {
368        if self.commit_graph.is_some() {
369            return Ok(());
370        }
371        if !self
372            .storage
373            .exists(&graph_commits_uri(self.root_uri()))
374            .await?
375        {
376            let _ = CommitGraph::init(self.root_uri(), self.manifest.version()).await?;
377        }
378        self.commit_graph = match self.current_branch() {
379            Some(branch) => Some(CommitGraph::open_at_branch(self.root_uri(), branch).await?),
380            None => Some(CommitGraph::open(self.root_uri()).await?),
381        };
382        Ok(())
383    }
384
385    pub(crate) async fn ensure_run_registry_initialized(&mut self) -> Result<()> {
386        if self.run_registry.is_some() {
387            return Ok(());
388        }
389        if !self
390            .storage
391            .exists(&graph_runs_uri(self.root_uri()))
392            .await?
393        {
394            let _ = RunRegistry::init(self.root_uri()).await?;
395        }
396        self.run_registry = Some(RunRegistry::open(self.root_uri()).await?);
397        Ok(())
398    }
399
400    pub(crate) async fn commit_updates_with_actor(
401        &mut self,
402        updates: &[SubTableUpdate],
403        actor_id: Option<&str>,
404    ) -> Result<PublishedSnapshot> {
405        let manifest_version = self.commit_manifest_updates(updates).await?;
406        let snapshot_id = self.record_graph_commit(manifest_version, actor_id).await?;
407        Ok(PublishedSnapshot {
408            manifest_version,
409            _snapshot_id: snapshot_id,
410        })
411    }
412
413    pub(crate) async fn commit_manifest_updates(
414        &mut self,
415        updates: &[SubTableUpdate],
416    ) -> Result<u64> {
417        let manifest_version = self.manifest.commit(updates).await?;
418        failpoints::maybe_fail("graph_publish.after_manifest_commit")?;
419        Ok(manifest_version)
420    }
421
422    pub(crate) async fn commit_manifest_changes(
423        &mut self,
424        changes: &[ManifestChange],
425    ) -> Result<u64> {
426        let manifest_version = self.manifest.commit_changes(changes).await?;
427        failpoints::maybe_fail("graph_publish.after_manifest_commit")?;
428        Ok(manifest_version)
429    }
430
431    pub(crate) async fn commit_changes_with_actor(
432        &mut self,
433        changes: &[ManifestChange],
434        actor_id: Option<&str>,
435    ) -> Result<PublishedSnapshot> {
436        let manifest_version = self.commit_manifest_changes(changes).await?;
437        let snapshot_id = self.record_graph_commit(manifest_version, actor_id).await?;
438        Ok(PublishedSnapshot {
439            manifest_version,
440            _snapshot_id: snapshot_id,
441        })
442    }
443
444    pub(crate) async fn record_graph_commit(
445        &mut self,
446        manifest_version: u64,
447        actor_id: Option<&str>,
448    ) -> Result<SnapshotId> {
449        self.ensure_commit_graph_initialized().await?;
450        let current_branch = self.current_branch().map(str::to_string);
451        let Some(commit_graph) = &mut self.commit_graph else {
452            return Ok(SnapshotId::synthetic(
453                current_branch.as_deref(),
454                manifest_version,
455            ));
456        };
457        failpoints::maybe_fail("graph_publish.before_commit_append")?;
458        let graph_commit_id = commit_graph
459            .append_commit(current_branch.as_deref(), manifest_version, actor_id)
460            .await?;
461        Ok(SnapshotId::new(graph_commit_id))
462    }
463
464    pub(crate) async fn record_merge_commit(
465        &mut self,
466        manifest_version: u64,
467        parent_commit_id: &str,
468        merged_parent_commit_id: &str,
469        actor_id: Option<&str>,
470    ) -> Result<SnapshotId> {
471        self.ensure_commit_graph_initialized().await?;
472        let current_branch = self.current_branch().map(str::to_string);
473        let commit_graph = self.commit_graph.as_mut().ok_or_else(|| {
474            OmniError::manifest("branch merge requires _graph_commits.lance".to_string())
475        })?;
476        failpoints::maybe_fail("graph_publish.before_commit_append")?;
477        let graph_commit_id = commit_graph
478            .append_merge_commit(
479                current_branch.as_deref(),
480                manifest_version,
481                parent_commit_id,
482                merged_parent_commit_id,
483                actor_id,
484            )
485            .await?;
486        Ok(SnapshotId::new(graph_commit_id))
487    }
488
489    async fn open_commit_graph_for_branch(
490        &self,
491        branch: Option<&str>,
492    ) -> Result<Option<CommitGraph>> {
493        if !self
494            .storage
495            .exists(&graph_commits_uri(self.root_uri()))
496            .await?
497        {
498            return Ok(None);
499        }
500        let graph = match branch {
501            Some(branch) => CommitGraph::open_at_branch(self.root_uri(), branch).await?,
502            None => CommitGraph::open(self.root_uri()).await?,
503        };
504        Ok(Some(graph))
505    }
506
507    pub(crate) async fn append_run_record(&mut self, record: &RunRecord) -> Result<()> {
508        self.ensure_run_registry_initialized().await?;
509        let Some(run_registry) = &mut self.run_registry else {
510            return Err(OmniError::manifest(
511                "run registry not initialized".to_string(),
512            ));
513        };
514        run_registry.append_record(record).await
515    }
516
517    pub(crate) async fn get_run(&self, run_id: &RunId) -> Result<RunRecord> {
518        if let Some(run_registry) = &self.run_registry {
519            if let Some(run) = run_registry.get_run(run_id).await? {
520                return Ok(run);
521            }
522        }
523        if !self
524            .storage
525            .exists(&graph_runs_uri(self.root_uri()))
526            .await?
527        {
528            return Err(OmniError::manifest_not_found(format!(
529                "run '{}' not found",
530                run_id
531            )));
532        }
533        let run_registry = RunRegistry::open(self.root_uri()).await?;
534        run_registry
535            .get_run(run_id)
536            .await?
537            .ok_or_else(|| OmniError::manifest_not_found(format!("run '{}' not found", run_id)))
538    }
539
540    pub(crate) async fn list_runs(&self) -> Result<Vec<RunRecord>> {
541        if let Some(run_registry) = &self.run_registry {
542            return run_registry.list_runs().await;
543        }
544        if !self
545            .storage
546            .exists(&graph_runs_uri(self.root_uri()))
547            .await?
548        {
549            return Ok(Vec::new());
550        }
551        let run_registry = RunRegistry::open(self.root_uri()).await?;
552        run_registry.list_runs().await
553    }
554
555    pub(crate) async fn list_commits(&self) -> Result<Vec<GraphCommit>> {
556        if let Some(commit_graph) = &self.commit_graph {
557            return commit_graph.load_commits().await;
558        }
559        if !self
560            .storage
561            .exists(&graph_commits_uri(self.root_uri()))
562            .await?
563        {
564            return Ok(Vec::new());
565        }
566        let commit_graph = match self.current_branch() {
567            Some(branch) => CommitGraph::open_at_branch(self.root_uri(), branch).await?,
568            None => CommitGraph::open(self.root_uri()).await?,
569        };
570        commit_graph.load_commits().await
571    }
572}
573
574fn graph_commits_uri(root_uri: &str) -> String {
575    join_uri(root_uri, GRAPH_COMMITS_DIR)
576}
577
578fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
579    let branch = branch.trim();
580    if branch.is_empty() {
581        return Err(OmniError::manifest(
582            "branch name cannot be empty".to_string(),
583        ));
584    }
585    if branch == "main" {
586        return Ok(None);
587    }
588    Ok(Some(branch.to_string()))
589}