1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3
4use crate::error::{OmniError, Result};
5use lance::Dataset;
6use lance_namespace::models::CreateTableVersionRequest;
7use omnigraph_compiler::catalog::Catalog;
8
9#[path = "manifest/graph.rs"]
10mod graph;
11#[path = "manifest/layout.rs"]
12mod layout;
13#[path = "manifest/metadata.rs"]
14mod metadata;
15#[path = "manifest/migrations.rs"]
16mod migrations;
17#[cfg(test)]
21#[path = "manifest/namespace.rs"]
22mod namespace;
23#[path = "manifest/publisher.rs"]
24mod publisher;
25#[path = "manifest/recovery.rs"]
26mod recovery;
27#[path = "manifest/state.rs"]
28mod state;
29
30use graph::{init_manifest_graph, open_manifest_graph, snapshot_state_at};
31use layout::{open_manifest_dataset, table_uri_for_path, type_name_hash};
32pub(crate) use layout::manifest_uri;
33pub(crate) use metadata::TableVersionMetadata;
34#[cfg(test)]
35use metadata::{OMNIGRAPH_ROW_COUNT_KEY, table_version_metadata_for_state};
36#[cfg(test)]
37use namespace::{branch_manifest_namespace, staged_table_namespace};
38use publisher::{GraphNamespacePublisher, ManifestBatchPublisher};
39pub(crate) use recovery::{
40 RecoveryMode, RecoverySidecar, RecoverySidecarHandle, SidecarKind, SidecarTablePin,
41 SidecarTableRegistration, SidecarTombstone, confirm_sidecar_phase_b, delete_sidecar,
42 has_schema_apply_sidecar, heal_pending_sidecars_roll_forward, list_sidecars, new_sidecar,
43 recover_manifest_drift, schema_apply_serial_queue_key, write_sidecar,
44};
45pub use state::SubTableEntry;
46#[cfg(test)]
47use state::string_column;
48use state::{ManifestState, read_manifest_state};
49
// NOTE(review): none of these constants is referenced in the code of this file; they
// appear to be shared with the `manifest/*` submodules (presumably via `super::`) —
// confirm their exact role there.

/// Object-type tag for a table registration record (presumably the value stored in the
/// manifest's object-type column — confirm against `manifest/state.rs`).
const OBJECT_TYPE_TABLE: &str = "table";
/// Object-type tag for a table-version record (presumably one row per pinned sub-table
/// version — confirm against `manifest/state.rs`).
const OBJECT_TYPE_TABLE_VERSION: &str = "table_version";
/// Object-type tag for a table tombstone record (presumably written for
/// `ManifestChange::Tombstone` — confirm against `manifest/publisher.rs`).
const OBJECT_TYPE_TABLE_TOMBSTONE: &str = "table_tombstone";
/// Key named `table_version_management` (presumably a dataset/namespace property key —
/// TODO confirm where it is read and written).
const TABLE_VERSION_MANAGEMENT_KEY: &str = "table_version_management";
54
55pub(crate) async fn migrate_on_open(root_uri: &str) -> Result<()> {
67 let mut dataset = open_manifest_dataset(root_uri, None).await?;
68 migrations::migrate_internal_schema(&mut dataset).await
69}
70
/// A point-in-time view of the graph manifest.
///
/// Holds the manifest version it was read at and the sub-table entries keyed by table
/// key. Built by `ManifestCoordinator::snapshot` and `ManifestCoordinator::snapshot_at`.
#[derive(Debug, Clone)]
pub struct Snapshot {
    /// Graph root URI with trailing `/` characters removed.
    root_uri: String,
    /// Manifest version this snapshot was read at.
    version: u64,
    /// Sub-table entries keyed by `SubTableEntry::table_key`.
    entries: HashMap<String, SubTableEntry>,
    /// Optional runtime caches. When set, `Snapshot::open` obtains dataset handles through
    /// them instead of opening the table directly. Snapshots start with `None`.
    read_caches: Option<Arc<crate::runtime_cache::ReadCaches>>,
}
87
88impl Snapshot {
89 pub async fn open(&self, table_key: &str) -> Result<Dataset> {
93 let entry = self
94 .entries
95 .get(table_key)
96 .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
97 match &self.read_caches {
98 Some(caches) => {
99 let location = table_uri_for_path(
100 &self.root_uri,
101 &entry.table_path,
102 entry.table_branch.as_deref(),
103 );
104 caches
105 .handles
106 .get_or_open(
107 &entry.table_path,
108 entry.table_branch.as_deref(),
109 entry.table_version,
110 entry.version_metadata.e_tag(),
111 &location,
112 Some(&caches.session),
113 )
114 .await
115 }
116 None => entry.open(&self.root_uri).await,
117 }
118 }
119
120 pub(crate) fn set_read_caches(&mut self, caches: Arc<crate::runtime_cache::ReadCaches>) {
124 self.read_caches = Some(caches);
125 }
126
127 pub fn version(&self) -> u64 {
129 self.version
130 }
131
132 pub fn entry(&self, table_key: &str) -> Option<&SubTableEntry> {
134 self.entries.get(table_key)
135 }
136
137 pub fn entries(&self) -> impl Iterator<Item = &SubTableEntry> {
138 self.entries.values()
139 }
140}
141
/// Identity of one manifest incarnation.
///
/// An incarnation is its version number plus the optional storage e-tag and commit
/// timestamp. The last two are used to tell apart manifests that share a version number.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct ManifestIncarnation {
    pub(crate) version: u64,
    pub(crate) e_tag: Option<String>,
    timestamp_nanos: Option<u128>,
}

impl ManifestIncarnation {
    /// Returns `true` when `self` (the candidate, e.g. a freshly probed incarnation)
    /// refers to the same manifest as `held`.
    ///
    /// The versions must be equal. After that, e-tags decide when both sides carry one.
    /// Failing that, timestamps decide when both sides carry one. If neither pair is
    /// complete, the equal versions alone count as a match.
    pub(crate) fn matches(&self, held: &Self) -> bool {
        if self.version != held.version {
            return false;
        }
        if let (Some(probed), Some(ours)) = (self.e_tag.as_deref(), held.e_tag.as_deref()) {
            return probed == ours;
        }
        if let (Some(probed), Some(ours)) = (self.timestamp_nanos, held.timestamp_nanos) {
            return probed == ours;
        }
        // Not enough identity information on one side: fall back to version equality.
        true
    }
}
166
167impl SubTableUpdate {
168 pub(crate) fn to_create_table_version_request(&self) -> CreateTableVersionRequest {
169 self.version_metadata.to_create_table_version_request(
170 &self.table_key,
171 self.table_version,
172 self.row_count,
173 self.table_branch.as_deref(),
174 )
175 }
176}
177
/// Registration of a table in the manifest, carried by `ManifestChange::RegisterTable`.
#[derive(Debug, Clone)]
pub(crate) struct TableRegistration {
    /// Manifest table key (keys such as `node:<Type>` / `edge:<Type>` are understood by
    /// `table_path_for_table_key`).
    pub(crate) table_key: String,
    /// Storage path of the table. Presumably relative to the graph root, as produced by
    /// `table_path_for_table_key` — confirm at call sites.
    pub(crate) table_path: String,
}
183
/// Tombstone for a table's manifest entry, carried by `ManifestChange::Tombstone`.
///
/// NOTE(review): presumably marks the table as dropped as of `tombstone_version`;
/// confirm the exact semantics in `manifest/publisher.rs`.
#[derive(Debug, Clone)]
pub(crate) struct TableTombstone {
    /// Manifest table key being tombstoned.
    pub(crate) table_key: String,
    /// Version recorded with the tombstone (meaning to be confirmed in the publisher).
    pub(crate) tombstone_version: u64,
}
189
/// One change in a manifest commit batch.
///
/// Batches are handed to the coordinator's publisher by
/// `ManifestCoordinator::commit_changes_with_expected`.
#[derive(Debug, Clone)]
pub(crate) enum ManifestChange {
    /// Pin a new version of an existing sub-table.
    Update(SubTableUpdate),
    /// Register a table under a key and storage path.
    RegisterTable(TableRegistration),
    /// Tombstone a table's entry.
    Tombstone(TableTombstone),
}
196
197impl SubTableEntry {
198 pub(crate) async fn open(&self, root_uri: &str) -> Result<Dataset> {
205 let location = table_uri_for_path(root_uri, &self.table_path, self.table_branch.as_deref());
213 crate::instrumentation::open_table_dataset(&location, self.table_version, None).await
220 }
221}
222
223pub(crate) fn table_path_for_table_key(table_key: &str) -> Result<String> {
224 if let Some(type_name) = table_key.strip_prefix("node:") {
225 return Ok(format!("nodes/{}", type_name_hash(type_name)));
226 }
227 if let Some(type_name) = table_key.strip_prefix("edge:") {
228 return Ok(format!("edges/{}", type_name_hash(type_name)));
229 }
230 Err(OmniError::manifest(format!(
231 "invalid table key '{}'",
232 table_key
233 )))
234}
235
/// A new version of one sub-table to record in the manifest.
///
/// `ManifestCoordinator::commit` wraps each update in `ManifestChange::Update`.
#[derive(Debug, Clone)]
pub struct SubTableUpdate {
    /// Manifest table key of the sub-table being updated.
    pub table_key: String,
    /// Version of the sub-table dataset to pin.
    pub table_version: u64,
    /// Branch of the sub-table dataset; `None` presumably means the main branch — confirm.
    pub table_branch: Option<String>,
    /// Row count recorded alongside this version.
    pub row_count: u64,
    /// Extra version metadata used to build the namespace `CreateTableVersionRequest`.
    pub(crate) version_metadata: TableVersionMetadata,
}
245
/// Owns the manifest dataset for one branch of a graph and publishes manifest commits.
///
/// Keeps the last manifest state it read (`known_state`), so snapshots can be produced
/// without touching storage.
pub struct ManifestCoordinator {
    /// Graph root URI with trailing `/` characters removed.
    root_uri: String,
    /// Manifest Lance dataset opened at `active_branch` (main when `None`).
    dataset: Dataset,
    /// Manifest state read from `dataset` at open, refresh, commit and branch-delete time.
    known_state: ManifestState,
    /// Branch this coordinator is bound to; `None` means main.
    active_branch: Option<String>,
    /// Publisher used for change batches. In the code shown it is a
    /// `GraphNamespacePublisher` unless replaced via the test-only `with_batch_publisher`.
    publisher: Arc<dyn ManifestBatchPublisher>,
}
259
260impl ManifestCoordinator {
261 fn default_batch_publisher(
262 root_uri: &str,
263 active_branch: Option<&str>,
264 ) -> Arc<dyn ManifestBatchPublisher> {
265 Arc::new(GraphNamespacePublisher::new(root_uri, active_branch))
266 }
267
268 fn from_parts(
269 root_uri: &str,
270 dataset: Dataset,
271 known_state: ManifestState,
272 active_branch: Option<String>,
273 publisher: Arc<dyn ManifestBatchPublisher>,
274 ) -> Self {
275 Self {
276 root_uri: root_uri.trim_end_matches('/').to_string(),
277 dataset,
278 known_state,
279 active_branch,
280 publisher,
281 }
282 }
283
284 fn from_parts_with_default_publisher(
285 root_uri: &str,
286 dataset: Dataset,
287 known_state: ManifestState,
288 active_branch: Option<String>,
289 ) -> Self {
290 let publisher = Self::default_batch_publisher(root_uri, active_branch.as_deref());
291 Self::from_parts(root_uri, dataset, known_state, active_branch, publisher)
292 }
293
294 fn snapshot_from_state(root_uri: &str, state: ManifestState) -> Snapshot {
295 Snapshot {
296 root_uri: root_uri.trim_end_matches('/').to_string(),
297 version: state.version,
298 entries: state
299 .entries
300 .into_iter()
301 .map(|entry| (entry.table_key.clone(), entry))
302 .collect(),
303 read_caches: None,
304 }
305 }
306
307 #[cfg(test)]
308 fn with_batch_publisher(mut self, publisher: Arc<dyn ManifestBatchPublisher>) -> Self {
309 self.publisher = publisher;
310 self
311 }
312
313 pub async fn init(root_uri: &str, catalog: &Catalog) -> Result<Self> {
317 let root = root_uri.trim_end_matches('/');
318 let (dataset, known_state) = init_manifest_graph(root, catalog).await?;
319
320 Ok(Self::from_parts_with_default_publisher(
321 root,
322 dataset,
323 known_state,
324 None,
325 ))
326 }
327
328 pub async fn open(root_uri: &str) -> Result<Self> {
330 let root = root_uri.trim_end_matches('/');
331 let (dataset, known_state) = open_manifest_graph(root, None).await?;
332 Ok(Self::from_parts_with_default_publisher(
333 root,
334 dataset,
335 known_state,
336 None,
337 ))
338 }
339
340 pub async fn open_at_branch(root_uri: &str, branch: &str) -> Result<Self> {
342 if branch == "main" {
343 return Self::open(root_uri).await;
344 }
345
346 let root = root_uri.trim_end_matches('/');
347 let (dataset, known_state) = open_manifest_graph(root, Some(branch)).await?;
348 Ok(Self::from_parts_with_default_publisher(
349 root,
350 dataset,
351 known_state,
352 Some(branch.to_string()),
353 ))
354 }
355
356 pub async fn snapshot_at(
357 root_uri: &str,
358 branch: Option<&str>,
359 version: u64,
360 ) -> Result<Snapshot> {
361 let root = root_uri.trim_end_matches('/');
362 Ok(Self::snapshot_from_state(
363 root,
364 snapshot_state_at(root, branch, version).await?,
365 ))
366 }
367
368 pub fn snapshot(&self) -> Snapshot {
370 Self::snapshot_from_state(&self.root_uri, self.known_state.clone())
371 }
372
373 pub async fn refresh(&mut self) -> Result<()> {
375 self.dataset = open_manifest_dataset(&self.root_uri, self.active_branch.as_deref()).await?;
376 self.known_state = read_manifest_state(&self.dataset).await?;
377 Ok(())
378 }
379
380 pub async fn commit(&mut self, updates: &[SubTableUpdate]) -> Result<u64> {
385 let changes = updates
386 .iter()
387 .cloned()
388 .map(ManifestChange::Update)
389 .collect::<Vec<_>>();
390 self.commit_changes(&changes).await
391 }
392
393 pub async fn commit_with_expected(
399 &mut self,
400 updates: &[SubTableUpdate],
401 expected_table_versions: &HashMap<String, u64>,
402 ) -> Result<u64> {
403 let changes = updates
404 .iter()
405 .cloned()
406 .map(ManifestChange::Update)
407 .collect::<Vec<_>>();
408 self.commit_changes_with_expected(&changes, expected_table_versions)
409 .await
410 }
411
412 pub(crate) async fn commit_changes(&mut self, changes: &[ManifestChange]) -> Result<u64> {
413 self.commit_changes_with_expected(changes, &HashMap::new())
414 .await
415 }
416
417 pub(crate) async fn commit_changes_with_expected(
418 &mut self,
419 changes: &[ManifestChange],
420 expected_table_versions: &HashMap<String, u64>,
421 ) -> Result<u64> {
422 if changes.is_empty() && expected_table_versions.is_empty() {
423 return Ok(self.version());
424 }
425
426 self.dataset = self
427 .publisher
428 .publish(changes, expected_table_versions)
429 .await?;
430
431 self.known_state = read_manifest_state(&self.dataset).await?;
432 Ok(self.version())
433 }
434
435 pub fn version(&self) -> u64 {
437 self.dataset.version().version
438 }
439
440 pub async fn probe_latest_version(&self) -> Result<u64> {
444 self.dataset
445 .latest_version_id()
446 .await
447 .map_err(|e| OmniError::Lance(e.to_string()))
448 }
449
450 pub(crate) fn incarnation(&self) -> ManifestIncarnation {
451 ManifestIncarnation {
452 version: self.version(),
453 e_tag: self.dataset.manifest_location().e_tag.clone(),
454 timestamp_nanos: Some(self.dataset.manifest().timestamp_nanos),
455 }
456 }
457
458 pub(crate) async fn probe_latest_incarnation(&self) -> Result<ManifestIncarnation> {
463 if self.active_branch.is_none() {
464 return Ok(ManifestIncarnation {
465 version: self.probe_latest_version().await?,
466 e_tag: self.dataset.manifest_location().e_tag.clone(),
467 timestamp_nanos: Some(self.dataset.manifest().timestamp_nanos),
468 });
469 }
470 let (manifest, location) = self
471 .dataset
472 .latest_manifest()
473 .await
474 .map_err(|e| OmniError::Lance(e.to_string()))?;
475 Ok(ManifestIncarnation {
476 version: manifest.version,
477 e_tag: location.e_tag,
478 timestamp_nanos: Some(manifest.timestamp_nanos),
479 })
480 }
481
482 pub fn active_branch(&self) -> Option<&str> {
483 self.active_branch.as_deref()
484 }
485
486 pub async fn create_branch(&mut self, name: &str) -> Result<()> {
487 let mut ds = self.dataset.clone();
488 ds.create_branch(name, self.version(), None)
489 .await
490 .map_err(|e| OmniError::Lance(e.to_string()))?;
491 Ok(())
492 }
493
494 pub async fn delete_branch(&mut self, name: &str) -> Result<()> {
495 let uri = manifest_uri(&self.root_uri);
496 let mut ds = Dataset::open(&uri)
497 .await
498 .map_err(|e| OmniError::Lance(e.to_string()))?;
499 ds.delete_branch(name)
500 .await
501 .map_err(|e| OmniError::Lance(e.to_string()))?;
502 self.dataset = open_manifest_dataset(&self.root_uri, self.active_branch.as_deref()).await?;
503 self.known_state = read_manifest_state(&self.dataset).await?;
504 Ok(())
505 }
506
507 pub async fn list_branches(&self) -> Result<Vec<String>> {
508 let branches = self
509 .dataset
510 .list_branches()
511 .await
512 .map_err(|e| OmniError::Lance(e.to_string()))?;
513 let mut names: Vec<String> = branches.into_keys().filter(|name| name != "main").collect();
514 names.sort();
515 let mut all = vec!["main".to_string()];
516 all.extend(names);
517 Ok(all)
518 }
519
520 pub async fn descendant_branches(&self, name: &str) -> Result<Vec<String>> {
521 let branches = self
522 .dataset
523 .list_branches()
524 .await
525 .map_err(|e| OmniError::Lance(e.to_string()))?;
526 let mut frontier = vec![name.to_string()];
527 let mut descendants = Vec::new();
528 let mut seen = HashSet::new();
529
530 while let Some(parent) = frontier.pop() {
531 let mut children = branches
532 .iter()
533 .filter_map(|(branch, contents)| {
534 (contents.parent_branch.as_deref() == Some(parent.as_str()))
535 .then_some(branch.clone())
536 })
537 .collect::<Vec<_>>();
538 children.sort();
539 for child in children {
540 if seen.insert(child.clone()) {
541 frontier.push(child.clone());
542 descendants.push(child);
543 }
544 }
545 }
546
547 Ok(descendants)
548 }
549
550 pub fn root_uri(&self) -> &str {
552 &self.root_uri
553 }
554}
555
556#[cfg(test)]
557#[path = "manifest/tests.rs"]
558mod tests;