1use crate::error::{KernelError, KernelResult, PageErrorKind};
24use crate::kernel_api::PageId;
25use crate::wal::LogSequenceNumber;
26use bytes::{BufMut, Bytes, BytesMut};
27
/// Total size in bytes of one page, header included (8 KiB).
pub const PAGE_SIZE: usize = 8 * 1024;

/// Number of bytes at the front of every page reserved for the serialized [`PageHeader`].
pub const PAGE_HEADER_SIZE: usize = 32;

/// Payload capacity of a page: what is left of [`PAGE_SIZE`] once the header is carved out.
pub const PAGE_DATA_SIZE: usize = PAGE_SIZE - PAGE_HEADER_SIZE;
36
/// Magic number stored in the first four bytes of every serialized page header.
///
/// The hex digits `54 4F 4F 4E` are the ASCII codes for `T`, `O`, `O`, `N`.
pub const PAGE_MAGIC: u32 = 0x544F_4F4E;

/// Kind of page, encoded as a single byte in the page header.
///
/// The explicit discriminants are the exact byte values written by the header
/// serializer and accepted by `TryFrom<u8>`; changing one changes the encoding.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum PageType {
    /// Free page (type byte `0`).
    Free = 0,
    /// Data page (type byte `1`).
    Data = 1,
    /// Index page (type byte `2`).
    Index = 2,
    /// Overflow page (type byte `3`).
    Overflow = 3,
    /// Metadata page (type byte `4`).
    Metadata = 4,
}
55
56impl TryFrom<u8> for PageType {
57 type Error = KernelError;
58
59 fn try_from(value: u8) -> Result<Self, Self::Error> {
60 match value {
61 0 => Ok(Self::Free),
62 1 => Ok(Self::Data),
63 2 => Ok(Self::Index),
64 3 => Ok(Self::Overflow),
65 4 => Ok(Self::Metadata),
66 _ => Err(KernelError::Page {
67 kind: PageErrorKind::InvalidSize,
68 }),
69 }
70 }
71}
72
73#[derive(Debug, Clone, Copy)]
87pub struct PageHeader {
88 pub magic: u32,
90 pub page_id: PageId,
92 pub page_lsn: LogSequenceNumber,
94 pub page_type: PageType,
96 pub flags: u8,
98 pub free_space: u16,
100 pub checksum: u32,
102}
103
104impl PageHeader {
105 pub fn new(page_id: PageId, page_type: PageType) -> Self {
107 Self {
108 magic: PAGE_MAGIC,
109 page_id,
110 page_lsn: LogSequenceNumber::INVALID,
111 page_type,
112 flags: 0,
113 free_space: PAGE_DATA_SIZE as u16,
114 checksum: 0,
115 }
116 }
117
118 pub fn serialize(&self) -> [u8; PAGE_HEADER_SIZE] {
120 let mut buf = [0u8; PAGE_HEADER_SIZE];
121 let mut cursor = 0;
122
123 buf[cursor..cursor + 4].copy_from_slice(&self.magic.to_le_bytes());
124 cursor += 4;
125
126 buf[cursor..cursor + 8].copy_from_slice(&self.page_id.to_le_bytes());
127 cursor += 8;
128
129 buf[cursor..cursor + 8].copy_from_slice(&self.page_lsn.0.to_le_bytes());
130 cursor += 8;
131
132 buf[cursor] = self.page_type as u8;
133 cursor += 1;
134
135 buf[cursor] = self.flags;
136 cursor += 1;
137
138 buf[cursor..cursor + 2].copy_from_slice(&self.free_space.to_le_bytes());
139 cursor += 2;
140
141 buf[cursor..cursor + 4].copy_from_slice(&self.checksum.to_le_bytes());
142 buf
145 }
146
147 pub fn deserialize(data: &[u8]) -> KernelResult<Self> {
149 if data.len() < PAGE_HEADER_SIZE {
150 return Err(KernelError::Page {
151 kind: PageErrorKind::InvalidSize,
152 });
153 }
154
155 let magic = u32::from_le_bytes(data[0..4].try_into().unwrap());
156 if magic != PAGE_MAGIC {
157 return Err(KernelError::Corruption {
158 details: format!(
159 "invalid page magic: expected {:#x}, got {:#x}",
160 PAGE_MAGIC, magic
161 ),
162 });
163 }
164
165 let page_id = u64::from_le_bytes(data[4..12].try_into().unwrap());
166 let page_lsn = LogSequenceNumber(u64::from_le_bytes(data[12..20].try_into().unwrap()));
167 let page_type = PageType::try_from(data[20])?;
168 let flags = data[21];
169 let free_space = u16::from_le_bytes(data[22..24].try_into().unwrap());
170 let checksum = u32::from_le_bytes(data[24..28].try_into().unwrap());
171
172 Ok(Self {
173 magic,
174 page_id,
175 page_lsn,
176 page_type,
177 flags,
178 free_space,
179 checksum,
180 })
181 }
182}
183
184pub struct Page {
186 pub header: PageHeader,
188 pub data: BytesMut,
190}
191
192impl Page {
193 pub fn new(page_id: PageId, page_type: PageType) -> Self {
195 Self {
196 header: PageHeader::new(page_id, page_type),
197 data: BytesMut::zeroed(PAGE_DATA_SIZE),
198 }
199 }
200
201 pub fn from_bytes(bytes: &[u8]) -> KernelResult<Self> {
203 if bytes.len() != PAGE_SIZE {
204 return Err(KernelError::Page {
205 kind: PageErrorKind::InvalidSize,
206 });
207 }
208
209 let header = PageHeader::deserialize(&bytes[..PAGE_HEADER_SIZE])?;
210 let data = BytesMut::from(&bytes[PAGE_HEADER_SIZE..]);
211
212 Ok(Self { header, data })
213 }
214
215 pub fn to_bytes(&self) -> Bytes {
217 let mut buf = BytesMut::with_capacity(PAGE_SIZE);
218 buf.put_slice(&self.header.serialize());
219 buf.put_slice(&self.data);
220 buf.freeze()
221 }
222
223 pub fn page_id(&self) -> PageId {
225 self.header.page_id
226 }
227
228 pub fn lsn(&self) -> LogSequenceNumber {
230 self.header.page_lsn
231 }
232
233 pub fn set_lsn(&mut self, lsn: LogSequenceNumber) {
235 self.header.page_lsn = lsn;
236 }
237
238 pub fn needs_redo(&self, record_lsn: LogSequenceNumber) -> bool {
243 self.header.page_lsn < record_lsn
244 }
245
246 pub fn compute_checksum(&self) -> u32 {
248 let mut hasher = crc32fast::Hasher::new();
249 hasher.update(&self.header.magic.to_le_bytes());
251 hasher.update(&self.header.page_id.to_le_bytes());
252 hasher.update(&self.header.page_lsn.0.to_le_bytes());
253 hasher.update(&[self.header.page_type as u8, self.header.flags]);
254 hasher.update(&self.header.free_space.to_le_bytes());
255 hasher.update(&self.data);
256 hasher.finalize()
257 }
258
259 pub fn validate_checksum(&self) -> bool {
261 self.header.checksum == self.compute_checksum()
262 }
263
264 pub fn update_checksum(&mut self) {
266 self.header.checksum = self.compute_checksum();
267 }
268}
269
#[cfg(test)]
mod tests {
    use super::*;

    /// Every header field survives a serialize/deserialize cycle.
    #[test]
    fn test_page_header_roundtrip() {
        let header = PageHeader {
            magic: PAGE_MAGIC,
            page_id: 42,
            page_lsn: LogSequenceNumber(100),
            page_type: PageType::Data,
            flags: 0x01,
            free_space: 1234,
            checksum: 0xDEADBEEF,
        };

        let serialized = header.serialize();
        let deserialized = PageHeader::deserialize(&serialized).unwrap();

        assert_eq!(header.magic, deserialized.magic);
        assert_eq!(header.page_id, deserialized.page_id);
        assert_eq!(header.page_lsn, deserialized.page_lsn);
        assert_eq!(header.page_type, deserialized.page_type);
        assert_eq!(header.flags, deserialized.flags);
        assert_eq!(header.free_space, deserialized.free_space);
        assert_eq!(header.checksum, deserialized.checksum);
    }

    /// A buffer shorter than the header is rejected rather than indexed.
    #[test]
    fn test_page_header_rejects_short_input() {
        assert!(PageHeader::deserialize(&[0u8; PAGE_HEADER_SIZE - 1]).is_err());
    }

    /// A header whose magic bytes were altered is rejected.
    #[test]
    fn test_page_header_rejects_bad_magic() {
        let mut raw = PageHeader::new(7, PageType::Index).serialize();
        raw[0] ^= 0xFF;

        assert!(PageHeader::deserialize(&raw).is_err());
    }

    /// Known type bytes decode to the matching variant; unknown ones fail.
    #[test]
    fn test_page_type_try_from() {
        for raw in 0u8..=4 {
            assert_eq!(PageType::try_from(raw).unwrap() as u8, raw);
        }

        assert!(PageType::try_from(5u8).is_err());
        assert!(PageType::try_from(u8::MAX).is_err());
    }

    /// Header and payload both survive a to_bytes/from_bytes cycle.
    #[test]
    fn test_page_roundtrip() {
        let mut page = Page::new(1, PageType::Data);
        page.data[0..5].copy_from_slice(b"hello");
        page.set_lsn(LogSequenceNumber(50));
        page.update_checksum();

        let bytes = page.to_bytes();
        assert_eq!(bytes.len(), PAGE_SIZE);

        let restored = Page::from_bytes(&bytes).unwrap();

        assert_eq!(restored.page_id(), 1);
        assert_eq!(restored.lsn(), LogSequenceNumber(50));
        assert_eq!(restored.data.len(), PAGE_DATA_SIZE);
        assert_eq!(&restored.data[..5], &b"hello"[..]);
        assert!(restored.validate_checksum());
    }

    /// Images that are not exactly PAGE_SIZE long are rejected.
    #[test]
    fn test_page_from_bytes_rejects_wrong_size() {
        assert!(Page::from_bytes(&vec![0u8; PAGE_SIZE - 1]).is_err());
        assert!(Page::from_bytes(&vec![0u8; PAGE_SIZE + 1]).is_err());
        assert!(Page::from_bytes(&[]).is_err());
    }

    /// Changing the payload or the header after update_checksum is detected.
    #[test]
    fn test_checksum_detects_modification() {
        let mut page = Page::new(3, PageType::Data);
        page.update_checksum();
        assert!(page.validate_checksum());

        page.data[0] ^= 0xFF;
        assert!(!page.validate_checksum());

        page.update_checksum();
        assert!(page.validate_checksum());

        page.set_lsn(LogSequenceNumber(9));
        assert!(!page.validate_checksum());
    }

    /// Redo is needed only for records strictly newer than the page LSN.
    #[test]
    fn test_needs_redo() {
        let mut page = Page::new(1, PageType::Data);
        page.set_lsn(LogSequenceNumber(100));

        // Older record: already reflected in the page.
        assert!(!page.needs_redo(LogSequenceNumber(50)));

        // Same LSN: already reflected in the page.
        assert!(!page.needs_redo(LogSequenceNumber(100)));

        // Newer record: must be replayed.
        assert!(page.needs_redo(LogSequenceNumber(150)));
    }
}