1use std::fs::{File, OpenOptions};
9use std::io::{Read, Seek, SeekFrom, Write};
10use std::path::Path;
11
12use mentedb_core::error::{MenteError, MenteResult};
13use tracing::{debug, info, trace};
14
/// Size in bytes of every page in the page file (64 KiB).
pub const PAGE_SIZE: usize = 1 << 16;

/// Magic number stored at the start of the page file: the ASCII bytes
/// `MENTEDB1` interpreted as a big-endian `u64`.
const MAGIC: u64 = u64::from_be_bytes(*b"MENTEDB1");

/// File-format version written into, and required of, the file header.
const VERSION: u32 = 1;
23
/// Identifier of a page inside the page file.
///
/// The wrapped value is the zero-based page index; a page's bytes start at
/// file offset `index * PAGE_SIZE`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct PageId(pub u64);
27
/// Kind of content held by a page, stored as the single `page_type` byte of
/// the page header.
///
/// The explicit discriminants are the values that end up on disk (pages are
/// written as raw bytes), so they must not be renumbered.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum PageType {
    /// Page is on the free list; also the type of a freshly zeroed page.
    Free = 0,
    /// Type assigned to newly appended pages by `PageManager::allocate_page`.
    Data = 1,
    /// Not produced anywhere in this file; presumably index-structure pages — TODO confirm.
    Index = 2,
    /// Not produced anywhere in this file; presumably overflow storage — TODO confirm.
    Overflow = 3,
}
37
38impl From<u8> for PageType {
39 fn from(v: u8) -> Self {
40 match v {
41 1 => PageType::Data,
42 2 => PageType::Index,
43 3 => PageType::Overflow,
44 _ => PageType::Free,
45 }
46 }
47}
48
/// Fixed-size header at the start of every page.
///
/// `#[repr(C)]` fixes the field order, and the fields add up to 32 bytes with
/// no implicit padding (8 + 8 + 4 + 2 + 2 + 1 + 7). Pages are written to disk
/// as raw bytes, so this layout is part of the file format.
#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub struct PageHeader {
    /// Id of the page this header belongs to.
    pub page_id: u64,
    /// Covered by the checksum but otherwise unused in this file;
    /// presumably a log sequence number — TODO confirm.
    pub lsn: u64,
    /// CRC-32 as produced by `Page::compute_checksum`. A value of 0 makes
    /// `PageManager::read_page` skip verification.
    pub checksum: u32,
    /// Set to `PAGE_DATA_SIZE` for newly appended pages; presumably the
    /// number of unused payload bytes — TODO confirm.
    pub free_space: u16,
    /// Covered by the checksum but otherwise unused in this file;
    /// presumably the slot count of a slotted-page layout — TODO confirm.
    pub num_slots: u16,
    /// A `PageType` discriminant stored as a raw byte.
    pub page_type: u8,
    /// Explicit padding that rounds the header up to 32 bytes; not covered by
    /// the checksum.
    pub _padding: [u8; 7],
}
68
/// Size in bytes of [`PageHeader`] as laid out in memory.
pub const HEADER_SIZE: usize = std::mem::size_of::<PageHeader>();

/// Number of payload bytes in a page: whatever remains of `PAGE_SIZE` after
/// the header.
pub const PAGE_DATA_SIZE: usize = PAGE_SIZE - HEADER_SIZE;
74
/// One page: a [`PageHeader`] immediately followed by the payload bytes.
///
/// Header plus payload is `PAGE_SIZE` bytes, and the byte-level conversions in
/// `impl Page` rely on that exact layout.
// NOTE(review): the 4096-byte alignment is presumably intended for
// page-aligned / direct I/O — nothing in this file depends on it; confirm.
#[repr(C, align(4096))]
pub struct Page {
    /// Per-page metadata.
    pub header: PageHeader,
    /// Page payload.
    pub data: [u8; PAGE_DATA_SIZE],
}
83
84impl std::fmt::Debug for Page {
85 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86 f.debug_struct("Page")
87 .field("header", &self.header)
88 .field("data_len", &self.data.len())
89 .finish()
90 }
91}
92
93impl Clone for Page {
94 fn clone(&self) -> Self {
95 let mut new_page = Page::zeroed();
96 new_page.header = self.header;
97 new_page.data.copy_from_slice(&self.data);
98 new_page
99 }
100}
101
102impl Page {
103 pub fn zeroed() -> Self {
105 unsafe { std::mem::zeroed() }
107 }
108
109 fn as_bytes(&self) -> &[u8; PAGE_SIZE] {
111 unsafe { &*(self as *const Page as *const [u8; PAGE_SIZE]) }
113 }
114
115 fn from_bytes(bytes: &[u8; PAGE_SIZE]) -> Self {
117 unsafe { std::ptr::read(bytes.as_ptr() as *const Page) }
119 }
120
121 pub fn compute_checksum(&self) -> u32 {
123 let mut h = crc32fast::Hasher::new();
124 h.update(&self.header.page_id.to_le_bytes());
125 h.update(&self.header.lsn.to_le_bytes());
126 h.update(&self.header.free_space.to_le_bytes());
127 h.update(&self.header.num_slots.to_le_bytes());
128 h.update(&[self.header.page_type]);
129 h.update(&self.data);
130 h.finalize()
131 }
132}
133
/// Header stored at byte offset 0 of the page file.
///
/// It is written and read as raw in-memory bytes, so this `repr(C)` layout
/// (32 bytes, no implicit padding) is the on-disk layout. Because it lives at
/// offset 0 it occupies the first bytes of page 0.
#[repr(C)]
struct FileHeader {
    /// Must equal `MAGIC` for the file to be accepted.
    magic: u64,
    /// Must equal `VERSION` for the file to be accepted.
    version: u32,
    /// Explicit padding, always written as 0; keeps the following `u64`
    /// fields 8-byte aligned without compiler-inserted padding.
    _pad: u32,
    /// Number of pages in the file, including page 0.
    page_count: u64,
    /// Page id of the first page on the free list; 0 means the list is empty.
    free_list_head: u64,
}
143
/// Owns the page file and the allocation state that is mirrored in its
/// file header.
pub struct PageManager {
    /// Open read/write handle to `pages.db`.
    file: File,
    /// Number of pages in the file, including page 0 (the header page).
    page_count: u64,
    /// Page id of the first free-list entry; 0 means the free list is empty.
    free_list_head: u64,
}
150
151impl PageManager {
152 pub fn open(dir_path: &Path) -> MenteResult<Self> {
154 let file_path = dir_path.join("pages.db");
155 let exists = file_path.exists()
156 && std::fs::metadata(&file_path)
157 .map(|m| m.len() > 0)
158 .unwrap_or(false);
159
160 let mut file = OpenOptions::new()
161 .read(true)
162 .write(true)
163 .create(true)
164 .truncate(false)
165 .open(&file_path)?;
166
167 if exists {
168 let mut buf = [0u8; std::mem::size_of::<FileHeader>()];
169 file.seek(SeekFrom::Start(0))?;
170 file.read_exact(&mut buf)?;
171 let hdr: FileHeader = unsafe { std::ptr::read(buf.as_ptr() as *const FileHeader) };
172
173 if hdr.magic != MAGIC {
174 return Err(MenteError::Storage("invalid page file magic number".into()));
175 }
176 if hdr.version != VERSION {
177 return Err(MenteError::Storage(format!(
178 "unsupported page file version: {}",
179 hdr.version
180 )));
181 }
182
183 info!(page_count = hdr.page_count, "opened existing page file");
184 Ok(Self {
185 file,
186 page_count: hdr.page_count,
187 free_list_head: hdr.free_list_head,
188 })
189 } else {
190 let mut pm = Self {
191 file,
192 page_count: 1,
193 free_list_head: 0,
194 };
195 let mut header_page = Page::zeroed();
197 header_page.header.page_id = 0;
198 pm.write_page_raw(PageId(0), &header_page)?;
199 pm.write_file_header()?;
200 info!("created new page file");
201 Ok(pm)
202 }
203 }
204
205 pub fn reload_header(&mut self) -> MenteResult<()> {
207 let mut buf = [0u8; std::mem::size_of::<FileHeader>()];
208 self.file.seek(SeekFrom::Start(0))?;
209 self.file.read_exact(&mut buf)?;
210 let hdr: FileHeader = unsafe { std::ptr::read(buf.as_ptr() as *const FileHeader) };
211 if hdr.magic != MAGIC {
212 return Err(MenteError::Storage(
213 "invalid page file magic on reload".into(),
214 ));
215 }
216 self.page_count = hdr.page_count;
217 self.free_list_head = hdr.free_list_head;
218 debug!(page_count = self.page_count, "reloaded page file header");
219 Ok(())
220 }
221
222 fn write_file_header(&mut self) -> MenteResult<()> {
224 let hdr = FileHeader {
225 magic: MAGIC,
226 version: VERSION,
227 _pad: 0,
228 page_count: self.page_count,
229 free_list_head: self.free_list_head,
230 };
231 let bytes = unsafe {
232 std::slice::from_raw_parts(
233 &hdr as *const FileHeader as *const u8,
234 std::mem::size_of::<FileHeader>(),
235 )
236 };
237 self.file.seek(SeekFrom::Start(0))?;
238 self.file.write_all(bytes)?;
239 self.file.flush()?;
240 Ok(())
241 }
242
243 pub fn allocate_page(&mut self) -> MenteResult<PageId> {
245 if self.free_list_head != 0 {
246 let page_id = PageId(self.free_list_head);
247 let page = self.read_page(page_id)?;
248 let next_free = u64::from_le_bytes(page.data[..8].try_into().unwrap());
249 self.free_list_head = next_free;
250 self.write_file_header()?;
251 debug!(page_id = page_id.0, "allocated page from free list");
252 return Ok(page_id);
253 }
254
255 let page_id = PageId(self.page_count);
256 self.page_count += 1;
257
258 let mut page = Page::zeroed();
259 page.header.page_id = page_id.0;
260 page.header.page_type = PageType::Data as u8;
261 page.header.free_space = PAGE_DATA_SIZE as u16;
262 self.write_page_raw(page_id, &page)?;
263 self.write_file_header()?;
264
265 debug!(page_id = page_id.0, "allocated new page");
266 Ok(page_id)
267 }
268
269 pub fn read_page(&mut self, page_id: PageId) -> MenteResult<Box<Page>> {
271 if page_id.0 >= self.page_count {
272 return Err(MenteError::Storage(format!(
273 "page {} out of range (count={})",
274 page_id.0, self.page_count
275 )));
276 }
277
278 let offset = page_id.0 * PAGE_SIZE as u64;
279 let mut buf = [0u8; PAGE_SIZE];
280 self.file.seek(SeekFrom::Start(offset))?;
281 self.file.read_exact(&mut buf)?;
282
283 let page = Page::from_bytes(&buf);
284 if page.header.checksum != 0 {
285 let expected = page.compute_checksum();
286 if page.header.checksum != expected {
287 return Err(MenteError::Storage(format!(
288 "page {} checksum mismatch (stored={:#x}, computed={:#x})",
289 page_id.0, page.header.checksum, expected
290 )));
291 }
292 }
293
294 trace!(page_id = page_id.0, "read page from disk");
295 Ok(Box::new(page))
296 }
297
298 pub fn write_page(&mut self, page_id: PageId, page: &Page) -> MenteResult<()> {
300 self.write_page_raw(page_id, page)
301 }
302
303 fn write_page_raw(&mut self, page_id: PageId, page: &Page) -> MenteResult<()> {
304 let offset = page_id.0 * PAGE_SIZE as u64;
305 self.file.seek(SeekFrom::Start(offset))?;
306 self.file.write_all(page.as_bytes())?;
307 trace!(page_id = page_id.0, "wrote page to disk");
308 Ok(())
309 }
310
311 pub fn free_page(&mut self, page_id: PageId) -> MenteResult<()> {
313 let mut page = Page::zeroed();
314 page.header.page_id = page_id.0;
315 page.header.page_type = PageType::Free as u8;
316 page.data[..8].copy_from_slice(&self.free_list_head.to_le_bytes());
318
319 self.write_page(page_id, &page)?;
320 self.free_list_head = page_id.0;
321 self.write_file_header()?;
322
323 debug!(page_id = page_id.0, "freed page");
324 Ok(())
325 }
326
327 pub fn page_count(&self) -> u64 {
329 self.page_count
330 }
331
332 pub fn sync(&mut self) -> MenteResult<()> {
334 self.file.sync_data()?;
335 Ok(())
336 }
337}
338
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens a brand-new page file in a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned alongside the manager so the directory
    /// stays alive for the duration of the test.
    fn setup() -> (tempfile::TempDir, PageManager) {
        let tmp = tempfile::tempdir().unwrap();
        let manager = PageManager::open(tmp.path()).unwrap();
        (tmp, manager)
    }

    /// A written page can be read back with its payload intact.
    #[test]
    fn test_allocate_and_read_write() {
        let (_tmp, mut manager) = setup();

        // Page 0 is the header page, so the first allocation is page 1.
        let pid = manager.allocate_page().unwrap();
        assert_eq!(pid.0, 1);

        let mut outgoing = Page::zeroed();
        outgoing.header.page_id = pid.0;
        outgoing.header.page_type = PageType::Data as u8;
        outgoing.data[0..5].copy_from_slice(b"hello");
        manager.write_page(pid, &outgoing).unwrap();

        let incoming = manager.read_page(pid).unwrap();
        assert_eq!(&incoming.data[0..5], b"hello");
    }

    /// A freed page is reused before the file grows.
    #[test]
    fn test_free_and_reuse() {
        let (_tmp, mut manager) = setup();

        let first = manager.allocate_page().unwrap();
        let second = manager.allocate_page().unwrap();
        assert_eq!(first.0, 1);
        assert_eq!(second.0, 2);

        manager.free_page(first).unwrap();
        let reused = manager.allocate_page().unwrap();
        assert_eq!(reused.0, first.0);

        // The free list is empty again, so the next allocation appends.
        let appended = manager.allocate_page().unwrap();
        assert_eq!(appended.0, 3);
    }

    /// The free list behaves as a stack: last freed, first reused.
    #[test]
    fn test_multiple_free_reuse() {
        let (_tmp, mut manager) = setup();

        let first = manager.allocate_page().unwrap();
        let second = manager.allocate_page().unwrap();
        let _third = manager.allocate_page().unwrap();

        manager.free_page(first).unwrap();
        manager.free_page(second).unwrap();

        let reuse_a = manager.allocate_page().unwrap();
        let reuse_b = manager.allocate_page().unwrap();
        assert_eq!(reuse_a.0, second.0);
        assert_eq!(reuse_b.0, first.0);

        let appended = manager.allocate_page().unwrap();
        assert_eq!(appended.0, 4);
    }

    /// Data written before closing the file is visible after reopening it.
    #[test]
    fn test_reopen() {
        let tmp = tempfile::tempdir().unwrap();

        let pid = {
            let mut writer = PageManager::open(tmp.path()).unwrap();
            let allocated = writer.allocate_page().unwrap();
            let mut page = Page::zeroed();
            page.header.page_id = allocated.0;
            page.data[0..4].copy_from_slice(b"test");
            writer.write_page(allocated, &page).unwrap();
            writer.sync().unwrap();
            allocated
        };

        {
            let mut reader = PageManager::open(tmp.path()).unwrap();
            let page = reader.read_page(pid).unwrap();
            assert_eq!(&page.data[0..4], b"test");
        }
    }

    /// Reading a page id beyond the page count is an error.
    #[test]
    fn test_out_of_range() {
        let (_tmp, mut manager) = setup();
        let outcome = manager.read_page(PageId(999));
        assert!(outcome.is_err());
    }

    /// Changing a payload byte changes the checksum.
    #[test]
    fn test_checksum() {
        let mut page = Page::zeroed();
        page.header.page_id = 42;

        page.data[0] = 0xFF;
        let with_ff = page.compute_checksum();

        page.data[0] = 0x00;
        let with_zero = page.compute_checksum();

        assert_ne!(with_ff, with_zero);
    }

    /// A page stored with a checksum reads back fine until its payload is
    /// corrupted on disk, after which the read must fail.
    #[test]
    fn test_checksum_verified_on_read() {
        let tmp = tempfile::tempdir().unwrap();

        // Phase 1: write a checksummed page.
        let pid = {
            let mut writer = PageManager::open(tmp.path()).unwrap();
            let allocated = writer.allocate_page().unwrap();
            let mut page = Page::zeroed();
            page.header.page_id = allocated.0;
            page.header.page_type = PageType::Data as u8;
            page.data[0..5].copy_from_slice(b"valid");
            page.header.checksum = page.compute_checksum();
            writer.write_page(allocated, &page).unwrap();
            writer.sync().unwrap();
            allocated
        };

        // Phase 2: the intact page passes verification.
        {
            let mut reader = PageManager::open(tmp.path()).unwrap();
            let page = reader.read_page(pid).unwrap();
            assert_eq!(&page.data[0..5], b"valid");
        }

        // Phase 3: flip the first payload byte directly in the file.
        {
            let data_path = tmp.path().join("pages.db");
            let mut raw = std::fs::read(&data_path).unwrap();
            let first_payload_byte = pid.0 as usize * PAGE_SIZE + std::mem::size_of::<PageHeader>();
            raw[first_payload_byte] ^= 0xFF;
            std::fs::write(&data_path, &raw).unwrap();

            let mut reader = PageManager::open(tmp.path()).unwrap();
            let result = reader.read_page(pid);
            assert!(result.is_err(), "corrupted page should fail checksum");
        }
    }
}