Skip to main content

page_db/
file.rs

1//! [`PageFile`]: a file of fixed-size pages, read and written through Direct I/O.
2
3use std::fs::File;
4use std::path::Path;
5
6use crate::error::{PageError, PageResult};
7use crate::page::{DEFAULT_PAGE_SIZE, Page, PageId, PageSize};
8use crate::sys;
9
/// Options for opening a [`PageFile`].
///
/// Build with [`PageFileOptions::new`], adjust, then [`open`](PageFileOptions::open).
/// The defaults are a 4 KiB page size, Direct I/O enabled, and create-if-absent.
///
/// Every setter takes the builder by value and hands it back, so calls chain;
/// [`open`](PageFileOptions::open) consumes the builder.
///
/// # Examples
///
/// ```
/// use page_db::{PageFileOptions, PageSize};
///
/// # let dir = tempfile::tempdir().unwrap();
/// # let path = dir.path().join("data.pages");
/// let file = PageFileOptions::new()
///     .page_size(PageSize::new(8192)?)
///     .direct_io(false)          // buffered, e.g. on a filesystem without O_DIRECT
///     .open(&path)?;
/// assert_eq!(file.page_size(), 8192);
/// # Ok::<(), page_db::PageError>(())
/// ```
#[derive(Debug, Clone)]
#[must_use = "PageFileOptions does nothing until `open` is called"]
pub struct PageFileOptions {
    /// Size of every page in the file; handed unchanged to the opened
    /// [`PageFile`].
    page_size: PageSize,
    /// Whether Direct I/O (cache-bypass) is requested when the file is opened.
    direct_io: bool,
    /// Whether the file is created when it does not already exist.
    create: bool,
}
36
37impl Default for PageFileOptions {
38    fn default() -> Self {
39        Self {
40            page_size: DEFAULT_PAGE_SIZE,
41            direct_io: true,
42            create: true,
43        }
44    }
45}
46
47impl PageFileOptions {
48    /// Start from the defaults: 4 KiB pages, Direct I/O on, create-if-absent.
49    pub fn new() -> Self {
50        Self::default()
51    }
52
53    /// Set the page size. Every page in the file is this size; it is fixed for
54    /// the life of the file and the caller is responsible for reopening with the
55    /// same size.
56    pub fn page_size(mut self, page_size: PageSize) -> Self {
57        self.page_size = page_size;
58        self
59    }
60
61    /// Enable or disable Direct I/O (cache-bypass).
62    ///
63    /// Direct I/O is the default and the point of this crate. Disable it for a
64    /// filesystem that does not support it (some network and overlay
65    /// filesystems reject `O_DIRECT`); durability via [`PageFile::sync`] is
66    /// unaffected, only the page cache is.
67    pub fn direct_io(mut self, enabled: bool) -> Self {
68        self.direct_io = enabled;
69        self
70    }
71
72    /// Create the file if it does not exist (the default). When `false`, opening
73    /// a missing file is an error.
74    pub fn create(mut self, create: bool) -> Self {
75        self.create = create;
76        self
77    }
78
79    /// Open the page file at `path` with these options.
80    ///
81    /// # Errors
82    ///
83    /// Returns [`PageError::Io`] if the file cannot be opened (including a
84    /// filesystem that rejects Direct I/O, surfaced as the OS error).
85    pub fn open<P: AsRef<Path>>(self, path: P) -> PageResult<PageFile> {
86        let file = sys::open(path.as_ref(), self.direct_io, self.create)?;
87        Ok(PageFile {
88            file,
89            page_size: self.page_size,
90        })
91    }
92}
93
/// A file of fixed-size pages.
///
/// A `PageFile` is an array of [`Page`]s on disk, addressed by [`PageId`]: page
/// `n` occupies the byte range `n * page_size .. (n + 1) * page_size`. Reads and
/// writes are positioned and take `&self`, so the handle is shared freely across
/// threads — there is no shared file cursor to contend on. (The cache that will
/// front these reads is a later release; today every read goes to disk.)
///
/// Durability is two steps, deliberately: [`write_page`](PageFile::write_page)
/// places bytes, and [`sync`](PageFile::sync) makes them durable. Batch many
/// writes, then sync once.
///
/// # Examples
///
/// ```
/// use page_db::{PageFile, PageFileOptions, PageId, Lsn};
///
/// # let dir = tempfile::tempdir().unwrap();
/// # let path = dir.path().join("data.pages");
/// let file = PageFileOptions::new().direct_io(false).open(&path)?;
///
/// let mut page = file.allocate_page();
/// page.set_lsn(Lsn::new(1));
/// page.payload_mut()[..3].copy_from_slice(b"abc");
/// file.write_page(PageId::new(0), &mut page)?;
/// file.sync()?;
///
/// let got = file.read_page(PageId::new(0))?;
/// assert_eq!(&got.payload()[..3], b"abc");
/// assert_eq!(file.page_count()?, 1);
/// # Ok::<(), page_db::PageError>(())
/// ```
#[derive(Debug)]
pub struct PageFile {
    /// The open file handle that every read, write, and sync goes through.
    file: File,
    /// Page size for this file; used to compute slot offsets and to check
    /// pages handed to [`write_page`](PageFile::write_page).
    page_size: PageSize,
}
131
132impl PageFile {
133    /// Open a page file at `path` with the given page size and the default
134    /// options (Direct I/O on, create-if-absent).
135    ///
136    /// For buffered I/O or other tuning, use [`PageFileOptions`].
137    ///
138    /// # Errors
139    ///
140    /// Returns [`PageError::Io`] if the file cannot be opened.
141    pub fn open<P: AsRef<Path>>(path: P, page_size: PageSize) -> PageResult<Self> {
142        PageFileOptions::new().page_size(page_size).open(path)
143    }
144
145    /// The page size of this file, in bytes.
146    #[inline]
147    #[must_use]
148    pub fn page_size(&self) -> usize {
149        self.page_size.get()
150    }
151
152    /// The number of whole pages currently in the file.
153    ///
154    /// # Errors
155    ///
156    /// Returns [`PageError::Io`] if the file metadata cannot be read.
157    pub fn page_count(&self) -> PageResult<u64> {
158        let len = self.file.metadata()?.len();
159        Ok(len / self.page_size.get() as u64)
160    }
161
162    /// Allocate a fresh, zeroed page sized and aligned for this file.
163    ///
164    /// The page is in memory only; write it with
165    /// [`write_page`](PageFile::write_page) to place it in a slot.
166    #[must_use]
167    pub fn allocate_page(&self) -> Page {
168        Page::new(self.page_size)
169    }
170
171    /// Read the page at slot `id`, verifying its header and checksum.
172    ///
173    /// The page's magic, version, and CRC32C are checked, and its stamped id is
174    /// matched against `id`, before it is returned — so a corrupt or misdirected
175    /// page surfaces as an error rather than bad data.
176    ///
177    /// # Errors
178    ///
179    /// - [`PageError::ShortRead`] if the slot is past the end of the file.
180    /// - [`PageError::BadMagic`] / [`PageError::UnsupportedVersion`] /
181    ///   [`PageError::ChecksumMismatch`] / [`PageError::MisdirectedPage`] if the
182    ///   page fails validation.
183    /// - [`PageError::Io`] on an I/O failure.
184    pub fn read_page(&self, id: PageId) -> PageResult<Page> {
185        let mut page = Page::new(self.page_size);
186        let offset = id.byte_offset(self.page_size.get());
187        let got = sys::read_at_full(&self.file, page.as_bytes_mut(), offset)?;
188        if got != self.page_size.get() {
189            return Err(PageError::ShortRead {
190                page_id: id.get(),
191                got,
192                page_size: self.page_size.get(),
193            });
194        }
195        page.verify(Some(id))?;
196        Ok(page)
197    }
198
199    /// Write `page` to slot `id`, stamping the slot id and a fresh checksum.
200    ///
201    /// The page's id and checksum header fields are updated in place, so the
202    /// same page can be written, mutated, and written again. The write places
203    /// the bytes; call [`sync`](PageFile::sync) to make them durable.
204    ///
205    /// # Errors
206    ///
207    /// - [`PageError::InvalidPageSize`] if the page's size does not match the
208    ///   file's.
209    /// - [`PageError::Io`] on an I/O failure.
210    pub fn write_page(&self, id: PageId, page: &mut Page) -> PageResult<()> {
211        if page.page_size() != self.page_size.get() {
212            return Err(PageError::InvalidPageSize {
213                size: page.page_size(),
214            });
215        }
216        page.stamp(id);
217        let offset = id.byte_offset(self.page_size.get());
218        sys::write_all_at(&self.file, page.as_bytes(), offset)?;
219        Ok(())
220    }
221
222    /// Flush all written pages to stable storage.
223    ///
224    /// Returns once the data is durable — `fdatasync` on Linux,
225    /// `FlushFileBuffers` on Windows, `F_FULLFSYNC` on macOS.
226    ///
227    /// # Errors
228    ///
229    /// Returns [`PageError::Io`] if the flush fails.
230    pub fn sync(&self) -> PageResult<()> {
231        sys::sync_data(&self.file)?;
232        Ok(())
233    }
234}
235
#[cfg(test)]
mod tests {
    // Tests are allowed to panic: a failed `expect` *is* the failure report.
    #![allow(clippy::unwrap_used, clippy::expect_used)]

    use super::*;
    use crate::page::Lsn;

    /// Create a temp directory and a path inside it for a page file.
    ///
    /// The `TempDir` is returned so the caller keeps the directory alive for
    /// the duration of the test.
    fn temp_file() -> (tempfile::TempDir, std::path::PathBuf) {
        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("test.pages");
        (dir, path)
    }

    /// Open `path` with buffered I/O (Direct I/O off) and default page size.
    fn open_buffered(path: &Path) -> PageFile {
        PageFileOptions::new()
            .direct_io(false)
            .open(path)
            .expect("open")
    }

    /// A written page reads back with the same id, LSN, and payload.
    #[test]
    fn test_write_read_roundtrip() {
        let (_dir, path) = temp_file();
        let file = open_buffered(&path);

        let mut page = file.allocate_page();
        page.set_lsn(Lsn::new(11));
        page.payload_mut()[..5].copy_from_slice(b"world");
        file.write_page(PageId::new(2), &mut page).expect("write");
        file.sync().expect("sync");

        let got = file.read_page(PageId::new(2)).expect("read");
        assert_eq!(got.id(), PageId::new(2));
        assert_eq!(got.lsn(), Lsn::new(11));
        assert_eq!(&got.payload()[..5], b"world");
    }

    /// Reading a slot from an empty file reports `ShortRead`.
    #[test]
    fn test_read_past_end_is_short_read() {
        let (_dir, path) = temp_file();
        let file = open_buffered(&path);
        assert!(matches!(
            file.read_page(PageId::new(0)),
            Err(PageError::ShortRead { .. })
        ));
    }

    /// `page_count` follows the file length, including slots skipped over by
    /// a write to a higher id.
    #[test]
    fn test_page_count_tracks_writes() {
        let (_dir, path) = temp_file();
        let file = open_buffered(&path);
        assert_eq!(file.page_count().expect("count"), 0);

        let mut page = file.allocate_page();
        file.write_page(PageId::new(0), &mut page).expect("write");
        assert_eq!(file.page_count().expect("count"), 1);

        file.write_page(PageId::new(4), &mut page).expect("write");
        assert_eq!(file.page_count().expect("count"), 5);
    }

    /// A byte flipped behind the page file's back is caught by the checksum.
    #[test]
    fn test_corruption_on_disk_is_detected() {
        // Byte offset of the payload byte to flip, past the 32-byte header.
        const CORRUPT_OFFSET: u64 = 40;

        let (_dir, path) = temp_file();
        {
            let file = open_buffered(&path);
            let mut page = file.allocate_page();
            page.payload_mut()[0] = 0x42;
            file.write_page(PageId::new(0), &mut page).expect("write");
            file.sync().expect("sync");
        }
        // Flip a payload byte directly in the file, past the 32-byte header.
        {
            use std::io::{Read, Seek, SeekFrom, Write};
            let mut raw = std::fs::OpenOptions::new()
                .read(true)
                .write(true)
                .open(&path)
                .expect("reopen");
            let _ = raw.seek(SeekFrom::Start(CORRUPT_OFFSET)).expect("seek");
            let mut b = [0u8; 1];
            // The read must succeed: otherwise the "flip" below would be
            // computed from the zeroed buffer rather than the on-disk byte.
            raw.read_exact(&mut b).expect("read");
            b[0] ^= 0xFF;
            let _ = raw.seek(SeekFrom::Start(CORRUPT_OFFSET)).expect("seek");
            raw.write_all(&b).expect("write");
            raw.sync_all().expect("sync");
        }
        let file = open_buffered(&path);
        assert!(matches!(
            file.read_page(PageId::new(0)),
            Err(PageError::ChecksumMismatch { .. })
        ));
    }

    /// Writing a page whose size differs from the file's is rejected.
    #[test]
    fn test_write_rejects_wrong_page_size() {
        let (_dir, path) = temp_file();
        let file = open_buffered(&path);
        let mut wrong = Page::new(PageSize::new(8192).expect("valid"));
        assert!(matches!(
            file.write_page(PageId::new(0), &mut wrong),
            Err(PageError::InvalidPageSize { size: 8192 })
        ));
    }
}