1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3
4use crate::error::{OmniError, Result};
5use lance::Dataset;
6use lance_namespace::models::CreateTableVersionRequest;
7use omnigraph_compiler::catalog::Catalog;
8
9#[path = "manifest/layout.rs"]
10mod layout;
11#[path = "manifest/metadata.rs"]
12mod metadata;
13#[path = "manifest/namespace.rs"]
14mod namespace;
15#[path = "manifest/publisher.rs"]
16mod publisher;
17#[path = "manifest/repo.rs"]
18mod repo;
19#[path = "manifest/state.rs"]
20mod state;
21
22use layout::{manifest_uri, open_manifest_dataset, type_name_hash};
23pub(crate) use metadata::TableVersionMetadata;
24#[cfg(test)]
25use metadata::{OMNIGRAPH_ROW_COUNT_KEY, table_version_metadata_for_state};
26use namespace::open_table_at_version_from_manifest;
27pub(crate) use namespace::open_table_head_for_write;
28#[cfg(test)]
29use namespace::{branch_manifest_namespace, staged_table_namespace};
30use publisher::{GraphNamespacePublisher, ManifestBatchPublisher};
31use repo::{init_manifest_repo, open_manifest_repo, snapshot_state_at};
32pub use state::SubTableEntry;
33#[cfg(test)]
34use state::string_column;
35use state::{ManifestState, read_manifest_state};
36
// String tags used in manifest rows / namespace metadata. None of these are
// referenced in this file; presumably the `manifest/*` submodules read them
// through `super::` — verify before renaming or changing values, since the
// values are likely persisted.

/// Object-type tag for a registered table.
const OBJECT_TYPE_TABLE: &str = "table";
/// Object-type tag for a single table-version record.
const OBJECT_TYPE_TABLE_VERSION: &str = "table_version";
/// Object-type tag for a table tombstone record.
const OBJECT_TYPE_TABLE_TOMBSTONE: &str = "table_tombstone";
/// Property key for the table-version-management setting (semantics defined by
/// the consumer of this key — NOTE(review): confirm where it is read).
const TABLE_VERSION_MANAGEMENT_KEY: &str = "table_version_management";
41
/// Point-in-time view of the manifest: the manifest version it was read at
/// plus one [`SubTableEntry`] per table key.
///
/// Cheap to hand out (`Clone`) and exposes only read accessors.
#[derive(Debug, Clone)]
pub struct Snapshot {
    /// Graph root URI with trailing `/` characters removed.
    root_uri: String,
    /// Manifest version this snapshot reflects.
    version: u64,
    /// Sub-table entries keyed by their `table_key`.
    entries: HashMap<String, SubTableEntry>,
}
52
53impl Snapshot {
54 pub async fn open(&self, table_key: &str) -> Result<Dataset> {
56 let entry = self
57 .entries
58 .get(table_key)
59 .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
60 entry.open(&self.root_uri).await
61 }
62
63 pub fn version(&self) -> u64 {
65 self.version
66 }
67
68 pub fn entry(&self, table_key: &str) -> Option<&SubTableEntry> {
70 self.entries.get(table_key)
71 }
72
73 pub fn entries(&self) -> impl Iterator<Item = &SubTableEntry> {
74 self.entries.values()
75 }
76}
77
78impl SubTableUpdate {
79 pub(crate) fn to_create_table_version_request(&self) -> CreateTableVersionRequest {
80 self.version_metadata.to_create_table_version_request(
81 &self.table_key,
82 self.table_version,
83 self.row_count,
84 self.table_branch.as_deref(),
85 )
86 }
87}
88
/// Payload of [`ManifestChange::RegisterTable`]: a table key together with the
/// storage path it should be registered under.
#[derive(Debug, Clone)]
pub(crate) struct TableRegistration {
    /// Table key, presumably of the form `node:<Type>` / `edge:<Type>`
    /// (the forms `table_path_for_table_key` accepts) — verify against callers.
    pub(crate) table_key: String,
    /// Table storage path; assumed to be relative to the graph root, like the
    /// `nodes/…` / `edges/…` paths `table_path_for_table_key` produces — TODO confirm.
    pub(crate) table_path: String,
}
94
/// Payload of [`ManifestChange::Tombstone`]: identifies a table to tombstone in
/// the manifest.
#[derive(Debug, Clone)]
pub(crate) struct TableTombstone {
    /// Key of the table being tombstoned.
    pub(crate) table_key: String,
    /// Version number recorded with the tombstone. NOTE(review): exact meaning
    /// (table version vs. manifest version) is defined by the publisher — confirm.
    pub(crate) tombstone_version: u64,
}
100
/// One change in a batch handed to `ManifestCoordinator::commit_changes`,
/// which forwards the whole batch to its `ManifestBatchPublisher`.
#[derive(Debug, Clone)]
pub(crate) enum ManifestChange {
    /// Record a new version of an existing sub-table.
    Update(SubTableUpdate),
    /// Register a table key / storage path pair.
    RegisterTable(TableRegistration),
    /// Tombstone a table.
    Tombstone(TableTombstone),
}
107
108impl SubTableEntry {
109 pub(crate) async fn open(&self, root_uri: &str) -> Result<Dataset> {
110 open_table_at_version_from_manifest(
111 root_uri,
112 &self.table_key,
113 self.table_branch.as_deref(),
114 self.table_version,
115 )
116 .await
117 }
118}
119
120pub(crate) fn table_path_for_table_key(table_key: &str) -> Result<String> {
121 if let Some(type_name) = table_key.strip_prefix("node:") {
122 return Ok(format!("nodes/{}", type_name_hash(type_name)));
123 }
124 if let Some(type_name) = table_key.strip_prefix("edge:") {
125 return Ok(format!("edges/{}", type_name_hash(type_name)));
126 }
127 Err(OmniError::manifest(format!(
128 "invalid table key '{}'",
129 table_key
130 )))
131}
132
/// A new version of one sub-table to be recorded in the manifest.
///
/// Converted into a namespace `CreateTableVersionRequest` by
/// `to_create_table_version_request`.
#[derive(Debug, Clone)]
pub struct SubTableUpdate {
    /// Key of the sub-table being updated.
    pub table_key: String,
    /// Sub-table version to record.
    pub table_version: u64,
    /// Sub-table branch the version lives on, if any (`None` presumably means
    /// the default branch — verify).
    pub table_branch: Option<String>,
    /// Row count to record alongside the version.
    pub row_count: u64,
    /// Extra version metadata used to build the namespace request.
    pub(crate) version_metadata: TableVersionMetadata,
}
142
/// Reads and publishes the graph manifest for a single branch, caching the
/// manifest dataset handle and the manifest state decoded from it.
pub struct ManifestCoordinator {
    /// Graph root URI with trailing `/` characters removed.
    root_uri: String,
    /// Manifest dataset handle at the version this coordinator last observed.
    dataset: Dataset,
    /// Manifest state matching `dataset`; source of `snapshot()`.
    known_state: ManifestState,
    /// Branch this coordinator targets; `None` means main.
    active_branch: Option<String>,
    /// Strategy used by `commit_changes` to publish batches of changes.
    publisher: Arc<dyn ManifestBatchPublisher>,
}
156
157impl ManifestCoordinator {
158 fn default_batch_publisher(
159 root_uri: &str,
160 active_branch: Option<&str>,
161 ) -> Arc<dyn ManifestBatchPublisher> {
162 Arc::new(GraphNamespacePublisher::new(root_uri, active_branch))
163 }
164
165 fn from_parts(
166 root_uri: &str,
167 dataset: Dataset,
168 known_state: ManifestState,
169 active_branch: Option<String>,
170 publisher: Arc<dyn ManifestBatchPublisher>,
171 ) -> Self {
172 Self {
173 root_uri: root_uri.trim_end_matches('/').to_string(),
174 dataset,
175 known_state,
176 active_branch,
177 publisher,
178 }
179 }
180
181 fn from_parts_with_default_publisher(
182 root_uri: &str,
183 dataset: Dataset,
184 known_state: ManifestState,
185 active_branch: Option<String>,
186 ) -> Self {
187 let publisher = Self::default_batch_publisher(root_uri, active_branch.as_deref());
188 Self::from_parts(root_uri, dataset, known_state, active_branch, publisher)
189 }
190
191 fn snapshot_from_state(root_uri: &str, state: ManifestState) -> Snapshot {
192 Snapshot {
193 root_uri: root_uri.trim_end_matches('/').to_string(),
194 version: state.version,
195 entries: state
196 .entries
197 .into_iter()
198 .map(|entry| (entry.table_key.clone(), entry))
199 .collect(),
200 }
201 }
202
203 #[cfg(test)]
204 fn with_batch_publisher(mut self, publisher: Arc<dyn ManifestBatchPublisher>) -> Self {
205 self.publisher = publisher;
206 self
207 }
208
209 pub async fn init(root_uri: &str, catalog: &Catalog) -> Result<Self> {
213 let root = root_uri.trim_end_matches('/');
214 let (dataset, known_state) = init_manifest_repo(root, catalog).await?;
215
216 Ok(Self::from_parts_with_default_publisher(
217 root,
218 dataset,
219 known_state,
220 None,
221 ))
222 }
223
224 pub async fn open(root_uri: &str) -> Result<Self> {
226 let root = root_uri.trim_end_matches('/');
227 let (dataset, known_state) = open_manifest_repo(root, None).await?;
228 Ok(Self::from_parts_with_default_publisher(
229 root,
230 dataset,
231 known_state,
232 None,
233 ))
234 }
235
236 pub async fn open_at_branch(root_uri: &str, branch: &str) -> Result<Self> {
238 if branch == "main" {
239 return Self::open(root_uri).await;
240 }
241
242 let root = root_uri.trim_end_matches('/');
243 let (dataset, known_state) = open_manifest_repo(root, Some(branch)).await?;
244 Ok(Self::from_parts_with_default_publisher(
245 root,
246 dataset,
247 known_state,
248 Some(branch.to_string()),
249 ))
250 }
251
252 pub async fn snapshot_at(
253 root_uri: &str,
254 branch: Option<&str>,
255 version: u64,
256 ) -> Result<Snapshot> {
257 let root = root_uri.trim_end_matches('/');
258 Ok(Self::snapshot_from_state(
259 root,
260 snapshot_state_at(root, branch, version).await?,
261 ))
262 }
263
264 pub fn snapshot(&self) -> Snapshot {
266 Self::snapshot_from_state(&self.root_uri, self.known_state.clone())
267 }
268
269 pub async fn refresh(&mut self) -> Result<()> {
271 self.dataset = open_manifest_dataset(&self.root_uri, self.active_branch.as_deref()).await?;
272 self.known_state = read_manifest_state(&self.dataset).await?;
273 Ok(())
274 }
275
276 pub async fn commit(&mut self, updates: &[SubTableUpdate]) -> Result<u64> {
281 let changes = updates
282 .iter()
283 .cloned()
284 .map(ManifestChange::Update)
285 .collect::<Vec<_>>();
286 self.commit_changes(&changes).await
287 }
288
289 pub(crate) async fn commit_changes(&mut self, changes: &[ManifestChange]) -> Result<u64> {
290 if changes.is_empty() {
291 return Ok(self.version());
292 }
293
294 self.dataset = self.publisher.publish(changes).await?;
295
296 self.known_state = read_manifest_state(&self.dataset).await?;
297 Ok(self.version())
298 }
299
300 pub fn version(&self) -> u64 {
302 self.dataset.version().version
303 }
304
305 pub fn active_branch(&self) -> Option<&str> {
306 self.active_branch.as_deref()
307 }
308
309 pub async fn create_branch(&mut self, name: &str) -> Result<()> {
310 let mut ds = self.dataset.clone();
311 ds.create_branch(name, self.version(), None)
312 .await
313 .map_err(|e| OmniError::Lance(e.to_string()))?;
314 Ok(())
315 }
316
317 pub async fn delete_branch(&mut self, name: &str) -> Result<()> {
318 let uri = manifest_uri(&self.root_uri);
319 let mut ds = Dataset::open(&uri)
320 .await
321 .map_err(|e| OmniError::Lance(e.to_string()))?;
322 ds.delete_branch(name)
323 .await
324 .map_err(|e| OmniError::Lance(e.to_string()))?;
325 self.dataset = open_manifest_dataset(&self.root_uri, self.active_branch.as_deref()).await?;
326 self.known_state = read_manifest_state(&self.dataset).await?;
327 Ok(())
328 }
329
330 pub async fn list_branches(&self) -> Result<Vec<String>> {
331 let branches = self
332 .dataset
333 .list_branches()
334 .await
335 .map_err(|e| OmniError::Lance(e.to_string()))?;
336 let mut names: Vec<String> = branches.into_keys().filter(|name| name != "main").collect();
337 names.sort();
338 let mut all = vec!["main".to_string()];
339 all.extend(names);
340 Ok(all)
341 }
342
343 pub async fn descendant_branches(&self, name: &str) -> Result<Vec<String>> {
344 let branches = self
345 .dataset
346 .list_branches()
347 .await
348 .map_err(|e| OmniError::Lance(e.to_string()))?;
349 let mut frontier = vec![name.to_string()];
350 let mut descendants = Vec::new();
351 let mut seen = HashSet::new();
352
353 while let Some(parent) = frontier.pop() {
354 let mut children = branches
355 .iter()
356 .filter_map(|(branch, contents)| {
357 (contents.parent_branch.as_deref() == Some(parent.as_str()))
358 .then_some(branch.clone())
359 })
360 .collect::<Vec<_>>();
361 children.sort();
362 for child in children {
363 if seen.insert(child.clone()) {
364 frontier.push(child.clone());
365 descendants.push(child);
366 }
367 }
368 }
369
370 Ok(descendants)
371 }
372
373 pub fn root_uri(&self) -> &str {
375 &self.root_uri
376 }
377}
378
379#[cfg(test)]
380#[path = "manifest/tests.rs"]
381mod tests;