quill_sql/storage/page/
btree_page.rs

1use crate::buffer::{PageId, INVALID_PAGE_ID};
2use crate::catalog::{Schema, SchemaRef};
3use crate::storage::page::RecordId;
4use crate::storage::tuple::Tuple;
5use std::sync::Arc;
6
/// Entry limit configured for B+ tree internal pages.
///
/// NOTE(review): presumably the default `max_size` handed to internal pages by the index
/// code; its users are outside this file.
pub const BPLUS_INTERNAL_PAGE_MAX_SIZE: usize = 10;
/// Entry limit configured for B+ tree leaf pages.
///
/// NOTE(review): presumably the default `max_size` handed to leaf pages by the index code;
/// its users are outside this file.
pub const BPLUS_LEAF_PAGE_MAX_SIZE: usize = 10;
9
10#[derive(Debug, Clone, Copy, PartialEq, Eq)]
11pub struct BPlusTreeHeaderPage {
12    pub root_page_id: PageId,
13}
14
15#[derive(Debug, Clone, Eq, PartialEq)]
16pub enum BPlusTreePage {
17    Internal(BPlusTreeInternalPage),
18    Leaf(BPlusTreeLeafPage),
19}
20
21impl BPlusTreePage {
22    pub fn is_full(&self) -> bool {
23        match self {
24            Self::Internal(page) => page.is_full(),
25            Self::Leaf(page) => page.is_full(),
26        }
27    }
28    pub fn is_underflow(&self, is_root: bool) -> bool {
29        if is_root {
30            return false;
31        }
32        match self {
33            Self::Internal(page) => page.header.current_size < page.min_size(),
34            Self::Leaf(page) => page.header.current_size < page.min_size(),
35        }
36    }
37    pub fn min_size(&self) -> u32 {
38        match self {
39            Self::Internal(page) => page.min_size(),
40            Self::Leaf(page) => page.min_size(),
41        }
42    }
43    pub fn current_size(&self) -> u32 {
44        match self {
45            Self::Internal(page) => page.header.current_size,
46            Self::Leaf(page) => page.header.current_size,
47        }
48    }
49    pub fn max_size(&self) -> u32 {
50        match self {
51            Self::Internal(page) => page.header.max_size,
52            Self::Leaf(page) => page.header.max_size,
53        }
54    }
55    pub fn insert_internalkv(&mut self, internalkv: InternalKV) {
56        match self {
57            Self::Internal(page) => page.insert(internalkv.0, internalkv.1),
58            Self::Leaf(_) => panic!("Leaf page cannot insert InternalKV"),
59        }
60    }
61    pub fn can_borrow(&self) -> bool {
62        match self {
63            Self::Internal(page) => page.header.current_size > page.min_size(),
64            Self::Leaf(page) => page.header.current_size > page.min_size(),
65        }
66    }
67}
68
/// Tag stored in a page header that tells leaf pages and internal pages apart.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum BPlusTreePageType {
    /// The page is a leaf page.
    LeafPage,
    /// The page is an internal page.
    InternalPage,
}
74
75pub type InternalKV = (Tuple, PageId);
76pub type LeafKV = (Tuple, RecordId);
77
78/**
79 * Internal page format (keys are stored in increasing order):
80 *  --------------------------------------------------------------------------
81 * | HEADER | KEY(1)+PAGE_ID(1) | KEY(2)+PAGE_ID(2) | ... | KEY(n)+PAGE_ID(n) |
82 *  --------------------------------------------------------------------------
83 *
84 * Header format (size in byte, 12 bytes in total):
85 * ----------------------------------------------------------------------------
86 * | PageType (4) | CurrentSize (4) | MaxSize (4) |
87 * ----------------------------------------------------------------------------
88 */
89#[derive(Debug, Clone, Eq, PartialEq)]
90pub struct BPlusTreeInternalPage {
91    pub schema: SchemaRef,
92    pub header: BPlusTreeInternalPageHeader,
93    // 第一个key为空,n个key对应n+1个value
94    pub array: Vec<InternalKV>,
95    // B-link fence high key: keys in this node are strictly less than high_key if present
96    pub high_key: Option<Tuple>,
97}
98
99#[derive(Debug, Clone, Eq, PartialEq)]
100pub struct BPlusTreeInternalPageHeader {
101    pub page_type: BPlusTreePageType,
102    pub current_size: u32,
103    // max kv size can be stored
104    pub max_size: u32,
105    pub version: u32,
106    // B-link: right sibling pointer for chasing on concurrent splits
107    pub next_page_id: PageId,
108}
109
110impl BPlusTreeInternalPage {
111    pub fn new(schema: SchemaRef, max_size: u32) -> Self {
112        Self {
113            schema,
114            header: BPlusTreeInternalPageHeader {
115                page_type: BPlusTreePageType::InternalPage,
116                current_size: 0,
117                max_size,
118                version: 0,
119                next_page_id: INVALID_PAGE_ID,
120            },
121            array: Vec::with_capacity(max_size as usize),
122            high_key: None,
123        }
124    }
125    pub fn min_size(&self) -> u32 {
126        (self.header.max_size + 1) / 2
127    }
128    pub fn key_at(&self, index: usize) -> &Tuple {
129        &self.array[index].0
130    }
131    pub fn value_at(&self, index: usize) -> PageId {
132        self.array[index].1
133    }
134    pub fn values(&self) -> Vec<PageId> {
135        self.array.iter().map(|kv| kv.1).collect()
136    }
137
138    pub fn sibling_page_ids(&self, page_id: PageId) -> (Option<PageId>, Option<PageId>) {
139        let index = self.array.iter().position(|x| x.1 == page_id);
140        if let Some(index) = index {
141            return (
142                if index == 0 {
143                    None
144                } else {
145                    Some(self.array[index - 1].1)
146                },
147                if index == self.header.current_size as usize - 1 {
148                    None
149                } else {
150                    Some(self.array[index + 1].1)
151                },
152            );
153        }
154        (None, None)
155    }
156
157    /// Insert a key-value pair into the internal page using binary search
158    /// Works whether the page has a leading NULL sentinel or not.
159    pub fn insert(&mut self, key: Tuple, page_id: PageId) {
160        let size = self.header.current_size as usize;
161        let pos = self.array[..size]
162            .binary_search_by(|(k, _)| k.cmp(&key))
163            .unwrap_or_else(|pos| pos);
164        self.array.insert(pos, (key, page_id));
165        self.header.current_size += 1;
166    }
167    pub fn batch_insert(&mut self, kvs: Vec<InternalKV>) {
168        let kvs_len = kvs.len();
169        // For internal pages, the input `kvs` must already be sorted and strictly greater
170        // than existing keys (except the sentinel at index 0). We simply append to preserve
171        // the invariant that the sentinel stays at index 0.
172        self.array.extend(kvs);
173        self.header.current_size += kvs_len as u32;
174    }
175
176    /// Insert (key,new_page_id) right after the entry whose value equals `old_page_id`.
177    /// This mirrors BusTub's InsertNodeAfter semantics and preserves relative child order.
178    pub fn insert_after(&mut self, old_page_id: PageId, key: Tuple, new_page_id: PageId) {
179        let pos = self
180            .array
181            .iter()
182            .position(|&(_, pid)| pid == old_page_id)
183            .expect("old child not found in parent when inserting after");
184        self.array.insert(pos + 1, (key, new_page_id));
185        self.header.current_size += 1;
186    }
187
188    pub fn remove(&mut self, page_id: PageId) -> Option<(Tuple, PageId)> {
189        if let Some(pos) = self.array.iter().position(|&(_, pid)| pid == page_id) {
190            let removed = self.array.remove(pos);
191            self.header.current_size -= 1;
192            Some(removed)
193        } else {
194            None
195        }
196    }
197
198    pub fn delete(&mut self, key: &Tuple) {
199        if self.header.current_size == 0 {
200            return;
201        }
202        // 第一个key为空,所以从1开始
203        let mut start: i32 = 1;
204        let mut end: i32 = self.header.current_size as i32 - 1;
205        while start < end {
206            let mid = (start + end) / 2;
207            let compare_res = key.partial_cmp(&self.array[mid as usize].0).unwrap();
208            if compare_res == std::cmp::Ordering::Equal {
209                self.array.remove(mid as usize);
210                self.header.current_size -= 1;
211                // 删除后,如果只剩下一个空key,那么删除
212                if self.header.current_size == 1 {
213                    self.array.remove(0);
214                    self.header.current_size -= 1;
215                }
216                return;
217            } else if compare_res == std::cmp::Ordering::Less {
218                end = mid - 1;
219            } else {
220                start = mid + 1;
221            }
222        }
223        if key.partial_cmp(&self.array[start as usize].0).unwrap() == std::cmp::Ordering::Equal {
224            self.array.remove(start as usize);
225            self.header.current_size -= 1;
226            // 删除后,如果只剩下一个空key,那么删除
227            if self.header.current_size == 1 {
228                self.array.remove(0);
229                self.header.current_size -= 1;
230            }
231        }
232    }
233
234    pub fn delete_page_id(&mut self, page_id: PageId) {
235        for i in 0..self.header.current_size {
236            if self.array[i as usize].1 == page_id {
237                self.array.remove(i as usize);
238                self.header.current_size -= 1;
239                return;
240            }
241        }
242    }
243
244    pub fn is_full(&self) -> bool {
245        // Split happens only after overflow so full means strictly greater
246        self.header.current_size > self.header.max_size
247    }
248
249    pub fn split_off(&mut self, at: usize) -> Vec<InternalKV> {
250        let new_array = self.array.split_off(at);
251        self.header.current_size -= new_array.len() as u32;
252        new_array
253    }
254
255    pub fn reverse_split_off(&mut self, at: usize) -> Vec<InternalKV> {
256        let mut new_array = Vec::new();
257        for _ in 0..=at {
258            new_array.push(self.array.remove(0));
259        }
260        self.header.current_size -= new_array.len() as u32;
261        new_array
262    }
263
264    pub fn replace_key(&mut self, old_key: &Tuple, new_key: Tuple) {
265        let key_index = self.key_index(old_key);
266        if let Some(index) = key_index {
267            self.array[index].0 = new_key;
268        }
269    }
270
271    pub fn key_index(&self, key: &Tuple) -> Option<usize> {
272        if self.header.current_size == 0 {
273            return None;
274        }
275        let size = self.header.current_size as usize;
276        self.array[..size]
277            .binary_search_by(|(k, _)| k.cmp(key))
278            .ok()
279    }
280
281    // 查找page_id对应的索引位置
282    pub fn value_index(&self, page_id: PageId) -> Option<usize> {
283        if self.header.current_size == 0 {
284            return None;
285        }
286        let size = self.header.current_size as usize;
287        self.array[..size]
288            .iter()
289            .position(|(_, pid)| *pid == page_id)
290    }
291
292    // 查找key对应的page_id(返回最后一个 <= key 的指针)。
293    pub fn look_up(&self, key: &Tuple) -> PageId {
294        let size = self.header.current_size as usize;
295        let partition_idx = self.array[1..size].partition_point(|(probe_key, _)| probe_key <= key);
296        self.value_at(partition_idx)
297    }
298
299    // 查找key对应的page_id的可变引用(返回最后一个 <= key 的指针)。
300    pub fn look_up_mut(&mut self, key: &Tuple) -> Option<&mut PageId> {
301        let size = self.header.current_size as usize;
302        if size == 0 {
303            return None;
304        }
305
306        // Use `partition_point` to find the first key > `key`.
307        let partition_idx = self.array[1..size].partition_point(|(probe_key, _)| probe_key <= key);
308
309        Some(&mut self.array[partition_idx].1)
310    }
311
312    /// Removes the first key-value pair (not the sentinel) and returns it.
313    /// Also returns the key that will be promoted up to the parent.
314    pub fn remove_first_kv(&mut self) -> (Tuple, PageId) {
315        // 检查页面状态是否一致
316        if self.array.len() != self.header.current_size as usize {
317            panic!(
318                "Page state inconsistent: array len {} != current_size {}",
319                self.array.len(),
320                self.header.current_size
321            );
322        }
323
324        // The first "real" key is at index 1, after the sentinel.
325        if self.array.len() <= 1 {
326            panic!(
327                "remove_first_kv called on internal page with array len {} and current_size {}",
328                self.array.len(),
329                self.header.current_size
330            );
331        }
332        let result = self.array.remove(1);
333        self.header.current_size -= 1;
334        result
335    }
336    pub fn remove_last_kv(&mut self) -> (Tuple, PageId) {
337        // 检查页面状态是否一致
338        if self.array.len() != self.header.current_size as usize {
339            panic!(
340                "Page state inconsistent: array len {} != current_size {}",
341                self.array.len(),
342                self.header.current_size
343            );
344        }
345
346        if self.array.is_empty() {
347            panic!(
348                "remove_last_kv called on empty internal page with current_size {}",
349                self.header.current_size
350            );
351        }
352
353        let result = self.array.pop().unwrap();
354        self.header.current_size -= 1;
355        assert_eq!(self.header.current_size as usize, self.array.len());
356        result
357    }
358    pub fn merge(&mut self, middle_key: Tuple, other: &mut BPlusTreeInternalPage) {
359        // The middle key from the parent is pushed as a new separator,
360        // pointing to the first child of the 'other' (right) page.
361        self.array
362            .push((middle_key, other.array.first().unwrap().1));
363
364        // The remaining key-pointer pairs from the 'other' page are appended.
365        // We drain from index 1 to skip 'other' page's sentinel entry.
366        self.array.extend(other.array.drain(1..));
367
368        self.header.current_size = self.array.len() as u32;
369    }
370}
371
372/**
373 * Leaf page format (keys are stored in order):
374 *  ----------------------------------------------------------------------
375 * | HEADER | KEY(1) + RID(1) | KEY(2) + RID(2) | ... | KEY(n) + RID(n)
376 *  ----------------------------------------------------------------------
377 *
378 *  Header format (size in byte, 16 bytes in total):
379 *  ---------------------------------------------------------------------
380 * | PageType (4) | CurrentSize (4) | MaxSize (4) | NextPageId (4)
381 *  ---------------------------------------------------------------------
382 */
383#[derive(Debug, Clone, Eq, PartialEq)]
384pub struct BPlusTreeLeafPage {
385    pub schema: SchemaRef,
386    pub header: BPlusTreeLeafPageHeader,
387    pub array: Vec<LeafKV>,
388}
389
390#[derive(Debug, Clone, Eq, PartialEq)]
391pub struct BPlusTreeLeafPageHeader {
392    pub page_type: BPlusTreePageType,
393    pub current_size: u32,
394    // max kv size can be stored
395    pub max_size: u32,
396    pub next_page_id: PageId,
397    pub version: u32,
398}
399
400impl BPlusTreeLeafPage {
401    pub fn new(schema: SchemaRef, max_size: u32) -> Self {
402        Self {
403            schema,
404            header: BPlusTreeLeafPageHeader {
405                page_type: BPlusTreePageType::LeafPage,
406                current_size: 0,
407                max_size,
408                next_page_id: INVALID_PAGE_ID,
409                version: 0,
410            },
411            array: Vec::with_capacity(max_size as usize),
412        }
413    }
414
415    pub fn empty() -> Self {
416        Self {
417            schema: Arc::new(Schema::empty()),
418            header: BPlusTreeLeafPageHeader {
419                page_type: BPlusTreePageType::LeafPage,
420                current_size: 0,
421                max_size: 0,
422                next_page_id: INVALID_PAGE_ID,
423                version: 0,
424            },
425            array: Vec::new(),
426        }
427    }
428
429    pub fn min_size(&self) -> u32 {
430        (self.header.max_size + 1) / 2
431    }
432
433    pub fn key_at(&self, index: usize) -> &Tuple {
434        &self.array[index].0
435    }
436
437    pub fn kv_at(&self, index: usize) -> &LeafKV {
438        &self.array[index]
439    }
440
441    pub fn is_full(&self) -> bool {
442        // Split happens only after overflow so full means strictly greater
443        self.header.current_size > self.header.max_size
444    }
445
446    pub fn insert(&mut self, key: Tuple, rid: RecordId) {
447        let size = self.header.current_size as usize;
448        let pos = self.array[..size]
449            .binary_search_by(|(k, _)| k.cmp(&key))
450            .unwrap_or_else(|pos| pos);
451        self.array.insert(pos, (key, rid));
452        self.header.current_size += 1;
453    }
454
455    pub fn batch_insert(&mut self, kvs: Vec<LeafKV>) {
456        let kvs_len = kvs.len();
457        self.array.extend(kvs);
458        self.header.current_size += kvs_len as u32;
459        self.array.sort_by(|a, b| a.0.cmp(&b.0));
460    }
461
462    pub fn split_off(&mut self, at: usize) -> Vec<LeafKV> {
463        let new_array = self.array.split_off(at);
464        self.header.current_size -= new_array.len() as u32;
465        new_array
466    }
467
468    pub fn reverse_split_off(&mut self, at: usize) -> Vec<LeafKV> {
469        let mut new_array = Vec::new();
470        for _ in 0..=at {
471            new_array.push(self.array.remove(0));
472        }
473        self.header.current_size -= new_array.len() as u32;
474        new_array
475    }
476
477    pub fn delete(&mut self, key: &Tuple) {
478        let key_index = self.key_index(key);
479        if let Some(index) = key_index {
480            self.array.remove(index);
481            self.header.current_size -= 1;
482        }
483    }
484
485    // 查找key对应的rid
486    pub fn look_up(&self, key: &Tuple) -> Option<RecordId> {
487        let key_index = self.key_index(key);
488        key_index.map(|index| self.array[index].1)
489    }
490
491    // 查找key对应的rid的可变引用
492    pub fn look_up_mut(&mut self, key: &Tuple) -> Option<&mut RecordId> {
493        let key_index = self.key_index(key);
494        key_index.map(|index| &mut self.array[index].1)
495    }
496
497    fn key_index(&self, key: &Tuple) -> Option<usize> {
498        if self.header.current_size == 0 {
499            return None;
500        }
501        let size = self.header.current_size as usize;
502        self.array[..size]
503            .binary_search_by(|(k, _)| k.cmp(key))
504            .ok()
505    }
506
507    /// Removes and returns the first key-value pair.
508    pub fn remove_first_kv(&mut self) -> (Tuple, RecordId) {
509        // 检查页面状态是否一致
510        if self.array.len() != self.header.current_size as usize {
511            panic!(
512                "Page state inconsistent: array len {} != current_size {}",
513                self.array.len(),
514                self.header.current_size
515            );
516        }
517
518        if self.array.is_empty() {
519            panic!(
520                "remove_first_kv called on empty leaf page with current_size {}",
521                self.header.current_size
522            );
523        }
524
525        let result = self.array.remove(0);
526        self.header.current_size -= 1;
527        result
528    }
529    pub fn remove_last_kv(&mut self) -> (Tuple, RecordId) {
530        // 检查页面状态是否一致
531        if self.array.len() != self.header.current_size as usize {
532            panic!(
533                "Page state inconsistent: array len {} != current_size {}",
534                self.array.len(),
535                self.header.current_size
536            );
537        }
538
539        if self.array.is_empty() {
540            panic!(
541                "remove_last_kv called on empty leaf page with current_size {}",
542                self.header.current_size
543            );
544        }
545
546        let result = self.array.pop().unwrap();
547        self.header.current_size -= 1;
548        result
549    }
550    pub fn merge(&mut self, other: &mut BPlusTreeLeafPage) {
551        self.array.extend(other.array.drain(..));
552        self.header.current_size = self.array.len() as u32;
553        self.header.next_page_id = other.header.next_page_id;
554    }
555
556    pub fn next_closest(&self, tuple: &Tuple, included: bool) -> Option<usize> {
557        for (idx, (key, _)) in self.array.iter().enumerate() {
558            if tuple == key && included {
559                return Some(idx);
560            }
561            if key > tuple {
562                return Some(idx);
563            }
564        }
565        None
566    }
567}
568
#[cfg(test)]
mod tests {
    use crate::storage::page::{BPlusTreeInternalPage, BPlusTreeLeafPage};
    use crate::storage::tuple::Tuple;
    use crate::utils::scalar::ScalarValue;
    use crate::{
        catalog::{Column, DataType, Schema},
        storage::page::RecordId,
    };
    use std::sync::Arc;

    /// Builds the two-column `(a: Int8, b: Int16)` key schema used by every test.
    fn key_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Column::new("a", DataType::Int8, false),
            Column::new("b", DataType::Int16, false),
        ]))
    }

    /// Column values of the key `(n, n)`.
    fn row(n: i8) -> Vec<ScalarValue> {
        vec![n.into(), i16::from(n).into()]
    }

    /// Key tuple `(n, n)` bound to `schema`.
    fn key(schema: &Arc<Schema>, n: i8) -> Tuple {
        Tuple::new(schema.clone(), row(n))
    }

    /// Inserting out of order keeps the sentinel first and the real keys sorted.
    #[test]
    pub fn test_internal_page_insert() {
        let schema = key_schema();
        let mut page = BPlusTreeInternalPage::new(schema.clone(), 3);
        page.insert(Tuple::empty(schema.clone()), 0);
        page.insert(key(&schema, 2), 2);
        page.insert(key(&schema, 1), 1);

        assert_eq!(page.header.current_size, 3);
        assert_eq!(page.array[0].0.data, Tuple::empty(schema.clone()).data);
        assert_eq!(page.array[0].1, 0);
        assert_eq!(page.array[1].0.data, row(1));
        assert_eq!(page.array[1].1, 1);
        assert_eq!(page.array[2].0.data, row(2));
        assert_eq!(page.array[2].1, 2);
    }

    /// Inserting out of order leaves the leaf entries sorted by key.
    #[test]
    pub fn test_leaf_page_insert() {
        let schema = key_schema();
        let mut page = BPlusTreeLeafPage::new(schema.clone(), 3);
        page.insert(key(&schema, 2), RecordId::new(2, 2));
        page.insert(key(&schema, 1), RecordId::new(1, 1));
        page.insert(key(&schema, 3), RecordId::new(3, 3));

        assert_eq!(page.header.current_size, 3);
        assert_eq!(page.array[0].0.data, row(1));
        assert_eq!(page.array[0].1, RecordId::new(1, 1));
        assert_eq!(page.array[1].0.data, row(2));
        assert_eq!(page.array[1].1, RecordId::new(2, 2));
        assert_eq!(page.array[2].0.data, row(3));
        assert_eq!(page.array[2].1, RecordId::new(3, 3));
    }

    /// `look_up` routes a key to the pointer of the last entry whose key is `<=` it.
    #[test]
    pub fn test_internal_page_look_up() {
        let schema = key_schema();

        let mut page = BPlusTreeInternalPage::new(schema.clone(), 5);
        page.insert(Tuple::empty(schema.clone()), 0);
        for (k, child) in [(2i8, 2), (1, 1), (3, 3), (4, 4)] {
            page.insert(key(&schema, k), child);
        }
        for (probe, expected) in [(0i8, 0), (3, 3), (5, 4)] {
            assert_eq!(page.look_up(&key(&schema, probe)), expected);
        }

        let mut page = BPlusTreeInternalPage::new(schema.clone(), 2);
        page.insert(Tuple::empty(schema.clone()), 0);
        page.insert(key(&schema, 1), 1);
        for (probe, expected) in [(0i8, 0), (1, 1), (2, 1)] {
            assert_eq!(page.look_up(&key(&schema, probe)), expected);
        }
    }

    /// `look_up` on a leaf only answers for keys that are actually stored.
    #[test]
    pub fn test_leaf_page_look_up() {
        let schema = key_schema();

        let mut page = BPlusTreeLeafPage::new(schema.clone(), 5);
        page.insert(key(&schema, 2), RecordId::new(2, 2));
        page.insert(key(&schema, 1), RecordId::new(1, 1));
        page.insert(key(&schema, 3), RecordId::new(3, 3));
        page.insert(key(&schema, 5), RecordId::new(5, 5));
        page.insert(key(&schema, 4), RecordId::new(4, 4));
        for (probe, expected) in [
            (0i8, None),
            (2, Some(RecordId::new(2, 2))),
            (3, Some(RecordId::new(3, 3))),
            (6, None),
        ] {
            assert_eq!(page.look_up(&key(&schema, probe)), expected);
        }

        let mut page = BPlusTreeLeafPage::new(schema.clone(), 2);
        page.insert(key(&schema, 2), RecordId::new(2, 2));
        page.insert(key(&schema, 1), RecordId::new(1, 1));
        // An all-NULL key is not stored, so it must not be found.
        let null_key = Tuple::new(
            schema.clone(),
            vec![ScalarValue::Int8(None), ScalarValue::Int16(None)],
        );
        assert_eq!(page.look_up(&null_key), None);
        for (probe, expected) in [
            (1i8, Some(RecordId::new(1, 1))),
            (2, Some(RecordId::new(2, 2))),
            (3, None),
        ] {
            assert_eq!(page.look_up(&key(&schema, probe)), expected);
        }
    }

    /// Deleting keys shrinks the page; once only the sentinel would remain it goes too.
    #[test]
    pub fn test_internal_page_delete() {
        let schema = key_schema();
        let mut page = BPlusTreeInternalPage::new(schema.clone(), 5);
        page.insert(Tuple::empty(schema.clone()), 0);
        for (k, child) in [(2i8, 2), (1, 1), (3, 3), (4, 4)] {
            page.insert(key(&schema, k), child);
        }

        page.delete(&key(&schema, 2));
        assert_eq!(page.header.current_size, 4);
        assert_eq!(
            page.array[0].0.data,
            vec![ScalarValue::Int8(None), ScalarValue::Int16(None)]
        );
        assert_eq!(page.array[0].1, 0);
        assert_eq!(page.array[1].0.data, row(1));
        assert_eq!(page.array[1].1, 1);
        assert_eq!(page.array[2].0.data, row(3));
        assert_eq!(page.array[2].1, 3);
        assert_eq!(page.array[3].0.data, row(4));
        assert_eq!(page.array[3].1, 4);

        // The last two steps cover sentinel removal and deleting from an empty page.
        for (victim, size_after) in [(4i8, 3), (3, 2), (1, 0), (1, 0)] {
            page.delete(&key(&schema, victim));
            assert_eq!(page.header.current_size, size_after);
        }
    }

    /// Deleting keys from a leaf keeps the rest sorted; deleting a missing key is a no-op.
    #[test]
    pub fn test_leaf_page_delete() {
        let schema = key_schema();
        let mut page = BPlusTreeLeafPage::new(schema.clone(), 5);
        page.insert(key(&schema, 2), RecordId::new(2, 2));
        page.insert(key(&schema, 1), RecordId::new(1, 1));
        page.insert(key(&schema, 3), RecordId::new(3, 3));
        page.insert(key(&schema, 5), RecordId::new(5, 5));
        page.insert(key(&schema, 4), RecordId::new(4, 4));

        page.delete(&key(&schema, 2));
        assert_eq!(page.header.current_size, 4);
        for (slot, k, rid) in [
            (0usize, 1i8, RecordId::new(1, 1)),
            (1, 3, RecordId::new(3, 3)),
            (2, 4, RecordId::new(4, 4)),
            (3, 5, RecordId::new(5, 5)),
        ] {
            assert_eq!(page.array[slot].0.data, row(k));
            assert_eq!(page.array[slot].1, rid);
        }

        page.delete(&key(&schema, 3));
        assert_eq!(page.header.current_size, 3);
        page.delete(&key(&schema, 5));
        assert_eq!(page.header.current_size, 2);
        page.delete(&key(&schema, 1));
        assert_eq!(page.header.current_size, 1);
        assert_eq!(page.array[0].0.data, row(4));
        assert_eq!(page.array[0].1, RecordId::new(4, 4));

        page.delete(&key(&schema, 4));
        assert_eq!(page.header.current_size, 0);
        // Deleting from the now-empty page must leave it empty.
        page.delete(&key(&schema, 4));
        assert_eq!(page.header.current_size, 0);
    }
}