1use std::fmt;
2use std::sync::Arc;
3
4use omnigraph_compiler::catalog::Catalog;
5
6use crate::error::{OmniError, Result};
7use crate::failpoints;
8use crate::storage::{StorageAdapter, join_uri, normalize_root_uri};
9
10use super::commit_graph::{CommitGraph, GraphCommit};
11use super::is_internal_system_branch;
12use super::manifest::{ManifestChange, ManifestCoordinator, Snapshot, SubTableUpdate};
13use super::run_registry::{RunId, RunRecord, RunRegistry, graph_runs_uri};
14
/// Name of the commit-graph directory, joined onto the graph root URI by
/// `graph_commits_uri`. Its existence in storage is what decides whether a
/// commit graph is opened or has to be initialized first.
// NOTE(review): the `.lance` suffix suggests a Lance dataset — confirm against `CommitGraph`.
const GRAPH_COMMITS_DIR: &str = "_graph_commits.lance";
16
/// Opaque, string-backed identifier of a graph snapshot.
///
/// The value is either a commit id taken from the commit graph or a synthetic
/// id derived from a manifest branch and version (see [`SnapshotId::synthetic`]).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SnapshotId(String);

impl SnapshotId {
    /// Wraps any string-like value as a snapshot id without validation.
    pub fn new(id: impl Into<String>) -> Self {
        Self(id.into())
    }

    /// Borrows the identifier as a string slice.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }

    /// Builds the fallback id `manifest:<branch>:v<version>` used when no
    /// commit id is available. A `None` branch is rendered as `main`.
    pub(crate) fn synthetic(branch: Option<&str>, version: u64) -> Self {
        let branch = branch.unwrap_or("main");
        Self(format!("manifest:{}:v{}", branch, version))
    }
}

impl fmt::Display for SnapshotId {
    /// Writes the raw identifier, honouring width/fill/alignment flags.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Fully-qualified call: only the `fmt` module is imported, so the
        // method-call form `self.0.fmt(f)` has no trait in scope to resolve to.
        fmt::Display::fmt(&self.0, f)
    }
}
42
43#[derive(Debug, Clone, PartialEq, Eq)]
44pub enum ReadTarget {
45 Branch(String),
46 Snapshot(SnapshotId),
47}
48
49impl ReadTarget {
50 pub fn branch(name: impl Into<String>) -> Self {
51 Self::Branch(name.into())
52 }
53
54 pub fn snapshot(id: impl Into<SnapshotId>) -> Self {
55 Self::Snapshot(id.into())
56 }
57}
58
59impl From<&str> for ReadTarget {
60 fn from(value: &str) -> Self {
61 Self::branch(value)
62 }
63}
64
65impl From<String> for ReadTarget {
66 fn from(value: String) -> Self {
67 Self::Branch(value)
68 }
69}
70
71impl From<SnapshotId> for ReadTarget {
72 fn from(value: SnapshotId) -> Self {
73 Self::Snapshot(value)
74 }
75}
76
/// Outcome of `GraphCoordinator::resolve_target`: the request together with
/// the concrete branch, snapshot id and snapshot it resolved to.
#[derive(Debug, Clone)]
pub struct ResolvedTarget {
    /// Copy of the target that was asked for.
    pub requested: ReadTarget,
    /// Branch the snapshot belongs to. For branch targets this is the bound
    /// branch of the coordinator that was opened (`None` for `main`); for
    /// snapshot targets it is the commit's recorded manifest branch.
    pub branch: Option<String>,
    /// Head commit id of the branch (or a synthetic id when there is none),
    /// or the requested snapshot id for snapshot targets.
    pub snapshot_id: SnapshotId,
    /// Manifest snapshot the target resolved to.
    pub snapshot: Snapshot,
}
84
/// Result of a publish (`commit_updates_with_actor` /
/// `commit_changes_with_actor`): the manifest commit plus the graph commit
/// recorded for it.
#[derive(Debug, Clone)]
pub(crate) struct PublishedSnapshot {
    /// Version returned by the manifest commit.
    pub manifest_version: u64,
    /// Id returned by `record_graph_commit` for that manifest version.
    // NOTE(review): the leading underscore suggests the field is currently unread — confirm.
    pub _snapshot_id: SnapshotId,
}
90
/// Ties together the manifest, the commit graph and the run registry that
/// live under one graph root, optionally bound to a non-`main` branch.
pub struct GraphCoordinator {
    /// Root URI after `normalize_root_uri`.
    root_uri: String,
    /// Storage adapter, used here for existence checks of the commit-graph
    /// and run-registry locations.
    storage: Arc<dyn StorageAdapter>,
    /// Manifest coordinator opened at `bound_branch` (or main).
    manifest: ManifestCoordinator,
    /// Commit graph handle; `None` when its directory did not exist at open
    /// time and it has not been lazily initialized since.
    commit_graph: Option<CommitGraph>,
    /// Run registry handle; `None` until it is found in storage or lazily
    /// initialized.
    run_registry: Option<RunRegistry>,
    /// Branch this coordinator was opened on; `None` means `main`.
    bound_branch: Option<String>,
}
99
100impl GraphCoordinator {
101 pub async fn init(
102 root_uri: &str,
103 catalog: &Catalog,
104 storage: Arc<dyn StorageAdapter>,
105 ) -> Result<Self> {
106 let root = normalize_root_uri(root_uri)?;
107 let manifest = ManifestCoordinator::init(&root, catalog).await?;
108 let commit_graph = Some(CommitGraph::init(&root, manifest.version()).await?);
109 Ok(Self {
110 root_uri: root,
111 storage,
112 manifest,
113 commit_graph,
114 run_registry: None,
115 bound_branch: None,
116 })
117 }
118
119 pub async fn open(root_uri: &str, storage: Arc<dyn StorageAdapter>) -> Result<Self> {
120 let root = normalize_root_uri(root_uri)?;
121 let manifest = ManifestCoordinator::open(&root).await?;
122 let commit_graph = if storage.exists(&graph_commits_uri(&root)).await? {
123 Some(CommitGraph::open(&root).await?)
124 } else {
125 None
126 };
127 let run_registry = if storage.exists(&graph_runs_uri(&root)).await? {
128 Some(RunRegistry::open(&root).await?)
129 } else {
130 None
131 };
132 Ok(Self {
133 root_uri: root,
134 storage,
135 manifest,
136 commit_graph,
137 run_registry,
138 bound_branch: None,
139 })
140 }
141
142 pub async fn open_branch(
143 root_uri: &str,
144 branch: &str,
145 storage: Arc<dyn StorageAdapter>,
146 ) -> Result<Self> {
147 let branch = normalize_branch_name(branch)?;
148 let Some(branch_name) = branch else {
149 return Self::open(root_uri, storage).await;
150 };
151
152 let root = normalize_root_uri(root_uri)?;
153 let manifest = ManifestCoordinator::open_at_branch(&root, &branch_name).await?;
154 let commit_graph = if storage.exists(&graph_commits_uri(&root)).await? {
155 Some(CommitGraph::open_at_branch(&root, &branch_name).await?)
156 } else {
157 None
158 };
159 let run_registry = if storage.exists(&graph_runs_uri(&root)).await? {
160 Some(RunRegistry::open(&root).await?)
161 } else {
162 None
163 };
164
165 Ok(Self {
166 root_uri: root,
167 storage,
168 manifest,
169 commit_graph,
170 run_registry,
171 bound_branch: Some(branch_name),
172 })
173 }
174
175 pub fn root_uri(&self) -> &str {
176 &self.root_uri
177 }
178
179 pub fn version(&self) -> u64 {
180 self.manifest.version()
181 }
182
183 pub fn snapshot(&self) -> Snapshot {
184 self.manifest.snapshot()
185 }
186
187 pub fn current_branch(&self) -> Option<&str> {
188 self.bound_branch.as_deref()
189 }
190
191 pub async fn refresh(&mut self) -> Result<()> {
192 self.manifest.refresh().await?;
193 if let Some(commit_graph) = &mut self.commit_graph {
194 commit_graph.refresh().await?;
195 }
196 if let Some(run_registry) = &mut self.run_registry {
197 let root_uri = self.root_uri.clone();
198 run_registry.refresh(&root_uri).await?;
199 }
200 Ok(())
201 }
202
203 pub async fn branch_list(&self) -> Result<Vec<String>> {
204 self.manifest.list_branches().await.map(|branches| {
205 branches
206 .into_iter()
207 .filter(|branch| !is_internal_system_branch(branch))
208 .collect()
209 })
210 }
211
212 pub(crate) async fn all_branches(&self) -> Result<Vec<String>> {
213 self.manifest.list_branches().await
214 }
215
216 pub async fn branch_descendants(&self, name: &str) -> Result<Vec<String>> {
217 self.manifest
218 .descendant_branches(name)
219 .await
220 .map(|branches| {
221 branches
222 .into_iter()
223 .filter(|branch| !is_internal_system_branch(branch))
224 .collect()
225 })
226 }
227
228 pub async fn branch_create(&mut self, name: &str) -> Result<()> {
229 let branch = normalize_branch_name(name)?
230 .ok_or_else(|| OmniError::manifest("cannot create branch 'main'".to_string()))?;
231 self.ensure_commit_graph_initialized().await?;
232 self.manifest.create_branch(&branch).await?;
233 failpoints::maybe_fail("branch_create.after_manifest_branch_create")?;
234 if let Some(commit_graph) = &mut self.commit_graph {
235 commit_graph.create_branch(&branch).await?;
236 }
237 Ok(())
238 }
239
240 pub async fn branch_delete(&mut self, name: &str) -> Result<()> {
241 let branch = normalize_branch_name(name)?
242 .ok_or_else(|| OmniError::manifest("cannot delete branch 'main'".to_string()))?;
243 if self.current_branch() == Some(branch.as_str()) {
244 return Err(OmniError::manifest_conflict(format!(
245 "cannot delete currently active branch '{}'",
246 branch
247 )));
248 }
249
250 self.manifest.delete_branch(&branch).await?;
251
252 if let Some(commit_graph) = &mut self.commit_graph {
253 commit_graph.delete_branch(&branch).await?;
254 } else if self
255 .storage
256 .exists(&graph_commits_uri(self.root_uri()))
257 .await?
258 {
259 let mut commit_graph = CommitGraph::open(self.root_uri()).await?;
260 commit_graph.delete_branch(&branch).await?;
261 }
262
263 Ok(())
264 }
265
266 pub async fn snapshot_at_version(&self, version: u64) -> Result<Snapshot> {
267 ManifestCoordinator::snapshot_at(self.root_uri(), self.current_branch(), version).await
268 }
269
270 pub async fn resolve_snapshot_id(&self, branch: &str) -> Result<SnapshotId> {
271 let normalized = normalize_branch_name(branch)?;
272 let other = match normalized.as_deref() {
273 Some(branch) => {
274 GraphCoordinator::open_branch(self.root_uri(), branch, Arc::clone(&self.storage))
275 .await?
276 }
277 None => GraphCoordinator::open(self.root_uri(), Arc::clone(&self.storage)).await?,
278 };
279
280 Ok(other
281 .head_commit_id()
282 .await?
283 .unwrap_or_else(|| SnapshotId::synthetic(other.current_branch(), other.version())))
284 }
285
286 pub async fn resolve_target(&self, target: &ReadTarget) -> Result<ResolvedTarget> {
287 match target {
288 ReadTarget::Branch(branch) => {
289 let normalized = normalize_branch_name(branch)?;
290 let other = match normalized.as_deref() {
291 Some(branch) => {
292 GraphCoordinator::open_branch(
293 self.root_uri(),
294 branch,
295 Arc::clone(&self.storage),
296 )
297 .await?
298 }
299 None => {
300 GraphCoordinator::open(self.root_uri(), Arc::clone(&self.storage)).await?
301 }
302 };
303 let snapshot_id = other.head_commit_id().await?.unwrap_or_else(|| {
304 SnapshotId::synthetic(other.current_branch(), other.version())
305 });
306 Ok(ResolvedTarget {
307 requested: target.clone(),
308 branch: other.bound_branch.clone(),
309 snapshot_id,
310 snapshot: other.snapshot(),
311 })
312 }
313 ReadTarget::Snapshot(snapshot_id) => {
314 let commit = self.resolve_commit(snapshot_id).await?;
315 let snapshot = ManifestCoordinator::snapshot_at(
316 self.root_uri(),
317 commit.manifest_branch.as_deref(),
318 commit.manifest_version,
319 )
320 .await?;
321 Ok(ResolvedTarget {
322 requested: target.clone(),
323 branch: commit.manifest_branch.clone(),
324 snapshot_id: snapshot_id.clone(),
325 snapshot,
326 })
327 }
328 }
329 }
330
331 pub async fn resolve_commit(&self, snapshot_id: &SnapshotId) -> Result<GraphCommit> {
332 if let Some(commit_graph) = &self.commit_graph {
333 if let Some(commit) = commit_graph.get_commit(snapshot_id.as_str()) {
334 return Ok(commit);
335 }
336 }
337
338 for branch in self.manifest.list_branches().await? {
339 let normalized = normalize_branch_name(&branch)?;
340 let Some(commit_graph) = self
341 .open_commit_graph_for_branch(normalized.as_deref())
342 .await?
343 else {
344 break;
345 };
346 if let Some(commit) = commit_graph.get_commit(snapshot_id.as_str()) {
347 return Ok(commit);
348 }
349 }
350
351 Err(OmniError::manifest_not_found(format!(
352 "commit '{}' not found",
353 snapshot_id
354 )))
355 }
356
357 pub(crate) async fn head_commit_id(&self) -> Result<Option<SnapshotId>> {
358 match &self.commit_graph {
359 Some(commit_graph) => commit_graph
360 .head_commit_id()
361 .await
362 .map(|id| id.map(SnapshotId::new)),
363 None => Ok(None),
364 }
365 }
366
367 pub(crate) async fn ensure_commit_graph_initialized(&mut self) -> Result<()> {
368 if self.commit_graph.is_some() {
369 return Ok(());
370 }
371 if !self
372 .storage
373 .exists(&graph_commits_uri(self.root_uri()))
374 .await?
375 {
376 let _ = CommitGraph::init(self.root_uri(), self.manifest.version()).await?;
377 }
378 self.commit_graph = match self.current_branch() {
379 Some(branch) => Some(CommitGraph::open_at_branch(self.root_uri(), branch).await?),
380 None => Some(CommitGraph::open(self.root_uri()).await?),
381 };
382 Ok(())
383 }
384
385 pub(crate) async fn ensure_run_registry_initialized(&mut self) -> Result<()> {
386 if self.run_registry.is_some() {
387 return Ok(());
388 }
389 if !self
390 .storage
391 .exists(&graph_runs_uri(self.root_uri()))
392 .await?
393 {
394 let _ = RunRegistry::init(self.root_uri()).await?;
395 }
396 self.run_registry = Some(RunRegistry::open(self.root_uri()).await?);
397 Ok(())
398 }
399
400 pub(crate) async fn commit_updates_with_actor(
401 &mut self,
402 updates: &[SubTableUpdate],
403 actor_id: Option<&str>,
404 ) -> Result<PublishedSnapshot> {
405 let manifest_version = self.commit_manifest_updates(updates).await?;
406 let snapshot_id = self.record_graph_commit(manifest_version, actor_id).await?;
407 Ok(PublishedSnapshot {
408 manifest_version,
409 _snapshot_id: snapshot_id,
410 })
411 }
412
413 pub(crate) async fn commit_manifest_updates(
414 &mut self,
415 updates: &[SubTableUpdate],
416 ) -> Result<u64> {
417 let manifest_version = self.manifest.commit(updates).await?;
418 failpoints::maybe_fail("graph_publish.after_manifest_commit")?;
419 Ok(manifest_version)
420 }
421
422 pub(crate) async fn commit_manifest_changes(
423 &mut self,
424 changes: &[ManifestChange],
425 ) -> Result<u64> {
426 let manifest_version = self.manifest.commit_changes(changes).await?;
427 failpoints::maybe_fail("graph_publish.after_manifest_commit")?;
428 Ok(manifest_version)
429 }
430
431 pub(crate) async fn commit_changes_with_actor(
432 &mut self,
433 changes: &[ManifestChange],
434 actor_id: Option<&str>,
435 ) -> Result<PublishedSnapshot> {
436 let manifest_version = self.commit_manifest_changes(changes).await?;
437 let snapshot_id = self.record_graph_commit(manifest_version, actor_id).await?;
438 Ok(PublishedSnapshot {
439 manifest_version,
440 _snapshot_id: snapshot_id,
441 })
442 }
443
444 pub(crate) async fn record_graph_commit(
445 &mut self,
446 manifest_version: u64,
447 actor_id: Option<&str>,
448 ) -> Result<SnapshotId> {
449 self.ensure_commit_graph_initialized().await?;
450 let current_branch = self.current_branch().map(str::to_string);
451 let Some(commit_graph) = &mut self.commit_graph else {
452 return Ok(SnapshotId::synthetic(
453 current_branch.as_deref(),
454 manifest_version,
455 ));
456 };
457 failpoints::maybe_fail("graph_publish.before_commit_append")?;
458 let graph_commit_id = commit_graph
459 .append_commit(current_branch.as_deref(), manifest_version, actor_id)
460 .await?;
461 Ok(SnapshotId::new(graph_commit_id))
462 }
463
464 pub(crate) async fn record_merge_commit(
465 &mut self,
466 manifest_version: u64,
467 parent_commit_id: &str,
468 merged_parent_commit_id: &str,
469 actor_id: Option<&str>,
470 ) -> Result<SnapshotId> {
471 self.ensure_commit_graph_initialized().await?;
472 let current_branch = self.current_branch().map(str::to_string);
473 let commit_graph = self.commit_graph.as_mut().ok_or_else(|| {
474 OmniError::manifest("branch merge requires _graph_commits.lance".to_string())
475 })?;
476 failpoints::maybe_fail("graph_publish.before_commit_append")?;
477 let graph_commit_id = commit_graph
478 .append_merge_commit(
479 current_branch.as_deref(),
480 manifest_version,
481 parent_commit_id,
482 merged_parent_commit_id,
483 actor_id,
484 )
485 .await?;
486 Ok(SnapshotId::new(graph_commit_id))
487 }
488
489 async fn open_commit_graph_for_branch(
490 &self,
491 branch: Option<&str>,
492 ) -> Result<Option<CommitGraph>> {
493 if !self
494 .storage
495 .exists(&graph_commits_uri(self.root_uri()))
496 .await?
497 {
498 return Ok(None);
499 }
500 let graph = match branch {
501 Some(branch) => CommitGraph::open_at_branch(self.root_uri(), branch).await?,
502 None => CommitGraph::open(self.root_uri()).await?,
503 };
504 Ok(Some(graph))
505 }
506
507 pub(crate) async fn append_run_record(&mut self, record: &RunRecord) -> Result<()> {
508 self.ensure_run_registry_initialized().await?;
509 let Some(run_registry) = &mut self.run_registry else {
510 return Err(OmniError::manifest(
511 "run registry not initialized".to_string(),
512 ));
513 };
514 run_registry.append_record(record).await
515 }
516
517 pub(crate) async fn get_run(&self, run_id: &RunId) -> Result<RunRecord> {
518 if let Some(run_registry) = &self.run_registry {
519 if let Some(run) = run_registry.get_run(run_id).await? {
520 return Ok(run);
521 }
522 }
523 if !self
524 .storage
525 .exists(&graph_runs_uri(self.root_uri()))
526 .await?
527 {
528 return Err(OmniError::manifest_not_found(format!(
529 "run '{}' not found",
530 run_id
531 )));
532 }
533 let run_registry = RunRegistry::open(self.root_uri()).await?;
534 run_registry
535 .get_run(run_id)
536 .await?
537 .ok_or_else(|| OmniError::manifest_not_found(format!("run '{}' not found", run_id)))
538 }
539
540 pub(crate) async fn list_runs(&self) -> Result<Vec<RunRecord>> {
541 if let Some(run_registry) = &self.run_registry {
542 return run_registry.list_runs().await;
543 }
544 if !self
545 .storage
546 .exists(&graph_runs_uri(self.root_uri()))
547 .await?
548 {
549 return Ok(Vec::new());
550 }
551 let run_registry = RunRegistry::open(self.root_uri()).await?;
552 run_registry.list_runs().await
553 }
554
555 pub(crate) async fn list_commits(&self) -> Result<Vec<GraphCommit>> {
556 if let Some(commit_graph) = &self.commit_graph {
557 return commit_graph.load_commits().await;
558 }
559 if !self
560 .storage
561 .exists(&graph_commits_uri(self.root_uri()))
562 .await?
563 {
564 return Ok(Vec::new());
565 }
566 let commit_graph = match self.current_branch() {
567 Some(branch) => CommitGraph::open_at_branch(self.root_uri(), branch).await?,
568 None => CommitGraph::open(self.root_uri()).await?,
569 };
570 commit_graph.load_commits().await
571 }
572}
573
574fn graph_commits_uri(root_uri: &str) -> String {
575 join_uri(root_uri, GRAPH_COMMITS_DIR)
576}
577
578fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
579 let branch = branch.trim();
580 if branch.is_empty() {
581 return Err(OmniError::manifest(
582 "branch name cannot be empty".to_string(),
583 ));
584 }
585 if branch == "main" {
586 return Ok(None);
587 }
588 Ok(Some(branch.to_string()))
589}