1use std::collections::HashMap;
2use std::fmt;
3use std::sync::Arc;
4
5use omnigraph_compiler::catalog::Catalog;
6
7use crate::error::{OmniError, Result};
8use crate::failpoints;
9use crate::storage::{StorageAdapter, join_uri, normalize_root_uri};
10
11use super::commit_graph::{CommitGraph, GraphCommit};
12use super::is_internal_system_branch;
13use super::manifest::{ManifestChange, ManifestCoordinator, Snapshot, SubTableUpdate};
14
/// Directory name, relative to the repository root, of the commit-graph dataset.
const GRAPH_COMMITS_DIR: &str = "_graph_commits.lance";
16
/// Opaque identifier of a published graph snapshot.
///
/// Holds either a commit id taken from the commit graph, or a synthetic
/// `manifest:<branch>:v<version>` id built by [`SnapshotId::synthetic`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SnapshotId(String);

impl SnapshotId {
    /// Wraps an existing identifier string without validating it.
    pub fn new(id: impl Into<String>) -> Self {
        let raw: String = id.into();
        Self(raw)
    }

    /// Borrows the identifier as a string slice.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }

    /// Builds a synthetic id of the form `manifest:<branch>:v<version>`.
    ///
    /// A `None` branch is rendered as `main`.
    pub(crate) fn synthetic(branch: Option<&str>, version: u64) -> Self {
        let branch_name = branch.unwrap_or("main");
        Self(format!("manifest:{}:v{}", branch_name, version))
    }
}

impl fmt::Display for SnapshotId {
    /// Writes the raw identifier, honouring width/fill/alignment flags.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(self.as_str(), f)
    }
}
42
43#[derive(Debug, Clone, PartialEq, Eq)]
44pub enum ReadTarget {
45 Branch(String),
46 Snapshot(SnapshotId),
47}
48
49impl ReadTarget {
50 pub fn branch(name: impl Into<String>) -> Self {
51 Self::Branch(name.into())
52 }
53
54 pub fn snapshot(id: impl Into<SnapshotId>) -> Self {
55 Self::Snapshot(id.into())
56 }
57}
58
59impl From<&str> for ReadTarget {
60 fn from(value: &str) -> Self {
61 Self::branch(value)
62 }
63}
64
65impl From<String> for ReadTarget {
66 fn from(value: String) -> Self {
67 Self::Branch(value)
68 }
69}
70
71impl From<SnapshotId> for ReadTarget {
72 fn from(value: SnapshotId) -> Self {
73 Self::Snapshot(value)
74 }
75}
76
/// A [`ReadTarget`] resolved to a concrete manifest snapshot.
#[derive(Debug, Clone)]
pub struct ResolvedTarget {
    /// The target exactly as the caller requested it.
    pub requested: ReadTarget,
    /// Manifest branch the snapshot belongs to (`None` means `main`).
    pub branch: Option<String>,
    /// Identifier of the resolved snapshot (commit-graph id or synthetic id).
    pub snapshot_id: SnapshotId,
    /// The manifest snapshot reads should use.
    pub snapshot: Snapshot,
}
84
/// Outcome of publishing a manifest commit and recording it in the commit graph.
#[derive(Debug, Clone)]
pub(crate) struct PublishedSnapshot {
    /// Manifest version produced by the commit.
    pub manifest_version: u64,
    /// Snapshot id recorded for the commit. The leading underscore presumably
    /// silences an unused-field lint; confirm before relying on it.
    pub _snapshot_id: SnapshotId,
}
90
/// Coordinates the table manifest and the (optional) commit graph of one
/// graph repository, bound to a single branch.
pub struct GraphCoordinator {
    // Normalized repository root URI.
    root_uri: String,
    // Storage backend used for existence checks.
    storage: Arc<dyn StorageAdapter>,
    // Manifest coordinator bound to the same branch as this coordinator.
    manifest: ManifestCoordinator,
    // `None` when `_graph_commits.lance` did not exist when this coordinator
    // was opened and has not been initialized since.
    commit_graph: Option<CommitGraph>,
    // `None` means the coordinator is bound to `main`.
    bound_branch: Option<String>,
}
98
99impl GraphCoordinator {
100 pub async fn init(
101 root_uri: &str,
102 catalog: &Catalog,
103 storage: Arc<dyn StorageAdapter>,
104 ) -> Result<Self> {
105 let root = normalize_root_uri(root_uri)?;
106 let manifest = ManifestCoordinator::init(&root, catalog).await?;
107 let commit_graph = Some(CommitGraph::init(&root, manifest.version()).await?);
108 Ok(Self {
109 root_uri: root,
110 storage,
111 manifest,
112 commit_graph,
113 bound_branch: None,
114 })
115 }
116
117 pub async fn open(root_uri: &str, storage: Arc<dyn StorageAdapter>) -> Result<Self> {
118 let root = normalize_root_uri(root_uri)?;
119 let manifest = ManifestCoordinator::open(&root).await?;
120 let commit_graph = if storage.exists(&graph_commits_uri(&root)).await? {
121 Some(CommitGraph::open(&root).await?)
122 } else {
123 None
124 };
125 Ok(Self {
126 root_uri: root,
127 storage,
128 manifest,
129 commit_graph,
130 bound_branch: None,
131 })
132 }
133
134 pub async fn open_branch(
135 root_uri: &str,
136 branch: &str,
137 storage: Arc<dyn StorageAdapter>,
138 ) -> Result<Self> {
139 let branch = normalize_branch_name(branch)?;
140 let Some(branch_name) = branch else {
141 return Self::open(root_uri, storage).await;
142 };
143
144 let root = normalize_root_uri(root_uri)?;
145 let manifest = ManifestCoordinator::open_at_branch(&root, &branch_name).await?;
146 let commit_graph = if storage.exists(&graph_commits_uri(&root)).await? {
147 Some(CommitGraph::open_at_branch(&root, &branch_name).await?)
148 } else {
149 None
150 };
151
152 Ok(Self {
153 root_uri: root,
154 storage,
155 manifest,
156 commit_graph,
157 bound_branch: Some(branch_name),
158 })
159 }
160
161 pub fn root_uri(&self) -> &str {
162 &self.root_uri
163 }
164
165 pub fn version(&self) -> u64 {
166 self.manifest.version()
167 }
168
169 pub fn snapshot(&self) -> Snapshot {
170 self.manifest.snapshot()
171 }
172
173 pub fn current_branch(&self) -> Option<&str> {
174 self.bound_branch.as_deref()
175 }
176
177 pub async fn refresh(&mut self) -> Result<()> {
178 self.manifest.refresh().await?;
179 if let Some(commit_graph) = &mut self.commit_graph {
180 commit_graph.refresh().await?;
181 }
182 Ok(())
183 }
184
185 pub async fn branch_list(&self) -> Result<Vec<String>> {
186 self.manifest.list_branches().await.map(|branches| {
187 branches
188 .into_iter()
189 .filter(|branch| !is_internal_system_branch(branch))
190 .collect()
191 })
192 }
193
194 pub(crate) async fn all_branches(&self) -> Result<Vec<String>> {
195 self.manifest.list_branches().await
196 }
197
198 pub async fn branch_descendants(&self, name: &str) -> Result<Vec<String>> {
199 self.manifest
200 .descendant_branches(name)
201 .await
202 .map(|branches| {
203 branches
204 .into_iter()
205 .filter(|branch| !is_internal_system_branch(branch))
206 .collect()
207 })
208 }
209
210 pub async fn branch_create(&mut self, name: &str) -> Result<()> {
211 let branch = normalize_branch_name(name)?
212 .ok_or_else(|| OmniError::manifest("cannot create branch 'main'".to_string()))?;
213 self.ensure_commit_graph_initialized().await?;
214
215 self.manifest.create_branch(&branch).await?;
217
218 if let Err(err) = self.create_commit_graph_branch(&branch).await {
222 if let Err(rollback_err) = self.manifest.delete_branch(&branch).await {
223 tracing::warn!(
224 target: "omnigraph::branch_create",
225 branch = %branch,
226 error = %rollback_err,
227 "rollback of manifest branch failed after commit-graph create failure",
228 );
229 }
230 return Err(err);
231 }
232 Ok(())
233 }
234
235 async fn create_commit_graph_branch(&mut self, branch: &str) -> Result<()> {
240 failpoints::maybe_fail("branch_create.after_manifest_branch_create")?;
241 let Some(commit_graph) = &mut self.commit_graph else {
242 return Ok(());
243 };
244 if commit_graph
245 .list_branches()
246 .await?
247 .iter()
248 .any(|existing| existing == branch)
249 {
250 commit_graph.force_delete_branch(branch).await?;
251 }
252 commit_graph.create_branch(branch).await
253 }
254
255 pub async fn branch_delete(&mut self, name: &str) -> Result<()> {
256 let branch = normalize_branch_name(name)?
257 .ok_or_else(|| OmniError::manifest("cannot delete branch 'main'".to_string()))?;
258 if self.current_branch() == Some(branch.as_str()) {
259 return Err(OmniError::manifest_conflict(format!(
260 "cannot delete currently active branch '{}'",
261 branch
262 )));
263 }
264
265 self.manifest.delete_branch(&branch).await?;
269
270 if let Err(err) = self.reclaim_commit_graph_branch(&branch).await {
275 tracing::warn!(
276 target: "omnigraph::branch_delete::cleanup",
277 branch = %branch,
278 error = %err,
279 "best-effort commit-graph branch reclaim failed; cleanup will reconcile",
280 );
281 }
282
283 Ok(())
284 }
285
286 async fn reclaim_commit_graph_branch(&mut self, branch: &str) -> Result<()> {
289 failpoints::maybe_fail("branch_delete.before_commit_graph_reclaim")?;
290 if let Some(commit_graph) = &mut self.commit_graph {
291 commit_graph.force_delete_branch(branch).await
292 } else if self
293 .storage
294 .exists(&graph_commits_uri(self.root_uri()))
295 .await?
296 {
297 let mut commit_graph = CommitGraph::open(self.root_uri()).await?;
298 commit_graph.force_delete_branch(branch).await
299 } else {
300 Ok(())
301 }
302 }
303
304 pub async fn snapshot_at_version(&self, version: u64) -> Result<Snapshot> {
305 ManifestCoordinator::snapshot_at(self.root_uri(), self.current_branch(), version).await
306 }
307
308 pub async fn resolve_snapshot_id(&self, branch: &str) -> Result<SnapshotId> {
309 let normalized = normalize_branch_name(branch)?;
310 let other = match normalized.as_deref() {
311 Some(branch) => {
312 GraphCoordinator::open_branch(self.root_uri(), branch, Arc::clone(&self.storage))
313 .await?
314 }
315 None => GraphCoordinator::open(self.root_uri(), Arc::clone(&self.storage)).await?,
316 };
317
318 Ok(other
319 .head_commit_id()
320 .await?
321 .unwrap_or_else(|| SnapshotId::synthetic(other.current_branch(), other.version())))
322 }
323
324 pub async fn resolve_target(&self, target: &ReadTarget) -> Result<ResolvedTarget> {
325 match target {
326 ReadTarget::Branch(branch) => {
327 let normalized = normalize_branch_name(branch)?;
328 let other = match normalized.as_deref() {
329 Some(branch) => {
330 GraphCoordinator::open_branch(
331 self.root_uri(),
332 branch,
333 Arc::clone(&self.storage),
334 )
335 .await?
336 }
337 None => {
338 GraphCoordinator::open(self.root_uri(), Arc::clone(&self.storage)).await?
339 }
340 };
341 let snapshot_id = other.head_commit_id().await?.unwrap_or_else(|| {
342 SnapshotId::synthetic(other.current_branch(), other.version())
343 });
344 Ok(ResolvedTarget {
345 requested: target.clone(),
346 branch: other.bound_branch.clone(),
347 snapshot_id,
348 snapshot: other.snapshot(),
349 })
350 }
351 ReadTarget::Snapshot(snapshot_id) => {
352 let commit = self.resolve_commit(snapshot_id).await?;
353 let snapshot = ManifestCoordinator::snapshot_at(
354 self.root_uri(),
355 commit.manifest_branch.as_deref(),
356 commit.manifest_version,
357 )
358 .await?;
359 Ok(ResolvedTarget {
360 requested: target.clone(),
361 branch: commit.manifest_branch.clone(),
362 snapshot_id: snapshot_id.clone(),
363 snapshot,
364 })
365 }
366 }
367 }
368
369 pub async fn resolve_commit(&self, snapshot_id: &SnapshotId) -> Result<GraphCommit> {
370 if let Some(commit_graph) = &self.commit_graph {
371 if let Some(commit) = commit_graph.get_commit(snapshot_id.as_str()) {
372 return Ok(commit);
373 }
374 }
375
376 for branch in self.manifest.list_branches().await? {
377 let normalized = normalize_branch_name(&branch)?;
378 let Some(commit_graph) = self
379 .open_commit_graph_for_branch(normalized.as_deref())
380 .await?
381 else {
382 break;
383 };
384 if let Some(commit) = commit_graph.get_commit(snapshot_id.as_str()) {
385 return Ok(commit);
386 }
387 }
388
389 Err(OmniError::manifest_not_found(format!(
390 "commit '{}' not found",
391 snapshot_id
392 )))
393 }
394
395 pub(crate) async fn head_commit_id(&self) -> Result<Option<SnapshotId>> {
396 match &self.commit_graph {
397 Some(commit_graph) => commit_graph
398 .head_commit_id()
399 .await
400 .map(|id| id.map(SnapshotId::new)),
401 None => Ok(None),
402 }
403 }
404
405 pub(crate) async fn ensure_commit_graph_initialized(&mut self) -> Result<()> {
406 if self.commit_graph.is_some() {
407 return Ok(());
408 }
409 if !self
410 .storage
411 .exists(&graph_commits_uri(self.root_uri()))
412 .await?
413 {
414 let _ = CommitGraph::init(self.root_uri(), self.manifest.version()).await?;
415 }
416 self.commit_graph = match self.current_branch() {
417 Some(branch) => Some(CommitGraph::open_at_branch(self.root_uri(), branch).await?),
418 None => Some(CommitGraph::open(self.root_uri()).await?),
419 };
420 Ok(())
421 }
422
423 pub(crate) async fn commit_updates_with_actor(
424 &mut self,
425 updates: &[SubTableUpdate],
426 actor_id: Option<&str>,
427 ) -> Result<PublishedSnapshot> {
428 let manifest_version = self.commit_manifest_updates(updates).await?;
429 let snapshot_id = self.record_graph_commit(manifest_version, actor_id).await?;
430 Ok(PublishedSnapshot {
431 manifest_version,
432 _snapshot_id: snapshot_id,
433 })
434 }
435
436 pub(crate) async fn commit_updates_with_actor_with_expected(
442 &mut self,
443 updates: &[SubTableUpdate],
444 expected_table_versions: &HashMap<String, u64>,
445 actor_id: Option<&str>,
446 ) -> Result<PublishedSnapshot> {
447 let manifest_version = self
448 .commit_manifest_updates_with_expected(updates, expected_table_versions)
449 .await?;
450 let snapshot_id = self.record_graph_commit(manifest_version, actor_id).await?;
451 Ok(PublishedSnapshot {
452 manifest_version,
453 _snapshot_id: snapshot_id,
454 })
455 }
456
457 pub(crate) async fn commit_manifest_updates(
458 &mut self,
459 updates: &[SubTableUpdate],
460 ) -> Result<u64> {
461 let manifest_version = self.manifest.commit(updates).await?;
462 failpoints::maybe_fail("graph_publish.after_manifest_commit")?;
463 Ok(manifest_version)
464 }
465
466 pub(crate) async fn commit_manifest_updates_with_expected(
467 &mut self,
468 updates: &[SubTableUpdate],
469 expected_table_versions: &HashMap<String, u64>,
470 ) -> Result<u64> {
471 let manifest_version = self
472 .manifest
473 .commit_with_expected(updates, expected_table_versions)
474 .await?;
475 failpoints::maybe_fail("graph_publish.after_manifest_commit")?;
476 Ok(manifest_version)
477 }
478
479 pub(crate) async fn commit_manifest_changes(
480 &mut self,
481 changes: &[ManifestChange],
482 ) -> Result<u64> {
483 let manifest_version = self.manifest.commit_changes(changes).await?;
484 failpoints::maybe_fail("graph_publish.after_manifest_commit")?;
485 Ok(manifest_version)
486 }
487
488 pub(crate) async fn commit_changes_with_actor(
489 &mut self,
490 changes: &[ManifestChange],
491 actor_id: Option<&str>,
492 ) -> Result<PublishedSnapshot> {
493 let manifest_version = self.commit_manifest_changes(changes).await?;
494 let snapshot_id = self.record_graph_commit(manifest_version, actor_id).await?;
495 Ok(PublishedSnapshot {
496 manifest_version,
497 _snapshot_id: snapshot_id,
498 })
499 }
500
501 pub(crate) async fn record_graph_commit(
502 &mut self,
503 manifest_version: u64,
504 actor_id: Option<&str>,
505 ) -> Result<SnapshotId> {
506 self.ensure_commit_graph_initialized().await?;
507 let current_branch = self.current_branch().map(str::to_string);
508 let Some(commit_graph) = &mut self.commit_graph else {
509 return Ok(SnapshotId::synthetic(
510 current_branch.as_deref(),
511 manifest_version,
512 ));
513 };
514 failpoints::maybe_fail("graph_publish.before_commit_append")?;
515 let graph_commit_id = commit_graph
516 .append_commit(current_branch.as_deref(), manifest_version, actor_id)
517 .await?;
518 Ok(SnapshotId::new(graph_commit_id))
519 }
520
521 pub(crate) async fn record_merge_commit(
522 &mut self,
523 manifest_version: u64,
524 parent_commit_id: &str,
525 merged_parent_commit_id: &str,
526 actor_id: Option<&str>,
527 ) -> Result<SnapshotId> {
528 self.ensure_commit_graph_initialized().await?;
529 let current_branch = self.current_branch().map(str::to_string);
530 let commit_graph = self.commit_graph.as_mut().ok_or_else(|| {
531 OmniError::manifest("branch merge requires _graph_commits.lance".to_string())
532 })?;
533 failpoints::maybe_fail("graph_publish.before_commit_append")?;
534 let graph_commit_id = commit_graph
535 .append_merge_commit(
536 current_branch.as_deref(),
537 manifest_version,
538 parent_commit_id,
539 merged_parent_commit_id,
540 actor_id,
541 )
542 .await?;
543 Ok(SnapshotId::new(graph_commit_id))
544 }
545
546 async fn open_commit_graph_for_branch(
547 &self,
548 branch: Option<&str>,
549 ) -> Result<Option<CommitGraph>> {
550 if !self
551 .storage
552 .exists(&graph_commits_uri(self.root_uri()))
553 .await?
554 {
555 return Ok(None);
556 }
557 let graph = match branch {
558 Some(branch) => CommitGraph::open_at_branch(self.root_uri(), branch).await?,
559 None => CommitGraph::open(self.root_uri()).await?,
560 };
561 Ok(Some(graph))
562 }
563
564 pub(crate) async fn list_commits(&self) -> Result<Vec<GraphCommit>> {
565 if let Some(commit_graph) = &self.commit_graph {
566 return commit_graph.load_commits().await;
567 }
568 if !self
569 .storage
570 .exists(&graph_commits_uri(self.root_uri()))
571 .await?
572 {
573 return Ok(Vec::new());
574 }
575 let commit_graph = match self.current_branch() {
576 Some(branch) => CommitGraph::open_at_branch(self.root_uri(), branch).await?,
577 None => CommitGraph::open(self.root_uri()).await?,
578 };
579 commit_graph.load_commits().await
580 }
581}
582
583fn graph_commits_uri(root_uri: &str) -> String {
584 join_uri(root_uri, GRAPH_COMMITS_DIR)
585}
586
587fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
588 let branch = branch.trim();
589 if branch.is_empty() {
590 return Err(OmniError::manifest(
591 "branch name cannot be empty".to_string(),
592 ));
593 }
594 if branch == "main" {
595 return Ok(None);
596 }
597 Ok(Some(branch.to_string()))
598}