1use crate::error::{KernelError, KernelResult, PageErrorKind};
24use crate::kernel_api::PageId;
25use crate::wal::LogSequenceNumber;
26use bytes::{BufMut, Bytes, BytesMut};
27
28pub const PAGE_SIZE: usize = 8192;
30
31pub const PAGE_HEADER_SIZE: usize = 32;
33
34pub const PAGE_DATA_SIZE: usize = PAGE_SIZE - PAGE_HEADER_SIZE;
36
37pub const PAGE_MAGIC: u32 = 0x544F4F4E; #[derive(Debug, Clone, Copy, PartialEq, Eq)]
42#[repr(u8)]
43pub enum PageType {
44 Free = 0,
46 Data = 1,
48 Index = 2,
50 Overflow = 3,
52 Metadata = 4,
54}
55
56impl TryFrom<u8> for PageType {
57 type Error = KernelError;
58
59 fn try_from(value: u8) -> Result<Self, Self::Error> {
60 match value {
61 0 => Ok(Self::Free),
62 1 => Ok(Self::Data),
63 2 => Ok(Self::Index),
64 3 => Ok(Self::Overflow),
65 4 => Ok(Self::Metadata),
66 _ => Err(KernelError::Page {
67 kind: PageErrorKind::InvalidSize,
68 }),
69 }
70 }
71}
72
73#[derive(Debug, Clone, Copy)]
87pub struct PageHeader {
88 pub magic: u32,
90 pub page_id: PageId,
92 pub page_lsn: LogSequenceNumber,
94 pub page_type: PageType,
96 pub flags: u8,
98 pub free_space: u16,
100 pub checksum: u32,
102}
103
104impl PageHeader {
105 pub fn new(page_id: PageId, page_type: PageType) -> Self {
107 Self {
108 magic: PAGE_MAGIC,
109 page_id,
110 page_lsn: LogSequenceNumber::INVALID,
111 page_type,
112 flags: 0,
113 free_space: PAGE_DATA_SIZE as u16,
114 checksum: 0,
115 }
116 }
117
118 pub fn serialize(&self) -> [u8; PAGE_HEADER_SIZE] {
120 let mut buf = [0u8; PAGE_HEADER_SIZE];
121 let mut cursor = 0;
122
123 buf[cursor..cursor + 4].copy_from_slice(&self.magic.to_le_bytes());
124 cursor += 4;
125
126 buf[cursor..cursor + 8].copy_from_slice(&self.page_id.to_le_bytes());
127 cursor += 8;
128
129 buf[cursor..cursor + 8].copy_from_slice(&self.page_lsn.0.to_le_bytes());
130 cursor += 8;
131
132 buf[cursor] = self.page_type as u8;
133 cursor += 1;
134
135 buf[cursor] = self.flags;
136 cursor += 1;
137
138 buf[cursor..cursor + 2].copy_from_slice(&self.free_space.to_le_bytes());
139 cursor += 2;
140
141 buf[cursor..cursor + 4].copy_from_slice(&self.checksum.to_le_bytes());
142 buf
145 }
146
147 pub fn deserialize(data: &[u8]) -> KernelResult<Self> {
149 if data.len() < PAGE_HEADER_SIZE {
150 return Err(KernelError::Page {
151 kind: PageErrorKind::InvalidSize,
152 });
153 }
154
155 let magic = u32::from_le_bytes(data[0..4].try_into().unwrap());
156 if magic != PAGE_MAGIC {
157 return Err(KernelError::Corruption {
158 details: format!(
159 "invalid page magic: expected {:#x}, got {:#x}",
160 PAGE_MAGIC, magic
161 ),
162 });
163 }
164
165 let page_id = u64::from_le_bytes(data[4..12].try_into().unwrap());
166 let page_lsn = LogSequenceNumber(u64::from_le_bytes(data[12..20].try_into().unwrap()));
167 let page_type = PageType::try_from(data[20])?;
168 let flags = data[21];
169 let free_space = u16::from_le_bytes(data[22..24].try_into().unwrap());
170 let checksum = u32::from_le_bytes(data[24..28].try_into().unwrap());
171
172 Ok(Self {
173 magic,
174 page_id,
175 page_lsn,
176 page_type,
177 flags,
178 free_space,
179 checksum,
180 })
181 }
182}
183
184pub struct Page {
186 pub header: PageHeader,
188 pub data: BytesMut,
190}
191
192impl Page {
193 pub fn new(page_id: PageId, page_type: PageType) -> Self {
195 Self {
196 header: PageHeader::new(page_id, page_type),
197 data: BytesMut::zeroed(PAGE_DATA_SIZE),
198 }
199 }
200
201 pub fn from_bytes(bytes: &[u8]) -> KernelResult<Self> {
203 if bytes.len() != PAGE_SIZE {
204 return Err(KernelError::Page {
205 kind: PageErrorKind::InvalidSize,
206 });
207 }
208
209 let header = PageHeader::deserialize(&bytes[..PAGE_HEADER_SIZE])?;
210 let data = BytesMut::from(&bytes[PAGE_HEADER_SIZE..]);
211
212 let page = Self { header, data };
213
214 if !page.validate_checksum() {
215 return Err(KernelError::Corruption {
216 details: format!(
217 "invalid page checksum: expected {:#x}, computed {:#x}",
218 page.header.checksum,
219 page.compute_checksum()
220 ),
221 });
222 }
223
224 Ok(page)
225 }
226
227 pub fn to_bytes(&self) -> Bytes {
229 let mut buf = BytesMut::with_capacity(PAGE_SIZE);
230 buf.put_slice(&self.header.serialize());
231 buf.put_slice(&self.data);
232 buf.freeze()
233 }
234
235 pub fn page_id(&self) -> PageId {
237 self.header.page_id
238 }
239
240 pub fn lsn(&self) -> LogSequenceNumber {
242 self.header.page_lsn
243 }
244
245 pub fn set_lsn(&mut self, lsn: LogSequenceNumber) {
247 self.header.page_lsn = lsn;
248 }
249
250 pub fn needs_redo(&self, record_lsn: LogSequenceNumber) -> bool {
255 self.header.page_lsn < record_lsn
256 }
257
258 pub fn compute_checksum(&self) -> u32 {
260 let mut hasher = crc32fast::Hasher::new();
261 hasher.update(&self.header.magic.to_le_bytes());
263 hasher.update(&self.header.page_id.to_le_bytes());
264 hasher.update(&self.header.page_lsn.0.to_le_bytes());
265 hasher.update(&[self.header.page_type as u8, self.header.flags]);
266 hasher.update(&self.header.free_space.to_le_bytes());
267 hasher.update(&self.data);
268 hasher.finalize()
269 }
270
271 pub fn validate_checksum(&self) -> bool {
273 self.header.checksum == self.compute_checksum()
274 }
275
276 pub fn update_checksum(&mut self) {
278 self.header.checksum = self.compute_checksum();
279 }
280}
281
282#[cfg(test)]
283mod tests {
284 use super::*;
285
286 #[test]
287 fn test_page_header_roundtrip() {
288 let header = PageHeader {
289 magic: PAGE_MAGIC,
290 page_id: 42,
291 page_lsn: LogSequenceNumber(100),
292 page_type: PageType::Data,
293 flags: 0x01,
294 free_space: 1234,
295 checksum: 0xDEADBEEF,
296 };
297
298 let serialized = header.serialize();
299 let deserialized = PageHeader::deserialize(&serialized).unwrap();
300
301 assert_eq!(header.magic, deserialized.magic);
302 assert_eq!(header.page_id, deserialized.page_id);
303 assert_eq!(header.page_lsn, deserialized.page_lsn);
304 assert_eq!(header.page_type, deserialized.page_type);
305 assert_eq!(header.flags, deserialized.flags);
306 assert_eq!(header.free_space, deserialized.free_space);
307 assert_eq!(header.checksum, deserialized.checksum);
308 }
309
310 #[test]
311 fn test_page_roundtrip() {
312 let mut page = Page::new(1, PageType::Data);
313 page.data[0..5].copy_from_slice(b"hello");
314 page.set_lsn(LogSequenceNumber(50));
315 page.update_checksum();
316
317 let bytes = page.to_bytes();
318 let restored = Page::from_bytes(&bytes).unwrap();
319
320 assert_eq!(restored.page_id(), 1);
321 assert_eq!(restored.lsn(), LogSequenceNumber(50));
322 assert!(restored.validate_checksum());
323 }
324
325 #[test]
326 fn test_needs_redo() {
327 let mut page = Page::new(1, PageType::Data);
328 page.set_lsn(LogSequenceNumber(100));
329
330 assert!(!page.needs_redo(LogSequenceNumber(50)));
332
333 assert!(!page.needs_redo(LogSequenceNumber(100)));
335
336 assert!(page.needs_redo(LogSequenceNumber(150)));
338 }
339
340 #[test]
341 fn test_page_from_bytes_rejects_corrupted_checksum() {
342 let mut page = Page::new(1, PageType::Data);
343 page.data[0..5].copy_from_slice(b"hello");
344 page.update_checksum();
345
346 let bytes = page.to_bytes();
347
348 let mut corrupted = bytes.to_vec();
349 corrupted[PAGE_HEADER_SIZE] = 0xFF;
350
351 let result = Page::from_bytes(&corrupted);
352 assert!(result.is_err());
353 }
354}