1use crate::buffer::AlignedBuffer;
22use crate::checksum::Crc32c;
23use crate::error::{PageError, PageResult};
24
25pub const MIN_PAGE_SIZE: usize = 4096;
30
31pub const MAX_PAGE_SIZE: usize = 1 << 20;
33
34pub const PAGE_HEADER_SIZE: usize = 32;
37
38pub const DEFAULT_PAGE_SIZE: PageSize = PageSize(4096);
40
41const MAGIC: u32 = u32::from_le_bytes([b'P', b'G', b'D', b'B']);
42const FORMAT_VERSION: u16 = 1;
43
44const OFF_MAGIC: usize = 0;
45const OFF_VERSION: usize = 4;
46const OFF_PAGE_ID: usize = 8;
47const OFF_LSN: usize = 16;
48const OFF_CRC: usize = 24;
49
50#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
54#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
55pub struct PageId(u64);
56
57impl PageId {
58 #[inline]
60 #[must_use]
61 pub const fn new(id: u64) -> Self {
62 Self(id)
63 }
64
65 #[inline]
67 #[must_use]
68 pub const fn get(self) -> u64 {
69 self.0
70 }
71
72 #[inline]
74 #[must_use]
75 pub(crate) const fn byte_offset(self, page_size: usize) -> u64 {
76 self.0 * page_size as u64
77 }
78}
79
80impl std::fmt::Display for PageId {
81 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82 write!(f, "{}", self.0)
83 }
84}
85
86#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
93#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
94pub struct Lsn(u64);
95
96impl Lsn {
97 pub const ZERO: Lsn = Lsn(0);
99
100 #[inline]
102 #[must_use]
103 pub const fn new(lsn: u64) -> Self {
104 Self(lsn)
105 }
106
107 #[inline]
109 #[must_use]
110 pub const fn get(self) -> u64 {
111 self.0
112 }
113}
114
115impl std::fmt::Display for Lsn {
116 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117 write!(f, "{}", self.0)
118 }
119}
120
121#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
139pub struct PageSize(usize);
140
141impl PageSize {
142 pub const fn new(size: usize) -> PageResult<Self> {
149 if size < MIN_PAGE_SIZE || size > MAX_PAGE_SIZE || !size.is_power_of_two() {
150 return Err(PageError::InvalidPageSize { size });
151 }
152 Ok(Self(size))
153 }
154
155 #[inline]
157 #[must_use]
158 pub const fn get(self) -> usize {
159 self.0
160 }
161
162 #[inline]
164 #[must_use]
165 pub const fn payload_len(self) -> usize {
166 self.0 - PAGE_HEADER_SIZE
167 }
168}
169
170impl Default for PageSize {
171 #[inline]
172 fn default() -> Self {
173 DEFAULT_PAGE_SIZE
174 }
175}
176
177pub struct Page {
208 buf: AlignedBuffer,
209 size: usize,
210}
211
212impl Page {
213 #[must_use]
215 pub fn new(page_size: PageSize) -> Self {
216 let size = page_size.get();
217 let mut buf = AlignedBuffer::new_zeroed(size, size);
218 {
219 let bytes = buf.as_mut_slice();
220 write_u32(bytes, OFF_MAGIC, MAGIC);
221 write_u16(bytes, OFF_VERSION, FORMAT_VERSION);
222 }
223 Self { buf, size }
224 }
225
226 pub fn from_bytes(page_size: PageSize, bytes: &[u8]) -> PageResult<Self> {
239 let size = page_size.get();
240 if bytes.len() != size {
241 return Err(PageError::ShortRead {
242 page_id: 0,
243 got: bytes.len(),
244 page_size: size,
245 });
246 }
247 let mut buf = AlignedBuffer::new_zeroed(size, size);
248 buf.as_mut_slice().copy_from_slice(bytes);
249 let page = Self { buf, size };
250 page.verify(None)?;
251 Ok(page)
252 }
253
254 #[inline]
256 #[must_use]
257 pub fn page_size(&self) -> usize {
258 self.size
259 }
260
261 #[inline]
264 #[must_use]
265 pub fn id(&self) -> PageId {
266 PageId(read_u64(self.buf.as_slice(), OFF_PAGE_ID))
267 }
268
269 #[inline]
271 #[must_use]
272 pub fn lsn(&self) -> Lsn {
273 Lsn(read_u64(self.buf.as_slice(), OFF_LSN))
274 }
275
276 #[inline]
279 pub fn set_lsn(&mut self, lsn: Lsn) {
280 write_u64(self.buf.as_mut_slice(), OFF_LSN, lsn.0);
281 }
282
283 #[inline]
285 #[must_use]
286 pub fn payload(&self) -> &[u8] {
287 &self.buf.as_slice()[PAGE_HEADER_SIZE..]
288 }
289
290 #[inline]
292 pub fn payload_mut(&mut self) -> &mut [u8] {
293 &mut self.buf.as_mut_slice()[PAGE_HEADER_SIZE..]
294 }
295
296 #[must_use]
303 pub fn to_checksummed_bytes(&self) -> Vec<u8> {
304 let mut out = self.buf.as_slice().to_vec();
305 let crc = compute_checksum(&out);
306 write_u32(&mut out, OFF_CRC, crc);
307 out
308 }
309
310 pub(crate) fn stamp(&mut self, id: PageId) {
312 {
313 let bytes = self.buf.as_mut_slice();
314 write_u32(bytes, OFF_MAGIC, MAGIC);
315 write_u16(bytes, OFF_VERSION, FORMAT_VERSION);
316 write_u64(bytes, OFF_PAGE_ID, id.0);
317 }
318 let crc = compute_checksum(self.buf.as_slice());
319 write_u32(self.buf.as_mut_slice(), OFF_CRC, crc);
320 }
321
322 pub(crate) fn verify(&self, expected: Option<PageId>) -> PageResult<()> {
324 let bytes = self.buf.as_slice();
325
326 let magic = read_u32(bytes, OFF_MAGIC);
327 if magic != MAGIC {
328 return Err(PageError::BadMagic {
329 found: magic,
330 expected: MAGIC,
331 });
332 }
333
334 let version = read_u16(bytes, OFF_VERSION);
335 if version != FORMAT_VERSION {
336 return Err(PageError::UnsupportedVersion {
337 found: version,
338 supported: FORMAT_VERSION,
339 });
340 }
341
342 let stored = read_u32(bytes, OFF_CRC);
343 let computed = compute_checksum(bytes);
344 if stored != computed {
345 return Err(PageError::ChecksumMismatch {
346 page_id: read_u64(bytes, OFF_PAGE_ID),
347 stored,
348 computed,
349 });
350 }
351
352 if let Some(expected) = expected {
353 let found = read_u64(bytes, OFF_PAGE_ID);
354 if found != expected.0 {
355 return Err(PageError::MisdirectedPage {
356 requested: expected.0,
357 found,
358 });
359 }
360 }
361
362 Ok(())
363 }
364
365 #[inline]
367 pub(crate) fn as_bytes(&self) -> &[u8] {
368 self.buf.as_slice()
369 }
370
371 #[inline]
373 pub(crate) fn as_bytes_mut(&mut self) -> &mut [u8] {
374 self.buf.as_mut_slice()
375 }
376}
377
378impl Clone for Page {
379 fn clone(&self) -> Self {
380 Self {
381 buf: self.buf.clone(),
382 size: self.size,
383 }
384 }
385}
386
387impl std::fmt::Debug for Page {
388 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
389 f.debug_struct("Page")
390 .field("id", &self.id())
391 .field("lsn", &self.lsn())
392 .field("page_size", &self.size)
393 .finish()
394 }
395}
396
397fn compute_checksum(bytes: &[u8]) -> u32 {
399 let mut crc = Crc32c::new();
400 crc.update(&bytes[..OFF_CRC]);
401 crc.update(&bytes[OFF_CRC + 4..]);
402 crc.finalize()
403}
404
405#[inline]
406fn read_u16(bytes: &[u8], off: usize) -> u16 {
407 u16::from_le_bytes([bytes[off], bytes[off + 1]])
408}
409
410#[inline]
411fn read_u32(bytes: &[u8], off: usize) -> u32 {
412 u32::from_le_bytes([bytes[off], bytes[off + 1], bytes[off + 2], bytes[off + 3]])
413}
414
415#[inline]
416fn read_u64(bytes: &[u8], off: usize) -> u64 {
417 u64::from_le_bytes([
418 bytes[off],
419 bytes[off + 1],
420 bytes[off + 2],
421 bytes[off + 3],
422 bytes[off + 4],
423 bytes[off + 5],
424 bytes[off + 6],
425 bytes[off + 7],
426 ])
427}
428
429#[inline]
430fn write_u16(bytes: &mut [u8], off: usize, value: u16) {
431 bytes[off..off + 2].copy_from_slice(&value.to_le_bytes());
432}
433
434#[inline]
435fn write_u32(bytes: &mut [u8], off: usize, value: u32) {
436 bytes[off..off + 4].copy_from_slice(&value.to_le_bytes());
437}
438
439#[inline]
440fn write_u64(bytes: &mut [u8], off: usize, value: u64) {
441 bytes[off..off + 8].copy_from_slice(&value.to_le_bytes());
442}
443
444#[cfg(test)]
445mod tests {
446 #![allow(clippy::unwrap_used, clippy::expect_used)]
447
448 use super::*;
449
450 #[test]
451 fn test_page_size_rejects_non_power_of_two() {
452 assert!(matches!(
453 PageSize::new(5000),
454 Err(PageError::InvalidPageSize { size: 5000 })
455 ));
456 }
457
458 #[test]
459 fn test_page_size_rejects_out_of_range() {
460 assert!(PageSize::new(2048).is_err());
461 assert!(PageSize::new(MAX_PAGE_SIZE * 2).is_err());
462 }
463
464 #[test]
465 fn test_page_size_payload_len() {
466 let ps = PageSize::new(4096).expect("valid");
467 assert_eq!(ps.payload_len(), 4096 - PAGE_HEADER_SIZE);
468 }
469
470 #[test]
471 fn test_new_page_has_valid_header() {
472 let page = Page::new(DEFAULT_PAGE_SIZE);
473 assert_eq!(page.id(), PageId::new(0));
474 assert_eq!(page.lsn(), Lsn::ZERO);
475 assert_eq!(page.page_size(), 4096);
476 }
477
478 #[test]
479 fn test_stamp_then_verify_roundtrips() {
480 let mut page = Page::new(DEFAULT_PAGE_SIZE);
481 page.set_lsn(Lsn::new(7));
482 page.payload_mut()[..4].copy_from_slice(b"data");
483 page.stamp(PageId::new(3));
484
485 assert_eq!(page.id(), PageId::new(3));
486 page.verify(Some(PageId::new(3))).expect("verifies");
487 assert_eq!(page.lsn(), Lsn::new(7));
488 }
489
490 #[test]
491 fn test_verify_detects_corruption() {
492 let mut page = Page::new(DEFAULT_PAGE_SIZE);
493 page.stamp(PageId::new(1));
494 page.payload_mut()[10] ^= 0xFF;
495 assert!(matches!(
496 page.verify(Some(PageId::new(1))),
497 Err(PageError::ChecksumMismatch { .. })
498 ));
499 }
500
501 #[test]
502 fn test_verify_detects_misdirected_page() {
503 let mut page = Page::new(DEFAULT_PAGE_SIZE);
504 page.stamp(PageId::new(5));
505 assert!(matches!(
506 page.verify(Some(PageId::new(6))),
507 Err(PageError::MisdirectedPage {
508 requested: 6,
509 found: 5
510 })
511 ));
512 }
513
514 #[test]
515 fn test_from_bytes_rejects_wrong_length() {
516 let bytes = vec![0u8; 100];
517 assert!(matches!(
518 Page::from_bytes(DEFAULT_PAGE_SIZE, &bytes),
519 Err(PageError::ShortRead { .. })
520 ));
521 }
522
523 #[test]
524 fn test_from_bytes_rejects_bad_magic() {
525 let bytes = vec![0u8; 4096];
526 assert!(matches!(
527 Page::from_bytes(DEFAULT_PAGE_SIZE, &bytes),
528 Err(PageError::BadMagic { .. })
529 ));
530 }
531
532 #[test]
533 fn test_to_bytes_from_bytes_roundtrips() {
534 let mut page = Page::new(DEFAULT_PAGE_SIZE);
535 page.set_lsn(Lsn::new(99));
536 page.payload_mut()[..5].copy_from_slice(b"hello");
537 let bytes = page.to_checksummed_bytes();
538
539 let loaded = Page::from_bytes(DEFAULT_PAGE_SIZE, &bytes).expect("verifies");
540 assert_eq!(loaded.lsn(), Lsn::new(99));
541 assert_eq!(&loaded.payload()[..5], b"hello");
542 }
543}