1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3
4use crate::error::{OmniError, Result};
5use lance::Dataset;
6use lance_namespace::models::CreateTableVersionRequest;
7use omnigraph_compiler::catalog::Catalog;
8
9#[path = "manifest/layout.rs"]
10mod layout;
11#[path = "manifest/metadata.rs"]
12mod metadata;
13#[path = "manifest/migrations.rs"]
14mod migrations;
15#[path = "manifest/namespace.rs"]
16mod namespace;
17#[path = "manifest/publisher.rs"]
18mod publisher;
19#[path = "manifest/recovery.rs"]
20mod recovery;
21#[path = "manifest/repo.rs"]
22mod repo;
23#[path = "manifest/state.rs"]
24mod state;
25
26use layout::{manifest_uri, open_manifest_dataset, type_name_hash};
27pub(crate) use metadata::TableVersionMetadata;
28#[cfg(test)]
29use metadata::{OMNIGRAPH_ROW_COUNT_KEY, table_version_metadata_for_state};
30use namespace::open_table_at_version_from_manifest;
31pub(crate) use namespace::open_table_head_for_write;
32#[cfg(test)]
33use namespace::{branch_manifest_namespace, staged_table_namespace};
34use publisher::{GraphNamespacePublisher, ManifestBatchPublisher};
35pub(crate) use recovery::{
36 delete_sidecar, has_schema_apply_sidecar, new_sidecar, recover_manifest_drift, write_sidecar,
37 RecoveryMode, RecoverySidecar, RecoverySidecarHandle, SidecarKind, SidecarTablePin,
38 SidecarTableRegistration, SidecarTombstone,
39};
40use repo::{init_manifest_repo, open_manifest_repo, snapshot_state_at};
41pub use state::SubTableEntry;
42#[cfg(test)]
43use state::string_column;
44use state::{ManifestState, read_manifest_state};
45
// Object-type tags and a metadata key shared with the `manifest/*` submodules;
// none of them is referenced directly in this file. The per-item descriptions
// below are inferred from the names — confirm against `state.rs` / `publisher.rs`.

/// Object-type tag presumably used for table registration rows.
const OBJECT_TYPE_TABLE: &str = "table";
/// Object-type tag presumably used for table-version rows.
const OBJECT_TYPE_TABLE_VERSION: &str = "table_version";
/// Object-type tag presumably used for table tombstone rows.
const OBJECT_TYPE_TABLE_TOMBSTONE: &str = "table_tombstone";
/// Metadata key presumably used to record the table-version management mode.
const TABLE_VERSION_MANAGEMENT_KEY: &str = "table_version_management";
50
51#[derive(Debug, Clone)]
56pub struct Snapshot {
57 root_uri: String,
58 version: u64,
59 entries: HashMap<String, SubTableEntry>,
60}
61
62impl Snapshot {
63 pub async fn open(&self, table_key: &str) -> Result<Dataset> {
65 let entry = self
66 .entries
67 .get(table_key)
68 .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
69 entry.open(&self.root_uri).await
70 }
71
72 pub fn version(&self) -> u64 {
74 self.version
75 }
76
77 pub fn entry(&self, table_key: &str) -> Option<&SubTableEntry> {
79 self.entries.get(table_key)
80 }
81
82 pub fn entries(&self) -> impl Iterator<Item = &SubTableEntry> {
83 self.entries.values()
84 }
85}
86
87impl SubTableUpdate {
88 pub(crate) fn to_create_table_version_request(&self) -> CreateTableVersionRequest {
89 self.version_metadata.to_create_table_version_request(
90 &self.table_key,
91 self.table_version,
92 self.row_count,
93 self.table_branch.as_deref(),
94 )
95 }
96}
97
/// Request to register a new sub-table in the manifest; carried by
/// [`ManifestChange::RegisterTable`].
#[derive(Debug, Clone)]
pub(crate) struct TableRegistration {
    /// Manifest key of the table (`node:<Type>` or `edge:<Type>`).
    pub(crate) table_key: String,
    /// Storage path of the table under the repository root. Presumably produced by
    /// `table_path_for_table_key` (e.g. `nodes/<hash>`) — confirm at call sites.
    pub(crate) table_path: String,
}
103
/// Request to tombstone a sub-table in the manifest; carried by
/// [`ManifestChange::Tombstone`].
#[derive(Debug, Clone)]
pub(crate) struct TableTombstone {
    /// Manifest key of the table being tombstoned.
    pub(crate) table_key: String,
    /// Version recorded with the tombstone. NOTE(review): presumably the table
    /// version at which the table was removed — confirm against the publisher.
    pub(crate) tombstone_version: u64,
}
109
/// One entry of a change batch handed to the manifest publisher by
/// `ManifestCoordinator::commit_changes_with_expected`.
#[derive(Debug, Clone)]
pub(crate) enum ManifestChange {
    /// Record a new version for a sub-table.
    Update(SubTableUpdate),
    /// Register a new sub-table.
    RegisterTable(TableRegistration),
    /// Tombstone a sub-table.
    Tombstone(TableTombstone),
}
116
117impl SubTableEntry {
118 pub(crate) async fn open(&self, root_uri: &str) -> Result<Dataset> {
119 open_table_at_version_from_manifest(
120 root_uri,
121 &self.table_key,
122 self.table_branch.as_deref(),
123 self.table_version,
124 )
125 .await
126 }
127}
128
129pub(crate) fn table_path_for_table_key(table_key: &str) -> Result<String> {
130 if let Some(type_name) = table_key.strip_prefix("node:") {
131 return Ok(format!("nodes/{}", type_name_hash(type_name)));
132 }
133 if let Some(type_name) = table_key.strip_prefix("edge:") {
134 return Ok(format!("edges/{}", type_name_hash(type_name)));
135 }
136 Err(OmniError::manifest(format!(
137 "invalid table key '{}'",
138 table_key
139 )))
140}
141
/// A new version of one sub-table to record in the manifest. It is wrapped in
/// `ManifestChange::Update` by `ManifestCoordinator::commit` /
/// `commit_with_expected`.
#[derive(Debug, Clone)]
pub struct SubTableUpdate {
    /// Manifest key of the table (`node:<Type>` or `edge:<Type>`).
    pub table_key: String,
    /// Version of the table dataset to pin.
    pub table_version: u64,
    /// Branch of the table dataset that `table_version` belongs to.
    /// NOTE(review): `None` presumably means the table's main line — confirm.
    pub table_branch: Option<String>,
    /// Row count recorded alongside the pinned version.
    pub row_count: u64,
    /// Additional metadata used to build the namespace `CreateTableVersionRequest`.
    pub(crate) version_metadata: TableVersionMetadata,
}
151
/// Holds an open handle on the graph's manifest dataset for one branch. It serves
/// snapshots of the last state it read and publishes change batches through a
/// pluggable publisher.
pub struct ManifestCoordinator {
    // Repository root URI with any trailing `/` stripped.
    root_uri: String,
    // Currently loaded manifest dataset; its version is what `version()` reports.
    dataset: Dataset,
    // Manifest state last decoded from `dataset`.
    known_state: ManifestState,
    // Branch this coordinator works on; `None` is the main branch.
    active_branch: Option<String>,
    // Publishes change batches; swapped out in tests via `with_batch_publisher`.
    publisher: Arc<dyn ManifestBatchPublisher>,
}
165
166impl ManifestCoordinator {
167 fn default_batch_publisher(
168 root_uri: &str,
169 active_branch: Option<&str>,
170 ) -> Arc<dyn ManifestBatchPublisher> {
171 Arc::new(GraphNamespacePublisher::new(root_uri, active_branch))
172 }
173
174 fn from_parts(
175 root_uri: &str,
176 dataset: Dataset,
177 known_state: ManifestState,
178 active_branch: Option<String>,
179 publisher: Arc<dyn ManifestBatchPublisher>,
180 ) -> Self {
181 Self {
182 root_uri: root_uri.trim_end_matches('/').to_string(),
183 dataset,
184 known_state,
185 active_branch,
186 publisher,
187 }
188 }
189
190 fn from_parts_with_default_publisher(
191 root_uri: &str,
192 dataset: Dataset,
193 known_state: ManifestState,
194 active_branch: Option<String>,
195 ) -> Self {
196 let publisher = Self::default_batch_publisher(root_uri, active_branch.as_deref());
197 Self::from_parts(root_uri, dataset, known_state, active_branch, publisher)
198 }
199
200 fn snapshot_from_state(root_uri: &str, state: ManifestState) -> Snapshot {
201 Snapshot {
202 root_uri: root_uri.trim_end_matches('/').to_string(),
203 version: state.version,
204 entries: state
205 .entries
206 .into_iter()
207 .map(|entry| (entry.table_key.clone(), entry))
208 .collect(),
209 }
210 }
211
212 #[cfg(test)]
213 fn with_batch_publisher(mut self, publisher: Arc<dyn ManifestBatchPublisher>) -> Self {
214 self.publisher = publisher;
215 self
216 }
217
218 pub async fn init(root_uri: &str, catalog: &Catalog) -> Result<Self> {
222 let root = root_uri.trim_end_matches('/');
223 let (dataset, known_state) = init_manifest_repo(root, catalog).await?;
224
225 Ok(Self::from_parts_with_default_publisher(
226 root,
227 dataset,
228 known_state,
229 None,
230 ))
231 }
232
233 pub async fn open(root_uri: &str) -> Result<Self> {
235 let root = root_uri.trim_end_matches('/');
236 let (dataset, known_state) = open_manifest_repo(root, None).await?;
237 Ok(Self::from_parts_with_default_publisher(
238 root,
239 dataset,
240 known_state,
241 None,
242 ))
243 }
244
245 pub async fn open_at_branch(root_uri: &str, branch: &str) -> Result<Self> {
247 if branch == "main" {
248 return Self::open(root_uri).await;
249 }
250
251 let root = root_uri.trim_end_matches('/');
252 let (dataset, known_state) = open_manifest_repo(root, Some(branch)).await?;
253 Ok(Self::from_parts_with_default_publisher(
254 root,
255 dataset,
256 known_state,
257 Some(branch.to_string()),
258 ))
259 }
260
261 pub async fn snapshot_at(
262 root_uri: &str,
263 branch: Option<&str>,
264 version: u64,
265 ) -> Result<Snapshot> {
266 let root = root_uri.trim_end_matches('/');
267 Ok(Self::snapshot_from_state(
268 root,
269 snapshot_state_at(root, branch, version).await?,
270 ))
271 }
272
273 pub fn snapshot(&self) -> Snapshot {
275 Self::snapshot_from_state(&self.root_uri, self.known_state.clone())
276 }
277
278 pub async fn refresh(&mut self) -> Result<()> {
280 self.dataset = open_manifest_dataset(&self.root_uri, self.active_branch.as_deref()).await?;
281 self.known_state = read_manifest_state(&self.dataset).await?;
282 Ok(())
283 }
284
285 pub async fn commit(&mut self, updates: &[SubTableUpdate]) -> Result<u64> {
290 let changes = updates
291 .iter()
292 .cloned()
293 .map(ManifestChange::Update)
294 .collect::<Vec<_>>();
295 self.commit_changes(&changes).await
296 }
297
298 pub async fn commit_with_expected(
304 &mut self,
305 updates: &[SubTableUpdate],
306 expected_table_versions: &HashMap<String, u64>,
307 ) -> Result<u64> {
308 let changes = updates
309 .iter()
310 .cloned()
311 .map(ManifestChange::Update)
312 .collect::<Vec<_>>();
313 self.commit_changes_with_expected(&changes, expected_table_versions)
314 .await
315 }
316
317 pub(crate) async fn commit_changes(&mut self, changes: &[ManifestChange]) -> Result<u64> {
318 self.commit_changes_with_expected(changes, &HashMap::new())
319 .await
320 }
321
322 pub(crate) async fn commit_changes_with_expected(
323 &mut self,
324 changes: &[ManifestChange],
325 expected_table_versions: &HashMap<String, u64>,
326 ) -> Result<u64> {
327 if changes.is_empty() && expected_table_versions.is_empty() {
328 return Ok(self.version());
329 }
330
331 self.dataset = self
332 .publisher
333 .publish(changes, expected_table_versions)
334 .await?;
335
336 self.known_state = read_manifest_state(&self.dataset).await?;
337 Ok(self.version())
338 }
339
340 pub fn version(&self) -> u64 {
342 self.dataset.version().version
343 }
344
345 pub fn active_branch(&self) -> Option<&str> {
346 self.active_branch.as_deref()
347 }
348
349 pub async fn create_branch(&mut self, name: &str) -> Result<()> {
350 let mut ds = self.dataset.clone();
351 ds.create_branch(name, self.version(), None)
352 .await
353 .map_err(|e| OmniError::Lance(e.to_string()))?;
354 Ok(())
355 }
356
357 pub async fn delete_branch(&mut self, name: &str) -> Result<()> {
358 let uri = manifest_uri(&self.root_uri);
359 let mut ds = Dataset::open(&uri)
360 .await
361 .map_err(|e| OmniError::Lance(e.to_string()))?;
362 ds.delete_branch(name)
363 .await
364 .map_err(|e| OmniError::Lance(e.to_string()))?;
365 self.dataset = open_manifest_dataset(&self.root_uri, self.active_branch.as_deref()).await?;
366 self.known_state = read_manifest_state(&self.dataset).await?;
367 Ok(())
368 }
369
370 pub async fn list_branches(&self) -> Result<Vec<String>> {
371 let branches = self
372 .dataset
373 .list_branches()
374 .await
375 .map_err(|e| OmniError::Lance(e.to_string()))?;
376 let mut names: Vec<String> = branches.into_keys().filter(|name| name != "main").collect();
377 names.sort();
378 let mut all = vec!["main".to_string()];
379 all.extend(names);
380 Ok(all)
381 }
382
383 pub async fn descendant_branches(&self, name: &str) -> Result<Vec<String>> {
384 let branches = self
385 .dataset
386 .list_branches()
387 .await
388 .map_err(|e| OmniError::Lance(e.to_string()))?;
389 let mut frontier = vec![name.to_string()];
390 let mut descendants = Vec::new();
391 let mut seen = HashSet::new();
392
393 while let Some(parent) = frontier.pop() {
394 let mut children = branches
395 .iter()
396 .filter_map(|(branch, contents)| {
397 (contents.parent_branch.as_deref() == Some(parent.as_str()))
398 .then_some(branch.clone())
399 })
400 .collect::<Vec<_>>();
401 children.sort();
402 for child in children {
403 if seen.insert(child.clone()) {
404 frontier.push(child.clone());
405 descendants.push(child);
406 }
407 }
408 }
409
410 Ok(descendants)
411 }
412
413 pub fn root_uri(&self) -> &str {
415 &self.root_uri
416 }
417}
418
419#[cfg(test)]
420#[path = "manifest/tests.rs"]
421mod tests;