1use std::fmt;
2
3use bit_vec::BitVec;
4use log::{debug, error};
5
6use super::{
7 BTreeBasePage, BTreePage, BTreePageID, PageCategory,
8 EMPTY_PAGE_ID,
9};
10use crate::{
11 btree::{
12 consts::INDEX_SIZE, page_cache::PageCache, tuple::Schema,
13 },
14 concurrent_status::Permission,
15 error::SmallError,
16 field::{get_type_length, IntField},
17 io::{SmallReader, SmallWriter, Vaporizable},
18 transaction::Transaction,
19 types::SmallResult,
20 utils::{floor_div, HandyRwLock},
21 Unique,
22};
23
24pub struct BTreeInternalPage {
41 base: BTreeBasePage,
42
43 keys: Vec<IntField>,
44
45 children: Vec<BTreePageID>,
70
71 slot_count: usize,
75
76 header: BitVec<u32>,
83
84 children_category: PageCategory,
85
86 old_data: Vec<u8>,
87}
88
89impl BTreeInternalPage {
90 fn new(
91 pid: &BTreePageID,
92 bytes: &[u8],
93 tuple_scheme: &Schema,
94 key_field: usize,
95 ) -> Self {
96 let mut instance: Self;
97
98 if BTreeBasePage::is_empty_page(&bytes) {
99 instance = Self::new_empty_page(
100 pid,
101 bytes,
102 tuple_scheme,
103 key_field,
104 );
105 } else {
106 let key_size = get_type_length(
107 tuple_scheme.fields[key_field].field_type,
108 );
109 let slot_count = Self::get_children_cap(key_size) + 1;
110
111 let mut reader = SmallReader::new(&bytes);
112
113 let category = PageCategory::read_from(&mut reader);
115 if category != PageCategory::Internal {
116 panic!(
117 "The page category of the internal page is not
118 correct, expect: {:?}, actual: {:?}",
119 PageCategory::Internal,
120 category,
121 );
122 }
123
124 let parent_pid = BTreePageID::new(
126 PageCategory::Internal,
127 pid.get_table_id(),
128 u32::read_from(&mut reader),
129 );
130
131 let children_category =
133 PageCategory::read_from(&mut reader);
134
135 let header = BitVec::read_from(&mut reader);
137
138 let mut keys: Vec<IntField> = Vec::new();
140 keys.push(IntField::new(0));
141 for _ in 1..slot_count {
142 let key = IntField::read_from(&mut reader);
143 keys.push(key);
144 }
145
146 let mut children: Vec<BTreePageID> = Vec::new();
148 for _ in 0..slot_count {
149 let child = BTreePageID::new(
150 children_category,
151 pid.get_table_id(),
152 u32::read_from(&mut reader),
153 );
154 children.push(child);
155 }
156
157 let mut base = BTreeBasePage::new(pid);
158 base.set_parent_pid(&parent_pid);
159
160 instance = Self {
161 base,
162 keys,
163 children,
164 slot_count,
165 header,
166 children_category,
167 old_data: Vec::new(),
168 };
169 }
170
171 instance.set_before_image();
172 return instance;
173 }
174
175 fn new_empty_page(
176 pid: &BTreePageID,
177 bytes: &[u8],
178 tuple_scheme: &Schema,
179 key_field: usize,
180 ) -> Self {
181 let key_size = get_type_length(
182 tuple_scheme.fields[key_field].field_type,
183 );
184 let slot_count = Self::get_children_cap(key_size) + 1;
185
186 let mut reader = SmallReader::new(&bytes);
187
188 let parent_pid = BTreePageID::new(
189 PageCategory::Internal,
190 pid.get_table_id(),
191 EMPTY_PAGE_ID,
192 );
193
194 let children_category = PageCategory::Leaf;
195
196 let mut header = BitVec::new();
197 header.grow(slot_count, false);
198
199 let mut keys: Vec<IntField> = Vec::new();
201 keys.push(IntField::new(0));
202 for _ in 1..slot_count {
203 let key = IntField::read_from(&mut reader);
204 keys.push(key);
205 }
206
207 let mut children: Vec<BTreePageID> = Vec::new();
209 for _ in 0..slot_count {
210 let child = BTreePageID::new(
211 children_category,
212 pid.get_table_id(),
213 u32::read_from(&mut reader),
214 );
215 children.push(child);
216 }
217
218 let mut base = BTreeBasePage::new(pid);
219 base.set_parent_pid(&parent_pid);
220
221 Self {
222 base,
223 keys,
224 children,
225 slot_count,
226 header,
227 children_category,
228 old_data: Vec::new(),
229 }
230 }
231
232 pub fn get_coresponding_entry(
233 &self,
234 left_pid: Option<&BTreePageID>,
235 right_pid: Option<&BTreePageID>,
236 ) -> Option<Entry> {
237 let mut it = BTreeInternalPageIterator::new(self);
238 let mut entry = None;
239 for e in it.by_ref() {
240 if let Some(left) = left_pid {
241 if e.get_left_child() != *left {
242 continue;
243 }
244 }
245 if let Some(right) = right_pid {
246 if e.get_right_child() != *right {
247 continue;
248 }
249 }
250
251 entry = Some(e);
252 break;
253 }
254
255 entry
258 }
259
260 pub fn stable(&self) -> bool {
261 if self.get_parent_pid().category == PageCategory::RootPointer
262 {
263 return true;
264 }
265
266 let max_empty_slots =
267 floor_div(self.get_children_capacity(), 2);
268 return self.empty_slots_count() <= max_empty_slots;
269 }
270
271 pub fn get_entry(&self, index: usize) -> Option<Entry> {
272 if self.is_slot_used(index) {
273 Some(Entry::new(
274 self.keys[index],
275 &self.children[index - 1],
276 &self.children[index],
277 ))
278 } else {
279 None
280 }
281 }
282
283 pub fn delete_key_and_right_child(&mut self, record_id: usize) {
284 self.mark_slot_status(record_id, false);
285 }
286
287 pub fn delete_key_and_left_child(&mut self, record_id: usize) {
288 for i in (0..record_id).rev() {
289 if self.is_slot_used(i) {
290 self.children[i] = self.children[record_id];
292
293 self.mark_slot_status(record_id, false);
294 return;
295 }
296 }
297 }
298
299 pub fn update_entry(&mut self, entry: &Entry) {
300 let record_id = entry.get_record_id();
301
302 for i in (0..record_id).rev() {
304 if self.is_slot_used(i) {
305 self.children[i] = entry.get_left_child();
306 break;
307 }
308 }
309
310 self.children[record_id] = entry.get_right_child();
311 self.keys[record_id] = entry.get_key();
312 }
313
314 pub fn is_slot_used(&self, slot_index: usize) -> bool {
316 self.header[slot_index]
317 }
318
319 fn move_entry(&mut self, from: usize, to: usize) {
320 if self.is_slot_used(from) && !self.is_slot_used(to) {
321 self.keys[to] = self.keys[from];
322
323 self.children[to] = self.children[from];
328
329 self.mark_slot_status(from, false);
330 self.mark_slot_status(to, true);
331 } else {
332 }
334 }
335
336 fn mark_slot_status(&mut self, slot_index: usize, used: bool) {
337 self.header.set(slot_index, used);
338 }
339
340 pub fn get_child_pid(
348 &self,
349 _index: usize,
350 ) -> Option<BTreePageID> {
351 unimplemented!()
352 }
353
354 pub fn get_first_child_pid(&self) -> BTreePageID {
355 let mut it = BTreeInternalPageIterator::new(self);
356 return it.next().unwrap().get_left_child();
357 }
358
359 pub fn get_last_child_pid(&self) -> BTreePageID {
360 let mut it = BTreeInternalPageIterator::new(self);
361 return it.next_back().unwrap().get_right_child();
362 }
363
364 pub fn get_left_sibling_pid(
365 &self,
366 tx: &Transaction,
367 ) -> Option<BTreePageID> {
368 let parent_pid = self.get_parent_pid();
369 let parent_rc = Unique::mut_page_cache()
370 .get_internal_page(tx, Permission::ReadOnly, &parent_pid)
371 .unwrap();
372 let parent = parent_rc.rl();
373 let it = BTreeInternalPageIterator::new(&parent);
374 for e in it {
375 if e.get_right_child() == self.get_pid() {
376 return Some(e.get_left_child());
377 }
378 }
379 return None;
380 }
381
382 pub fn get_right_sibling_pid(
383 &self,
384 tx: &Transaction,
385 ) -> Option<BTreePageID> {
386 let parent_pid = self.get_parent_pid();
387 let parent_rc = Unique::mut_page_cache()
388 .get_internal_page(tx, Permission::ReadOnly, &parent_pid)
389 .unwrap();
390 let parent = parent_rc.rl();
391 let it = BTreeInternalPageIterator::new(&parent);
392 for e in it {
393 if e.get_left_child() == self.get_pid() {
394 return Some(e.get_right_child());
395 }
396 }
397 return None;
398 }
399
400 pub fn get_entry_by_children(
401 &self,
402 left_pid: &BTreePageID,
403 right_pid: &BTreePageID,
404 ) -> Option<Entry> {
405 let it = BTreeInternalPageIterator::new(self);
406 for entry in it {
407 if entry.get_left_child() == *left_pid
408 && entry.get_right_child() == *right_pid
409 {
410 return Some(entry);
411 }
412 }
413 None
414 }
415
416 pub fn check_integrity(
417 &self,
418 parent_pid: &BTreePageID,
419 lower_bound: Option<IntField>,
420 upper_bound: Option<IntField>,
421 check_occupancy: bool,
422 depth: usize,
423 ) {
424 assert_eq!(self.get_pid().category, PageCategory::Internal);
425 assert_eq!(&self.get_parent_pid(), parent_pid);
426
427 let mut previous = lower_bound;
428 let it = BTreeInternalPageIterator::new(self);
429 for e in it {
430 if let Some(previous) = previous {
431 assert!(
432 previous <= e.get_key(),
433 "entries are not in order, previous (lower_bound): {}, current entry: {}, current pid: {}, parent pid: {}",
434 previous,
435 e,
436 self.get_pid(),
437 self.get_parent_pid(),
438 );
439 }
440 previous = Some(e.get_key());
441 }
442
443 if let Some(upper_bound) = upper_bound {
444 if let Some(previous) = previous {
445 assert!(previous <= upper_bound);
446 }
447 }
448
449 if check_occupancy && depth > 0 {
450 assert!(
451 self.children_count()
452 >= Self::get_stable_threshold(4),
453 "children count: {}, max children: {}, pid: {:?}",
454 self.children_count(),
455 Self::get_children_cap(4),
456 self.get_pid(),
457 );
458 }
459 }
460}
461
462impl BTreeInternalPage {
464 pub fn insert_entry(&mut self, e: &Entry) -> SmallResult {
465 if self.empty_slots_count() == 0 {
466 return Err(SmallError::new(
467 "No empty slots on this page.",
468 ));
469 }
470
471 if self.entries_count() == 0 {
473 self.children_category = e.get_left_child().category;
475
476 self.children[0] = e.get_left_child();
478 self.children[1] = e.get_right_child();
479 self.keys[1] = e.get_key();
480 self.mark_slot_status(0, true);
481 self.mark_slot_status(1, true);
482
483 return Ok(());
484 }
485
486 let mut empty_slot = 0;
488 for i in 0..self.slot_count {
489 if !self.is_slot_used(i) {
490 empty_slot = i;
491 break;
492 }
493 }
494
495 let mut slot_just_ahead: usize = usize::MAX;
498 for i in 0..self.slot_count {
499 if !self.is_slot_used(i) {
500 continue;
501 }
502
503 if self.children[i] == e.get_left_child() {
506 slot_just_ahead = i;
507 break;
508 }
509
510 if self.children[i] == e.get_right_child() {
515 slot_just_ahead = i;
516 self.children[i] = e.get_left_child();
518 break;
519 }
520 }
521
522 if slot_just_ahead == usize::MAX {
523 let e = SmallError::new(&format!(
524 "No slot found for entry {}, pid: {}, entries count: {}",
525 e,
526 self.get_pid(),
527 self.entries_count()
528 ));
529 error!("{}", e);
530 return Err(e);
532 }
533
534 let good_slot: usize;
538 if empty_slot < slot_just_ahead {
539 for i in empty_slot..slot_just_ahead {
540 self.move_entry(i + 1, i);
541 }
542 good_slot = slot_just_ahead
543 } else {
544 for i in (slot_just_ahead + 1..empty_slot).rev() {
545 self.move_entry(i, i + 1);
546 }
547 good_slot = slot_just_ahead + 1
548 }
549
550 self.keys[good_slot] = e.get_key();
551 self.children[good_slot] = e.get_right_child();
552 self.mark_slot_status(good_slot, true);
553 Ok(())
554 }
555}
556
557impl BTreeInternalPage {
559 pub fn empty_slots_count(&self) -> usize {
561 let mut count = 0;
562 for i in 1..self.slot_count {
565 if !self.is_slot_used(i) {
566 count += 1
567 }
568 }
569 count
570 }
571
572 pub fn children_count(&self) -> usize {
573 self.slot_count - self.empty_slots_count()
574 }
575
576 pub fn entries_count(&self) -> usize {
577 self.slot_count - self.empty_slots_count() - 1
578 }
579}
580
581impl BTreeInternalPage {
583 pub fn get_children_capacity(&self) -> usize {
584 self.slot_count
585 }
586
587 pub fn get_stable_threshold(key_size: usize) -> usize {
590 floor_div(Self::get_children_cap(key_size), 2)
591 }
592
593 pub fn get_children_cap(key_size: usize) -> usize {
595 let bits_per_entry_including_header =
596 key_size * 8 + INDEX_SIZE * 8 + 1;
597
598 let extra_bits = (4 * INDEX_SIZE + 2) * 8 + 1;
607
608 let entries_per_page = (PageCache::get_page_size() * 8
609 - extra_bits)
610 / bits_per_entry_including_header; return entries_per_page;
612 }
613}
614
615impl BTreePage for BTreeInternalPage {
616 fn new(
617 pid: &BTreePageID,
618 bytes: &[u8],
619 tuple_scheme: &Schema,
620 key_field: usize,
621 ) -> Self {
622 Self::new(pid, bytes, tuple_scheme, key_field)
623 }
624
625 fn get_pid(&self) -> BTreePageID {
626 self.base.get_pid()
627 }
628
629 fn get_parent_pid(&self) -> BTreePageID {
630 self.base.get_parent_pid()
631 }
632
633 fn set_parent_pid(&mut self, pid: &BTreePageID) {
634 self.base.set_parent_pid(pid)
635 }
636
637 fn get_page_data(&self) -> Vec<u8> {
638 let mut writer = SmallWriter::new();
639
640 writer.write(&self.get_pid().category);
642
643 writer.write(&self.get_parent_pid().page_index);
645
646 writer.write(&self.children_category);
648
649 writer.write(&self.header);
651
652 for i in 1..self.slot_count {
654 writer.write(&self.keys[i]);
655 }
656
657 for i in 0..self.slot_count {
659 writer.write(&self.children[i].page_index);
660 }
661
662 return writer.to_padded_bytes(PageCache::get_page_size());
663 }
664
665 fn set_before_image(&mut self) {
666 self.old_data = self.get_page_data();
667 }
668
669 fn get_before_image(&self) -> Vec<u8> {
670 if self.old_data.is_empty() {
671 panic!("before image is not set");
672 }
673 return self.old_data.clone();
674 }
675
676 fn peek(&self) {
677 debug!("======start=======");
678 println!("Internal page: {}", self.get_pid());
679 println!("Parent: {}", self.get_parent_pid());
680 println!("slots count: {}", self.slot_count);
681 println!("entries count: {}", self.entries_count());
682 println!("children category: {:?}", self.children_category);
683 }
684}
685
686#[derive(Clone, Copy, Debug)]
690pub struct Entry {
691 key: IntField,
692 left: BTreePageID,
693 right: BTreePageID,
694
695 record_id: usize,
697}
698
699impl Entry {
700 pub fn new(
701 key: IntField,
702 left: &BTreePageID,
703 right: &BTreePageID,
704 ) -> Self {
705 Self {
706 key,
707 left: *left,
708 right: *right,
709
710 record_id: 0,
711 }
712 }
713
714 pub fn set_record_id(&mut self, record_id: usize) {
715 self.record_id = record_id;
716 }
717
718 pub fn get_record_id(&self) -> usize {
719 self.record_id
720 }
721
722 pub fn get_key(&self) -> IntField {
723 self.key
724 }
725
726 pub fn set_key(&mut self, key: IntField) {
727 self.key = key;
728 }
729
730 pub fn get_left_child(&self) -> BTreePageID {
731 self.left
732 }
733
734 pub fn get_right_child(&self) -> BTreePageID {
735 self.right
736 }
737}
738
739impl fmt::Display for Entry {
740 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
741 write!(f, "({}, {}, {})", self.key, self.left, self.right)
742 }
743}
744
745pub struct BTreeInternalPageIterator<'page> {
746 page: &'page BTreeInternalPage,
747
748 cursor: usize,
749 left_child_position: usize,
750
751 reverse_cursor: usize,
752 right_child_position: usize,
753}
754
755impl<'page> BTreeInternalPageIterator<'page> {
756 pub fn new(page: &'page BTreeInternalPage) -> Self {
757 let mut right_child_position = page.slot_count;
758 loop {
759 right_child_position -= 1;
760 if page.is_slot_used(right_child_position) {
761 break;
762 }
763 }
764
765 Self {
766 page,
767
768 cursor: 0,
769 left_child_position: 0,
770
771 reverse_cursor: right_child_position,
772 right_child_position,
773 }
774 }
775}
776
777impl Iterator for BTreeInternalPageIterator<'_> {
778 type Item = Entry;
779
780 fn next(&mut self) -> Option<Self::Item> {
781 loop {
782 self.cursor += 1;
783 let cursor = self.cursor;
784
785 if cursor >= self.page.slot_count {
786 return None;
787 }
788
789 if !self.page.is_slot_used(cursor) {
790 continue;
791 }
792 let mut e = Entry::new(
793 self.page.keys[cursor],
794 &self.page.children[self.left_child_position],
795 &self.page.children[cursor],
796 );
797 e.set_record_id(cursor);
798
799 self.left_child_position = cursor;
801
802 return Some(e);
803 }
804 }
805}
806
807impl<'page> DoubleEndedIterator for BTreeInternalPageIterator<'_> {
808 fn next_back(&mut self) -> Option<Self::Item> {
809 loop {
810 if let Some(left_index) =
811 self.reverse_cursor.checked_sub(1)
812 {
813 self.reverse_cursor = left_index;
814 if !self.page.is_slot_used(left_index) {
815 continue;
816 }
817
818 let mut e = Entry::new(
819 self.page.keys[self.right_child_position],
820 &self.page.children[left_index],
821 &self.page.children[self.right_child_position],
822 );
823 e.set_record_id(self.right_child_position);
824
825 self.right_child_position = left_index;
827
828 return Some(e);
829 } else {
830 return None;
831 }
832 }
833 }
834}