1use crate::buffer::{PageId, INVALID_PAGE_ID};
2use crate::catalog::{Schema, SchemaRef};
3use crate::storage::page::RecordId;
4use crate::storage::tuple::Tuple;
5use std::sync::Arc;
6
/// Default maximum number of entries for an internal B+ tree page.
pub const BPLUS_INTERNAL_PAGE_MAX_SIZE: usize = 10;
/// Default maximum number of entries for a leaf B+ tree page.
pub const BPLUS_LEAF_PAGE_MAX_SIZE: usize = 10;
9
10#[derive(Debug, Clone, Copy, PartialEq, Eq)]
11pub struct BPlusTreeHeaderPage {
12 pub root_page_id: PageId,
13}
14
15#[derive(Debug, Clone, Eq, PartialEq)]
16pub enum BPlusTreePage {
17 Internal(BPlusTreeInternalPage),
18 Leaf(BPlusTreeLeafPage),
19}
20
21impl BPlusTreePage {
22 pub fn is_full(&self) -> bool {
23 match self {
24 Self::Internal(page) => page.is_full(),
25 Self::Leaf(page) => page.is_full(),
26 }
27 }
28 pub fn is_underflow(&self, is_root: bool) -> bool {
29 if is_root {
30 return false;
31 }
32 match self {
33 Self::Internal(page) => page.header.current_size < page.min_size(),
34 Self::Leaf(page) => page.header.current_size < page.min_size(),
35 }
36 }
37 pub fn min_size(&self) -> u32 {
38 match self {
39 Self::Internal(page) => page.min_size(),
40 Self::Leaf(page) => page.min_size(),
41 }
42 }
43 pub fn current_size(&self) -> u32 {
44 match self {
45 Self::Internal(page) => page.header.current_size,
46 Self::Leaf(page) => page.header.current_size,
47 }
48 }
49 pub fn max_size(&self) -> u32 {
50 match self {
51 Self::Internal(page) => page.header.max_size,
52 Self::Leaf(page) => page.header.max_size,
53 }
54 }
55 pub fn insert_internalkv(&mut self, internalkv: InternalKV) {
56 match self {
57 Self::Internal(page) => page.insert(internalkv.0, internalkv.1),
58 Self::Leaf(_) => panic!("Leaf page cannot insert InternalKV"),
59 }
60 }
61 pub fn can_borrow(&self) -> bool {
62 match self {
63 Self::Internal(page) => page.header.current_size > page.min_size(),
64 Self::Leaf(page) => page.header.current_size > page.min_size(),
65 }
66 }
67}
68
/// Tag stored in a page header that identifies the kind of B+ tree page.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BPlusTreePageType {
    /// The page is a leaf page.
    LeafPage,
    /// The page is an internal page.
    InternalPage,
}
74
75pub type InternalKV = (Tuple, PageId);
76pub type LeafKV = (Tuple, RecordId);
77
78#[derive(Debug, Clone, Eq, PartialEq)]
90pub struct BPlusTreeInternalPage {
91 pub schema: SchemaRef,
92 pub header: BPlusTreeInternalPageHeader,
93 pub array: Vec<InternalKV>,
95 pub high_key: Option<Tuple>,
97}
98
99#[derive(Debug, Clone, Eq, PartialEq)]
100pub struct BPlusTreeInternalPageHeader {
101 pub page_type: BPlusTreePageType,
102 pub current_size: u32,
103 pub max_size: u32,
105 pub version: u32,
106 pub next_page_id: PageId,
108}
109
110impl BPlusTreeInternalPage {
111 pub fn new(schema: SchemaRef, max_size: u32) -> Self {
112 Self {
113 schema,
114 header: BPlusTreeInternalPageHeader {
115 page_type: BPlusTreePageType::InternalPage,
116 current_size: 0,
117 max_size,
118 version: 0,
119 next_page_id: INVALID_PAGE_ID,
120 },
121 array: Vec::with_capacity(max_size as usize),
122 high_key: None,
123 }
124 }
125 pub fn min_size(&self) -> u32 {
126 (self.header.max_size + 1) / 2
127 }
128 pub fn key_at(&self, index: usize) -> &Tuple {
129 &self.array[index].0
130 }
131 pub fn value_at(&self, index: usize) -> PageId {
132 self.array[index].1
133 }
134 pub fn values(&self) -> Vec<PageId> {
135 self.array.iter().map(|kv| kv.1).collect()
136 }
137
138 pub fn sibling_page_ids(&self, page_id: PageId) -> (Option<PageId>, Option<PageId>) {
139 let index = self.array.iter().position(|x| x.1 == page_id);
140 if let Some(index) = index {
141 return (
142 if index == 0 {
143 None
144 } else {
145 Some(self.array[index - 1].1)
146 },
147 if index == self.header.current_size as usize - 1 {
148 None
149 } else {
150 Some(self.array[index + 1].1)
151 },
152 );
153 }
154 (None, None)
155 }
156
157 pub fn insert(&mut self, key: Tuple, page_id: PageId) {
160 let size = self.header.current_size as usize;
161 let pos = self.array[..size]
162 .binary_search_by(|(k, _)| k.cmp(&key))
163 .unwrap_or_else(|pos| pos);
164 self.array.insert(pos, (key, page_id));
165 self.header.current_size += 1;
166 }
167 pub fn batch_insert(&mut self, kvs: Vec<InternalKV>) {
168 let kvs_len = kvs.len();
169 self.array.extend(kvs);
173 self.header.current_size += kvs_len as u32;
174 }
175
176 pub fn insert_after(&mut self, old_page_id: PageId, key: Tuple, new_page_id: PageId) {
179 let pos = self
180 .array
181 .iter()
182 .position(|&(_, pid)| pid == old_page_id)
183 .expect("old child not found in parent when inserting after");
184 self.array.insert(pos + 1, (key, new_page_id));
185 self.header.current_size += 1;
186 }
187
188 pub fn remove(&mut self, page_id: PageId) -> Option<(Tuple, PageId)> {
189 if let Some(pos) = self.array.iter().position(|&(_, pid)| pid == page_id) {
190 let removed = self.array.remove(pos);
191 self.header.current_size -= 1;
192 Some(removed)
193 } else {
194 None
195 }
196 }
197
198 pub fn delete(&mut self, key: &Tuple) {
199 if self.header.current_size == 0 {
200 return;
201 }
202 let mut start: i32 = 1;
204 let mut end: i32 = self.header.current_size as i32 - 1;
205 while start < end {
206 let mid = (start + end) / 2;
207 let compare_res = key.partial_cmp(&self.array[mid as usize].0).unwrap();
208 if compare_res == std::cmp::Ordering::Equal {
209 self.array.remove(mid as usize);
210 self.header.current_size -= 1;
211 if self.header.current_size == 1 {
213 self.array.remove(0);
214 self.header.current_size -= 1;
215 }
216 return;
217 } else if compare_res == std::cmp::Ordering::Less {
218 end = mid - 1;
219 } else {
220 start = mid + 1;
221 }
222 }
223 if key.partial_cmp(&self.array[start as usize].0).unwrap() == std::cmp::Ordering::Equal {
224 self.array.remove(start as usize);
225 self.header.current_size -= 1;
226 if self.header.current_size == 1 {
228 self.array.remove(0);
229 self.header.current_size -= 1;
230 }
231 }
232 }
233
234 pub fn delete_page_id(&mut self, page_id: PageId) {
235 for i in 0..self.header.current_size {
236 if self.array[i as usize].1 == page_id {
237 self.array.remove(i as usize);
238 self.header.current_size -= 1;
239 return;
240 }
241 }
242 }
243
244 pub fn is_full(&self) -> bool {
245 self.header.current_size > self.header.max_size
247 }
248
249 pub fn split_off(&mut self, at: usize) -> Vec<InternalKV> {
250 let new_array = self.array.split_off(at);
251 self.header.current_size -= new_array.len() as u32;
252 new_array
253 }
254
255 pub fn reverse_split_off(&mut self, at: usize) -> Vec<InternalKV> {
256 let mut new_array = Vec::new();
257 for _ in 0..=at {
258 new_array.push(self.array.remove(0));
259 }
260 self.header.current_size -= new_array.len() as u32;
261 new_array
262 }
263
264 pub fn replace_key(&mut self, old_key: &Tuple, new_key: Tuple) {
265 let key_index = self.key_index(old_key);
266 if let Some(index) = key_index {
267 self.array[index].0 = new_key;
268 }
269 }
270
271 pub fn key_index(&self, key: &Tuple) -> Option<usize> {
272 if self.header.current_size == 0 {
273 return None;
274 }
275 let size = self.header.current_size as usize;
276 self.array[..size]
277 .binary_search_by(|(k, _)| k.cmp(key))
278 .ok()
279 }
280
281 pub fn value_index(&self, page_id: PageId) -> Option<usize> {
283 if self.header.current_size == 0 {
284 return None;
285 }
286 let size = self.header.current_size as usize;
287 self.array[..size]
288 .iter()
289 .position(|(_, pid)| *pid == page_id)
290 }
291
292 pub fn look_up(&self, key: &Tuple) -> PageId {
294 let size = self.header.current_size as usize;
295 let partition_idx = self.array[1..size].partition_point(|(probe_key, _)| probe_key <= key);
296 self.value_at(partition_idx)
297 }
298
299 pub fn look_up_mut(&mut self, key: &Tuple) -> Option<&mut PageId> {
301 let size = self.header.current_size as usize;
302 if size == 0 {
303 return None;
304 }
305
306 let partition_idx = self.array[1..size].partition_point(|(probe_key, _)| probe_key <= key);
308
309 Some(&mut self.array[partition_idx].1)
310 }
311
312 pub fn remove_first_kv(&mut self) -> (Tuple, PageId) {
315 if self.array.len() != self.header.current_size as usize {
317 panic!(
318 "Page state inconsistent: array len {} != current_size {}",
319 self.array.len(),
320 self.header.current_size
321 );
322 }
323
324 if self.array.len() <= 1 {
326 panic!(
327 "remove_first_kv called on internal page with array len {} and current_size {}",
328 self.array.len(),
329 self.header.current_size
330 );
331 }
332 let result = self.array.remove(1);
333 self.header.current_size -= 1;
334 result
335 }
336 pub fn remove_last_kv(&mut self) -> (Tuple, PageId) {
337 if self.array.len() != self.header.current_size as usize {
339 panic!(
340 "Page state inconsistent: array len {} != current_size {}",
341 self.array.len(),
342 self.header.current_size
343 );
344 }
345
346 if self.array.is_empty() {
347 panic!(
348 "remove_last_kv called on empty internal page with current_size {}",
349 self.header.current_size
350 );
351 }
352
353 let result = self.array.pop().unwrap();
354 self.header.current_size -= 1;
355 assert_eq!(self.header.current_size as usize, self.array.len());
356 result
357 }
358 pub fn merge(&mut self, middle_key: Tuple, other: &mut BPlusTreeInternalPage) {
359 self.array
362 .push((middle_key, other.array.first().unwrap().1));
363
364 self.array.extend(other.array.drain(1..));
367
368 self.header.current_size = self.array.len() as u32;
369 }
370}
371
372#[derive(Debug, Clone, Eq, PartialEq)]
384pub struct BPlusTreeLeafPage {
385 pub schema: SchemaRef,
386 pub header: BPlusTreeLeafPageHeader,
387 pub array: Vec<LeafKV>,
388}
389
390#[derive(Debug, Clone, Eq, PartialEq)]
391pub struct BPlusTreeLeafPageHeader {
392 pub page_type: BPlusTreePageType,
393 pub current_size: u32,
394 pub max_size: u32,
396 pub next_page_id: PageId,
397 pub version: u32,
398}
399
400impl BPlusTreeLeafPage {
401 pub fn new(schema: SchemaRef, max_size: u32) -> Self {
402 Self {
403 schema,
404 header: BPlusTreeLeafPageHeader {
405 page_type: BPlusTreePageType::LeafPage,
406 current_size: 0,
407 max_size,
408 next_page_id: INVALID_PAGE_ID,
409 version: 0,
410 },
411 array: Vec::with_capacity(max_size as usize),
412 }
413 }
414
415 pub fn empty() -> Self {
416 Self {
417 schema: Arc::new(Schema::empty()),
418 header: BPlusTreeLeafPageHeader {
419 page_type: BPlusTreePageType::LeafPage,
420 current_size: 0,
421 max_size: 0,
422 next_page_id: INVALID_PAGE_ID,
423 version: 0,
424 },
425 array: Vec::new(),
426 }
427 }
428
429 pub fn min_size(&self) -> u32 {
430 (self.header.max_size + 1) / 2
431 }
432
433 pub fn key_at(&self, index: usize) -> &Tuple {
434 &self.array[index].0
435 }
436
437 pub fn kv_at(&self, index: usize) -> &LeafKV {
438 &self.array[index]
439 }
440
441 pub fn is_full(&self) -> bool {
442 self.header.current_size > self.header.max_size
444 }
445
446 pub fn insert(&mut self, key: Tuple, rid: RecordId) {
447 let size = self.header.current_size as usize;
448 let pos = self.array[..size]
449 .binary_search_by(|(k, _)| k.cmp(&key))
450 .unwrap_or_else(|pos| pos);
451 self.array.insert(pos, (key, rid));
452 self.header.current_size += 1;
453 }
454
455 pub fn batch_insert(&mut self, kvs: Vec<LeafKV>) {
456 let kvs_len = kvs.len();
457 self.array.extend(kvs);
458 self.header.current_size += kvs_len as u32;
459 self.array.sort_by(|a, b| a.0.cmp(&b.0));
460 }
461
462 pub fn split_off(&mut self, at: usize) -> Vec<LeafKV> {
463 let new_array = self.array.split_off(at);
464 self.header.current_size -= new_array.len() as u32;
465 new_array
466 }
467
468 pub fn reverse_split_off(&mut self, at: usize) -> Vec<LeafKV> {
469 let mut new_array = Vec::new();
470 for _ in 0..=at {
471 new_array.push(self.array.remove(0));
472 }
473 self.header.current_size -= new_array.len() as u32;
474 new_array
475 }
476
477 pub fn delete(&mut self, key: &Tuple) {
478 let key_index = self.key_index(key);
479 if let Some(index) = key_index {
480 self.array.remove(index);
481 self.header.current_size -= 1;
482 }
483 }
484
485 pub fn look_up(&self, key: &Tuple) -> Option<RecordId> {
487 let key_index = self.key_index(key);
488 key_index.map(|index| self.array[index].1)
489 }
490
491 pub fn look_up_mut(&mut self, key: &Tuple) -> Option<&mut RecordId> {
493 let key_index = self.key_index(key);
494 key_index.map(|index| &mut self.array[index].1)
495 }
496
497 fn key_index(&self, key: &Tuple) -> Option<usize> {
498 if self.header.current_size == 0 {
499 return None;
500 }
501 let size = self.header.current_size as usize;
502 self.array[..size]
503 .binary_search_by(|(k, _)| k.cmp(key))
504 .ok()
505 }
506
507 pub fn remove_first_kv(&mut self) -> (Tuple, RecordId) {
509 if self.array.len() != self.header.current_size as usize {
511 panic!(
512 "Page state inconsistent: array len {} != current_size {}",
513 self.array.len(),
514 self.header.current_size
515 );
516 }
517
518 if self.array.is_empty() {
519 panic!(
520 "remove_first_kv called on empty leaf page with current_size {}",
521 self.header.current_size
522 );
523 }
524
525 let result = self.array.remove(0);
526 self.header.current_size -= 1;
527 result
528 }
529 pub fn remove_last_kv(&mut self) -> (Tuple, RecordId) {
530 if self.array.len() != self.header.current_size as usize {
532 panic!(
533 "Page state inconsistent: array len {} != current_size {}",
534 self.array.len(),
535 self.header.current_size
536 );
537 }
538
539 if self.array.is_empty() {
540 panic!(
541 "remove_last_kv called on empty leaf page with current_size {}",
542 self.header.current_size
543 );
544 }
545
546 let result = self.array.pop().unwrap();
547 self.header.current_size -= 1;
548 result
549 }
550 pub fn merge(&mut self, other: &mut BPlusTreeLeafPage) {
551 self.array.extend(other.array.drain(..));
552 self.header.current_size = self.array.len() as u32;
553 self.header.next_page_id = other.header.next_page_id;
554 }
555
556 pub fn next_closest(&self, tuple: &Tuple, included: bool) -> Option<usize> {
557 for (idx, (key, _)) in self.array.iter().enumerate() {
558 if tuple == key && included {
559 return Some(idx);
560 }
561 if key > tuple {
562 return Some(idx);
563 }
564 }
565 None
566 }
567}
568
#[cfg(test)]
mod tests {
    use crate::storage::page::{BPlusTreeInternalPage, BPlusTreeLeafPage};
    use crate::storage::tuple::Tuple;
    use crate::utils::scalar::ScalarValue;
    use crate::{
        catalog::{Column, DataType, Schema},
        storage::page::RecordId,
    };
    use std::sync::Arc;

    /// Two-column key schema (`a: Int8`, `b: Int16`) used by every test.
    fn two_column_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Column::new("a", DataType::Int8, false),
            Column::new("b", DataType::Int16, false),
        ]))
    }

    /// Builds the key `(a, b)` against `schema`.
    fn key(schema: &Arc<Schema>, a: i8, b: i16) -> Tuple {
        Tuple::new(schema.clone(), vec![a.into(), b.into()])
    }

    /// Internal page with the empty slot-0 key followed by keys 2, 1, 3, 4
    /// (inserted in that order), each pointing at the child of the same number.
    fn five_entry_internal_page(schema: &Arc<Schema>) -> BPlusTreeInternalPage {
        let mut page = BPlusTreeInternalPage::new(schema.clone(), 5);
        page.insert(Tuple::empty(schema.clone()), 0);
        page.insert(key(schema, 2, 2), 2);
        page.insert(key(schema, 1, 1), 1);
        page.insert(key(schema, 3, 3), 3);
        page.insert(key(schema, 4, 4), 4);
        page
    }

    /// Leaf page holding keys 2, 1, 3, 5, 4 (inserted in that order), each
    /// mapped to the record id of the same number.
    fn five_entry_leaf_page(schema: &Arc<Schema>) -> BPlusTreeLeafPage {
        let mut page = BPlusTreeLeafPage::new(schema.clone(), 5);
        page.insert(key(schema, 2, 2), RecordId::new(2, 2));
        page.insert(key(schema, 1, 1), RecordId::new(1, 1));
        page.insert(key(schema, 3, 3), RecordId::new(3, 3));
        page.insert(key(schema, 5, 5), RecordId::new(5, 5));
        page.insert(key(schema, 4, 4), RecordId::new(4, 4));
        page
    }

    #[test]
    pub fn test_internal_page_insert() {
        let schema = two_column_schema();
        let mut page = BPlusTreeInternalPage::new(schema.clone(), 3);
        page.insert(Tuple::empty(schema.clone()), 0);
        page.insert(key(&schema, 2, 2), 2);
        page.insert(key(&schema, 1, 1), 1);

        assert_eq!(page.header.current_size, 3);
        assert_eq!(page.array[0].0.data, Tuple::empty(schema.clone()).data);
        assert_eq!(page.array[0].1, 0);
        assert_eq!(page.array[1].0.data, vec![1i8.into(), 1i16.into()]);
        assert_eq!(page.array[1].1, 1);
        assert_eq!(page.array[2].0.data, vec![2i8.into(), 2i16.into()]);
        assert_eq!(page.array[2].1, 2);
    }

    #[test]
    pub fn test_leaf_page_insert() {
        let schema = two_column_schema();
        let mut page = BPlusTreeLeafPage::new(schema.clone(), 3);
        page.insert(key(&schema, 2, 2), RecordId::new(2, 2));
        page.insert(key(&schema, 1, 1), RecordId::new(1, 1));
        page.insert(key(&schema, 3, 3), RecordId::new(3, 3));

        assert_eq!(page.header.current_size, 3);
        assert_eq!(page.array[0].0.data, vec![1i8.into(), 1i16.into()]);
        assert_eq!(page.array[0].1, RecordId::new(1, 1));
        assert_eq!(page.array[1].0.data, vec![2i8.into(), 2i16.into()]);
        assert_eq!(page.array[1].1, RecordId::new(2, 2));
        assert_eq!(page.array[2].0.data, vec![3i8.into(), 3i16.into()]);
        assert_eq!(page.array[2].1, RecordId::new(3, 3));
    }

    #[test]
    pub fn test_internal_page_look_up() {
        let schema = two_column_schema();

        // Five entries: a key below, equal to, and above the stored keys.
        let page = five_entry_internal_page(&schema);
        assert_eq!(page.look_up(&key(&schema, 0, 0)), 0);
        assert_eq!(page.look_up(&key(&schema, 3, 3)), 3);
        assert_eq!(page.look_up(&key(&schema, 5, 5)), 4);

        // Smallest useful page: the slot-0 entry plus one real key.
        let mut page = BPlusTreeInternalPage::new(schema.clone(), 2);
        page.insert(Tuple::empty(schema.clone()), 0);
        page.insert(key(&schema, 1, 1), 1);
        assert_eq!(page.look_up(&key(&schema, 0, 0)), 0);
        assert_eq!(page.look_up(&key(&schema, 1, 1)), 1);
        assert_eq!(page.look_up(&key(&schema, 2, 2)), 1);
    }

    #[test]
    pub fn test_leaf_page_look_up() {
        let schema = two_column_schema();

        let page = five_entry_leaf_page(&schema);
        assert_eq!(page.look_up(&key(&schema, 0, 0)), None);
        assert_eq!(page.look_up(&key(&schema, 2, 2)), Some(RecordId::new(2, 2)));
        assert_eq!(page.look_up(&key(&schema, 3, 3)), Some(RecordId::new(3, 3)));
        assert_eq!(page.look_up(&key(&schema, 6, 6)), None);

        let mut page = BPlusTreeLeafPage::new(schema.clone(), 2);
        page.insert(key(&schema, 2, 2), RecordId::new(2, 2));
        page.insert(key(&schema, 1, 1), RecordId::new(1, 1));
        let null_key = Tuple::new(
            schema.clone(),
            vec![ScalarValue::Int8(None), ScalarValue::Int16(None)],
        );
        assert_eq!(page.look_up(&null_key), None);
        assert_eq!(page.look_up(&key(&schema, 1, 1)), Some(RecordId::new(1, 1)));
        assert_eq!(page.look_up(&key(&schema, 2, 2)), Some(RecordId::new(2, 2)));
        assert_eq!(page.look_up(&key(&schema, 3, 3)), None);
    }

    #[test]
    pub fn test_internal_page_delete() {
        let schema = two_column_schema();
        let mut page = five_entry_internal_page(&schema);

        page.delete(&key(&schema, 2, 2));
        assert_eq!(page.header.current_size, 4);
        assert_eq!(
            page.array[0].0.data,
            vec![ScalarValue::Int8(None), ScalarValue::Int16(None)]
        );
        assert_eq!(page.array[0].1, 0);
        assert_eq!(page.array[1].0.data, vec![1i8.into(), 1i16.into()]);
        assert_eq!(page.array[1].1, 1);
        assert_eq!(page.array[2].0.data, vec![3i8.into(), 3i16.into()]);
        assert_eq!(page.array[2].1, 3);
        assert_eq!(page.array[3].0.data, vec![4i8.into(), 4i16.into()]);
        assert_eq!(page.array[3].1, 4);

        page.delete(&key(&schema, 4, 4));
        assert_eq!(page.header.current_size, 3);

        page.delete(&key(&schema, 3, 3));
        assert_eq!(page.header.current_size, 2);

        // Removing the last real key also drops the slot-0 entry.
        page.delete(&key(&schema, 1, 1));
        assert_eq!(page.header.current_size, 0);

        // Deleting from an empty page is a no-op.
        page.delete(&key(&schema, 1, 1));
        assert_eq!(page.header.current_size, 0);
    }

    #[test]
    pub fn test_leaf_page_delete() {
        let schema = two_column_schema();
        let mut page = five_entry_leaf_page(&schema);

        page.delete(&key(&schema, 2, 2));
        assert_eq!(page.header.current_size, 4);
        assert_eq!(page.array[0].0.data, vec![1i8.into(), 1i16.into()]);
        assert_eq!(page.array[0].1, RecordId::new(1, 1));
        assert_eq!(page.array[1].0.data, vec![3i8.into(), 3i16.into()]);
        assert_eq!(page.array[1].1, RecordId::new(3, 3));
        assert_eq!(page.array[2].0.data, vec![4i8.into(), 4i16.into()]);
        assert_eq!(page.array[2].1, RecordId::new(4, 4));
        assert_eq!(page.array[3].0.data, vec![5i8.into(), 5i16.into()]);
        assert_eq!(page.array[3].1, RecordId::new(5, 5));

        page.delete(&key(&schema, 3, 3));
        assert_eq!(page.header.current_size, 3);

        page.delete(&key(&schema, 5, 5));
        assert_eq!(page.header.current_size, 2);

        page.delete(&key(&schema, 1, 1));
        assert_eq!(page.header.current_size, 1);
        assert_eq!(page.array[0].0.data, vec![4i8.into(), 4i16.into()]);
        assert_eq!(page.array[0].1, RecordId::new(4, 4));

        page.delete(&key(&schema, 4, 4));
        assert_eq!(page.header.current_size, 0);

        // Deleting from an empty page is a no-op.
        page.delete(&key(&schema, 4, 4));
        assert_eq!(page.header.current_size, 0);
    }
}