Skip to main content

sochdb_kernel/
page.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Page Management
19//!
20//! Basic page abstraction for the kernel.
21//! Provides page header structure with LSN tracking for ARIES recovery.
22
23use crate::error::{KernelError, KernelResult, PageErrorKind};
24use crate::kernel_api::PageId;
25use crate::wal::LogSequenceNumber;
26use bytes::{BufMut, Bytes, BytesMut};
27
/// Default page size in bytes: 8 KiB (matches common filesystem block size).
///
/// This is the full serialized size of a page, header included;
/// `Page::from_bytes` rejects any buffer whose length differs from it.
pub const PAGE_SIZE: usize = 8192;

/// Size in bytes of the serialized `PageHeader` at the start of every page.
pub const PAGE_HEADER_SIZE: usize = 32;

/// Usable page space in bytes (everything after the header).
///
/// With the values above this is 8160, which fits in the `u16` used for
/// `PageHeader::free_space`.
pub const PAGE_DATA_SIZE: usize = PAGE_SIZE - PAGE_HEADER_SIZE;

/// Page magic number for validation.
///
/// The hex digits spell ASCII "TOON". `PageHeader::serialize` writes the
/// value little-endian, so the first four bytes of a serialized page are
/// `4E 4F 4F 54`.
pub const PAGE_MAGIC: u32 = 0x544F4F4E; // "TOON"
39
/// Page types
///
/// The explicit discriminants are the on-disk encoding: `PageHeader::serialize`
/// stores `page_type as u8` in header byte 20 and `PageHeader::deserialize`
/// decodes that byte through `TryFrom<u8>`. Changing a discriminant therefore
/// changes the page format.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum PageType {
    /// Free/unallocated page
    Free = 0,
    /// Data page (rows)
    Data = 1,
    /// Index page (B-tree node)
    Index = 2,
    /// Overflow page (for large values)
    Overflow = 3,
    /// Metadata page (catalog info)
    Metadata = 4,
}
55
56impl TryFrom<u8> for PageType {
57    type Error = KernelError;
58
59    fn try_from(value: u8) -> Result<Self, Self::Error> {
60        match value {
61            0 => Ok(Self::Free),
62            1 => Ok(Self::Data),
63            2 => Ok(Self::Index),
64            3 => Ok(Self::Overflow),
65            4 => Ok(Self::Metadata),
66            _ => Err(KernelError::Page {
67                kind: PageErrorKind::InvalidSize,
68            }),
69        }
70    }
71}
72
/// Page header
///
/// Every page starts with this header for ARIES recovery support.
///
/// Serialized layout (32 bytes, all integers little-endian):
///
/// | bytes  | field      | type | meaning                                   |
/// |--------|------------|------|-------------------------------------------|
/// | 0..4   | magic      | u32  | validation magic number                   |
/// | 4..12  | page_id    | u64  | page identifier                           |
/// | 12..20 | page_lsn   | u64  | LSN of last modification (for recovery)   |
/// | 20     | page_type  | u8   | page type                                 |
/// | 21     | flags      | u8   | page flags                                |
/// | 22..24 | free_space | u16  | free space in page                        |
/// | 24..28 | checksum   | u32  | page checksum                             |
/// | 28..32 | reserved   | u32  | reserved for future use                   |
///
/// The reserved bytes have no field in this struct: `serialize` always writes
/// them as zero and `deserialize` ignores them.
#[derive(Debug, Clone, Copy)]
pub struct PageHeader {
    /// Magic number for validation; `deserialize` only accepts `PAGE_MAGIC`.
    pub magic: u32,
    /// Page identifier
    pub page_id: PageId,
    /// LSN of last modification; `LogSequenceNumber::INVALID` on a header
    /// built by `new`.
    pub page_lsn: LogSequenceNumber,
    /// Page type
    pub page_type: PageType,
    /// Flags (no bit meanings are defined in this file).
    pub flags: u8,
    /// Free space in page, in bytes. `new` sets it to `PAGE_DATA_SIZE`;
    /// nothing else in this file updates it.
    pub free_space: u16,
    /// Checksum as stored on disk. It is not recomputed automatically; see
    /// `Page::update_checksum`.
    pub checksum: u32,
}
103
104impl PageHeader {
105    /// Create a new page header
106    pub fn new(page_id: PageId, page_type: PageType) -> Self {
107        Self {
108            magic: PAGE_MAGIC,
109            page_id,
110            page_lsn: LogSequenceNumber::INVALID,
111            page_type,
112            flags: 0,
113            free_space: PAGE_DATA_SIZE as u16,
114            checksum: 0,
115        }
116    }
117
118    /// Serialize to bytes
119    pub fn serialize(&self) -> [u8; PAGE_HEADER_SIZE] {
120        let mut buf = [0u8; PAGE_HEADER_SIZE];
121        let mut cursor = 0;
122
123        buf[cursor..cursor + 4].copy_from_slice(&self.magic.to_le_bytes());
124        cursor += 4;
125
126        buf[cursor..cursor + 8].copy_from_slice(&self.page_id.to_le_bytes());
127        cursor += 8;
128
129        buf[cursor..cursor + 8].copy_from_slice(&self.page_lsn.0.to_le_bytes());
130        cursor += 8;
131
132        buf[cursor] = self.page_type as u8;
133        cursor += 1;
134
135        buf[cursor] = self.flags;
136        cursor += 1;
137
138        buf[cursor..cursor + 2].copy_from_slice(&self.free_space.to_le_bytes());
139        cursor += 2;
140
141        buf[cursor..cursor + 4].copy_from_slice(&self.checksum.to_le_bytes());
142        // cursor += 4; // reserved bytes remain zero
143
144        buf
145    }
146
147    /// Deserialize from bytes
148    pub fn deserialize(data: &[u8]) -> KernelResult<Self> {
149        if data.len() < PAGE_HEADER_SIZE {
150            return Err(KernelError::Page {
151                kind: PageErrorKind::InvalidSize,
152            });
153        }
154
155        let magic = u32::from_le_bytes(data[0..4].try_into().unwrap());
156        if magic != PAGE_MAGIC {
157            return Err(KernelError::Corruption {
158                details: format!(
159                    "invalid page magic: expected {:#x}, got {:#x}",
160                    PAGE_MAGIC, magic
161                ),
162            });
163        }
164
165        let page_id = u64::from_le_bytes(data[4..12].try_into().unwrap());
166        let page_lsn = LogSequenceNumber(u64::from_le_bytes(data[12..20].try_into().unwrap()));
167        let page_type = PageType::try_from(data[20])?;
168        let flags = data[21];
169        let free_space = u16::from_le_bytes(data[22..24].try_into().unwrap());
170        let checksum = u32::from_le_bytes(data[24..28].try_into().unwrap());
171
172        Ok(Self {
173            magic,
174            page_id,
175            page_lsn,
176            page_type,
177            flags,
178            free_space,
179            checksum,
180        })
181    }
182}
183
/// Page - a fixed-size storage unit
///
/// A page is a `PageHeader` followed by a data area. `Page::new` allocates
/// `PAGE_DATA_SIZE` zeroed bytes for the data area and `Page::from_bytes`
/// copies everything after the header out of a `PAGE_SIZE` buffer. Both
/// fields are public, so the stored checksum goes stale whenever either is
/// modified; call `update_checksum` before serializing.
pub struct Page {
    /// Page header
    pub header: PageHeader,
    /// Page data (excluding header)
    pub data: BytesMut,
}
191
192impl Page {
193    /// Create a new empty page
194    pub fn new(page_id: PageId, page_type: PageType) -> Self {
195        Self {
196            header: PageHeader::new(page_id, page_type),
197            data: BytesMut::zeroed(PAGE_DATA_SIZE),
198        }
199    }
200
201    /// Create from raw bytes
202    pub fn from_bytes(bytes: &[u8]) -> KernelResult<Self> {
203        if bytes.len() != PAGE_SIZE {
204            return Err(KernelError::Page {
205                kind: PageErrorKind::InvalidSize,
206            });
207        }
208
209        let header = PageHeader::deserialize(&bytes[..PAGE_HEADER_SIZE])?;
210        let data = BytesMut::from(&bytes[PAGE_HEADER_SIZE..]);
211
212        let page = Self { header, data };
213
214        if !page.validate_checksum() {
215            return Err(KernelError::Corruption {
216                details: format!(
217                    "invalid page checksum: expected {:#x}, computed {:#x}",
218                    page.header.checksum,
219                    page.compute_checksum()
220                ),
221            });
222        }
223
224        Ok(page)
225    }
226
227    /// Serialize to bytes
228    pub fn to_bytes(&self) -> Bytes {
229        let mut buf = BytesMut::with_capacity(PAGE_SIZE);
230        buf.put_slice(&self.header.serialize());
231        buf.put_slice(&self.data);
232        buf.freeze()
233    }
234
235    /// Get page ID
236    pub fn page_id(&self) -> PageId {
237        self.header.page_id
238    }
239
240    /// Get page LSN
241    pub fn lsn(&self) -> LogSequenceNumber {
242        self.header.page_lsn
243    }
244
245    /// Set page LSN (after modification)
246    pub fn set_lsn(&mut self, lsn: LogSequenceNumber) {
247        self.header.page_lsn = lsn;
248    }
249
250    /// Check if page needs redo during recovery
251    ///
252    /// Returns true if the page's LSN is less than the WAL record's LSN,
253    /// meaning the WAL record's changes haven't been applied yet.
254    pub fn needs_redo(&self, record_lsn: LogSequenceNumber) -> bool {
255        self.header.page_lsn < record_lsn
256    }
257
258    /// Compute checksum for the page
259    pub fn compute_checksum(&self) -> u32 {
260        let mut hasher = crc32fast::Hasher::new();
261        // Include header fields except checksum itself
262        hasher.update(&self.header.magic.to_le_bytes());
263        hasher.update(&self.header.page_id.to_le_bytes());
264        hasher.update(&self.header.page_lsn.0.to_le_bytes());
265        hasher.update(&[self.header.page_type as u8, self.header.flags]);
266        hasher.update(&self.header.free_space.to_le_bytes());
267        hasher.update(&self.data);
268        hasher.finalize()
269    }
270
271    /// Validate page checksum
272    pub fn validate_checksum(&self) -> bool {
273        self.header.checksum == self.compute_checksum()
274    }
275
276    /// Update checksum before writing to disk
277    pub fn update_checksum(&mut self) {
278        self.header.checksum = self.compute_checksum();
279    }
280}
281
#[cfg(test)]
mod tests {
    use super::*;

    /// Header whose fields all hold distinctive, non-default values so a
    /// swapped or dropped field shows up in the assertions.
    fn sample_header() -> PageHeader {
        PageHeader {
            magic: PAGE_MAGIC,
            page_id: 42,
            page_lsn: LogSequenceNumber(100),
            page_type: PageType::Data,
            flags: 0x01,
            free_space: 1234,
            checksum: 0xDEADBEEF,
        }
    }

    /// Page with a few payload bytes, an LSN and an up-to-date checksum.
    fn sample_page() -> Page {
        let mut page = Page::new(1, PageType::Data);
        page.data[0..5].copy_from_slice(b"hello");
        page.set_lsn(LogSequenceNumber(50));
        page.update_checksum();
        page
    }

    #[test]
    fn test_page_header_roundtrip() {
        let header = sample_header();

        let serialized = header.serialize();
        let deserialized = PageHeader::deserialize(&serialized).unwrap();

        assert_eq!(header.magic, deserialized.magic);
        assert_eq!(header.page_id, deserialized.page_id);
        assert_eq!(header.page_lsn, deserialized.page_lsn);
        assert_eq!(header.page_type, deserialized.page_type);
        assert_eq!(header.flags, deserialized.flags);
        assert_eq!(header.free_space, deserialized.free_space);
        assert_eq!(header.checksum, deserialized.checksum);
    }

    /// Pins the on-disk layout: offsets, little-endian encoding and the
    /// zeroed reserved tail.
    #[test]
    fn test_page_header_layout() {
        let bytes = sample_header().serialize();

        assert_eq!(&bytes[0..4], &PAGE_MAGIC.to_le_bytes()[..]);
        assert_eq!(&bytes[4..12], &42u64.to_le_bytes()[..]);
        assert_eq!(&bytes[12..20], &100u64.to_le_bytes()[..]);
        assert_eq!(bytes[20], PageType::Data as u8);
        assert_eq!(bytes[21], 0x01);
        assert_eq!(&bytes[22..24], &1234u16.to_le_bytes()[..]);
        assert_eq!(&bytes[24..28], &0xDEADBEEFu32.to_le_bytes()[..]);
        assert_eq!(&bytes[28..32], &[0u8; 4][..]);
    }

    #[test]
    fn test_page_header_deserialize_rejects_short_input() {
        let bytes = sample_header().serialize();

        assert!(PageHeader::deserialize(&bytes[..PAGE_HEADER_SIZE - 1]).is_err());
        assert!(PageHeader::deserialize(&[]).is_err());
    }

    #[test]
    fn test_page_header_deserialize_rejects_bad_magic() {
        let mut bytes = sample_header().serialize();
        bytes[0] ^= 0xFF;

        assert!(PageHeader::deserialize(&bytes).is_err());
    }

    #[test]
    fn test_page_header_deserialize_rejects_unknown_page_type() {
        let mut bytes = sample_header().serialize();
        bytes[20] = 0xFF;

        assert!(PageHeader::deserialize(&bytes).is_err());
    }

    #[test]
    fn test_page_type_roundtrip() {
        let all = [
            PageType::Free,
            PageType::Data,
            PageType::Index,
            PageType::Overflow,
            PageType::Metadata,
        ];

        for &page_type in &all {
            assert_eq!(PageType::try_from(page_type as u8).ok(), Some(page_type));
        }

        // First value past the last declared discriminant.
        assert!(PageType::try_from(5u8).is_err());
        assert!(PageType::try_from(u8::MAX).is_err());
    }

    #[test]
    fn test_page_roundtrip() {
        let page = sample_page();

        let bytes = page.to_bytes();
        assert_eq!(bytes.len(), PAGE_SIZE);

        let restored = Page::from_bytes(&bytes).unwrap();

        assert_eq!(restored.page_id(), 1);
        assert_eq!(restored.lsn(), LogSequenceNumber(50));
        assert_eq!(restored.header.page_type, PageType::Data);
        assert_eq!(restored.data.len(), PAGE_DATA_SIZE);
        assert_eq!(&restored.data[0..5], &b"hello"[..]);
        assert!(restored.validate_checksum());
    }

    #[test]
    fn test_needs_redo() {
        let mut page = Page::new(1, PageType::Data);
        page.set_lsn(LogSequenceNumber(100));

        // Record with lower LSN - already applied
        assert!(!page.needs_redo(LogSequenceNumber(50)));

        // Record with same LSN - already applied
        assert!(!page.needs_redo(LogSequenceNumber(100)));

        // Record with higher LSN - needs redo
        assert!(page.needs_redo(LogSequenceNumber(150)));
    }

    #[test]
    fn test_page_from_bytes_rejects_wrong_size() {
        let bytes = sample_page().to_bytes();

        // One byte short.
        assert!(Page::from_bytes(&bytes[..PAGE_SIZE - 1]).is_err());

        // One byte too long.
        let mut too_long = bytes.to_vec();
        too_long.push(0);
        assert!(Page::from_bytes(&too_long).is_err());
    }

    #[test]
    fn test_page_from_bytes_rejects_corrupted_checksum() {
        let mut page = Page::new(1, PageType::Data);
        page.data[0..5].copy_from_slice(b"hello");
        page.update_checksum();

        let bytes = page.to_bytes();

        // Overwrite the first payload byte (originally b'h') so the stored
        // checksum no longer matches the contents.
        let mut corrupted = bytes.to_vec();
        corrupted[PAGE_HEADER_SIZE] = 0xFF;

        let result = Page::from_bytes(&corrupted);
        assert!(result.is_err());
    }
}