Skip to main content

omnigraph/db/
manifest.rs

1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3
4use crate::error::{OmniError, Result};
5use lance::Dataset;
6use lance_namespace::models::CreateTableVersionRequest;
7use omnigraph_compiler::catalog::Catalog;
8
9#[path = "manifest/layout.rs"]
10mod layout;
11#[path = "manifest/metadata.rs"]
12mod metadata;
13#[path = "manifest/namespace.rs"]
14mod namespace;
15#[path = "manifest/publisher.rs"]
16mod publisher;
17#[path = "manifest/repo.rs"]
18mod repo;
19#[path = "manifest/state.rs"]
20mod state;
21
22use layout::{manifest_uri, open_manifest_dataset, type_name_hash};
23pub(crate) use metadata::TableVersionMetadata;
24#[cfg(test)]
25use metadata::{OMNIGRAPH_ROW_COUNT_KEY, table_version_metadata_for_state};
26use namespace::open_table_at_version_from_manifest;
27pub(crate) use namespace::open_table_head_for_write;
28#[cfg(test)]
29use namespace::{branch_manifest_namespace, staged_table_namespace};
30use publisher::{GraphNamespacePublisher, ManifestBatchPublisher};
31use repo::{init_manifest_repo, open_manifest_repo, snapshot_state_at};
32pub use state::SubTableEntry;
33#[cfg(test)]
34use state::string_column;
35use state::{ManifestState, read_manifest_state};
36
// Row-kind tags for entries of the namespace `__manifest` table. "table" rows
// register stable per-table metadata (such as location) and "table_version"
// rows are the append-only publish records. None of these constants is
// referenced in this file; presumably the `manifest/*` submodules read them
// through `super::` — TODO confirm.
const OBJECT_TYPE_TABLE: &str = "table";
const OBJECT_TYPE_TABLE_VERSION: &str = "table_version";
// NOTE(review): presumably the row kind written for a `ManifestChange::Tombstone`
// (a dropped table) — confirm in the publisher.
const OBJECT_TYPE_TABLE_TOMBSTONE: &str = "table_tombstone";
// NOTE(review): key name only; where and how it is stored is outside this
// file — confirm its meaning before relying on it.
const TABLE_VERSION_MANAGEMENT_KEY: &str = "table_version_management";
41
/// Immutable point-in-time view of the database.
///
/// Cheap to create (no storage I/O). All reads within a query go through one
/// Snapshot to guarantee cross-type consistency.
#[derive(Debug, Clone)]
pub struct Snapshot {
    /// Repo root URI. `ManifestCoordinator` strips trailing `/` before storing it.
    root_uri: String,
    /// Manifest version the entries were read at.
    version: u64,
    /// Sub-table entries keyed by their `table_key`.
    entries: HashMap<String, SubTableEntry>,
}
52
53impl Snapshot {
54    /// Open a sub-table dataset at its pinned version.
55    pub async fn open(&self, table_key: &str) -> Result<Dataset> {
56        let entry = self
57            .entries
58            .get(table_key)
59            .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
60        entry.open(&self.root_uri).await
61    }
62
63    /// Manifest version this snapshot was taken from.
64    pub fn version(&self) -> u64 {
65        self.version
66    }
67
68    /// Look up a sub-table entry by key.
69    pub fn entry(&self, table_key: &str) -> Option<&SubTableEntry> {
70        self.entries.get(table_key)
71    }
72
73    pub fn entries(&self) -> impl Iterator<Item = &SubTableEntry> {
74        self.entries.values()
75    }
76}
77
78impl SubTableUpdate {
79    pub(crate) fn to_create_table_version_request(&self) -> CreateTableVersionRequest {
80        self.version_metadata.to_create_table_version_request(
81            &self.table_key,
82            self.table_version,
83            self.row_count,
84            self.table_branch.as_deref(),
85        )
86    }
87}
88
/// Registration of a sub-table in the manifest, carried by
/// `ManifestChange::RegisterTable`.
#[derive(Debug, Clone)]
pub(crate) struct TableRegistration {
    /// Manifest key of the table (`node:<Type>` or `edge:<Type>` per
    /// `table_path_for_table_key`).
    pub(crate) table_key: String,
    /// Storage location of the table. NOTE(review): presumably the
    /// root-relative path from `table_path_for_table_key` — confirm at callers.
    pub(crate) table_path: String,
}
94
/// Tombstone for a sub-table, carried by `ManifestChange::Tombstone`.
#[derive(Debug, Clone)]
pub(crate) struct TableTombstone {
    /// Manifest key of the tombstoned table.
    pub(crate) table_key: String,
    /// NOTE(review): presumably the version at which the tombstone takes
    /// effect; how it is interpreted lives in the publisher — confirm there.
    pub(crate) tombstone_version: u64,
}
100
/// One change in a batch passed to `ManifestCoordinator::commit_changes`,
/// which hands the whole batch to the manifest publisher in a single call.
#[derive(Debug, Clone)]
pub(crate) enum ManifestChange {
    /// Publish a new version of an existing sub-table.
    Update(SubTableUpdate),
    /// Register a sub-table and its storage location.
    RegisterTable(TableRegistration),
    /// Tombstone a sub-table.
    Tombstone(TableTombstone),
}
107
108impl SubTableEntry {
109    pub(crate) async fn open(&self, root_uri: &str) -> Result<Dataset> {
110        open_table_at_version_from_manifest(
111            root_uri,
112            &self.table_key,
113            self.table_branch.as_deref(),
114            self.table_version,
115        )
116        .await
117    }
118}
119
120pub(crate) fn table_path_for_table_key(table_key: &str) -> Result<String> {
121    if let Some(type_name) = table_key.strip_prefix("node:") {
122        return Ok(format!("nodes/{}", type_name_hash(type_name)));
123    }
124    if let Some(type_name) = table_key.strip_prefix("edge:") {
125        return Ok(format!("edges/{}", type_name_hash(type_name)));
126    }
127    Err(OmniError::manifest(format!(
128        "invalid table key '{}'",
129        table_key
130    )))
131}
132
/// An update to apply to the manifest via `commit`.
///
/// Each update becomes one `table_version` publish record; see
/// `to_create_table_version_request` for how its fields are forwarded.
#[derive(Debug, Clone)]
pub struct SubTableUpdate {
    /// Manifest key of the sub-table (`node:<Type>` or `edge:<Type>`).
    pub table_key: String,
    /// Version of the sub-table dataset to publish.
    pub table_version: u64,
    /// Branch the published version lives on. NOTE(review): presumably `None`
    /// means the sub-table's main branch — confirm.
    pub table_branch: Option<String>,
    /// Row count recorded alongside the published version.
    pub row_count: u64,
    /// Additional version metadata used to build the publish request.
    pub(crate) version_metadata: TableVersionMetadata,
}
142
/// Coordinates cross-dataset state through the namespace `__manifest` table.
///
/// Table rows register stable metadata such as location. Append-only
/// `table_version` rows are the graph publish boundary and reconstruct the
/// current graph snapshot by selecting the latest visible version row per
/// sub-table.
pub struct ManifestCoordinator {
    /// Repo root URI with trailing `/` removed.
    root_uri: String,
    /// Handle to the `__manifest` dataset, positioned on `active_branch`.
    dataset: Dataset,
    /// Manifest state last read from `dataset`; `snapshot()` serves from this.
    known_state: ManifestState,
    /// Branch this coordinator reads and publishes on; `None` means main.
    active_branch: Option<String>,
    /// Publishes batches of `ManifestChange`s and returns the updated dataset.
    publisher: Arc<dyn ManifestBatchPublisher>,
}
156
157impl ManifestCoordinator {
158    fn default_batch_publisher(
159        root_uri: &str,
160        active_branch: Option<&str>,
161    ) -> Arc<dyn ManifestBatchPublisher> {
162        Arc::new(GraphNamespacePublisher::new(root_uri, active_branch))
163    }
164
165    fn from_parts(
166        root_uri: &str,
167        dataset: Dataset,
168        known_state: ManifestState,
169        active_branch: Option<String>,
170        publisher: Arc<dyn ManifestBatchPublisher>,
171    ) -> Self {
172        Self {
173            root_uri: root_uri.trim_end_matches('/').to_string(),
174            dataset,
175            known_state,
176            active_branch,
177            publisher,
178        }
179    }
180
181    fn from_parts_with_default_publisher(
182        root_uri: &str,
183        dataset: Dataset,
184        known_state: ManifestState,
185        active_branch: Option<String>,
186    ) -> Self {
187        let publisher = Self::default_batch_publisher(root_uri, active_branch.as_deref());
188        Self::from_parts(root_uri, dataset, known_state, active_branch, publisher)
189    }
190
191    fn snapshot_from_state(root_uri: &str, state: ManifestState) -> Snapshot {
192        Snapshot {
193            root_uri: root_uri.trim_end_matches('/').to_string(),
194            version: state.version,
195            entries: state
196                .entries
197                .into_iter()
198                .map(|entry| (entry.table_key.clone(), entry))
199                .collect(),
200        }
201    }
202
203    #[cfg(test)]
204    fn with_batch_publisher(mut self, publisher: Arc<dyn ManifestBatchPublisher>) -> Self {
205        self.publisher = publisher;
206        self
207    }
208
209    /// Create a new repo at `root_uri` from a catalog.
210    ///
211    /// Creates per-type Lance datasets and the namespace `__manifest` table.
212    pub async fn init(root_uri: &str, catalog: &Catalog) -> Result<Self> {
213        let root = root_uri.trim_end_matches('/');
214        let (dataset, known_state) = init_manifest_repo(root, catalog).await?;
215
216        Ok(Self::from_parts_with_default_publisher(
217            root,
218            dataset,
219            known_state,
220            None,
221        ))
222    }
223
224    /// Open an existing repo's manifest.
225    pub async fn open(root_uri: &str) -> Result<Self> {
226        let root = root_uri.trim_end_matches('/');
227        let (dataset, known_state) = open_manifest_repo(root, None).await?;
228        Ok(Self::from_parts_with_default_publisher(
229            root,
230            dataset,
231            known_state,
232            None,
233        ))
234    }
235
236    /// Open an existing repo's manifest at a specific branch.
237    pub async fn open_at_branch(root_uri: &str, branch: &str) -> Result<Self> {
238        if branch == "main" {
239            return Self::open(root_uri).await;
240        }
241
242        let root = root_uri.trim_end_matches('/');
243        let (dataset, known_state) = open_manifest_repo(root, Some(branch)).await?;
244        Ok(Self::from_parts_with_default_publisher(
245            root,
246            dataset,
247            known_state,
248            Some(branch.to_string()),
249        ))
250    }
251
252    pub async fn snapshot_at(
253        root_uri: &str,
254        branch: Option<&str>,
255        version: u64,
256    ) -> Result<Snapshot> {
257        let root = root_uri.trim_end_matches('/');
258        Ok(Self::snapshot_from_state(
259            root,
260            snapshot_state_at(root, branch, version).await?,
261        ))
262    }
263
264    /// Return a Snapshot from the known manifest state. No storage I/O.
265    pub fn snapshot(&self) -> Snapshot {
266        Self::snapshot_from_state(&self.root_uri, self.known_state.clone())
267    }
268
269    /// Re-read manifest from storage to see other writers' commits.
270    pub async fn refresh(&mut self) -> Result<()> {
271        self.dataset = open_manifest_dataset(&self.root_uri, self.active_branch.as_deref()).await?;
272        self.known_state = read_manifest_state(&self.dataset).await?;
273        Ok(())
274    }
275
276    /// Commit updated sub-table versions to the manifest.
277    ///
278    /// Atomically inserts one immutable `table_version` row per updated table.
279    /// The merge-insert commit on `__manifest` is the graph-level publish point.
280    pub async fn commit(&mut self, updates: &[SubTableUpdate]) -> Result<u64> {
281        let changes = updates
282            .iter()
283            .cloned()
284            .map(ManifestChange::Update)
285            .collect::<Vec<_>>();
286        self.commit_changes(&changes).await
287    }
288
289    pub(crate) async fn commit_changes(&mut self, changes: &[ManifestChange]) -> Result<u64> {
290        if changes.is_empty() {
291            return Ok(self.version());
292        }
293
294        self.dataset = self.publisher.publish(changes).await?;
295
296        self.known_state = read_manifest_state(&self.dataset).await?;
297        Ok(self.version())
298    }
299
300    /// Current manifest version.
301    pub fn version(&self) -> u64 {
302        self.dataset.version().version
303    }
304
305    pub fn active_branch(&self) -> Option<&str> {
306        self.active_branch.as_deref()
307    }
308
309    pub async fn create_branch(&mut self, name: &str) -> Result<()> {
310        let mut ds = self.dataset.clone();
311        ds.create_branch(name, self.version(), None)
312            .await
313            .map_err(|e| OmniError::Lance(e.to_string()))?;
314        Ok(())
315    }
316
317    pub async fn delete_branch(&mut self, name: &str) -> Result<()> {
318        let uri = manifest_uri(&self.root_uri);
319        let mut ds = Dataset::open(&uri)
320            .await
321            .map_err(|e| OmniError::Lance(e.to_string()))?;
322        ds.delete_branch(name)
323            .await
324            .map_err(|e| OmniError::Lance(e.to_string()))?;
325        self.dataset = open_manifest_dataset(&self.root_uri, self.active_branch.as_deref()).await?;
326        self.known_state = read_manifest_state(&self.dataset).await?;
327        Ok(())
328    }
329
330    pub async fn list_branches(&self) -> Result<Vec<String>> {
331        let branches = self
332            .dataset
333            .list_branches()
334            .await
335            .map_err(|e| OmniError::Lance(e.to_string()))?;
336        let mut names: Vec<String> = branches.into_keys().filter(|name| name != "main").collect();
337        names.sort();
338        let mut all = vec!["main".to_string()];
339        all.extend(names);
340        Ok(all)
341    }
342
343    pub async fn descendant_branches(&self, name: &str) -> Result<Vec<String>> {
344        let branches = self
345            .dataset
346            .list_branches()
347            .await
348            .map_err(|e| OmniError::Lance(e.to_string()))?;
349        let mut frontier = vec![name.to_string()];
350        let mut descendants = Vec::new();
351        let mut seen = HashSet::new();
352
353        while let Some(parent) = frontier.pop() {
354            let mut children = branches
355                .iter()
356                .filter_map(|(branch, contents)| {
357                    (contents.parent_branch.as_deref() == Some(parent.as_str()))
358                        .then_some(branch.clone())
359                })
360                .collect::<Vec<_>>();
361            children.sort();
362            for child in children {
363                if seen.insert(child.clone()) {
364                    frontier.push(child.clone());
365                    descendants.push(child);
366                }
367            }
368        }
369
370        Ok(descendants)
371    }
372
373    /// Root URI of the repo.
374    pub fn root_uri(&self) -> &str {
375        &self.root_uri
376    }
377}
378
379#[cfg(test)]
380#[path = "manifest/tests.rs"]
381mod tests;