1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3
4use crate::error::{OmniError, Result};
5use lance::Dataset;
6use lance_namespace::models::CreateTableVersionRequest;
7use omnigraph_compiler::catalog::Catalog;
8
9#[path = "manifest/graph.rs"]
10mod graph;
11#[path = "manifest/layout.rs"]
12mod layout;
13#[path = "manifest/metadata.rs"]
14mod metadata;
15#[path = "manifest/migrations.rs"]
16mod migrations;
17#[path = "manifest/namespace.rs"]
18mod namespace;
19#[path = "manifest/publisher.rs"]
20mod publisher;
21#[path = "manifest/recovery.rs"]
22mod recovery;
23#[path = "manifest/state.rs"]
24mod state;
25
26use graph::{init_manifest_graph, open_manifest_graph, snapshot_state_at};
27use layout::{manifest_uri, open_manifest_dataset, type_name_hash};
28pub(crate) use metadata::TableVersionMetadata;
29#[cfg(test)]
30use metadata::{OMNIGRAPH_ROW_COUNT_KEY, table_version_metadata_for_state};
31use namespace::open_table_at_version_from_manifest;
32pub(crate) use namespace::open_table_head_for_write;
33#[cfg(test)]
34use namespace::{branch_manifest_namespace, staged_table_namespace};
35use publisher::{GraphNamespacePublisher, ManifestBatchPublisher};
36pub(crate) use recovery::{
37 RecoveryMode, RecoverySidecarHandle, SidecarKind, SidecarTablePin, SidecarTableRegistration,
38 SidecarTombstone, delete_sidecar, has_schema_apply_sidecar, heal_pending_sidecars_roll_forward,
39 list_sidecars, new_sidecar, recover_manifest_drift, schema_apply_serial_queue_key,
40 write_sidecar,
41};
42pub use state::SubTableEntry;
43#[cfg(test)]
44use state::string_column;
45use state::{ManifestState, read_manifest_state};
46
// String constants shared at module scope.
// NOTE(review): nothing in the visible part of this file references these;
// presumably the `manifest/*` submodules declared above read them via
// `super::` — confirm before renaming or removing any of them.
const OBJECT_TYPE_TABLE: &str = "table";
const OBJECT_TYPE_TABLE_VERSION: &str = "table_version";
const OBJECT_TYPE_TABLE_TOMBSTONE: &str = "table_tombstone";
const TABLE_VERSION_MANAGEMENT_KEY: &str = "table_version_management";
51
52pub(crate) async fn migrate_on_open(root_uri: &str) -> Result<()> {
64 let mut dataset = open_manifest_dataset(root_uri, None).await?;
65 migrations::migrate_internal_schema(&mut dataset).await
66}
67
/// Owned, cloneable view of the manifest entries recorded at one version.
///
/// Instances are produced by `ManifestCoordinator::snapshot` and
/// `ManifestCoordinator::snapshot_at`.
#[derive(Debug, Clone)]
pub struct Snapshot {
    /// Root URI the entries are resolved against; stored with trailing `/`
    /// characters trimmed by the constructor.
    root_uri: String,
    /// Version copied from the `ManifestState` this snapshot was built from.
    version: u64,
    /// Manifest entries keyed by each entry's `table_key`.
    entries: HashMap<String, SubTableEntry>,
}
78
79impl Snapshot {
80 pub async fn open(&self, table_key: &str) -> Result<Dataset> {
82 let entry = self
83 .entries
84 .get(table_key)
85 .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
86 entry.open(&self.root_uri).await
87 }
88
89 pub fn version(&self) -> u64 {
91 self.version
92 }
93
94 pub fn entry(&self, table_key: &str) -> Option<&SubTableEntry> {
96 self.entries.get(table_key)
97 }
98
99 pub fn entries(&self) -> impl Iterator<Item = &SubTableEntry> {
100 self.entries.values()
101 }
102}
103
104impl SubTableUpdate {
105 pub(crate) fn to_create_table_version_request(&self) -> CreateTableVersionRequest {
106 self.version_metadata.to_create_table_version_request(
107 &self.table_key,
108 self.table_version,
109 self.row_count,
110 self.table_branch.as_deref(),
111 )
112 }
113}
114
/// Payload of `ManifestChange::RegisterTable`.
#[derive(Debug, Clone)]
pub(crate) struct TableRegistration {
    /// Key identifying the table being registered.
    pub(crate) table_key: String,
    /// Path of the table.
    /// NOTE(review): presumably relative to the repository root, like the
    /// values returned by `table_path_for_table_key` — confirm.
    pub(crate) table_path: String,
}
120
/// Payload of `ManifestChange::Tombstone`.
#[derive(Debug, Clone)]
pub(crate) struct TableTombstone {
    /// Key identifying the table being tombstoned.
    pub(crate) table_key: String,
    /// Version number recorded with the tombstone.
    /// NOTE(review): exact meaning (table version vs. manifest version) is
    /// not visible from this file — confirm against the publisher.
    pub(crate) tombstone_version: u64,
}
126
/// One change submitted to the manifest through
/// `ManifestCoordinator::commit_changes` / `commit_changes_with_expected`.
#[derive(Debug, Clone)]
pub(crate) enum ManifestChange {
    /// A sub-table update; `ManifestCoordinator::commit` wraps every
    /// `SubTableUpdate` it receives in this variant.
    Update(SubTableUpdate),
    /// Registration of a table, carrying its key and path.
    RegisterTable(TableRegistration),
    /// Tombstone for a table, carrying its key and a tombstone version.
    Tombstone(TableTombstone),
}
133
134impl SubTableEntry {
135 pub(crate) async fn open(&self, root_uri: &str) -> Result<Dataset> {
136 open_table_at_version_from_manifest(
137 root_uri,
138 &self.table_key,
139 self.table_branch.as_deref(),
140 self.table_version,
141 )
142 .await
143 }
144}
145
146pub(crate) fn table_path_for_table_key(table_key: &str) -> Result<String> {
147 if let Some(type_name) = table_key.strip_prefix("node:") {
148 return Ok(format!("nodes/{}", type_name_hash(type_name)));
149 }
150 if let Some(type_name) = table_key.strip_prefix("edge:") {
151 return Ok(format!("edges/{}", type_name_hash(type_name)));
152 }
153 Err(OmniError::manifest(format!(
154 "invalid table key '{}'",
155 table_key
156 )))
157}
158
/// Describes a new version of one sub-table to be recorded in the manifest.
///
/// `ManifestCoordinator::commit` wraps each value in
/// `ManifestChange::Update` before publishing.
#[derive(Debug, Clone)]
pub struct SubTableUpdate {
    /// Key of the table being updated.
    pub table_key: String,
    /// Version of the table being recorded.
    pub table_version: u64,
    /// Optional branch name associated with the table version.
    pub table_branch: Option<String>,
    /// Row count reported alongside the table version.
    pub row_count: u64,
    /// Metadata used to build the `CreateTableVersionRequest` for this update.
    pub(crate) version_metadata: TableVersionMetadata,
}
168
/// Holds an open manifest dataset together with the last manifest state read
/// from it, and publishes changes through a `ManifestBatchPublisher`.
pub struct ManifestCoordinator {
    /// Root URI, stored with trailing `/` characters trimmed.
    root_uri: String,
    /// Handle to the manifest dataset; `version()` reports this handle's version.
    dataset: Dataset,
    /// Most recently loaded manifest state; `snapshot()` is built from a clone of it.
    known_state: ManifestState,
    /// Branch this coordinator was opened on; `None` when created through
    /// `init`, `open`, or `open_at_branch("main")`.
    active_branch: Option<String>,
    /// Publisher that applies batches of manifest changes and returns the
    /// resulting dataset handle.
    publisher: Arc<dyn ManifestBatchPublisher>,
}
182
183impl ManifestCoordinator {
184 fn default_batch_publisher(
185 root_uri: &str,
186 active_branch: Option<&str>,
187 ) -> Arc<dyn ManifestBatchPublisher> {
188 Arc::new(GraphNamespacePublisher::new(root_uri, active_branch))
189 }
190
191 fn from_parts(
192 root_uri: &str,
193 dataset: Dataset,
194 known_state: ManifestState,
195 active_branch: Option<String>,
196 publisher: Arc<dyn ManifestBatchPublisher>,
197 ) -> Self {
198 Self {
199 root_uri: root_uri.trim_end_matches('/').to_string(),
200 dataset,
201 known_state,
202 active_branch,
203 publisher,
204 }
205 }
206
207 fn from_parts_with_default_publisher(
208 root_uri: &str,
209 dataset: Dataset,
210 known_state: ManifestState,
211 active_branch: Option<String>,
212 ) -> Self {
213 let publisher = Self::default_batch_publisher(root_uri, active_branch.as_deref());
214 Self::from_parts(root_uri, dataset, known_state, active_branch, publisher)
215 }
216
217 fn snapshot_from_state(root_uri: &str, state: ManifestState) -> Snapshot {
218 Snapshot {
219 root_uri: root_uri.trim_end_matches('/').to_string(),
220 version: state.version,
221 entries: state
222 .entries
223 .into_iter()
224 .map(|entry| (entry.table_key.clone(), entry))
225 .collect(),
226 }
227 }
228
229 #[cfg(test)]
230 fn with_batch_publisher(mut self, publisher: Arc<dyn ManifestBatchPublisher>) -> Self {
231 self.publisher = publisher;
232 self
233 }
234
235 pub async fn init(root_uri: &str, catalog: &Catalog) -> Result<Self> {
239 let root = root_uri.trim_end_matches('/');
240 let (dataset, known_state) = init_manifest_graph(root, catalog).await?;
241
242 Ok(Self::from_parts_with_default_publisher(
243 root,
244 dataset,
245 known_state,
246 None,
247 ))
248 }
249
250 pub async fn open(root_uri: &str) -> Result<Self> {
252 let root = root_uri.trim_end_matches('/');
253 let (dataset, known_state) = open_manifest_graph(root, None).await?;
254 Ok(Self::from_parts_with_default_publisher(
255 root,
256 dataset,
257 known_state,
258 None,
259 ))
260 }
261
262 pub async fn open_at_branch(root_uri: &str, branch: &str) -> Result<Self> {
264 if branch == "main" {
265 return Self::open(root_uri).await;
266 }
267
268 let root = root_uri.trim_end_matches('/');
269 let (dataset, known_state) = open_manifest_graph(root, Some(branch)).await?;
270 Ok(Self::from_parts_with_default_publisher(
271 root,
272 dataset,
273 known_state,
274 Some(branch.to_string()),
275 ))
276 }
277
278 pub async fn snapshot_at(
279 root_uri: &str,
280 branch: Option<&str>,
281 version: u64,
282 ) -> Result<Snapshot> {
283 let root = root_uri.trim_end_matches('/');
284 Ok(Self::snapshot_from_state(
285 root,
286 snapshot_state_at(root, branch, version).await?,
287 ))
288 }
289
290 pub fn snapshot(&self) -> Snapshot {
292 Self::snapshot_from_state(&self.root_uri, self.known_state.clone())
293 }
294
295 pub async fn refresh(&mut self) -> Result<()> {
297 self.dataset = open_manifest_dataset(&self.root_uri, self.active_branch.as_deref()).await?;
298 self.known_state = read_manifest_state(&self.dataset).await?;
299 Ok(())
300 }
301
302 pub async fn commit(&mut self, updates: &[SubTableUpdate]) -> Result<u64> {
307 let changes = updates
308 .iter()
309 .cloned()
310 .map(ManifestChange::Update)
311 .collect::<Vec<_>>();
312 self.commit_changes(&changes).await
313 }
314
315 pub async fn commit_with_expected(
321 &mut self,
322 updates: &[SubTableUpdate],
323 expected_table_versions: &HashMap<String, u64>,
324 ) -> Result<u64> {
325 let changes = updates
326 .iter()
327 .cloned()
328 .map(ManifestChange::Update)
329 .collect::<Vec<_>>();
330 self.commit_changes_with_expected(&changes, expected_table_versions)
331 .await
332 }
333
334 pub(crate) async fn commit_changes(&mut self, changes: &[ManifestChange]) -> Result<u64> {
335 self.commit_changes_with_expected(changes, &HashMap::new())
336 .await
337 }
338
339 pub(crate) async fn commit_changes_with_expected(
340 &mut self,
341 changes: &[ManifestChange],
342 expected_table_versions: &HashMap<String, u64>,
343 ) -> Result<u64> {
344 if changes.is_empty() && expected_table_versions.is_empty() {
345 return Ok(self.version());
346 }
347
348 self.dataset = self
349 .publisher
350 .publish(changes, expected_table_versions)
351 .await?;
352
353 self.known_state = read_manifest_state(&self.dataset).await?;
354 Ok(self.version())
355 }
356
357 pub fn version(&self) -> u64 {
359 self.dataset.version().version
360 }
361
362 pub fn active_branch(&self) -> Option<&str> {
363 self.active_branch.as_deref()
364 }
365
366 pub async fn create_branch(&mut self, name: &str) -> Result<()> {
367 let mut ds = self.dataset.clone();
368 ds.create_branch(name, self.version(), None)
369 .await
370 .map_err(|e| OmniError::Lance(e.to_string()))?;
371 Ok(())
372 }
373
374 pub async fn delete_branch(&mut self, name: &str) -> Result<()> {
375 let uri = manifest_uri(&self.root_uri);
376 let mut ds = Dataset::open(&uri)
377 .await
378 .map_err(|e| OmniError::Lance(e.to_string()))?;
379 ds.delete_branch(name)
380 .await
381 .map_err(|e| OmniError::Lance(e.to_string()))?;
382 self.dataset = open_manifest_dataset(&self.root_uri, self.active_branch.as_deref()).await?;
383 self.known_state = read_manifest_state(&self.dataset).await?;
384 Ok(())
385 }
386
387 pub async fn list_branches(&self) -> Result<Vec<String>> {
388 let branches = self
389 .dataset
390 .list_branches()
391 .await
392 .map_err(|e| OmniError::Lance(e.to_string()))?;
393 let mut names: Vec<String> = branches.into_keys().filter(|name| name != "main").collect();
394 names.sort();
395 let mut all = vec!["main".to_string()];
396 all.extend(names);
397 Ok(all)
398 }
399
400 pub async fn descendant_branches(&self, name: &str) -> Result<Vec<String>> {
401 let branches = self
402 .dataset
403 .list_branches()
404 .await
405 .map_err(|e| OmniError::Lance(e.to_string()))?;
406 let mut frontier = vec![name.to_string()];
407 let mut descendants = Vec::new();
408 let mut seen = HashSet::new();
409
410 while let Some(parent) = frontier.pop() {
411 let mut children = branches
412 .iter()
413 .filter_map(|(branch, contents)| {
414 (contents.parent_branch.as_deref() == Some(parent.as_str()))
415 .then_some(branch.clone())
416 })
417 .collect::<Vec<_>>();
418 children.sort();
419 for child in children {
420 if seen.insert(child.clone()) {
421 frontier.push(child.clone());
422 descendants.push(child);
423 }
424 }
425 }
426
427 Ok(descendants)
428 }
429
430 pub fn root_uri(&self) -> &str {
432 &self.root_uri
433 }
434}
435
436#[cfg(test)]
437#[path = "manifest/tests.rs"]
438mod tests;