Skip to main content

quiver_core/
manifest.rs

// SPDX-License-Identifier: AGPL-3.0-only
//! The versioned manifest: the catalog and durability anchor.
//!
//! The manifest records what is live — per collection, the set of sealed,
//! immutable segments with their LSN ranges and schema — plus the global
//! `last_checkpointed_lsn` (the WAL position safely captured in segments) and
//! the id allocators. It is the source of truth consulted first on recovery.
//!
//! Each update writes a new immutable `manifest-NNNNNNNNNN` file and atomically
//! swaps the `CURRENT` pointer using the **write-new + fsync + atomic-rename**
//! protocol (LevelDB-style, ADR-0004): the new manifest is written and `fsync`'d,
//! the directory is `fsync`'d, then `CURRENT.tmp` is written, `fsync`'d, and
//! `rename`d over `CURRENT`, after which the directory is `fsync`'d again so the
//! swap itself is durable. A crash at any point leaves either the old or the new
//! catalog fully intact — never a half-written one. A manifest file written but
//! never pointed to by `CURRENT` is an orphan, ignored on read and garbage
//! collected by the engine.
//!
//! The manifest body is `postcard`-encoded (ADR-0003) and laid out across one or
//! more [`crate::page`] pages, inheriting their per-page CRC integrity. Page 0's
//! body begins with the total body length so the reader concatenates exactly the
//! right bytes.

23use std::fs::OpenOptions;
24use std::io::Write;
25use std::path::Path;
26
27use serde::{Deserialize, Serialize};
28
29use crate::error::{CoreError, Result};
30use crate::ids::{CollectionId, Lsn};
31use crate::page::{PageCodec, PageType};
32use crate::paged::{fsync_dir, read_paged, write_paged};
33
/// On-disk manifest schema version (independent of the product SemVer and of the
/// page format version). v2 (ADR-0025) added the per-collection index snapshot
/// reference; v1 manifests are read and upgraded transparently.
///
/// Encoded as the first field of every manifest body. `read_manifest_file`
/// accepts exactly this version or `1`, and rejects any other value with
/// `CoreError::UnsupportedVersion`.
pub const MANIFEST_FORMAT_VERSION: u16 = 2;

// Pointer file whose contents are the file name of the live manifest.
const CURRENT_FILE: &str = "CURRENT";
// Staging name for the pointer: written and synced first, then renamed over
// `CURRENT_FILE` so readers never observe a partially written pointer.
const CURRENT_TMP: &str = "CURRENT.tmp";
41
fn manifest_file_name(version: u64) -> String {
    // Left-pad the version with zeros to ten digits so that, for every version
    // below 10^10, the lexical order of file names matches numeric order.
    // Larger versions simply grow wider (and would no longer sort numerically).
    format!("manifest-{version:0>10}")
}
46
/// A reference to one sealed, immutable segment.
///
/// Postcard encodes these fields positionally, so their order is part of the
/// on-disk manifest format. This type is embedded in both the current
/// [`CollectionEntry`] and the frozen v1 entry layout. Any change to its fields
/// therefore also breaks decoding of existing v1 manifests.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SegmentRef {
    /// Monotonic segment id; also names the segment's files.
    pub id: u64,
    /// Number of rows (including tombstoned) in the segment.
    pub row_count: u64,
    /// Lowest LSN captured in this segment.
    pub lsn_low: Lsn,
    /// Highest LSN captured in this segment.
    pub lsn_high: Lsn,
}
59
/// A reference to one sealed, immutable index snapshot (ADR-0025).
///
/// Only present in v2+ manifests, via [`CollectionEntry::index_snapshot`].
/// Postcard encodes these fields positionally, so their order is part of the
/// on-disk format.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct IndexSnapshotRef {
    /// Snapshot file id; also names the file (`index/idx-<id>`). Set to the
    /// manifest version that first published it, so it is unique per checkpoint.
    pub id: u64,
    /// The checkpoint LSN the snapshot reflects — equal to the manifest's
    /// `last_checkpointed_lsn` when written, so WAL replay above it reconstructs
    /// exactly the post-snapshot mutations.
    pub lsn: Lsn,
}
71
/// Catalog entry for one collection.
///
/// The field order is part of the on-disk format (postcard is positional). The
/// v1 layout of this entry is identical except that it lacks the trailing
/// `index_snapshot` field. v1 entries are upgraded with `index_snapshot: None`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CollectionEntry {
    /// Stable collection id.
    pub id: CollectionId,
    /// Human-readable collection name, unique within the store.
    pub name: String,
    /// Postcard-encoded collection descriptor (dim, dtype, metric, fields).
    pub descriptor: Vec<u8>,
    /// Live sealed segments, in creation order.
    pub segments: Vec<SegmentRef>,
    /// The current durable index snapshot for this collection, if any (ADR-0025).
    /// `None` for a collection whose index is rebuilt on open (HNSW, the batch
    /// graph, or a store written before v2).
    pub index_snapshot: Option<IndexSnapshotRef>,
}
88
/// A complete catalog snapshot. Immutable once written; superseded by writing a
/// higher version and swapping `CURRENT`.
///
/// `format_version` must remain the first field. Readers decode just that
/// leading field first and use it to choose which layout to decode the rest
/// with. The remaining field order is also part of the on-disk format, because
/// postcard is positional.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Manifest {
    /// Manifest schema version.
    pub format_version: u16,
    /// Monotonic manifest version; also names the file.
    pub version: u64,
    /// Highest LSN durably captured in segments — the WAL replay floor.
    pub last_checkpointed_lsn: Lsn,
    /// Next collection id to hand out.
    pub next_collection_id: u64,
    /// Next segment id to hand out.
    pub next_segment_id: u64,
    /// All collections in the store.
    pub collections: Vec<CollectionEntry>,
}
106
107impl Default for Manifest {
108    fn default() -> Self {
109        Self::empty()
110    }
111}
112
113impl Manifest {
114    /// An empty manifest for a brand-new store (version 0).
115    #[must_use]
116    pub fn empty() -> Self {
117        Self {
118            format_version: MANIFEST_FORMAT_VERSION,
119            version: 0,
120            last_checkpointed_lsn: Lsn::ZERO,
121            next_collection_id: 0,
122            next_segment_id: 0,
123            collections: Vec::new(),
124        }
125    }
126
127    /// Find a collection by id.
128    #[must_use]
129    pub fn collection(&self, id: CollectionId) -> Option<&CollectionEntry> {
130        self.collections.iter().find(|c| c.id == id)
131    }
132
133    /// Find a collection by name.
134    #[must_use]
135    pub fn collection_by_name(&self, name: &str) -> Option<&CollectionEntry> {
136        self.collections.iter().find(|c| c.name == name)
137    }
138}
139
140/// Serialize `manifest` and durably install it as the new `CURRENT`, using the
141/// write-new + fsync + atomic-rename protocol. `dir` is the store root.
142pub fn write_manifest(dir: &Path, manifest: &Manifest, codec: &dyn PageCodec) -> Result<()> {
143    let body = postcard::to_allocvec(manifest)?;
144
145    // 1. Write the new manifest file in full and fsync it.
146    let file_name = manifest_file_name(manifest.version);
147    let manifest_path = dir.join(&file_name);
148    write_paged(
149        &manifest_path,
150        codec,
151        PageType::Manifest,
152        manifest.version,
153        &body,
154    )?;
155    // 2. fsync the directory so the new file entry is durable before we point at it.
156    fsync_dir(dir)?;
157
158    // 3. Write CURRENT.tmp pointing at the new manifest, and fsync it.
159    let tmp_path = dir.join(CURRENT_TMP);
160    {
161        let mut f = OpenOptions::new()
162            .create(true)
163            .truncate(true)
164            .write(true)
165            .open(&tmp_path)
166            .map_err(|e| CoreError::io(&tmp_path, e))?;
167        f.write_all(file_name.as_bytes())
168            .map_err(|e| CoreError::io(&tmp_path, e))?;
169        f.write_all(b"\n")
170            .map_err(|e| CoreError::io(&tmp_path, e))?;
171        f.sync_data().map_err(|e| CoreError::io(&tmp_path, e))?;
172    }
173    // 4. Atomically swap CURRENT, then fsync the directory to make it durable.
174    let current_path = dir.join(CURRENT_FILE);
175    std::fs::rename(&tmp_path, &current_path).map_err(|e| CoreError::io(&current_path, e))?;
176    fsync_dir(dir)?;
177    Ok(())
178}
179
180/// Read the current manifest, or `None` if the store has no `CURRENT` yet (a
181/// fresh data directory).
182pub fn read_current(dir: &Path, codec: &dyn PageCodec) -> Result<Option<Manifest>> {
183    let current_path = dir.join(CURRENT_FILE);
184    let name = match std::fs::read_to_string(&current_path) {
185        Ok(s) => s.trim().to_owned(),
186        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
187        Err(e) => return Err(CoreError::io(&current_path, e)),
188    };
189    if name.is_empty() {
190        return Err(CoreError::MalformedPage("CURRENT is empty".to_owned()));
191    }
192    let manifest = read_manifest_file(&dir.join(&name), codec)?;
193    Ok(Some(manifest))
194}
195
// Just the leading `format_version`, consumed without committing to a full
// schema (postcard is not self-describing) so the reader can dispatch on it; the
// remaining bytes are then decoded with the matching schema.
//
// This relies on every manifest layout (v1 and current) encoding
// `format_version` as its very first field. `read_manifest_file` decodes this
// struct with `postcard::take_from_bytes`, which also returns the unconsumed
// bytes.
#[derive(Deserialize)]
struct VersionPeek {
    // The manifest schema version, compared against `1` and
    // `MANIFEST_FORMAT_VERSION` to choose a decoder.
    format_version: u16,
}
203
// The v1 manifest body (everything after `format_version`), pre-ADR-0025:
// collection entries carried no index snapshot reference. Retained read-only so a
// store written before v2 still opens — its indexes simply rebuild on first load.
//
// Frozen: this layout describes bytes already on disk, so its fields must never
// be reordered, added, or removed. The same applies to the shared `SegmentRef`
// type it embeds. `Serialize` is derived only under test, so tests can
// fabricate v1 manifests.
#[derive(Deserialize)]
#[cfg_attr(test, derive(Serialize))]
struct CollectionEntryV1 {
    id: CollectionId,
    name: String,
    descriptor: Vec<u8>,
    segments: Vec<SegmentRef>,
}
215
// Top-level v1 layout, minus the leading `format_version` (already consumed by
// `VersionPeek`). It is upgraded to `Manifest` via `From`. Frozen for the same
// reason as `CollectionEntryV1`: it describes bytes already on disk.
#[derive(Deserialize)]
#[cfg_attr(test, derive(Serialize))]
struct ManifestV1Body {
    version: u64,
    last_checkpointed_lsn: Lsn,
    next_collection_id: u64,
    next_segment_id: u64,
    collections: Vec<CollectionEntryV1>,
}
225
226impl From<ManifestV1Body> for Manifest {
227    fn from(m: ManifestV1Body) -> Self {
228        Self {
229            format_version: MANIFEST_FORMAT_VERSION,
230            version: m.version,
231            last_checkpointed_lsn: m.last_checkpointed_lsn,
232            next_collection_id: m.next_collection_id,
233            next_segment_id: m.next_segment_id,
234            collections: m
235                .collections
236                .into_iter()
237                .map(|c| CollectionEntry {
238                    id: c.id,
239                    name: c.name,
240                    descriptor: c.descriptor,
241                    segments: c.segments,
242                    index_snapshot: None,
243                })
244                .collect(),
245        }
246    }
247}
248
249fn read_manifest_file(path: &Path, codec: &dyn PageCodec) -> Result<Manifest> {
250    let body = read_paged(path, codec, PageType::Manifest)?;
251    // Dispatch on the schema version (the first field) so an older on-disk
252    // manifest is upgraded rather than rejected (ADR-0025).
253    let (peek, rest) = postcard::take_from_bytes::<VersionPeek>(&body)?;
254    match peek.format_version {
255        1 => Ok(postcard::from_bytes::<ManifestV1Body>(rest)?.into()),
256        v if v == MANIFEST_FORMAT_VERSION => Ok(postcard::from_bytes::<Manifest>(&body)?),
257        other => Err(CoreError::UnsupportedVersion {
258            found: other,
259            supported: MANIFEST_FORMAT_VERSION,
260        }),
261    }
262}
263
#[cfg(test)]
mod tests {
    use super::*;
    use crate::page::{PAGE_BODY_CAP, PAGE_SIZE, PlainCodec};

    // Build a manifest at `version` holding `n_collections` collections. Each has
    // a `desc_len`-byte descriptor and exactly one segment.
    fn sample(version: u64, n_collections: usize, desc_len: usize) -> Manifest {
        let collections = (0..n_collections)
            .map(|c| CollectionEntry {
                id: CollectionId(c as u64),
                name: format!("col-{c}"),
                descriptor: vec![(c % 256) as u8; desc_len],
                segments: vec![SegmentRef {
                    id: c as u64,
                    row_count: 10 * c as u64,
                    lsn_low: Lsn(c as u64),
                    lsn_high: Lsn(c as u64 + 5),
                }],
                index_snapshot: None,
            })
            .collect();
        Manifest {
            format_version: MANIFEST_FORMAT_VERSION,
            version,
            last_checkpointed_lsn: Lsn(version),
            next_collection_id: n_collections as u64,
            next_segment_id: n_collections as u64,
            collections,
        }
    }

    #[test]
    fn fresh_dir_has_no_manifest() {
        let dir = tempfile::tempdir().unwrap();
        assert_eq!(read_current(dir.path(), &PlainCodec).unwrap(), None);
    }

    #[test]
    fn write_then_read_roundtrips() {
        let dir = tempfile::tempdir().unwrap();
        let m = sample(1, 3, 16);
        write_manifest(dir.path(), &m, &PlainCodec).unwrap();
        let back = read_current(dir.path(), &PlainCodec).unwrap();
        assert_eq!(back, Some(m));
    }

    #[test]
    fn newer_version_supersedes() {
        let dir = tempfile::tempdir().unwrap();
        write_manifest(dir.path(), &sample(1, 1, 8), &PlainCodec).unwrap();
        let v2 = sample(2, 2, 8);
        write_manifest(dir.path(), &v2, &PlainCodec).unwrap();
        assert_eq!(read_current(dir.path(), &PlainCodec).unwrap(), Some(v2));
    }

    #[test]
    fn multi_page_manifest_roundtrips() {
        let dir = tempfile::tempdir().unwrap();
        // Two descriptors, each a full page body long, push the encoded manifest
        // across several pages.
        let m = sample(7, 2, PAGE_BODY_CAP);
        write_manifest(dir.path(), &m, &PlainCodec).unwrap();
        // Confirm the file really spans multiple pages.
        let bytes = std::fs::read(dir.path().join(manifest_file_name(7))).unwrap();
        assert!(bytes.len() > PAGE_SIZE * 2);
        assert_eq!(read_current(dir.path(), &PlainCodec).unwrap(), Some(m));
    }

    #[test]
    fn temp_pointer_is_renamed_away() {
        let dir = tempfile::tempdir().unwrap();
        write_manifest(dir.path(), &sample(1, 1, 8), &PlainCodec).unwrap();
        assert!(!dir.path().join(CURRENT_TMP).exists());
        assert!(dir.path().join(CURRENT_FILE).exists());
    }

    #[test]
    fn current_names_the_newly_written_manifest() {
        let dir = tempfile::tempdir().unwrap();
        write_manifest(dir.path(), &sample(5, 1, 8), &PlainCodec).unwrap();
        let pointer = std::fs::read_to_string(dir.path().join(CURRENT_FILE)).unwrap();
        assert_eq!(pointer, format!("{}\n", manifest_file_name(5)));
    }

    #[test]
    fn orphan_manifest_without_current_swap_is_ignored() {
        let dir = tempfile::tempdir().unwrap();
        let v1 = sample(1, 1, 8);
        write_manifest(dir.path(), &v1, &PlainCodec).unwrap();
        // Simulate a crash after a v2 file is written but before CURRENT is
        // swapped: drop a bogus manifest-0000000002 with CURRENT untouched.
        std::fs::write(dir.path().join(manifest_file_name(2)), b"garbage").unwrap();
        // CURRENT still names v1, so the orphan is ignored.
        assert_eq!(read_current(dir.path(), &PlainCodec).unwrap(), Some(v1));
    }

    #[test]
    fn stale_current_tmp_does_not_affect_read() {
        let dir = tempfile::tempdir().unwrap();
        let v1 = sample(1, 1, 8);
        write_manifest(dir.path(), &v1, &PlainCodec).unwrap();
        std::fs::write(dir.path().join(CURRENT_TMP), b"manifest-9999999999\n").unwrap();
        assert_eq!(read_current(dir.path(), &PlainCodec).unwrap(), Some(v1));
    }

    #[test]
    fn empty_current_is_rejected() {
        let dir = tempfile::tempdir().unwrap();
        // A CURRENT that is only whitespace trims to an empty name.
        std::fs::write(dir.path().join(CURRENT_FILE), b"\n").unwrap();
        assert!(matches!(
            read_current(dir.path(), &PlainCodec),
            Err(CoreError::MalformedPage(_))
        ));
    }

    #[test]
    fn corrupt_manifest_page_is_detected() {
        let dir = tempfile::tempdir().unwrap();
        write_manifest(dir.path(), &sample(1, 1, 64), &PlainCodec).unwrap();
        let path = dir.path().join(manifest_file_name(1));
        let mut bytes = std::fs::read(&path).unwrap();
        // Flip a byte inside page 0's body (past the 32-byte header).
        bytes[64] ^= 0xFF;
        std::fs::write(&path, &bytes).unwrap();
        assert!(matches!(
            read_current(dir.path(), &PlainCodec),
            Err(CoreError::PageCorrupt { .. })
        ));
    }

    #[test]
    fn accessors_find_collections() {
        let m = sample(1, 3, 8);
        assert_eq!(
            m.collection(CollectionId(1)).map(|c| c.name.as_str()),
            Some("col-1")
        );
        assert_eq!(
            m.collection_by_name("col-2").map(|c| c.id),
            Some(CollectionId(2))
        );
        assert!(m.collection(CollectionId(99)).is_none());
        assert!(m.collection_by_name("nope").is_none());
    }

    #[test]
    fn default_is_the_empty_manifest() {
        let m = Manifest::default();
        assert_eq!(m, Manifest::empty());
        assert_eq!(m.format_version, MANIFEST_FORMAT_VERSION);
        assert_eq!(m.version, 0);
        assert_eq!(m.last_checkpointed_lsn, Lsn::ZERO);
        assert!(m.collections.is_empty());
    }

    #[test]
    fn manifest_file_names_are_zero_padded() {
        assert_eq!(manifest_file_name(0), "manifest-0000000000");
        assert_eq!(manifest_file_name(42), "manifest-0000000042");
        // Within the padded width, lexical order tracks numeric order.
        assert!(manifest_file_name(9) < manifest_file_name(10));
    }

    #[test]
    fn v2_manifest_round_trips_an_index_snapshot() {
        let dir = tempfile::tempdir().unwrap();
        let mut m = sample(4, 2, 8);
        m.collections[0].index_snapshot = Some(IndexSnapshotRef {
            id: 4,
            lsn: Lsn(99),
        });
        write_manifest(dir.path(), &m, &PlainCodec).unwrap();
        assert_eq!(read_current(dir.path(), &PlainCodec).unwrap(), Some(m));
    }

    #[test]
    fn v1_manifest_without_index_snapshot_opens_and_upgrades() {
        let dir = tempfile::tempdir().unwrap();
        // A pre-v2 manifest, written in the old layout (no index_snapshot field).
        let v1 = ManifestV1Body {
            version: 3,
            last_checkpointed_lsn: Lsn(42),
            next_collection_id: 2,
            next_segment_id: 5,
            collections: vec![CollectionEntryV1 {
                id: CollectionId(0),
                name: "legacy".to_owned(),
                descriptor: vec![1, 2, 3],
                segments: vec![SegmentRef {
                    id: 0,
                    row_count: 7,
                    lsn_low: Lsn(1),
                    lsn_high: Lsn(9),
                }],
            }],
        };
        // The on-disk v1 layout is `format_version` (= 1) followed by the body.
        let mut body = postcard::to_allocvec(&1u16).unwrap();
        body.extend_from_slice(&postcard::to_allocvec(&v1).unwrap());
        write_paged(
            &dir.path().join(manifest_file_name(3)),
            &PlainCodec,
            PageType::Manifest,
            3,
            &body,
        )
        .unwrap();
        std::fs::write(
            dir.path().join(CURRENT_FILE),
            format!("{}\n", manifest_file_name(3)),
        )
        .unwrap();

        let got = read_current(dir.path(), &PlainCodec).unwrap().unwrap();
        assert_eq!(got.format_version, MANIFEST_FORMAT_VERSION);
        assert_eq!(got.version, 3);
        assert_eq!(got.last_checkpointed_lsn, Lsn(42));
        assert_eq!(got.collections.len(), 1);
        assert_eq!(got.collections[0].name, "legacy");
        assert_eq!(got.collections[0].index_snapshot, None);
    }

    #[test]
    fn future_manifest_version_is_rejected() {
        let dir = tempfile::tempdir().unwrap();
        let mut m = sample(1, 1, 8);
        m.format_version = 999;
        write_manifest(dir.path(), &m, &PlainCodec).unwrap();
        assert!(matches!(
            read_current(dir.path(), &PlainCodec),
            Err(CoreError::UnsupportedVersion { found: 999, .. })
        ));
    }
}