Skip to main content

noxu_dbi/
cursor_impl.rs

1//! Internal cursor implementation.
2//!
3//!
4//! The core traversal logic mirrors `CursorImpl.getNext()` (line 2546):
5//!
6//! ```text
7//! while (bin != null) {
8//!     latchBIN();
9//!     if (forward ? ++index < nEntries : --index >= 0) {
10//!         if record is valid: return it
11//!     } else {
12//!         bin = tree.getNextBin(anchorBIN) or tree.getPrevBin(anchorBIN)
13//!         index = -1  (or nEntries for backward)
14//!     }
15//! }
16//! ```
17//!
18//! Cross-BIN traversal is implemented: when the current BIN is exhausted,
19//! `retrieve_next` calls `Tree::get_next_bin` / `Tree::get_prev_bin` to move
20//! to the adjacent BIN and continues iteration there.
21
22#[cfg(any(test, feature = "testing"))]
23use std::cell::Cell;
24use std::sync::Arc;
25use std::sync::Mutex;
26use std::sync::atomic::{AtomicI64, Ordering};
27
28use bytes::BytesMut;
29use noxu_log::{LogEntryType, LogManager, Provisional, entry::LnLogEntry};
30use noxu_tree::{BinEntry, Tree};
31use noxu_txn::{LockManager, LockType, Locker, Txn};
32
33use crate::dup_key_data;
34use crate::throughput_stats::ThroughputStats;
35use noxu_sync::RwLock;
36use noxu_util::{Lsn, vlsn::NULL_VLSN};
37
38use crate::{
39    DbiError, GetMode, OperationStatus, PutMode, SearchMode,
40    database_impl::DatabaseImpl,
41};
42
43/// Cursor states.
44#[derive(Debug, Clone, Copy, PartialEq, Eq)]
45enum CursorState {
46    NotInitialized,
47    Initialized,
48    Closed,
49}
50
51/// Result flags for cursor search operations.
52pub const FOUND: u32 = 0x1;
53pub const EXACT_KEY: u32 = 0x2;
54pub const FOUND_LAST: u32 = 0x4;
55
56/// Unique cursor ID generator.
57static NEXT_CURSOR_ID: AtomicI64 = AtomicI64::new(1);
58
59// Test-only hook: countdown to forced cursor failure.
60//
61// When the countdown is N (> 0), each `check_state`/`check_initialized` call
62// decrements it by 1.  When it reaches 1 the decrement fires, it resets to 0,
63// and the call returns `Err(DbiError::CursorClosed)`.
64//
65// `set_cursor_fail_after(1)` => fail on the next check (the 1st call).
66// `set_cursor_fail_after(2)` => skip the 1st check, fail on the 2nd call.
67//
68// This lets `noxu-db` tests exercise both `map_err` closures inside a single
69// `Database` method (e.g. `get()` has one closure on `search` and another on
70// `get_current`).
71#[cfg(any(test, feature = "testing"))]
72thread_local! {
73    static CURSOR_FAIL_COUNTDOWN: Cell<u32> = const { Cell::new(0) };
74}
75
76/// Set countdown so the Nth cursor-check call returns `DbiError::CursorClosed`.
77/// `n = 1` → fail immediately on the next check.
78/// Only available in test/testing builds.
79#[cfg(any(test, feature = "testing"))]
80pub fn set_cursor_fail_after(n: u32) {
81    CURSOR_FAIL_COUNTDOWN.with(|c| c.set(n));
82}
83
84/// Clear the cursor fail countdown (idempotent).
85#[cfg(any(test, feature = "testing"))]
86pub fn clear_cursor_fail_flag() {
87    CURSOR_FAIL_COUNTDOWN.with(|c| c.set(0));
88}
89
90/// Decrement the countdown and return `true` if this call should fail.
91#[cfg(any(test, feature = "testing"))]
92fn tick_fail() -> bool {
93    CURSOR_FAIL_COUNTDOWN.with(|c| {
94        let v = c.get();
95        if v == 0 {
96            false
97        } else if v == 1 {
98            c.set(0);
99            true
100        } else {
101            c.set(v - 1);
102            false
103        }
104    })
105}
106
107/// The internal implementation of a database cursor.
108///
109/// A CursorImpl tracks a position in a database and provides
110/// get/put/delete operations. The cursor state machine ensures
111/// proper initialization before operations.
112///
113/// a cursor tracks its position via a BIN reference and slot index.
114/// This implementation wires cursor traversal to `noxu_tree::Tree`:
115///
116/// * `get_first` / `get_last` — use `Tree::get_first_node()` /
117///   `Tree::get_last_node()`.
118/// * `retrieve_next` — increments `current_index` within the BIN and, when
119///   the BIN is exhausted, calls `Tree::get_next_bin()` /
120///   `Tree::get_prev_bin()` to cross BIN boundaries
121///   `CursorImpl.getNext()`).
122/// * `search` — uses `Tree::search()` to locate the exact key.
123/// * `put` / `delete` — mutate the tree in-place using `Tree::insert()` /
124///   `Tree::delete()`.
125///
126/// (4096 lines in 7.5.11).
127pub struct CursorImpl {
128    /// Unique cursor ID (for debugging and hashCode).
129    id: i64,
130    /// The database this cursor operates on.
131    db_impl: Arc<RwLock<DatabaseImpl>>,
132    /// The locker (transaction or auto-commit) for this cursor.
133    locker_id: i64,
134    /// Current cursor state.
135    state: CursorState,
136
137    /// Current position: the key at the cursor's position.
138    current_key: Option<Vec<u8>>,
139    /// Current position: the data at the cursor's position.
140    current_data: Option<Vec<u8>>,
141    /// Current position: the LSN of the record.
142    current_lsn: u64,
143    /// Current position: the BIN index (slot in the current BIN).
144    ///
145    /// In this is `CursorImpl.index`. -1 means "before first entry".
146    current_index: i32,
147
148    /// The BIN Arc the cursor is currently pinned to, if any.
149    ///
150    /// Increments `BinStub.cursor_count` via `Tree::pin_bin()` so the
151    /// evictor skips this BIN while the cursor is positioned on it.
152    /// Cleared (and unpinned) when the cursor is closed or moves to a new BIN.
153    current_bin_arc: Option<
154        std::sync::Arc<noxu_tree::NodeRwLock<noxu_tree::tree::TreeNode>>,
155    >,
156
157    /// Write-ahead log manager for recording data operations.
158    /// None for read-only cursors or cursors created outside a real env.
159    log_manager: Option<Arc<LogManager>>,
160    /// Cached environment-invalidity flag (X-13).
161    ///
162    /// Cloned from `EnvironmentImpl::is_invalid_flag()` at cursor open time
163    /// so `check_state()` can detect a failed environment without locking.
164    /// `None` for cursors constructed outside a real environment (unit tests).
165    env_invalid: Option<Arc<std::sync::atomic::AtomicBool>>,
166    /// Lock manager for per-record read/write locking.
167    /// None for cursors created outside a real env (e.g., unit tests).
168    ///
169    /// `CursorImpl.locker` — the locker calls `locker.lock(lsn,
170    /// LockType.READ, ...)` via `lockLN()` before returning each record.
171    lock_manager: Option<Arc<LockManager>>,
172
173    /// Optional explicit transaction backing this cursor.
174    ///
175    /// When `Some`, write operations acquire locks via the `Txn` and record
176    /// `WriteLockInfo` (abort before-images) so the transaction can undo each
177    /// modification on abort.
178    ///
179    /// When `None` (auto-commit), write locks are acquired directly from
180    /// `lock_manager` using the cursor's own `id` as the locker and released
181    /// immediately after the write is logged (auto-commit semantics).
182    ///
183    /// (Txn subtype).
184    txn_ref: Option<Arc<Mutex<Txn>>>,
185    /// Throughput counters shared with all cursors on this database.
186    throughput: Arc<ThroughputStats>,
187}
188
189impl CursorImpl {
190    /// Creates a new CursorImpl for the given database.
191    ///
192    /// The cursor is initially in the NotInitialized state and must be
193    /// positioned via a search operation before get/put/delete operations
194    /// can be performed.
195    ///
196    /// # Arguments
197    ///
198    /// * `db_impl` - The database implementation this cursor operates on
199    /// * `locker_id` - The locker (transaction) ID for this cursor
200    pub fn new(db_impl: Arc<RwLock<DatabaseImpl>>, locker_id: i64) -> Self {
201        let throughput = db_impl.read().throughput.clone();
202        CursorImpl {
203            id: NEXT_CURSOR_ID.fetch_add(1, Ordering::Relaxed),
204            db_impl,
205            locker_id,
206            state: CursorState::NotInitialized,
207            current_key: None,
208            current_data: None,
209            current_lsn: noxu_util::NULL_LSN.as_u64(),
210            current_index: -1,
211            current_bin_arc: None,
212            log_manager: None,
213            env_invalid: None,
214            lock_manager: None,
215            txn_ref: None,
216            throughput,
217        }
218    }
219
220    /// Creates a new CursorImpl wired to a WAL.
221    ///
222    /// Write operations (`put`, `delete`) will record `LnLogEntry` entries in
223    /// the provided `LogManager` before mutating the in-memory tree.
224    pub fn with_log_manager(
225        db_impl: Arc<RwLock<DatabaseImpl>>,
226        locker_id: i64,
227        log_manager: Arc<LogManager>,
228    ) -> Self {
229        let throughput = db_impl.read().throughput.clone();
230        CursorImpl {
231            id: NEXT_CURSOR_ID.fetch_add(1, Ordering::Relaxed),
232            db_impl,
233            locker_id,
234            state: CursorState::NotInitialized,
235            current_key: None,
236            current_data: None,
237            current_lsn: noxu_util::NULL_LSN.as_u64(),
238            current_index: -1,
239            current_bin_arc: None,
240            log_manager: Some(log_manager),
241            env_invalid: None,
242            lock_manager: None,
243            txn_ref: None,
244            throughput,
245        }
246    }
247
248    /// Wires the environment-invalidity flag for hot-path validity checks.
249    ///
250    /// Stores a clone of `EnvironmentImpl::is_invalid_flag()` so that
251    /// `check_state()` can detect a failed environment on every cursor
252    /// operation without acquiring the environment lock.  X-13 fix.
253    pub fn with_env_invalid(
254        mut self,
255        flag: Arc<std::sync::atomic::AtomicBool>,
256    ) -> Self {
257        self.env_invalid = Some(flag);
258        self
259    }
260
261    /// Wires a lock manager for per-record locking.
262    ///
263    /// `CursorImpl` receiving a `Locker` from
264    /// `DatabaseImpl.openCursor()`.  Returns `self` for builder-style chaining.
265    pub fn with_lock_manager(mut self, lock_manager: Arc<LockManager>) -> Self {
266        self.lock_manager = Some(lock_manager);
267        self
268    }
269
270    /// Wires an explicit transaction for write-lock tracking.
271    ///
272    /// When set, write operations (`put`, `delete`) acquire WRITE locks via
273    /// the `Txn` and record abort before-images in `WriteLockInfo`, enabling
274    /// transaction rollback.
275    ///
276    /// Being constructed with a `Txn` locker.
277    /// Returns `self` for builder-style chaining.
278    pub fn with_txn(mut self, txn: Arc<Mutex<Txn>>) -> Self {
279        self.txn_ref = Some(txn);
280        self
281    }
282
283    /// Setter equivalent of [`Self::with_txn`] for callers that need to
284    /// attach a `Txn` to an already-built cursor (e.g. `Database::with_auto_txn`
285    /// which constructs the cursor first, then wires the synthetic auto-txn).
286    pub fn attach_txn(&mut self, txn: Arc<Mutex<Txn>>) {
287        self.txn_ref = Some(txn);
288    }
289
290    /// Gets the before-image (old_data, old_lsn) for `key` from the tree.
291    ///
292    /// Returns `(None, NULL_LSN)` if the key does not exist (new insert).
293    fn get_slot_before_image(&self, key: &[u8]) -> (Option<Vec<u8>>, u64) {
294        let db = self.db_impl.read();
295        if let Some(tree) = db.get_real_tree() {
296            match Self::get_data_from_tree(&tree, key) {
297                Some((data, lsn)) => (Some(data), lsn),
298                None => (None, noxu_util::NULL_LSN.as_u64()),
299            }
300        } else {
301            (None, noxu_util::NULL_LSN.as_u64())
302        }
303    }
304
305    /// Returns true if `key` exists in the committed tree.
306    ///
307    /// `CursorImpl.isPresent()` / lock-check path: with lock-based
308    /// isolation, writes go directly to the BIN, so the tree reflects the
309    /// current committed-or-locked state.  Callers that need to check
310    /// existence before a `NoOverwrite`/`NoDupData` insert consult the tree
311    /// directly; if a concurrent writer holds a WRITE lock the subsequent
312    /// `lock_ln()` call will block until that writer commits or aborts.
313    fn key_exists_in_view(&self, key: &[u8]) -> bool {
314        let db = self.db_impl.read();
315        if let Some(tree) = db.get_real_tree() {
316            tree.search(key).map(|sr| sr.exact_parent_found).unwrap_or(false)
317        } else {
318            false
319        }
320    }
321
322    /// Inserts or updates `key`/`data` at `new_lsn` in the B-tree.
323    ///
324    /// `CursorImpl.insertRecordInternal()` / `bin.updateEntry()`:
325    /// writes go directly to the BIN immediately.  Read-committed isolation
326    /// is enforced by the lock manager — concurrent readers block on the
327    /// WRITE lock held by this cursor's txn until it commits or aborts.
328    ///
329    /// When the tree reports a **new** insert (`is_new == true`), increments
330    /// the per-database entry count.
331    fn apply_tree_insert(&self, key: Vec<u8>, data: Vec<u8>, new_lsn: Lsn) {
332        let db = self.db_impl.read();
333        if let Some(tree) = db.get_real_tree()
334            && let Ok(is_new) = tree.insert(key, data, new_lsn)
335            && is_new
336        {
337            db.increment_entry_count();
338        }
339    }
340
341    /// Deletes `key` from the B-tree.
342    ///
343    /// `CursorImpl.deleteCurrentRecord()` / `bin.deleteEntry()`:
344    /// the deletion is applied to the BIN immediately.  Concurrent readers
345    /// that try to acquire a READ lock on the deleted slot's LSN block until
346    /// the writer's WRITE lock is released (commit or abort).
347    ///
348    /// When the tree confirms the key was actually removed (`deleted == true`),
349    /// decrements the per-database entry count (.
350    /// counter).
351    fn apply_tree_delete(&self, key: Vec<u8>, _del_lsn: Lsn) {
352        let db = self.db_impl.read();
353        if let Some(tree) = db.get_real_tree()
354            && tree.delete(&key)
355        {
356            db.decrement_entry_count();
357        }
358    }
359
360    /// Acquires a WRITE lock for an upcoming write to `key` whose current
361    /// slot LSN is `old_lsn`.
362    ///
363    /// For txn-backed cursors, calls `Txn::lock()` (lock persists until commit/abort).
364    /// For auto-commit cursors (lock_manager only, no txn), uses cursor `id`
365    /// as the locker.
366    ///
367    /// # NULL-LSN insert race coordination
368    ///
369    /// When `old_lsn == NULL_LSN` the record does not yet exist (a brand-new
370    /// insert).  Pre-Wave-1A this method returned early in that case, so two
371    /// concurrent auto-commit inserts of the same brand-new key did not
372    /// coordinate through the lock manager — the underlying B+tree latching
373    /// in `noxu-tree` serialised them safely but the deadlock detector could
374    /// not reason about the conflict, and `put_no_overwrite` reported
375    /// `KeyExist` instead of a typed lock-conflict.  This is the first F12
376    /// residual.
377    ///
378    /// We now acquire a write lock on a synthetic, key-coordination LSN
379    /// derived from `(db_id, key)` via [`noxu_util::Lsn::synthetic_key_lock_id`].
380    /// The lock lives in the reserved transient-LSN space so it cannot
381    /// collide with a real WAL LSN, and is held until the wrapping txn
382    /// (synthetic auto-txn or explicit txn) commits or aborts — at which
383    /// point a second concurrent inserter for the same key unblocks and
384    /// observes the result of the first insert.
385    ///
386    /// Auto-commit cursors without a `txn_ref` (legacy callers that have
387    /// not been ported to `TxnManager::begin_auto_txn` yet) acquire and
388    /// immediately release the synthetic lock; this still serialises them
389    /// through the lock manager but does not record the conflict on a
390    /// locker for deadlock-detector reasoning.  Database::put / delete on
391    /// `txn = None` always wraps in a synthetic auto-txn, so this fallback
392    /// is exercised only by the legacy direct-CursorImpl construction.
393    fn lock_write_before_log(
394        &self,
395        old_lsn: u64,
396        key: &[u8],
397    ) -> Result<(), DbiError> {
398        let null = noxu_util::NULL_LSN.as_u64();
399        let lsn_to_lock = if old_lsn == null {
400            // Brand-new insert: coordinate via a synthetic key lock so
401            // concurrent inserts of the same key serialise through the
402            // lock manager.
403            let db_id = self.db_impl.read().get_id().id() as u64;
404            Lsn::synthetic_key_lock_id(db_id, key)
405        } else {
406            old_lsn
407        };
408        if let Some(txn) = &self.txn_ref {
409            txn.lock()
410                .unwrap()
411                .lock(lsn_to_lock, LockType::Write, false)
412                .map_err(DbiError::TxnError)?;
413        } else if let Some(lm) = &self.lock_manager {
414            lm.lock(lsn_to_lock, self.id, LockType::Write, false, false)
415                .map_err(DbiError::TxnError)?;
416            // Legacy auto-commit (no synthetic auto-txn): release the
417            // synthetic key-coordination lock immediately for new inserts
418            // so subsequent inserts can proceed.  For real (non-NULL)
419            // old_lsn, `finalize_write_lock` releases below.
420            if old_lsn == null {
421                let _ = lm.release(lsn_to_lock, self.id);
422            }
423        }
424        Ok(())
425    }
426
427    /// Acquires a synthetic-key write lock for the given key.
428    ///
429    /// Wave 5 / SR9752 / CursorEdgeTest.testReadDeletedUncommitted:
430    /// in-flight deletes physically remove the BIN slot via
431    /// `tree.delete()`, so a concurrent reader looking up the same
432    /// key sees `NotFound` without ever consulting the lock manager
433    /// for the slot's pre-delete LSN.  This violates JE's contract:
434    /// uncommitted deletes are dirty data and a no-wait reader must
435    /// see `LockNotAvailable`, blocking readers must wait until the
436    /// deleter commits.
437    ///
438    /// To restore that invariant without rewriting the BIN's
439    /// physical-removal model, the deleter ALSO holds a synthetic-key
440    /// write lock for the duration of the txn.  Readers that probe
441    /// the BIN and find no matching key call
442    /// [`Self::contest_synthetic_key_for_missing_read`] which
443    /// attempts a read-lock on the same synthetic-key id; the
444    /// uncontested case is one extra lock-manager round-trip and
445    /// the contested case surfaces the lock conflict to the caller.
446    fn lock_synthetic_key_for_delete(
447        &self,
448        key: &[u8],
449    ) -> Result<(), DbiError> {
450        let db_id = self.db_impl.read().get_id().id() as u64;
451        let synthetic_lsn = Lsn::synthetic_key_lock_id(db_id, key);
452        if let Some(txn) = &self.txn_ref {
453            // Held until commit/abort — readers contending on the
454            // synthetic-key block / fail until the deleter finalises.
455            txn.lock()
456                .unwrap()
457                .lock(synthetic_lsn, LockType::Write, false)
458                .map_err(DbiError::TxnError)?;
459        } else if let Some(lm) = &self.lock_manager {
460            // Legacy auto-commit (no synthetic auto-txn): acquire and
461            // immediately release.  The Database::delete path always
462            // wraps in a synthetic auto-txn so the lock is actually
463            // held across the per-record delete; this branch is only
464            // for direct-CursorImpl callers.
465            lm.lock(synthetic_lsn, self.id, LockType::Write, false, false)
466                .map_err(DbiError::TxnError)?;
467            let _ = lm.release(synthetic_lsn, self.id);
468        }
469        Ok(())
470    }
471
472    /// Probes the synthetic-key lock for `key` to detect uncommitted
473    /// deletes after a `NotFound` BIN lookup.
474    ///
475    /// Returns `Ok(())` if the key is genuinely absent (no concurrent
476    /// writer holds the synthetic-key lock); returns the lock-manager
477    /// error otherwise so the caller can surface it to the user.
478    ///
479    /// See [`Self::lock_synthetic_key_for_delete`] for the wider
480    /// rationale.  Read-uncommitted txns skip the probe entirely
481    /// (matching the LSN-keyed `lock_ln` early-return).
482    fn contest_synthetic_key_for_missing_read(
483        &self,
484        key: &[u8],
485    ) -> Result<(), DbiError> {
486        let db_id = self.db_impl.read().get_id().id() as u64;
487        let synthetic_lsn = Lsn::synthetic_key_lock_id(db_id, key);
488        if let Some(txn) = &self.txn_ref {
489            let mut guard = txn.lock().unwrap();
490            if guard.is_read_uncommitted_default() {
491                return Ok(());
492            }
493            // CRITICAL: if this txn already owns a Write lock on the
494            // synthetic key (because it is the deleter), short-circuit
495            // — we must NEVER call `release_lock` on a Read acquisition
496            // that aliased an existing Write lock, because the inner
497            // `Txn::lock` unconditionally inserts the lsn into
498            // `read_locks`, and a subsequent `release_lock` would
499            // remove the txn from the lock manager's owner set,
500            // erroneously freeing the Write lock for other lockers.
501            if guard.owns_write_lock(synthetic_lsn) {
502                return Ok(());
503            }
504            // Try non-blocking first to detect contention without
505            // waiting; on contention, switch to blocking (no-wait
506            // txns surface the LockNotAvailable error here).
507            match guard.lock(synthetic_lsn, LockType::Read, true) {
508                Ok(_) => {
509                    // Granted immediately — no contender; release
510                    // immediately so we don't hold a lock on a
511                    // not-found probe.  Read-committed and
512                    // serializable both treat this as a one-shot
513                    // probe (the data does not exist; there is
514                    // nothing to keep stable).
515                    let _ = guard.release_lock(synthetic_lsn);
516                    Ok(())
517                }
518                Err(noxu_txn::TxnError::LockNotAvailable { .. }) => {
519                    // No-wait txn: surface the typed lock error.
520                    guard
521                        .lock(synthetic_lsn, LockType::Read, false)
522                        .map_err(DbiError::TxnError)?;
523                    let _ = guard.release_lock(synthetic_lsn);
524                    Ok(())
525                }
526                Err(e) => Err(DbiError::TxnError(e)),
527            }
528        } else if let Some(lm) = &self.lock_manager {
529            match lm.lock(synthetic_lsn, self.id, LockType::Read, true, false) {
530                Ok(_) => {
531                    let _ = lm.release(synthetic_lsn, self.id);
532                    Ok(())
533                }
534                Err(noxu_txn::TxnError::LockNotAvailable { .. }) => {
535                    lm.lock(
536                        synthetic_lsn,
537                        self.id,
538                        LockType::Read,
539                        false,
540                        false,
541                    )
542                    .map_err(DbiError::TxnError)?;
543                    let _ = lm.release(synthetic_lsn, self.id);
544                    Ok(())
545                }
546                Err(e) => Err(DbiError::TxnError(e)),
547            }
548        } else {
549            Ok(())
550        }
551    }
552
553    /// Moves the write lock to `new_lsn` and records abort before-image info.
554    ///
555    /// For txn-backed cursors:
556    ///   - If `old_lsn` is valid: moves lock via `Txn::move_write_lock_to_new_lsn()`.
557    ///   - Otherwise (new insert): acquires a new write lock on `new_lsn`.
558    ///   - Records abort info so the txn can undo on abort.
559    ///   - Notes the log entry on the txn for TxnCommit/Abort chaining.
560    ///
561    /// For auto-commit cursors:
562    ///   - Acquires write lock on `new_lsn`, releases both old and new locks
563    ///     immediately (auto-commit releases after the write is logged).
564    ///
565    /// / `Txn.moveWriteLockToNewLsn()`.
566    fn finalize_write_lock(
567        &self,
568        old_lsn: u64,
569        new_lsn: Lsn,
570        abort_key: Option<Vec<u8>>,
571        abort_data: Option<Vec<u8>>,
572    ) -> Result<(), DbiError> {
573        let new_lsn_u64 = new_lsn.as_u64();
574        // Deferred-write or no log manager: no LSN assigned, nothing to lock.
575        if new_lsn_u64 == noxu_util::NULL_LSN.as_u64() {
576            return Ok(());
577        }
578
579        if let Some(txn) = &self.txn_ref {
580            let db_id = self.db_impl.read().get_id().id() as u64;
581            let mut guard = txn.lock().unwrap();
582            if old_lsn != noxu_util::NULL_LSN.as_u64() {
583                // Move the existing write lock from old slot to new slot.
584                guard
585                    .move_write_lock_to_new_lsn(old_lsn, new_lsn_u64)
586                    .map_err(DbiError::TxnError)?;
587            } else {
588                // New insert: no old lock to move — acquire a fresh write lock.
589                guard
590                    .lock(new_lsn_u64, LockType::Write, false)
591                    .map_err(DbiError::TxnError)?;
592            }
593            let abort_known_deleted = old_lsn == noxu_util::NULL_LSN.as_u64();
594            guard.set_write_lock_abort_info(
595                new_lsn_u64,
596                old_lsn,
597                abort_key,
598                abort_data,
599                abort_known_deleted,
600                db_id,
601            );
602            guard.note_log_entry(new_lsn_u64);
603        } else if let Some(lm) = &self.lock_manager {
604            // Auto-commit: acquire write lock, then release immediately.
605            lm.lock(new_lsn_u64, self.id, LockType::Write, false, false)
606                .map_err(DbiError::TxnError)?;
607            if old_lsn != noxu_util::NULL_LSN.as_u64() {
608                let _ = lm.release(old_lsn, self.id);
609            }
610            let _ = lm.release(new_lsn_u64, self.id);
611        }
612        Ok(())
613    }
614
615    /// Returns true if the underlying database uses sorted duplicates.
616    ///
617    /// When true, every (key, data) pair is stored as a two-part composite
618    /// key via `dup_key_data::combine()` and the tree uses a custom comparator.
619    #[inline]
620    fn is_sorted_dup(&self) -> bool {
621        self.db_impl.read().get_sorted_duplicates()
622    }
623
624    /// Returns the unique cursor ID.
625    ///
626    /// Used for debugging and cursor tracking.
627    pub fn get_id(&self) -> i64 {
628        self.id
629    }
630
631    /// Returns the database this cursor operates on.
632    pub fn get_database(&self) -> &Arc<RwLock<DatabaseImpl>> {
633        &self.db_impl
634    }
635
636    /// Returns the locker ID.
637    pub fn get_locker_id(&self) -> i64 {
638        self.locker_id
639    }
640
641    /// Returns true if the cursor is initialized (positioned on a record).
642    pub fn is_initialized(&self) -> bool {
643        self.state == CursorState::Initialized
644    }
645
646    /// Returns true if the cursor is closed.
647    pub fn is_closed(&self) -> bool {
648        self.state == CursorState::Closed
649    }
650
651    /// Returns the current key, if positioned.
652    pub fn get_current_key(&self) -> Option<&[u8]> {
653        self.current_key.as_deref()
654    }
655
656    /// Returns the current data, if positioned.
657    pub fn get_current_data(&self) -> Option<&[u8]> {
658        self.current_data.as_deref()
659    }
660
661    /// Returns the current LSN, if positioned.
662    pub fn get_current_lsn(&self) -> u64 {
663        self.current_lsn
664    }
665
666    /// Checks the cursor is not closed.
667    fn check_state(&self) -> Result<(), DbiError> {
668        #[cfg(any(test, feature = "testing"))]
669        if tick_fail() {
670            return Err(DbiError::CursorClosed);
671        }
672        // X-13: check environment validity before cursor state.
673        // Both the explicit invalidation flag and the I/O-failure flag
674        // (io_invalid) are tested so that reads on a failed environment
675        // return EnvironmentFailure rather than stale BIN data.
676        if self.env_invalid.as_ref().is_some_and(|f| f.load(Ordering::Acquire))
677        {
678            return Err(DbiError::EnvironmentFailure {
679                reason: "environment has been invalidated".into(),
680            });
681        }
682        if self
683            .log_manager
684            .as_ref()
685            .is_some_and(|lm| lm.io_invalid.load(Ordering::Acquire))
686        {
687            return Err(DbiError::EnvironmentFailure {
688                reason: "I/O failure: environment invalidated by fsync error"
689                    .into(),
690            });
691        }
692        match self.state {
693            CursorState::Closed => Err(DbiError::CursorClosed),
694            _ => Ok(()),
695        }
696    }
697
698    /// Checks the cursor is initialized.
699    fn check_initialized(&self) -> Result<(), DbiError> {
700        #[cfg(any(test, feature = "testing"))]
701        if tick_fail() {
702            return Err(DbiError::CursorClosed);
703        }
704        match self.state {
705            CursorState::Closed => Err(DbiError::CursorClosed),
706            CursorState::NotInitialized => Err(DbiError::CursorNotInitialized),
707            CursorState::Initialized => Ok(()),
708        }
709    }
710
711    /// Positions the cursor at a specific key.
712    ///
713    /// / `CursorImpl.searchRange()`.
714    ///
715    /// Uses `Tree::search(key)` to locate the BIN slot for the key:
716    ///
717    /// * `SearchMode::Set` / `SearchMode::Both` — exact key match required.
718    ///   Returns `NotFound` if the key is not present.
719    /// * `SearchMode::SetRange` / `SearchMode::BothRange` — positions at the
720    ///   first key >= the search key (range search).  Currently degrades to
721    ///   an exact-match check; full range support requires iterating forward
722    ///   until the key is >= the search key.
723    ///
724    /// # Arguments
725    ///
726    /// * `key` - The key to search for
727    /// * `data` - Optional data for Both/BothRange modes
728    /// * `search_mode` - The search mode (Set, Both, SetRange, BothRange)
729    ///
730    /// # Returns
731    ///
732    /// * `Success` if the key was found and cursor positioned
733    /// * `NotFound` if the key does not exist
734    pub fn search(
735        &mut self,
736        key: &[u8],
737        data: Option<&[u8]>,
738        search_mode: SearchMode,
739    ) -> Result<OperationStatus, DbiError> {
740        self.check_state()?;
741
742        let is_dup = self.is_sorted_dup();
743
744        if is_dup {
745            return self.search_dup(key, data, search_mode);
746        }
747
748        // Non-dup path — single descent via `search_with_data` (Wave-11-I).
749        //
750        // Previously this path made three separate tree descents per `get()`:
751        //   1. `tree.search(key)` — existence check only.
752        //   2. `get_data_from_tree(tree, key)` — re-descended to fetch data.
753        //   3. `find_bin_for_key(root, key)` — re-descended for BIN pinning.
754        // `search_with_data` folds all three into one descent and uses binary
755        // search (`find_entry_compressed`) at the BIN level.
756        let slot = {
757            let db = self.db_impl.read();
758            if let Some(tree) = db.get_real_tree() {
759                tree.search_with_data(key)
760            } else {
761                None
762            }
763        };
764        let found = slot.as_ref().is_some_and(|s| s.found);
765
766        match search_mode {
767            SearchMode::Set | SearchMode::Both => {
768                if found {
769                    // SAFETY: found => slot.is_some() && slot.found
770                    let slot = slot.unwrap();
771                    let slot_data = slot.data;
772                    let slot_lsn = slot.lsn;
773                    let bin_arc = slot.bin_arc;
774                    // If a writer held the write lock when we called lock_ln,
775                    // our pre-fetched slot_data is stale — re-read from the BIN
776                    // after the writer commits/aborts.  If lock_ln returned
777                    // immediately (no contention), slot_data is still valid.
778                    let contended = self.lock_ln(slot_lsn)?;
779                    let final_data = if contended {
780                        let db = self.db_impl.read();
781                        db.get_real_tree()
782                            .and_then(|tree| {
783                                Self::get_data_from_tree(&tree, key)
784                            })
785                            .map(|(d, _)| d)
786                            .map(Some)
787                            .unwrap_or(slot_data)
788                    } else {
789                        slot_data
790                    };
791                    // Audit Finding 4: BDB-JE's SearchBoth is exact-match on
792                    // (key, data) regardless of duplicate-set membership; on a
793                    // non-dup DB it must still validate that the slot's data
794                    // equals the user-supplied data.  Pre-fix the `data`
795                    // argument was silently dropped and `Success` was
796                    // returned for any matching key, contradicting the
797                    // documented contract on `Get::SearchBoth`.  See
798                    // `docs/src/internal/api-audit-2026-05-cursor.md`.
799                    if matches!(search_mode, SearchMode::Both) {
800                        let user_data = data.unwrap_or(&[]);
801                        let stored = final_data.as_deref().unwrap_or(&[]);
802                        if stored != user_data {
803                            return Ok(OperationStatus::NotFound);
804                        }
805                    }
806                    self.current_key = Some(key.to_vec());
807                    self.current_data = final_data;
808                    self.current_lsn = slot_lsn;
809                    // Use the actual BIN slot index from search_with_data so
810                    // that retrieve_next() advances to the correct next slot
811                    // rather than always starting from index 1.
812                    self.current_index = slot.slot_index as i32;
813                    self.state = CursorState::Initialized;
814                    // BIN arc already obtained from the single descent.
815                    self.update_bin_pin(Some(bin_arc));
816                    Ok(OperationStatus::Success)
817                } else {
818                    // Wave 5: contest a synthetic-key read lock on the
819                    // missing slot to detect uncommitted deletes.  See
820                    // `lock_synthetic_key_for_delete`.
821                    self.contest_synthetic_key_for_missing_read(key)?;
822                    Ok(OperationStatus::NotFound)
823                }
824            }
825            SearchMode::SetRange | SearchMode::BothRange => {
826                if found {
827                    let slot = slot.unwrap();
828                    let slot_data = slot.data;
829                    let slot_lsn = slot.lsn;
830                    let bin_arc = slot.bin_arc;
831                    let contended = self.lock_ln(slot_lsn)?;
832                    let final_data = if contended {
833                        let db = self.db_impl.read();
834                        db.get_real_tree()
835                            .and_then(|tree| {
836                                Self::get_data_from_tree(&tree, key)
837                            })
838                            .map(|(d, _)| d)
839                            .map(Some)
840                            .unwrap_or(slot_data)
841                    } else {
842                        slot_data
843                    };
844                    self.current_key = Some(key.to_vec());
845                    self.current_data = final_data;
846                    self.current_lsn = slot_lsn;
847                    // Use the actual BIN slot index (same rationale as Set branch).
848                    self.current_index = slot.slot_index as i32;
849                    self.state = CursorState::Initialized;
850                    // BIN arc already obtained from the single descent.
851                    self.update_bin_pin(Some(bin_arc));
852                    Ok(OperationStatus::Success)
853                } else {
854                    let next_entry: Option<(Vec<u8>, Vec<u8>, u64, usize)> = {
855                        let db = self.db_impl.read();
856                        if let Some(tree) = db.get_real_tree() {
857                            Self::find_range_entry(&tree, key)
858                        } else {
859                            None
860                        }
861                    };
862                    match next_entry {
863                        Some((k, v, lsn, slot_idx)) => {
864                            self.lock_ln(lsn)?;
865                            // Pin the BIN for the range-found key.
866                            let bin_arc = {
867                                let db = self.db_impl.read();
868                                db.get_real_tree().and_then(|tree| {
869                                    tree.get_root().and_then(|r| {
870                                        Self::find_bin_for_key(r, &k)
871                                    })
872                                })
873                            };
874                            self.current_key = Some(k);
875                            self.current_data = Some(v);
876                            self.current_lsn = lsn;
877                            self.current_index = slot_idx as i32;
878                            self.state = CursorState::Initialized;
879                            self.update_bin_pin(bin_arc);
880                            Ok(OperationStatus::Success)
881                        }
882                        None => Ok(OperationStatus::NotFound),
883                    }
884                }
885            }
886        }
887    }
888
889    /// Sorted-dup variant of `search()`.
890    ///
891    /// For sorted-dup databases (key, data) pairs are stored as two-part
892    /// composite keys `[key][data][packed_key_len]`.  This method builds the
893    /// appropriate two-part search key and delegates to the tree's
894    /// comparator-aware range finder.
895    ///
896    /// Dup path from 7.5.
897    fn search_dup(
898        &mut self,
899        key: &[u8],
900        data: Option<&[u8]>,
901        search_mode: SearchMode,
902    ) -> Result<OperationStatus, DbiError> {
903        let search_two_part_key: Vec<u8> = match search_mode {
904            // Both / BothRange: search for the exact (key, data) pair.
905            SearchMode::Both | SearchMode::BothRange => {
906                dup_key_data::combine(key, data.unwrap_or(b""))
907            }
908            // Set / SetRange: position at the first entry whose primary key
909            // >= `key` — use the lower bound (smallest possible two-part key
910            // for this primary key).
911            SearchMode::Set | SearchMode::SetRange => {
912                dup_key_data::lower_bound(key)
913            }
914        };
915
916        let entry: Option<(
917            Vec<u8>,
918            Vec<u8>,
919            usize,
920            u64,
921            std::sync::Arc<noxu_tree::NodeRwLock<noxu_tree::tree::TreeNode>>,
922        )> = {
923            let db = self.db_impl.read();
924            if let Some(tree) = db.get_real_tree() {
925                tree.first_entry_at_or_after_with_index(&search_two_part_key)
926            } else {
927                None
928            }
929        };
930
931        match entry {
932            Some((raw_key, _, idx, slot_lsn, bin_arc)) => {
933                // raw_key is the two-part key found; check that the primary
934                // key part matches what was requested (for Set and Both).
935                let matches = match search_mode {
936                    SearchMode::Set => dup_key_data::matches_key(&raw_key, key),
937                    SearchMode::Both => raw_key == search_two_part_key,
938                    SearchMode::SetRange => {
939                        // Any key >= the search key is valid.
940                        true
941                    }
942                    SearchMode::BothRange => {
943                        // Position at the first (key, data) where data >=
944                        // the given data; primary key must still match.
945                        dup_key_data::matches_key(&raw_key, key)
946                    }
947                };
948                if matches {
949                    self.lock_ln(slot_lsn)?;
950                    // Store the raw two-part key; get_current() will decode it.
951                    self.current_key = Some(raw_key);
952                    self.current_data = None; // decoded lazily in get_current()
953                    self.current_lsn = slot_lsn;
954                    // Wave 11-N Bug 2 fix: store the actual BIN index, not
955                    // a hard-coded 0.  Pre-fix the cursor reported
956                    // current_index = 0 after every dup search, which made
957                    // the subsequent NextDup compute next_index = 1 in the
958                    // BIN's slot space.  For any primary not occupying
959                    // BIN slot 0 the read either landed on a different
960                    // primary's dup (apply_dup_filter rejected it as
961                    // NotFound) or returned an unrelated entry entirely.
962                    // Storing the real slot index plus pinning the BIN
963                    // closes the bug and matches the invariant maintained
964                    // by `get_first` / `get_last`.
965                    self.current_index = idx as i32;
966                    self.state = CursorState::Initialized;
967                    self.update_bin_pin(Some(bin_arc));
968                    Ok(OperationStatus::Success)
969                } else {
970                    Ok(OperationStatus::NotFound)
971                }
972            }
973            None => Ok(OperationStatus::NotFound),
974        }
975    }
976
977    /// Acquires a read lock on a log record by LSN.
978    ///
979    /// `CursorImpl.lockLN(LockType.READ)`.  When no lock manager
980    /// is wired (read-only cursors / unit tests) this is a no-op.
981    ///
982    /// For txn-backed cursors the lock is tracked in the `Txn` and held until
983    /// commit/abort.  For auto-commit cursors the lock is acquired (to wait
984    /// for any current exclusive writer to finish) and then released
985    /// immediately — mirroring `AutoTxn` single-operation semantics.
986    ///
987    /// **SERIALIZABLE isolation (T-F2)**: when the cursor's txn has
988    /// `is_serializable_isolation()` set, this acquires `LockType::RangeRead`
989    /// instead of `LockType::Read`, mirroring JE `Cursor.getLockType(rangeLock
990    /// = true)`.  `RangeRead` conflicts with a concurrent `RangeInsert` on the
991    /// same LSN, blocking or triggering a restart on phantom inserts.
992    ///
993    /// Returns an error only when the lock would deadlock or the locker is
994    /// invalid; `NULL_LSN` records are skipped (lock-free slots).
995    ///
996    /// Returns `Ok(contended)` where `contended = true` means the lock was
997    /// not immediately available — a concurrent writer held an exclusive lock
998    /// and we had to wait.  When `contended` is `true`, any data pre-fetched
999    /// before calling this method may be stale (the writer may have committed
1000    /// or aborted during the wait), and the caller should re-read from the BIN.
1001    /// When `contended` is `false`, the lock was granted immediately with no
1002    /// intervening write, so pre-fetched data remains valid.
1003    ///
1004    /// Returns `Err(DbiError::TxnError(TxnError::RangeRestart))` if a
1005    /// concurrent `RangeInsert` owner caused a range restart — the caller
1006    /// must abort the current scan position and restart the operation.
1007    fn lock_ln(&self, lsn: u64) -> Result<bool, DbiError> {
1008        if lsn == noxu_util::NULL_LSN.as_u64() {
1009            return Ok(false);
1010        }
1011        if let Some(txn) = &self.txn_ref {
1012            let mut guard = txn.lock().unwrap();
1013            // F2: read-uncommitted txns skip read-lock acquisition
1014            // entirely.  This mirrors the per-operation
1015            // `LockMode::ReadUncommitted` path but applies to every
1016            // read on the txn.
1017            if guard.is_read_uncommitted_default() {
1018                return Ok(false);
1019            }
1020            // T-F2: SERIALIZABLE cursors acquire RangeRead to protect against
1021            // phantom inserts.  All other isolation levels use Read.
1022            let lock_type = if guard.is_serializable_isolation() {
1023                LockType::RangeRead
1024            } else {
1025                LockType::Read
1026            };
1027            // Try non-blocking first to detect write contention without waiting.
1028            let contended = match guard.lock(lsn, lock_type, true) {
1029                Ok(_) => false, // granted immediately — no concurrent writer
1030                Err(noxu_txn::TxnError::LockNotAvailable { .. }) => {
1031                    // A writer holds the lock; block until they commit/abort.
1032                    guard
1033                        .lock(lsn, lock_type, false)
1034                        .map_err(DbiError::TxnError)?;
1035                    true
1036                }
1037                // RangeRestart: a concurrent RangeInsert owner caused a
1038                // restart signal — propagate immediately so the caller can
1039                // restart the scan.  This is the JE RangeRestartException path.
1040                Err(e) => return Err(DbiError::TxnError(e)),
1041            };
1042            // Read-committed: release the read lock immediately after each
1043            // operation so concurrent writers are not blocked for the txn
1044            // duration.  Under serializable isolation the lock is held until
1045            // commit/abort (tracked in Txn.read_locks).
1046            if guard.is_read_committed_isolation() {
1047                guard.release_lock(lsn).map_err(DbiError::TxnError)?;
1048            }
1049            Ok(contended)
1050        } else if let Some(lm) = &self.lock_manager {
1051            // Auto-commit: detect contention via non-blocking attempt first.
1052            // Auto-commit cursors do not provide serializable phantom protection
1053            // across multiple operations; use Read regardless of isolation.
1054            let contended =
1055                match lm.lock(lsn, self.id, LockType::Read, true, false) {
1056                    Ok(_) => {
1057                        lm.release(lsn, self.id).map_err(DbiError::TxnError)?;
1058                        false
1059                    }
1060                    Err(noxu_txn::TxnError::LockNotAvailable { .. }) => {
1061                        lm.lock(lsn, self.id, LockType::Read, false, false)
1062                            .map_err(DbiError::TxnError)?;
1063                        lm.release(lsn, self.id).map_err(DbiError::TxnError)?;
1064                        true
1065                    }
1066                    Err(e) => return Err(DbiError::TxnError(e)),
1067                };
1068            Ok(contended)
1069        } else {
1070            Ok(false)
1071        }
1072    }
1073
1074    /// Acquires a `RangeInsert` lock on the successor key's LSN for a new
1075    /// SERIALIZABLE insert, implementing JE's next-key locking protocol.
1076    ///
1077    /// When a transaction inserts a brand-new key `key` (i.e.
1078    /// `old_lsn == NULL_LSN`), this method:
1079    ///
1080    /// 1. Looks up the first committed key at-or-after `key` in the tree
1081    ///    (the would-be successor of the new key).
1082    /// 2. Acquires `RangeInsert` on that successor's LSN so that any
1083    ///    concurrent SERIALIZABLE scanner holding `RangeRead` on the same
1084    ///    slot is either blocked (insert waits) or triggers a restart (scan
1085    ///    gets `RangeRestart`).
1086    /// 3. If no successor exists (the new key would be the last key in the
1087    ///    database), acquires `RangeInsert` on the per-database EOF sentinel
1088    ///    LSN so scans that called `lock_eof_for_scan` on the same sentinel
1089    ///    are protected.
1090    ///
1091    /// Skipped when:
1092    /// - `old_lsn != NULL_LSN` (this is an update, not a new insert; the
1093    ///   existing `Write` lock on the old LSN already conflicts with any
1094    ///   concurrent `RangeRead`).
1095    /// - The cursor has no txn (auto-commit: locks released per-op; no
1096    ///   cross-op phantom protection).
1097    /// - The txn already owns any lock on the successor LSN (same-txn
1098    ///   insert+scan: avoids an illegal RangeRead→RangeInsert upgrade).
1099    ///
1100    /// Note: `RangeInsert` is acquired for ALL new-key inserts, regardless of
1101    /// the inserter's isolation level.  A concurrent SERIALIZABLE scanner
1102    /// holding `RangeRead` on the successor will be blocked or restarted.
1103    /// For non-serializable scanners, `RangeRead` is never held, so the
1104    /// `RangeInsert` is granted immediately with no contention.
1105    ///
1106    /// Mirror of JE `CursorImpl.lockForInsert()` / next-key locking.
1107    fn lock_range_insert(
1108        &self,
1109        key: &[u8],
1110        old_lsn: u64,
1111    ) -> Result<(), DbiError> {
1112        // Only needed for genuinely new inserts.
1113        if old_lsn != noxu_util::NULL_LSN.as_u64() {
1114            return Ok(());
1115        }
1116        let txn = match &self.txn_ref {
1117            Some(t) => t,
1118            None => return Ok(()), // auto-commit: no cross-op protection
1119        };
1120        let mut guard = txn.lock().unwrap();
1121        // Find the first committed key at-or-after `key` (the successor of
1122        // the key being inserted).
1123        let successor_lsn: u64 = {
1124            let db = self.db_impl.read();
1125            match db.get_real_tree() {
1126                Some(tree) => {
1127                    match tree.first_entry_at_or_after(key) {
1128                        Some((_k, _v, lsn)) => lsn,
1129                        None => {
1130                            // No successor: the new key will be the last key
1131                            // in the database.  Use the per-database EOF
1132                            // sentinel so a concurrent scanner that called
1133                            // lock_eof_for_scan is protected.
1134                            let db_id = db.get_id().id() as u64;
1135                            noxu_util::Lsn::eof_lock_lsn(db_id)
1136                        }
1137                    }
1138                }
1139                None => {
1140                    // Empty tree: use EOF sentinel.
1141                    let db_id = db.get_id().id() as u64;
1142                    noxu_util::Lsn::eof_lock_lsn(db_id)
1143                }
1144            }
1145        };
1146        // Guard: if the same txn already owns any lock on the successor LSN
1147        // (e.g. a RangeRead from scanning the successor key), skip acquisition
1148        // to avoid an illegal RangeRead→RangeInsert upgrade in the lock manager.
1149        // The existing RangeRead already blocks concurrent insertions from other
1150        // transactions, so no additional protection is needed.
1151        if guard.owns_any_lock(successor_lsn) {
1152            return Ok(());
1153        }
1154        guard
1155            .lock(successor_lsn, LockType::RangeInsert, false)
1156            .map_err(DbiError::TxnError)?;
1157        Ok(())
1158    }
1159
1160    /// Acquires a `RangeRead` lock on the per-database EOF sentinel LSN.
1161    ///
1162    /// Called by a SERIALIZABLE forward scan when it reaches the end of the
1163    /// key space (no more keys to read).  This protects against phantom
1164    /// inserts of keys that sort after every currently-scanned key: a
1165    /// concurrent inserter will acquire `RangeInsert` on the same sentinel
1166    /// and be blocked until this scan's transaction commits.
1167    ///
1168    /// No-op unless the cursor is backed by a SERIALIZABLE transaction.
1169    ///
1170    /// Mirror of JE `CursorImpl.lockEof(LockType.RANGE_READ)`.
1171    fn lock_eof_for_scan(&self) -> Result<(), DbiError> {
1172        let txn = match &self.txn_ref {
1173            Some(t) => t,
1174            None => return Ok(()),
1175        };
1176        let mut guard = txn.lock().unwrap();
1177        if !guard.is_serializable_isolation() {
1178            return Ok(());
1179        }
1180        let eof_lsn = {
1181            let db = self.db_impl.read();
1182            let db_id = db.get_id().id() as u64;
1183            noxu_util::Lsn::eof_lock_lsn(db_id)
1184        };
1185        // If the txn already owns any lock on the EOF sentinel (e.g. from a
1186        // prior scan that also reached EOF), skip acquisition.
1187        if guard.owns_any_lock(eof_lsn) {
1188            return Ok(());
1189        }
1190        // Non-blocking attempt first; on RangeInsert conflict we get Restart.
1191        match guard.lock(eof_lsn, LockType::RangeRead, true) {
1192            Ok(_) => Ok(()),
1193            Err(noxu_txn::TxnError::LockNotAvailable { .. }) => {
1194                guard
1195                    .lock(eof_lsn, LockType::RangeRead, false)
1196                    .map_err(DbiError::TxnError)?;
1197                Ok(())
1198            }
1199            Err(e) => Err(DbiError::TxnError(e)),
1200        }
1201    }
1202
1203    /// Fetches the data associated with `key` from a tree (BIN-level lookup).
1204    ///
1205    /// Returns `(data, slot_lsn)` so the caller can acquire a read lock.
1206    ///
1207    /// Data-read path in `CursorImpl.lockAndGetCurrent()`.
1208    fn get_data_from_tree(tree: &Tree, key: &[u8]) -> Option<(Vec<u8>, u64)> {
1209        use noxu_tree::tree::TreeNode;
1210        let root = tree.get_root()?;
1211        // Descend to the BIN that should contain `key` (not always the leftmost).
1212        let bin_arc = Self::find_bin_for_key(root, key)?;
1213        let guard = bin_arc.read();
1214        match &*guard {
1215            TreeNode::Bottom(bin) => {
1216                // BIN entries store compressed (suffix) keys under the BIN's
1217                // key_prefix. If the key doesn't start with the prefix,
1218                // it is not in this BIN — return None rather than panicking.
1219                if !bin.key_prefix.is_empty()
1220                    && !key.starts_with(bin.key_prefix.as_slice())
1221                {
1222                    return None;
1223                }
1224                let suffix = bin.compress_key(key);
1225                bin.entries
1226                    .iter()
1227                    .find(|e| e.key.as_slice() == suffix.as_slice())
1228                    .map(|e| {
1229                        (e.data.clone().unwrap_or_default(), e.lsn.as_u64())
1230                    })
1231            }
1232            _ => None,
1233        }
1234    }
1235
1236    /// Finds the first entry in the tree whose key >= `key`.
1237    ///
1238    /// Returns `(key, data, slot_lsn)` so the caller can acquire a read lock.
1239    ///
1240    /// # Algorithm
1241    ///
1242    /// SearchGte is a two-step probe:
1243    ///
1244    ///   1. Locate the BIN that *should* contain `key` via
1245    ///      `find_bin_for_key` and scan it for the smallest entry whose
1246    ///      full key is `>= key`.  The seed `key` is *not* required to
1247    ///      share the BIN's learned `key_prefix` — we explicitly handle
1248    ///      the three legal seed/`key_prefix` relationships:
1249    ///
1250    ///      * `key.starts_with(key_prefix)` — cheap suffix comparison;
1251    ///        the stored `entries[i].key` are suffixes under that prefix,
1252    ///        so we compare against `&key[plen..]`.
1253    ///      * `key < key_prefix` lexicographically — every full key in
1254    ///        this BIN starts with `key_prefix` and is therefore strictly
1255    ///        greater than `key`; the answer is `entries[0]`.  This
1256    ///        includes the common case of a short search seed (e.g.
1257    ///        `b"K\0"`) on a BIN whose learned prefix has grown longer
1258    ///        than the seed (`b"K\0bucket\0…"`).
1259    ///      * `key > key_prefix` lexicographically — every full key in
1260    ///        this BIN is strictly less than `key`; nothing here matches,
1261    ///        fall through to step 2.
1262    ///
1263    ///   2. If step 1 returned nothing (either no entry in the chosen
1264    ///      BIN satisfies `>= key`, or the BIN was empty / the seed sits
1265    ///      lex-after the BIN's prefix) call `Tree::get_next_bin(key)`
1266    ///      and return its first entry, which by B+tree invariants is
1267    ///      strictly greater than `key`.
1268    ///
1269    /// # Why step 2's first entry is the correct answer
1270    ///
1271    /// `find_bin_for_key` descends by picking, at each internal level,
1272    /// the largest separator `<= key`.  If it lands on BIN `B` reached
1273    /// via slot `p` of some ancestor, then `separator(p) <= key` and
1274    /// (when slot `p+1` exists) `separator(p+1) > key` strictly —
1275    /// otherwise descent would have picked `p+1`.  By the B+tree
1276    /// key-range invariant every key in the subtree rooted at `slot(p+1)`
1277    /// is `>= separator(p+1) > key`.  `Tree::get_next_bin` returns the
1278    /// leftmost BIN of exactly that next-sibling subtree, so its first
1279    /// entry is the smallest key in the whole tree that is `> key`.
1280    /// One probe, deterministically correct — no looping needed.
1281    ///
1282    /// # Locking
1283    ///
1284    /// The step-1 BIN read lock is released before step 2 fires so that
1285    /// `get_next_bin`'s own latch-coupled descent is unconstrained and
1286    /// other threads (especially writers crossing this BIN) are not
1287    /// blocked on a lock we no longer need.
1288    ///
1289    /// # Empty intermediate BINs
1290    ///
1291    /// If the chosen BIN is empty *and* `get_next_bin` returns an empty
1292    /// BIN (a transient state under delete-heavy workloads, before the
1293    /// cleaner has collapsed it), this returns `None` and the caller
1294    /// reports `NotFound`.  This matches `Get::Next`'s behaviour today;
1295    /// see also the follow-up note in
1296    /// `cursor_search_gte_skips_past_empty_bin_is_pre_existing_limit`.
1297    fn find_range_entry(
1298        tree: &Tree,
1299        key: &[u8],
1300    ) -> Option<(Vec<u8>, Vec<u8>, u64, usize)> {
1301        use noxu_tree::tree::TreeNode;
1302
1303        // Step 1: scan the BIN that should contain `key`.  The read lock
1304        // is dropped at the end of this block before step 2 runs.
1305        let in_current: Option<(Vec<u8>, Vec<u8>, u64, usize)> = {
1306            let root = tree.get_root()?;
1307            // Use find_bin_for_key so range searches also work for non-leftmost BINs.
1308            let bin_arc = Self::find_bin_for_key(root, key)?;
1309            let guard = bin_arc.read();
1310            match &*guard {
1311                TreeNode::Bottom(bin) => {
1312                    let plen = bin.key_prefix.len();
1313
1314                    if plen != 0 && !key.starts_with(bin.key_prefix.as_slice())
1315                    {
1316                        // Seed does not share this BIN's learned prefix.
1317                        // Decide by lex-comparing seed against key_prefix;
1318                        // never call compress_key (which requires `starts_with`).
1319                        if key < bin.key_prefix.as_slice() {
1320                            // Every key in this BIN is > seed.
1321                            bin.entries.first().and_then(|e| {
1322                                bin.get_full_key(0).map(|fk| {
1323                                    (
1324                                        fk,
1325                                        e.data.clone().unwrap_or_default(),
1326                                        e.lsn.as_u64(),
1327                                        0usize,
1328                                    )
1329                                })
1330                            })
1331                        } else {
1332                            // Every key in this BIN is < seed; let step 2
1333                            // handle it.
1334                            None
1335                        }
1336                    } else {
1337                        // Cheap path: suffix comparison.
1338                        let suffix = &key[plen..];
1339                        bin.entries
1340                            .iter()
1341                            .enumerate()
1342                            .find(|(_, e)| e.key.as_slice() >= suffix)
1343                            .and_then(|(i, e)| {
1344                                bin.get_full_key(i).map(|fk| {
1345                                    (
1346                                        fk,
1347                                        e.data.clone().unwrap_or_default(),
1348                                        e.lsn.as_u64(),
1349                                        i,
1350                                    )
1351                                })
1352                            })
1353                    }
1354                }
1355                _ => None,
1356            }
1357            // bin_arc read lock dropped here.
1358        };
1359
1360        if let Some(r) = in_current {
1361            return Some(r);
1362        }
1363
1364        // Step 2: chosen BIN had nothing >= key.  By B+tree invariants the
1365        // first entry of the next BIN is strictly > key, which satisfies
1366        // SearchGte.  No iteration: one call, one answer.
1367        // The first entry of the next BIN is at slot index 0.
1368        let next = tree.get_next_bin(key)?;
1369        let e = next.into_iter().next()?;
1370        Some((e.key, e.data.unwrap_or_default(), e.lsn.as_u64(), 0))
1371    }
1372
1373    /// Descends from the given node to the leftmost BIN, returning its Arc.
1374    fn descend_to_bin(
1375        node: std::sync::Arc<noxu_tree::NodeRwLock<noxu_tree::tree::TreeNode>>,
1376    ) -> Option<std::sync::Arc<noxu_tree::NodeRwLock<noxu_tree::tree::TreeNode>>>
1377    {
1378        use noxu_tree::tree::TreeNode;
1379        let mut current = node;
1380        loop {
1381            let (is_bin, child) = {
1382                let g = current.read();
1383                let is_bin = g.is_bin();
1384                let child = if !is_bin {
1385                    match &*g {
1386                        TreeNode::Internal(n) => {
1387                            n.entries.first().and_then(|e| e.child.clone())
1388                        }
1389                        _ => None,
1390                    }
1391                } else {
1392                    None
1393                };
1394                (is_bin, child)
1395            };
1396            if is_bin {
1397                return Some(current);
1398            }
1399            current = child?;
1400        }
1401    }
1402
1403    /// Descends from the given node to the rightmost BIN, returning its Arc.
1404    fn descend_to_last_bin(
1405        node: std::sync::Arc<noxu_tree::NodeRwLock<noxu_tree::tree::TreeNode>>,
1406    ) -> Option<std::sync::Arc<noxu_tree::NodeRwLock<noxu_tree::tree::TreeNode>>>
1407    {
1408        use noxu_tree::tree::TreeNode;
1409        let mut current = node;
1410        loop {
1411            let (is_bin, child) = {
1412                let g = current.read();
1413                let is_bin = g.is_bin();
1414                let child = if !is_bin {
1415                    match &*g {
1416                        TreeNode::Internal(n) => {
1417                            n.entries.last().and_then(|e| e.child.clone())
1418                        }
1419                        _ => None,
1420                    }
1421                } else {
1422                    None
1423                };
1424                (is_bin, child)
1425            };
1426            if is_bin {
1427                return Some(current);
1428            }
1429            current = child?;
1430        }
1431    }
1432
1433    /// Positions the cursor at the first (smallest) record in the database.
1434    ///
1435    /// .
1436    ///
1437    /// Uses `Tree::get_first_node()` to descend to the leftmost BIN, then
1438    /// positions the cursor at slot 0.
1439    ///
1440    /// # Returns
1441    ///
1442    /// * `Success` if the tree is non-empty
1443    /// * `NotFound` if the tree is empty
1444    pub fn get_first(&mut self) -> Result<OperationStatus, DbiError> {
1445        self.check_state()?;
1446
1447        let result: Option<(
1448            Vec<u8>,
1449            Vec<u8>,
1450            i32,
1451            u64,
1452            std::sync::Arc<noxu_tree::NodeRwLock<noxu_tree::tree::TreeNode>>,
1453        )> = {
1454            let db = self.db_impl.read();
1455            if let Some(tree) = db.get_real_tree() {
1456                if tree.is_empty() {
1457                    None
1458                } else {
1459                    use noxu_tree::tree::TreeNode;
1460                    tree.get_root().and_then(|r| {
1461                        let bin_arc = Self::descend_to_bin(r)?;
1462                        let (key, data, lsn) = {
1463                            let g = bin_arc.read();
1464                            match &*g {
1465                                TreeNode::Bottom(bin) => {
1466                                    if bin.entries.is_empty() {
1467                                        return None;
1468                                    }
1469                                    (
1470                                        bin.get_full_key(0).unwrap_or_default(),
1471                                        bin.entries[0]
1472                                            .data
1473                                            .clone()
1474                                            .unwrap_or_default(),
1475                                        bin.entries[0].lsn.as_u64(),
1476                                    )
1477                                }
1478                                _ => return None,
1479                            }
1480                        };
1481                        Some((key, data, 0i32, lsn, bin_arc))
1482                    })
1483                }
1484            } else {
1485                None
1486            }
1487        };
1488
1489        match result {
1490            Some((key, data, idx, lsn, bin_arc)) => {
1491                self.lock_ln(lsn)?;
1492                self.current_key = Some(key);
1493                self.current_data = Some(data);
1494                self.current_lsn = lsn;
1495                self.current_index = idx;
1496                self.state = CursorState::Initialized;
1497                self.update_bin_pin(Some(bin_arc));
1498                Ok(OperationStatus::Success)
1499            }
1500            None => {
1501                // Empty tree.  T-F2: for SERIALIZABLE, lock the EOF sentinel
1502                // so inserts into the (currently empty) database are blocked.
1503                self.lock_eof_for_scan()?;
1504                Ok(OperationStatus::NotFound)
1505            }
1506        }
1507    }
1508
1509    /// Positions the cursor at the last (largest) record in the database.
1510    ///
1511    /// .
1512    ///
1513    /// Uses `Tree::get_last_node()` to descend to the rightmost BIN, then
1514    /// positions the cursor at the last slot.
1515    ///
1516    /// # Returns
1517    ///
1518    /// * `Success` if the tree is non-empty
1519    /// * `NotFound` if the tree is empty
1520    pub fn get_last(&mut self) -> Result<OperationStatus, DbiError> {
1521        self.check_state()?;
1522
1523        let result: Option<(
1524            Vec<u8>,
1525            Vec<u8>,
1526            i32,
1527            u64,
1528            std::sync::Arc<noxu_tree::NodeRwLock<noxu_tree::tree::TreeNode>>,
1529        )> = {
1530            let db = self.db_impl.read();
1531            if let Some(tree) = db.get_real_tree() {
1532                if tree.is_empty() {
1533                    None
1534                } else {
1535                    use noxu_tree::tree::TreeNode;
1536                    tree.get_root().and_then(|r| {
1537                        let bin_arc = Self::descend_to_last_bin(r)?;
1538                        let (key, data, last_idx, lsn) = {
1539                            let g = bin_arc.read();
1540                            match &*g {
1541                                TreeNode::Bottom(bin) => {
1542                                    let n = bin.entries.len();
1543                                    if n == 0 {
1544                                        return None;
1545                                    }
1546                                    let last_idx = n - 1;
1547                                    (
1548                                        bin.get_full_key(last_idx)
1549                                            .unwrap_or_default(),
1550                                        bin.entries[last_idx]
1551                                            .data
1552                                            .clone()
1553                                            .unwrap_or_default(),
1554                                        last_idx as i32,
1555                                        bin.entries[last_idx].lsn.as_u64(),
1556                                    )
1557                                }
1558                                _ => return None,
1559                            }
1560                        };
1561                        Some((key, data, last_idx, lsn, bin_arc))
1562                    })
1563                }
1564            } else {
1565                None
1566            }
1567        };
1568
1569        match result {
1570            Some((key, data, idx, lsn, bin_arc)) => {
1571                self.lock_ln(lsn)?;
1572                self.current_key = Some(key);
1573                self.current_data = Some(data);
1574                self.current_lsn = lsn;
1575                self.current_index = idx;
1576                self.state = CursorState::Initialized;
1577                self.update_bin_pin(Some(bin_arc));
1578                Ok(OperationStatus::Success)
1579            }
1580            None => Ok(OperationStatus::NotFound),
1581        }
1582    }
1583
1584    /// Retrieves the current record.
1585    ///
1586    /// Returns the key and data at the cursor's current position.
1587    ///
1588    /// # Returns
1589    ///
1590    /// A tuple of (key, data) for the current record.
1591    ///
1592    /// # Errors
1593    ///
1594    /// * `CursorNotInitialized` if the cursor is not positioned on a record
1595    /// * `CursorClosed` if the cursor has been closed
1596    pub fn get_current(&self) -> Result<(Vec<u8>, Vec<u8>), DbiError> {
1597        self.check_initialized()?;
1598
1599        let raw_key =
1600            self.current_key.clone().ok_or(DbiError::CursorNotInitialized)?;
1601        let raw_data = self.current_data.clone().unwrap_or_default();
1602
1603        // For sorted-dup databases the tree stores two-part composite keys.
1604        // current_key holds the raw two-part key; split it for the caller.
1605        if self.is_sorted_dup()
1606            && let Some((pk, data)) = dup_key_data::split(&raw_key)
1607        {
1608            return Ok((pk, data));
1609        }
1610        Ok((raw_key, raw_data))
1611    }
1612
1613    /// Returns true if the slot the cursor is positioned on has been deleted
1614    /// since the cursor was last positioned.
1615    ///
1616    /// : analogous to checking KNOWN_DELETED_BIT / entry removal on
1617    /// Cursor.getCurrentLN() path — returns KEYEMPTY when the record is gone.
1618    pub fn is_current_slot_deleted(&self) -> bool {
1619        use noxu_tree::tree::TreeNode;
1620        let current_key = match &self.current_key {
1621            Some(k) => k,
1622            None => return false,
1623        };
1624        let bin_arc = match &self.current_bin_arc {
1625            Some(a) => a,
1626            None => return false,
1627        };
1628        let idx = self.current_index as usize;
1629        let guard = bin_arc.read();
1630        if let TreeNode::Bottom(bin) = &*guard {
1631            if idx >= bin.entries.len() {
1632                return true; // entry was removed
1633            }
1634            let plen = bin.key_prefix.len();
1635            let expected_suffix: &[u8] =
1636                if plen == 0 || current_key.len() <= plen {
1637                    current_key.as_slice()
1638                } else {
1639                    &current_key[plen..]
1640                };
1641            let stored = bin.entries[idx].key.as_slice();
1642            if stored != expected_suffix {
1643                return true; // different key at this index = deleted and shifted
1644            }
1645            bin.entries[idx].known_deleted
1646        } else {
1647            false
1648        }
1649    }
1650
1651    /// Moves the cursor to the next/previous record.
1652    ///
1653    /// .
1654    ///
1655    /// Advances `current_index` within the current BIN.  When the BIN is
1656    /// exhausted (forward: `index >= nEntries`; backward: `index < 0`) the
1657    /// cursor moves to the adjacent BIN via `Tree::get_next_bin()` /
1658    /// `Tree::get_prev_bin()`, mirroring call to
1659    /// `tree.getNextBin(anchorBIN)` / `tree.getPrevBin(anchorBIN)`.
1660    ///
1661    /// The GetMode parameter controls direction and duplicate handling:
1662    ///
1663    /// * `Next` / `NextNoDup` / `NextDup` — move forward
1664    /// * `Prev` / `PrevNoDup` / `PrevDup` — move backward
1665    ///
1666    /// # Returns
1667    ///
1668    /// * `Success` if positioned on a new record
1669    /// * `NotFound` if there are no more records in that direction
1670    pub fn retrieve_next(
1671        &mut self,
1672        mode: GetMode,
1673    ) -> Result<OperationStatus, DbiError> {
1674        self.check_state()?;
1675
1676        if self.state == CursorState::NotInitialized {
1677            return Ok(OperationStatus::NotFound);
1678        }
1679
1680        let is_dup = self.is_sorted_dup();
1681
1682        // BDB-JE contract: NEXT_DUP / PREV_DUP advance only within the
1683        // duplicate-set of the current key.  On a non-sorted-dup database
1684        // every key has exactly one record, so there can never be another
1685        // duplicate of the current position — the only correct answer is
1686        // NotFound.  Without this early-return, the dup-filter below is
1687        // gated on `is_dup` and the cursor would silently degenerate into
1688        // plain Next / Prev semantics, returning the next *different* key
1689        // and violating the documented contract.  See
1690        // `docs/src/internal/api-audit-2026-05-cursor.md` Finding 5.
1691        if !is_dup && matches!(mode, GetMode::NextDup | GetMode::PrevDup) {
1692            return Ok(OperationStatus::NotFound);
1693        }
1694
1695        // For NextDup/PrevDup/NextNoDup/PrevNoDup, capture the primary key of
1696        // the current position before advancing.
1697        let current_primary_key: Option<Vec<u8>> = if is_dup {
1698            self.current_key.as_ref().and_then(|raw| dup_key_data::get_key(raw))
1699        } else {
1700            None
1701        };
1702
1703        let forward = mode.is_forward();
1704        let next_index = if forward {
1705            self.current_index + 1
1706        } else {
1707            self.current_index - 1
1708        };
1709
1710        // Within-BIN traversal.
1711        //
1712        // Fast path (O(1)): use the pinned `current_bin_arc` to read
1713        // `next_index` directly, avoiding a root-to-leaf B-tree traversal on
1714        // every cursor step.
1715        //
1716        // Slow path (O(log N)): only taken when `current_bin_arc` is not yet
1717        // set (e.g. first advance after `get_first()` in an older code path).
1718        // We save the discovered arc so subsequent steps use the fast path.
1719        use noxu_tree::tree::TreeNode;
1720        let entry: Option<(Vec<u8>, Vec<u8>, i32, u64)>;
1721        let new_bin_arc: Option<
1722            std::sync::Arc<noxu_tree::NodeRwLock<noxu_tree::tree::TreeNode>>,
1723        >;
1724
1725        if let Some(bin_arc) = &self.current_bin_arc {
1726            // Fast path: pinned BIN — no tree traversal.
1727            {
1728                let g = bin_arc.read();
1729                if let TreeNode::Bottom(bin) = &*g {
1730                    if next_index >= 0 && next_index < bin.entries.len() as i32
1731                    {
1732                        let idx = next_index as usize;
1733                        entry = Some((
1734                            bin.get_full_key(idx).unwrap_or_default(),
1735                            bin.entries[idx].data.clone().unwrap_or_default(),
1736                            next_index,
1737                            bin.entries[idx].lsn.as_u64(),
1738                        ));
1739                    } else {
1740                        entry = None; // BIN exhausted — fall through to cross-BIN
1741                    }
1742                } else {
1743                    entry = None;
1744                }
1745            }
1746            new_bin_arc = None;
1747        } else {
1748            // Slow path: traverse from root, then pin the discovered BIN.
1749            let current_key_slice_opt =
1750                self.current_key.as_deref().map(|s| s.to_vec());
1751            let db = self.db_impl.read();
1752            if let Some(tree) = db.get_real_tree() {
1753                if tree.is_empty() {
1754                    entry = None;
1755                    new_bin_arc = None;
1756                } else if let (Some(current_key), Some(root)) =
1757                    (current_key_slice_opt.as_deref(), tree.get_root())
1758                {
1759                    if let Some(bin_arc) =
1760                        Self::find_bin_for_key(root, current_key)
1761                    {
1762                        // Clone so we can move the arc after the read guard is dropped.
1763                        let arc_to_save = bin_arc.clone();
1764                        {
1765                            let g = bin_arc.read();
1766                            if let TreeNode::Bottom(bin) = &*g {
1767                                if next_index >= 0
1768                                    && next_index < bin.entries.len() as i32
1769                                {
1770                                    let idx = next_index as usize;
1771                                    entry = Some((
1772                                        bin.get_full_key(idx)
1773                                            .unwrap_or_default(),
1774                                        bin.entries[idx]
1775                                            .data
1776                                            .clone()
1777                                            .unwrap_or_default(),
1778                                        next_index,
1779                                        bin.entries[idx].lsn.as_u64(),
1780                                    ));
1781                                    new_bin_arc = Some(arc_to_save);
1782                                } else {
1783                                    entry = None;
1784                                    new_bin_arc = None;
1785                                }
1786                            } else {
1787                                entry = None;
1788                                new_bin_arc = None;
1789                            }
1790                        }
1791                    } else {
1792                        entry = None;
1793                        new_bin_arc = None;
1794                    }
1795                } else {
1796                    entry = None;
1797                    new_bin_arc = None;
1798                }
1799            } else {
1800                entry = None;
1801                new_bin_arc = None;
1802            }
1803        }
1804
1805        // Pin the BIN we discovered via the slow path.
1806        if new_bin_arc.is_some() {
1807            self.update_bin_pin(new_bin_arc);
1808        }
1809
1810        if let Some((key, data, idx, lsn)) = entry {
1811            // For dup-mode traversal modes, filter by primary key.
1812            if is_dup {
1813                let s = self.apply_dup_filter(
1814                    key,
1815                    data,
1816                    idx,
1817                    lsn,
1818                    mode,
1819                    current_primary_key.as_deref(),
1820                    forward,
1821                )?;
1822                return Ok(s);
1823            }
1824            self.lock_ln(lsn)?;
1825            self.current_key = Some(key);
1826            self.current_data = Some(data);
1827            self.current_lsn = lsn;
1828            self.current_index = idx;
1829            return Ok(OperationStatus::Success);
1830        }
1831
1832        // Current BIN exhausted — cross to adjacent BIN.
1833        let anchor_key: Vec<u8> = match &self.current_key {
1834            Some(k) => k.clone(),
1835            None => return Ok(OperationStatus::NotFound),
1836        };
1837
1838        let adjacent_entries: Option<Vec<BinEntry>> = {
1839            let db = self.db_impl.read();
1840            if let Some(tree) = db.get_real_tree() {
1841                if forward {
1842                    tree.get_next_bin(&anchor_key)
1843                } else {
1844                    tree.get_prev_bin(&anchor_key)
1845                }
1846            } else {
1847                None
1848            }
1849        };
1850
1851        match adjacent_entries {
1852            Some(entries) if !entries.is_empty() => {
1853                let (raw_key, raw_data, idx, lsn) = if forward {
1854                    let e = entries.into_iter().next().unwrap();
1855                    (e.key, e.data.unwrap_or_default(), 0i32, e.lsn.as_u64())
1856                } else {
1857                    let last_idx = (entries.len() - 1) as i32;
1858                    let e = entries.into_iter().last().unwrap();
1859                    (
1860                        e.key,
1861                        e.data.unwrap_or_default(),
1862                        last_idx,
1863                        e.lsn.as_u64(),
1864                    )
1865                };
1866                if is_dup {
1867                    let s = self.apply_dup_filter(
1868                        raw_key,
1869                        raw_data,
1870                        idx,
1871                        lsn,
1872                        mode,
1873                        current_primary_key.as_deref(),
1874                        forward,
1875                    )?;
1876                    return Ok(s);
1877                }
1878                self.lock_ln(lsn)?;
1879                // Crossed into a new BIN — update the cursor pin.
1880                let new_key_ref = raw_key.clone();
1881                let bin_arc = {
1882                    let db = self.db_impl.read();
1883                    db.get_real_tree().and_then(|tree| {
1884                        tree.get_root().and_then(|r| {
1885                            Self::find_bin_for_key(r, &new_key_ref)
1886                        })
1887                    })
1888                };
1889                self.current_key = Some(raw_key);
1890                self.current_data = Some(raw_data);
1891                self.current_lsn = lsn;
1892                self.current_index = idx;
1893                self.update_bin_pin(bin_arc);
1894                Ok(OperationStatus::Success)
1895            }
1896            _ => {
1897                // Reached the end of the key space (no adjacent BIN).
1898                // T-F2: for a SERIALIZABLE forward scan, acquire RangeRead
1899                // on the per-database EOF sentinel so concurrent inserts of
1900                // keys past the current last key are blocked until this
1901                // transaction commits.
1902                if forward {
1903                    self.lock_eof_for_scan()?;
1904                }
1905                Ok(OperationStatus::NotFound)
1906            }
1907        }
1908    }
1909
1910    /// Applies sorted-dup filtering rules after moving to `(raw_key, raw_data,
1911    /// idx)`.
1912    ///
1913    /// * `NextDup` / `PrevDup` — succeed only if the new entry's primary key
1914    ///   equals the saved primary key; return NotFound otherwise.
1915    /// * `NextNoDup` / `PrevNoDup` — advance past all entries that share the
1916    ///   same primary key as the saved position, returning the first entry with
1917    ///   a DIFFERENT primary key.
1918    /// * `Next` / `Prev` — accept any entry.
1919    ///
1920    /// Wave 11-N (Bug 4): every accept site re-finds and pins the BIN that
1921    /// contains `raw_key`.  Pre-fix the cross-BIN paths in this function
1922    /// updated `current_key` / `current_index` but left `current_bin_arc`
1923    /// pointing at the prior BIN, so the next `retrieve_next` fast-path
1924    /// would read `next_index = current_index + 1` from the old BIN —
1925    /// effectively re-emitting old entries and (for large secondary
1926    /// indexes) preventing the walk from terminating.
1927    fn apply_dup_filter(
1928        &mut self,
1929        mut raw_key: Vec<u8>,
1930        mut raw_data: Vec<u8>,
1931        mut idx: i32,
1932        mut lsn: u64,
1933        mode: GetMode,
1934        prev_primary_key: Option<&[u8]>,
1935        forward: bool,
1936    ) -> Result<OperationStatus, DbiError> {
1937        loop {
1938            let new_pk = dup_key_data::get_key(&raw_key);
1939            match mode {
1940                GetMode::NextDup | GetMode::PrevDup => {
1941                    // Stay on the same primary key.
1942                    let same = match (&new_pk, prev_primary_key) {
1943                        (Some(npk), Some(ppk)) => npk.as_slice() == ppk,
1944                        _ => false,
1945                    };
1946                    if same {
1947                        self.lock_ln(lsn)?;
1948                        let bin_arc = self.find_bin_arc_for_key(&raw_key);
1949                        self.current_key = Some(raw_key);
1950                        self.current_data = Some(raw_data);
1951                        self.current_lsn = lsn;
1952                        self.current_index = idx;
1953                        self.update_bin_pin(bin_arc);
1954                        return Ok(OperationStatus::Success);
1955                    } else {
1956                        return Ok(OperationStatus::NotFound);
1957                    }
1958                }
1959                GetMode::NextNoDup | GetMode::PrevNoDup => {
1960                    // Skip entries with the same primary key as `prev_primary_key`.
1961                    let same = match (&new_pk, prev_primary_key) {
1962                        (Some(npk), Some(ppk)) => npk.as_slice() == ppk,
1963                        _ => false,
1964                    };
1965                    if !same {
1966                        self.lock_ln(lsn)?;
1967                        let bin_arc = self.find_bin_arc_for_key(&raw_key);
1968                        self.current_key = Some(raw_key);
1969                        self.current_data = Some(raw_data);
1970                        self.current_lsn = lsn;
1971                        self.current_index = idx;
1972                        self.update_bin_pin(bin_arc);
1973                        return Ok(OperationStatus::Success);
1974                    }
1975                    // Need to advance further.
1976                    // Increment/decrement idx and try to read from the tree.
1977                    if forward {
1978                        idx += 1;
1979                    } else {
1980                        idx -= 1;
1981                    }
1982                    let next = {
1983                        let db = self.db_impl.read();
1984                        if let Some(tree) = db.get_real_tree() {
1985                            if tree.is_empty() {
1986                                None
1987                            } else {
1988                                use noxu_tree::tree::TreeNode;
1989                                tree.get_root().and_then(|r| {
1990                                    // Use the current raw_key to find the BIN.
1991                                    let bin_arc =
1992                                        Self::find_bin_for_key(r, &raw_key)?;
1993                                    let g = bin_arc.read();
1994                                    match &*g {
1995                                        TreeNode::Bottom(bin) => {
1996                                            if idx < 0
1997                                                || idx
1998                                                    >= bin.entries.len() as i32
1999                                            {
2000                                                None
2001                                            } else {
2002                                                let i = idx as usize;
2003                                                Some((
2004                                                    bin.get_full_key(i)
2005                                                        .unwrap_or_default(),
2006                                                    bin.entries[i]
2007                                                        .data
2008                                                        .clone()
2009                                                        .unwrap_or_default(),
2010                                                    idx,
2011                                                    bin.entries[i].lsn.as_u64(),
2012                                                ))
2013                                            }
2014                                        }
2015                                        _ => None,
2016                                    }
2017                                })
2018                            }
2019                        } else {
2020                            None
2021                        }
2022                    };
2023                    match next {
2024                        Some((k, d, i, l)) => {
2025                            raw_key = k;
2026                            raw_data = d;
2027                            idx = i;
2028                            lsn = l;
2029                            // Loop continues.
2030                        }
2031                        None => {
2032                            // BIN exhausted — cross to adjacent BIN.
2033                            let anchor = raw_key.clone();
2034                            let adj: Option<Vec<BinEntry>> = {
2035                                let db = self.db_impl.read();
2036                                if let Some(tree) = db.get_real_tree() {
2037                                    if forward {
2038                                        tree.get_next_bin(&anchor)
2039                                    } else {
2040                                        tree.get_prev_bin(&anchor)
2041                                    }
2042                                } else {
2043                                    None
2044                                }
2045                            };
2046                            match adj {
2047                                Some(entries) if !entries.is_empty() => {
2048                                    let (k, d, i, l) = if forward {
2049                                        let e =
2050                                            entries.into_iter().next().unwrap();
2051                                        (
2052                                            e.key,
2053                                            e.data.unwrap_or_default(),
2054                                            0i32,
2055                                            e.lsn.as_u64(),
2056                                        )
2057                                    } else {
2058                                        let li = (entries.len() - 1) as i32;
2059                                        let e =
2060                                            entries.into_iter().last().unwrap();
2061                                        (
2062                                            e.key,
2063                                            e.data.unwrap_or_default(),
2064                                            li,
2065                                            e.lsn.as_u64(),
2066                                        )
2067                                    };
2068                                    raw_key = k;
2069                                    raw_data = d;
2070                                    idx = i;
2071                                    lsn = l;
2072                                    // Loop continues.
2073                                }
2074                                _ => return Ok(OperationStatus::NotFound),
2075                            }
2076                        }
2077                    }
2078                }
2079                // Next / Prev: accept any entry.
2080                GetMode::Next | GetMode::Prev => {
2081                    self.lock_ln(lsn)?;
2082                    let bin_arc = self.find_bin_arc_for_key(&raw_key);
2083                    self.current_key = Some(raw_key);
2084                    self.current_data = Some(raw_data);
2085                    self.current_lsn = lsn;
2086                    self.current_index = idx;
2087                    self.update_bin_pin(bin_arc);
2088                    return Ok(OperationStatus::Success);
2089                }
2090            }
2091        }
2092    }
2093
2094    /// Descends from `node` to the BIN whose key range contains `key`.
2095    ///
2096    /// This mirrors the search path in `Tree::search()` — at each upper IN
2097    /// we follow the child slot with the largest key <= `key`.  Returns the
2098    /// `Arc` of the matching BIN, or `None` if the tree is empty / malformed.
2099    fn find_bin_for_key(
2100        node: std::sync::Arc<noxu_tree::NodeRwLock<noxu_tree::tree::TreeNode>>,
2101        key: &[u8],
2102    ) -> Option<std::sync::Arc<noxu_tree::NodeRwLock<noxu_tree::tree::TreeNode>>>
2103    {
2104        use noxu_tree::tree::TreeNode;
2105        let mut current = node;
2106        loop {
2107            let (is_bin, child) = {
2108                let g = current.read();
2109                let is_bin = g.is_bin();
2110                let child = if !is_bin {
2111                    match &*g {
2112                        TreeNode::Internal(n) => {
2113                            if n.entries.is_empty() {
2114                                return None;
2115                            }
2116                            // Slot 0 carries a virtual key (-infinity); follow
2117                            // the largest key <= search key (same logic as
2118                            // Tree::search and Tree::insert_recursive).
2119                            let mut idx = 0usize;
2120                            for (i, entry) in n.entries.iter().enumerate() {
2121                                if i == 0 {
2122                                    idx = 0;
2123                                } else if entry.key.as_slice() <= key {
2124                                    idx = i;
2125                                } else {
2126                                    break;
2127                                }
2128                            }
2129                            n.entries.get(idx).and_then(|e| e.child.clone())
2130                        }
2131                        _ => None,
2132                    }
2133                } else {
2134                    None
2135                };
2136                (is_bin, child)
2137            };
2138            if is_bin {
2139                return Some(current);
2140            }
2141            current = child?;
2142        }
2143    }
2144
2145    /// Inserts or updates a record at the cursor position.
2146    ///
2147    /// Write path:
2148    ///
2149    /// 1. Checks state and, for `Current` mode, that the cursor is initialized.
2150    /// 2. For `NoOverwrite`: searches the tree; returns `KeyExist` if found.
2151    /// 3. Calls `Tree::insert(key, data, lsn)` to insert/update in the BIN.
2152    /// 4. Updates the cursor position to the newly written record.
2153    ///
2154    /// Note: locking (step 2 in the) and WAL logging (step 3 in the) are not
2155    /// yet wired here — they require LogManager integration (P0 gap).
2156    ///
2157    /// # Arguments
2158    ///
2159    /// * `key` - The key to insert/update
2160    /// * `data` - The data value
2161    /// * `put_mode` - The insertion mode
2162    ///
2163    /// # Returns
2164    ///
2165    /// * `Success` if the record was inserted/updated
2166    /// * `KeyExist` if NoOverwrite mode and key already exists
2167    pub fn put(
2168        &mut self,
2169        key: &[u8],
2170        data: &[u8],
2171        put_mode: PutMode,
2172    ) -> Result<OperationStatus, DbiError> {
2173        self.check_state()?;
2174
2175        // For sorted-dup databases: encode (key, data) as a two-part composite
2176        // key.  The tree stores `combine(key, data)` with no slot data.
2177        // Dup path in 7.5.
2178        if self.is_sorted_dup() {
2179            return self.put_dup(key, data, put_mode);
2180        }
2181
2182        match put_mode {
2183            PutMode::Current => {
2184                self.check_initialized()?;
2185                let current_key = self
2186                    .current_key
2187                    .clone()
2188                    .ok_or(DbiError::CursorNotInitialized)?;
2189                let (old_data, old_lsn) =
2190                    self.get_slot_before_image(&current_key);
2191                self.lock_write_before_log(old_lsn, &current_key)?;
2192                let new_lsn = self.log_ln_write(
2193                    &current_key,
2194                    Some(data),
2195                    self.locker_id,
2196                )?;
2197                self.finalize_write_lock(
2198                    old_lsn,
2199                    new_lsn,
2200                    Some(current_key.clone()),
2201                    old_data,
2202                )?;
2203                self.apply_tree_insert(current_key, data.to_vec(), new_lsn);
2204                self.current_data = Some(data.to_vec());
2205                self.current_lsn = new_lsn.as_u64();
2206                Ok(OperationStatus::Success)
2207            }
2208            PutMode::NoOverwrite => {
2209                if self.key_exists_in_view(key) {
2210                    return Ok(OperationStatus::KeyExist);
2211                }
2212                // New insert: old_lsn may be NULL (key did not exist
2213                // when we read the BIN above) OR may be a real LSN if
2214                // a concurrent thread inserted between our
2215                // `key_exists_in_view` check above and our
2216                // `get_slot_before_image` call here.
2217                let (old_data, old_lsn) = self.get_slot_before_image(key);
2218                // T-F2: acquire RangeInsert on the successor key's LSN so
2219                // concurrent SERIALIZABLE scanners that have already passed
2220                // this key's position are blocked until we commit.  No-op
2221                // for non-serializable txns or updates (old_lsn != NULL).
2222                self.lock_range_insert(key, old_lsn)?;
2223                self.lock_write_before_log(old_lsn, key)?;
2224                // Re-check `key_exists_in_view` AFTER acquiring the
2225                // synthetic-key / per-LSN write lock.  A concurrent
2226                // inserter for the same brand-new key may have
2227                // committed while we were either blocked on the
2228                // synthetic key lock (NULL_LSN insert race) OR
2229                // blocked on the slot's write lock that the other
2230                // inserter held until commit.  In both cases we
2231                // must report `KeyExist` instead of overwriting,
2232                // because `NoOverwrite` semantics forbid silently
2233                // replacing an existing record.  Closes the first
2234                // F12 residual end-to-end.
2235                if self.key_exists_in_view(key) {
2236                    return Ok(OperationStatus::KeyExist);
2237                }
2238                let new_lsn =
2239                    self.log_ln_write(key, Some(data), self.locker_id)?;
2240                self.finalize_write_lock(
2241                    old_lsn,
2242                    new_lsn,
2243                    Some(key.to_vec()),
2244                    old_data,
2245                )?;
2246                self.apply_tree_insert(key.to_vec(), data.to_vec(), new_lsn);
2247                self.current_key = Some(key.to_vec());
2248                self.current_data = Some(data.to_vec());
2249                self.current_lsn = new_lsn.as_u64();
2250                self.current_index = 0;
2251                self.state = CursorState::Initialized;
2252                Ok(OperationStatus::Success)
2253            }
2254            // NoDupData on a non-dup database behaves like NoOverwrite:
2255            // returns KeyExist if the key already exists, otherwise inserts.
2256            // `Cursor.putNoDupData()` non-dup branch.
2257            PutMode::NoDupData => {
2258                if self.key_exists_in_view(key) {
2259                    return Ok(OperationStatus::KeyExist);
2260                }
2261                let (old_data, old_lsn) = self.get_slot_before_image(key);
2262                // T-F2: same as NoOverwrite path.
2263                self.lock_range_insert(key, old_lsn)?;
2264                self.lock_write_before_log(old_lsn, key)?;
2265                // See the NoOverwrite re-check above for rationale.
2266                if self.key_exists_in_view(key) {
2267                    return Ok(OperationStatus::KeyExist);
2268                }
2269                let new_lsn =
2270                    self.log_ln_write(key, Some(data), self.locker_id)?;
2271                self.finalize_write_lock(
2272                    old_lsn,
2273                    new_lsn,
2274                    Some(key.to_vec()),
2275                    old_data,
2276                )?;
2277                self.apply_tree_insert(key.to_vec(), data.to_vec(), new_lsn);
2278                self.current_key = Some(key.to_vec());
2279                self.current_data = Some(data.to_vec());
2280                self.current_lsn = new_lsn.as_u64();
2281                self.current_index = 0;
2282                self.state = CursorState::Initialized;
2283                Ok(OperationStatus::Success)
2284            }
2285            PutMode::Overwrite => {
2286                let (old_data, old_lsn) = self.get_slot_before_image(key);
2287                // T-F2: acquire RangeInsert if this is a brand-new key
2288                // (old_lsn == NULL_LSN).  For existing-key updates the
2289                // Write lock on old_lsn already conflicts with RangeRead.
2290                self.lock_range_insert(key, old_lsn)?;
2291                self.lock_write_before_log(old_lsn, key)?;
2292                let new_lsn =
2293                    self.log_ln_write(key, Some(data), self.locker_id)?;
2294                self.finalize_write_lock(
2295                    old_lsn,
2296                    new_lsn,
2297                    Some(key.to_vec()),
2298                    old_data,
2299                )?;
2300                self.apply_tree_insert(key.to_vec(), data.to_vec(), new_lsn);
2301                self.current_key = Some(key.to_vec());
2302                self.current_data = Some(data.to_vec());
2303                self.current_lsn = new_lsn.as_u64();
2304                self.current_index = 0;
2305                self.state = CursorState::Initialized;
2306                Ok(OperationStatus::Success)
2307            }
2308        }
2309    }
2310
2311    /// Sorted-dup variant of `put()`.
2312    ///
2313    /// Encodes (key, data) as a two-part composite key and stores it in the
2314    /// tree with empty slot data.  The tree's custom comparator ensures
2315    /// correct ordering.
2316    ///
2317    /// Dup path from 7.5.
2318    /// Dup path from 7.5.
2319    fn put_dup(
2320        &mut self,
2321        key: &[u8],
2322        data: &[u8],
2323        put_mode: PutMode,
2324    ) -> Result<OperationStatus, DbiError> {
2325        let two_part_key = dup_key_data::combine(key, data);
2326
2327        match put_mode {
2328            // --- Current: replace the data of the currently-positioned record ---
2329            PutMode::Current => {
2330                // In dup mode, "current" is the two-part key at the cursor
2331                // position; replacing it means deleting the old two-part key
2332                // and inserting a new one (delete old, insert new).
2333                self.check_initialized()?;
2334                let old_key = self
2335                    .current_key
2336                    .clone()
2337                    .ok_or(DbiError::CursorNotInitialized)?;
2338                let del_lsn =
2339                    self.log_ln_write(&old_key, None, self.locker_id)?;
2340                self.apply_tree_delete(old_key, del_lsn);
2341                let new_lsn = self.log_ln_write(
2342                    &two_part_key,
2343                    Some(b""),
2344                    self.locker_id,
2345                )?;
2346                self.apply_tree_insert(two_part_key.clone(), vec![], new_lsn);
2347                self.current_key = Some(two_part_key);
2348                self.current_data = None;
2349                self.current_lsn = new_lsn.as_u64();
2350                return Ok(OperationStatus::Success);
2351            }
2352            // --- Overwrite: insert or replace the exact (key, data) pair ---
2353            PutMode::Overwrite => {
2354                // SR9752 Part 2 (Wave 5): register the brand-new sorted-dup
2355                // insert with the cursor's txn / lock manager so abort-undo
2356                // can roll the dup back.  Distinguish update vs. insert: if
2357                // the (key, data) pair already exists, this is a no-op for
2358                // the counter and the slot LSN moves; if the pair is new,
2359                // the abort-undo deletes the slot.
2360                let exists_old_lsn: u64 = {
2361                    let db = self.db_impl.read();
2362                    db.get_real_tree()
2363                        .and_then(|tree| {
2364                            Self::get_data_from_tree(&tree, &two_part_key)
2365                        })
2366                        .map(|(_, lsn)| lsn)
2367                        .unwrap_or(noxu_util::NULL_LSN.as_u64())
2368                };
2369                self.lock_write_before_log(exists_old_lsn, &two_part_key)?;
2370                let new_lsn = self.log_ln_write(
2371                    &two_part_key,
2372                    Some(b""),
2373                    self.locker_id,
2374                )?;
2375                self.finalize_write_lock(
2376                    exists_old_lsn,
2377                    new_lsn,
2378                    Some(two_part_key.clone()),
2379                    None,
2380                )?;
2381                self.apply_tree_insert(two_part_key.clone(), vec![], new_lsn);
2382                self.current_key = Some(two_part_key);
2383                self.current_data = None;
2384                self.current_lsn = new_lsn.as_u64();
2385                self.current_index = 0;
2386                self.state = CursorState::Initialized;
2387                return Ok(OperationStatus::Success);
2388            }
2389            // --- NoDupData: (key, data) pair uniqueness check ---
2390            PutMode::NoDupData => {
2391                // Return KeyExist if the exact (key, data) pair already exists.
2392                // Mirrors JE's Cursor.putNoDupData() semantics.
2393                let exists = {
2394                    let db = self.db_impl.read();
2395                    if let Some(tree) = db.get_real_tree() {
2396                        tree.search(&two_part_key)
2397                            .map(|sr| sr.exact_parent_found)
2398                            .unwrap_or(false)
2399                    } else {
2400                        false
2401                    }
2402                };
2403                if exists {
2404                    return Ok(OperationStatus::KeyExist);
2405                }
2406            }
2407            // --- NoOverwrite: key-only uniqueness check (JE semantics) ---
2408            PutMode::NoOverwrite => {
2409                // JE invariant (DatabaseTest.testPutNoOverwriteInADupDb*):
2410                // once ANY (key, *) pair exists for this key, a putNoOverwrite
2411                // of the same key with ANY data value must return KEYEXIST.
2412                // This is different from NoDupData which checks (key,data).
2413                let key_exists = {
2414                    let db = self.db_impl.read();
2415                    if let Some(tree) = db.get_real_tree() {
2416                        let lb = dup_key_data::lower_bound(key);
2417                        tree.first_entry_at_or_after_with_index(&lb)
2418                            .map(|(found_key, _, _, _, _)| {
2419                                dup_key_data::matches_key(&found_key, key)
2420                            })
2421                            .unwrap_or(false)
2422                    } else {
2423                        false
2424                    }
2425                };
2426                if key_exists {
2427                    return Ok(OperationStatus::KeyExist);
2428                }
2429            }
2430        }
2431
2432        // --- Common insert path for NoDupData / NoOverwrite ---
2433        // Reached only when the existence check above passed (no early return).
2434        // v1.6 (Wave 2A): register the insert with the cursor's txn /
2435        // lock manager so abort-undo can roll back the new dup.
2436        // old_lsn is NULL_LSN: the existence check confirmed the pair is absent.
2437        let old_lsn = noxu_util::NULL_LSN.as_u64();
2438        self.lock_write_before_log(old_lsn, &two_part_key)?;
2439        let new_lsn =
2440            self.log_ln_write(&two_part_key, Some(b""), self.locker_id)?;
2441        self.finalize_write_lock(
2442            old_lsn,
2443            new_lsn,
2444            Some(two_part_key.clone()),
2445            None,
2446        )?;
2447        // Use apply_tree_insert so the per-database entry counter is bumped
2448        // on a new (key, data) pair — `Database::count()` reads this counter.
2449        self.apply_tree_insert(two_part_key.clone(), vec![], new_lsn);
2450        self.current_key = Some(two_part_key);
2451        self.current_data = None;
2452        self.current_lsn = new_lsn.as_u64();
2453        self.current_index = 0;
2454        self.state = CursorState::Initialized;
2455        Ok(OperationStatus::Success)
2456    }
2457
2458    /// Writes an LN (Leaf Node) log entry for a put or delete operation.
2459    ///
2460    /// Returns the LSN assigned to the entry, or NULL_LSN if no log manager
2461    /// is configured (e.g., read-only or test cursor).
2462    fn log_ln_write(
2463        &self,
2464        key: &[u8],
2465        data: Option<&[u8]>,
2466        txn_id: i64,
2467    ) -> Result<Lsn, DbiError> {
2468        // Deferred-write databases skip WAL logging entirely.
2469        // Data is flushed to disk only at eviction or checkpoint.
2470        // `CursorImpl.java` deferred-write check before logManager.log().
2471        if self.db_impl.read().is_deferred_write() {
2472            return Ok(noxu_util::NULL_LSN);
2473        }
2474
2475        let lm = match &self.log_manager {
2476            Some(lm) => lm,
2477            None => return Ok(noxu_util::NULL_LSN),
2478        };
2479
2480        let db_id = self.db_impl.read().get_id().id() as u64;
2481        let txn_id_opt = if txn_id != 0 { Some(txn_id) } else { None };
2482
2483        let entry = LnLogEntry::new(
2484            db_id,
2485            txn_id_opt,
2486            Lsn::from_u64(self.current_lsn), // abort_lsn: before-image LSN (current slot LSN before this write)
2487            false,                           // abort_known_deleted
2488            None,                            // abort_key
2489            None,                            // abort_data
2490            NULL_VLSN,                       // abort_vlsn
2491            0,                               // abort_expiration
2492            true,                            // embedded_ln
2493            key.to_vec(),
2494            data.map(|d| d.to_vec()),
2495            0,         // expiration
2496            NULL_VLSN, // vlsn
2497        );
2498
2499        let mut buf = BytesMut::with_capacity(entry.log_size());
2500        entry.write_to_log(&mut buf);
2501
2502        let entry_type = if data.is_some() {
2503            if txn_id_opt.is_some() {
2504                LogEntryType::InsertLNTxn
2505            } else {
2506                LogEntryType::InsertLN
2507            }
2508        } else if txn_id_opt.is_some() {
2509            LogEntryType::DeleteLNTxn
2510        } else {
2511            LogEntryType::DeleteLN
2512        };
2513
2514        // Pass the previous slot LSN as old_lsn so the UtilizationTracker
2515        // marks the previous version obsolete (the: countObsoleteNode with oldLsn).
2516        let old_lsn_opt = if self.current_lsn != noxu_util::NULL_LSN.as_u64() {
2517            Some(Lsn::from_u64(self.current_lsn))
2518        } else {
2519            None
2520        };
2521
2522        lm.log_with_old_lsn(
2523            entry_type,
2524            &buf,
2525            Provisional::No,
2526            false,
2527            false,
2528            old_lsn_opt,
2529        )
2530        .map_err(DbiError::from)
2531    }
2532
2533    /// Deletes the record at the cursor position.
2534    ///
2535    /// Delete path:
2536    ///
2537    /// 1. Checks that the cursor is initialized.
2538    /// 2. Writes a DeleteLN log entry to the WAL (if log manager is present).
2539    /// 3. Calls `Tree::delete(key)` to remove the entry from the BIN.
2540    /// 4. Resets cursor to NotInitialized (matching behaviour).
2541    ///
2542    /// # Returns
2543    ///
2544    /// * `Success` if the record was deleted
2545    ///
2546    /// # Errors
2547    ///
2548    /// * `CursorNotInitialized` if cursor is not positioned
2549    /// * `CursorClosed` if cursor has been closed
2550    pub fn delete(&mut self) -> Result<OperationStatus, DbiError> {
2551        self.check_initialized()?;
2552
2553        // For sorted-dup databases, current_key IS the two-part composite key
2554        // stored in the tree.  For non-dup databases it is the plain key.
2555        // In both cases current_key is the correct tree-delete key.
2556        if let Some(tree_key) = self.current_key.clone() {
2557            let (old_data, old_lsn) = self.get_slot_before_image(&tree_key);
2558            self.lock_write_before_log(old_lsn, &tree_key)?;
2559            // Wave 5: also hold a synthetic-key write lock for the
2560            // duration of the txn so concurrent readers that probe the
2561            // BIN post-physical-removal can detect contention via
2562            // `contest_synthetic_key_for_missing_read`.
2563            self.lock_synthetic_key_for_delete(&tree_key)?;
2564            let del_lsn = self.log_ln_write(&tree_key, None, self.locker_id)?;
2565            self.finalize_write_lock(
2566                old_lsn,
2567                del_lsn,
2568                Some(tree_key.clone()),
2569                old_data,
2570            )?;
2571            self.apply_tree_delete(tree_key, del_lsn);
2572        }
2573
2574        self.current_key = None;
2575        self.current_data = None;
2576        self.current_lsn = noxu_util::NULL_LSN.as_u64();
2577        self.current_index = -1;
2578        self.state = CursorState::NotInitialized;
2579
2580        Ok(OperationStatus::Success)
2581    }
2582
2583    /// Counts the number of duplicates at the current key position.
2584    ///
2585    /// For sorted-dup databases, traverses all records sharing the same
2586    /// primary key. For non-dup databases, returns 1 if positioned.
2587    ///
2588    /// 7.5.
2589    ///
2590    /// # Returns
2591    ///
2592    /// The count of duplicate records at the current key.
2593    ///
2594    /// # Errors
2595    ///
2596    /// * `CursorNotInitialized` if cursor is not positioned
2597    /// * `CursorClosed` if cursor has been closed
2598    pub fn count(&self) -> Result<i64, DbiError> {
2599        self.check_initialized()?;
2600
2601        // For sorted-dup databases, count all entries sharing the same primary
2602        // key as the current position.
2603        //
2604        // Strategy (Wave 11-N Bug 1 fix): clone the cursor at the current
2605        // position, walk backward with PrevDup until NotFound (which leaves
2606        // scratch on the FIRST dup of the primary), then walk forward with
2607        // NextDup counting successful steps.  The total count is
2608        // `forward + 1` because the forward walk visits every dup *after*
2609        // the first, plus the one scratch is parked on at the start of the
2610        // forward walk.
2611        //
2612        // Pre-fix the formula was `backward + 1 + forward`, which double
2613        // counted: the backward walk left scratch on the first dup
2614        // already, so the forward walk re-traverses every dup including
2615        // the original position.  The result for an N-dup primary observed
2616        // at offset `i` was `i + N` instead of `N`.
2617        if self.is_sorted_dup() {
2618            let mut scratch = self.dup(true)?;
2619            // Walk backward to the first dup of this primary.  We do not
2620            // count these steps — they are pure repositioning.
2621            while let Ok(OperationStatus::Success) =
2622                scratch.retrieve_next(GetMode::PrevDup)
2623            {}
2624            // scratch is now parked on the first dup of this primary.
2625            let mut forward: i64 = 0;
2626            while let Ok(OperationStatus::Success) =
2627                scratch.retrieve_next(GetMode::NextDup)
2628            {
2629                forward += 1;
2630            }
2631            return Ok(forward + 1);
2632        }
2633
2634        Ok(1)
2635    }
2636
2637    /// Creates a duplicate of this cursor at the same position.
2638    ///
2639    /// If `same_position` is true, the new cursor is positioned at the
2640    /// same record as this cursor. Otherwise, the new cursor is created
2641    /// in the NotInitialized state.
2642    ///
2643    /// The duplicated cursor shares the same locker (transaction) as
2644    /// the original cursor.
2645    ///
2646    /// # Arguments
2647    ///
2648    /// * `same_position` - Whether to copy the current position
2649    ///
2650    /// # Returns
2651    ///
2652    /// A new CursorImpl with the same or uninitialized position.
2653    ///
2654    /// # Errors
2655    ///
2656    /// * `CursorClosed` if the cursor has been closed
2657    pub fn dup(&self, same_position: bool) -> Result<CursorImpl, DbiError> {
2658        self.check_state()?;
2659
2660        let mut new_cursor = match &self.log_manager {
2661            Some(lm) => CursorImpl::with_log_manager(
2662                self.db_impl.clone(),
2663                self.locker_id,
2664                lm.clone(),
2665            ),
2666            None => CursorImpl::new(self.db_impl.clone(), self.locker_id),
2667        };
2668        if let Some(lm) = &self.lock_manager {
2669            new_cursor.lock_manager = Some(lm.clone());
2670        }
2671
2672        if same_position && self.state == CursorState::Initialized {
2673            new_cursor.current_key = self.current_key.clone();
2674            new_cursor.current_data = self.current_data.clone();
2675            new_cursor.current_lsn = self.current_lsn;
2676            new_cursor.current_index = self.current_index;
2677            new_cursor.state = CursorState::Initialized;
2678        }
2679
2680        Ok(new_cursor)
2681    }
2682
2683    /// Closes the cursor.
2684    ///
2685    /// Releases all resources held by the cursor, including any BIN latches
2686    /// and cursor-level locks. After closing, all operations on the cursor
2687    /// will return `CursorClosed` errors.
2688    ///
2689    /// Closing a cursor multiple times is safe and has no effect after the
2690    /// Updates the cursor's BIN pin when moving to a new BIN.
2691    ///
2692    /// Decrements  on the old BIN (if any) and increments it
2693    /// on  (if ).  No-op when the cursor stays on the same BIN
2694    /// (pointer equality checked via ).
2695    /// Re-descends the tree to find the BIN that contains `key`.  Used
2696    /// by the sorted-dup cross-BIN paths in `apply_dup_filter` to
2697    /// re-pin `current_bin_arc` after a BIN boundary is crossed.
2698    fn find_bin_arc_for_key(
2699        &self,
2700        key: &[u8],
2701    ) -> Option<std::sync::Arc<noxu_tree::NodeRwLock<noxu_tree::tree::TreeNode>>>
2702    {
2703        let db = self.db_impl.read();
2704        let tree = db.get_real_tree()?;
2705        let root = tree.get_root()?;
2706        Self::find_bin_for_key(root, key)
2707    }
2708
2709    ///
2710    /// Matching  /
2711    ///  calls in cursor positioning.
2712    fn update_bin_pin(
2713        &mut self,
2714        new_bin: Option<
2715            std::sync::Arc<noxu_tree::NodeRwLock<noxu_tree::tree::TreeNode>>,
2716        >,
2717    ) {
2718        // Same BIN — nothing to do.
2719        match (&self.current_bin_arc, &new_bin) {
2720            (Some(old), Some(new)) if std::sync::Arc::ptr_eq(old, new) => {
2721                return;
2722            }
2723            _ => {}
2724        }
2725        // Unpin old BIN.
2726        if let Some(old_arc) = self.current_bin_arc.take() {
2727            noxu_tree::Tree::unpin_bin(&old_arc);
2728        }
2729        // Pin new BIN.
2730        if let Some(ref new_arc) = new_bin {
2731            noxu_tree::Tree::pin_bin(new_arc);
2732        }
2733        self.current_bin_arc = new_bin;
2734    }
2735
2736    /// first close.
2737    ///
2738    /// # Returns
2739    ///
2740    ///  always (never fails).
2741    pub fn close(&mut self) -> Result<(), DbiError> {
2742        if self.state == CursorState::Closed {
2743            return Ok(());
2744        }
2745
2746        // Release BIN pin — prevents evictor from seeing a stale cursor_count.
2747        self.update_bin_pin(None);
2748
2749        self.current_key = None;
2750        self.current_data = None;
2751        self.current_lsn = noxu_util::NULL_LSN.as_u64();
2752        self.current_index = -1;
2753        self.state = CursorState::Closed;
2754
2755        Ok(())
2756    }
2757}
2758
2759impl Drop for CursorImpl {
2760    /// Ensures the cursor is closed when dropped.
2761    ///
2762    /// This provides automatic cleanup if the user forgets to explicitly
2763    /// close the cursor. Note that it's still better practice to call
2764    /// close() explicitly to handle potential errors.
2765    fn drop(&mut self) {
2766        if self.state != CursorState::Closed {
2767            let _ = self.close();
2768        }
2769    }
2770}
2771
2772#[cfg(test)]
2773#[expect(clippy::field_reassign_with_default)]
2774mod tests {
2775    use super::*;
2776    use crate::{DatabaseConfig, DatabaseId, DbType};
2777
2778    /// Creates a test DatabaseImpl for cursor testing.
2779    fn create_test_database() -> Arc<RwLock<DatabaseImpl>> {
2780        let db_id = DatabaseId::new(1);
2781        let config = DatabaseConfig::default();
2782        let db_impl = DatabaseImpl::new(
2783            db_id,
2784            "test_db".to_string(),
2785            DbType::User,
2786            &config,
2787        );
2788        Arc::new(RwLock::new(db_impl))
2789    }
2790
2791    #[test]
2792    fn test_new_cursor_not_initialized() {
2793        let db = create_test_database();
2794        let cursor = CursorImpl::new(db, 100);
2795
2796        assert!(!cursor.is_initialized());
2797        assert!(!cursor.is_closed());
2798        assert_eq!(cursor.get_locker_id(), 100);
2799        assert!(cursor.get_current_key().is_none());
2800        assert!(cursor.get_current_data().is_none());
2801    }
2802
2803    #[test]
2804    fn test_search_positions_cursor() {
2805        let db = create_test_database();
2806        let mut cursor = CursorImpl::new(db, 100);
2807
2808        let key = b"test_key";
2809        let data = b"test_data";
2810
2811        // Insert into tree first, then search.
2812        cursor.put(key, data, PutMode::Overwrite).unwrap();
2813        let status = cursor.search(key, Some(data), SearchMode::Set).unwrap();
2814        assert_eq!(status, OperationStatus::Success);
2815        assert!(cursor.is_initialized());
2816        assert_eq!(cursor.get_current_key(), Some(key.as_slice()));
2817        assert_eq!(cursor.get_current_data(), Some(data.as_slice()));
2818    }
2819
2820    #[test]
2821    fn test_get_current_after_search() {
2822        let db = create_test_database();
2823        let mut cursor = CursorImpl::new(db, 100);
2824
2825        let key = b"my_key";
2826        let data = b"my_data";
2827
2828        // Insert into tree first, then search.
2829        cursor.put(key, data, PutMode::Overwrite).unwrap();
2830        cursor.search(key, Some(data), SearchMode::Set).unwrap();
2831        let (ret_key, ret_data) = cursor.get_current().unwrap();
2832
2833        assert_eq!(ret_key, key);
2834        assert_eq!(ret_data, data);
2835    }
2836
2837    #[test]
2838    fn test_get_current_before_initialization() {
2839        let db = create_test_database();
2840        let cursor = CursorImpl::new(db, 100);
2841
2842        let result = cursor.get_current();
2843        assert!(matches!(result, Err(DbiError::CursorNotInitialized)));
2844    }
2845
2846    #[test]
2847    fn test_retrieve_next_from_uninitialized() {
2848        let db = create_test_database();
2849        let mut cursor = CursorImpl::new(db, 100);
2850
2851        let status = cursor.retrieve_next(GetMode::Next).unwrap();
2852        assert_eq!(status, OperationStatus::NotFound);
2853    }
2854
2855    #[test]
2856    fn test_put_overwrite() {
2857        let db = create_test_database();
2858        let mut cursor = CursorImpl::new(db, 100);
2859
2860        let key = b"key1";
2861        let data = b"data1";
2862
2863        let status = cursor.put(key, data, PutMode::Overwrite).unwrap();
2864        assert_eq!(status, OperationStatus::Success);
2865        assert!(cursor.is_initialized());
2866        assert_eq!(cursor.get_current_key(), Some(key.as_slice()));
2867    }
2868
2869    #[test]
2870    fn test_put_no_overwrite_when_key_exists() {
2871        let db = create_test_database();
2872        let mut cursor = CursorImpl::new(db, 100);
2873
2874        let key = b"key1";
2875        let data1 = b"data1";
2876        let data2 = b"data2";
2877
2878        // First put succeeds
2879        cursor.put(key, data1, PutMode::Overwrite).unwrap();
2880
2881        // Second put with NoOverwrite should return KeyExist
2882        let status = cursor.put(key, data2, PutMode::NoOverwrite).unwrap();
2883        assert_eq!(status, OperationStatus::KeyExist);
2884    }
2885
2886    #[test]
2887    fn test_put_current_requires_initialization() {
2888        let db = create_test_database();
2889        let mut cursor = CursorImpl::new(db, 100);
2890
2891        let key = b"key1";
2892        let data = b"data1";
2893
2894        let result = cursor.put(key, data, PutMode::Current);
2895        assert!(matches!(result, Err(DbiError::CursorNotInitialized)));
2896    }
2897
2898    #[test]
2899    fn test_put_current_after_initialization() {
2900        let db = create_test_database();
2901        let mut cursor = CursorImpl::new(db, 100);
2902
2903        let key = b"key1";
2904        let data1 = b"data1";
2905        let data2 = b"data2";
2906
2907        // Insert first, then search to position cursor, then update with Current mode.
2908        cursor.put(key, data1, PutMode::Overwrite).unwrap();
2909        cursor.search(key, Some(data1), SearchMode::Set).unwrap();
2910
2911        // Update with Current mode
2912        let status = cursor.put(key, data2, PutMode::Current).unwrap();
2913        assert_eq!(status, OperationStatus::Success);
2914        assert_eq!(cursor.get_current_data(), Some(data2.as_slice()));
2915    }
2916
2917    #[test]
2918    fn test_delete_requires_initialization() {
2919        let db = create_test_database();
2920        let mut cursor = CursorImpl::new(db, 100);
2921
2922        let result = cursor.delete();
2923        assert!(matches!(result, Err(DbiError::CursorNotInitialized)));
2924    }
2925
2926    #[test]
2927    fn test_delete_resets_state() {
2928        let db = create_test_database();
2929        let mut cursor = CursorImpl::new(db, 100);
2930
2931        let key = b"key1";
2932        let data = b"data1";
2933
2934        // Insert, search to position, then delete.
2935        cursor.put(key, data, PutMode::Overwrite).unwrap();
2936        cursor.search(key, Some(data), SearchMode::Set).unwrap();
2937        assert!(cursor.is_initialized());
2938
2939        // Delete
2940        let status = cursor.delete().unwrap();
2941        assert_eq!(status, OperationStatus::Success);
2942        assert!(!cursor.is_initialized());
2943        assert!(cursor.get_current_key().is_none());
2944    }
2945
2946    #[test]
2947    fn test_dup_with_same_position() {
2948        let db = create_test_database();
2949        let mut cursor = CursorImpl::new(db, 100);
2950
2951        let key = b"key1";
2952        let data = b"data1";
2953
2954        // Insert, search to position, then dup.
2955        cursor.put(key, data, PutMode::Overwrite).unwrap();
2956        cursor.search(key, Some(data), SearchMode::Set).unwrap();
2957
2958        // Duplicate with same position
2959        let dup_cursor = cursor.dup(true).unwrap();
2960        assert!(dup_cursor.is_initialized());
2961        assert_eq!(dup_cursor.get_current_key(), Some(key.as_slice()));
2962        assert_eq!(dup_cursor.get_current_data(), Some(data.as_slice()));
2963        assert_eq!(dup_cursor.get_locker_id(), 100);
2964
2965        // Should have different IDs
2966        assert_ne!(cursor.get_id(), dup_cursor.get_id());
2967    }
2968
2969    #[test]
2970    fn test_dup_without_same_position() {
2971        let db = create_test_database();
2972        let mut cursor = CursorImpl::new(db, 100);
2973
2974        let key = b"key1";
2975        let data = b"data1";
2976
2977        // Insert, search to position, then dup without position.
2978        cursor.put(key, data, PutMode::Overwrite).unwrap();
2979        cursor.search(key, Some(data), SearchMode::Set).unwrap();
2980
2981        // Duplicate without position
2982        let dup_cursor = cursor.dup(false).unwrap();
2983        assert!(!dup_cursor.is_initialized());
2984        assert!(dup_cursor.get_current_key().is_none());
2985        assert_eq!(dup_cursor.get_locker_id(), 100);
2986    }
2987
2988    #[test]
2989    fn test_close_sets_state() {
2990        let db = create_test_database();
2991        let mut cursor = CursorImpl::new(db, 100);
2992
2993        cursor.close().unwrap();
2994        assert!(cursor.is_closed());
2995    }
2996
2997    #[test]
2998    fn test_operations_after_close() {
2999        let db = create_test_database();
3000        let mut cursor = CursorImpl::new(db, 100);
3001
3002        cursor.close().unwrap();
3003
3004        // All operations should return CursorClosed
3005        assert!(matches!(
3006            cursor.search(b"key", None, SearchMode::Set),
3007            Err(DbiError::CursorClosed)
3008        ));
3009        assert!(matches!(cursor.get_current(), Err(DbiError::CursorClosed)));
3010        assert!(matches!(
3011            cursor.retrieve_next(GetMode::Next),
3012            Err(DbiError::CursorClosed)
3013        ));
3014        assert!(matches!(
3015            cursor.put(b"key", b"data", PutMode::Overwrite),
3016            Err(DbiError::CursorClosed)
3017        ));
3018        assert!(matches!(cursor.delete(), Err(DbiError::CursorClosed)));
3019        assert!(matches!(cursor.count(), Err(DbiError::CursorClosed)));
3020        assert!(matches!(cursor.dup(true), Err(DbiError::CursorClosed)));
3021    }
3022
3023    #[test]
3024    fn test_close_idempotent() {
3025        let db = create_test_database();
3026        let mut cursor = CursorImpl::new(db, 100);
3027
3028        cursor.close().unwrap();
3029        cursor.close().unwrap(); // Should not panic
3030        assert!(cursor.is_closed());
3031    }
3032
3033    #[test]
3034    fn test_drop_calls_close() {
3035        let db = create_test_database();
3036        let mut cursor = CursorImpl::new(db.clone(), 100);
3037
3038        let key = b"key1";
3039        let data = b"data1";
3040        cursor.put(key, data, PutMode::Overwrite).unwrap();
3041        cursor.search(key, Some(data), SearchMode::Set).unwrap();
3042
3043        // Drop without explicit close
3044        drop(cursor);
3045
3046        // Create another cursor to verify no issues
3047        let cursor2 = CursorImpl::new(db, 200);
3048        assert!(!cursor2.is_closed());
3049    }
3050
3051    #[test]
3052    fn test_count_returns_one() {
3053        let db = create_test_database();
3054        let mut cursor = CursorImpl::new(db, 100);
3055
3056        let key = b"key1";
3057        let data = b"data1";
3058        cursor.put(key, data, PutMode::Overwrite).unwrap();
3059        cursor.search(key, Some(data), SearchMode::Set).unwrap();
3060
3061        let count = cursor.count().unwrap();
3062        assert_eq!(count, 1);
3063    }
3064
3065    #[test]
3066    fn test_unique_cursor_ids() {
3067        let db = create_test_database();
3068        let cursor1 = CursorImpl::new(db.clone(), 100);
3069        let cursor2 = CursorImpl::new(db.clone(), 100);
3070        let cursor3 = CursorImpl::new(db, 100);
3071
3072        assert_ne!(cursor1.get_id(), cursor2.get_id());
3073        assert_ne!(cursor2.get_id(), cursor3.get_id());
3074        assert_ne!(cursor1.get_id(), cursor3.get_id());
3075    }
3076
3077    // -----------------------------------------------------------------------
3078    // New unit tests for real B-tree traversal (get_first, get_last,
3079    // retrieve_next).
3080    // -----------------------------------------------------------------------
3081
3082    /// get_first on an empty database returns NotFound.
3083    ///
3084    /// positionFirstOrLast on an empty tree.
3085    #[test]
3086    fn test_get_first_empty_tree() {
3087        let db = create_test_database();
3088        let mut cursor = CursorImpl::new(db, 100);
3089        let status = cursor.get_first().unwrap();
3090        assert_eq!(status, OperationStatus::NotFound);
3091    }
3092
3093    /// get_last on an empty database returns NotFound.
3094    #[test]
3095    fn test_get_last_empty_tree() {
3096        let db = create_test_database();
3097        let mut cursor = CursorImpl::new(db, 100);
3098        let status = cursor.get_last().unwrap();
3099        assert_eq!(status, OperationStatus::NotFound);
3100    }
3101
3102    /// get_first positions at smallest key after multiple puts.
3103    #[test]
3104    fn test_get_first_after_multiple_puts() {
3105        let db = create_test_database();
3106        let mut cursor = CursorImpl::new(db, 100);
3107
3108        cursor.put(b"mango", b"m", PutMode::Overwrite).unwrap();
3109        cursor.put(b"apple", b"a", PutMode::Overwrite).unwrap();
3110        cursor.put(b"kiwi", b"k", PutMode::Overwrite).unwrap();
3111
3112        let s = cursor.get_first().unwrap();
3113        assert_eq!(s, OperationStatus::Success);
3114        assert_eq!(cursor.get_current_key(), Some(b"apple".as_slice()));
3115        assert_eq!(cursor.get_current_data(), Some(b"a".as_slice()));
3116    }
3117
3118    /// get_last positions at largest key after multiple puts.
3119    #[test]
3120    fn test_get_last_after_multiple_puts() {
3121        let db = create_test_database();
3122        let mut cursor = CursorImpl::new(db, 100);
3123
3124        cursor.put(b"apple", b"a", PutMode::Overwrite).unwrap();
3125        cursor.put(b"mango", b"m", PutMode::Overwrite).unwrap();
3126        cursor.put(b"kiwi", b"k", PutMode::Overwrite).unwrap();
3127
3128        let s = cursor.get_last().unwrap();
3129        assert_eq!(s, OperationStatus::Success);
3130        assert_eq!(cursor.get_current_key(), Some(b"mango".as_slice()));
3131        assert_eq!(cursor.get_current_data(), Some(b"m".as_slice()));
3132    }
3133
3134    /// retrieve_next(Next) advances forward through the BIN.
3135    ///
3136    #[test]
3137    fn test_retrieve_next_forward() {
3138        let db = create_test_database();
3139        let mut cursor = CursorImpl::new(db, 100);
3140
3141        cursor.put(b"a", b"1", PutMode::Overwrite).unwrap();
3142        cursor.put(b"b", b"2", PutMode::Overwrite).unwrap();
3143        cursor.put(b"c", b"3", PutMode::Overwrite).unwrap();
3144
3145        cursor.get_first().unwrap();
3146        assert_eq!(cursor.get_current_key(), Some(b"a".as_slice()));
3147
3148        let s = cursor.retrieve_next(GetMode::Next).unwrap();
3149        assert_eq!(s, OperationStatus::Success);
3150        assert_eq!(cursor.get_current_key(), Some(b"b".as_slice()));
3151
3152        let s = cursor.retrieve_next(GetMode::Next).unwrap();
3153        assert_eq!(s, OperationStatus::Success);
3154        assert_eq!(cursor.get_current_key(), Some(b"c".as_slice()));
3155
3156        let s = cursor.retrieve_next(GetMode::Next).unwrap();
3157        assert_eq!(s, OperationStatus::NotFound, "should be exhausted");
3158    }
3159
3160    /// retrieve_next(Prev) traverses backward through the BIN.
3161    ///
3162    #[test]
3163    fn test_retrieve_next_backward() {
3164        let db = create_test_database();
3165        let mut cursor = CursorImpl::new(db, 100);
3166
3167        cursor.put(b"a", b"1", PutMode::Overwrite).unwrap();
3168        cursor.put(b"b", b"2", PutMode::Overwrite).unwrap();
3169        cursor.put(b"c", b"3", PutMode::Overwrite).unwrap();
3170
3171        cursor.get_last().unwrap();
3172        assert_eq!(cursor.get_current_key(), Some(b"c".as_slice()));
3173
3174        let s = cursor.retrieve_next(GetMode::Prev).unwrap();
3175        assert_eq!(s, OperationStatus::Success);
3176        assert_eq!(cursor.get_current_key(), Some(b"b".as_slice()));
3177
3178        let s = cursor.retrieve_next(GetMode::Prev).unwrap();
3179        assert_eq!(s, OperationStatus::Success);
3180        assert_eq!(cursor.get_current_key(), Some(b"a".as_slice()));
3181
3182        let s = cursor.retrieve_next(GetMode::Prev).unwrap();
3183        assert_eq!(s, OperationStatus::NotFound, "should be exhausted");
3184    }
3185
3186    /// A single key: get_first succeeds; retrieve_next(Next) returns NotFound.
3187    #[test]
3188    fn test_single_entry_traversal() {
3189        let db = create_test_database();
3190        let mut cursor = CursorImpl::new(db, 100);
3191
3192        cursor.put(b"only", b"val", PutMode::Overwrite).unwrap();
3193
3194        let s = cursor.get_first().unwrap();
3195        assert_eq!(s, OperationStatus::Success);
3196        assert_eq!(cursor.get_current_key(), Some(b"only".as_slice()));
3197
3198        let s = cursor.retrieve_next(GetMode::Next).unwrap();
3199        assert_eq!(s, OperationStatus::NotFound);
3200    }
3201
3202    /// retrieve_next from NotInitialized state returns NotFound (not an error).
3203    ///
3204    /// The: getNext asserts mustBeInitialized; we convert this to
3205    /// NotFound per Rust convention.
3206    #[test]
3207    fn test_retrieve_next_from_not_initialized_returns_not_found() {
3208        let db = create_test_database();
3209        let mut cursor = CursorImpl::new(db, 100);
3210
3211        let s = cursor.retrieve_next(GetMode::Next).unwrap();
3212        assert_eq!(s, OperationStatus::NotFound);
3213    }
3214
3215    /// put + NoOverwrite returns KeyExist when key is already in the tree.
3216    #[test]
3217    fn test_put_no_overwrite_tree_check() {
3218        let db = create_test_database();
3219        let mut cursor = CursorImpl::new(db, 100);
3220
3221        cursor.put(b"key", b"v1", PutMode::Overwrite).unwrap();
3222        let s = cursor.put(b"key", b"v2", PutMode::NoOverwrite).unwrap();
3223        assert_eq!(s, OperationStatus::KeyExist);
3224
3225        // Verify original value is still there.
3226        cursor.search(b"key", None, SearchMode::Set).unwrap();
3227        let (_, data) = cursor.get_current().unwrap();
3228        assert_eq!(data, b"v1");
3229    }
3230
3231    /// After delete the tree no longer contains the key (search returns NotFound).
3232    #[test]
3233    fn test_delete_removes_from_tree() {
3234        let db = create_test_database();
3235        let mut cursor = CursorImpl::new(db, 100);
3236
3237        cursor.put(b"key", b"val", PutMode::Overwrite).unwrap();
3238        cursor.search(b"key", None, SearchMode::Set).unwrap();
3239        cursor.delete().unwrap();
3240
3241        let s = cursor.search(b"key", None, SearchMode::Set).unwrap();
3242        assert_eq!(s, OperationStatus::NotFound);
3243    }
3244
3245    /// Range search: positions at the first key >= search key.
3246    #[test]
3247    fn test_search_set_range_finds_ge_key() {
3248        let db = create_test_database();
3249        let mut cursor = CursorImpl::new(db, 100);
3250
3251        cursor.put(b"aaa", b"1", PutMode::Overwrite).unwrap();
3252        cursor.put(b"bbb", b"2", PutMode::Overwrite).unwrap();
3253        cursor.put(b"ccc", b"3", PutMode::Overwrite).unwrap();
3254
3255        // Search for "bb" (not present) — should land on "bbb".
3256        let s = cursor.search(b"bb", None, SearchMode::SetRange).unwrap();
3257        assert_eq!(s, OperationStatus::Success);
3258        assert_eq!(cursor.get_current_key(), Some(b"bbb".as_slice()));
3259    }
3260
3261    /// Range search beyond all keys returns NotFound.
3262    #[test]
3263    fn test_search_set_range_beyond_all_keys() {
3264        let db = create_test_database();
3265        let mut cursor = CursorImpl::new(db, 100);
3266
3267        cursor.put(b"aaa", b"1", PutMode::Overwrite).unwrap();
3268        cursor.put(b"bbb", b"2", PutMode::Overwrite).unwrap();
3269
3270        let s = cursor.search(b"zzz", None, SearchMode::SetRange).unwrap();
3271        assert_eq!(s, OperationStatus::NotFound);
3272    }
3273
3274    // -----------------------------------------------------------------------
3275    // Sorted-duplicate key tests
3276    // -----------------------------------------------------------------------
3277
3278    fn create_dup_database() -> Arc<RwLock<DatabaseImpl>> {
3279        let db_id = DatabaseId::new(2);
3280        let mut config = DatabaseConfig::default();
3281        config.sorted_duplicates = true;
3282        let db_impl = DatabaseImpl::new(
3283            db_id,
3284            "dup_test_db".to_string(),
3285            DbType::User,
3286            &config,
3287        );
3288        Arc::new(RwLock::new(db_impl))
3289    }
3290
3291    /// Basic put + get_current round-trip for sorted-dup database.
3292    ///
3293    /// `DupKeyDataTest.testCombineSplit()`.
3294    #[test]
3295    fn test_dup_put_and_get_current() {
3296        let db = create_dup_database();
3297        let mut cursor = CursorImpl::new(db, 1);
3298
3299        let s = cursor.put(b"key", b"data", PutMode::Overwrite).unwrap();
3300        assert_eq!(s, OperationStatus::Success);
3301
3302        let (pk, d) = cursor.get_current().unwrap();
3303        assert_eq!(pk, b"key");
3304        assert_eq!(d, b"data");
3305    }
3306
3307    /// Multiple data values for the same primary key.
3308    ///
3309    /// `SortedDuplicatesTest.testMultipleDups()`.
3310    #[test]
3311    fn test_dup_multiple_data_per_key() {
3312        let db = create_dup_database();
3313        let mut cursor = CursorImpl::new(db, 1);
3314
3315        cursor.put(b"key", b"aaa", PutMode::Overwrite).unwrap();
3316        cursor.put(b"key", b"bbb", PutMode::Overwrite).unwrap();
3317        cursor.put(b"key", b"ccc", PutMode::Overwrite).unwrap();
3318
3319        // search Set: positions at the first entry for "key"
3320        let s = cursor.search(b"key", None, SearchMode::Set).unwrap();
3321        assert_eq!(s, OperationStatus::Success);
3322
3323        let (pk, d) = cursor.get_current().unwrap();
3324        assert_eq!(pk, b"key");
3325        assert_eq!(d, b"aaa", "first dup should have smallest data");
3326    }
3327
3328    /// search Both: positions at the exact (key, data) pair.
3329    ///
3330    /// `CursorImpl.searchBothExact()` dup path.
3331    #[test]
3332    fn test_dup_search_both_exact() {
3333        let db = create_dup_database();
3334        let mut cursor = CursorImpl::new(db, 1);
3335
3336        cursor.put(b"key", b"aaa", PutMode::Overwrite).unwrap();
3337        cursor.put(b"key", b"bbb", PutMode::Overwrite).unwrap();
3338        cursor.put(b"key", b"ccc", PutMode::Overwrite).unwrap();
3339
3340        let s = cursor.search(b"key", Some(b"bbb"), SearchMode::Both).unwrap();
3341        assert_eq!(s, OperationStatus::Success);
3342        let (pk, d) = cursor.get_current().unwrap();
3343        assert_eq!(pk, b"key");
3344        assert_eq!(d, b"bbb");
3345    }
3346
3347    /// search Both: returns NotFound when exact pair doesn't exist.
3348    #[test]
3349    fn test_dup_search_both_not_found() {
3350        let db = create_dup_database();
3351        let mut cursor = CursorImpl::new(db, 1);
3352
3353        cursor.put(b"key", b"aaa", PutMode::Overwrite).unwrap();
3354
3355        let s = cursor.search(b"key", Some(b"zzz"), SearchMode::Both).unwrap();
3356        assert_eq!(s, OperationStatus::NotFound);
3357    }
3358
3359    /// NoDupData returns KeyExist when exact (key, data) already stored.
3360    ///
3361    /// `SortedDuplicatesTest.testNoDupData()`.
3362    #[test]
3363    fn test_dup_no_dup_data_returns_key_exist() {
3364        let db = create_dup_database();
3365        let mut cursor = CursorImpl::new(db, 1);
3366
3367        cursor.put(b"key", b"val", PutMode::Overwrite).unwrap();
3368
3369        let s = cursor.put(b"key", b"val", PutMode::NoDupData).unwrap();
3370        assert_eq!(s, OperationStatus::KeyExist);
3371    }
3372
3373    /// NoDupData succeeds for a different data value under the same key.
3374    #[test]
3375    fn test_dup_no_dup_data_different_data_ok() {
3376        let db = create_dup_database();
3377        let mut cursor = CursorImpl::new(db, 1);
3378
3379        cursor.put(b"key", b"val1", PutMode::Overwrite).unwrap();
3380
3381        let s = cursor.put(b"key", b"val2", PutMode::NoDupData).unwrap();
3382        assert_eq!(s, OperationStatus::Success);
3383    }
3384
3385    /// NextDup traversal visits all dups of the current primary key.
3386    ///
3387    /// `CursorImpl.getNext(GetMode.NEXT_DUP)` path.
3388    #[test]
3389    fn test_dup_next_dup_traversal() {
3390        let db = create_dup_database();
3391        let mut cursor = CursorImpl::new(db, 1);
3392
3393        cursor.put(b"key", b"a", PutMode::Overwrite).unwrap();
3394        cursor.put(b"key", b"b", PutMode::Overwrite).unwrap();
3395        cursor.put(b"key", b"c", PutMode::Overwrite).unwrap();
3396        // Different primary key — should NOT appear in NextDup.
3397        cursor.put(b"zzz", b"x", PutMode::Overwrite).unwrap();
3398
3399        // Position at first dup.
3400        cursor.search(b"key", None, SearchMode::Set).unwrap();
3401        let (_, d) = cursor.get_current().unwrap();
3402        assert_eq!(d, b"a");
3403
3404        let s = cursor.retrieve_next(GetMode::NextDup).unwrap();
3405        assert_eq!(s, OperationStatus::Success);
3406        let (pk, d) = cursor.get_current().unwrap();
3407        assert_eq!(pk, b"key");
3408        assert_eq!(d, b"b");
3409
3410        let s = cursor.retrieve_next(GetMode::NextDup).unwrap();
3411        assert_eq!(s, OperationStatus::Success);
3412        let (_, d) = cursor.get_current().unwrap();
3413        assert_eq!(d, b"c");
3414
3415        // No more dups for "key".
3416        let s = cursor.retrieve_next(GetMode::NextDup).unwrap();
3417        assert_eq!(s, OperationStatus::NotFound);
3418    }
3419
3420    /// NextNoDup skips all dups of the current primary key.
3421    ///
3422    /// `CursorImpl.getNext(GetMode.NEXT_NO_DUP)`.
3423    #[test]
3424    fn test_dup_next_no_dup_skips_dups() {
3425        let db = create_dup_database();
3426        let mut cursor = CursorImpl::new(db, 1);
3427
3428        cursor.put(b"aaa", b"1", PutMode::Overwrite).unwrap();
3429        cursor.put(b"aaa", b"2", PutMode::Overwrite).unwrap();
3430        cursor.put(b"bbb", b"x", PutMode::Overwrite).unwrap();
3431
3432        // Position at first entry for "aaa".
3433        cursor.search(b"aaa", None, SearchMode::Set).unwrap();
3434        let (pk, _) = cursor.get_current().unwrap();
3435        assert_eq!(pk, b"aaa");
3436
3437        // NextNoDup should skip "aaa" dups and land on "bbb".
3438        let s = cursor.retrieve_next(GetMode::NextNoDup).unwrap();
3439        assert_eq!(s, OperationStatus::Success);
3440        let (pk, d) = cursor.get_current().unwrap();
3441        assert_eq!(pk, b"bbb");
3442        assert_eq!(d, b"x");
3443    }
3444
3445    /// Dup delete removes only the specific (key, data) pair.
3446    ///
3447    /// `SortedDuplicatesTest.testDeleteDup()`.
3448    #[test]
3449    fn test_dup_delete_specific_pair() {
3450        let db = create_dup_database();
3451        let mut cursor = CursorImpl::new(db, 1);
3452
3453        cursor.put(b"key", b"a", PutMode::Overwrite).unwrap();
3454        cursor.put(b"key", b"b", PutMode::Overwrite).unwrap();
3455
3456        // Position at "key"/"b" and delete it.
3457        cursor.search(b"key", Some(b"b"), SearchMode::Both).unwrap();
3458        cursor.delete().unwrap();
3459
3460        // "key"/"a" should still exist.
3461        let s = cursor.search(b"key", None, SearchMode::Set).unwrap();
3462        assert_eq!(s, OperationStatus::Success);
3463        let (pk, d) = cursor.get_current().unwrap();
3464        assert_eq!(pk, b"key");
3465        assert_eq!(d, b"a");
3466
3467        // "key"/"b" should be gone.
3468        let s = cursor.search(b"key", Some(b"b"), SearchMode::Both).unwrap();
3469        assert_eq!(s, OperationStatus::NotFound);
3470    }
3471
3472    /// Dup prefix-ambiguity ordering is correct.
3473    ///
3474    ///
3475    /// Key "a" data "bc" must sort before key "ab" data "c".
3476    #[test]
3477    fn test_dup_ordering_prefix_ambiguity() {
3478        let db = create_dup_database();
3479        let mut cursor = CursorImpl::new(db, 1);
3480
3481        // "ab"/"c" inserted first to stress comparator.
3482        cursor.put(b"ab", b"c", PutMode::Overwrite).unwrap();
3483        cursor.put(b"a", b"bc", PutMode::Overwrite).unwrap();
3484
3485        // Forward scan should give ("a","bc") then ("ab","c").
3486        cursor.get_first().unwrap();
3487        let (pk, d) = cursor.get_current().unwrap();
3488        assert_eq!(pk, b"a");
3489        assert_eq!(d, b"bc");
3490
3491        cursor.retrieve_next(GetMode::Next).unwrap();
3492        let (pk, d) = cursor.get_current().unwrap();
3493        assert_eq!(pk, b"ab");
3494        assert_eq!(d, b"c");
3495    }
3496
3497    // -----------------------------------------------------------------------
3498    // Cross-BIN cursor traversal test
3499    // -----------------------------------------------------------------------
3500
3501    /// Full forward scan visits all 200 entries across multiple BINs in sorted
3502    /// order.
3503    ///
3504    /// We use a DatabaseImpl whose underlying Tree is created with a small
3505    /// `max_entries_per_node` (4) so that 200 inserts force many splits and
3506    /// fill multiple BINs.  The cursor must cross every BIN boundary without
3507    /// losing any entry.
3508    ///
3509    /// CursorImplTest multi-BIN scan: insert N records, open
3510    /// cursor at first, call getNext() until NotFound, assert count == N and
3511    /// keys are in ascending order.
3512    #[test]
3513    fn test_full_scan_crosses_multiple_bins() {
3514        // Build a database with a small node fanout (4) so 200 inserts force
3515        // many BIN splits.  DatabaseConfig::node_max_entries controls the
3516        // Tree::max_entries_per_node passed to Tree::new().
3517        let db_id = DatabaseId::new(42);
3518        let mut config = DatabaseConfig::default();
3519        config.set_node_max_entries(4); // tiny fanout → many BINs
3520        let db_impl = DatabaseImpl::new(
3521            db_id,
3522            "scan_test".to_string(),
3523            DbType::User,
3524            &config,
3525        );
3526        let db = Arc::new(RwLock::new(db_impl));
3527
3528        const N: usize = 200;
3529
3530        // Insert 200 entries with zero-padded decimal keys so lexicographic
3531        // order == numeric order.
3532        {
3533            let mut cursor = CursorImpl::new(db.clone(), 1);
3534            for i in 0..N {
3535                let key = format!("{:08}", i).into_bytes();
3536                let val = format!("v{}", i).into_bytes();
3537                let s = cursor.put(&key, &val, PutMode::Overwrite).unwrap();
3538                assert_eq!(s, OperationStatus::Success, "put {} failed", i);
3539            }
3540        }
3541
3542        // Forward scan: get_first + repeated get_next.
3543        let mut cursor = CursorImpl::new(db.clone(), 2);
3544        let s = cursor.get_first().unwrap();
3545        assert_eq!(s, OperationStatus::Success, "get_first should succeed");
3546
3547        let mut visited: Vec<Vec<u8>> = Vec::new();
3548        visited.push(cursor.get_current_key().unwrap().to_vec());
3549
3550        loop {
3551            let s = cursor.retrieve_next(GetMode::Next).unwrap();
3552            match s {
3553                OperationStatus::Success => {
3554                    visited.push(cursor.get_current_key().unwrap().to_vec());
3555                }
3556                OperationStatus::NotFound => break,
3557                other => panic!("unexpected status {:?}", other),
3558            }
3559        }
3560
3561        assert_eq!(
3562            visited.len(),
3563            N,
3564            "full scan must visit exactly {} entries, got {}",
3565            N,
3566            visited.len()
3567        );
3568
3569        // Verify keys are in ascending (sorted) order.
3570        for i in 1..visited.len() {
3571            assert!(
3572                visited[i - 1] < visited[i],
3573                "keys out of order at position {}: {:?} >= {:?}",
3574                i,
3575                std::str::from_utf8(&visited[i - 1]).unwrap_or("?"),
3576                std::str::from_utf8(&visited[i]).unwrap_or("?"),
3577            );
3578        }
3579
3580        // Backward scan: get_last + repeated get_prev.
3581        let mut cursor_back = CursorImpl::new(db, 3);
3582        let s = cursor_back.get_last().unwrap();
3583        assert_eq!(s, OperationStatus::Success, "get_last should succeed");
3584
3585        let mut visited_back: Vec<Vec<u8>> = Vec::new();
3586        visited_back.push(cursor_back.get_current_key().unwrap().to_vec());
3587
3588        loop {
3589            let s = cursor_back.retrieve_next(GetMode::Prev).unwrap();
3590            match s {
3591                OperationStatus::Success => {
3592                    visited_back
3593                        .push(cursor_back.get_current_key().unwrap().to_vec());
3594                }
3595                OperationStatus::NotFound => break,
3596                other => panic!("unexpected backward status {:?}", other),
3597            }
3598        }
3599
3600        assert_eq!(
3601            visited_back.len(),
3602            N,
3603            "backward scan must visit exactly {} entries, got {}",
3604            N,
3605            visited_back.len()
3606        );
3607
3608        // Backward scan should be the reverse of forward scan.
3609        let mut visited_back_rev = visited_back.clone();
3610        visited_back_rev.reverse();
3611        assert_eq!(
3612            visited_back_rev, visited,
3613            "backward scan reversed must equal forward scan"
3614        );
3615    }
3616}