1use std::sync::{Arc, RwLock};
2
3use backtrace::Backtrace;
4use bit_vec::BitVec;
5
6use super::{
7 BTreeBasePage, BTreePage, BTreePageID, PageCategory,
8 EMPTY_PAGE_ID,
9};
10use crate::{
11 btree::{
12 consts::INDEX_SIZE,
13 page_cache::PageCache,
14 tuple::{Schema, WrappedTuple},
15 },
16 field::IntField,
17 io::{SmallReader, SmallWriter, Vaporizable},
18 utils::{ceil_div, HandyRwLock},
19 Tuple,
20};
21
22pub struct BTreeLeafPage {
34 base: BTreeBasePage,
35
36 pub slot_count: usize,
37
38 header: BitVec<u32>,
40
41 tuples: Vec<Tuple>,
43
44 pub tuple_scheme: Schema,
45
46 right_sibling_id: u32,
49 left_sibling_id: u32,
50
51 key_field: usize,
52
53 old_data: Vec<u8>,
54}
55
56impl BTreeLeafPage {
57 fn new(
58 pid: &BTreePageID,
59 bytes: &[u8],
60 tuple_scheme: &Schema,
61 key_field: usize,
62 ) -> Self {
63 let mut instance: Self;
64
65 if BTreeBasePage::is_empty_page(&bytes) {
66 instance = Self::new_empty_page(
67 pid,
68 bytes,
69 tuple_scheme,
70 key_field,
71 );
72 } else {
73 let slot_count =
74 Self::calculate_slots_count(&tuple_scheme);
75
76 let mut reader = SmallReader::new(&bytes);
77
78 let category = reader.read::<PageCategory>();
80 if category != PageCategory::Leaf {
81 panic!(
82 "BTreeLeafPage::new: page category is not leaf, category: {:?}",
83 category,
84 );
85 }
86
87 let parent_pid = BTreePageID::new(
89 PageCategory::Internal,
90 pid.get_table_id(),
91 u32::read_from(&mut reader),
92 );
93
94 let left_sibling_id = u32::read_from(&mut reader);
96
97 let right_sibling_id = u32::read_from(&mut reader);
99
100 let header = BitVec::read_from(&mut reader);
102
103 let mut tuples = Vec::new();
105 for _ in 0..slot_count {
106 let t = Tuple::read_from(&mut reader, tuple_scheme);
107 tuples.push(t);
108 }
109
110 let mut base = BTreeBasePage::new(pid);
111 base.set_parent_pid(&parent_pid);
112
113 instance = Self {
114 base,
115 slot_count,
116 header,
117 tuples,
118 tuple_scheme: tuple_scheme.clone(),
119 right_sibling_id,
120 left_sibling_id,
121 key_field,
122 old_data: Vec::new(),
123 };
124 }
125
126 instance.set_before_image();
127 return instance;
128 }
129
130 fn new_empty_page(
131 pid: &BTreePageID,
132 bytes: &[u8],
133 tuple_scheme: &Schema,
134 key_field: usize,
135 ) -> Self {
136 let slot_count = Self::calculate_slots_count(&tuple_scheme);
137
138 let mut reader = SmallReader::new(&bytes);
139
140 let parent_pid = BTreePageID::new(
141 PageCategory::Internal,
142 pid.get_table_id(),
143 EMPTY_PAGE_ID,
144 );
145
146 let mut header = BitVec::new();
147 header.grow(slot_count, false);
148
149 let mut tuples = Vec::new();
151 for _ in 0..slot_count {
152 let t = Tuple::read_from(&mut reader, tuple_scheme);
153 tuples.push(t);
154 }
155
156 let mut base = BTreeBasePage::new(pid);
157 base.set_parent_pid(&parent_pid);
158
159 Self {
160 base,
161 slot_count,
162 header,
163 tuples,
164 tuple_scheme: tuple_scheme.clone(),
165 right_sibling_id: EMPTY_PAGE_ID,
166 left_sibling_id: EMPTY_PAGE_ID,
167 key_field,
168 old_data: Vec::new(),
169 }
170 }
171
172 pub fn set_right_pid(&mut self, pid: Option<BTreePageID>) {
173 match pid {
174 Some(pid) => {
175 self.right_sibling_id = pid.page_index;
176 }
177 None => {
178 self.right_sibling_id = EMPTY_PAGE_ID;
179 }
180 }
181 }
182
183 pub fn get_right_pid(&self) -> Option<BTreePageID> {
184 if self.right_sibling_id == EMPTY_PAGE_ID {
185 return None;
186 } else {
187 return Some(BTreePageID::new(
188 PageCategory::Leaf,
189 self.get_pid().table_id,
190 self.right_sibling_id,
191 ));
192 }
193 }
194
195 pub fn set_left_pid(&mut self, pid: Option<BTreePageID>) {
196 match pid {
197 Some(pid) => {
198 self.left_sibling_id = pid.page_index;
199 }
200 None => {
201 self.left_sibling_id = EMPTY_PAGE_ID;
202 }
203 }
204 }
205
206 pub fn get_left_pid(&self) -> Option<BTreePageID> {
207 if self.left_sibling_id == EMPTY_PAGE_ID {
208 return None;
209 } else {
210 return Some(BTreePageID::new(
211 PageCategory::Leaf,
212 self.get_pid().table_id,
213 self.left_sibling_id,
214 ));
215 }
216 }
217
218 pub fn get_slots_count(&self) -> usize {
219 self.slot_count
220 }
221
222 pub fn stable(&self) -> bool {
224 if self.get_parent_pid().category == PageCategory::RootPointer
225 {
226 return true;
227 }
228
229 let stable_threshold = ceil_div(self.slot_count, 2);
230 return self.tuples_count() >= stable_threshold;
231 }
232
233 pub fn empty_slots_count(&self) -> usize {
234 let mut count = 0;
235 for i in 0..self.slot_count {
236 if !self.is_slot_used(i) {
237 count += 1;
238 }
239 }
240 count
241 }
242
243 pub fn tuples_count(&self) -> usize {
245 self.slot_count - self.empty_slots_count()
246 }
247
/// Returns the number of bytes reserved for a header of `slot_count` bits.
///
/// The result is `slot_count / 8 + 1`, so a slot count that is an exact
/// multiple of 8 still gets one extra byte.
pub fn calculate_header_size(slot_count: usize) -> usize {
    let whole_bytes = slot_count / 8;
    whole_bytes + 1
}
254
255 pub fn insert_tuple(&mut self, tuple: &Tuple) {
260 let mut first_empty_slot: i32 = 0;
262 for i in 0..self.slot_count {
263 if !self.is_slot_used(i) {
264 first_empty_slot = i as i32;
265 break;
266 }
267 }
268
269 let mut last_less_slot: i32 = -1;
275 for i in 0..self.slot_count {
276 if self.is_slot_used(i) {
277 if self.tuples[i].get_field(self.key_field)
278 < tuple.get_field(self.key_field)
279 {
280 last_less_slot = i as i32;
281 } else {
282 break;
283 }
284 }
285 }
286
287 let good_slot: usize;
291 if first_empty_slot < last_less_slot {
292 for i in first_empty_slot..last_less_slot {
293 self.move_tuple((i + 1) as usize, i as usize);
294 }
295 good_slot = last_less_slot as usize;
296 } else {
297 for i in (last_less_slot + 1..first_empty_slot).rev() {
298 self.move_tuple(i as usize, (i + 1) as usize);
299 }
300 good_slot = (last_less_slot + 1) as usize;
301 }
302
303 self.tuples[good_slot] = tuple.clone();
305 self.mark_slot_status(good_slot, true);
306 }
307
308 fn move_tuple(&mut self, from: usize, to: usize) {
311 if !self.is_slot_used(from) {
313 return;
314 }
315
316 self.tuples[to] = self.tuples[from].clone();
317 self.mark_slot_status(to, true);
318 self.mark_slot_status(from, false);
319 }
320
321 pub fn get_tuple(&self, slot_index: usize) -> Option<Tuple> {
322 if self.is_slot_used(slot_index) {
323 return Some(self.tuples[slot_index].clone());
324 }
325 None
326 }
327
328 pub fn delete_tuple(&mut self, slot_index: usize) {
329 self.mark_slot_status(slot_index, false);
330 }
331
332 pub fn is_slot_used(&self, slot_index: usize) -> bool {
334 self.header[slot_index]
335 }
336
337 pub fn mark_slot_status(
339 &mut self,
340 slot_index: usize,
341 used: bool,
342 ) {
343 self.header.set(slot_index, used);
344 }
345
346 pub fn check_integrity(
347 &self,
348 parent_pid: &BTreePageID,
349 lower_bound: Option<IntField>,
350 upper_bound: Option<IntField>,
351 check_occupancy: bool,
352 depth: usize,
353 ) {
354 let bt = Backtrace::new();
355
356 assert_eq!(self.get_pid().category, PageCategory::Leaf);
357 assert_eq!(
358 &self.get_parent_pid(),
359 parent_pid,
360 "parent pid incorrect, current page: {:?}, actual parent pid: {:?}, expect parent pid: {:?}, backtrace: {:?}",
361 self.get_pid(),
362 self.get_parent_pid(),
363 parent_pid,
364 bt,
365 );
366
367 let mut previous = lower_bound;
368 let it = BTreeLeafPageIterator::new(self);
369 for tuple in it {
370 if let Some(previous) = previous {
371 assert!(
372 previous <= tuple.get_field(self.key_field),
373 "previous: {:?}, current: {:?}, page_id: {:?}",
374 previous,
375 tuple.get_field(self.key_field),
376 self.get_pid(),
377 );
378 }
379 previous = Some(tuple.get_field(self.key_field));
380 }
381
382 if let Some(upper_bound) = upper_bound {
383 if let Some(previous) = previous {
384 assert!(
385 previous <= upper_bound,
386 "the last tuple exceeds upper_bound, last tuple: {}, upper bound: {}",
387 previous,
388 upper_bound,
389 );
390 }
391 }
392
393 if check_occupancy && depth > 0 {
394 assert!(
395 self.tuples_count() >= self.get_slots_count() / 2
396 );
397 }
398 }
399
400 pub fn iter(&self) -> BTreeLeafPageIterator {
401 BTreeLeafPageIterator::new(self)
402 }
403}
404
405impl BTreeLeafPage {
407 pub fn calculate_slots_count(scheme: &Schema) -> usize {
409 let bits_per_tuple_including_header =
410 scheme.get_size() * 8 + 1;
411
412 let extra_bits = (4 * INDEX_SIZE + 2) * 8;
419
420 (PageCache::get_page_size() * 8 - extra_bits)
421 / bits_per_tuple_including_header
422 }
423}
424
425impl BTreePage for BTreeLeafPage {
426 fn new(
427 pid: &BTreePageID,
428 bytes: &[u8],
429 tuple_scheme: &Schema,
430 key_field: usize,
431 ) -> Self {
432 Self::new(pid, &bytes, tuple_scheme, key_field)
433 }
434
435 fn get_pid(&self) -> BTreePageID {
436 self.base.get_pid()
437 }
438
439 fn get_parent_pid(&self) -> BTreePageID {
440 self.base.get_parent_pid()
441 }
442
443 fn set_parent_pid(&mut self, pid: &BTreePageID) {
444 self.base.set_parent_pid(pid)
445 }
446
447 fn get_page_data(&self) -> Vec<u8> {
455 let mut writer = SmallWriter::new();
456
457 writer.write(&self.get_pid().category);
459
460 writer.write(&self.get_parent_pid().page_index);
462
463 writer.write(&self.left_sibling_id);
465
466 writer.write(&self.right_sibling_id);
468
469 writer.write(&self.header);
471
472 for tuple in &self.tuples {
474 writer.write(tuple);
475 }
476
477 return writer.to_padded_bytes(PageCache::get_page_size());
478 }
479
480 fn set_before_image(&mut self) {
481 self.old_data = self.get_page_data();
482 }
483
484 fn get_before_image(&self) -> Vec<u8> {
485 if self.old_data.is_empty() {
486 panic!("before image is not set");
487 }
488 return self.old_data.clone();
489 }
490
491 fn peek(&self) {
492 println!("page id: {:?}", self.get_pid());
493 println!("parent id: {:?}", self.get_parent_pid());
494 println!("left sibling id: {:?}", self.left_sibling_id);
495 println!("right sibling id: {:?}", self.right_sibling_id);
496 println!("header: {:?}", self.header);
497 println!("tuples: {:?}", self.tuples);
498 }
499}
500
501pub struct BTreeLeafPageIteratorRc {
502 page: Arc<RwLock<BTreeLeafPage>>,
503 cursor: i32,
504 reverse_cursor: i32,
505}
506
507impl BTreeLeafPageIteratorRc {
508 pub fn new(page: Arc<RwLock<BTreeLeafPage>>) -> Self {
509 let slot_count = page.rl().get_slots_count();
510 Self {
511 page,
512 cursor: -1,
513 reverse_cursor: slot_count as i32,
514 }
515 }
516}
517
518impl Iterator for BTreeLeafPageIteratorRc {
519 type Item = WrappedTuple;
520
521 fn next(&mut self) -> Option<Self::Item> {
522 let page = self.page.rl();
523 loop {
524 self.cursor += 1;
525 let cursor = self.cursor as usize;
526 if cursor >= page.slot_count {
527 return None;
528 }
529
530 if page.is_slot_used(cursor) {
531 return Some(WrappedTuple::new(
532 page.tuples[cursor].clone(),
533 cursor,
534 page.get_pid(),
535 ));
536 }
537 }
538 }
539}
540
541impl DoubleEndedIterator for BTreeLeafPageIteratorRc {
542 fn next_back(&mut self) -> Option<Self::Item> {
543 let page = self.page.rl();
544 loop {
545 self.reverse_cursor -= 1;
546 if self.reverse_cursor < 0 {
547 return None;
548 }
549
550 let cursor = self.reverse_cursor as usize;
551 if page.is_slot_used(cursor) {
552 return Some(WrappedTuple::new(
553 page.tuples[cursor].clone(),
554 cursor,
555 page.get_pid(),
556 ));
557 }
558 }
559 }
560}
561
562pub struct BTreeLeafPageIterator<'page> {
563 page: &'page BTreeLeafPage,
564 cursor: i32,
565 reverse_cursor: i32,
566}
567
568impl<'page> BTreeLeafPageIterator<'page> {
569 pub fn new(page: &'page BTreeLeafPage) -> Self {
570 Self {
571 page,
572 cursor: -1,
573 reverse_cursor: page.slot_count as i32,
574 }
575 }
576}
577
578impl<'page> Iterator for BTreeLeafPageIterator<'_> {
579 type Item = WrappedTuple;
580
581 fn next(&mut self) -> Option<Self::Item> {
582 let page = self.page;
583 loop {
584 self.cursor += 1;
585 let cursor = self.cursor as usize;
586 if cursor >= page.slot_count {
587 return None;
588 }
589
590 if page.is_slot_used(cursor) {
591 return Some(WrappedTuple::new(
592 page.tuples[cursor].clone(),
593 cursor,
594 page.get_pid(),
595 ));
596 }
597 }
598 }
599}
600
601impl<'page> DoubleEndedIterator for BTreeLeafPageIterator<'_> {
602 fn next_back(&mut self) -> Option<Self::Item> {
603 let page = self.page;
604 loop {
605 self.reverse_cursor -= 1;
606 if self.reverse_cursor < 0 {
607 return None;
608 }
609
610 let cursor = self.reverse_cursor as usize;
611 if page.is_slot_used(cursor) {
612 return Some(WrappedTuple::new(
613 page.tuples[cursor].clone(),
614 cursor,
615 page.get_pid(),
616 ));
617 }
618 }
619 }
620}