Skip to main content

omnigraph/db/
manifest.rs

1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3
4use crate::error::{OmniError, Result};
5use lance::Dataset;
6use lance_namespace::models::CreateTableVersionRequest;
7use omnigraph_compiler::catalog::Catalog;
8
9#[path = "manifest/layout.rs"]
10mod layout;
11#[path = "manifest/metadata.rs"]
12mod metadata;
13#[path = "manifest/migrations.rs"]
14mod migrations;
15#[path = "manifest/namespace.rs"]
16mod namespace;
17#[path = "manifest/publisher.rs"]
18mod publisher;
19#[path = "manifest/recovery.rs"]
20mod recovery;
21#[path = "manifest/repo.rs"]
22mod repo;
23#[path = "manifest/state.rs"]
24mod state;
25
26use layout::{manifest_uri, open_manifest_dataset, type_name_hash};
27pub(crate) use metadata::TableVersionMetadata;
28#[cfg(test)]
29use metadata::{OMNIGRAPH_ROW_COUNT_KEY, table_version_metadata_for_state};
30use namespace::open_table_at_version_from_manifest;
31pub(crate) use namespace::open_table_head_for_write;
32#[cfg(test)]
33use namespace::{branch_manifest_namespace, staged_table_namespace};
34use publisher::{GraphNamespacePublisher, ManifestBatchPublisher};
35pub(crate) use recovery::{
36    delete_sidecar, has_schema_apply_sidecar, new_sidecar, recover_manifest_drift, write_sidecar,
37    RecoveryMode, RecoverySidecar, RecoverySidecarHandle, SidecarKind, SidecarTablePin,
38    SidecarTableRegistration, SidecarTombstone,
39};
40use repo::{init_manifest_repo, open_manifest_repo, snapshot_state_at};
41pub use state::SubTableEntry;
42#[cfg(test)]
43use state::string_column;
44use state::{ManifestState, read_manifest_state};
45
// String tags and key names for manifest rows.
// NOTE(review): none of these are referenced in the visible part of this file;
// presumably the `manifest/*` submodules read them via `super::` — confirm
// before renaming or removing any of them.
const OBJECT_TYPE_TABLE: &str = "table";
const OBJECT_TYPE_TABLE_VERSION: &str = "table_version";
const OBJECT_TYPE_TABLE_TOMBSTONE: &str = "table_tombstone";
const TABLE_VERSION_MANAGEMENT_KEY: &str = "table_version_management";
50
51/// Immutable point-in-time view of the database.
52///
53/// Cheap to create (no storage I/O). All reads within a query go through one
54/// Snapshot to guarantee cross-type consistency.
55#[derive(Debug, Clone)]
56pub struct Snapshot {
57    root_uri: String,
58    version: u64,
59    entries: HashMap<String, SubTableEntry>,
60}
61
62impl Snapshot {
63    /// Open a sub-table dataset at its pinned version.
64    pub async fn open(&self, table_key: &str) -> Result<Dataset> {
65        let entry = self
66            .entries
67            .get(table_key)
68            .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
69        entry.open(&self.root_uri).await
70    }
71
72    /// Manifest version this snapshot was taken from.
73    pub fn version(&self) -> u64 {
74        self.version
75    }
76
77    /// Look up a sub-table entry by key.
78    pub fn entry(&self, table_key: &str) -> Option<&SubTableEntry> {
79        self.entries.get(table_key)
80    }
81
82    pub fn entries(&self) -> impl Iterator<Item = &SubTableEntry> {
83        self.entries.values()
84    }
85}
86
87impl SubTableUpdate {
88    pub(crate) fn to_create_table_version_request(&self) -> CreateTableVersionRequest {
89        self.version_metadata.to_create_table_version_request(
90            &self.table_key,
91            self.table_version,
92            self.row_count,
93            self.table_branch.as_deref(),
94        )
95    }
96}
97
/// Payload of [`ManifestChange::RegisterTable`]: a table key together with
/// the path recorded for it.
#[derive(Clone, Debug)]
pub(crate) struct TableRegistration {
    /// Key identifying the table in the manifest.
    pub(crate) table_key: String,
    /// Path recorded for the table.
    /// NOTE(review): presumably relative to the repo root, as produced by
    /// `table_path_for_table_key` — confirm against the publisher.
    pub(crate) table_path: String,
}
103
/// Payload of [`ManifestChange::Tombstone`]: a table key together with the
/// version recorded for the tombstone.
#[derive(Clone, Debug)]
pub(crate) struct TableTombstone {
    /// Key identifying the tombstoned table in the manifest.
    pub(crate) table_key: String,
    /// Version number stored with the tombstone.
    /// NOTE(review): exact semantics are defined by the publisher — confirm.
    pub(crate) tombstone_version: u64,
}
109
110#[derive(Debug, Clone)]
111pub(crate) enum ManifestChange {
112    Update(SubTableUpdate),
113    RegisterTable(TableRegistration),
114    Tombstone(TableTombstone),
115}
116
117impl SubTableEntry {
118    pub(crate) async fn open(&self, root_uri: &str) -> Result<Dataset> {
119        open_table_at_version_from_manifest(
120            root_uri,
121            &self.table_key,
122            self.table_branch.as_deref(),
123            self.table_version,
124        )
125        .await
126    }
127}
128
129pub(crate) fn table_path_for_table_key(table_key: &str) -> Result<String> {
130    if let Some(type_name) = table_key.strip_prefix("node:") {
131        return Ok(format!("nodes/{}", type_name_hash(type_name)));
132    }
133    if let Some(type_name) = table_key.strip_prefix("edge:") {
134        return Ok(format!("edges/{}", type_name_hash(type_name)));
135    }
136    Err(OmniError::manifest(format!(
137        "invalid table key '{}'",
138        table_key
139    )))
140}
141
142/// An update to apply to the manifest via `commit`.
143#[derive(Debug, Clone)]
144pub struct SubTableUpdate {
145    pub table_key: String,
146    pub table_version: u64,
147    pub table_branch: Option<String>,
148    pub row_count: u64,
149    pub(crate) version_metadata: TableVersionMetadata,
150}
151
152/// Coordinates cross-dataset state through the namespace `__manifest` table.
153///
154/// Table rows register stable metadata such as location. Append-only
155/// `table_version` rows are the graph publish boundary and reconstruct the
156/// current graph snapshot by selecting the latest visible version row per
157/// sub-table.
158pub struct ManifestCoordinator {
159    root_uri: String,
160    dataset: Dataset,
161    known_state: ManifestState,
162    active_branch: Option<String>,
163    publisher: Arc<dyn ManifestBatchPublisher>,
164}
165
166impl ManifestCoordinator {
167    fn default_batch_publisher(
168        root_uri: &str,
169        active_branch: Option<&str>,
170    ) -> Arc<dyn ManifestBatchPublisher> {
171        Arc::new(GraphNamespacePublisher::new(root_uri, active_branch))
172    }
173
174    fn from_parts(
175        root_uri: &str,
176        dataset: Dataset,
177        known_state: ManifestState,
178        active_branch: Option<String>,
179        publisher: Arc<dyn ManifestBatchPublisher>,
180    ) -> Self {
181        Self {
182            root_uri: root_uri.trim_end_matches('/').to_string(),
183            dataset,
184            known_state,
185            active_branch,
186            publisher,
187        }
188    }
189
190    fn from_parts_with_default_publisher(
191        root_uri: &str,
192        dataset: Dataset,
193        known_state: ManifestState,
194        active_branch: Option<String>,
195    ) -> Self {
196        let publisher = Self::default_batch_publisher(root_uri, active_branch.as_deref());
197        Self::from_parts(root_uri, dataset, known_state, active_branch, publisher)
198    }
199
200    fn snapshot_from_state(root_uri: &str, state: ManifestState) -> Snapshot {
201        Snapshot {
202            root_uri: root_uri.trim_end_matches('/').to_string(),
203            version: state.version,
204            entries: state
205                .entries
206                .into_iter()
207                .map(|entry| (entry.table_key.clone(), entry))
208                .collect(),
209        }
210    }
211
212    #[cfg(test)]
213    fn with_batch_publisher(mut self, publisher: Arc<dyn ManifestBatchPublisher>) -> Self {
214        self.publisher = publisher;
215        self
216    }
217
218    /// Create a new repo at `root_uri` from a catalog.
219    ///
220    /// Creates per-type Lance datasets and the namespace `__manifest` table.
221    pub async fn init(root_uri: &str, catalog: &Catalog) -> Result<Self> {
222        let root = root_uri.trim_end_matches('/');
223        let (dataset, known_state) = init_manifest_repo(root, catalog).await?;
224
225        Ok(Self::from_parts_with_default_publisher(
226            root,
227            dataset,
228            known_state,
229            None,
230        ))
231    }
232
233    /// Open an existing repo's manifest.
234    pub async fn open(root_uri: &str) -> Result<Self> {
235        let root = root_uri.trim_end_matches('/');
236        let (dataset, known_state) = open_manifest_repo(root, None).await?;
237        Ok(Self::from_parts_with_default_publisher(
238            root,
239            dataset,
240            known_state,
241            None,
242        ))
243    }
244
245    /// Open an existing repo's manifest at a specific branch.
246    pub async fn open_at_branch(root_uri: &str, branch: &str) -> Result<Self> {
247        if branch == "main" {
248            return Self::open(root_uri).await;
249        }
250
251        let root = root_uri.trim_end_matches('/');
252        let (dataset, known_state) = open_manifest_repo(root, Some(branch)).await?;
253        Ok(Self::from_parts_with_default_publisher(
254            root,
255            dataset,
256            known_state,
257            Some(branch.to_string()),
258        ))
259    }
260
261    pub async fn snapshot_at(
262        root_uri: &str,
263        branch: Option<&str>,
264        version: u64,
265    ) -> Result<Snapshot> {
266        let root = root_uri.trim_end_matches('/');
267        Ok(Self::snapshot_from_state(
268            root,
269            snapshot_state_at(root, branch, version).await?,
270        ))
271    }
272
273    /// Return a Snapshot from the known manifest state. No storage I/O.
274    pub fn snapshot(&self) -> Snapshot {
275        Self::snapshot_from_state(&self.root_uri, self.known_state.clone())
276    }
277
278    /// Re-read manifest from storage to see other writers' commits.
279    pub async fn refresh(&mut self) -> Result<()> {
280        self.dataset = open_manifest_dataset(&self.root_uri, self.active_branch.as_deref()).await?;
281        self.known_state = read_manifest_state(&self.dataset).await?;
282        Ok(())
283    }
284
285    /// Commit updated sub-table versions to the manifest.
286    ///
287    /// Atomically inserts one immutable `table_version` row per updated table.
288    /// The merge-insert commit on `__manifest` is the graph-level publish point.
289    pub async fn commit(&mut self, updates: &[SubTableUpdate]) -> Result<u64> {
290        let changes = updates
291            .iter()
292            .cloned()
293            .map(ManifestChange::Update)
294            .collect::<Vec<_>>();
295        self.commit_changes(&changes).await
296    }
297
298    /// Same as [`commit`], but with caller-supplied per-table expected
299    /// versions used for optimistic concurrency control. Each entry asserts
300    /// the manifest's current latest non-tombstoned `table_version` for that
301    /// `table_key` is exactly what the caller observed; mismatches surface
302    /// as `OmniError::Manifest` with `ManifestConflictDetails::ExpectedVersionMismatch`.
303    pub async fn commit_with_expected(
304        &mut self,
305        updates: &[SubTableUpdate],
306        expected_table_versions: &HashMap<String, u64>,
307    ) -> Result<u64> {
308        let changes = updates
309            .iter()
310            .cloned()
311            .map(ManifestChange::Update)
312            .collect::<Vec<_>>();
313        self.commit_changes_with_expected(&changes, expected_table_versions)
314            .await
315    }
316
317    pub(crate) async fn commit_changes(&mut self, changes: &[ManifestChange]) -> Result<u64> {
318        self.commit_changes_with_expected(changes, &HashMap::new())
319            .await
320    }
321
322    pub(crate) async fn commit_changes_with_expected(
323        &mut self,
324        changes: &[ManifestChange],
325        expected_table_versions: &HashMap<String, u64>,
326    ) -> Result<u64> {
327        if changes.is_empty() && expected_table_versions.is_empty() {
328            return Ok(self.version());
329        }
330
331        self.dataset = self
332            .publisher
333            .publish(changes, expected_table_versions)
334            .await?;
335
336        self.known_state = read_manifest_state(&self.dataset).await?;
337        Ok(self.version())
338    }
339
340    /// Current manifest version.
341    pub fn version(&self) -> u64 {
342        self.dataset.version().version
343    }
344
345    pub fn active_branch(&self) -> Option<&str> {
346        self.active_branch.as_deref()
347    }
348
349    pub async fn create_branch(&mut self, name: &str) -> Result<()> {
350        let mut ds = self.dataset.clone();
351        ds.create_branch(name, self.version(), None)
352            .await
353            .map_err(|e| OmniError::Lance(e.to_string()))?;
354        Ok(())
355    }
356
357    pub async fn delete_branch(&mut self, name: &str) -> Result<()> {
358        let uri = manifest_uri(&self.root_uri);
359        let mut ds = Dataset::open(&uri)
360            .await
361            .map_err(|e| OmniError::Lance(e.to_string()))?;
362        ds.delete_branch(name)
363            .await
364            .map_err(|e| OmniError::Lance(e.to_string()))?;
365        self.dataset = open_manifest_dataset(&self.root_uri, self.active_branch.as_deref()).await?;
366        self.known_state = read_manifest_state(&self.dataset).await?;
367        Ok(())
368    }
369
370    pub async fn list_branches(&self) -> Result<Vec<String>> {
371        let branches = self
372            .dataset
373            .list_branches()
374            .await
375            .map_err(|e| OmniError::Lance(e.to_string()))?;
376        let mut names: Vec<String> = branches.into_keys().filter(|name| name != "main").collect();
377        names.sort();
378        let mut all = vec!["main".to_string()];
379        all.extend(names);
380        Ok(all)
381    }
382
383    pub async fn descendant_branches(&self, name: &str) -> Result<Vec<String>> {
384        let branches = self
385            .dataset
386            .list_branches()
387            .await
388            .map_err(|e| OmniError::Lance(e.to_string()))?;
389        let mut frontier = vec![name.to_string()];
390        let mut descendants = Vec::new();
391        let mut seen = HashSet::new();
392
393        while let Some(parent) = frontier.pop() {
394            let mut children = branches
395                .iter()
396                .filter_map(|(branch, contents)| {
397                    (contents.parent_branch.as_deref() == Some(parent.as_str()))
398                        .then_some(branch.clone())
399                })
400                .collect::<Vec<_>>();
401            children.sort();
402            for child in children {
403                if seen.insert(child.clone()) {
404                    frontier.push(child.clone());
405                    descendants.push(child);
406                }
407            }
408        }
409
410        Ok(descendants)
411    }
412
413    /// Root URI of the repo.
414    pub fn root_uri(&self) -> &str {
415        &self.root_uri
416    }
417}
418
419#[cfg(test)]
420#[path = "manifest/tests.rs"]
421mod tests;