small_db/btree/page/
leaf_page.rs

1use std::sync::{Arc, RwLock};
2
3use backtrace::Backtrace;
4use bit_vec::BitVec;
5
6use super::{
7    BTreeBasePage, BTreePage, BTreePageID, PageCategory,
8    EMPTY_PAGE_ID,
9};
10use crate::{
11    btree::{
12        consts::INDEX_SIZE,
13        page_cache::PageCache,
14        tuple::{Schema, WrappedTuple},
15    },
16    field::IntField,
17    io::{SmallReader, SmallWriter, Vaporizable},
18    utils::{ceil_div, HandyRwLock},
19    Tuple,
20};
21
/// A leaf page in the B+ tree.
///
/// # Binary Layout
///
/// - 4 bytes: page category
/// - 4 bytes: parent page index
/// - 4 bytes: left sibling page index
/// - 4 bytes: right sibling page index
/// - n bytes: header bytes, indicate whether every slot of the page
///   is used or not.
/// - n bytes: tuple bytes
pub struct BTreeLeafPage {
    // shared page state; the page id and parent page id accessors
    // delegate to it
    base: BTreeBasePage,

    // number of tuple slots on this page, derived from the tuple
    // schema by `calculate_slots_count`
    pub slot_count: usize,

    // indicate slots' status: true means occupied, false means empty
    header: BitVec<u32>,

    // all tuples (include empty tuples), one entry per slot; an entry
    // is only meaningful when the matching header bit is true
    tuples: Vec<Tuple>,

    // schema used to decode tuples from the page bytes
    pub tuple_scheme: Schema,

    // use u32 instead of Option<BTreePageID> to reduce memory
    // footprint; EMPTY_PAGE_ID means "no sibling"
    right_sibling_id: u32,
    left_sibling_id: u32,

    // index of the tuple field used as the sort key
    key_field: usize,

    // serialized page bytes captured by `set_before_image`
    old_data: Vec<u8>,
}
55
56impl BTreeLeafPage {
57    fn new(
58        pid: &BTreePageID,
59        bytes: &[u8],
60        tuple_scheme: &Schema,
61        key_field: usize,
62    ) -> Self {
63        let mut instance: Self;
64
65        if BTreeBasePage::is_empty_page(&bytes) {
66            instance = Self::new_empty_page(
67                pid,
68                bytes,
69                tuple_scheme,
70                key_field,
71            );
72        } else {
73            let slot_count =
74                Self::calculate_slots_count(&tuple_scheme);
75
76            let mut reader = SmallReader::new(&bytes);
77
78            // read page category
79            let category = reader.read::<PageCategory>();
80            if category != PageCategory::Leaf {
81                panic!(
82                "BTreeLeafPage::new: page category is not leaf, category: {:?}",
83                category,
84            );
85            }
86
87            // read parent page index
88            let parent_pid = BTreePageID::new(
89                PageCategory::Internal,
90                pid.get_table_id(),
91                u32::read_from(&mut reader),
92            );
93
94            // read left sibling page index
95            let left_sibling_id = u32::read_from(&mut reader);
96
97            // read right sibling page index
98            let right_sibling_id = u32::read_from(&mut reader);
99
100            // read header
101            let header = BitVec::read_from(&mut reader);
102
103            // read tuples
104            let mut tuples = Vec::new();
105            for _ in 0..slot_count {
106                let t = Tuple::read_from(&mut reader, tuple_scheme);
107                tuples.push(t);
108            }
109
110            let mut base = BTreeBasePage::new(pid);
111            base.set_parent_pid(&parent_pid);
112
113            instance = Self {
114                base,
115                slot_count,
116                header,
117                tuples,
118                tuple_scheme: tuple_scheme.clone(),
119                right_sibling_id,
120                left_sibling_id,
121                key_field,
122                old_data: Vec::new(),
123            };
124        }
125
126        instance.set_before_image();
127        return instance;
128    }
129
130    fn new_empty_page(
131        pid: &BTreePageID,
132        bytes: &[u8],
133        tuple_scheme: &Schema,
134        key_field: usize,
135    ) -> Self {
136        let slot_count = Self::calculate_slots_count(&tuple_scheme);
137
138        let mut reader = SmallReader::new(&bytes);
139
140        let parent_pid = BTreePageID::new(
141            PageCategory::Internal,
142            pid.get_table_id(),
143            EMPTY_PAGE_ID,
144        );
145
146        let mut header = BitVec::new();
147        header.grow(slot_count, false);
148
149        // read tuples
150        let mut tuples = Vec::new();
151        for _ in 0..slot_count {
152            let t = Tuple::read_from(&mut reader, tuple_scheme);
153            tuples.push(t);
154        }
155
156        let mut base = BTreeBasePage::new(pid);
157        base.set_parent_pid(&parent_pid);
158
159        Self {
160            base,
161            slot_count,
162            header,
163            tuples,
164            tuple_scheme: tuple_scheme.clone(),
165            right_sibling_id: EMPTY_PAGE_ID,
166            left_sibling_id: EMPTY_PAGE_ID,
167            key_field,
168            old_data: Vec::new(),
169        }
170    }
171
172    pub fn set_right_pid(&mut self, pid: Option<BTreePageID>) {
173        match pid {
174            Some(pid) => {
175                self.right_sibling_id = pid.page_index;
176            }
177            None => {
178                self.right_sibling_id = EMPTY_PAGE_ID;
179            }
180        }
181    }
182
183    pub fn get_right_pid(&self) -> Option<BTreePageID> {
184        if self.right_sibling_id == EMPTY_PAGE_ID {
185            return None;
186        } else {
187            return Some(BTreePageID::new(
188                PageCategory::Leaf,
189                self.get_pid().table_id,
190                self.right_sibling_id,
191            ));
192        }
193    }
194
195    pub fn set_left_pid(&mut self, pid: Option<BTreePageID>) {
196        match pid {
197            Some(pid) => {
198                self.left_sibling_id = pid.page_index;
199            }
200            None => {
201                self.left_sibling_id = EMPTY_PAGE_ID;
202            }
203        }
204    }
205
206    pub fn get_left_pid(&self) -> Option<BTreePageID> {
207        if self.left_sibling_id == EMPTY_PAGE_ID {
208            return None;
209        } else {
210            return Some(BTreePageID::new(
211                PageCategory::Leaf,
212                self.get_pid().table_id,
213                self.left_sibling_id,
214            ));
215        }
216    }
217
    /// Returns the total number of tuple slots on this page, occupied
    /// or not.
    pub fn get_slots_count(&self) -> usize {
        self.slot_count
    }
221
222    /// stable means at least half of the page is occupied
223    pub fn stable(&self) -> bool {
224        if self.get_parent_pid().category == PageCategory::RootPointer
225        {
226            return true;
227        }
228
229        let stable_threshold = ceil_div(self.slot_count, 2);
230        return self.tuples_count() >= stable_threshold;
231    }
232
233    pub fn empty_slots_count(&self) -> usize {
234        let mut count = 0;
235        for i in 0..self.slot_count {
236            if !self.is_slot_used(i) {
237                count += 1;
238            }
239        }
240        count
241    }
242
243    /// Returns the number of tuples currently stored on this page
244    pub fn tuples_count(&self) -> usize {
245        self.slot_count - self.empty_slots_count()
246    }
247
248    // Computes the number of bytes in the header of
249    // a page in a BTreeFile with each tuple occupying
250    // tupleSize bytes
251    pub fn calculate_header_size(slot_count: usize) -> usize {
252        slot_count / 8 + 1
253    }
254
255    /// Adds the specified tuple to the page such that all records
256    /// remain in sorted order; the tuple should be updated to
257    /// reflect that it is now stored on this page.
258    /// tuple: The tuple to add.
259    pub fn insert_tuple(&mut self, tuple: &Tuple) {
260        // find the first empty slot
261        let mut first_empty_slot: i32 = 0;
262        for i in 0..self.slot_count {
263            if !self.is_slot_used(i) {
264                first_empty_slot = i as i32;
265                break;
266            }
267        }
268
269        // Find the last key less than or equal to the key being
270        // inserted.
271        //
272        // -1 indicate there is no such key less than tuple.key, so
273        // the tuple should be inserted in slot 0 (-1 + 1).
274        let mut last_less_slot: i32 = -1;
275        for i in 0..self.slot_count {
276            if self.is_slot_used(i) {
277                if self.tuples[i].get_field(self.key_field)
278                    < tuple.get_field(self.key_field)
279                {
280                    last_less_slot = i as i32;
281                } else {
282                    break;
283                }
284            }
285        }
286
287        // shift records back or forward to fill empty slot and make
288        // room for new record while keeping records in sorted
289        // order
290        let good_slot: usize;
291        if first_empty_slot < last_less_slot {
292            for i in first_empty_slot..last_less_slot {
293                self.move_tuple((i + 1) as usize, i as usize);
294            }
295            good_slot = last_less_slot as usize;
296        } else {
297            for i in (last_less_slot + 1..first_empty_slot).rev() {
298                self.move_tuple(i as usize, (i + 1) as usize);
299            }
300            good_slot = (last_less_slot + 1) as usize;
301        }
302
303        // insert new record into the correct spot in sorted order
304        self.tuples[good_slot] = tuple.clone();
305        self.mark_slot_status(good_slot, true);
306    }
307
308    // Move a tuple from one slot to another slot, destination must be
309    // empty
310    fn move_tuple(&mut self, from: usize, to: usize) {
311        // return if the source slot is empty
312        if !self.is_slot_used(from) {
313            return;
314        }
315
316        self.tuples[to] = self.tuples[from].clone();
317        self.mark_slot_status(to, true);
318        self.mark_slot_status(from, false);
319    }
320
321    pub fn get_tuple(&self, slot_index: usize) -> Option<Tuple> {
322        if self.is_slot_used(slot_index) {
323            return Some(self.tuples[slot_index].clone());
324        }
325        None
326    }
327
    /// Deletes the tuple in `slot_index` by marking the slot as
    /// empty. The tuple data itself is left in place and is simply no
    /// longer considered part of the page.
    pub fn delete_tuple(&mut self, slot_index: usize) {
        self.mark_slot_status(slot_index, false);
    }
331
    /// Returns true if associated slot on this page is filled.
    ///
    /// NOTE(review): indexing the header presumably panics when
    /// `slot_index` is out of range — confirm against `BitVec`.
    pub fn is_slot_used(&self, slot_index: usize) -> bool {
        self.header[slot_index]
    }
336
    // mark the slot as empty/filled.
    //
    // Only the header bit is changed; the tuple stored in the slot is
    // not touched.
    pub fn mark_slot_status(
        &mut self,
        slot_index: usize,
        used: bool,
    ) {
        self.header.set(slot_index, used);
    }
345
346    pub fn check_integrity(
347        &self,
348        parent_pid: &BTreePageID,
349        lower_bound: Option<IntField>,
350        upper_bound: Option<IntField>,
351        check_occupancy: bool,
352        depth: usize,
353    ) {
354        let bt = Backtrace::new();
355
356        assert_eq!(self.get_pid().category, PageCategory::Leaf);
357        assert_eq!(
358            &self.get_parent_pid(),
359            parent_pid,
360            "parent pid incorrect, current page: {:?}, actual parent pid: {:?}, expect parent pid: {:?}, backtrace: {:?}",
361            self.get_pid(),
362            self.get_parent_pid(),
363            parent_pid,
364            bt,
365        );
366
367        let mut previous = lower_bound;
368        let it = BTreeLeafPageIterator::new(self);
369        for tuple in it {
370            if let Some(previous) = previous {
371                assert!(
372                    previous <= tuple.get_field(self.key_field),
373                    "previous: {:?}, current: {:?}, page_id: {:?}",
374                    previous,
375                    tuple.get_field(self.key_field),
376                    self.get_pid(),
377                );
378            }
379            previous = Some(tuple.get_field(self.key_field));
380        }
381
382        if let Some(upper_bound) = upper_bound {
383            if let Some(previous) = previous {
384                assert!(
385                    previous <= upper_bound,
386                    "the last tuple exceeds upper_bound, last tuple: {}, upper bound: {}",
387                    previous,
388                    upper_bound,
389                );
390            }
391        }
392
393        if check_occupancy && depth > 0 {
394            assert!(
395                self.tuples_count() >= self.get_slots_count() / 2
396            );
397        }
398    }
399
    /// Returns an iterator over the occupied slots of this page that
    /// borrows the page for its whole lifetime.
    pub fn iter(&self) -> BTreeLeafPageIterator {
        BTreeLeafPageIterator::new(self)
    }
403}
404
405// Methods for accessing const attributes.
406impl BTreeLeafPage {
407    /// Retrieve the maximum number of tuples this page can hold.
408    pub fn calculate_slots_count(scheme: &Schema) -> usize {
409        let bits_per_tuple_including_header =
410            scheme.get_size() * 8 + 1;
411
412        // extraBits:
413        // - page category
414        // - parent pointer
415        // - left sibling pointer
416        // - right sibling pointer
417        // - header size
418        let extra_bits = (4 * INDEX_SIZE + 2) * 8;
419
420        (PageCache::get_page_size() * 8 - extra_bits)
421            / bits_per_tuple_including_header
422    }
423}
424
425impl BTreePage for BTreeLeafPage {
426    fn new(
427        pid: &BTreePageID,
428        bytes: &[u8],
429        tuple_scheme: &Schema,
430        key_field: usize,
431    ) -> Self {
432        Self::new(pid, &bytes, tuple_scheme, key_field)
433    }
434
435    fn get_pid(&self) -> BTreePageID {
436        self.base.get_pid()
437    }
438
439    fn get_parent_pid(&self) -> BTreePageID {
440        self.base.get_parent_pid()
441    }
442
443    fn set_parent_pid(&mut self, pid: &BTreePageID) {
444        self.base.set_parent_pid(pid)
445    }
446
447    /// Generates a byte array representing the contents of this page.
448    /// Used to serialize this page to disk.
449    ///
450    /// The invariant here is that it should be possible to pass the
451    /// byte array generated by get_page_data to the BTreeLeafPage
452    /// constructor and have it produce an identical BTreeLeafPage
453    /// object.
454    fn get_page_data(&self) -> Vec<u8> {
455        let mut writer = SmallWriter::new();
456
457        // write page category
458        writer.write(&self.get_pid().category);
459
460        // write parent page index
461        writer.write(&self.get_parent_pid().page_index);
462
463        // write left sibling page index
464        writer.write(&self.left_sibling_id);
465
466        // write right sibling page index
467        writer.write(&self.right_sibling_id);
468
469        // write header
470        writer.write(&self.header);
471
472        // write tuples
473        for tuple in &self.tuples {
474            writer.write(tuple);
475        }
476
477        return writer.to_padded_bytes(PageCache::get_page_size());
478    }
479
480    fn set_before_image(&mut self) {
481        self.old_data = self.get_page_data();
482    }
483
484    fn get_before_image(&self) -> Vec<u8> {
485        if self.old_data.is_empty() {
486            panic!("before image is not set");
487        }
488        return self.old_data.clone();
489    }
490
491    fn peek(&self) {
492        println!("page id: {:?}", self.get_pid());
493        println!("parent id: {:?}", self.get_parent_pid());
494        println!("left sibling id: {:?}", self.left_sibling_id);
495        println!("right sibling id: {:?}", self.right_sibling_id);
496        println!("header: {:?}", self.header);
497        println!("tuples: {:?}", self.tuples);
498    }
499}
500
/// Iterator over the occupied slots of a leaf page that is shared
/// behind an `Arc<RwLock<_>>`.
pub struct BTreeLeafPageIteratorRc {
    // the page being iterated
    page: Arc<RwLock<BTreeLeafPage>>,
    // slot most recently visited from the front; starts at -1
    cursor: i32,
    // slot most recently visited from the back; starts at the slot
    // count
    reverse_cursor: i32,
}
506
507impl BTreeLeafPageIteratorRc {
508    pub fn new(page: Arc<RwLock<BTreeLeafPage>>) -> Self {
509        let slot_count = page.rl().get_slots_count();
510        Self {
511            page,
512            cursor: -1,
513            reverse_cursor: slot_count as i32,
514        }
515    }
516}
517
518impl Iterator for BTreeLeafPageIteratorRc {
519    type Item = WrappedTuple;
520
521    fn next(&mut self) -> Option<Self::Item> {
522        let page = self.page.rl();
523        loop {
524            self.cursor += 1;
525            let cursor = self.cursor as usize;
526            if cursor >= page.slot_count {
527                return None;
528            }
529
530            if page.is_slot_used(cursor) {
531                return Some(WrappedTuple::new(
532                    page.tuples[cursor].clone(),
533                    cursor,
534                    page.get_pid(),
535                ));
536            }
537        }
538    }
539}
540
541impl DoubleEndedIterator for BTreeLeafPageIteratorRc {
542    fn next_back(&mut self) -> Option<Self::Item> {
543        let page = self.page.rl();
544        loop {
545            self.reverse_cursor -= 1;
546            if self.reverse_cursor < 0 {
547                return None;
548            }
549
550            let cursor = self.reverse_cursor as usize;
551            if page.is_slot_used(cursor) {
552                return Some(WrappedTuple::new(
553                    page.tuples[cursor].clone(),
554                    cursor,
555                    page.get_pid(),
556                ));
557            }
558        }
559    }
560}
561
/// Iterator over the occupied slots of a borrowed leaf page.
pub struct BTreeLeafPageIterator<'page> {
    // the page being iterated
    page: &'page BTreeLeafPage,
    // slot most recently visited from the front; starts at -1
    cursor: i32,
    // slot most recently visited from the back; starts at the slot
    // count
    reverse_cursor: i32,
}
567
568impl<'page> BTreeLeafPageIterator<'page> {
569    pub fn new(page: &'page BTreeLeafPage) -> Self {
570        Self {
571            page,
572            cursor: -1,
573            reverse_cursor: page.slot_count as i32,
574        }
575    }
576}
577
578impl<'page> Iterator for BTreeLeafPageIterator<'_> {
579    type Item = WrappedTuple;
580
581    fn next(&mut self) -> Option<Self::Item> {
582        let page = self.page;
583        loop {
584            self.cursor += 1;
585            let cursor = self.cursor as usize;
586            if cursor >= page.slot_count {
587                return None;
588            }
589
590            if page.is_slot_used(cursor) {
591                return Some(WrappedTuple::new(
592                    page.tuples[cursor].clone(),
593                    cursor,
594                    page.get_pid(),
595                ));
596            }
597        }
598    }
599}
600
601impl<'page> DoubleEndedIterator for BTreeLeafPageIterator<'_> {
602    fn next_back(&mut self) -> Option<Self::Item> {
603        let page = self.page;
604        loop {
605            self.reverse_cursor -= 1;
606            if self.reverse_cursor < 0 {
607                return None;
608            }
609
610            let cursor = self.reverse_cursor as usize;
611            if page.is_slot_used(cursor) {
612                return Some(WrappedTuple::new(
613                    page.tuples[cursor].clone(),
614                    cursor,
615                    page.get_pid(),
616                ));
617            }
618        }
619    }
620}