simple_db_rust/btree/
table.rs

1use super::{
2    buffer_pool::BufferPool,
3    page::{
4        empty_page_data, BTreeInternalPageIterator,
5        BTreeInternalPageReverseIterator, BTreeLeafPage, BTreeLeafPageIterator,
6        BTreeLeafPageReverseIterator, BTreePageID, BTreeRootPointerPage, Entry,
7    },
8};
9use crate::{
10    btree::page::{BTreeBasePage, PageCategory},
11    field::IntField,
12};
13
14use core::fmt;
15use log::info;
16use std::{cell::Cell, str};
17
18use std::{
19    cell::RefCell,
20    collections::hash_map::DefaultHasher,
21    fs::{File, OpenOptions},
22    hash::{Hash, Hasher},
23    io::{Seek, SeekFrom, Write},
24    rc::Rc,
25    usize,
26};
27
28use std::cell::RefMut;
29
30use super::{
31    page::BTreeInternalPage,
32    tuple::{Tuple, TupleScheme},
33};
34
35// B+ Tree
36pub struct BTreeTable {
37    // the file that stores the on-disk backing store for this B+ tree
38    // file.
39    file_path: String,
40
41    // the field which index is keyed on
42    pub key_field: usize,
43
44    // the tuple descriptor of tuples in the file
45    pub tuple_scheme: TupleScheme,
46
47    file: RefCell<File>,
48
49    table_id: i32,
50
51    page_index: Cell<usize>,
52}
53
/// The two write scenarios distinguished by this module.
///
/// NOTE(review): not referenced anywhere else in this file; presumably
/// consumed by callers or tests -- confirm.
#[derive(Clone, Copy)]
pub enum WriteScene {
    Random,
    Sequential,
}
59
60impl fmt::Display for BTreeTable {
61    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
62        write!(
63            f,
64            "<BTreeFile, file: {}, id: {}>",
65            self.file_path, self.table_id
66        )
67    }
68}
69
70impl BTreeTable {
71    pub fn new(
72        file_path: &str,
73        key_field: usize,
74        row_scheme: &TupleScheme,
75    ) -> Self {
76        File::create(file_path).expect("io error");
77
78        let f = RefCell::new(
79            OpenOptions::new()
80                .write(true)
81                .read(true)
82                .open(file_path)
83                .unwrap(),
84        );
85
86        let mut hasher = DefaultHasher::new();
87        file_path.hash(&mut hasher);
88        let table_id = hasher.finish() as i32;
89
90        Self::file_init(f.borrow_mut());
91
92        Self {
93            file_path: file_path.to_string(),
94            key_field,
95            tuple_scheme: row_scheme.clone(),
96            file: f,
97            table_id,
98
99            /*
100            start from 1 (the root page)
101
102            TODO: init it according to actual condition
103            */
104            page_index: Cell::new(1),
105        }
106    }
107
108    pub fn get_id(&self) -> i32 {
109        self.table_id
110    }
111
112    /// Insert a tuple into this BTreeFile, keeping the tuples in sorted order.
113    /// May cause pages to split if the page where tuple belongs is full.
114    pub fn insert_tuple(&self, tuple: &Tuple) {
115        // a read lock on the root pointer page and
116        // use it to locate the root page
117        let root_pid = self.get_root_pid();
118
119        // find and lock the left-most leaf page corresponding to
120        // the key field, and split the leaf page if there are no
121        // more slots available
122        let field = tuple.get_field(self.key_field);
123        let mut leaf_rc = self.find_leaf_page(root_pid, Some(field));
124
125        if leaf_rc.borrow().empty_slots_count() == 0 {
126            leaf_rc =
127                self.split_leaf_page(leaf_rc, tuple.get_field(self.key_field));
128        }
129        leaf_rc.borrow_mut().insert_tuple(&tuple);
130    }
131
132    /**
133    Split a leaf page to make room for new tuples and
134    recursively split the parent node as needed to
135    accommodate a new entry. The new entry should have
136    a key matching the key field of the first tuple in
137    the right-hand page (the key is "copied up"), and
138    child pointers pointing to the two leaf pages
139    resulting from the split.  Update sibling pointers
140    and parent pointers as needed.
141
142    Return the leaf page into which a new tuple with
143    key field "field" should be inserted.
144
145    # Arguments
146    * `field`: the key field of the tuple to be inserted after the split is complete. Necessary to know which of the two pages to return.
147    */
148    pub fn split_leaf_page(
149        &self,
150        page_rc: Rc<RefCell<BTreeLeafPage>>,
151        field: IntField,
152    ) -> Rc<RefCell<BTreeLeafPage>> {
153        let new_sibling_rc = self.get_empty_leaf_page();
154        let parent_pid: BTreePageID;
155        let key: i32;
156
157        // borrow of new_sibling_rc start here
158        // borrow of page_rc start here
159        {
160            let mut new_sibling = new_sibling_rc.borrow_mut();
161            let mut page = page_rc.borrow_mut();
162            // 1. adding a new page on the right of the existing
163            // page and moving half of the tuples to the new page
164            let tuple_count = page.tuples_count();
165            let move_tuple_count = tuple_count / 2;
166
167            let mut it = BTreeLeafPageReverseIterator::new(&page);
168            let mut delete_indexes: Vec<usize> = Vec::new();
169            for (i, tuple) in it.by_ref().take(move_tuple_count).enumerate() {
170                delete_indexes.push(tuple_count - i - 1);
171                new_sibling.insert_tuple(&tuple);
172            }
173
174            for i in &delete_indexes {
175                page.delete_tuple(i);
176            }
177
178            let mut it = BTreeLeafPageReverseIterator::new(&page);
179            key = it.next().unwrap().get_field(self.key_field).value;
180
181            // set sibling id
182            new_sibling.set_right_sibling_pid(page.get_right_sibling_pid());
183            page.set_right_sibling_pid(Some(new_sibling.get_pid()));
184
185            // get parent pid for use later
186            parent_pid = page.get_parent_pid();
187        }
188        // borrow of new_sibling_rc end here
189        // borrow of page_rc end here
190
191        // 2. Copy the middle key up into the parent page, and
192        // recursively split the parent as needed to accommodate
193        // the new entry.
194        //
195        // We put this method outside all the borrow blocks since
196        // once the parent page is split, a lot of children will
197        // been borrowed. (may including the current leaf page)
198        let parent_rc = self.get_parent_with_empty_slots(parent_pid, field);
199
200        // borrow of parent_rc start here
201        // borrow of page_rc start here
202        // borrow of new_sibling_rc start here
203        {
204            let mut parent = parent_rc.borrow_mut();
205            let mut page = page_rc.borrow_mut();
206            let mut new_sibling = new_sibling_rc.borrow_mut();
207            let mut entry =
208                Entry::new(key, &page.get_pid(), &new_sibling.get_pid());
209            parent.insert_entry(&mut entry);
210
211            // set parent id
212            page.set_parent_pid(&parent.get_page_id());
213            new_sibling.set_parent_pid(&parent.get_page_id());
214        }
215        // borrow of parent_rc end here
216        // borrow of page_rc end here
217        // borrow of new_sibling_rc end here
218
219        if field.value > key {
220            new_sibling_rc
221        } else {
222            page_rc
223        }
224    }
225
226    pub fn get_empty_page_index(&self) -> usize {
227        let index = self.page_index.get() + 1;
228        self.page_index.set(index);
229        index
230    }
231
232    /**
233    Method to encapsulate the process of getting a parent page
234    ready to accept new entries.
235
236    This may mean creating a page to become the new root of
237    the tree, splitting the existing parent page if there are
238    no empty slots, or simply locking and returning the existing
239    parent page.
240
241    # Arguments
242    * `field`: the key field of the tuple to be inserted after the split is complete. Necessary to know which of the two pages to return.
243    * `parentId`: the id of the parent. May be an internal page or the RootPtr page
244    */
245    fn get_parent_with_empty_slots(
246        &self,
247        parent_id: BTreePageID,
248        field: IntField,
249    ) -> Rc<RefCell<BTreeInternalPage>> {
250        // create a parent node if necessary
251        // this will be the new root of the tree
252        match parent_id.category {
253            PageCategory::RootPointer => {
254                let new_parent_rc = self.get_empty_interanl_page();
255
256                // borrow of new_parent_rc start here
257                {
258                    let new_parent = new_parent_rc.borrow_mut();
259
260                    // update the root pointer
261                    let page_id = BTreePageID::new(
262                        PageCategory::RootPointer,
263                        self.table_id,
264                        0,
265                    );
266                    let root_pointer_page = BufferPool::global()
267                        .get_root_pointer_page(&page_id)
268                        .unwrap();
269
270                    root_pointer_page
271                        .borrow_mut()
272                        .set_root_pid(&new_parent.get_page_id());
273                }
274                // borrow of new_parent_rc end here
275
276                new_parent_rc
277            }
278            PageCategory::Internal => {
279                let parent_rc =
280                    BufferPool::global().get_internal_page(&parent_id).unwrap();
281                let empty_slots_count: usize;
282
283                // borrow of parent_rc start here
284                {
285                    empty_slots_count = parent_rc.borrow().empty_slots_count();
286                }
287                // borrow of parent_rc end here
288
289                info!("empty slots count: {}", empty_slots_count);
290
291                if empty_slots_count > 0 {
292                    return parent_rc;
293                } else {
294                    // split upper parent
295                    return self.split_internal_page(parent_rc, field);
296                }
297            }
298            _ => {
299                todo!()
300            }
301        }
302    }
303
304    /**
305    Split an internal page to make room for new entries and recursively split its parent page
306    as needed to accommodate a new entry. The new entry for the parent should have a key matching
307    the middle key in the original internal page being split (this key is "pushed up" to the parent).
308
309    Make a right sibling page and move half of entries to it.
310
311    The child pointers of the new parent entry should point to the two internal pages resulting
312    from the split. Update parent pointers as needed.
313
314    Return the internal page into which an entry with key field "field" should be inserted
315
316    # Arguments
317    * `field`: the key field of the tuple to be inserted after the split is complete. Necessary to know which of the two pages to return.
318    */
319    fn split_internal_page(
320        &self,
321        page_rc: Rc<RefCell<BTreeInternalPage>>,
322        field: IntField,
323    ) -> Rc<RefCell<BTreeInternalPage>> {
324        info!("split start");
325        self.draw_tree(-1);
326
327        let sibling_rc = self.get_empty_interanl_page();
328        let key: i32;
329        let mut parent_pid: BTreePageID;
330        let mut new_entry: Entry;
331
332        // borrow of sibling_rc start here
333        // borrow of page_rc start here
334        {
335            let mut sibling = sibling_rc.borrow_mut();
336            let mut page = page_rc.borrow_mut();
337
338            parent_pid = page.get_parent_pid();
339
340            if parent_pid.category == PageCategory::RootPointer {
341                // create new parent page if the parent page is root pointer
342                // page.
343                let parent_rc = self.get_empty_interanl_page();
344                parent_pid = parent_rc.borrow().get_page_id();
345
346                // update the root pointer
347                let root_pointer_pid = BTreePageID::new(
348                    PageCategory::RootPointer,
349                    self.table_id,
350                    0,
351                );
352                let root_pointer_page = BufferPool::global()
353                    .get_root_pointer_page(&root_pointer_pid)
354                    .unwrap();
355                root_pointer_page.borrow_mut().set_root_pid(&parent_pid);
356            }
357
358            let enties_count = page.entries_count();
359            let move_entries_count = enties_count / 2;
360
361            let mut delete_indexes: Vec<usize> = Vec::new();
362            let mut it = BTreeInternalPageReverseIterator::new(&page);
363            for e in it.by_ref().take(move_entries_count) {
364                delete_indexes.push(e.get_record_id());
365                sibling.insert_entry(&e);
366
367                // set parent id for right child
368                let right_pid = e.get_right_child();
369                Self::set_parent(&right_pid, &sibling.get_page_id());
370            }
371
372            let middle_entry = it.next().unwrap();
373
374            // also delete the middle entry
375            delete_indexes.push(middle_entry.get_record_id());
376            info!("delete_indexes: {:?}", delete_indexes);
377            for i in delete_indexes {
378                page.delete_entry(i);
379            }
380
381            // set parent id for right child to the middle entry
382            Self::set_parent(
383                &middle_entry.get_right_child(),
384                &sibling.get_page_id(),
385            );
386
387            key = middle_entry.get_key();
388            new_entry =
389                Entry::new(key, &page.get_page_id(), &sibling.get_page_id());
390        }
391        // borrow of sibling_rc end here
392        // borrow of page_rc end here
393
394        let parent_rc = self.get_parent_with_empty_slots(parent_pid, field);
395
396        // borrow of parent_rc start here
397        // borrow of page_rc start here
398        // borrow of sibling_rc start here
399        {
400            let mut parent = parent_rc.borrow_mut();
401            info!("entry: {}", new_entry);
402            parent.insert_entry(&mut new_entry);
403
404            let mut page = page_rc.borrow_mut();
405            let mut sibling = sibling_rc.borrow_mut();
406            page.set_parent_pid(&parent.get_page_id());
407            sibling.set_parent_pid(&parent.get_page_id());
408        }
409        // borrow of parent_rc end here
410        // borrow of page_rc end here
411        // borrow of sibling_rc end here
412
413        info!("split end");
414        self.draw_tree(-1);
415
416        if field.value > key {
417            sibling_rc
418        } else {
419            page_rc
420        }
421    }
422
423    pub fn set_root_pid(&self, root_pid: &BTreePageID) {
424        let root_pointer_pid =
425            BTreePageID::new(PageCategory::RootPointer, self.table_id, 0);
426        let root_pointer_rc = BufferPool::global()
427            .get_root_pointer_page(&root_pointer_pid)
428            .unwrap();
429        root_pointer_rc.borrow_mut().set_root_pid(root_pid);
430    }
431
432    fn set_parent(child_pid: &BTreePageID, parent_pid: &BTreePageID) {
433        match child_pid.category {
434            PageCategory::RootPointer => todo!(),
435            PageCategory::Internal => {
436                let left_rc =
437                    BufferPool::global().get_internal_page(&child_pid).unwrap();
438
439                // borrow of left_rc start here
440                {
441                    let mut left = left_rc.borrow_mut();
442                    left.set_parent_pid(&parent_pid);
443                }
444                // borrow of left_rc end here
445            }
446            PageCategory::Leaf => {
447                let child_rc =
448                    BufferPool::global().get_leaf_page(&child_pid).unwrap();
449
450                // borrow of left_rc start here
451                {
452                    let mut child = child_rc.borrow_mut();
453                    child.set_parent_pid(&parent_pid);
454                }
455                // borrow of left_rc end here
456            }
457            PageCategory::Header => todo!(),
458        }
459    }
460
461    /**
462    Recursive function which finds and locks the leaf page in
463    the B+ tree corresponding to the left-most page possibly
464    containing the key field f. It locks all internal nodes
465    along the path to the leaf node with READ_ONLY permission,
466    and locks the leaf node with permission perm.
467
468    If f is null, it finds the left-most leaf page -- used
469    for the iterator
470
471    # Arguments
472    * tid  - the transaction id
473    * pid  - the current page being searched
474    * perm - the permissions with which to lock the leaf page
475    * f    - the field to search for
476
477    # Return
478    * the left-most leaf page possibly containing the key field f
479    */
480    pub fn find_leaf_page(
481        &self,
482        page_id: BTreePageID,
483        field: Option<IntField>,
484    ) -> Rc<RefCell<BTreeLeafPage>> {
485        match page_id.category {
486            PageCategory::Leaf => {
487                // get page and return directly
488                return BufferPool::global().get_leaf_page(&page_id).unwrap();
489            }
490            PageCategory::Internal => {
491                let page_rc =
492                    BufferPool::global().get_internal_page(&page_id).unwrap();
493                let mut child_pid: Option<BTreePageID> = None;
494
495                // borrow of page_rc start here
496                {
497                    let page = page_rc.borrow();
498                    let it = BTreeInternalPageIterator::new(&page);
499                    let mut entry: Option<Entry> = None;
500                    let mut found = false;
501                    for e in it {
502                        match field {
503                            Some(f) => {
504                                if e.get_key() >= f.value {
505                                    child_pid = Some(e.get_left_child());
506                                    found = true;
507                                    break;
508                                }
509                            }
510                            None => {
511                                child_pid = Some(e.get_left_child());
512                                found = true;
513                                break;
514                            }
515                        }
516                        entry = Some(e);
517                    }
518
519                    if !found {
520                        // return right of last entry
521                        match entry {
522                            Some(e) => {
523                                child_pid = Some(e.get_right_child());
524                            }
525                            None => todo!(),
526                        }
527                    }
528                }
529                // borrow of page_rc end here
530
531                match child_pid {
532                    Some(child_pid) => {
533                        return self.find_leaf_page(child_pid, field);
534                    }
535                    None => todo!(),
536                }
537            }
538            _ => {
539                todo!()
540            }
541        }
542    }
543
544    pub fn get_file(&self) -> RefMut<File> {
545        self.file.borrow_mut()
546    }
547
548    /**
549    init file in necessary
550    */
551    fn file_init(mut file: RefMut<File>) {
552        // if db file is empty, create root pointer page at first
553        if file.metadata().unwrap().len() == 0 {
554            // write root pointer page
555            {
556                // set the root pid to 1
557                let mut data = empty_page_data();
558                let root_pid_bytes = 1_i32.to_le_bytes();
559                for i in 0..4 {
560                    data[i] = root_pid_bytes[i];
561                }
562                file.write(&data).unwrap();
563            }
564
565            // write the first leaf page
566            {
567                let data = BTreeBasePage::empty_page_data();
568                file.write(&data).unwrap();
569            }
570        }
571    }
572
573    fn get_empty_leaf_page(&self) -> Rc<RefCell<BTreeLeafPage>> {
574        // create the new page
575        let page_index = self.get_empty_page_index();
576        let page_id =
577            BTreePageID::new(PageCategory::Leaf, self.table_id, page_index);
578        let page = BTreeLeafPage::new(
579            &page_id,
580            BTreeBasePage::empty_page_data().to_vec(),
581            &self.tuple_scheme,
582            self.key_field,
583        );
584
585        self.write_page_to_disk(&page_id);
586
587        let page_rc = Rc::new(RefCell::new(page));
588
589        BufferPool::global()
590            .leaf_buffer
591            .insert(page_id, page_rc.clone());
592
593        page_rc
594    }
595
596    fn get_empty_interanl_page(&self) -> Rc<RefCell<BTreeInternalPage>> {
597        // create the new page
598        let page_index = self.get_empty_page_index();
599        let page_id =
600            BTreePageID::new(PageCategory::Internal, self.table_id, page_index);
601        let page = BTreeInternalPage::new(
602            &page_id,
603            BTreeBasePage::empty_page_data().to_vec(),
604            &self.tuple_scheme,
605            self.key_field,
606        );
607
608        self.write_page_to_disk(&page_id);
609
610        let page_rc = Rc::new(RefCell::new(page));
611
612        BufferPool::global()
613            .internal_buffer
614            .insert(page_id, page_rc.clone());
615
616        page_rc
617    }
618
619    pub fn write_page_to_disk(&self, page_id: &BTreePageID) {
620        info!("crate new page and write it to disk, pid: {}", page_id);
621        let start_pos: usize = page_id.page_index * BufferPool::get_page_size();
622        self.get_file()
623            .seek(SeekFrom::Start(start_pos as u64))
624            .expect("io error");
625        self.get_file()
626            .write(&BTreeBasePage::empty_page_data())
627            .expect("io error");
628        self.get_file().flush().expect("io error");
629    }
630
631    fn get_first_page(&self) -> Rc<RefCell<BTreeLeafPage>> {
632        let page_id = self.get_root_pid();
633        return self.find_leaf_page(page_id, None);
634    }
635
636    /**
637    Get the root page pid.
638    */
639    pub fn get_root_pid(&self) -> BTreePageID {
640        // get root pointer page
641        let root_pointer_pid = BTreePageID {
642            category: PageCategory::RootPointer,
643            page_index: 0,
644            table_id: self.table_id,
645        };
646        let page_ref = BufferPool::global()
647            .get_root_pointer_page(&root_pointer_pid)
648            .expect("io error");
649        let page = page_ref.borrow();
650        let mut root_pid = page.get_root_pid();
651        root_pid.table_id = self.get_id();
652        root_pid
653    }
654
655    /**
656    The count of pages in this BTreeFile
657
658    (BTreeRootPointerPage is not included)
659    */
660    pub fn pages_count(&self) -> usize {
661        let file_size = self.file.borrow().metadata().unwrap().len() as usize;
662        file_size / BufferPool::get_page_size() - 1
663    }
664
665    // get the first tuple under the internal/leaf page
666    pub fn get_first_tuple(&self, pid: &BTreePageID) -> Option<Tuple> {
667        todo!()
668    }
669
670    pub fn set_page_index(&self, i: usize) {
671        self.page_index.set(i);
672    }
673
674    // get the last tuple under the internal/leaf page
675    pub fn get_last_tuple(&self, pid: &BTreePageID) -> Option<Tuple> {
676        match pid.category {
677            PageCategory::RootPointer => todo!(),
678            PageCategory::Internal => {
679                let page_rc =
680                    BufferPool::global().get_internal_page(pid).unwrap();
681
682                // borrow of page_rc start here
683                let child_pid: BTreePageID;
684                {
685                    let page = page_rc.borrow();
686                    let mut it = BTreeInternalPageReverseIterator::new(&page);
687                    child_pid = it.next().unwrap().get_right_child();
688                }
689                // borrow of page_rc end here
690                self.get_last_tuple(&child_pid)
691            }
692            PageCategory::Leaf => {
693                let page_rc = BufferPool::global().get_leaf_page(pid).unwrap();
694
695                let page = page_rc.borrow();
696                let mut it = BTreeLeafPageReverseIterator::new(&page);
697                it.next()
698            }
699            PageCategory::Header => todo!(),
700        }
701    }
702
703    /**
704    used for debug
705
706    # Arguments
707    * `max_level` - the max level of the print, -1 for print all
708    */
709    pub fn draw_tree(&self, max_level: i32) {
710        println!("\n----- PRINT TREE STRUCTURE START -----\n");
711
712        // get root pointer page
713        let root_pointer_pid = BTreePageID {
714            category: PageCategory::RootPointer,
715            page_index: 0,
716            table_id: self.table_id,
717        };
718        println!("root pointer: {}", root_pointer_pid);
719
720        let root_pid = self.get_root_pid();
721        self.draw_subtree(&root_pid, 0, max_level);
722
723        println!("\n----- PRINT TREE STRUCTURE END   -----\n");
724    }
725
726    fn draw_subtree(&self, pid: &BTreePageID, level: usize, max_level: i32) {
727        match pid.category {
728            PageCategory::RootPointer => todo!(),
729            PageCategory::Internal => {
730                self.draw_internal_node(pid, level, max_level)
731            }
732            PageCategory::Leaf => self.draw_leaf_node(pid, level),
733            PageCategory::Header => todo!(),
734        }
735    }
736
737    fn draw_leaf_node(&self, pid: &BTreePageID, level: usize) {
738        let prefix = "│   ".repeat(level);
739        let page_rc = BufferPool::global().get_leaf_page(&pid).unwrap();
740
741        let mut it = BTreeLeafPageIterator::new(Rc::clone(&page_rc));
742        let first_tuple = it.next().unwrap();
743
744        let page = page_rc.borrow();
745        let mut rit = BTreeLeafPageReverseIterator::new(&page);
746        let last_tuple = rit.next().unwrap();
747
748        println!(
749            "{}├── leaf: {} ({} tuples)",
750            prefix,
751            page.get_pid(),
752            page.tuples_count()
753        );
754        println!("{}├── first: {}", prefix, first_tuple);
755        println!("{}└── last:  {}", prefix, last_tuple);
756    }
757
758    fn draw_internal_node(
759        &self,
760        pid: &BTreePageID,
761        level: usize,
762        max_level: i32,
763    ) {
764        let prefix = "│   ".repeat(level);
765        let page_rc = BufferPool::global().get_internal_page(&pid).unwrap();
766
767        // borrow of page_rc start here
768        {
769            let page = page_rc.borrow();
770            println!(
771                "{}├── internal: {} ({} entries)",
772                prefix,
773                pid,
774                page.entries_count()
775            );
776            if max_level != -1 && level as i32 == max_level {
777                return;
778            }
779            let it = BTreeInternalPageIterator::new(&page);
780            for (i, entry) in it.enumerate() {
781                self.draw_entry(i, &entry, level + 1, max_level);
782            }
783        }
784        // borrow of page_rc end here
785    }
786
787    fn draw_entry(
788        &self,
789        id: usize,
790        entry: &Entry,
791        level: usize,
792        max_level: i32,
793    ) {
794        let prefix = "│   ".repeat(level);
795        if id == 0 {
796            self.draw_subtree(&entry.get_left_child(), level + 1, max_level);
797        }
798        println!("{}├── key: {}", prefix, entry.get_key());
799        self.draw_subtree(&entry.get_right_child(), level + 1, max_level);
800    }
801}
802
803pub struct BTreeTableIterator {
804    page_rc: Rc<RefCell<BTreeLeafPage>>,
805    page_it: BTreeLeafPageIterator,
806}
807
808impl BTreeTableIterator {
809    pub fn new(table: &BTreeTable) -> Self {
810        let page_rc = table.get_first_page();
811
812        Self {
813            page_rc: Rc::clone(&page_rc),
814            page_it: BTreeLeafPageIterator::new(Rc::clone(&page_rc)),
815        }
816    }
817}
818
819impl Iterator for BTreeTableIterator {
820    type Item = Tuple;
821
822    fn next(&mut self) -> Option<Self::Item> {
823        let v = self.page_it.next();
824        if !v.is_none() {
825            return v;
826        }
827
828        let right = (*self.page_rc).borrow().get_right_sibling_pid();
829        match right {
830            Some(right) => {
831                let sibling_rc =
832                    BufferPool::global().get_leaf_page(&right).unwrap();
833                let page_it =
834                    BTreeLeafPageIterator::new(Rc::clone(&sibling_rc));
835
836                self.page_rc = Rc::clone(&sibling_rc);
837                self.page_it = page_it;
838                return self.page_it.next();
839            }
840            None => {
841                return None;
842            }
843        }
844    }
845}
846
/// Comparison operators a `Predicate` can apply to the key field.
///
/// `Like` and `NotEquals` are not supported by `BTreeTableSearchIterator`
/// yet (both hit `todo!()` there).
pub enum Op {
    Equals,
    GreaterThan,
    GreaterThanOrEq,
    LessThan,
    LessThanOrEq,
    Like,
    NotEquals,
}
856
857pub struct Predicate {
858    pub op: Op,
859    pub field: IntField,
860}
861
862impl Predicate {
863    pub fn new(op: Op, field: IntField) -> Self {
864        Self { op, field }
865    }
866}
867
868pub struct BTreeTableSearchIterator {
869    current_page_rc: Rc<RefCell<BTreeLeafPage>>,
870    page_it: BTreeLeafPageIterator,
871    predicate: Predicate,
872    key_field: usize,
873}
874
875impl BTreeTableSearchIterator {
876    pub fn new(table: &BTreeTable, index_predicate: Predicate) -> Self {
877        let rc: Rc<RefCell<BTreeLeafPage>>;
878
879        let root_pid = table.get_root_pid();
880
881        match index_predicate.op {
882            Op::Equals | Op::GreaterThan | Op::GreaterThanOrEq => {
883                rc = table.find_leaf_page(root_pid, Some(index_predicate.field))
884            }
885            Op::LessThan | Op::LessThanOrEq => {
886                rc = table.find_leaf_page(root_pid, None)
887            }
888            Op::Like => todo!(),
889            Op::NotEquals => todo!(),
890        }
891
892        Self {
893            current_page_rc: Rc::clone(&rc),
894            page_it: BTreeLeafPageIterator::new(Rc::clone(&rc)),
895            predicate: index_predicate,
896            key_field: table.key_field,
897        }
898    }
899}
900
901impl Iterator for BTreeTableSearchIterator {
902    type Item = Tuple;
903
904    // TODO: Short circuit on some conditions.
905    fn next(&mut self) -> Option<Self::Item> {
906        loop {
907            let tuple = self.page_it.next();
908            match tuple {
909                Some(t) => match self.predicate.op {
910                    Op::Equals => {
911                        let field = t.get_field(self.key_field);
912                        if field == self.predicate.field {
913                            return Some(t);
914                        } else if field > self.predicate.field {
915                            return None;
916                        }
917                    }
918                    Op::GreaterThan => {
919                        let field = t.get_field(self.key_field);
920                        if field > self.predicate.field {
921                            return Some(t);
922                        }
923                    }
924                    Op::GreaterThanOrEq => {
925                        let field = t.get_field(self.key_field);
926                        if field >= self.predicate.field {
927                            return Some(t);
928                        }
929                    }
930                    Op::LessThan => {
931                        let field = t.get_field(self.key_field);
932                        if field < self.predicate.field {
933                            return Some(t);
934                        } else if field >= self.predicate.field {
935                            return None;
936                        }
937                    }
938                    Op::LessThanOrEq => {
939                        let field = t.get_field(self.key_field);
940                        if field <= self.predicate.field {
941                            return Some(t);
942                        } else if field > self.predicate.field {
943                            return None;
944                        }
945                    }
946                    Op::Like => todo!(),
947                    Op::NotEquals => todo!(),
948                },
949                None => {
950                    // init iterator on next page and continue search
951                    let right = (*self.current_page_rc)
952                        .borrow()
953                        .get_right_sibling_pid();
954                    match right {
955                        Some(pid) => {
956                            let rc = BufferPool::global()
957                                .get_leaf_page(&pid)
958                                .unwrap();
959                            self.current_page_rc = Rc::clone(&rc);
960                            self.page_it =
961                                BTreeLeafPageIterator::new(Rc::clone(&rc));
962                            continue;
963                        }
964                        None => {
965                            return None;
966                        }
967                    }
968                }
969            }
970        }
971    }
972}