page_db/file.rs
1//! [`PageFile`]: a file of fixed-size pages, read and written through Direct I/O.
2
3use std::fs::File;
4use std::path::Path;
5
6use crate::error::{PageError, PageResult};
7use crate::page::{DEFAULT_PAGE_SIZE, Page, PageId, PageSize};
8use crate::sys;
9
10/// Options for opening a [`PageFile`].
11///
12/// Build with [`PageFileOptions::new`], adjust, then [`open`](PageFileOptions::open).
13/// The defaults are a 4 KiB page size, Direct I/O enabled, and create-if-absent.
14///
15/// # Examples
16///
17/// ```
18/// use page_db::{PageFileOptions, PageSize};
19///
20/// # let dir = tempfile::tempdir().unwrap();
21/// # let path = dir.path().join("data.pages");
22/// let file = PageFileOptions::new()
23/// .page_size(PageSize::new(8192)?)
24/// .direct_io(false) // buffered, e.g. on a filesystem without O_DIRECT
25/// .open(&path)?;
26/// assert_eq!(file.page_size(), 8192);
27/// # Ok::<(), page_db::PageError>(())
28/// ```
29#[derive(Debug, Clone)]
30#[must_use = "PageFileOptions does nothing until `open` is called"]
31pub struct PageFileOptions {
32 page_size: PageSize,
33 direct_io: bool,
34 create: bool,
35}
36
37impl Default for PageFileOptions {
38 fn default() -> Self {
39 Self {
40 page_size: DEFAULT_PAGE_SIZE,
41 direct_io: true,
42 create: true,
43 }
44 }
45}
46
47impl PageFileOptions {
48 /// Start from the defaults: 4 KiB pages, Direct I/O on, create-if-absent.
49 pub fn new() -> Self {
50 Self::default()
51 }
52
53 /// Set the page size. Every page in the file is this size; it is fixed for
54 /// the life of the file and the caller is responsible for reopening with the
55 /// same size.
56 pub fn page_size(mut self, page_size: PageSize) -> Self {
57 self.page_size = page_size;
58 self
59 }
60
61 /// Enable or disable Direct I/O (cache-bypass).
62 ///
63 /// Direct I/O is the default and the point of this crate. Disable it for a
64 /// filesystem that does not support it (some network and overlay
65 /// filesystems reject `O_DIRECT`); durability via [`PageFile::sync`] is
66 /// unaffected, only the page cache is.
67 pub fn direct_io(mut self, enabled: bool) -> Self {
68 self.direct_io = enabled;
69 self
70 }
71
72 /// Create the file if it does not exist (the default). When `false`, opening
73 /// a missing file is an error.
74 pub fn create(mut self, create: bool) -> Self {
75 self.create = create;
76 self
77 }
78
79 /// Open the page file at `path` with these options.
80 ///
81 /// # Errors
82 ///
83 /// Returns [`PageError::Io`] if the file cannot be opened (including a
84 /// filesystem that rejects Direct I/O, surfaced as the OS error).
85 pub fn open<P: AsRef<Path>>(self, path: P) -> PageResult<PageFile> {
86 let file = sys::open(path.as_ref(), self.direct_io, self.create)?;
87 Ok(PageFile {
88 file,
89 page_size: self.page_size,
90 })
91 }
92}
93
94/// A file of fixed-size pages.
95///
96/// A `PageFile` is an array of [`Page`]s on disk, addressed by [`PageId`]: page
97/// `n` occupies the byte range `n * page_size .. (n + 1) * page_size`. Reads and
98/// writes are positioned and take `&self`, so the handle is shared freely across
99/// threads — there is no shared file cursor to contend on. (The cache that will
100/// front these reads is a later release; today every read goes to disk.)
101///
102/// Durability is two steps, deliberately: [`write_page`](PageFile::write_page)
103/// places bytes, and [`sync`](PageFile::sync) makes them durable. Batch many
104/// writes, then sync once.
105///
106/// # Examples
107///
108/// ```
109/// use page_db::{PageFile, PageFileOptions, PageId, Lsn};
110///
111/// # let dir = tempfile::tempdir().unwrap();
112/// # let path = dir.path().join("data.pages");
113/// let file = PageFileOptions::new().direct_io(false).open(&path)?;
114///
115/// let mut page = file.allocate_page();
116/// page.set_lsn(Lsn::new(1));
117/// page.payload_mut()[..3].copy_from_slice(b"abc");
118/// file.write_page(PageId::new(0), &mut page)?;
119/// file.sync()?;
120///
121/// let got = file.read_page(PageId::new(0))?;
122/// assert_eq!(&got.payload()[..3], b"abc");
123/// assert_eq!(file.page_count()?, 1);
124/// # Ok::<(), page_db::PageError>(())
125/// ```
126#[derive(Debug)]
127pub struct PageFile {
128 file: File,
129 page_size: PageSize,
130}
131
132impl PageFile {
133 /// Open a page file at `path` with the given page size and the default
134 /// options (Direct I/O on, create-if-absent).
135 ///
136 /// For buffered I/O or other tuning, use [`PageFileOptions`].
137 ///
138 /// # Errors
139 ///
140 /// Returns [`PageError::Io`] if the file cannot be opened.
141 pub fn open<P: AsRef<Path>>(path: P, page_size: PageSize) -> PageResult<Self> {
142 PageFileOptions::new().page_size(page_size).open(path)
143 }
144
145 /// The page size of this file, in bytes.
146 #[inline]
147 #[must_use]
148 pub fn page_size(&self) -> usize {
149 self.page_size.get()
150 }
151
152 /// The number of whole pages currently in the file.
153 ///
154 /// # Errors
155 ///
156 /// Returns [`PageError::Io`] if the file metadata cannot be read.
157 pub fn page_count(&self) -> PageResult<u64> {
158 let len = self.file.metadata()?.len();
159 Ok(len / self.page_size.get() as u64)
160 }
161
162 /// Allocate a fresh, zeroed page sized and aligned for this file.
163 ///
164 /// The page is in memory only; write it with
165 /// [`write_page`](PageFile::write_page) to place it in a slot.
166 #[must_use]
167 pub fn allocate_page(&self) -> Page {
168 Page::new(self.page_size)
169 }
170
171 /// Read the page at slot `id`, verifying its header and checksum.
172 ///
173 /// The page's magic, version, and CRC32C are checked, and its stamped id is
174 /// matched against `id`, before it is returned — so a corrupt or misdirected
175 /// page surfaces as an error rather than bad data.
176 ///
177 /// # Errors
178 ///
179 /// - [`PageError::ShortRead`] if the slot is past the end of the file.
180 /// - [`PageError::BadMagic`] / [`PageError::UnsupportedVersion`] /
181 /// [`PageError::ChecksumMismatch`] / [`PageError::MisdirectedPage`] if the
182 /// page fails validation.
183 /// - [`PageError::Io`] on an I/O failure.
184 pub fn read_page(&self, id: PageId) -> PageResult<Page> {
185 let mut page = Page::new(self.page_size);
186 self.read_into(id, &mut page)?;
187 Ok(page)
188 }
189
190 /// Read the page at slot `id` into an existing buffer, verifying it.
191 ///
192 /// This is the zero-allocation form of [`read_page`](PageFile::read_page):
193 /// it reuses `page`'s buffer instead of allocating a fresh one, which is how
194 /// a buffer pool recycles a frame on a cache miss. `page.page_size()` must
195 /// match the file's.
196 ///
197 /// # Errors
198 ///
199 /// As [`read_page`](PageFile::read_page), plus
200 /// [`PageError::InvalidPageSize`] if `page`'s size does not match the file's.
201 pub fn read_into(&self, id: PageId, page: &mut Page) -> PageResult<()> {
202 if page.page_size() != self.page_size.get() {
203 return Err(PageError::InvalidPageSize {
204 size: page.page_size(),
205 });
206 }
207 let offset = id.byte_offset(self.page_size.get());
208 let got = sys::read_at_full(&self.file, page.as_bytes_mut(), offset)?;
209 if got != self.page_size.get() {
210 return Err(PageError::ShortRead {
211 page_id: id.get(),
212 got,
213 page_size: self.page_size.get(),
214 });
215 }
216 page.verify(Some(id))?;
217 Ok(())
218 }
219
220 /// Write `page` to slot `id`, stamping the slot id and a fresh checksum.
221 ///
222 /// The page's id and checksum header fields are updated in place, so the
223 /// same page can be written, mutated, and written again. The write places
224 /// the bytes; call [`sync`](PageFile::sync) to make them durable.
225 ///
226 /// # Errors
227 ///
228 /// - [`PageError::InvalidPageSize`] if the page's size does not match the
229 /// file's.
230 /// - [`PageError::Io`] on an I/O failure.
231 pub fn write_page(&self, id: PageId, page: &mut Page) -> PageResult<()> {
232 if page.page_size() != self.page_size.get() {
233 return Err(PageError::InvalidPageSize {
234 size: page.page_size(),
235 });
236 }
237 page.stamp(id);
238 let offset = id.byte_offset(self.page_size.get());
239 sys::write_all_at(&self.file, page.as_bytes(), offset)?;
240 Ok(())
241 }
242
243 /// Flush all written pages to stable storage.
244 ///
245 /// Returns once the data is durable — `fdatasync` on Linux,
246 /// `FlushFileBuffers` on Windows, `F_FULLFSYNC` on macOS.
247 ///
248 /// # Errors
249 ///
250 /// Returns [`PageError::Io`] if the flush fails.
251 pub fn sync(&self) -> PageResult<()> {
252 sys::sync_data(&self.file)?;
253 Ok(())
254 }
255}
256
#[cfg(test)]
mod tests {
    // Tests fail by panicking, so `unwrap`/`expect` are the intended way to
    // surface an unexpected error here.
    #![allow(clippy::unwrap_used, clippy::expect_used)]

    use super::*;
    use crate::page::Lsn;

    /// Create a temporary directory and a path for a page file inside it.
    ///
    /// The `TempDir` guard is returned so the caller can keep it alive for the
    /// duration of the test.
    fn temp_file() -> (tempfile::TempDir, std::path::PathBuf) {
        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("test.pages");
        (dir, path)
    }

    /// Open `path` with Direct I/O disabled, so the tests do not depend on the
    /// temporary filesystem accepting it.
    fn open_buffered(path: &Path) -> PageFile {
        PageFileOptions::new()
            .direct_io(false)
            .open(path)
            .expect("open")
    }

    /// A page written to a slot reads back with the same id, LSN, and payload.
    #[test]
    fn test_write_read_roundtrip() {
        let (_dir, path) = temp_file();
        let file = open_buffered(&path);

        let mut page = file.allocate_page();
        page.set_lsn(Lsn::new(11));
        page.payload_mut()[..5].copy_from_slice(b"world");
        file.write_page(PageId::new(2), &mut page).expect("write");
        file.sync().expect("sync");

        let got = file.read_page(PageId::new(2)).expect("read");
        assert_eq!(got.id(), PageId::new(2));
        assert_eq!(got.lsn(), Lsn::new(11));
        assert_eq!(&got.payload()[..5], b"world");
    }

    /// Reading a slot from an empty file reports a short read.
    #[test]
    fn test_read_past_end_is_short_read() {
        let (_dir, path) = temp_file();
        let file = open_buffered(&path);
        assert!(matches!(
            file.read_page(PageId::new(0)),
            Err(PageError::ShortRead { .. })
        ));
    }

    /// `page_count` follows the file length, including slots skipped over by a
    /// write to a higher id.
    #[test]
    fn test_page_count_tracks_writes() {
        let (_dir, path) = temp_file();
        let file = open_buffered(&path);
        assert_eq!(file.page_count().expect("count"), 0);

        let mut page = file.allocate_page();
        file.write_page(PageId::new(0), &mut page).expect("write");
        assert_eq!(file.page_count().expect("count"), 1);

        file.write_page(PageId::new(4), &mut page).expect("write");
        assert_eq!(file.page_count().expect("count"), 5);
    }

    /// A byte flipped behind the page file's back is caught by the checksum.
    #[test]
    fn test_corruption_on_disk_is_detected() {
        let (_dir, path) = temp_file();
        {
            let file = open_buffered(&path);
            let mut page = file.allocate_page();
            page.payload_mut()[0] = 0x42;
            file.write_page(PageId::new(0), &mut page).expect("write");
            file.sync().expect("sync");
        }
        // Flip a payload byte directly in the file, past the 32-byte header.
        {
            use std::io::{Read, Seek, SeekFrom, Write};
            let mut raw = std::fs::OpenOptions::new()
                .read(true)
                .write(true)
                .open(&path)
                .expect("reopen");
            let _ = raw.seek(SeekFrom::Start(40)).expect("seek");
            let mut b = [0u8; 1];
            // Fail loudly if the byte cannot be read; otherwise the test would
            // flip the zero initializer instead of the byte that is on disk.
            raw.read_exact(&mut b).expect("read");
            b[0] ^= 0xFF;
            let _ = raw.seek(SeekFrom::Start(40)).expect("seek");
            raw.write_all(&b).expect("write");
            raw.sync_all().expect("sync");
        }
        let file = open_buffered(&path);
        assert!(matches!(
            file.read_page(PageId::new(0)),
            Err(PageError::ChecksumMismatch { .. })
        ));
    }

    /// Writing a page whose size differs from the file's is rejected up front.
    #[test]
    fn test_write_rejects_wrong_page_size() {
        let (_dir, path) = temp_file();
        let file = open_buffered(&path);
        let mut wrong = Page::new(PageSize::new(8192).expect("valid"));
        assert!(matches!(
            file.write_page(PageId::new(0), &mut wrong),
            Err(PageError::InvalidPageSize { size: 8192 })
        ));
    }
}