page_db/file.rs
1//! [`PageFile`]: a file of fixed-size pages, read and written through Direct I/O.
2
3use std::fs::File;
4use std::path::Path;
5
6use crate::error::{PageError, PageResult};
7use crate::page::{DEFAULT_PAGE_SIZE, Page, PageId, PageSize};
8use crate::sys;
9
10/// Options for opening a [`PageFile`].
11///
12/// Build with [`PageFileOptions::new`], adjust, then [`open`](PageFileOptions::open).
13/// The defaults are a 4 KiB page size, Direct I/O enabled, and create-if-absent.
14///
15/// # Examples
16///
17/// ```
18/// use page_db::{PageFileOptions, PageSize};
19///
20/// # let dir = tempfile::tempdir().unwrap();
21/// # let path = dir.path().join("data.pages");
22/// let file = PageFileOptions::new()
23/// .page_size(PageSize::new(8192)?)
24/// .direct_io(false) // buffered, e.g. on a filesystem without O_DIRECT
25/// .open(&path)?;
26/// assert_eq!(file.page_size(), 8192);
27/// # Ok::<(), page_db::PageError>(())
28/// ```
29#[derive(Debug, Clone)]
30#[must_use = "PageFileOptions does nothing until `open` is called"]
31pub struct PageFileOptions {
32 page_size: PageSize,
33 direct_io: bool,
34 create: bool,
35}
36
37impl Default for PageFileOptions {
38 fn default() -> Self {
39 Self {
40 page_size: DEFAULT_PAGE_SIZE,
41 direct_io: true,
42 create: true,
43 }
44 }
45}
46
47impl PageFileOptions {
48 /// Start from the defaults: 4 KiB pages, Direct I/O on, create-if-absent.
49 pub fn new() -> Self {
50 Self::default()
51 }
52
53 /// Set the page size. Every page in the file is this size; it is fixed for
54 /// the life of the file and the caller is responsible for reopening with the
55 /// same size.
56 pub fn page_size(mut self, page_size: PageSize) -> Self {
57 self.page_size = page_size;
58 self
59 }
60
61 /// Enable or disable Direct I/O (cache-bypass).
62 ///
63 /// Direct I/O is the default and the point of this crate. Disable it for a
64 /// filesystem that does not support it (some network and overlay
65 /// filesystems reject `O_DIRECT`); durability via [`PageFile::sync`] is
66 /// unaffected, only the page cache is.
67 pub fn direct_io(mut self, enabled: bool) -> Self {
68 self.direct_io = enabled;
69 self
70 }
71
72 /// Create the file if it does not exist (the default). When `false`, opening
73 /// a missing file is an error.
74 pub fn create(mut self, create: bool) -> Self {
75 self.create = create;
76 self
77 }
78
79 /// Open the page file at `path` with these options.
80 ///
81 /// # Errors
82 ///
83 /// Returns [`PageError::Io`] if the file cannot be opened (including a
84 /// filesystem that rejects Direct I/O, surfaced as the OS error).
85 pub fn open<P: AsRef<Path>>(self, path: P) -> PageResult<PageFile> {
86 let file = sys::open(path.as_ref(), self.direct_io, self.create)?;
87 Ok(PageFile {
88 file,
89 page_size: self.page_size,
90 })
91 }
92}
93
94/// A file of fixed-size pages.
95///
96/// A `PageFile` is an array of [`Page`]s on disk, addressed by [`PageId`]: page
97/// `n` occupies the byte range `n * page_size .. (n + 1) * page_size`. Reads and
98/// writes are positioned and take `&self`, so the handle is shared freely across
99/// threads — there is no shared file cursor to contend on. (The cache that will
100/// front these reads is a later release; today every read goes to disk.)
101///
102/// Durability is two steps, deliberately: [`write_page`](PageFile::write_page)
103/// places bytes, and [`sync`](PageFile::sync) makes them durable. Batch many
104/// writes, then sync once.
105///
106/// # Examples
107///
108/// ```
109/// use page_db::{PageFile, PageFileOptions, PageId, Lsn};
110///
111/// # let dir = tempfile::tempdir().unwrap();
112/// # let path = dir.path().join("data.pages");
113/// let file = PageFileOptions::new().direct_io(false).open(&path)?;
114///
115/// let mut page = file.allocate_page();
116/// page.set_lsn(Lsn::new(1));
117/// page.payload_mut()[..3].copy_from_slice(b"abc");
118/// file.write_page(PageId::new(0), &mut page)?;
119/// file.sync()?;
120///
121/// let got = file.read_page(PageId::new(0))?;
122/// assert_eq!(&got.payload()[..3], b"abc");
123/// assert_eq!(file.page_count()?, 1);
124/// # Ok::<(), page_db::PageError>(())
125/// ```
126#[derive(Debug)]
127pub struct PageFile {
128 file: File,
129 page_size: PageSize,
130}
131
132impl PageFile {
133 /// Open a page file at `path` with the given page size and the default
134 /// options (Direct I/O on, create-if-absent).
135 ///
136 /// For buffered I/O or other tuning, use [`PageFileOptions`].
137 ///
138 /// # Errors
139 ///
140 /// Returns [`PageError::Io`] if the file cannot be opened.
141 pub fn open<P: AsRef<Path>>(path: P, page_size: PageSize) -> PageResult<Self> {
142 PageFileOptions::new().page_size(page_size).open(path)
143 }
144
145 /// The page size of this file, in bytes.
146 #[inline]
147 #[must_use]
148 pub fn page_size(&self) -> usize {
149 self.page_size.get()
150 }
151
152 /// The number of whole pages currently in the file.
153 ///
154 /// # Errors
155 ///
156 /// Returns [`PageError::Io`] if the file metadata cannot be read.
157 pub fn page_count(&self) -> PageResult<u64> {
158 let len = self.file.metadata()?.len();
159 Ok(len / self.page_size.get() as u64)
160 }
161
162 /// Allocate a fresh, zeroed page sized and aligned for this file.
163 ///
164 /// The page is in memory only; write it with
165 /// [`write_page`](PageFile::write_page) to place it in a slot.
166 #[must_use]
167 pub fn allocate_page(&self) -> Page {
168 Page::new(self.page_size)
169 }
170
171 /// Read the page at slot `id`, verifying its header and checksum.
172 ///
173 /// The page's magic, version, and CRC32C are checked, and its stamped id is
174 /// matched against `id`, before it is returned — so a corrupt or misdirected
175 /// page surfaces as an error rather than bad data.
176 ///
177 /// # Errors
178 ///
179 /// - [`PageError::ShortRead`] if the slot is past the end of the file.
180 /// - [`PageError::BadMagic`] / [`PageError::UnsupportedVersion`] /
181 /// [`PageError::ChecksumMismatch`] / [`PageError::MisdirectedPage`] if the
182 /// page fails validation.
183 /// - [`PageError::Io`] on an I/O failure.
184 pub fn read_page(&self, id: PageId) -> PageResult<Page> {
185 let mut page = Page::new(self.page_size);
186 let offset = id.byte_offset(self.page_size.get());
187 let got = sys::read_at_full(&self.file, page.as_bytes_mut(), offset)?;
188 if got != self.page_size.get() {
189 return Err(PageError::ShortRead {
190 page_id: id.get(),
191 got,
192 page_size: self.page_size.get(),
193 });
194 }
195 page.verify(Some(id))?;
196 Ok(page)
197 }
198
199 /// Write `page` to slot `id`, stamping the slot id and a fresh checksum.
200 ///
201 /// The page's id and checksum header fields are updated in place, so the
202 /// same page can be written, mutated, and written again. The write places
203 /// the bytes; call [`sync`](PageFile::sync) to make them durable.
204 ///
205 /// # Errors
206 ///
207 /// - [`PageError::InvalidPageSize`] if the page's size does not match the
208 /// file's.
209 /// - [`PageError::Io`] on an I/O failure.
210 pub fn write_page(&self, id: PageId, page: &mut Page) -> PageResult<()> {
211 if page.page_size() != self.page_size.get() {
212 return Err(PageError::InvalidPageSize {
213 size: page.page_size(),
214 });
215 }
216 page.stamp(id);
217 let offset = id.byte_offset(self.page_size.get());
218 sys::write_all_at(&self.file, page.as_bytes(), offset)?;
219 Ok(())
220 }
221
222 /// Flush all written pages to stable storage.
223 ///
224 /// Returns once the data is durable — `fdatasync` on Linux,
225 /// `FlushFileBuffers` on Windows, `F_FULLFSYNC` on macOS.
226 ///
227 /// # Errors
228 ///
229 /// Returns [`PageError::Io`] if the flush fails.
230 pub fn sync(&self) -> PageResult<()> {
231 sys::sync_data(&self.file)?;
232 Ok(())
233 }
234}
235
236#[cfg(test)]
237mod tests {
238 #![allow(clippy::unwrap_used, clippy::expect_used)]
239
240 use super::*;
241 use crate::page::Lsn;
242
243 fn temp_file() -> (tempfile::TempDir, std::path::PathBuf) {
244 let dir = tempfile::tempdir().expect("tempdir");
245 let path = dir.path().join("test.pages");
246 (dir, path)
247 }
248
249 fn open_buffered(path: &Path) -> PageFile {
250 PageFileOptions::new()
251 .direct_io(false)
252 .open(path)
253 .expect("open")
254 }
255
256 #[test]
257 fn test_write_read_roundtrip() {
258 let (_dir, path) = temp_file();
259 let file = open_buffered(&path);
260
261 let mut page = file.allocate_page();
262 page.set_lsn(Lsn::new(11));
263 page.payload_mut()[..5].copy_from_slice(b"world");
264 file.write_page(PageId::new(2), &mut page).expect("write");
265 file.sync().expect("sync");
266
267 let got = file.read_page(PageId::new(2)).expect("read");
268 assert_eq!(got.id(), PageId::new(2));
269 assert_eq!(got.lsn(), Lsn::new(11));
270 assert_eq!(&got.payload()[..5], b"world");
271 }
272
273 #[test]
274 fn test_read_past_end_is_short_read() {
275 let (_dir, path) = temp_file();
276 let file = open_buffered(&path);
277 assert!(matches!(
278 file.read_page(PageId::new(0)),
279 Err(PageError::ShortRead { .. })
280 ));
281 }
282
283 #[test]
284 fn test_page_count_tracks_writes() {
285 let (_dir, path) = temp_file();
286 let file = open_buffered(&path);
287 assert_eq!(file.page_count().expect("count"), 0);
288
289 let mut page = file.allocate_page();
290 file.write_page(PageId::new(0), &mut page).expect("write");
291 assert_eq!(file.page_count().expect("count"), 1);
292
293 file.write_page(PageId::new(4), &mut page).expect("write");
294 assert_eq!(file.page_count().expect("count"), 5);
295 }
296
297 #[test]
298 fn test_corruption_on_disk_is_detected() {
299 let (_dir, path) = temp_file();
300 {
301 let file = open_buffered(&path);
302 let mut page = file.allocate_page();
303 page.payload_mut()[0] = 0x42;
304 file.write_page(PageId::new(0), &mut page).expect("write");
305 file.sync().expect("sync");
306 }
307 // Flip a payload byte directly in the file, past the 32-byte header.
308 {
309 use std::io::{Read, Seek, SeekFrom, Write};
310 let mut raw = std::fs::OpenOptions::new()
311 .read(true)
312 .write(true)
313 .open(&path)
314 .expect("reopen");
315 let _ = raw.seek(SeekFrom::Start(40)).expect("seek");
316 let mut b = [0u8; 1];
317 let _ = raw.read_exact(&mut b);
318 b[0] ^= 0xFF;
319 let _ = raw.seek(SeekFrom::Start(40)).expect("seek");
320 raw.write_all(&b).expect("write");
321 raw.sync_all().expect("sync");
322 }
323 let file = open_buffered(&path);
324 assert!(matches!(
325 file.read_page(PageId::new(0)),
326 Err(PageError::ChecksumMismatch { .. })
327 ));
328 }
329
330 #[test]
331 fn test_write_rejects_wrong_page_size() {
332 let (_dir, path) = temp_file();
333 let file = open_buffered(&path);
334 let mut wrong = Page::new(PageSize::new(8192).expect("valid"));
335 assert!(matches!(
336 file.write_page(PageId::new(0), &mut wrong),
337 Err(PageError::InvalidPageSize { size: 8192 })
338 ));
339 }
340}