1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3
4use crate::error::{OmniError, Result};
5use lance::Dataset;
6use lance_namespace::models::CreateTableVersionRequest;
7use omnigraph_compiler::catalog::Catalog;
8
9#[path = "manifest/graph.rs"]
10mod graph;
11#[path = "manifest/layout.rs"]
12mod layout;
13#[path = "manifest/metadata.rs"]
14mod metadata;
15#[path = "manifest/migrations.rs"]
16mod migrations;
17#[path = "manifest/namespace.rs"]
18mod namespace;
19#[path = "manifest/publisher.rs"]
20mod publisher;
21#[path = "manifest/recovery.rs"]
22mod recovery;
23#[path = "manifest/state.rs"]
24mod state;
25
26use graph::{init_manifest_graph, open_manifest_graph, snapshot_state_at};
27use layout::{manifest_uri, open_manifest_dataset, type_name_hash};
28pub(crate) use metadata::TableVersionMetadata;
29#[cfg(test)]
30use metadata::{OMNIGRAPH_ROW_COUNT_KEY, table_version_metadata_for_state};
31use namespace::open_table_at_version_from_manifest;
32pub(crate) use namespace::open_table_head_for_write;
33#[cfg(test)]
34use namespace::{branch_manifest_namespace, staged_table_namespace};
35use publisher::{GraphNamespacePublisher, ManifestBatchPublisher};
36pub(crate) use recovery::{
37 RecoveryMode, RecoverySidecar, RecoverySidecarHandle, SidecarKind, SidecarTablePin,
38 SidecarTableRegistration, SidecarTombstone, delete_sidecar, has_schema_apply_sidecar,
39 list_sidecars, new_sidecar, recover_manifest_drift, write_sidecar,
40};
41pub use state::SubTableEntry;
42#[cfg(test)]
43use state::string_column;
44use state::{ManifestState, read_manifest_state};
45
// Tag values distinguishing the kinds of manifest records. Judging by their
// names: sub-table registration, published sub-table version, and sub-table
// tombstone. Nothing in the code visible here reads them; presumably the
// `manifest/*` submodules do via `super::`. NOTE(review): confirm.
const OBJECT_TYPE_TABLE: &str = "table";
const OBJECT_TYPE_TABLE_VERSION: &str = "table_version";
const OBJECT_TYPE_TABLE_TOMBSTONE: &str = "table_tombstone";
// Name of the table-version-management key. NOTE(review): presumably a
// metadata/config key consumed by the submodules; confirm where it is read.
const TABLE_VERSION_MANAGEMENT_KEY: &str = "table_version_management";
50
51pub(crate) async fn migrate_on_open(root_uri: &str) -> Result<()> {
63 let mut dataset = open_manifest_dataset(root_uri, None).await?;
64 migrations::migrate_internal_schema(&mut dataset).await
65}
66
/// A point-in-time view of the manifest: the manifest version it was read at
/// and the sub-table entry recorded for each table key at that version.
#[derive(Debug, Clone)]
pub struct Snapshot {
    /// Graph root URI, stored without trailing `/` characters.
    root_uri: String,
    /// Manifest version this snapshot corresponds to.
    version: u64,
    /// Sub-table entries keyed by their `table_key`.
    entries: HashMap<String, SubTableEntry>,
}
77
78impl Snapshot {
79 pub async fn open(&self, table_key: &str) -> Result<Dataset> {
81 let entry = self
82 .entries
83 .get(table_key)
84 .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
85 entry.open(&self.root_uri).await
86 }
87
88 pub fn version(&self) -> u64 {
90 self.version
91 }
92
93 pub fn entry(&self, table_key: &str) -> Option<&SubTableEntry> {
95 self.entries.get(table_key)
96 }
97
98 pub fn entries(&self) -> impl Iterator<Item = &SubTableEntry> {
99 self.entries.values()
100 }
101}
102
103impl SubTableUpdate {
104 pub(crate) fn to_create_table_version_request(&self) -> CreateTableVersionRequest {
105 self.version_metadata.to_create_table_version_request(
106 &self.table_key,
107 self.table_version,
108 self.row_count,
109 self.table_branch.as_deref(),
110 )
111 }
112}
113
/// Request to register a sub-table in the manifest, carried by
/// `ManifestChange::RegisterTable`.
#[derive(Debug, Clone)]
pub(crate) struct TableRegistration {
    /// Manifest key of the sub-table being registered.
    pub(crate) table_key: String,
    /// Storage path of the sub-table. Presumably relative to the graph root, as
    /// produced by `table_path_for_table_key`; NOTE(review): confirm at the
    /// call sites.
    pub(crate) table_path: String,
}
119
/// Request to tombstone a sub-table in the manifest, carried by
/// `ManifestChange::Tombstone`.
#[derive(Debug, Clone)]
pub(crate) struct TableTombstone {
    /// Manifest key of the sub-table being tombstoned.
    pub(crate) table_key: String,
    /// Version recorded with the tombstone. NOTE(review): unclear from this
    /// file whether this is a manifest version or a sub-table version;
    /// confirm in the publisher.
    pub(crate) tombstone_version: u64,
}
125
/// One change in a manifest commit. A slice of these is handed to the batch
/// publisher by `ManifestCoordinator::commit_changes_with_expected`.
#[derive(Debug, Clone)]
pub(crate) enum ManifestChange {
    /// Record a new version of a sub-table.
    Update(SubTableUpdate),
    /// Register a sub-table.
    RegisterTable(TableRegistration),
    /// Tombstone a sub-table.
    Tombstone(TableTombstone),
}
132
133impl SubTableEntry {
134 pub(crate) async fn open(&self, root_uri: &str) -> Result<Dataset> {
135 open_table_at_version_from_manifest(
136 root_uri,
137 &self.table_key,
138 self.table_branch.as_deref(),
139 self.table_version,
140 )
141 .await
142 }
143}
144
145pub(crate) fn table_path_for_table_key(table_key: &str) -> Result<String> {
146 if let Some(type_name) = table_key.strip_prefix("node:") {
147 return Ok(format!("nodes/{}", type_name_hash(type_name)));
148 }
149 if let Some(type_name) = table_key.strip_prefix("edge:") {
150 return Ok(format!("edges/{}", type_name_hash(type_name)));
151 }
152 Err(OmniError::manifest(format!(
153 "invalid table key '{}'",
154 table_key
155 )))
156}
157
/// A new version of one sub-table, to be recorded in the manifest through
/// `ManifestChange::Update`.
#[derive(Debug, Clone)]
pub struct SubTableUpdate {
    /// Manifest key of the sub-table (e.g. `node:<Type>` or `edge:<Type>`).
    pub table_key: String,
    /// Version of the sub-table dataset being recorded.
    pub table_version: u64,
    /// Sub-table branch the version lives on. NOTE(review): `None` presumably
    /// means main; confirm.
    pub table_branch: Option<String>,
    /// Row count recorded for the sub-table with this update.
    pub row_count: u64,
    /// Extra metadata used to build the `CreateTableVersionRequest`.
    pub(crate) version_metadata: TableVersionMetadata,
}
167
/// Reads and publishes the graph manifest for one branch, caching the manifest
/// state it last read.
pub struct ManifestCoordinator {
    /// Graph root URI, stored without trailing `/` characters.
    root_uri: String,
    /// Handle to the manifest dataset. It is replaced after each publish,
    /// refresh or branch deletion.
    dataset: Dataset,
    /// Manifest state as of the last read of `dataset`.
    known_state: ManifestState,
    /// Branch this coordinator operates on; `None` means main.
    active_branch: Option<String>,
    /// Publisher used to commit batches of manifest changes.
    publisher: Arc<dyn ManifestBatchPublisher>,
}
181
182impl ManifestCoordinator {
183 fn default_batch_publisher(
184 root_uri: &str,
185 active_branch: Option<&str>,
186 ) -> Arc<dyn ManifestBatchPublisher> {
187 Arc::new(GraphNamespacePublisher::new(root_uri, active_branch))
188 }
189
190 fn from_parts(
191 root_uri: &str,
192 dataset: Dataset,
193 known_state: ManifestState,
194 active_branch: Option<String>,
195 publisher: Arc<dyn ManifestBatchPublisher>,
196 ) -> Self {
197 Self {
198 root_uri: root_uri.trim_end_matches('/').to_string(),
199 dataset,
200 known_state,
201 active_branch,
202 publisher,
203 }
204 }
205
206 fn from_parts_with_default_publisher(
207 root_uri: &str,
208 dataset: Dataset,
209 known_state: ManifestState,
210 active_branch: Option<String>,
211 ) -> Self {
212 let publisher = Self::default_batch_publisher(root_uri, active_branch.as_deref());
213 Self::from_parts(root_uri, dataset, known_state, active_branch, publisher)
214 }
215
216 fn snapshot_from_state(root_uri: &str, state: ManifestState) -> Snapshot {
217 Snapshot {
218 root_uri: root_uri.trim_end_matches('/').to_string(),
219 version: state.version,
220 entries: state
221 .entries
222 .into_iter()
223 .map(|entry| (entry.table_key.clone(), entry))
224 .collect(),
225 }
226 }
227
228 #[cfg(test)]
229 fn with_batch_publisher(mut self, publisher: Arc<dyn ManifestBatchPublisher>) -> Self {
230 self.publisher = publisher;
231 self
232 }
233
234 pub async fn init(root_uri: &str, catalog: &Catalog) -> Result<Self> {
238 let root = root_uri.trim_end_matches('/');
239 let (dataset, known_state) = init_manifest_graph(root, catalog).await?;
240
241 Ok(Self::from_parts_with_default_publisher(
242 root,
243 dataset,
244 known_state,
245 None,
246 ))
247 }
248
249 pub async fn open(root_uri: &str) -> Result<Self> {
251 let root = root_uri.trim_end_matches('/');
252 let (dataset, known_state) = open_manifest_graph(root, None).await?;
253 Ok(Self::from_parts_with_default_publisher(
254 root,
255 dataset,
256 known_state,
257 None,
258 ))
259 }
260
261 pub async fn open_at_branch(root_uri: &str, branch: &str) -> Result<Self> {
263 if branch == "main" {
264 return Self::open(root_uri).await;
265 }
266
267 let root = root_uri.trim_end_matches('/');
268 let (dataset, known_state) = open_manifest_graph(root, Some(branch)).await?;
269 Ok(Self::from_parts_with_default_publisher(
270 root,
271 dataset,
272 known_state,
273 Some(branch.to_string()),
274 ))
275 }
276
277 pub async fn snapshot_at(
278 root_uri: &str,
279 branch: Option<&str>,
280 version: u64,
281 ) -> Result<Snapshot> {
282 let root = root_uri.trim_end_matches('/');
283 Ok(Self::snapshot_from_state(
284 root,
285 snapshot_state_at(root, branch, version).await?,
286 ))
287 }
288
289 pub fn snapshot(&self) -> Snapshot {
291 Self::snapshot_from_state(&self.root_uri, self.known_state.clone())
292 }
293
294 pub async fn refresh(&mut self) -> Result<()> {
296 self.dataset = open_manifest_dataset(&self.root_uri, self.active_branch.as_deref()).await?;
297 self.known_state = read_manifest_state(&self.dataset).await?;
298 Ok(())
299 }
300
301 pub async fn commit(&mut self, updates: &[SubTableUpdate]) -> Result<u64> {
306 let changes = updates
307 .iter()
308 .cloned()
309 .map(ManifestChange::Update)
310 .collect::<Vec<_>>();
311 self.commit_changes(&changes).await
312 }
313
314 pub async fn commit_with_expected(
320 &mut self,
321 updates: &[SubTableUpdate],
322 expected_table_versions: &HashMap<String, u64>,
323 ) -> Result<u64> {
324 let changes = updates
325 .iter()
326 .cloned()
327 .map(ManifestChange::Update)
328 .collect::<Vec<_>>();
329 self.commit_changes_with_expected(&changes, expected_table_versions)
330 .await
331 }
332
333 pub(crate) async fn commit_changes(&mut self, changes: &[ManifestChange]) -> Result<u64> {
334 self.commit_changes_with_expected(changes, &HashMap::new())
335 .await
336 }
337
338 pub(crate) async fn commit_changes_with_expected(
339 &mut self,
340 changes: &[ManifestChange],
341 expected_table_versions: &HashMap<String, u64>,
342 ) -> Result<u64> {
343 if changes.is_empty() && expected_table_versions.is_empty() {
344 return Ok(self.version());
345 }
346
347 self.dataset = self
348 .publisher
349 .publish(changes, expected_table_versions)
350 .await?;
351
352 self.known_state = read_manifest_state(&self.dataset).await?;
353 Ok(self.version())
354 }
355
356 pub fn version(&self) -> u64 {
358 self.dataset.version().version
359 }
360
361 pub fn active_branch(&self) -> Option<&str> {
362 self.active_branch.as_deref()
363 }
364
365 pub async fn create_branch(&mut self, name: &str) -> Result<()> {
366 let mut ds = self.dataset.clone();
367 ds.create_branch(name, self.version(), None)
368 .await
369 .map_err(|e| OmniError::Lance(e.to_string()))?;
370 Ok(())
371 }
372
373 pub async fn delete_branch(&mut self, name: &str) -> Result<()> {
374 let uri = manifest_uri(&self.root_uri);
375 let mut ds = Dataset::open(&uri)
376 .await
377 .map_err(|e| OmniError::Lance(e.to_string()))?;
378 ds.delete_branch(name)
379 .await
380 .map_err(|e| OmniError::Lance(e.to_string()))?;
381 self.dataset = open_manifest_dataset(&self.root_uri, self.active_branch.as_deref()).await?;
382 self.known_state = read_manifest_state(&self.dataset).await?;
383 Ok(())
384 }
385
386 pub async fn list_branches(&self) -> Result<Vec<String>> {
387 let branches = self
388 .dataset
389 .list_branches()
390 .await
391 .map_err(|e| OmniError::Lance(e.to_string()))?;
392 let mut names: Vec<String> = branches.into_keys().filter(|name| name != "main").collect();
393 names.sort();
394 let mut all = vec!["main".to_string()];
395 all.extend(names);
396 Ok(all)
397 }
398
399 pub async fn descendant_branches(&self, name: &str) -> Result<Vec<String>> {
400 let branches = self
401 .dataset
402 .list_branches()
403 .await
404 .map_err(|e| OmniError::Lance(e.to_string()))?;
405 let mut frontier = vec![name.to_string()];
406 let mut descendants = Vec::new();
407 let mut seen = HashSet::new();
408
409 while let Some(parent) = frontier.pop() {
410 let mut children = branches
411 .iter()
412 .filter_map(|(branch, contents)| {
413 (contents.parent_branch.as_deref() == Some(parent.as_str()))
414 .then_some(branch.clone())
415 })
416 .collect::<Vec<_>>();
417 children.sort();
418 for child in children {
419 if seen.insert(child.clone()) {
420 frontier.push(child.clone());
421 descendants.push(child);
422 }
423 }
424 }
425
426 Ok(descendants)
427 }
428
429 pub fn root_uri(&self) -> &str {
431 &self.root_uri
432 }
433}
434
435#[cfg(test)]
436#[path = "manifest/tests.rs"]
437mod tests;