1use crate::buffer::AlignedBuffer;
22use crate::checksum::Crc32c;
23use crate::error::{PageError, PageResult};
24
25pub const MIN_PAGE_SIZE: usize = 4096;
30
31pub const MAX_PAGE_SIZE: usize = 1 << 20;
33
34pub const PAGE_HEADER_SIZE: usize = 32;
37
38pub const DEFAULT_PAGE_SIZE: PageSize = PageSize(4096);
40
/// Magic number written at the start of every page: the ASCII bytes `PGDB`
/// interpreted as a little-endian `u32`.
const MAGIC: u32 = u32::from_le_bytes(*b"PGDB");

/// Format version written into, and required from, every page header.
const FORMAT_VERSION: u16 = 1;

// Byte offsets of the header fields. All fields are stored little-endian.
// Bytes 6..8 and 28..32 of the header are not touched by the code in this file.

/// Offset of the `u32` magic number (bytes 0..4).
const OFF_MAGIC: usize = 0;
/// Offset of the `u16` format version (bytes 4..6).
const OFF_VERSION: usize = 4;
/// Offset of the `u64` page id (bytes 8..16).
const OFF_PAGE_ID: usize = 8;
/// Offset of the `u64` LSN (bytes 16..24).
const OFF_LSN: usize = 16;
/// Offset of the `u32` checksum (bytes 24..28).
const OFF_CRC: usize = 24;
49
/// Newtype wrapper around the `u64` identifier of a page.
///
/// Equality, ordering and hashing all follow the wrapped integer.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct PageId(u64);

impl PageId {
    /// Wraps a raw `u64` as a page id.
    #[inline]
    #[must_use]
    pub const fn new(id: u64) -> Self {
        Self(id)
    }

    /// Returns the raw `u64` value of this id.
    #[inline]
    #[must_use]
    pub const fn get(self) -> u64 {
        self.0
    }

    /// Returns `id * page_size`, presumably the byte position of this page in
    /// its backing storage.
    ///
    /// # Panics
    ///
    /// Panics if the product does not fit in a `u64`. An unchecked multiply
    /// would wrap in release builds and silently yield an offset that belongs
    /// to a different page.
    #[inline]
    #[must_use]
    pub(crate) const fn byte_offset(self, page_size: usize) -> u64 {
        // `usize` is never wider than 64 bits, so this cast cannot truncate.
        match self.0.checked_mul(page_size as u64) {
            Some(offset) => offset,
            None => panic!("page byte offset overflows u64"),
        }
    }
}

impl std::fmt::Display for PageId {
    /// Formats the id as its plain decimal value.
    ///
    /// Delegates to the inner integer so that width, fill and alignment flags
    /// supplied by the caller (e.g. `{:>8}`) are honoured.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Display::fmt(&self.0, f)
    }
}
85
/// Newtype wrapper around a `u64` LSN (presumably "log sequence number").
///
/// Equality, ordering and hashing all follow the wrapped integer.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Lsn(u64);

impl Lsn {
    /// The LSN whose raw value is `0`.
    pub const ZERO: Lsn = Lsn(0);

    /// Wraps a raw `u64` as an LSN.
    #[inline]
    #[must_use]
    pub const fn new(lsn: u64) -> Self {
        Self(lsn)
    }

    /// Returns the raw `u64` value of this LSN.
    #[inline]
    #[must_use]
    pub const fn get(self) -> u64 {
        self.0
    }
}

impl std::fmt::Display for Lsn {
    /// Formats the LSN as its plain decimal value.
    ///
    /// Delegates to the inner integer so that width, fill and alignment flags
    /// supplied by the caller (e.g. `{:>8}`) are honoured.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Display::fmt(&self.0, f)
    }
}
120
121#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
139pub struct PageSize(usize);
140
141impl PageSize {
142 pub const fn new(size: usize) -> PageResult<Self> {
149 if size < MIN_PAGE_SIZE || size > MAX_PAGE_SIZE || !size.is_power_of_two() {
150 return Err(PageError::InvalidPageSize { size });
151 }
152 Ok(Self(size))
153 }
154
155 #[inline]
157 #[must_use]
158 pub const fn get(self) -> usize {
159 self.0
160 }
161
162 #[inline]
164 #[must_use]
165 pub const fn payload_len(self) -> usize {
166 self.0 - PAGE_HEADER_SIZE
167 }
168}
169
170impl Default for PageSize {
171 #[inline]
172 fn default() -> Self {
173 DEFAULT_PAGE_SIZE
174 }
175}
176
177pub struct Page {
208 buf: AlignedBuffer,
209 size: usize,
210}
211
212impl Page {
213 #[must_use]
215 pub fn new(page_size: PageSize) -> Self {
216 let size = page_size.get();
217 let mut buf = AlignedBuffer::new_zeroed(size, size);
218 {
219 let bytes = buf.as_mut_slice();
220 write_u32(bytes, OFF_MAGIC, MAGIC);
221 write_u16(bytes, OFF_VERSION, FORMAT_VERSION);
222 }
223 Self { buf, size }
224 }
225
226 pub fn from_bytes(page_size: PageSize, bytes: &[u8]) -> PageResult<Self> {
239 let size = page_size.get();
240 if bytes.len() != size {
241 return Err(PageError::ShortRead {
242 page_id: 0,
243 got: bytes.len(),
244 page_size: size,
245 });
246 }
247 let mut buf = AlignedBuffer::new_zeroed(size, size);
248 buf.as_mut_slice().copy_from_slice(bytes);
249 let page = Self { buf, size };
250 page.verify(None)?;
251 Ok(page)
252 }
253
254 #[inline]
256 #[must_use]
257 pub fn page_size(&self) -> usize {
258 self.size
259 }
260
261 #[inline]
264 #[must_use]
265 pub fn id(&self) -> PageId {
266 PageId(read_u64(self.buf.as_slice(), OFF_PAGE_ID))
267 }
268
269 #[inline]
271 #[must_use]
272 pub fn lsn(&self) -> Lsn {
273 Lsn(read_u64(self.buf.as_slice(), OFF_LSN))
274 }
275
276 #[inline]
279 pub fn set_lsn(&mut self, lsn: Lsn) {
280 write_u64(self.buf.as_mut_slice(), OFF_LSN, lsn.0);
281 }
282
283 #[inline]
285 #[must_use]
286 pub fn payload(&self) -> &[u8] {
287 &self.buf.as_slice()[PAGE_HEADER_SIZE..]
288 }
289
290 #[inline]
292 pub fn payload_mut(&mut self) -> &mut [u8] {
293 &mut self.buf.as_mut_slice()[PAGE_HEADER_SIZE..]
294 }
295
296 #[must_use]
303 pub fn to_checksummed_bytes(&self) -> Vec<u8> {
304 let mut out = self.buf.as_slice().to_vec();
305 let crc = compute_checksum(&out);
306 write_u32(&mut out, OFF_CRC, crc);
307 out
308 }
309
310 pub(crate) fn reset(&mut self) {
314 let bytes = self.buf.as_mut_slice();
315 bytes.fill(0);
316 write_u32(bytes, OFF_MAGIC, MAGIC);
317 write_u16(bytes, OFF_VERSION, FORMAT_VERSION);
318 }
319
320 pub(crate) fn stamp(&mut self, id: PageId) {
322 {
323 let bytes = self.buf.as_mut_slice();
324 write_u32(bytes, OFF_MAGIC, MAGIC);
325 write_u16(bytes, OFF_VERSION, FORMAT_VERSION);
326 write_u64(bytes, OFF_PAGE_ID, id.0);
327 }
328 let crc = compute_checksum(self.buf.as_slice());
329 write_u32(self.buf.as_mut_slice(), OFF_CRC, crc);
330 }
331
332 pub(crate) fn verify(&self, expected: Option<PageId>) -> PageResult<()> {
334 let bytes = self.buf.as_slice();
335
336 let magic = read_u32(bytes, OFF_MAGIC);
337 if magic != MAGIC {
338 return Err(PageError::BadMagic {
339 found: magic,
340 expected: MAGIC,
341 });
342 }
343
344 let version = read_u16(bytes, OFF_VERSION);
345 if version != FORMAT_VERSION {
346 return Err(PageError::UnsupportedVersion {
347 found: version,
348 supported: FORMAT_VERSION,
349 });
350 }
351
352 let stored = read_u32(bytes, OFF_CRC);
353 let computed = compute_checksum(bytes);
354 if stored != computed {
355 return Err(PageError::ChecksumMismatch {
356 page_id: read_u64(bytes, OFF_PAGE_ID),
357 stored,
358 computed,
359 });
360 }
361
362 if let Some(expected) = expected {
363 let found = read_u64(bytes, OFF_PAGE_ID);
364 if found != expected.0 {
365 return Err(PageError::MisdirectedPage {
366 requested: expected.0,
367 found,
368 });
369 }
370 }
371
372 Ok(())
373 }
374
375 #[inline]
377 pub(crate) fn as_bytes(&self) -> &[u8] {
378 self.buf.as_slice()
379 }
380
381 #[inline]
383 pub(crate) fn as_bytes_mut(&mut self) -> &mut [u8] {
384 self.buf.as_mut_slice()
385 }
386}
387
388impl Clone for Page {
389 fn clone(&self) -> Self {
390 Self {
391 buf: self.buf.clone(),
392 size: self.size,
393 }
394 }
395}
396
397impl std::fmt::Debug for Page {
398 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
399 f.debug_struct("Page")
400 .field("id", &self.id())
401 .field("lsn", &self.lsn())
402 .field("page_size", &self.size)
403 .finish()
404 }
405}
406
407fn compute_checksum(bytes: &[u8]) -> u32 {
409 let mut crc = Crc32c::new();
410 crc.update(&bytes[..OFF_CRC]);
411 crc.update(&bytes[OFF_CRC + 4..]);
412 crc.finalize()
413}
414
/// Reads a little-endian `u16` from `bytes` starting at `off`.
///
/// Panics if fewer than two bytes are available at `off`.
#[inline]
fn read_u16(bytes: &[u8], off: usize) -> u16 {
    let mut raw = [0u8; 2];
    raw.copy_from_slice(&bytes[off..off + 2]);
    u16::from_le_bytes(raw)
}
419
/// Reads a little-endian `u32` from `bytes` starting at `off`.
///
/// Panics if fewer than four bytes are available at `off`.
#[inline]
fn read_u32(bytes: &[u8], off: usize) -> u32 {
    let mut raw = [0u8; 4];
    raw.copy_from_slice(&bytes[off..off + 4]);
    u32::from_le_bytes(raw)
}
424
/// Reads a little-endian `u64` from `bytes` starting at `off`.
///
/// Panics if fewer than eight bytes are available at `off`.
#[inline]
fn read_u64(bytes: &[u8], off: usize) -> u64 {
    let mut raw = [0u8; 8];
    raw.copy_from_slice(&bytes[off..off + 8]);
    u64::from_le_bytes(raw)
}
438
/// Writes `value` into `bytes` at `off` as two little-endian bytes.
///
/// Panics, without modifying `bytes`, if fewer than two bytes are available at `off`.
#[inline]
fn write_u16(bytes: &mut [u8], off: usize, value: u16) {
    let encoded = value.to_le_bytes();
    let dst = &mut bytes[off..off + encoded.len()];
    dst.copy_from_slice(&encoded);
}
443
/// Writes `value` into `bytes` at `off` as four little-endian bytes.
///
/// Panics, without modifying `bytes`, if fewer than four bytes are available at `off`.
#[inline]
fn write_u32(bytes: &mut [u8], off: usize, value: u32) {
    let encoded = value.to_le_bytes();
    let dst = &mut bytes[off..off + encoded.len()];
    dst.copy_from_slice(&encoded);
}
448
/// Writes `value` into `bytes` at `off` as eight little-endian bytes.
///
/// Panics, without modifying `bytes`, if fewer than eight bytes are available at `off`.
#[inline]
fn write_u64(bytes: &mut [u8], off: usize, value: u64) {
    let encoded = value.to_le_bytes();
    let dst = &mut bytes[off..off + encoded.len()];
    dst.copy_from_slice(&encoded);
}
453
#[cfg(test)]
mod tests {
    #![allow(clippy::unwrap_used, clippy::expect_used)]

    use super::*;

    /// A size inside the allowed range is still rejected when it is not a power of two.
    #[test]
    fn test_page_size_rejects_non_power_of_two() {
        let result = PageSize::new(5000);
        assert!(matches!(
            result,
            Err(PageError::InvalidPageSize { size: 5000 })
        ));
    }

    /// Powers of two below the minimum or above the maximum are rejected.
    #[test]
    fn test_page_size_rejects_out_of_range() {
        let too_small = PageSize::new(2048);
        let too_large = PageSize::new(MAX_PAGE_SIZE * 2);
        assert!(too_small.is_err());
        assert!(too_large.is_err());
    }

    /// The payload length is the page size minus the header size.
    #[test]
    fn test_page_size_payload_len() {
        let page_size = PageSize::new(4096).expect("valid");
        let expected = 4096 - PAGE_HEADER_SIZE;
        assert_eq!(page_size.payload_len(), expected);
    }

    /// A fresh page reports a zero id, a zero LSN and the requested size.
    #[test]
    fn test_new_page_has_valid_header() {
        let fresh = Page::new(DEFAULT_PAGE_SIZE);
        assert_eq!(fresh.id(), PageId::new(0));
        assert_eq!(fresh.lsn(), Lsn::ZERO);
        assert_eq!(fresh.page_size(), 4096);
    }

    /// Stamping a page makes it verify against the stamped id and keeps the LSN.
    #[test]
    fn test_stamp_then_verify_roundtrips() {
        let id = PageId::new(3);
        let lsn = Lsn::new(7);

        let mut page = Page::new(DEFAULT_PAGE_SIZE);
        page.set_lsn(lsn);
        page.payload_mut()[..4].copy_from_slice(b"data");
        page.stamp(id);

        assert_eq!(page.id(), id);
        page.verify(Some(id)).expect("verifies");
        assert_eq!(page.lsn(), lsn);
    }

    /// Changing a payload byte after stamping is reported as a checksum mismatch.
    #[test]
    fn test_verify_detects_corruption() {
        let id = PageId::new(1);
        let mut page = Page::new(DEFAULT_PAGE_SIZE);
        page.stamp(id);

        // Flip every bit of one payload byte after the checksum was stored.
        let payload = page.payload_mut();
        payload[10] = !payload[10];

        let outcome = page.verify(Some(id));
        assert!(matches!(
            outcome,
            Err(PageError::ChecksumMismatch { .. })
        ));
    }

    /// Verifying against a different id than the stamped one is reported as misdirected.
    #[test]
    fn test_verify_detects_misdirected_page() {
        let mut page = Page::new(DEFAULT_PAGE_SIZE);
        page.stamp(PageId::new(5));

        let outcome = page.verify(Some(PageId::new(6)));
        assert!(matches!(
            outcome,
            Err(PageError::MisdirectedPage {
                requested: 6,
                found: 5
            })
        ));
    }

    /// Input whose length differs from the page size is rejected before any parsing.
    #[test]
    fn test_from_bytes_rejects_wrong_length() {
        let too_short = [0u8; 100];
        let outcome = Page::from_bytes(DEFAULT_PAGE_SIZE, &too_short);
        assert!(matches!(outcome, Err(PageError::ShortRead { .. })));
    }

    /// An all-zero image of the right length fails on the magic number.
    #[test]
    fn test_from_bytes_rejects_bad_magic() {
        let zeroed = vec![0u8; 4096];
        let outcome = Page::from_bytes(DEFAULT_PAGE_SIZE, &zeroed);
        assert!(matches!(outcome, Err(PageError::BadMagic { .. })));
    }

    /// A checksummed image loads back with the same LSN and payload.
    #[test]
    fn test_to_bytes_from_bytes_roundtrips() {
        let mut original = Page::new(DEFAULT_PAGE_SIZE);
        original.set_lsn(Lsn::new(99));
        original.payload_mut()[..5].copy_from_slice(b"hello");

        let image = original.to_checksummed_bytes();
        let loaded = Page::from_bytes(DEFAULT_PAGE_SIZE, &image).expect("verifies");

        assert_eq!(loaded.lsn(), Lsn::new(99));
        assert_eq!(&loaded.payload()[..5], b"hello");
    }
}