Skip to main content

page_db/
page.rs

1//! The page format: identifiers, sizes, the on-disk header, and [`Page`].
2//!
3//! A page is a fixed-size block — `page_size` bytes — laid out as a 32-byte
4//! header followed by the payload the layer above is free to use. The header is
5//! little-endian on disk so a file is portable across architectures:
6//!
7//! | offset | size | field      | meaning                                      |
8//! |-------:|-----:|------------|----------------------------------------------|
9//! | 0      | 4    | magic      | `b"PGDB"`, identifies a page-db page          |
10//! | 4      | 2    | version    | header format version (currently 1)          |
11//! | 6      | 2    | flags      | reserved, written as 0                        |
12//! | 8      | 8    | page id    | the slot this page belongs to                 |
13//! | 16     | 8    | lsn        | write-ahead-log sequence number               |
14//! | 24     | 4    | crc32c     | checksum over the whole page, this field zero |
15//! | 28     | 4    | reserved   | reserved, written as 0                        |
16//!
17//! The checksum covers every byte of the page except its own four bytes, so it
18//! protects the header and the payload together. A page is never trusted
19//! without recomputing and matching that checksum first.
20
21use crate::buffer::AlignedBuffer;
22use crate::checksum::Crc32c;
23use crate::error::{PageError, PageResult};
24
25/// The smallest accepted page size, in bytes.
26///
27/// Below 4 KiB a page no longer reliably satisfies Direct I/O block alignment
28/// on common 4 KiB-sector devices, so it is the floor.
29pub const MIN_PAGE_SIZE: usize = 4096;
30
31/// The largest accepted page size, in bytes.
32pub const MAX_PAGE_SIZE: usize = 1 << 20;
33
34/// The size of the page header, in bytes. The usable payload of a page is
35/// `page_size - PAGE_HEADER_SIZE`.
36pub const PAGE_HEADER_SIZE: usize = 32;
37
38/// The default page size (4 KiB), matching the common OS and device page size.
39pub const DEFAULT_PAGE_SIZE: PageSize = PageSize(4096);
40
41const MAGIC: u32 = u32::from_le_bytes([b'P', b'G', b'D', b'B']);
42const FORMAT_VERSION: u16 = 1;
43
44const OFF_MAGIC: usize = 0;
45const OFF_VERSION: usize = 4;
46const OFF_PAGE_ID: usize = 8;
47const OFF_LSN: usize = 16;
48const OFF_CRC: usize = 24;
49
50/// The id of a page within a [`PageFile`](crate::PageFile) — its slot index.
51///
52/// Page ids are dense from zero: page `n` lives at byte offset `n * page_size`.
53#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
54#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
55pub struct PageId(u64);
56
57impl PageId {
58    /// Wrap a raw slot index.
59    #[inline]
60    #[must_use]
61    pub const fn new(id: u64) -> Self {
62        Self(id)
63    }
64
65    /// The raw slot index.
66    #[inline]
67    #[must_use]
68    pub const fn get(self) -> u64 {
69        self.0
70    }
71
72    /// The byte offset of this page in a file of the given page size.
73    #[inline]
74    #[must_use]
75    pub(crate) const fn byte_offset(self, page_size: usize) -> u64 {
76        self.0 * page_size as u64
77    }
78}
79
80impl std::fmt::Display for PageId {
81    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82        write!(f, "{}", self.0)
83    }
84}
85
86/// A write-ahead-log sequence number stamped into a page header.
87///
88/// page-db does not interpret the LSN; it carries the value so that a log
89/// (`wal-db`) and the recovery code above can order a page against the log
90/// records that describe it. [`Lsn::ZERO`] marks a page that has never been
91/// associated with a log record.
92#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
93#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
94pub struct Lsn(u64);
95
96impl Lsn {
97    /// The sentinel for "no log record" — a page that has not been logged.
98    pub const ZERO: Lsn = Lsn(0);
99
100    /// Wrap a raw sequence number.
101    #[inline]
102    #[must_use]
103    pub const fn new(lsn: u64) -> Self {
104        Self(lsn)
105    }
106
107    /// The raw sequence number.
108    #[inline]
109    #[must_use]
110    pub const fn get(self) -> u64 {
111        self.0
112    }
113}
114
115impl std::fmt::Display for Lsn {
116    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117        write!(f, "{}", self.0)
118    }
119}
120
121/// A validated page size.
122///
123/// A page size must be a power of two within
124/// [`MIN_PAGE_SIZE`]..=[`MAX_PAGE_SIZE`]. Validating once, here, means the rest
125/// of the crate can treat the size as a trusted invariant — buffer alignment,
126/// offset arithmetic, and the payload length all rely on it.
127///
128/// # Examples
129///
130/// ```
131/// use page_db::PageSize;
132///
133/// assert!(PageSize::new(8192).is_ok());
134/// assert!(PageSize::new(4096).is_ok());
135/// assert!(PageSize::new(5000).is_err());   // not a power of two
136/// assert!(PageSize::new(1024).is_err());   // below the 4 KiB floor
137/// ```
138#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
139pub struct PageSize(usize);
140
141impl PageSize {
142    /// Validate and wrap a page size in bytes.
143    ///
144    /// # Errors
145    ///
146    /// Returns [`PageError::InvalidPageSize`] if `size` is not a power of two,
147    /// or falls outside [`MIN_PAGE_SIZE`]..=[`MAX_PAGE_SIZE`].
148    pub const fn new(size: usize) -> PageResult<Self> {
149        if size < MIN_PAGE_SIZE || size > MAX_PAGE_SIZE || !size.is_power_of_two() {
150            return Err(PageError::InvalidPageSize { size });
151        }
152        Ok(Self(size))
153    }
154
155    /// The page size in bytes.
156    #[inline]
157    #[must_use]
158    pub const fn get(self) -> usize {
159        self.0
160    }
161
162    /// The usable payload length of a page of this size.
163    #[inline]
164    #[must_use]
165    pub const fn payload_len(self) -> usize {
166        self.0 - PAGE_HEADER_SIZE
167    }
168}
169
170impl Default for PageSize {
171    #[inline]
172    fn default() -> Self {
173        DEFAULT_PAGE_SIZE
174    }
175}
176
177/// A single fixed-size page: a header and a payload in one aligned buffer.
178///
179/// A `Page` owns a buffer aligned for Direct I/O, so it can be read into and
180/// written from a [`PageFile`](crate::PageFile) without an intermediate copy.
181/// Build one with [`Page::new`] (an empty page) or get one back from
182/// [`PageFile::read_page`](crate::PageFile::read_page) /
183/// [`PageFile::allocate_page`](crate::PageFile::allocate_page).
184///
185/// The checksum is not maintained on every mutation — it would be wasted work
186/// to rechecksum after each `set_lsn` or payload write. Instead the page is
187/// checksummed once, when it is written
188/// ([`PageFile::write_page`](crate::PageFile::write_page) stamps it), or on
189/// demand via [`Page::from_bytes`], which verifies as it loads.
190///
191/// # Examples
192///
193/// ```
194/// use page_db::{Page, PageSize, Lsn};
195///
196/// let mut page = Page::new(PageSize::new(4096)?);
197/// page.set_lsn(Lsn::new(42));
198/// page.payload_mut()[..3].copy_from_slice(b"abc");
199///
200/// // Serialize to a checksummed byte block and load it back, verified.
201/// let bytes = page.to_checksummed_bytes();
202/// let loaded = Page::from_bytes(PageSize::new(4096)?, &bytes)?;
203/// assert_eq!(loaded.lsn(), Lsn::new(42));
204/// assert_eq!(&loaded.payload()[..3], b"abc");
205/// # Ok::<(), page_db::PageError>(())
206/// ```
207pub struct Page {
208    buf: AlignedBuffer,
209    size: usize,
210}
211
212impl Page {
213    /// Create an empty, zeroed page of the given size with a valid header.
214    #[must_use]
215    pub fn new(page_size: PageSize) -> Self {
216        let size = page_size.get();
217        let mut buf = AlignedBuffer::new_zeroed(size, size);
218        {
219            let bytes = buf.as_mut_slice();
220            write_u32(bytes, OFF_MAGIC, MAGIC);
221            write_u16(bytes, OFF_VERSION, FORMAT_VERSION);
222        }
223        Self { buf, size }
224    }
225
226    /// Load a page from a byte block, verifying its header and checksum.
227    ///
228    /// The block must be exactly `page_size` bytes. This is the inverse of
229    /// [`Page::to_checksummed_bytes`] and the same validation
230    /// [`PageFile::read_page`](crate::PageFile::read_page) performs after a read.
231    ///
232    /// # Errors
233    ///
234    /// - [`PageError::ShortRead`] if `bytes.len()` is not `page_size`.
235    /// - [`PageError::BadMagic`] / [`PageError::UnsupportedVersion`] if the
236    ///   header is not a page-db page this build understands.
237    /// - [`PageError::ChecksumMismatch`] if the checksum does not match.
238    pub fn from_bytes(page_size: PageSize, bytes: &[u8]) -> PageResult<Self> {
239        let size = page_size.get();
240        if bytes.len() != size {
241            return Err(PageError::ShortRead {
242                page_id: 0,
243                got: bytes.len(),
244                page_size: size,
245            });
246        }
247        let mut buf = AlignedBuffer::new_zeroed(size, size);
248        buf.as_mut_slice().copy_from_slice(bytes);
249        let page = Self { buf, size };
250        page.verify(None)?;
251        Ok(page)
252    }
253
254    /// The page size in bytes.
255    #[inline]
256    #[must_use]
257    pub fn page_size(&self) -> usize {
258        self.size
259    }
260
261    /// The id stamped in the header. For a page from [`Page::new`] this is `0`
262    /// until the page is written to a slot.
263    #[inline]
264    #[must_use]
265    pub fn id(&self) -> PageId {
266        PageId(read_u64(self.buf.as_slice(), OFF_PAGE_ID))
267    }
268
269    /// The log sequence number stamped in the header.
270    #[inline]
271    #[must_use]
272    pub fn lsn(&self) -> Lsn {
273        Lsn(read_u64(self.buf.as_slice(), OFF_LSN))
274    }
275
276    /// Set the log sequence number. Takes effect in the checksum the next time
277    /// the page is stamped (on [`PageFile::write_page`](crate::PageFile::write_page)).
278    #[inline]
279    pub fn set_lsn(&mut self, lsn: Lsn) {
280        write_u64(self.buf.as_mut_slice(), OFF_LSN, lsn.0);
281    }
282
283    /// The payload — the page bytes after the header.
284    #[inline]
285    #[must_use]
286    pub fn payload(&self) -> &[u8] {
287        &self.buf.as_slice()[PAGE_HEADER_SIZE..]
288    }
289
290    /// The payload, mutably.
291    #[inline]
292    pub fn payload_mut(&mut self) -> &mut [u8] {
293        &mut self.buf.as_mut_slice()[PAGE_HEADER_SIZE..]
294    }
295
296    /// The whole page as a checksummed byte block, ready to persist elsewhere.
297    ///
298    /// The returned vector is `page_size` bytes with a freshly computed checksum
299    /// in the header; feed it back through [`Page::from_bytes`] to recover and
300    /// verify the page. The stamped id is left untouched (`0` unless the page
301    /// came from a file).
302    #[must_use]
303    pub fn to_checksummed_bytes(&self) -> Vec<u8> {
304        let mut out = self.buf.as_slice().to_vec();
305        let crc = compute_checksum(&out);
306        write_u32(&mut out, OFF_CRC, crc);
307        out
308    }
309
310    /// Stamp the slot id into the header and recompute the checksum.
311    pub(crate) fn stamp(&mut self, id: PageId) {
312        {
313            let bytes = self.buf.as_mut_slice();
314            write_u32(bytes, OFF_MAGIC, MAGIC);
315            write_u16(bytes, OFF_VERSION, FORMAT_VERSION);
316            write_u64(bytes, OFF_PAGE_ID, id.0);
317        }
318        let crc = compute_checksum(self.buf.as_slice());
319        write_u32(self.buf.as_mut_slice(), OFF_CRC, crc);
320    }
321
322    /// Verify magic, version, checksum, and — if `expected` is set — the slot id.
323    pub(crate) fn verify(&self, expected: Option<PageId>) -> PageResult<()> {
324        let bytes = self.buf.as_slice();
325
326        let magic = read_u32(bytes, OFF_MAGIC);
327        if magic != MAGIC {
328            return Err(PageError::BadMagic {
329                found: magic,
330                expected: MAGIC,
331            });
332        }
333
334        let version = read_u16(bytes, OFF_VERSION);
335        if version != FORMAT_VERSION {
336            return Err(PageError::UnsupportedVersion {
337                found: version,
338                supported: FORMAT_VERSION,
339            });
340        }
341
342        let stored = read_u32(bytes, OFF_CRC);
343        let computed = compute_checksum(bytes);
344        if stored != computed {
345            return Err(PageError::ChecksumMismatch {
346                page_id: read_u64(bytes, OFF_PAGE_ID),
347                stored,
348                computed,
349            });
350        }
351
352        if let Some(expected) = expected {
353            let found = read_u64(bytes, OFF_PAGE_ID);
354            if found != expected.0 {
355                return Err(PageError::MisdirectedPage {
356                    requested: expected.0,
357                    found,
358                });
359            }
360        }
361
362        Ok(())
363    }
364
365    /// The whole page buffer, for positioned I/O.
366    #[inline]
367    pub(crate) fn as_bytes(&self) -> &[u8] {
368        self.buf.as_slice()
369    }
370
371    /// The whole page buffer, mutably, for reading into.
372    #[inline]
373    pub(crate) fn as_bytes_mut(&mut self) -> &mut [u8] {
374        self.buf.as_mut_slice()
375    }
376}
377
378impl Clone for Page {
379    fn clone(&self) -> Self {
380        Self {
381            buf: self.buf.clone(),
382            size: self.size,
383        }
384    }
385}
386
387impl std::fmt::Debug for Page {
388    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
389        f.debug_struct("Page")
390            .field("id", &self.id())
391            .field("lsn", &self.lsn())
392            .field("page_size", &self.size)
393            .finish()
394    }
395}
396
397/// Checksum the whole page, treating the four checksum bytes as absent.
398fn compute_checksum(bytes: &[u8]) -> u32 {
399    let mut crc = Crc32c::new();
400    crc.update(&bytes[..OFF_CRC]);
401    crc.update(&bytes[OFF_CRC + 4..]);
402    crc.finalize()
403}
404
405#[inline]
406fn read_u16(bytes: &[u8], off: usize) -> u16 {
407    u16::from_le_bytes([bytes[off], bytes[off + 1]])
408}
409
410#[inline]
411fn read_u32(bytes: &[u8], off: usize) -> u32 {
412    u32::from_le_bytes([bytes[off], bytes[off + 1], bytes[off + 2], bytes[off + 3]])
413}
414
415#[inline]
416fn read_u64(bytes: &[u8], off: usize) -> u64 {
417    u64::from_le_bytes([
418        bytes[off],
419        bytes[off + 1],
420        bytes[off + 2],
421        bytes[off + 3],
422        bytes[off + 4],
423        bytes[off + 5],
424        bytes[off + 6],
425        bytes[off + 7],
426    ])
427}
428
429#[inline]
430fn write_u16(bytes: &mut [u8], off: usize, value: u16) {
431    bytes[off..off + 2].copy_from_slice(&value.to_le_bytes());
432}
433
434#[inline]
435fn write_u32(bytes: &mut [u8], off: usize, value: u32) {
436    bytes[off..off + 4].copy_from_slice(&value.to_le_bytes());
437}
438
439#[inline]
440fn write_u64(bytes: &mut [u8], off: usize, value: u64) {
441    bytes[off..off + 8].copy_from_slice(&value.to_le_bytes());
442}
443
444#[cfg(test)]
445mod tests {
446    #![allow(clippy::unwrap_used, clippy::expect_used)]
447
448    use super::*;
449
450    #[test]
451    fn test_page_size_rejects_non_power_of_two() {
452        assert!(matches!(
453            PageSize::new(5000),
454            Err(PageError::InvalidPageSize { size: 5000 })
455        ));
456    }
457
458    #[test]
459    fn test_page_size_rejects_out_of_range() {
460        assert!(PageSize::new(2048).is_err());
461        assert!(PageSize::new(MAX_PAGE_SIZE * 2).is_err());
462    }
463
464    #[test]
465    fn test_page_size_payload_len() {
466        let ps = PageSize::new(4096).expect("valid");
467        assert_eq!(ps.payload_len(), 4096 - PAGE_HEADER_SIZE);
468    }
469
470    #[test]
471    fn test_new_page_has_valid_header() {
472        let page = Page::new(DEFAULT_PAGE_SIZE);
473        assert_eq!(page.id(), PageId::new(0));
474        assert_eq!(page.lsn(), Lsn::ZERO);
475        assert_eq!(page.page_size(), 4096);
476    }
477
478    #[test]
479    fn test_stamp_then_verify_roundtrips() {
480        let mut page = Page::new(DEFAULT_PAGE_SIZE);
481        page.set_lsn(Lsn::new(7));
482        page.payload_mut()[..4].copy_from_slice(b"data");
483        page.stamp(PageId::new(3));
484
485        assert_eq!(page.id(), PageId::new(3));
486        page.verify(Some(PageId::new(3))).expect("verifies");
487        assert_eq!(page.lsn(), Lsn::new(7));
488    }
489
490    #[test]
491    fn test_verify_detects_corruption() {
492        let mut page = Page::new(DEFAULT_PAGE_SIZE);
493        page.stamp(PageId::new(1));
494        page.payload_mut()[10] ^= 0xFF;
495        assert!(matches!(
496            page.verify(Some(PageId::new(1))),
497            Err(PageError::ChecksumMismatch { .. })
498        ));
499    }
500
501    #[test]
502    fn test_verify_detects_misdirected_page() {
503        let mut page = Page::new(DEFAULT_PAGE_SIZE);
504        page.stamp(PageId::new(5));
505        assert!(matches!(
506            page.verify(Some(PageId::new(6))),
507            Err(PageError::MisdirectedPage {
508                requested: 6,
509                found: 5
510            })
511        ));
512    }
513
514    #[test]
515    fn test_from_bytes_rejects_wrong_length() {
516        let bytes = vec![0u8; 100];
517        assert!(matches!(
518            Page::from_bytes(DEFAULT_PAGE_SIZE, &bytes),
519            Err(PageError::ShortRead { .. })
520        ));
521    }
522
523    #[test]
524    fn test_from_bytes_rejects_bad_magic() {
525        let bytes = vec![0u8; 4096];
526        assert!(matches!(
527            Page::from_bytes(DEFAULT_PAGE_SIZE, &bytes),
528            Err(PageError::BadMagic { .. })
529        ));
530    }
531
532    #[test]
533    fn test_to_bytes_from_bytes_roundtrips() {
534        let mut page = Page::new(DEFAULT_PAGE_SIZE);
535        page.set_lsn(Lsn::new(99));
536        page.payload_mut()[..5].copy_from_slice(b"hello");
537        let bytes = page.to_checksummed_bytes();
538
539        let loaded = Page::from_bytes(DEFAULT_PAGE_SIZE, &bytes).expect("verifies");
540        assert_eq!(loaded.lsn(), Lsn::new(99));
541        assert_eq!(&loaded.payload()[..5], b"hello");
542    }
543}