small_db/btree/table/
table.rs

1use core::fmt;
2use std::{
3    collections::hash_map::DefaultHasher,
4    fs::{File, OpenOptions},
5    hash::{Hash, Hasher},
6    io::{Seek, SeekFrom, Write},
7    ops::DerefMut,
8    str,
9    sync::{
10        atomic::{AtomicU32, Ordering},
11        Arc, Mutex, MutexGuard, RwLock,
12    },
13    time::SystemTime,
14    usize,
15};
16
17use log::debug;
18
19use crate::{
20    btree::{
21        page::{
22            BTreeBasePage, BTreeHeaderPage, BTreeInternalPage,
23            BTreeInternalPageIterator, BTreeLeafPage,
24            BTreeLeafPageIterator, BTreeLeafPageIteratorRc,
25            BTreePage, BTreePageID, BTreeRootPointerPage, Entry,
26            PageCategory,
27        },
28        page_cache::PageCache,
29        tuple::{Schema, Tuple, WrappedTuple},
30    },
31    concurrent_status::Permission,
32    field::IntField,
33    transaction::Transaction,
34    types::{ResultPod, SmallResult},
35    utils::{lock_state, HandyRwLock},
36    Op, Predicate, Unique,
37};
38
/// What `BTreeTable::find_leaf_page` should look for while descending
/// from an internal page to a leaf page.
enum SearchFor {
    /// Descend towards the leaf page that may contain this key.
    IntField(IntField),
    /// Descend to the left-most leaf page.
    LeftMost,
    /// Descend to the right-most leaf page.
    RightMost,
}
44
/// B+ Tree
///
/// A table whose pages live in a single on-disk file.
pub struct BTreeTable {
    /// Path of the file that is the on-disk backing store for this
    /// B+ tree.
    file_path: String,

    /// Index of the field the tree is keyed on.
    pub key_field: usize,

    /// The tuple descriptor of tuples in the file.
    pub tuple_scheme: Schema,

    /// Handle to the backing file, opened for reading and writing.
    /// All access goes through the mutex.
    file: Mutex<File>,

    /// Identifier of this table. `new` derives it from a hash of the
    /// file path and the creation time, truncated to `u32`.
    table_id: u32,

    /// the page index of the last page in the file
    ///
    /// The page index start from 0 and increase monotonically by 1,
    /// the page index of "root pointer" page is always 0.
    page_index: AtomicU32,
}
67
/// Kind of write workload.
///
/// NOTE(review): not referenced in the visible part of this file;
/// presumably callers use it to choose between random and sequential
/// write patterns — confirm against the call sites.
#[derive(Copy, Clone)]
pub enum WriteScene {
    Random,
    Sequential,
}
73
74impl fmt::Display for BTreeTable {
75    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
76        write!(
77            f,
78            "<BTreeFile, file: {}, id: {}>",
79            self.file_path, self.table_id
80        )
81    }
82}
83
84// init functions
85impl BTreeTable {
86    pub fn new(
87        file_path: &str,
88        key_field: usize,
89        row_scheme: &Schema,
90    ) -> Self {
91        File::create(file_path).expect("io error");
92
93        let f = Mutex::new(
94            OpenOptions::new()
95                .write(true)
96                .read(true)
97                .open(file_path)
98                .unwrap(),
99        );
100
101        // let file_size = f.rl().metadata().unwrap().len() as usize;
102        // debug!("btree initialized, file size: {}", file_size);
103
104        let mut hasher = DefaultHasher::new();
105        file_path.hash(&mut hasher);
106        let unix_time = SystemTime::now();
107        unix_time.hash(&mut hasher);
108
109        let table_id = hasher.finish() as u32;
110
111        Self::file_init(f.lock().unwrap(), table_id);
112
113        Self {
114            file_path: file_path.to_string(),
115            key_field,
116            tuple_scheme: row_scheme.clone(),
117            file: f,
118            table_id,
119
120            // start from 1 (the root page)
121            //
122            // TODO: init it according to actual condition
123            page_index: AtomicU32::new(1),
124        }
125    }
126}
127
128// normal read-only functions
129impl BTreeTable {
130    pub fn get_id(&self) -> u32 {
131        self.table_id
132    }
133
134    pub fn get_tuple_scheme(&self) -> Schema {
135        self.tuple_scheme.clone()
136    }
137
138    /// Calculate the number of tuples in the table. Require S_LOCK on
139    /// all pages.
140    pub fn tuples_count(&self) -> usize {
141        let tx = Transaction::new();
142        let count = BTreeTableIterator::new(&tx, self).count();
143        tx.commit().unwrap();
144        count
145    }
146
147    pub fn get_random_tuple(&self, _tx: &Transaction) -> Tuple {
148        unimplemented!()
149    }
150}
151
152// insert-related functions
153impl BTreeTable {
154    /// Insert a tuple into this BTreeFile, keeping the tuples in
155    /// sorted order. May cause pages to split if the page where
156    /// tuple belongs is full.
157    pub fn insert_tuple(
158        &self,
159        tx: &Transaction,
160        // buffer_pool: &mut BufferPool,
161        tuple: &Tuple,
162    ) -> SmallResult {
163        // a read lock on the root pointer page and
164        // use it to locate the root page
165        let root_pid = self.get_root_pid(tx);
166
167        // find and lock the left-most leaf page corresponding to
168        // the key field, and split the leaf page if there are no
169        // more slots available
170        let field = tuple.get_field(self.key_field);
171        let mut leaf_rc = self.find_leaf_page(
172            tx,
173            Permission::ReadWrite,
174            root_pid,
175            SearchFor::IntField(field),
176        );
177
178        if leaf_rc.rl().empty_slots_count() == 0 {
179            leaf_rc = self.split_leaf_page(
180                tx,
181                leaf_rc,
182                tuple.get_field(self.key_field),
183            )?;
184        }
185        leaf_rc.wl().insert_tuple(&tuple);
186        return Ok(());
187    }
188
    /// Split a leaf page to make room for new tuples and
    /// recursively split the parent node as needed to
    /// accommodate a new entry. A new, empty leaf page becomes the
    /// right sibling and receives the upper half of the tuples.
    /// The new parent entry has child pointers pointing to the two
    /// leaf pages resulting from the split. Update sibling pointers
    /// and parent pointers as needed.
    ///
    /// NOTE(review): the key that is copied up is taken from the
    /// last tuple that remains in the left (original) page after the
    /// move, not from the first tuple of the right-hand page.
    ///
    /// Return the leaf page into which a new tuple with
    /// key field "field" should be inserted: the new sibling when
    /// `field` is greater than the copied-up key, the original page
    /// otherwise.
    ///
    /// # Arguments
    /// `field`: the key field of the tuple to be inserted after the
    /// split is complete. Necessary to know which of the two
    /// pages to return.
    ///
    /// # Errors
    /// Propagates the error from inserting the new entry into the
    /// parent page.
    ///
    /// # Panics
    /// Panics if the page has no tuple left after the move, or if
    /// the old right sibling cannot be fetched from the page cache.
    pub fn split_leaf_page(
        &self,
        tx: &Transaction,
        page_rc: Arc<RwLock<BTreeLeafPage>>,
        field: IntField,
    ) -> ResultPod<BTreeLeafPage> {
        let new_sibling_rc = self.get_empty_leaf_page(tx);
        let parent_pid: BTreePageID;
        let key: IntField;

        // borrow of new_sibling_rc start here
        // borrow of page_rc start here
        {
            let mut new_sibling = new_sibling_rc.wl();
            let mut page = page_rc.wl();
            // 1. adding a new page on the right of the existing
            // page and moving half of the tuples to the new page
            let tuple_count = page.tuples_count();
            let move_tuple_count = tuple_count / 2;

            // Walk the page from the back so the tuples at the end
            // of the page are the ones that move. Slot numbers are
            // collected first and deleted afterwards, because the
            // iterator borrows the page.
            let mut it = BTreeLeafPageIterator::new(&page);
            let mut delete_indexes: Vec<usize> = Vec::new();
            for tuple in it.by_ref().rev().take(move_tuple_count) {
                delete_indexes.push(tuple.get_slot_number());
                new_sibling.insert_tuple(&tuple);
            }

            for i in delete_indexes {
                page.delete_tuple(i);
            }

            // separator key: last tuple still in the original page
            let mut it = BTreeLeafPageIterator::new(&page);
            key = it.next_back().unwrap().get_field(self.key_field);

            // get parent pid for use later
            parent_pid = page.get_parent_pid();
        }
        // borrow of new_sibling_rc end here
        // borrow of page_rc end here

        // 2. Copy the middle key up into the parent page, and
        // recursively split the parent as needed to accommodate
        // the new entry.
        //
        // We put this method outside all the borrow blocks since
        // once the parent page is split, a lot of children will
        // been borrowed. (may including the current leaf page)
        let parent_rc =
            self.get_parent_with_empty_slots(tx, parent_pid, field);

        // borrow of parent_rc start here
        // borrow of page_rc start here
        // borrow of new_sibling_rc start here
        {
            let mut parent = parent_rc.wl();
            let mut page = page_rc.wl();
            let mut new_sibling = new_sibling_rc.wl();
            let mut entry = Entry::new(
                key,
                &page.get_pid(),
                &new_sibling.get_pid(),
            );

            debug!(
                "split start, page: {}, lock status: {}, new_sibling: {}, lock status: {}, parent: {}, lock status: {}",
                page.get_pid(),
                lock_state(page_rc.clone()),
                new_sibling.get_pid(),
                lock_state(new_sibling_rc.clone()),
                parent.get_pid(),
                lock_state(parent_rc.clone()),
            );

            parent.insert_entry(&mut entry)?;

            // set left pointer for the old right sibling
            if let Some(old_right_pid) = page.get_right_pid() {
                let old_right_rc = Unique::mut_page_cache()
                    .get_leaf_page(
                        tx,
                        Permission::ReadWrite,
                        &old_right_pid,
                    )
                    .unwrap();
                old_right_rc
                    .wl()
                    .set_left_pid(Some(new_sibling.get_pid()));
            }

            // set sibling id
            //
            // The new sibling must take over the page's old right
            // pointer before that pointer is overwritten below.
            new_sibling.set_right_pid(page.get_right_pid());
            new_sibling.set_left_pid(Some(page.get_pid()));
            page.set_right_pid(Some(new_sibling.get_pid()));

            // set parent id
            page.set_parent_pid(&parent.get_pid());
            new_sibling.set_parent_pid(&parent.get_pid());
        }
        // borrow of parent_rc end here
        // borrow of page_rc end here
        // borrow of new_sibling_rc end here

        // keys greater than the separator belong to the right page
        if field > key {
            Ok(new_sibling_rc)
        } else {
            Ok(page_rc)
        }
    }
313
314    pub fn get_empty_page_index(&self, tx: &Transaction) -> u32 {
315        let root_ptr_rc = self.get_root_ptr_page(tx);
316        // borrow of root_ptr_rc start here
317        {
318            let root_ptr = root_ptr_rc.rl();
319            let header_pid = root_ptr.get_header_pid();
320            if let Some(header_pid) = header_pid {
321                let header_rc = Unique::mut_page_cache()
322                    .get_header_page(
323                        tx,
324                        Permission::ReadOnly,
325                        &header_pid,
326                    )
327                    .unwrap();
328                // borrow of header_rc start here
329                {
330                    let header = header_rc.rl();
331                    if let Some(i) = header.get_empty_slot() {
332                        return i;
333                    }
334                }
335            }
336        }
337        // borrow of root_ptr_rc end here
338
339        let index =
340            self.page_index.fetch_add(1, Ordering::Relaxed) + 1;
341        index
342    }
343
344    /// Method to encapsulate the process of getting a parent page
345    /// ready to accept new entries.
346    ///
347    /// This may mean creating a page to become the new root of
348    /// the tree, splitting the existing parent page if there are
349    /// no empty slots, or simply locking and returning the existing
350    /// parent page.
351    ///
352    /// # Arguments
353    /// `field`: the key field of the tuple to be inserted after the
354    /// split is complete. Necessary to know which of the two
355    /// pages to return. `parentId`: the id of the parent. May be
356    /// an internal page or the RootPtr page
357    fn get_parent_with_empty_slots(
358        &self,
359        tx: &Transaction,
360        parent_id: BTreePageID,
361        field: IntField,
362    ) -> Arc<RwLock<BTreeInternalPage>> {
363        // create a parent node if necessary
364        // this will be the new root of the tree
365        match parent_id.category {
366            PageCategory::RootPointer => {
367                let new_parent_rc = self.get_empty_interanl_page(tx);
368
369                // update the root pointer
370                self.set_root_pid(tx, &new_parent_rc.wl().get_pid());
371
372                new_parent_rc
373            }
374            PageCategory::Internal => {
375                let parent_rc = Unique::mut_page_cache()
376                    .get_internal_page(
377                        tx,
378                        Permission::ReadWrite,
379                        &parent_id,
380                    )
381                    .unwrap();
382                let empty_slots_count: usize;
383
384                // borrow of parent_rc start here
385                {
386                    empty_slots_count =
387                        parent_rc.rl().empty_slots_count();
388                }
389                // borrow of parent_rc end here
390
391                if empty_slots_count > 0 {
392                    return parent_rc;
393                } else {
394                    // split upper parent
395                    return self
396                        .split_internal_page(tx, parent_rc, field);
397                }
398            }
399            _ => {
400                todo!()
401            }
402        }
403    }
404
    /// Split an internal page to make room for new entries and
    /// recursively split its parent page as needed to accommodate
    /// a new entry. The new entry for the parent should have a
    /// key matching the middle key in the original internal page
    /// being split (this key is "pushed up" to the parent).
    ///
    /// Make a right sibling page and move half of entries to it.
    ///
    /// The child pointers of the new parent entry should point to the
    /// two internal pages resulting from the split. Update parent
    /// pointers as needed.
    ///
    /// Return the internal page into which an entry with key field
    /// "field" should be inserted: the new sibling when `field` is
    /// greater than the pushed-up key, the original page otherwise.
    ///
    /// # Arguments
    /// `field`: the key field of the tuple to be inserted after the
    /// split is complete. Necessary to know which of the two
    /// pages to return.
    ///
    /// # Panics
    /// Panics if an entry cannot be inserted into the sibling or the
    /// parent, or if no middle entry is left after the move.
    fn split_internal_page(
        &self,
        tx: &Transaction,
        page_rc: Arc<RwLock<BTreeInternalPage>>,
        field: IntField,
    ) -> Arc<RwLock<BTreeInternalPage>> {
        let sibling_rc = self.get_empty_interanl_page(tx);
        let key: IntField;
        let mut parent_pid: BTreePageID;
        let mut new_entry: Entry;

        // borrow of sibling_rc start here
        // borrow of page_rc start here
        {
            let mut sibling = sibling_rc.wl();
            let mut page = page_rc.wl();

            parent_pid = page.get_parent_pid();

            if parent_pid.category == PageCategory::RootPointer {
                // create new parent page if the parent page is root
                // pointer page.
                let parent_rc = self.get_empty_interanl_page(tx);
                parent_pid = parent_rc.rl().get_pid();

                // update the root pointer
                self.set_root_pid(tx, &parent_pid);
            }

            let enties_count = page.entries_count();
            let move_entries_count = enties_count / 2;

            // Walk the page from the back: the entries at the end
            // are copied to the sibling. Their record ids are
            // collected and deleted afterwards, because the iterator
            // borrows the page.
            let mut delete_indexes: Vec<usize> = Vec::new();
            let mut it = BTreeInternalPageIterator::new(&page);
            for e in it.by_ref().rev().take(move_entries_count) {
                delete_indexes.push(e.get_record_id());
                sibling.insert_entry(&e).unwrap();

                // set parent id for right child
                let right_pid = e.get_right_child();
                Self::set_parent(tx, &right_pid, &sibling.get_pid());
            }

            // the next entry from the back is the one pushed up
            let middle_entry = it.next_back().unwrap();

            // also delete the middle entry
            delete_indexes.push(middle_entry.get_record_id());
            for i in delete_indexes {
                page.delete_key_and_right_child(i);
            }

            // set parent id for right child to the middle entry
            Self::set_parent(
                tx,
                &middle_entry.get_right_child(),
                &sibling.get_pid(),
            );

            key = middle_entry.get_key();
            new_entry =
                Entry::new(key, &page.get_pid(), &sibling.get_pid());
        }
        // borrow of sibling_rc end here
        // borrow of page_rc end here

        // The parent may itself be split here, so the page that
        // comes back can differ from `parent_pid`. Re-read its id
        // before updating the parent pointers of both halves.
        let parent_rc =
            self.get_parent_with_empty_slots(tx, parent_pid, field);
        parent_pid = parent_rc.rl().get_pid();
        page_rc.wl().set_parent_pid(&parent_pid);
        sibling_rc.wl().set_parent_pid(&parent_pid);

        // borrow of parent_rc start here
        {
            let mut parent = parent_rc.wl();
            parent.insert_entry(&mut new_entry).unwrap();
        }
        // borrow of parent_rc end here

        // keys greater than the pushed-up key belong to the sibling
        if field > key {
            sibling_rc
        } else {
            page_rc
        }
    }
508}
509
impl BTreeTable {
    /// Placeholder for a page-fetching helper. The body is currently
    /// empty: the method takes no arguments and does nothing.
    ///
    /// Intended behaviour, taken from the referenced Java
    /// implementation:
    ///
    /// Method to encapsulate the process of locking/fetching a page.
    /// First the method checks the local cache ("dirtypages"),
    /// and if it can't find the requested page there, it fetches
    /// it from the buffer pool. It also adds pages to the
    /// dirtypages cache if they are fetched with read-write
    /// permission, since presumably they will soon be dirtied by
    /// this transaction.
    ///
    /// This method is needed to ensure that page updates are not lost
    /// if the same pages are accessed multiple times.
    ///
    /// reference:
    /// - https://sourcegraph.com/github.com/XiaochenCui/small-db-hw@87607789b677d6afee00a223eacb4f441bd4ae87/-/blob/src/java/smalldb/BTreeFile.java?L551&subtree=true
    pub fn get_page(&self) {}
}
526
527impl BTreeTable {
528    pub fn set_root_pid(
529        &self,
530        tx: &Transaction,
531        root_pid: &BTreePageID,
532    ) {
533        let root_pointer_rc = self.get_root_ptr_page(tx);
534        root_pointer_rc.wl().set_root_pid(root_pid);
535    }
536
537    fn set_parent(
538        tx: &Transaction,
539        child_pid: &BTreePageID,
540        parent_pid: &BTreePageID,
541    ) {
542        match child_pid.category {
543            PageCategory::RootPointer => todo!(),
544            PageCategory::Internal => {
545                let left_rc = Unique::mut_page_cache()
546                    .get_internal_page(
547                        tx,
548                        Permission::ReadWrite,
549                        &child_pid,
550                    )
551                    .unwrap();
552
553                // borrow of left_rc start here
554                {
555                    let mut left = left_rc.wl();
556                    left.set_parent_pid(&parent_pid);
557                }
558                // borrow of left_rc end here
559            }
560            PageCategory::Leaf => {
561                let child_rc = Unique::mut_page_cache()
562                    .get_leaf_page(
563                        tx,
564                        Permission::ReadWrite,
565                        &child_pid,
566                    )
567                    .unwrap();
568
569                // borrow of left_rc start here
570                {
571                    let mut child = child_rc.wl();
572                    child.set_parent_pid(&parent_pid);
573                }
574                // borrow of left_rc end here
575            }
576            PageCategory::Header => todo!(),
577        }
578    }
579
580    /// Recursive function which finds and locks the leaf page in
581    /// the B+ tree corresponding to the left-most page possibly
582    /// containing the key field f. It locks all internal nodes
583    /// along the path to the leaf node with READ_ONLY permission,
584    /// and locks the leaf node with permission perm.
585    ///
586    /// # Arguments
587    ///
588    /// tid  - the transaction id
589    /// pid  - the current page being searched
590    /// perm - the permissions with which to lock the leaf page
591    /// f    - the field to search for
592    ///
593    /// # Return
594    ///
595    /// the left-most leaf page possibly containing the key field f
596    fn find_leaf_page(
597        &self,
598        tx: &Transaction,
599        perm: Permission,
600        page_id: BTreePageID,
601        search: SearchFor,
602    ) -> Arc<RwLock<BTreeLeafPage>> {
603        match page_id.category {
604            PageCategory::Leaf => {
605                // get page and return directly
606                return Unique::mut_page_cache()
607                    .get_leaf_page(tx, perm, &page_id)
608                    .unwrap();
609            }
610            PageCategory::Internal => {
611                let page_rc = Unique::mut_page_cache()
612                    .get_internal_page(
613                        tx,
614                        Permission::ReadWrite,
615                        &page_id,
616                    )
617                    .unwrap();
618                let mut child_pid: Option<BTreePageID> = None;
619
620                // borrow of page_rc start here
621                {
622                    let page = page_rc.rl();
623                    let it = BTreeInternalPageIterator::new(&page);
624                    let mut entry: Option<Entry> = None;
625                    let mut found = false;
626                    for e in it {
627                        match search {
628                            SearchFor::IntField(field) => {
629                                if e.get_key() >= field {
630                                    child_pid =
631                                        Some(e.get_left_child());
632                                    found = true;
633                                    break;
634                                }
635                            }
636                            SearchFor::LeftMost => {
637                                child_pid = Some(e.get_left_child());
638                                found = true;
639                                break;
640                            }
641                            SearchFor::RightMost => {
642                                child_pid = Some(e.get_right_child());
643                                found = true;
644
645                                // dont't break here, we need to find
646                                // the
647                                // rightmost entry
648                            }
649                        }
650                        entry = Some(e);
651                    }
652
653                    if !found {
654                        // if not found, search in right of the last
655                        // entry
656                        match entry {
657                            Some(e) => {
658                                child_pid = Some(e.get_right_child());
659                            }
660                            None => todo!(),
661                        }
662                    }
663                }
664                // borrow of page_rc end here
665
666                // search child page recursively
667                match child_pid {
668                    Some(child_pid) => {
669                        return self.find_leaf_page(
670                            tx, perm, child_pid, search,
671                        );
672                    }
673                    None => todo!(),
674                }
675            }
676            _ => {
677                todo!()
678            }
679        }
680    }
681
682    pub fn get_file(&self) -> MutexGuard<'_, File> {
683        self.file.lock().unwrap()
684    }
685
686    /// init file in necessary
687    fn file_init(
688        mut file: impl DerefMut<Target = File>,
689        table_inex: u32,
690    ) {
691        // if db file is empty, create root pointer page at first
692        if file.metadata().unwrap().len() == 0 {
693            // write root pointer page
694            {
695                let pid = BTreePageID::new(
696                    PageCategory::RootPointer,
697                    table_inex,
698                    0,
699                );
700
701                let page = BTreeRootPointerPage::new_empty_page(&pid);
702                let data = page.get_page_data();
703                file.write(&data).unwrap();
704            }
705
706            // write the first leaf page
707            {
708                let data = BTreeBasePage::empty_page_data();
709                file.write(&data).unwrap();
710            }
711        }
712    }
713
714    fn get_empty_leaf_page(
715        &self,
716        tx: &Transaction,
717    ) -> Arc<RwLock<BTreeLeafPage>> {
718        // create the new page
719        let page_index = self.get_empty_page_index(tx);
720        let page_id = BTreePageID::new(
721            PageCategory::Leaf,
722            self.table_id,
723            page_index,
724        );
725        let page = BTreeLeafPage::new(
726            &page_id,
727            &BTreeBasePage::empty_page_data(),
728            &self.tuple_scheme,
729            self.key_field,
730        );
731
732        self.write_empty_page_to_disk(&page_id);
733
734        let page_rc = Arc::new(RwLock::new(page));
735        // insert to buffer pool because it's a dirty page at this
736        // time
737        Unique::mut_page_cache()
738            .leaf_buffer
739            .insert(page_id, page_rc.clone());
740        page_rc
741    }
742
743    fn get_empty_interanl_page(
744        &self,
745        tx: &Transaction,
746    ) -> Arc<RwLock<BTreeInternalPage>> {
747        // create the new page
748        let page_index = self.get_empty_page_index(tx);
749        let page_id = BTreePageID::new(
750            PageCategory::Internal,
751            self.table_id,
752            page_index,
753        );
754        let page = BTreeInternalPage::new(
755            &page_id,
756            &BTreeBasePage::empty_page_data(),
757            &self.tuple_scheme,
758            self.key_field,
759        );
760
761        self.write_empty_page_to_disk(&page_id);
762
763        let page_rc = Arc::new(RwLock::new(page));
764        // insert to buffer pool because it's a dirty page at this
765        // time
766        Unique::mut_page_cache()
767            .internal_buffer
768            .insert(page_id, page_rc.clone());
769        page_rc
770    }
771
772    pub(super) fn get_empty_header_page(
773        &self,
774        tx: &Transaction,
775    ) -> Arc<RwLock<BTreeHeaderPage>> {
776        // create the new page
777        let page_index = self.get_empty_page_index(tx);
778        let page_id = BTreePageID::new(
779            PageCategory::Header,
780            self.table_id,
781            page_index,
782        );
783        let page = BTreeHeaderPage::new(
784            &page_id,
785            &BTreeBasePage::empty_page_data(),
786        );
787
788        self.write_empty_page_to_disk(&page_id);
789
790        let page_rc = Arc::new(RwLock::new(page));
791        // insert to buffer pool because it's a dirty page at this
792        // time
793        Unique::mut_page_cache()
794            .header_buffer
795            .insert(page_id, page_rc.clone());
796        page_rc
797    }
798
799    pub fn write_empty_page_to_disk(&self, page_id: &BTreePageID) {
800        self.write_page_to_disk(
801            page_id,
802            &BTreeBasePage::empty_page_data(),
803        )
804    }
805
806    pub fn write_page_to_disk(
807        &self,
808        page_id: &BTreePageID,
809        data: &Vec<u8>,
810    ) {
811        let start_pos: usize =
812            page_id.page_index as usize * PageCache::get_page_size();
813        self.get_file()
814            .seek(SeekFrom::Start(start_pos as u64))
815            .expect("io error");
816        self.get_file().write(&data).expect("io error");
817        self.get_file().flush().expect("io error");
818    }
819
820    pub fn get_first_page(
821        &self,
822        tx: &Transaction,
823        perm: Permission,
824    ) -> Arc<RwLock<BTreeLeafPage>> {
825        let page_id = self.get_root_pid(tx);
826        return self.find_leaf_page(
827            tx,
828            perm,
829            page_id,
830            SearchFor::LeftMost,
831        );
832    }
833
834    pub fn get_last_page(
835        &self,
836        tx: &Transaction,
837        perm: Permission,
838    ) -> Arc<RwLock<BTreeLeafPage>> {
839        let page_id = self.get_root_pid(tx);
840        return self.find_leaf_page(
841            tx,
842            perm,
843            page_id,
844            SearchFor::RightMost,
845        );
846    }
847
848    /// Get the root page pid.
849    pub fn get_root_pid(&self, tx: &Transaction) -> BTreePageID {
850        let root_ptr_rc = self.get_root_ptr_page(tx);
851        let mut root_pid = root_ptr_rc.rl().get_root_pid();
852        root_pid.table_id = self.get_id();
853        root_pid
854    }
855
856    pub fn get_root_ptr_page(
857        &self,
858        tx: &Transaction,
859    ) -> Arc<RwLock<BTreeRootPointerPage>> {
860        let root_ptr_pid = BTreePageID {
861            category: PageCategory::RootPointer,
862            page_index: 0,
863            table_id: self.table_id,
864        };
865        Unique::mut_page_cache()
866            .get_root_ptr_page(
867                tx,
868                Permission::ReadWrite,
869                &root_ptr_pid,
870            )
871            .unwrap()
872    }
873
874    /// The count of pages in this BTreeFile
875    ///
876    /// (the ROOT_POINTER page is not included)
877    pub fn pages_count(&self) -> usize {
878        let file_size =
879            self.get_file().metadata().unwrap().len() as usize;
880        debug!(
881            "file size: {}, page size: {}",
882            file_size,
883            PageCache::get_page_size()
884        );
885        file_size / PageCache::get_page_size() - 1
886    }
887
    /// Get the first tuple under the internal/leaf page.
    ///
    /// Not implemented yet: the body is `todo!()`, so calling this
    /// method panics.
    pub fn get_first_tuple(
        &self,
        _pid: &BTreePageID,
    ) -> Option<Tuple> {
        todo!()
    }
895
896    pub fn set_page_index(&self, i: u32) {
897        self.page_index.store(i, Ordering::Relaxed);
898    }
899
900    // get the last tuple under the internal/leaf page
901    pub fn get_last_tuple(
902        &self,
903        tx: &Transaction,
904        pid: &BTreePageID,
905    ) -> Option<WrappedTuple> {
906        match pid.category {
907            PageCategory::RootPointer => todo!(),
908            PageCategory::Internal => {
909                let page_rc = Unique::mut_page_cache()
910                    .get_internal_page(tx, Permission::ReadOnly, pid)
911                    .unwrap();
912
913                // borrow of page_rc start here
914                let child_pid: BTreePageID;
915                {
916                    let page = page_rc.rl();
917                    let mut it =
918                        BTreeInternalPageIterator::new(&page);
919                    child_pid =
920                        it.next_back().unwrap().get_right_child();
921                }
922                // borrow of page_rc end here
923                self.get_last_tuple(tx, &child_pid)
924            }
925            PageCategory::Leaf => {
926                let page_rc = Unique::mut_page_cache()
927                    .get_leaf_page(tx, Permission::ReadWrite, pid)
928                    .unwrap();
929
930                let page = page_rc.rl();
931                let mut it = BTreeLeafPageIterator::new(&page);
932                it.next_back()
933            }
934            PageCategory::Header => todo!(),
935        }
936    }
937}
938
939/// debug methods
940impl BTreeTable {
941    /// Print the BTreeFile structure.
942    ///
943    /// # Arguments
944    ///
945    /// - `max_level` - the max level of the print
946    ///     - 0: print the root pointer page
947    ///     - 1: print the root pointer page and the root page
948    ///       (internal or leaf)
949    ///     - ...
950    ///     - -1: print all pages
951    pub fn draw_tree(&self, max_level: i32) {
952        Unique::concurrent_status().clear();
953
954        let tx = Transaction::new();
955        let mut depiction = "".to_string();
956
957        depiction.push_str(
958            "\n\n----- PRINT TREE STRUCTURE START -----\n\n",
959        );
960
961        // get root pointer page
962        let root_pointer_pid = BTreePageID {
963            category: PageCategory::RootPointer,
964            page_index: 0,
965            table_id: self.table_id,
966        };
967        depiction.push_str(&format!(
968            "root pointer: {}\n",
969            root_pointer_pid
970        ));
971
972        let root_pid = self.get_root_pid(&tx);
973        depiction.push_str(
974            &self.draw_subtree(&tx, &root_pid, 0, max_level),
975        );
976
977        depiction.push_str(&format!(
978            "\n\n----- PRINT TREE STRUCTURE END   -----\n\n"
979        ));
980
981        debug!("tree_structure, level {}: {}", max_level, depiction);
982        tx.commit().unwrap();
983    }
984
985    fn draw_subtree(
986        &self,
987        tx: &Transaction,
988        pid: &BTreePageID,
989        level: usize,
990        max_level: i32,
991    ) -> String {
992        match pid.category {
993            PageCategory::Internal => {
994                self.draw_internal_node(tx, pid, level, max_level)
995            }
996            PageCategory::Leaf => self.draw_leaf_node(tx, pid, level),
997            _ => {
998                panic!("invalid page category: {:?}", pid.category);
999            }
1000        }
1001    }
1002
1003    fn draw_leaf_node(
1004        &self,
1005        tx: &Transaction,
1006        pid: &BTreePageID,
1007        level: usize,
1008    ) -> String {
1009        let mut depiction = "".to_string();
1010
1011        let print_sibling = false;
1012
1013        let mut prefix = "│   ".repeat(level);
1014        let page_rc = Unique::mut_page_cache()
1015            .get_leaf_page(tx, Permission::ReadOnly, &pid)
1016            .unwrap();
1017        let lock_state = lock_state(page_rc.clone());
1018
1019        let mut it =
1020            BTreeLeafPageIteratorRc::new(Arc::clone(&page_rc));
1021        let first_tuple = it.next();
1022
1023        let page = page_rc.rl();
1024        let mut it = BTreeLeafPageIterator::new(&page);
1025        let last_tuple = it.next_back();
1026
1027        if print_sibling {
1028            depiction.push_str(&format!(
1029                "{}├── leaf: {} ({} tuples) (left: {:?}, right: {:?}) (lock state: {})\n",
1030                prefix,
1031                page.get_pid(),
1032                page.tuples_count(),
1033                page.get_left_pid(),
1034                page.get_right_pid(),
1035                lock_state,
1036            ));
1037        } else {
1038            depiction.push_str(&format!(
1039                "{}├── leaf: {} ({}/{} tuples) (lock state: {}\n",
1040                prefix,
1041                page.get_pid(),
1042                page.tuples_count(),
1043                page.slot_count,
1044                lock_state,
1045            ));
1046        }
1047
1048        prefix = "│   ".repeat(level + 1);
1049        depiction.push_str(&format!(
1050            "{}├── first tuple: {:?}\n",
1051            prefix, first_tuple
1052        ));
1053        depiction.push_str(&format!(
1054            "{}└── last tuple:  {:?}\n",
1055            prefix, last_tuple
1056        ));
1057
1058        return depiction;
1059    }
1060
1061    fn draw_internal_node(
1062        &self,
1063        tx: &Transaction,
1064        pid: &BTreePageID,
1065        level: usize,
1066        max_level: i32,
1067    ) -> String {
1068        let mut depiction = "".to_string();
1069
1070        let prefix = "│   ".repeat(level);
1071        let page_rc = Unique::mut_page_cache()
1072            .get_internal_page(tx, Permission::ReadWrite, &pid)
1073            .unwrap();
1074        let lock_state = lock_state(page_rc.clone());
1075
1076        // borrow of page_rc start here
1077        {
1078            let page = page_rc.rl();
1079            depiction.push_str(&format!(
1080                "{}├── internal: {} ({}/{} children) (lock state: {})\n",
1081                prefix,
1082                pid,
1083                page.children_count(),
1084                page.get_children_capacity(),
1085                lock_state,
1086            ));
1087            if max_level != -1 && level as i32 == max_level {
1088                return depiction;
1089            }
1090            let it = BTreeInternalPageIterator::new(&page);
1091            for (i, entry) in it.enumerate() {
1092                depiction.push_str(&self.draw_entry(
1093                    tx,
1094                    i,
1095                    &entry,
1096                    level + 1,
1097                    max_level,
1098                ));
1099            }
1100        }
1101        // borrow of page_rc end here
1102
1103        return depiction;
1104    }
1105
1106    fn draw_entry(
1107        &self,
1108        tx: &Transaction,
1109        id: usize,
1110        entry: &Entry,
1111        level: usize,
1112        max_level: i32,
1113    ) -> String {
1114        let mut depiction = "".to_string();
1115
1116        let prefix = "│   ".repeat(level);
1117        if id == 0 {
1118            depiction.push_str(&self.draw_subtree(
1119                tx,
1120                &entry.get_left_child(),
1121                level + 1,
1122                max_level,
1123            ));
1124        }
1125        depiction.push_str(&format!(
1126            "{}├── key: {}\n",
1127            prefix,
1128            entry.get_key()
1129        ));
1130        depiction.push_str(&self.draw_subtree(
1131            tx,
1132            &entry.get_right_child(),
1133            level + 1,
1134            max_level,
1135        ));
1136
1137        return depiction;
1138    }
1139
1140    /// checks the integrity of the tree:
1141    /// - parent pointers.
1142    /// - sibling pointers.
1143    /// - range invariants.
1144    /// - record to page pointers.
1145    /// - occupancy invariants. (if enabled)
1146    ///
1147    /// require s_lock on all pages.
1148    ///
1149    /// panic on any error found.
1150    pub fn check_integrity(&self, check_occupancy: bool) {
1151        Unique::concurrent_status().clear();
1152
1153        let tx = Transaction::new();
1154
1155        let root_ptr_page = self.get_root_ptr_page(&tx);
1156        let root_pid = root_ptr_page.rl().get_root_pid();
1157        let root_summary = self.check_sub_tree(
1158            &tx,
1159            &root_pid,
1160            &root_ptr_page.rl().get_pid(),
1161            None,
1162            None,
1163            check_occupancy,
1164            0,
1165        );
1166        assert!(
1167            root_summary.left_ptr.is_none(),
1168            "left pointer is not none: {:?}",
1169            root_summary.left_ptr
1170        );
1171        assert!(
1172            root_summary.right_ptr.is_none(),
1173            "right pointer is not none: {:?}",
1174            root_summary.right_ptr,
1175        );
1176
1177        tx.commit();
1178    }
1179
1180    /// panic on any error found.
1181    fn check_sub_tree(
1182        &self,
1183        tx: &Transaction,
1184        pid: &BTreePageID,
1185        parent_pid: &BTreePageID,
1186        mut lower_bound: Option<IntField>,
1187        upper_bound: Option<IntField>,
1188        check_occupancy: bool,
1189        depth: usize,
1190    ) -> SubtreeSummary {
1191        match pid.category {
1192            PageCategory::Leaf => {
1193                let page_rc = Unique::mut_page_cache()
1194                    .get_leaf_page(tx, Permission::ReadOnly, &pid)
1195                    .unwrap();
1196                let page = page_rc.rl();
1197                page.check_integrity(
1198                    parent_pid,
1199                    lower_bound,
1200                    upper_bound,
1201                    check_occupancy,
1202                    depth,
1203                );
1204
1205                return SubtreeSummary {
1206                    left_ptr: page.get_left_pid(),
1207                    right_ptr: page.get_right_pid(),
1208
1209                    left_most_pid: Some(page.get_pid()),
1210                    right_most_pid: Some(page.get_pid()),
1211
1212                    depth,
1213                };
1214            }
1215
1216            PageCategory::Internal => {
1217                let page_rc = Unique::mut_page_cache()
1218                    .get_internal_page(
1219                        tx,
1220                        Permission::ReadWrite,
1221                        &pid,
1222                    )
1223                    .unwrap();
1224                let page = page_rc.rl();
1225                page.check_integrity(
1226                    parent_pid,
1227                    lower_bound,
1228                    upper_bound,
1229                    check_occupancy,
1230                    depth,
1231                );
1232
1233                let mut it = BTreeInternalPageIterator::new(&page);
1234                let current = it.next().unwrap();
1235                let mut accumulation = self.check_sub_tree(
1236                    tx,
1237                    &current.get_left_child(),
1238                    pid,
1239                    lower_bound,
1240                    Some(current.get_key()),
1241                    check_occupancy,
1242                    depth + 1,
1243                );
1244
1245                let mut last_entry = current;
1246                for entry in it {
1247                    let current_summary = self.check_sub_tree(
1248                        tx,
1249                        &entry.get_left_child(),
1250                        pid,
1251                        lower_bound,
1252                        Some(entry.get_key()),
1253                        check_occupancy,
1254                        depth + 1,
1255                    );
1256                    accumulation = accumulation
1257                        .check_and_merge(&current_summary);
1258
1259                    lower_bound = Some(entry.get_key());
1260
1261                    last_entry = entry;
1262                }
1263
1264                let last_right_summary = self.check_sub_tree(
1265                    tx,
1266                    &last_entry.get_right_child(),
1267                    pid,
1268                    lower_bound,
1269                    upper_bound,
1270                    check_occupancy,
1271                    depth + 1,
1272                );
1273                accumulation =
1274                    accumulation.check_and_merge(&last_right_summary);
1275
1276                return accumulation;
1277            }
1278
1279            // no other page types allowed inside the tree.
1280            _ => panic!("invalid page category"),
1281        }
1282    }
1283}
1284
1285struct SubtreeSummary {
1286    /// The distance towards the root.
1287    depth: usize,
1288
1289    left_ptr: Option<BTreePageID>,
1290    left_most_pid: Option<BTreePageID>,
1291    right_ptr: Option<BTreePageID>,
1292    right_most_pid: Option<BTreePageID>,
1293}
1294
1295impl SubtreeSummary {
1296    fn check_and_merge(
1297        &mut self,
1298        right: &SubtreeSummary,
1299    ) -> SubtreeSummary {
1300        assert_eq!(self.depth, right.depth);
1301        assert_eq!(
1302            self.right_ptr, right.left_most_pid,
1303            "depth: {}, left_ptr: {:?}, right_ptr: {:?}",
1304            self.depth, self.right_ptr, right.left_most_pid
1305        );
1306        assert_eq!(self.right_most_pid, right.left_ptr);
1307
1308        let acc = SubtreeSummary {
1309            depth: self.depth,
1310            left_ptr: self.left_ptr,
1311            left_most_pid: self.left_most_pid,
1312            right_ptr: right.right_ptr,
1313            right_most_pid: right.right_most_pid,
1314        };
1315        return acc;
1316    }
1317}
1318
1319pub struct BTreeTableIterator<'t> {
1320    tx: &'t Transaction,
1321
1322    page_rc: Arc<RwLock<BTreeLeafPage>>,
1323    last_page_rc: Arc<RwLock<BTreeLeafPage>>,
1324    page_it: BTreeLeafPageIteratorRc,
1325    last_page_it: BTreeLeafPageIteratorRc,
1326}
1327
1328impl<'t> BTreeTableIterator<'t> {
1329    pub fn new(tx: &'t Transaction, table: &BTreeTable) -> Self {
1330        let page_rc = table.get_first_page(tx, Permission::ReadOnly);
1331        let last_page_rc =
1332            table.get_last_page(tx, Permission::ReadOnly);
1333
1334        Self {
1335            tx,
1336            page_rc: Arc::clone(&page_rc),
1337            last_page_rc: Arc::clone(&last_page_rc),
1338            page_it: BTreeLeafPageIteratorRc::new(Arc::clone(
1339                &page_rc,
1340            )),
1341            last_page_it: BTreeLeafPageIteratorRc::new(Arc::clone(
1342                &last_page_rc,
1343            )),
1344        }
1345    }
1346}
1347
1348impl Iterator for BTreeTableIterator<'_> {
1349    type Item = WrappedTuple;
1350
1351    fn next(&mut self) -> Option<Self::Item> {
1352        let v = self.page_it.next();
1353        if !v.is_none() {
1354            return v;
1355        }
1356
1357        let right = self.page_rc.rl().get_right_pid();
1358        match right {
1359            Some(right) => {
1360                let sibling_rc = Unique::mut_page_cache()
1361                    .get_leaf_page(
1362                        &self.tx,
1363                        Permission::ReadOnly,
1364                        &right,
1365                    )
1366                    .unwrap();
1367                let page_it = BTreeLeafPageIteratorRc::new(
1368                    Arc::clone(&sibling_rc),
1369                );
1370
1371                self.page_rc = Arc::clone(&sibling_rc);
1372                self.page_it = page_it;
1373                return self.page_it.next();
1374            }
1375            None => {
1376                return None;
1377            }
1378        }
1379    }
1380}
1381
1382impl DoubleEndedIterator for BTreeTableIterator<'_> {
1383    fn next_back(&mut self) -> Option<Self::Item> {
1384        let v = self.last_page_it.next_back();
1385        if !v.is_none() {
1386            return v;
1387        }
1388
1389        let left = self.last_page_rc.rl().get_left_pid();
1390        match left {
1391            Some(left) => {
1392                let sibling_rc = Unique::mut_page_cache()
1393                    .get_leaf_page(
1394                        self.tx,
1395                        Permission::ReadOnly,
1396                        &left,
1397                    )
1398                    .unwrap();
1399                let page_it = BTreeLeafPageIteratorRc::new(
1400                    Arc::clone(&sibling_rc),
1401                );
1402
1403                self.last_page_rc = Arc::clone(&sibling_rc);
1404                self.last_page_it = page_it;
1405                return self.last_page_it.next_back();
1406            }
1407            None => {
1408                return None;
1409            }
1410        }
1411    }
1412}
1413
1414pub struct BTreeTableSearchIterator<'t> {
1415    tx: &'t Transaction,
1416
1417    current_page_rc: Arc<RwLock<BTreeLeafPage>>,
1418    page_it: BTreeLeafPageIteratorRc,
1419    predicate: Predicate,
1420    key_field: usize,
1421}
1422
1423impl<'t> BTreeTableSearchIterator<'t> {
1424    pub fn new(
1425        tx: &'t Transaction,
1426        table: &BTreeTable,
1427        index_predicate: Predicate,
1428    ) -> Self {
1429        let start_rc: Arc<RwLock<BTreeLeafPage>>;
1430        let root_pid = table.get_root_pid(tx);
1431
1432        match index_predicate.op {
1433            Op::Equals | Op::GreaterThan | Op::GreaterThanOrEq => {
1434                start_rc = table.find_leaf_page(
1435                    &tx,
1436                    Permission::ReadOnly,
1437                    root_pid,
1438                    SearchFor::IntField(index_predicate.field),
1439                )
1440            }
1441            Op::LessThan | Op::LessThanOrEq => {
1442                start_rc = table.find_leaf_page(
1443                    &tx,
1444                    Permission::ReadOnly,
1445                    root_pid,
1446                    SearchFor::LeftMost,
1447                )
1448            }
1449            Op::Like => todo!(),
1450            Op::NotEquals => todo!(),
1451        }
1452
1453        Self {
1454            tx,
1455            current_page_rc: Arc::clone(&start_rc),
1456            page_it: BTreeLeafPageIteratorRc::new(Arc::clone(
1457                &start_rc,
1458            )),
1459            predicate: index_predicate,
1460            key_field: table.key_field,
1461        }
1462    }
1463}
1464
1465impl Iterator for BTreeTableSearchIterator<'_> {
1466    type Item = WrappedTuple;
1467
1468    // TODO: Short circuit on some conditions.
1469    fn next(&mut self) -> Option<Self::Item> {
1470        loop {
1471            let tuple = self.page_it.next();
1472            match tuple {
1473                Some(t) => match self.predicate.op {
1474                    Op::Equals => {
1475                        let field = t.get_field(self.key_field);
1476                        if field == self.predicate.field {
1477                            return Some(t);
1478                        } else if field > self.predicate.field {
1479                            return None;
1480                        }
1481                    }
1482                    Op::GreaterThan => {
1483                        let field = t.get_field(self.key_field);
1484                        if field > self.predicate.field {
1485                            return Some(t);
1486                        }
1487                    }
1488                    Op::GreaterThanOrEq => {
1489                        let field = t.get_field(self.key_field);
1490                        if field >= self.predicate.field {
1491                            return Some(t);
1492                        }
1493                    }
1494                    Op::LessThan => {
1495                        let field = t.get_field(self.key_field);
1496                        if field < self.predicate.field {
1497                            return Some(t);
1498                        } else if field >= self.predicate.field {
1499                            return None;
1500                        }
1501                    }
1502                    Op::LessThanOrEq => {
1503                        let field = t.get_field(self.key_field);
1504                        if field <= self.predicate.field {
1505                            return Some(t);
1506                        } else if field > self.predicate.field {
1507                            return None;
1508                        }
1509                    }
1510                    Op::Like => todo!(),
1511                    Op::NotEquals => todo!(),
1512                },
1513                None => {
1514                    // init iterator on next page and continue search
1515                    let right =
1516                        (*self.current_page_rc).rl().get_right_pid();
1517                    match right {
1518                        Some(pid) => {
1519                            let rc = Unique::mut_page_cache()
1520                                .get_leaf_page(
1521                                    self.tx,
1522                                    Permission::ReadOnly,
1523                                    &pid,
1524                                )
1525                                .unwrap();
1526                            self.current_page_rc = Arc::clone(&rc);
1527                            self.page_it =
1528                                BTreeLeafPageIteratorRc::new(
1529                                    Arc::clone(&rc),
1530                                );
1531                            continue;
1532                        }
1533                        None => {
1534                            return None;
1535                        }
1536                    }
1537                }
1538            }
1539        }
1540    }
1541}