quill_sql/storage/index/
btree_index.rs

1use parking_lot::{RwLock, RwLockWriteGuard};
2use std::collections::VecDeque;
3use std::fmt::Write;
4use std::sync::Arc;
5
6use crate::buffer::{
7    BufferManager, PageId, ReadPageGuard, WritePageGuard, INVALID_PAGE_ID, PAGE_SIZE,
8};
9use crate::catalog::SchemaRef;
10use crate::error::{QuillSQLError, QuillSQLResult};
11use crate::storage::codec::{
12    BPlusTreeHeaderPageCodec, BPlusTreeInternalPageCodec, BPlusTreeLeafPageCodec,
13    BPlusTreePageCodec,
14};
15use crate::storage::page::{BPlusTreeHeaderPage, BPlusTreeInternalPage};
16use crate::storage::page::{BPlusTreeLeafPage, BPlusTreePage, RecordId};
17
18use crate::config::BTreeConfig;
19use crate::storage::codec::BPlusTreePageTypeCodec;
20pub use crate::storage::index::btree_iterator::TreeIndexIterator;
21use crate::storage::page::BPlusTreePageType;
22use crate::storage::tuple::Tuple;
23use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
24
25// OLC bounded restart and backoff configuration
26const MAX_OLC_RESTARTS: usize = 64;
27const OLC_BACKOFF_BASE_US: u64 = 50;
28
29#[derive(Debug)]
30pub struct Context<'a> {
31    /// Write guards along the path from root to the current node.
32    /// When a child node is safe for the operation, all ancestor locks
33    /// can be released (clear this set) to reduce lock footprint.
34    pub write_set: VecDeque<WritePageGuard>,
35
36    /// Optional read guards used on read paths for validations/restarts.
37    pub read_set: VecDeque<ReadPageGuard>,
38
39    /// Holds the header page lock during structural changes (e.g., root switch).
40    pub header_lock_guard: Option<RwLockWriteGuard<'a, ()>>,
41}
42
43impl<'a> Context<'a> {
44    pub fn new() -> Self {
45        Self {
46            write_set: VecDeque::new(),
47            read_set: VecDeque::new(),
48            header_lock_guard: None,
49        }
50    }
51
52    /// Push a write guard onto the traversal path.
53    pub fn push_write_guard(&mut self, guard: WritePageGuard) {
54        self.write_set.push_back(guard);
55    }
56
57    pub fn push_read_guard(&mut self, guard: ReadPageGuard) {
58        self.read_set.push_back(guard);
59    }
60
61    /// Release all ancestor write locks when it is safe to proceed.
62    pub fn release_all_write_locks(&mut self) {
63        self.write_set.clear();
64        self.header_lock_guard = None;
65    }
66}
67
68// Production-ready B+ Tree index with OLC readers and latch-crabbing writers.
69// - Header page holds immutable identity; root_page_id changes atomically.
70
71#[derive(Debug)]
72pub struct BPlusTreeIndex {
73    pub key_schema: SchemaRef,
74    pub buffer_pool: Arc<BufferManager>,
75    pub internal_max_size: u32,
76    pub leaf_max_size: u32,
77    pub header_page_id: PageId,
78    pub header_page_lock: Arc<RwLock<()>>,
79    pub config: BTreeConfig,
80    /// Best-effort counter of potentially dead index entries observed by readers.
81    pub pending_garbage: AtomicUsize,
82}
83
84impl BPlusTreeIndex {
85    /// Write a modified index page back to the buffer with WAL (FPW/Delta) just like table heap.
86    /// This centralizes crash-consistent updates for header/leaf/internal pages.
87    fn wal_overwrite_page(
88        &self,
89        guard: &mut WritePageGuard,
90        new_image: Vec<u8>,
91    ) -> QuillSQLResult<()> {
92        debug_assert_eq!(new_image.len(), PAGE_SIZE);
93        guard.apply_page_image(&new_image)?;
94        Ok(())
95    }
96    pub fn new(
97        key_schema: SchemaRef,
98        buffer_pool: Arc<BufferManager>,
99        internal_max_size: u32,
100        leaf_max_size: u32,
101    ) -> Self {
102        // Create a header page to store the root_page_id
103        let mut header_page_guard = buffer_pool
104            .new_page()
105            .expect("Failed to create header page for B+ tree");
106        let header_page_id = header_page_guard.page_id();
107        let header_page = BPlusTreeHeaderPage {
108            root_page_id: INVALID_PAGE_ID,
109        };
110        let encoded = BPlusTreeHeaderPageCodec::encode(&header_page);
111        header_page_guard.overwrite(&encoded, None);
112        drop(header_page_guard);
113
114        Self {
115            key_schema,
116            buffer_pool,
117            // In our representation, internal node stores a sentinel pointer at index 0.
118            // If caller passes max_keys, then max pointers (KVs) = max_keys + 1.
119            internal_max_size: internal_max_size + 1,
120            leaf_max_size,
121            header_page_id,
122            header_page_lock: Arc::new(RwLock::new(())),
123            config: BTreeConfig::default(),
124            pending_garbage: AtomicUsize::new(0),
125        }
126    }
127
128    pub fn new_with_config(
129        key_schema: SchemaRef,
130        buffer_pool: Arc<BufferManager>,
131        internal_max_size: u32,
132        leaf_max_size: u32,
133        config: BTreeConfig,
134    ) -> Self {
135        let mut me = Self::new(key_schema, buffer_pool, internal_max_size, leaf_max_size);
136        me.config = config;
137        me
138    }
139
140    pub fn open(
141        key_schema: SchemaRef,
142        buffer_pool: Arc<BufferManager>,
143        internal_max_size: u32,
144        leaf_max_size: u32,
145        header_page_id: PageId,
146    ) -> Self {
147        Self {
148            key_schema,
149            buffer_pool,
150            // See note in new(): store as max pointers (KVs) capacity.
151            internal_max_size: internal_max_size + 1,
152            leaf_max_size,
153            header_page_id,
154            header_page_lock: Arc::new(RwLock::new(())),
155            config: BTreeConfig::default(),
156            pending_garbage: AtomicUsize::new(0),
157        }
158    }
159
160    pub fn open_with_config(
161        key_schema: SchemaRef,
162        buffer_pool: Arc<BufferManager>,
163        internal_max_size: u32,
164        leaf_max_size: u32,
165        header_page_id: PageId,
166        config: BTreeConfig,
167    ) -> Self {
168        let mut me = Self::open(
169            key_schema,
170            buffer_pool,
171            internal_max_size,
172            leaf_max_size,
173            header_page_id,
174        );
175        me.config = config;
176        me
177    }
178
179    pub fn get_root_page_id(&self) -> QuillSQLResult<PageId> {
180        let header_guard = self.buffer_pool.fetch_page_read(self.header_page_id)?;
181        let (header_page, _) = BPlusTreeHeaderPageCodec::decode(header_guard.data())?;
182        Ok(header_page.root_page_id)
183    }
184
185    fn set_root_page_id(&self, page_id: PageId) -> QuillSQLResult<()> {
186        let mut header_guard = self.buffer_pool.fetch_page_write(self.header_page_id)?;
187        //
188        let header_page = BPlusTreeHeaderPage {
189            root_page_id: page_id,
190        };
191        let encoded = BPlusTreeHeaderPageCodec::encode(&header_page);
192        self.wal_overwrite_page(&mut header_guard, encoded)?;
193        Ok(())
194    }
195
196    pub fn is_empty(&self) -> QuillSQLResult<bool> {
197        Ok(self.get_root_page_id()? == INVALID_PAGE_ID)
198    }
199
200    pub fn get(&self, key: &Tuple) -> QuillSQLResult<Option<RecordId>> {
201        if self.is_empty()? {
202            return Ok(None);
203        }
204        let mut guard = self.find_leaf_page_optimistic(key)?;
205        // Walk right through leaf chain while key is greater than last key
206        loop {
207            let decoded = BPlusTreeLeafPageCodec::decode(guard.data(), self.key_schema.clone());
208            if let Ok((leaf_page, _)) = decoded {
209                if let Some(rid) = leaf_page.look_up(key) {
210                    return Ok(Some(rid));
211                }
212                if leaf_page.header.current_size > 0
213                    && leaf_page.header.next_page_id != INVALID_PAGE_ID
214                {
215                    let last_key = leaf_page.key_at((leaf_page.header.current_size - 1) as usize);
216                    if *key > *last_key {
217                        guard = self
218                            .buffer_pool
219                            .fetch_page_read(leaf_page.header.next_page_id)?;
220                        continue;
221                    }
222                }
223                return Ok(None);
224            } else {
225                // Retry once from root if decode failed (transient)
226                guard = self.find_leaf_page_optimistic(key)?;
227                let (leaf_page, _) =
228                    BPlusTreeLeafPageCodec::decode(guard.data(), self.key_schema.clone())?;
229                return Ok(leaf_page.look_up(key));
230            }
231        }
232    }
233
234    /// 辅助函数:以写模式查找叶子页面,并沿途执行闩锁耦合。
235    fn find_leaf_page_pessimistic<'a>(
236        &'a self,
237        key: &Tuple,
238        is_insert: bool,
239        mut context: Context<'a>,
240    ) -> QuillSQLResult<(WritePageGuard, Context<'a>)> {
241        if self.config.debug_find_level >= 1 {
242            eprintln!(
243                "[FIND] thread={:?} begin is_insert={} key={}",
244                std::thread::current().id(),
245                is_insert,
246                key
247            );
248        }
249        // Do not pre-hold header lock on insert path; only take it around root modifications
250        let root_page_id = self.get_root_page_id()?;
251        if root_page_id == INVALID_PAGE_ID {
252            // This should be handled by the caller (insert/delete)
253            return Err(QuillSQLError::Internal(
254                "find_leaf_page_pessimistic called on an empty tree".to_string(),
255            ));
256        }
257        let mut current_guard = self.buffer_pool.fetch_page_write(root_page_id)?;
258
259        loop {
260            let (page, _) =
261                BPlusTreePageCodec::decode(current_guard.data(), self.key_schema.clone())?;
262
263            match page {
264                BPlusTreePage::Internal(_internal) => {
265                    let child_page_id = BPlusTreeInternalPageCodec::lookup_child_from_bytes(
266                        current_guard.data(),
267                        self.key_schema.clone(),
268                        key,
269                    )?;
270                    if self.config.debug_find_level >= 1 {
271                        eprintln!(
272                            "[FIND] thread={:?} at_internal parent={} -> child={}",
273                            std::thread::current().id(),
274                            current_guard.page_id(),
275                            child_page_id
276                        );
277                    }
278                    let child_guard = self.buffer_pool.fetch_page_write(child_page_id)?;
279                    // header-only safety check for overflow
280                    let will_overflow = match BPlusTreePageTypeCodec::decode(child_guard.data())?.0
281                    {
282                        BPlusTreePageType::LeafPage => {
283                            let (hdr, _) =
284                                BPlusTreeLeafPageCodec::decode_header_only(child_guard.data())?;
285                            hdr.current_size == hdr.max_size
286                        }
287                        BPlusTreePageType::InternalPage => {
288                            let (hdr, _) =
289                                BPlusTreeInternalPageCodec::decode_header_only(child_guard.data())?;
290                            hdr.current_size == hdr.max_size
291                        }
292                    };
293
294                    if is_insert {
295                        if !will_overflow {
296                            if std::env::var("QUILL_DEBUG_FIND").ok().as_deref() == Some("1") {
297                                eprintln!(
298                                    "[FIND] thread={:?} safe_descend release_ancestors parent={} child={}",
299                                    std::thread::current().id(),
300                                    current_guard.page_id(),
301                                    child_page_id
302                                );
303                            }
304                            context.release_all_write_locks();
305                            drop(current_guard);
306                            current_guard = child_guard;
307                            continue;
308                        } else if self.config.debug_find_level >= 1 {
309                            eprintln!(
310                                "[FIND] thread={:?} hold_parent due_to_full child={} write_set_len={}",
311                                std::thread::current().id(),
312                                child_page_id,
313                                context.write_set.len()
314                            );
315                        }
316                    }
317
318                    if self.config.debug_find_level >= 2 {
319                        eprintln!(
320                            "[FIND DEBUG] hold-parent: parent={}, child={}, write_set_len={}",
321                            current_guard.page_id(),
322                            child_page_id,
323                            context.write_set.len()
324                        );
325                    }
326                    context.push_write_guard(current_guard);
327                    current_guard = child_guard;
328                }
329                BPlusTreePage::Leaf(_) => {
330                    if self.config.debug_find_level >= 1 {
331                        eprintln!(
332                            "[FIND] thread={:?} reached_leaf leaf_page_id={} write_set_len={}",
333                            std::thread::current().id(),
334                            current_guard.page_id(),
335                            context.write_set.len()
336                        );
337                    }
338                    return Ok((current_guard, context));
339                }
340            }
341        }
342    }
343
344    /// 公共 API: 插入一个键值对,使用闩锁耦合实现高并发。
345    pub fn insert(&self, key: &Tuple, rid: RecordId) -> QuillSQLResult<()> {
346        let mut context = Context::new();
347        if self.config.debug_insert_level >= 1 {
348            eprintln!(
349                "[INSERT] thread={:?} start key={}",
350                std::thread::current().id(),
351                key
352            );
353        }
354
355        // Guard tree initialization to avoid concurrent start_new_tree races.
356        if self.is_empty()? {
357            let _lock = self.header_page_lock.write();
358            let root_now = self.get_root_page_id()?;
359            if root_now == INVALID_PAGE_ID {
360                if self.config.debug_insert_level >= 1 {
361                    eprintln!(
362                        "[INSERT] thread={:?} start_new_tree key={}",
363                        std::thread::current().id(),
364                        key
365                    );
366                }
367                // Create root leaf and set root under header lock.
368                self.start_new_tree(key, rid)?;
369                return Ok(());
370            }
371        }
372
373        // Split-before-insert loop: ensure we never insert into a full node.
374        loop {
375            if self.config.debug_insert_level >= 1 {
376                eprintln!(
377                    "[INSERT] thread={:?} find_leaf_pessimistic key={}",
378                    std::thread::current().id(),
379                    key
380                );
381            }
382            let (mut leaf_guard, mut local_ctx) =
383                self.find_leaf_page_pessimistic(key, true, context)?;
384            let (mut leaf_page, _) =
385                BPlusTreeLeafPageCodec::decode(leaf_guard.data(), self.key_schema.clone())?;
386
387            // If we still hold a parent, verify that this leaf is the expected child.
388            // If not, redirect to the expected child to avoid misplacing keys across parent ranges.
389            if let Some(parent_guard_ref) = local_ctx.write_set.back() {
390                let (parent_page_chk, _) = BPlusTreeInternalPageCodec::decode(
391                    parent_guard_ref.data(),
392                    self.key_schema.clone(),
393                )?;
394                let expected_pid = parent_page_chk.look_up(key);
395                if expected_pid != leaf_guard.page_id() {
396                    if std::env::var("QUILL_DEBUG_FIND").ok().as_deref() == Some("1") {
397                        eprintln!(
398                            "[INSERT] parent_guided_redirect: from_leaf={} -> expected_child={}",
399                            leaf_guard.page_id(),
400                            expected_pid
401                        );
402                    }
403                    drop(leaf_guard);
404                    leaf_guard = self.buffer_pool.fetch_page_write(expected_pid)?;
405                    let (new_leaf, _) =
406                        BPlusTreeLeafPageCodec::decode(leaf_guard.data(), self.key_schema.clone())?;
407                    leaf_page = new_leaf;
408                }
409            }
410
411            // Redirect to right siblings if key no longer belongs to this leaf (post-split race)
412            while leaf_page.header.current_size > 0
413                && leaf_page.header.next_page_id != INVALID_PAGE_ID
414            {
415                let last_key_ref = leaf_page.key_at((leaf_page.header.current_size - 1) as usize);
416                if *key <= *last_key_ref {
417                    break;
418                }
419                let next_pid = leaf_page.header.next_page_id;
420                // Peek the next leaf's minimal key using a read latch to avoid heavy contention
421                let next_guard_peek = self.buffer_pool.fetch_page_read(next_pid)?;
422                let (next_leaf_peek, _) = BPlusTreeLeafPageCodec::decode(
423                    next_guard_peek.data(),
424                    self.key_schema.clone(),
425                )?;
426                let next_first_key = if next_leaf_peek.header.current_size > 0 {
427                    next_leaf_peek.key_at(0).clone()
428                } else {
429                    break;
430                };
431                drop(next_guard_peek);
432                if *key < next_first_key {
433                    break;
434                }
435                if self.config.debug_insert_level >= 1 {
436                    eprintln!(
437                        "[INSERT] thread={:?} redirect_to_sibling: from_leaf={} -> next_leaf={} key={} last_key={} next_first_key={}",
438                        std::thread::current().id(),
439                        leaf_guard.page_id(),
440                        next_pid,
441                        key,
442                        last_key_ref,
443                        next_first_key
444                    );
445                }
446                // Release any parents held and current leaf, then jump to next sibling directly
447                local_ctx.release_all_write_locks();
448                drop(leaf_guard);
449                let next_guard = self.buffer_pool.fetch_page_write(next_pid)?;
450                let (next_leaf, _) =
451                    BPlusTreeLeafPageCodec::decode(next_guard.data(), self.key_schema.clone())?;
452                leaf_guard = next_guard;
453                leaf_page = next_leaf;
454            }
455
456            // Update if key exists
457            if let Some(existing_rid) = leaf_page.look_up_mut(key) {
458                if self.config.debug_insert_level >= 1 {
459                    eprintln!(
460                        "[INSERT] thread={:?} update leaf_page_id={} key={} old_rid={:?} new_rid={:?}",
461                        std::thread::current().id(),
462                        leaf_guard.page_id(),
463                        key,
464                        *existing_rid,
465                        rid
466                    );
467                }
468                *existing_rid = rid;
469                leaf_page.header.version += 1;
470                let encoded = BPlusTreeLeafPageCodec::encode(&leaf_page);
471                self.wal_overwrite_page(&mut leaf_guard, encoded)?;
472                local_ctx.release_all_write_locks();
473                return Ok(());
474            }
475
476            // If page is at capacity, split first, then retry to find the correct leaf
477            if leaf_page.header.current_size == leaf_page.header.max_size {
478                if std::env::var("QUILL_DEBUG_INSERT").ok().as_deref() == Some("1") {
479                    eprintln!(
480                        "[INSERT] thread={:?} leaf_full split leaf_page_id={} key={}",
481                        std::thread::current().id(),
482                        leaf_guard.page_id(),
483                        key
484                    );
485                }
486                // Avoid deadlock: allow split to acquire header lock when promoting root
487                local_ctx.header_lock_guard = None;
488                match self.split(leaf_guard, &mut local_ctx) {
489                    Ok(()) => {
490                        // Release all locks and retry from root with fresh context
491                        local_ctx.release_all_write_locks();
492                        context = Context::new();
493                        continue;
494                    }
495                    Err(QuillSQLError::Internal(_e)) => {
496                        // Structural race: drop latches and retry from root
497                        local_ctx.release_all_write_locks();
498                        context = Context::new();
499                        continue;
500                    }
501                    Err(e) => return Err(e),
502                }
503            }
504
505            // Safe to insert
506            if std::env::var("QUILL_DEBUG_INSERT").ok().as_deref() == Some("1") {
507                eprintln!(
508                    "[INSERT] thread={:?} insert leaf_page_id={} key={} rid={:?}",
509                    std::thread::current().id(),
510                    leaf_guard.page_id(),
511                    key,
512                    rid
513                );
514            }
515            leaf_page.insert(key.clone(), rid);
516            leaf_page.header.version += 1;
517            let encoded = BPlusTreeLeafPageCodec::encode(&leaf_page);
518            self.wal_overwrite_page(&mut leaf_guard, encoded)?;
519            local_ctx.release_all_write_locks();
520            return Ok(());
521        }
522    }
523
524    /// Public API: delete a key with latch coupling to ensure concurrency safety.
525    pub fn delete(&self, key: &Tuple) -> QuillSQLResult<()> {
526        if self.is_empty()? {
527            return Ok(());
528        }
529
530        let mut context = Context::new();
531        'restart: loop {
532            let (mut leaf_guard, mut local_ctx) =
533                self.find_leaf_page_pessimistic(key, false, context)?;
534            let (mut leaf_page, _) =
535                BPlusTreeLeafPageCodec::decode(leaf_guard.data(), self.key_schema.clone())?;
536
537            // If we still hold a parent, prefer parent-guided redirection to avoid crossing
538            // parent boundary via leaf chain which can cause livelock during structure changes.
539            if let Some(parent_guard_ref) = local_ctx.write_set.back() {
540                let (parent_page_chk, _) = BPlusTreeInternalPageCodec::decode(
541                    parent_guard_ref.data(),
542                    self.key_schema.clone(),
543                )?;
544                let expected_pid = parent_page_chk.look_up(key);
545                if expected_pid != leaf_guard.page_id() {
546                    if std::env::var("QUILL_DEBUG_FIND").ok().as_deref() == Some("1") {
547                        eprintln!(
548                            "[DELETE] parent_guided_redirect: from_leaf={} -> expected_child={}",
549                            leaf_guard.page_id(),
550                            expected_pid
551                        );
552                    }
553                    drop(leaf_guard);
554                    leaf_guard = self.buffer_pool.fetch_page_write(expected_pid)?;
555                    let (new_leaf, _) =
556                        BPlusTreeLeafPageCodec::decode(leaf_guard.data(), self.key_schema.clone())?;
557                    leaf_page = new_leaf;
558                }
559            }
560
561            // Redirect: if key belongs to right siblings, jump directly to sibling while keeping parent path
562            let mut hops: u32 = 0;
563            while leaf_page.header.current_size > 0
564                && leaf_page.header.next_page_id != INVALID_PAGE_ID
565            {
566                let last_key_ref = leaf_page.key_at((leaf_page.header.current_size - 1) as usize);
567                if *key <= *last_key_ref {
568                    break;
569                }
570                let next_pid = leaf_page.header.next_page_id;
571                if std::env::var("QUILL_DEBUG_FIND").ok().as_deref() == Some("1") {
572                    eprintln!(
573                        "[DELETE] thread={:?} redirect_to_sibling: from_leaf={} -> next_leaf={} key={} last_key={}",
574                        std::thread::current().id(),
575                        leaf_guard.page_id(),
576                        next_pid,
577                        key,
578                        last_key_ref
579                    );
580                }
581                hops += 1;
582                if hops > 8 {
583                    if std::env::var("QUILL_DEBUG_FIND").ok().as_deref() == Some("1") {
584                        eprintln!(
585                            "[DELETE] redirect_hops_exceeded: restart from root at leaf={}",
586                            leaf_guard.page_id()
587                        );
588                    }
589                    local_ctx.release_all_write_locks();
590                    drop(leaf_guard);
591                    context = Context::new();
592                    continue 'restart;
593                }
594                drop(leaf_guard);
595                leaf_guard = self.buffer_pool.fetch_page_write(next_pid)?;
596                let (new_leaf, _) =
597                    BPlusTreeLeafPageCodec::decode(leaf_guard.data(), self.key_schema.clone())?;
598                leaf_page = new_leaf;
599
600                // Do not force restart here; attempt deletion on the correct leaf first.
601            }
602
603            // Check presence & whether it is the first key (affects parent separator)
604            let was_first = if leaf_page.header.current_size > 0 {
605                let first_key = leaf_page.key_at(0).clone();
606                &first_key == key
607            } else {
608                false
609            };
610            if leaf_page.look_up(key).is_none() {
611                local_ctx.release_all_write_locks();
612                return Ok(());
613            }
614
615            leaf_page.delete(key);
616            leaf_page.header.version += 1;
617            let encoded = BPlusTreeLeafPageCodec::encode(&leaf_page);
618            self.wal_overwrite_page(&mut leaf_guard, encoded)?;
619
620            // If the node is underflowing, handle it.
621            if leaf_page.header.current_size < leaf_page.min_size() {
622                // If parent path is empty but this leaf is not the actual root, rebuild path
623                if local_ctx.write_set.is_empty() {
624                    let root_id = self.get_root_page_id()?;
625                    if leaf_guard.page_id() != root_id {
626                        local_ctx.release_all_write_locks();
627                        drop(leaf_guard);
628                        context = Context::new();
629                        continue 'restart;
630                    }
631                }
632                match self.handle_underflow(leaf_guard, &mut local_ctx) {
633                    Ok(()) => {
634                        local_ctx.release_all_write_locks();
635                        return Ok(());
636                    }
637                    Err(QuillSQLError::Internal(_)) => {
638                        // Structural race/mismatch: release and retry from root
639                        local_ctx.release_all_write_locks();
640                        context = Context::new();
641                        continue;
642                    }
643                    Err(e) => return Err(e),
644                }
645            } else {
646                // If we deleted the first key of this leaf, and there's a parent above,
647                // update the parent's separator key to the new minimal key of this leaf.
648                if was_first && leaf_page.header.current_size > 0 {
649                    if let Some(mut parent_guard) = local_ctx.write_set.pop_back() {
650                        let (mut parent_page, _) = BPlusTreeInternalPageCodec::decode(
651                            parent_guard.data(),
652                            self.key_schema.clone(),
653                        )?;
654                        if let Some(node_idx) = parent_page.value_index(leaf_guard.page_id()) {
655                            if node_idx > 0 {
656                                parent_page.array[node_idx].0 = leaf_page.key_at(0).clone();
657                                parent_page.header.version += 1;
658                                let encoded = BPlusTreeInternalPageCodec::encode(&parent_page);
659                                self.wal_overwrite_page(&mut parent_guard, encoded)?;
660                            }
661                        }
662                        // push back to maintain path for later releases
663                        local_ctx.write_set.push_back(parent_guard);
664                    }
665                }
666                local_ctx.release_all_write_locks();
667                return Ok(());
668            }
669        }
670    }
671
672    pub fn to_dot(&self) -> QuillSQLResult<String> {
673        let mut dot = String::new();
674        writeln!(&mut dot, "digraph BPlusTree {{").unwrap();
675        writeln!(&mut dot, "  rankdir=TB;").unwrap();
676        writeln!(&mut dot, "  node [shape=record, height=.1];").unwrap();
677
678        let root_page_id = self.get_root_page_id()?;
679        if root_page_id == INVALID_PAGE_ID {
680            writeln!(&mut dot, "  empty [label=\"<f0> Empty Tree\"];").unwrap();
681            writeln!(&mut dot, "}}").unwrap();
682            return Ok(dot);
683        }
684
685        let mut queue = VecDeque::new();
686        queue.push_back(root_page_id);
687
688        while let Some(page_id) = queue.pop_front() {
689            let guard = self.buffer_pool.fetch_page_read(page_id)?;
690            let (page, _) = BPlusTreePageCodec::decode(guard.data(), self.key_schema.clone())?;
691
692            match page {
693                BPlusTreePage::Internal(internal) => {
694                    let mut label = String::new();
695                    for i in 0..internal.header.current_size {
696                        if i > 0 {
697                            write!(&mut label, "|").unwrap();
698                        }
699                        write!(&mut label, "<p{}>", i).unwrap();
700                        if i > 0 {
701                            write!(&mut label, " {}", internal.key_at(i as usize)).unwrap();
702                        }
703                    }
704                    writeln!(&mut dot, "  page{} [label=\"{}\"];", page_id, label).unwrap();
705
706                    for i in 0..internal.header.current_size {
707                        let child_id = internal.value_at(i as usize);
708                        writeln!(
709                            &mut dot,
710                            "  \"page{}\":p{} -> \"page{}\";",
711                            page_id, i, child_id
712                        )
713                        .unwrap();
714                        queue.push_back(child_id);
715                    }
716                }
717                BPlusTreePage::Leaf(leaf) => {
718                    let mut label = String::new();
719                    for i in 0..leaf.header.current_size {
720                        if i > 0 {
721                            write!(&mut label, "|").unwrap();
722                        }
723                        write!(
724                            &mut label,
725                            "<f{}> {} -> {}",
726                            i,
727                            leaf.key_at(i as usize),
728                            leaf.array[i as usize].1
729                        )
730                        .unwrap();
731                    }
732                    writeln!(&mut dot, "  page{} [label=\"{}\"];", page_id, label).unwrap();
733
734                    if leaf.header.next_page_id != INVALID_PAGE_ID {
735                        writeln!(
736                            &mut dot,
737                            "  page{} -> page{} [style=dashed, constraint=false];",
738                            page_id, leaf.header.next_page_id
739                        )
740                        .unwrap();
741                    }
742                }
743            }
744        }
745
746        writeln!(&mut dot, "}}").unwrap();
747        Ok(dot)
748    }
749
750    fn handle_underflow(
751        &self,
752        mut node_guard: WritePageGuard,
753        context: &mut Context,
754    ) -> QuillSQLResult<()> {
755        if context.write_set.is_empty() {
756            // This is the root node. Let adjust_root handle it.
757            self.adjust_root(node_guard)?;
758            return Ok(());
759        }
760
761        let parent_guard = match context.write_set.pop_back() {
762            Some(g) => g,
763            None => {
764                return Err(QuillSQLError::Internal(
765                    "underflow: missing parent".to_string(),
766                ))
767            }
768        };
769        let (parent_page, _) =
770            BPlusTreeInternalPageCodec::decode(parent_guard.data(), self.key_schema.clone())?;
771
772        let Some(node_idx) = parent_page.value_index(node_guard.page_id()) else {
773            return Err(QuillSQLError::Internal(
774                "underflow: node not found in parent".to_string(),
775            ));
776        };
777
778        // Try to borrow from the left sibling first (acquire locks in PageId order to avoid deadlock).
779        if node_idx > 0 {
780            let left_sibling_pid = parent_page.value_at(node_idx - 1);
781            let node_pid = node_guard.page_id();
782            if left_sibling_pid < node_pid {
783                // Enforce PageId order: release node, lock left first, then node
784                drop(node_guard);
785                let left_sibling_guard = self.buffer_pool.fetch_page_write(left_sibling_pid)?;
786                let node_guard_new = self.buffer_pool.fetch_page_write(node_pid)?;
787                node_guard = node_guard_new;
788                let (left_sibling_page, _) =
789                    BPlusTreePageCodec::decode(left_sibling_guard.data(), self.key_schema.clone())?;
790                if left_sibling_page.current_size() > left_sibling_page.min_size() {
791                    self.redistribute(
792                        left_sibling_guard,
793                        node_guard,
794                        parent_guard,
795                        node_idx,
796                        true,
797                    )?;
798                    return Ok(());
799                }
800            } else {
801                // We already hold node; locking left next preserves non-decreasing order
802                let left_sibling_guard = self.buffer_pool.fetch_page_write(left_sibling_pid)?;
803                let (left_sibling_page, _) =
804                    BPlusTreePageCodec::decode(left_sibling_guard.data(), self.key_schema.clone())?;
805                if left_sibling_page.current_size() > left_sibling_page.min_size() {
806                    self.redistribute(
807                        left_sibling_guard,
808                        node_guard,
809                        parent_guard,
810                        node_idx,
811                        true,
812                    )?;
813                    return Ok(());
814                }
815            }
816        }
817
818        // Try to borrow from the right sibling (acquire locks in PageId order to avoid deadlock).
819        if node_idx < parent_page.header.current_size as usize - 1 {
820            let right_sibling_pid = parent_page.value_at(node_idx + 1);
821            let node_pid = node_guard.page_id();
822            if right_sibling_pid < node_pid {
823                // Enforce PageId order: release node, lock right first, then node
824                drop(node_guard);
825                let right_sibling_guard = self.buffer_pool.fetch_page_write(right_sibling_pid)?;
826                let node_guard_new = self.buffer_pool.fetch_page_write(node_pid)?;
827                node_guard = node_guard_new;
828                let (right_sibling_page, _) = BPlusTreePageCodec::decode(
829                    right_sibling_guard.data(),
830                    self.key_schema.clone(),
831                )?;
832                if right_sibling_page.current_size() > right_sibling_page.min_size() {
833                    self.redistribute(
834                        right_sibling_guard,
835                        node_guard,
836                        parent_guard,
837                        node_idx,
838                        false,
839                    )?;
840                    return Ok(());
841                }
842            } else {
843                let right_sibling_guard = self.buffer_pool.fetch_page_write(right_sibling_pid)?;
844                let (right_sibling_page, _) = BPlusTreePageCodec::decode(
845                    right_sibling_guard.data(),
846                    self.key_schema.clone(),
847                )?;
848                if right_sibling_page.current_size() > right_sibling_page.min_size() {
849                    self.redistribute(
850                        right_sibling_guard,
851                        node_guard,
852                        parent_guard,
853                        node_idx,
854                        false,
855                    )?;
856                    return Ok(());
857                }
858            }
859        }
860
861        // Neither sibling can lend, so we must merge.
862        if node_idx > 0 {
863            // Merge with the left sibling (lock in PageId order)
864            let left_sibling_pid = parent_page.value_at(node_idx - 1);
865            let node_pid = node_guard.page_id();
866            if left_sibling_pid < node_pid {
867                // Enforce PageId order for merge: release node first
868                drop(node_guard);
869                let left_sibling_guard = self.buffer_pool.fetch_page_write(left_sibling_pid)?;
870                let node_guard_new = self.buffer_pool.fetch_page_write(node_pid)?;
871                node_guard = node_guard_new;
872                self.coalesce(left_sibling_guard, node_guard, parent_guard, context)?;
873            } else {
874                let left_sibling_guard = self.buffer_pool.fetch_page_write(left_sibling_pid)?;
875                self.coalesce(left_sibling_guard, node_guard, parent_guard, context)?;
876            }
877        } else {
878            // Merge with the right sibling (lock in PageId order)
879            let right_sibling_pid = parent_page.value_at(node_idx + 1);
880            let node_pid = node_guard.page_id();
881            if right_sibling_pid < node_pid {
882                // Enforce PageId order for merge: release node first
883                drop(node_guard);
884                let right_sibling_guard = self.buffer_pool.fetch_page_write(right_sibling_pid)?;
885                let node_guard_new = self.buffer_pool.fetch_page_write(node_pid)?;
886                node_guard = node_guard_new;
887                // node is left in this case
888                self.coalesce(node_guard, right_sibling_guard, parent_guard, context)?;
889            } else {
890                let right_sibling_guard = self.buffer_pool.fetch_page_write(right_sibling_pid)?;
891                self.coalesce(node_guard, right_sibling_guard, parent_guard, context)?;
892            }
893        }
894
895        Ok(())
896    }
897
898    fn coalesce(
899        &self,
900        mut left_guard: WritePageGuard,
901        right_guard: WritePageGuard,
902        mut parent_guard: WritePageGuard,
903        context: &mut Context,
904    ) -> QuillSQLResult<()> {
905        let (mut left_page, _) =
906            BPlusTreePageCodec::decode(left_guard.data(), self.key_schema.clone())?;
907        let (mut right_page, _) =
908            BPlusTreePageCodec::decode(right_guard.data(), self.key_schema.clone())?;
909        let (mut parent_page, _) =
910            BPlusTreeInternalPageCodec::decode(parent_guard.data(), self.key_schema.clone())?;
911
912        let right_page_id = right_guard.page_id();
913        let middle_key = match parent_page.remove(right_page_id) {
914            Some((k, _)) => k,
915            None => {
916                return Err(QuillSQLError::Internal(
917                    "coalesce: parent missing right child".to_string(),
918                ));
919            }
920        };
921
922        match (&mut left_page, &mut right_page) {
923            (BPlusTreePage::Leaf(left), BPlusTreePage::Leaf(right)) => {
924                left.merge(right);
925            }
926            (BPlusTreePage::Internal(left), BPlusTreePage::Internal(right)) => {
927                left.merge(middle_key, right);
928                // B-link maintenance: after merge, left should inherit right's high bound and next pointer
929                left.header.next_page_id = right.header.next_page_id;
930                left.high_key = right.high_key.clone();
931            }
932            _ => unreachable!("Mismatched page types in coalesce"),
933        }
934
935        // Update pages on disk
936        if let BPlusTreePage::Leaf(p) = &mut left_page {
937            p.header.version += 1;
938        } else if let BPlusTreePage::Internal(p) = &mut left_page {
939            p.header.version += 1;
940        }
941        parent_page.header.version += 1;
942
943        let left_encoded = BPlusTreePageCodec::encode(&left_page);
944        self.wal_overwrite_page(&mut left_guard, left_encoded)?;
945        let parent_encoded = BPlusTreeInternalPageCodec::encode(&parent_page);
946        self.wal_overwrite_page(&mut parent_guard, parent_encoded)?;
947        drop(left_guard);
948        drop(right_guard);
949        self.buffer_pool.delete_page(right_page_id)?;
950
951        // After merging, if the parent becomes a root with only one child,
952        // it needs to be adjusted to shrink the tree's height.
953        if context.write_set.is_empty() && parent_page.header.current_size == 1 {
954            self.adjust_root(parent_guard)?;
955        } else if parent_page.header.current_size < parent_page.min_size() {
956            // Otherwise, recursively handle underflow in the parent.
957            self.handle_underflow(parent_guard, context)?;
958        }
959        Ok(())
960    }
961
962    fn redistribute(
963        &self,
964        mut from_guard: WritePageGuard,
965        mut to_guard: WritePageGuard,
966        mut parent_guard: WritePageGuard,
967        parent_idx_of_to_node: usize,
968        from_is_left_sibling: bool,
969    ) -> QuillSQLResult<()> {
970        let (mut from_page, _) =
971            BPlusTreePageCodec::decode(from_guard.data(), self.key_schema.clone())?;
972        let (mut to_page, _) =
973            BPlusTreePageCodec::decode(to_guard.data(), self.key_schema.clone())?;
974        let (mut parent_page, _) =
975            BPlusTreeInternalPageCodec::decode(parent_guard.data(), self.key_schema.clone())?;
976
977        if from_is_left_sibling {
978            // from=left(from_internal), to=right(to_internal). Separator key is at parent_idx_of_to_node.
979            let separator_idx = parent_idx_of_to_node;
980            match (&mut to_page, &mut from_page) {
981                (BPlusTreePage::Leaf(to_leaf), BPlusTreePage::Leaf(from_leaf)) => {
982                    let item_to_move = from_leaf.remove_last_kv();
983                    to_leaf.array.insert(0, item_to_move);
984                    to_leaf.header.current_size += 1;
985                    // Parent separator stores the minimal key of the RIGHT child
986                    parent_page.array[separator_idx].0 = to_leaf.key_at(0).clone();
987                    to_leaf.header.version += 1;
988                    from_leaf.header.version += 1;
989                }
990                (BPlusTreePage::Internal(to_internal), BPlusTreePage::Internal(from_internal)) => {
991                    // This logic is based on bustub's BPlusTreeInternalPage::MoveLastToFrontOf
992                    // 1. Get the last key-pointer pair from the left sibling (from_internal)
993                    let item_to_move = from_internal.remove_last_kv(); // This is (K_last, P_last)
994                    let separator_key_in_parent = parent_page.key_at(separator_idx).clone();
995
996                    // 2. The old separator key from the parent moves down to become the first key in the recipient (to_internal).
997                    // The original sentinel pointer of the recipient becomes the pointer for this new key.
998                    to_internal
999                        .array
1000                        .insert(1, (separator_key_in_parent, to_internal.value_at(0)));
1001
1002                    // 3. The pointer from the moved item becomes the new sentinel for the recipient.
1003                    to_internal.array[0].1 = item_to_move.1; // Set sentinel to P_last
1004
1005                    // 4. The key from the moved item becomes the new separator in the parent.
1006                    parent_page.array[separator_idx].0 = item_to_move.0; // Set parent key to K_last
1007
1008                    to_internal.header.current_size += 1;
1009                    to_internal.header.version += 1;
1010                    from_internal.header.version += 1;
1011
1012                    self.refresh_internal_child_fence(
1013                        &parent_page,
1014                        to_guard.page_id(),
1015                        to_internal,
1016                    )?;
1017                    self.refresh_internal_child_fence(
1018                        &parent_page,
1019                        from_guard.page_id(),
1020                        from_internal,
1021                    )?;
1022                }
1023                _ => return Err(QuillSQLError::Internal("Mismatched page types".to_string())),
1024            }
1025        } else {
1026            // Borrow from RIGHT
1027            // from=right(from_internal), to=left(to_internal). Separator key is at parent_idx_of_to_node + 1.
1028            let separator_idx = parent_idx_of_to_node + 1;
1029            match (&mut to_page, &mut from_page) {
1030                (BPlusTreePage::Leaf(to_leaf), BPlusTreePage::Leaf(from_leaf)) => {
1031                    let item_to_move = from_leaf.remove_first_kv();
1032                    to_leaf.array.push(item_to_move);
1033                    to_leaf.header.current_size += 1;
1034                    // Parent separator stores the minimal key of the RIGHT child
1035                    parent_page.array[separator_idx].0 = from_leaf.key_at(0).clone();
1036                    to_leaf.header.version += 1;
1037                    from_leaf.header.version += 1;
1038                }
1039                (BPlusTreePage::Internal(to_internal), BPlusTreePage::Internal(from_internal)) => {
1040                    // This logic is based on bustub's BPlusTreeInternalPage::MoveFirstToEndOf
1041                    // 1. Get the separator from the parent and the first *real* item from the right sibling (from_internal)
1042                    let separator_key_in_parent = parent_page.key_at(separator_idx).clone();
1043                    let item_to_move = from_internal.remove_first_kv(); // This is (K_R1, P_R1)
1044
1045                    // 2. The old separator moves down to the end of the recipient (to_internal).
1046                    // Its pointer is the original sentinel pointer of the right sibling.
1047                    to_internal
1048                        .array
1049                        .push((separator_key_in_parent, from_internal.value_at(0)));
1050
1051                    // 3. The key from the moved item becomes the new separator in the parent.
1052                    parent_page.array[separator_idx].0 = item_to_move.0; // Set parent key to K_R1
1053
1054                    // 4. The pointer from the moved item becomes the new sentinel of the right sibling.
1055                    from_internal.array[0].1 = item_to_move.1; // Set right sibling's sentinel to P_R1
1056
1057                    to_internal.header.current_size += 1;
1058                    to_internal.header.version += 1;
1059                    from_internal.header.version += 1;
1060
1061                    self.refresh_internal_child_fence(
1062                        &parent_page,
1063                        to_guard.page_id(),
1064                        to_internal,
1065                    )?;
1066                    self.refresh_internal_child_fence(
1067                        &parent_page,
1068                        from_guard.page_id(),
1069                        from_internal,
1070                    )?;
1071                }
1072                _ => return Err(QuillSQLError::Internal("Mismatched page types".to_string())),
1073            }
1074        }
1075
1076        parent_page.header.version += 1;
1077
1078        let from_encoded = BPlusTreePageCodec::encode(&from_page);
1079        self.wal_overwrite_page(&mut from_guard, from_encoded)?;
1080        let to_encoded = BPlusTreePageCodec::encode(&to_page);
1081        self.wal_overwrite_page(&mut to_guard, to_encoded)?;
1082        let parent_encoded = BPlusTreeInternalPageCodec::encode(&parent_page);
1083        self.wal_overwrite_page(&mut parent_guard, parent_encoded)?;
1084
1085        Ok(())
1086    }
1087
1088    fn refresh_internal_child_fence(
1089        &self,
1090        parent_page: &BPlusTreeInternalPage,
1091        child_page_id: PageId,
1092        child_page: &mut BPlusTreeInternalPage,
1093    ) -> QuillSQLResult<()> {
1094        let Some(child_idx) = parent_page.value_index(child_page_id) else {
1095            return Err(QuillSQLError::Internal(
1096                "redistribute: child missing from parent".to_string(),
1097            ));
1098        };
1099        let size = parent_page.header.current_size as usize;
1100        if child_idx + 1 < size {
1101            child_page.high_key = Some(parent_page.key_at(child_idx + 1).clone());
1102            child_page.header.next_page_id = parent_page.value_at(child_idx + 1);
1103        } else {
1104            child_page.high_key = parent_page.high_key.clone();
1105            child_page.header.next_page_id = parent_page.header.next_page_id;
1106        }
1107        Ok(())
1108    }
1109
1110    fn adjust_root(&self, root_guard: WritePageGuard) -> QuillSQLResult<()> {
1111        let (root_page, _) =
1112            BPlusTreePageCodec::decode(root_guard.data(), self.key_schema.clone())?;
1113
1114        if let BPlusTreePage::Internal(root_internal) = root_page {
1115            if root_internal.header.current_size == 1 {
1116                // The lock is already held by the caller (e.g., delete).
1117                // Re-acquiring it would cause a deadlock.
1118                let new_root_id = root_internal.value_at(0);
1119                // Drop page guard before touching header to avoid page<->header lock inversion
1120                drop(root_guard);
1121                let _lock = self.header_page_lock.write();
1122                self.set_root_page_id(new_root_id)?;
1123                // Delay old-root physical deallocation to avoid races with concurrent readers
1124            }
1125        } else if let BPlusTreePage::Leaf(root_leaf) = root_page {
1126            if root_leaf.header.current_size == 0 {
1127                // The lock is already held by the caller.
1128                drop(root_guard);
1129                let _lock = self.header_page_lock.write();
1130                self.set_root_page_id(INVALID_PAGE_ID)?;
1131                // Delay old-root physical deallocation to avoid races with concurrent readers
1132            }
1133        }
1134        Ok(())
1135    }
1136
1137    /// Internal: create the very first node when the tree is empty.
1138    fn start_new_tree(&self, key: &Tuple, rid: RecordId) -> QuillSQLResult<()> {
1139        let mut root_guard = self.buffer_pool.new_page()?;
1140        let root_page_id = root_guard.page_id();
1141        let mut leaf_page = BPlusTreeLeafPage::new(self.key_schema.clone(), self.leaf_max_size);
1142        leaf_page.insert(key.clone(), rid);
1143        let encoded_data = BPlusTreeLeafPageCodec::encode(&leaf_page);
1144        self.wal_overwrite_page(&mut root_guard, encoded_data)?;
1145        // Update root id (release page latch before touching header to avoid lock inversion).
1146        // Precondition: caller holds header_page_lock.
1147        drop(root_guard);
1148        self.set_root_page_id(root_page_id)?;
1149        Ok(())
1150    }
1151
1152    /// Best-effort lazy cleanup: iterate across leaves and delete keys whose RIDs
1153    /// satisfy the `is_globally_dead` predicate. This is REDO-safe and uses the
1154    /// existing delete() path to maintain separators and handle underflow properly.
1155    /// Notes:
1156    /// - Callers should pass a predicate that only returns true for tuples that are
1157    ///   globally invisible (e.g., deleted and the deleting txn committed).
1158    /// - This method is conservative and may revisit pages as structure changes; it
1159    ///   favors correctness over speed.
1160    pub fn lazy_cleanup_with<F>(
1161        &self,
1162        mut is_globally_dead: F,
1163        limit: Option<usize>,
1164    ) -> QuillSQLResult<usize>
1165    where
1166        F: FnMut(&RecordId) -> bool,
1167    {
1168        let mut cleaned = 0usize;
1169        let mut guard = self.find_first_leaf_page()?;
1170        loop {
1171            let (leaf, _) = BPlusTreeLeafPageCodec::decode(guard.data(), self.key_schema.clone())?;
1172            // Snapshot next pid before releasing read guard
1173            let next_pid = leaf.header.next_page_id;
1174            // Collect keys to delete
1175            let mut to_delete: Vec<Tuple> = Vec::new();
1176            for i in 0..(leaf.header.current_size as usize) {
1177                let kv = leaf.kv_at(i).clone();
1178                if is_globally_dead(&kv.1) {
1179                    to_delete.push(kv.0);
1180                }
1181            }
1182            drop(guard);
1183
1184            for k in to_delete.into_iter() {
1185                self.delete(&k)?;
1186                cleaned += 1;
1187                if let Some(maxn) = limit {
1188                    if cleaned >= maxn {
1189                        return Ok(cleaned);
1190                    }
1191                }
1192            }
1193
1194            if next_pid == INVALID_PAGE_ID {
1195                break;
1196            }
1197            guard = self.buffer_pool.fetch_page_read(next_pid)?;
1198        }
1199        Ok(cleaned)
1200    }
1201
1202    fn find_leaf_page_optimistic(&self, key: &Tuple) -> QuillSQLResult<ReadPageGuard> {
1203        // OLC + B-link: version-check each step; if changed, restart with bounded backoff.
1204        let mut restarts = 0usize;
1205        'restart: loop {
1206            let mut current_guard = self.buffer_pool.fetch_page_read(self.get_root_page_id()?)?;
1207            loop {
1208                let decoded =
1209                    BPlusTreePageCodec::decode(current_guard.data(), self.key_schema.clone());
1210                if decoded.is_err() {
1211                    drop(current_guard);
1212                    restarts += 1;
1213                    if restarts > MAX_OLC_RESTARTS {
1214                        std::thread::sleep(std::time::Duration::from_micros(
1215                            OLC_BACKOFF_BASE_US.saturating_mul(1 << (restarts.min(10) - 1)),
1216                        ));
1217                        restarts = 0;
1218                    }
1219                    continue 'restart;
1220                }
1221                let (page, _) = decoded.unwrap();
1222                match page {
1223                    BPlusTreePage::Internal(internal) => {
1224                        // Double-read header-only version for OLC
1225                        let (hdr1, _) =
1226                            BPlusTreeInternalPageCodec::decode_header_only(current_guard.data())?;
1227                        let v1 = hdr1.version;
1228                        if let Some(ref hk) = internal.high_key {
1229                            if key >= hk && internal.header.next_page_id != INVALID_PAGE_ID {
1230                                let sib = self
1231                                    .buffer_pool
1232                                    .fetch_page_read(internal.header.next_page_id)?;
1233                                drop(current_guard);
1234                                current_guard = sib;
1235                                continue;
1236                            }
1237                        }
1238                        // Byte-based child lookup to avoid re-decode costs
1239                        let next_page_id = BPlusTreeInternalPageCodec::lookup_child_from_bytes(
1240                            current_guard.data(),
1241                            self.key_schema.clone(),
1242                            key,
1243                        )?;
1244                        let (hdr2, _) =
1245                            BPlusTreeInternalPageCodec::decode_header_only(current_guard.data())?;
1246                        let v2 = hdr2.version;
1247                        if v1 != v2 {
1248                            drop(current_guard);
1249                            restarts += 1;
1250                            if restarts > MAX_OLC_RESTARTS {
1251                                std::thread::sleep(std::time::Duration::from_micros(
1252                                    OLC_BACKOFF_BASE_US.saturating_mul(1 << (restarts.min(10) - 1)),
1253                                ));
1254                                restarts = 0;
1255                            }
1256                            continue 'restart;
1257                        }
1258                        let child_guard = self.buffer_pool.fetch_page_read(next_page_id)?;
1259                        drop(current_guard);
1260                        current_guard = child_guard;
1261                    }
1262                    BPlusTreePage::Leaf(_leaf) => {
1263                        let (h1, _) =
1264                            BPlusTreeLeafPageCodec::decode_header_only(current_guard.data())?;
1265                        let v1 = h1.version;
1266                        let (h2, _) =
1267                            BPlusTreeLeafPageCodec::decode_header_only(current_guard.data())?;
1268                        let v2 = h2.version;
1269                        if v1 != v2 {
1270                            drop(current_guard);
1271                            restarts += 1;
1272                            if restarts > MAX_OLC_RESTARTS {
1273                                std::thread::sleep(std::time::Duration::from_micros(
1274                                    OLC_BACKOFF_BASE_US.saturating_mul(1 << (restarts.min(10) - 1)),
1275                                ));
1276                                restarts = 0;
1277                            }
1278                            continue 'restart;
1279                        }
1280                        return Ok(current_guard);
1281                    }
1282                }
1283            }
1284        }
1285    }
1286
1287    pub fn find_first_leaf_page(&self) -> QuillSQLResult<ReadPageGuard> {
1288        let mut current_page_id = self.get_root_page_id()?;
1289        if current_page_id == INVALID_PAGE_ID {
1290            return Err(QuillSQLError::Internal("Tree is empty".to_string()));
1291        }
1292
1293        loop {
1294            let guard = self.buffer_pool.fetch_page_read(current_page_id)?;
1295            let (page, _) = BPlusTreePageCodec::decode(guard.data(), self.key_schema.clone())?;
1296
1297            match page {
1298                BPlusTreePage::Internal(internal) => {
1299                    current_page_id = internal.value_at(0);
1300                }
1301                BPlusTreePage::Leaf(_) => {
1302                    return Ok(guard);
1303                }
1304            }
1305        }
1306    }
1307
1308    /// 内部辅助函数:从根节点开始遍历,找到并返回包含目标 key 的
1309    /// 叶子节点的只读保护器 (ReadPageGuard)。
1310    ///
1311    /// 这个函数通过 ReadPageGuard 的 RAII 特性,在遍历时实现了闩锁耦合。
1312    pub fn find_leaf_page_for_iterator(
1313        &self,
1314        key: &Tuple,
1315        start_page_id: PageId,
1316    ) -> QuillSQLResult<ReadPageGuard> {
1317        let mut current_page_id = start_page_id;
1318        if current_page_id == INVALID_PAGE_ID {
1319            return Err(QuillSQLError::Storage("btree: empty tree".to_string()));
1320        }
1321
1322        loop {
1323            // a. 为当前页面获取一个 ReadPageGuard。
1324            //    这会自动 pin 住页面并加上读锁。
1325            let current_guard = self.buffer_pool.fetch_page_read(current_page_id)?;
1326
1327            // b. 解码页面内容以判断其类型。
1328            let (page_content, _) =
1329                BPlusTreePageCodec::decode(current_guard.data(), self.key_schema.clone())?;
1330
1331            match page_content {
1332                // c. 如果是内部节点...
1333                BPlusTreePage::Internal(_internal_page) => {
1334                    // find next child id using bytes path
1335                    current_page_id =
1336                        crate::storage::codec::BPlusTreeInternalPageCodec::lookup_child_from_bytes(
1337                            current_guard.data(),
1338                            self.key_schema.clone(),
1339                            key,
1340                        )?;
1341                }
1342                // d. 如果是叶子节点...
1343                BPlusTreePage::Leaf(_) => {
1344                    // 我们已经到达了树的最底层。返回这个页面的 Guard,
1345                    // 它的所有权会被转移给调用者 (get 方法),从而延长锁的生命周期。
1346                    return Ok(current_guard);
1347                }
1348            }
1349        }
1350    }
1351
1352    /// 内部方法:分裂一个节点,并可能递归地向上传播分裂。
1353    fn split<'a>(
1354        &'a self,
1355        mut page_guard: WritePageGuard,
1356        context: &mut Context<'a>,
1357    ) -> QuillSQLResult<()> {
1358        if self.config.debug_split_level >= 2 {
1359            eprintln!(
1360                "[SPLIT DEBUG] splitting page={}, write_set_len={}",
1361                page_guard.page_id(),
1362                context.write_set.len()
1363            );
1364        }
1365        loop {
1366            let page_id = page_guard.page_id();
1367            if self.config.debug_split_level >= 2 {
1368                eprintln!(
1369                    "[SPLIT DEBUG] splitting page={}, write_set_len={}",
1370                    page_id,
1371                    context.write_set.len()
1372                );
1373            }
1374            let (mut page, _) =
1375                BPlusTreePageCodec::decode(page_guard.data(), self.key_schema.clone())?;
1376
1377            let mut new_page_guard = self.buffer_pool.new_page()?;
1378            let new_page_id = new_page_guard.page_id();
1379
1380            let middle_key = match &mut page {
1381                BPlusTreePage::Leaf(leaf_page) => {
1382                    let mut new_leaf =
1383                        BPlusTreeLeafPage::new(self.key_schema.clone(), self.leaf_max_size);
1384                    new_leaf.batch_insert(
1385                        leaf_page.split_off(leaf_page.header.current_size as usize / 2),
1386                    );
1387                    new_leaf.header.next_page_id = leaf_page.header.next_page_id;
1388                    leaf_page.header.next_page_id = new_page_id;
1389                    let new_data = BPlusTreeLeafPageCodec::encode(&new_leaf);
1390                    self.wal_overwrite_page(&mut new_page_guard, new_data)?;
1391                    leaf_page.header.version += 1;
1392                    if self.config.debug_split_level >= 2 && new_leaf.header.current_size > 0 {
1393                        eprintln!(
1394                            "[SPLIT DEBUG] leaf_split left={} right={} sep_key={}",
1395                            page_id,
1396                            new_page_id,
1397                            new_leaf.key_at(0)
1398                        );
1399                    }
1400                    new_leaf.key_at(0).clone()
1401                }
1402                BPlusTreePage::Internal(internal_page) => {
1403                    let mut new_internal =
1404                        BPlusTreeInternalPage::new(self.key_schema.clone(), self.internal_max_size);
1405
1406                    // 计算应上推的中间键位置:
1407                    // 假设 array 以哨兵在索引0,真实键在 [1..num_pointers-1]
1408                    // promote_idx = 1 + floor((num_pointers - 1) / 2)
1409                    let num_pointers = internal_page.header.current_size as usize;
1410                    let promote_idx = 1 + (num_pointers.saturating_sub(1) / 2);
1411
1412                    // 将 [promote_idx, ..) 全部从左页移出
1413                    let mut moved = internal_page.split_off(promote_idx);
1414
1415                    // moved[0] 为 (K_mid, P_mid)。提升 K_mid 到父节点。
1416                    // 右页的哨兵应为 P_mid(对应右页首键的左侧指针)。
1417                    let (middle_key, right_sentinel_ptr) = {
1418                        let pair = moved.get(0).ok_or(QuillSQLError::Internal(
1419                            "Internal split moved entries empty".to_string(),
1420                        ))?;
1421                        (pair.0.clone(), pair.1)
1422                    };
1423
1424                    // 右页插入哨兵 (Empty, P_mid)
1425                    new_internal.insert(Tuple::empty(self.key_schema.clone()), right_sentinel_ptr);
1426
1427                    // 将剩余的键值对(严格大于 middle_key 的部分)批量放入右页
1428                    if moved.len() > 1 {
1429                        // 跳过 moved[0](middle_key)
1430                        new_internal.batch_insert(moved.split_off(1));
1431                    }
1432
1433                    // Set B-link wires:
1434                    // - Save old high_key and next pointer from left
1435                    let old_high_key = internal_page.high_key.clone();
1436                    let old_next = internal_page.header.next_page_id;
1437                    // - Left's high_key becomes middle_key
1438                    internal_page.high_key = Some(middle_key.clone());
1439                    // - Right's high_key inherits old left's high bound
1440                    new_internal.high_key = old_high_key;
1441                    // - Right's next pointer inherits left's next
1442                    new_internal.header.next_page_id = old_next;
1443                    let new_data = BPlusTreeInternalPageCodec::encode(&new_internal);
1444                    self.wal_overwrite_page(&mut new_page_guard, new_data)?;
1445                    // B-link: publish right sibling pointer for readers to chase
1446                    internal_page.header.next_page_id = new_page_id;
1447                    internal_page.header.version += 1;
1448                    if self.config.debug_split_level >= 2 {
1449                        eprintln!(
1450                            "[SPLIT DEBUG] internal_split left={} right={} promote_key={}",
1451                            page_id, new_page_id, middle_key
1452                        );
1453                    }
1454                    middle_key
1455                }
1456            };
1457
1458            // 写回修改后的旧页面和新页面(保持子页锁直到父更新完成)
1459            let old_page_data = BPlusTreePageCodec::encode(&page);
1460            self.wal_overwrite_page(&mut page_guard, old_page_data)?;
1461
1462            // 若当前分裂页是根(无父在 write_set),则创建新的根
1463            if page_guard.page_id() == self.get_root_page_id()? {
1464                if self.config.debug_split_level >= 2 {
1465                    eprintln!(
1466                        "[SPLIT DEBUG] root-split: old_root={}, new_right={}",
1467                        page_id, new_page_id
1468                    );
1469                }
1470                let mut new_root_guard = self.buffer_pool.new_page()?;
1471                let new_root_id = new_root_guard.page_id();
1472                let mut new_root_page =
1473                    BPlusTreeInternalPage::new(self.key_schema.clone(), self.internal_max_size);
1474
1475                new_root_page.insert(Tuple::empty(self.key_schema.clone()), page_id);
1476                new_root_page.insert(middle_key, new_page_id);
1477
1478                let encoded = BPlusTreeInternalPageCodec::encode(&new_root_page);
1479                self.wal_overwrite_page(&mut new_root_guard, encoded)?;
1480
1481                // Avoid deadlock: release child page latches before taking header lock
1482                drop(new_page_guard);
1483                drop(page_guard);
1484
1485                let _lock = self.header_page_lock.write();
1486                self.set_root_page_id(new_root_id)?;
1487                return Ok(());
1488            }
1489
1490            // 否则,更新父节点
1491            let mut parent_guard = match context.write_set.pop_back() {
1492                Some(g) => g,
1493                None => {
1494                    // Parent missing due to concurrent structural change; restart from root
1495                    return Err(QuillSQLError::Internal(
1496                        "split: missing parent in context".to_string(),
1497                    ));
1498                }
1499            };
1500            if std::env::var("QUILL_DEBUG_SPLIT").ok().as_deref() == Some("2") {
1501                eprintln!(
1502                    "[SPLIT DEBUG] promote to parent={}, left={}, right={}",
1503                    parent_guard.page_id(),
1504                    page_id,
1505                    new_page_id
1506                );
1507            }
1508            let (mut parent_page, _) =
1509                BPlusTreeInternalPageCodec::decode(parent_guard.data(), self.key_schema.clone())?;
1510            // Insert the separator key right after the original left child (page_id)
1511            if parent_page.value_index(page_id).is_none() {
1512                // Parent no longer contains this child; signal upper layer to retry from root
1513                return Err(QuillSQLError::Internal(
1514                    "split: parent no longer contains left child".to_string(),
1515                ));
1516            }
1517            parent_page.insert_after(page_id, middle_key, new_page_id);
1518            parent_page.header.version += 1;
1519
1520            let encoded = BPlusTreeInternalPageCodec::encode(&parent_page);
1521            self.wal_overwrite_page(&mut parent_guard, encoded)?;
1522
1523            if parent_page.is_full() {
1524                // 子页到父的结构已一致,现在可释放子页锁,继续向上分裂父
1525                drop(new_page_guard);
1526                drop(page_guard);
1527                page_guard = parent_guard;
1528                // 不要释放更上层的锁,保持保守闩锁直到分裂完成
1529            } else {
1530                // 更新完成,释放子页锁,结束
1531                drop(new_page_guard);
1532                drop(page_guard);
1533                return Ok(());
1534            }
1535        }
1536    }
1537
1538    /// Hint that a reader observed a globally invisible RID through this index.
1539    /// This increments a best-effort counter to help background cleanup scheduling.
1540    pub fn note_potential_garbage(&self, n: usize) {
1541        self.pending_garbage.fetch_add(n, AtomicOrdering::Relaxed);
1542    }
1543
1544    /// Returns and resets the pending garbage counter.
1545    pub fn take_pending_garbage(&self) -> usize {
1546        self.pending_garbage.swap(0, AtomicOrdering::Relaxed)
1547    }
1548}
1549
1550#[cfg(test)]
1551mod tests {
1552    use parking_lot::deadlock;
1553    use std::sync::Arc;
1554    use std::sync::Once;
1555    use std::time::Duration;
1556    use tempfile::TempDir;
1557
1558    use crate::catalog::SchemaRef;
1559    use crate::config::WalConfig;
1560    use crate::recovery::{RecoveryManager, WalManager};
1561    use crate::storage::disk_manager::DiskManager;
1562    use crate::storage::disk_scheduler::DiskScheduler;
1563    use crate::storage::index::btree_index::TreeIndexIterator;
1564    use crate::storage::page::{BPlusTreePage, RecordId};
1565    use crate::storage::tuple::Tuple;
1566    use crate::{
1567        buffer::BufferManager,
1568        catalog::{Column, DataType, Schema},
1569        storage::codec::BPlusTreePageCodec,
1570    };
1571
1572    use super::BPlusTreeIndex;
1573
1574    fn ensure_deadlock_watchdog() {
1575        static START: Once = Once::new();
1576        START.call_once(|| {
1577            std::thread::spawn(|| loop {
1578                std::thread::sleep(Duration::from_millis(500));
1579                let deadlocks = deadlock::check_deadlock();
1580                if !deadlocks.is_empty() {
1581                    eprintln!("DEADLOCK DETECTED: {} cycles", deadlocks.len());
1582                    for (i, threads) in deadlocks.iter().enumerate() {
1583                        eprintln!("Cycle {}:", i);
1584                        for t in threads {
1585                            eprintln!("  ThreadId={:?}\n{:?}", t.thread_id(), t.backtrace());
1586                        }
1587                    }
1588                    panic!("deadlock detected");
1589                }
1590            });
1591        });
1592    }
1593
1594    /// Creates a test environment with specified buffer pool size and B+ tree parameters
1595    fn create_test_index(
1596        buffer_pool_size: usize,
1597        internal_max_size: u32,
1598        leaf_max_size: u32,
1599    ) -> (TempDir, BPlusTreeIndex, SchemaRef) {
1600        let temp_dir = TempDir::new().unwrap();
1601        let temp_path = temp_dir.path().join("test.db");
1602
1603        let key_schema = Arc::new(Schema::new(vec![Column::new("a", DataType::Int64, false)]));
1604        let disk_manager = DiskManager::try_new(temp_path).unwrap();
1605        let disk_scheduler = Arc::new(DiskScheduler::new(Arc::new(disk_manager)));
1606        let buffer_pool = Arc::new(BufferManager::new(buffer_pool_size, disk_scheduler));
1607        let index = BPlusTreeIndex::new(
1608            key_schema.clone(),
1609            buffer_pool,
1610            internal_max_size,
1611            leaf_max_size,
1612        );
1613
1614        (temp_dir, index, key_schema)
1615    }
1616
1617    /// Helper: build bpm + wal on a fresh temp dir
1618    fn setup_with_wal(
1619        db_path: &std::path::Path,
1620        wal_dir: &std::path::Path,
1621        bpm_pages: usize,
1622    ) -> (Arc<BufferManager>, Arc<WalManager>, Arc<DiskScheduler>) {
1623        let dm = Arc::new(DiskManager::try_new(db_path).unwrap());
1624        let scheduler = Arc::new(DiskScheduler::new(dm));
1625        let mut cfg = WalConfig::default();
1626        cfg.directory = wal_dir.to_path_buf();
1627        let wal = Arc::new(WalManager::new(cfg, None, None).unwrap());
1628        let bpm = Arc::new(BufferManager::new(bpm_pages, scheduler.clone()));
1629        bpm.set_wal_manager(wal.clone());
1630        (bpm, wal, scheduler)
1631    }
1632
1633    /// 根据 key 生成与 BusTub 一致的 RID,用于断言
1634    fn rid_from_key(key: i64) -> RecordId {
1635        let value = key & 0xFFFFFFFF;
1636        RecordId::new((key >> 32) as u32, value as u32)
1637    }
1638
1639    #[test]
1640    fn test_index_recovery_replays_wal_split() {
1641        ensure_deadlock_watchdog();
1642        let tmp = TempDir::new().unwrap();
1643        let db_path = tmp.path().join("test.db");
1644        let wal_dir = tmp.path().join("wal");
1645
1646        // 1) 初始运行:带 WAL 的环境,强制小页触发分裂
1647        let (bpm, wal, scheduler) = setup_with_wal(&db_path, &wal_dir, 64);
1648        let key_schema: SchemaRef =
1649            Arc::new(Schema::new(vec![Column::new("a", DataType::Int64, false)]));
1650        let index = BPlusTreeIndex::new(key_schema.clone(), bpm.clone(), 2, 3);
1651        let header_pid = index.header_page_id; // 记录 header 页 id,崩溃后用来 reopen
1652
1653        let keys: Vec<i64> = (1..=30).collect();
1654        for k in &keys {
1655            let t = Tuple::new(key_schema.clone(), vec![(*k).into()]);
1656            index.insert(&t, rid_from_key(*k)).unwrap();
1657        }
1658
1659        // 刷 WAL 到耐久,模拟崩溃
1660        let _ = wal.flush(None).unwrap();
1661        drop(index);
1662        drop(bpm);
1663        drop(wal);
1664        drop(scheduler);
1665
1666        // 2) 恢复阶段:重放 WAL 到数据文件
1667        let dm2 = Arc::new(DiskManager::try_new(&db_path).unwrap());
1668        let scheduler2 = Arc::new(DiskScheduler::new(dm2));
1669        let mut cfg2 = WalConfig::default();
1670        cfg2.directory = wal_dir.clone();
1671        let wal2 = Arc::new(WalManager::new(cfg2, None, None).unwrap());
1672        let rm = RecoveryManager::new(wal2.clone(), scheduler2.clone());
1673        let _summary = rm.replay().unwrap();
1674
1675        // 3) 打开新的 BufferPool,reopen 索引并验证所有键可查询 & 有序遍历
1676        let bpm2 = Arc::new(BufferManager::new(128, scheduler2.clone()));
1677        let reopened = BPlusTreeIndex::open(key_schema.clone(), bpm2.clone(), 2, 3, header_pid);
1678
1679        for k in &keys {
1680            let t = Tuple::new(key_schema.clone(), vec![(*k).into()]);
1681            let got = reopened
1682                .get(&t)
1683                .unwrap()
1684                .expect("missing key after recovery");
1685            assert_eq!(got.page_id, rid_from_key(*k).page_id);
1686            assert_eq!(got.slot_num, rid_from_key(*k).slot_num);
1687        }
1688
1689        // 验证遍历顺序
1690        let index_arc = Arc::new(reopened);
1691        let mut it = TreeIndexIterator::new(index_arc, ..);
1692        let mut i = 1i64;
1693        while let Some(rid) = it.next().unwrap() {
1694            assert_eq!(rid.slot_num, rid_from_key(i).slot_num);
1695            i += 1;
1696        }
1697        assert_eq!(i, 31);
1698    }
1699
1700    /// Helper function to create RID from i64 key (exactly like BusTub's RID construction)
1701    fn create_rid_from_key(key: i64) -> RecordId {
1702        let value = key & 0xFFFFFFFF;
1703        RecordId::new((key >> 32) as u32, value as u32)
1704    }
1705
1706    /// Helper function to create tuple from i64 key
1707    fn create_tuple_from_key(key: i64, schema: SchemaRef) -> Tuple {
1708        Tuple::new(schema, vec![key.into()])
1709    }
1710
1711    /// TEST: BasicInsertTest - equivalent to BusTub's basic insert test
1712    #[test]
1713    fn test_basic_insert() {
1714        let (_temp_dir, index, key_schema) = create_test_index(50, 2, 3);
1715
1716        let key = 42i64;
1717        let tuple = create_tuple_from_key(key, key_schema.clone());
1718        let rid = create_rid_from_key(key);
1719
1720        index.insert(&tuple, rid).unwrap();
1721
1722        let root_page_id = index.get_root_page_id().unwrap();
1723        assert_ne!(root_page_id, crate::buffer::INVALID_PAGE_ID);
1724
1725        let root_guard = index.buffer_pool.fetch_page_read(root_page_id).unwrap();
1726        let (root_page, _) =
1727            BPlusTreePageCodec::decode(root_guard.data(), key_schema.clone()).unwrap();
1728
1729        assert!(matches!(root_page, BPlusTreePage::Leaf(_)));
1730
1731        if let BPlusTreePage::Leaf(root_as_leaf) = root_page {
1732            assert_eq!(root_as_leaf.header.current_size, 1);
1733            assert_eq!(root_as_leaf.array[0].0, tuple);
1734            assert_eq!(root_as_leaf.array[0].1, rid);
1735        }
1736    }
1737
1738    /// TEST: InsertTest1NoIterator - equivalent to BusTub's insert test without iterator
1739    #[test]
1740    fn test_insert_no_iterator() {
1741        let (_temp_dir, index, key_schema) = create_test_index(50, 2, 3);
1742
1743        let keys = vec![1i64, 2, 3, 4, 5];
1744        for key in &keys {
1745            let tuple = create_tuple_from_key(*key, key_schema.clone());
1746            let rid = create_rid_from_key(*key);
1747            index.insert(&tuple, rid).unwrap();
1748        }
1749
1750        for key in &keys {
1751            let tuple = create_tuple_from_key(*key, key_schema.clone());
1752            let result = index.get(&tuple).unwrap();
1753            assert!(result.is_some(), "missing key {}", key);
1754
1755            let expected_rid = create_rid_from_key(*key);
1756            let actual_rid = result.unwrap();
1757            assert_eq!(actual_rid.page_id, expected_rid.page_id);
1758            assert_eq!(actual_rid.slot_num, expected_rid.slot_num);
1759        }
1760    }
1761
1762    /// TEST: InsertTest2 - insert in reverse order with iterator validation
1763    #[test]
1764    fn test_insert_reverse_order() {
1765        let (_temp_dir, index, key_schema) = create_test_index(50, 2, 3);
1766
1767        let keys = vec![5i64, 4, 3, 2, 1];
1768        for key in &keys {
1769            let tuple = create_tuple_from_key(*key, key_schema.clone());
1770            let rid = create_rid_from_key(*key);
1771            index.insert(&tuple, rid).unwrap();
1772        }
1773
1774        for key in &keys {
1775            let tuple = create_tuple_from_key(*key, key_schema.clone());
1776            let result = index.get(&tuple).unwrap();
1777            assert!(result.is_some(), "missing key {}", key);
1778
1779            let expected_rid = create_rid_from_key(*key);
1780            let actual_rid = result.unwrap();
1781            assert_eq!(actual_rid.slot_num, expected_rid.slot_num);
1782        }
1783
1784        // Test iterator order (should be sorted)
1785        let index_arc = Arc::new(index);
1786        let mut iter = TreeIndexIterator::new(index_arc, ..);
1787        let mut current_key = 1i64;
1788        while let Some(rid) = iter.next().unwrap() {
1789            assert_eq!(rid.slot_num as i64, current_key);
1790            current_key += 1;
1791        }
1792        assert_eq!(current_key, keys.len() as i64 + 1);
1793    }
1794
1795    /// TEST: DeleteTestNoIterator - equivalent to BusTub's delete test without iterator
1796    #[test]
1797    fn test_delete_no_iterator() {
1798        let (_temp_dir, index, key_schema) = create_test_index(50, 2, 3);
1799
1800        let keys = vec![1i64, 2, 3, 4, 5];
1801        for key in &keys {
1802            let tuple = create_tuple_from_key(*key, key_schema.clone());
1803            let rid = create_rid_from_key(*key);
1804            index.insert(&tuple, rid).unwrap();
1805        }
1806
1807        // Verify all keys are present and have correct RIDs
1808        for key in &keys {
1809            let tuple = create_tuple_from_key(*key, key_schema.clone());
1810            let result = index.get(&tuple).unwrap();
1811            assert!(result.is_some());
1812
1813            let expected_rid = create_rid_from_key(*key);
1814            let actual_rid = result.unwrap();
1815            assert_eq!(actual_rid.page_id, expected_rid.page_id);
1816            assert_eq!(actual_rid.slot_num, expected_rid.slot_num);
1817        }
1818
1819        // Remove some keys - exactly like BusTub test
1820        let remove_keys = vec![1i64, 5, 3, 4];
1821        for key in &remove_keys {
1822            println!("Before deleting key {}:\n{}", key, index.to_dot().unwrap());
1823            let tuple = create_tuple_from_key(*key, key_schema.clone());
1824            index.delete(&tuple).unwrap();
1825            println!("After deleting key {}:\n{}", key, index.to_dot().unwrap());
1826        }
1827
1828        let mut size = 0;
1829        for key in &keys {
1830            let tuple = create_tuple_from_key(*key, key_schema.clone());
1831            let is_present = index.get(&tuple).unwrap().is_some();
1832
1833            if !is_present {
1834                assert!(remove_keys.contains(key));
1835            } else {
1836                assert!(!remove_keys.contains(key));
1837                assert_eq!(
1838                    index.get(&tuple).unwrap().unwrap().slot_num,
1839                    (*key & 0xFFFFFFFF) as u32
1840                );
1841                size += 1;
1842            }
1843        }
1844        assert_eq!(size, 1);
1845
1846        // Remove the remaining key and check if tree becomes empty
1847        let tuple = create_tuple_from_key(2, key_schema.clone());
1848        println!("Before deleting final key 2:\n{}", index.to_dot().unwrap());
1849        index.delete(&tuple).unwrap();
1850        println!("After deleting final key 2:\n{}", index.to_dot().unwrap());
1851        assert!(index.is_empty().unwrap());
1852    }
1853
1854    #[test]
1855    fn test_internal_borrow_from_right_keeps_searchable() {
1856        let (_temp_dir, index, key_schema) = create_test_index(64, 2, 3);
1857
1858        let inserts = [
1859            -5, -4, -3, -2, -1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14,
1860        ];
1861        for key in inserts {
1862            let tuple = create_tuple_from_key(key, key_schema.clone());
1863            let rid = create_rid_from_key(key);
1864            index.insert(&tuple, rid).unwrap();
1865        }
1866
1867        let deletes = [-5, -4, -3, -2, -1, 0, 1, 2];
1868        for key in deletes {
1869            let tuple = create_tuple_from_key(key, key_schema.clone());
1870            index.delete(&tuple).unwrap();
1871        }
1872
1873        let root_page_id = index.get_root_page_id().unwrap();
1874        let root_guard = index.buffer_pool.fetch_page_read(root_page_id).unwrap();
1875        let (root_page, _) =
1876            BPlusTreePageCodec::decode(root_guard.data(), key_schema.clone()).unwrap();
1877        let (left_internal_id, right_internal_id) =
1878            if let BPlusTreePage::Internal(root_internal) = root_page {
1879                assert_eq!(root_internal.header.current_size, 2);
1880                (root_internal.value_at(0), root_internal.value_at(1))
1881            } else {
1882                panic!("expected internal root after deletions");
1883            };
1884        drop(root_guard);
1885
1886        let left_guard = index.buffer_pool.fetch_page_read(left_internal_id).unwrap();
1887        let (left_page, _) =
1888            BPlusTreePageCodec::decode(left_guard.data(), key_schema.clone()).unwrap();
1889        if let BPlusTreePage::Internal(left_internal) = left_page {
1890            assert_eq!(
1891                left_internal.high_key,
1892                Some(create_tuple_from_key(7, key_schema.clone()))
1893            );
1894            assert_eq!(left_internal.header.next_page_id, right_internal_id);
1895        } else {
1896            panic!("expected left child to remain internal");
1897        }
1898
1899        let survivor = create_tuple_from_key(3, key_schema.clone());
1900        assert!(index.get(&survivor).unwrap().is_some());
1901    }
1902
1903    /// TEST: SequentialEdgeMixTest - equivalent to BusTub's mixed insert/delete test
1904    #[test]
1905    fn test_sequential_edge_mix() {
1906        ensure_deadlock_watchdog();
1907        for leaf_max_size in 2..=5 {
1908            let (_temp_dir, index, key_schema) = create_test_index(50, 3, leaf_max_size);
1909
1910            let keys = vec![1i64, 5, 15, 20, 25, 2, -1, -2, 6, 14, 4];
1911            let mut inserted = vec![];
1912            let mut deleted = vec![];
1913
1914            for key in &keys {
1915                let tuple = create_tuple_from_key(*key, key_schema.clone());
1916                let rid = create_rid_from_key(*key);
1917                index.insert(&tuple, rid).unwrap();
1918                inserted.push(*key);
1919
1920                // Verify all inserted keys are present and deleted keys are absent
1921                verify_tree_state(&index, &key_schema, &inserted, &deleted);
1922            }
1923
1924            // Delete key 1
1925            let tuple = create_tuple_from_key(1, key_schema.clone());
1926            index.delete(&tuple).unwrap();
1927            deleted.push(1);
1928            inserted.retain(|&x| x != 1);
1929            verify_tree_state(&index, &key_schema, &inserted, &deleted);
1930
1931            // Insert key 3
1932            let tuple = create_tuple_from_key(3, key_schema.clone());
1933            let rid = create_rid_from_key(3);
1934            index.insert(&tuple, rid).unwrap();
1935            inserted.push(3);
1936            verify_tree_state(&index, &key_schema, &inserted, &deleted);
1937
1938            // Delete remaining keys
1939            let delete_keys = vec![4i64, 14, 6, 2, 15, -2, -1, 3, 5, 25, 20];
1940            for key in &delete_keys {
1941                let tuple = create_tuple_from_key(*key, key_schema.clone());
1942                index.delete(&tuple).unwrap();
1943                deleted.push(*key);
1944                inserted.retain(|&x| x != *key);
1945                verify_tree_state(&index, &key_schema, &inserted, &deleted);
1946            }
1947        }
1948    }
1949
1950    /// Helper function to verify tree state
1951    fn verify_tree_state(
1952        index: &BPlusTreeIndex,
1953        key_schema: &SchemaRef,
1954        inserted: &[i64],
1955        deleted: &[i64],
1956    ) {
1957        for key in inserted {
1958            let tuple = create_tuple_from_key(*key, key_schema.clone());
1959            let result = index.get(&tuple).unwrap();
1960            assert!(result.is_some(), "Key {} should be present", key);
1961        }
1962
1963        for key in deleted {
1964            let tuple = create_tuple_from_key(*key, key_schema.clone());
1965            let result = index.get(&tuple).unwrap();
1966            assert!(result.is_none(), "Key {} should be deleted", key);
1967        }
1968    }
1969
1970    /// TEST: Concurrent insert test - multi-threaded insertions
1971    #[test]
1972    fn test_concurrent_insert() {
1973        // spawn deadlock watchdog
1974        std::thread::spawn(|| loop {
1975            std::thread::sleep(Duration::from_millis(500));
1976            let deadlocks = deadlock::check_deadlock();
1977            if !deadlocks.is_empty() {
1978                eprintln!("DEADLOCK DETECTED: {} cycles", deadlocks.len());
1979                for (i, threads) in deadlocks.iter().enumerate() {
1980                    eprintln!("Cycle {}:", i);
1981                    for t in threads {
1982                        eprintln!("  ThreadId={:?}\n{:?}", t.thread_id(), t.backtrace());
1983                    }
1984                }
1985                panic!("deadlock detected");
1986            }
1987        });
1988        const NUM_ITERS: usize = 3;
1989        const SCALE_FACTOR: i64 = 50; // further scaled down for fast diagnostics
1990        const NUM_THREADS: usize = 5;
1991
1992        for _iter in 0..NUM_ITERS {
1993            let (_temp_dir, index, key_schema) = create_test_index(64, 3, 5);
1994            let index = Arc::new(index);
1995
1996            let keys: Vec<i64> = (1..SCALE_FACTOR).collect();
1997            let mut threads = vec![];
1998
1999            for i in 0..NUM_THREADS {
2000                let index_clone = index.clone();
2001                let key_schema_clone = key_schema.clone();
2002                let keys_clone = keys.clone();
2003
2004                threads.push(std::thread::spawn(move || {
2005                    for key in &keys_clone {
2006                        if (*key as usize) % NUM_THREADS == i {
2007                            let tuple = create_tuple_from_key(*key, key_schema_clone.clone());
2008                            let rid = create_rid_from_key(*key);
2009                            index_clone.insert(&tuple, rid).unwrap();
2010                        }
2011                    }
2012                }));
2013            }
2014
2015            for thread in threads {
2016                thread.join().unwrap();
2017            }
2018
2019            // Verify all keys were inserted correctly
2020            for key in &keys {
2021                let tuple = create_tuple_from_key(*key, key_schema.clone());
2022                let result = index.get(&tuple).unwrap();
2023                assert!(result.is_some());
2024                assert_eq!(result.unwrap().slot_num, (*key & 0xFFFFFFFF) as u32);
2025            }
2026
2027            // Test iterator order
2028            let mut iter = TreeIndexIterator::new(index.clone(), ..);
2029            let mut current_key = 1i64;
2030            while let Some(rid) = iter.next().unwrap() {
2031                assert_eq!(rid.slot_num, (current_key & 0xFFFFFFFF) as u32);
2032                current_key += 1;
2033            }
2034            assert_eq!(current_key, keys.len() as i64 + 1);
2035        }
2036    }
2037
2038    /// TEST: BasicScaleTest - equivalent to BusTub's large scale insertion test
2039    #[test]
2040    fn test_basic_scale() {
2041        let (_temp_dir, index, key_schema) = create_test_index(512, 2, 3);
2042
2043        let scale = 1000i64;
2044        let mut keys: Vec<i64> = (1..=scale).collect();
2045
2046        // Shuffle keys to randomize insertion order
2047        let mut seed: u64 = 0x9E3779B97F4A7C15;
2048        for i in (1..keys.len()).rev() {
2049            seed = seed
2050                .wrapping_mul(2862933555777941757)
2051                .wrapping_add(3037000493);
2052            let j = (seed as usize) % (i + 1);
2053            keys.swap(i, j);
2054        }
2055
2056        // Insert all keys and verify immediately
2057        for key in &keys {
2058            let tuple = create_tuple_from_key(*key, key_schema.clone());
2059            let rid = create_rid_from_key(*key);
2060            index.insert(&tuple, rid).unwrap();
2061            let got = index.get(&tuple).unwrap();
2062            if got.is_none() {
2063                println!(
2064                    "After inserting {}, tree=\n{}",
2065                    key,
2066                    index.to_dot().unwrap()
2067                );
2068                panic!("immediate missing key {}", key);
2069            }
2070        }
2071
2072        // // Debug view
2073        // println!(
2074        //     "Tree after {} inserts:\n{}",
2075        //     keys.len(),
2076        //     index.to_dot().unwrap()
2077        // );
2078
2079        // Early probe for a known-missing sample to print context
2080        let probe = 630i64;
2081        let tuple = create_tuple_from_key(probe, key_schema.clone());
2082        if index.get(&tuple).unwrap().is_none() {
2083            let guard = index.find_leaf_page_optimistic(&tuple).unwrap();
2084            let (page, _) = BPlusTreePageCodec::decode(guard.data(), key_schema.clone()).unwrap();
2085            if let BPlusTreePage::Leaf(leaf) = page {
2086                println!(
2087                    "Early probe leaf for 630 has keys: {:?}",
2088                    leaf.array
2089                        .iter()
2090                        .map(|(t, _)| format!("{}", t))
2091                        .collect::<Vec<_>>()
2092                );
2093            }
2094            panic!("missing key 630 before verification loop");
2095        }
2096
2097        // Verify all keys are retrievable
2098        for key in &keys {
2099            let tuple = create_tuple_from_key(*key, key_schema.clone());
2100            let result = index.get(&tuple).unwrap();
2101            assert!(result.is_some(), "missing key {}", key);
2102
2103            let expected_rid = create_rid_from_key(*key);
2104            let actual_rid = result.unwrap();
2105            assert_eq!(actual_rid.slot_num, expected_rid.slot_num);
2106        }
2107
2108        // Focus probe around 630 for debugging
2109        let probe = 630i64;
2110        let tuple = create_tuple_from_key(probe, key_schema.clone());
2111        if index.get(&tuple).unwrap().is_none() {
2112            let guard = index.find_leaf_page_optimistic(&tuple).unwrap();
2113            let (page, _) = BPlusTreePageCodec::decode(guard.data(), key_schema.clone()).unwrap();
2114            if let BPlusTreePage::Leaf(leaf) = page {
2115                println!(
2116                    "Probe leaf for 630 has keys: {:?}",
2117                    leaf.array
2118                        .iter()
2119                        .map(|(t, _)| format!("{}", t))
2120                        .collect::<Vec<_>>()
2121                );
2122            }
2123        }
2124    }
2125
2126    /// TEST: Concurrent delete test
2127    #[test]
2128    fn test_concurrent_delete() {
2129        const NUM_ITERS: usize = 10;
2130
2131        for _iter in 0..NUM_ITERS {
2132            let (_temp_dir, index, key_schema) = create_test_index(50, 3, 5);
2133
2134            // Sequential insert
2135            let keys = vec![1i64, 2, 3, 4, 5];
2136            for key in &keys {
2137                let tuple = create_tuple_from_key(*key, key_schema.clone());
2138                let rid = create_rid_from_key(*key);
2139                index.insert(&tuple, rid).unwrap();
2140            }
2141
2142            let index = Arc::new(index);
2143            let remove_keys = vec![1i64, 5, 3, 4];
2144            let mut threads = vec![];
2145
2146            for i in 0..2 {
2147                let index_clone = index.clone();
2148                let key_schema_clone = key_schema.clone();
2149                let remove_keys_clone = remove_keys.clone();
2150
2151                threads.push(std::thread::spawn(move || {
2152                    for key in &remove_keys_clone {
2153                        if (*key as usize) % 2 == i {
2154                            let tuple = create_tuple_from_key(*key, key_schema_clone.clone());
2155                            index_clone.delete(&tuple).unwrap();
2156                        }
2157                    }
2158                }));
2159            }
2160
2161            for thread in threads {
2162                thread.join().unwrap();
2163            }
2164
2165            // Check that only key 2 remains
2166            let mut size = 0;
2167            let index_arc = index.clone();
2168            let mut iter = TreeIndexIterator::new(index_arc, ..);
2169            while let Some(rid) = iter.next().unwrap() {
2170                assert_eq!(rid.slot_num, 2);
2171                size += 1;
2172            }
2173            assert_eq!(size, 1);
2174        }
2175    }
2176
2177    /// TEST: Mixed concurrent operations
2178    #[test]
2179    fn test_concurrent_mix() {
2180        // spawn deadlock watchdog
2181        std::thread::spawn(|| loop {
2182            std::thread::sleep(Duration::from_millis(500));
2183            let deadlocks = deadlock::check_deadlock();
2184            if !deadlocks.is_empty() {
2185                eprintln!("DEADLOCK DETECTED: {} cycles", deadlocks.len());
2186                for (i, threads) in deadlocks.iter().enumerate() {
2187                    eprintln!("Cycle {}:", i);
2188                    for t in threads {
2189                        eprintln!("  ThreadId={:?}\n{:?}", t.thread_id(), t.backtrace());
2190                    }
2191                }
2192                panic!("deadlock detected");
2193            }
2194        });
2195        const NUM_ITERS: usize = 5;
2196
2197        for _iter in 0..NUM_ITERS {
2198            let (_temp_dir, index, key_schema) = create_test_index(50, 3, 5);
2199            let index = Arc::new(index);
2200
2201            // Divide keys for insert and delete
2202            let mut for_insert = vec![];
2203            let mut for_delete = vec![];
2204            let total_keys = 20i64; // further scaled down for diagnostics
2205
2206            for i in 1..=total_keys {
2207                if i % 2 == 0 {
2208                    for_insert.push(i);
2209                } else {
2210                    for_delete.push(i);
2211                }
2212            }
2213
2214            // Insert all keys to delete first
2215            for key in &for_delete {
2216                let tuple = create_tuple_from_key(*key, key_schema.clone());
2217                let rid = create_rid_from_key(*key);
2218                index.insert(&tuple, rid).unwrap();
2219            }
2220
2221            let mut threads = vec![];
2222            let num_threads = 5;
2223
2224            for i in 0..num_threads {
2225                let index_clone = index.clone();
2226                let key_schema_clone = key_schema.clone();
2227                let for_insert_clone = for_insert.clone();
2228                let for_delete_clone = for_delete.clone();
2229
2230                threads.push(std::thread::spawn(move || {
2231                    if i % 2 == 0 {
2232                        // Insert thread
2233                        for key in &for_insert_clone {
2234                            let tuple = create_tuple_from_key(*key, key_schema_clone.clone());
2235                            let rid = create_rid_from_key(*key);
2236                            index_clone.insert(&tuple, rid).unwrap();
2237                        }
2238                    } else {
2239                        // Delete thread
2240                        for key in &for_delete_clone {
2241                            let tuple = create_tuple_from_key(*key, key_schema_clone.clone());
2242                            index_clone.delete(&tuple).unwrap();
2243                        }
2244                    }
2245                }));
2246            }
2247
2248            for thread in threads {
2249                thread.join().unwrap();
2250            }
2251
2252            // Verify only inserted keys remain
2253            let mut count = 0;
2254            for key in &for_insert {
2255                let tuple = create_tuple_from_key(*key, key_schema.clone());
2256                let result = index.get(&tuple).unwrap();
2257                if result.is_some() {
2258                    count += 1;
2259                }
2260            }
2261            assert_eq!(count, for_insert.len());
2262
2263            // Verify deleted keys are gone
2264            for key in &for_delete {
2265                let tuple = create_tuple_from_key(*key, key_schema.clone());
2266                let result = index.get(&tuple).unwrap();
2267                assert!(result.is_none());
2268            }
2269        }
2270    }
2271
2272    /// TEST: Iterator functionality test
2273    #[test]
2274    fn test_iterator_functionality() {
2275        let (_temp_dir, index, key_schema) = create_test_index(50, 3, 5);
2276
2277        // Insert test data
2278        let keys = vec![1i64, 3, 5, 7, 9];
2279        for key in &keys {
2280            let tuple = create_tuple_from_key(*key, key_schema.clone());
2281            let rid = create_rid_from_key(*key);
2282            index.insert(&tuple, rid).unwrap();
2283        }
2284
2285        let index = Arc::new(index);
2286
2287        // Test range iterator [3, 7]
2288        let start_tuple = create_tuple_from_key(3, key_schema.clone());
2289        let end_tuple = create_tuple_from_key(7, key_schema.clone());
2290        let mut iterator = TreeIndexIterator::new(index.clone(), start_tuple..=end_tuple);
2291
2292        let mut results = vec![];
2293        while let Some(rid) = iterator.next().unwrap() {
2294            results.push(rid.slot_num as i64);
2295        }
2296        assert_eq!(results, vec![3, 5, 7]);
2297
2298        // Test unbounded iterator
2299        let mut iterator = TreeIndexIterator::new(index.clone(), ..);
2300        let mut all_results = vec![];
2301        while let Some(rid) = iterator.next().unwrap() {
2302            all_results.push(rid.slot_num as i64);
2303        }
2304        assert_eq!(all_results, vec![1, 3, 5, 7, 9]);
2305    }
2306
2307    /// TEST: Tree structure tests - leaf split
2308    #[test]
2309    fn test_leaf_split_structure() {
2310        let (_temp_dir, index, key_schema) = create_test_index(50, 10, 3);
2311
2312        // Insert keys to trigger leaf split
2313        for k in [1i64, 2, 3, 4] {
2314            let tuple = create_tuple_from_key(k, key_schema.clone());
2315            let rid = create_rid_from_key(k);
2316            index.insert(&tuple, rid).unwrap();
2317        }
2318
2319        let root_page_id = index.get_root_page_id().unwrap();
2320        let root_guard = index.buffer_pool.fetch_page_read(root_page_id).unwrap();
2321        let (root_page, _) =
2322            BPlusTreePageCodec::decode(root_guard.data(), key_schema.clone()).unwrap();
2323
2324        // Root should be internal page after split
2325        let BPlusTreePage::Internal(root_internal) = root_page else {
2326            panic!("root is not internal after leaf split");
2327        };
2328
2329        // Check structure integrity
2330        let left_pid = root_internal.value_at(0);
2331        let right_pid = root_internal.value_at(1);
2332
2333        let left_guard = index.buffer_pool.fetch_page_read(left_pid).unwrap();
2334        let right_guard = index.buffer_pool.fetch_page_read(right_pid).unwrap();
2335        let (left_page, _) =
2336            BPlusTreePageCodec::decode(left_guard.data(), key_schema.clone()).unwrap();
2337        let (right_page, _) =
2338            BPlusTreePageCodec::decode(right_guard.data(), key_schema.clone()).unwrap();
2339
2340        let BPlusTreePage::Leaf(left_leaf) = left_page else {
2341            panic!("left child not leaf");
2342        };
2343        let BPlusTreePage::Leaf(_right_leaf) = right_page else {
2344            panic!("right child not leaf");
2345        };
2346
2347        // Verify leaf chain linkage
2348        assert_eq!(left_leaf.header.next_page_id, right_pid);
2349    }
2350
2351    // ---------------- Benchmarks (ignored by default) ----------------
2352    #[test]
2353    #[ignore]
2354    fn bench_get_hot_read() {
2355        fn getenv_usize(k: &str, default_v: usize) -> usize {
2356            std::env::var(k)
2357                .ok()
2358                .and_then(|v| v.parse().ok())
2359                .unwrap_or(default_v)
2360        }
2361
2362        let bpm = getenv_usize("QUILL_BENCH_BPM", 1024);
2363        let nkeys = getenv_usize("QUILL_BENCH_N", 20000) as i64;
2364        let ops = getenv_usize("QUILL_BENCH_OPS", 200000);
2365
2366        let (_temp_dir, index, key_schema) = create_test_index(bpm, 3, 64);
2367
2368        // Prepare keys and insert (shuffled)
2369        let mut keys: Vec<i64> = (1..=nkeys).collect();
2370        // simple LCG shuffle
2371        let mut seed: u64 = 0x9E3779B97F4A7C15;
2372        for i in (1..keys.len()).rev() {
2373            seed = seed
2374                .wrapping_mul(2862933555777941757)
2375                .wrapping_add(3037000493);
2376            let j = (seed as usize) % (i + 1);
2377            keys.swap(i, j);
2378        }
2379        for k in &keys {
2380            let t = create_tuple_from_key(*k, key_schema.clone());
2381            index.insert(&t, create_rid_from_key(*k)).unwrap();
2382        }
2383
2384        // Hot set: last 10% keys
2385        let hot_start = (nkeys as usize * 9) / 10;
2386        let hot = &keys[hot_start..];
2387
2388        let start = std::time::Instant::now();
2389        let mut found = 0usize;
2390        // query stream via LCG over hot set
2391        let mut x = 0x243F6A8885A308D3u64;
2392        for _ in 0..ops {
2393            x = x.wrapping_mul(6364136223846793005).wrapping_add(1);
2394            let idx = (x as usize) % hot.len();
2395            let key = hot[idx];
2396            let t = create_tuple_from_key(key, key_schema.clone());
2397            if index.get(&t).unwrap().is_some() {
2398                found += 1;
2399            }
2400        }
2401        let el = start.elapsed();
2402        let qps = (ops as f64) / el.as_secs_f64();
2403        println!(
2404            "bench_get_hot_read: ops={} time={:?} qps={:.1} found={}",
2405            ops, el, qps, found
2406        );
2407    }
2408
2409    #[test]
2410    #[ignore]
2411    fn bench_range_scan() {
2412        fn getenv_usize(k: &str, default_v: usize) -> usize {
2413            std::env::var(k)
2414                .ok()
2415                .and_then(|v| v.parse().ok())
2416                .unwrap_or(default_v)
2417        }
2418
2419        let bpm = getenv_usize("QUILL_BENCH_BPM", 1024);
2420        let nkeys = getenv_usize("QUILL_BENCH_N", 30000) as i64;
2421        let passes = getenv_usize("QUILL_BENCH_PASSES", 20);
2422
2423        let (_temp_dir, index, key_schema) = create_test_index(bpm, 3, 64);
2424
2425        // Insert in random order
2426        let mut keys: Vec<i64> = (1..=nkeys).collect();
2427        let mut seed: u64 = 0x9E3779B97F4A7C15;
2428        for i in (1..keys.len()).rev() {
2429            seed = seed
2430                .wrapping_mul(2862933555777941757)
2431                .wrapping_add(3037000493);
2432            let j = (seed as usize) % (i + 1);
2433            keys.swap(i, j);
2434        }
2435        for k in &keys {
2436            let t = create_tuple_from_key(*k, key_schema.clone());
2437            index.insert(&t, create_rid_from_key(*k)).unwrap();
2438        }
2439
2440        let index = Arc::new(index);
2441        let total = (nkeys as usize) * passes;
2442        let start = std::time::Instant::now();
2443        let mut seen = 0usize;
2444        for _ in 0..passes {
2445            let mut it = TreeIndexIterator::new(index.clone(), ..);
2446            while let Some(_rid) = it.next().unwrap() {
2447                seen += 1;
2448            }
2449        }
2450        let el = start.elapsed();
2451        let tps = (total as f64) / el.as_secs_f64();
2452        println!(
2453            "bench_range_scan: items={} time={:?} ips={:.1} seen={}",
2454            total, el, tps, seen
2455        );
2456    }
2457}