1use crate::{BufferPool, Page, PageType};
4use bytes::BufMut;
5use featherdb_core::{constants, Error, PageId, Result};
6use std::sync::Arc;
7
8const OVERFLOW_MARKER: u32 = u32::MAX;
11
12const OVERFLOW_CHUNK_HEADER: usize = 2; fn overflow_data_capacity(page_size: usize) -> usize {
19 page_size - constants::PAGE_HEADER_SIZE - OVERFLOW_CHUNK_HEADER
20}
21
22fn max_inline_cell_size(page_size: usize) -> usize {
27 page_size - constants::PAGE_HEADER_SIZE - constants::SLOT_SIZE
29}
30
/// A B-tree whose pages live in a shared [`BufferPool`].
///
/// Leaf pages hold key/value cells (large values are spilled to overflow
/// pages); branch pages hold `(child page, separator key)` cells.
pub struct BTree {
    /// Page id of the current root. Replaced when a split creates a new root.
    root_page: PageId,
    /// Buffer pool used for every page read, write and allocation.
    buffer_pool: Arc<BufferPool>,
}
38
39impl BTree {
40 pub fn new(buffer_pool: Arc<BufferPool>) -> Result<Self> {
42 let (root_page, _guard) = buffer_pool.allocate_page(PageType::Leaf)?;
43
44 Ok(BTree {
45 root_page,
46 buffer_pool,
47 })
48 }
49
50 pub fn open(root_page: PageId, buffer_pool: Arc<BufferPool>) -> Self {
52 BTree {
53 root_page,
54 buffer_pool,
55 }
56 }
57
58 pub fn root_page(&self) -> PageId {
60 self.root_page
61 }
62
63 pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
65 let (leaf_page_id, _) = self.find_leaf(key)?;
66 let guard = self.buffer_pool.get_page(leaf_page_id)?;
67 let page = guard.read();
68
69 for i in 0..page.item_count() as usize {
73 if let Some(slot) = page.get_slot(i) {
75 if slot.is_deleted() {
76 continue;
77 }
78 }
79
80 if let Some(cell) = page.get_cell(i) {
81 let (cell_key, _) = Self::decode_leaf_cell(cell)?;
82
83 if cell_key == key {
84 if Self::cell_is_overflow(cell) {
86 let key_len = u16::from_le_bytes([cell[0], cell[1]]) as usize;
87 let meta = &cell[6 + key_len..];
88 let (overflow_page, total_len) = Self::decode_overflow_meta(meta)?;
89 let value = self.read_overflow_pages(overflow_page, total_len as usize)?;
90 return Ok(Some(value));
91 } else {
92 let (_, cell_value) = Self::decode_leaf_cell(cell)?;
93 return Ok(Some(cell_value.to_vec()));
94 }
95 }
96 }
97 }
98
99 Ok(None)
100 }
101
102 pub fn insert(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
105 let (leaf_page_id, path) = self.find_leaf(key)?;
106
107 let guard = self.buffer_pool.get_page_for_write(leaf_page_id)?;
109 let mut page = guard.write();
110
111 let mut old_value = None;
113
114 for i in 0..page.item_count() as usize {
115 if let Some(slot) = page.get_slot(i) {
117 if slot.is_deleted() {
118 continue;
119 }
120 }
121
122 if let Some(cell) = page.get_cell(i) {
123 let (cell_key, _) = Self::decode_leaf_cell(cell)?;
124
125 if cell_key == key {
126 if Self::cell_is_overflow(cell) {
128 let key_len = u16::from_le_bytes([cell[0], cell[1]]) as usize;
129 let meta = &cell[6 + key_len..];
130 let (overflow_page, total_len) = Self::decode_overflow_meta(meta)?;
131 old_value =
132 Some(self.read_overflow_pages(overflow_page, total_len as usize)?);
133 self.free_overflow_pages(overflow_page)?;
135 } else {
136 let (_, cell_value) = Self::decode_leaf_cell(cell)?;
137 old_value = Some(cell_value.to_vec());
138 }
139 page.delete_cell(i);
140 break;
141 }
142 }
143 }
144
145 if old_value.is_some() {
151 page.compact();
152 }
153
154 let ps = page.as_bytes().len();
156 let inline_cell = Self::encode_leaf_cell(key, value);
157 let needs_overflow = inline_cell.len() > max_inline_cell_size(ps);
158
159 let cell = if needs_overflow {
160 drop(page);
162 drop(guard);
163 let overflow_page_id = self.write_overflow_pages(value)?;
164 let overflow_cell =
165 Self::encode_overflow_cell(key, overflow_page_id, value.len() as u32);
166
167 let guard = self.buffer_pool.get_page_for_write(leaf_page_id)?;
169 let mut page = guard.write();
170
171 if page.insert_cell(&overflow_cell).is_some() {
172 return Ok(old_value);
173 }
174
175 drop(page);
177 drop(guard);
178 self.insert_overflow_with_split(
179 key,
180 overflow_page_id,
181 value.len() as u32,
182 leaf_page_id,
183 &path,
184 )?;
185 return Ok(old_value);
186 } else {
187 inline_cell
188 };
189
190 if page.insert_cell(&cell).is_some() {
192 return Ok(old_value);
193 }
194
195 drop(page);
197 drop(guard);
198
199 self.insert_with_split(key, value, leaf_page_id, &path)?;
200
201 Ok(old_value)
202 }
203
204 pub fn delete(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
207 let (leaf_page_id, _) = self.find_leaf(key)?;
208
209 let guard = self.buffer_pool.get_page_for_write(leaf_page_id)?;
210 let mut page = guard.write();
211
212 for i in 0..page.item_count() as usize {
213 if let Some(cell) = page.get_cell(i) {
214 let (cell_key, _) = Self::decode_leaf_cell(cell)?;
215
216 if cell_key == key {
217 let old_value = if Self::cell_is_overflow(cell) {
218 let key_len = u16::from_le_bytes([cell[0], cell[1]]) as usize;
219 let meta = &cell[6 + key_len..];
220 let (overflow_page, total_len) = Self::decode_overflow_meta(meta)?;
221 let val = self.read_overflow_pages(overflow_page, total_len as usize)?;
222 self.free_overflow_pages(overflow_page)?;
223 val
224 } else {
225 let (_, cell_value) = Self::decode_leaf_cell(cell)?;
226 cell_value.to_vec()
227 };
228 page.delete_cell(i);
229 return Ok(Some(old_value));
230 }
231 }
232 }
233
234 Ok(None)
235 }
236
237 pub fn range<'a>(&'a self, start: &[u8], end: &[u8]) -> Result<RangeIterator<'a>> {
239 let (leaf_id, _) = self.find_leaf(start)?;
240
241 Ok(RangeIterator {
242 btree: self,
243 current_page: Some(leaf_id),
244 start_key: start.to_vec(),
245 end_key: end.to_vec(),
246 started: false,
247 buffered: Vec::new(),
248 })
249 }
250
251 pub fn iter(&self) -> Result<RangeIterator<'_>> {
253 let mut current = self.root_page;
255
256 loop {
257 let guard = self.buffer_pool.get_page(current)?;
258 let page = guard.read();
259
260 match page.page_type() {
261 PageType::Leaf => {
262 return Ok(RangeIterator {
263 btree: self,
264 current_page: Some(current),
265 start_key: vec![],
266 end_key: vec![0xFF; 256], started: true,
268 buffered: Vec::new(),
269 });
270 }
271 PageType::Branch => {
272 if let Some(cell) = page.get_cell(0) {
274 let (child_id, _) = Self::decode_branch_cell(cell)?;
275 current = child_id;
276 } else {
277 return Err(Error::Internal("Empty branch page".into()));
278 }
279 }
280 _ => return Err(Error::Internal("Unexpected page type in B-tree".into())),
281 }
282 }
283 }
284
285 pub fn for_each_value<F>(&self, mut f: F) -> Result<()>
290 where
291 F: FnMut(&[u8]) -> Result<bool>,
292 {
293 let mut current = self.root_page;
295 loop {
296 let guard = self.buffer_pool.get_page(current)?;
297 let page = guard.read();
298 match page.page_type() {
299 PageType::Leaf => break,
300 PageType::Branch => {
301 if let Some(cell) = page.get_cell(0) {
302 let (child_id, _) = Self::decode_branch_cell(cell)?;
303 current = child_id;
304 } else {
305 return Err(Error::Internal("Empty branch page".into()));
306 }
307 }
308 _ => return Err(Error::Internal("Unexpected page type in B-tree".into())),
309 }
310 }
311
312 let mut leaf_page = Some(current);
314 while let Some(page_id) = leaf_page {
315 let guard = self.buffer_pool.get_page(page_id)?;
316 let page = guard.read();
317
318 let count = page.item_count() as usize;
319 for i in 0..count {
320 let slot = match page.get_slot(i) {
322 Some(s) => s,
323 None => continue,
324 };
325 if slot.is_deleted() {
326 continue;
327 }
328
329 if let Some(cell) = page.get_cell(i) {
330 if Self::cell_is_overflow(cell) {
331 let key_len = u16::from_le_bytes([cell[0], cell[1]]) as usize;
333 let meta = &cell[6 + key_len..];
334 let (overflow_page, total_len) = Self::decode_overflow_meta(meta)?;
335 let value = self.read_overflow_pages(overflow_page, total_len as usize)?;
336 if !f(&value)? {
337 return Ok(());
338 }
339 } else {
340 let key_len = u16::from_le_bytes([cell[0], cell[1]]) as usize;
342 let value_len =
343 u32::from_le_bytes([cell[2], cell[3], cell[4], cell[5]]) as usize;
344 let value_start = 6 + key_len;
345 let value = &cell[value_start..value_start + value_len];
346 if !f(value)? {
347 return Ok(());
348 }
349 }
350 }
351 }
352
353 leaf_page = page.right_sibling();
354 }
355
356 Ok(())
357 }
358
359 pub fn count_leaf_items(&self) -> Result<u64> {
362 let mut current = self.root_page;
364 loop {
365 let guard = self.buffer_pool.get_page(current)?;
366 let page = guard.read();
367 match page.page_type() {
368 PageType::Leaf => break,
369 PageType::Branch => {
370 if let Some(cell) = page.get_cell(0) {
371 let (child_id, _) = Self::decode_branch_cell(cell)?;
372 current = child_id;
373 } else {
374 return Err(Error::Internal("Empty branch page".into()));
375 }
376 }
377 _ => return Err(Error::Internal("Unexpected page type in B-tree".into())),
378 }
379 }
380
381 let mut total: u64 = 0;
382 let mut leaf_page = Some(current);
383 while let Some(page_id) = leaf_page {
384 let guard = self.buffer_pool.get_page(page_id)?;
385 let page = guard.read();
386 total += page.count_live_items() as u64;
387 leaf_page = page.right_sibling();
388 }
389
390 Ok(total)
391 }
392
393 fn find_leaf(&self, key: &[u8]) -> Result<(PageId, Vec<PageId>)> {
395 let mut path = Vec::new();
396 let mut current = self.root_page;
397
398 loop {
399 path.push(current);
400 let guard = self.buffer_pool.get_page(current)?;
401 let page = guard.read();
402
403 match page.page_type() {
404 PageType::Leaf => return Ok((current, path)),
405 PageType::Branch => {
406 current = self.find_child(&page, key)?;
407 }
408 _ => {
409 return Err(Error::InvalidPageType {
410 page_id: current,
411 page_type: page.page_type() as u8,
412 })
413 }
414 }
415 }
416 }
417
418 fn find_child(&self, page: &Page, key: &[u8]) -> Result<PageId> {
420 for i in 0..page.item_count() as usize {
424 if let Some(cell) = page.get_cell(i) {
425 let (child_id, separator) = Self::decode_branch_cell(cell)?;
426
427 if separator.is_empty() || key < separator {
429 return Ok(child_id);
430 }
431 }
432 }
433
434 if let Some(cell) = page.get_cell(page.item_count() as usize - 1) {
436 let (child_id, _) = Self::decode_branch_cell(cell)?;
437 return Ok(child_id);
438 }
439
440 Err(Error::Internal("Branch page has no children".into()))
441 }
442
443 fn insert_with_split(
446 &mut self,
447 key: &[u8],
448 value: &[u8],
449 leaf_page_id: PageId,
450 path: &[PageId],
451 ) -> Result<()> {
452 let new_cell = Self::encode_leaf_cell(key, value);
453 self.split_and_insert_cell(key, &new_cell, leaf_page_id, path)
454 }
455
456 fn insert_overflow_with_split(
458 &mut self,
459 key: &[u8],
460 overflow_page: PageId,
461 total_len: u32,
462 leaf_page_id: PageId,
463 path: &[PageId],
464 ) -> Result<()> {
465 let new_cell = Self::encode_overflow_cell(key, overflow_page, total_len);
466 self.split_and_insert_cell(key, &new_cell, leaf_page_id, path)
467 }
468
469 fn split_and_insert_cell(
472 &mut self,
473 key: &[u8],
474 new_cell: &[u8],
475 leaf_page_id: PageId,
476 path: &[PageId],
477 ) -> Result<()> {
478 let guard = self.buffer_pool.get_page(leaf_page_id)?;
480 let page = guard.read();
481
482 let mut entries: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
484 for i in 0..page.item_count() as usize {
485 if let Some(cell) = page.get_cell(i) {
486 let slot = page.get_slot(i).unwrap();
487 if !slot.is_deleted() {
488 let (k, _) = Self::decode_leaf_cell(cell)?;
489 entries.push((k.to_vec(), cell.to_vec()));
490 }
491 }
492 }
493
494 let right_sibling = page.right_sibling();
495 drop(page);
496 drop(guard);
497
498 let insert_pos = entries
500 .iter()
501 .position(|(k, _)| k.as_slice() > key)
502 .unwrap_or(entries.len());
503 entries.insert(insert_pos, (key.to_vec(), new_cell.to_vec()));
504
505 let mid = entries.len() / 2;
507 let left_entries = &entries[..mid];
508 let right_entries = &entries[mid..];
509
510 let split_key = right_entries[0].0.clone();
512
513 let (new_page_id, new_guard) = self.buffer_pool.allocate_page(PageType::Leaf)?;
515 {
516 let mut new_page = new_guard.write();
517 new_page.set_right_sibling(right_sibling);
518
519 for (_, raw_cell) in right_entries {
520 new_page.insert_cell(raw_cell);
521 }
522 }
523
524 let guard = self.buffer_pool.get_page_for_write(leaf_page_id)?;
526 {
527 let mut page = guard.write();
528
529 let page_id = page.page_id();
531 let page_type = page.page_type();
532 let parent = page.parent();
533 let page_size = page.as_bytes().len();
534
535 *page = Page::new(page_id, page_type, page_size);
536 page.set_parent(parent);
537 page.set_right_sibling(Some(new_page_id));
538
539 for (_, raw_cell) in left_entries {
540 page.insert_cell(raw_cell);
541 }
542 }
543
544 if path.len() > 1 {
546 let parent_id = path[path.len() - 2];
547 self.insert_into_parent(parent_id, &split_key, new_page_id, &path[..path.len() - 1])?;
548 } else {
549 self.create_new_root(leaf_page_id, &split_key, new_page_id)?;
551 }
552
553 Ok(())
554 }
555
556 fn insert_into_parent(
558 &mut self,
559 parent_id: PageId,
560 separator: &[u8],
561 child_id: PageId,
562 path: &[PageId],
563 ) -> Result<()> {
564 let guard = self.buffer_pool.get_page_for_write(parent_id)?;
565 let mut page = guard.write();
566
567 let cell = Self::encode_branch_cell(child_id, separator);
568
569 if page.insert_cell(&cell).is_some() {
570 return Ok(());
571 }
572
573 let mut entries: Vec<(Vec<u8>, PageId)> = Vec::new();
576 for i in 0..page.item_count() as usize {
577 if let Some(cell) = page.get_cell(i) {
578 let slot = page.get_slot(i).unwrap();
579 if !slot.is_deleted() {
580 let (child, sep) = Self::decode_branch_cell(cell)?;
581 entries.push((sep.to_vec(), child));
582 }
583 }
584 }
585 drop(page);
586 drop(guard);
587
588 let insert_pos = entries
590 .iter()
591 .position(|(k, _)| k.as_slice() > separator)
592 .unwrap_or(entries.len());
593 entries.insert(insert_pos, (separator.to_vec(), child_id));
594
595 let mid = entries.len() / 2;
597 let left_entries = &entries[..mid];
598 let right_entries = &entries[mid + 1..]; let (middle_key, _) = &entries[mid];
600
601 let (new_page_id, new_guard) = self.buffer_pool.allocate_page(PageType::Branch)?;
603 {
604 let mut new_page = new_guard.write();
605 for (sep, child) in right_entries {
606 let cell = Self::encode_branch_cell(*child, sep);
607 new_page.insert_cell(&cell);
608 }
609 }
610
611 let guard = self.buffer_pool.get_page_for_write(parent_id)?;
613 {
614 let mut page = guard.write();
615
616 let page_id = page.page_id();
618 let page_type = page.page_type();
619 let grandparent = page.parent();
620 let page_size = page.as_bytes().len();
621
622 *page = Page::new(page_id, page_type, page_size);
623 page.set_parent(grandparent);
624
625 for (sep, child) in left_entries {
626 let cell = Self::encode_branch_cell(*child, sep);
627 page.insert_cell(&cell);
628 }
629 }
630 drop(guard);
631
632 for (_, child_page_id) in right_entries {
634 let guard = self.buffer_pool.get_page_for_write(*child_page_id)?;
635 let mut page = guard.write();
636 page.set_parent(new_page_id);
637 }
638
639 let parent_pos = path.iter().position(|&p| p == parent_id);
642 if let Some(pos) = parent_pos {
643 if pos > 0 {
644 let grandparent_id = path[pos - 1];
646 self.insert_into_parent(grandparent_id, middle_key, new_page_id, &path[..pos])?;
647 } else {
648 self.create_new_root(parent_id, middle_key, new_page_id)?;
650 }
651 } else {
652 self.create_new_root(parent_id, middle_key, new_page_id)?;
654 }
655
656 Ok(())
657 }
658
659 fn create_new_root(
661 &mut self,
662 left_child: PageId,
663 separator: &[u8],
664 right_child: PageId,
665 ) -> Result<()> {
666 let (new_root_id, guard) = self.buffer_pool.allocate_page(PageType::Branch)?;
667 {
668 let mut page = guard.write();
669
670 let left_cell = Self::encode_branch_cell(left_child, &[]);
672 page.insert_cell(&left_cell);
673
674 let right_cell = Self::encode_branch_cell(right_child, separator);
676 page.insert_cell(&right_cell);
677 }
678
679 {
681 let guard = self.buffer_pool.get_page_for_write(left_child)?;
682 let mut page = guard.write();
683 page.set_parent(new_root_id);
684 }
685 {
686 let guard = self.buffer_pool.get_page_for_write(right_child)?;
687 let mut page = guard.write();
688 page.set_parent(new_root_id);
689 }
690
691 self.root_page = new_root_id;
692 Ok(())
693 }
694
695 fn encode_leaf_cell(key: &[u8], value: &[u8]) -> Vec<u8> {
697 let mut buf = Vec::with_capacity(6 + key.len() + value.len());
698 buf.put_u16_le(key.len() as u16);
699 buf.put_u32_le(value.len() as u32);
700 buf.extend_from_slice(key);
701 buf.extend_from_slice(value);
702 buf
703 }
704
705 fn encode_overflow_cell(key: &[u8], overflow_page: PageId, total_len: u32) -> Vec<u8> {
708 let mut buf = Vec::with_capacity(6 + key.len() + 12);
709 buf.put_u16_le(key.len() as u16);
710 buf.put_u32_le(OVERFLOW_MARKER);
711 buf.extend_from_slice(key);
712 buf.put_u64_le(overflow_page.0);
713 buf.put_u32_le(total_len);
714 buf
715 }
716
717 fn decode_leaf_cell(cell: &[u8]) -> Result<(&[u8], &[u8])> {
721 if cell.len() < 6 {
722 return Err(Error::Internal("Leaf cell too small".into()));
723 }
724
725 let key_len = u16::from_le_bytes([cell[0], cell[1]]) as usize;
726 let value_len = u32::from_le_bytes([cell[2], cell[3], cell[4], cell[5]]);
727
728 if value_len == OVERFLOW_MARKER {
729 let key_end = 6 + key_len;
732 if cell.len() < key_end {
733 return Err(Error::Internal("Overflow cell key truncated".into()));
734 }
735 let key = &cell[6..key_end];
736 let meta = &cell[key_end..];
739 Ok((key, meta))
740 } else {
741 let value_len = value_len as usize;
742 if cell.len() < 6 + key_len + value_len {
743 return Err(Error::Internal("Leaf cell data truncated".into()));
744 }
745 let key = &cell[6..6 + key_len];
746 let value = &cell[6 + key_len..6 + key_len + value_len];
747 Ok((key, value))
748 }
749 }
750
751 fn cell_is_overflow(cell: &[u8]) -> bool {
753 if cell.len() < 6 {
754 return false;
755 }
756 u32::from_le_bytes([cell[2], cell[3], cell[4], cell[5]]) == OVERFLOW_MARKER
757 }
758
759 fn write_overflow_pages(&self, value: &[u8]) -> Result<PageId> {
762 let ps = self.page_size();
763 let cap = overflow_data_capacity(ps);
764
765 let mut remaining = value;
766 let mut first_page: Option<PageId> = None;
767 let mut prev_page: Option<PageId> = None;
768
769 while !remaining.is_empty() {
770 let chunk_len = remaining.len().min(cap);
771 let (chunk, rest) = remaining.split_at(chunk_len);
772 remaining = rest;
773
774 let (page_id, guard) = self.buffer_pool.allocate_page(PageType::Overflow)?;
775 {
776 let mut page = guard.write();
777 let bytes = page.as_bytes_mut();
778 let hdr = constants::PAGE_HEADER_SIZE;
780 bytes[hdr..hdr + 2].copy_from_slice(&(chunk_len as u16).to_le_bytes());
781 let data_start = hdr + OVERFLOW_CHUNK_HEADER;
783 bytes[data_start..data_start + chunk_len].copy_from_slice(chunk);
784 }
785
786 if let Some(prev_id) = prev_page {
788 let prev_guard = self.buffer_pool.get_page_for_write(prev_id)?;
789 let mut prev = prev_guard.write();
790 prev.set_right_sibling(Some(page_id));
791 }
792
793 if first_page.is_none() {
794 first_page = Some(page_id);
795 }
796 prev_page = Some(page_id);
797 }
798
799 first_page.ok_or_else(|| Error::Internal("Empty overflow write".into()))
800 }
801
802 fn read_overflow_pages(&self, first_page: PageId, total_len: usize) -> Result<Vec<u8>> {
804 let mut result = Vec::with_capacity(total_len);
805 let mut current = Some(first_page);
806
807 while let Some(page_id) = current {
808 let guard = self.buffer_pool.get_page(page_id)?;
809 let page = guard.read();
810 let bytes = page.as_bytes();
811 let hdr = constants::PAGE_HEADER_SIZE;
812 let chunk_len = u16::from_le_bytes([bytes[hdr], bytes[hdr + 1]]) as usize;
813 let data_start = hdr + OVERFLOW_CHUNK_HEADER;
814 if data_start + chunk_len > bytes.len() {
815 return Err(Error::Internal(
816 "Overflow page data exceeds page size".into(),
817 ));
818 }
819 result.extend_from_slice(&bytes[data_start..data_start + chunk_len]);
820 current = page.right_sibling();
821 }
822
823 if result.len() != total_len {
824 return Err(Error::Internal(format!(
825 "Overflow data length mismatch: expected {}, got {}",
826 total_len,
827 result.len()
828 )));
829 }
830 Ok(result)
831 }
832
833 fn decode_overflow_meta(meta: &[u8]) -> Result<(PageId, u32)> {
835 if meta.len() < 12 {
836 return Err(Error::Internal("Overflow metadata too small".into()));
837 }
838 let page_id = PageId(u64::from_le_bytes(meta[0..8].try_into().unwrap()));
839 let total_len = u32::from_le_bytes(meta[8..12].try_into().unwrap());
840 Ok((page_id, total_len))
841 }
842
843 fn decode_leaf_cell_full(&self, cell: &[u8]) -> Result<(Vec<u8>, Vec<u8>)> {
846 let (key, val_or_meta) = Self::decode_leaf_cell(cell)?;
847 if Self::cell_is_overflow(cell) {
848 let (overflow_page, total_len) = Self::decode_overflow_meta(val_or_meta)?;
849 let value = self.read_overflow_pages(overflow_page, total_len as usize)?;
850 Ok((key.to_vec(), value))
851 } else {
852 Ok((key.to_vec(), val_or_meta.to_vec()))
853 }
854 }
855
856 fn free_overflow_pages(&self, first_page: PageId) -> Result<()> {
858 let mut current = Some(first_page);
859 while let Some(page_id) = current {
860 let guard = self.buffer_pool.get_page(page_id)?;
861 let page = guard.read();
862 current = page.right_sibling();
863 drop(page);
866 drop(guard);
867 let guard = self.buffer_pool.get_page_for_write(page_id)?;
868 let mut page = guard.write();
869 let ps = page.as_bytes().len();
870 *page = Page::new(page_id, PageType::Free, ps);
871 }
872 Ok(())
873 }
874
875 fn page_size(&self) -> usize {
877 self.buffer_pool
880 .get_page(self.root_page)
881 .map(|g| g.read().as_bytes().len())
882 .unwrap_or(4096)
883 }
884
885 fn encode_branch_cell(child_id: PageId, key: &[u8]) -> Vec<u8> {
887 let mut buf = Vec::with_capacity(10 + key.len());
888 buf.put_u64_le(child_id.0);
889 buf.put_u16_le(key.len() as u16);
890 buf.extend_from_slice(key);
891 buf
892 }
893
894 fn decode_branch_cell(cell: &[u8]) -> Result<(PageId, &[u8])> {
896 if cell.len() < 10 {
897 return Err(Error::Internal("Branch cell too small".into()));
898 }
899
900 let child_id = PageId(u64::from_le_bytes(cell[0..8].try_into().unwrap()));
901 let key_len = u16::from_le_bytes([cell[8], cell[9]]) as usize;
902
903 if cell.len() < 10 + key_len {
904 return Err(Error::Internal("Branch cell data truncated".into()));
905 }
906
907 let key = &cell[10..10 + key_len];
908
909 Ok((child_id, key))
910 }
911}
912
/// Iterator over `(key, value)` pairs of a [`BTree`], produced by
/// `BTree::range` and `BTree::iter`. It walks leaf pages through their
/// right-sibling links and yields owned copies of each entry.
pub struct RangeIterator<'a> {
    /// Tree being iterated.
    btree: &'a BTree,
    /// Next leaf page to load, or `None` when there is nothing left to load.
    current_page: Option<PageId>,
    /// Lower bound of the range; consulted on the first page loaded.
    start_key: Vec<u8>,
    /// Upper bound of the range; keys greater than this are not yielded.
    end_key: Vec<u8>,
    /// `false` until the first page has been positioned at `start_key`.
    started: bool,
    /// Entries loaded from the current page, arranged so that `pop` returns
    /// the next one to yield.
    buffered: Vec<(Vec<u8>, Vec<u8>)>,
}
923
924impl<'a> Iterator for RangeIterator<'a> {
925 type Item = Result<(Vec<u8>, Vec<u8>)>;
926
927 fn next(&mut self) -> Option<Self::Item> {
928 loop {
929 if let Some(pair) = self.buffered.pop() {
931 if pair.0.as_slice() > self.end_key.as_slice() {
933 return None;
934 }
935 return Some(Ok(pair)); }
937
938 let page_id = self.current_page?;
940 let guard = match self.btree.buffer_pool.get_page(page_id) {
941 Ok(g) => g,
942 Err(e) => return Some(Err(e)),
943 };
944 let page = guard.read();
945
946 self.buffered.clear();
947
948 let start_cell = if !self.started {
950 self.started = true;
951 let mut found_start = page.item_count() as usize; for i in 0..page.item_count() as usize {
953 if let Some(cell) = page.get_cell(i) {
954 match BTree::decode_leaf_cell(cell) {
955 Ok((key, _)) if key >= self.start_key.as_slice() => {
956 found_start = i;
957 break;
958 }
959 Ok(_) => {}
960 Err(e) => return Some(Err(e)),
961 }
962 }
963 }
964 found_start
965 } else {
966 0
967 };
968
969 for i in start_cell..page.item_count() as usize {
971 if let Some(cell) = page.get_cell(i) {
972 let slot = match page.get_slot(i) {
973 Some(s) => s,
974 None => continue,
975 };
976 if slot.is_deleted() {
977 continue;
978 }
979 match self.btree.decode_leaf_cell_full(cell) {
981 Ok((key, value)) => {
982 self.buffered.push((key, value));
983 }
984 Err(e) => return Some(Err(e)),
985 }
986 }
987 }
988
989 self.buffered.reverse();
991
992 self.current_page = page.right_sibling();
994 }
999 }
1000}
1001
#[cfg(test)]
mod tests {
    use super::*;
    use crate::FileManager;
    use featherdb_core::Config;
    use tempfile::TempDir;

    /// Builds a B-tree over a fresh database file inside a temporary
    /// directory. The `TempDir` is returned so the directory outlives the tree.
    fn create_test_btree() -> (TempDir, BTree) {
        let tmp = TempDir::new().unwrap();
        let db_path = tmp.path().join("test.db");
        let config = Config::new(&db_path);

        let file_manager = Arc::new(FileManager::open(&config).unwrap());
        file_manager.init_if_needed(&config).unwrap();

        let pool = Arc::new(BufferPool::new(100, file_manager));
        (tmp, BTree::new(pool).unwrap())
    }

    /// Inserted keys are retrievable; an unknown key yields `None`.
    #[test]
    fn test_btree_insert_get() {
        let (_tmp, mut btree) = create_test_btree();

        let pairs: [(&[u8], &[u8]); 3] = [
            (b"key1", b"value1"),
            (b"key2", b"value2"),
            (b"key3", b"value3"),
        ];

        for (k, v) in pairs {
            btree.insert(k, v).unwrap();
        }

        for (k, v) in pairs {
            assert_eq!(btree.get(k).unwrap(), Some(v.to_vec()));
        }
        assert_eq!(btree.get(b"nonexistent").unwrap(), None);
    }

    /// Re-inserting a key returns the previous value and stores the new one.
    #[test]
    fn test_btree_update() {
        let (_tmp, mut btree) = create_test_btree();

        btree.insert(b"key", b"value1").unwrap();
        let previous = btree.insert(b"key", b"value2").unwrap();

        assert_eq!(previous, Some(b"value1".to_vec()));
        assert_eq!(btree.get(b"key").unwrap(), Some(b"value2".to_vec()));
    }

    /// Deleting a key returns its value and makes it unreachable.
    #[test]
    fn test_btree_delete() {
        let (_tmp, mut btree) = create_test_btree();

        btree.insert(b"key", b"value").unwrap();
        let removed = btree.delete(b"key").unwrap();

        assert_eq!(removed, Some(b"value".to_vec()));
        assert_eq!(btree.get(b"key").unwrap(), None);
    }

    /// A full scan yields one item per inserted entry.
    #[test]
    fn test_btree_iterator() {
        let (_tmp, mut btree) = create_test_btree();

        let pairs: [(&[u8], &[u8]); 3] = [(b"a", b"1"), (b"b", b"2"), (b"c", b"3")];
        for (k, v) in pairs {
            btree.insert(k, v).unwrap();
        }

        let entries: Vec<_> = btree.iter().unwrap().collect();
        assert_eq!(entries.len(), 3);
    }

    /// One hundred entries are all retrievable after insertion.
    #[test]
    fn test_btree_many_inserts() {
        let (_tmp, mut btree) = create_test_btree();

        let entry = |i: i32| (format!("key{:04}", i), format!("value{}", i));

        for i in 0..100 {
            let (key, value) = entry(i);
            btree.insert(key.as_bytes(), value.as_bytes()).unwrap();
        }

        for i in 0..100 {
            let (key, value) = entry(i);
            assert_eq!(btree.get(key.as_bytes()).unwrap(), Some(value.into_bytes()));
        }
    }
}