1use parking_lot::{RwLock, RwLockWriteGuard};
2use std::collections::VecDeque;
3use std::fmt::Write;
4use std::sync::Arc;
5
6use crate::buffer::{
7 BufferManager, PageId, ReadPageGuard, WritePageGuard, INVALID_PAGE_ID, PAGE_SIZE,
8};
9use crate::catalog::SchemaRef;
10use crate::error::{QuillSQLError, QuillSQLResult};
11use crate::storage::codec::{
12 BPlusTreeHeaderPageCodec, BPlusTreeInternalPageCodec, BPlusTreeLeafPageCodec,
13 BPlusTreePageCodec, TupleCodec,
14};
15use crate::storage::page::{BPlusTreeHeaderPage, BPlusTreeInternalPage};
16use crate::storage::page::{BPlusTreeLeafPage, BPlusTreePage, RecordId};
17
18use crate::config::BTreeConfig;
19use crate::recovery::{wal_record::WalRecordPayload, Lsn};
20use crate::storage::codec::BPlusTreePageTypeCodec;
21pub use crate::storage::index::btree_iterator::TreeIndexIterator;
22use crate::storage::index::wal_codec::{
23 IndexInternalEntryPayload, IndexInternalMergePayload, IndexInternalRedistributePayload,
24 IndexInternalSplitPayload, IndexLeafDeletePayload, IndexLeafInsertPayload,
25 IndexLeafMergePayload, IndexLeafRedistributePayload, IndexLeafSplitEntryPayload,
26 IndexLeafSplitPayload, IndexParentDeletePayload, IndexParentInsertPayload,
27 IndexParentUpdatePayload, IndexRecordPayload, IndexRelationIdent, IndexRootAdoptPayload,
28 IndexRootInstallInternalPayload, IndexRootInstallLeafPayload, IndexRootResetPayload,
29 SchemaRepr,
30};
31use crate::storage::page::BPlusTreePageType;
32use crate::storage::tuple::Tuple;
33use crate::transaction::TransactionId;
// NOTE(review): neither constant is referenced in the code visible in this section.
// The names suggest a restart bound and a back-off base (in microseconds, per the `_US`
// suffix) for an optimistic lock coupling ("OLC") read path — confirm against their users.
const MAX_OLC_RESTARTS: usize = 64;
const OLC_BACKOFF_BASE_US: u64 = 50;
37
38#[derive(Debug)]
39pub struct Context<'a> {
40 pub write_set: VecDeque<WritePageGuard>,
44 pub header_lock_guard: Option<RwLockWriteGuard<'a, ()>>,
46}
47
48impl<'a> Context<'a> {
49 pub fn new() -> Self {
50 Self {
51 write_set: VecDeque::new(),
52 header_lock_guard: None,
53 }
54 }
55
56 pub fn push_write_guard(&mut self, guard: WritePageGuard) {
58 self.write_set.push_back(guard);
59 }
60
61 pub fn release_all_write_locks(&mut self) {
63 self.write_set.clear();
64 self.header_lock_guard = None;
65 }
66}
67
/// B+ tree index whose pages are stored in a shared buffer pool.
///
/// The root page id is not kept in memory; it is read from and written to the
/// header page (`header_page_id`) on demand.
#[derive(Debug)]
pub struct BPlusTreeIndex {
    /// Schema of the key tuples stored in the tree.
    pub key_schema: SchemaRef,
    /// Buffer pool that all pages of this tree are fetched from.
    pub buffer_pool: Arc<BufferManager>,
    /// Internal-page capacity. The constructors store the caller's value plus one.
    /// It is also recorded in internal-merge WAL records.
    pub internal_max_size: u32,
    /// Leaf-page capacity as given by the caller. It is also recorded in leaf-merge
    /// WAL records.
    pub leaf_max_size: u32,
    /// Page holding the encoded `BPlusTreeHeaderPage` (which stores the root page id).
    pub header_page_id: PageId,
    /// Lock taken by `insert_with_txn` while it checks for and creates the first root.
    pub header_page_lock: Arc<RwLock<()>>,
    /// Tunables. `BTreeConfig::default()` unless a `*_with_config` constructor was used.
    pub config: BTreeConfig,
}
81
82impl BPlusTreeIndex {
83 fn wal_overwrite_page(
86 &self,
87 guard: &mut WritePageGuard,
88 new_image: Vec<u8>,
89 new_lsn: Option<Lsn>,
90 ) -> QuillSQLResult<()> {
91 debug_assert_eq!(new_image.len(), PAGE_SIZE);
92 if new_lsn.is_some() {
93 guard.overwrite(&new_image, new_lsn);
94 } else {
95 guard.apply_page_image(&new_image)?;
96 }
97 Ok(())
98 }
99
100 fn relation_ident(&self) -> IndexRelationIdent {
101 IndexRelationIdent {
102 header_page_id: self.header_page_id,
103 schema: SchemaRepr::from(self.key_schema.clone()),
104 }
105 }
106
107 fn append_index_record(&self, payload: IndexRecordPayload) -> QuillSQLResult<Option<Lsn>> {
108 if let Some(wal) = self.buffer_pool.wal_manager() {
109 let res = wal.append_record_with(|_| WalRecordPayload::Index(payload.clone()))?;
110 Ok(Some(res.end_lsn))
111 } else {
112 Ok(None)
113 }
114 }
115
116 pub fn new(
117 key_schema: SchemaRef,
118 buffer_pool: Arc<BufferManager>,
119 internal_max_size: u32,
120 leaf_max_size: u32,
121 ) -> Self {
122 let mut header_page_guard = buffer_pool
124 .new_page()
125 .expect("Failed to create header page for B+ tree");
126 let header_page_id = header_page_guard.page_id();
127 let header_page = BPlusTreeHeaderPage {
128 root_page_id: INVALID_PAGE_ID,
129 };
130 let encoded = BPlusTreeHeaderPageCodec::encode(&header_page);
131 header_page_guard.overwrite(&encoded, None);
132 drop(header_page_guard);
133
134 Self {
135 key_schema,
136 buffer_pool,
137 internal_max_size: internal_max_size + 1,
140 leaf_max_size,
141 header_page_id,
142 header_page_lock: Arc::new(RwLock::new(())),
143 config: BTreeConfig::default(),
144 }
145 }
146
147 pub fn new_with_config(
148 key_schema: SchemaRef,
149 buffer_pool: Arc<BufferManager>,
150 internal_max_size: u32,
151 leaf_max_size: u32,
152 config: BTreeConfig,
153 ) -> Self {
154 let mut me = Self::new(key_schema, buffer_pool, internal_max_size, leaf_max_size);
155 me.config = config;
156 me
157 }
158
159 pub fn open(
160 key_schema: SchemaRef,
161 buffer_pool: Arc<BufferManager>,
162 internal_max_size: u32,
163 leaf_max_size: u32,
164 header_page_id: PageId,
165 ) -> Self {
166 Self {
167 key_schema,
168 buffer_pool,
169 internal_max_size: internal_max_size + 1,
171 leaf_max_size,
172 header_page_id,
173 header_page_lock: Arc::new(RwLock::new(())),
174 config: BTreeConfig::default(),
175 }
176 }
177
178 pub fn open_with_config(
179 key_schema: SchemaRef,
180 buffer_pool: Arc<BufferManager>,
181 internal_max_size: u32,
182 leaf_max_size: u32,
183 header_page_id: PageId,
184 config: BTreeConfig,
185 ) -> Self {
186 let mut me = Self::open(
187 key_schema,
188 buffer_pool,
189 internal_max_size,
190 leaf_max_size,
191 header_page_id,
192 );
193 me.config = config;
194 me
195 }
196
197 pub fn get_root_page_id(&self) -> QuillSQLResult<PageId> {
198 let header_guard = self.buffer_pool.fetch_page_read(self.header_page_id)?;
199 let (header_page, _) = BPlusTreeHeaderPageCodec::decode(header_guard.data())?;
200 Ok(header_page.root_page_id)
201 }
202
203 fn set_root_page_id(&self, page_id: PageId, wal_lsn: Option<Lsn>) -> QuillSQLResult<()> {
204 let mut header_guard = self.buffer_pool.fetch_page_write(self.header_page_id)?;
205 let header_page = BPlusTreeHeaderPage {
206 root_page_id: page_id,
207 };
208 let encoded = BPlusTreeHeaderPageCodec::encode(&header_page);
209 self.wal_overwrite_page(&mut header_guard, encoded, wal_lsn)?;
210 Ok(())
211 }
212
213 pub fn is_empty(&self) -> QuillSQLResult<bool> {
214 Ok(self.get_root_page_id()? == INVALID_PAGE_ID)
215 }
216
217 pub fn get(&self, key: &Tuple) -> QuillSQLResult<Option<RecordId>> {
218 if self.is_empty()? {
219 return Ok(None);
220 }
221 let mut guard = self.find_leaf_page_optimistic(key)?;
222 loop {
224 let decoded = BPlusTreeLeafPageCodec::decode(guard.data(), self.key_schema.clone());
225 if let Ok((leaf_page, _)) = decoded {
226 if let Some(rid) = leaf_page.look_up(key) {
227 return Ok(Some(rid));
228 }
229 if leaf_page.header.current_size > 0
230 && leaf_page.header.next_page_id != INVALID_PAGE_ID
231 {
232 let last_key = leaf_page.key_at((leaf_page.header.current_size - 1) as usize);
233 if *key > *last_key {
234 guard = self
235 .buffer_pool
236 .fetch_page_read(leaf_page.header.next_page_id)?;
237 continue;
238 }
239 }
240 return Ok(None);
241 } else {
242 guard = self.find_leaf_page_optimistic(key)?;
244 let (leaf_page, _) =
245 BPlusTreeLeafPageCodec::decode(guard.data(), self.key_schema.clone())?;
246 return Ok(leaf_page.look_up(key));
247 }
248 }
249 }
250
251 fn find_leaf_page_pessimistic<'a>(
253 &'a self,
254 key: &Tuple,
255 is_insert: bool,
256 mut context: Context<'a>,
257 ) -> QuillSQLResult<(WritePageGuard, Context<'a>)> {
258 let root_page_id = self.get_root_page_id()?;
260 if root_page_id == INVALID_PAGE_ID {
261 return Err(QuillSQLError::Internal(
263 "find_leaf_page_pessimistic called on an empty tree".to_string(),
264 ));
265 }
266 let mut current_guard = self.buffer_pool.fetch_page_write(root_page_id)?;
267
268 loop {
269 let (page, _) =
270 BPlusTreePageCodec::decode(current_guard.data(), self.key_schema.clone())?;
271
272 match page {
273 BPlusTreePage::Internal(_internal) => {
274 let child_page_id = BPlusTreeInternalPageCodec::lookup_child_from_bytes(
275 current_guard.data(),
276 self.key_schema.clone(),
277 key,
278 )?;
279 let child_guard = self.buffer_pool.fetch_page_write(child_page_id)?;
280 let will_overflow = match BPlusTreePageTypeCodec::decode(child_guard.data())?.0
282 {
283 BPlusTreePageType::LeafPage => {
284 let (hdr, _) =
285 BPlusTreeLeafPageCodec::decode_header_only(child_guard.data())?;
286 hdr.current_size == hdr.max_size
287 }
288 BPlusTreePageType::InternalPage => {
289 let (hdr, _) =
290 BPlusTreeInternalPageCodec::decode_header_only(child_guard.data())?;
291 hdr.current_size == hdr.max_size
292 }
293 };
294
295 if is_insert {
296 if !will_overflow {
297 context.release_all_write_locks();
298 drop(current_guard);
299 current_guard = child_guard;
300 continue;
301 }
302 }
303 context.push_write_guard(current_guard);
304 current_guard = child_guard;
305 }
306 BPlusTreePage::Leaf(_) => {
307 return Ok((current_guard, context));
308 }
309 }
310 }
311 }
312
313 pub fn insert(&self, key: &Tuple, rid: RecordId) -> QuillSQLResult<()> {
314 self.insert_with_txn(key, rid, TransactionId::default())
315 }
316
    /// Inserts `key -> rid` on behalf of `txn_id`. If `key` already exists, its record
    /// id is replaced.
    ///
    /// Flow:
    /// 1. If the tree is empty, take `header_page_lock`, re-check, and create the first
    ///    root with `start_new_tree`.
    /// 2. Descend with `find_leaf_page_pessimistic(.., is_insert = true, ..)`.
    /// 3. If a parent was retained, verify the leaf against the parent's `look_up`.
    ///    Then move right along `next_page_id` while `key` belongs to a later leaf.
    /// 4. Existing key: log `LeafDelete` (old rid) + `LeafInsert` (new rid) and rewrite
    ///    the leaf with the last LSN.
    /// 5. Full leaf: call `split` and retry from the top. Both success and
    ///    `QuillSQLError::Internal` from `split` lead to a retry.
    /// 6. Otherwise insert into the leaf, log `LeafInsert`, and rewrite the leaf.
    ///
    /// # Errors
    /// Propagates buffer-pool, codec and WAL errors, and any non-`Internal` error from
    /// `split`.
    pub fn insert_with_txn(
        &self,
        key: &Tuple,
        rid: RecordId,
        txn_id: TransactionId,
    ) -> QuillSQLResult<()> {
        let mut context = Context::new();

        if self.is_empty()? {
            // Re-check under the header lock so only one caller creates the first root.
            let _lock = self.header_page_lock.write();
            let root_now = self.get_root_page_id()?;
            if root_now == INVALID_PAGE_ID {
                self.start_new_tree()?;
            }
        }

        loop {
            let (mut leaf_guard, mut local_ctx) =
                self.find_leaf_page_pessimistic(key, true, context)?;
            let (mut leaf_page, _) =
                BPlusTreeLeafPageCodec::decode(leaf_guard.data(), self.key_schema.clone())?;

            // A parent is only retained when the leaf was full; re-check that the parent
            // still routes `key` to this leaf and switch leaves if it does not.
            if let Some(parent_guard_ref) = local_ctx.write_set.back() {
                let (parent_page_chk, _) = BPlusTreeInternalPageCodec::decode(
                    parent_guard_ref.data(),
                    self.key_schema.clone(),
                )?;
                let expected_pid = parent_page_chk.look_up(key);
                if expected_pid != leaf_guard.page_id() {
                    drop(leaf_guard);
                    leaf_guard = self.buffer_pool.fetch_page_write(expected_pid)?;
                    let (new_leaf, _) =
                        BPlusTreeLeafPageCodec::decode(leaf_guard.data(), self.key_schema.clone())?;
                    leaf_page = new_leaf;
                }
            }

            // Move right while `key` is past this leaf's last key and not before the
            // next leaf's first key. The next leaf is peeked with a read latch first.
            // NOTE(review): a hop releases every retained ancestor, so if the new leaf
            // is full, `split` below is called with an empty `local_ctx`. Confirm that
            // `split` handles a non-root leaf without a retained parent
            // (`delete_with_txn` restarts in the analogous situation).
            while leaf_page.header.current_size > 0
                && leaf_page.header.next_page_id != INVALID_PAGE_ID
            {
                let last_key_ref = leaf_page.key_at((leaf_page.header.current_size - 1) as usize);
                if *key <= *last_key_ref {
                    break;
                }
                let next_pid = leaf_page.header.next_page_id;
                let next_guard_peek = self.buffer_pool.fetch_page_read(next_pid)?;
                let (next_leaf_peek, _) = BPlusTreeLeafPageCodec::decode(
                    next_guard_peek.data(),
                    self.key_schema.clone(),
                )?;
                let next_first_key = if next_leaf_peek.header.current_size > 0 {
                    next_leaf_peek.key_at(0).clone()
                } else {
                    break;
                };
                drop(next_guard_peek);
                if *key < next_first_key {
                    break;
                }
                local_ctx.release_all_write_locks();
                drop(leaf_guard);
                let next_guard = self.buffer_pool.fetch_page_write(next_pid)?;
                let (next_leaf, _) =
                    BPlusTreeLeafPageCodec::decode(next_guard.data(), self.key_schema.clone())?;
                leaf_guard = next_guard;
                leaf_page = next_leaf;
            }

            // Existing key: replace the rid in place, logging it as delete(old) + insert(new).
            if let Some(existing_rid) = leaf_page.look_up_mut(key) {
                let old_rid = *existing_rid;
                *existing_rid = rid;
                leaf_page.header.version += 1;
                let key_encoded = TupleCodec::encode(key);
                let delete_payload = IndexRecordPayload::LeafDelete(IndexLeafDeletePayload {
                    relation: self.relation_ident(),
                    page_id: leaf_guard.page_id(),
                    op_txn_id: txn_id,
                    key_data: key_encoded.clone(),
                    old_rid,
                });
                let mut wal_lsn = self.append_index_record(delete_payload)?;
                let insert_payload = IndexRecordPayload::LeafInsert(IndexLeafInsertPayload {
                    relation: self.relation_ident(),
                    page_id: leaf_guard.page_id(),
                    op_txn_id: txn_id,
                    key_data: key_encoded,
                    rid,
                });
                // The page is stamped with the LSN of the later (insert) record.
                if let Some(lsn) = self.append_index_record(insert_payload)? {
                    wal_lsn = Some(lsn);
                }
                let encoded = BPlusTreeLeafPageCodec::encode(&leaf_page);
                self.wal_overwrite_page(&mut leaf_guard, encoded, wal_lsn)?;
                local_ctx.release_all_write_locks();
                return Ok(());
            }

            // Full leaf: split first, then retry the whole insert with a fresh context.
            // NOTE(review): an `Internal` error from `split` is retried without any
            // bound, so a persistent `Internal` failure loops forever. Consider capping
            // retries (e.g. with `MAX_OLC_RESTARTS`).
            if leaf_page.header.current_size == leaf_page.header.max_size {
                local_ctx.header_lock_guard = None;
                match self.split(leaf_guard, &mut local_ctx) {
                    Ok(()) => {
                        local_ctx.release_all_write_locks();
                        context = Context::new();
                        continue;
                    }
                    Err(QuillSQLError::Internal(_e)) => {
                        local_ctx.release_all_write_locks();
                        context = Context::new();
                        continue;
                    }
                    Err(e) => return Err(e),
                }
            }

            // Room available: plain insert into the leaf.
            leaf_page.insert(key.clone(), rid);
            leaf_page.header.version += 1;
            let payload = IndexRecordPayload::LeafInsert(IndexLeafInsertPayload {
                relation: self.relation_ident(),
                page_id: leaf_guard.page_id(),
                op_txn_id: txn_id,
                key_data: TupleCodec::encode(key),
                rid,
            });
            let wal_lsn = self.append_index_record(payload)?;
            let encoded = BPlusTreeLeafPageCodec::encode(&leaf_page);
            self.wal_overwrite_page(&mut leaf_guard, encoded, wal_lsn)?;
            local_ctx.release_all_write_locks();
            return Ok(());
        }
    }
462
463 pub fn delete(&self, key: &Tuple) -> QuillSQLResult<()> {
465 self.delete_with_txn(key, TransactionId::default())
466 }
467
    /// Removes `key` from the index on behalf of `txn_id`. Returns `Ok(())` if the key
    /// is absent or the tree is empty.
    ///
    /// Flow:
    /// 1. Descend with `find_leaf_page_pessimistic(.., is_insert = false, ..)`, which
    ///    retains every ancestor latch.
    /// 2. Verify the leaf against the parent's `look_up`, then move right along
    ///    `next_page_id` at most 8 times (a 9th hop restarts the whole operation).
    /// 3. Delete the key, log `LeafDelete`, and rewrite the leaf.
    /// 4. If the leaf is now below `min_size`, rebalance via `handle_underflow`.
    ///    Otherwise, if the deleted key was the leaf's first key, refresh the
    ///    separator key in the parent.
    ///
    /// # Errors
    /// Propagates buffer-pool, codec and WAL errors, and any non-`Internal` error from
    /// `handle_underflow`.
    pub fn delete_with_txn(&self, key: &Tuple, txn_id: TransactionId) -> QuillSQLResult<()> {
        if self.is_empty()? {
            return Ok(());
        }

        let mut context = Context::new();
        'restart: loop {
            let (mut leaf_guard, mut local_ctx) =
                self.find_leaf_page_pessimistic(key, false, context)?;
            let (mut leaf_page, _) =
                BPlusTreeLeafPageCodec::decode(leaf_guard.data(), self.key_schema.clone())?;

            // Re-check that the retained parent still routes `key` to this leaf.
            if let Some(parent_guard_ref) = local_ctx.write_set.back() {
                let (parent_page_chk, _) = BPlusTreeInternalPageCodec::decode(
                    parent_guard_ref.data(),
                    self.key_schema.clone(),
                )?;
                let expected_pid = parent_page_chk.look_up(key);
                if expected_pid != leaf_guard.page_id() {
                    drop(leaf_guard);
                    leaf_guard = self.buffer_pool.fetch_page_write(expected_pid)?;
                    let (new_leaf, _) =
                        BPlusTreeLeafPageCodec::decode(leaf_guard.data(), self.key_schema.clone())?;
                    leaf_page = new_leaf;
                }
            }

            // Move right while `key` is past this leaf's last key. Ancestors stay
            // latched, so after a hop `write_set.back()` is the ORIGINAL leaf's parent.
            let mut hops: u32 = 0;
            while leaf_page.header.current_size > 0
                && leaf_page.header.next_page_id != INVALID_PAGE_ID
            {
                let last_key_ref = leaf_page.key_at((leaf_page.header.current_size - 1) as usize);
                if *key <= *last_key_ref {
                    break;
                }
                let next_pid = leaf_page.header.next_page_id;
                hops += 1;
                if hops > 8 {
                    local_ctx.release_all_write_locks();
                    drop(leaf_guard);
                    context = Context::new();
                    continue 'restart;
                }
                drop(leaf_guard);
                leaf_guard = self.buffer_pool.fetch_page_write(next_pid)?;
                let (new_leaf, _) =
                    BPlusTreeLeafPageCodec::decode(leaf_guard.data(), self.key_schema.clone())?;
                leaf_page = new_leaf;

            }

            // Remember whether the key being removed is the leaf's first key, so the
            // parent's separator can be refreshed afterwards.
            let was_first = if leaf_page.header.current_size > 0 {
                let first_key = leaf_page.key_at(0).clone();
                &first_key == key
            } else {
                false
            };
            let target_rid = match leaf_page.look_up(key) {
                Some(rid) => rid,
                None => {
                    local_ctx.release_all_write_locks();
                    return Ok(());
                }
            };
            let key_bytes = TupleCodec::encode(key);

            leaf_page.delete(key);
            leaf_page.header.version += 1;
            let payload = IndexRecordPayload::LeafDelete(IndexLeafDeletePayload {
                relation: self.relation_ident(),
                page_id: leaf_guard.page_id(),
                op_txn_id: txn_id,
                key_data: key_bytes,
                old_rid: target_rid,
            });
            let wal_lsn = self.append_index_record(payload)?;
            let encoded = BPlusTreeLeafPageCodec::encode(&leaf_page);
            self.wal_overwrite_page(&mut leaf_guard, encoded, wal_lsn)?;

            // NOTE(review): both restart paths below run AFTER the key has been deleted
            // and logged. The retried pass finds the key absent and returns `Ok(())`
            // at the `look_up` check above, so the underflow is never rebalanced.
            // Consider deciding on a restart before mutating the leaf.
            if leaf_page.header.current_size < leaf_page.min_size() {
                // No retained parent: only acceptable if this leaf is (still) the root.
                if local_ctx.write_set.is_empty() {
                    let root_id = self.get_root_page_id()?;
                    if leaf_guard.page_id() != root_id {
                        local_ctx.release_all_write_locks();
                        drop(leaf_guard);
                        context = Context::new();
                        continue 'restart;
                    }
                }
                match self.handle_underflow(leaf_guard, &mut local_ctx) {
                    Ok(()) => {
                        local_ctx.release_all_write_locks();
                        return Ok(());
                    }
                    Err(QuillSQLError::Internal(_)) => {
                        local_ctx.release_all_write_locks();
                        context = Context::new();
                        continue;
                    }
                    Err(e) => return Err(e),
                }
            } else {
                // Leaf still large enough: if its first key changed, update the
                // separator in the parent (child index 0 has no separator).
                // NOTE(review): this parent write passes `None` as the LSN, so no index
                // WAL record is appended for it here (it goes through
                // `apply_page_image`). Confirm that this is durable enough.
                if was_first && leaf_page.header.current_size > 0 {
                    if let Some(mut parent_guard) = local_ctx.write_set.pop_back() {
                        let (mut parent_page, _) = BPlusTreeInternalPageCodec::decode(
                            parent_guard.data(),
                            self.key_schema.clone(),
                        )?;
                        if let Some(node_idx) = parent_page.value_index(leaf_guard.page_id()) {
                            if node_idx > 0 {
                                parent_page.array[node_idx].0 = leaf_page.key_at(0).clone();
                                parent_page.header.version += 1;
                                let encoded = BPlusTreeInternalPageCodec::encode(&parent_page);
                                self.wal_overwrite_page(&mut parent_guard, encoded, None)?;
                            }
                        }
                        local_ctx.write_set.push_back(parent_guard);
                    }
                }
                local_ctx.release_all_write_locks();
                return Ok(());
            }
        }
    }
603
604 pub fn to_dot(&self) -> QuillSQLResult<String> {
605 let mut dot = String::new();
606 writeln!(&mut dot, "digraph BPlusTree {{").unwrap();
607 writeln!(&mut dot, " rankdir=TB;").unwrap();
608 writeln!(&mut dot, " node [shape=record, height=.1];").unwrap();
609
610 let root_page_id = self.get_root_page_id()?;
611 if root_page_id == INVALID_PAGE_ID {
612 writeln!(&mut dot, " empty [label=\"<f0> Empty Tree\"];").unwrap();
613 writeln!(&mut dot, "}}").unwrap();
614 return Ok(dot);
615 }
616
617 let mut queue = VecDeque::new();
618 queue.push_back(root_page_id);
619
620 while let Some(page_id) = queue.pop_front() {
621 let guard = self.buffer_pool.fetch_page_read(page_id)?;
622 let (page, _) = BPlusTreePageCodec::decode(guard.data(), self.key_schema.clone())?;
623
624 match page {
625 BPlusTreePage::Internal(internal) => {
626 let mut label = String::new();
627 for i in 0..internal.header.current_size {
628 if i > 0 {
629 write!(&mut label, "|").unwrap();
630 }
631 write!(&mut label, "<p{}>", i).unwrap();
632 if i > 0 {
633 write!(&mut label, " {}", internal.key_at(i as usize)).unwrap();
634 }
635 }
636 writeln!(&mut dot, " page{} [label=\"{}\"];", page_id, label).unwrap();
637
638 for i in 0..internal.header.current_size {
639 let child_id = internal.value_at(i as usize);
640 writeln!(
641 &mut dot,
642 " \"page{}\":p{} -> \"page{}\";",
643 page_id, i, child_id
644 )
645 .unwrap();
646 queue.push_back(child_id);
647 }
648 }
649 BPlusTreePage::Leaf(leaf) => {
650 let mut label = String::new();
651 for i in 0..leaf.header.current_size {
652 if i > 0 {
653 write!(&mut label, "|").unwrap();
654 }
655 write!(
656 &mut label,
657 "<f{}> {} -> {}",
658 i,
659 leaf.key_at(i as usize),
660 leaf.array[i as usize].1
661 )
662 .unwrap();
663 }
664 writeln!(&mut dot, " page{} [label=\"{}\"];", page_id, label).unwrap();
665
666 if leaf.header.next_page_id != INVALID_PAGE_ID {
667 writeln!(
668 &mut dot,
669 " page{} -> page{} [style=dashed, constraint=false];",
670 page_id, leaf.header.next_page_id
671 )
672 .unwrap();
673 }
674 }
675 }
676 }
677
678 writeln!(&mut dot, "}}").unwrap();
679 Ok(dot)
680 }
681
682 fn handle_underflow(
683 &self,
684 mut node_guard: WritePageGuard,
685 context: &mut Context,
686 ) -> QuillSQLResult<()> {
687 if context.write_set.is_empty() {
688 self.adjust_root(node_guard)?;
690 return Ok(());
691 }
692
693 let parent_guard = match context.write_set.pop_back() {
694 Some(g) => g,
695 None => {
696 return Err(QuillSQLError::Internal(
697 "underflow: missing parent".to_string(),
698 ))
699 }
700 };
701 let (parent_page, _) =
702 BPlusTreeInternalPageCodec::decode(parent_guard.data(), self.key_schema.clone())?;
703
704 let Some(node_idx) = parent_page.value_index(node_guard.page_id()) else {
705 return Err(QuillSQLError::Internal(
706 "underflow: node not found in parent".to_string(),
707 ));
708 };
709
710 if node_idx > 0 {
712 let left_sibling_pid = parent_page.value_at(node_idx - 1);
713 let node_pid = node_guard.page_id();
714 if left_sibling_pid < node_pid {
715 drop(node_guard);
717 let left_sibling_guard = self.buffer_pool.fetch_page_write(left_sibling_pid)?;
718 let node_guard_new = self.buffer_pool.fetch_page_write(node_pid)?;
719 node_guard = node_guard_new;
720 let (left_sibling_page, _) =
721 BPlusTreePageCodec::decode(left_sibling_guard.data(), self.key_schema.clone())?;
722 if left_sibling_page.current_size() > left_sibling_page.min_size() {
723 self.redistribute(
724 left_sibling_guard,
725 node_guard,
726 parent_guard,
727 node_idx,
728 true,
729 )?;
730 return Ok(());
731 }
732 } else {
733 let left_sibling_guard = self.buffer_pool.fetch_page_write(left_sibling_pid)?;
735 let (left_sibling_page, _) =
736 BPlusTreePageCodec::decode(left_sibling_guard.data(), self.key_schema.clone())?;
737 if left_sibling_page.current_size() > left_sibling_page.min_size() {
738 self.redistribute(
739 left_sibling_guard,
740 node_guard,
741 parent_guard,
742 node_idx,
743 true,
744 )?;
745 return Ok(());
746 }
747 }
748 }
749
750 if node_idx < parent_page.header.current_size as usize - 1 {
752 let right_sibling_pid = parent_page.value_at(node_idx + 1);
753 let node_pid = node_guard.page_id();
754 if right_sibling_pid < node_pid {
755 drop(node_guard);
757 let right_sibling_guard = self.buffer_pool.fetch_page_write(right_sibling_pid)?;
758 let node_guard_new = self.buffer_pool.fetch_page_write(node_pid)?;
759 node_guard = node_guard_new;
760 let (right_sibling_page, _) = BPlusTreePageCodec::decode(
761 right_sibling_guard.data(),
762 self.key_schema.clone(),
763 )?;
764 if right_sibling_page.current_size() > right_sibling_page.min_size() {
765 self.redistribute(
766 right_sibling_guard,
767 node_guard,
768 parent_guard,
769 node_idx,
770 false,
771 )?;
772 return Ok(());
773 }
774 } else {
775 let right_sibling_guard = self.buffer_pool.fetch_page_write(right_sibling_pid)?;
776 let (right_sibling_page, _) = BPlusTreePageCodec::decode(
777 right_sibling_guard.data(),
778 self.key_schema.clone(),
779 )?;
780 if right_sibling_page.current_size() > right_sibling_page.min_size() {
781 self.redistribute(
782 right_sibling_guard,
783 node_guard,
784 parent_guard,
785 node_idx,
786 false,
787 )?;
788 return Ok(());
789 }
790 }
791 }
792
793 if node_idx > 0 {
795 let left_sibling_pid = parent_page.value_at(node_idx - 1);
797 let node_pid = node_guard.page_id();
798 if left_sibling_pid < node_pid {
799 drop(node_guard);
801 let left_sibling_guard = self.buffer_pool.fetch_page_write(left_sibling_pid)?;
802 let node_guard_new = self.buffer_pool.fetch_page_write(node_pid)?;
803 node_guard = node_guard_new;
804 self.coalesce(left_sibling_guard, node_guard, parent_guard, context)?;
805 } else {
806 let left_sibling_guard = self.buffer_pool.fetch_page_write(left_sibling_pid)?;
807 self.coalesce(left_sibling_guard, node_guard, parent_guard, context)?;
808 }
809 } else {
810 let right_sibling_pid = parent_page.value_at(node_idx + 1);
812 let node_pid = node_guard.page_id();
813 if right_sibling_pid < node_pid {
814 drop(node_guard);
816 let right_sibling_guard = self.buffer_pool.fetch_page_write(right_sibling_pid)?;
817 let node_guard_new = self.buffer_pool.fetch_page_write(node_pid)?;
818 node_guard = node_guard_new;
819 self.coalesce(node_guard, right_sibling_guard, parent_guard, context)?;
821 } else {
822 let right_sibling_guard = self.buffer_pool.fetch_page_write(right_sibling_pid)?;
823 self.coalesce(node_guard, right_sibling_guard, parent_guard, context)?;
824 }
825 }
826
827 Ok(())
828 }
829
    /// Merges the page under `right_guard` into the page under `left_guard` and
    /// removes the right page's entry from the parent.
    ///
    /// Steps:
    /// 1. Remove the right child's entry from the parent. Its key becomes the
    ///    separator pulled down for internal merges.
    /// 2. Merge the two pages.
    /// 3. Log a `LeafMerge`/`InternalMerge` record and rewrite the left page.
    /// 4. Log a `ParentDelete` record and rewrite the parent.
    /// 5. Release both child latches and delete the right page from the buffer pool.
    ///
    /// Finally, if no further ancestors are retained and the parent now has a single
    /// child, `adjust_root` is called on the parent. Otherwise, if the parent fell
    /// below its minimum size, `handle_underflow` recurses on it.
    ///
    /// # Errors
    /// Returns `QuillSQLError::Internal` if the parent has no entry for the right
    /// page. Propagates codec, WAL and buffer-pool errors.
    ///
    /// # Panics
    /// Panics (`unreachable!`) if one child is a leaf and the other an internal page.
    fn coalesce(
        &self,
        mut left_guard: WritePageGuard,
        right_guard: WritePageGuard,
        mut parent_guard: WritePageGuard,
        context: &mut Context,
    ) -> QuillSQLResult<()> {
        let relation = self.relation_ident();
        let (mut left_page, _) =
            BPlusTreePageCodec::decode(left_guard.data(), self.key_schema.clone())?;
        let (mut right_page, _) =
            BPlusTreePageCodec::decode(right_guard.data(), self.key_schema.clone())?;
        let (mut parent_page, _) =
            BPlusTreeInternalPageCodec::decode(parent_guard.data(), self.key_schema.clone())?;

        // Detach the right child from the parent; its key is only used for internal merges.
        let right_page_id = right_guard.page_id();
        let middle_key = match parent_page.remove(right_page_id) {
            Some((k, _)) => k,
            None => {
                return Err(QuillSQLError::Internal(
                    "coalesce: parent missing right child".to_string(),
                ));
            }
        };

        let child_payload = match (&mut left_page, &mut right_page) {
            (BPlusTreePage::Leaf(left), BPlusTreePage::Leaf(right)) => {
                // NOTE(review): unlike the internal arm, the leaf next pointer is not set
                // here; presumably `BPlusTreeLeafPage::merge` updates it — confirm.
                left.merge(right);
                left.header.version += 1;
                IndexRecordPayload::LeafMerge(IndexLeafMergePayload {
                    relation: relation.clone(),
                    left_page_id: left_guard.page_id(),
                    right_page_id,
                    leaf_max_size: self.leaf_max_size,
                    left_next_page_id: left.header.next_page_id,
                    entries: left
                        .array
                        .iter()
                        .map(|(key, rid)| IndexLeafSplitEntryPayload {
                            key_data: TupleCodec::encode(key),
                            rid: *rid,
                        })
                        .collect(),
                })
            }
            (BPlusTreePage::Internal(left), BPlusTreePage::Internal(right)) => {
                // The parent's separator is pulled down between the two entry lists; the
                // merged node inherits the right node's sibling link and high key.
                left.merge(middle_key, right);
                left.header.next_page_id = right.header.next_page_id;
                left.high_key = right.high_key.clone();
                left.header.version += 1;
                IndexRecordPayload::InternalMerge(IndexInternalMergePayload {
                    relation: relation.clone(),
                    left_page_id: left_guard.page_id(),
                    right_page_id,
                    internal_max_size: self.internal_max_size,
                    left_entries: left
                        .array
                        .iter()
                        .map(|(tuple, child)| IndexInternalEntryPayload {
                            key_data: TupleCodec::encode(tuple),
                            child_page_id: *child,
                        })
                        .collect(),
                    high_key: left.high_key.as_ref().map(|t| TupleCodec::encode(t)),
                    next_page_id: left.header.next_page_id,
                })
            }
            _ => unreachable!("Mismatched page types in coalesce"),
        };

        // Log the merge first, then install the merged left page stamped with that LSN.
        let child_lsn = self.append_index_record(child_payload)?;
        let left_encoded = BPlusTreePageCodec::encode(&left_page);
        self.wal_overwrite_page(&mut left_guard, left_encoded, child_lsn)?;

        parent_page.header.version += 1;
        let parent_payload = IndexRecordPayload::ParentDelete(IndexParentDeletePayload {
            relation: relation.clone(),
            parent_page_id: parent_guard.page_id(),
            child_page_id: right_page_id,
        });
        let parent_lsn = self.append_index_record(parent_payload)?;
        let parent_encoded = BPlusTreeInternalPageCodec::encode(&parent_page);
        self.wal_overwrite_page(&mut parent_guard, parent_encoded, parent_lsn)?;
        // Release the child latches before asking the buffer pool to delete the right page.
        drop(left_guard);
        drop(right_guard);
        self.buffer_pool.delete_page(right_page_id)?;

        if context.write_set.is_empty() && parent_page.header.current_size == 1 {
            self.adjust_root(parent_guard)?;
        } else if parent_page.header.current_size < parent_page.min_size() {
            self.handle_underflow(parent_guard, context)?;
        }
        Ok(())
    }
927
928 fn redistribute(
929 &self,
930 mut from_guard: WritePageGuard,
931 mut to_guard: WritePageGuard,
932 mut parent_guard: WritePageGuard,
933 parent_idx_of_to_node: usize,
934 from_is_left_sibling: bool,
935 ) -> QuillSQLResult<()> {
936 let relation = self.relation_ident();
937 let (mut from_page, _) =
938 BPlusTreePageCodec::decode(from_guard.data(), self.key_schema.clone())?;
939 let (mut to_page, _) =
940 BPlusTreePageCodec::decode(to_guard.data(), self.key_schema.clone())?;
941 let (mut parent_page, _) =
942 BPlusTreeInternalPageCodec::decode(parent_guard.data(), self.key_schema.clone())?;
943
944 let (child_payload, parent_child_id, parent_key_tuple) = if from_is_left_sibling {
945 let separator_idx = parent_idx_of_to_node;
946 match (&mut to_page, &mut from_page) {
947 (BPlusTreePage::Leaf(to_leaf), BPlusTreePage::Leaf(from_leaf)) => {
948 let (moved_key, moved_rid) = from_leaf.remove_last_kv();
949 let moved_payload = IndexLeafSplitEntryPayload {
950 key_data: TupleCodec::encode(&moved_key),
951 rid: moved_rid,
952 };
953 to_leaf.array.insert(0, (moved_key, moved_rid));
954 to_leaf.header.current_size += 1;
955 parent_page.array[separator_idx].0 = to_leaf.key_at(0).clone();
956 to_leaf.header.version += 1;
957 from_leaf.header.version += 1;
958 (
959 IndexRecordPayload::LeafRedistribute(IndexLeafRedistributePayload {
960 relation: relation.clone(),
961 from_page_id: from_guard.page_id(),
962 to_page_id: to_guard.page_id(),
963 from_is_left: true,
964 moved_entry: moved_payload,
965 }),
966 to_guard.page_id(),
967 parent_page.array[separator_idx].0.clone(),
968 )
969 }
970 (BPlusTreePage::Internal(to_internal), BPlusTreePage::Internal(from_internal)) => {
971 let item_to_move = from_internal.remove_last_kv();
972 let separator_key_in_parent = parent_page.key_at(separator_idx).clone();
973 let from_old_sentinel = from_internal.value_at(0);
974 let to_old_sentinel = to_internal.value_at(0);
975 let moved_payload = IndexInternalEntryPayload {
976 key_data: TupleCodec::encode(&item_to_move.0),
977 child_page_id: item_to_move.1,
978 };
979 to_internal.array.insert(
980 1,
981 (separator_key_in_parent.clone(), to_internal.value_at(0)),
982 );
983 to_internal.array[0].1 = item_to_move.1;
984 parent_page.array[separator_idx].0 = item_to_move.0.clone();
985 to_internal.header.current_size += 1;
986 to_internal.header.version += 1;
987 from_internal.header.version += 1;
988 self.refresh_internal_child_fence(
989 &parent_page,
990 to_guard.page_id(),
991 to_internal,
992 )?;
993 self.refresh_internal_child_fence(
994 &parent_page,
995 from_guard.page_id(),
996 from_internal,
997 )?;
998 (
999 IndexRecordPayload::InternalRedistribute(
1000 IndexInternalRedistributePayload {
1001 relation: relation.clone(),
1002 from_page_id: from_guard.page_id(),
1003 to_page_id: to_guard.page_id(),
1004 parent_page_id: parent_guard.page_id(),
1005 from_is_left: true,
1006 from_old_sentinel,
1007 to_old_sentinel,
1008 moved_entry: moved_payload,
1009 separator_key_data: TupleCodec::encode(&separator_key_in_parent),
1010 to_new_high_key: to_internal
1011 .high_key
1012 .as_ref()
1013 .map(|t| TupleCodec::encode(t)),
1014 to_new_next_page_id: to_internal.header.next_page_id,
1015 from_new_high_key: from_internal
1016 .high_key
1017 .as_ref()
1018 .map(|t| TupleCodec::encode(t)),
1019 from_new_next_page_id: from_internal.header.next_page_id,
1020 },
1021 ),
1022 to_guard.page_id(),
1023 parent_page.array[separator_idx].0.clone(),
1024 )
1025 }
1026 _ => return Err(QuillSQLError::Internal("Mismatched page types".to_string())),
1027 }
1028 } else {
1029 let separator_idx = parent_idx_of_to_node + 1;
1030 match (&mut to_page, &mut from_page) {
1031 (BPlusTreePage::Leaf(to_leaf), BPlusTreePage::Leaf(from_leaf)) => {
1032 let (moved_key, moved_rid) = from_leaf.remove_first_kv();
1033 let moved_payload = IndexLeafSplitEntryPayload {
1034 key_data: TupleCodec::encode(&moved_key),
1035 rid: moved_rid,
1036 };
1037 to_leaf.array.push((moved_key, moved_rid));
1038 to_leaf.header.current_size += 1;
1039 parent_page.array[separator_idx].0 = from_leaf.key_at(0).clone();
1040 to_leaf.header.version += 1;
1041 from_leaf.header.version += 1;
1042 (
1043 IndexRecordPayload::LeafRedistribute(IndexLeafRedistributePayload {
1044 relation: relation.clone(),
1045 from_page_id: from_guard.page_id(),
1046 to_page_id: to_guard.page_id(),
1047 from_is_left: false,
1048 moved_entry: moved_payload,
1049 }),
1050 from_guard.page_id(),
1051 parent_page.array[separator_idx].0.clone(),
1052 )
1053 }
1054 (BPlusTreePage::Internal(to_internal), BPlusTreePage::Internal(from_internal)) => {
1055 let separator_key_in_parent = parent_page.key_at(separator_idx).clone();
1056 let item_to_move = from_internal.remove_first_kv();
1057 let from_old_sentinel = from_internal.value_at(0);
1058 let to_old_sentinel = to_internal.value_at(0);
1059 let moved_payload = IndexInternalEntryPayload {
1060 key_data: TupleCodec::encode(&item_to_move.0),
1061 child_page_id: item_to_move.1,
1062 };
1063 to_internal
1064 .array
1065 .push((separator_key_in_parent.clone(), from_internal.value_at(0)));
1066 parent_page.array[separator_idx].0 = item_to_move.0.clone();
1067 from_internal.array[0].1 = item_to_move.1;
1068 to_internal.header.current_size += 1;
1069 to_internal.header.version += 1;
1070 from_internal.header.version += 1;
1071 self.refresh_internal_child_fence(
1072 &parent_page,
1073 to_guard.page_id(),
1074 to_internal,
1075 )?;
1076 self.refresh_internal_child_fence(
1077 &parent_page,
1078 from_guard.page_id(),
1079 from_internal,
1080 )?;
1081 (
1082 IndexRecordPayload::InternalRedistribute(
1083 IndexInternalRedistributePayload {
1084 relation: relation.clone(),
1085 from_page_id: from_guard.page_id(),
1086 to_page_id: to_guard.page_id(),
1087 parent_page_id: parent_guard.page_id(),
1088 from_is_left: false,
1089 from_old_sentinel,
1090 to_old_sentinel,
1091 moved_entry: moved_payload,
1092 separator_key_data: TupleCodec::encode(&separator_key_in_parent),
1093 to_new_high_key: to_internal
1094 .high_key
1095 .as_ref()
1096 .map(|t| TupleCodec::encode(t)),
1097 to_new_next_page_id: to_internal.header.next_page_id,
1098 from_new_high_key: from_internal
1099 .high_key
1100 .as_ref()
1101 .map(|t| TupleCodec::encode(t)),
1102 from_new_next_page_id: from_internal.header.next_page_id,
1103 },
1104 ),
1105 from_guard.page_id(),
1106 parent_page.array[separator_idx].0.clone(),
1107 )
1108 }
1109 _ => return Err(QuillSQLError::Internal("Mismatched page types".to_string())),
1110 }
1111 };
1112
1113 let child_lsn = self.append_index_record(child_payload)?;
1114 let from_encoded = BPlusTreePageCodec::encode(&from_page);
1115 self.wal_overwrite_page(&mut from_guard, from_encoded, child_lsn)?;
1116 let to_encoded = BPlusTreePageCodec::encode(&to_page);
1117 self.wal_overwrite_page(&mut to_guard, to_encoded, child_lsn)?;
1118
1119 parent_page.header.version += 1;
1120 let parent_payload = IndexRecordPayload::ParentUpdate(IndexParentUpdatePayload {
1121 relation,
1122 parent_page_id: parent_guard.page_id(),
1123 child_page_id: parent_child_id,
1124 key_data: TupleCodec::encode(&parent_key_tuple),
1125 });
1126 let parent_lsn = self.append_index_record(parent_payload)?;
1127 let parent_encoded = BPlusTreeInternalPageCodec::encode(&parent_page);
1128 self.wal_overwrite_page(&mut parent_guard, parent_encoded, parent_lsn)?;
1129
1130 Ok(())
1131 }
1132
1133 fn refresh_internal_child_fence(
1134 &self,
1135 parent_page: &BPlusTreeInternalPage,
1136 child_page_id: PageId,
1137 child_page: &mut BPlusTreeInternalPage,
1138 ) -> QuillSQLResult<()> {
1139 let Some(child_idx) = parent_page.value_index(child_page_id) else {
1140 return Err(QuillSQLError::Internal(
1141 "redistribute: child missing from parent".to_string(),
1142 ));
1143 };
1144 let size = parent_page.header.current_size as usize;
1145 if child_idx + 1 < size {
1146 child_page.high_key = Some(parent_page.key_at(child_idx + 1).clone());
1147 child_page.header.next_page_id = parent_page.value_at(child_idx + 1);
1148 } else {
1149 child_page.high_key = parent_page.high_key.clone();
1150 child_page.header.next_page_id = parent_page.header.next_page_id;
1151 }
1152 Ok(())
1153 }
1154
1155 fn adjust_root(&self, root_guard: WritePageGuard) -> QuillSQLResult<()> {
1156 let (root_page, _) =
1157 BPlusTreePageCodec::decode(root_guard.data(), self.key_schema.clone())?;
1158
1159 if let BPlusTreePage::Internal(root_internal) = root_page {
1160 if root_internal.header.current_size == 1 {
1161 let payload = IndexRecordPayload::RootAdopt(IndexRootAdoptPayload {
1162 relation: self.relation_ident(),
1163 new_root_page_id: root_internal.value_at(0),
1164 });
1165 let wal_lsn = self.append_index_record(payload)?;
1166 let new_root_id = root_internal.value_at(0);
1169 drop(root_guard);
1171 let _lock = self.header_page_lock.write();
1172 self.set_root_page_id(new_root_id, wal_lsn)?;
1173 }
1175 } else if let BPlusTreePage::Leaf(root_leaf) = root_page {
1176 if root_leaf.header.current_size == 0 {
1177 let payload = IndexRecordPayload::RootReset(IndexRootResetPayload {
1178 relation: self.relation_ident(),
1179 });
1180 let wal_lsn = self.append_index_record(payload)?;
1181 drop(root_guard);
1183 let _lock = self.header_page_lock.write();
1184 self.set_root_page_id(INVALID_PAGE_ID, wal_lsn)?;
1185 }
1187 }
1188 Ok(())
1189 }
1190
1191 fn start_new_tree(&self) -> QuillSQLResult<()> {
1193 let mut root_guard = self.buffer_pool.new_page()?;
1194 let root_page_id = root_guard.page_id();
1195 let mut leaf_page = BPlusTreeLeafPage::new(self.key_schema.clone(), self.leaf_max_size);
1196 leaf_page.header.version += 1;
1197 let payload = IndexRecordPayload::RootInstallLeaf(IndexRootInstallLeafPayload {
1198 relation: self.relation_ident(),
1199 page_id: root_page_id,
1200 leaf_max_size: self.leaf_max_size,
1201 next_page_id: leaf_page.header.next_page_id,
1202 entries: Vec::new(),
1203 });
1204 let wal_lsn = self.append_index_record(payload)?;
1205 let encoded_data = BPlusTreeLeafPageCodec::encode(&leaf_page);
1206 self.wal_overwrite_page(&mut root_guard, encoded_data, wal_lsn)?;
1207 drop(root_guard);
1210 self.set_root_page_id(root_page_id, wal_lsn)?;
1211 Ok(())
1212 }
1213
1214 fn find_leaf_page_optimistic(&self, key: &Tuple) -> QuillSQLResult<ReadPageGuard> {
1215 let mut restarts = 0usize;
1217 'restart: loop {
1218 let mut current_guard = self.buffer_pool.fetch_page_read(self.get_root_page_id()?)?;
1219 loop {
1220 let decoded =
1221 BPlusTreePageCodec::decode(current_guard.data(), self.key_schema.clone());
1222 if decoded.is_err() {
1223 drop(current_guard);
1224 restarts += 1;
1225 if restarts > MAX_OLC_RESTARTS {
1226 std::thread::sleep(std::time::Duration::from_micros(
1227 OLC_BACKOFF_BASE_US.saturating_mul(1 << (restarts.min(10) - 1)),
1228 ));
1229 restarts = 0;
1230 }
1231 continue 'restart;
1232 }
1233 let (page, _) = decoded.unwrap();
1234 match page {
1235 BPlusTreePage::Internal(internal) => {
1236 let (hdr1, _) =
1238 BPlusTreeInternalPageCodec::decode_header_only(current_guard.data())?;
1239 let v1 = hdr1.version;
1240 if let Some(ref hk) = internal.high_key {
1241 if key >= hk && internal.header.next_page_id != INVALID_PAGE_ID {
1242 let sib = self
1243 .buffer_pool
1244 .fetch_page_read(internal.header.next_page_id)?;
1245 drop(current_guard);
1246 current_guard = sib;
1247 continue;
1248 }
1249 }
1250 let next_page_id = BPlusTreeInternalPageCodec::lookup_child_from_bytes(
1252 current_guard.data(),
1253 self.key_schema.clone(),
1254 key,
1255 )?;
1256 let (hdr2, _) =
1257 BPlusTreeInternalPageCodec::decode_header_only(current_guard.data())?;
1258 let v2 = hdr2.version;
1259 if v1 != v2 {
1260 drop(current_guard);
1261 restarts += 1;
1262 if restarts > MAX_OLC_RESTARTS {
1263 std::thread::sleep(std::time::Duration::from_micros(
1264 OLC_BACKOFF_BASE_US.saturating_mul(1 << (restarts.min(10) - 1)),
1265 ));
1266 restarts = 0;
1267 }
1268 continue 'restart;
1269 }
1270 let child_guard = self.buffer_pool.fetch_page_read(next_page_id)?;
1271 drop(current_guard);
1272 current_guard = child_guard;
1273 }
1274 BPlusTreePage::Leaf(_leaf) => {
1275 let (h1, _) =
1276 BPlusTreeLeafPageCodec::decode_header_only(current_guard.data())?;
1277 let v1 = h1.version;
1278 let (h2, _) =
1279 BPlusTreeLeafPageCodec::decode_header_only(current_guard.data())?;
1280 let v2 = h2.version;
1281 if v1 != v2 {
1282 drop(current_guard);
1283 restarts += 1;
1284 if restarts > MAX_OLC_RESTARTS {
1285 std::thread::sleep(std::time::Duration::from_micros(
1286 OLC_BACKOFF_BASE_US.saturating_mul(1 << (restarts.min(10) - 1)),
1287 ));
1288 restarts = 0;
1289 }
1290 continue 'restart;
1291 }
1292 return Ok(current_guard);
1293 }
1294 }
1295 }
1296 }
1297 }
1298
1299 pub fn find_first_leaf_page(&self) -> QuillSQLResult<ReadPageGuard> {
1300 let mut current_page_id = self.get_root_page_id()?;
1301 if current_page_id == INVALID_PAGE_ID {
1302 return Err(QuillSQLError::Internal("Tree is empty".to_string()));
1303 }
1304
1305 loop {
1306 let guard = self.buffer_pool.fetch_page_read(current_page_id)?;
1307 let (page, _) = BPlusTreePageCodec::decode(guard.data(), self.key_schema.clone())?;
1308
1309 match page {
1310 BPlusTreePage::Internal(internal) => {
1311 current_page_id = internal.value_at(0);
1312 }
1313 BPlusTreePage::Leaf(_) => {
1314 return Ok(guard);
1315 }
1316 }
1317 }
1318 }
1319
1320 pub fn find_leaf_page_for_iterator(
1325 &self,
1326 key: &Tuple,
1327 start_page_id: PageId,
1328 ) -> QuillSQLResult<ReadPageGuard> {
1329 let mut current_page_id = start_page_id;
1330 if current_page_id == INVALID_PAGE_ID {
1331 return Err(QuillSQLError::Storage("btree: empty tree".to_string()));
1332 }
1333
1334 loop {
1335 let current_guard = self.buffer_pool.fetch_page_read(current_page_id)?;
1338
1339 let (page_content, _) =
1341 BPlusTreePageCodec::decode(current_guard.data(), self.key_schema.clone())?;
1342
1343 match page_content {
1344 BPlusTreePage::Internal(_internal_page) => {
1346 current_page_id =
1348 crate::storage::codec::BPlusTreeInternalPageCodec::lookup_child_from_bytes(
1349 current_guard.data(),
1350 self.key_schema.clone(),
1351 key,
1352 )?;
1353 }
1354 BPlusTreePage::Leaf(_) => {
1356 return Ok(current_guard);
1359 }
1360 }
1361 }
1362 }
1363
    /// Splits the overflowing page held by `page_guard` and propagates the split
    /// upward until an ancestor has room or a new root is installed.
    ///
    /// Each loop iteration:
    /// 1. Allocates a new right-hand page and moves the upper part of the entries
    ///    into it. A leaf splits at `current_size / 2`. An internal page splits at
    ///    `1 + current_size.saturating_sub(1) / 2`; the first moved key is promoted
    ///    to the parent, and its child pointer becomes the right page's sentinel.
    /// 2. Appends a `LeafSplit` / `InternalSplit` WAL record, writes the new page
    ///    under that LSN, then rewrites the left page under the same LSN.
    /// 3. If the split page is the current root, builds a new two-entry internal
    ///    root, logs `RootInstallInternal`, and updates the header's root id while
    ///    holding `header_page_lock`.
    /// 4. Otherwise pops the parent from the back of `context.write_set`, inserts
    ///    the separator right after the left child, logs `ParentInsert`, and
    ///    rewrites the parent. If the parent reports `is_full()` the loop repeats
    ///    on the parent; otherwise it returns.
    ///
    /// # Errors
    /// Returns `QuillSQLError::Internal` when an internal split moves no entries,
    /// when `context.write_set` has no parent left, or when the parent no longer
    /// references the split page. Also propagates buffer-pool, codec, and WAL
    /// errors.
    ///
    /// NOTE(review): the root branch calls `self.header_page_lock.write()`. If
    /// `context.header_lock_guard` can already hold a write guard on that same
    /// lock when this runs, that call would block on itself — confirm callers
    /// release it first.
    fn split<'a>(
        &'a self,
        mut page_guard: WritePageGuard,
        context: &mut Context<'a>,
    ) -> QuillSQLResult<()> {
        loop {
            let page_id = page_guard.page_id();
            let (mut page, _) =
                BPlusTreePageCodec::decode(page_guard.data(), self.key_schema.clone())?;

            // The right-hand half of the split goes into a freshly allocated page.
            let mut new_page_guard = self.buffer_pool.new_page()?;
            let new_page_id = new_page_guard.page_id();

            // `middle_key` is the separator to install in the parent;
            // `split_wal_lsn` is the LSN used to rewrite the left page below.
            let (middle_key, split_wal_lsn) = match &mut page {
                BPlusTreePage::Leaf(leaf_page) => {
                    let split_at = leaf_page.header.current_size as usize / 2;
                    let mut new_leaf =
                        BPlusTreeLeafPage::new(self.key_schema.clone(), self.leaf_max_size);
                    new_leaf.batch_insert(leaf_page.split_off(split_at));
                    // Splice the new leaf into the sibling chain: left -> new -> old next.
                    new_leaf.header.next_page_id = leaf_page.header.next_page_id;
                    leaf_page.header.next_page_id = new_page_id;
                    let entries_payload = new_leaf
                        .array
                        .iter()
                        .map(|(key, rid)| IndexLeafSplitEntryPayload {
                            key_data: TupleCodec::encode(key),
                            rid: *rid,
                        })
                        .collect::<Vec<_>>();
                    let payload = IndexRecordPayload::LeafSplit(IndexLeafSplitPayload {
                        relation: self.relation_ident(),
                        left_page_id: page_id,
                        right_page_id: new_page_id,
                        leaf_max_size: self.leaf_max_size,
                        split_index: split_at as u16,
                        right_next_page_id: new_leaf.header.next_page_id,
                        entries: entries_payload,
                    });
                    // Log first, then write the new page under the record's LSN.
                    let wal_lsn = self.append_index_record(payload)?;
                    let new_data = BPlusTreeLeafPageCodec::encode(&new_leaf);
                    self.wal_overwrite_page(&mut new_page_guard, new_data, wal_lsn)?;
                    leaf_page.header.version += 1;
                    // The first key of the right leaf is copied up as the separator.
                    (new_leaf.key_at(0).clone(), wal_lsn)
                }
                BPlusTreePage::Internal(internal_page) => {
                    let mut new_internal =
                        BPlusTreeInternalPage::new(self.key_schema.clone(), self.internal_max_size);

                    let num_pointers = internal_page.header.current_size as usize;
                    let promote_idx = 1 + (num_pointers.saturating_sub(1) / 2);

                    let mut moved = internal_page.split_off(promote_idx);

                    // The first moved key is pushed up to the parent; its child
                    // pointer becomes the right page's key-less sentinel entry.
                    let (middle_key, right_sentinel_ptr) = {
                        let pair = moved.get(0).ok_or(QuillSQLError::Internal(
                            "Internal split moved entries empty".to_string(),
                        ))?;
                        (pair.0.clone(), pair.1)
                    };

                    new_internal.insert(Tuple::empty(self.key_schema.clone()), right_sentinel_ptr);
                    if moved.len() > 1 {
                        new_internal.batch_insert(moved.split_off(1));
                    }

                    // Fence update: the left page now ends at `middle_key` and links
                    // to the new page; the new page inherits the left page's old fence.
                    let old_high_key = internal_page.high_key.clone();
                    let old_next = internal_page.header.next_page_id;
                    internal_page.high_key = Some(middle_key.clone());
                    new_internal.high_key = old_high_key;
                    new_internal.header.next_page_id = old_next;
                    internal_page.header.next_page_id = new_page_id;
                    internal_page.header.version += 1;

                    let right_entries = new_internal
                        .array
                        .iter()
                        .map(|(tuple, child)| IndexInternalEntryPayload {
                            key_data: TupleCodec::encode(tuple),
                            child_page_id: *child,
                        })
                        .collect::<Vec<_>>();
                    let payload = IndexRecordPayload::InternalSplit(IndexInternalSplitPayload {
                        relation: self.relation_ident(),
                        left_page_id: page_id,
                        left_new_size: internal_page.header.current_size as u16,
                        left_high_key: internal_page
                            .high_key
                            .as_ref()
                            .map(|t| TupleCodec::encode(t)),
                        left_next_page_id: internal_page.header.next_page_id,
                        right_page_id: new_page_id,
                        internal_max_size: self.internal_max_size,
                        right_entries,
                        right_high_key: new_internal
                            .high_key
                            .as_ref()
                            .map(|t| TupleCodec::encode(t)),
                        right_next_page_id: new_internal.header.next_page_id,
                    });
                    let wal_lsn = self.append_index_record(payload)?;
                    let new_data = BPlusTreeInternalPageCodec::encode(&new_internal);
                    self.wal_overwrite_page(&mut new_page_guard, new_data, wal_lsn)?;
                    (middle_key, wal_lsn)
                }
            };

            // Persist the shrunk left page under the same split record's LSN.
            let old_page_data = BPlusTreePageCodec::encode(&page);
            self.wal_overwrite_page(&mut page_guard, old_page_data, split_wal_lsn)?;

            if page_guard.page_id() == self.get_root_page_id()? {
                // Root split: grow the tree by one level with a new internal root
                // pointing at the left page (sentinel) and the new right page.
                let mut new_root_guard = self.buffer_pool.new_page()?;
                let new_root_id = new_root_guard.page_id();
                let mut new_root_page =
                    BPlusTreeInternalPage::new(self.key_schema.clone(), self.internal_max_size);

                new_root_page.insert(Tuple::empty(self.key_schema.clone()), page_id);
                new_root_page.insert(middle_key, new_page_id);
                new_root_page.header.version += 1;

                let entries_payload = new_root_page
                    .array
                    .iter()
                    .map(|(key, child)| IndexInternalEntryPayload {
                        key_data: TupleCodec::encode(key),
                        child_page_id: *child,
                    })
                    .collect::<Vec<_>>();
                let payload =
                    IndexRecordPayload::RootInstallInternal(IndexRootInstallInternalPayload {
                        relation: self.relation_ident(),
                        page_id: new_root_id,
                        internal_max_size: self.internal_max_size,
                        entries: entries_payload,
                        high_key: new_root_page
                            .high_key
                            .as_ref()
                            .map(|t| TupleCodec::encode(t)),
                        next_page_id: new_root_page.header.next_page_id,
                    });
                let wal_lsn = self.append_index_record(payload)?;
                let encoded = BPlusTreeInternalPageCodec::encode(&new_root_page);
                self.wal_overwrite_page(&mut new_root_guard, encoded, wal_lsn)?;

                // Release both split halves before taking the header lock.
                drop(new_page_guard);
                drop(page_guard);

                let _lock = self.header_page_lock.write();
                self.set_root_page_id(new_root_id, wal_lsn)?;
                return Ok(());
            }

            // Non-root split: the parent is the most recently latched ancestor.
            let mut parent_guard = match context.write_set.pop_back() {
                Some(g) => g,
                None => {
                    return Err(QuillSQLError::Internal(
                        "split: missing parent in context".to_string(),
                    ));
                }
            };
            let (mut parent_page, _) =
                BPlusTreeInternalPageCodec::decode(parent_guard.data(), self.key_schema.clone())?;
            if parent_page.value_index(page_id).is_none() {
                return Err(QuillSQLError::Internal(
                    "split: parent no longer contains left child".to_string(),
                ));
            }
            // Insert (middle_key -> new page) immediately after the left child.
            parent_page.insert_after(page_id, middle_key.clone(), new_page_id);
            parent_page.header.version += 1;

            let parent_payload = IndexRecordPayload::ParentInsert(IndexParentInsertPayload {
                relation: self.relation_ident(),
                parent_page_id: parent_guard.page_id(),
                left_child_page_id: page_id,
                right_child_page_id: new_page_id,
                key_data: TupleCodec::encode(&middle_key),
            });
            let parent_lsn = self.append_index_record(parent_payload)?;

            let encoded = BPlusTreeInternalPageCodec::encode(&parent_page);
            self.wal_overwrite_page(&mut parent_guard, encoded, parent_lsn)?;

            if parent_page.is_full() {
                // The parent overflowed in turn: release the finished children and
                // repeat the split one level up.
                drop(new_page_guard);
                drop(page_guard);
                page_guard = parent_guard;
            } else {
                drop(new_page_guard);
                drop(page_guard);
                return Ok(());
            }
        }
    }
1567}
1568
1569#[cfg(test)]
1570mod tests {
1571 use parking_lot::deadlock;
1572 use std::collections::HashSet;
1573 use std::sync::Arc;
1574 use std::sync::Once;
1575 use std::time::Duration;
1576 use tempfile::TempDir;
1577
1578 use crate::catalog::SchemaRef;
1579 use crate::config::WalConfig;
1580 use crate::recovery::{RecoveryManager, WalManager};
1581 use crate::storage::disk_manager::DiskManager;
1582 use crate::storage::disk_scheduler::DiskScheduler;
1583 use crate::storage::index::btree_index::TreeIndexIterator;
1584 use crate::storage::page::{BPlusTreePage, RecordId};
1585 use crate::storage::tuple::Tuple;
1586 use crate::{
1587 buffer::BufferManager,
1588 catalog::{Column, DataType, Schema},
1589 storage::codec::BPlusTreePageCodec,
1590 };
1591
1592 use super::BPlusTreeIndex;
1593
1594 fn ensure_deadlock_watchdog() {
1595 static START: Once = Once::new();
1596 START.call_once(|| {
1597 std::thread::spawn(|| loop {
1598 std::thread::sleep(Duration::from_millis(500));
1599 let deadlocks = deadlock::check_deadlock();
1600 if !deadlocks.is_empty() {
1601 eprintln!("DEADLOCK DETECTED: {} cycles", deadlocks.len());
1602 for (i, threads) in deadlocks.iter().enumerate() {
1603 eprintln!("Cycle {}:", i);
1604 for t in threads {
1605 eprintln!(" ThreadId={:?}\n{:?}", t.thread_id(), t.backtrace());
1606 }
1607 }
1608 panic!("deadlock detected");
1609 }
1610 });
1611 });
1612 }
1613
1614 fn create_test_index(
1616 buffer_pool_size: usize,
1617 internal_max_size: u32,
1618 leaf_max_size: u32,
1619 ) -> (TempDir, BPlusTreeIndex, SchemaRef) {
1620 let temp_dir = TempDir::new().unwrap();
1621 let temp_path = temp_dir.path().join("test.db");
1622
1623 let key_schema = Arc::new(Schema::new(vec![Column::new("a", DataType::Int64, false)]));
1624 let disk_manager = DiskManager::try_new(temp_path).unwrap();
1625 let disk_scheduler = Arc::new(DiskScheduler::new(Arc::new(disk_manager)));
1626 let buffer_pool = Arc::new(BufferManager::new(buffer_pool_size, disk_scheduler));
1627 let index = BPlusTreeIndex::new(
1628 key_schema.clone(),
1629 buffer_pool,
1630 internal_max_size,
1631 leaf_max_size,
1632 );
1633
1634 (temp_dir, index, key_schema)
1635 }
1636
1637 fn setup_with_wal(
1639 db_path: &std::path::Path,
1640 wal_dir: &std::path::Path,
1641 bpm_pages: usize,
1642 ) -> (Arc<BufferManager>, Arc<WalManager>, Arc<DiskScheduler>) {
1643 let dm = Arc::new(DiskManager::try_new(db_path).unwrap());
1644 let scheduler = Arc::new(DiskScheduler::new(dm));
1645 let mut cfg = WalConfig::default();
1646 cfg.directory = wal_dir.to_path_buf();
1647 let wal = Arc::new(WalManager::new(cfg, None, None).unwrap());
1648 let bpm = Arc::new(BufferManager::new(bpm_pages, scheduler.clone()));
1649 bpm.set_wal_manager(wal.clone());
1650 (bpm, wal, scheduler)
1651 }
1652
1653 fn rid_from_key(key: i64) -> RecordId {
1655 let value = key & 0xFFFFFFFF;
1656 RecordId::new((key >> 32) as u32, value as u32)
1657 }
1658
1659 #[test]
1660 fn test_index_recovery_replays_wal_split() {
1661 ensure_deadlock_watchdog();
1662 let tmp = TempDir::new().unwrap();
1663 let db_path = tmp.path().join("test.db");
1664 let wal_dir = tmp.path().join("wal");
1665
1666 let (bpm, wal, scheduler) = setup_with_wal(&db_path, &wal_dir, 64);
1668 let key_schema: SchemaRef =
1669 Arc::new(Schema::new(vec![Column::new("a", DataType::Int64, false)]));
1670 let index = BPlusTreeIndex::new(key_schema.clone(), bpm.clone(), 2, 3);
1671 let header_pid = index.header_page_id; let keys: Vec<i64> = (1..=30).collect();
1674 for k in &keys {
1675 let t = Tuple::new(key_schema.clone(), vec![(*k).into()]);
1676 index.insert(&t, rid_from_key(*k)).unwrap();
1677 }
1678
1679 let _ = wal.flush(None).unwrap();
1681 drop(index);
1682 drop(bpm);
1683 drop(wal);
1684 drop(scheduler);
1685
1686 let dm2 = Arc::new(DiskManager::try_new(&db_path).unwrap());
1688 let scheduler2 = Arc::new(DiskScheduler::new(dm2));
1689 let mut cfg2 = WalConfig::default();
1690 cfg2.directory = wal_dir.clone();
1691 let wal2 = Arc::new(WalManager::new(cfg2, None, None).unwrap());
1692 let rm = RecoveryManager::new(wal2.clone(), scheduler2.clone());
1693 let _summary = rm.replay().unwrap();
1694
1695 let bpm2 = Arc::new(BufferManager::new(128, scheduler2.clone()));
1697 let reopened = BPlusTreeIndex::open(key_schema.clone(), bpm2.clone(), 2, 3, header_pid);
1698
1699 for k in &keys {
1700 let t = Tuple::new(key_schema.clone(), vec![(*k).into()]);
1701 let got = reopened
1702 .get(&t)
1703 .unwrap()
1704 .expect("missing key after recovery");
1705 assert_eq!(got.page_id, rid_from_key(*k).page_id);
1706 assert_eq!(got.slot_num, rid_from_key(*k).slot_num);
1707 }
1708
1709 let index_arc = Arc::new(reopened);
1711 let mut it = TreeIndexIterator::new(index_arc, ..);
1712 let mut i = 1i64;
1713 while let Some(rid) = it.next().unwrap() {
1714 assert_eq!(rid.slot_num, rid_from_key(i).slot_num);
1715 i += 1;
1716 }
1717 assert_eq!(i, 31);
1718 }
1719
1720 #[test]
1721 fn test_index_recovery_replays_wal_delete_merge() {
1722 ensure_deadlock_watchdog();
1723 let tmp = TempDir::new().unwrap();
1724 let db_path = tmp.path().join("delete.db");
1725 let wal_dir = tmp.path().join("wal");
1726
1727 let (bpm, wal, scheduler) = setup_with_wal(&db_path, &wal_dir, 64);
1728 let key_schema: SchemaRef =
1729 Arc::new(Schema::new(vec![Column::new("a", DataType::Int64, false)]));
1730 let index = BPlusTreeIndex::new(key_schema.clone(), bpm.clone(), 2, 3);
1731 let header_pid = index.header_page_id;
1732
1733 let keys: Vec<i64> = (1..=40).collect();
1734 for k in &keys {
1735 let t = Tuple::new(key_schema.clone(), vec![(*k).into()]);
1736 index.insert(&t, rid_from_key(*k)).unwrap();
1737 }
1738
1739 let deletes: Vec<i64> = (5..=25).collect();
1740 for k in &deletes {
1741 let t = Tuple::new(key_schema.clone(), vec![(*k).into()]);
1742 index.delete(&t).unwrap();
1743 }
1744
1745 let _ = wal.flush(None).unwrap();
1746 drop(index);
1747 drop(bpm);
1748 drop(wal);
1749 drop(scheduler);
1750
1751 let dm2 = Arc::new(DiskManager::try_new(&db_path).unwrap());
1752 let scheduler2 = Arc::new(DiskScheduler::new(dm2));
1753 let mut cfg2 = WalConfig::default();
1754 cfg2.directory = wal_dir.clone();
1755 let wal2 = Arc::new(WalManager::new(cfg2, None, None).unwrap());
1756 let rm = RecoveryManager::new(wal2.clone(), scheduler2.clone());
1757 let _summary = rm.replay().unwrap();
1758
1759 let bpm2 = Arc::new(BufferManager::new(128, scheduler2.clone()));
1760 let reopened = BPlusTreeIndex::open(key_schema.clone(), bpm2.clone(), 2, 3, header_pid);
1761 let delete_set: HashSet<i64> = deletes.into_iter().collect();
1762
1763 for k in &keys {
1764 let tuple = Tuple::new(key_schema.clone(), vec![(*k).into()]);
1765 let result = reopened.get(&tuple).unwrap();
1766 if delete_set.contains(k) {
1767 assert!(result.is_none(), "key {} should be removed", k);
1768 } else {
1769 let rid = result.expect("missing key after recovery");
1770 assert_eq!(rid.page_id, rid_from_key(*k).page_id);
1771 assert_eq!(rid.slot_num, rid_from_key(*k).slot_num);
1772 }
1773 }
1774 }
1775
1776 fn create_rid_from_key(key: i64) -> RecordId {
1778 let value = key & 0xFFFFFFFF;
1779 RecordId::new((key >> 32) as u32, value as u32)
1780 }
1781
1782 fn create_tuple_from_key(key: i64, schema: SchemaRef) -> Tuple {
1784 Tuple::new(schema, vec![key.into()])
1785 }
1786
1787 #[test]
1789 fn test_basic_insert() {
1790 let (_temp_dir, index, key_schema) = create_test_index(50, 2, 3);
1791
1792 let key = 42i64;
1793 let tuple = create_tuple_from_key(key, key_schema.clone());
1794 let rid = create_rid_from_key(key);
1795
1796 index.insert(&tuple, rid).unwrap();
1797
1798 let root_page_id = index.get_root_page_id().unwrap();
1799 assert_ne!(root_page_id, crate::buffer::INVALID_PAGE_ID);
1800
1801 let root_guard = index.buffer_pool.fetch_page_read(root_page_id).unwrap();
1802 let (root_page, _) =
1803 BPlusTreePageCodec::decode(root_guard.data(), key_schema.clone()).unwrap();
1804
1805 assert!(matches!(root_page, BPlusTreePage::Leaf(_)));
1806
1807 if let BPlusTreePage::Leaf(root_as_leaf) = root_page {
1808 assert_eq!(root_as_leaf.header.current_size, 1);
1809 assert_eq!(root_as_leaf.array[0].0, tuple);
1810 assert_eq!(root_as_leaf.array[0].1, rid);
1811 }
1812 }
1813
1814 #[test]
1816 fn test_insert_no_iterator() {
1817 let (_temp_dir, index, key_schema) = create_test_index(50, 2, 3);
1818
1819 let keys = vec![1i64, 2, 3, 4, 5];
1820 for key in &keys {
1821 let tuple = create_tuple_from_key(*key, key_schema.clone());
1822 let rid = create_rid_from_key(*key);
1823 index.insert(&tuple, rid).unwrap();
1824 }
1825
1826 for key in &keys {
1827 let tuple = create_tuple_from_key(*key, key_schema.clone());
1828 let result = index.get(&tuple).unwrap();
1829 assert!(result.is_some(), "missing key {}", key);
1830
1831 let expected_rid = create_rid_from_key(*key);
1832 let actual_rid = result.unwrap();
1833 assert_eq!(actual_rid.page_id, expected_rid.page_id);
1834 assert_eq!(actual_rid.slot_num, expected_rid.slot_num);
1835 }
1836 }
1837
1838 #[test]
1840 fn test_insert_reverse_order() {
1841 let (_temp_dir, index, key_schema) = create_test_index(50, 2, 3);
1842
1843 let keys = vec![5i64, 4, 3, 2, 1];
1844 for key in &keys {
1845 let tuple = create_tuple_from_key(*key, key_schema.clone());
1846 let rid = create_rid_from_key(*key);
1847 index.insert(&tuple, rid).unwrap();
1848 }
1849
1850 for key in &keys {
1851 let tuple = create_tuple_from_key(*key, key_schema.clone());
1852 let result = index.get(&tuple).unwrap();
1853 assert!(result.is_some(), "missing key {}", key);
1854
1855 let expected_rid = create_rid_from_key(*key);
1856 let actual_rid = result.unwrap();
1857 assert_eq!(actual_rid.slot_num, expected_rid.slot_num);
1858 }
1859
1860 let index_arc = Arc::new(index);
1862 let mut iter = TreeIndexIterator::new(index_arc, ..);
1863 let mut current_key = 1i64;
1864 while let Some(rid) = iter.next().unwrap() {
1865 assert_eq!(rid.slot_num as i64, current_key);
1866 current_key += 1;
1867 }
1868 assert_eq!(current_key, keys.len() as i64 + 1);
1869 }
1870
1871 #[test]
1873 fn test_delete_no_iterator() {
1874 let (_temp_dir, index, key_schema) = create_test_index(50, 2, 3);
1875
1876 let keys = vec![1i64, 2, 3, 4, 5];
1877 for key in &keys {
1878 let tuple = create_tuple_from_key(*key, key_schema.clone());
1879 let rid = create_rid_from_key(*key);
1880 index.insert(&tuple, rid).unwrap();
1881 }
1882
1883 for key in &keys {
1885 let tuple = create_tuple_from_key(*key, key_schema.clone());
1886 let result = index.get(&tuple).unwrap();
1887 assert!(result.is_some());
1888
1889 let expected_rid = create_rid_from_key(*key);
1890 let actual_rid = result.unwrap();
1891 assert_eq!(actual_rid.page_id, expected_rid.page_id);
1892 assert_eq!(actual_rid.slot_num, expected_rid.slot_num);
1893 }
1894
1895 let remove_keys = vec![1i64, 5, 3, 4];
1897 for key in &remove_keys {
1898 println!("Before deleting key {}:\n{}", key, index.to_dot().unwrap());
1899 let tuple = create_tuple_from_key(*key, key_schema.clone());
1900 index.delete(&tuple).unwrap();
1901 println!("After deleting key {}:\n{}", key, index.to_dot().unwrap());
1902 }
1903
1904 let mut size = 0;
1905 for key in &keys {
1906 let tuple = create_tuple_from_key(*key, key_schema.clone());
1907 let is_present = index.get(&tuple).unwrap().is_some();
1908
1909 if !is_present {
1910 assert!(remove_keys.contains(key));
1911 } else {
1912 assert!(!remove_keys.contains(key));
1913 assert_eq!(
1914 index.get(&tuple).unwrap().unwrap().slot_num,
1915 (*key & 0xFFFFFFFF) as u32
1916 );
1917 size += 1;
1918 }
1919 }
1920 assert_eq!(size, 1);
1921
1922 let tuple = create_tuple_from_key(2, key_schema.clone());
1924 println!("Before deleting final key 2:\n{}", index.to_dot().unwrap());
1925 index.delete(&tuple).unwrap();
1926 println!("After deleting final key 2:\n{}", index.to_dot().unwrap());
1927 assert!(index.is_empty().unwrap());
1928 }
1929
1930 #[test]
1931 fn test_internal_borrow_from_right_keeps_searchable() {
1932 let (_temp_dir, index, key_schema) = create_test_index(64, 2, 3);
1933
1934 let inserts = [
1935 -5, -4, -3, -2, -1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14,
1936 ];
1937 for key in inserts {
1938 let tuple = create_tuple_from_key(key, key_schema.clone());
1939 let rid = create_rid_from_key(key);
1940 index.insert(&tuple, rid).unwrap();
1941 }
1942
1943 let deletes = [-5, -4, -3, -2, -1, 0, 1, 2];
1944 for key in deletes {
1945 let tuple = create_tuple_from_key(key, key_schema.clone());
1946 index.delete(&tuple).unwrap();
1947 }
1948
1949 let root_page_id = index.get_root_page_id().unwrap();
1950 let root_guard = index.buffer_pool.fetch_page_read(root_page_id).unwrap();
1951 let (root_page, _) =
1952 BPlusTreePageCodec::decode(root_guard.data(), key_schema.clone()).unwrap();
1953 let (left_internal_id, right_internal_id) =
1954 if let BPlusTreePage::Internal(root_internal) = root_page {
1955 assert_eq!(root_internal.header.current_size, 2);
1956 (root_internal.value_at(0), root_internal.value_at(1))
1957 } else {
1958 panic!("expected internal root after deletions");
1959 };
1960 drop(root_guard);
1961
1962 let left_guard = index.buffer_pool.fetch_page_read(left_internal_id).unwrap();
1963 let (left_page, _) =
1964 BPlusTreePageCodec::decode(left_guard.data(), key_schema.clone()).unwrap();
1965 if let BPlusTreePage::Internal(left_internal) = left_page {
1966 assert_eq!(
1967 left_internal.high_key,
1968 Some(create_tuple_from_key(7, key_schema.clone()))
1969 );
1970 assert_eq!(left_internal.header.next_page_id, right_internal_id);
1971 } else {
1972 panic!("expected left child to remain internal");
1973 }
1974
1975 let survivor = create_tuple_from_key(3, key_schema.clone());
1976 assert!(index.get(&survivor).unwrap().is_some());
1977 }
1978
1979 #[test]
1981 fn test_sequential_edge_mix() {
1982 ensure_deadlock_watchdog();
1983 for leaf_max_size in 2..=5 {
1984 let (_temp_dir, index, key_schema) = create_test_index(50, 3, leaf_max_size);
1985
1986 let keys = vec![1i64, 5, 15, 20, 25, 2, -1, -2, 6, 14, 4];
1987 let mut inserted = vec![];
1988 let mut deleted = vec![];
1989
1990 for key in &keys {
1991 let tuple = create_tuple_from_key(*key, key_schema.clone());
1992 let rid = create_rid_from_key(*key);
1993 index.insert(&tuple, rid).unwrap();
1994 inserted.push(*key);
1995
1996 verify_tree_state(&index, &key_schema, &inserted, &deleted);
1998 }
1999
2000 let tuple = create_tuple_from_key(1, key_schema.clone());
2002 index.delete(&tuple).unwrap();
2003 deleted.push(1);
2004 inserted.retain(|&x| x != 1);
2005 verify_tree_state(&index, &key_schema, &inserted, &deleted);
2006
2007 let tuple = create_tuple_from_key(3, key_schema.clone());
2009 let rid = create_rid_from_key(3);
2010 index.insert(&tuple, rid).unwrap();
2011 inserted.push(3);
2012 verify_tree_state(&index, &key_schema, &inserted, &deleted);
2013
2014 let delete_keys = vec![4i64, 14, 6, 2, 15, -2, -1, 3, 5, 25, 20];
2016 for key in &delete_keys {
2017 let tuple = create_tuple_from_key(*key, key_schema.clone());
2018 index.delete(&tuple).unwrap();
2019 deleted.push(*key);
2020 inserted.retain(|&x| x != *key);
2021 verify_tree_state(&index, &key_schema, &inserted, &deleted);
2022 }
2023 }
2024 }
2025
2026 fn verify_tree_state(
2028 index: &BPlusTreeIndex,
2029 key_schema: &SchemaRef,
2030 inserted: &[i64],
2031 deleted: &[i64],
2032 ) {
2033 for key in inserted {
2034 let tuple = create_tuple_from_key(*key, key_schema.clone());
2035 let result = index.get(&tuple).unwrap();
2036 assert!(result.is_some(), "Key {} should be present", key);
2037 }
2038
2039 for key in deleted {
2040 let tuple = create_tuple_from_key(*key, key_schema.clone());
2041 let result = index.get(&tuple).unwrap();
2042 assert!(result.is_none(), "Key {} should be deleted", key);
2043 }
2044 }
2045
#[test]
fn test_concurrent_insert() {
    // Watchdog thread polling parking_lot's deadlock detector every 500ms. It is
    // never joined, so it keeps polling until the test process exits.
    std::thread::spawn(|| loop {
        std::thread::sleep(Duration::from_millis(500));
        let deadlocks = deadlock::check_deadlock();
        if deadlocks.is_empty() {
            continue;
        }
        let mut report = format!("DEADLOCK DETECTED: {} cycles\n", deadlocks.len());
        for (i, threads) in deadlocks.iter().enumerate() {
            report.push_str(&format!("Cycle {}:\n", i));
            for t in threads {
                report.push_str(&format!(
                    " ThreadId={:?}\n{:?}\n",
                    t.thread_id(),
                    t.backtrace()
                ));
            }
        }
        // Write to the raw stderr handle: `eprintln!` from a thread spawned by a
        // test may be captured by the test harness, and that buffer is lost on abort.
        let _ = std::io::Write::write_all(&mut std::io::stderr(), report.as_bytes());
        // A `panic!` here would only kill this watchdog thread while the
        // deadlocked workers (and therefore the test) hang forever; aborting
        // makes the test run fail immediately instead.
        std::process::abort();
    });
    const NUM_ITERS: usize = 3;
    const SCALE_FACTOR: i64 = 50;
    const NUM_THREADS: usize = 5;

    for _iter in 0..NUM_ITERS {
        let (_temp_dir, index, key_schema) = create_test_index(64, 3, 5);
        let index = Arc::new(index);

        let keys: Vec<i64> = (1..SCALE_FACTOR).collect();
        let mut threads = Vec::with_capacity(NUM_THREADS);

        // Thread `i` inserts exactly the keys congruent to `i` modulo
        // NUM_THREADS, so every key is inserted by exactly one thread.
        for i in 0..NUM_THREADS {
            let index_clone = Arc::clone(&index);
            let key_schema_clone = key_schema.clone();
            let keys_clone = keys.clone();

            threads.push(std::thread::spawn(move || {
                for key in &keys_clone {
                    if (*key as usize) % NUM_THREADS == i {
                        let tuple = create_tuple_from_key(*key, key_schema_clone.clone());
                        let rid = create_rid_from_key(*key);
                        index_clone.insert(&tuple, rid).unwrap();
                    }
                }
            }));
        }

        for thread in threads {
            thread.join().unwrap();
        }

        // Every key must be reachable by point lookup with the expected slot.
        for key in &keys {
            let tuple = create_tuple_from_key(*key, key_schema.clone());
            let rid = index
                .get(&tuple)
                .unwrap()
                .unwrap_or_else(|| panic!("key {} missing after concurrent insert", key));
            assert_eq!(rid.slot_num, (*key & 0xFFFFFFFF) as u32);
        }

        // A full scan must yield every key exactly once, in ascending order.
        let mut iter = TreeIndexIterator::new(index.clone(), ..);
        let mut current_key = 1i64;
        while let Some(rid) = iter.next().unwrap() {
            assert_eq!(rid.slot_num, (current_key & 0xFFFFFFFF) as u32);
            current_key += 1;
        }
        assert_eq!(current_key, keys.len() as i64 + 1);
    }
}
2113
#[test]
fn test_basic_scale() {
    let (_temp_dir, index, key_schema) = create_test_index(512, 2, 3);

    let scale = 1000i64;
    let mut keys: Vec<i64> = (1..=scale).collect();

    // Deterministic Fisher-Yates shuffle driven by a fixed-seed LCG, so the
    // insertion order is scrambled yet identical on every run.
    let mut seed: u64 = 0x9E3779B97F4A7C15;
    for i in (1..keys.len()).rev() {
        seed = seed
            .wrapping_mul(2862933555777941757)
            .wrapping_add(3037000493);
        let j = (seed as usize) % (i + 1);
        keys.swap(i, j);
    }

    // Look each key up right after inserting it; if the insert did not take
    // effect, dump the whole tree to help diagnose the failure.
    for key in &keys {
        let tuple = create_tuple_from_key(*key, key_schema.clone());
        let rid = create_rid_from_key(*key);
        index.insert(&tuple, rid).unwrap();
        if index.get(&tuple).unwrap().is_none() {
            panic!(
                "immediate missing key {}; tree=\n{}",
                key,
                index.to_dot().unwrap()
            );
        }
    }

    // Failure diagnostics: renders the keys stored in the leaf that `key`
    // routes to (or a marker if the routed page is not a leaf).
    let leaf_keys_for = |key: i64| -> String {
        let tuple = create_tuple_from_key(key, key_schema.clone());
        let guard = index.find_leaf_page_optimistic(&tuple).unwrap();
        let (page, _) = BPlusTreePageCodec::decode(guard.data(), key_schema.clone()).unwrap();
        if let BPlusTreePage::Leaf(leaf) = page {
            let rendered: Vec<String> = leaf
                .array
                .iter()
                .map(|(t, _)| format!("{}", t))
                .collect();
            format!("{:?}", rendered)
        } else {
            "<routed to a non-leaf page>".to_string()
        }
    };

    // Structural changes caused by later inserts must not lose earlier keys.
    for key in &keys {
        let tuple = create_tuple_from_key(*key, key_schema.clone());
        let Some(actual_rid) = index.get(&tuple).unwrap() else {
            panic!(
                "missing key {}; its leaf holds keys: {}",
                key,
                leaf_keys_for(*key)
            );
        };

        let expected_rid = create_rid_from_key(*key);
        assert_eq!(
            actual_rid.slot_num, expected_rid.slot_num,
            "wrong rid for key {}",
            key
        );
    }
}
2201
#[test]
fn test_concurrent_delete() {
    const NUM_ITERS: usize = 10;

    for _ in 0..NUM_ITERS {
        let (_temp_dir, index, key_schema) = create_test_index(50, 3, 5);

        // Seed the tree single-threaded before it is shared.
        for key in [1i64, 2, 3, 4, 5] {
            let tuple = create_tuple_from_key(key, key_schema.clone());
            index.insert(&tuple, create_rid_from_key(key)).unwrap();
        }

        let index = Arc::new(index);
        let remove_keys = vec![1i64, 5, 3, 4];

        // Worker `parity` deletes the keys whose value modulo 2 equals `parity`.
        let handles: Vec<_> = (0..2usize)
            .map(|parity| {
                let index = Arc::clone(&index);
                let key_schema = key_schema.clone();
                let remove_keys = remove_keys.clone();
                std::thread::spawn(move || {
                    for key in remove_keys.iter().filter(|k| (**k as usize) % 2 == parity) {
                        let tuple = create_tuple_from_key(*key, key_schema.clone());
                        index.delete(&tuple).unwrap();
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }

        // Only key 2 was never deleted, so a full scan sees exactly one entry.
        let mut iter = TreeIndexIterator::new(index.clone(), ..);
        let mut size = 0;
        while let Some(rid) = iter.next().unwrap() {
            assert_eq!(rid.slot_num, 2);
            size += 1;
        }
        assert_eq!(size, 1);
    }
}
2252
#[test]
fn test_concurrent_mix() {
    // Watchdog thread polling parking_lot's deadlock detector every 500ms. It is
    // never joined, so it keeps polling until the test process exits.
    std::thread::spawn(|| loop {
        std::thread::sleep(Duration::from_millis(500));
        let deadlocks = deadlock::check_deadlock();
        if deadlocks.is_empty() {
            continue;
        }
        let mut report = format!("DEADLOCK DETECTED: {} cycles\n", deadlocks.len());
        for (i, threads) in deadlocks.iter().enumerate() {
            report.push_str(&format!("Cycle {}:\n", i));
            for t in threads {
                report.push_str(&format!(
                    " ThreadId={:?}\n{:?}\n",
                    t.thread_id(),
                    t.backtrace()
                ));
            }
        }
        // Write to the raw stderr handle: `eprintln!` from a thread spawned by a
        // test may be captured by the test harness, and that buffer is lost on abort.
        let _ = std::io::Write::write_all(&mut std::io::stderr(), report.as_bytes());
        // A `panic!` here would only kill this watchdog thread while the
        // deadlocked workers (and therefore the test) hang forever; aborting
        // makes the test run fail immediately instead.
        std::process::abort();
    });
    const NUM_ITERS: usize = 5;
    const NUM_THREADS: usize = 5;

    for _iter in 0..NUM_ITERS {
        let (_temp_dir, index, key_schema) = create_test_index(50, 3, 5);
        let index = Arc::new(index);

        // Even keys are inserted concurrently; odd keys are pre-loaded below and
        // then deleted concurrently.
        let total_keys = 20i64;
        let (for_insert, for_delete): (Vec<i64>, Vec<i64>) =
            (1..=total_keys).partition(|key| *key % 2 == 0);

        for key in &for_delete {
            let tuple = create_tuple_from_key(*key, key_schema.clone());
            let rid = create_rid_from_key(*key);
            index.insert(&tuple, rid).unwrap();
        }

        // Even-numbered threads each insert *all* even keys and odd-numbered
        // threads each delete *all* odd keys, so the same key is inserted (or
        // deleted) by several threads concurrently.
        let mut threads = Vec::with_capacity(NUM_THREADS);
        for i in 0..NUM_THREADS {
            let index_clone = Arc::clone(&index);
            let key_schema_clone = key_schema.clone();
            let for_insert_clone = for_insert.clone();
            let for_delete_clone = for_delete.clone();

            threads.push(std::thread::spawn(move || {
                if i % 2 == 0 {
                    for key in &for_insert_clone {
                        let tuple = create_tuple_from_key(*key, key_schema_clone.clone());
                        let rid = create_rid_from_key(*key);
                        index_clone.insert(&tuple, rid).unwrap();
                    }
                } else {
                    for key in &for_delete_clone {
                        let tuple = create_tuple_from_key(*key, key_schema_clone.clone());
                        index_clone.delete(&tuple).unwrap();
                    }
                }
            }));
        }

        for thread in threads {
            thread.join().unwrap();
        }

        // Every even key must be present and every odd key gone; report the
        // first offending key rather than an anonymous count mismatch.
        for key in &for_insert {
            let tuple = create_tuple_from_key(*key, key_schema.clone());
            assert!(
                index.get(&tuple).unwrap().is_some(),
                "inserted key {} is missing",
                key
            );
        }
        for key in &for_delete {
            let tuple = create_tuple_from_key(*key, key_schema.clone());
            assert!(
                index.get(&tuple).unwrap().is_none(),
                "deleted key {} is still present",
                key
            );
        }
    }
}
2347
#[test]
fn test_iterator_functionality() {
    let (_temp_dir, index, key_schema) = create_test_index(50, 3, 5);

    // Populate with odd keys only.
    for key in [1i64, 3, 5, 7, 9] {
        let tuple = create_tuple_from_key(key, key_schema.clone());
        index.insert(&tuple, create_rid_from_key(key)).unwrap();
    }
    let index = Arc::new(index);

    // Inclusive range [3, 7] yields exactly the stored keys inside it.
    let ranged: Vec<i64> = {
        let lower = create_tuple_from_key(3, key_schema.clone());
        let upper = create_tuple_from_key(7, key_schema.clone());
        let mut it = TreeIndexIterator::new(index.clone(), lower..=upper);
        let mut slots = Vec::new();
        while let Some(rid) = it.next().unwrap() {
            slots.push(rid.slot_num as i64);
        }
        slots
    };
    assert_eq!(ranged, vec![3, 5, 7]);

    // An unbounded range walks every entry in key order.
    let everything: Vec<i64> = {
        let mut it = TreeIndexIterator::new(index.clone(), ..);
        let mut slots = Vec::new();
        while let Some(rid) = it.next().unwrap() {
            slots.push(rid.slot_num as i64);
        }
        slots
    };
    assert_eq!(everything, vec![1, 3, 5, 7, 9]);
}
2382
#[test]
fn test_leaf_split_structure() {
    let (_temp_dir, index, key_schema) = create_test_index(50, 10, 3);

    // The test expects these four inserts to split the initial leaf root.
    for key in 1i64..=4 {
        let tuple = create_tuple_from_key(key, key_schema.clone());
        index.insert(&tuple, create_rid_from_key(key)).unwrap();
    }

    // The root must now be an internal page.
    let root_pid = index.get_root_page_id().unwrap();
    let root_guard = index.buffer_pool.fetch_page_read(root_pid).unwrap();
    let BPlusTreePage::Internal(root) =
        BPlusTreePageCodec::decode(root_guard.data(), key_schema.clone())
            .unwrap()
            .0
    else {
        panic!("root is not internal after leaf split");
    };

    // Its first two children are the two halves of the split leaf.
    let (left_pid, right_pid) = (root.value_at(0), root.value_at(1));
    let left_guard = index.buffer_pool.fetch_page_read(left_pid).unwrap();
    let right_guard = index.buffer_pool.fetch_page_read(right_pid).unwrap();
    let left_page = BPlusTreePageCodec::decode(left_guard.data(), key_schema.clone())
        .unwrap()
        .0;
    let right_page = BPlusTreePageCodec::decode(right_guard.data(), key_schema.clone())
        .unwrap()
        .0;

    let BPlusTreePage::Leaf(left) = left_page else {
        panic!("left child not leaf");
    };
    let BPlusTreePage::Leaf(_right_leaf) = right_page else {
        panic!("right child not leaf");
    };

    // The leaf sibling chain must link left -> right.
    assert_eq!(left.header.next_page_id, right_pid);
}
2426
#[test]
#[ignore]
fn bench_get_hot_read() {
    /// Reads `k` from the environment as a `usize`, falling back to
    /// `default_v` when the variable is unset or does not parse.
    fn getenv_usize(k: &str, default_v: usize) -> usize {
        std::env::var(k)
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(default_v)
    }

    let bpm = getenv_usize("QUILL_BENCH_BPM", 1024);
    let nkeys = getenv_usize("QUILL_BENCH_N", 20000) as i64;
    let ops = getenv_usize("QUILL_BENCH_OPS", 200000);
    // With no keys the hot set below would be empty and picking from it would
    // panic with a divide-by-zero; reject that configuration up front.
    assert!(nkeys > 0, "QUILL_BENCH_N must be at least 1, got {}", nkeys);

    let (_temp_dir, index, key_schema) = create_test_index(bpm, 3, 64);

    // Deterministic Fisher-Yates shuffle (fixed-seed LCG) of the insert order.
    let mut keys: Vec<i64> = (1..=nkeys).collect();
    let mut seed: u64 = 0x9E3779B97F4A7C15;
    for i in (1..keys.len()).rev() {
        seed = seed
            .wrapping_mul(2862933555777941757)
            .wrapping_add(3037000493);
        let j = (seed as usize) % (i + 1);
        keys.swap(i, j);
    }
    for k in &keys {
        let t = create_tuple_from_key(*k, key_schema.clone());
        index.insert(&t, create_rid_from_key(*k)).unwrap();
    }

    // Hot set: the last 10% of the shuffled keys (non-empty since nkeys >= 1).
    let hot_start = (nkeys as usize * 9) / 10;
    let hot = &keys[hot_start..];

    // Timed phase: `ops` point lookups on hot keys chosen by a second LCG.
    let start = std::time::Instant::now();
    let mut found = 0usize;
    let mut x = 0x243F6A8885A308D3u64;
    for _ in 0..ops {
        x = x.wrapping_mul(6364136223846793005).wrapping_add(1);
        let idx = (x as usize) % hot.len();
        let key = hot[idx];
        let t = create_tuple_from_key(key, key_schema.clone());
        if index.get(&t).unwrap().is_some() {
            found += 1;
        }
    }
    let el = start.elapsed();
    let qps = (ops as f64) / el.as_secs_f64();
    println!(
        "bench_get_hot_read: ops={} time={:?} qps={:.1} found={}",
        ops, el, qps, found
    );
}
2484
#[test]
#[ignore]
fn bench_range_scan() {
    /// Parses environment variable `k` as a `usize`, using `default_v` when it
    /// is missing or malformed.
    fn getenv_usize(k: &str, default_v: usize) -> usize {
        match std::env::var(k) {
            Ok(raw) => raw.parse().unwrap_or(default_v),
            Err(_) => default_v,
        }
    }

    let bpm = getenv_usize("QUILL_BENCH_BPM", 1024);
    let nkeys = getenv_usize("QUILL_BENCH_N", 30000) as i64;
    let passes = getenv_usize("QUILL_BENCH_PASSES", 20);

    let (_temp_dir, index, key_schema) = create_test_index(bpm, 3, 64);

    // Reproducible pseudo-random insert order: Fisher-Yates over a fixed-seed LCG.
    let mut keys: Vec<i64> = (1..=nkeys).collect();
    let mut state: u64 = 0x9E3779B97F4A7C15;
    for pos in (1..keys.len()).rev() {
        state = state
            .wrapping_mul(2862933555777941757)
            .wrapping_add(3037000493);
        keys.swap(pos, (state as usize) % (pos + 1));
    }
    keys.iter().for_each(|k| {
        let tuple = create_tuple_from_key(*k, key_schema.clone());
        index.insert(&tuple, create_rid_from_key(*k)).unwrap();
    });

    let index = Arc::new(index);
    let total = (nkeys as usize) * passes;

    // Timed phase: `passes` full scans, counting every entry produced.
    let start = std::time::Instant::now();
    let seen: usize = (0..passes)
        .map(|_| {
            let mut it = TreeIndexIterator::new(index.clone(), ..);
            let mut produced = 0usize;
            while it.next().unwrap().is_some() {
                produced += 1;
            }
            produced
        })
        .sum();
    let el = start.elapsed();
    let tps = (total as f64) / el.as_secs_f64();
    println!(
        "bench_range_scan: items={} time={:?} ips={:.1} seen={}",
        total, el, tps, seen
    );
}
2533}