Skip to main content

omnigraph/db/
manifest.rs

1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3
4use crate::error::{OmniError, Result};
5use lance::Dataset;
6use lance_namespace::models::CreateTableVersionRequest;
7use omnigraph_compiler::catalog::Catalog;
8
9#[path = "manifest/graph.rs"]
10mod graph;
11#[path = "manifest/layout.rs"]
12mod layout;
13#[path = "manifest/metadata.rs"]
14mod metadata;
15#[path = "manifest/migrations.rs"]
16mod migrations;
17#[path = "manifest/namespace.rs"]
18mod namespace;
19#[path = "manifest/publisher.rs"]
20mod publisher;
21#[path = "manifest/recovery.rs"]
22mod recovery;
23#[path = "manifest/state.rs"]
24mod state;
25
26use graph::{init_manifest_graph, open_manifest_graph, snapshot_state_at};
27use layout::{manifest_uri, open_manifest_dataset, type_name_hash};
28pub(crate) use metadata::TableVersionMetadata;
29#[cfg(test)]
30use metadata::{OMNIGRAPH_ROW_COUNT_KEY, table_version_metadata_for_state};
31use namespace::open_table_at_version_from_manifest;
32pub(crate) use namespace::open_table_head_for_write;
33#[cfg(test)]
34use namespace::{branch_manifest_namespace, staged_table_namespace};
35use publisher::{GraphNamespacePublisher, ManifestBatchPublisher};
36pub(crate) use recovery::{
37    RecoveryMode, RecoverySidecarHandle, SidecarKind, SidecarTablePin, SidecarTableRegistration,
38    SidecarTombstone, delete_sidecar, has_schema_apply_sidecar, heal_pending_sidecars_roll_forward,
39    list_sidecars, new_sidecar, recover_manifest_drift, schema_apply_serial_queue_key,
40    write_sidecar,
41};
42pub use state::SubTableEntry;
43#[cfg(test)]
44use state::string_column;
45use state::{ManifestState, read_manifest_state};
46
// String tags shared by the `__manifest` submodules.
// NOTE(review): the exact column/property each tag is written to lives in the
// manifest submodules (graph/publisher/state) — confirm there before relying on it.

/// Tag for rows that register a sub-table (presumably the stable-metadata
/// "table" rows described on `ManifestCoordinator`).
const OBJECT_TYPE_TABLE: &str = "table";
/// Tag for the append-only per-table version rows that form the publish boundary.
const OBJECT_TYPE_TABLE_VERSION: &str = "table_version";
/// Tag for rows that tombstone a sub-table.
const OBJECT_TYPE_TABLE_TOMBSTONE: &str = "table_tombstone";
/// Metadata key named `table_version_management`; its consumer is outside this
/// file — TODO confirm its exact role.
const TABLE_VERSION_MANAGEMENT_KEY: &str = "table_version_management";
51
52/// Apply pending internal-schema migrations against `__manifest` on the
53/// open-for-write path, independent of a publish.
54///
55/// `Omnigraph::open(ReadWrite)` calls this before the coordinator reads branch
56/// state, so branch-observing code (`branch_list`, the schema-apply
57/// blocking-branch checks) sees the post-migration graph. In particular the
58/// v2→v3 step sweeps legacy `__run__*` staging branches off `__manifest`
59/// (MR-770); running it here closes the window where those branches would
60/// otherwise block schema apply before the first publish runs the migration.
61///
62/// Idempotent: a no-op stamp read when the on-disk version already matches.
63pub(crate) async fn migrate_on_open(root_uri: &str) -> Result<()> {
64    let mut dataset = open_manifest_dataset(root_uri, None).await?;
65    migrations::migrate_internal_schema(&mut dataset).await
66}
67
68/// Immutable point-in-time view of the database.
69///
70/// Cheap to create (no storage I/O). All reads within a query go through one
71/// Snapshot to guarantee cross-type consistency.
72#[derive(Debug, Clone)]
73pub struct Snapshot {
74    root_uri: String,
75    version: u64,
76    entries: HashMap<String, SubTableEntry>,
77}
78
79impl Snapshot {
80    /// Open a sub-table dataset at its pinned version.
81    pub async fn open(&self, table_key: &str) -> Result<Dataset> {
82        let entry = self
83            .entries
84            .get(table_key)
85            .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
86        entry.open(&self.root_uri).await
87    }
88
89    /// Manifest version this snapshot was taken from.
90    pub fn version(&self) -> u64 {
91        self.version
92    }
93
94    /// Look up a sub-table entry by key.
95    pub fn entry(&self, table_key: &str) -> Option<&SubTableEntry> {
96        self.entries.get(table_key)
97    }
98
99    pub fn entries(&self) -> impl Iterator<Item = &SubTableEntry> {
100        self.entries.values()
101    }
102}
103
104impl SubTableUpdate {
105    pub(crate) fn to_create_table_version_request(&self) -> CreateTableVersionRequest {
106        self.version_metadata.to_create_table_version_request(
107            &self.table_key,
108            self.table_version,
109            self.row_count,
110            self.table_branch.as_deref(),
111        )
112    }
113}
114
/// Request to register a new sub-table in the manifest.
#[derive(Debug, Clone)]
pub(crate) struct TableRegistration {
    /// Table key being registered (e.g. `node:<Type>` or `edge:<Type>`).
    pub(crate) table_key: String,
    /// Storage path of the table relative to the graph root. Presumably
    /// produced by `table_path_for_table_key`, but confirm against the callers.
    pub(crate) table_path: String,
}
120
/// Request to tombstone (retire) a sub-table in the manifest.
#[derive(Debug, Clone)]
pub(crate) struct TableTombstone {
    /// Table key being tombstoned.
    pub(crate) table_key: String,
    /// Version number recorded with the tombstone.
    /// NOTE(review): exact semantics are defined by the publisher — confirm.
    pub(crate) tombstone_version: u64,
}
126
127#[derive(Debug, Clone)]
128pub(crate) enum ManifestChange {
129    Update(SubTableUpdate),
130    RegisterTable(TableRegistration),
131    Tombstone(TableTombstone),
132}
133
134impl SubTableEntry {
135    pub(crate) async fn open(&self, root_uri: &str) -> Result<Dataset> {
136        open_table_at_version_from_manifest(
137            root_uri,
138            &self.table_key,
139            self.table_branch.as_deref(),
140            self.table_version,
141        )
142        .await
143    }
144}
145
146pub(crate) fn table_path_for_table_key(table_key: &str) -> Result<String> {
147    if let Some(type_name) = table_key.strip_prefix("node:") {
148        return Ok(format!("nodes/{}", type_name_hash(type_name)));
149    }
150    if let Some(type_name) = table_key.strip_prefix("edge:") {
151        return Ok(format!("edges/{}", type_name_hash(type_name)));
152    }
153    Err(OmniError::manifest(format!(
154        "invalid table key '{}'",
155        table_key
156    )))
157}
158
159/// An update to apply to the manifest via `commit`.
160#[derive(Debug, Clone)]
161pub struct SubTableUpdate {
162    pub table_key: String,
163    pub table_version: u64,
164    pub table_branch: Option<String>,
165    pub row_count: u64,
166    pub(crate) version_metadata: TableVersionMetadata,
167}
168
169/// Coordinates cross-dataset state through the namespace `__manifest` table.
170///
171/// Table rows register stable metadata such as location. Append-only
172/// `table_version` rows are the graph publish boundary and reconstruct the
173/// current graph snapshot by selecting the latest visible version row per
174/// sub-table.
175pub struct ManifestCoordinator {
176    root_uri: String,
177    dataset: Dataset,
178    known_state: ManifestState,
179    active_branch: Option<String>,
180    publisher: Arc<dyn ManifestBatchPublisher>,
181}
182
183impl ManifestCoordinator {
184    fn default_batch_publisher(
185        root_uri: &str,
186        active_branch: Option<&str>,
187    ) -> Arc<dyn ManifestBatchPublisher> {
188        Arc::new(GraphNamespacePublisher::new(root_uri, active_branch))
189    }
190
191    fn from_parts(
192        root_uri: &str,
193        dataset: Dataset,
194        known_state: ManifestState,
195        active_branch: Option<String>,
196        publisher: Arc<dyn ManifestBatchPublisher>,
197    ) -> Self {
198        Self {
199            root_uri: root_uri.trim_end_matches('/').to_string(),
200            dataset,
201            known_state,
202            active_branch,
203            publisher,
204        }
205    }
206
207    fn from_parts_with_default_publisher(
208        root_uri: &str,
209        dataset: Dataset,
210        known_state: ManifestState,
211        active_branch: Option<String>,
212    ) -> Self {
213        let publisher = Self::default_batch_publisher(root_uri, active_branch.as_deref());
214        Self::from_parts(root_uri, dataset, known_state, active_branch, publisher)
215    }
216
217    fn snapshot_from_state(root_uri: &str, state: ManifestState) -> Snapshot {
218        Snapshot {
219            root_uri: root_uri.trim_end_matches('/').to_string(),
220            version: state.version,
221            entries: state
222                .entries
223                .into_iter()
224                .map(|entry| (entry.table_key.clone(), entry))
225                .collect(),
226        }
227    }
228
229    #[cfg(test)]
230    fn with_batch_publisher(mut self, publisher: Arc<dyn ManifestBatchPublisher>) -> Self {
231        self.publisher = publisher;
232        self
233    }
234
235    /// Create a new graph at `root_uri` from a catalog.
236    ///
237    /// Creates per-type Lance datasets and the namespace `__manifest` table.
238    pub async fn init(root_uri: &str, catalog: &Catalog) -> Result<Self> {
239        let root = root_uri.trim_end_matches('/');
240        let (dataset, known_state) = init_manifest_graph(root, catalog).await?;
241
242        Ok(Self::from_parts_with_default_publisher(
243            root,
244            dataset,
245            known_state,
246            None,
247        ))
248    }
249
250    /// Open an existing graph's manifest.
251    pub async fn open(root_uri: &str) -> Result<Self> {
252        let root = root_uri.trim_end_matches('/');
253        let (dataset, known_state) = open_manifest_graph(root, None).await?;
254        Ok(Self::from_parts_with_default_publisher(
255            root,
256            dataset,
257            known_state,
258            None,
259        ))
260    }
261
262    /// Open an existing graph's manifest at a specific branch.
263    pub async fn open_at_branch(root_uri: &str, branch: &str) -> Result<Self> {
264        if branch == "main" {
265            return Self::open(root_uri).await;
266        }
267
268        let root = root_uri.trim_end_matches('/');
269        let (dataset, known_state) = open_manifest_graph(root, Some(branch)).await?;
270        Ok(Self::from_parts_with_default_publisher(
271            root,
272            dataset,
273            known_state,
274            Some(branch.to_string()),
275        ))
276    }
277
278    pub async fn snapshot_at(
279        root_uri: &str,
280        branch: Option<&str>,
281        version: u64,
282    ) -> Result<Snapshot> {
283        let root = root_uri.trim_end_matches('/');
284        Ok(Self::snapshot_from_state(
285            root,
286            snapshot_state_at(root, branch, version).await?,
287        ))
288    }
289
290    /// Return a Snapshot from the known manifest state. No storage I/O.
291    pub fn snapshot(&self) -> Snapshot {
292        Self::snapshot_from_state(&self.root_uri, self.known_state.clone())
293    }
294
295    /// Re-read manifest from storage to see other writers' commits.
296    pub async fn refresh(&mut self) -> Result<()> {
297        self.dataset = open_manifest_dataset(&self.root_uri, self.active_branch.as_deref()).await?;
298        self.known_state = read_manifest_state(&self.dataset).await?;
299        Ok(())
300    }
301
302    /// Commit updated sub-table versions to the manifest.
303    ///
304    /// Atomically inserts one immutable `table_version` row per updated table.
305    /// The merge-insert commit on `__manifest` is the graph-level publish point.
306    pub async fn commit(&mut self, updates: &[SubTableUpdate]) -> Result<u64> {
307        let changes = updates
308            .iter()
309            .cloned()
310            .map(ManifestChange::Update)
311            .collect::<Vec<_>>();
312        self.commit_changes(&changes).await
313    }
314
315    /// Same as [`commit`], but with caller-supplied per-table expected
316    /// versions used for optimistic concurrency control. Each entry asserts
317    /// the manifest's current latest non-tombstoned `table_version` for that
318    /// `table_key` is exactly what the caller observed; mismatches surface
319    /// as `OmniError::Manifest` with `ManifestConflictDetails::ExpectedVersionMismatch`.
320    pub async fn commit_with_expected(
321        &mut self,
322        updates: &[SubTableUpdate],
323        expected_table_versions: &HashMap<String, u64>,
324    ) -> Result<u64> {
325        let changes = updates
326            .iter()
327            .cloned()
328            .map(ManifestChange::Update)
329            .collect::<Vec<_>>();
330        self.commit_changes_with_expected(&changes, expected_table_versions)
331            .await
332    }
333
334    pub(crate) async fn commit_changes(&mut self, changes: &[ManifestChange]) -> Result<u64> {
335        self.commit_changes_with_expected(changes, &HashMap::new())
336            .await
337    }
338
339    pub(crate) async fn commit_changes_with_expected(
340        &mut self,
341        changes: &[ManifestChange],
342        expected_table_versions: &HashMap<String, u64>,
343    ) -> Result<u64> {
344        if changes.is_empty() && expected_table_versions.is_empty() {
345            return Ok(self.version());
346        }
347
348        self.dataset = self
349            .publisher
350            .publish(changes, expected_table_versions)
351            .await?;
352
353        self.known_state = read_manifest_state(&self.dataset).await?;
354        Ok(self.version())
355    }
356
357    /// Current manifest version.
358    pub fn version(&self) -> u64 {
359        self.dataset.version().version
360    }
361
362    pub fn active_branch(&self) -> Option<&str> {
363        self.active_branch.as_deref()
364    }
365
366    pub async fn create_branch(&mut self, name: &str) -> Result<()> {
367        let mut ds = self.dataset.clone();
368        ds.create_branch(name, self.version(), None)
369            .await
370            .map_err(|e| OmniError::Lance(e.to_string()))?;
371        Ok(())
372    }
373
374    pub async fn delete_branch(&mut self, name: &str) -> Result<()> {
375        let uri = manifest_uri(&self.root_uri);
376        let mut ds = Dataset::open(&uri)
377            .await
378            .map_err(|e| OmniError::Lance(e.to_string()))?;
379        ds.delete_branch(name)
380            .await
381            .map_err(|e| OmniError::Lance(e.to_string()))?;
382        self.dataset = open_manifest_dataset(&self.root_uri, self.active_branch.as_deref()).await?;
383        self.known_state = read_manifest_state(&self.dataset).await?;
384        Ok(())
385    }
386
387    pub async fn list_branches(&self) -> Result<Vec<String>> {
388        let branches = self
389            .dataset
390            .list_branches()
391            .await
392            .map_err(|e| OmniError::Lance(e.to_string()))?;
393        let mut names: Vec<String> = branches.into_keys().filter(|name| name != "main").collect();
394        names.sort();
395        let mut all = vec!["main".to_string()];
396        all.extend(names);
397        Ok(all)
398    }
399
400    pub async fn descendant_branches(&self, name: &str) -> Result<Vec<String>> {
401        let branches = self
402            .dataset
403            .list_branches()
404            .await
405            .map_err(|e| OmniError::Lance(e.to_string()))?;
406        let mut frontier = vec![name.to_string()];
407        let mut descendants = Vec::new();
408        let mut seen = HashSet::new();
409
410        while let Some(parent) = frontier.pop() {
411            let mut children = branches
412                .iter()
413                .filter_map(|(branch, contents)| {
414                    (contents.parent_branch.as_deref() == Some(parent.as_str()))
415                        .then_some(branch.clone())
416                })
417                .collect::<Vec<_>>();
418            children.sort();
419            for child in children {
420                if seen.insert(child.clone()) {
421                    frontier.push(child.clone());
422                    descendants.push(child);
423                }
424            }
425        }
426
427        Ok(descendants)
428    }
429
430    /// Root URI of the graph.
431    pub fn root_uri(&self) -> &str {
432        &self.root_uri
433    }
434}
435
436#[cfg(test)]
437#[path = "manifest/tests.rs"]
438mod tests;