1use parking_lot::{RwLock, RwLockWriteGuard};
2use std::collections::VecDeque;
3use std::fmt::Write;
4use std::sync::Arc;
5
6use crate::buffer::{
7 BufferManager, PageId, ReadPageGuard, WritePageGuard, INVALID_PAGE_ID, PAGE_SIZE,
8};
9use crate::catalog::SchemaRef;
10use crate::error::{QuillSQLError, QuillSQLResult};
11use crate::storage::codec::{
12 BPlusTreeHeaderPageCodec, BPlusTreeInternalPageCodec, BPlusTreeLeafPageCodec,
13 BPlusTreePageCodec,
14};
15use crate::storage::page::{BPlusTreeHeaderPage, BPlusTreeInternalPage};
16use crate::storage::page::{BPlusTreeLeafPage, BPlusTreePage, RecordId};
17
18use crate::config::BTreeConfig;
19use crate::storage::codec::BPlusTreePageTypeCodec;
20pub use crate::storage::index::btree_iterator::TreeIndexIterator;
21use crate::storage::page::BPlusTreePageType;
22use crate::storage::tuple::Tuple;
23use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
24
/// Number of consecutive restarts of the optimistic descent in
/// `find_leaf_page_optimistic` (taken when a page image fails to decode)
/// tolerated before that loop sleeps and resets its restart counter.
const MAX_OLC_RESTARTS: usize = 64;
/// Base back-off delay in microseconds for the optimistic descent; the caller
/// scales it by `1 << (min(restarts, 10) - 1)` (saturating).
const OLC_BACKOFF_BASE_US: u64 = 50;
28
/// Latches accumulated while descending the tree for a modifying operation.
///
/// The lifetime `'a` ties the optional header-lock guard to the index that
/// owns the lock.
#[derive(Debug)]
pub struct Context<'a> {
    /// Write guards on latched ancestor pages, pushed root-first; the back is
    /// the most recently latched ancestor (the current node's parent).
    pub write_set: VecDeque<WritePageGuard>,

    /// Read guards pushed via `push_read_guard`.
    pub read_set: VecDeque<ReadPageGuard>,

    /// Optional write guard on the index's header lock; cleared by
    /// `release_all_write_locks`.
    pub header_lock_guard: Option<RwLockWriteGuard<'a, ()>>,
}
42
43impl<'a> Context<'a> {
44 pub fn new() -> Self {
45 Self {
46 write_set: VecDeque::new(),
47 read_set: VecDeque::new(),
48 header_lock_guard: None,
49 }
50 }
51
52 pub fn push_write_guard(&mut self, guard: WritePageGuard) {
54 self.write_set.push_back(guard);
55 }
56
57 pub fn push_read_guard(&mut self, guard: ReadPageGuard) {
58 self.read_set.push_back(guard);
59 }
60
61 pub fn release_all_write_locks(&mut self) {
63 self.write_set.clear();
64 self.header_lock_guard = None;
65 }
66}
67
/// A B+ tree index whose nodes are pages managed by a [`BufferManager`].
#[derive(Debug)]
pub struct BPlusTreeIndex {
    /// Schema of the key tuples; needed to decode keys stored in every page.
    pub key_schema: SchemaRef,
    /// Buffer pool through which all tree pages are read, written and allocated.
    pub buffer_pool: Arc<BufferManager>,
    /// Stored as the constructor argument plus one (see `new` / `open`).
    pub internal_max_size: u32,
    /// Maximum entry count passed to new leaf pages.
    pub leaf_max_size: u32,
    /// Page holding the `BPlusTreeHeaderPage`, which records the root page id.
    pub header_page_id: PageId,
    /// Taken in write mode around root-id changes (tree creation in `insert`,
    /// root collapse in `adjust_root`).
    pub header_page_lock: Arc<RwLock<()>>,
    /// Tuning / debug configuration (e.g. `debug_find_level`, `debug_insert_level`).
    pub config: BTreeConfig,
    /// Initialised to 0 by the constructors.
    /// NOTE(review): its updates are not visible in this section — confirm purpose.
    pub pending_garbage: AtomicUsize,
}
83
84impl BPlusTreeIndex {
85 fn wal_overwrite_page(
88 &self,
89 guard: &mut WritePageGuard,
90 new_image: Vec<u8>,
91 ) -> QuillSQLResult<()> {
92 debug_assert_eq!(new_image.len(), PAGE_SIZE);
93 guard.apply_page_image(&new_image)?;
94 Ok(())
95 }
96 pub fn new(
97 key_schema: SchemaRef,
98 buffer_pool: Arc<BufferManager>,
99 internal_max_size: u32,
100 leaf_max_size: u32,
101 ) -> Self {
102 let mut header_page_guard = buffer_pool
104 .new_page()
105 .expect("Failed to create header page for B+ tree");
106 let header_page_id = header_page_guard.page_id();
107 let header_page = BPlusTreeHeaderPage {
108 root_page_id: INVALID_PAGE_ID,
109 };
110 let encoded = BPlusTreeHeaderPageCodec::encode(&header_page);
111 header_page_guard.overwrite(&encoded, None);
112 drop(header_page_guard);
113
114 Self {
115 key_schema,
116 buffer_pool,
117 internal_max_size: internal_max_size + 1,
120 leaf_max_size,
121 header_page_id,
122 header_page_lock: Arc::new(RwLock::new(())),
123 config: BTreeConfig::default(),
124 pending_garbage: AtomicUsize::new(0),
125 }
126 }
127
128 pub fn new_with_config(
129 key_schema: SchemaRef,
130 buffer_pool: Arc<BufferManager>,
131 internal_max_size: u32,
132 leaf_max_size: u32,
133 config: BTreeConfig,
134 ) -> Self {
135 let mut me = Self::new(key_schema, buffer_pool, internal_max_size, leaf_max_size);
136 me.config = config;
137 me
138 }
139
140 pub fn open(
141 key_schema: SchemaRef,
142 buffer_pool: Arc<BufferManager>,
143 internal_max_size: u32,
144 leaf_max_size: u32,
145 header_page_id: PageId,
146 ) -> Self {
147 Self {
148 key_schema,
149 buffer_pool,
150 internal_max_size: internal_max_size + 1,
152 leaf_max_size,
153 header_page_id,
154 header_page_lock: Arc::new(RwLock::new(())),
155 config: BTreeConfig::default(),
156 pending_garbage: AtomicUsize::new(0),
157 }
158 }
159
160 pub fn open_with_config(
161 key_schema: SchemaRef,
162 buffer_pool: Arc<BufferManager>,
163 internal_max_size: u32,
164 leaf_max_size: u32,
165 header_page_id: PageId,
166 config: BTreeConfig,
167 ) -> Self {
168 let mut me = Self::open(
169 key_schema,
170 buffer_pool,
171 internal_max_size,
172 leaf_max_size,
173 header_page_id,
174 );
175 me.config = config;
176 me
177 }
178
179 pub fn get_root_page_id(&self) -> QuillSQLResult<PageId> {
180 let header_guard = self.buffer_pool.fetch_page_read(self.header_page_id)?;
181 let (header_page, _) = BPlusTreeHeaderPageCodec::decode(header_guard.data())?;
182 Ok(header_page.root_page_id)
183 }
184
185 fn set_root_page_id(&self, page_id: PageId) -> QuillSQLResult<()> {
186 let mut header_guard = self.buffer_pool.fetch_page_write(self.header_page_id)?;
187 let header_page = BPlusTreeHeaderPage {
189 root_page_id: page_id,
190 };
191 let encoded = BPlusTreeHeaderPageCodec::encode(&header_page);
192 self.wal_overwrite_page(&mut header_guard, encoded)?;
193 Ok(())
194 }
195
196 pub fn is_empty(&self) -> QuillSQLResult<bool> {
197 Ok(self.get_root_page_id()? == INVALID_PAGE_ID)
198 }
199
200 pub fn get(&self, key: &Tuple) -> QuillSQLResult<Option<RecordId>> {
201 if self.is_empty()? {
202 return Ok(None);
203 }
204 let mut guard = self.find_leaf_page_optimistic(key)?;
205 loop {
207 let decoded = BPlusTreeLeafPageCodec::decode(guard.data(), self.key_schema.clone());
208 if let Ok((leaf_page, _)) = decoded {
209 if let Some(rid) = leaf_page.look_up(key) {
210 return Ok(Some(rid));
211 }
212 if leaf_page.header.current_size > 0
213 && leaf_page.header.next_page_id != INVALID_PAGE_ID
214 {
215 let last_key = leaf_page.key_at((leaf_page.header.current_size - 1) as usize);
216 if *key > *last_key {
217 guard = self
218 .buffer_pool
219 .fetch_page_read(leaf_page.header.next_page_id)?;
220 continue;
221 }
222 }
223 return Ok(None);
224 } else {
225 guard = self.find_leaf_page_optimistic(key)?;
227 let (leaf_page, _) =
228 BPlusTreeLeafPageCodec::decode(guard.data(), self.key_schema.clone())?;
229 return Ok(leaf_page.look_up(key));
230 }
231 }
232 }
233
234 fn find_leaf_page_pessimistic<'a>(
236 &'a self,
237 key: &Tuple,
238 is_insert: bool,
239 mut context: Context<'a>,
240 ) -> QuillSQLResult<(WritePageGuard, Context<'a>)> {
241 if self.config.debug_find_level >= 1 {
242 eprintln!(
243 "[FIND] thread={:?} begin is_insert={} key={}",
244 std::thread::current().id(),
245 is_insert,
246 key
247 );
248 }
249 let root_page_id = self.get_root_page_id()?;
251 if root_page_id == INVALID_PAGE_ID {
252 return Err(QuillSQLError::Internal(
254 "find_leaf_page_pessimistic called on an empty tree".to_string(),
255 ));
256 }
257 let mut current_guard = self.buffer_pool.fetch_page_write(root_page_id)?;
258
259 loop {
260 let (page, _) =
261 BPlusTreePageCodec::decode(current_guard.data(), self.key_schema.clone())?;
262
263 match page {
264 BPlusTreePage::Internal(_internal) => {
265 let child_page_id = BPlusTreeInternalPageCodec::lookup_child_from_bytes(
266 current_guard.data(),
267 self.key_schema.clone(),
268 key,
269 )?;
270 if self.config.debug_find_level >= 1 {
271 eprintln!(
272 "[FIND] thread={:?} at_internal parent={} -> child={}",
273 std::thread::current().id(),
274 current_guard.page_id(),
275 child_page_id
276 );
277 }
278 let child_guard = self.buffer_pool.fetch_page_write(child_page_id)?;
279 let will_overflow = match BPlusTreePageTypeCodec::decode(child_guard.data())?.0
281 {
282 BPlusTreePageType::LeafPage => {
283 let (hdr, _) =
284 BPlusTreeLeafPageCodec::decode_header_only(child_guard.data())?;
285 hdr.current_size == hdr.max_size
286 }
287 BPlusTreePageType::InternalPage => {
288 let (hdr, _) =
289 BPlusTreeInternalPageCodec::decode_header_only(child_guard.data())?;
290 hdr.current_size == hdr.max_size
291 }
292 };
293
294 if is_insert {
295 if !will_overflow {
296 if std::env::var("QUILL_DEBUG_FIND").ok().as_deref() == Some("1") {
297 eprintln!(
298 "[FIND] thread={:?} safe_descend release_ancestors parent={} child={}",
299 std::thread::current().id(),
300 current_guard.page_id(),
301 child_page_id
302 );
303 }
304 context.release_all_write_locks();
305 drop(current_guard);
306 current_guard = child_guard;
307 continue;
308 } else if self.config.debug_find_level >= 1 {
309 eprintln!(
310 "[FIND] thread={:?} hold_parent due_to_full child={} write_set_len={}",
311 std::thread::current().id(),
312 child_page_id,
313 context.write_set.len()
314 );
315 }
316 }
317
318 if self.config.debug_find_level >= 2 {
319 eprintln!(
320 "[FIND DEBUG] hold-parent: parent={}, child={}, write_set_len={}",
321 current_guard.page_id(),
322 child_page_id,
323 context.write_set.len()
324 );
325 }
326 context.push_write_guard(current_guard);
327 current_guard = child_guard;
328 }
329 BPlusTreePage::Leaf(_) => {
330 if self.config.debug_find_level >= 1 {
331 eprintln!(
332 "[FIND] thread={:?} reached_leaf leaf_page_id={} write_set_len={}",
333 std::thread::current().id(),
334 current_guard.page_id(),
335 context.write_set.len()
336 );
337 }
338 return Ok((current_guard, context));
339 }
340 }
341 }
342 }
343
344 pub fn insert(&self, key: &Tuple, rid: RecordId) -> QuillSQLResult<()> {
346 let mut context = Context::new();
347 if self.config.debug_insert_level >= 1 {
348 eprintln!(
349 "[INSERT] thread={:?} start key={}",
350 std::thread::current().id(),
351 key
352 );
353 }
354
355 if self.is_empty()? {
357 let _lock = self.header_page_lock.write();
358 let root_now = self.get_root_page_id()?;
359 if root_now == INVALID_PAGE_ID {
360 if self.config.debug_insert_level >= 1 {
361 eprintln!(
362 "[INSERT] thread={:?} start_new_tree key={}",
363 std::thread::current().id(),
364 key
365 );
366 }
367 self.start_new_tree(key, rid)?;
369 return Ok(());
370 }
371 }
372
373 loop {
375 if self.config.debug_insert_level >= 1 {
376 eprintln!(
377 "[INSERT] thread={:?} find_leaf_pessimistic key={}",
378 std::thread::current().id(),
379 key
380 );
381 }
382 let (mut leaf_guard, mut local_ctx) =
383 self.find_leaf_page_pessimistic(key, true, context)?;
384 let (mut leaf_page, _) =
385 BPlusTreeLeafPageCodec::decode(leaf_guard.data(), self.key_schema.clone())?;
386
387 if let Some(parent_guard_ref) = local_ctx.write_set.back() {
390 let (parent_page_chk, _) = BPlusTreeInternalPageCodec::decode(
391 parent_guard_ref.data(),
392 self.key_schema.clone(),
393 )?;
394 let expected_pid = parent_page_chk.look_up(key);
395 if expected_pid != leaf_guard.page_id() {
396 if std::env::var("QUILL_DEBUG_FIND").ok().as_deref() == Some("1") {
397 eprintln!(
398 "[INSERT] parent_guided_redirect: from_leaf={} -> expected_child={}",
399 leaf_guard.page_id(),
400 expected_pid
401 );
402 }
403 drop(leaf_guard);
404 leaf_guard = self.buffer_pool.fetch_page_write(expected_pid)?;
405 let (new_leaf, _) =
406 BPlusTreeLeafPageCodec::decode(leaf_guard.data(), self.key_schema.clone())?;
407 leaf_page = new_leaf;
408 }
409 }
410
411 while leaf_page.header.current_size > 0
413 && leaf_page.header.next_page_id != INVALID_PAGE_ID
414 {
415 let last_key_ref = leaf_page.key_at((leaf_page.header.current_size - 1) as usize);
416 if *key <= *last_key_ref {
417 break;
418 }
419 let next_pid = leaf_page.header.next_page_id;
420 let next_guard_peek = self.buffer_pool.fetch_page_read(next_pid)?;
422 let (next_leaf_peek, _) = BPlusTreeLeafPageCodec::decode(
423 next_guard_peek.data(),
424 self.key_schema.clone(),
425 )?;
426 let next_first_key = if next_leaf_peek.header.current_size > 0 {
427 next_leaf_peek.key_at(0).clone()
428 } else {
429 break;
430 };
431 drop(next_guard_peek);
432 if *key < next_first_key {
433 break;
434 }
435 if self.config.debug_insert_level >= 1 {
436 eprintln!(
437 "[INSERT] thread={:?} redirect_to_sibling: from_leaf={} -> next_leaf={} key={} last_key={} next_first_key={}",
438 std::thread::current().id(),
439 leaf_guard.page_id(),
440 next_pid,
441 key,
442 last_key_ref,
443 next_first_key
444 );
445 }
446 local_ctx.release_all_write_locks();
448 drop(leaf_guard);
449 let next_guard = self.buffer_pool.fetch_page_write(next_pid)?;
450 let (next_leaf, _) =
451 BPlusTreeLeafPageCodec::decode(next_guard.data(), self.key_schema.clone())?;
452 leaf_guard = next_guard;
453 leaf_page = next_leaf;
454 }
455
456 if let Some(existing_rid) = leaf_page.look_up_mut(key) {
458 if self.config.debug_insert_level >= 1 {
459 eprintln!(
460 "[INSERT] thread={:?} update leaf_page_id={} key={} old_rid={:?} new_rid={:?}",
461 std::thread::current().id(),
462 leaf_guard.page_id(),
463 key,
464 *existing_rid,
465 rid
466 );
467 }
468 *existing_rid = rid;
469 leaf_page.header.version += 1;
470 let encoded = BPlusTreeLeafPageCodec::encode(&leaf_page);
471 self.wal_overwrite_page(&mut leaf_guard, encoded)?;
472 local_ctx.release_all_write_locks();
473 return Ok(());
474 }
475
476 if leaf_page.header.current_size == leaf_page.header.max_size {
478 if std::env::var("QUILL_DEBUG_INSERT").ok().as_deref() == Some("1") {
479 eprintln!(
480 "[INSERT] thread={:?} leaf_full split leaf_page_id={} key={}",
481 std::thread::current().id(),
482 leaf_guard.page_id(),
483 key
484 );
485 }
486 local_ctx.header_lock_guard = None;
488 match self.split(leaf_guard, &mut local_ctx) {
489 Ok(()) => {
490 local_ctx.release_all_write_locks();
492 context = Context::new();
493 continue;
494 }
495 Err(QuillSQLError::Internal(_e)) => {
496 local_ctx.release_all_write_locks();
498 context = Context::new();
499 continue;
500 }
501 Err(e) => return Err(e),
502 }
503 }
504
505 if std::env::var("QUILL_DEBUG_INSERT").ok().as_deref() == Some("1") {
507 eprintln!(
508 "[INSERT] thread={:?} insert leaf_page_id={} key={} rid={:?}",
509 std::thread::current().id(),
510 leaf_guard.page_id(),
511 key,
512 rid
513 );
514 }
515 leaf_page.insert(key.clone(), rid);
516 leaf_page.header.version += 1;
517 let encoded = BPlusTreeLeafPageCodec::encode(&leaf_page);
518 self.wal_overwrite_page(&mut leaf_guard, encoded)?;
519 local_ctx.release_all_write_locks();
520 return Ok(());
521 }
522 }
523
524 pub fn delete(&self, key: &Tuple) -> QuillSQLResult<()> {
526 if self.is_empty()? {
527 return Ok(());
528 }
529
530 let mut context = Context::new();
531 'restart: loop {
532 let (mut leaf_guard, mut local_ctx) =
533 self.find_leaf_page_pessimistic(key, false, context)?;
534 let (mut leaf_page, _) =
535 BPlusTreeLeafPageCodec::decode(leaf_guard.data(), self.key_schema.clone())?;
536
537 if let Some(parent_guard_ref) = local_ctx.write_set.back() {
540 let (parent_page_chk, _) = BPlusTreeInternalPageCodec::decode(
541 parent_guard_ref.data(),
542 self.key_schema.clone(),
543 )?;
544 let expected_pid = parent_page_chk.look_up(key);
545 if expected_pid != leaf_guard.page_id() {
546 if std::env::var("QUILL_DEBUG_FIND").ok().as_deref() == Some("1") {
547 eprintln!(
548 "[DELETE] parent_guided_redirect: from_leaf={} -> expected_child={}",
549 leaf_guard.page_id(),
550 expected_pid
551 );
552 }
553 drop(leaf_guard);
554 leaf_guard = self.buffer_pool.fetch_page_write(expected_pid)?;
555 let (new_leaf, _) =
556 BPlusTreeLeafPageCodec::decode(leaf_guard.data(), self.key_schema.clone())?;
557 leaf_page = new_leaf;
558 }
559 }
560
561 let mut hops: u32 = 0;
563 while leaf_page.header.current_size > 0
564 && leaf_page.header.next_page_id != INVALID_PAGE_ID
565 {
566 let last_key_ref = leaf_page.key_at((leaf_page.header.current_size - 1) as usize);
567 if *key <= *last_key_ref {
568 break;
569 }
570 let next_pid = leaf_page.header.next_page_id;
571 if std::env::var("QUILL_DEBUG_FIND").ok().as_deref() == Some("1") {
572 eprintln!(
573 "[DELETE] thread={:?} redirect_to_sibling: from_leaf={} -> next_leaf={} key={} last_key={}",
574 std::thread::current().id(),
575 leaf_guard.page_id(),
576 next_pid,
577 key,
578 last_key_ref
579 );
580 }
581 hops += 1;
582 if hops > 8 {
583 if std::env::var("QUILL_DEBUG_FIND").ok().as_deref() == Some("1") {
584 eprintln!(
585 "[DELETE] redirect_hops_exceeded: restart from root at leaf={}",
586 leaf_guard.page_id()
587 );
588 }
589 local_ctx.release_all_write_locks();
590 drop(leaf_guard);
591 context = Context::new();
592 continue 'restart;
593 }
594 drop(leaf_guard);
595 leaf_guard = self.buffer_pool.fetch_page_write(next_pid)?;
596 let (new_leaf, _) =
597 BPlusTreeLeafPageCodec::decode(leaf_guard.data(), self.key_schema.clone())?;
598 leaf_page = new_leaf;
599
600 }
602
603 let was_first = if leaf_page.header.current_size > 0 {
605 let first_key = leaf_page.key_at(0).clone();
606 &first_key == key
607 } else {
608 false
609 };
610 if leaf_page.look_up(key).is_none() {
611 local_ctx.release_all_write_locks();
612 return Ok(());
613 }
614
615 leaf_page.delete(key);
616 leaf_page.header.version += 1;
617 let encoded = BPlusTreeLeafPageCodec::encode(&leaf_page);
618 self.wal_overwrite_page(&mut leaf_guard, encoded)?;
619
620 if leaf_page.header.current_size < leaf_page.min_size() {
622 if local_ctx.write_set.is_empty() {
624 let root_id = self.get_root_page_id()?;
625 if leaf_guard.page_id() != root_id {
626 local_ctx.release_all_write_locks();
627 drop(leaf_guard);
628 context = Context::new();
629 continue 'restart;
630 }
631 }
632 match self.handle_underflow(leaf_guard, &mut local_ctx) {
633 Ok(()) => {
634 local_ctx.release_all_write_locks();
635 return Ok(());
636 }
637 Err(QuillSQLError::Internal(_)) => {
638 local_ctx.release_all_write_locks();
640 context = Context::new();
641 continue;
642 }
643 Err(e) => return Err(e),
644 }
645 } else {
646 if was_first && leaf_page.header.current_size > 0 {
649 if let Some(mut parent_guard) = local_ctx.write_set.pop_back() {
650 let (mut parent_page, _) = BPlusTreeInternalPageCodec::decode(
651 parent_guard.data(),
652 self.key_schema.clone(),
653 )?;
654 if let Some(node_idx) = parent_page.value_index(leaf_guard.page_id()) {
655 if node_idx > 0 {
656 parent_page.array[node_idx].0 = leaf_page.key_at(0).clone();
657 parent_page.header.version += 1;
658 let encoded = BPlusTreeInternalPageCodec::encode(&parent_page);
659 self.wal_overwrite_page(&mut parent_guard, encoded)?;
660 }
661 }
662 local_ctx.write_set.push_back(parent_guard);
664 }
665 }
666 local_ctx.release_all_write_locks();
667 return Ok(());
668 }
669 }
670 }
671
672 pub fn to_dot(&self) -> QuillSQLResult<String> {
673 let mut dot = String::new();
674 writeln!(&mut dot, "digraph BPlusTree {{").unwrap();
675 writeln!(&mut dot, " rankdir=TB;").unwrap();
676 writeln!(&mut dot, " node [shape=record, height=.1];").unwrap();
677
678 let root_page_id = self.get_root_page_id()?;
679 if root_page_id == INVALID_PAGE_ID {
680 writeln!(&mut dot, " empty [label=\"<f0> Empty Tree\"];").unwrap();
681 writeln!(&mut dot, "}}").unwrap();
682 return Ok(dot);
683 }
684
685 let mut queue = VecDeque::new();
686 queue.push_back(root_page_id);
687
688 while let Some(page_id) = queue.pop_front() {
689 let guard = self.buffer_pool.fetch_page_read(page_id)?;
690 let (page, _) = BPlusTreePageCodec::decode(guard.data(), self.key_schema.clone())?;
691
692 match page {
693 BPlusTreePage::Internal(internal) => {
694 let mut label = String::new();
695 for i in 0..internal.header.current_size {
696 if i > 0 {
697 write!(&mut label, "|").unwrap();
698 }
699 write!(&mut label, "<p{}>", i).unwrap();
700 if i > 0 {
701 write!(&mut label, " {}", internal.key_at(i as usize)).unwrap();
702 }
703 }
704 writeln!(&mut dot, " page{} [label=\"{}\"];", page_id, label).unwrap();
705
706 for i in 0..internal.header.current_size {
707 let child_id = internal.value_at(i as usize);
708 writeln!(
709 &mut dot,
710 " \"page{}\":p{} -> \"page{}\";",
711 page_id, i, child_id
712 )
713 .unwrap();
714 queue.push_back(child_id);
715 }
716 }
717 BPlusTreePage::Leaf(leaf) => {
718 let mut label = String::new();
719 for i in 0..leaf.header.current_size {
720 if i > 0 {
721 write!(&mut label, "|").unwrap();
722 }
723 write!(
724 &mut label,
725 "<f{}> {} -> {}",
726 i,
727 leaf.key_at(i as usize),
728 leaf.array[i as usize].1
729 )
730 .unwrap();
731 }
732 writeln!(&mut dot, " page{} [label=\"{}\"];", page_id, label).unwrap();
733
734 if leaf.header.next_page_id != INVALID_PAGE_ID {
735 writeln!(
736 &mut dot,
737 " page{} -> page{} [style=dashed, constraint=false];",
738 page_id, leaf.header.next_page_id
739 )
740 .unwrap();
741 }
742 }
743 }
744 }
745
746 writeln!(&mut dot, "}}").unwrap();
747 Ok(dot)
748 }
749
    /// Rebalances the page behind `node_guard` after it dropped below its
    /// minimum size.
    ///
    /// `context.write_set` holds the latched ancestors with the node's parent
    /// at the back. If no ancestor is held, the node is passed to
    /// `adjust_root`. Otherwise the parent is popped and the method tries, in
    /// order:
    /// 1. borrowing an entry from the left sibling (`redistribute`);
    /// 2. borrowing from the right sibling (`redistribute`);
    /// 3. merging with the left sibling, or with the right one when the node is
    ///    the parent's first child (`coalesce`).
    ///
    /// Sibling and node latches are acquired in ascending page-id order. When
    /// the sibling's id is smaller, the node latch is dropped and re-taken
    /// after the sibling's, presumably to avoid latch-order deadlocks; the
    /// parent stays latched throughout.
    ///
    /// # Errors
    /// Returns `QuillSQLError::Internal` if the parent is missing or does not
    /// reference the node. Propagates buffer-pool, codec, `redistribute` and
    /// `coalesce` errors.
    fn handle_underflow(
        &self,
        mut node_guard: WritePageGuard,
        context: &mut Context,
    ) -> QuillSQLResult<()> {
        // No latched ancestor: this node is (treated as) the root.
        if context.write_set.is_empty() {
            self.adjust_root(node_guard)?;
            return Ok(());
        }

        let parent_guard = match context.write_set.pop_back() {
            Some(g) => g,
            None => {
                return Err(QuillSQLError::Internal(
                    "underflow: missing parent".to_string(),
                ))
            }
        };
        let (parent_page, _) =
            BPlusTreeInternalPageCodec::decode(parent_guard.data(), self.key_schema.clone())?;

        let Some(node_idx) = parent_page.value_index(node_guard.page_id()) else {
            return Err(QuillSQLError::Internal(
                "underflow: node not found in parent".to_string(),
            ));
        };

        // Step 1: borrow from the left sibling if it has entries to spare.
        if node_idx > 0 {
            let left_sibling_pid = parent_page.value_at(node_idx - 1);
            let node_pid = node_guard.page_id();
            if left_sibling_pid < node_pid {
                // Re-latch in ascending page-id order: sibling first, then node.
                drop(node_guard);
                let left_sibling_guard = self.buffer_pool.fetch_page_write(left_sibling_pid)?;
                let node_guard_new = self.buffer_pool.fetch_page_write(node_pid)?;
                node_guard = node_guard_new;
                let (left_sibling_page, _) =
                    BPlusTreePageCodec::decode(left_sibling_guard.data(), self.key_schema.clone())?;
                if left_sibling_page.current_size() > left_sibling_page.min_size() {
                    self.redistribute(
                        left_sibling_guard,
                        node_guard,
                        parent_guard,
                        node_idx,
                        true,
                    )?;
                    return Ok(());
                }
            } else {
                let left_sibling_guard = self.buffer_pool.fetch_page_write(left_sibling_pid)?;
                let (left_sibling_page, _) =
                    BPlusTreePageCodec::decode(left_sibling_guard.data(), self.key_schema.clone())?;
                if left_sibling_page.current_size() > left_sibling_page.min_size() {
                    self.redistribute(
                        left_sibling_guard,
                        node_guard,
                        parent_guard,
                        node_idx,
                        true,
                    )?;
                    return Ok(());
                }
            }
        }

        // Step 2: borrow from the right sibling if it has entries to spare.
        if node_idx < parent_page.header.current_size as usize - 1 {
            let right_sibling_pid = parent_page.value_at(node_idx + 1);
            let node_pid = node_guard.page_id();
            if right_sibling_pid < node_pid {
                // Re-latch in ascending page-id order: sibling first, then node.
                drop(node_guard);
                let right_sibling_guard = self.buffer_pool.fetch_page_write(right_sibling_pid)?;
                let node_guard_new = self.buffer_pool.fetch_page_write(node_pid)?;
                node_guard = node_guard_new;
                let (right_sibling_page, _) = BPlusTreePageCodec::decode(
                    right_sibling_guard.data(),
                    self.key_schema.clone(),
                )?;
                if right_sibling_page.current_size() > right_sibling_page.min_size() {
                    self.redistribute(
                        right_sibling_guard,
                        node_guard,
                        parent_guard,
                        node_idx,
                        false,
                    )?;
                    return Ok(());
                }
            } else {
                let right_sibling_guard = self.buffer_pool.fetch_page_write(right_sibling_pid)?;
                let (right_sibling_page, _) = BPlusTreePageCodec::decode(
                    right_sibling_guard.data(),
                    self.key_schema.clone(),
                )?;
                if right_sibling_page.current_size() > right_sibling_page.min_size() {
                    self.redistribute(
                        right_sibling_guard,
                        node_guard,
                        parent_guard,
                        node_idx,
                        false,
                    )?;
                    return Ok(());
                }
            }
        }

        // Step 3: merge. The left page always absorbs the right one.
        if node_idx > 0 {
            let left_sibling_pid = parent_page.value_at(node_idx - 1);
            let node_pid = node_guard.page_id();
            if left_sibling_pid < node_pid {
                drop(node_guard);
                let left_sibling_guard = self.buffer_pool.fetch_page_write(left_sibling_pid)?;
                let node_guard_new = self.buffer_pool.fetch_page_write(node_pid)?;
                node_guard = node_guard_new;
                self.coalesce(left_sibling_guard, node_guard, parent_guard, context)?;
            } else {
                let left_sibling_guard = self.buffer_pool.fetch_page_write(left_sibling_pid)?;
                self.coalesce(left_sibling_guard, node_guard, parent_guard, context)?;
            }
        } else {
            // NOTE(review): assumes the parent has at least two children here;
            // `value_at(node_idx + 1)` with a single-child parent is not guarded.
            let right_sibling_pid = parent_page.value_at(node_idx + 1);
            let node_pid = node_guard.page_id();
            if right_sibling_pid < node_pid {
                drop(node_guard);
                let right_sibling_guard = self.buffer_pool.fetch_page_write(right_sibling_pid)?;
                let node_guard_new = self.buffer_pool.fetch_page_write(node_pid)?;
                node_guard = node_guard_new;
                self.coalesce(node_guard, right_sibling_guard, parent_guard, context)?;
            } else {
                let right_sibling_guard = self.buffer_pool.fetch_page_write(right_sibling_pid)?;
                self.coalesce(node_guard, right_sibling_guard, parent_guard, context)?;
            }
        }

        Ok(())
    }
897
898 fn coalesce(
899 &self,
900 mut left_guard: WritePageGuard,
901 right_guard: WritePageGuard,
902 mut parent_guard: WritePageGuard,
903 context: &mut Context,
904 ) -> QuillSQLResult<()> {
905 let (mut left_page, _) =
906 BPlusTreePageCodec::decode(left_guard.data(), self.key_schema.clone())?;
907 let (mut right_page, _) =
908 BPlusTreePageCodec::decode(right_guard.data(), self.key_schema.clone())?;
909 let (mut parent_page, _) =
910 BPlusTreeInternalPageCodec::decode(parent_guard.data(), self.key_schema.clone())?;
911
912 let right_page_id = right_guard.page_id();
913 let middle_key = match parent_page.remove(right_page_id) {
914 Some((k, _)) => k,
915 None => {
916 return Err(QuillSQLError::Internal(
917 "coalesce: parent missing right child".to_string(),
918 ));
919 }
920 };
921
922 match (&mut left_page, &mut right_page) {
923 (BPlusTreePage::Leaf(left), BPlusTreePage::Leaf(right)) => {
924 left.merge(right);
925 }
926 (BPlusTreePage::Internal(left), BPlusTreePage::Internal(right)) => {
927 left.merge(middle_key, right);
928 left.header.next_page_id = right.header.next_page_id;
930 left.high_key = right.high_key.clone();
931 }
932 _ => unreachable!("Mismatched page types in coalesce"),
933 }
934
935 if let BPlusTreePage::Leaf(p) = &mut left_page {
937 p.header.version += 1;
938 } else if let BPlusTreePage::Internal(p) = &mut left_page {
939 p.header.version += 1;
940 }
941 parent_page.header.version += 1;
942
943 let left_encoded = BPlusTreePageCodec::encode(&left_page);
944 self.wal_overwrite_page(&mut left_guard, left_encoded)?;
945 let parent_encoded = BPlusTreeInternalPageCodec::encode(&parent_page);
946 self.wal_overwrite_page(&mut parent_guard, parent_encoded)?;
947 drop(left_guard);
948 drop(right_guard);
949 self.buffer_pool.delete_page(right_page_id)?;
950
951 if context.write_set.is_empty() && parent_page.header.current_size == 1 {
954 self.adjust_root(parent_guard)?;
955 } else if parent_page.header.current_size < parent_page.min_size() {
956 self.handle_underflow(parent_guard, context)?;
958 }
959 Ok(())
960 }
961
    /// Moves one entry from the donor sibling (`from_guard`) into the
    /// underflowing node (`to_guard`) and updates the separator in the parent.
    ///
    /// `parent_idx_of_to_node` is the receiving node's index in the parent.
    /// - When `from_is_left_sibling` is true, the donor's last entry moves to
    ///   the front of the node and the separator at `parent_idx_of_to_node` is
    ///   updated.
    /// - Otherwise the donor's first entry is appended to the node and the
    ///   separator at `parent_idx_of_to_node + 1` is updated.
    ///
    /// For internal pages the separator rotates through the parent, and both
    /// children's fences (`high_key`, `next_page_id`) are refreshed from the
    /// updated parent. All three pages get their version bumped and are
    /// rewritten.
    ///
    /// # Errors
    /// Returns `QuillSQLError::Internal` on mismatched page types or when a
    /// child is missing from the parent. Propagates codec and write errors.
    fn redistribute(
        &self,
        mut from_guard: WritePageGuard,
        mut to_guard: WritePageGuard,
        mut parent_guard: WritePageGuard,
        parent_idx_of_to_node: usize,
        from_is_left_sibling: bool,
    ) -> QuillSQLResult<()> {
        let (mut from_page, _) =
            BPlusTreePageCodec::decode(from_guard.data(), self.key_schema.clone())?;
        let (mut to_page, _) =
            BPlusTreePageCodec::decode(to_guard.data(), self.key_schema.clone())?;
        let (mut parent_page, _) =
            BPlusTreeInternalPageCodec::decode(parent_guard.data(), self.key_schema.clone())?;

        if from_is_left_sibling {
            // The separator between the left donor and the node sits at the node's index.
            let separator_idx = parent_idx_of_to_node;
            match (&mut to_page, &mut from_page) {
                (BPlusTreePage::Leaf(to_leaf), BPlusTreePage::Leaf(from_leaf)) => {
                    // The donor's largest entry becomes the node's first entry.
                    let item_to_move = from_leaf.remove_last_kv();
                    to_leaf.array.insert(0, item_to_move);
                    to_leaf.header.current_size += 1;
                    parent_page.array[separator_idx].0 = to_leaf.key_at(0).clone();
                    to_leaf.header.version += 1;
                    from_leaf.header.version += 1;
                }
                (BPlusTreePage::Internal(to_internal), BPlusTreePage::Internal(from_internal)) => {
                    // Rotate right: the old separator comes down in front of the
                    // node's former first child, the donor's last child becomes
                    // the node's first child, and the donor's last key goes up.
                    let item_to_move = from_internal.remove_last_kv();
                    let separator_key_in_parent = parent_page.key_at(separator_idx).clone();

                    to_internal
                        .array
                        .insert(1, (separator_key_in_parent, to_internal.value_at(0)));

                    to_internal.array[0].1 = item_to_move.1;
                    parent_page.array[separator_idx].0 = item_to_move.0;
                    to_internal.header.current_size += 1;
                    to_internal.header.version += 1;
                    from_internal.header.version += 1;

                    self.refresh_internal_child_fence(
                        &parent_page,
                        to_guard.page_id(),
                        to_internal,
                    )?;
                    self.refresh_internal_child_fence(
                        &parent_page,
                        from_guard.page_id(),
                        from_internal,
                    )?;
                }
                _ => return Err(QuillSQLError::Internal("Mismatched page types".to_string())),
            }
        } else {
            // The separator between the node and its right donor sits just after the node.
            let separator_idx = parent_idx_of_to_node + 1;
            match (&mut to_page, &mut from_page) {
                (BPlusTreePage::Leaf(to_leaf), BPlusTreePage::Leaf(from_leaf)) => {
                    // The donor's smallest entry moves to the end of the node;
                    // the donor's new first key becomes the separator.
                    let item_to_move = from_leaf.remove_first_kv();
                    to_leaf.array.push(item_to_move);
                    to_leaf.header.current_size += 1;
                    parent_page.array[separator_idx].0 = from_leaf.key_at(0).clone();
                    to_leaf.header.version += 1;
                    from_leaf.header.version += 1;
                }
                (BPlusTreePage::Internal(to_internal), BPlusTreePage::Internal(from_internal)) => {
                    // Rotate left: the old separator comes down with the donor's
                    // first child, and the donor's first key goes up.
                    // NOTE(review): this is only a correct rotation if
                    // `remove_first_kv` on an internal page removes entry 1
                    // (key_1, child_1) and leaves child_0 in slot 0 — confirm
                    // against its implementation.
                    let separator_key_in_parent = parent_page.key_at(separator_idx).clone();
                    let item_to_move = from_internal.remove_first_kv();
                    to_internal
                        .array
                        .push((separator_key_in_parent, from_internal.value_at(0)));

                    parent_page.array[separator_idx].0 = item_to_move.0;
                    from_internal.array[0].1 = item_to_move.1;
                    to_internal.header.current_size += 1;
                    to_internal.header.version += 1;
                    from_internal.header.version += 1;

                    self.refresh_internal_child_fence(
                        &parent_page,
                        to_guard.page_id(),
                        to_internal,
                    )?;
                    self.refresh_internal_child_fence(
                        &parent_page,
                        from_guard.page_id(),
                        from_internal,
                    )?;
                }
                _ => return Err(QuillSQLError::Internal("Mismatched page types".to_string())),
            }
        }

        parent_page.header.version += 1;

        let from_encoded = BPlusTreePageCodec::encode(&from_page);
        self.wal_overwrite_page(&mut from_guard, from_encoded)?;
        let to_encoded = BPlusTreePageCodec::encode(&to_page);
        self.wal_overwrite_page(&mut to_guard, to_encoded)?;
        let parent_encoded = BPlusTreeInternalPageCodec::encode(&parent_page);
        self.wal_overwrite_page(&mut parent_guard, parent_encoded)?;

        Ok(())
    }
1087
1088 fn refresh_internal_child_fence(
1089 &self,
1090 parent_page: &BPlusTreeInternalPage,
1091 child_page_id: PageId,
1092 child_page: &mut BPlusTreeInternalPage,
1093 ) -> QuillSQLResult<()> {
1094 let Some(child_idx) = parent_page.value_index(child_page_id) else {
1095 return Err(QuillSQLError::Internal(
1096 "redistribute: child missing from parent".to_string(),
1097 ));
1098 };
1099 let size = parent_page.header.current_size as usize;
1100 if child_idx + 1 < size {
1101 child_page.high_key = Some(parent_page.key_at(child_idx + 1).clone());
1102 child_page.header.next_page_id = parent_page.value_at(child_idx + 1);
1103 } else {
1104 child_page.high_key = parent_page.high_key.clone();
1105 child_page.header.next_page_id = parent_page.header.next_page_id;
1106 }
1107 Ok(())
1108 }
1109
1110 fn adjust_root(&self, root_guard: WritePageGuard) -> QuillSQLResult<()> {
1111 let (root_page, _) =
1112 BPlusTreePageCodec::decode(root_guard.data(), self.key_schema.clone())?;
1113
1114 if let BPlusTreePage::Internal(root_internal) = root_page {
1115 if root_internal.header.current_size == 1 {
1116 let new_root_id = root_internal.value_at(0);
1119 drop(root_guard);
1121 let _lock = self.header_page_lock.write();
1122 self.set_root_page_id(new_root_id)?;
1123 }
1125 } else if let BPlusTreePage::Leaf(root_leaf) = root_page {
1126 if root_leaf.header.current_size == 0 {
1127 drop(root_guard);
1129 let _lock = self.header_page_lock.write();
1130 self.set_root_page_id(INVALID_PAGE_ID)?;
1131 }
1133 }
1134 Ok(())
1135 }
1136
1137 fn start_new_tree(&self, key: &Tuple, rid: RecordId) -> QuillSQLResult<()> {
1139 let mut root_guard = self.buffer_pool.new_page()?;
1140 let root_page_id = root_guard.page_id();
1141 let mut leaf_page = BPlusTreeLeafPage::new(self.key_schema.clone(), self.leaf_max_size);
1142 leaf_page.insert(key.clone(), rid);
1143 let encoded_data = BPlusTreeLeafPageCodec::encode(&leaf_page);
1144 self.wal_overwrite_page(&mut root_guard, encoded_data)?;
1145 drop(root_guard);
1148 self.set_root_page_id(root_page_id)?;
1149 Ok(())
1150 }
1151
1152 pub fn lazy_cleanup_with<F>(
1161 &self,
1162 mut is_globally_dead: F,
1163 limit: Option<usize>,
1164 ) -> QuillSQLResult<usize>
1165 where
1166 F: FnMut(&RecordId) -> bool,
1167 {
1168 let mut cleaned = 0usize;
1169 let mut guard = self.find_first_leaf_page()?;
1170 loop {
1171 let (leaf, _) = BPlusTreeLeafPageCodec::decode(guard.data(), self.key_schema.clone())?;
1172 let next_pid = leaf.header.next_page_id;
1174 let mut to_delete: Vec<Tuple> = Vec::new();
1176 for i in 0..(leaf.header.current_size as usize) {
1177 let kv = leaf.kv_at(i).clone();
1178 if is_globally_dead(&kv.1) {
1179 to_delete.push(kv.0);
1180 }
1181 }
1182 drop(guard);
1183
1184 for k in to_delete.into_iter() {
1185 self.delete(&k)?;
1186 cleaned += 1;
1187 if let Some(maxn) = limit {
1188 if cleaned >= maxn {
1189 return Ok(cleaned);
1190 }
1191 }
1192 }
1193
1194 if next_pid == INVALID_PAGE_ID {
1195 break;
1196 }
1197 guard = self.buffer_pool.fetch_page_read(next_pid)?;
1198 }
1199 Ok(cleaned)
1200 }
1201
1202 fn find_leaf_page_optimistic(&self, key: &Tuple) -> QuillSQLResult<ReadPageGuard> {
1203 let mut restarts = 0usize;
1205 'restart: loop {
1206 let mut current_guard = self.buffer_pool.fetch_page_read(self.get_root_page_id()?)?;
1207 loop {
1208 let decoded =
1209 BPlusTreePageCodec::decode(current_guard.data(), self.key_schema.clone());
1210 if decoded.is_err() {
1211 drop(current_guard);
1212 restarts += 1;
1213 if restarts > MAX_OLC_RESTARTS {
1214 std::thread::sleep(std::time::Duration::from_micros(
1215 OLC_BACKOFF_BASE_US.saturating_mul(1 << (restarts.min(10) - 1)),
1216 ));
1217 restarts = 0;
1218 }
1219 continue 'restart;
1220 }
1221 let (page, _) = decoded.unwrap();
1222 match page {
1223 BPlusTreePage::Internal(internal) => {
1224 let (hdr1, _) =
1226 BPlusTreeInternalPageCodec::decode_header_only(current_guard.data())?;
1227 let v1 = hdr1.version;
1228 if let Some(ref hk) = internal.high_key {
1229 if key >= hk && internal.header.next_page_id != INVALID_PAGE_ID {
1230 let sib = self
1231 .buffer_pool
1232 .fetch_page_read(internal.header.next_page_id)?;
1233 drop(current_guard);
1234 current_guard = sib;
1235 continue;
1236 }
1237 }
1238 let next_page_id = BPlusTreeInternalPageCodec::lookup_child_from_bytes(
1240 current_guard.data(),
1241 self.key_schema.clone(),
1242 key,
1243 )?;
1244 let (hdr2, _) =
1245 BPlusTreeInternalPageCodec::decode_header_only(current_guard.data())?;
1246 let v2 = hdr2.version;
1247 if v1 != v2 {
1248 drop(current_guard);
1249 restarts += 1;
1250 if restarts > MAX_OLC_RESTARTS {
1251 std::thread::sleep(std::time::Duration::from_micros(
1252 OLC_BACKOFF_BASE_US.saturating_mul(1 << (restarts.min(10) - 1)),
1253 ));
1254 restarts = 0;
1255 }
1256 continue 'restart;
1257 }
1258 let child_guard = self.buffer_pool.fetch_page_read(next_page_id)?;
1259 drop(current_guard);
1260 current_guard = child_guard;
1261 }
1262 BPlusTreePage::Leaf(_leaf) => {
1263 let (h1, _) =
1264 BPlusTreeLeafPageCodec::decode_header_only(current_guard.data())?;
1265 let v1 = h1.version;
1266 let (h2, _) =
1267 BPlusTreeLeafPageCodec::decode_header_only(current_guard.data())?;
1268 let v2 = h2.version;
1269 if v1 != v2 {
1270 drop(current_guard);
1271 restarts += 1;
1272 if restarts > MAX_OLC_RESTARTS {
1273 std::thread::sleep(std::time::Duration::from_micros(
1274 OLC_BACKOFF_BASE_US.saturating_mul(1 << (restarts.min(10) - 1)),
1275 ));
1276 restarts = 0;
1277 }
1278 continue 'restart;
1279 }
1280 return Ok(current_guard);
1281 }
1282 }
1283 }
1284 }
1285 }
1286
1287 pub fn find_first_leaf_page(&self) -> QuillSQLResult<ReadPageGuard> {
1288 let mut current_page_id = self.get_root_page_id()?;
1289 if current_page_id == INVALID_PAGE_ID {
1290 return Err(QuillSQLError::Internal("Tree is empty".to_string()));
1291 }
1292
1293 loop {
1294 let guard = self.buffer_pool.fetch_page_read(current_page_id)?;
1295 let (page, _) = BPlusTreePageCodec::decode(guard.data(), self.key_schema.clone())?;
1296
1297 match page {
1298 BPlusTreePage::Internal(internal) => {
1299 current_page_id = internal.value_at(0);
1300 }
1301 BPlusTreePage::Leaf(_) => {
1302 return Ok(guard);
1303 }
1304 }
1305 }
1306 }
1307
1308 pub fn find_leaf_page_for_iterator(
1313 &self,
1314 key: &Tuple,
1315 start_page_id: PageId,
1316 ) -> QuillSQLResult<ReadPageGuard> {
1317 let mut current_page_id = start_page_id;
1318 if current_page_id == INVALID_PAGE_ID {
1319 return Err(QuillSQLError::Storage("btree: empty tree".to_string()));
1320 }
1321
1322 loop {
1323 let current_guard = self.buffer_pool.fetch_page_read(current_page_id)?;
1326
1327 let (page_content, _) =
1329 BPlusTreePageCodec::decode(current_guard.data(), self.key_schema.clone())?;
1330
1331 match page_content {
1332 BPlusTreePage::Internal(_internal_page) => {
1334 current_page_id =
1336 crate::storage::codec::BPlusTreeInternalPageCodec::lookup_child_from_bytes(
1337 current_guard.data(),
1338 self.key_schema.clone(),
1339 key,
1340 )?;
1341 }
1342 BPlusTreePage::Leaf(_) => {
1344 return Ok(current_guard);
1347 }
1348 }
1349 }
1350 }
1351
1352 fn split<'a>(
1354 &'a self,
1355 mut page_guard: WritePageGuard,
1356 context: &mut Context<'a>,
1357 ) -> QuillSQLResult<()> {
1358 if self.config.debug_split_level >= 2 {
1359 eprintln!(
1360 "[SPLIT DEBUG] splitting page={}, write_set_len={}",
1361 page_guard.page_id(),
1362 context.write_set.len()
1363 );
1364 }
1365 loop {
1366 let page_id = page_guard.page_id();
1367 if self.config.debug_split_level >= 2 {
1368 eprintln!(
1369 "[SPLIT DEBUG] splitting page={}, write_set_len={}",
1370 page_id,
1371 context.write_set.len()
1372 );
1373 }
1374 let (mut page, _) =
1375 BPlusTreePageCodec::decode(page_guard.data(), self.key_schema.clone())?;
1376
1377 let mut new_page_guard = self.buffer_pool.new_page()?;
1378 let new_page_id = new_page_guard.page_id();
1379
1380 let middle_key = match &mut page {
1381 BPlusTreePage::Leaf(leaf_page) => {
1382 let mut new_leaf =
1383 BPlusTreeLeafPage::new(self.key_schema.clone(), self.leaf_max_size);
1384 new_leaf.batch_insert(
1385 leaf_page.split_off(leaf_page.header.current_size as usize / 2),
1386 );
1387 new_leaf.header.next_page_id = leaf_page.header.next_page_id;
1388 leaf_page.header.next_page_id = new_page_id;
1389 let new_data = BPlusTreeLeafPageCodec::encode(&new_leaf);
1390 self.wal_overwrite_page(&mut new_page_guard, new_data)?;
1391 leaf_page.header.version += 1;
1392 if self.config.debug_split_level >= 2 && new_leaf.header.current_size > 0 {
1393 eprintln!(
1394 "[SPLIT DEBUG] leaf_split left={} right={} sep_key={}",
1395 page_id,
1396 new_page_id,
1397 new_leaf.key_at(0)
1398 );
1399 }
1400 new_leaf.key_at(0).clone()
1401 }
1402 BPlusTreePage::Internal(internal_page) => {
1403 let mut new_internal =
1404 BPlusTreeInternalPage::new(self.key_schema.clone(), self.internal_max_size);
1405
1406 let num_pointers = internal_page.header.current_size as usize;
1410 let promote_idx = 1 + (num_pointers.saturating_sub(1) / 2);
1411
1412 let mut moved = internal_page.split_off(promote_idx);
1414
1415 let (middle_key, right_sentinel_ptr) = {
1418 let pair = moved.get(0).ok_or(QuillSQLError::Internal(
1419 "Internal split moved entries empty".to_string(),
1420 ))?;
1421 (pair.0.clone(), pair.1)
1422 };
1423
1424 new_internal.insert(Tuple::empty(self.key_schema.clone()), right_sentinel_ptr);
1426
1427 if moved.len() > 1 {
1429 new_internal.batch_insert(moved.split_off(1));
1431 }
1432
1433 let old_high_key = internal_page.high_key.clone();
1436 let old_next = internal_page.header.next_page_id;
1437 internal_page.high_key = Some(middle_key.clone());
1439 new_internal.high_key = old_high_key;
1441 new_internal.header.next_page_id = old_next;
1443 let new_data = BPlusTreeInternalPageCodec::encode(&new_internal);
1444 self.wal_overwrite_page(&mut new_page_guard, new_data)?;
1445 internal_page.header.next_page_id = new_page_id;
1447 internal_page.header.version += 1;
1448 if self.config.debug_split_level >= 2 {
1449 eprintln!(
1450 "[SPLIT DEBUG] internal_split left={} right={} promote_key={}",
1451 page_id, new_page_id, middle_key
1452 );
1453 }
1454 middle_key
1455 }
1456 };
1457
1458 let old_page_data = BPlusTreePageCodec::encode(&page);
1460 self.wal_overwrite_page(&mut page_guard, old_page_data)?;
1461
1462 if page_guard.page_id() == self.get_root_page_id()? {
1464 if self.config.debug_split_level >= 2 {
1465 eprintln!(
1466 "[SPLIT DEBUG] root-split: old_root={}, new_right={}",
1467 page_id, new_page_id
1468 );
1469 }
1470 let mut new_root_guard = self.buffer_pool.new_page()?;
1471 let new_root_id = new_root_guard.page_id();
1472 let mut new_root_page =
1473 BPlusTreeInternalPage::new(self.key_schema.clone(), self.internal_max_size);
1474
1475 new_root_page.insert(Tuple::empty(self.key_schema.clone()), page_id);
1476 new_root_page.insert(middle_key, new_page_id);
1477
1478 let encoded = BPlusTreeInternalPageCodec::encode(&new_root_page);
1479 self.wal_overwrite_page(&mut new_root_guard, encoded)?;
1480
1481 drop(new_page_guard);
1483 drop(page_guard);
1484
1485 let _lock = self.header_page_lock.write();
1486 self.set_root_page_id(new_root_id)?;
1487 return Ok(());
1488 }
1489
1490 let mut parent_guard = match context.write_set.pop_back() {
1492 Some(g) => g,
1493 None => {
1494 return Err(QuillSQLError::Internal(
1496 "split: missing parent in context".to_string(),
1497 ));
1498 }
1499 };
1500 if std::env::var("QUILL_DEBUG_SPLIT").ok().as_deref() == Some("2") {
1501 eprintln!(
1502 "[SPLIT DEBUG] promote to parent={}, left={}, right={}",
1503 parent_guard.page_id(),
1504 page_id,
1505 new_page_id
1506 );
1507 }
1508 let (mut parent_page, _) =
1509 BPlusTreeInternalPageCodec::decode(parent_guard.data(), self.key_schema.clone())?;
1510 if parent_page.value_index(page_id).is_none() {
1512 return Err(QuillSQLError::Internal(
1514 "split: parent no longer contains left child".to_string(),
1515 ));
1516 }
1517 parent_page.insert_after(page_id, middle_key, new_page_id);
1518 parent_page.header.version += 1;
1519
1520 let encoded = BPlusTreeInternalPageCodec::encode(&parent_page);
1521 self.wal_overwrite_page(&mut parent_guard, encoded)?;
1522
1523 if parent_page.is_full() {
1524 drop(new_page_guard);
1526 drop(page_guard);
1527 page_guard = parent_guard;
1528 } else {
1530 drop(new_page_guard);
1532 drop(page_guard);
1533 return Ok(());
1534 }
1535 }
1536 }
1537
1538 pub fn note_potential_garbage(&self, n: usize) {
1541 self.pending_garbage.fetch_add(n, AtomicOrdering::Relaxed);
1542 }
1543
1544 pub fn take_pending_garbage(&self) -> usize {
1546 self.pending_garbage.swap(0, AtomicOrdering::Relaxed)
1547 }
1548}
1549
1550#[cfg(test)]
1551mod tests {
1552 use parking_lot::deadlock;
1553 use std::sync::Arc;
1554 use std::sync::Once;
1555 use std::time::Duration;
1556 use tempfile::TempDir;
1557
1558 use crate::catalog::SchemaRef;
1559 use crate::config::WalConfig;
1560 use crate::recovery::{RecoveryManager, WalManager};
1561 use crate::storage::disk_manager::DiskManager;
1562 use crate::storage::disk_scheduler::DiskScheduler;
1563 use crate::storage::index::btree_index::TreeIndexIterator;
1564 use crate::storage::page::{BPlusTreePage, RecordId};
1565 use crate::storage::tuple::Tuple;
1566 use crate::{
1567 buffer::BufferManager,
1568 catalog::{Column, DataType, Schema},
1569 storage::codec::BPlusTreePageCodec,
1570 };
1571
1572 use super::BPlusTreeIndex;
1573
1574 fn ensure_deadlock_watchdog() {
1575 static START: Once = Once::new();
1576 START.call_once(|| {
1577 std::thread::spawn(|| loop {
1578 std::thread::sleep(Duration::from_millis(500));
1579 let deadlocks = deadlock::check_deadlock();
1580 if !deadlocks.is_empty() {
1581 eprintln!("DEADLOCK DETECTED: {} cycles", deadlocks.len());
1582 for (i, threads) in deadlocks.iter().enumerate() {
1583 eprintln!("Cycle {}:", i);
1584 for t in threads {
1585 eprintln!(" ThreadId={:?}\n{:?}", t.thread_id(), t.backtrace());
1586 }
1587 }
1588 panic!("deadlock detected");
1589 }
1590 });
1591 });
1592 }
1593
1594 fn create_test_index(
1596 buffer_pool_size: usize,
1597 internal_max_size: u32,
1598 leaf_max_size: u32,
1599 ) -> (TempDir, BPlusTreeIndex, SchemaRef) {
1600 let temp_dir = TempDir::new().unwrap();
1601 let temp_path = temp_dir.path().join("test.db");
1602
1603 let key_schema = Arc::new(Schema::new(vec![Column::new("a", DataType::Int64, false)]));
1604 let disk_manager = DiskManager::try_new(temp_path).unwrap();
1605 let disk_scheduler = Arc::new(DiskScheduler::new(Arc::new(disk_manager)));
1606 let buffer_pool = Arc::new(BufferManager::new(buffer_pool_size, disk_scheduler));
1607 let index = BPlusTreeIndex::new(
1608 key_schema.clone(),
1609 buffer_pool,
1610 internal_max_size,
1611 leaf_max_size,
1612 );
1613
1614 (temp_dir, index, key_schema)
1615 }
1616
1617 fn setup_with_wal(
1619 db_path: &std::path::Path,
1620 wal_dir: &std::path::Path,
1621 bpm_pages: usize,
1622 ) -> (Arc<BufferManager>, Arc<WalManager>, Arc<DiskScheduler>) {
1623 let dm = Arc::new(DiskManager::try_new(db_path).unwrap());
1624 let scheduler = Arc::new(DiskScheduler::new(dm));
1625 let mut cfg = WalConfig::default();
1626 cfg.directory = wal_dir.to_path_buf();
1627 let wal = Arc::new(WalManager::new(cfg, None, None).unwrap());
1628 let bpm = Arc::new(BufferManager::new(bpm_pages, scheduler.clone()));
1629 bpm.set_wal_manager(wal.clone());
1630 (bpm, wal, scheduler)
1631 }
1632
1633 fn rid_from_key(key: i64) -> RecordId {
1635 let value = key & 0xFFFFFFFF;
1636 RecordId::new((key >> 32) as u32, value as u32)
1637 }
1638
1639 #[test]
1640 fn test_index_recovery_replays_wal_split() {
1641 ensure_deadlock_watchdog();
1642 let tmp = TempDir::new().unwrap();
1643 let db_path = tmp.path().join("test.db");
1644 let wal_dir = tmp.path().join("wal");
1645
1646 let (bpm, wal, scheduler) = setup_with_wal(&db_path, &wal_dir, 64);
1648 let key_schema: SchemaRef =
1649 Arc::new(Schema::new(vec![Column::new("a", DataType::Int64, false)]));
1650 let index = BPlusTreeIndex::new(key_schema.clone(), bpm.clone(), 2, 3);
1651 let header_pid = index.header_page_id; let keys: Vec<i64> = (1..=30).collect();
1654 for k in &keys {
1655 let t = Tuple::new(key_schema.clone(), vec![(*k).into()]);
1656 index.insert(&t, rid_from_key(*k)).unwrap();
1657 }
1658
1659 let _ = wal.flush(None).unwrap();
1661 drop(index);
1662 drop(bpm);
1663 drop(wal);
1664 drop(scheduler);
1665
1666 let dm2 = Arc::new(DiskManager::try_new(&db_path).unwrap());
1668 let scheduler2 = Arc::new(DiskScheduler::new(dm2));
1669 let mut cfg2 = WalConfig::default();
1670 cfg2.directory = wal_dir.clone();
1671 let wal2 = Arc::new(WalManager::new(cfg2, None, None).unwrap());
1672 let rm = RecoveryManager::new(wal2.clone(), scheduler2.clone());
1673 let _summary = rm.replay().unwrap();
1674
1675 let bpm2 = Arc::new(BufferManager::new(128, scheduler2.clone()));
1677 let reopened = BPlusTreeIndex::open(key_schema.clone(), bpm2.clone(), 2, 3, header_pid);
1678
1679 for k in &keys {
1680 let t = Tuple::new(key_schema.clone(), vec![(*k).into()]);
1681 let got = reopened
1682 .get(&t)
1683 .unwrap()
1684 .expect("missing key after recovery");
1685 assert_eq!(got.page_id, rid_from_key(*k).page_id);
1686 assert_eq!(got.slot_num, rid_from_key(*k).slot_num);
1687 }
1688
1689 let index_arc = Arc::new(reopened);
1691 let mut it = TreeIndexIterator::new(index_arc, ..);
1692 let mut i = 1i64;
1693 while let Some(rid) = it.next().unwrap() {
1694 assert_eq!(rid.slot_num, rid_from_key(i).slot_num);
1695 i += 1;
1696 }
1697 assert_eq!(i, 31);
1698 }
1699
1700 fn create_rid_from_key(key: i64) -> RecordId {
1702 let value = key & 0xFFFFFFFF;
1703 RecordId::new((key >> 32) as u32, value as u32)
1704 }
1705
1706 fn create_tuple_from_key(key: i64, schema: SchemaRef) -> Tuple {
1708 Tuple::new(schema, vec![key.into()])
1709 }
1710
1711 #[test]
1713 fn test_basic_insert() {
1714 let (_temp_dir, index, key_schema) = create_test_index(50, 2, 3);
1715
1716 let key = 42i64;
1717 let tuple = create_tuple_from_key(key, key_schema.clone());
1718 let rid = create_rid_from_key(key);
1719
1720 index.insert(&tuple, rid).unwrap();
1721
1722 let root_page_id = index.get_root_page_id().unwrap();
1723 assert_ne!(root_page_id, crate::buffer::INVALID_PAGE_ID);
1724
1725 let root_guard = index.buffer_pool.fetch_page_read(root_page_id).unwrap();
1726 let (root_page, _) =
1727 BPlusTreePageCodec::decode(root_guard.data(), key_schema.clone()).unwrap();
1728
1729 assert!(matches!(root_page, BPlusTreePage::Leaf(_)));
1730
1731 if let BPlusTreePage::Leaf(root_as_leaf) = root_page {
1732 assert_eq!(root_as_leaf.header.current_size, 1);
1733 assert_eq!(root_as_leaf.array[0].0, tuple);
1734 assert_eq!(root_as_leaf.array[0].1, rid);
1735 }
1736 }
1737
1738 #[test]
1740 fn test_insert_no_iterator() {
1741 let (_temp_dir, index, key_schema) = create_test_index(50, 2, 3);
1742
1743 let keys = vec![1i64, 2, 3, 4, 5];
1744 for key in &keys {
1745 let tuple = create_tuple_from_key(*key, key_schema.clone());
1746 let rid = create_rid_from_key(*key);
1747 index.insert(&tuple, rid).unwrap();
1748 }
1749
1750 for key in &keys {
1751 let tuple = create_tuple_from_key(*key, key_schema.clone());
1752 let result = index.get(&tuple).unwrap();
1753 assert!(result.is_some(), "missing key {}", key);
1754
1755 let expected_rid = create_rid_from_key(*key);
1756 let actual_rid = result.unwrap();
1757 assert_eq!(actual_rid.page_id, expected_rid.page_id);
1758 assert_eq!(actual_rid.slot_num, expected_rid.slot_num);
1759 }
1760 }
1761
1762 #[test]
1764 fn test_insert_reverse_order() {
1765 let (_temp_dir, index, key_schema) = create_test_index(50, 2, 3);
1766
1767 let keys = vec![5i64, 4, 3, 2, 1];
1768 for key in &keys {
1769 let tuple = create_tuple_from_key(*key, key_schema.clone());
1770 let rid = create_rid_from_key(*key);
1771 index.insert(&tuple, rid).unwrap();
1772 }
1773
1774 for key in &keys {
1775 let tuple = create_tuple_from_key(*key, key_schema.clone());
1776 let result = index.get(&tuple).unwrap();
1777 assert!(result.is_some(), "missing key {}", key);
1778
1779 let expected_rid = create_rid_from_key(*key);
1780 let actual_rid = result.unwrap();
1781 assert_eq!(actual_rid.slot_num, expected_rid.slot_num);
1782 }
1783
1784 let index_arc = Arc::new(index);
1786 let mut iter = TreeIndexIterator::new(index_arc, ..);
1787 let mut current_key = 1i64;
1788 while let Some(rid) = iter.next().unwrap() {
1789 assert_eq!(rid.slot_num as i64, current_key);
1790 current_key += 1;
1791 }
1792 assert_eq!(current_key, keys.len() as i64 + 1);
1793 }
1794
1795 #[test]
1797 fn test_delete_no_iterator() {
1798 let (_temp_dir, index, key_schema) = create_test_index(50, 2, 3);
1799
1800 let keys = vec![1i64, 2, 3, 4, 5];
1801 for key in &keys {
1802 let tuple = create_tuple_from_key(*key, key_schema.clone());
1803 let rid = create_rid_from_key(*key);
1804 index.insert(&tuple, rid).unwrap();
1805 }
1806
1807 for key in &keys {
1809 let tuple = create_tuple_from_key(*key, key_schema.clone());
1810 let result = index.get(&tuple).unwrap();
1811 assert!(result.is_some());
1812
1813 let expected_rid = create_rid_from_key(*key);
1814 let actual_rid = result.unwrap();
1815 assert_eq!(actual_rid.page_id, expected_rid.page_id);
1816 assert_eq!(actual_rid.slot_num, expected_rid.slot_num);
1817 }
1818
1819 let remove_keys = vec![1i64, 5, 3, 4];
1821 for key in &remove_keys {
1822 println!("Before deleting key {}:\n{}", key, index.to_dot().unwrap());
1823 let tuple = create_tuple_from_key(*key, key_schema.clone());
1824 index.delete(&tuple).unwrap();
1825 println!("After deleting key {}:\n{}", key, index.to_dot().unwrap());
1826 }
1827
1828 let mut size = 0;
1829 for key in &keys {
1830 let tuple = create_tuple_from_key(*key, key_schema.clone());
1831 let is_present = index.get(&tuple).unwrap().is_some();
1832
1833 if !is_present {
1834 assert!(remove_keys.contains(key));
1835 } else {
1836 assert!(!remove_keys.contains(key));
1837 assert_eq!(
1838 index.get(&tuple).unwrap().unwrap().slot_num,
1839 (*key & 0xFFFFFFFF) as u32
1840 );
1841 size += 1;
1842 }
1843 }
1844 assert_eq!(size, 1);
1845
1846 let tuple = create_tuple_from_key(2, key_schema.clone());
1848 println!("Before deleting final key 2:\n{}", index.to_dot().unwrap());
1849 index.delete(&tuple).unwrap();
1850 println!("After deleting final key 2:\n{}", index.to_dot().unwrap());
1851 assert!(index.is_empty().unwrap());
1852 }
1853
1854 #[test]
1855 fn test_internal_borrow_from_right_keeps_searchable() {
1856 let (_temp_dir, index, key_schema) = create_test_index(64, 2, 3);
1857
1858 let inserts = [
1859 -5, -4, -3, -2, -1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14,
1860 ];
1861 for key in inserts {
1862 let tuple = create_tuple_from_key(key, key_schema.clone());
1863 let rid = create_rid_from_key(key);
1864 index.insert(&tuple, rid).unwrap();
1865 }
1866
1867 let deletes = [-5, -4, -3, -2, -1, 0, 1, 2];
1868 for key in deletes {
1869 let tuple = create_tuple_from_key(key, key_schema.clone());
1870 index.delete(&tuple).unwrap();
1871 }
1872
1873 let root_page_id = index.get_root_page_id().unwrap();
1874 let root_guard = index.buffer_pool.fetch_page_read(root_page_id).unwrap();
1875 let (root_page, _) =
1876 BPlusTreePageCodec::decode(root_guard.data(), key_schema.clone()).unwrap();
1877 let (left_internal_id, right_internal_id) =
1878 if let BPlusTreePage::Internal(root_internal) = root_page {
1879 assert_eq!(root_internal.header.current_size, 2);
1880 (root_internal.value_at(0), root_internal.value_at(1))
1881 } else {
1882 panic!("expected internal root after deletions");
1883 };
1884 drop(root_guard);
1885
1886 let left_guard = index.buffer_pool.fetch_page_read(left_internal_id).unwrap();
1887 let (left_page, _) =
1888 BPlusTreePageCodec::decode(left_guard.data(), key_schema.clone()).unwrap();
1889 if let BPlusTreePage::Internal(left_internal) = left_page {
1890 assert_eq!(
1891 left_internal.high_key,
1892 Some(create_tuple_from_key(7, key_schema.clone()))
1893 );
1894 assert_eq!(left_internal.header.next_page_id, right_internal_id);
1895 } else {
1896 panic!("expected left child to remain internal");
1897 }
1898
1899 let survivor = create_tuple_from_key(3, key_schema.clone());
1900 assert!(index.get(&survivor).unwrap().is_some());
1901 }
1902
1903 #[test]
1905 fn test_sequential_edge_mix() {
1906 ensure_deadlock_watchdog();
1907 for leaf_max_size in 2..=5 {
1908 let (_temp_dir, index, key_schema) = create_test_index(50, 3, leaf_max_size);
1909
1910 let keys = vec![1i64, 5, 15, 20, 25, 2, -1, -2, 6, 14, 4];
1911 let mut inserted = vec![];
1912 let mut deleted = vec![];
1913
1914 for key in &keys {
1915 let tuple = create_tuple_from_key(*key, key_schema.clone());
1916 let rid = create_rid_from_key(*key);
1917 index.insert(&tuple, rid).unwrap();
1918 inserted.push(*key);
1919
1920 verify_tree_state(&index, &key_schema, &inserted, &deleted);
1922 }
1923
1924 let tuple = create_tuple_from_key(1, key_schema.clone());
1926 index.delete(&tuple).unwrap();
1927 deleted.push(1);
1928 inserted.retain(|&x| x != 1);
1929 verify_tree_state(&index, &key_schema, &inserted, &deleted);
1930
1931 let tuple = create_tuple_from_key(3, key_schema.clone());
1933 let rid = create_rid_from_key(3);
1934 index.insert(&tuple, rid).unwrap();
1935 inserted.push(3);
1936 verify_tree_state(&index, &key_schema, &inserted, &deleted);
1937
1938 let delete_keys = vec![4i64, 14, 6, 2, 15, -2, -1, 3, 5, 25, 20];
1940 for key in &delete_keys {
1941 let tuple = create_tuple_from_key(*key, key_schema.clone());
1942 index.delete(&tuple).unwrap();
1943 deleted.push(*key);
1944 inserted.retain(|&x| x != *key);
1945 verify_tree_state(&index, &key_schema, &inserted, &deleted);
1946 }
1947 }
1948 }
1949
1950 fn verify_tree_state(
1952 index: &BPlusTreeIndex,
1953 key_schema: &SchemaRef,
1954 inserted: &[i64],
1955 deleted: &[i64],
1956 ) {
1957 for key in inserted {
1958 let tuple = create_tuple_from_key(*key, key_schema.clone());
1959 let result = index.get(&tuple).unwrap();
1960 assert!(result.is_some(), "Key {} should be present", key);
1961 }
1962
1963 for key in deleted {
1964 let tuple = create_tuple_from_key(*key, key_schema.clone());
1965 let result = index.get(&tuple).unwrap();
1966 assert!(result.is_none(), "Key {} should be deleted", key);
1967 }
1968 }
1969
1970 #[test]
1972 fn test_concurrent_insert() {
1973 std::thread::spawn(|| loop {
1975 std::thread::sleep(Duration::from_millis(500));
1976 let deadlocks = deadlock::check_deadlock();
1977 if !deadlocks.is_empty() {
1978 eprintln!("DEADLOCK DETECTED: {} cycles", deadlocks.len());
1979 for (i, threads) in deadlocks.iter().enumerate() {
1980 eprintln!("Cycle {}:", i);
1981 for t in threads {
1982 eprintln!(" ThreadId={:?}\n{:?}", t.thread_id(), t.backtrace());
1983 }
1984 }
1985 panic!("deadlock detected");
1986 }
1987 });
1988 const NUM_ITERS: usize = 3;
1989 const SCALE_FACTOR: i64 = 50; const NUM_THREADS: usize = 5;
1991
1992 for _iter in 0..NUM_ITERS {
1993 let (_temp_dir, index, key_schema) = create_test_index(64, 3, 5);
1994 let index = Arc::new(index);
1995
1996 let keys: Vec<i64> = (1..SCALE_FACTOR).collect();
1997 let mut threads = vec![];
1998
1999 for i in 0..NUM_THREADS {
2000 let index_clone = index.clone();
2001 let key_schema_clone = key_schema.clone();
2002 let keys_clone = keys.clone();
2003
2004 threads.push(std::thread::spawn(move || {
2005 for key in &keys_clone {
2006 if (*key as usize) % NUM_THREADS == i {
2007 let tuple = create_tuple_from_key(*key, key_schema_clone.clone());
2008 let rid = create_rid_from_key(*key);
2009 index_clone.insert(&tuple, rid).unwrap();
2010 }
2011 }
2012 }));
2013 }
2014
2015 for thread in threads {
2016 thread.join().unwrap();
2017 }
2018
2019 for key in &keys {
2021 let tuple = create_tuple_from_key(*key, key_schema.clone());
2022 let result = index.get(&tuple).unwrap();
2023 assert!(result.is_some());
2024 assert_eq!(result.unwrap().slot_num, (*key & 0xFFFFFFFF) as u32);
2025 }
2026
2027 let mut iter = TreeIndexIterator::new(index.clone(), ..);
2029 let mut current_key = 1i64;
2030 while let Some(rid) = iter.next().unwrap() {
2031 assert_eq!(rid.slot_num, (current_key & 0xFFFFFFFF) as u32);
2032 current_key += 1;
2033 }
2034 assert_eq!(current_key, keys.len() as i64 + 1);
2035 }
2036 }
2037
2038 #[test]
2040 fn test_basic_scale() {
2041 let (_temp_dir, index, key_schema) = create_test_index(512, 2, 3);
2042
2043 let scale = 1000i64;
2044 let mut keys: Vec<i64> = (1..=scale).collect();
2045
2046 let mut seed: u64 = 0x9E3779B97F4A7C15;
2048 for i in (1..keys.len()).rev() {
2049 seed = seed
2050 .wrapping_mul(2862933555777941757)
2051 .wrapping_add(3037000493);
2052 let j = (seed as usize) % (i + 1);
2053 keys.swap(i, j);
2054 }
2055
2056 for key in &keys {
2058 let tuple = create_tuple_from_key(*key, key_schema.clone());
2059 let rid = create_rid_from_key(*key);
2060 index.insert(&tuple, rid).unwrap();
2061 let got = index.get(&tuple).unwrap();
2062 if got.is_none() {
2063 println!(
2064 "After inserting {}, tree=\n{}",
2065 key,
2066 index.to_dot().unwrap()
2067 );
2068 panic!("immediate missing key {}", key);
2069 }
2070 }
2071
2072 let probe = 630i64;
2081 let tuple = create_tuple_from_key(probe, key_schema.clone());
2082 if index.get(&tuple).unwrap().is_none() {
2083 let guard = index.find_leaf_page_optimistic(&tuple).unwrap();
2084 let (page, _) = BPlusTreePageCodec::decode(guard.data(), key_schema.clone()).unwrap();
2085 if let BPlusTreePage::Leaf(leaf) = page {
2086 println!(
2087 "Early probe leaf for 630 has keys: {:?}",
2088 leaf.array
2089 .iter()
2090 .map(|(t, _)| format!("{}", t))
2091 .collect::<Vec<_>>()
2092 );
2093 }
2094 panic!("missing key 630 before verification loop");
2095 }
2096
2097 for key in &keys {
2099 let tuple = create_tuple_from_key(*key, key_schema.clone());
2100 let result = index.get(&tuple).unwrap();
2101 assert!(result.is_some(), "missing key {}", key);
2102
2103 let expected_rid = create_rid_from_key(*key);
2104 let actual_rid = result.unwrap();
2105 assert_eq!(actual_rid.slot_num, expected_rid.slot_num);
2106 }
2107
2108 let probe = 630i64;
2110 let tuple = create_tuple_from_key(probe, key_schema.clone());
2111 if index.get(&tuple).unwrap().is_none() {
2112 let guard = index.find_leaf_page_optimistic(&tuple).unwrap();
2113 let (page, _) = BPlusTreePageCodec::decode(guard.data(), key_schema.clone()).unwrap();
2114 if let BPlusTreePage::Leaf(leaf) = page {
2115 println!(
2116 "Probe leaf for 630 has keys: {:?}",
2117 leaf.array
2118 .iter()
2119 .map(|(t, _)| format!("{}", t))
2120 .collect::<Vec<_>>()
2121 );
2122 }
2123 }
2124 }
2125
2126 #[test]
2128 fn test_concurrent_delete() {
2129 const NUM_ITERS: usize = 10;
2130
2131 for _iter in 0..NUM_ITERS {
2132 let (_temp_dir, index, key_schema) = create_test_index(50, 3, 5);
2133
2134 let keys = vec![1i64, 2, 3, 4, 5];
2136 for key in &keys {
2137 let tuple = create_tuple_from_key(*key, key_schema.clone());
2138 let rid = create_rid_from_key(*key);
2139 index.insert(&tuple, rid).unwrap();
2140 }
2141
2142 let index = Arc::new(index);
2143 let remove_keys = vec![1i64, 5, 3, 4];
2144 let mut threads = vec![];
2145
2146 for i in 0..2 {
2147 let index_clone = index.clone();
2148 let key_schema_clone = key_schema.clone();
2149 let remove_keys_clone = remove_keys.clone();
2150
2151 threads.push(std::thread::spawn(move || {
2152 for key in &remove_keys_clone {
2153 if (*key as usize) % 2 == i {
2154 let tuple = create_tuple_from_key(*key, key_schema_clone.clone());
2155 index_clone.delete(&tuple).unwrap();
2156 }
2157 }
2158 }));
2159 }
2160
2161 for thread in threads {
2162 thread.join().unwrap();
2163 }
2164
2165 let mut size = 0;
2167 let index_arc = index.clone();
2168 let mut iter = TreeIndexIterator::new(index_arc, ..);
2169 while let Some(rid) = iter.next().unwrap() {
2170 assert_eq!(rid.slot_num, 2);
2171 size += 1;
2172 }
2173 assert_eq!(size, 1);
2174 }
2175 }
2176
#[test]
fn test_concurrent_mix() {
    // Deadlock watchdog: polls parking_lot's deadlock detector every 500ms while
    // this test runs. The detector is process-wide, so it can also report cycles
    // from tests running in parallel. Dropping `stop_tx` (explicitly at the end of
    // the test, or implicitly during a panic unwind) disconnects the channel, which
    // lets the watchdog exit instead of looping for the rest of the process.
    let (stop_tx, stop_rx) = std::sync::mpsc::channel::<()>();
    let watchdog = std::thread::spawn(move || loop {
        match stop_rx.recv_timeout(Duration::from_millis(500)) {
            Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {}
            Ok(()) | Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break,
        }
        let deadlocks = deadlock::check_deadlock();
        if !deadlocks.is_empty() {
            eprintln!("DEADLOCK DETECTED: {} cycles", deadlocks.len());
            for (i, threads) in deadlocks.iter().enumerate() {
                eprintln!("Cycle {}:", i);
                for t in threads {
                    eprintln!(" ThreadId={:?}\n{:?}", t.thread_id(), t.backtrace());
                }
            }
            // Panicking here would only unwind this detached watchdog thread and
            // leave the deadlocked workers (and therefore the test) hanging
            // forever, so abort the whole process to make the failure visible.
            eprintln!("deadlock detected");
            std::process::abort();
        }
    });

    const NUM_ITERS: usize = 5;
    const NUM_THREADS: usize = 5;
    const TOTAL_KEYS: i64 = 20;

    for _iter in 0..NUM_ITERS {
        let (_temp_dir, index, key_schema) = create_test_index(50, 3, 5);
        let index = Arc::new(index);

        // Even keys are inserted concurrently; odd keys are pre-inserted below and
        // then deleted concurrently.
        let (for_insert, for_delete): (Vec<i64>, Vec<i64>) =
            (1..=TOTAL_KEYS).partition(|key| *key % 2 == 0);

        for key in &for_delete {
            let tuple = create_tuple_from_key(*key, key_schema.clone());
            let rid = create_rid_from_key(*key);
            index.insert(&tuple, rid).unwrap();
        }

        // Even-numbered threads insert every even key and odd-numbered threads
        // delete every odd key, so each key is contended by several threads.
        let threads: Vec<_> = (0..NUM_THREADS)
            .map(|i| {
                let index = Arc::clone(&index);
                let key_schema = key_schema.clone();
                let for_insert = for_insert.clone();
                let for_delete = for_delete.clone();
                std::thread::spawn(move || {
                    if i % 2 == 0 {
                        for key in &for_insert {
                            let tuple = create_tuple_from_key(*key, key_schema.clone());
                            let rid = create_rid_from_key(*key);
                            index.insert(&tuple, rid).unwrap();
                        }
                    } else {
                        for key in &for_delete {
                            let tuple = create_tuple_from_key(*key, key_schema.clone());
                            index.delete(&tuple).unwrap();
                        }
                    }
                })
            })
            .collect();

        for thread in threads {
            thread.join().unwrap();
        }

        // Every inserted key must be visible, every deleted key must be gone.
        for key in &for_insert {
            let tuple = create_tuple_from_key(*key, key_schema.clone());
            assert!(
                index.get(&tuple).unwrap().is_some(),
                "inserted key {} missing after concurrent mix",
                key
            );
        }
        for key in &for_delete {
            let tuple = create_tuple_from_key(*key, key_schema.clone());
            assert!(
                index.get(&tuple).unwrap().is_none(),
                "deleted key {} still present after concurrent mix",
                key
            );
        }
    }

    // Stop the watchdog and make sure it shut down cleanly.
    drop(stop_tx);
    watchdog
        .join()
        .expect("deadlock watchdog thread panicked");
}
2271
#[test]
fn test_iterator_functionality() {
    let (_temp_dir, index, key_schema) = create_test_index(50, 3, 5);

    // Populate the tree with a sparse set of odd keys.
    for key in [1i64, 3, 5, 7, 9] {
        index
            .insert(
                &create_tuple_from_key(key, key_schema.clone()),
                create_rid_from_key(key),
            )
            .unwrap();
    }

    let index = Arc::new(index);

    // Inclusive range scan over [3, 7] yields exactly the keys inside the bounds.
    let lower = create_tuple_from_key(3, key_schema.clone());
    let upper = create_tuple_from_key(7, key_schema.clone());
    let mut bounded = TreeIndexIterator::new(index.clone(), lower..=upper);
    let mut bounded_slots = Vec::new();
    while let Some(record) = bounded.next().unwrap() {
        bounded_slots.push(record.slot_num as i64);
    }
    assert_eq!(bounded_slots, vec![3, 5, 7]);

    // Unbounded scan walks every key in ascending order.
    let mut full = TreeIndexIterator::new(index.clone(), ..);
    let mut full_slots = Vec::new();
    while let Some(record) = full.next().unwrap() {
        full_slots.push(record.slot_num as i64);
    }
    assert_eq!(full_slots, vec![1, 3, 5, 7, 9]);
}
2306
#[test]
fn test_leaf_split_structure() {
    // Small leaf capacity (presumably the third argument) so that four inserts
    // force a leaf split and a new internal root.
    let (_temp_dir, index, key_schema) = create_test_index(50, 10, 3);
    for key in 1i64..=4 {
        let tuple = create_tuple_from_key(key, key_schema.clone());
        index.insert(&tuple, create_rid_from_key(key)).unwrap();
    }

    let root_pid = index.get_root_page_id().unwrap();
    let root_guard = index.buffer_pool.fetch_page_read(root_pid).unwrap();
    let root = BPlusTreePageCodec::decode(root_guard.data(), key_schema.clone())
        .unwrap()
        .0;
    let BPlusTreePage::Internal(root_internal) = root else {
        panic!("root is not internal after leaf split");
    };

    // The two children of the new root are the halves of the split leaf.
    let (left_pid, right_pid) = (root_internal.value_at(0), root_internal.value_at(1));

    let left_guard = index.buffer_pool.fetch_page_read(left_pid).unwrap();
    let right_guard = index.buffer_pool.fetch_page_read(right_pid).unwrap();
    let left = BPlusTreePageCodec::decode(left_guard.data(), key_schema.clone())
        .unwrap()
        .0;
    let right = BPlusTreePageCodec::decode(right_guard.data(), key_schema.clone())
        .unwrap()
        .0;

    let BPlusTreePage::Leaf(left_leaf) = left else {
        panic!("left child not leaf");
    };
    assert!(matches!(right, BPlusTreePage::Leaf(_)), "right child not leaf");

    // The left leaf's sibling link must point at the right leaf.
    assert_eq!(left_leaf.header.next_page_id, right_pid);
}
2350
#[test]
#[ignore]
fn bench_get_hot_read() {
    // Reads a `usize` knob from the environment, falling back to `default_v`
    // when the variable is unset or does not parse.
    fn getenv_usize(k: &str, default_v: usize) -> usize {
        std::env::var(k)
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(default_v)
    }

    let bpm = getenv_usize("QUILL_BENCH_BPM", 1024);
    // Checked conversion instead of `as`: an oversized value must not wrap to a
    // negative key count (which would silently produce an empty key set).
    let nkeys = i64::try_from(getenv_usize("QUILL_BENCH_N", 20000))
        .expect("QUILL_BENCH_N does not fit in i64");
    let ops = getenv_usize("QUILL_BENCH_OPS", 200000);
    // The sampler below computes `x % hot.len()`; with no keys that would panic
    // with an opaque remainder-by-zero error rather than a clear message.
    assert!(nkeys > 0, "QUILL_BENCH_N must be at least 1");

    let (_temp_dir, index, key_schema) = create_test_index(bpm, 3, 64);

    // Deterministic Fisher-Yates shuffle (fixed-seed LCG) so inserts arrive in a
    // reproducible, non-sequential order.
    let mut keys: Vec<i64> = (1..=nkeys).collect();
    let mut seed: u64 = 0x9E3779B97F4A7C15;
    for i in (1..keys.len()).rev() {
        seed = seed
            .wrapping_mul(2862933555777941757)
            .wrapping_add(3037000493);
        let j = (seed as usize) % (i + 1);
        keys.swap(i, j);
    }
    for k in &keys {
        let t = create_tuple_from_key(*k, key_schema.clone());
        index.insert(&t, create_rid_from_key(*k)).unwrap();
    }

    // "Hot" set: the last 10% of the shuffled key order. It is never empty here,
    // because `len * 9 / 10 < len` for every `len >= 1`.
    let hot_start = keys.len() * 9 / 10;
    let hot = &keys[hot_start..];

    let start = std::time::Instant::now();
    let mut found = 0usize;
    // Separate LCG stream to pick which hot key to look up on each operation.
    let mut x = 0x243F6A8885A308D3u64;
    for _ in 0..ops {
        x = x.wrapping_mul(6364136223846793005).wrapping_add(1);
        let idx = (x as usize) % hot.len();
        let key = hot[idx];
        let t = create_tuple_from_key(key, key_schema.clone());
        if index.get(&t).unwrap().is_some() {
            found += 1;
        }
    }
    let el = start.elapsed();
    let qps = (ops as f64) / el.as_secs_f64();
    println!(
        "bench_get_hot_read: ops={} time={:?} qps={:.1} found={}",
        ops, el, qps, found
    );
}
2408
#[test]
#[ignore]
fn bench_range_scan() {
    // Environment knob reader: unset or unparsable values fall back to the default.
    fn getenv_usize(k: &str, default_v: usize) -> usize {
        match std::env::var(k) {
            Ok(raw) => raw.parse().unwrap_or(default_v),
            Err(_) => default_v,
        }
    }

    let bpm = getenv_usize("QUILL_BENCH_BPM", 1024);
    let nkeys = getenv_usize("QUILL_BENCH_N", 30000) as i64;
    let passes = getenv_usize("QUILL_BENCH_PASSES", 20);

    let (_temp_dir, index, key_schema) = create_test_index(bpm, 3, 64);

    // Fisher-Yates shuffle driven by a fixed-seed LCG: every run inserts the keys
    // in the same scrambled order.
    let mut keys: Vec<i64> = (1..=nkeys).collect();
    let mut state: u64 = 0x9E3779B97F4A7C15;
    for hi in (1..keys.len()).rev() {
        state = state
            .wrapping_mul(2862933555777941757)
            .wrapping_add(3037000493);
        keys.swap(hi, (state as usize) % (hi + 1));
    }
    keys.iter().for_each(|&k| {
        let t = create_tuple_from_key(k, key_schema.clone());
        index.insert(&t, create_rid_from_key(k)).unwrap();
    });

    let index = Arc::new(index);
    let total = (nkeys as usize) * passes;

    // Timed section: `passes` full unbounded scans over the tree.
    let started = std::time::Instant::now();
    let mut seen = 0usize;
    for _ in 0..passes {
        let mut cursor = TreeIndexIterator::new(index.clone(), ..);
        while cursor.next().unwrap().is_some() {
            seen += 1;
        }
    }
    let elapsed = started.elapsed();
    let ips = total as f64 / elapsed.as_secs_f64();
    println!(
        "bench_range_scan: items={} time={:?} ips={:.1} seen={}",
        total, elapsed, ips, seen
    );
}
2457}