Skip to main content

omnigraph/db/
manifest.rs

1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3
4use crate::error::{OmniError, Result};
5use lance::Dataset;
6use lance_namespace::models::CreateTableVersionRequest;
7use omnigraph_compiler::catalog::Catalog;
8
9#[path = "manifest/graph.rs"]
10mod graph;
11#[path = "manifest/layout.rs"]
12mod layout;
13#[path = "manifest/metadata.rs"]
14mod metadata;
15#[path = "manifest/migrations.rs"]
16mod migrations;
17#[path = "manifest/namespace.rs"]
18mod namespace;
19#[path = "manifest/publisher.rs"]
20mod publisher;
21#[path = "manifest/recovery.rs"]
22mod recovery;
23#[path = "manifest/state.rs"]
24mod state;
25
26use graph::{init_manifest_graph, open_manifest_graph, snapshot_state_at};
27use layout::{manifest_uri, open_manifest_dataset, table_uri_for_path, type_name_hash};
28pub(crate) use metadata::TableVersionMetadata;
29#[cfg(test)]
30use metadata::{OMNIGRAPH_ROW_COUNT_KEY, table_version_metadata_for_state};
31pub(crate) use namespace::open_table_head_for_write;
32#[cfg(test)]
33use namespace::{branch_manifest_namespace, staged_table_namespace};
34use publisher::{GraphNamespacePublisher, ManifestBatchPublisher};
35pub(crate) use recovery::{
36    RecoveryMode, RecoverySidecar, RecoverySidecarHandle, SidecarKind, SidecarTablePin,
37    SidecarTableRegistration, SidecarTombstone, confirm_sidecar_phase_b, delete_sidecar,
38    has_schema_apply_sidecar, heal_pending_sidecars_roll_forward, list_sidecars, new_sidecar,
39    recover_manifest_drift, schema_apply_serial_queue_key, write_sidecar,
40};
41pub use state::SubTableEntry;
42#[cfg(test)]
43use state::string_column;
44use state::{ManifestState, read_manifest_state};
45
/// Row-kind tag for a sub-table registration row in `__manifest`.
/// NOTE(review): presumably the value of the manifest's object-type column;
/// the readers live in the `manifest/*` submodules — confirm.
const OBJECT_TYPE_TABLE: &str = "table";
/// Row-kind tag for an append-only `table_version` row (the graph publish
/// boundary described on `ManifestCoordinator`).
const OBJECT_TYPE_TABLE_VERSION: &str = "table_version";
/// Row-kind tag for a sub-table tombstone row.
const OBJECT_TYPE_TABLE_TOMBSTONE: &str = "table_tombstone";
/// Metadata key named `table_version_management`.
/// NOTE(review): its reader is not visible in this file — confirm what it gates.
const TABLE_VERSION_MANAGEMENT_KEY: &str = "table_version_management";
50
51/// Apply pending internal-schema migrations against `__manifest` on the
52/// open-for-write path, independent of a publish.
53///
54/// `Omnigraph::open(ReadWrite)` calls this before the coordinator reads branch
55/// state, so branch-observing code (`branch_list`, the schema-apply
56/// blocking-branch checks) sees the post-migration graph. In particular the
57/// v2→v3 step sweeps legacy `__run__*` staging branches off `__manifest`
58/// (MR-770); running it here closes the window where those branches would
59/// otherwise block schema apply before the first publish runs the migration.
60///
61/// Idempotent: a no-op stamp read when the on-disk version already matches.
62pub(crate) async fn migrate_on_open(root_uri: &str) -> Result<()> {
63    let mut dataset = open_manifest_dataset(root_uri, None).await?;
64    migrations::migrate_internal_schema(&mut dataset).await
65}
66
/// Immutable point-in-time view of the database.
///
/// Building one from a coordinator's known state (`ManifestCoordinator::snapshot`)
/// does no storage I/O. All reads within a query go through one Snapshot to
/// guarantee cross-type consistency.
#[derive(Debug, Clone)]
pub struct Snapshot {
    /// Graph root URI. `ManifestCoordinator` builds snapshots with any trailing
    /// `/` trimmed.
    root_uri: String,
    /// `__manifest` version this snapshot was taken from.
    version: u64,
    /// Pinned sub-table entries, keyed by each entry's `table_key`.
    entries: HashMap<String, SubTableEntry>,
    /// Per-graph read caches (shared `Session` + held-handle cache), injected by
    /// `Omnigraph::resolved_target` for live Branch reads so table opens reuse
    /// handles (0 IO on a warm repeat) and one `Session`. `None` for write-prelude
    /// snapshots, time-travel / Snapshot-id reads, and directly-built test
    /// snapshots, which fall back to a plain open.
    read_caches: Option<Arc<crate::runtime_cache::ReadCaches>>,
}
83
84impl Snapshot {
85    /// Open a sub-table dataset at its pinned version. With read caches present
86    /// (live Branch reads), reuse a held handle through the cache (0 open IO on a
87    /// warm repeat) and the shared `Session`; otherwise plain-open (Fix 2).
88    pub async fn open(&self, table_key: &str) -> Result<Dataset> {
89        let entry = self
90            .entries
91            .get(table_key)
92            .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
93        match &self.read_caches {
94            Some(caches) => {
95                let location = table_uri_for_path(
96                    &self.root_uri,
97                    &entry.table_path,
98                    entry.table_branch.as_deref(),
99                );
100                caches
101                    .handles
102                    .get_or_open(
103                        &entry.table_path,
104                        entry.table_branch.as_deref(),
105                        entry.table_version,
106                        entry.version_metadata.e_tag(),
107                        &location,
108                        Some(&caches.session),
109                    )
110                    .await
111            }
112            None => entry.open(&self.root_uri).await,
113        }
114    }
115
116    /// Attach per-graph read caches (shared `Session` + handle cache) so this
117    /// snapshot's table opens reuse handles and the session. Set by
118    /// `Omnigraph::resolved_target` for live Branch reads only.
119    pub(crate) fn set_read_caches(&mut self, caches: Arc<crate::runtime_cache::ReadCaches>) {
120        self.read_caches = Some(caches);
121    }
122
123    /// Manifest version this snapshot was taken from.
124    pub fn version(&self) -> u64 {
125        self.version
126    }
127
128    /// Look up a sub-table entry by key.
129    pub fn entry(&self, table_key: &str) -> Option<&SubTableEntry> {
130        self.entries.get(table_key)
131    }
132
133    pub fn entries(&self) -> impl Iterator<Item = &SubTableEntry> {
134        self.entries.values()
135    }
136}
137
/// Identity of one committed `__manifest` version, used to decide whether a
/// held manifest handle is still current.
///
/// The version number alone is not always enough: a non-main Lance branch can
/// be deleted and recreated with the same version number. The manifest's e_tag
/// and commit timestamp are therefore carried as stronger identity whenever the
/// object store exposes them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct ManifestIncarnation {
    pub(crate) version: u64,
    pub(crate) e_tag: Option<String>,
    timestamp_nanos: Option<u128>,
}

impl ManifestIncarnation {
    /// Whether `self` (the latest probe) denotes the same manifest incarnation
    /// as `held`.
    ///
    /// The versions must be equal. After that, the strongest identity present
    /// on both sides decides:
    /// - if both have an e_tag, the e_tags decide;
    /// - otherwise, if both have a timestamp, the timestamps decide;
    /// - otherwise the equal version numbers are accepted, because some object
    ///   stores expose neither value.
    pub(crate) fn matches(&self, held: &Self) -> bool {
        if self.version != held.version {
            return false;
        }

        if let (Some(latest_tag), Some(held_tag)) = (&self.e_tag, &held.e_tag) {
            return latest_tag == held_tag;
        }

        if let (Some(latest_ts), Some(held_ts)) = (self.timestamp_nanos, held.timestamp_nanos) {
            return latest_ts == held_ts;
        }

        // Neither e_tag nor timestamp is available on both sides: the version
        // probe is the strongest identity we have.
        true
    }
}
162
163impl SubTableUpdate {
164    pub(crate) fn to_create_table_version_request(&self) -> CreateTableVersionRequest {
165        self.version_metadata.to_create_table_version_request(
166            &self.table_key,
167            self.table_version,
168            self.row_count,
169            self.table_branch.as_deref(),
170        )
171    }
172}
173
/// Registration of a sub-table in `__manifest`: binds a table key to the
/// storage location of its dataset. Published as `ManifestChange::RegisterTable`.
#[derive(Debug, Clone)]
pub(crate) struct TableRegistration {
    /// Logical table key, e.g. `node:<Type>` or `edge:<Type>`
    /// (the forms accepted by `table_path_for_table_key`).
    pub(crate) table_key: String,
    /// Dataset path under the graph root (e.g. `nodes/<hash>`); presumably as
    /// produced by `table_path_for_table_key` — confirm at call sites.
    pub(crate) table_path: String,
}
179
/// Tombstone for a sub-table, published as `ManifestChange::Tombstone`.
#[derive(Debug, Clone)]
pub(crate) struct TableTombstone {
    /// Key of the sub-table being tombstoned.
    pub(crate) table_key: String,
    /// NOTE(review): presumably the table version at (or from) which the table
    /// is considered removed — confirm against the publisher / state reader.
    pub(crate) tombstone_version: u64,
}
185
/// One mutation to `__manifest`. A slice of these is handed to the batch
/// publisher in a single `publish` call by
/// `ManifestCoordinator::commit_changes_with_expected`.
#[derive(Debug, Clone)]
pub(crate) enum ManifestChange {
    /// Record a new version of a sub-table.
    Update(SubTableUpdate),
    /// Register a sub-table and its storage location.
    RegisterTable(TableRegistration),
    /// Tombstone a sub-table.
    Tombstone(TableTombstone),
}
192
193impl SubTableEntry {
194    /// Open this sub-table at its pinned version directly by location (Fix 2),
195    /// without the Lance namespace — which would full-scan `__manifest` twice per
196    /// open (`describe_table` + `describe_table_version`). The resolved Snapshot
197    /// already holds the path, version, and branch. Branches are Lance native
198    /// branches, so `with_branch` resolves `{base}/tree/{branch}` from the base
199    /// URI; main uses `with_version`.
200    pub(crate) async fn open(&self, root_uri: &str) -> Result<Dataset> {
201        // The branch-qualified location is the dataset that physically holds this
202        // version: main at `{table_path}`, a branch at
203        // `{table_path}/tree/{branch}` (Lance native-branch storage). `with_version`
204        // then resolves the version within THAT dataset's `_versions` — a branch
205        // version lives under `tree/{branch}/_versions`, not the base. This
206        // matches the physical layout the namespace path resolved, without the
207        // per-open `__manifest` scan.
208        let location = table_uri_for_path(root_uri, &self.table_path, self.table_branch.as_deref());
209        // Route through the instrumented data-table opener (Fix 3). With no
210        // session this is exactly the Fix-2 `from_uri(location).with_version`.
211        // This is the uncached fallback (a snapshot with no read caches); the
212        // cached path (`Snapshot::open` → handle cache) calls the same opener on
213        // a miss with the shared session, so both paths count on the per-query
214        // `table_wrapper`.
215        crate::instrumentation::open_table_dataset(&location, self.table_version, None).await
216    }
217}
218
219pub(crate) fn table_path_for_table_key(table_key: &str) -> Result<String> {
220    if let Some(type_name) = table_key.strip_prefix("node:") {
221        return Ok(format!("nodes/{}", type_name_hash(type_name)));
222    }
223    if let Some(type_name) = table_key.strip_prefix("edge:") {
224        return Ok(format!("edges/{}", type_name_hash(type_name)));
225    }
226    Err(OmniError::manifest(format!(
227        "invalid table key '{}'",
228        table_key
229    )))
230}
231
/// An update to apply to the manifest via `ManifestCoordinator::commit`: one
/// new immutable `table_version` row for a sub-table.
#[derive(Debug, Clone)]
pub struct SubTableUpdate {
    /// Key of the sub-table being updated.
    pub table_key: String,
    /// Lance dataset version to publish for the sub-table.
    pub table_version: u64,
    /// Lance branch that holds `table_version`, or `None` for main.
    pub table_branch: Option<String>,
    /// Row count recorded alongside the version.
    pub row_count: u64,
    /// Version metadata forwarded into the `CreateTableVersionRequest`.
    pub(crate) version_metadata: TableVersionMetadata,
}
241
/// Coordinates cross-dataset state through the namespace `__manifest` table.
///
/// Table rows register stable metadata such as location. Append-only
/// `table_version` rows are the graph publish boundary and reconstruct the
/// current graph snapshot by selecting the latest visible version row per
/// sub-table.
pub struct ManifestCoordinator {
    /// Graph root URI with any trailing `/` trimmed.
    root_uri: String,
    /// Open `__manifest` handle for `active_branch`; replaced after each
    /// publish, refresh, and branch deletion.
    dataset: Dataset,
    /// Manifest state decoded from `dataset`; the source for `snapshot()`.
    known_state: ManifestState,
    /// Branch this coordinator reads and publishes to; `None` means main.
    active_branch: Option<String>,
    /// Publishes batches of manifest changes (swappable in tests).
    publisher: Arc<dyn ManifestBatchPublisher>,
}
255
256impl ManifestCoordinator {
257    fn default_batch_publisher(
258        root_uri: &str,
259        active_branch: Option<&str>,
260    ) -> Arc<dyn ManifestBatchPublisher> {
261        Arc::new(GraphNamespacePublisher::new(root_uri, active_branch))
262    }
263
264    fn from_parts(
265        root_uri: &str,
266        dataset: Dataset,
267        known_state: ManifestState,
268        active_branch: Option<String>,
269        publisher: Arc<dyn ManifestBatchPublisher>,
270    ) -> Self {
271        Self {
272            root_uri: root_uri.trim_end_matches('/').to_string(),
273            dataset,
274            known_state,
275            active_branch,
276            publisher,
277        }
278    }
279
280    fn from_parts_with_default_publisher(
281        root_uri: &str,
282        dataset: Dataset,
283        known_state: ManifestState,
284        active_branch: Option<String>,
285    ) -> Self {
286        let publisher = Self::default_batch_publisher(root_uri, active_branch.as_deref());
287        Self::from_parts(root_uri, dataset, known_state, active_branch, publisher)
288    }
289
290    fn snapshot_from_state(root_uri: &str, state: ManifestState) -> Snapshot {
291        Snapshot {
292            root_uri: root_uri.trim_end_matches('/').to_string(),
293            version: state.version,
294            entries: state
295                .entries
296                .into_iter()
297                .map(|entry| (entry.table_key.clone(), entry))
298                .collect(),
299            read_caches: None,
300        }
301    }
302
303    #[cfg(test)]
304    fn with_batch_publisher(mut self, publisher: Arc<dyn ManifestBatchPublisher>) -> Self {
305        self.publisher = publisher;
306        self
307    }
308
309    /// Create a new graph at `root_uri` from a catalog.
310    ///
311    /// Creates per-type Lance datasets and the namespace `__manifest` table.
312    pub async fn init(root_uri: &str, catalog: &Catalog) -> Result<Self> {
313        let root = root_uri.trim_end_matches('/');
314        let (dataset, known_state) = init_manifest_graph(root, catalog).await?;
315
316        Ok(Self::from_parts_with_default_publisher(
317            root,
318            dataset,
319            known_state,
320            None,
321        ))
322    }
323
324    /// Open an existing graph's manifest.
325    pub async fn open(root_uri: &str) -> Result<Self> {
326        let root = root_uri.trim_end_matches('/');
327        let (dataset, known_state) = open_manifest_graph(root, None).await?;
328        Ok(Self::from_parts_with_default_publisher(
329            root,
330            dataset,
331            known_state,
332            None,
333        ))
334    }
335
336    /// Open an existing graph's manifest at a specific branch.
337    pub async fn open_at_branch(root_uri: &str, branch: &str) -> Result<Self> {
338        if branch == "main" {
339            return Self::open(root_uri).await;
340        }
341
342        let root = root_uri.trim_end_matches('/');
343        let (dataset, known_state) = open_manifest_graph(root, Some(branch)).await?;
344        Ok(Self::from_parts_with_default_publisher(
345            root,
346            dataset,
347            known_state,
348            Some(branch.to_string()),
349        ))
350    }
351
352    pub async fn snapshot_at(
353        root_uri: &str,
354        branch: Option<&str>,
355        version: u64,
356    ) -> Result<Snapshot> {
357        let root = root_uri.trim_end_matches('/');
358        Ok(Self::snapshot_from_state(
359            root,
360            snapshot_state_at(root, branch, version).await?,
361        ))
362    }
363
364    /// Return a Snapshot from the known manifest state. No storage I/O.
365    pub fn snapshot(&self) -> Snapshot {
366        Self::snapshot_from_state(&self.root_uri, self.known_state.clone())
367    }
368
369    /// Re-read manifest from storage to see other writers' commits.
370    pub async fn refresh(&mut self) -> Result<()> {
371        self.dataset = open_manifest_dataset(&self.root_uri, self.active_branch.as_deref()).await?;
372        self.known_state = read_manifest_state(&self.dataset).await?;
373        Ok(())
374    }
375
376    /// Commit updated sub-table versions to the manifest.
377    ///
378    /// Atomically inserts one immutable `table_version` row per updated table.
379    /// The merge-insert commit on `__manifest` is the graph-level publish point.
380    pub async fn commit(&mut self, updates: &[SubTableUpdate]) -> Result<u64> {
381        let changes = updates
382            .iter()
383            .cloned()
384            .map(ManifestChange::Update)
385            .collect::<Vec<_>>();
386        self.commit_changes(&changes).await
387    }
388
389    /// Same as [`commit`], but with caller-supplied per-table expected
390    /// versions used for optimistic concurrency control. Each entry asserts
391    /// the manifest's current latest non-tombstoned `table_version` for that
392    /// `table_key` is exactly what the caller observed; mismatches surface
393    /// as `OmniError::Manifest` with `ManifestConflictDetails::ExpectedVersionMismatch`.
394    pub async fn commit_with_expected(
395        &mut self,
396        updates: &[SubTableUpdate],
397        expected_table_versions: &HashMap<String, u64>,
398    ) -> Result<u64> {
399        let changes = updates
400            .iter()
401            .cloned()
402            .map(ManifestChange::Update)
403            .collect::<Vec<_>>();
404        self.commit_changes_with_expected(&changes, expected_table_versions)
405            .await
406    }
407
408    pub(crate) async fn commit_changes(&mut self, changes: &[ManifestChange]) -> Result<u64> {
409        self.commit_changes_with_expected(changes, &HashMap::new())
410            .await
411    }
412
413    pub(crate) async fn commit_changes_with_expected(
414        &mut self,
415        changes: &[ManifestChange],
416        expected_table_versions: &HashMap<String, u64>,
417    ) -> Result<u64> {
418        if changes.is_empty() && expected_table_versions.is_empty() {
419            return Ok(self.version());
420        }
421
422        self.dataset = self
423            .publisher
424            .publish(changes, expected_table_versions)
425            .await?;
426
427        self.known_state = read_manifest_state(&self.dataset).await?;
428        Ok(self.version())
429    }
430
431    /// Current manifest version.
432    pub fn version(&self) -> u64 {
433        self.dataset.version().version
434    }
435
436    /// Latest committed manifest version on disk (one object-store op, no row
437    /// scan). The freshness probe for warm reuse: compare against `version()`
438    /// (the held handle's pinned version) to decide whether to refresh.
439    pub async fn probe_latest_version(&self) -> Result<u64> {
440        self.dataset
441            .latest_version_id()
442            .await
443            .map_err(|e| OmniError::Lance(e.to_string()))
444    }
445
446    pub(crate) fn incarnation(&self) -> ManifestIncarnation {
447        ManifestIncarnation {
448            version: self.version(),
449            e_tag: self.dataset.manifest_location().e_tag.clone(),
450            timestamp_nanos: Some(self.dataset.manifest().timestamp_nanos),
451        }
452    }
453
454    /// Latest committed manifest identity. Main cannot be deleted/recreated, so
455    /// the cheap version-number probe is sufficient there. Non-main Lance
456    /// branches can be deleted and recreated with the same version number, so
457    /// load the latest manifest location and compare its e_tag / timestamp too.
458    pub(crate) async fn probe_latest_incarnation(&self) -> Result<ManifestIncarnation> {
459        if self.active_branch.is_none() {
460            return Ok(ManifestIncarnation {
461                version: self.probe_latest_version().await?,
462                e_tag: self.dataset.manifest_location().e_tag.clone(),
463                timestamp_nanos: Some(self.dataset.manifest().timestamp_nanos),
464            });
465        }
466        let (manifest, location) = self
467            .dataset
468            .latest_manifest()
469            .await
470            .map_err(|e| OmniError::Lance(e.to_string()))?;
471        Ok(ManifestIncarnation {
472            version: manifest.version,
473            e_tag: location.e_tag,
474            timestamp_nanos: Some(manifest.timestamp_nanos),
475        })
476    }
477
478    pub fn active_branch(&self) -> Option<&str> {
479        self.active_branch.as_deref()
480    }
481
482    pub async fn create_branch(&mut self, name: &str) -> Result<()> {
483        let mut ds = self.dataset.clone();
484        ds.create_branch(name, self.version(), None)
485            .await
486            .map_err(|e| OmniError::Lance(e.to_string()))?;
487        Ok(())
488    }
489
490    pub async fn delete_branch(&mut self, name: &str) -> Result<()> {
491        let uri = manifest_uri(&self.root_uri);
492        let mut ds = Dataset::open(&uri)
493            .await
494            .map_err(|e| OmniError::Lance(e.to_string()))?;
495        ds.delete_branch(name)
496            .await
497            .map_err(|e| OmniError::Lance(e.to_string()))?;
498        self.dataset = open_manifest_dataset(&self.root_uri, self.active_branch.as_deref()).await?;
499        self.known_state = read_manifest_state(&self.dataset).await?;
500        Ok(())
501    }
502
503    pub async fn list_branches(&self) -> Result<Vec<String>> {
504        let branches = self
505            .dataset
506            .list_branches()
507            .await
508            .map_err(|e| OmniError::Lance(e.to_string()))?;
509        let mut names: Vec<String> = branches.into_keys().filter(|name| name != "main").collect();
510        names.sort();
511        let mut all = vec!["main".to_string()];
512        all.extend(names);
513        Ok(all)
514    }
515
516    pub async fn descendant_branches(&self, name: &str) -> Result<Vec<String>> {
517        let branches = self
518            .dataset
519            .list_branches()
520            .await
521            .map_err(|e| OmniError::Lance(e.to_string()))?;
522        let mut frontier = vec![name.to_string()];
523        let mut descendants = Vec::new();
524        let mut seen = HashSet::new();
525
526        while let Some(parent) = frontier.pop() {
527            let mut children = branches
528                .iter()
529                .filter_map(|(branch, contents)| {
530                    (contents.parent_branch.as_deref() == Some(parent.as_str()))
531                        .then_some(branch.clone())
532                })
533                .collect::<Vec<_>>();
534            children.sort();
535            for child in children {
536                if seen.insert(child.clone()) {
537                    frontier.push(child.clone());
538                    descendants.push(child);
539                }
540            }
541        }
542
543        Ok(descendants)
544    }
545
546    /// Root URI of the graph.
547    pub fn root_uri(&self) -> &str {
548        &self.root_uri
549    }
550}
551
552#[cfg(test)]
553#[path = "manifest/tests.rs"]
554mod tests;