Skip to main content

omnigraph/db/
manifest.rs

1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3
4use crate::error::{OmniError, Result};
5use lance::Dataset;
6use lance_namespace::models::CreateTableVersionRequest;
7use omnigraph_compiler::catalog::Catalog;
8
9#[path = "manifest/layout.rs"]
10mod layout;
11#[path = "manifest/metadata.rs"]
12mod metadata;
13#[path = "manifest/migrations.rs"]
14mod migrations;
15#[path = "manifest/namespace.rs"]
16mod namespace;
17#[path = "manifest/publisher.rs"]
18mod publisher;
19#[path = "manifest/repo.rs"]
20mod repo;
21#[path = "manifest/state.rs"]
22mod state;
23
24use layout::{manifest_uri, open_manifest_dataset, type_name_hash};
25pub(crate) use metadata::TableVersionMetadata;
26#[cfg(test)]
27use metadata::{OMNIGRAPH_ROW_COUNT_KEY, table_version_metadata_for_state};
28use namespace::open_table_at_version_from_manifest;
29pub(crate) use namespace::open_table_head_for_write;
30#[cfg(test)]
31use namespace::{branch_manifest_namespace, staged_table_namespace};
32use publisher::{GraphNamespacePublisher, ManifestBatchPublisher};
33use repo::{init_manifest_repo, open_manifest_repo, snapshot_state_at};
34pub use state::SubTableEntry;
35#[cfg(test)]
36use state::string_column;
37use state::{ManifestState, read_manifest_state};
38
// Tags and keys shared with the `manifest/*` submodules (they are not
// referenced directly in this file). NOTE(review): these presumably match
// values stored in `__manifest` rows — confirm in the submodules before
// changing any of them.

/// Object-type tag for table registration entries.
const OBJECT_TYPE_TABLE: &str = "table";
/// Object-type tag for per-table version entries.
const OBJECT_TYPE_TABLE_VERSION: &str = "table_version";
/// Object-type tag for table tombstone entries.
const OBJECT_TYPE_TABLE_TOMBSTONE: &str = "table_tombstone";
/// Key naming the table-version-management setting.
const TABLE_VERSION_MANAGEMENT_KEY: &str = "table_version_management";
43
44/// Immutable point-in-time view of the database.
45///
46/// Cheap to create (no storage I/O). All reads within a query go through one
47/// Snapshot to guarantee cross-type consistency.
48#[derive(Debug, Clone)]
49pub struct Snapshot {
50    root_uri: String,
51    version: u64,
52    entries: HashMap<String, SubTableEntry>,
53}
54
55impl Snapshot {
56    /// Open a sub-table dataset at its pinned version.
57    pub async fn open(&self, table_key: &str) -> Result<Dataset> {
58        let entry = self
59            .entries
60            .get(table_key)
61            .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
62        entry.open(&self.root_uri).await
63    }
64
65    /// Manifest version this snapshot was taken from.
66    pub fn version(&self) -> u64 {
67        self.version
68    }
69
70    /// Look up a sub-table entry by key.
71    pub fn entry(&self, table_key: &str) -> Option<&SubTableEntry> {
72        self.entries.get(table_key)
73    }
74
75    pub fn entries(&self) -> impl Iterator<Item = &SubTableEntry> {
76        self.entries.values()
77    }
78}
79
80impl SubTableUpdate {
81    pub(crate) fn to_create_table_version_request(&self) -> CreateTableVersionRequest {
82        self.version_metadata.to_create_table_version_request(
83            &self.table_key,
84            self.table_version,
85            self.row_count,
86            self.table_branch.as_deref(),
87        )
88    }
89}
90
/// Registers a sub-table in the manifest together with its storage location.
#[derive(Debug, Clone)]
pub(crate) struct TableRegistration {
    /// Manifest key of the sub-table being registered.
    pub(crate) table_key: String,
    /// Storage path of the sub-table, relative to the repository root.
    pub(crate) table_path: String,
}
96
/// Marks a sub-table as removed from the manifest.
#[derive(Debug, Clone)]
pub(crate) struct TableTombstone {
    /// Manifest key of the sub-table being tombstoned.
    pub(crate) table_key: String,
    /// Version recorded for the tombstone.
    // NOTE(review): confirm whether this is a table version or a manifest version.
    pub(crate) tombstone_version: u64,
}
102
103#[derive(Debug, Clone)]
104pub(crate) enum ManifestChange {
105    Update(SubTableUpdate),
106    RegisterTable(TableRegistration),
107    Tombstone(TableTombstone),
108}
109
110impl SubTableEntry {
111    pub(crate) async fn open(&self, root_uri: &str) -> Result<Dataset> {
112        open_table_at_version_from_manifest(
113            root_uri,
114            &self.table_key,
115            self.table_branch.as_deref(),
116            self.table_version,
117        )
118        .await
119    }
120}
121
122pub(crate) fn table_path_for_table_key(table_key: &str) -> Result<String> {
123    if let Some(type_name) = table_key.strip_prefix("node:") {
124        return Ok(format!("nodes/{}", type_name_hash(type_name)));
125    }
126    if let Some(type_name) = table_key.strip_prefix("edge:") {
127        return Ok(format!("edges/{}", type_name_hash(type_name)));
128    }
129    Err(OmniError::manifest(format!(
130        "invalid table key '{}'",
131        table_key
132    )))
133}
134
135/// An update to apply to the manifest via `commit`.
136#[derive(Debug, Clone)]
137pub struct SubTableUpdate {
138    pub table_key: String,
139    pub table_version: u64,
140    pub table_branch: Option<String>,
141    pub row_count: u64,
142    pub(crate) version_metadata: TableVersionMetadata,
143}
144
145/// Coordinates cross-dataset state through the namespace `__manifest` table.
146///
147/// Table rows register stable metadata such as location. Append-only
148/// `table_version` rows are the graph publish boundary and reconstruct the
149/// current graph snapshot by selecting the latest visible version row per
150/// sub-table.
151pub struct ManifestCoordinator {
152    root_uri: String,
153    dataset: Dataset,
154    known_state: ManifestState,
155    active_branch: Option<String>,
156    publisher: Arc<dyn ManifestBatchPublisher>,
157}
158
159impl ManifestCoordinator {
160    fn default_batch_publisher(
161        root_uri: &str,
162        active_branch: Option<&str>,
163    ) -> Arc<dyn ManifestBatchPublisher> {
164        Arc::new(GraphNamespacePublisher::new(root_uri, active_branch))
165    }
166
167    fn from_parts(
168        root_uri: &str,
169        dataset: Dataset,
170        known_state: ManifestState,
171        active_branch: Option<String>,
172        publisher: Arc<dyn ManifestBatchPublisher>,
173    ) -> Self {
174        Self {
175            root_uri: root_uri.trim_end_matches('/').to_string(),
176            dataset,
177            known_state,
178            active_branch,
179            publisher,
180        }
181    }
182
183    fn from_parts_with_default_publisher(
184        root_uri: &str,
185        dataset: Dataset,
186        known_state: ManifestState,
187        active_branch: Option<String>,
188    ) -> Self {
189        let publisher = Self::default_batch_publisher(root_uri, active_branch.as_deref());
190        Self::from_parts(root_uri, dataset, known_state, active_branch, publisher)
191    }
192
193    fn snapshot_from_state(root_uri: &str, state: ManifestState) -> Snapshot {
194        Snapshot {
195            root_uri: root_uri.trim_end_matches('/').to_string(),
196            version: state.version,
197            entries: state
198                .entries
199                .into_iter()
200                .map(|entry| (entry.table_key.clone(), entry))
201                .collect(),
202        }
203    }
204
205    #[cfg(test)]
206    fn with_batch_publisher(mut self, publisher: Arc<dyn ManifestBatchPublisher>) -> Self {
207        self.publisher = publisher;
208        self
209    }
210
211    /// Create a new repo at `root_uri` from a catalog.
212    ///
213    /// Creates per-type Lance datasets and the namespace `__manifest` table.
214    pub async fn init(root_uri: &str, catalog: &Catalog) -> Result<Self> {
215        let root = root_uri.trim_end_matches('/');
216        let (dataset, known_state) = init_manifest_repo(root, catalog).await?;
217
218        Ok(Self::from_parts_with_default_publisher(
219            root,
220            dataset,
221            known_state,
222            None,
223        ))
224    }
225
226    /// Open an existing repo's manifest.
227    pub async fn open(root_uri: &str) -> Result<Self> {
228        let root = root_uri.trim_end_matches('/');
229        let (dataset, known_state) = open_manifest_repo(root, None).await?;
230        Ok(Self::from_parts_with_default_publisher(
231            root,
232            dataset,
233            known_state,
234            None,
235        ))
236    }
237
238    /// Open an existing repo's manifest at a specific branch.
239    pub async fn open_at_branch(root_uri: &str, branch: &str) -> Result<Self> {
240        if branch == "main" {
241            return Self::open(root_uri).await;
242        }
243
244        let root = root_uri.trim_end_matches('/');
245        let (dataset, known_state) = open_manifest_repo(root, Some(branch)).await?;
246        Ok(Self::from_parts_with_default_publisher(
247            root,
248            dataset,
249            known_state,
250            Some(branch.to_string()),
251        ))
252    }
253
254    pub async fn snapshot_at(
255        root_uri: &str,
256        branch: Option<&str>,
257        version: u64,
258    ) -> Result<Snapshot> {
259        let root = root_uri.trim_end_matches('/');
260        Ok(Self::snapshot_from_state(
261            root,
262            snapshot_state_at(root, branch, version).await?,
263        ))
264    }
265
266    /// Return a Snapshot from the known manifest state. No storage I/O.
267    pub fn snapshot(&self) -> Snapshot {
268        Self::snapshot_from_state(&self.root_uri, self.known_state.clone())
269    }
270
271    /// Re-read manifest from storage to see other writers' commits.
272    pub async fn refresh(&mut self) -> Result<()> {
273        self.dataset = open_manifest_dataset(&self.root_uri, self.active_branch.as_deref()).await?;
274        self.known_state = read_manifest_state(&self.dataset).await?;
275        Ok(())
276    }
277
278    /// Commit updated sub-table versions to the manifest.
279    ///
280    /// Atomically inserts one immutable `table_version` row per updated table.
281    /// The merge-insert commit on `__manifest` is the graph-level publish point.
282    pub async fn commit(&mut self, updates: &[SubTableUpdate]) -> Result<u64> {
283        let changes = updates
284            .iter()
285            .cloned()
286            .map(ManifestChange::Update)
287            .collect::<Vec<_>>();
288        self.commit_changes(&changes).await
289    }
290
291    /// Same as [`commit`], but with caller-supplied per-table expected
292    /// versions used for optimistic concurrency control. Each entry asserts
293    /// the manifest's current latest non-tombstoned `table_version` for that
294    /// `table_key` is exactly what the caller observed; mismatches surface
295    /// as `OmniError::Manifest` with `ManifestConflictDetails::ExpectedVersionMismatch`.
296    pub async fn commit_with_expected(
297        &mut self,
298        updates: &[SubTableUpdate],
299        expected_table_versions: &HashMap<String, u64>,
300    ) -> Result<u64> {
301        let changes = updates
302            .iter()
303            .cloned()
304            .map(ManifestChange::Update)
305            .collect::<Vec<_>>();
306        self.commit_changes_with_expected(&changes, expected_table_versions)
307            .await
308    }
309
310    pub(crate) async fn commit_changes(&mut self, changes: &[ManifestChange]) -> Result<u64> {
311        self.commit_changes_with_expected(changes, &HashMap::new())
312            .await
313    }
314
315    pub(crate) async fn commit_changes_with_expected(
316        &mut self,
317        changes: &[ManifestChange],
318        expected_table_versions: &HashMap<String, u64>,
319    ) -> Result<u64> {
320        if changes.is_empty() && expected_table_versions.is_empty() {
321            return Ok(self.version());
322        }
323
324        self.dataset = self
325            .publisher
326            .publish(changes, expected_table_versions)
327            .await?;
328
329        self.known_state = read_manifest_state(&self.dataset).await?;
330        Ok(self.version())
331    }
332
333    /// Current manifest version.
334    pub fn version(&self) -> u64 {
335        self.dataset.version().version
336    }
337
338    pub fn active_branch(&self) -> Option<&str> {
339        self.active_branch.as_deref()
340    }
341
342    pub async fn create_branch(&mut self, name: &str) -> Result<()> {
343        let mut ds = self.dataset.clone();
344        ds.create_branch(name, self.version(), None)
345            .await
346            .map_err(|e| OmniError::Lance(e.to_string()))?;
347        Ok(())
348    }
349
350    pub async fn delete_branch(&mut self, name: &str) -> Result<()> {
351        let uri = manifest_uri(&self.root_uri);
352        let mut ds = Dataset::open(&uri)
353            .await
354            .map_err(|e| OmniError::Lance(e.to_string()))?;
355        ds.delete_branch(name)
356            .await
357            .map_err(|e| OmniError::Lance(e.to_string()))?;
358        self.dataset = open_manifest_dataset(&self.root_uri, self.active_branch.as_deref()).await?;
359        self.known_state = read_manifest_state(&self.dataset).await?;
360        Ok(())
361    }
362
363    pub async fn list_branches(&self) -> Result<Vec<String>> {
364        let branches = self
365            .dataset
366            .list_branches()
367            .await
368            .map_err(|e| OmniError::Lance(e.to_string()))?;
369        let mut names: Vec<String> = branches.into_keys().filter(|name| name != "main").collect();
370        names.sort();
371        let mut all = vec!["main".to_string()];
372        all.extend(names);
373        Ok(all)
374    }
375
376    pub async fn descendant_branches(&self, name: &str) -> Result<Vec<String>> {
377        let branches = self
378            .dataset
379            .list_branches()
380            .await
381            .map_err(|e| OmniError::Lance(e.to_string()))?;
382        let mut frontier = vec![name.to_string()];
383        let mut descendants = Vec::new();
384        let mut seen = HashSet::new();
385
386        while let Some(parent) = frontier.pop() {
387            let mut children = branches
388                .iter()
389                .filter_map(|(branch, contents)| {
390                    (contents.parent_branch.as_deref() == Some(parent.as_str()))
391                        .then_some(branch.clone())
392                })
393                .collect::<Vec<_>>();
394            children.sort();
395            for child in children {
396                if seen.insert(child.clone()) {
397                    frontier.push(child.clone());
398                    descendants.push(child);
399                }
400            }
401        }
402
403        Ok(descendants)
404    }
405
406    /// Root URI of the repo.
407    pub fn root_uri(&self) -> &str {
408        &self.root_uri
409    }
410}
411
412#[cfg(test)]
413#[path = "manifest/tests.rs"]
414mod tests;