Skip to main content

omnigraph/db/
manifest.rs

1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3
4use crate::error::{OmniError, Result};
5use lance::Dataset;
6use lance_namespace::models::CreateTableVersionRequest;
7use omnigraph_compiler::catalog::Catalog;
8
9#[path = "manifest/graph.rs"]
10mod graph;
11#[path = "manifest/layout.rs"]
12mod layout;
13#[path = "manifest/metadata.rs"]
14mod metadata;
15#[path = "manifest/migrations.rs"]
16mod migrations;
17#[path = "manifest/namespace.rs"]
18mod namespace;
19#[path = "manifest/publisher.rs"]
20mod publisher;
21#[path = "manifest/recovery.rs"]
22mod recovery;
23#[path = "manifest/state.rs"]
24mod state;
25
26use graph::{init_manifest_graph, open_manifest_graph, snapshot_state_at};
27use layout::{manifest_uri, open_manifest_dataset, type_name_hash};
28pub(crate) use metadata::TableVersionMetadata;
29#[cfg(test)]
30use metadata::{OMNIGRAPH_ROW_COUNT_KEY, table_version_metadata_for_state};
31use namespace::open_table_at_version_from_manifest;
32pub(crate) use namespace::open_table_head_for_write;
33#[cfg(test)]
34use namespace::{branch_manifest_namespace, staged_table_namespace};
35use publisher::{GraphNamespacePublisher, ManifestBatchPublisher};
36pub(crate) use recovery::{
37    RecoveryMode, RecoverySidecar, RecoverySidecarHandle, SidecarKind, SidecarTablePin,
38    SidecarTableRegistration, SidecarTombstone, delete_sidecar, has_schema_apply_sidecar,
39    list_sidecars, new_sidecar, recover_manifest_drift, write_sidecar,
40};
41pub use state::SubTableEntry;
42#[cfg(test)]
43use state::string_column;
44use state::{ManifestState, read_manifest_state};
45
// NOTE(review): none of these constants is referenced in this file; they are
// presumably consumed by the `manifest/*` submodules as row-kind tags for
// `__manifest` entries — confirm against those modules.
const OBJECT_TYPE_TABLE: &str = "table";
const OBJECT_TYPE_TABLE_VERSION: &str = "table_version";
const OBJECT_TYPE_TABLE_TOMBSTONE: &str = "table_tombstone";
// NOTE(review): looks like a metadata/property key name; its consumer is not
// visible here — confirm before renaming.
const TABLE_VERSION_MANAGEMENT_KEY: &str = "table_version_management";
50
51/// Apply pending internal-schema migrations against `__manifest` on the
52/// open-for-write path, independent of a publish.
53///
54/// `Omnigraph::open(ReadWrite)` calls this before the coordinator reads branch
55/// state, so branch-observing code (`branch_list`, the schema-apply
56/// blocking-branch checks) sees the post-migration graph. In particular the
57/// v2→v3 step sweeps legacy `__run__*` staging branches off `__manifest`
58/// (MR-770); running it here closes the window where those branches would
59/// otherwise block schema apply before the first publish runs the migration.
60///
61/// Idempotent: a no-op stamp read when the on-disk version already matches.
62pub(crate) async fn migrate_on_open(root_uri: &str) -> Result<()> {
63    let mut dataset = open_manifest_dataset(root_uri, None).await?;
64    migrations::migrate_internal_schema(&mut dataset).await
65}
66
67/// Immutable point-in-time view of the database.
68///
69/// Cheap to create (no storage I/O). All reads within a query go through one
70/// Snapshot to guarantee cross-type consistency.
71#[derive(Debug, Clone)]
72pub struct Snapshot {
73    root_uri: String,
74    version: u64,
75    entries: HashMap<String, SubTableEntry>,
76}
77
78impl Snapshot {
79    /// Open a sub-table dataset at its pinned version.
80    pub async fn open(&self, table_key: &str) -> Result<Dataset> {
81        let entry = self
82            .entries
83            .get(table_key)
84            .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
85        entry.open(&self.root_uri).await
86    }
87
88    /// Manifest version this snapshot was taken from.
89    pub fn version(&self) -> u64 {
90        self.version
91    }
92
93    /// Look up a sub-table entry by key.
94    pub fn entry(&self, table_key: &str) -> Option<&SubTableEntry> {
95        self.entries.get(table_key)
96    }
97
98    pub fn entries(&self) -> impl Iterator<Item = &SubTableEntry> {
99        self.entries.values()
100    }
101}
102
103impl SubTableUpdate {
104    pub(crate) fn to_create_table_version_request(&self) -> CreateTableVersionRequest {
105        self.version_metadata.to_create_table_version_request(
106            &self.table_key,
107            self.table_version,
108            self.row_count,
109            self.table_branch.as_deref(),
110        )
111    }
112}
113
/// Payload of [`ManifestChange::RegisterTable`]: a table key paired with a
/// table path.
#[derive(Debug, Clone)]
pub(crate) struct TableRegistration {
    /// Key of the table being registered. `table_path_for_table_key` accepts
    /// keys of the form `node:<type>` or `edge:<type>`.
    pub(crate) table_key: String,
    /// NOTE(review): presumably the value produced by
    /// `table_path_for_table_key` for `table_key` — confirm at the
    /// construction sites.
    pub(crate) table_path: String,
}
119
/// Payload of [`ManifestChange::Tombstone`]: a table key and a tombstone
/// version.
#[derive(Debug, Clone)]
pub(crate) struct TableTombstone {
    /// Key of the table being tombstoned.
    pub(crate) table_key: String,
    /// NOTE(review): how this version is interpreted is decided by the
    /// publisher, which is not visible in this file — confirm there.
    pub(crate) tombstone_version: u64,
}
125
/// One change in a batch handed to `ManifestCoordinator::commit_changes` /
/// `commit_changes_with_expected`.
#[derive(Debug, Clone)]
pub(crate) enum ManifestChange {
    /// A sub-table version update; `ManifestCoordinator::commit` wraps each of
    /// its updates in this variant.
    Update(SubTableUpdate),
    /// Registration of a table key together with its path.
    RegisterTable(TableRegistration),
    /// A tombstone for a table key.
    Tombstone(TableTombstone),
}
132
133impl SubTableEntry {
134    pub(crate) async fn open(&self, root_uri: &str) -> Result<Dataset> {
135        open_table_at_version_from_manifest(
136            root_uri,
137            &self.table_key,
138            self.table_branch.as_deref(),
139            self.table_version,
140        )
141        .await
142    }
143}
144
145pub(crate) fn table_path_for_table_key(table_key: &str) -> Result<String> {
146    if let Some(type_name) = table_key.strip_prefix("node:") {
147        return Ok(format!("nodes/{}", type_name_hash(type_name)));
148    }
149    if let Some(type_name) = table_key.strip_prefix("edge:") {
150        return Ok(format!("edges/{}", type_name_hash(type_name)));
151    }
152    Err(OmniError::manifest(format!(
153        "invalid table key '{}'",
154        table_key
155    )))
156}
157
/// An update to apply to the manifest via `commit`.
#[derive(Debug, Clone)]
pub struct SubTableUpdate {
    /// Key of the sub-table this update refers to.
    pub table_key: String,
    /// Sub-table version carried by this update; forwarded to the
    /// create-table-version request.
    pub table_version: u64,
    /// Optional branch of the sub-table, forwarded to the request as
    /// `Option<&str>`. NOTE(review): presumably `None` means the main branch —
    /// confirm against the publisher.
    pub table_branch: Option<String>,
    /// Row count forwarded to the create-table-version request.
    pub row_count: u64,
    /// Metadata object that builds the `CreateTableVersionRequest` for this
    /// update (see `to_create_table_version_request`).
    pub(crate) version_metadata: TableVersionMetadata,
}
167
/// Coordinates cross-dataset state through the namespace `__manifest` table.
///
/// Table rows register stable metadata such as location. Append-only
/// `table_version` rows are the graph publish boundary and reconstruct the
/// current graph snapshot by selecting the latest visible version row per
/// sub-table.
pub struct ManifestCoordinator {
    // Graph root URI with any trailing '/' removed (done in `from_parts`).
    root_uri: String,
    // Handle on the manifest dataset; replaced by `refresh`, `delete_branch`
    // and every successful publish. `version()` reads from it.
    dataset: Dataset,
    // Manifest state most recently read from `dataset`; `snapshot()` is built
    // from a clone of it.
    known_state: ManifestState,
    // `None` when constructed by `init`/`open` (`open_at_branch("main")`
    // delegates to `open`); otherwise the branch name given to `open_at_branch`.
    active_branch: Option<String>,
    // Performs the publish step in `commit_changes_with_expected`.
    publisher: Arc<dyn ManifestBatchPublisher>,
}
181
182impl ManifestCoordinator {
183    fn default_batch_publisher(
184        root_uri: &str,
185        active_branch: Option<&str>,
186    ) -> Arc<dyn ManifestBatchPublisher> {
187        Arc::new(GraphNamespacePublisher::new(root_uri, active_branch))
188    }
189
190    fn from_parts(
191        root_uri: &str,
192        dataset: Dataset,
193        known_state: ManifestState,
194        active_branch: Option<String>,
195        publisher: Arc<dyn ManifestBatchPublisher>,
196    ) -> Self {
197        Self {
198            root_uri: root_uri.trim_end_matches('/').to_string(),
199            dataset,
200            known_state,
201            active_branch,
202            publisher,
203        }
204    }
205
206    fn from_parts_with_default_publisher(
207        root_uri: &str,
208        dataset: Dataset,
209        known_state: ManifestState,
210        active_branch: Option<String>,
211    ) -> Self {
212        let publisher = Self::default_batch_publisher(root_uri, active_branch.as_deref());
213        Self::from_parts(root_uri, dataset, known_state, active_branch, publisher)
214    }
215
216    fn snapshot_from_state(root_uri: &str, state: ManifestState) -> Snapshot {
217        Snapshot {
218            root_uri: root_uri.trim_end_matches('/').to_string(),
219            version: state.version,
220            entries: state
221                .entries
222                .into_iter()
223                .map(|entry| (entry.table_key.clone(), entry))
224                .collect(),
225        }
226    }
227
228    #[cfg(test)]
229    fn with_batch_publisher(mut self, publisher: Arc<dyn ManifestBatchPublisher>) -> Self {
230        self.publisher = publisher;
231        self
232    }
233
234    /// Create a new graph at `root_uri` from a catalog.
235    ///
236    /// Creates per-type Lance datasets and the namespace `__manifest` table.
237    pub async fn init(root_uri: &str, catalog: &Catalog) -> Result<Self> {
238        let root = root_uri.trim_end_matches('/');
239        let (dataset, known_state) = init_manifest_graph(root, catalog).await?;
240
241        Ok(Self::from_parts_with_default_publisher(
242            root,
243            dataset,
244            known_state,
245            None,
246        ))
247    }
248
249    /// Open an existing graph's manifest.
250    pub async fn open(root_uri: &str) -> Result<Self> {
251        let root = root_uri.trim_end_matches('/');
252        let (dataset, known_state) = open_manifest_graph(root, None).await?;
253        Ok(Self::from_parts_with_default_publisher(
254            root,
255            dataset,
256            known_state,
257            None,
258        ))
259    }
260
261    /// Open an existing graph's manifest at a specific branch.
262    pub async fn open_at_branch(root_uri: &str, branch: &str) -> Result<Self> {
263        if branch == "main" {
264            return Self::open(root_uri).await;
265        }
266
267        let root = root_uri.trim_end_matches('/');
268        let (dataset, known_state) = open_manifest_graph(root, Some(branch)).await?;
269        Ok(Self::from_parts_with_default_publisher(
270            root,
271            dataset,
272            known_state,
273            Some(branch.to_string()),
274        ))
275    }
276
277    pub async fn snapshot_at(
278        root_uri: &str,
279        branch: Option<&str>,
280        version: u64,
281    ) -> Result<Snapshot> {
282        let root = root_uri.trim_end_matches('/');
283        Ok(Self::snapshot_from_state(
284            root,
285            snapshot_state_at(root, branch, version).await?,
286        ))
287    }
288
289    /// Return a Snapshot from the known manifest state. No storage I/O.
290    pub fn snapshot(&self) -> Snapshot {
291        Self::snapshot_from_state(&self.root_uri, self.known_state.clone())
292    }
293
294    /// Re-read manifest from storage to see other writers' commits.
295    pub async fn refresh(&mut self) -> Result<()> {
296        self.dataset = open_manifest_dataset(&self.root_uri, self.active_branch.as_deref()).await?;
297        self.known_state = read_manifest_state(&self.dataset).await?;
298        Ok(())
299    }
300
301    /// Commit updated sub-table versions to the manifest.
302    ///
303    /// Atomically inserts one immutable `table_version` row per updated table.
304    /// The merge-insert commit on `__manifest` is the graph-level publish point.
305    pub async fn commit(&mut self, updates: &[SubTableUpdate]) -> Result<u64> {
306        let changes = updates
307            .iter()
308            .cloned()
309            .map(ManifestChange::Update)
310            .collect::<Vec<_>>();
311        self.commit_changes(&changes).await
312    }
313
314    /// Same as [`commit`], but with caller-supplied per-table expected
315    /// versions used for optimistic concurrency control. Each entry asserts
316    /// the manifest's current latest non-tombstoned `table_version` for that
317    /// `table_key` is exactly what the caller observed; mismatches surface
318    /// as `OmniError::Manifest` with `ManifestConflictDetails::ExpectedVersionMismatch`.
319    pub async fn commit_with_expected(
320        &mut self,
321        updates: &[SubTableUpdate],
322        expected_table_versions: &HashMap<String, u64>,
323    ) -> Result<u64> {
324        let changes = updates
325            .iter()
326            .cloned()
327            .map(ManifestChange::Update)
328            .collect::<Vec<_>>();
329        self.commit_changes_with_expected(&changes, expected_table_versions)
330            .await
331    }
332
333    pub(crate) async fn commit_changes(&mut self, changes: &[ManifestChange]) -> Result<u64> {
334        self.commit_changes_with_expected(changes, &HashMap::new())
335            .await
336    }
337
338    pub(crate) async fn commit_changes_with_expected(
339        &mut self,
340        changes: &[ManifestChange],
341        expected_table_versions: &HashMap<String, u64>,
342    ) -> Result<u64> {
343        if changes.is_empty() && expected_table_versions.is_empty() {
344            return Ok(self.version());
345        }
346
347        self.dataset = self
348            .publisher
349            .publish(changes, expected_table_versions)
350            .await?;
351
352        self.known_state = read_manifest_state(&self.dataset).await?;
353        Ok(self.version())
354    }
355
356    /// Current manifest version.
357    pub fn version(&self) -> u64 {
358        self.dataset.version().version
359    }
360
    /// Branch recorded on this coordinator at construction, or `None` when it
    /// was built by `init`/`open` (`open_at_branch("main")` delegates to
    /// `open`, so it also yields `None`).
    pub fn active_branch(&self) -> Option<&str> {
        self.active_branch.as_deref()
    }
364
365    pub async fn create_branch(&mut self, name: &str) -> Result<()> {
366        let mut ds = self.dataset.clone();
367        ds.create_branch(name, self.version(), None)
368            .await
369            .map_err(|e| OmniError::Lance(e.to_string()))?;
370        Ok(())
371    }
372
373    pub async fn delete_branch(&mut self, name: &str) -> Result<()> {
374        let uri = manifest_uri(&self.root_uri);
375        let mut ds = Dataset::open(&uri)
376            .await
377            .map_err(|e| OmniError::Lance(e.to_string()))?;
378        ds.delete_branch(name)
379            .await
380            .map_err(|e| OmniError::Lance(e.to_string()))?;
381        self.dataset = open_manifest_dataset(&self.root_uri, self.active_branch.as_deref()).await?;
382        self.known_state = read_manifest_state(&self.dataset).await?;
383        Ok(())
384    }
385
386    pub async fn list_branches(&self) -> Result<Vec<String>> {
387        let branches = self
388            .dataset
389            .list_branches()
390            .await
391            .map_err(|e| OmniError::Lance(e.to_string()))?;
392        let mut names: Vec<String> = branches.into_keys().filter(|name| name != "main").collect();
393        names.sort();
394        let mut all = vec!["main".to_string()];
395        all.extend(names);
396        Ok(all)
397    }
398
399    pub async fn descendant_branches(&self, name: &str) -> Result<Vec<String>> {
400        let branches = self
401            .dataset
402            .list_branches()
403            .await
404            .map_err(|e| OmniError::Lance(e.to_string()))?;
405        let mut frontier = vec![name.to_string()];
406        let mut descendants = Vec::new();
407        let mut seen = HashSet::new();
408
409        while let Some(parent) = frontier.pop() {
410            let mut children = branches
411                .iter()
412                .filter_map(|(branch, contents)| {
413                    (contents.parent_branch.as_deref() == Some(parent.as_str()))
414                        .then_some(branch.clone())
415                })
416                .collect::<Vec<_>>();
417            children.sort();
418            for child in children {
419                if seen.insert(child.clone()) {
420                    frontier.push(child.clone());
421                    descendants.push(child);
422                }
423            }
424        }
425
426        Ok(descendants)
427    }
428
429    /// Root URI of the graph.
430    pub fn root_uri(&self) -> &str {
431        &self.root_uri
432    }
433}
434
435#[cfg(test)]
436#[path = "manifest/tests.rs"]
437mod tests;