1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3
4use crate::error::{OmniError, Result};
5use lance::Dataset;
6use lance_namespace::models::CreateTableVersionRequest;
7use omnigraph_compiler::catalog::Catalog;
8
9#[path = "manifest/graph.rs"]
10mod graph;
11#[path = "manifest/layout.rs"]
12mod layout;
13#[path = "manifest/metadata.rs"]
14mod metadata;
15#[path = "manifest/migrations.rs"]
16mod migrations;
17#[path = "manifest/namespace.rs"]
18mod namespace;
19#[path = "manifest/publisher.rs"]
20mod publisher;
21#[path = "manifest/recovery.rs"]
22mod recovery;
23#[path = "manifest/state.rs"]
24mod state;
25
26use graph::{init_manifest_graph, open_manifest_graph, snapshot_state_at};
27use layout::{manifest_uri, open_manifest_dataset, table_uri_for_path, type_name_hash};
28pub(crate) use metadata::TableVersionMetadata;
29#[cfg(test)]
30use metadata::{OMNIGRAPH_ROW_COUNT_KEY, table_version_metadata_for_state};
31pub(crate) use namespace::open_table_head_for_write;
32#[cfg(test)]
33use namespace::{branch_manifest_namespace, staged_table_namespace};
34use publisher::{GraphNamespacePublisher, ManifestBatchPublisher};
35pub(crate) use recovery::{
36 RecoveryMode, RecoverySidecar, RecoverySidecarHandle, SidecarKind, SidecarTablePin,
37 SidecarTableRegistration, SidecarTombstone, confirm_sidecar_phase_b, delete_sidecar,
38 has_schema_apply_sidecar, heal_pending_sidecars_roll_forward, list_sidecars, new_sidecar,
39 recover_manifest_drift, schema_apply_serial_queue_key, write_sidecar,
40};
41pub use state::SubTableEntry;
42#[cfg(test)]
43use state::string_column;
44use state::{ManifestState, read_manifest_state};
45
// String tags whose names suggest they classify the kind of object a manifest row describes.
// None of them is referenced in this part of the file.
// NOTE(review): presumably consumed by the `manifest/*` submodules via `super::` — confirm.
const OBJECT_TYPE_TABLE: &str = "table";
const OBJECT_TYPE_TABLE_VERSION: &str = "table_version";
const OBJECT_TYPE_TABLE_TOMBSTONE: &str = "table_tombstone";
// NOTE(review): the name suggests a metadata/property key related to table version
// management — confirm its consumer before relying on this description.
const TABLE_VERSION_MANAGEMENT_KEY: &str = "table_version_management";
50
51pub(crate) async fn migrate_on_open(root_uri: &str) -> Result<()> {
63 let mut dataset = open_manifest_dataset(root_uri, None).await?;
64 migrations::migrate_internal_schema(&mut dataset).await
65}
66
/// Point-in-time view of the manifest: the manifest version plus one [`SubTableEntry`]
/// per table key.
///
/// Produced by `ManifestCoordinator::snapshot` / `ManifestCoordinator::snapshot_at`.
#[derive(Debug, Clone)]
pub struct Snapshot {
    /// Manifest root URI (trailing `/` stripped by `ManifestCoordinator::snapshot_from_state`).
    root_uri: String,
    /// Manifest version this snapshot was read at.
    version: u64,
    /// Sub-table entries keyed by their `table_key`.
    entries: HashMap<String, SubTableEntry>,
    /// Optional shared read caches; when set, `Snapshot::open` opens tables through them
    /// instead of opening them directly.
    read_caches: Option<Arc<crate::runtime_cache::ReadCaches>>,
}
83
84impl Snapshot {
85 pub async fn open(&self, table_key: &str) -> Result<Dataset> {
89 let entry = self
90 .entries
91 .get(table_key)
92 .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
93 match &self.read_caches {
94 Some(caches) => {
95 let location = table_uri_for_path(
96 &self.root_uri,
97 &entry.table_path,
98 entry.table_branch.as_deref(),
99 );
100 caches
101 .handles
102 .get_or_open(
103 &entry.table_path,
104 entry.table_branch.as_deref(),
105 entry.table_version,
106 entry.version_metadata.e_tag(),
107 &location,
108 Some(&caches.session),
109 )
110 .await
111 }
112 None => entry.open(&self.root_uri).await,
113 }
114 }
115
116 pub(crate) fn set_read_caches(&mut self, caches: Arc<crate::runtime_cache::ReadCaches>) {
120 self.read_caches = Some(caches);
121 }
122
123 pub fn version(&self) -> u64 {
125 self.version
126 }
127
128 pub fn entry(&self, table_key: &str) -> Option<&SubTableEntry> {
130 self.entries.get(table_key)
131 }
132
133 pub fn entries(&self) -> impl Iterator<Item = &SubTableEntry> {
134 self.entries.values()
135 }
136}
137
/// Identity of one concrete manifest commit, used to detect whether the manifest seen
/// on storage is still the one this process is holding.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct ManifestIncarnation {
    /// Manifest version number.
    pub(crate) version: u64,
    /// Storage e-tag of the manifest file, when the store reports one.
    pub(crate) e_tag: Option<String>,
    /// Manifest timestamp in nanoseconds, when available.
    timestamp_nanos: Option<u128>,
}

impl ManifestIncarnation {
    /// Returns `true` when `self` (e.g. a freshly probed incarnation) denotes the same
    /// manifest commit as `held`.
    ///
    /// The versions must be equal. After that, the e-tags decide if both sides have one.
    /// Otherwise the timestamps decide if both sides have one. If neither pair is
    /// comparable, version equality alone is accepted.
    pub(crate) fn matches(&self, held: &Self) -> bool {
        if self.version != held.version {
            return false;
        }
        if let (Some(latest), Some(current)) = (self.e_tag.as_ref(), held.e_tag.as_ref()) {
            return latest == current;
        }
        if let (Some(latest), Some(current)) = (self.timestamp_nanos, held.timestamp_nanos) {
            return latest == current;
        }
        true
    }
}
162
163impl SubTableUpdate {
164 pub(crate) fn to_create_table_version_request(&self) -> CreateTableVersionRequest {
165 self.version_metadata.to_create_table_version_request(
166 &self.table_key,
167 self.table_version,
168 self.row_count,
169 self.table_branch.as_deref(),
170 )
171 }
172}
173
/// Registration of a table in the manifest, carried by [`ManifestChange::RegisterTable`].
#[derive(Debug, Clone)]
pub(crate) struct TableRegistration {
    /// Logical table key (see `table_path_for_table_key` for the `node:` / `edge:` forms).
    pub(crate) table_key: String,
    /// Storage path of the table.
    // NOTE(review): presumably relative to the graph root, like `SubTableEntry::table_path`
    // which is joined with the root by `table_uri_for_path` — confirm.
    pub(crate) table_path: String,
}
179
/// Tombstone record for a table, carried by [`ManifestChange::Tombstone`].
#[derive(Debug, Clone)]
pub(crate) struct TableTombstone {
    /// Logical key of the table being tombstoned.
    pub(crate) table_key: String,
    // NOTE(review): presumably the table version at which the tombstone takes effect;
    // the interpretation lives in the publisher — confirm there.
    pub(crate) tombstone_version: u64,
}
185
/// One change in a manifest commit. `ManifestCoordinator::commit_changes_with_expected`
/// hands a slice of these to the configured `ManifestBatchPublisher`.
#[derive(Debug, Clone)]
pub(crate) enum ManifestChange {
    /// Record a new version of an existing sub-table.
    Update(SubTableUpdate),
    /// Register a table in the manifest.
    RegisterTable(TableRegistration),
    /// Record a tombstone for a table.
    Tombstone(TableTombstone),
}
192
193impl SubTableEntry {
194 pub(crate) async fn open(&self, root_uri: &str) -> Result<Dataset> {
201 let location = table_uri_for_path(root_uri, &self.table_path, self.table_branch.as_deref());
209 crate::instrumentation::open_table_dataset(&location, self.table_version, None).await
216 }
217}
218
219pub(crate) fn table_path_for_table_key(table_key: &str) -> Result<String> {
220 if let Some(type_name) = table_key.strip_prefix("node:") {
221 return Ok(format!("nodes/{}", type_name_hash(type_name)));
222 }
223 if let Some(type_name) = table_key.strip_prefix("edge:") {
224 return Ok(format!("edges/{}", type_name_hash(type_name)));
225 }
226 Err(OmniError::manifest(format!(
227 "invalid table key '{}'",
228 table_key
229 )))
230}
231
/// A new version of one sub-table to be recorded in the manifest
/// (wrapped in `ManifestChange::Update` when committed).
#[derive(Debug, Clone)]
pub struct SubTableUpdate {
    /// Logical table key (`node:<Type>` / `edge:<Type>`).
    pub table_key: String,
    /// Version of the sub-table dataset being published.
    pub table_version: u64,
    /// Branch of the sub-table dataset, if it lives on one.
    pub table_branch: Option<String>,
    /// Row count reported for this version.
    pub row_count: u64,
    /// Per-version metadata forwarded into the `CreateTableVersionRequest`.
    pub(crate) version_metadata: TableVersionMetadata,
}
241
/// Holds the manifest dataset for one branch and publishes batches of changes to it.
pub struct ManifestCoordinator {
    /// Manifest root URI with trailing `/` removed.
    root_uri: String,
    /// Currently held manifest dataset; replaced after each publish, refresh or branch delete.
    dataset: Dataset,
    /// Manifest state decoded from `dataset`.
    known_state: ManifestState,
    /// Branch this coordinator is bound to; `None` means `main`.
    active_branch: Option<String>,
    /// Publisher used by `commit_changes_with_expected`.
    publisher: Arc<dyn ManifestBatchPublisher>,
}
255
256impl ManifestCoordinator {
257 fn default_batch_publisher(
258 root_uri: &str,
259 active_branch: Option<&str>,
260 ) -> Arc<dyn ManifestBatchPublisher> {
261 Arc::new(GraphNamespacePublisher::new(root_uri, active_branch))
262 }
263
264 fn from_parts(
265 root_uri: &str,
266 dataset: Dataset,
267 known_state: ManifestState,
268 active_branch: Option<String>,
269 publisher: Arc<dyn ManifestBatchPublisher>,
270 ) -> Self {
271 Self {
272 root_uri: root_uri.trim_end_matches('/').to_string(),
273 dataset,
274 known_state,
275 active_branch,
276 publisher,
277 }
278 }
279
280 fn from_parts_with_default_publisher(
281 root_uri: &str,
282 dataset: Dataset,
283 known_state: ManifestState,
284 active_branch: Option<String>,
285 ) -> Self {
286 let publisher = Self::default_batch_publisher(root_uri, active_branch.as_deref());
287 Self::from_parts(root_uri, dataset, known_state, active_branch, publisher)
288 }
289
290 fn snapshot_from_state(root_uri: &str, state: ManifestState) -> Snapshot {
291 Snapshot {
292 root_uri: root_uri.trim_end_matches('/').to_string(),
293 version: state.version,
294 entries: state
295 .entries
296 .into_iter()
297 .map(|entry| (entry.table_key.clone(), entry))
298 .collect(),
299 read_caches: None,
300 }
301 }
302
303 #[cfg(test)]
304 fn with_batch_publisher(mut self, publisher: Arc<dyn ManifestBatchPublisher>) -> Self {
305 self.publisher = publisher;
306 self
307 }
308
309 pub async fn init(root_uri: &str, catalog: &Catalog) -> Result<Self> {
313 let root = root_uri.trim_end_matches('/');
314 let (dataset, known_state) = init_manifest_graph(root, catalog).await?;
315
316 Ok(Self::from_parts_with_default_publisher(
317 root,
318 dataset,
319 known_state,
320 None,
321 ))
322 }
323
324 pub async fn open(root_uri: &str) -> Result<Self> {
326 let root = root_uri.trim_end_matches('/');
327 let (dataset, known_state) = open_manifest_graph(root, None).await?;
328 Ok(Self::from_parts_with_default_publisher(
329 root,
330 dataset,
331 known_state,
332 None,
333 ))
334 }
335
336 pub async fn open_at_branch(root_uri: &str, branch: &str) -> Result<Self> {
338 if branch == "main" {
339 return Self::open(root_uri).await;
340 }
341
342 let root = root_uri.trim_end_matches('/');
343 let (dataset, known_state) = open_manifest_graph(root, Some(branch)).await?;
344 Ok(Self::from_parts_with_default_publisher(
345 root,
346 dataset,
347 known_state,
348 Some(branch.to_string()),
349 ))
350 }
351
352 pub async fn snapshot_at(
353 root_uri: &str,
354 branch: Option<&str>,
355 version: u64,
356 ) -> Result<Snapshot> {
357 let root = root_uri.trim_end_matches('/');
358 Ok(Self::snapshot_from_state(
359 root,
360 snapshot_state_at(root, branch, version).await?,
361 ))
362 }
363
364 pub fn snapshot(&self) -> Snapshot {
366 Self::snapshot_from_state(&self.root_uri, self.known_state.clone())
367 }
368
369 pub async fn refresh(&mut self) -> Result<()> {
371 self.dataset = open_manifest_dataset(&self.root_uri, self.active_branch.as_deref()).await?;
372 self.known_state = read_manifest_state(&self.dataset).await?;
373 Ok(())
374 }
375
376 pub async fn commit(&mut self, updates: &[SubTableUpdate]) -> Result<u64> {
381 let changes = updates
382 .iter()
383 .cloned()
384 .map(ManifestChange::Update)
385 .collect::<Vec<_>>();
386 self.commit_changes(&changes).await
387 }
388
389 pub async fn commit_with_expected(
395 &mut self,
396 updates: &[SubTableUpdate],
397 expected_table_versions: &HashMap<String, u64>,
398 ) -> Result<u64> {
399 let changes = updates
400 .iter()
401 .cloned()
402 .map(ManifestChange::Update)
403 .collect::<Vec<_>>();
404 self.commit_changes_with_expected(&changes, expected_table_versions)
405 .await
406 }
407
408 pub(crate) async fn commit_changes(&mut self, changes: &[ManifestChange]) -> Result<u64> {
409 self.commit_changes_with_expected(changes, &HashMap::new())
410 .await
411 }
412
413 pub(crate) async fn commit_changes_with_expected(
414 &mut self,
415 changes: &[ManifestChange],
416 expected_table_versions: &HashMap<String, u64>,
417 ) -> Result<u64> {
418 if changes.is_empty() && expected_table_versions.is_empty() {
419 return Ok(self.version());
420 }
421
422 self.dataset = self
423 .publisher
424 .publish(changes, expected_table_versions)
425 .await?;
426
427 self.known_state = read_manifest_state(&self.dataset).await?;
428 Ok(self.version())
429 }
430
431 pub fn version(&self) -> u64 {
433 self.dataset.version().version
434 }
435
436 pub async fn probe_latest_version(&self) -> Result<u64> {
440 self.dataset
441 .latest_version_id()
442 .await
443 .map_err(|e| OmniError::Lance(e.to_string()))
444 }
445
446 pub(crate) fn incarnation(&self) -> ManifestIncarnation {
447 ManifestIncarnation {
448 version: self.version(),
449 e_tag: self.dataset.manifest_location().e_tag.clone(),
450 timestamp_nanos: Some(self.dataset.manifest().timestamp_nanos),
451 }
452 }
453
454 pub(crate) async fn probe_latest_incarnation(&self) -> Result<ManifestIncarnation> {
459 if self.active_branch.is_none() {
460 return Ok(ManifestIncarnation {
461 version: self.probe_latest_version().await?,
462 e_tag: self.dataset.manifest_location().e_tag.clone(),
463 timestamp_nanos: Some(self.dataset.manifest().timestamp_nanos),
464 });
465 }
466 let (manifest, location) = self
467 .dataset
468 .latest_manifest()
469 .await
470 .map_err(|e| OmniError::Lance(e.to_string()))?;
471 Ok(ManifestIncarnation {
472 version: manifest.version,
473 e_tag: location.e_tag,
474 timestamp_nanos: Some(manifest.timestamp_nanos),
475 })
476 }
477
478 pub fn active_branch(&self) -> Option<&str> {
479 self.active_branch.as_deref()
480 }
481
482 pub async fn create_branch(&mut self, name: &str) -> Result<()> {
483 let mut ds = self.dataset.clone();
484 ds.create_branch(name, self.version(), None)
485 .await
486 .map_err(|e| OmniError::Lance(e.to_string()))?;
487 Ok(())
488 }
489
490 pub async fn delete_branch(&mut self, name: &str) -> Result<()> {
491 let uri = manifest_uri(&self.root_uri);
492 let mut ds = Dataset::open(&uri)
493 .await
494 .map_err(|e| OmniError::Lance(e.to_string()))?;
495 ds.delete_branch(name)
496 .await
497 .map_err(|e| OmniError::Lance(e.to_string()))?;
498 self.dataset = open_manifest_dataset(&self.root_uri, self.active_branch.as_deref()).await?;
499 self.known_state = read_manifest_state(&self.dataset).await?;
500 Ok(())
501 }
502
503 pub async fn list_branches(&self) -> Result<Vec<String>> {
504 let branches = self
505 .dataset
506 .list_branches()
507 .await
508 .map_err(|e| OmniError::Lance(e.to_string()))?;
509 let mut names: Vec<String> = branches.into_keys().filter(|name| name != "main").collect();
510 names.sort();
511 let mut all = vec!["main".to_string()];
512 all.extend(names);
513 Ok(all)
514 }
515
516 pub async fn descendant_branches(&self, name: &str) -> Result<Vec<String>> {
517 let branches = self
518 .dataset
519 .list_branches()
520 .await
521 .map_err(|e| OmniError::Lance(e.to_string()))?;
522 let mut frontier = vec![name.to_string()];
523 let mut descendants = Vec::new();
524 let mut seen = HashSet::new();
525
526 while let Some(parent) = frontier.pop() {
527 let mut children = branches
528 .iter()
529 .filter_map(|(branch, contents)| {
530 (contents.parent_branch.as_deref() == Some(parent.as_str()))
531 .then_some(branch.clone())
532 })
533 .collect::<Vec<_>>();
534 children.sort();
535 for child in children {
536 if seen.insert(child.clone()) {
537 frontier.push(child.clone());
538 descendants.push(child);
539 }
540 }
541 }
542
543 Ok(descendants)
544 }
545
546 pub fn root_uri(&self) -> &str {
548 &self.root_uri
549 }
550}
551
552#[cfg(test)]
553#[path = "manifest/tests.rs"]
554mod tests;