Skip to main content

sochdb_kernel/
page.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Page Management
19//!
20//! Basic page abstraction for the kernel.
21//! Provides page header structure with LSN tracking for ARIES recovery.
22
23use crate::error::{KernelError, KernelResult, PageErrorKind};
24use crate::kernel_api::PageId;
25use crate::wal::LogSequenceNumber;
26use bytes::{BufMut, Bytes, BytesMut};
27
/// Size in bytes of one complete page, header included (8 KiB).
pub const PAGE_SIZE: usize = 8 * 1024;

/// Number of bytes at the start of every page taken up by the serialized header.
pub const PAGE_HEADER_SIZE: usize = 32;

/// Number of payload bytes left in a page once the header is accounted for.
pub const PAGE_DATA_SIZE: usize = PAGE_SIZE - PAGE_HEADER_SIZE;

/// Marker stored in every page header to recognise a valid page.
///
/// Numerically `0x544F4F4E`, i.e. the ASCII text "TOON" read as a big-endian
/// integer. The header is written little-endian, so the raw bytes in a
/// serialized page appear in the order `N`, `O`, `O`, `T`.
pub const PAGE_MAGIC: u32 = u32::from_be_bytes(*b"TOON");
39
/// Kind of content a page holds.
///
/// The explicit discriminants are the values written into the page header
/// (`page_type as u8`) and accepted back by `TryFrom<u8>`, so they must not
/// be renumbered.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum PageType {
    /// Page that is not allocated to anything.
    Free = 0,
    /// Page storing rows.
    Data = 1,
    /// Page storing a B-tree node of an index.
    Index = 2,
    /// Page storing the spill-over of a value too large for its home page.
    Overflow = 3,
    /// Page storing catalog information.
    Metadata = 4,
}
55
56impl TryFrom<u8> for PageType {
57    type Error = KernelError;
58
59    fn try_from(value: u8) -> Result<Self, Self::Error> {
60        match value {
61            0 => Ok(Self::Free),
62            1 => Ok(Self::Data),
63            2 => Ok(Self::Index),
64            3 => Ok(Self::Overflow),
65            4 => Ok(Self::Metadata),
66            _ => Err(KernelError::Page {
67                kind: PageErrorKind::InvalidSize,
68            }),
69        }
70    }
71}
72
73/// Page header
74///
75/// Every page starts with this header for ARIES recovery support.
76///
77/// Layout (32 bytes):
78/// - magic: u32 (4 bytes) - validation magic number
79/// - page_id: u64 (8 bytes) - page identifier
80/// - page_lsn: u64 (8 bytes) - LSN of last modification (for recovery)
81/// - page_type: u8 (1 byte) - page type
82/// - flags: u8 (1 byte) - page flags
83/// - free_space: u16 (2 bytes) - free space in page
84/// - checksum: u32 (4 bytes) - page checksum
85/// - reserved: u32 (4 bytes) - reserved for future use
86#[derive(Debug, Clone, Copy)]
87pub struct PageHeader {
88    /// Magic number for validation
89    pub magic: u32,
90    /// Page identifier
91    pub page_id: PageId,
92    /// LSN of last modification
93    pub page_lsn: LogSequenceNumber,
94    /// Page type
95    pub page_type: PageType,
96    /// Flags
97    pub flags: u8,
98    /// Free space in page
99    pub free_space: u16,
100    /// Checksum
101    pub checksum: u32,
102}
103
104impl PageHeader {
105    /// Create a new page header
106    pub fn new(page_id: PageId, page_type: PageType) -> Self {
107        Self {
108            magic: PAGE_MAGIC,
109            page_id,
110            page_lsn: LogSequenceNumber::INVALID,
111            page_type,
112            flags: 0,
113            free_space: PAGE_DATA_SIZE as u16,
114            checksum: 0,
115        }
116    }
117
118    /// Serialize to bytes
119    pub fn serialize(&self) -> [u8; PAGE_HEADER_SIZE] {
120        let mut buf = [0u8; PAGE_HEADER_SIZE];
121        let mut cursor = 0;
122
123        buf[cursor..cursor + 4].copy_from_slice(&self.magic.to_le_bytes());
124        cursor += 4;
125
126        buf[cursor..cursor + 8].copy_from_slice(&self.page_id.to_le_bytes());
127        cursor += 8;
128
129        buf[cursor..cursor + 8].copy_from_slice(&self.page_lsn.0.to_le_bytes());
130        cursor += 8;
131
132        buf[cursor] = self.page_type as u8;
133        cursor += 1;
134
135        buf[cursor] = self.flags;
136        cursor += 1;
137
138        buf[cursor..cursor + 2].copy_from_slice(&self.free_space.to_le_bytes());
139        cursor += 2;
140
141        buf[cursor..cursor + 4].copy_from_slice(&self.checksum.to_le_bytes());
142        // cursor += 4; // reserved bytes remain zero
143
144        buf
145    }
146
147    /// Deserialize from bytes
148    pub fn deserialize(data: &[u8]) -> KernelResult<Self> {
149        if data.len() < PAGE_HEADER_SIZE {
150            return Err(KernelError::Page {
151                kind: PageErrorKind::InvalidSize,
152            });
153        }
154
155        let magic = u32::from_le_bytes(data[0..4].try_into().unwrap());
156        if magic != PAGE_MAGIC {
157            return Err(KernelError::Corruption {
158                details: format!(
159                    "invalid page magic: expected {:#x}, got {:#x}",
160                    PAGE_MAGIC, magic
161                ),
162            });
163        }
164
165        let page_id = u64::from_le_bytes(data[4..12].try_into().unwrap());
166        let page_lsn = LogSequenceNumber(u64::from_le_bytes(data[12..20].try_into().unwrap()));
167        let page_type = PageType::try_from(data[20])?;
168        let flags = data[21];
169        let free_space = u16::from_le_bytes(data[22..24].try_into().unwrap());
170        let checksum = u32::from_le_bytes(data[24..28].try_into().unwrap());
171
172        Ok(Self {
173            magic,
174            page_id,
175            page_lsn,
176            page_type,
177            flags,
178            free_space,
179            checksum,
180        })
181    }
182}
183
184/// Page - a fixed-size storage unit
185pub struct Page {
186    /// Page header
187    pub header: PageHeader,
188    /// Page data (excluding header)
189    pub data: BytesMut,
190}
191
192impl Page {
193    /// Create a new empty page
194    pub fn new(page_id: PageId, page_type: PageType) -> Self {
195        Self {
196            header: PageHeader::new(page_id, page_type),
197            data: BytesMut::zeroed(PAGE_DATA_SIZE),
198        }
199    }
200
201    /// Create from raw bytes
202    pub fn from_bytes(bytes: &[u8]) -> KernelResult<Self> {
203        if bytes.len() != PAGE_SIZE {
204            return Err(KernelError::Page {
205                kind: PageErrorKind::InvalidSize,
206            });
207        }
208
209        let header = PageHeader::deserialize(&bytes[..PAGE_HEADER_SIZE])?;
210        let data = BytesMut::from(&bytes[PAGE_HEADER_SIZE..]);
211
212        Ok(Self { header, data })
213    }
214
215    /// Serialize to bytes
216    pub fn to_bytes(&self) -> Bytes {
217        let mut buf = BytesMut::with_capacity(PAGE_SIZE);
218        buf.put_slice(&self.header.serialize());
219        buf.put_slice(&self.data);
220        buf.freeze()
221    }
222
223    /// Get page ID
224    pub fn page_id(&self) -> PageId {
225        self.header.page_id
226    }
227
228    /// Get page LSN
229    pub fn lsn(&self) -> LogSequenceNumber {
230        self.header.page_lsn
231    }
232
233    /// Set page LSN (after modification)
234    pub fn set_lsn(&mut self, lsn: LogSequenceNumber) {
235        self.header.page_lsn = lsn;
236    }
237
238    /// Check if page needs redo during recovery
239    ///
240    /// Returns true if the page's LSN is less than the WAL record's LSN,
241    /// meaning the WAL record's changes haven't been applied yet.
242    pub fn needs_redo(&self, record_lsn: LogSequenceNumber) -> bool {
243        self.header.page_lsn < record_lsn
244    }
245
246    /// Compute checksum for the page
247    pub fn compute_checksum(&self) -> u32 {
248        let mut hasher = crc32fast::Hasher::new();
249        // Include header fields except checksum itself
250        hasher.update(&self.header.magic.to_le_bytes());
251        hasher.update(&self.header.page_id.to_le_bytes());
252        hasher.update(&self.header.page_lsn.0.to_le_bytes());
253        hasher.update(&[self.header.page_type as u8, self.header.flags]);
254        hasher.update(&self.header.free_space.to_le_bytes());
255        hasher.update(&self.data);
256        hasher.finalize()
257    }
258
259    /// Validate page checksum
260    pub fn validate_checksum(&self) -> bool {
261        self.header.checksum == self.compute_checksum()
262    }
263
264    /// Update checksum before writing to disk
265    pub fn update_checksum(&mut self) {
266        self.header.checksum = self.compute_checksum();
267    }
268}
269
#[cfg(test)]
mod tests {
    use super::*;

    /// Every header field survives a serialize/deserialize cycle.
    #[test]
    fn test_page_header_roundtrip() {
        let header = PageHeader {
            magic: PAGE_MAGIC,
            page_id: 42,
            page_lsn: LogSequenceNumber(100),
            page_type: PageType::Data,
            flags: 0x01,
            free_space: 1234,
            checksum: 0xDEADBEEF,
        };

        let serialized = header.serialize();
        let deserialized = PageHeader::deserialize(&serialized).unwrap();

        assert_eq!(header.magic, deserialized.magic);
        assert_eq!(header.page_id, deserialized.page_id);
        assert_eq!(header.page_lsn, deserialized.page_lsn);
        assert_eq!(header.page_type, deserialized.page_type);
        assert_eq!(header.flags, deserialized.flags);
        assert_eq!(header.free_space, deserialized.free_space);
        assert_eq!(header.checksum, deserialized.checksum);
    }

    /// A whole page survives a to_bytes/from_bytes cycle with a valid checksum.
    #[test]
    fn test_page_roundtrip() {
        let mut page = Page::new(1, PageType::Data);
        page.data[0..5].copy_from_slice(b"hello");
        page.set_lsn(LogSequenceNumber(50));
        page.update_checksum();

        let bytes = page.to_bytes();
        let restored = Page::from_bytes(&bytes).unwrap();

        assert_eq!(restored.page_id(), 1);
        assert_eq!(restored.lsn(), LogSequenceNumber(50));
        assert!(restored.validate_checksum());
    }

    /// Redo is needed only for records strictly newer than the page LSN.
    #[test]
    fn test_needs_redo() {
        let mut page = Page::new(1, PageType::Data);
        page.set_lsn(LogSequenceNumber(100));

        // Record with lower LSN - already applied
        assert!(!page.needs_redo(LogSequenceNumber(50)));

        // Record with same LSN - already applied
        assert!(!page.needs_redo(LogSequenceNumber(100)));

        // Record with higher LSN - needs redo
        assert!(page.needs_redo(LogSequenceNumber(150)));
    }

    /// A new page has the documented defaults and serializes to PAGE_SIZE bytes.
    #[test]
    fn test_new_page_defaults() {
        let page = Page::new(7, PageType::Index);

        assert_eq!(page.header.magic, PAGE_MAGIC);
        assert_eq!(page.page_id(), 7);
        assert_eq!(page.lsn(), LogSequenceNumber::INVALID);
        assert_eq!(page.header.page_type, PageType::Index);
        assert_eq!(page.header.flags, 0);
        assert_eq!(page.header.free_space as usize, PAGE_DATA_SIZE);
        assert_eq!(page.header.checksum, 0);
        assert_eq!(page.data.len(), PAGE_DATA_SIZE);
        assert!(page.data.iter().all(|&byte| byte == 0));
        assert_eq!(page.to_bytes().len(), PAGE_SIZE);
    }

    /// The four reserved bytes at the end of the header are written as zero.
    #[test]
    fn test_header_serialize_zeroes_reserved_bytes() {
        let header = PageHeader {
            magic: PAGE_MAGIC,
            page_id: u64::MAX,
            page_lsn: LogSequenceNumber(u64::MAX),
            page_type: PageType::Metadata,
            flags: 0xFF,
            free_space: u16::MAX,
            checksum: u32::MAX,
        };

        let serialized = header.serialize();

        assert_eq!(&serialized[28..32], &[0u8; 4][..]);
    }

    /// Input shorter than a header is rejected as an invalid size.
    #[test]
    fn test_header_deserialize_rejects_short_input() {
        let short = [0u8; PAGE_HEADER_SIZE - 1];

        assert!(matches!(
            PageHeader::deserialize(&short),
            Err(KernelError::Page {
                kind: PageErrorKind::InvalidSize
            })
        ));
    }

    /// A header whose magic does not match is reported as corruption.
    #[test]
    fn test_header_deserialize_rejects_bad_magic() {
        let mut raw = PageHeader::new(1, PageType::Data).serialize();
        raw[0] ^= 0xFF;

        assert!(matches!(
            PageHeader::deserialize(&raw),
            Err(KernelError::Corruption { .. })
        ));
    }

    /// A page-type byte outside the known range makes deserialization fail.
    #[test]
    fn test_header_deserialize_rejects_unknown_page_type() {
        let mut raw = PageHeader::new(1, PageType::Data).serialize();
        raw[20] = 0xFF;

        assert!(PageHeader::deserialize(&raw).is_err());
    }

    /// Buffers that are not exactly PAGE_SIZE long are rejected.
    #[test]
    fn test_page_from_bytes_rejects_wrong_size() {
        let image = Page::new(1, PageType::Data).to_bytes();

        assert!(Page::from_bytes(&image[..PAGE_SIZE - 1]).is_err());

        let mut too_long = image.to_vec();
        too_long.push(0);
        assert!(Page::from_bytes(&too_long).is_err());

        assert!(Page::from_bytes(&[]).is_err());
    }

    /// The checksum notices changes to both the payload and the header.
    #[test]
    fn test_checksum_detects_modification() {
        let mut page = Page::new(9, PageType::Data);
        page.set_lsn(LogSequenceNumber(50));
        page.update_checksum();
        assert!(page.validate_checksum());

        // Flip one payload bit.
        page.data[0] ^= 0x01;
        assert!(!page.validate_checksum());

        // Restore the payload, then change a covered header field instead.
        page.data[0] ^= 0x01;
        assert!(page.validate_checksum());
        page.set_lsn(LogSequenceNumber(51));
        assert!(!page.validate_checksum());
    }
}