quill_sql/storage/index/
btree_index.rs

1use parking_lot::{RwLock, RwLockWriteGuard};
2use std::collections::VecDeque;
3use std::fmt::Write;
4use std::sync::Arc;
5
6use crate::buffer::{
7    BufferManager, PageId, ReadPageGuard, WritePageGuard, INVALID_PAGE_ID, PAGE_SIZE,
8};
9use crate::catalog::SchemaRef;
10use crate::error::{QuillSQLError, QuillSQLResult};
11use crate::storage::codec::{
12    BPlusTreeHeaderPageCodec, BPlusTreeInternalPageCodec, BPlusTreeLeafPageCodec,
13    BPlusTreePageCodec, TupleCodec,
14};
15use crate::storage::page::{BPlusTreeHeaderPage, BPlusTreeInternalPage};
16use crate::storage::page::{BPlusTreeLeafPage, BPlusTreePage, RecordId};
17
18use crate::config::BTreeConfig;
19use crate::recovery::{wal_record::WalRecordPayload, Lsn};
20use crate::storage::codec::BPlusTreePageTypeCodec;
21pub use crate::storage::index::btree_iterator::TreeIndexIterator;
22use crate::storage::index::wal_codec::{
23    IndexInternalEntryPayload, IndexInternalMergePayload, IndexInternalRedistributePayload,
24    IndexInternalSplitPayload, IndexLeafDeletePayload, IndexLeafInsertPayload,
25    IndexLeafMergePayload, IndexLeafRedistributePayload, IndexLeafSplitEntryPayload,
26    IndexLeafSplitPayload, IndexParentDeletePayload, IndexParentInsertPayload,
27    IndexParentUpdatePayload, IndexRecordPayload, IndexRelationIdent, IndexRootAdoptPayload,
28    IndexRootInstallInternalPayload, IndexRootInstallLeafPayload, IndexRootResetPayload,
29    SchemaRepr,
30};
31use crate::storage::page::BPlusTreePageType;
32use crate::storage::tuple::Tuple;
33use crate::transaction::TransactionId;
34// OLC bounded restart and backoff configuration
35const MAX_OLC_RESTARTS: usize = 64;
36const OLC_BACKOFF_BASE_US: u64 = 50;
37
38#[derive(Debug)]
39pub struct Context<'a> {
40    /// Write guards along the path from root to the current node.
41    /// When a child node is safe for the operation, all ancestor locks
42    /// can be released (clear this set) to reduce lock footprint.
43    pub write_set: VecDeque<WritePageGuard>,
44    /// Holds the header page lock during structural changes (e.g., root switch).
45    pub header_lock_guard: Option<RwLockWriteGuard<'a, ()>>,
46}
47
48impl<'a> Context<'a> {
49    pub fn new() -> Self {
50        Self {
51            write_set: VecDeque::new(),
52            header_lock_guard: None,
53        }
54    }
55
56    /// Push a write guard onto the traversal path.
57    pub fn push_write_guard(&mut self, guard: WritePageGuard) {
58        self.write_set.push_back(guard);
59    }
60
61    /// Release all ancestor write locks when it is safe to proceed.
62    pub fn release_all_write_locks(&mut self) {
63        self.write_set.clear();
64        self.header_lock_guard = None;
65    }
66}
67
68// Production-ready B+ Tree index with OLC readers and latch-crabbing writers.
69// - Header page holds immutable identity; root_page_id changes atomically.
70
71#[derive(Debug)]
72pub struct BPlusTreeIndex {
73    pub key_schema: SchemaRef,
74    pub buffer_pool: Arc<BufferManager>,
75    pub internal_max_size: u32,
76    pub leaf_max_size: u32,
77    pub header_page_id: PageId,
78    pub header_page_lock: Arc<RwLock<()>>,
79    pub config: BTreeConfig,
80}
81
82impl BPlusTreeIndex {
83    /// Write a modified index page back to the buffer with WAL (FPW/Delta) just like table heap.
84    /// This centralizes crash-consistent updates for header/leaf/internal pages.
85    fn wal_overwrite_page(
86        &self,
87        guard: &mut WritePageGuard,
88        new_image: Vec<u8>,
89        new_lsn: Option<Lsn>,
90    ) -> QuillSQLResult<()> {
91        debug_assert_eq!(new_image.len(), PAGE_SIZE);
92        if new_lsn.is_some() {
93            guard.overwrite(&new_image, new_lsn);
94        } else {
95            guard.apply_page_image(&new_image)?;
96        }
97        Ok(())
98    }
99
100    fn relation_ident(&self) -> IndexRelationIdent {
101        IndexRelationIdent {
102            header_page_id: self.header_page_id,
103            schema: SchemaRepr::from(self.key_schema.clone()),
104        }
105    }
106
107    fn append_index_record(&self, payload: IndexRecordPayload) -> QuillSQLResult<Option<Lsn>> {
108        if let Some(wal) = self.buffer_pool.wal_manager() {
109            let res = wal.append_record_with(|_| WalRecordPayload::Index(payload.clone()))?;
110            Ok(Some(res.end_lsn))
111        } else {
112            Ok(None)
113        }
114    }
115
116    pub fn new(
117        key_schema: SchemaRef,
118        buffer_pool: Arc<BufferManager>,
119        internal_max_size: u32,
120        leaf_max_size: u32,
121    ) -> Self {
122        // Create a header page to store the root_page_id
123        let mut header_page_guard = buffer_pool
124            .new_page()
125            .expect("Failed to create header page for B+ tree");
126        let header_page_id = header_page_guard.page_id();
127        let header_page = BPlusTreeHeaderPage {
128            root_page_id: INVALID_PAGE_ID,
129        };
130        let encoded = BPlusTreeHeaderPageCodec::encode(&header_page);
131        header_page_guard.overwrite(&encoded, None);
132        drop(header_page_guard);
133
134        Self {
135            key_schema,
136            buffer_pool,
137            // In our representation, internal node stores a sentinel pointer at index 0.
138            // If caller passes max_keys, then max pointers (KVs) = max_keys + 1.
139            internal_max_size: internal_max_size + 1,
140            leaf_max_size,
141            header_page_id,
142            header_page_lock: Arc::new(RwLock::new(())),
143            config: BTreeConfig::default(),
144        }
145    }
146
147    pub fn new_with_config(
148        key_schema: SchemaRef,
149        buffer_pool: Arc<BufferManager>,
150        internal_max_size: u32,
151        leaf_max_size: u32,
152        config: BTreeConfig,
153    ) -> Self {
154        let mut me = Self::new(key_schema, buffer_pool, internal_max_size, leaf_max_size);
155        me.config = config;
156        me
157    }
158
159    pub fn open(
160        key_schema: SchemaRef,
161        buffer_pool: Arc<BufferManager>,
162        internal_max_size: u32,
163        leaf_max_size: u32,
164        header_page_id: PageId,
165    ) -> Self {
166        Self {
167            key_schema,
168            buffer_pool,
169            // See note in new(): store as max pointers (KVs) capacity.
170            internal_max_size: internal_max_size + 1,
171            leaf_max_size,
172            header_page_id,
173            header_page_lock: Arc::new(RwLock::new(())),
174            config: BTreeConfig::default(),
175        }
176    }
177
178    pub fn open_with_config(
179        key_schema: SchemaRef,
180        buffer_pool: Arc<BufferManager>,
181        internal_max_size: u32,
182        leaf_max_size: u32,
183        header_page_id: PageId,
184        config: BTreeConfig,
185    ) -> Self {
186        let mut me = Self::open(
187            key_schema,
188            buffer_pool,
189            internal_max_size,
190            leaf_max_size,
191            header_page_id,
192        );
193        me.config = config;
194        me
195    }
196
197    pub fn get_root_page_id(&self) -> QuillSQLResult<PageId> {
198        let header_guard = self.buffer_pool.fetch_page_read(self.header_page_id)?;
199        let (header_page, _) = BPlusTreeHeaderPageCodec::decode(header_guard.data())?;
200        Ok(header_page.root_page_id)
201    }
202
203    fn set_root_page_id(&self, page_id: PageId, wal_lsn: Option<Lsn>) -> QuillSQLResult<()> {
204        let mut header_guard = self.buffer_pool.fetch_page_write(self.header_page_id)?;
205        let header_page = BPlusTreeHeaderPage {
206            root_page_id: page_id,
207        };
208        let encoded = BPlusTreeHeaderPageCodec::encode(&header_page);
209        self.wal_overwrite_page(&mut header_guard, encoded, wal_lsn)?;
210        Ok(())
211    }
212
213    pub fn is_empty(&self) -> QuillSQLResult<bool> {
214        Ok(self.get_root_page_id()? == INVALID_PAGE_ID)
215    }
216
217    pub fn get(&self, key: &Tuple) -> QuillSQLResult<Option<RecordId>> {
218        if self.is_empty()? {
219            return Ok(None);
220        }
221        let mut guard = self.find_leaf_page_optimistic(key)?;
222        // Walk right through leaf chain while key is greater than last key
223        loop {
224            let decoded = BPlusTreeLeafPageCodec::decode(guard.data(), self.key_schema.clone());
225            if let Ok((leaf_page, _)) = decoded {
226                if let Some(rid) = leaf_page.look_up(key) {
227                    return Ok(Some(rid));
228                }
229                if leaf_page.header.current_size > 0
230                    && leaf_page.header.next_page_id != INVALID_PAGE_ID
231                {
232                    let last_key = leaf_page.key_at((leaf_page.header.current_size - 1) as usize);
233                    if *key > *last_key {
234                        guard = self
235                            .buffer_pool
236                            .fetch_page_read(leaf_page.header.next_page_id)?;
237                        continue;
238                    }
239                }
240                return Ok(None);
241            } else {
242                // Retry once from root if decode failed (transient)
243                guard = self.find_leaf_page_optimistic(key)?;
244                let (leaf_page, _) =
245                    BPlusTreeLeafPageCodec::decode(guard.data(), self.key_schema.clone())?;
246                return Ok(leaf_page.look_up(key));
247            }
248        }
249    }
250
251    /// 辅助函数:以写模式查找叶子页面,并沿途执行闩锁耦合。
252    fn find_leaf_page_pessimistic<'a>(
253        &'a self,
254        key: &Tuple,
255        is_insert: bool,
256        mut context: Context<'a>,
257    ) -> QuillSQLResult<(WritePageGuard, Context<'a>)> {
258        // Do not pre-hold header lock on insert path; only take it around root modifications
259        let root_page_id = self.get_root_page_id()?;
260        if root_page_id == INVALID_PAGE_ID {
261            // This should be handled by the caller (insert/delete)
262            return Err(QuillSQLError::Internal(
263                "find_leaf_page_pessimistic called on an empty tree".to_string(),
264            ));
265        }
266        let mut current_guard = self.buffer_pool.fetch_page_write(root_page_id)?;
267
268        loop {
269            let (page, _) =
270                BPlusTreePageCodec::decode(current_guard.data(), self.key_schema.clone())?;
271
272            match page {
273                BPlusTreePage::Internal(_internal) => {
274                    let child_page_id = BPlusTreeInternalPageCodec::lookup_child_from_bytes(
275                        current_guard.data(),
276                        self.key_schema.clone(),
277                        key,
278                    )?;
279                    let child_guard = self.buffer_pool.fetch_page_write(child_page_id)?;
280                    // header-only safety check for overflow
281                    let will_overflow = match BPlusTreePageTypeCodec::decode(child_guard.data())?.0
282                    {
283                        BPlusTreePageType::LeafPage => {
284                            let (hdr, _) =
285                                BPlusTreeLeafPageCodec::decode_header_only(child_guard.data())?;
286                            hdr.current_size == hdr.max_size
287                        }
288                        BPlusTreePageType::InternalPage => {
289                            let (hdr, _) =
290                                BPlusTreeInternalPageCodec::decode_header_only(child_guard.data())?;
291                            hdr.current_size == hdr.max_size
292                        }
293                    };
294
295                    if is_insert {
296                        if !will_overflow {
297                            context.release_all_write_locks();
298                            drop(current_guard);
299                            current_guard = child_guard;
300                            continue;
301                        }
302                    }
303                    context.push_write_guard(current_guard);
304                    current_guard = child_guard;
305                }
306                BPlusTreePage::Leaf(_) => {
307                    return Ok((current_guard, context));
308                }
309            }
310        }
311    }
312
313    pub fn insert(&self, key: &Tuple, rid: RecordId) -> QuillSQLResult<()> {
314        self.insert_with_txn(key, rid, TransactionId::default())
315    }
316
317    /// 公共 API: 插入一个键值对,使用闩锁耦合实现高并发。
318    pub fn insert_with_txn(
319        &self,
320        key: &Tuple,
321        rid: RecordId,
322        txn_id: TransactionId,
323    ) -> QuillSQLResult<()> {
324        let mut context = Context::new();
325
326        // Guard tree initialization to avoid concurrent start_new_tree races.
327        if self.is_empty()? {
328            let _lock = self.header_page_lock.write();
329            let root_now = self.get_root_page_id()?;
330            if root_now == INVALID_PAGE_ID {
331                // Create an empty root leaf under header lock.
332                self.start_new_tree()?;
333            }
334        }
335
336        // Split-before-insert loop: ensure we never insert into a full node.
337        loop {
338            let (mut leaf_guard, mut local_ctx) =
339                self.find_leaf_page_pessimistic(key, true, context)?;
340            let (mut leaf_page, _) =
341                BPlusTreeLeafPageCodec::decode(leaf_guard.data(), self.key_schema.clone())?;
342
343            // If we still hold a parent, verify that this leaf is the expected child.
344            // If not, redirect to the expected child to avoid misplacing keys across parent ranges.
345            if let Some(parent_guard_ref) = local_ctx.write_set.back() {
346                let (parent_page_chk, _) = BPlusTreeInternalPageCodec::decode(
347                    parent_guard_ref.data(),
348                    self.key_schema.clone(),
349                )?;
350                let expected_pid = parent_page_chk.look_up(key);
351                if expected_pid != leaf_guard.page_id() {
352                    drop(leaf_guard);
353                    leaf_guard = self.buffer_pool.fetch_page_write(expected_pid)?;
354                    let (new_leaf, _) =
355                        BPlusTreeLeafPageCodec::decode(leaf_guard.data(), self.key_schema.clone())?;
356                    leaf_page = new_leaf;
357                }
358            }
359
360            // Redirect to right siblings if key no longer belongs to this leaf (post-split race)
361            while leaf_page.header.current_size > 0
362                && leaf_page.header.next_page_id != INVALID_PAGE_ID
363            {
364                let last_key_ref = leaf_page.key_at((leaf_page.header.current_size - 1) as usize);
365                if *key <= *last_key_ref {
366                    break;
367                }
368                let next_pid = leaf_page.header.next_page_id;
369                // Peek the next leaf's minimal key using a read latch to avoid heavy contention
370                let next_guard_peek = self.buffer_pool.fetch_page_read(next_pid)?;
371                let (next_leaf_peek, _) = BPlusTreeLeafPageCodec::decode(
372                    next_guard_peek.data(),
373                    self.key_schema.clone(),
374                )?;
375                let next_first_key = if next_leaf_peek.header.current_size > 0 {
376                    next_leaf_peek.key_at(0).clone()
377                } else {
378                    break;
379                };
380                drop(next_guard_peek);
381                if *key < next_first_key {
382                    break;
383                }
384                // Release any parents held and current leaf, then jump to next sibling directly
385                local_ctx.release_all_write_locks();
386                drop(leaf_guard);
387                let next_guard = self.buffer_pool.fetch_page_write(next_pid)?;
388                let (next_leaf, _) =
389                    BPlusTreeLeafPageCodec::decode(next_guard.data(), self.key_schema.clone())?;
390                leaf_guard = next_guard;
391                leaf_page = next_leaf;
392            }
393
394            // Update if key exists
395            if let Some(existing_rid) = leaf_page.look_up_mut(key) {
396                let old_rid = *existing_rid;
397                *existing_rid = rid;
398                leaf_page.header.version += 1;
399                let key_encoded = TupleCodec::encode(key);
400                let delete_payload = IndexRecordPayload::LeafDelete(IndexLeafDeletePayload {
401                    relation: self.relation_ident(),
402                    page_id: leaf_guard.page_id(),
403                    op_txn_id: txn_id,
404                    key_data: key_encoded.clone(),
405                    old_rid,
406                });
407                let mut wal_lsn = self.append_index_record(delete_payload)?;
408                let insert_payload = IndexRecordPayload::LeafInsert(IndexLeafInsertPayload {
409                    relation: self.relation_ident(),
410                    page_id: leaf_guard.page_id(),
411                    op_txn_id: txn_id,
412                    key_data: key_encoded,
413                    rid,
414                });
415                if let Some(lsn) = self.append_index_record(insert_payload)? {
416                    wal_lsn = Some(lsn);
417                }
418                let encoded = BPlusTreeLeafPageCodec::encode(&leaf_page);
419                self.wal_overwrite_page(&mut leaf_guard, encoded, wal_lsn)?;
420                local_ctx.release_all_write_locks();
421                return Ok(());
422            }
423
424            // If page is at capacity, split first, then retry to find the correct leaf
425            if leaf_page.header.current_size == leaf_page.header.max_size {
426                // Avoid deadlock: allow split to acquire header lock when promoting root
427                local_ctx.header_lock_guard = None;
428                match self.split(leaf_guard, &mut local_ctx) {
429                    Ok(()) => {
430                        // Release all locks and retry from root with fresh context
431                        local_ctx.release_all_write_locks();
432                        context = Context::new();
433                        continue;
434                    }
435                    Err(QuillSQLError::Internal(_e)) => {
436                        // Structural race: drop latches and retry from root
437                        local_ctx.release_all_write_locks();
438                        context = Context::new();
439                        continue;
440                    }
441                    Err(e) => return Err(e),
442                }
443            }
444
445            // Safe to insert
446            leaf_page.insert(key.clone(), rid);
447            leaf_page.header.version += 1;
448            let payload = IndexRecordPayload::LeafInsert(IndexLeafInsertPayload {
449                relation: self.relation_ident(),
450                page_id: leaf_guard.page_id(),
451                op_txn_id: txn_id,
452                key_data: TupleCodec::encode(key),
453                rid,
454            });
455            let wal_lsn = self.append_index_record(payload)?;
456            let encoded = BPlusTreeLeafPageCodec::encode(&leaf_page);
457            self.wal_overwrite_page(&mut leaf_guard, encoded, wal_lsn)?;
458            local_ctx.release_all_write_locks();
459            return Ok(());
460        }
461    }
462
463    /// Public API: delete a key with latch coupling to ensure concurrency safety.
464    pub fn delete(&self, key: &Tuple) -> QuillSQLResult<()> {
465        self.delete_with_txn(key, TransactionId::default())
466    }
467
468    pub fn delete_with_txn(&self, key: &Tuple, txn_id: TransactionId) -> QuillSQLResult<()> {
469        if self.is_empty()? {
470            return Ok(());
471        }
472
473        let mut context = Context::new();
474        'restart: loop {
475            let (mut leaf_guard, mut local_ctx) =
476                self.find_leaf_page_pessimistic(key, false, context)?;
477            let (mut leaf_page, _) =
478                BPlusTreeLeafPageCodec::decode(leaf_guard.data(), self.key_schema.clone())?;
479
480            // If we still hold a parent, prefer parent-guided redirection to avoid crossing
481            // parent boundary via leaf chain which can cause livelock during structure changes.
482            if let Some(parent_guard_ref) = local_ctx.write_set.back() {
483                let (parent_page_chk, _) = BPlusTreeInternalPageCodec::decode(
484                    parent_guard_ref.data(),
485                    self.key_schema.clone(),
486                )?;
487                let expected_pid = parent_page_chk.look_up(key);
488                if expected_pid != leaf_guard.page_id() {
489                    drop(leaf_guard);
490                    leaf_guard = self.buffer_pool.fetch_page_write(expected_pid)?;
491                    let (new_leaf, _) =
492                        BPlusTreeLeafPageCodec::decode(leaf_guard.data(), self.key_schema.clone())?;
493                    leaf_page = new_leaf;
494                }
495            }
496
497            // Redirect: if key belongs to right siblings, jump directly to sibling while keeping parent path
498            let mut hops: u32 = 0;
499            while leaf_page.header.current_size > 0
500                && leaf_page.header.next_page_id != INVALID_PAGE_ID
501            {
502                let last_key_ref = leaf_page.key_at((leaf_page.header.current_size - 1) as usize);
503                if *key <= *last_key_ref {
504                    break;
505                }
506                let next_pid = leaf_page.header.next_page_id;
507                hops += 1;
508                if hops > 8 {
509                    local_ctx.release_all_write_locks();
510                    drop(leaf_guard);
511                    context = Context::new();
512                    continue 'restart;
513                }
514                drop(leaf_guard);
515                leaf_guard = self.buffer_pool.fetch_page_write(next_pid)?;
516                let (new_leaf, _) =
517                    BPlusTreeLeafPageCodec::decode(leaf_guard.data(), self.key_schema.clone())?;
518                leaf_page = new_leaf;
519
520                // Do not force restart here; attempt deletion on the correct leaf first.
521            }
522
523            // Check presence & whether it is the first key (affects parent separator)
524            let was_first = if leaf_page.header.current_size > 0 {
525                let first_key = leaf_page.key_at(0).clone();
526                &first_key == key
527            } else {
528                false
529            };
530            let target_rid = match leaf_page.look_up(key) {
531                Some(rid) => rid,
532                None => {
533                    local_ctx.release_all_write_locks();
534                    return Ok(());
535                }
536            };
537            let key_bytes = TupleCodec::encode(key);
538
539            leaf_page.delete(key);
540            leaf_page.header.version += 1;
541            let payload = IndexRecordPayload::LeafDelete(IndexLeafDeletePayload {
542                relation: self.relation_ident(),
543                page_id: leaf_guard.page_id(),
544                op_txn_id: txn_id,
545                key_data: key_bytes,
546                old_rid: target_rid,
547            });
548            let wal_lsn = self.append_index_record(payload)?;
549            let encoded = BPlusTreeLeafPageCodec::encode(&leaf_page);
550            self.wal_overwrite_page(&mut leaf_guard, encoded, wal_lsn)?;
551
552            // If the node is underflowing, handle it.
553            if leaf_page.header.current_size < leaf_page.min_size() {
554                // If parent path is empty but this leaf is not the actual root, rebuild path
555                if local_ctx.write_set.is_empty() {
556                    let root_id = self.get_root_page_id()?;
557                    if leaf_guard.page_id() != root_id {
558                        local_ctx.release_all_write_locks();
559                        drop(leaf_guard);
560                        context = Context::new();
561                        continue 'restart;
562                    }
563                }
564                match self.handle_underflow(leaf_guard, &mut local_ctx) {
565                    Ok(()) => {
566                        local_ctx.release_all_write_locks();
567                        return Ok(());
568                    }
569                    Err(QuillSQLError::Internal(_)) => {
570                        // Structural race/mismatch: release and retry from root
571                        local_ctx.release_all_write_locks();
572                        context = Context::new();
573                        continue;
574                    }
575                    Err(e) => return Err(e),
576                }
577            } else {
578                // If we deleted the first key of this leaf, and there's a parent above,
579                // update the parent's separator key to the new minimal key of this leaf.
580                if was_first && leaf_page.header.current_size > 0 {
581                    if let Some(mut parent_guard) = local_ctx.write_set.pop_back() {
582                        let (mut parent_page, _) = BPlusTreeInternalPageCodec::decode(
583                            parent_guard.data(),
584                            self.key_schema.clone(),
585                        )?;
586                        if let Some(node_idx) = parent_page.value_index(leaf_guard.page_id()) {
587                            if node_idx > 0 {
588                                parent_page.array[node_idx].0 = leaf_page.key_at(0).clone();
589                                parent_page.header.version += 1;
590                                let encoded = BPlusTreeInternalPageCodec::encode(&parent_page);
591                                self.wal_overwrite_page(&mut parent_guard, encoded, None)?;
592                            }
593                        }
594                        // push back to maintain path for later releases
595                        local_ctx.write_set.push_back(parent_guard);
596                    }
597                }
598                local_ctx.release_all_write_locks();
599                return Ok(());
600            }
601        }
602    }
603
604    pub fn to_dot(&self) -> QuillSQLResult<String> {
605        let mut dot = String::new();
606        writeln!(&mut dot, "digraph BPlusTree {{").unwrap();
607        writeln!(&mut dot, "  rankdir=TB;").unwrap();
608        writeln!(&mut dot, "  node [shape=record, height=.1];").unwrap();
609
610        let root_page_id = self.get_root_page_id()?;
611        if root_page_id == INVALID_PAGE_ID {
612            writeln!(&mut dot, "  empty [label=\"<f0> Empty Tree\"];").unwrap();
613            writeln!(&mut dot, "}}").unwrap();
614            return Ok(dot);
615        }
616
617        let mut queue = VecDeque::new();
618        queue.push_back(root_page_id);
619
620        while let Some(page_id) = queue.pop_front() {
621            let guard = self.buffer_pool.fetch_page_read(page_id)?;
622            let (page, _) = BPlusTreePageCodec::decode(guard.data(), self.key_schema.clone())?;
623
624            match page {
625                BPlusTreePage::Internal(internal) => {
626                    let mut label = String::new();
627                    for i in 0..internal.header.current_size {
628                        if i > 0 {
629                            write!(&mut label, "|").unwrap();
630                        }
631                        write!(&mut label, "<p{}>", i).unwrap();
632                        if i > 0 {
633                            write!(&mut label, " {}", internal.key_at(i as usize)).unwrap();
634                        }
635                    }
636                    writeln!(&mut dot, "  page{} [label=\"{}\"];", page_id, label).unwrap();
637
638                    for i in 0..internal.header.current_size {
639                        let child_id = internal.value_at(i as usize);
640                        writeln!(
641                            &mut dot,
642                            "  \"page{}\":p{} -> \"page{}\";",
643                            page_id, i, child_id
644                        )
645                        .unwrap();
646                        queue.push_back(child_id);
647                    }
648                }
649                BPlusTreePage::Leaf(leaf) => {
650                    let mut label = String::new();
651                    for i in 0..leaf.header.current_size {
652                        if i > 0 {
653                            write!(&mut label, "|").unwrap();
654                        }
655                        write!(
656                            &mut label,
657                            "<f{}> {} -> {}",
658                            i,
659                            leaf.key_at(i as usize),
660                            leaf.array[i as usize].1
661                        )
662                        .unwrap();
663                    }
664                    writeln!(&mut dot, "  page{} [label=\"{}\"];", page_id, label).unwrap();
665
666                    if leaf.header.next_page_id != INVALID_PAGE_ID {
667                        writeln!(
668                            &mut dot,
669                            "  page{} -> page{} [style=dashed, constraint=false];",
670                            page_id, leaf.header.next_page_id
671                        )
672                        .unwrap();
673                    }
674                }
675            }
676        }
677
678        writeln!(&mut dot, "}}").unwrap();
679        Ok(dot)
680    }
681
682    fn handle_underflow(
683        &self,
684        mut node_guard: WritePageGuard,
685        context: &mut Context,
686    ) -> QuillSQLResult<()> {
687        if context.write_set.is_empty() {
688            // This is the root node. Let adjust_root handle it.
689            self.adjust_root(node_guard)?;
690            return Ok(());
691        }
692
693        let parent_guard = match context.write_set.pop_back() {
694            Some(g) => g,
695            None => {
696                return Err(QuillSQLError::Internal(
697                    "underflow: missing parent".to_string(),
698                ))
699            }
700        };
701        let (parent_page, _) =
702            BPlusTreeInternalPageCodec::decode(parent_guard.data(), self.key_schema.clone())?;
703
704        let Some(node_idx) = parent_page.value_index(node_guard.page_id()) else {
705            return Err(QuillSQLError::Internal(
706                "underflow: node not found in parent".to_string(),
707            ));
708        };
709
710        // Try to borrow from the left sibling first (acquire locks in PageId order to avoid deadlock).
711        if node_idx > 0 {
712            let left_sibling_pid = parent_page.value_at(node_idx - 1);
713            let node_pid = node_guard.page_id();
714            if left_sibling_pid < node_pid {
715                // Enforce PageId order: release node, lock left first, then node
716                drop(node_guard);
717                let left_sibling_guard = self.buffer_pool.fetch_page_write(left_sibling_pid)?;
718                let node_guard_new = self.buffer_pool.fetch_page_write(node_pid)?;
719                node_guard = node_guard_new;
720                let (left_sibling_page, _) =
721                    BPlusTreePageCodec::decode(left_sibling_guard.data(), self.key_schema.clone())?;
722                if left_sibling_page.current_size() > left_sibling_page.min_size() {
723                    self.redistribute(
724                        left_sibling_guard,
725                        node_guard,
726                        parent_guard,
727                        node_idx,
728                        true,
729                    )?;
730                    return Ok(());
731                }
732            } else {
733                // We already hold node; locking left next preserves non-decreasing order
734                let left_sibling_guard = self.buffer_pool.fetch_page_write(left_sibling_pid)?;
735                let (left_sibling_page, _) =
736                    BPlusTreePageCodec::decode(left_sibling_guard.data(), self.key_schema.clone())?;
737                if left_sibling_page.current_size() > left_sibling_page.min_size() {
738                    self.redistribute(
739                        left_sibling_guard,
740                        node_guard,
741                        parent_guard,
742                        node_idx,
743                        true,
744                    )?;
745                    return Ok(());
746                }
747            }
748        }
749
750        // Try to borrow from the right sibling (acquire locks in PageId order to avoid deadlock).
751        if node_idx < parent_page.header.current_size as usize - 1 {
752            let right_sibling_pid = parent_page.value_at(node_idx + 1);
753            let node_pid = node_guard.page_id();
754            if right_sibling_pid < node_pid {
755                // Enforce PageId order: release node, lock right first, then node
756                drop(node_guard);
757                let right_sibling_guard = self.buffer_pool.fetch_page_write(right_sibling_pid)?;
758                let node_guard_new = self.buffer_pool.fetch_page_write(node_pid)?;
759                node_guard = node_guard_new;
760                let (right_sibling_page, _) = BPlusTreePageCodec::decode(
761                    right_sibling_guard.data(),
762                    self.key_schema.clone(),
763                )?;
764                if right_sibling_page.current_size() > right_sibling_page.min_size() {
765                    self.redistribute(
766                        right_sibling_guard,
767                        node_guard,
768                        parent_guard,
769                        node_idx,
770                        false,
771                    )?;
772                    return Ok(());
773                }
774            } else {
775                let right_sibling_guard = self.buffer_pool.fetch_page_write(right_sibling_pid)?;
776                let (right_sibling_page, _) = BPlusTreePageCodec::decode(
777                    right_sibling_guard.data(),
778                    self.key_schema.clone(),
779                )?;
780                if right_sibling_page.current_size() > right_sibling_page.min_size() {
781                    self.redistribute(
782                        right_sibling_guard,
783                        node_guard,
784                        parent_guard,
785                        node_idx,
786                        false,
787                    )?;
788                    return Ok(());
789                }
790            }
791        }
792
793        // Neither sibling can lend, so we must merge.
794        if node_idx > 0 {
795            // Merge with the left sibling (lock in PageId order)
796            let left_sibling_pid = parent_page.value_at(node_idx - 1);
797            let node_pid = node_guard.page_id();
798            if left_sibling_pid < node_pid {
799                // Enforce PageId order for merge: release node first
800                drop(node_guard);
801                let left_sibling_guard = self.buffer_pool.fetch_page_write(left_sibling_pid)?;
802                let node_guard_new = self.buffer_pool.fetch_page_write(node_pid)?;
803                node_guard = node_guard_new;
804                self.coalesce(left_sibling_guard, node_guard, parent_guard, context)?;
805            } else {
806                let left_sibling_guard = self.buffer_pool.fetch_page_write(left_sibling_pid)?;
807                self.coalesce(left_sibling_guard, node_guard, parent_guard, context)?;
808            }
809        } else {
810            // Merge with the right sibling (lock in PageId order)
811            let right_sibling_pid = parent_page.value_at(node_idx + 1);
812            let node_pid = node_guard.page_id();
813            if right_sibling_pid < node_pid {
814                // Enforce PageId order for merge: release node first
815                drop(node_guard);
816                let right_sibling_guard = self.buffer_pool.fetch_page_write(right_sibling_pid)?;
817                let node_guard_new = self.buffer_pool.fetch_page_write(node_pid)?;
818                node_guard = node_guard_new;
819                // node is left in this case
820                self.coalesce(node_guard, right_sibling_guard, parent_guard, context)?;
821            } else {
822                let right_sibling_guard = self.buffer_pool.fetch_page_write(right_sibling_pid)?;
823                self.coalesce(node_guard, right_sibling_guard, parent_guard, context)?;
824            }
825        }
826
827        Ok(())
828    }
829
830    fn coalesce(
831        &self,
832        mut left_guard: WritePageGuard,
833        right_guard: WritePageGuard,
834        mut parent_guard: WritePageGuard,
835        context: &mut Context,
836    ) -> QuillSQLResult<()> {
837        let relation = self.relation_ident();
838        let (mut left_page, _) =
839            BPlusTreePageCodec::decode(left_guard.data(), self.key_schema.clone())?;
840        let (mut right_page, _) =
841            BPlusTreePageCodec::decode(right_guard.data(), self.key_schema.clone())?;
842        let (mut parent_page, _) =
843            BPlusTreeInternalPageCodec::decode(parent_guard.data(), self.key_schema.clone())?;
844
845        let right_page_id = right_guard.page_id();
846        let middle_key = match parent_page.remove(right_page_id) {
847            Some((k, _)) => k,
848            None => {
849                return Err(QuillSQLError::Internal(
850                    "coalesce: parent missing right child".to_string(),
851                ));
852            }
853        };
854
855        let child_payload = match (&mut left_page, &mut right_page) {
856            (BPlusTreePage::Leaf(left), BPlusTreePage::Leaf(right)) => {
857                left.merge(right);
858                left.header.version += 1;
859                IndexRecordPayload::LeafMerge(IndexLeafMergePayload {
860                    relation: relation.clone(),
861                    left_page_id: left_guard.page_id(),
862                    right_page_id,
863                    leaf_max_size: self.leaf_max_size,
864                    left_next_page_id: left.header.next_page_id,
865                    entries: left
866                        .array
867                        .iter()
868                        .map(|(key, rid)| IndexLeafSplitEntryPayload {
869                            key_data: TupleCodec::encode(key),
870                            rid: *rid,
871                        })
872                        .collect(),
873                })
874            }
875            (BPlusTreePage::Internal(left), BPlusTreePage::Internal(right)) => {
876                left.merge(middle_key, right);
877                left.header.next_page_id = right.header.next_page_id;
878                left.high_key = right.high_key.clone();
879                left.header.version += 1;
880                IndexRecordPayload::InternalMerge(IndexInternalMergePayload {
881                    relation: relation.clone(),
882                    left_page_id: left_guard.page_id(),
883                    right_page_id,
884                    internal_max_size: self.internal_max_size,
885                    left_entries: left
886                        .array
887                        .iter()
888                        .map(|(tuple, child)| IndexInternalEntryPayload {
889                            key_data: TupleCodec::encode(tuple),
890                            child_page_id: *child,
891                        })
892                        .collect(),
893                    high_key: left.high_key.as_ref().map(|t| TupleCodec::encode(t)),
894                    next_page_id: left.header.next_page_id,
895                })
896            }
897            _ => unreachable!("Mismatched page types in coalesce"),
898        };
899
900        let child_lsn = self.append_index_record(child_payload)?;
901        let left_encoded = BPlusTreePageCodec::encode(&left_page);
902        self.wal_overwrite_page(&mut left_guard, left_encoded, child_lsn)?;
903
904        parent_page.header.version += 1;
905        let parent_payload = IndexRecordPayload::ParentDelete(IndexParentDeletePayload {
906            relation: relation.clone(),
907            parent_page_id: parent_guard.page_id(),
908            child_page_id: right_page_id,
909        });
910        let parent_lsn = self.append_index_record(parent_payload)?;
911        let parent_encoded = BPlusTreeInternalPageCodec::encode(&parent_page);
912        self.wal_overwrite_page(&mut parent_guard, parent_encoded, parent_lsn)?;
913        drop(left_guard);
914        drop(right_guard);
915        self.buffer_pool.delete_page(right_page_id)?;
916
917        // After merging, if the parent becomes a root with only one child,
918        // it needs to be adjusted to shrink the tree's height.
919        if context.write_set.is_empty() && parent_page.header.current_size == 1 {
920            self.adjust_root(parent_guard)?;
921        } else if parent_page.header.current_size < parent_page.min_size() {
922            // Otherwise, recursively handle underflow in the parent.
923            self.handle_underflow(parent_guard, context)?;
924        }
925        Ok(())
926    }
927
928    fn redistribute(
929        &self,
930        mut from_guard: WritePageGuard,
931        mut to_guard: WritePageGuard,
932        mut parent_guard: WritePageGuard,
933        parent_idx_of_to_node: usize,
934        from_is_left_sibling: bool,
935    ) -> QuillSQLResult<()> {
936        let relation = self.relation_ident();
937        let (mut from_page, _) =
938            BPlusTreePageCodec::decode(from_guard.data(), self.key_schema.clone())?;
939        let (mut to_page, _) =
940            BPlusTreePageCodec::decode(to_guard.data(), self.key_schema.clone())?;
941        let (mut parent_page, _) =
942            BPlusTreeInternalPageCodec::decode(parent_guard.data(), self.key_schema.clone())?;
943
944        let (child_payload, parent_child_id, parent_key_tuple) = if from_is_left_sibling {
945            let separator_idx = parent_idx_of_to_node;
946            match (&mut to_page, &mut from_page) {
947                (BPlusTreePage::Leaf(to_leaf), BPlusTreePage::Leaf(from_leaf)) => {
948                    let (moved_key, moved_rid) = from_leaf.remove_last_kv();
949                    let moved_payload = IndexLeafSplitEntryPayload {
950                        key_data: TupleCodec::encode(&moved_key),
951                        rid: moved_rid,
952                    };
953                    to_leaf.array.insert(0, (moved_key, moved_rid));
954                    to_leaf.header.current_size += 1;
955                    parent_page.array[separator_idx].0 = to_leaf.key_at(0).clone();
956                    to_leaf.header.version += 1;
957                    from_leaf.header.version += 1;
958                    (
959                        IndexRecordPayload::LeafRedistribute(IndexLeafRedistributePayload {
960                            relation: relation.clone(),
961                            from_page_id: from_guard.page_id(),
962                            to_page_id: to_guard.page_id(),
963                            from_is_left: true,
964                            moved_entry: moved_payload,
965                        }),
966                        to_guard.page_id(),
967                        parent_page.array[separator_idx].0.clone(),
968                    )
969                }
970                (BPlusTreePage::Internal(to_internal), BPlusTreePage::Internal(from_internal)) => {
971                    let item_to_move = from_internal.remove_last_kv();
972                    let separator_key_in_parent = parent_page.key_at(separator_idx).clone();
973                    let from_old_sentinel = from_internal.value_at(0);
974                    let to_old_sentinel = to_internal.value_at(0);
975                    let moved_payload = IndexInternalEntryPayload {
976                        key_data: TupleCodec::encode(&item_to_move.0),
977                        child_page_id: item_to_move.1,
978                    };
979                    to_internal.array.insert(
980                        1,
981                        (separator_key_in_parent.clone(), to_internal.value_at(0)),
982                    );
983                    to_internal.array[0].1 = item_to_move.1;
984                    parent_page.array[separator_idx].0 = item_to_move.0.clone();
985                    to_internal.header.current_size += 1;
986                    to_internal.header.version += 1;
987                    from_internal.header.version += 1;
988                    self.refresh_internal_child_fence(
989                        &parent_page,
990                        to_guard.page_id(),
991                        to_internal,
992                    )?;
993                    self.refresh_internal_child_fence(
994                        &parent_page,
995                        from_guard.page_id(),
996                        from_internal,
997                    )?;
998                    (
999                        IndexRecordPayload::InternalRedistribute(
1000                            IndexInternalRedistributePayload {
1001                                relation: relation.clone(),
1002                                from_page_id: from_guard.page_id(),
1003                                to_page_id: to_guard.page_id(),
1004                                parent_page_id: parent_guard.page_id(),
1005                                from_is_left: true,
1006                                from_old_sentinel,
1007                                to_old_sentinel,
1008                                moved_entry: moved_payload,
1009                                separator_key_data: TupleCodec::encode(&separator_key_in_parent),
1010                                to_new_high_key: to_internal
1011                                    .high_key
1012                                    .as_ref()
1013                                    .map(|t| TupleCodec::encode(t)),
1014                                to_new_next_page_id: to_internal.header.next_page_id,
1015                                from_new_high_key: from_internal
1016                                    .high_key
1017                                    .as_ref()
1018                                    .map(|t| TupleCodec::encode(t)),
1019                                from_new_next_page_id: from_internal.header.next_page_id,
1020                            },
1021                        ),
1022                        to_guard.page_id(),
1023                        parent_page.array[separator_idx].0.clone(),
1024                    )
1025                }
1026                _ => return Err(QuillSQLError::Internal("Mismatched page types".to_string())),
1027            }
1028        } else {
1029            let separator_idx = parent_idx_of_to_node + 1;
1030            match (&mut to_page, &mut from_page) {
1031                (BPlusTreePage::Leaf(to_leaf), BPlusTreePage::Leaf(from_leaf)) => {
1032                    let (moved_key, moved_rid) = from_leaf.remove_first_kv();
1033                    let moved_payload = IndexLeafSplitEntryPayload {
1034                        key_data: TupleCodec::encode(&moved_key),
1035                        rid: moved_rid,
1036                    };
1037                    to_leaf.array.push((moved_key, moved_rid));
1038                    to_leaf.header.current_size += 1;
1039                    parent_page.array[separator_idx].0 = from_leaf.key_at(0).clone();
1040                    to_leaf.header.version += 1;
1041                    from_leaf.header.version += 1;
1042                    (
1043                        IndexRecordPayload::LeafRedistribute(IndexLeafRedistributePayload {
1044                            relation: relation.clone(),
1045                            from_page_id: from_guard.page_id(),
1046                            to_page_id: to_guard.page_id(),
1047                            from_is_left: false,
1048                            moved_entry: moved_payload,
1049                        }),
1050                        from_guard.page_id(),
1051                        parent_page.array[separator_idx].0.clone(),
1052                    )
1053                }
1054                (BPlusTreePage::Internal(to_internal), BPlusTreePage::Internal(from_internal)) => {
1055                    let separator_key_in_parent = parent_page.key_at(separator_idx).clone();
1056                    let item_to_move = from_internal.remove_first_kv();
1057                    let from_old_sentinel = from_internal.value_at(0);
1058                    let to_old_sentinel = to_internal.value_at(0);
1059                    let moved_payload = IndexInternalEntryPayload {
1060                        key_data: TupleCodec::encode(&item_to_move.0),
1061                        child_page_id: item_to_move.1,
1062                    };
1063                    to_internal
1064                        .array
1065                        .push((separator_key_in_parent.clone(), from_internal.value_at(0)));
1066                    parent_page.array[separator_idx].0 = item_to_move.0.clone();
1067                    from_internal.array[0].1 = item_to_move.1;
1068                    to_internal.header.current_size += 1;
1069                    to_internal.header.version += 1;
1070                    from_internal.header.version += 1;
1071                    self.refresh_internal_child_fence(
1072                        &parent_page,
1073                        to_guard.page_id(),
1074                        to_internal,
1075                    )?;
1076                    self.refresh_internal_child_fence(
1077                        &parent_page,
1078                        from_guard.page_id(),
1079                        from_internal,
1080                    )?;
1081                    (
1082                        IndexRecordPayload::InternalRedistribute(
1083                            IndexInternalRedistributePayload {
1084                                relation: relation.clone(),
1085                                from_page_id: from_guard.page_id(),
1086                                to_page_id: to_guard.page_id(),
1087                                parent_page_id: parent_guard.page_id(),
1088                                from_is_left: false,
1089                                from_old_sentinel,
1090                                to_old_sentinel,
1091                                moved_entry: moved_payload,
1092                                separator_key_data: TupleCodec::encode(&separator_key_in_parent),
1093                                to_new_high_key: to_internal
1094                                    .high_key
1095                                    .as_ref()
1096                                    .map(|t| TupleCodec::encode(t)),
1097                                to_new_next_page_id: to_internal.header.next_page_id,
1098                                from_new_high_key: from_internal
1099                                    .high_key
1100                                    .as_ref()
1101                                    .map(|t| TupleCodec::encode(t)),
1102                                from_new_next_page_id: from_internal.header.next_page_id,
1103                            },
1104                        ),
1105                        from_guard.page_id(),
1106                        parent_page.array[separator_idx].0.clone(),
1107                    )
1108                }
1109                _ => return Err(QuillSQLError::Internal("Mismatched page types".to_string())),
1110            }
1111        };
1112
1113        let child_lsn = self.append_index_record(child_payload)?;
1114        let from_encoded = BPlusTreePageCodec::encode(&from_page);
1115        self.wal_overwrite_page(&mut from_guard, from_encoded, child_lsn)?;
1116        let to_encoded = BPlusTreePageCodec::encode(&to_page);
1117        self.wal_overwrite_page(&mut to_guard, to_encoded, child_lsn)?;
1118
1119        parent_page.header.version += 1;
1120        let parent_payload = IndexRecordPayload::ParentUpdate(IndexParentUpdatePayload {
1121            relation,
1122            parent_page_id: parent_guard.page_id(),
1123            child_page_id: parent_child_id,
1124            key_data: TupleCodec::encode(&parent_key_tuple),
1125        });
1126        let parent_lsn = self.append_index_record(parent_payload)?;
1127        let parent_encoded = BPlusTreeInternalPageCodec::encode(&parent_page);
1128        self.wal_overwrite_page(&mut parent_guard, parent_encoded, parent_lsn)?;
1129
1130        Ok(())
1131    }
1132
1133    fn refresh_internal_child_fence(
1134        &self,
1135        parent_page: &BPlusTreeInternalPage,
1136        child_page_id: PageId,
1137        child_page: &mut BPlusTreeInternalPage,
1138    ) -> QuillSQLResult<()> {
1139        let Some(child_idx) = parent_page.value_index(child_page_id) else {
1140            return Err(QuillSQLError::Internal(
1141                "redistribute: child missing from parent".to_string(),
1142            ));
1143        };
1144        let size = parent_page.header.current_size as usize;
1145        if child_idx + 1 < size {
1146            child_page.high_key = Some(parent_page.key_at(child_idx + 1).clone());
1147            child_page.header.next_page_id = parent_page.value_at(child_idx + 1);
1148        } else {
1149            child_page.high_key = parent_page.high_key.clone();
1150            child_page.header.next_page_id = parent_page.header.next_page_id;
1151        }
1152        Ok(())
1153    }
1154
1155    fn adjust_root(&self, root_guard: WritePageGuard) -> QuillSQLResult<()> {
1156        let (root_page, _) =
1157            BPlusTreePageCodec::decode(root_guard.data(), self.key_schema.clone())?;
1158
1159        if let BPlusTreePage::Internal(root_internal) = root_page {
1160            if root_internal.header.current_size == 1 {
1161                let payload = IndexRecordPayload::RootAdopt(IndexRootAdoptPayload {
1162                    relation: self.relation_ident(),
1163                    new_root_page_id: root_internal.value_at(0),
1164                });
1165                let wal_lsn = self.append_index_record(payload)?;
1166                // The lock is already held by the caller (e.g., delete).
1167                // Re-acquiring it would cause a deadlock.
1168                let new_root_id = root_internal.value_at(0);
1169                // Drop page guard before touching header to avoid page<->header lock inversion
1170                drop(root_guard);
1171                let _lock = self.header_page_lock.write();
1172                self.set_root_page_id(new_root_id, wal_lsn)?;
1173                // Delay old-root physical deallocation to avoid races with concurrent readers
1174            }
1175        } else if let BPlusTreePage::Leaf(root_leaf) = root_page {
1176            if root_leaf.header.current_size == 0 {
1177                let payload = IndexRecordPayload::RootReset(IndexRootResetPayload {
1178                    relation: self.relation_ident(),
1179                });
1180                let wal_lsn = self.append_index_record(payload)?;
1181                // The lock is already held by the caller.
1182                drop(root_guard);
1183                let _lock = self.header_page_lock.write();
1184                self.set_root_page_id(INVALID_PAGE_ID, wal_lsn)?;
1185                // Delay old-root physical deallocation to avoid races with concurrent readers
1186            }
1187        }
1188        Ok(())
1189    }
1190
1191    /// Internal: create the very first node when the tree is empty.
1192    fn start_new_tree(&self) -> QuillSQLResult<()> {
1193        let mut root_guard = self.buffer_pool.new_page()?;
1194        let root_page_id = root_guard.page_id();
1195        let mut leaf_page = BPlusTreeLeafPage::new(self.key_schema.clone(), self.leaf_max_size);
1196        leaf_page.header.version += 1;
1197        let payload = IndexRecordPayload::RootInstallLeaf(IndexRootInstallLeafPayload {
1198            relation: self.relation_ident(),
1199            page_id: root_page_id,
1200            leaf_max_size: self.leaf_max_size,
1201            next_page_id: leaf_page.header.next_page_id,
1202            entries: Vec::new(),
1203        });
1204        let wal_lsn = self.append_index_record(payload)?;
1205        let encoded_data = BPlusTreeLeafPageCodec::encode(&leaf_page);
1206        self.wal_overwrite_page(&mut root_guard, encoded_data, wal_lsn)?;
1207        // Update root id (release page latch before touching header to avoid lock inversion).
1208        // Precondition: caller holds header_page_lock.
1209        drop(root_guard);
1210        self.set_root_page_id(root_page_id, wal_lsn)?;
1211        Ok(())
1212    }
1213
1214    fn find_leaf_page_optimistic(&self, key: &Tuple) -> QuillSQLResult<ReadPageGuard> {
1215        // OLC + B-link: version-check each step; if changed, restart with bounded backoff.
1216        let mut restarts = 0usize;
1217        'restart: loop {
1218            let mut current_guard = self.buffer_pool.fetch_page_read(self.get_root_page_id()?)?;
1219            loop {
1220                let decoded =
1221                    BPlusTreePageCodec::decode(current_guard.data(), self.key_schema.clone());
1222                if decoded.is_err() {
1223                    drop(current_guard);
1224                    restarts += 1;
1225                    if restarts > MAX_OLC_RESTARTS {
1226                        std::thread::sleep(std::time::Duration::from_micros(
1227                            OLC_BACKOFF_BASE_US.saturating_mul(1 << (restarts.min(10) - 1)),
1228                        ));
1229                        restarts = 0;
1230                    }
1231                    continue 'restart;
1232                }
1233                let (page, _) = decoded.unwrap();
1234                match page {
1235                    BPlusTreePage::Internal(internal) => {
1236                        // Double-read header-only version for OLC
1237                        let (hdr1, _) =
1238                            BPlusTreeInternalPageCodec::decode_header_only(current_guard.data())?;
1239                        let v1 = hdr1.version;
1240                        if let Some(ref hk) = internal.high_key {
1241                            if key >= hk && internal.header.next_page_id != INVALID_PAGE_ID {
1242                                let sib = self
1243                                    .buffer_pool
1244                                    .fetch_page_read(internal.header.next_page_id)?;
1245                                drop(current_guard);
1246                                current_guard = sib;
1247                                continue;
1248                            }
1249                        }
1250                        // Byte-based child lookup to avoid re-decode costs
1251                        let next_page_id = BPlusTreeInternalPageCodec::lookup_child_from_bytes(
1252                            current_guard.data(),
1253                            self.key_schema.clone(),
1254                            key,
1255                        )?;
1256                        let (hdr2, _) =
1257                            BPlusTreeInternalPageCodec::decode_header_only(current_guard.data())?;
1258                        let v2 = hdr2.version;
1259                        if v1 != v2 {
1260                            drop(current_guard);
1261                            restarts += 1;
1262                            if restarts > MAX_OLC_RESTARTS {
1263                                std::thread::sleep(std::time::Duration::from_micros(
1264                                    OLC_BACKOFF_BASE_US.saturating_mul(1 << (restarts.min(10) - 1)),
1265                                ));
1266                                restarts = 0;
1267                            }
1268                            continue 'restart;
1269                        }
1270                        let child_guard = self.buffer_pool.fetch_page_read(next_page_id)?;
1271                        drop(current_guard);
1272                        current_guard = child_guard;
1273                    }
1274                    BPlusTreePage::Leaf(_leaf) => {
1275                        let (h1, _) =
1276                            BPlusTreeLeafPageCodec::decode_header_only(current_guard.data())?;
1277                        let v1 = h1.version;
1278                        let (h2, _) =
1279                            BPlusTreeLeafPageCodec::decode_header_only(current_guard.data())?;
1280                        let v2 = h2.version;
1281                        if v1 != v2 {
1282                            drop(current_guard);
1283                            restarts += 1;
1284                            if restarts > MAX_OLC_RESTARTS {
1285                                std::thread::sleep(std::time::Duration::from_micros(
1286                                    OLC_BACKOFF_BASE_US.saturating_mul(1 << (restarts.min(10) - 1)),
1287                                ));
1288                                restarts = 0;
1289                            }
1290                            continue 'restart;
1291                        }
1292                        return Ok(current_guard);
1293                    }
1294                }
1295            }
1296        }
1297    }
1298
1299    pub fn find_first_leaf_page(&self) -> QuillSQLResult<ReadPageGuard> {
1300        let mut current_page_id = self.get_root_page_id()?;
1301        if current_page_id == INVALID_PAGE_ID {
1302            return Err(QuillSQLError::Internal("Tree is empty".to_string()));
1303        }
1304
1305        loop {
1306            let guard = self.buffer_pool.fetch_page_read(current_page_id)?;
1307            let (page, _) = BPlusTreePageCodec::decode(guard.data(), self.key_schema.clone())?;
1308
1309            match page {
1310                BPlusTreePage::Internal(internal) => {
1311                    current_page_id = internal.value_at(0);
1312                }
1313                BPlusTreePage::Leaf(_) => {
1314                    return Ok(guard);
1315                }
1316            }
1317        }
1318    }
1319
1320    /// 内部辅助函数:从根节点开始遍历,找到并返回包含目标 key 的
1321    /// 叶子节点的只读保护器 (ReadPageGuard)。
1322    ///
1323    /// 这个函数通过 ReadPageGuard 的 RAII 特性,在遍历时实现了闩锁耦合。
1324    pub fn find_leaf_page_for_iterator(
1325        &self,
1326        key: &Tuple,
1327        start_page_id: PageId,
1328    ) -> QuillSQLResult<ReadPageGuard> {
1329        let mut current_page_id = start_page_id;
1330        if current_page_id == INVALID_PAGE_ID {
1331            return Err(QuillSQLError::Storage("btree: empty tree".to_string()));
1332        }
1333
1334        loop {
1335            // a. 为当前页面获取一个 ReadPageGuard。
1336            //    这会自动 pin 住页面并加上读锁。
1337            let current_guard = self.buffer_pool.fetch_page_read(current_page_id)?;
1338
1339            // b. 解码页面内容以判断其类型。
1340            let (page_content, _) =
1341                BPlusTreePageCodec::decode(current_guard.data(), self.key_schema.clone())?;
1342
1343            match page_content {
1344                // c. 如果是内部节点...
1345                BPlusTreePage::Internal(_internal_page) => {
1346                    // find next child id using bytes path
1347                    current_page_id =
1348                        crate::storage::codec::BPlusTreeInternalPageCodec::lookup_child_from_bytes(
1349                            current_guard.data(),
1350                            self.key_schema.clone(),
1351                            key,
1352                        )?;
1353                }
1354                // d. 如果是叶子节点...
1355                BPlusTreePage::Leaf(_) => {
1356                    // 我们已经到达了树的最底层。返回这个页面的 Guard,
1357                    // 它的所有权会被转移给调用者 (get 方法),从而延长锁的生命周期。
1358                    return Ok(current_guard);
1359                }
1360            }
1361        }
1362    }
1363
1364    /// 内部方法:分裂一个节点,并可能递归地向上传播分裂。
1365    fn split<'a>(
1366        &'a self,
1367        mut page_guard: WritePageGuard,
1368        context: &mut Context<'a>,
1369    ) -> QuillSQLResult<()> {
1370        loop {
1371            let page_id = page_guard.page_id();
1372            let (mut page, _) =
1373                BPlusTreePageCodec::decode(page_guard.data(), self.key_schema.clone())?;
1374
1375            let mut new_page_guard = self.buffer_pool.new_page()?;
1376            let new_page_id = new_page_guard.page_id();
1377
1378            let (middle_key, split_wal_lsn) = match &mut page {
1379                BPlusTreePage::Leaf(leaf_page) => {
1380                    let split_at = leaf_page.header.current_size as usize / 2;
1381                    let mut new_leaf =
1382                        BPlusTreeLeafPage::new(self.key_schema.clone(), self.leaf_max_size);
1383                    new_leaf.batch_insert(leaf_page.split_off(split_at));
1384                    new_leaf.header.next_page_id = leaf_page.header.next_page_id;
1385                    leaf_page.header.next_page_id = new_page_id;
1386                    let entries_payload = new_leaf
1387                        .array
1388                        .iter()
1389                        .map(|(key, rid)| IndexLeafSplitEntryPayload {
1390                            key_data: TupleCodec::encode(key),
1391                            rid: *rid,
1392                        })
1393                        .collect::<Vec<_>>();
1394                    let payload = IndexRecordPayload::LeafSplit(IndexLeafSplitPayload {
1395                        relation: self.relation_ident(),
1396                        left_page_id: page_id,
1397                        right_page_id: new_page_id,
1398                        leaf_max_size: self.leaf_max_size,
1399                        split_index: split_at as u16,
1400                        right_next_page_id: new_leaf.header.next_page_id,
1401                        entries: entries_payload,
1402                    });
1403                    let wal_lsn = self.append_index_record(payload)?;
1404                    let new_data = BPlusTreeLeafPageCodec::encode(&new_leaf);
1405                    self.wal_overwrite_page(&mut new_page_guard, new_data, wal_lsn)?;
1406                    leaf_page.header.version += 1;
1407                    (new_leaf.key_at(0).clone(), wal_lsn)
1408                }
1409                BPlusTreePage::Internal(internal_page) => {
1410                    let mut new_internal =
1411                        BPlusTreeInternalPage::new(self.key_schema.clone(), self.internal_max_size);
1412
1413                    let num_pointers = internal_page.header.current_size as usize;
1414                    let promote_idx = 1 + (num_pointers.saturating_sub(1) / 2);
1415
1416                    let mut moved = internal_page.split_off(promote_idx);
1417
1418                    let (middle_key, right_sentinel_ptr) = {
1419                        let pair = moved.get(0).ok_or(QuillSQLError::Internal(
1420                            "Internal split moved entries empty".to_string(),
1421                        ))?;
1422                        (pair.0.clone(), pair.1)
1423                    };
1424
1425                    new_internal.insert(Tuple::empty(self.key_schema.clone()), right_sentinel_ptr);
1426                    if moved.len() > 1 {
1427                        new_internal.batch_insert(moved.split_off(1));
1428                    }
1429
1430                    let old_high_key = internal_page.high_key.clone();
1431                    let old_next = internal_page.header.next_page_id;
1432                    internal_page.high_key = Some(middle_key.clone());
1433                    new_internal.high_key = old_high_key;
1434                    new_internal.header.next_page_id = old_next;
1435                    internal_page.header.next_page_id = new_page_id;
1436                    internal_page.header.version += 1;
1437
1438                    let right_entries = new_internal
1439                        .array
1440                        .iter()
1441                        .map(|(tuple, child)| IndexInternalEntryPayload {
1442                            key_data: TupleCodec::encode(tuple),
1443                            child_page_id: *child,
1444                        })
1445                        .collect::<Vec<_>>();
1446                    let payload = IndexRecordPayload::InternalSplit(IndexInternalSplitPayload {
1447                        relation: self.relation_ident(),
1448                        left_page_id: page_id,
1449                        left_new_size: internal_page.header.current_size as u16,
1450                        left_high_key: internal_page
1451                            .high_key
1452                            .as_ref()
1453                            .map(|t| TupleCodec::encode(t)),
1454                        left_next_page_id: internal_page.header.next_page_id,
1455                        right_page_id: new_page_id,
1456                        internal_max_size: self.internal_max_size,
1457                        right_entries,
1458                        right_high_key: new_internal
1459                            .high_key
1460                            .as_ref()
1461                            .map(|t| TupleCodec::encode(t)),
1462                        right_next_page_id: new_internal.header.next_page_id,
1463                    });
1464                    let wal_lsn = self.append_index_record(payload)?;
1465                    let new_data = BPlusTreeInternalPageCodec::encode(&new_internal);
1466                    self.wal_overwrite_page(&mut new_page_guard, new_data, wal_lsn)?;
1467                    (middle_key, wal_lsn)
1468                }
1469            };
1470
1471            // 写回修改后的旧页面和新页面(保持子页锁直到父更新完成)
1472            let old_page_data = BPlusTreePageCodec::encode(&page);
1473            self.wal_overwrite_page(&mut page_guard, old_page_data, split_wal_lsn)?;
1474
1475            // 若当前分裂页是根(无父在 write_set),则创建新的根
1476            if page_guard.page_id() == self.get_root_page_id()? {
1477                let mut new_root_guard = self.buffer_pool.new_page()?;
1478                let new_root_id = new_root_guard.page_id();
1479                let mut new_root_page =
1480                    BPlusTreeInternalPage::new(self.key_schema.clone(), self.internal_max_size);
1481
1482                new_root_page.insert(Tuple::empty(self.key_schema.clone()), page_id);
1483                new_root_page.insert(middle_key, new_page_id);
1484                new_root_page.header.version += 1;
1485
1486                let entries_payload = new_root_page
1487                    .array
1488                    .iter()
1489                    .map(|(key, child)| IndexInternalEntryPayload {
1490                        key_data: TupleCodec::encode(key),
1491                        child_page_id: *child,
1492                    })
1493                    .collect::<Vec<_>>();
1494                let payload =
1495                    IndexRecordPayload::RootInstallInternal(IndexRootInstallInternalPayload {
1496                        relation: self.relation_ident(),
1497                        page_id: new_root_id,
1498                        internal_max_size: self.internal_max_size,
1499                        entries: entries_payload,
1500                        high_key: new_root_page
1501                            .high_key
1502                            .as_ref()
1503                            .map(|t| TupleCodec::encode(t)),
1504                        next_page_id: new_root_page.header.next_page_id,
1505                    });
1506                let wal_lsn = self.append_index_record(payload)?;
1507                let encoded = BPlusTreeInternalPageCodec::encode(&new_root_page);
1508                self.wal_overwrite_page(&mut new_root_guard, encoded, wal_lsn)?;
1509
1510                // Avoid deadlock: release child page latches before taking header lock
1511                drop(new_page_guard);
1512                drop(page_guard);
1513
1514                let _lock = self.header_page_lock.write();
1515                self.set_root_page_id(new_root_id, wal_lsn)?;
1516                return Ok(());
1517            }
1518
1519            // 否则,更新父节点
1520            let mut parent_guard = match context.write_set.pop_back() {
1521                Some(g) => g,
1522                None => {
1523                    // Parent missing due to concurrent structural change; restart from root
1524                    return Err(QuillSQLError::Internal(
1525                        "split: missing parent in context".to_string(),
1526                    ));
1527                }
1528            };
1529            let (mut parent_page, _) =
1530                BPlusTreeInternalPageCodec::decode(parent_guard.data(), self.key_schema.clone())?;
1531            // Insert the separator key right after the original left child (page_id)
1532            if parent_page.value_index(page_id).is_none() {
1533                // Parent no longer contains this child; signal upper layer to retry from root
1534                return Err(QuillSQLError::Internal(
1535                    "split: parent no longer contains left child".to_string(),
1536                ));
1537            }
1538            parent_page.insert_after(page_id, middle_key.clone(), new_page_id);
1539            parent_page.header.version += 1;
1540
1541            let parent_payload = IndexRecordPayload::ParentInsert(IndexParentInsertPayload {
1542                relation: self.relation_ident(),
1543                parent_page_id: parent_guard.page_id(),
1544                left_child_page_id: page_id,
1545                right_child_page_id: new_page_id,
1546                key_data: TupleCodec::encode(&middle_key),
1547            });
1548            let parent_lsn = self.append_index_record(parent_payload)?;
1549
1550            let encoded = BPlusTreeInternalPageCodec::encode(&parent_page);
1551            self.wal_overwrite_page(&mut parent_guard, encoded, parent_lsn)?;
1552
1553            if parent_page.is_full() {
1554                // 子页到父的结构已一致,现在可释放子页锁,继续向上分裂父
1555                drop(new_page_guard);
1556                drop(page_guard);
1557                page_guard = parent_guard;
1558                // 不要释放更上层的锁,保持保守闩锁直到分裂完成
1559            } else {
1560                // 更新完成,释放子页锁,结束
1561                drop(new_page_guard);
1562                drop(page_guard);
1563                return Ok(());
1564            }
1565        }
1566    }
1567}
1568
1569#[cfg(test)]
1570mod tests {
1571    use parking_lot::deadlock;
1572    use std::collections::HashSet;
1573    use std::sync::Arc;
1574    use std::sync::Once;
1575    use std::time::Duration;
1576    use tempfile::TempDir;
1577
1578    use crate::catalog::SchemaRef;
1579    use crate::config::WalConfig;
1580    use crate::recovery::{RecoveryManager, WalManager};
1581    use crate::storage::disk_manager::DiskManager;
1582    use crate::storage::disk_scheduler::DiskScheduler;
1583    use crate::storage::index::btree_index::TreeIndexIterator;
1584    use crate::storage::page::{BPlusTreePage, RecordId};
1585    use crate::storage::tuple::Tuple;
1586    use crate::{
1587        buffer::BufferManager,
1588        catalog::{Column, DataType, Schema},
1589        storage::codec::BPlusTreePageCodec,
1590    };
1591
1592    use super::BPlusTreeIndex;
1593
1594    fn ensure_deadlock_watchdog() {
1595        static START: Once = Once::new();
1596        START.call_once(|| {
1597            std::thread::spawn(|| loop {
1598                std::thread::sleep(Duration::from_millis(500));
1599                let deadlocks = deadlock::check_deadlock();
1600                if !deadlocks.is_empty() {
1601                    eprintln!("DEADLOCK DETECTED: {} cycles", deadlocks.len());
1602                    for (i, threads) in deadlocks.iter().enumerate() {
1603                        eprintln!("Cycle {}:", i);
1604                        for t in threads {
1605                            eprintln!("  ThreadId={:?}\n{:?}", t.thread_id(), t.backtrace());
1606                        }
1607                    }
1608                    panic!("deadlock detected");
1609                }
1610            });
1611        });
1612    }
1613
1614    /// Creates a test environment with specified buffer pool size and B+ tree parameters
1615    fn create_test_index(
1616        buffer_pool_size: usize,
1617        internal_max_size: u32,
1618        leaf_max_size: u32,
1619    ) -> (TempDir, BPlusTreeIndex, SchemaRef) {
1620        let temp_dir = TempDir::new().unwrap();
1621        let temp_path = temp_dir.path().join("test.db");
1622
1623        let key_schema = Arc::new(Schema::new(vec![Column::new("a", DataType::Int64, false)]));
1624        let disk_manager = DiskManager::try_new(temp_path).unwrap();
1625        let disk_scheduler = Arc::new(DiskScheduler::new(Arc::new(disk_manager)));
1626        let buffer_pool = Arc::new(BufferManager::new(buffer_pool_size, disk_scheduler));
1627        let index = BPlusTreeIndex::new(
1628            key_schema.clone(),
1629            buffer_pool,
1630            internal_max_size,
1631            leaf_max_size,
1632        );
1633
1634        (temp_dir, index, key_schema)
1635    }
1636
1637    /// Helper: build bpm + wal on a fresh temp dir
1638    fn setup_with_wal(
1639        db_path: &std::path::Path,
1640        wal_dir: &std::path::Path,
1641        bpm_pages: usize,
1642    ) -> (Arc<BufferManager>, Arc<WalManager>, Arc<DiskScheduler>) {
1643        let dm = Arc::new(DiskManager::try_new(db_path).unwrap());
1644        let scheduler = Arc::new(DiskScheduler::new(dm));
1645        let mut cfg = WalConfig::default();
1646        cfg.directory = wal_dir.to_path_buf();
1647        let wal = Arc::new(WalManager::new(cfg, None, None).unwrap());
1648        let bpm = Arc::new(BufferManager::new(bpm_pages, scheduler.clone()));
1649        bpm.set_wal_manager(wal.clone());
1650        (bpm, wal, scheduler)
1651    }
1652
1653    /// 根据 key 生成与 BusTub 一致的 RID,用于断言
1654    fn rid_from_key(key: i64) -> RecordId {
1655        let value = key & 0xFFFFFFFF;
1656        RecordId::new((key >> 32) as u32, value as u32)
1657    }
1658
1659    #[test]
1660    fn test_index_recovery_replays_wal_split() {
1661        ensure_deadlock_watchdog();
1662        let tmp = TempDir::new().unwrap();
1663        let db_path = tmp.path().join("test.db");
1664        let wal_dir = tmp.path().join("wal");
1665
1666        // 1) 初始运行:带 WAL 的环境,强制小页触发分裂
1667        let (bpm, wal, scheduler) = setup_with_wal(&db_path, &wal_dir, 64);
1668        let key_schema: SchemaRef =
1669            Arc::new(Schema::new(vec![Column::new("a", DataType::Int64, false)]));
1670        let index = BPlusTreeIndex::new(key_schema.clone(), bpm.clone(), 2, 3);
1671        let header_pid = index.header_page_id; // 记录 header 页 id,崩溃后用来 reopen
1672
1673        let keys: Vec<i64> = (1..=30).collect();
1674        for k in &keys {
1675            let t = Tuple::new(key_schema.clone(), vec![(*k).into()]);
1676            index.insert(&t, rid_from_key(*k)).unwrap();
1677        }
1678
1679        // 刷 WAL 到耐久,模拟崩溃
1680        let _ = wal.flush(None).unwrap();
1681        drop(index);
1682        drop(bpm);
1683        drop(wal);
1684        drop(scheduler);
1685
1686        // 2) 恢复阶段:重放 WAL 到数据文件
1687        let dm2 = Arc::new(DiskManager::try_new(&db_path).unwrap());
1688        let scheduler2 = Arc::new(DiskScheduler::new(dm2));
1689        let mut cfg2 = WalConfig::default();
1690        cfg2.directory = wal_dir.clone();
1691        let wal2 = Arc::new(WalManager::new(cfg2, None, None).unwrap());
1692        let rm = RecoveryManager::new(wal2.clone(), scheduler2.clone());
1693        let _summary = rm.replay().unwrap();
1694
1695        // 3) 打开新的 BufferPool,reopen 索引并验证所有键可查询 & 有序遍历
1696        let bpm2 = Arc::new(BufferManager::new(128, scheduler2.clone()));
1697        let reopened = BPlusTreeIndex::open(key_schema.clone(), bpm2.clone(), 2, 3, header_pid);
1698
1699        for k in &keys {
1700            let t = Tuple::new(key_schema.clone(), vec![(*k).into()]);
1701            let got = reopened
1702                .get(&t)
1703                .unwrap()
1704                .expect("missing key after recovery");
1705            assert_eq!(got.page_id, rid_from_key(*k).page_id);
1706            assert_eq!(got.slot_num, rid_from_key(*k).slot_num);
1707        }
1708
1709        // 验证遍历顺序
1710        let index_arc = Arc::new(reopened);
1711        let mut it = TreeIndexIterator::new(index_arc, ..);
1712        let mut i = 1i64;
1713        while let Some(rid) = it.next().unwrap() {
1714            assert_eq!(rid.slot_num, rid_from_key(i).slot_num);
1715            i += 1;
1716        }
1717        assert_eq!(i, 31);
1718    }
1719
1720    #[test]
1721    fn test_index_recovery_replays_wal_delete_merge() {
1722        ensure_deadlock_watchdog();
1723        let tmp = TempDir::new().unwrap();
1724        let db_path = tmp.path().join("delete.db");
1725        let wal_dir = tmp.path().join("wal");
1726
1727        let (bpm, wal, scheduler) = setup_with_wal(&db_path, &wal_dir, 64);
1728        let key_schema: SchemaRef =
1729            Arc::new(Schema::new(vec![Column::new("a", DataType::Int64, false)]));
1730        let index = BPlusTreeIndex::new(key_schema.clone(), bpm.clone(), 2, 3);
1731        let header_pid = index.header_page_id;
1732
1733        let keys: Vec<i64> = (1..=40).collect();
1734        for k in &keys {
1735            let t = Tuple::new(key_schema.clone(), vec![(*k).into()]);
1736            index.insert(&t, rid_from_key(*k)).unwrap();
1737        }
1738
1739        let deletes: Vec<i64> = (5..=25).collect();
1740        for k in &deletes {
1741            let t = Tuple::new(key_schema.clone(), vec![(*k).into()]);
1742            index.delete(&t).unwrap();
1743        }
1744
1745        let _ = wal.flush(None).unwrap();
1746        drop(index);
1747        drop(bpm);
1748        drop(wal);
1749        drop(scheduler);
1750
1751        let dm2 = Arc::new(DiskManager::try_new(&db_path).unwrap());
1752        let scheduler2 = Arc::new(DiskScheduler::new(dm2));
1753        let mut cfg2 = WalConfig::default();
1754        cfg2.directory = wal_dir.clone();
1755        let wal2 = Arc::new(WalManager::new(cfg2, None, None).unwrap());
1756        let rm = RecoveryManager::new(wal2.clone(), scheduler2.clone());
1757        let _summary = rm.replay().unwrap();
1758
1759        let bpm2 = Arc::new(BufferManager::new(128, scheduler2.clone()));
1760        let reopened = BPlusTreeIndex::open(key_schema.clone(), bpm2.clone(), 2, 3, header_pid);
1761        let delete_set: HashSet<i64> = deletes.into_iter().collect();
1762
1763        for k in &keys {
1764            let tuple = Tuple::new(key_schema.clone(), vec![(*k).into()]);
1765            let result = reopened.get(&tuple).unwrap();
1766            if delete_set.contains(k) {
1767                assert!(result.is_none(), "key {} should be removed", k);
1768            } else {
1769                let rid = result.expect("missing key after recovery");
1770                assert_eq!(rid.page_id, rid_from_key(*k).page_id);
1771                assert_eq!(rid.slot_num, rid_from_key(*k).slot_num);
1772            }
1773        }
1774    }
1775
1776    /// Helper function to create RID from i64 key (exactly like BusTub's RID construction)
1777    fn create_rid_from_key(key: i64) -> RecordId {
1778        let value = key & 0xFFFFFFFF;
1779        RecordId::new((key >> 32) as u32, value as u32)
1780    }
1781
1782    /// Helper function to create tuple from i64 key
1783    fn create_tuple_from_key(key: i64, schema: SchemaRef) -> Tuple {
1784        Tuple::new(schema, vec![key.into()])
1785    }
1786
1787    /// TEST: BasicInsertTest - equivalent to BusTub's basic insert test
1788    #[test]
1789    fn test_basic_insert() {
1790        let (_temp_dir, index, key_schema) = create_test_index(50, 2, 3);
1791
1792        let key = 42i64;
1793        let tuple = create_tuple_from_key(key, key_schema.clone());
1794        let rid = create_rid_from_key(key);
1795
1796        index.insert(&tuple, rid).unwrap();
1797
1798        let root_page_id = index.get_root_page_id().unwrap();
1799        assert_ne!(root_page_id, crate::buffer::INVALID_PAGE_ID);
1800
1801        let root_guard = index.buffer_pool.fetch_page_read(root_page_id).unwrap();
1802        let (root_page, _) =
1803            BPlusTreePageCodec::decode(root_guard.data(), key_schema.clone()).unwrap();
1804
1805        assert!(matches!(root_page, BPlusTreePage::Leaf(_)));
1806
1807        if let BPlusTreePage::Leaf(root_as_leaf) = root_page {
1808            assert_eq!(root_as_leaf.header.current_size, 1);
1809            assert_eq!(root_as_leaf.array[0].0, tuple);
1810            assert_eq!(root_as_leaf.array[0].1, rid);
1811        }
1812    }
1813
1814    /// TEST: InsertTest1NoIterator - equivalent to BusTub's insert test without iterator
1815    #[test]
1816    fn test_insert_no_iterator() {
1817        let (_temp_dir, index, key_schema) = create_test_index(50, 2, 3);
1818
1819        let keys = vec![1i64, 2, 3, 4, 5];
1820        for key in &keys {
1821            let tuple = create_tuple_from_key(*key, key_schema.clone());
1822            let rid = create_rid_from_key(*key);
1823            index.insert(&tuple, rid).unwrap();
1824        }
1825
1826        for key in &keys {
1827            let tuple = create_tuple_from_key(*key, key_schema.clone());
1828            let result = index.get(&tuple).unwrap();
1829            assert!(result.is_some(), "missing key {}", key);
1830
1831            let expected_rid = create_rid_from_key(*key);
1832            let actual_rid = result.unwrap();
1833            assert_eq!(actual_rid.page_id, expected_rid.page_id);
1834            assert_eq!(actual_rid.slot_num, expected_rid.slot_num);
1835        }
1836    }
1837
1838    /// TEST: InsertTest2 - insert in reverse order with iterator validation
1839    #[test]
1840    fn test_insert_reverse_order() {
1841        let (_temp_dir, index, key_schema) = create_test_index(50, 2, 3);
1842
1843        let keys = vec![5i64, 4, 3, 2, 1];
1844        for key in &keys {
1845            let tuple = create_tuple_from_key(*key, key_schema.clone());
1846            let rid = create_rid_from_key(*key);
1847            index.insert(&tuple, rid).unwrap();
1848        }
1849
1850        for key in &keys {
1851            let tuple = create_tuple_from_key(*key, key_schema.clone());
1852            let result = index.get(&tuple).unwrap();
1853            assert!(result.is_some(), "missing key {}", key);
1854
1855            let expected_rid = create_rid_from_key(*key);
1856            let actual_rid = result.unwrap();
1857            assert_eq!(actual_rid.slot_num, expected_rid.slot_num);
1858        }
1859
1860        // Test iterator order (should be sorted)
1861        let index_arc = Arc::new(index);
1862        let mut iter = TreeIndexIterator::new(index_arc, ..);
1863        let mut current_key = 1i64;
1864        while let Some(rid) = iter.next().unwrap() {
1865            assert_eq!(rid.slot_num as i64, current_key);
1866            current_key += 1;
1867        }
1868        assert_eq!(current_key, keys.len() as i64 + 1);
1869    }
1870
1871    /// TEST: DeleteTestNoIterator - equivalent to BusTub's delete test without iterator
1872    #[test]
1873    fn test_delete_no_iterator() {
1874        let (_temp_dir, index, key_schema) = create_test_index(50, 2, 3);
1875
1876        let keys = vec![1i64, 2, 3, 4, 5];
1877        for key in &keys {
1878            let tuple = create_tuple_from_key(*key, key_schema.clone());
1879            let rid = create_rid_from_key(*key);
1880            index.insert(&tuple, rid).unwrap();
1881        }
1882
1883        // Verify all keys are present and have correct RIDs
1884        for key in &keys {
1885            let tuple = create_tuple_from_key(*key, key_schema.clone());
1886            let result = index.get(&tuple).unwrap();
1887            assert!(result.is_some());
1888
1889            let expected_rid = create_rid_from_key(*key);
1890            let actual_rid = result.unwrap();
1891            assert_eq!(actual_rid.page_id, expected_rid.page_id);
1892            assert_eq!(actual_rid.slot_num, expected_rid.slot_num);
1893        }
1894
1895        // Remove some keys - exactly like BusTub test
1896        let remove_keys = vec![1i64, 5, 3, 4];
1897        for key in &remove_keys {
1898            println!("Before deleting key {}:\n{}", key, index.to_dot().unwrap());
1899            let tuple = create_tuple_from_key(*key, key_schema.clone());
1900            index.delete(&tuple).unwrap();
1901            println!("After deleting key {}:\n{}", key, index.to_dot().unwrap());
1902        }
1903
1904        let mut size = 0;
1905        for key in &keys {
1906            let tuple = create_tuple_from_key(*key, key_schema.clone());
1907            let is_present = index.get(&tuple).unwrap().is_some();
1908
1909            if !is_present {
1910                assert!(remove_keys.contains(key));
1911            } else {
1912                assert!(!remove_keys.contains(key));
1913                assert_eq!(
1914                    index.get(&tuple).unwrap().unwrap().slot_num,
1915                    (*key & 0xFFFFFFFF) as u32
1916                );
1917                size += 1;
1918            }
1919        }
1920        assert_eq!(size, 1);
1921
1922        // Remove the remaining key and check if tree becomes empty
1923        let tuple = create_tuple_from_key(2, key_schema.clone());
1924        println!("Before deleting final key 2:\n{}", index.to_dot().unwrap());
1925        index.delete(&tuple).unwrap();
1926        println!("After deleting final key 2:\n{}", index.to_dot().unwrap());
1927        assert!(index.is_empty().unwrap());
1928    }
1929
1930    #[test]
1931    fn test_internal_borrow_from_right_keeps_searchable() {
1932        let (_temp_dir, index, key_schema) = create_test_index(64, 2, 3);
1933
1934        let inserts = [
1935            -5, -4, -3, -2, -1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14,
1936        ];
1937        for key in inserts {
1938            let tuple = create_tuple_from_key(key, key_schema.clone());
1939            let rid = create_rid_from_key(key);
1940            index.insert(&tuple, rid).unwrap();
1941        }
1942
1943        let deletes = [-5, -4, -3, -2, -1, 0, 1, 2];
1944        for key in deletes {
1945            let tuple = create_tuple_from_key(key, key_schema.clone());
1946            index.delete(&tuple).unwrap();
1947        }
1948
1949        let root_page_id = index.get_root_page_id().unwrap();
1950        let root_guard = index.buffer_pool.fetch_page_read(root_page_id).unwrap();
1951        let (root_page, _) =
1952            BPlusTreePageCodec::decode(root_guard.data(), key_schema.clone()).unwrap();
1953        let (left_internal_id, right_internal_id) =
1954            if let BPlusTreePage::Internal(root_internal) = root_page {
1955                assert_eq!(root_internal.header.current_size, 2);
1956                (root_internal.value_at(0), root_internal.value_at(1))
1957            } else {
1958                panic!("expected internal root after deletions");
1959            };
1960        drop(root_guard);
1961
1962        let left_guard = index.buffer_pool.fetch_page_read(left_internal_id).unwrap();
1963        let (left_page, _) =
1964            BPlusTreePageCodec::decode(left_guard.data(), key_schema.clone()).unwrap();
1965        if let BPlusTreePage::Internal(left_internal) = left_page {
1966            assert_eq!(
1967                left_internal.high_key,
1968                Some(create_tuple_from_key(7, key_schema.clone()))
1969            );
1970            assert_eq!(left_internal.header.next_page_id, right_internal_id);
1971        } else {
1972            panic!("expected left child to remain internal");
1973        }
1974
1975        let survivor = create_tuple_from_key(3, key_schema.clone());
1976        assert!(index.get(&survivor).unwrap().is_some());
1977    }
1978
1979    /// TEST: SequentialEdgeMixTest - equivalent to BusTub's mixed insert/delete test
1980    #[test]
1981    fn test_sequential_edge_mix() {
1982        ensure_deadlock_watchdog();
1983        for leaf_max_size in 2..=5 {
1984            let (_temp_dir, index, key_schema) = create_test_index(50, 3, leaf_max_size);
1985
1986            let keys = vec![1i64, 5, 15, 20, 25, 2, -1, -2, 6, 14, 4];
1987            let mut inserted = vec![];
1988            let mut deleted = vec![];
1989
1990            for key in &keys {
1991                let tuple = create_tuple_from_key(*key, key_schema.clone());
1992                let rid = create_rid_from_key(*key);
1993                index.insert(&tuple, rid).unwrap();
1994                inserted.push(*key);
1995
1996                // Verify all inserted keys are present and deleted keys are absent
1997                verify_tree_state(&index, &key_schema, &inserted, &deleted);
1998            }
1999
2000            // Delete key 1
2001            let tuple = create_tuple_from_key(1, key_schema.clone());
2002            index.delete(&tuple).unwrap();
2003            deleted.push(1);
2004            inserted.retain(|&x| x != 1);
2005            verify_tree_state(&index, &key_schema, &inserted, &deleted);
2006
2007            // Insert key 3
2008            let tuple = create_tuple_from_key(3, key_schema.clone());
2009            let rid = create_rid_from_key(3);
2010            index.insert(&tuple, rid).unwrap();
2011            inserted.push(3);
2012            verify_tree_state(&index, &key_schema, &inserted, &deleted);
2013
2014            // Delete remaining keys
2015            let delete_keys = vec![4i64, 14, 6, 2, 15, -2, -1, 3, 5, 25, 20];
2016            for key in &delete_keys {
2017                let tuple = create_tuple_from_key(*key, key_schema.clone());
2018                index.delete(&tuple).unwrap();
2019                deleted.push(*key);
2020                inserted.retain(|&x| x != *key);
2021                verify_tree_state(&index, &key_schema, &inserted, &deleted);
2022            }
2023        }
2024    }
2025
2026    /// Helper function to verify tree state
2027    fn verify_tree_state(
2028        index: &BPlusTreeIndex,
2029        key_schema: &SchemaRef,
2030        inserted: &[i64],
2031        deleted: &[i64],
2032    ) {
2033        for key in inserted {
2034            let tuple = create_tuple_from_key(*key, key_schema.clone());
2035            let result = index.get(&tuple).unwrap();
2036            assert!(result.is_some(), "Key {} should be present", key);
2037        }
2038
2039        for key in deleted {
2040            let tuple = create_tuple_from_key(*key, key_schema.clone());
2041            let result = index.get(&tuple).unwrap();
2042            assert!(result.is_none(), "Key {} should be deleted", key);
2043        }
2044    }
2045
2046    /// TEST: Concurrent insert test - multi-threaded insertions
2047    #[test]
2048    fn test_concurrent_insert() {
2049        // spawn deadlock watchdog
2050        std::thread::spawn(|| loop {
2051            std::thread::sleep(Duration::from_millis(500));
2052            let deadlocks = deadlock::check_deadlock();
2053            if !deadlocks.is_empty() {
2054                eprintln!("DEADLOCK DETECTED: {} cycles", deadlocks.len());
2055                for (i, threads) in deadlocks.iter().enumerate() {
2056                    eprintln!("Cycle {}:", i);
2057                    for t in threads {
2058                        eprintln!("  ThreadId={:?}\n{:?}", t.thread_id(), t.backtrace());
2059                    }
2060                }
2061                panic!("deadlock detected");
2062            }
2063        });
2064        const NUM_ITERS: usize = 3;
2065        const SCALE_FACTOR: i64 = 50; // further scaled down for fast diagnostics
2066        const NUM_THREADS: usize = 5;
2067
2068        for _iter in 0..NUM_ITERS {
2069            let (_temp_dir, index, key_schema) = create_test_index(64, 3, 5);
2070            let index = Arc::new(index);
2071
2072            let keys: Vec<i64> = (1..SCALE_FACTOR).collect();
2073            let mut threads = vec![];
2074
2075            for i in 0..NUM_THREADS {
2076                let index_clone = index.clone();
2077                let key_schema_clone = key_schema.clone();
2078                let keys_clone = keys.clone();
2079
2080                threads.push(std::thread::spawn(move || {
2081                    for key in &keys_clone {
2082                        if (*key as usize) % NUM_THREADS == i {
2083                            let tuple = create_tuple_from_key(*key, key_schema_clone.clone());
2084                            let rid = create_rid_from_key(*key);
2085                            index_clone.insert(&tuple, rid).unwrap();
2086                        }
2087                    }
2088                }));
2089            }
2090
2091            for thread in threads {
2092                thread.join().unwrap();
2093            }
2094
2095            // Verify all keys were inserted correctly
2096            for key in &keys {
2097                let tuple = create_tuple_from_key(*key, key_schema.clone());
2098                let result = index.get(&tuple).unwrap();
2099                assert!(result.is_some());
2100                assert_eq!(result.unwrap().slot_num, (*key & 0xFFFFFFFF) as u32);
2101            }
2102
2103            // Test iterator order
2104            let mut iter = TreeIndexIterator::new(index.clone(), ..);
2105            let mut current_key = 1i64;
2106            while let Some(rid) = iter.next().unwrap() {
2107                assert_eq!(rid.slot_num, (current_key & 0xFFFFFFFF) as u32);
2108                current_key += 1;
2109            }
2110            assert_eq!(current_key, keys.len() as i64 + 1);
2111        }
2112    }
2113
2114    /// TEST: BasicScaleTest - equivalent to BusTub's large scale insertion test
2115    #[test]
2116    fn test_basic_scale() {
2117        let (_temp_dir, index, key_schema) = create_test_index(512, 2, 3);
2118
2119        let scale = 1000i64;
2120        let mut keys: Vec<i64> = (1..=scale).collect();
2121
2122        // Shuffle keys to randomize insertion order
2123        let mut seed: u64 = 0x9E3779B97F4A7C15;
2124        for i in (1..keys.len()).rev() {
2125            seed = seed
2126                .wrapping_mul(2862933555777941757)
2127                .wrapping_add(3037000493);
2128            let j = (seed as usize) % (i + 1);
2129            keys.swap(i, j);
2130        }
2131
2132        // Insert all keys and verify immediately
2133        for key in &keys {
2134            let tuple = create_tuple_from_key(*key, key_schema.clone());
2135            let rid = create_rid_from_key(*key);
2136            index.insert(&tuple, rid).unwrap();
2137            let got = index.get(&tuple).unwrap();
2138            if got.is_none() {
2139                println!(
2140                    "After inserting {}, tree=\n{}",
2141                    key,
2142                    index.to_dot().unwrap()
2143                );
2144                panic!("immediate missing key {}", key);
2145            }
2146        }
2147
2148        // // Debug view
2149        // println!(
2150        //     "Tree after {} inserts:\n{}",
2151        //     keys.len(),
2152        //     index.to_dot().unwrap()
2153        // );
2154
2155        // Early probe for a known-missing sample to print context
2156        let probe = 630i64;
2157        let tuple = create_tuple_from_key(probe, key_schema.clone());
2158        if index.get(&tuple).unwrap().is_none() {
2159            let guard = index.find_leaf_page_optimistic(&tuple).unwrap();
2160            let (page, _) = BPlusTreePageCodec::decode(guard.data(), key_schema.clone()).unwrap();
2161            if let BPlusTreePage::Leaf(leaf) = page {
2162                println!(
2163                    "Early probe leaf for 630 has keys: {:?}",
2164                    leaf.array
2165                        .iter()
2166                        .map(|(t, _)| format!("{}", t))
2167                        .collect::<Vec<_>>()
2168                );
2169            }
2170            panic!("missing key 630 before verification loop");
2171        }
2172
2173        // Verify all keys are retrievable
2174        for key in &keys {
2175            let tuple = create_tuple_from_key(*key, key_schema.clone());
2176            let result = index.get(&tuple).unwrap();
2177            assert!(result.is_some(), "missing key {}", key);
2178
2179            let expected_rid = create_rid_from_key(*key);
2180            let actual_rid = result.unwrap();
2181            assert_eq!(actual_rid.slot_num, expected_rid.slot_num);
2182        }
2183
2184        // Focus probe around 630 for debugging
2185        let probe = 630i64;
2186        let tuple = create_tuple_from_key(probe, key_schema.clone());
2187        if index.get(&tuple).unwrap().is_none() {
2188            let guard = index.find_leaf_page_optimistic(&tuple).unwrap();
2189            let (page, _) = BPlusTreePageCodec::decode(guard.data(), key_schema.clone()).unwrap();
2190            if let BPlusTreePage::Leaf(leaf) = page {
2191                println!(
2192                    "Probe leaf for 630 has keys: {:?}",
2193                    leaf.array
2194                        .iter()
2195                        .map(|(t, _)| format!("{}", t))
2196                        .collect::<Vec<_>>()
2197                );
2198            }
2199        }
2200    }
2201
2202    /// TEST: Concurrent delete test
2203    #[test]
2204    fn test_concurrent_delete() {
2205        const NUM_ITERS: usize = 10;
2206
2207        for _iter in 0..NUM_ITERS {
2208            let (_temp_dir, index, key_schema) = create_test_index(50, 3, 5);
2209
2210            // Sequential insert
2211            let keys = vec![1i64, 2, 3, 4, 5];
2212            for key in &keys {
2213                let tuple = create_tuple_from_key(*key, key_schema.clone());
2214                let rid = create_rid_from_key(*key);
2215                index.insert(&tuple, rid).unwrap();
2216            }
2217
2218            let index = Arc::new(index);
2219            let remove_keys = vec![1i64, 5, 3, 4];
2220            let mut threads = vec![];
2221
2222            for i in 0..2 {
2223                let index_clone = index.clone();
2224                let key_schema_clone = key_schema.clone();
2225                let remove_keys_clone = remove_keys.clone();
2226
2227                threads.push(std::thread::spawn(move || {
2228                    for key in &remove_keys_clone {
2229                        if (*key as usize) % 2 == i {
2230                            let tuple = create_tuple_from_key(*key, key_schema_clone.clone());
2231                            index_clone.delete(&tuple).unwrap();
2232                        }
2233                    }
2234                }));
2235            }
2236
2237            for thread in threads {
2238                thread.join().unwrap();
2239            }
2240
2241            // Check that only key 2 remains
2242            let mut size = 0;
2243            let index_arc = index.clone();
2244            let mut iter = TreeIndexIterator::new(index_arc, ..);
2245            while let Some(rid) = iter.next().unwrap() {
2246                assert_eq!(rid.slot_num, 2);
2247                size += 1;
2248            }
2249            assert_eq!(size, 1);
2250        }
2251    }
2252
2253    /// TEST: Mixed concurrent operations
2254    #[test]
2255    fn test_concurrent_mix() {
2256        // spawn deadlock watchdog
2257        std::thread::spawn(|| loop {
2258            std::thread::sleep(Duration::from_millis(500));
2259            let deadlocks = deadlock::check_deadlock();
2260            if !deadlocks.is_empty() {
2261                eprintln!("DEADLOCK DETECTED: {} cycles", deadlocks.len());
2262                for (i, threads) in deadlocks.iter().enumerate() {
2263                    eprintln!("Cycle {}:", i);
2264                    for t in threads {
2265                        eprintln!("  ThreadId={:?}\n{:?}", t.thread_id(), t.backtrace());
2266                    }
2267                }
2268                panic!("deadlock detected");
2269            }
2270        });
2271        const NUM_ITERS: usize = 5;
2272
2273        for _iter in 0..NUM_ITERS {
2274            let (_temp_dir, index, key_schema) = create_test_index(50, 3, 5);
2275            let index = Arc::new(index);
2276
2277            // Divide keys for insert and delete
2278            let mut for_insert = vec![];
2279            let mut for_delete = vec![];
2280            let total_keys = 20i64; // further scaled down for diagnostics
2281
2282            for i in 1..=total_keys {
2283                if i % 2 == 0 {
2284                    for_insert.push(i);
2285                } else {
2286                    for_delete.push(i);
2287                }
2288            }
2289
2290            // Insert all keys to delete first
2291            for key in &for_delete {
2292                let tuple = create_tuple_from_key(*key, key_schema.clone());
2293                let rid = create_rid_from_key(*key);
2294                index.insert(&tuple, rid).unwrap();
2295            }
2296
2297            let mut threads = vec![];
2298            let num_threads = 5;
2299
2300            for i in 0..num_threads {
2301                let index_clone = index.clone();
2302                let key_schema_clone = key_schema.clone();
2303                let for_insert_clone = for_insert.clone();
2304                let for_delete_clone = for_delete.clone();
2305
2306                threads.push(std::thread::spawn(move || {
2307                    if i % 2 == 0 {
2308                        // Insert thread
2309                        for key in &for_insert_clone {
2310                            let tuple = create_tuple_from_key(*key, key_schema_clone.clone());
2311                            let rid = create_rid_from_key(*key);
2312                            index_clone.insert(&tuple, rid).unwrap();
2313                        }
2314                    } else {
2315                        // Delete thread
2316                        for key in &for_delete_clone {
2317                            let tuple = create_tuple_from_key(*key, key_schema_clone.clone());
2318                            index_clone.delete(&tuple).unwrap();
2319                        }
2320                    }
2321                }));
2322            }
2323
2324            for thread in threads {
2325                thread.join().unwrap();
2326            }
2327
2328            // Verify only inserted keys remain
2329            let mut count = 0;
2330            for key in &for_insert {
2331                let tuple = create_tuple_from_key(*key, key_schema.clone());
2332                let result = index.get(&tuple).unwrap();
2333                if result.is_some() {
2334                    count += 1;
2335                }
2336            }
2337            assert_eq!(count, for_insert.len());
2338
2339            // Verify deleted keys are gone
2340            for key in &for_delete {
2341                let tuple = create_tuple_from_key(*key, key_schema.clone());
2342                let result = index.get(&tuple).unwrap();
2343                assert!(result.is_none());
2344            }
2345        }
2346    }
2347
2348    /// TEST: Iterator functionality test
2349    #[test]
2350    fn test_iterator_functionality() {
2351        let (_temp_dir, index, key_schema) = create_test_index(50, 3, 5);
2352
2353        // Insert test data
2354        let keys = vec![1i64, 3, 5, 7, 9];
2355        for key in &keys {
2356            let tuple = create_tuple_from_key(*key, key_schema.clone());
2357            let rid = create_rid_from_key(*key);
2358            index.insert(&tuple, rid).unwrap();
2359        }
2360
2361        let index = Arc::new(index);
2362
2363        // Test range iterator [3, 7]
2364        let start_tuple = create_tuple_from_key(3, key_schema.clone());
2365        let end_tuple = create_tuple_from_key(7, key_schema.clone());
2366        let mut iterator = TreeIndexIterator::new(index.clone(), start_tuple..=end_tuple);
2367
2368        let mut results = vec![];
2369        while let Some(rid) = iterator.next().unwrap() {
2370            results.push(rid.slot_num as i64);
2371        }
2372        assert_eq!(results, vec![3, 5, 7]);
2373
2374        // Test unbounded iterator
2375        let mut iterator = TreeIndexIterator::new(index.clone(), ..);
2376        let mut all_results = vec![];
2377        while let Some(rid) = iterator.next().unwrap() {
2378            all_results.push(rid.slot_num as i64);
2379        }
2380        assert_eq!(all_results, vec![1, 3, 5, 7, 9]);
2381    }
2382
2383    /// TEST: Tree structure tests - leaf split
2384    #[test]
2385    fn test_leaf_split_structure() {
2386        let (_temp_dir, index, key_schema) = create_test_index(50, 10, 3);
2387
2388        // Insert keys to trigger leaf split
2389        for k in [1i64, 2, 3, 4] {
2390            let tuple = create_tuple_from_key(k, key_schema.clone());
2391            let rid = create_rid_from_key(k);
2392            index.insert(&tuple, rid).unwrap();
2393        }
2394
2395        let root_page_id = index.get_root_page_id().unwrap();
2396        let root_guard = index.buffer_pool.fetch_page_read(root_page_id).unwrap();
2397        let (root_page, _) =
2398            BPlusTreePageCodec::decode(root_guard.data(), key_schema.clone()).unwrap();
2399
2400        // Root should be internal page after split
2401        let BPlusTreePage::Internal(root_internal) = root_page else {
2402            panic!("root is not internal after leaf split");
2403        };
2404
2405        // Check structure integrity
2406        let left_pid = root_internal.value_at(0);
2407        let right_pid = root_internal.value_at(1);
2408
2409        let left_guard = index.buffer_pool.fetch_page_read(left_pid).unwrap();
2410        let right_guard = index.buffer_pool.fetch_page_read(right_pid).unwrap();
2411        let (left_page, _) =
2412            BPlusTreePageCodec::decode(left_guard.data(), key_schema.clone()).unwrap();
2413        let (right_page, _) =
2414            BPlusTreePageCodec::decode(right_guard.data(), key_schema.clone()).unwrap();
2415
2416        let BPlusTreePage::Leaf(left_leaf) = left_page else {
2417            panic!("left child not leaf");
2418        };
2419        let BPlusTreePage::Leaf(_right_leaf) = right_page else {
2420            panic!("right child not leaf");
2421        };
2422
2423        // Verify leaf chain linkage
2424        assert_eq!(left_leaf.header.next_page_id, right_pid);
2425    }
2426
2427    // ---------------- Benchmarks (ignored by default) ----------------
2428    #[test]
2429    #[ignore]
2430    fn bench_get_hot_read() {
2431        fn getenv_usize(k: &str, default_v: usize) -> usize {
2432            std::env::var(k)
2433                .ok()
2434                .and_then(|v| v.parse().ok())
2435                .unwrap_or(default_v)
2436        }
2437
2438        let bpm = getenv_usize("QUILL_BENCH_BPM", 1024);
2439        let nkeys = getenv_usize("QUILL_BENCH_N", 20000) as i64;
2440        let ops = getenv_usize("QUILL_BENCH_OPS", 200000);
2441
2442        let (_temp_dir, index, key_schema) = create_test_index(bpm, 3, 64);
2443
2444        // Prepare keys and insert (shuffled)
2445        let mut keys: Vec<i64> = (1..=nkeys).collect();
2446        // simple LCG shuffle
2447        let mut seed: u64 = 0x9E3779B97F4A7C15;
2448        for i in (1..keys.len()).rev() {
2449            seed = seed
2450                .wrapping_mul(2862933555777941757)
2451                .wrapping_add(3037000493);
2452            let j = (seed as usize) % (i + 1);
2453            keys.swap(i, j);
2454        }
2455        for k in &keys {
2456            let t = create_tuple_from_key(*k, key_schema.clone());
2457            index.insert(&t, create_rid_from_key(*k)).unwrap();
2458        }
2459
2460        // Hot set: last 10% keys
2461        let hot_start = (nkeys as usize * 9) / 10;
2462        let hot = &keys[hot_start..];
2463
2464        let start = std::time::Instant::now();
2465        let mut found = 0usize;
2466        // query stream via LCG over hot set
2467        let mut x = 0x243F6A8885A308D3u64;
2468        for _ in 0..ops {
2469            x = x.wrapping_mul(6364136223846793005).wrapping_add(1);
2470            let idx = (x as usize) % hot.len();
2471            let key = hot[idx];
2472            let t = create_tuple_from_key(key, key_schema.clone());
2473            if index.get(&t).unwrap().is_some() {
2474                found += 1;
2475            }
2476        }
2477        let el = start.elapsed();
2478        let qps = (ops as f64) / el.as_secs_f64();
2479        println!(
2480            "bench_get_hot_read: ops={} time={:?} qps={:.1} found={}",
2481            ops, el, qps, found
2482        );
2483    }
2484
2485    #[test]
2486    #[ignore]
2487    fn bench_range_scan() {
2488        fn getenv_usize(k: &str, default_v: usize) -> usize {
2489            std::env::var(k)
2490                .ok()
2491                .and_then(|v| v.parse().ok())
2492                .unwrap_or(default_v)
2493        }
2494
2495        let bpm = getenv_usize("QUILL_BENCH_BPM", 1024);
2496        let nkeys = getenv_usize("QUILL_BENCH_N", 30000) as i64;
2497        let passes = getenv_usize("QUILL_BENCH_PASSES", 20);
2498
2499        let (_temp_dir, index, key_schema) = create_test_index(bpm, 3, 64);
2500
2501        // Insert in random order
2502        let mut keys: Vec<i64> = (1..=nkeys).collect();
2503        let mut seed: u64 = 0x9E3779B97F4A7C15;
2504        for i in (1..keys.len()).rev() {
2505            seed = seed
2506                .wrapping_mul(2862933555777941757)
2507                .wrapping_add(3037000493);
2508            let j = (seed as usize) % (i + 1);
2509            keys.swap(i, j);
2510        }
2511        for k in &keys {
2512            let t = create_tuple_from_key(*k, key_schema.clone());
2513            index.insert(&t, create_rid_from_key(*k)).unwrap();
2514        }
2515
2516        let index = Arc::new(index);
2517        let total = (nkeys as usize) * passes;
2518        let start = std::time::Instant::now();
2519        let mut seen = 0usize;
2520        for _ in 0..passes {
2521            let mut it = TreeIndexIterator::new(index.clone(), ..);
2522            while let Some(_rid) = it.next().unwrap() {
2523                seen += 1;
2524            }
2525        }
2526        let el = start.elapsed();
2527        let tps = (total as f64) / el.as_secs_f64();
2528        println!(
2529            "bench_range_scan: items={} time={:?} ips={:.1} seen={}",
2530            total, el, tps, seen
2531        );
2532    }
2533}