Skip to main content

page_db/
file.rs

1//! [`PageFile`]: a file of fixed-size pages, read and written through Direct I/O.
2
3use std::fs::File;
4use std::path::Path;
5
6use crate::error::{PageError, PageResult};
7use crate::page::{DEFAULT_PAGE_SIZE, Page, PageId, PageSize};
8use crate::sys;
9
10/// Options for opening a [`PageFile`].
11///
12/// Build with [`PageFileOptions::new`], adjust, then [`open`](PageFileOptions::open).
13/// The defaults are a 4 KiB page size, Direct I/O enabled, and create-if-absent.
14///
15/// # Examples
16///
17/// ```
18/// use page_db::{PageFileOptions, PageSize};
19///
20/// # let dir = tempfile::tempdir().unwrap();
21/// # let path = dir.path().join("data.pages");
22/// let file = PageFileOptions::new()
23///     .page_size(PageSize::new(8192)?)
24///     .direct_io(false)          // buffered, e.g. on a filesystem without O_DIRECT
25///     .open(&path)?;
26/// assert_eq!(file.page_size(), 8192);
27/// # Ok::<(), page_db::PageError>(())
28/// ```
29#[derive(Debug, Clone)]
30#[must_use = "PageFileOptions does nothing until `open` is called"]
31pub struct PageFileOptions {
32    page_size: PageSize,
33    direct_io: bool,
34    create: bool,
35}
36
37impl Default for PageFileOptions {
38    fn default() -> Self {
39        Self {
40            page_size: DEFAULT_PAGE_SIZE,
41            direct_io: true,
42            create: true,
43        }
44    }
45}
46
47impl PageFileOptions {
48    /// Start from the defaults: 4 KiB pages, Direct I/O on, create-if-absent.
49    pub fn new() -> Self {
50        Self::default()
51    }
52
53    /// Set the page size. Every page in the file is this size; it is fixed for
54    /// the life of the file and the caller is responsible for reopening with the
55    /// same size.
56    pub fn page_size(mut self, page_size: PageSize) -> Self {
57        self.page_size = page_size;
58        self
59    }
60
61    /// Enable or disable Direct I/O (cache-bypass).
62    ///
63    /// Direct I/O is the default and the point of this crate. Disable it for a
64    /// filesystem that does not support it (some network and overlay
65    /// filesystems reject `O_DIRECT`); durability via [`PageFile::sync`] is
66    /// unaffected, only the page cache is.
67    pub fn direct_io(mut self, enabled: bool) -> Self {
68        self.direct_io = enabled;
69        self
70    }
71
72    /// Create the file if it does not exist (the default). When `false`, opening
73    /// a missing file is an error.
74    pub fn create(mut self, create: bool) -> Self {
75        self.create = create;
76        self
77    }
78
79    /// Open the page file at `path` with these options.
80    ///
81    /// # Errors
82    ///
83    /// Returns [`PageError::Io`] if the file cannot be opened (including a
84    /// filesystem that rejects Direct I/O, surfaced as the OS error).
85    pub fn open<P: AsRef<Path>>(self, path: P) -> PageResult<PageFile> {
86        let file = sys::open(path.as_ref(), self.direct_io, self.create)?;
87        Ok(PageFile {
88            file,
89            page_size: self.page_size,
90        })
91    }
92}
93
94/// A file of fixed-size pages.
95///
96/// A `PageFile` is an array of [`Page`]s on disk, addressed by [`PageId`]: page
97/// `n` occupies the byte range `n * page_size .. (n + 1) * page_size`. Reads and
98/// writes are positioned and take `&self`, so the handle is shared freely across
99/// threads — there is no shared file cursor to contend on. (The cache that will
100/// front these reads is a later release; today every read goes to disk.)
101///
102/// Durability is two steps, deliberately: [`write_page`](PageFile::write_page)
103/// places bytes, and [`sync`](PageFile::sync) makes them durable. Batch many
104/// writes, then sync once.
105///
106/// # Examples
107///
108/// ```
109/// use page_db::{PageFile, PageFileOptions, PageId, Lsn};
110///
111/// # let dir = tempfile::tempdir().unwrap();
112/// # let path = dir.path().join("data.pages");
113/// let file = PageFileOptions::new().direct_io(false).open(&path)?;
114///
115/// let mut page = file.allocate_page();
116/// page.set_lsn(Lsn::new(1));
117/// page.payload_mut()[..3].copy_from_slice(b"abc");
118/// file.write_page(PageId::new(0), &mut page)?;
119/// file.sync()?;
120///
121/// let got = file.read_page(PageId::new(0))?;
122/// assert_eq!(&got.payload()[..3], b"abc");
123/// assert_eq!(file.page_count()?, 1);
124/// # Ok::<(), page_db::PageError>(())
125/// ```
126#[derive(Debug)]
127pub struct PageFile {
128    file: File,
129    page_size: PageSize,
130}
131
132impl PageFile {
133    /// Open a page file at `path` with the given page size and the default
134    /// options (Direct I/O on, create-if-absent).
135    ///
136    /// For buffered I/O or other tuning, use [`PageFileOptions`].
137    ///
138    /// # Errors
139    ///
140    /// Returns [`PageError::Io`] if the file cannot be opened.
141    pub fn open<P: AsRef<Path>>(path: P, page_size: PageSize) -> PageResult<Self> {
142        PageFileOptions::new().page_size(page_size).open(path)
143    }
144
145    /// The page size of this file, in bytes.
146    #[inline]
147    #[must_use]
148    pub fn page_size(&self) -> usize {
149        self.page_size.get()
150    }
151
152    /// The number of whole pages currently in the file.
153    ///
154    /// # Errors
155    ///
156    /// Returns [`PageError::Io`] if the file metadata cannot be read.
157    pub fn page_count(&self) -> PageResult<u64> {
158        let len = self.file.metadata()?.len();
159        Ok(len / self.page_size.get() as u64)
160    }
161
162    /// Allocate a fresh, zeroed page sized and aligned for this file.
163    ///
164    /// The page is in memory only; write it with
165    /// [`write_page`](PageFile::write_page) to place it in a slot.
166    #[must_use]
167    pub fn allocate_page(&self) -> Page {
168        Page::new(self.page_size)
169    }
170
171    /// Read the page at slot `id`, verifying its header and checksum.
172    ///
173    /// The page's magic, version, and CRC32C are checked, and its stamped id is
174    /// matched against `id`, before it is returned — so a corrupt or misdirected
175    /// page surfaces as an error rather than bad data.
176    ///
177    /// # Errors
178    ///
179    /// - [`PageError::ShortRead`] if the slot is past the end of the file.
180    /// - [`PageError::BadMagic`] / [`PageError::UnsupportedVersion`] /
181    ///   [`PageError::ChecksumMismatch`] / [`PageError::MisdirectedPage`] if the
182    ///   page fails validation.
183    /// - [`PageError::Io`] on an I/O failure.
184    pub fn read_page(&self, id: PageId) -> PageResult<Page> {
185        let mut page = Page::new(self.page_size);
186        self.read_into(id, &mut page)?;
187        Ok(page)
188    }
189
190    /// Read the page at slot `id` into an existing buffer, verifying it.
191    ///
192    /// This is the zero-allocation form of [`read_page`](PageFile::read_page):
193    /// it reuses `page`'s buffer instead of allocating a fresh one, which is how
194    /// a buffer pool recycles a frame on a cache miss. `page.page_size()` must
195    /// match the file's.
196    ///
197    /// # Errors
198    ///
199    /// As [`read_page`](PageFile::read_page), plus
200    /// [`PageError::InvalidPageSize`] if `page`'s size does not match the file's.
201    pub fn read_into(&self, id: PageId, page: &mut Page) -> PageResult<()> {
202        if page.page_size() != self.page_size.get() {
203            return Err(PageError::InvalidPageSize {
204                size: page.page_size(),
205            });
206        }
207        let offset = id.byte_offset(self.page_size.get());
208        let got = sys::read_at_full(&self.file, page.as_bytes_mut(), offset)?;
209        if got != self.page_size.get() {
210            return Err(PageError::ShortRead {
211                page_id: id.get(),
212                got,
213                page_size: self.page_size.get(),
214            });
215        }
216        page.verify(Some(id))?;
217        Ok(())
218    }
219
220    /// Write `page` to slot `id`, stamping the slot id and a fresh checksum.
221    ///
222    /// The page's id and checksum header fields are updated in place, so the
223    /// same page can be written, mutated, and written again. The write places
224    /// the bytes; call [`sync`](PageFile::sync) to make them durable.
225    ///
226    /// # Errors
227    ///
228    /// - [`PageError::InvalidPageSize`] if the page's size does not match the
229    ///   file's.
230    /// - [`PageError::Io`] on an I/O failure.
231    pub fn write_page(&self, id: PageId, page: &mut Page) -> PageResult<()> {
232        if page.page_size() != self.page_size.get() {
233            return Err(PageError::InvalidPageSize {
234                size: page.page_size(),
235            });
236        }
237        page.stamp(id);
238        let offset = id.byte_offset(self.page_size.get());
239        sys::write_all_at(&self.file, page.as_bytes(), offset)?;
240        Ok(())
241    }
242
243    /// Flush all written pages to stable storage.
244    ///
245    /// Returns once the data is durable — `fdatasync` on Linux,
246    /// `FlushFileBuffers` on Windows, `F_FULLFSYNC` on macOS.
247    ///
248    /// # Errors
249    ///
250    /// Returns [`PageError::Io`] if the flush fails.
251    pub fn sync(&self) -> PageResult<()> {
252        sys::sync_data(&self.file)?;
253        Ok(())
254    }
255}
256
257#[cfg(test)]
258mod tests {
259    #![allow(clippy::unwrap_used, clippy::expect_used)]
260
261    use super::*;
262    use crate::page::Lsn;
263
264    fn temp_file() -> (tempfile::TempDir, std::path::PathBuf) {
265        let dir = tempfile::tempdir().expect("tempdir");
266        let path = dir.path().join("test.pages");
267        (dir, path)
268    }
269
270    fn open_buffered(path: &Path) -> PageFile {
271        PageFileOptions::new()
272            .direct_io(false)
273            .open(path)
274            .expect("open")
275    }
276
277    #[test]
278    fn test_write_read_roundtrip() {
279        let (_dir, path) = temp_file();
280        let file = open_buffered(&path);
281
282        let mut page = file.allocate_page();
283        page.set_lsn(Lsn::new(11));
284        page.payload_mut()[..5].copy_from_slice(b"world");
285        file.write_page(PageId::new(2), &mut page).expect("write");
286        file.sync().expect("sync");
287
288        let got = file.read_page(PageId::new(2)).expect("read");
289        assert_eq!(got.id(), PageId::new(2));
290        assert_eq!(got.lsn(), Lsn::new(11));
291        assert_eq!(&got.payload()[..5], b"world");
292    }
293
294    #[test]
295    fn test_read_past_end_is_short_read() {
296        let (_dir, path) = temp_file();
297        let file = open_buffered(&path);
298        assert!(matches!(
299            file.read_page(PageId::new(0)),
300            Err(PageError::ShortRead { .. })
301        ));
302    }
303
304    #[test]
305    fn test_page_count_tracks_writes() {
306        let (_dir, path) = temp_file();
307        let file = open_buffered(&path);
308        assert_eq!(file.page_count().expect("count"), 0);
309
310        let mut page = file.allocate_page();
311        file.write_page(PageId::new(0), &mut page).expect("write");
312        assert_eq!(file.page_count().expect("count"), 1);
313
314        file.write_page(PageId::new(4), &mut page).expect("write");
315        assert_eq!(file.page_count().expect("count"), 5);
316    }
317
318    #[test]
319    fn test_corruption_on_disk_is_detected() {
320        let (_dir, path) = temp_file();
321        {
322            let file = open_buffered(&path);
323            let mut page = file.allocate_page();
324            page.payload_mut()[0] = 0x42;
325            file.write_page(PageId::new(0), &mut page).expect("write");
326            file.sync().expect("sync");
327        }
328        // Flip a payload byte directly in the file, past the 32-byte header.
329        {
330            use std::io::{Read, Seek, SeekFrom, Write};
331            let mut raw = std::fs::OpenOptions::new()
332                .read(true)
333                .write(true)
334                .open(&path)
335                .expect("reopen");
336            let _ = raw.seek(SeekFrom::Start(40)).expect("seek");
337            let mut b = [0u8; 1];
338            let _ = raw.read_exact(&mut b);
339            b[0] ^= 0xFF;
340            let _ = raw.seek(SeekFrom::Start(40)).expect("seek");
341            raw.write_all(&b).expect("write");
342            raw.sync_all().expect("sync");
343        }
344        let file = open_buffered(&path);
345        assert!(matches!(
346            file.read_page(PageId::new(0)),
347            Err(PageError::ChecksumMismatch { .. })
348        ));
349    }
350
351    #[test]
352    fn test_write_rejects_wrong_page_size() {
353        let (_dir, path) = temp_file();
354        let file = open_buffered(&path);
355        let mut wrong = Page::new(PageSize::new(8192).expect("valid"));
356        assert!(matches!(
357            file.write_page(PageId::new(0), &mut wrong),
358            Err(PageError::InvalidPageSize { size: 8192 })
359        ));
360    }
361}