Skip to main content

reddb_server/storage/engine/page/
impl.rs

1use super::*;
2
3impl Page {
4    /// Create a new empty page
5    pub fn new(page_type: PageType, page_id: u32) -> Self {
6        let mut page = Self {
7            data: [0u8; PAGE_SIZE],
8        };
9
10        let header = PageHeader::new(page_type, page_id);
11        page.set_header(&header);
12        page
13    }
14
15    /// Create a page from raw bytes
16    pub fn from_bytes(data: [u8; PAGE_SIZE]) -> Self {
17        Self { data }
18    }
19
20    /// Create a page from a byte slice (must be exactly PAGE_SIZE)
21    pub fn from_slice(slice: &[u8]) -> Result<Self, PageError> {
22        if slice.len() != PAGE_SIZE {
23            return Err(PageError::InvalidSize(slice.len()));
24        }
25        let mut data = [0u8; PAGE_SIZE];
26        data.copy_from_slice(slice);
27        Ok(Self { data })
28    }
29
30    /// Get raw page data
31    #[inline]
32    pub fn as_bytes(&self) -> &[u8; PAGE_SIZE] {
33        &self.data
34    }
35
36    /// Get mutable raw page data
37    #[inline]
38    pub fn as_bytes_mut(&mut self) -> &mut [u8; PAGE_SIZE] {
39        &mut self.data
40    }
41
42    /// Get page header
43    pub fn header(&self) -> Result<PageHeader, PageError> {
44        let header_bytes: [u8; HEADER_SIZE] = self.data[..HEADER_SIZE]
45            .try_into()
46            .expect("header size mismatch");
47        PageHeader::from_bytes(&header_bytes)
48    }
49
50    /// Set page header
51    pub fn set_header(&mut self, header: &PageHeader) {
52        let bytes = header.to_bytes();
53        self.data[..HEADER_SIZE].copy_from_slice(&bytes);
54    }
55
56    /// Get page type
57    pub fn page_type(&self) -> Result<PageType, PageError> {
58        PageType::from_u8(self.data[0]).ok_or(PageError::InvalidPageType(self.data[0]))
59    }
60
61    /// Get page ID
62    pub fn page_id(&self) -> u32 {
63        u32::from_le_bytes([self.data[8], self.data[9], self.data[10], self.data[11]])
64    }
65
66    /// Get the WAL Log Sequence Number stamped on this page.
67    ///
68    /// `0` means "no WAL guarantee" — the page was modified through a
69    /// path that did not append a WAL record (freelist trunks, header
70    /// shadow pages). The double-write buffer is responsible for the
71    /// integrity of `lsn == 0` pages.
72    ///
73    /// See `src/storage/engine/btree/README.md` § Invariant 3.
74    pub fn lsn(&self) -> u64 {
75        u64::from_le_bytes([
76            self.data[20],
77            self.data[21],
78            self.data[22],
79            self.data[23],
80            self.data[24],
81            self.data[25],
82            self.data[26],
83            self.data[27],
84        ])
85    }
86
87    /// Stamp the WAL LSN of the record describing the most recent
88    /// mutation to this page. The pager's flush path guarantees that
89    /// the WAL is durable up to this LSN before writing the page to
90    /// disk (WAL-first ordering — see `PLAN.md` § Target 3).
91    ///
92    /// Callers should pass the LSN returned by `WalWriter::append`
93    /// for the change record. Pass `0` only on legacy / non-WAL
94    /// write paths (DWB-protected freelist + header writes).
95    pub fn set_lsn(&mut self, lsn: u64) {
96        self.data[20..28].copy_from_slice(&lsn.to_le_bytes());
97    }
98
99    /// Get cell count
100    pub fn cell_count(&self) -> u16 {
101        u16::from_le_bytes([self.data[2], self.data[3]])
102    }
103
104    /// Set cell count
105    pub fn set_cell_count(&mut self, count: u16) {
106        self.data[2..4].copy_from_slice(&count.to_le_bytes());
107    }
108
109    /// Get parent page ID
110    pub fn parent_id(&self) -> u32 {
111        u32::from_le_bytes([self.data[12], self.data[13], self.data[14], self.data[15]])
112    }
113
114    /// Set parent page ID
115    pub fn set_parent_id(&mut self, parent_id: u32) {
116        self.data[12..16].copy_from_slice(&parent_id.to_le_bytes());
117    }
118
119    /// Get right child page ID (for interior nodes)
120    pub fn right_child(&self) -> u32 {
121        u32::from_le_bytes([self.data[16], self.data[17], self.data[18], self.data[19]])
122    }
123
124    /// Set right child page ID (for interior nodes)
125    pub fn set_right_child(&mut self, child_id: u32) {
126        self.data[16..20].copy_from_slice(&child_id.to_le_bytes());
127    }
128
129    /// Get free_start offset
130    pub fn free_start(&self) -> u16 {
131        u16::from_le_bytes([self.data[4], self.data[5]])
132    }
133
134    /// Set free_start offset
135    pub fn set_free_start(&mut self, offset: u16) {
136        self.data[4..6].copy_from_slice(&offset.to_le_bytes());
137    }
138
139    /// Get free_end offset
140    pub fn free_end(&self) -> u16 {
141        u16::from_le_bytes([self.data[6], self.data[7]])
142    }
143
144    /// Set free_end offset
145    pub fn set_free_end(&mut self, offset: u16) {
146        self.data[6..8].copy_from_slice(&offset.to_le_bytes());
147    }
148
149    /// Get content area (everything after header)
150    #[inline]
151    pub fn content(&self) -> &[u8] {
152        &self.data[HEADER_SIZE..]
153    }
154
155    /// Get mutable content area
156    #[inline]
157    pub fn content_mut(&mut self) -> &mut [u8] {
158        &mut self.data[HEADER_SIZE..]
159    }
160
161    /// Calculate and update checksum
162    pub fn update_checksum(&mut self) {
163        // Zero out checksum field before calculating
164        self.data[28..32].copy_from_slice(&[0u8; 4]);
165        // Calculate CRC32 of entire page
166        let checksum = crc32(&self.data);
167        // Store checksum
168        self.data[28..32].copy_from_slice(&checksum.to_le_bytes());
169    }
170
171    /// Verify page checksum
172    pub fn verify_checksum(&self) -> Result<(), PageError> {
173        let stored =
174            u32::from_le_bytes([self.data[28], self.data[29], self.data[30], self.data[31]]);
175
176        // Calculate checksum with stored value zeroed
177        let mut temp = self.data;
178        temp[28..32].copy_from_slice(&[0u8; 4]);
179        let calculated = crc32(&temp);
180
181        if stored != calculated {
182            Err(PageError::ChecksumMismatch {
183                expected: stored,
184                actual: calculated,
185            })
186        } else {
187            Ok(())
188        }
189    }
190
191    /// Get cell pointer at index
192    ///
193    /// Cell pointers are stored as u16 offsets starting at HEADER_SIZE.
194    pub fn get_cell_pointer(&self, index: usize) -> Result<u16, PageError> {
195        let count = self.cell_count() as usize;
196        if index >= count {
197            return Err(PageError::CellOutOfBounds(index));
198        }
199
200        let offset = HEADER_SIZE + index * 2;
201        Ok(u16::from_le_bytes([
202            self.data[offset],
203            self.data[offset + 1],
204        ]))
205    }
206
207    /// Set cell pointer at index
208    pub fn set_cell_pointer(&mut self, index: usize, pointer: u16) -> Result<(), PageError> {
209        if pointer < HEADER_SIZE as u16 || pointer >= PAGE_SIZE as u16 {
210            return Err(PageError::InvalidCellPointer(pointer));
211        }
212
213        let offset = HEADER_SIZE + index * 2;
214        self.data[offset..offset + 2].copy_from_slice(&pointer.to_le_bytes());
215        Ok(())
216    }
217
218    /// Get cell data by index
219    pub fn get_cell(&self, index: usize) -> Result<&[u8], PageError> {
220        let pointer = self.get_cell_pointer(index)? as usize;
221
222        // Read cell header to determine size
223        // Cell format: [key_len: u16][value_len: u32][key][value]
224        if pointer + 6 > PAGE_SIZE {
225            return Err(PageError::InvalidCellPointer(pointer as u16));
226        }
227
228        let key_len = u16::from_le_bytes([self.data[pointer], self.data[pointer + 1]]) as usize;
229        let value_len = u32::from_le_bytes([
230            self.data[pointer + 2],
231            self.data[pointer + 3],
232            self.data[pointer + 4],
233            self.data[pointer + 5],
234        ]) as usize;
235
236        let total_len = 6 + key_len + value_len;
237        if pointer + total_len > PAGE_SIZE {
238            return Err(PageError::InvalidCellPointer(pointer as u16));
239        }
240
241        Ok(&self.data[pointer..pointer + total_len])
242    }
243
244    /// Insert a new cell (key-value pair) into the page
245    ///
246    /// Returns the cell index on success.
247    pub fn insert_cell(&mut self, key: &[u8], value: &[u8]) -> Result<usize, PageError> {
248        let key_len = key.len();
249        let value_len = value.len();
250
251        // Check size limits
252        if key_len > u16::MAX as usize {
253            return Err(PageError::OverflowRequired);
254        }
255
256        // Cell format: [key_len: u16][value_len: u32][key][value]
257        let cell_size = 6 + key_len + value_len;
258
259        // Check if we need overflow
260        if cell_size > CONTENT_SIZE - 2 {
261            return Err(PageError::OverflowRequired);
262        }
263
264        // Read current header
265        let mut header = self.header()?;
266
267        // Check available space (need room for cell pointer + cell data)
268        let space_needed = 2 + cell_size;
269        if header.free_space() < space_needed {
270            return Err(PageError::PageFull);
271        }
272
273        // Allocate cell from end of page (growing upward)
274        let cell_offset = header.free_end as usize - cell_size;
275
276        // Write cell data
277        self.data[cell_offset..cell_offset + 2].copy_from_slice(&(key_len as u16).to_le_bytes());
278        self.data[cell_offset + 2..cell_offset + 6]
279            .copy_from_slice(&(value_len as u32).to_le_bytes());
280        self.data[cell_offset + 6..cell_offset + 6 + key_len].copy_from_slice(key);
281        self.data[cell_offset + 6 + key_len..cell_offset + 6 + key_len + value_len]
282            .copy_from_slice(value);
283
284        // Write cell pointer
285        let cell_index = header.cell_count as usize;
286        let pointer_offset = HEADER_SIZE + cell_index * 2;
287        self.data[pointer_offset..pointer_offset + 2]
288            .copy_from_slice(&(cell_offset as u16).to_le_bytes());
289
290        // Update header
291        header.cell_count += 1;
292        header.free_start += 2;
293        header.free_end = cell_offset as u16;
294        header.set_flag(PageFlag::Dirty);
295        self.set_header(&header);
296
297        Ok(cell_index)
298    }
299
300    /// Read key and value from cell at index
301    pub fn read_cell(&self, index: usize) -> Result<(Vec<u8>, Vec<u8>), PageError> {
302        let cell = self.get_cell(index)?;
303
304        let key_len = u16::from_le_bytes([cell[0], cell[1]]) as usize;
305        let value_len = u32::from_le_bytes([cell[2], cell[3], cell[4], cell[5]]) as usize;
306
307        let key = cell[6..6 + key_len].to_vec();
308        let value = cell[6 + key_len..6 + key_len + value_len].to_vec();
309
310        Ok((key, value))
311    }
312
313    /// Binary search for key in sorted cell array
314    ///
315    /// Returns Ok(index) if key is found, Err(insert_pos) if not.
316    pub fn search_key(&self, key: &[u8]) -> Result<usize, usize> {
317        let count = self.cell_count() as usize;
318        if count == 0 {
319            return Err(0);
320        }
321
322        let mut low = 0;
323        let mut high = count;
324
325        while low < high {
326            let mid = (low + high) / 2;
327            let (cell_key, _) = self.read_cell(mid).map_err(|_| mid)?;
328
329            match cell_key.as_slice().cmp(key) {
330                std::cmp::Ordering::Less => low = mid + 1,
331                std::cmp::Ordering::Greater => high = mid,
332                std::cmp::Ordering::Equal => return Ok(mid),
333            }
334        }
335
336        Err(low)
337    }
338
339    /// Create a database header page (page 0)
340    pub fn new_header_page(page_count: u32) -> Self {
341        let mut page = Self::new(PageType::Header, 0);
342
343        // Write magic bytes at start of content
344        page.data[HEADER_SIZE..HEADER_SIZE + 4].copy_from_slice(&MAGIC_BYTES);
345
346        // Write version
347        page.data[HEADER_SIZE + 4..HEADER_SIZE + 8].copy_from_slice(&DB_VERSION.to_le_bytes());
348
349        // Write page size
350        page.data[HEADER_SIZE + 8..HEADER_SIZE + 12]
351            .copy_from_slice(&(PAGE_SIZE as u32).to_le_bytes());
352
353        // Write page count
354        page.data[HEADER_SIZE + 12..HEADER_SIZE + 16].copy_from_slice(&page_count.to_le_bytes());
355
356        // Write freelist head (0 = no free pages)
357        page.data[HEADER_SIZE + 16..HEADER_SIZE + 20].copy_from_slice(&0u32.to_le_bytes());
358
359        page.update_checksum();
360        page
361    }
362
363    /// Read page count from header page
364    pub fn read_page_count(&self) -> u32 {
365        u32::from_le_bytes([
366            self.data[HEADER_SIZE + 12],
367            self.data[HEADER_SIZE + 13],
368            self.data[HEADER_SIZE + 14],
369            self.data[HEADER_SIZE + 15],
370        ])
371    }
372
373    /// Write page count to header page
374    pub fn write_page_count(&mut self, count: u32) {
375        self.data[HEADER_SIZE + 12..HEADER_SIZE + 16].copy_from_slice(&count.to_le_bytes());
376    }
377
378    /// Read freelist head from header page
379    pub fn read_freelist_head(&self) -> u32 {
380        u32::from_le_bytes([
381            self.data[HEADER_SIZE + 16],
382            self.data[HEADER_SIZE + 17],
383            self.data[HEADER_SIZE + 18],
384            self.data[HEADER_SIZE + 19],
385        ])
386    }
387
388    /// Write freelist head to header page
389    pub fn write_freelist_head(&mut self, page_id: u32) {
390        self.data[HEADER_SIZE + 16..HEADER_SIZE + 20].copy_from_slice(&page_id.to_le_bytes());
391    }
392
393    /// Verify this is a valid header page
394    pub fn verify_header_page(&self) -> Result<(), PageError> {
395        // Check magic bytes
396        if self.data[HEADER_SIZE..HEADER_SIZE + 4] != MAGIC_BYTES {
397            return Err(PageError::InvalidPageType(self.data[0]));
398        }
399
400        // Check page size
401        let stored_page_size = u32::from_le_bytes([
402            self.data[HEADER_SIZE + 8],
403            self.data[HEADER_SIZE + 9],
404            self.data[HEADER_SIZE + 10],
405            self.data[HEADER_SIZE + 11],
406        ]) as usize;
407
408        if stored_page_size != PAGE_SIZE {
409            return Err(PageError::InvalidSize(stored_page_size));
410        }
411
412        Ok(())
413    }
414}