Skip to main content

featherdb_storage/
page.rs

1//! Page format and operations
2
3use bytes::{Buf, BufMut};
4use featherdb_core::{constants, Error, PageId, Result};
5
6/// Page type discriminator
7#[derive(Debug, Clone, Copy, PartialEq, Eq)]
8#[repr(u8)]
9pub enum PageType {
10    /// Leaf page containing key-value pairs
11    Leaf = 0,
12    /// Branch (internal) page containing keys and child pointers
13    Branch = 1,
14    /// Overflow page for large values
15    Overflow = 2,
16    /// Free page (in free list)
17    Free = 3,
18}
19
20impl TryFrom<u8> for PageType {
21    type Error = Error;
22
23    fn try_from(value: u8) -> Result<Self> {
24        match value {
25            0 => Ok(PageType::Leaf),
26            1 => Ok(PageType::Branch),
27            2 => Ok(PageType::Overflow),
28            3 => Ok(PageType::Free),
29            _ => Err(Error::InvalidPageType {
30                page_id: PageId::INVALID,
31                page_type: value,
32            }),
33        }
34    }
35}
36
37/// Page flags
38#[derive(Debug, Clone, Copy, Default)]
39pub struct PageFlags {
40    pub dirty: bool,
41    pub encrypted: bool,
42    pub compressed: bool,
43}
44
45impl PageFlags {
46    fn to_u8(self) -> u8 {
47        let mut flags = 0u8;
48        if self.dirty {
49            flags |= 0x01;
50        }
51        if self.encrypted {
52            flags |= 0x02;
53        }
54        if self.compressed {
55            flags |= 0x04;
56        }
57        flags
58    }
59
60    fn from_u8(value: u8) -> Self {
61        PageFlags {
62            dirty: value & 0x01 != 0,
63            encrypted: value & 0x02 != 0,
64            compressed: value & 0x04 != 0,
65        }
66    }
67}
68
69/// Slot array entry - points to a cell in the page
70#[derive(Debug, Clone, Copy)]
71pub struct SlotEntry {
72    /// Offset from start of page to the cell
73    pub offset: u16,
74    /// Length of the cell in bytes
75    pub len: u16,
76    /// Flags (e.g., deleted marker for MVCC)
77    pub flags: u8,
78    /// Reserved for future use
79    pub reserved: u8,
80}
81
82impl SlotEntry {
83    pub const SIZE: usize = constants::SLOT_SIZE;
84
85    pub fn serialize(&self, buf: &mut impl BufMut) {
86        buf.put_u16_le(self.offset);
87        buf.put_u16_le(self.len);
88        buf.put_u8(self.flags);
89        buf.put_u8(self.reserved);
90    }
91
92    pub fn deserialize(buf: &mut impl Buf) -> Self {
93        SlotEntry {
94            offset: buf.get_u16_le(),
95            len: buf.get_u16_le(),
96            flags: buf.get_u8(),
97            reserved: buf.get_u8(),
98        }
99    }
100
101    pub fn is_deleted(&self) -> bool {
102        self.flags & 0x01 != 0
103    }
104}
105
106/// A database page (4KB by default)
107///
108/// Layout:
109/// - Header (64 bytes)
110/// - Slot array (grows down from header)
111/// - Free space
112/// - Cell data (grows up from bottom)
113#[derive(Clone)]
114pub struct Page {
115    /// Raw page data
116    data: Vec<u8>,
117    /// Page size
118    page_size: usize,
119}
120
121impl Page {
122    /// Create a new empty page
123    pub fn new(page_id: PageId, page_type: PageType, page_size: usize) -> Self {
124        let mut data = vec![0u8; page_size];
125
126        // Initialize header
127        let mut cursor = &mut data[..];
128
129        // page_id: u64
130        cursor.put_u64_le(page_id.0);
131        // page_type: u8
132        cursor.put_u8(page_type as u8);
133        // flags: u8
134        cursor.put_u8(0);
135        // version: u64 (MVCC version)
136        cursor.put_u64_le(0);
137        // checksum: u128 (will be computed on write)
138        cursor.put_u128_le(0);
139        // item_count: u16
140        cursor.put_u16_le(0);
141        // free_start: u16 (starts right after header)
142        cursor.put_u16_le(constants::PAGE_HEADER_SIZE as u16);
143        // free_end: u16 (starts at end of page)
144        cursor.put_u16_le(page_size as u16);
145        // parent: u64
146        cursor.put_u64_le(PageId::INVALID.0);
147        // right_sibling: u64
148        cursor.put_u64_le(PageId::INVALID.0);
149        // reserved: 6 bytes
150        cursor.put_slice(&[0u8; 6]);
151
152        Page { data, page_size }
153    }
154
155    /// Create a page from raw bytes
156    pub fn from_bytes(data: Vec<u8>) -> Result<Self> {
157        let page_size = data.len();
158        if page_size < constants::PAGE_HEADER_SIZE {
159            return Err(Error::Internal("Page too small".into()));
160        }
161        Ok(Page { data, page_size })
162    }
163
164    /// Get raw bytes
165    pub fn as_bytes(&self) -> &[u8] {
166        &self.data
167    }
168
169    /// Get mutable raw bytes
170    pub fn as_bytes_mut(&mut self) -> &mut [u8] {
171        &mut self.data
172    }
173
174    /// Get page ID
175    pub fn page_id(&self) -> PageId {
176        PageId(u64::from_le_bytes(self.data[0..8].try_into().unwrap()))
177    }
178
179    /// Set page ID
180    pub fn set_page_id(&mut self, id: PageId) {
181        self.data[0..8].copy_from_slice(&id.0.to_le_bytes());
182    }
183
184    /// Get page type
185    pub fn page_type(&self) -> PageType {
186        PageType::try_from(self.data[8]).unwrap_or(PageType::Free)
187    }
188
189    /// Set page type
190    pub fn set_page_type(&mut self, page_type: PageType) {
191        self.data[8] = page_type as u8;
192    }
193
194    /// Get flags
195    pub fn flags(&self) -> PageFlags {
196        PageFlags::from_u8(self.data[9])
197    }
198
199    /// Set flags
200    pub fn set_flags(&mut self, flags: PageFlags) {
201        self.data[9] = flags.to_u8();
202    }
203
204    /// Get MVCC version
205    pub fn version(&self) -> u64 {
206        u64::from_le_bytes(self.data[10..18].try_into().unwrap())
207    }
208
209    /// Set MVCC version
210    pub fn set_version(&mut self, version: u64) {
211        self.data[10..18].copy_from_slice(&version.to_le_bytes());
212    }
213
214    /// Get stored checksum
215    pub fn checksum(&self) -> u128 {
216        u128::from_le_bytes(self.data[18..34].try_into().unwrap())
217    }
218
219    /// Compute and store checksum
220    pub fn compute_checksum(&mut self) {
221        // Zero out checksum field before computing
222        self.data[18..34].copy_from_slice(&[0u8; 16]);
223
224        let hash = xxhash_rust::xxh3::xxh3_128(&self.data);
225        self.data[18..34].copy_from_slice(&hash.to_le_bytes());
226    }
227
228    /// Verify checksum
229    pub fn verify_checksum(&self) -> bool {
230        let stored = self.checksum();
231
232        // Zero out checksum field for computation
233        let mut temp = self.data.clone();
234        temp[18..34].copy_from_slice(&[0u8; 16]);
235
236        let computed = xxhash_rust::xxh3::xxh3_128(&temp);
237        stored == computed
238    }
239
240    /// Get item count
241    pub fn item_count(&self) -> u16 {
242        u16::from_le_bytes(self.data[34..36].try_into().unwrap())
243    }
244
245    /// Set item count
246    fn set_item_count(&mut self, count: u16) {
247        self.data[34..36].copy_from_slice(&count.to_le_bytes());
248    }
249
250    /// Get free space start offset
251    pub fn free_start(&self) -> u16 {
252        u16::from_le_bytes(self.data[36..38].try_into().unwrap())
253    }
254
255    /// Set free space start offset
256    fn set_free_start(&mut self, offset: u16) {
257        self.data[36..38].copy_from_slice(&offset.to_le_bytes());
258    }
259
260    /// Get free space end offset
261    pub fn free_end(&self) -> u16 {
262        u16::from_le_bytes(self.data[38..40].try_into().unwrap())
263    }
264
265    /// Set free space end offset
266    fn set_free_end(&mut self, offset: u16) {
267        self.data[38..40].copy_from_slice(&offset.to_le_bytes());
268    }
269
270    /// Get parent page ID
271    pub fn parent(&self) -> PageId {
272        PageId(u64::from_le_bytes(self.data[40..48].try_into().unwrap()))
273    }
274
275    /// Set parent page ID
276    pub fn set_parent(&mut self, parent: PageId) {
277        self.data[40..48].copy_from_slice(&parent.0.to_le_bytes());
278    }
279
280    /// Get right sibling page ID
281    pub fn right_sibling(&self) -> Option<PageId> {
282        let id = PageId(u64::from_le_bytes(self.data[48..56].try_into().unwrap()));
283        if id.is_valid() {
284            Some(id)
285        } else {
286            None
287        }
288    }
289
290    /// Set right sibling page ID
291    pub fn set_right_sibling(&mut self, sibling: Option<PageId>) {
292        let id = sibling.unwrap_or(PageId::INVALID);
293        self.data[48..56].copy_from_slice(&id.0.to_le_bytes());
294    }
295
296    /// Get available free space
297    pub fn free_space(&self) -> usize {
298        let start = self.free_start() as usize;
299        let end = self.free_end() as usize;
300        end.saturating_sub(start)
301    }
302
303    /// Get usable space (total - header)
304    pub fn usable_space(&self) -> usize {
305        self.page_size - constants::PAGE_HEADER_SIZE
306    }
307
308    /// Check if page needs to be split (> 70% full)
309    pub fn needs_split(&self) -> bool {
310        let used = self.usable_space() - self.free_space();
311        used > (self.usable_space() * 70) / 100
312    }
313
314    /// Count non-deleted items by scanning slot flags only.
315    /// No cell data is read — this is O(item_count) with minimal work per slot.
316    #[inline]
317    pub fn count_live_items(&self) -> u16 {
318        let count = self.item_count() as usize;
319        let mut live = 0u16;
320        for i in 0..count {
321            // flags byte is at slot_offset + 4 (after offset:u16 + len:u16)
322            let flags_offset = constants::PAGE_HEADER_SIZE + i * SlotEntry::SIZE + 4;
323            if self.data[flags_offset] & 0x01 == 0 {
324                live += 1;
325            }
326        }
327        live
328    }
329
330    /// Iterate non-deleted cells, calling `f(cell_data)` for each live cell.
331    /// No allocations — reads slot flags inline and yields cell slices directly.
332    #[inline]
333    pub fn for_each_live_cell<F>(&self, mut f: F)
334    where
335        F: FnMut(&[u8]),
336    {
337        let count = self.item_count() as usize;
338        for i in 0..count {
339            let slot_offset = constants::PAGE_HEADER_SIZE + i * SlotEntry::SIZE;
340            let flags = self.data[slot_offset + 4];
341            if flags & 0x01 == 0 {
342                let offset =
343                    u16::from_le_bytes([self.data[slot_offset], self.data[slot_offset + 1]])
344                        as usize;
345                let len =
346                    u16::from_le_bytes([self.data[slot_offset + 2], self.data[slot_offset + 3]])
347                        as usize;
348                if offset + len <= self.data.len() {
349                    f(&self.data[offset..offset + len]);
350                }
351            }
352        }
353    }
354
355    /// Get a slot entry by index
356    pub fn get_slot(&self, index: usize) -> Option<SlotEntry> {
357        if index >= self.item_count() as usize {
358            return None;
359        }
360
361        let slot_offset = constants::PAGE_HEADER_SIZE + index * SlotEntry::SIZE;
362        let mut cursor = &self.data[slot_offset..];
363        Some(SlotEntry::deserialize(&mut cursor))
364    }
365
366    /// Get cell data by slot index
367    pub fn get_cell(&self, index: usize) -> Option<&[u8]> {
368        let slot = self.get_slot(index)?;
369        let start = slot.offset as usize;
370        let end = start + slot.len as usize;
371
372        if end <= self.page_size {
373            Some(&self.data[start..end])
374        } else {
375            None
376        }
377    }
378
379    /// Insert a cell into the page
380    /// Returns the slot index, or None if not enough space
381    pub fn insert_cell(&mut self, data: &[u8]) -> Option<usize> {
382        let cell_len = data.len();
383        let needed = cell_len + SlotEntry::SIZE;
384
385        if self.free_space() < needed {
386            return None;
387        }
388
389        // Allocate space for cell at the end (growing upward)
390        let new_free_end = self.free_end() as usize - cell_len;
391        self.data[new_free_end..new_free_end + cell_len].copy_from_slice(data);
392        self.set_free_end(new_free_end as u16);
393
394        // Add slot entry (growing downward from header)
395        let slot_index = self.item_count() as usize;
396        let slot_offset = constants::PAGE_HEADER_SIZE + slot_index * SlotEntry::SIZE;
397
398        let slot = SlotEntry {
399            offset: new_free_end as u16,
400            len: cell_len as u16,
401            flags: 0,
402            reserved: 0,
403        };
404
405        let mut cursor = &mut self.data[slot_offset..];
406        slot.serialize(&mut cursor);
407
408        self.set_item_count((slot_index + 1) as u16);
409        self.set_free_start((slot_offset + SlotEntry::SIZE) as u16);
410
411        Some(slot_index)
412    }
413
414    /// Delete a cell by marking it as deleted
415    pub fn delete_cell(&mut self, index: usize) -> bool {
416        if index >= self.item_count() as usize {
417            return false;
418        }
419
420        let slot_offset = constants::PAGE_HEADER_SIZE + index * SlotEntry::SIZE;
421        // Set deleted flag
422        self.data[slot_offset + 4] |= 0x01;
423        true
424    }
425
426    /// Update a cell in place (must fit in existing space)
427    pub fn update_cell(&mut self, index: usize, data: &[u8]) -> bool {
428        let slot = match self.get_slot(index) {
429            Some(s) => s,
430            None => return false,
431        };
432
433        if data.len() > slot.len as usize {
434            return false; // Doesn't fit
435        }
436
437        let start = slot.offset as usize;
438        self.data[start..start + data.len()].copy_from_slice(data);
439
440        // Update length in slot if smaller
441        if data.len() < slot.len as usize {
442            let slot_offset = constants::PAGE_HEADER_SIZE + index * SlotEntry::SIZE;
443            self.data[slot_offset + 2..slot_offset + 4]
444                .copy_from_slice(&(data.len() as u16).to_le_bytes());
445        }
446
447        true
448    }
449
450    /// Compact the page, removing deleted cells and reclaiming space
451    pub fn compact(&mut self) {
452        let mut live_cells: Vec<(usize, Vec<u8>)> = Vec::new();
453
454        // Collect non-deleted cells
455        for i in 0..self.item_count() as usize {
456            let slot = self.get_slot(i).unwrap();
457            if !slot.is_deleted() {
458                if let Some(cell) = self.get_cell(i) {
459                    live_cells.push((i, cell.to_vec()));
460                }
461            }
462        }
463
464        // Reset page
465        let page_id = self.page_id();
466        let page_type = self.page_type();
467        let parent = self.parent();
468        let sibling = self.right_sibling();
469        let version = self.version();
470
471        *self = Page::new(page_id, page_type, self.page_size);
472        self.set_parent(parent);
473        self.set_right_sibling(sibling);
474        self.set_version(version);
475
476        // Re-insert live cells
477        for (_, cell) in live_cells {
478            self.insert_cell(&cell);
479        }
480    }
481
482    /// Apply a redo operation from WAL
483    pub fn apply_redo(&mut self, data: &[u8]) -> Result<()> {
484        if data.len() != self.data.len() {
485            return Err(Error::Internal("Redo data size mismatch".into()));
486        }
487        self.data.copy_from_slice(data);
488        Ok(())
489    }
490}
491
492impl std::fmt::Debug for Page {
493    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
494        f.debug_struct("Page")
495            .field("page_id", &self.page_id())
496            .field("page_type", &self.page_type())
497            .field("item_count", &self.item_count())
498            .field("free_space", &self.free_space())
499            .finish()
500    }
501}
502
503#[cfg(test)]
504mod tests {
505    use super::*;
506
507    #[test]
508    fn test_page_creation() {
509        let page = Page::new(PageId(1), PageType::Leaf, 4096);
510
511        assert_eq!(page.page_id(), PageId(1));
512        assert_eq!(page.page_type(), PageType::Leaf);
513        assert_eq!(page.item_count(), 0);
514        assert!(page.free_space() > 4000);
515    }
516
517    #[test]
518    fn test_cell_operations() {
519        let mut page = Page::new(PageId(1), PageType::Leaf, 4096);
520
521        // Insert cells
522        let idx1 = page.insert_cell(b"hello").unwrap();
523        let idx2 = page.insert_cell(b"world").unwrap();
524
525        assert_eq!(idx1, 0);
526        assert_eq!(idx2, 1);
527        assert_eq!(page.item_count(), 2);
528
529        // Read cells
530        assert_eq!(page.get_cell(0), Some(b"hello".as_slice()));
531        assert_eq!(page.get_cell(1), Some(b"world".as_slice()));
532    }
533
534    #[test]
535    fn test_checksum() {
536        let mut page = Page::new(PageId(1), PageType::Leaf, 4096);
537        page.insert_cell(b"test data");
538
539        page.compute_checksum();
540        assert!(page.verify_checksum());
541
542        // Corrupt the page
543        page.data[100] ^= 0xFF;
544        assert!(!page.verify_checksum());
545    }
546}