1use bytes::{Buf, BufMut};
4use featherdb_core::{constants, Error, PageId, Result};
5
/// Kind tag of a page, stored as a single byte in the page header.
///
/// The explicit discriminants are the byte values produced by `as u8` and
/// accepted by the `TryFrom<u8>` implementation.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PageType {
    /// Leaf page; encoded as `0`.
    Leaf = 0,
    /// Branch page; encoded as `1`.
    Branch = 1,
    /// Overflow page; encoded as `2`.
    Overflow = 2,
    /// Free page; encoded as `3`.
    Free = 3,
}
19
20impl TryFrom<u8> for PageType {
21 type Error = Error;
22
23 fn try_from(value: u8) -> Result<Self> {
24 match value {
25 0 => Ok(PageType::Leaf),
26 1 => Ok(PageType::Branch),
27 2 => Ok(PageType::Overflow),
28 3 => Ok(PageType::Free),
29 _ => Err(Error::InvalidPageType {
30 page_id: PageId::INVALID,
31 page_type: value,
32 }),
33 }
34 }
35}
36
/// Boolean page flags that are packed into one header byte.
#[derive(Debug, Clone, Copy, Default)]
pub struct PageFlags {
    /// Stored in bit `0x01` of the packed byte.
    pub dirty: bool,
    /// Stored in bit `0x02` of the packed byte.
    pub encrypted: bool,
    /// Stored in bit `0x04` of the packed byte.
    pub compressed: bool,
}

impl PageFlags {
    const DIRTY_BIT: u8 = 0x01;
    const ENCRYPTED_BIT: u8 = 0x02;
    const COMPRESSED_BIT: u8 = 0x04;

    /// Packs the three flags into their byte representation.
    fn to_u8(self) -> u8 {
        let bit = |set: bool, mask: u8| if set { mask } else { 0 };
        bit(self.dirty, Self::DIRTY_BIT)
            | bit(self.encrypted, Self::ENCRYPTED_BIT)
            | bit(self.compressed, Self::COMPRESSED_BIT)
    }

    /// Unpacks a flag byte; bits other than the three known ones are ignored.
    fn from_u8(value: u8) -> Self {
        let has = |mask: u8| value & mask != 0;
        PageFlags {
            dirty: has(Self::DIRTY_BIT),
            encrypted: has(Self::ENCRYPTED_BIT),
            compressed: has(Self::COMPRESSED_BIT),
        }
    }
}
68
/// One entry of a page's slot array, describing a single cell.
#[derive(Debug, Clone, Copy)]
pub struct SlotEntry {
    /// Byte offset of the cell, measured from the start of the page buffer.
    pub offset: u16,
    /// Length of the cell in bytes.
    pub len: u16,
    /// Slot flag bits; bit `0x01` marks the slot as deleted.
    pub flags: u8,
    /// Spare byte that is serialized alongside the other fields.
    pub reserved: u8,
}
81
82impl SlotEntry {
83 pub const SIZE: usize = constants::SLOT_SIZE;
84
85 pub fn serialize(&self, buf: &mut impl BufMut) {
86 buf.put_u16_le(self.offset);
87 buf.put_u16_le(self.len);
88 buf.put_u8(self.flags);
89 buf.put_u8(self.reserved);
90 }
91
92 pub fn deserialize(buf: &mut impl Buf) -> Self {
93 SlotEntry {
94 offset: buf.get_u16_le(),
95 len: buf.get_u16_le(),
96 flags: buf.get_u8(),
97 reserved: buf.get_u8(),
98 }
99 }
100
101 pub fn is_deleted(&self) -> bool {
102 self.flags & 0x01 != 0
103 }
104}
105
/// A fixed-size page backed by an owned byte buffer.
///
/// The buffer starts with the page header; the slot array begins at
/// `constants::PAGE_HEADER_SIZE` and cell payloads are placed from the end
/// of the buffer downwards.
#[derive(Clone)]
pub struct Page {
    /// Raw page bytes, header included.
    data: Vec<u8>,
    /// Length of `data` in bytes, recorded when the page is constructed.
    page_size: usize,
}
120
121impl Page {
122 pub fn new(page_id: PageId, page_type: PageType, page_size: usize) -> Self {
124 let mut data = vec![0u8; page_size];
125
126 let mut cursor = &mut data[..];
128
129 cursor.put_u64_le(page_id.0);
131 cursor.put_u8(page_type as u8);
133 cursor.put_u8(0);
135 cursor.put_u64_le(0);
137 cursor.put_u128_le(0);
139 cursor.put_u16_le(0);
141 cursor.put_u16_le(constants::PAGE_HEADER_SIZE as u16);
143 cursor.put_u16_le(page_size as u16);
145 cursor.put_u64_le(PageId::INVALID.0);
147 cursor.put_u64_le(PageId::INVALID.0);
149 cursor.put_slice(&[0u8; 6]);
151
152 Page { data, page_size }
153 }
154
155 pub fn from_bytes(data: Vec<u8>) -> Result<Self> {
157 let page_size = data.len();
158 if page_size < constants::PAGE_HEADER_SIZE {
159 return Err(Error::Internal("Page too small".into()));
160 }
161 Ok(Page { data, page_size })
162 }
163
164 pub fn as_bytes(&self) -> &[u8] {
166 &self.data
167 }
168
169 pub fn as_bytes_mut(&mut self) -> &mut [u8] {
171 &mut self.data
172 }
173
174 pub fn page_id(&self) -> PageId {
176 PageId(u64::from_le_bytes(self.data[0..8].try_into().unwrap()))
177 }
178
179 pub fn set_page_id(&mut self, id: PageId) {
181 self.data[0..8].copy_from_slice(&id.0.to_le_bytes());
182 }
183
184 pub fn page_type(&self) -> PageType {
186 PageType::try_from(self.data[8]).unwrap_or(PageType::Free)
187 }
188
189 pub fn set_page_type(&mut self, page_type: PageType) {
191 self.data[8] = page_type as u8;
192 }
193
194 pub fn flags(&self) -> PageFlags {
196 PageFlags::from_u8(self.data[9])
197 }
198
199 pub fn set_flags(&mut self, flags: PageFlags) {
201 self.data[9] = flags.to_u8();
202 }
203
204 pub fn version(&self) -> u64 {
206 u64::from_le_bytes(self.data[10..18].try_into().unwrap())
207 }
208
209 pub fn set_version(&mut self, version: u64) {
211 self.data[10..18].copy_from_slice(&version.to_le_bytes());
212 }
213
214 pub fn checksum(&self) -> u128 {
216 u128::from_le_bytes(self.data[18..34].try_into().unwrap())
217 }
218
219 pub fn compute_checksum(&mut self) {
221 self.data[18..34].copy_from_slice(&[0u8; 16]);
223
224 let hash = xxhash_rust::xxh3::xxh3_128(&self.data);
225 self.data[18..34].copy_from_slice(&hash.to_le_bytes());
226 }
227
228 pub fn verify_checksum(&self) -> bool {
230 let stored = self.checksum();
231
232 let mut temp = self.data.clone();
234 temp[18..34].copy_from_slice(&[0u8; 16]);
235
236 let computed = xxhash_rust::xxh3::xxh3_128(&temp);
237 stored == computed
238 }
239
240 pub fn item_count(&self) -> u16 {
242 u16::from_le_bytes(self.data[34..36].try_into().unwrap())
243 }
244
245 fn set_item_count(&mut self, count: u16) {
247 self.data[34..36].copy_from_slice(&count.to_le_bytes());
248 }
249
250 pub fn free_start(&self) -> u16 {
252 u16::from_le_bytes(self.data[36..38].try_into().unwrap())
253 }
254
255 fn set_free_start(&mut self, offset: u16) {
257 self.data[36..38].copy_from_slice(&offset.to_le_bytes());
258 }
259
260 pub fn free_end(&self) -> u16 {
262 u16::from_le_bytes(self.data[38..40].try_into().unwrap())
263 }
264
265 fn set_free_end(&mut self, offset: u16) {
267 self.data[38..40].copy_from_slice(&offset.to_le_bytes());
268 }
269
270 pub fn parent(&self) -> PageId {
272 PageId(u64::from_le_bytes(self.data[40..48].try_into().unwrap()))
273 }
274
275 pub fn set_parent(&mut self, parent: PageId) {
277 self.data[40..48].copy_from_slice(&parent.0.to_le_bytes());
278 }
279
280 pub fn right_sibling(&self) -> Option<PageId> {
282 let id = PageId(u64::from_le_bytes(self.data[48..56].try_into().unwrap()));
283 if id.is_valid() {
284 Some(id)
285 } else {
286 None
287 }
288 }
289
290 pub fn set_right_sibling(&mut self, sibling: Option<PageId>) {
292 let id = sibling.unwrap_or(PageId::INVALID);
293 self.data[48..56].copy_from_slice(&id.0.to_le_bytes());
294 }
295
296 pub fn free_space(&self) -> usize {
298 let start = self.free_start() as usize;
299 let end = self.free_end() as usize;
300 end.saturating_sub(start)
301 }
302
303 pub fn usable_space(&self) -> usize {
305 self.page_size - constants::PAGE_HEADER_SIZE
306 }
307
308 pub fn needs_split(&self) -> bool {
310 let used = self.usable_space() - self.free_space();
311 used > (self.usable_space() * 70) / 100
312 }
313
314 #[inline]
317 pub fn count_live_items(&self) -> u16 {
318 let count = self.item_count() as usize;
319 let mut live = 0u16;
320 for i in 0..count {
321 let flags_offset = constants::PAGE_HEADER_SIZE + i * SlotEntry::SIZE + 4;
323 if self.data[flags_offset] & 0x01 == 0 {
324 live += 1;
325 }
326 }
327 live
328 }
329
330 #[inline]
333 pub fn for_each_live_cell<F>(&self, mut f: F)
334 where
335 F: FnMut(&[u8]),
336 {
337 let count = self.item_count() as usize;
338 for i in 0..count {
339 let slot_offset = constants::PAGE_HEADER_SIZE + i * SlotEntry::SIZE;
340 let flags = self.data[slot_offset + 4];
341 if flags & 0x01 == 0 {
342 let offset =
343 u16::from_le_bytes([self.data[slot_offset], self.data[slot_offset + 1]])
344 as usize;
345 let len =
346 u16::from_le_bytes([self.data[slot_offset + 2], self.data[slot_offset + 3]])
347 as usize;
348 if offset + len <= self.data.len() {
349 f(&self.data[offset..offset + len]);
350 }
351 }
352 }
353 }
354
355 pub fn get_slot(&self, index: usize) -> Option<SlotEntry> {
357 if index >= self.item_count() as usize {
358 return None;
359 }
360
361 let slot_offset = constants::PAGE_HEADER_SIZE + index * SlotEntry::SIZE;
362 let mut cursor = &self.data[slot_offset..];
363 Some(SlotEntry::deserialize(&mut cursor))
364 }
365
366 pub fn get_cell(&self, index: usize) -> Option<&[u8]> {
368 let slot = self.get_slot(index)?;
369 let start = slot.offset as usize;
370 let end = start + slot.len as usize;
371
372 if end <= self.page_size {
373 Some(&self.data[start..end])
374 } else {
375 None
376 }
377 }
378
379 pub fn insert_cell(&mut self, data: &[u8]) -> Option<usize> {
382 let cell_len = data.len();
383 let needed = cell_len + SlotEntry::SIZE;
384
385 if self.free_space() < needed {
386 return None;
387 }
388
389 let new_free_end = self.free_end() as usize - cell_len;
391 self.data[new_free_end..new_free_end + cell_len].copy_from_slice(data);
392 self.set_free_end(new_free_end as u16);
393
394 let slot_index = self.item_count() as usize;
396 let slot_offset = constants::PAGE_HEADER_SIZE + slot_index * SlotEntry::SIZE;
397
398 let slot = SlotEntry {
399 offset: new_free_end as u16,
400 len: cell_len as u16,
401 flags: 0,
402 reserved: 0,
403 };
404
405 let mut cursor = &mut self.data[slot_offset..];
406 slot.serialize(&mut cursor);
407
408 self.set_item_count((slot_index + 1) as u16);
409 self.set_free_start((slot_offset + SlotEntry::SIZE) as u16);
410
411 Some(slot_index)
412 }
413
414 pub fn delete_cell(&mut self, index: usize) -> bool {
416 if index >= self.item_count() as usize {
417 return false;
418 }
419
420 let slot_offset = constants::PAGE_HEADER_SIZE + index * SlotEntry::SIZE;
421 self.data[slot_offset + 4] |= 0x01;
423 true
424 }
425
426 pub fn update_cell(&mut self, index: usize, data: &[u8]) -> bool {
428 let slot = match self.get_slot(index) {
429 Some(s) => s,
430 None => return false,
431 };
432
433 if data.len() > slot.len as usize {
434 return false; }
436
437 let start = slot.offset as usize;
438 self.data[start..start + data.len()].copy_from_slice(data);
439
440 if data.len() < slot.len as usize {
442 let slot_offset = constants::PAGE_HEADER_SIZE + index * SlotEntry::SIZE;
443 self.data[slot_offset + 2..slot_offset + 4]
444 .copy_from_slice(&(data.len() as u16).to_le_bytes());
445 }
446
447 true
448 }
449
450 pub fn compact(&mut self) {
452 let mut live_cells: Vec<(usize, Vec<u8>)> = Vec::new();
453
454 for i in 0..self.item_count() as usize {
456 let slot = self.get_slot(i).unwrap();
457 if !slot.is_deleted() {
458 if let Some(cell) = self.get_cell(i) {
459 live_cells.push((i, cell.to_vec()));
460 }
461 }
462 }
463
464 let page_id = self.page_id();
466 let page_type = self.page_type();
467 let parent = self.parent();
468 let sibling = self.right_sibling();
469 let version = self.version();
470
471 *self = Page::new(page_id, page_type, self.page_size);
472 self.set_parent(parent);
473 self.set_right_sibling(sibling);
474 self.set_version(version);
475
476 for (_, cell) in live_cells {
478 self.insert_cell(&cell);
479 }
480 }
481
482 pub fn apply_redo(&mut self, data: &[u8]) -> Result<()> {
484 if data.len() != self.data.len() {
485 return Err(Error::Internal("Redo data size mismatch".into()));
486 }
487 self.data.copy_from_slice(data);
488 Ok(())
489 }
490}
491
492impl std::fmt::Debug for Page {
493 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
494 f.debug_struct("Page")
495 .field("page_id", &self.page_id())
496 .field("page_type", &self.page_type())
497 .field("item_count", &self.item_count())
498 .field("free_space", &self.free_space())
499 .finish()
500 }
501}
502
#[cfg(test)]
mod tests {
    use super::*;

    /// Page size used by every test in this module.
    const TEST_PAGE_SIZE: usize = 4096;

    /// Builds the empty leaf page each test starts from.
    fn empty_leaf() -> Page {
        Page::new(PageId(1), PageType::Leaf, TEST_PAGE_SIZE)
    }

    #[test]
    fn test_page_creation() {
        let fresh = empty_leaf();

        assert_eq!(fresh.page_id(), PageId(1));
        assert_eq!(fresh.page_type(), PageType::Leaf);
        assert_eq!(fresh.item_count(), 0);
        assert!(fresh.free_space() > 4000);
    }

    #[test]
    fn test_cell_operations() {
        let mut page = empty_leaf();

        let first = page.insert_cell(b"hello").unwrap();
        let second = page.insert_cell(b"world").unwrap();

        assert_eq!(first, 0);
        assert_eq!(second, 1);
        assert_eq!(page.item_count(), 2);

        assert_eq!(page.get_cell(0), Some(&b"hello"[..]));
        assert_eq!(page.get_cell(1), Some(&b"world"[..]));
    }

    #[test]
    fn test_checksum() {
        let mut page = empty_leaf();
        let _ = page.insert_cell(b"test data");

        page.compute_checksum();
        assert!(page.verify_checksum());

        // Flip one byte outside the checksum field; verification must fail.
        page.data[100] ^= 0xFF;
        assert!(!page.verify_checksum());
    }
}