Skip to main content

omnigraph/db/
manifest.rs

1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3
4use crate::error::{OmniError, Result};
5use lance::Dataset;
6use lance_namespace::models::CreateTableVersionRequest;
7use omnigraph_compiler::catalog::Catalog;
8
9#[path = "manifest/graph.rs"]
10mod graph;
11#[path = "manifest/layout.rs"]
12mod layout;
13#[path = "manifest/metadata.rs"]
14mod metadata;
15#[path = "manifest/migrations.rs"]
16mod migrations;
17// Entirely test-only since RFC-013 step 3a: with both reads (Fix 2) and writes
18// bypassing the Lance namespace, nothing in production routes through it; the
19// `LanceNamespace` impls are retained only to validate the contract in unit tests.
20#[cfg(test)]
21#[path = "manifest/namespace.rs"]
22mod namespace;
23#[path = "manifest/publisher.rs"]
24mod publisher;
25#[path = "manifest/recovery.rs"]
26mod recovery;
27#[path = "manifest/state.rs"]
28mod state;
29
30use graph::{init_manifest_graph, open_manifest_graph, snapshot_state_at};
31use layout::{open_manifest_dataset, table_uri_for_path, type_name_hash};
32pub(crate) use layout::manifest_uri;
33pub(crate) use metadata::TableVersionMetadata;
34#[cfg(test)]
35use metadata::{OMNIGRAPH_ROW_COUNT_KEY, table_version_metadata_for_state};
36#[cfg(test)]
37use namespace::{branch_manifest_namespace, staged_table_namespace};
38use publisher::{GraphNamespacePublisher, ManifestBatchPublisher};
39pub(crate) use recovery::{
40    RecoveryMode, RecoverySidecar, RecoverySidecarHandle, SidecarKind, SidecarTablePin,
41    SidecarTableRegistration, SidecarTombstone, confirm_sidecar_phase_b, delete_sidecar,
42    has_schema_apply_sidecar, heal_pending_sidecars_roll_forward, list_sidecars, new_sidecar,
43    recover_manifest_drift, schema_apply_serial_queue_key, write_sidecar,
44};
45pub use state::SubTableEntry;
46#[cfg(test)]
47use state::string_column;
48use state::{ManifestState, read_manifest_state};
49
/// `object_type` value for `__manifest` rows that register a sub-table's
/// stable metadata (such as its location).
const OBJECT_TYPE_TABLE: &str = "table";
/// `object_type` value for the append-only per-table version rows that make up
/// the graph publish boundary.
const OBJECT_TYPE_TABLE_VERSION: &str = "table_version";
/// `object_type` value for rows that tombstone a sub-table
/// (presumably paired with `TableTombstone::tombstone_version` — confirm in the publisher).
const OBJECT_TYPE_TABLE_TOMBSTONE: &str = "table_tombstone";
/// Key used by the table-version-management machinery in `__manifest`.
/// NOTE(review): exact usage lives in the sibling submodules; confirm there.
const TABLE_VERSION_MANAGEMENT_KEY: &str = "table_version_management";
54
55/// Apply pending internal-schema migrations against `__manifest` on the
56/// open-for-write path, independent of a publish.
57///
58/// `Omnigraph::open(ReadWrite)` calls this before the coordinator reads branch
59/// state, so branch-observing code (`branch_list`, the schema-apply
60/// blocking-branch checks) sees the post-migration graph. In particular the
61/// v2→v3 step sweeps legacy `__run__*` staging branches off `__manifest`
62/// (MR-770); running it here closes the window where those branches would
63/// otherwise block schema apply before the first publish runs the migration.
64///
65/// Idempotent: a no-op stamp read when the on-disk version already matches.
66pub(crate) async fn migrate_on_open(root_uri: &str) -> Result<()> {
67    let mut dataset = open_manifest_dataset(root_uri, None).await?;
68    migrations::migrate_internal_schema(&mut dataset).await
69}
70
71/// Immutable point-in-time view of the database.
72///
73/// Cheap to create (no storage I/O). All reads within a query go through one
74/// Snapshot to guarantee cross-type consistency.
75#[derive(Debug, Clone)]
76pub struct Snapshot {
77    root_uri: String,
78    version: u64,
79    entries: HashMap<String, SubTableEntry>,
80    /// Per-graph read caches (shared `Session` + held-handle cache), injected by
81    /// `Omnigraph::resolved_target` for live Branch reads so table opens reuse
82    /// handles (0 IO on a warm repeat) and one `Session`. `None` for write-prelude
83    /// snapshots, time-travel / Snapshot-id reads, and directly-built test
84    /// snapshots, which fall back to a plain open.
85    read_caches: Option<Arc<crate::runtime_cache::ReadCaches>>,
86}
87
88impl Snapshot {
89    /// Open a sub-table dataset at its pinned version. With read caches present
90    /// (live Branch reads), reuse a held handle through the cache (0 open IO on a
91    /// warm repeat) and the shared `Session`; otherwise plain-open (Fix 2).
92    pub async fn open(&self, table_key: &str) -> Result<Dataset> {
93        let entry = self
94            .entries
95            .get(table_key)
96            .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
97        match &self.read_caches {
98            Some(caches) => {
99                let location = table_uri_for_path(
100                    &self.root_uri,
101                    &entry.table_path,
102                    entry.table_branch.as_deref(),
103                );
104                caches
105                    .handles
106                    .get_or_open(
107                        &entry.table_path,
108                        entry.table_branch.as_deref(),
109                        entry.table_version,
110                        entry.version_metadata.e_tag(),
111                        &location,
112                        Some(&caches.session),
113                    )
114                    .await
115            }
116            None => entry.open(&self.root_uri).await,
117        }
118    }
119
120    /// Attach per-graph read caches (shared `Session` + handle cache) so this
121    /// snapshot's table opens reuse handles and the session. Set by
122    /// `Omnigraph::resolved_target` for live Branch reads only.
123    pub(crate) fn set_read_caches(&mut self, caches: Arc<crate::runtime_cache::ReadCaches>) {
124        self.read_caches = Some(caches);
125    }
126
127    /// Manifest version this snapshot was taken from.
128    pub fn version(&self) -> u64 {
129        self.version
130    }
131
132    /// Look up a sub-table entry by key.
133    pub fn entry(&self, table_key: &str) -> Option<&SubTableEntry> {
134        self.entries.get(table_key)
135    }
136
137    pub fn entries(&self) -> impl Iterator<Item = &SubTableEntry> {
138        self.entries.values()
139    }
140}
141
/// Identity of a committed `__manifest` version, used to decide whether a held
/// manifest handle is still current.
///
/// The version number is always compared. The object-store e_tag and the
/// manifest timestamp are carried as stronger identity when available.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct ManifestIncarnation {
    /// Manifest version number.
    pub(crate) version: u64,
    /// e_tag of the manifest's storage location, when the store reports one.
    pub(crate) e_tag: Option<String>,
    /// Manifest timestamp in nanoseconds, when available.
    timestamp_nanos: Option<u128>,
}

impl ManifestIncarnation {
    /// Returns `true` when `self` (the probed latest identity) denotes the same
    /// manifest commit as `held`.
    ///
    /// Versions must be equal. Beyond that, the strongest identity both sides
    /// have is compared: e_tags if both are present, otherwise timestamps if both
    /// are present. If neither pair is comparable, the equal version is accepted.
    pub(crate) fn matches(&self, held: &Self) -> bool {
        if self.version != held.version {
            return false;
        }
        if let (Some(latest_tag), Some(held_tag)) = (&self.e_tag, &held.e_tag) {
            return latest_tag == held_tag;
        }
        match (self.timestamp_nanos, held.timestamp_nanos) {
            (Some(latest_ts), Some(held_ts)) => latest_ts == held_ts,
            // Some object stores can omit both e_tag and manifest timestamp
            // from the reachable API. In that narrow case the version-number
            // probe is the strongest available identity.
            _ => true,
        }
    }
}
166
167impl SubTableUpdate {
168    pub(crate) fn to_create_table_version_request(&self) -> CreateTableVersionRequest {
169        self.version_metadata.to_create_table_version_request(
170            &self.table_key,
171            self.table_version,
172            self.row_count,
173            self.table_branch.as_deref(),
174        )
175    }
176}
177
/// Registration of a sub-table in `__manifest`: associates a table key with the
/// path of its dataset under the graph root.
#[derive(Debug, Clone)]
pub(crate) struct TableRegistration {
    /// Logical table key, e.g. `node:<Type>` or `edge:<Type>`.
    pub(crate) table_key: String,
    /// Dataset path relative to the graph root (presumably as produced by
    /// `table_path_for_table_key`, e.g. `nodes/<hash>` — confirm at call sites).
    pub(crate) table_path: String,
}
183
/// Request to tombstone a sub-table in `__manifest`.
#[derive(Debug, Clone)]
pub(crate) struct TableTombstone {
    /// Logical table key being tombstoned.
    pub(crate) table_key: String,
    /// Version number recorded with the tombstone.
    pub(crate) tombstone_version: u64,
}
189
190#[derive(Debug, Clone)]
191pub(crate) enum ManifestChange {
192    Update(SubTableUpdate),
193    RegisterTable(TableRegistration),
194    Tombstone(TableTombstone),
195}
196
197impl SubTableEntry {
198    /// Open this sub-table at its pinned version directly by location (Fix 2),
199    /// without the Lance namespace — which would full-scan `__manifest` twice per
200    /// open (`describe_table` + `describe_table_version`). The resolved Snapshot
201    /// already holds the path, version, and branch. Branches are Lance native
202    /// branches, so `with_branch` resolves `{base}/tree/{branch}` from the base
203    /// URI; main uses `with_version`.
204    pub(crate) async fn open(&self, root_uri: &str) -> Result<Dataset> {
205        // The branch-qualified location is the dataset that physically holds this
206        // version: main at `{table_path}`, a branch at
207        // `{table_path}/tree/{branch}` (Lance native-branch storage). `with_version`
208        // then resolves the version within THAT dataset's `_versions` — a branch
209        // version lives under `tree/{branch}/_versions`, not the base. This
210        // matches the physical layout the namespace path resolved, without the
211        // per-open `__manifest` scan.
212        let location = table_uri_for_path(root_uri, &self.table_path, self.table_branch.as_deref());
213        // Route through the instrumented data-table opener (Fix 3). With no
214        // session this is exactly the Fix-2 `from_uri(location).with_version`.
215        // This is the uncached fallback (a snapshot with no read caches); the
216        // cached path (`Snapshot::open` → handle cache) calls the same opener on
217        // a miss with the shared session, so both paths count on the per-query
218        // `table_wrapper`.
219        crate::instrumentation::open_table_dataset(&location, self.table_version, None).await
220    }
221}
222
223pub(crate) fn table_path_for_table_key(table_key: &str) -> Result<String> {
224    if let Some(type_name) = table_key.strip_prefix("node:") {
225        return Ok(format!("nodes/{}", type_name_hash(type_name)));
226    }
227    if let Some(type_name) = table_key.strip_prefix("edge:") {
228        return Ok(format!("edges/{}", type_name_hash(type_name)));
229    }
230    Err(OmniError::manifest(format!(
231        "invalid table key '{}'",
232        table_key
233    )))
234}
235
236/// An update to apply to the manifest via `commit`.
237#[derive(Debug, Clone)]
238pub struct SubTableUpdate {
239    pub table_key: String,
240    pub table_version: u64,
241    pub table_branch: Option<String>,
242    pub row_count: u64,
243    pub(crate) version_metadata: TableVersionMetadata,
244}
245
246/// Coordinates cross-dataset state through the namespace `__manifest` table.
247///
248/// Table rows register stable metadata such as location. Append-only
249/// `table_version` rows are the graph publish boundary and reconstruct the
250/// current graph snapshot by selecting the latest visible version row per
251/// sub-table.
252pub struct ManifestCoordinator {
253    root_uri: String,
254    dataset: Dataset,
255    known_state: ManifestState,
256    active_branch: Option<String>,
257    publisher: Arc<dyn ManifestBatchPublisher>,
258}
259
260impl ManifestCoordinator {
261    fn default_batch_publisher(
262        root_uri: &str,
263        active_branch: Option<&str>,
264    ) -> Arc<dyn ManifestBatchPublisher> {
265        Arc::new(GraphNamespacePublisher::new(root_uri, active_branch))
266    }
267
268    fn from_parts(
269        root_uri: &str,
270        dataset: Dataset,
271        known_state: ManifestState,
272        active_branch: Option<String>,
273        publisher: Arc<dyn ManifestBatchPublisher>,
274    ) -> Self {
275        Self {
276            root_uri: root_uri.trim_end_matches('/').to_string(),
277            dataset,
278            known_state,
279            active_branch,
280            publisher,
281        }
282    }
283
284    fn from_parts_with_default_publisher(
285        root_uri: &str,
286        dataset: Dataset,
287        known_state: ManifestState,
288        active_branch: Option<String>,
289    ) -> Self {
290        let publisher = Self::default_batch_publisher(root_uri, active_branch.as_deref());
291        Self::from_parts(root_uri, dataset, known_state, active_branch, publisher)
292    }
293
294    fn snapshot_from_state(root_uri: &str, state: ManifestState) -> Snapshot {
295        Snapshot {
296            root_uri: root_uri.trim_end_matches('/').to_string(),
297            version: state.version,
298            entries: state
299                .entries
300                .into_iter()
301                .map(|entry| (entry.table_key.clone(), entry))
302                .collect(),
303            read_caches: None,
304        }
305    }
306
307    #[cfg(test)]
308    fn with_batch_publisher(mut self, publisher: Arc<dyn ManifestBatchPublisher>) -> Self {
309        self.publisher = publisher;
310        self
311    }
312
313    /// Create a new graph at `root_uri` from a catalog.
314    ///
315    /// Creates per-type Lance datasets and the namespace `__manifest` table.
316    pub async fn init(root_uri: &str, catalog: &Catalog) -> Result<Self> {
317        let root = root_uri.trim_end_matches('/');
318        let (dataset, known_state) = init_manifest_graph(root, catalog).await?;
319
320        Ok(Self::from_parts_with_default_publisher(
321            root,
322            dataset,
323            known_state,
324            None,
325        ))
326    }
327
328    /// Open an existing graph's manifest.
329    pub async fn open(root_uri: &str) -> Result<Self> {
330        let root = root_uri.trim_end_matches('/');
331        let (dataset, known_state) = open_manifest_graph(root, None).await?;
332        Ok(Self::from_parts_with_default_publisher(
333            root,
334            dataset,
335            known_state,
336            None,
337        ))
338    }
339
340    /// Open an existing graph's manifest at a specific branch.
341    pub async fn open_at_branch(root_uri: &str, branch: &str) -> Result<Self> {
342        if branch == "main" {
343            return Self::open(root_uri).await;
344        }
345
346        let root = root_uri.trim_end_matches('/');
347        let (dataset, known_state) = open_manifest_graph(root, Some(branch)).await?;
348        Ok(Self::from_parts_with_default_publisher(
349            root,
350            dataset,
351            known_state,
352            Some(branch.to_string()),
353        ))
354    }
355
356    pub async fn snapshot_at(
357        root_uri: &str,
358        branch: Option<&str>,
359        version: u64,
360    ) -> Result<Snapshot> {
361        let root = root_uri.trim_end_matches('/');
362        Ok(Self::snapshot_from_state(
363            root,
364            snapshot_state_at(root, branch, version).await?,
365        ))
366    }
367
368    /// Return a Snapshot from the known manifest state. No storage I/O.
369    pub fn snapshot(&self) -> Snapshot {
370        Self::snapshot_from_state(&self.root_uri, self.known_state.clone())
371    }
372
373    /// Re-read manifest from storage to see other writers' commits.
374    pub async fn refresh(&mut self) -> Result<()> {
375        self.dataset = open_manifest_dataset(&self.root_uri, self.active_branch.as_deref()).await?;
376        self.known_state = read_manifest_state(&self.dataset).await?;
377        Ok(())
378    }
379
380    /// Commit updated sub-table versions to the manifest.
381    ///
382    /// Atomically inserts one immutable `table_version` row per updated table.
383    /// The merge-insert commit on `__manifest` is the graph-level publish point.
384    pub async fn commit(&mut self, updates: &[SubTableUpdate]) -> Result<u64> {
385        let changes = updates
386            .iter()
387            .cloned()
388            .map(ManifestChange::Update)
389            .collect::<Vec<_>>();
390        self.commit_changes(&changes).await
391    }
392
393    /// Same as [`commit`], but with caller-supplied per-table expected
394    /// versions used for optimistic concurrency control. Each entry asserts
395    /// the manifest's current latest non-tombstoned `table_version` for that
396    /// `table_key` is exactly what the caller observed; mismatches surface
397    /// as `OmniError::Manifest` with `ManifestConflictDetails::ExpectedVersionMismatch`.
398    pub async fn commit_with_expected(
399        &mut self,
400        updates: &[SubTableUpdate],
401        expected_table_versions: &HashMap<String, u64>,
402    ) -> Result<u64> {
403        let changes = updates
404            .iter()
405            .cloned()
406            .map(ManifestChange::Update)
407            .collect::<Vec<_>>();
408        self.commit_changes_with_expected(&changes, expected_table_versions)
409            .await
410    }
411
412    pub(crate) async fn commit_changes(&mut self, changes: &[ManifestChange]) -> Result<u64> {
413        self.commit_changes_with_expected(changes, &HashMap::new())
414            .await
415    }
416
417    pub(crate) async fn commit_changes_with_expected(
418        &mut self,
419        changes: &[ManifestChange],
420        expected_table_versions: &HashMap<String, u64>,
421    ) -> Result<u64> {
422        if changes.is_empty() && expected_table_versions.is_empty() {
423            return Ok(self.version());
424        }
425
426        self.dataset = self
427            .publisher
428            .publish(changes, expected_table_versions)
429            .await?;
430
431        self.known_state = read_manifest_state(&self.dataset).await?;
432        Ok(self.version())
433    }
434
435    /// Current manifest version.
436    pub fn version(&self) -> u64 {
437        self.dataset.version().version
438    }
439
440    /// Latest committed manifest version on disk (one object-store op, no row
441    /// scan). The freshness probe for warm reuse: compare against `version()`
442    /// (the held handle's pinned version) to decide whether to refresh.
443    pub async fn probe_latest_version(&self) -> Result<u64> {
444        self.dataset
445            .latest_version_id()
446            .await
447            .map_err(|e| OmniError::Lance(e.to_string()))
448    }
449
450    pub(crate) fn incarnation(&self) -> ManifestIncarnation {
451        ManifestIncarnation {
452            version: self.version(),
453            e_tag: self.dataset.manifest_location().e_tag.clone(),
454            timestamp_nanos: Some(self.dataset.manifest().timestamp_nanos),
455        }
456    }
457
458    /// Latest committed manifest identity. Main cannot be deleted/recreated, so
459    /// the cheap version-number probe is sufficient there. Non-main Lance
460    /// branches can be deleted and recreated with the same version number, so
461    /// load the latest manifest location and compare its e_tag / timestamp too.
462    pub(crate) async fn probe_latest_incarnation(&self) -> Result<ManifestIncarnation> {
463        if self.active_branch.is_none() {
464            return Ok(ManifestIncarnation {
465                version: self.probe_latest_version().await?,
466                e_tag: self.dataset.manifest_location().e_tag.clone(),
467                timestamp_nanos: Some(self.dataset.manifest().timestamp_nanos),
468            });
469        }
470        let (manifest, location) = self
471            .dataset
472            .latest_manifest()
473            .await
474            .map_err(|e| OmniError::Lance(e.to_string()))?;
475        Ok(ManifestIncarnation {
476            version: manifest.version,
477            e_tag: location.e_tag,
478            timestamp_nanos: Some(manifest.timestamp_nanos),
479        })
480    }
481
482    pub fn active_branch(&self) -> Option<&str> {
483        self.active_branch.as_deref()
484    }
485
486    pub async fn create_branch(&mut self, name: &str) -> Result<()> {
487        let mut ds = self.dataset.clone();
488        ds.create_branch(name, self.version(), None)
489            .await
490            .map_err(|e| OmniError::Lance(e.to_string()))?;
491        Ok(())
492    }
493
494    pub async fn delete_branch(&mut self, name: &str) -> Result<()> {
495        let uri = manifest_uri(&self.root_uri);
496        let mut ds = Dataset::open(&uri)
497            .await
498            .map_err(|e| OmniError::Lance(e.to_string()))?;
499        ds.delete_branch(name)
500            .await
501            .map_err(|e| OmniError::Lance(e.to_string()))?;
502        self.dataset = open_manifest_dataset(&self.root_uri, self.active_branch.as_deref()).await?;
503        self.known_state = read_manifest_state(&self.dataset).await?;
504        Ok(())
505    }
506
507    pub async fn list_branches(&self) -> Result<Vec<String>> {
508        let branches = self
509            .dataset
510            .list_branches()
511            .await
512            .map_err(|e| OmniError::Lance(e.to_string()))?;
513        let mut names: Vec<String> = branches.into_keys().filter(|name| name != "main").collect();
514        names.sort();
515        let mut all = vec!["main".to_string()];
516        all.extend(names);
517        Ok(all)
518    }
519
520    pub async fn descendant_branches(&self, name: &str) -> Result<Vec<String>> {
521        let branches = self
522            .dataset
523            .list_branches()
524            .await
525            .map_err(|e| OmniError::Lance(e.to_string()))?;
526        let mut frontier = vec![name.to_string()];
527        let mut descendants = Vec::new();
528        let mut seen = HashSet::new();
529
530        while let Some(parent) = frontier.pop() {
531            let mut children = branches
532                .iter()
533                .filter_map(|(branch, contents)| {
534                    (contents.parent_branch.as_deref() == Some(parent.as_str()))
535                        .then_some(branch.clone())
536                })
537                .collect::<Vec<_>>();
538            children.sort();
539            for child in children {
540                if seen.insert(child.clone()) {
541                    frontier.push(child.clone());
542                    descendants.push(child);
543                }
544            }
545        }
546
547        Ok(descendants)
548    }
549
550    /// Root URI of the graph.
551    pub fn root_uri(&self) -> &str {
552        &self.root_uri
553    }
554}
555
556#[cfg(test)]
557#[path = "manifest/tests.rs"]
558mod tests;