1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3
4use crate::error::{OmniError, Result};
5use lance::Dataset;
6use lance_namespace::models::CreateTableVersionRequest;
7use omnigraph_compiler::catalog::Catalog;
8
9#[path = "manifest/layout.rs"]
10mod layout;
11#[path = "manifest/metadata.rs"]
12mod metadata;
13#[path = "manifest/migrations.rs"]
14mod migrations;
15#[path = "manifest/namespace.rs"]
16mod namespace;
17#[path = "manifest/publisher.rs"]
18mod publisher;
19#[path = "manifest/repo.rs"]
20mod repo;
21#[path = "manifest/state.rs"]
22mod state;
23
24use layout::{manifest_uri, open_manifest_dataset, type_name_hash};
25pub(crate) use metadata::TableVersionMetadata;
26#[cfg(test)]
27use metadata::{OMNIGRAPH_ROW_COUNT_KEY, table_version_metadata_for_state};
28use namespace::open_table_at_version_from_manifest;
29pub(crate) use namespace::open_table_head_for_write;
30#[cfg(test)]
31use namespace::{branch_manifest_namespace, staged_table_namespace};
32use publisher::{GraphNamespacePublisher, ManifestBatchPublisher};
33use repo::{init_manifest_repo, open_manifest_repo, snapshot_state_at};
34pub use state::SubTableEntry;
35#[cfg(test)]
36use state::string_column;
37use state::{ManifestState, read_manifest_state};
38
// String tags used by the manifest machinery. None of them is referenced in
// this file directly; presumably the `manifest/*` submodules read them through
// `super::` — TODO confirm before renaming or removing any of them.

/// Tag value `"table"`. NOTE(review): presumably marks a table-registration
/// record in the manifest — confirm against the submodules.
const OBJECT_TYPE_TABLE: &str = "table";
/// Tag value `"table_version"`. NOTE(review): presumably marks a record that
/// pins a table to a version — confirm against the submodules.
const OBJECT_TYPE_TABLE_VERSION: &str = "table_version";
/// Tag value `"table_tombstone"`. NOTE(review): presumably marks a record
/// written for a tombstoned table — confirm against the submodules.
const OBJECT_TYPE_TABLE_TOMBSTONE: &str = "table_tombstone";
/// Key string `"table_version_management"`. NOTE(review): where this key is
/// stored (dataset config, namespace property, ...) is not visible here.
const TABLE_VERSION_MANAGEMENT_KEY: &str = "table_version_management";
43
44#[derive(Debug, Clone)]
49pub struct Snapshot {
50 root_uri: String,
51 version: u64,
52 entries: HashMap<String, SubTableEntry>,
53}
54
55impl Snapshot {
56 pub async fn open(&self, table_key: &str) -> Result<Dataset> {
58 let entry = self
59 .entries
60 .get(table_key)
61 .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
62 entry.open(&self.root_uri).await
63 }
64
65 pub fn version(&self) -> u64 {
67 self.version
68 }
69
70 pub fn entry(&self, table_key: &str) -> Option<&SubTableEntry> {
72 self.entries.get(table_key)
73 }
74
75 pub fn entries(&self) -> impl Iterator<Item = &SubTableEntry> {
76 self.entries.values()
77 }
78}
79
80impl SubTableUpdate {
81 pub(crate) fn to_create_table_version_request(&self) -> CreateTableVersionRequest {
82 self.version_metadata.to_create_table_version_request(
83 &self.table_key,
84 self.table_version,
85 self.row_count,
86 self.table_branch.as_deref(),
87 )
88 }
89}
90
/// Payload of [`ManifestChange::RegisterTable`]: a table key paired with a
/// table path.
#[derive(Debug, Clone)]
pub(crate) struct TableRegistration {
    /// Key of the table being registered.
    /// NOTE(review): presumably of the `node:<type>` / `edge:<type>` form
    /// accepted by `table_path_for_table_key` — confirm at the call sites.
    pub(crate) table_key: String,
    /// Path recorded for the table.
    /// NOTE(review): presumably relative to the repository root, as produced
    /// by `table_path_for_table_key` — confirm at the call sites.
    pub(crate) table_path: String,
}
96
/// Payload of [`ManifestChange::Tombstone`]: a table key paired with a
/// tombstone version.
#[derive(Debug, Clone)]
pub(crate) struct TableTombstone {
    /// Key of the table being tombstoned.
    pub(crate) table_key: String,
    /// Version number recorded with the tombstone.
    /// NOTE(review): whether this is a table version or a manifest version is
    /// not visible in this file — confirm in the publisher.
    pub(crate) tombstone_version: u64,
}
102
/// One change to apply to the manifest.
///
/// Slices of these are what `ManifestCoordinator::commit_changes` and
/// `ManifestCoordinator::commit_changes_with_expected` hand to the publisher.
#[derive(Debug, Clone)]
pub(crate) enum ManifestChange {
    /// Carries a [`SubTableUpdate`] (table key, version, branch, row count).
    Update(SubTableUpdate),
    /// Carries a [`TableRegistration`] (table key and table path).
    RegisterTable(TableRegistration),
    /// Carries a [`TableTombstone`] (table key and tombstone version).
    Tombstone(TableTombstone),
}
109
110impl SubTableEntry {
111 pub(crate) async fn open(&self, root_uri: &str) -> Result<Dataset> {
112 open_table_at_version_from_manifest(
113 root_uri,
114 &self.table_key,
115 self.table_branch.as_deref(),
116 self.table_version,
117 )
118 .await
119 }
120}
121
122pub(crate) fn table_path_for_table_key(table_key: &str) -> Result<String> {
123 if let Some(type_name) = table_key.strip_prefix("node:") {
124 return Ok(format!("nodes/{}", type_name_hash(type_name)));
125 }
126 if let Some(type_name) = table_key.strip_prefix("edge:") {
127 return Ok(format!("edges/{}", type_name_hash(type_name)));
128 }
129 Err(OmniError::manifest(format!(
130 "invalid table key '{}'",
131 table_key
132 )))
133}
134
/// A new version of one sub-table, to be recorded in the manifest.
///
/// Wrapped in `ManifestChange::Update` by `ManifestCoordinator::commit` and
/// `ManifestCoordinator::commit_with_expected`.
#[derive(Debug, Clone)]
pub struct SubTableUpdate {
    /// Key of the table the update applies to.
    pub table_key: String,
    /// Table version the update carries.
    pub table_version: u64,
    /// Branch of the table the version belongs to, if any.
    /// NOTE(review): `None` presumably means the table's default branch —
    /// confirm in the namespace module.
    pub table_branch: Option<String>,
    /// Row count carried with the update.
    pub row_count: u64,
    /// Metadata that builds the `CreateTableVersionRequest` for this update.
    pub(crate) version_metadata: TableVersionMetadata,
}
144
145pub struct ManifestCoordinator {
152 root_uri: String,
153 dataset: Dataset,
154 known_state: ManifestState,
155 active_branch: Option<String>,
156 publisher: Arc<dyn ManifestBatchPublisher>,
157}
158
159impl ManifestCoordinator {
160 fn default_batch_publisher(
161 root_uri: &str,
162 active_branch: Option<&str>,
163 ) -> Arc<dyn ManifestBatchPublisher> {
164 Arc::new(GraphNamespacePublisher::new(root_uri, active_branch))
165 }
166
167 fn from_parts(
168 root_uri: &str,
169 dataset: Dataset,
170 known_state: ManifestState,
171 active_branch: Option<String>,
172 publisher: Arc<dyn ManifestBatchPublisher>,
173 ) -> Self {
174 Self {
175 root_uri: root_uri.trim_end_matches('/').to_string(),
176 dataset,
177 known_state,
178 active_branch,
179 publisher,
180 }
181 }
182
183 fn from_parts_with_default_publisher(
184 root_uri: &str,
185 dataset: Dataset,
186 known_state: ManifestState,
187 active_branch: Option<String>,
188 ) -> Self {
189 let publisher = Self::default_batch_publisher(root_uri, active_branch.as_deref());
190 Self::from_parts(root_uri, dataset, known_state, active_branch, publisher)
191 }
192
193 fn snapshot_from_state(root_uri: &str, state: ManifestState) -> Snapshot {
194 Snapshot {
195 root_uri: root_uri.trim_end_matches('/').to_string(),
196 version: state.version,
197 entries: state
198 .entries
199 .into_iter()
200 .map(|entry| (entry.table_key.clone(), entry))
201 .collect(),
202 }
203 }
204
205 #[cfg(test)]
206 fn with_batch_publisher(mut self, publisher: Arc<dyn ManifestBatchPublisher>) -> Self {
207 self.publisher = publisher;
208 self
209 }
210
211 pub async fn init(root_uri: &str, catalog: &Catalog) -> Result<Self> {
215 let root = root_uri.trim_end_matches('/');
216 let (dataset, known_state) = init_manifest_repo(root, catalog).await?;
217
218 Ok(Self::from_parts_with_default_publisher(
219 root,
220 dataset,
221 known_state,
222 None,
223 ))
224 }
225
226 pub async fn open(root_uri: &str) -> Result<Self> {
228 let root = root_uri.trim_end_matches('/');
229 let (dataset, known_state) = open_manifest_repo(root, None).await?;
230 Ok(Self::from_parts_with_default_publisher(
231 root,
232 dataset,
233 known_state,
234 None,
235 ))
236 }
237
238 pub async fn open_at_branch(root_uri: &str, branch: &str) -> Result<Self> {
240 if branch == "main" {
241 return Self::open(root_uri).await;
242 }
243
244 let root = root_uri.trim_end_matches('/');
245 let (dataset, known_state) = open_manifest_repo(root, Some(branch)).await?;
246 Ok(Self::from_parts_with_default_publisher(
247 root,
248 dataset,
249 known_state,
250 Some(branch.to_string()),
251 ))
252 }
253
254 pub async fn snapshot_at(
255 root_uri: &str,
256 branch: Option<&str>,
257 version: u64,
258 ) -> Result<Snapshot> {
259 let root = root_uri.trim_end_matches('/');
260 Ok(Self::snapshot_from_state(
261 root,
262 snapshot_state_at(root, branch, version).await?,
263 ))
264 }
265
266 pub fn snapshot(&self) -> Snapshot {
268 Self::snapshot_from_state(&self.root_uri, self.known_state.clone())
269 }
270
271 pub async fn refresh(&mut self) -> Result<()> {
273 self.dataset = open_manifest_dataset(&self.root_uri, self.active_branch.as_deref()).await?;
274 self.known_state = read_manifest_state(&self.dataset).await?;
275 Ok(())
276 }
277
278 pub async fn commit(&mut self, updates: &[SubTableUpdate]) -> Result<u64> {
283 let changes = updates
284 .iter()
285 .cloned()
286 .map(ManifestChange::Update)
287 .collect::<Vec<_>>();
288 self.commit_changes(&changes).await
289 }
290
291 pub async fn commit_with_expected(
297 &mut self,
298 updates: &[SubTableUpdate],
299 expected_table_versions: &HashMap<String, u64>,
300 ) -> Result<u64> {
301 let changes = updates
302 .iter()
303 .cloned()
304 .map(ManifestChange::Update)
305 .collect::<Vec<_>>();
306 self.commit_changes_with_expected(&changes, expected_table_versions)
307 .await
308 }
309
310 pub(crate) async fn commit_changes(&mut self, changes: &[ManifestChange]) -> Result<u64> {
311 self.commit_changes_with_expected(changes, &HashMap::new())
312 .await
313 }
314
315 pub(crate) async fn commit_changes_with_expected(
316 &mut self,
317 changes: &[ManifestChange],
318 expected_table_versions: &HashMap<String, u64>,
319 ) -> Result<u64> {
320 if changes.is_empty() && expected_table_versions.is_empty() {
321 return Ok(self.version());
322 }
323
324 self.dataset = self
325 .publisher
326 .publish(changes, expected_table_versions)
327 .await?;
328
329 self.known_state = read_manifest_state(&self.dataset).await?;
330 Ok(self.version())
331 }
332
333 pub fn version(&self) -> u64 {
335 self.dataset.version().version
336 }
337
338 pub fn active_branch(&self) -> Option<&str> {
339 self.active_branch.as_deref()
340 }
341
342 pub async fn create_branch(&mut self, name: &str) -> Result<()> {
343 let mut ds = self.dataset.clone();
344 ds.create_branch(name, self.version(), None)
345 .await
346 .map_err(|e| OmniError::Lance(e.to_string()))?;
347 Ok(())
348 }
349
350 pub async fn delete_branch(&mut self, name: &str) -> Result<()> {
351 let uri = manifest_uri(&self.root_uri);
352 let mut ds = Dataset::open(&uri)
353 .await
354 .map_err(|e| OmniError::Lance(e.to_string()))?;
355 ds.delete_branch(name)
356 .await
357 .map_err(|e| OmniError::Lance(e.to_string()))?;
358 self.dataset = open_manifest_dataset(&self.root_uri, self.active_branch.as_deref()).await?;
359 self.known_state = read_manifest_state(&self.dataset).await?;
360 Ok(())
361 }
362
363 pub async fn list_branches(&self) -> Result<Vec<String>> {
364 let branches = self
365 .dataset
366 .list_branches()
367 .await
368 .map_err(|e| OmniError::Lance(e.to_string()))?;
369 let mut names: Vec<String> = branches.into_keys().filter(|name| name != "main").collect();
370 names.sort();
371 let mut all = vec!["main".to_string()];
372 all.extend(names);
373 Ok(all)
374 }
375
376 pub async fn descendant_branches(&self, name: &str) -> Result<Vec<String>> {
377 let branches = self
378 .dataset
379 .list_branches()
380 .await
381 .map_err(|e| OmniError::Lance(e.to_string()))?;
382 let mut frontier = vec![name.to_string()];
383 let mut descendants = Vec::new();
384 let mut seen = HashSet::new();
385
386 while let Some(parent) = frontier.pop() {
387 let mut children = branches
388 .iter()
389 .filter_map(|(branch, contents)| {
390 (contents.parent_branch.as_deref() == Some(parent.as_str()))
391 .then_some(branch.clone())
392 })
393 .collect::<Vec<_>>();
394 children.sort();
395 for child in children {
396 if seen.insert(child.clone()) {
397 frontier.push(child.clone());
398 descendants.push(child);
399 }
400 }
401 }
402
403 Ok(descendants)
404 }
405
406 pub fn root_uri(&self) -> &str {
408 &self.root_uri
409 }
410}
411
412#[cfg(test)]
413#[path = "manifest/tests.rs"]
414mod tests;