1use std::collections::HashMap;
2use std::fmt;
3use std::sync::Arc;
4
5use omnigraph_compiler::catalog::Catalog;
6
7use crate::error::{OmniError, Result};
8use crate::failpoints;
9use crate::storage::{StorageAdapter, join_uri, normalize_root_uri};
10
11use super::commit_graph::{CommitGraph, GraphCommit};
12use super::is_internal_system_branch;
13use super::manifest::{ManifestChange, ManifestCoordinator, Snapshot, SubTableUpdate};
14
/// Path component joined onto the root URI (see `graph_commits_uri`) to locate
/// the commit graph; the coordinator probes this location through the storage
/// adapter before opening a `CommitGraph`.
15const GRAPH_COMMITS_DIR: &str = "_graph_commits.lance";
16
/// Opaque identifier naming one snapshot of the graph.
///
/// The wrapped string is either an id supplied through [`SnapshotId::new`]
/// or a `manifest:<branch>:v<version>` id produced by `SnapshotId::synthetic`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SnapshotId(String);

impl SnapshotId {
    /// Wraps `id` as a snapshot identifier; the value is stored as given.
    pub fn new(id: impl Into<String>) -> Self {
        let raw: String = id.into();
        Self(raw)
    }

    /// Borrows the identifier as a string slice.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }

    /// Builds a `manifest:<branch>:v<version>` identifier.
    ///
    /// A missing branch is rendered as `main`.
    pub(crate) fn synthetic(branch: Option<&str>, version: u64) -> Self {
        let branch_label = branch.unwrap_or("main");
        Self(format!("manifest:{}:v{}", branch_label, version))
    }
}

impl fmt::Display for SnapshotId {
    /// Formats the identifier exactly as the wrapped string would be formatted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(self.as_str(), f)
    }
}
42
43#[derive(Debug, Clone, PartialEq, Eq)]
44pub enum ReadTarget {
45 Branch(String),
46 Snapshot(SnapshotId),
47}
48
49impl ReadTarget {
50 pub fn branch(name: impl Into<String>) -> Self {
51 Self::Branch(name.into())
52 }
53
54 pub fn snapshot(id: impl Into<SnapshotId>) -> Self {
55 Self::Snapshot(id.into())
56 }
57}
58
59impl From<&str> for ReadTarget {
60 fn from(value: &str) -> Self {
61 Self::branch(value)
62 }
63}
64
65impl From<String> for ReadTarget {
66 fn from(value: String) -> Self {
67 Self::Branch(value)
68 }
69}
70
71impl From<SnapshotId> for ReadTarget {
72 fn from(value: SnapshotId) -> Self {
73 Self::Snapshot(value)
74 }
75}
76
/// Outcome of `GraphCoordinator::resolve_target`: the request together with
/// the branch, snapshot id and snapshot it resolved to.
77#[derive(Debug, Clone)]
78pub struct ResolvedTarget {
/// Clone of the target that was passed to `resolve_target`.
79 pub requested: ReadTarget,
/// For a branch target: the bound branch of the coordinator opened for it
/// (`None` when the name normalizes to `main`). For a snapshot target: the
/// resolved commit's `manifest_branch`.
80 pub branch: Option<String>,
/// For a branch target: the branch's head commit id, or a synthetic
/// `manifest:...` id when no head commit id is available. For a snapshot
/// target: the requested id.
81 pub snapshot_id: SnapshotId,
/// The manifest snapshot that was resolved.
82 pub snapshot: Snapshot,
83}
84
/// Result of the `commit_*_with_actor` methods: a manifest commit followed by
/// a recorded graph commit.
85#[derive(Debug, Clone)]
86pub(crate) struct PublishedSnapshot {
/// Version returned by the manifest commit.
87 pub manifest_version: u64,
/// Id returned by `record_graph_commit` for that manifest version.
/// NOTE(review): the leading underscore suggests no caller reads it yet — confirm.
88 pub _snapshot_id: SnapshotId,
89}
90
/// Coordinates a `ManifestCoordinator` with an optional `CommitGraph` that
/// live under the same root URI, optionally bound to a non-`main` branch.
91pub struct GraphCoordinator {
/// Root URI after `normalize_root_uri`.
92 root_uri: String,
/// Storage adapter used to probe whether the commit-graph path exists.
93 storage: Arc<dyn StorageAdapter>,
/// Manifest opened at the bound branch (or without a branch).
94 manifest: ManifestCoordinator,
/// `None` when the coordinator was opened and the commit-graph path did not
/// exist; filled in lazily by `ensure_commit_graph_initialized`.
95 commit_graph: Option<CommitGraph>,
/// Branch this coordinator was opened at; `None` after `init`/`open`.
96 bound_branch: Option<String>,
97}
98
99impl GraphCoordinator {
100 pub async fn init(
101 root_uri: &str,
102 catalog: &Catalog,
103 storage: Arc<dyn StorageAdapter>,
104 ) -> Result<Self> {
105 let root = normalize_root_uri(root_uri)?;
106 let manifest = ManifestCoordinator::init(&root, catalog).await?;
107 let commit_graph = Some(CommitGraph::init(&root, manifest.version()).await?);
108 Ok(Self {
109 root_uri: root,
110 storage,
111 manifest,
112 commit_graph,
113 bound_branch: None,
114 })
115 }
116
117 pub async fn open(root_uri: &str, storage: Arc<dyn StorageAdapter>) -> Result<Self> {
118 let root = normalize_root_uri(root_uri)?;
119 let manifest = ManifestCoordinator::open(&root).await?;
120 let commit_graph = if storage.exists(&graph_commits_uri(&root)).await? {
121 Some(CommitGraph::open(&root).await?)
122 } else {
123 None
124 };
125 Ok(Self {
126 root_uri: root,
127 storage,
128 manifest,
129 commit_graph,
130 bound_branch: None,
131 })
132 }
133
134 pub async fn open_branch(
135 root_uri: &str,
136 branch: &str,
137 storage: Arc<dyn StorageAdapter>,
138 ) -> Result<Self> {
139 let branch = normalize_branch_name(branch)?;
140 let Some(branch_name) = branch else {
141 return Self::open(root_uri, storage).await;
142 };
143
144 let root = normalize_root_uri(root_uri)?;
145 let manifest = ManifestCoordinator::open_at_branch(&root, &branch_name).await?;
146 let commit_graph = if storage.exists(&graph_commits_uri(&root)).await? {
147 Some(CommitGraph::open_at_branch(&root, &branch_name).await?)
148 } else {
149 None
150 };
151
152 Ok(Self {
153 root_uri: root,
154 storage,
155 manifest,
156 commit_graph,
157 bound_branch: Some(branch_name),
158 })
159 }
160
161 pub fn root_uri(&self) -> &str {
162 &self.root_uri
163 }
164
165 pub fn version(&self) -> u64 {
166 self.manifest.version()
167 }
168
169 pub fn snapshot(&self) -> Snapshot {
170 self.manifest.snapshot()
171 }
172
173 pub fn current_branch(&self) -> Option<&str> {
174 self.bound_branch.as_deref()
175 }
176
177 pub async fn refresh(&mut self) -> Result<()> {
178 self.manifest.refresh().await?;
179 if let Some(commit_graph) = &mut self.commit_graph {
180 commit_graph.refresh().await?;
181 }
182 Ok(())
183 }
184
185 pub async fn branch_list(&self) -> Result<Vec<String>> {
186 self.manifest.list_branches().await.map(|branches| {
187 branches
188 .into_iter()
189 .filter(|branch| !is_internal_system_branch(branch))
190 .collect()
191 })
192 }
193
194 pub(crate) async fn all_branches(&self) -> Result<Vec<String>> {
195 self.manifest.list_branches().await
196 }
197
198 pub async fn branch_descendants(&self, name: &str) -> Result<Vec<String>> {
199 self.manifest
200 .descendant_branches(name)
201 .await
202 .map(|branches| {
203 branches
204 .into_iter()
205 .filter(|branch| !is_internal_system_branch(branch))
206 .collect()
207 })
208 }
209
210 pub async fn branch_create(&mut self, name: &str) -> Result<()> {
211 let branch = normalize_branch_name(name)?
212 .ok_or_else(|| OmniError::manifest("cannot create branch 'main'".to_string()))?;
213 self.ensure_commit_graph_initialized().await?;
214 self.manifest.create_branch(&branch).await?;
215 failpoints::maybe_fail("branch_create.after_manifest_branch_create")?;
216 if let Some(commit_graph) = &mut self.commit_graph {
217 commit_graph.create_branch(&branch).await?;
218 }
219 Ok(())
220 }
221
222 pub async fn branch_delete(&mut self, name: &str) -> Result<()> {
223 let branch = normalize_branch_name(name)?
224 .ok_or_else(|| OmniError::manifest("cannot delete branch 'main'".to_string()))?;
225 if self.current_branch() == Some(branch.as_str()) {
226 return Err(OmniError::manifest_conflict(format!(
227 "cannot delete currently active branch '{}'",
228 branch
229 )));
230 }
231
232 self.manifest.delete_branch(&branch).await?;
233
234 if let Some(commit_graph) = &mut self.commit_graph {
235 commit_graph.delete_branch(&branch).await?;
236 } else if self
237 .storage
238 .exists(&graph_commits_uri(self.root_uri()))
239 .await?
240 {
241 let mut commit_graph = CommitGraph::open(self.root_uri()).await?;
242 commit_graph.delete_branch(&branch).await?;
243 }
244
245 Ok(())
246 }
247
248 pub async fn snapshot_at_version(&self, version: u64) -> Result<Snapshot> {
249 ManifestCoordinator::snapshot_at(self.root_uri(), self.current_branch(), version).await
250 }
251
252 pub async fn resolve_snapshot_id(&self, branch: &str) -> Result<SnapshotId> {
253 let normalized = normalize_branch_name(branch)?;
254 let other = match normalized.as_deref() {
255 Some(branch) => {
256 GraphCoordinator::open_branch(self.root_uri(), branch, Arc::clone(&self.storage))
257 .await?
258 }
259 None => GraphCoordinator::open(self.root_uri(), Arc::clone(&self.storage)).await?,
260 };
261
262 Ok(other
263 .head_commit_id()
264 .await?
265 .unwrap_or_else(|| SnapshotId::synthetic(other.current_branch(), other.version())))
266 }
267
268 pub async fn resolve_target(&self, target: &ReadTarget) -> Result<ResolvedTarget> {
269 match target {
270 ReadTarget::Branch(branch) => {
271 let normalized = normalize_branch_name(branch)?;
272 let other = match normalized.as_deref() {
273 Some(branch) => {
274 GraphCoordinator::open_branch(
275 self.root_uri(),
276 branch,
277 Arc::clone(&self.storage),
278 )
279 .await?
280 }
281 None => {
282 GraphCoordinator::open(self.root_uri(), Arc::clone(&self.storage)).await?
283 }
284 };
285 let snapshot_id = other.head_commit_id().await?.unwrap_or_else(|| {
286 SnapshotId::synthetic(other.current_branch(), other.version())
287 });
288 Ok(ResolvedTarget {
289 requested: target.clone(),
290 branch: other.bound_branch.clone(),
291 snapshot_id,
292 snapshot: other.snapshot(),
293 })
294 }
295 ReadTarget::Snapshot(snapshot_id) => {
296 let commit = self.resolve_commit(snapshot_id).await?;
297 let snapshot = ManifestCoordinator::snapshot_at(
298 self.root_uri(),
299 commit.manifest_branch.as_deref(),
300 commit.manifest_version,
301 )
302 .await?;
303 Ok(ResolvedTarget {
304 requested: target.clone(),
305 branch: commit.manifest_branch.clone(),
306 snapshot_id: snapshot_id.clone(),
307 snapshot,
308 })
309 }
310 }
311 }
312
313 pub async fn resolve_commit(&self, snapshot_id: &SnapshotId) -> Result<GraphCommit> {
314 if let Some(commit_graph) = &self.commit_graph {
315 if let Some(commit) = commit_graph.get_commit(snapshot_id.as_str()) {
316 return Ok(commit);
317 }
318 }
319
320 for branch in self.manifest.list_branches().await? {
321 let normalized = normalize_branch_name(&branch)?;
322 let Some(commit_graph) = self
323 .open_commit_graph_for_branch(normalized.as_deref())
324 .await?
325 else {
326 break;
327 };
328 if let Some(commit) = commit_graph.get_commit(snapshot_id.as_str()) {
329 return Ok(commit);
330 }
331 }
332
333 Err(OmniError::manifest_not_found(format!(
334 "commit '{}' not found",
335 snapshot_id
336 )))
337 }
338
339 pub(crate) async fn head_commit_id(&self) -> Result<Option<SnapshotId>> {
340 match &self.commit_graph {
341 Some(commit_graph) => commit_graph
342 .head_commit_id()
343 .await
344 .map(|id| id.map(SnapshotId::new)),
345 None => Ok(None),
346 }
347 }
348
349 pub(crate) async fn ensure_commit_graph_initialized(&mut self) -> Result<()> {
350 if self.commit_graph.is_some() {
351 return Ok(());
352 }
353 if !self
354 .storage
355 .exists(&graph_commits_uri(self.root_uri()))
356 .await?
357 {
358 let _ = CommitGraph::init(self.root_uri(), self.manifest.version()).await?;
359 }
360 self.commit_graph = match self.current_branch() {
361 Some(branch) => Some(CommitGraph::open_at_branch(self.root_uri(), branch).await?),
362 None => Some(CommitGraph::open(self.root_uri()).await?),
363 };
364 Ok(())
365 }
366
367 pub(crate) async fn commit_updates_with_actor(
368 &mut self,
369 updates: &[SubTableUpdate],
370 actor_id: Option<&str>,
371 ) -> Result<PublishedSnapshot> {
372 let manifest_version = self.commit_manifest_updates(updates).await?;
373 let snapshot_id = self.record_graph_commit(manifest_version, actor_id).await?;
374 Ok(PublishedSnapshot {
375 manifest_version,
376 _snapshot_id: snapshot_id,
377 })
378 }
379
380 pub(crate) async fn commit_updates_with_actor_with_expected(
386 &mut self,
387 updates: &[SubTableUpdate],
388 expected_table_versions: &HashMap<String, u64>,
389 actor_id: Option<&str>,
390 ) -> Result<PublishedSnapshot> {
391 let manifest_version = self
392 .commit_manifest_updates_with_expected(updates, expected_table_versions)
393 .await?;
394 let snapshot_id = self.record_graph_commit(manifest_version, actor_id).await?;
395 Ok(PublishedSnapshot {
396 manifest_version,
397 _snapshot_id: snapshot_id,
398 })
399 }
400
401 pub(crate) async fn commit_manifest_updates(
402 &mut self,
403 updates: &[SubTableUpdate],
404 ) -> Result<u64> {
405 let manifest_version = self.manifest.commit(updates).await?;
406 failpoints::maybe_fail("graph_publish.after_manifest_commit")?;
407 Ok(manifest_version)
408 }
409
410 pub(crate) async fn commit_manifest_updates_with_expected(
411 &mut self,
412 updates: &[SubTableUpdate],
413 expected_table_versions: &HashMap<String, u64>,
414 ) -> Result<u64> {
415 let manifest_version = self
416 .manifest
417 .commit_with_expected(updates, expected_table_versions)
418 .await?;
419 failpoints::maybe_fail("graph_publish.after_manifest_commit")?;
420 Ok(manifest_version)
421 }
422
423 pub(crate) async fn commit_manifest_changes(
424 &mut self,
425 changes: &[ManifestChange],
426 ) -> Result<u64> {
427 let manifest_version = self.manifest.commit_changes(changes).await?;
428 failpoints::maybe_fail("graph_publish.after_manifest_commit")?;
429 Ok(manifest_version)
430 }
431
432 pub(crate) async fn commit_changes_with_actor(
433 &mut self,
434 changes: &[ManifestChange],
435 actor_id: Option<&str>,
436 ) -> Result<PublishedSnapshot> {
437 let manifest_version = self.commit_manifest_changes(changes).await?;
438 let snapshot_id = self.record_graph_commit(manifest_version, actor_id).await?;
439 Ok(PublishedSnapshot {
440 manifest_version,
441 _snapshot_id: snapshot_id,
442 })
443 }
444
445 pub(crate) async fn record_graph_commit(
446 &mut self,
447 manifest_version: u64,
448 actor_id: Option<&str>,
449 ) -> Result<SnapshotId> {
450 self.ensure_commit_graph_initialized().await?;
451 let current_branch = self.current_branch().map(str::to_string);
452 let Some(commit_graph) = &mut self.commit_graph else {
453 return Ok(SnapshotId::synthetic(
454 current_branch.as_deref(),
455 manifest_version,
456 ));
457 };
458 failpoints::maybe_fail("graph_publish.before_commit_append")?;
459 let graph_commit_id = commit_graph
460 .append_commit(current_branch.as_deref(), manifest_version, actor_id)
461 .await?;
462 Ok(SnapshotId::new(graph_commit_id))
463 }
464
465 pub(crate) async fn record_merge_commit(
466 &mut self,
467 manifest_version: u64,
468 parent_commit_id: &str,
469 merged_parent_commit_id: &str,
470 actor_id: Option<&str>,
471 ) -> Result<SnapshotId> {
472 self.ensure_commit_graph_initialized().await?;
473 let current_branch = self.current_branch().map(str::to_string);
474 let commit_graph = self.commit_graph.as_mut().ok_or_else(|| {
475 OmniError::manifest("branch merge requires _graph_commits.lance".to_string())
476 })?;
477 failpoints::maybe_fail("graph_publish.before_commit_append")?;
478 let graph_commit_id = commit_graph
479 .append_merge_commit(
480 current_branch.as_deref(),
481 manifest_version,
482 parent_commit_id,
483 merged_parent_commit_id,
484 actor_id,
485 )
486 .await?;
487 Ok(SnapshotId::new(graph_commit_id))
488 }
489
490 async fn open_commit_graph_for_branch(
491 &self,
492 branch: Option<&str>,
493 ) -> Result<Option<CommitGraph>> {
494 if !self
495 .storage
496 .exists(&graph_commits_uri(self.root_uri()))
497 .await?
498 {
499 return Ok(None);
500 }
501 let graph = match branch {
502 Some(branch) => CommitGraph::open_at_branch(self.root_uri(), branch).await?,
503 None => CommitGraph::open(self.root_uri()).await?,
504 };
505 Ok(Some(graph))
506 }
507
508 pub(crate) async fn list_commits(&self) -> Result<Vec<GraphCommit>> {
509 if let Some(commit_graph) = &self.commit_graph {
510 return commit_graph.load_commits().await;
511 }
512 if !self
513 .storage
514 .exists(&graph_commits_uri(self.root_uri()))
515 .await?
516 {
517 return Ok(Vec::new());
518 }
519 let commit_graph = match self.current_branch() {
520 Some(branch) => CommitGraph::open_at_branch(self.root_uri(), branch).await?,
521 None => CommitGraph::open(self.root_uri()).await?,
522 };
523 commit_graph.load_commits().await
524 }
525}
526
527fn graph_commits_uri(root_uri: &str) -> String {
528 join_uri(root_uri, GRAPH_COMMITS_DIR)
529}
530
531fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
532 let branch = branch.trim();
533 if branch.is_empty() {
534 return Err(OmniError::manifest(
535 "branch name cannot be empty".to_string(),
536 ));
537 }
538 if branch == "main" {
539 return Ok(None);
540 }
541 Ok(Some(branch.to_string()))
542}