1use std::fs::{File, OpenOptions};
9use std::io::{Read, Seek, SeekFrom, Write};
10use std::path::Path;
11
12use mentedb_core::error::{MenteError, MenteResult};
13use tracing::{debug, info, trace};
14
/// Size in bytes of one on-disk page (16 KiB).
pub const PAGE_SIZE: usize = 16 << 10;

/// Magic number expected in the page-file header. Read most-significant byte
/// first, its bytes are the ASCII text `MENTEDB1`.
const MAGIC: u64 = u64::from_be_bytes(*b"MENTEDB1");

/// Page-file format version written to, and required from, the file header.
const VERSION: u32 = 1;
23
/// Index of a page inside the page file.
///
/// A thin, copyable wrapper around the raw `u64` page number so page ids are
/// not confused with other integers; it can be compared, hashed and printed.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct PageId(pub u64);
27
/// Kind tag of a page, stored on disk as a single byte.
#[repr(u8)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PageType {
    /// Unused page (tag 0); also what every unrecognised tag decodes to.
    Free = 0,
    /// Data page (tag 1).
    Data = 1,
    /// Index page (tag 2).
    Index = 2,
    /// Overflow page (tag 3).
    Overflow = 3,
}

impl From<u8> for PageType {
    /// Decodes a raw tag byte.
    ///
    /// Tags 1, 2 and 3 map to `Data`, `Index` and `Overflow`; every other
    /// value, including 0, maps to `Free`. The conversion never fails.
    fn from(v: u8) -> Self {
        [PageType::Data, PageType::Index, PageType::Overflow]
            .iter()
            .copied()
            .find(|kind| *kind as u8 == v)
            .unwrap_or(PageType::Free)
    }
}
48
/// Fixed-layout header placed at the start of every `Page`.
///
/// `repr(C)` keeps the fields in declaration order, and the explicit trailing
/// padding brings the struct to 32 bytes with no compiler-inserted gaps.
#[derive(Clone, Copy, Debug)]
#[repr(C)]
pub struct PageHeader {
    /// Id of the page this header belongs to.
    pub page_id: u64,
    /// Sequence number of the page.
    /// NOTE(review): presumably a log sequence number — nothing in this file
    /// reads or writes it; confirm against the logging code.
    pub lsn: u64,
    /// Slot for a page checksum (see `Page::compute_checksum`).
    /// NOTE(review): no code in this file stores or verifies it.
    pub checksum: u32,
    /// Free bytes left in the page's data area.
    pub free_space: u16,
    /// Number of slots in the page.
    /// NOTE(review): slot semantics are defined by callers, not here.
    pub num_slots: u16,
    /// Raw page-type tag; `PageType` lists the known values.
    pub page_type: u8,
    /// Explicit padding that rounds the header up to 32 bytes.
    pub _padding: [u8; 7],
}
68
69pub const HEADER_SIZE: usize = std::mem::size_of::<PageHeader>();
71
72pub const PAGE_DATA_SIZE: usize = PAGE_SIZE - HEADER_SIZE;
74
75#[repr(C, align(4096))]
77pub struct Page {
78 pub header: PageHeader,
79 pub data: [u8; PAGE_DATA_SIZE],
80}
81
82impl std::fmt::Debug for Page {
83 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84 f.debug_struct("Page")
85 .field("header", &self.header)
86 .field("data_len", &self.data.len())
87 .finish()
88 }
89}
90
91impl Clone for Page {
92 fn clone(&self) -> Self {
93 let mut new_page = Page::zeroed();
94 new_page.header = self.header;
95 new_page.data.copy_from_slice(&self.data);
96 new_page
97 }
98}
99
100impl Page {
101 pub fn zeroed() -> Self {
103 unsafe { std::mem::zeroed() }
105 }
106
107 fn as_bytes(&self) -> &[u8; PAGE_SIZE] {
109 unsafe { &*(self as *const Page as *const [u8; PAGE_SIZE]) }
111 }
112
113 fn from_bytes(bytes: &[u8; PAGE_SIZE]) -> Self {
115 unsafe { std::ptr::read(bytes.as_ptr() as *const Page) }
117 }
118
119 pub fn compute_checksum(&self) -> u32 {
121 let mut h = crc32fast::Hasher::new();
122 h.update(&self.header.page_id.to_le_bytes());
123 h.update(&self.header.lsn.to_le_bytes());
124 h.update(&self.header.free_space.to_le_bytes());
125 h.update(&self.header.num_slots.to_le_bytes());
126 h.update(&[self.header.page_type]);
127 h.update(&self.data);
128 h.finalize()
129 }
130}
131
/// File-level header kept at byte offset 0 of the page file.
///
/// `repr(C)` with an explicit `_pad` field gives a 32-byte layout with no
/// compiler-inserted padding; the struct is written and read as raw bytes.
#[repr(C)]
struct FileHeader {
    /// Must equal `MAGIC` for the file to be accepted.
    magic: u64,
    /// Must equal `VERSION` for the file to be accepted.
    version: u32,
    /// Explicit padding; written as zero.
    _pad: u32,
    /// Number of pages in the file, counting page 0.
    page_count: u64,
    /// Id of the first page on the free list, or 0 when the list is empty.
    free_list_head: u64,
}
141
/// Allocates, reads, writes and frees fixed-size pages in a single file.
///
/// The counters below mirror the values stored in the on-disk file header.
pub struct PageManager {
    /// Open handle to the page file.
    file: File,
    /// Number of pages currently in the file, counting page 0.
    page_count: u64,
    /// Id of the first free page, or 0 when the free list is empty.
    free_list_head: u64,
}
148
149impl PageManager {
150 pub fn open(dir_path: &Path) -> MenteResult<Self> {
152 let file_path = dir_path.join("pages.db");
153 let exists = file_path.exists()
154 && std::fs::metadata(&file_path)
155 .map(|m| m.len() > 0)
156 .unwrap_or(false);
157
158 let mut file = OpenOptions::new()
159 .read(true)
160 .write(true)
161 .create(true)
162 .truncate(false)
163 .open(&file_path)?;
164
165 if exists {
166 let mut buf = [0u8; std::mem::size_of::<FileHeader>()];
167 file.seek(SeekFrom::Start(0))?;
168 file.read_exact(&mut buf)?;
169 let hdr: FileHeader = unsafe { std::ptr::read(buf.as_ptr() as *const FileHeader) };
170
171 if hdr.magic != MAGIC {
172 return Err(MenteError::Storage("invalid page file magic number".into()));
173 }
174 if hdr.version != VERSION {
175 return Err(MenteError::Storage(format!(
176 "unsupported page file version: {}",
177 hdr.version
178 )));
179 }
180
181 info!(page_count = hdr.page_count, "opened existing page file");
182 Ok(Self {
183 file,
184 page_count: hdr.page_count,
185 free_list_head: hdr.free_list_head,
186 })
187 } else {
188 let mut pm = Self {
189 file,
190 page_count: 1,
191 free_list_head: 0,
192 };
193 let mut header_page = Page::zeroed();
195 header_page.header.page_id = 0;
196 pm.write_page_raw(PageId(0), &header_page)?;
197 pm.write_file_header()?;
198 info!("created new page file");
199 Ok(pm)
200 }
201 }
202
203 fn write_file_header(&mut self) -> MenteResult<()> {
205 let hdr = FileHeader {
206 magic: MAGIC,
207 version: VERSION,
208 _pad: 0,
209 page_count: self.page_count,
210 free_list_head: self.free_list_head,
211 };
212 let bytes = unsafe {
213 std::slice::from_raw_parts(
214 &hdr as *const FileHeader as *const u8,
215 std::mem::size_of::<FileHeader>(),
216 )
217 };
218 self.file.seek(SeekFrom::Start(0))?;
219 self.file.write_all(bytes)?;
220 self.file.flush()?;
221 Ok(())
222 }
223
224 pub fn allocate_page(&mut self) -> MenteResult<PageId> {
226 if self.free_list_head != 0 {
227 let page_id = PageId(self.free_list_head);
228 let page = self.read_page(page_id)?;
229 let next_free = u64::from_le_bytes(page.data[..8].try_into().unwrap());
230 self.free_list_head = next_free;
231 self.write_file_header()?;
232 debug!(page_id = page_id.0, "allocated page from free list");
233 return Ok(page_id);
234 }
235
236 let page_id = PageId(self.page_count);
237 self.page_count += 1;
238
239 let mut page = Page::zeroed();
240 page.header.page_id = page_id.0;
241 page.header.page_type = PageType::Data as u8;
242 page.header.free_space = PAGE_DATA_SIZE as u16;
243 self.write_page_raw(page_id, &page)?;
244 self.write_file_header()?;
245
246 debug!(page_id = page_id.0, "allocated new page");
247 Ok(page_id)
248 }
249
250 pub fn read_page(&mut self, page_id: PageId) -> MenteResult<Box<Page>> {
252 if page_id.0 >= self.page_count {
253 return Err(MenteError::Storage(format!(
254 "page {} out of range (count={})",
255 page_id.0, self.page_count
256 )));
257 }
258
259 let offset = page_id.0 * PAGE_SIZE as u64;
260 let mut buf = [0u8; PAGE_SIZE];
261 self.file.seek(SeekFrom::Start(offset))?;
262 self.file.read_exact(&mut buf)?;
263
264 trace!(page_id = page_id.0, "read page from disk");
265 Ok(Box::new(Page::from_bytes(&buf)))
266 }
267
268 pub fn write_page(&mut self, page_id: PageId, page: &Page) -> MenteResult<()> {
270 self.write_page_raw(page_id, page)
271 }
272
273 fn write_page_raw(&mut self, page_id: PageId, page: &Page) -> MenteResult<()> {
274 let offset = page_id.0 * PAGE_SIZE as u64;
275 self.file.seek(SeekFrom::Start(offset))?;
276 self.file.write_all(page.as_bytes())?;
277 trace!(page_id = page_id.0, "wrote page to disk");
278 Ok(())
279 }
280
281 pub fn free_page(&mut self, page_id: PageId) -> MenteResult<()> {
283 let mut page = Page::zeroed();
284 page.header.page_id = page_id.0;
285 page.header.page_type = PageType::Free as u8;
286 page.data[..8].copy_from_slice(&self.free_list_head.to_le_bytes());
288
289 self.write_page(page_id, &page)?;
290 self.free_list_head = page_id.0;
291 self.write_file_header()?;
292
293 debug!(page_id = page_id.0, "freed page");
294 Ok(())
295 }
296
297 pub fn page_count(&self) -> u64 {
299 self.page_count
300 }
301
302 pub fn sync(&mut self) -> MenteResult<()> {
304 self.file.sync_data()?;
305 Ok(())
306 }
307}
308
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens a fresh `PageManager` in a new temporary directory. The
    /// directory handle is returned too so it stays alive for the test.
    fn setup() -> (tempfile::TempDir, PageManager) {
        let tmp = tempfile::tempdir().unwrap();
        let manager = PageManager::open(tmp.path()).unwrap();
        (tmp, manager)
    }

    /// A written page can be read back with the same payload.
    #[test]
    fn test_allocate_and_read_write() {
        let (_tmp, mut manager) = setup();

        // Page 0 is reserved, so the first allocation is page 1.
        let id = manager.allocate_page().unwrap();
        assert_eq!(id.0, 1);

        let mut outgoing = Page::zeroed();
        outgoing.header.page_id = id.0;
        outgoing.header.page_type = PageType::Data as u8;
        outgoing.data[0..5].copy_from_slice(b"hello");
        manager.write_page(id, &outgoing).unwrap();

        let incoming = manager.read_page(id).unwrap();
        assert_eq!(&incoming.data[0..5], b"hello");
    }

    /// A freed page is handed out again before the file grows.
    #[test]
    fn test_free_and_reuse() {
        let (_tmp, mut manager) = setup();

        let first = manager.allocate_page().unwrap();
        let second = manager.allocate_page().unwrap();
        assert_eq!(first.0, 1);
        assert_eq!(second.0, 2);

        manager.free_page(first).unwrap();
        let reused = manager.allocate_page().unwrap();
        assert_eq!(reused.0, first.0);

        let appended = manager.allocate_page().unwrap();
        assert_eq!(appended.0, 3);
    }

    /// Freed pages come back in last-freed-first order.
    #[test]
    fn test_multiple_free_reuse() {
        let (_tmp, mut manager) = setup();

        let first = manager.allocate_page().unwrap();
        let second = manager.allocate_page().unwrap();
        let _third = manager.allocate_page().unwrap();

        manager.free_page(first).unwrap();
        manager.free_page(second).unwrap();

        let reused_a = manager.allocate_page().unwrap();
        let reused_b = manager.allocate_page().unwrap();
        assert_eq!(reused_a.0, second.0);
        assert_eq!(reused_b.0, first.0);

        let appended = manager.allocate_page().unwrap();
        assert_eq!(appended.0, 4);
    }

    /// Page contents survive closing and reopening the file.
    #[test]
    fn test_reopen() {
        let tmp = tempfile::tempdir().unwrap();
        let id;
        {
            let mut writer = PageManager::open(tmp.path()).unwrap();
            id = writer.allocate_page().unwrap();
            let mut outgoing = Page::zeroed();
            outgoing.header.page_id = id.0;
            outgoing.data[0..4].copy_from_slice(b"test");
            writer.write_page(id, &outgoing).unwrap();
            writer.sync().unwrap();
        }
        {
            let mut reader = PageManager::open(tmp.path()).unwrap();
            let incoming = reader.read_page(id).unwrap();
            assert_eq!(&incoming.data[0..4], b"test");
        }
    }

    /// Reading a page beyond the page count is an error.
    #[test]
    fn test_out_of_range() {
        let (_tmp, mut manager) = setup();
        assert!(manager.read_page(PageId(999)).is_err());
    }

    /// Changing a data byte changes the checksum.
    #[test]
    fn test_checksum() {
        let mut page = Page::zeroed();
        page.header.page_id = 42;
        page.data[0] = 0xFF;
        let with_ff = page.compute_checksum();
        page.data[0] = 0x00;
        let with_zero = page.compute_checksum();
        assert_ne!(with_ff, with_zero);
    }
}