noxu_dbi/cursor_impl.rs
1//! Internal cursor implementation.
2//!
3//!
4//! The core traversal logic mirrors `CursorImpl.getNext()` (line 2546):
5//!
6//! ```text
7//! while (bin != null) {
8//! latchBIN();
9//! if (forward ? ++index < nEntries : --index >= 0) {
10//! if record is valid: return it
11//! } else {
12//! bin = tree.getNextBin(anchorBIN) or tree.getPrevBin(anchorBIN)
13//! index = -1 (or nEntries for backward)
14//! }
15//! }
16//! ```
17//!
18//! Cross-BIN traversal is implemented: when the current BIN is exhausted,
19//! `retrieve_next` calls `Tree::get_next_bin` / `Tree::get_prev_bin` to move
20//! to the adjacent BIN and continues iteration there.
21
22#[cfg(any(test, feature = "testing"))]
23use std::cell::Cell;
24use std::sync::Arc;
25use std::sync::Mutex;
26use std::sync::atomic::{AtomicI64, Ordering};
27
28use bytes::BytesMut;
29use noxu_log::{LogEntryType, LogManager, Provisional, entry::LnLogEntry};
30use noxu_tree::{BinEntry, Tree};
31use noxu_txn::{LockManager, LockType, Locker, Txn};
32
33use crate::dup_key_data;
34use crate::throughput_stats::ThroughputStats;
35use noxu_sync::RwLock;
36use noxu_util::{Lsn, vlsn::NULL_VLSN};
37
38use crate::{
39 DbiError, GetMode, OperationStatus, PutMode, SearchMode,
40 database_impl::DatabaseImpl,
41};
42
/// Lifecycle states of a `CursorImpl`.
///
/// The constructors create cursors in `NotInitialized`; a successful
/// `search` moves the cursor to `Initialized`; `Closed` makes the state
/// checks reject further operations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CursorState {
    /// Open but not yet positioned on a record (the constructors' state).
    NotInitialized,
    /// Positioned on a record.
    Initialized,
    /// Closed: `check_state` / `check_initialized` return
    /// `DbiError::CursorClosed`.
    Closed,
}
50
// Result flags for cursor search operations.
//
// Each flag occupies its own bit (0x1, 0x2, 0x4), so several flags can be
// carried in a single `u32`.
// NOTE(review): the per-flag meanings below are inferred from the names
// only; the code that produces/consumes them is not in this section.

/// Result flag, bit `0x1` — presumably "a matching entry was found".
pub const FOUND: u32 = 0x1;
/// Result flag, bit `0x2` — presumably "the match was on the exact key".
pub const EXACT_KEY: u32 = 0x2;
/// Result flag, bit `0x4` — presumably "the entry found is the last one";
/// confirm against the producer of this flag.
pub const FOUND_LAST: u32 = 0x4;
55
/// Unique cursor ID generator.
///
/// Starts at 1. `CursorImpl::new` and `CursorImpl::with_log_manager` take
/// the next ID with `fetch_add(1, Ordering::Relaxed)`.
static NEXT_CURSOR_ID: AtomicI64 = AtomicI64::new(1);
58
// Test-only hook: per-thread countdown to a forced cursor failure.
//
// A countdown of 0 means the hook is disarmed. While the countdown is
// N > 0, every `check_state` / `check_initialized` call ticks it through
// `tick_fail`:
//
//   * a call that observes N > 1 stores N - 1 and proceeds normally;
//   * the call that observes N == 1 resets the countdown to 0 and returns
//     `Err(DbiError::CursorClosed)`.
//
// `set_cursor_fail_after(1)` => fail on the next check (the 1st call).
// `set_cursor_fail_after(2)` => skip the 1st check, fail on the 2nd call.
//
// This lets `noxu-db` tests exercise both `map_err` closures inside a single
// `Database` method (e.g. `get()` has one closure on `search` and another on
// `get_current`).
#[cfg(any(test, feature = "testing"))]
thread_local! {
    static CURSOR_FAIL_COUNTDOWN: Cell<u32> = const { Cell::new(0) };
}
75
/// Arms the test-only failure hook for the current thread.
///
/// After this call, the `n`th cursor state check returns
/// `DbiError::CursorClosed`: `n = 1` fails the very next check, `n = 0`
/// leaves the hook disarmed.
/// Only available in test/testing builds.
#[cfg(any(test, feature = "testing"))]
pub fn set_cursor_fail_after(n: u32) {
    CURSOR_FAIL_COUNTDOWN.set(n);
}
83
/// Disarms the test-only failure hook for the current thread by resetting
/// the countdown to 0. Safe to call repeatedly.
#[cfg(any(test, feature = "testing"))]
pub fn clear_cursor_fail_flag() {
    CURSOR_FAIL_COUNTDOWN.set(0);
}
89
/// Ticks the per-thread failure countdown once.
///
/// Returns `true` only for the call that observes a countdown of exactly 1
/// (which also resets it to 0). A countdown of 0 is left untouched and any
/// larger value is decremented by one; both cases return `false`.
#[cfg(any(test, feature = "testing"))]
fn tick_fail() -> bool {
    CURSOR_FAIL_COUNTDOWN.with(|countdown| match countdown.get() {
        // Hook disarmed: nothing to tick.
        0 => false,
        remaining => {
            countdown.set(remaining - 1);
            // Only the final tick (1 -> 0) fires the failure.
            remaining == 1
        }
    })
}
106
/// The internal implementation of a database cursor.
///
/// A `CursorImpl` tracks a position in a database and provides
/// get/put/delete operations. The cursor state machine ensures
/// proper initialization before operations.
///
/// A cursor tracks its position via a BIN reference and slot index.
/// This implementation wires cursor traversal to `noxu_tree::Tree`:
///
/// * `get_first` / `get_last` — use `Tree::get_first_node()` /
///   `Tree::get_last_node()`.
/// * `retrieve_next` — increments `current_index` within the BIN and, when
///   the BIN is exhausted, calls `Tree::get_next_bin()` /
///   `Tree::get_prev_bin()` to cross BIN boundaries (mirroring
///   `CursorImpl.getNext()`).
/// * `search` — locates the key with a single `Tree::search_with_data()`
///   descent (sorted-duplicate databases go through `search_dup`).
/// * `put` / `delete` — mutate the tree in-place using `Tree::insert()` /
///   `Tree::delete()`.
// NOTE(review): a truncated reference "(4096 lines in 7.5.11)" followed the
// list above — presumably the size of the JE 7.5.11 `CursorImpl` source this
// type mirrors. Confirm and restore, or drop.
pub struct CursorImpl {
    /// Unique cursor ID (for debugging and hashCode). Also used as the
    /// locker ID when locking directly through `lock_manager`.
    id: i64,
    /// The database this cursor operates on.
    db_impl: Arc<RwLock<DatabaseImpl>>,
    /// ID of the locker (transaction or auto-commit) for this cursor.
    locker_id: i64,
    /// Current cursor state.
    state: CursorState,

    /// Current position: the key at the cursor's position.
    /// `None` until the cursor is positioned.
    current_key: Option<Vec<u8>>,
    /// Current position: the data at the cursor's position.
    /// `None` until the cursor is positioned.
    current_data: Option<Vec<u8>>,
    /// Current position: the LSN of the record.
    /// Initialised to `NULL_LSN` by the constructors.
    current_lsn: u64,
    /// Current position: the BIN index (slot in the current BIN).
    ///
    /// Counterpart of JE's `CursorImpl.index`. -1 means "before first
    /// entry" and is the value set by the constructors.
    current_index: i32,

    /// The BIN Arc the cursor is currently pinned to, if any.
    ///
    /// Increments `BinStub.cursor_count` via `Tree::pin_bin()` so the
    /// evictor skips this BIN while the cursor is positioned on it.
    /// Cleared (and unpinned) when the cursor is closed or moves to a new BIN.
    current_bin_arc: Option<
        std::sync::Arc<noxu_tree::NodeRwLock<noxu_tree::tree::TreeNode>>,
    >,

    /// Write-ahead log manager for recording data operations.
    /// None for read-only cursors or cursors created outside a real env.
    log_manager: Option<Arc<LogManager>>,
    /// Cached environment-invalidity flag (X-13).
    ///
    /// Cloned from `EnvironmentImpl::is_invalid_flag()` at cursor open time
    /// so `check_state()` can detect a failed environment without locking.
    /// `None` for cursors constructed outside a real environment (unit tests).
    env_invalid: Option<Arc<std::sync::atomic::AtomicBool>>,
    /// Lock manager for per-record read/write locking.
    /// None for cursors created outside a real env (e.g., unit tests).
    ///
    /// Plays the role of JE's `CursorImpl.locker` — there the locker calls
    /// `locker.lock(lsn, LockType.READ, ...)` via `lockLN()` before
    /// returning each record.
    lock_manager: Option<Arc<LockManager>>,

    /// Optional explicit transaction backing this cursor.
    ///
    /// When `Some`, write operations acquire locks via the `Txn` and record
    /// `WriteLockInfo` (abort before-images) so the transaction can undo each
    /// modification on abort.
    ///
    /// When `None` (auto-commit), write locks are acquired directly from
    /// `lock_manager` using the cursor's own `id` as the locker and released
    /// immediately after the write is logged (auto-commit semantics).
    // NOTE(review): a truncated reference "(Txn subtype)." followed here —
    // presumably pointing at the JE `Locker`/`Txn` hierarchy; confirm.
    txn_ref: Option<Arc<Mutex<Txn>>>,
    /// Throughput counters shared with all cursors on this database.
    throughput: Arc<ThroughputStats>,
}
188
189impl CursorImpl {
190 /// Creates a new CursorImpl for the given database.
191 ///
192 /// The cursor is initially in the NotInitialized state and must be
193 /// positioned via a search operation before get/put/delete operations
194 /// can be performed.
195 ///
196 /// # Arguments
197 ///
198 /// * `db_impl` - The database implementation this cursor operates on
199 /// * `locker_id` - The locker (transaction) ID for this cursor
200 pub fn new(db_impl: Arc<RwLock<DatabaseImpl>>, locker_id: i64) -> Self {
201 let throughput = db_impl.read().throughput.clone();
202 CursorImpl {
203 id: NEXT_CURSOR_ID.fetch_add(1, Ordering::Relaxed),
204 db_impl,
205 locker_id,
206 state: CursorState::NotInitialized,
207 current_key: None,
208 current_data: None,
209 current_lsn: noxu_util::NULL_LSN.as_u64(),
210 current_index: -1,
211 current_bin_arc: None,
212 log_manager: None,
213 env_invalid: None,
214 lock_manager: None,
215 txn_ref: None,
216 throughput,
217 }
218 }
219
220 /// Creates a new CursorImpl wired to a WAL.
221 ///
222 /// Write operations (`put`, `delete`) will record `LnLogEntry` entries in
223 /// the provided `LogManager` before mutating the in-memory tree.
224 pub fn with_log_manager(
225 db_impl: Arc<RwLock<DatabaseImpl>>,
226 locker_id: i64,
227 log_manager: Arc<LogManager>,
228 ) -> Self {
229 let throughput = db_impl.read().throughput.clone();
230 CursorImpl {
231 id: NEXT_CURSOR_ID.fetch_add(1, Ordering::Relaxed),
232 db_impl,
233 locker_id,
234 state: CursorState::NotInitialized,
235 current_key: None,
236 current_data: None,
237 current_lsn: noxu_util::NULL_LSN.as_u64(),
238 current_index: -1,
239 current_bin_arc: None,
240 log_manager: Some(log_manager),
241 env_invalid: None,
242 lock_manager: None,
243 txn_ref: None,
244 throughput,
245 }
246 }
247
    /// Wires the environment-invalidity flag for hot-path validity checks.
    ///
    /// Stores a clone of `EnvironmentImpl::is_invalid_flag()` so that
    /// `check_state()` can detect a failed environment on every cursor
    /// operation without acquiring the environment lock (X-13 fix).
    /// Any previously attached flag is replaced.
    ///
    /// Returns `self` for builder-style chaining.
    pub fn with_env_invalid(
        mut self,
        flag: Arc<std::sync::atomic::AtomicBool>,
    ) -> Self {
        self.env_invalid = Some(flag);
        self
    }
260
    /// Wires a lock manager for per-record locking.
    ///
    /// Presumably the analogue of a JE `CursorImpl` receiving a `Locker`
    /// from `DatabaseImpl.openCursor()` (the original reference here was
    /// truncated). Any previously attached lock manager is replaced.
    ///
    /// Returns `self` for builder-style chaining.
    pub fn with_lock_manager(mut self, lock_manager: Arc<LockManager>) -> Self {
        self.lock_manager = Some(lock_manager);
        self
    }
269
    /// Wires an explicit transaction for write-lock tracking.
    ///
    /// When set, write operations (`put`, `delete`) acquire WRITE locks via
    /// the `Txn` and record abort before-images in `WriteLockInfo`, enabling
    /// transaction rollback.
    ///
    /// Presumably the analogue of a JE cursor being constructed with a `Txn`
    /// locker (the original reference here was truncated).
    /// Returns `self` for builder-style chaining.
    pub fn with_txn(mut self, txn: Arc<Mutex<Txn>>) -> Self {
        self.txn_ref = Some(txn);
        self
    }
282
    /// Setter equivalent of [`Self::with_txn`] for callers that need to
    /// attach a `Txn` to an already-built cursor (e.g. `Database::with_auto_txn`
    /// which constructs the cursor first, then wires the synthetic auto-txn).
    ///
    /// Replaces any transaction that was attached before.
    pub fn attach_txn(&mut self, txn: Arc<Mutex<Txn>>) {
        self.txn_ref = Some(txn);
    }
289
290 /// Gets the before-image (old_data, old_lsn) for `key` from the tree.
291 ///
292 /// Returns `(None, NULL_LSN)` if the key does not exist (new insert).
293 fn get_slot_before_image(&self, key: &[u8]) -> (Option<Vec<u8>>, u64) {
294 let db = self.db_impl.read();
295 if let Some(tree) = db.get_real_tree() {
296 match Self::get_data_from_tree(&tree, key) {
297 Some((data, lsn)) => (Some(data), lsn),
298 None => (None, noxu_util::NULL_LSN.as_u64()),
299 }
300 } else {
301 (None, noxu_util::NULL_LSN.as_u64())
302 }
303 }
304
305 /// Returns true if `key` exists in the committed tree.
306 ///
307 /// `CursorImpl.isPresent()` / lock-check path: with lock-based
308 /// isolation, writes go directly to the BIN, so the tree reflects the
309 /// current committed-or-locked state. Callers that need to check
310 /// existence before a `NoOverwrite`/`NoDupData` insert consult the tree
311 /// directly; if a concurrent writer holds a WRITE lock the subsequent
312 /// `lock_ln()` call will block until that writer commits or aborts.
313 fn key_exists_in_view(&self, key: &[u8]) -> bool {
314 let db = self.db_impl.read();
315 if let Some(tree) = db.get_real_tree() {
316 tree.search(key).map(|sr| sr.exact_parent_found).unwrap_or(false)
317 } else {
318 false
319 }
320 }
321
322 /// Inserts or updates `key`/`data` at `new_lsn` in the B-tree.
323 ///
324 /// `CursorImpl.insertRecordInternal()` / `bin.updateEntry()`:
325 /// writes go directly to the BIN immediately. Read-committed isolation
326 /// is enforced by the lock manager — concurrent readers block on the
327 /// WRITE lock held by this cursor's txn until it commits or aborts.
328 ///
329 /// When the tree reports a **new** insert (`is_new == true`), increments
330 /// the per-database entry count.
331 fn apply_tree_insert(&self, key: Vec<u8>, data: Vec<u8>, new_lsn: Lsn) {
332 let db = self.db_impl.read();
333 if let Some(tree) = db.get_real_tree()
334 && let Ok(is_new) = tree.insert(key, data, new_lsn)
335 && is_new
336 {
337 db.increment_entry_count();
338 }
339 }
340
341 /// Deletes `key` from the B-tree.
342 ///
343 /// `CursorImpl.deleteCurrentRecord()` / `bin.deleteEntry()`:
344 /// the deletion is applied to the BIN immediately. Concurrent readers
345 /// that try to acquire a READ lock on the deleted slot's LSN block until
346 /// the writer's WRITE lock is released (commit or abort).
347 ///
348 /// When the tree confirms the key was actually removed (`deleted == true`),
349 /// decrements the per-database entry count (.
350 /// counter).
351 fn apply_tree_delete(&self, key: Vec<u8>, _del_lsn: Lsn) {
352 let db = self.db_impl.read();
353 if let Some(tree) = db.get_real_tree()
354 && tree.delete(&key)
355 {
356 db.decrement_entry_count();
357 }
358 }
359
360 /// Acquires a WRITE lock for an upcoming write to `key` whose current
361 /// slot LSN is `old_lsn`.
362 ///
363 /// For txn-backed cursors, calls `Txn::lock()` (lock persists until commit/abort).
364 /// For auto-commit cursors (lock_manager only, no txn), uses cursor `id`
365 /// as the locker.
366 ///
367 /// # NULL-LSN insert race coordination
368 ///
369 /// When `old_lsn == NULL_LSN` the record does not yet exist (a brand-new
370 /// insert). Pre-Wave-1A this method returned early in that case, so two
371 /// concurrent auto-commit inserts of the same brand-new key did not
372 /// coordinate through the lock manager — the underlying B+tree latching
373 /// in `noxu-tree` serialised them safely but the deadlock detector could
374 /// not reason about the conflict, and `put_no_overwrite` reported
375 /// `KeyExist` instead of a typed lock-conflict. This is the first F12
376 /// residual.
377 ///
378 /// We now acquire a write lock on a synthetic, key-coordination LSN
379 /// derived from `(db_id, key)` via [`noxu_util::Lsn::synthetic_key_lock_id`].
380 /// The lock lives in the reserved transient-LSN space so it cannot
381 /// collide with a real WAL LSN, and is held until the wrapping txn
382 /// (synthetic auto-txn or explicit txn) commits or aborts — at which
383 /// point a second concurrent inserter for the same key unblocks and
384 /// observes the result of the first insert.
385 ///
386 /// Auto-commit cursors without a `txn_ref` (legacy callers that have
387 /// not been ported to `TxnManager::begin_auto_txn` yet) acquire and
388 /// immediately release the synthetic lock; this still serialises them
389 /// through the lock manager but does not record the conflict on a
390 /// locker for deadlock-detector reasoning. Database::put / delete on
391 /// `txn = None` always wraps in a synthetic auto-txn, so this fallback
392 /// is exercised only by the legacy direct-CursorImpl construction.
393 fn lock_write_before_log(
394 &self,
395 old_lsn: u64,
396 key: &[u8],
397 ) -> Result<(), DbiError> {
398 let null = noxu_util::NULL_LSN.as_u64();
399 let lsn_to_lock = if old_lsn == null {
400 // Brand-new insert: coordinate via a synthetic key lock so
401 // concurrent inserts of the same key serialise through the
402 // lock manager.
403 let db_id = self.db_impl.read().get_id().id() as u64;
404 Lsn::synthetic_key_lock_id(db_id, key)
405 } else {
406 old_lsn
407 };
408 if let Some(txn) = &self.txn_ref {
409 txn.lock()
410 .unwrap()
411 .lock(lsn_to_lock, LockType::Write, false)
412 .map_err(DbiError::TxnError)?;
413 } else if let Some(lm) = &self.lock_manager {
414 lm.lock(lsn_to_lock, self.id, LockType::Write, false, false)
415 .map_err(DbiError::TxnError)?;
416 // Legacy auto-commit (no synthetic auto-txn): release the
417 // synthetic key-coordination lock immediately for new inserts
418 // so subsequent inserts can proceed. For real (non-NULL)
419 // old_lsn, `finalize_write_lock` releases below.
420 if old_lsn == null {
421 let _ = lm.release(lsn_to_lock, self.id);
422 }
423 }
424 Ok(())
425 }
426
427 /// Acquires a synthetic-key write lock for the given key.
428 ///
429 /// Wave 5 / SR9752 / CursorEdgeTest.testReadDeletedUncommitted:
430 /// in-flight deletes physically remove the BIN slot via
431 /// `tree.delete()`, so a concurrent reader looking up the same
432 /// key sees `NotFound` without ever consulting the lock manager
433 /// for the slot's pre-delete LSN. This violates JE's contract:
434 /// uncommitted deletes are dirty data and a no-wait reader must
435 /// see `LockNotAvailable`, blocking readers must wait until the
436 /// deleter commits.
437 ///
438 /// To restore that invariant without rewriting the BIN's
439 /// physical-removal model, the deleter ALSO holds a synthetic-key
440 /// write lock for the duration of the txn. Readers that probe
441 /// the BIN and find no matching key call
442 /// [`Self::contest_synthetic_key_for_missing_read`] which
443 /// attempts a read-lock on the same synthetic-key id; the
444 /// uncontested case is one extra lock-manager round-trip and
445 /// the contested case surfaces the lock conflict to the caller.
446 fn lock_synthetic_key_for_delete(
447 &self,
448 key: &[u8],
449 ) -> Result<(), DbiError> {
450 let db_id = self.db_impl.read().get_id().id() as u64;
451 let synthetic_lsn = Lsn::synthetic_key_lock_id(db_id, key);
452 if let Some(txn) = &self.txn_ref {
453 // Held until commit/abort — readers contending on the
454 // synthetic-key block / fail until the deleter finalises.
455 txn.lock()
456 .unwrap()
457 .lock(synthetic_lsn, LockType::Write, false)
458 .map_err(DbiError::TxnError)?;
459 } else if let Some(lm) = &self.lock_manager {
460 // Legacy auto-commit (no synthetic auto-txn): acquire and
461 // immediately release. The Database::delete path always
462 // wraps in a synthetic auto-txn so the lock is actually
463 // held across the per-record delete; this branch is only
464 // for direct-CursorImpl callers.
465 lm.lock(synthetic_lsn, self.id, LockType::Write, false, false)
466 .map_err(DbiError::TxnError)?;
467 let _ = lm.release(synthetic_lsn, self.id);
468 }
469 Ok(())
470 }
471
    /// Probes the synthetic-key lock for `key` to detect uncommitted
    /// deletes after a `NotFound` BIN lookup.
    ///
    /// Returns `Ok(())` if the key is genuinely absent (no concurrent
    /// writer holds the synthetic-key lock); returns the lock-manager
    /// error otherwise so the caller can surface it to the user.
    ///
    /// The probe never leaves a lock behind: every successful read-lock
    /// acquisition is followed by a release (release failures are ignored).
    /// It is skipped entirely when
    ///
    /// * the attached txn reports `is_read_uncommitted_default()` (matching
    ///   the LSN-keyed `lock_ln` early-return),
    /// * the attached txn already owns the WRITE lock on the synthetic key
    ///   (it is the deleter itself), or
    /// * the cursor has neither a txn nor a lock manager.
    ///
    /// See [`Self::lock_synthetic_key_for_delete`] for the wider
    /// rationale.
    ///
    /// # Errors
    ///
    /// Lock failures other than the first-attempt `LockNotAvailable` are
    /// returned as `DbiError::TxnError`.
    fn contest_synthetic_key_for_missing_read(
        &self,
        key: &[u8],
    ) -> Result<(), DbiError> {
        let db_id = self.db_impl.read().get_id().id() as u64;
        let synthetic_lsn = Lsn::synthetic_key_lock_id(db_id, key);
        if let Some(txn) = &self.txn_ref {
            let mut guard = txn.lock().unwrap();
            if guard.is_read_uncommitted_default() {
                return Ok(());
            }
            // CRITICAL: if this txn already owns a Write lock on the
            // synthetic key (because it is the deleter), short-circuit
            // — we must NEVER call `release_lock` on a Read acquisition
            // that aliased an existing Write lock, because the inner
            // `Txn::lock` unconditionally inserts the lsn into
            // `read_locks`, and a subsequent `release_lock` would
            // remove the txn from the lock manager's owner set,
            // erroneously freeing the Write lock for other lockers.
            if guard.owns_write_lock(synthetic_lsn) {
                return Ok(());
            }
            // First attempt passes `true` as the last argument —
            // presumably the non-blocking flag (TODO confirm against
            // `Txn::lock`) — to detect contention without waiting.
            match guard.lock(synthetic_lsn, LockType::Read, true) {
                Ok(_) => {
                    // Granted immediately — no contender; release
                    // immediately so we don't hold a lock on a
                    // not-found probe. Read-committed and
                    // serializable both treat this as a one-shot
                    // probe (the data does not exist; there is
                    // nothing to keep stable).
                    let _ = guard.release_lock(synthetic_lsn);
                    Ok(())
                }
                Err(noxu_txn::TxnError::LockNotAvailable { .. }) => {
                    // Contended: retry with the flag cleared. Per the
                    // original notes, a no-wait txn surfaces its typed
                    // lock error from this second call, while other txns
                    // wait here for the writer to finish.
                    guard
                        .lock(synthetic_lsn, LockType::Read, false)
                        .map_err(DbiError::TxnError)?;
                    let _ = guard.release_lock(synthetic_lsn);
                    Ok(())
                }
                Err(e) => Err(DbiError::TxnError(e)),
            }
        } else if let Some(lm) = &self.lock_manager {
            // Legacy auto-commit path: same two-step probe, using the
            // cursor `id` as the locker directly on the lock manager.
            match lm.lock(synthetic_lsn, self.id, LockType::Read, true, false) {
                Ok(_) => {
                    let _ = lm.release(synthetic_lsn, self.id);
                    Ok(())
                }
                Err(noxu_txn::TxnError::LockNotAvailable { .. }) => {
                    lm.lock(
                        synthetic_lsn,
                        self.id,
                        LockType::Read,
                        false,
                        false,
                    )
                    .map_err(DbiError::TxnError)?;
                    let _ = lm.release(synthetic_lsn, self.id);
                    Ok(())
                }
                Err(e) => Err(DbiError::TxnError(e)),
            }
        } else {
            // No locking infrastructure attached: nothing to contest.
            Ok(())
        }
    }
552
553 /// Moves the write lock to `new_lsn` and records abort before-image info.
554 ///
555 /// For txn-backed cursors:
556 /// - If `old_lsn` is valid: moves lock via `Txn::move_write_lock_to_new_lsn()`.
557 /// - Otherwise (new insert): acquires a new write lock on `new_lsn`.
558 /// - Records abort info so the txn can undo on abort.
559 /// - Notes the log entry on the txn for TxnCommit/Abort chaining.
560 ///
561 /// For auto-commit cursors:
562 /// - Acquires write lock on `new_lsn`, releases both old and new locks
563 /// immediately (auto-commit releases after the write is logged).
564 ///
565 /// / `Txn.moveWriteLockToNewLsn()`.
566 fn finalize_write_lock(
567 &self,
568 old_lsn: u64,
569 new_lsn: Lsn,
570 abort_key: Option<Vec<u8>>,
571 abort_data: Option<Vec<u8>>,
572 ) -> Result<(), DbiError> {
573 let new_lsn_u64 = new_lsn.as_u64();
574 // Deferred-write or no log manager: no LSN assigned, nothing to lock.
575 if new_lsn_u64 == noxu_util::NULL_LSN.as_u64() {
576 return Ok(());
577 }
578
579 if let Some(txn) = &self.txn_ref {
580 let db_id = self.db_impl.read().get_id().id() as u64;
581 let mut guard = txn.lock().unwrap();
582 if old_lsn != noxu_util::NULL_LSN.as_u64() {
583 // Move the existing write lock from old slot to new slot.
584 guard
585 .move_write_lock_to_new_lsn(old_lsn, new_lsn_u64)
586 .map_err(DbiError::TxnError)?;
587 } else {
588 // New insert: no old lock to move — acquire a fresh write lock.
589 guard
590 .lock(new_lsn_u64, LockType::Write, false)
591 .map_err(DbiError::TxnError)?;
592 }
593 let abort_known_deleted = old_lsn == noxu_util::NULL_LSN.as_u64();
594 guard.set_write_lock_abort_info(
595 new_lsn_u64,
596 old_lsn,
597 abort_key,
598 abort_data,
599 abort_known_deleted,
600 db_id,
601 );
602 guard.note_log_entry(new_lsn_u64);
603 } else if let Some(lm) = &self.lock_manager {
604 // Auto-commit: acquire write lock, then release immediately.
605 lm.lock(new_lsn_u64, self.id, LockType::Write, false, false)
606 .map_err(DbiError::TxnError)?;
607 if old_lsn != noxu_util::NULL_LSN.as_u64() {
608 let _ = lm.release(old_lsn, self.id);
609 }
610 let _ = lm.release(new_lsn_u64, self.id);
611 }
612 Ok(())
613 }
614
615 /// Returns true if the underlying database uses sorted duplicates.
616 ///
617 /// When true, every (key, data) pair is stored as a two-part composite
618 /// key via `dup_key_data::combine()` and the tree uses a custom comparator.
619 #[inline]
620 fn is_sorted_dup(&self) -> bool {
621 self.db_impl.read().get_sorted_duplicates()
622 }
623
    /// Returns the unique cursor ID assigned at construction.
    ///
    /// Used for debugging and cursor tracking; the same value serves as the
    /// locker ID when the cursor locks directly through its lock manager.
    pub fn get_id(&self) -> i64 {
        self.id
    }
630
    /// Returns the shared handle to the database this cursor operates on.
    ///
    /// The handle is borrowed; clone the `Arc` to keep it beyond the
    /// cursor's borrow.
    pub fn get_database(&self) -> &Arc<RwLock<DatabaseImpl>> {
        &self.db_impl
    }
635
    /// Returns the locker ID this cursor was constructed with.
    pub fn get_locker_id(&self) -> i64 {
        self.locker_id
    }
640
641 /// Returns true if the cursor is initialized (positioned on a record).
642 pub fn is_initialized(&self) -> bool {
643 self.state == CursorState::Initialized
644 }
645
646 /// Returns true if the cursor is closed.
647 pub fn is_closed(&self) -> bool {
648 self.state == CursorState::Closed
649 }
650
    /// Returns the key at the cursor's current position as a borrowed
    /// slice, or `None` if no key has been recorded yet (a freshly
    /// constructed cursor has none).
    pub fn get_current_key(&self) -> Option<&[u8]> {
        self.current_key.as_deref()
    }
655
    /// Returns the data at the cursor's current position as a borrowed
    /// slice, or `None` if no data has been recorded yet (a freshly
    /// constructed cursor has none).
    pub fn get_current_data(&self) -> Option<&[u8]> {
        self.current_data.as_deref()
    }
660
    /// Returns the LSN recorded for the cursor's current position.
    ///
    /// The value is a raw `u64`, not an `Option`: a freshly constructed
    /// cursor reports `NULL_LSN` here until a positioning operation stores
    /// a record LSN.
    pub fn get_current_lsn(&self) -> u64 {
        self.current_lsn
    }
665
666 /// Checks the cursor is not closed.
667 fn check_state(&self) -> Result<(), DbiError> {
668 #[cfg(any(test, feature = "testing"))]
669 if tick_fail() {
670 return Err(DbiError::CursorClosed);
671 }
672 // X-13: check environment validity before cursor state.
673 // Both the explicit invalidation flag and the I/O-failure flag
674 // (io_invalid) are tested so that reads on a failed environment
675 // return EnvironmentFailure rather than stale BIN data.
676 if self.env_invalid.as_ref().is_some_and(|f| f.load(Ordering::Acquire))
677 {
678 return Err(DbiError::EnvironmentFailure {
679 reason: "environment has been invalidated".into(),
680 });
681 }
682 if self
683 .log_manager
684 .as_ref()
685 .is_some_and(|lm| lm.io_invalid.load(Ordering::Acquire))
686 {
687 return Err(DbiError::EnvironmentFailure {
688 reason: "I/O failure: environment invalidated by fsync error"
689 .into(),
690 });
691 }
692 match self.state {
693 CursorState::Closed => Err(DbiError::CursorClosed),
694 _ => Ok(()),
695 }
696 }
697
    /// Checks the cursor is initialized (positioned on a record).
    ///
    /// # Errors
    ///
    /// * `DbiError::CursorClosed` — the cursor is closed, or (test/testing
    ///   builds only) the forced-failure countdown fired on this call.
    /// * `DbiError::CursorNotInitialized` — the cursor is open but has not
    ///   been positioned yet.
    // NOTE(review): unlike `check_state`, this does not test the
    // environment-invalidity or I/O-failure flags — confirm callers run
    // `check_state` first where that matters.
    fn check_initialized(&self) -> Result<(), DbiError> {
        #[cfg(any(test, feature = "testing"))]
        if tick_fail() {
            return Err(DbiError::CursorClosed);
        }
        match self.state {
            CursorState::Closed => Err(DbiError::CursorClosed),
            CursorState::NotInitialized => Err(DbiError::CursorNotInitialized),
            CursorState::Initialized => Ok(()),
        }
    }
710
    /// Positions the cursor at a specific key.
    ///
    /// Counterpart of JE's `CursorImpl.searchRange()`.
    ///
    /// Sorted-duplicate databases are delegated to `search_dup`. For all
    /// other databases a single `Tree::search_with_data(key)` descent yields
    /// the slot's data, LSN, slot index and BIN:
    ///
    /// * `SearchMode::Set` / `SearchMode::Both` — exact key match required.
    ///   `Both` additionally requires the stored data to equal `data` (a
    ///   missing `data` or missing stored data is compared as empty).
    ///   When the key is absent, the synthetic-key lock is contested first
    ///   so an uncommitted delete surfaces as a lock error rather than
    ///   `NotFound`.
    /// * `SearchMode::SetRange` / `SearchMode::BothRange` — an exact key
    ///   match is used if present; otherwise the entry returned by
    ///   `find_range_entry` is used (presumably the first key >= the search
    ///   key). `data` is not consulted on this non-dup path.
    ///
    /// On success the cursor records key, data, LSN and slot index, becomes
    /// `Initialized`, and re-pins itself to the BIN holding the record.
    ///
    /// # Arguments
    ///
    /// * `key` - The key to search for
    /// * `data` - Optional data for Both/BothRange modes
    /// * `search_mode` - The search mode (Set, Both, SetRange, BothRange)
    ///
    /// # Returns
    ///
    /// * `Success` if a record was found and the cursor positioned
    /// * `NotFound` if no record satisfies the search mode
    ///
    /// # Errors
    ///
    /// Fails with the `check_state` errors (closed cursor, invalid
    /// environment) and with any error from `lock_ln` or from the
    /// synthetic-key probe.
    pub fn search(
        &mut self,
        key: &[u8],
        data: Option<&[u8]>,
        search_mode: SearchMode,
    ) -> Result<OperationStatus, DbiError> {
        self.check_state()?;

        let is_dup = self.is_sorted_dup();

        if is_dup {
            return self.search_dup(key, data, search_mode);
        }

        // Non-dup path — single descent via `search_with_data` (Wave-11-I).
        //
        // Previously this path made three separate tree descents per `get()`:
        //   1. `tree.search(key)` — existence check only.
        //   2. `get_data_from_tree(tree, key)` — re-descended to fetch data.
        //   3. `find_bin_for_key(root, key)` — re-descended for BIN pinning.
        // `search_with_data` folds all three into one descent and uses binary
        // search (`find_entry_compressed`) at the BIN level.
        let slot = {
            // Scoped so the database read guard is released before locking.
            let db = self.db_impl.read();
            if let Some(tree) = db.get_real_tree() {
                tree.search_with_data(key)
            } else {
                None
            }
        };
        let found = slot.as_ref().is_some_and(|s| s.found);

        match search_mode {
            SearchMode::Set | SearchMode::Both => {
                if found {
                    // Invariant: `found` implies `slot` is `Some`, so this
                    // unwrap cannot panic.
                    let slot = slot.unwrap();
                    let slot_data = slot.data;
                    let slot_lsn = slot.lsn;
                    let bin_arc = slot.bin_arc;
                    // If a writer held the write lock when we called lock_ln,
                    // our pre-fetched slot_data is stale — re-read from the BIN
                    // after the writer commits/aborts. If lock_ln returned
                    // immediately (no contention), slot_data is still valid.
                    let contended = self.lock_ln(slot_lsn)?;
                    // NOTE(review): on the contended path the re-read LSN is
                    // discarded (`current_lsn` keeps `slot_lsn`), and if the
                    // key is no longer in the tree the stale `slot_data` is
                    // used and `Success` is still returned — confirm this is
                    // intended.
                    let final_data = if contended {
                        let db = self.db_impl.read();
                        db.get_real_tree()
                            .and_then(|tree| {
                                Self::get_data_from_tree(&tree, key)
                            })
                            .map(|(d, _)| d)
                            .map(Some)
                            .unwrap_or(slot_data)
                    } else {
                        slot_data
                    };
                    // Audit Finding 4: BDB-JE's SearchBoth is exact-match on
                    // (key, data) regardless of duplicate-set membership; on a
                    // non-dup DB it must still validate that the slot's data
                    // equals the user-supplied data. Pre-fix the `data`
                    // argument was silently dropped and `Success` was
                    // returned for any matching key, contradicting the
                    // documented contract on `Get::SearchBoth`. See
                    // `docs/src/internal/api-audit-2026-05-cursor.md`.
                    if matches!(search_mode, SearchMode::Both) {
                        let user_data = data.unwrap_or(&[]);
                        let stored = final_data.as_deref().unwrap_or(&[]);
                        if stored != user_data {
                            // The cursor position is left untouched on a
                            // data mismatch.
                            return Ok(OperationStatus::NotFound);
                        }
                    }
                    self.current_key = Some(key.to_vec());
                    self.current_data = final_data;
                    self.current_lsn = slot_lsn;
                    // Use the actual BIN slot index from search_with_data so
                    // that retrieve_next() advances to the correct next slot
                    // rather than always starting from index 1.
                    self.current_index = slot.slot_index as i32;
                    self.state = CursorState::Initialized;
                    // BIN arc already obtained from the single descent.
                    self.update_bin_pin(Some(bin_arc));
                    Ok(OperationStatus::Success)
                } else {
                    // Wave 5: contest a synthetic-key read lock on the
                    // missing slot to detect uncommitted deletes. See
                    // `lock_synthetic_key_for_delete`.
                    self.contest_synthetic_key_for_missing_read(key)?;
                    Ok(OperationStatus::NotFound)
                }
            }
            SearchMode::SetRange | SearchMode::BothRange => {
                if found {
                    // Exact hit: same handling as the Set branch, minus the
                    // data comparison.
                    let slot = slot.unwrap();
                    let slot_data = slot.data;
                    let slot_lsn = slot.lsn;
                    let bin_arc = slot.bin_arc;
                    let contended = self.lock_ln(slot_lsn)?;
                    let final_data = if contended {
                        let db = self.db_impl.read();
                        db.get_real_tree()
                            .and_then(|tree| {
                                Self::get_data_from_tree(&tree, key)
                            })
                            .map(|(d, _)| d)
                            .map(Some)
                            .unwrap_or(slot_data)
                    } else {
                        slot_data
                    };
                    self.current_key = Some(key.to_vec());
                    self.current_data = final_data;
                    self.current_lsn = slot_lsn;
                    // Use the actual BIN slot index (same rationale as Set branch).
                    self.current_index = slot.slot_index as i32;
                    self.state = CursorState::Initialized;
                    // BIN arc already obtained from the single descent.
                    self.update_bin_pin(Some(bin_arc));
                    Ok(OperationStatus::Success)
                } else {
                    // No exact hit: ask the tree for the range entry as
                    // (key, data, lsn, slot index).
                    let next_entry: Option<(Vec<u8>, Vec<u8>, u64, usize)> = {
                        let db = self.db_impl.read();
                        if let Some(tree) = db.get_real_tree() {
                            Self::find_range_entry(&tree, key)
                        } else {
                            None
                        }
                    };
                    match next_entry {
                        Some((k, v, lsn, slot_idx)) => {
                            // NOTE(review): the contention result of
                            // `lock_ln` is ignored here, so the data is not
                            // re-read after a contended lock — confirm.
                            self.lock_ln(lsn)?;
                            // Pin the BIN for the range-found key (this
                            // re-descends from the root).
                            let bin_arc = {
                                let db = self.db_impl.read();
                                db.get_real_tree().and_then(|tree| {
                                    tree.get_root().and_then(|r| {
                                        Self::find_bin_for_key(r, &k)
                                    })
                                })
                            };
                            self.current_key = Some(k);
                            self.current_data = Some(v);
                            self.current_lsn = lsn;
                            self.current_index = slot_idx as i32;
                            self.state = CursorState::Initialized;
                            self.update_bin_pin(bin_arc);
                            Ok(OperationStatus::Success)
                        }
                        None => Ok(OperationStatus::NotFound),
                    }
                }
            }
        }
    }
888
889 /// Sorted-dup variant of `search()`.
890 ///
891 /// For sorted-dup databases (key, data) pairs are stored as two-part
892 /// composite keys `[key][data][packed_key_len]`. This method builds the
893 /// appropriate two-part search key and delegates to the tree's
894 /// comparator-aware range finder.
895 ///
896 /// Dup path from 7.5.
897 fn search_dup(
898 &mut self,
899 key: &[u8],
900 data: Option<&[u8]>,
901 search_mode: SearchMode,
902 ) -> Result<OperationStatus, DbiError> {
903 let search_two_part_key: Vec<u8> = match search_mode {
904 // Both / BothRange: search for the exact (key, data) pair.
905 SearchMode::Both | SearchMode::BothRange => {
906 dup_key_data::combine(key, data.unwrap_or(b""))
907 }
908 // Set / SetRange: position at the first entry whose primary key
909 // >= `key` — use the lower bound (smallest possible two-part key
910 // for this primary key).
911 SearchMode::Set | SearchMode::SetRange => {
912 dup_key_data::lower_bound(key)
913 }
914 };
915
916 let entry: Option<(
917 Vec<u8>,
918 Vec<u8>,
919 usize,
920 u64,
921 std::sync::Arc<noxu_tree::NodeRwLock<noxu_tree::tree::TreeNode>>,
922 )> = {
923 let db = self.db_impl.read();
924 if let Some(tree) = db.get_real_tree() {
925 tree.first_entry_at_or_after_with_index(&search_two_part_key)
926 } else {
927 None
928 }
929 };
930
931 match entry {
932 Some((raw_key, _, idx, slot_lsn, bin_arc)) => {
933 // raw_key is the two-part key found; check that the primary
934 // key part matches what was requested (for Set and Both).
935 let matches = match search_mode {
936 SearchMode::Set => dup_key_data::matches_key(&raw_key, key),
937 SearchMode::Both => raw_key == search_two_part_key,
938 SearchMode::SetRange => {
939 // Any key >= the search key is valid.
940 true
941 }
942 SearchMode::BothRange => {
943 // Position at the first (key, data) where data >=
944 // the given data; primary key must still match.
945 dup_key_data::matches_key(&raw_key, key)
946 }
947 };
948 if matches {
949 self.lock_ln(slot_lsn)?;
950 // Store the raw two-part key; get_current() will decode it.
951 self.current_key = Some(raw_key);
952 self.current_data = None; // decoded lazily in get_current()
953 self.current_lsn = slot_lsn;
954 // Wave 11-N Bug 2 fix: store the actual BIN index, not
955 // a hard-coded 0. Pre-fix the cursor reported
956 // current_index = 0 after every dup search, which made
957 // the subsequent NextDup compute next_index = 1 in the
958 // BIN's slot space. For any primary not occupying
959 // BIN slot 0 the read either landed on a different
960 // primary's dup (apply_dup_filter rejected it as
961 // NotFound) or returned an unrelated entry entirely.
962 // Storing the real slot index plus pinning the BIN
963 // closes the bug and matches the invariant maintained
964 // by `get_first` / `get_last`.
965 self.current_index = idx as i32;
966 self.state = CursorState::Initialized;
967 self.update_bin_pin(Some(bin_arc));
968 Ok(OperationStatus::Success)
969 } else {
970 Ok(OperationStatus::NotFound)
971 }
972 }
973 None => Ok(OperationStatus::NotFound),
974 }
975 }
976
977 /// Acquires a read lock on a log record by LSN.
978 ///
979 /// `CursorImpl.lockLN(LockType.READ)`. When no lock manager
980 /// is wired (read-only cursors / unit tests) this is a no-op.
981 ///
982 /// For txn-backed cursors the lock is tracked in the `Txn` and held until
983 /// commit/abort. For auto-commit cursors the lock is acquired (to wait
984 /// for any current exclusive writer to finish) and then released
985 /// immediately — mirroring `AutoTxn` single-operation semantics.
986 ///
987 /// **SERIALIZABLE isolation (T-F2)**: when the cursor's txn has
988 /// `is_serializable_isolation()` set, this acquires `LockType::RangeRead`
989 /// instead of `LockType::Read`, mirroring JE `Cursor.getLockType(rangeLock
990 /// = true)`. `RangeRead` conflicts with a concurrent `RangeInsert` on the
991 /// same LSN, blocking or triggering a restart on phantom inserts.
992 ///
993 /// Returns an error only when the lock would deadlock or the locker is
994 /// invalid; `NULL_LSN` records are skipped (lock-free slots).
995 ///
996 /// Returns `Ok(contended)` where `contended = true` means the lock was
997 /// not immediately available — a concurrent writer held an exclusive lock
998 /// and we had to wait. When `contended` is `true`, any data pre-fetched
999 /// before calling this method may be stale (the writer may have committed
1000 /// or aborted during the wait), and the caller should re-read from the BIN.
1001 /// When `contended` is `false`, the lock was granted immediately with no
1002 /// intervening write, so pre-fetched data remains valid.
1003 ///
1004 /// Returns `Err(DbiError::TxnError(TxnError::RangeRestart))` if a
1005 /// concurrent `RangeInsert` owner caused a range restart — the caller
1006 /// must abort the current scan position and restart the operation.
1007 fn lock_ln(&self, lsn: u64) -> Result<bool, DbiError> {
1008 if lsn == noxu_util::NULL_LSN.as_u64() {
1009 return Ok(false);
1010 }
1011 if let Some(txn) = &self.txn_ref {
1012 let mut guard = txn.lock().unwrap();
1013 // F2: read-uncommitted txns skip read-lock acquisition
1014 // entirely. This mirrors the per-operation
1015 // `LockMode::ReadUncommitted` path but applies to every
1016 // read on the txn.
1017 if guard.is_read_uncommitted_default() {
1018 return Ok(false);
1019 }
1020 // T-F2: SERIALIZABLE cursors acquire RangeRead to protect against
1021 // phantom inserts. All other isolation levels use Read.
1022 let lock_type = if guard.is_serializable_isolation() {
1023 LockType::RangeRead
1024 } else {
1025 LockType::Read
1026 };
1027 // Try non-blocking first to detect write contention without waiting.
1028 let contended = match guard.lock(lsn, lock_type, true) {
1029 Ok(_) => false, // granted immediately — no concurrent writer
1030 Err(noxu_txn::TxnError::LockNotAvailable { .. }) => {
1031 // A writer holds the lock; block until they commit/abort.
1032 guard
1033 .lock(lsn, lock_type, false)
1034 .map_err(DbiError::TxnError)?;
1035 true
1036 }
1037 // RangeRestart: a concurrent RangeInsert owner caused a
1038 // restart signal — propagate immediately so the caller can
1039 // restart the scan. This is the JE RangeRestartException path.
1040 Err(e) => return Err(DbiError::TxnError(e)),
1041 };
1042 // Read-committed: release the read lock immediately after each
1043 // operation so concurrent writers are not blocked for the txn
1044 // duration. Under serializable isolation the lock is held until
1045 // commit/abort (tracked in Txn.read_locks).
1046 if guard.is_read_committed_isolation() {
1047 guard.release_lock(lsn).map_err(DbiError::TxnError)?;
1048 }
1049 Ok(contended)
1050 } else if let Some(lm) = &self.lock_manager {
1051 // Auto-commit: detect contention via non-blocking attempt first.
1052 // Auto-commit cursors do not provide serializable phantom protection
1053 // across multiple operations; use Read regardless of isolation.
1054 let contended =
1055 match lm.lock(lsn, self.id, LockType::Read, true, false) {
1056 Ok(_) => {
1057 lm.release(lsn, self.id).map_err(DbiError::TxnError)?;
1058 false
1059 }
1060 Err(noxu_txn::TxnError::LockNotAvailable { .. }) => {
1061 lm.lock(lsn, self.id, LockType::Read, false, false)
1062 .map_err(DbiError::TxnError)?;
1063 lm.release(lsn, self.id).map_err(DbiError::TxnError)?;
1064 true
1065 }
1066 Err(e) => return Err(DbiError::TxnError(e)),
1067 };
1068 Ok(contended)
1069 } else {
1070 Ok(false)
1071 }
1072 }
1073
1074 /// Acquires a `RangeInsert` lock on the successor key's LSN for a new
1075 /// SERIALIZABLE insert, implementing JE's next-key locking protocol.
1076 ///
1077 /// When a transaction inserts a brand-new key `key` (i.e.
1078 /// `old_lsn == NULL_LSN`), this method:
1079 ///
1080 /// 1. Looks up the first committed key at-or-after `key` in the tree
1081 /// (the would-be successor of the new key).
1082 /// 2. Acquires `RangeInsert` on that successor's LSN so that any
1083 /// concurrent SERIALIZABLE scanner holding `RangeRead` on the same
1084 /// slot is either blocked (insert waits) or triggers a restart (scan
1085 /// gets `RangeRestart`).
1086 /// 3. If no successor exists (the new key would be the last key in the
1087 /// database), acquires `RangeInsert` on the per-database EOF sentinel
1088 /// LSN so scans that called `lock_eof_for_scan` on the same sentinel
1089 /// are protected.
1090 ///
1091 /// Skipped when:
1092 /// - `old_lsn != NULL_LSN` (this is an update, not a new insert; the
1093 /// existing `Write` lock on the old LSN already conflicts with any
1094 /// concurrent `RangeRead`).
1095 /// - The cursor has no txn (auto-commit: locks released per-op; no
1096 /// cross-op phantom protection).
1097 /// - The txn already owns any lock on the successor LSN (same-txn
1098 /// insert+scan: avoids an illegal RangeRead→RangeInsert upgrade).
1099 ///
1100 /// Note: `RangeInsert` is acquired for ALL new-key inserts, regardless of
1101 /// the inserter's isolation level. A concurrent SERIALIZABLE scanner
1102 /// holding `RangeRead` on the successor will be blocked or restarted.
1103 /// For non-serializable scanners, `RangeRead` is never held, so the
1104 /// `RangeInsert` is granted immediately with no contention.
1105 ///
1106 /// Mirror of JE `CursorImpl.lockForInsert()` / next-key locking.
1107 fn lock_range_insert(
1108 &self,
1109 key: &[u8],
1110 old_lsn: u64,
1111 ) -> Result<(), DbiError> {
1112 // Only needed for genuinely new inserts.
1113 if old_lsn != noxu_util::NULL_LSN.as_u64() {
1114 return Ok(());
1115 }
1116 let txn = match &self.txn_ref {
1117 Some(t) => t,
1118 None => return Ok(()), // auto-commit: no cross-op protection
1119 };
1120 let mut guard = txn.lock().unwrap();
1121 // Find the first committed key at-or-after `key` (the successor of
1122 // the key being inserted).
1123 let successor_lsn: u64 = {
1124 let db = self.db_impl.read();
1125 match db.get_real_tree() {
1126 Some(tree) => {
1127 match tree.first_entry_at_or_after(key) {
1128 Some((_k, _v, lsn)) => lsn,
1129 None => {
1130 // No successor: the new key will be the last key
1131 // in the database. Use the per-database EOF
1132 // sentinel so a concurrent scanner that called
1133 // lock_eof_for_scan is protected.
1134 let db_id = db.get_id().id() as u64;
1135 noxu_util::Lsn::eof_lock_lsn(db_id)
1136 }
1137 }
1138 }
1139 None => {
1140 // Empty tree: use EOF sentinel.
1141 let db_id = db.get_id().id() as u64;
1142 noxu_util::Lsn::eof_lock_lsn(db_id)
1143 }
1144 }
1145 };
1146 // Guard: if the same txn already owns any lock on the successor LSN
1147 // (e.g. a RangeRead from scanning the successor key), skip acquisition
1148 // to avoid an illegal RangeRead→RangeInsert upgrade in the lock manager.
1149 // The existing RangeRead already blocks concurrent insertions from other
1150 // transactions, so no additional protection is needed.
1151 if guard.owns_any_lock(successor_lsn) {
1152 return Ok(());
1153 }
1154 guard
1155 .lock(successor_lsn, LockType::RangeInsert, false)
1156 .map_err(DbiError::TxnError)?;
1157 Ok(())
1158 }
1159
1160 /// Acquires a `RangeRead` lock on the per-database EOF sentinel LSN.
1161 ///
1162 /// Called by a SERIALIZABLE forward scan when it reaches the end of the
1163 /// key space (no more keys to read). This protects against phantom
1164 /// inserts of keys that sort after every currently-scanned key: a
1165 /// concurrent inserter will acquire `RangeInsert` on the same sentinel
1166 /// and be blocked until this scan's transaction commits.
1167 ///
1168 /// No-op unless the cursor is backed by a SERIALIZABLE transaction.
1169 ///
1170 /// Mirror of JE `CursorImpl.lockEof(LockType.RANGE_READ)`.
1171 fn lock_eof_for_scan(&self) -> Result<(), DbiError> {
1172 let txn = match &self.txn_ref {
1173 Some(t) => t,
1174 None => return Ok(()),
1175 };
1176 let mut guard = txn.lock().unwrap();
1177 if !guard.is_serializable_isolation() {
1178 return Ok(());
1179 }
1180 let eof_lsn = {
1181 let db = self.db_impl.read();
1182 let db_id = db.get_id().id() as u64;
1183 noxu_util::Lsn::eof_lock_lsn(db_id)
1184 };
1185 // If the txn already owns any lock on the EOF sentinel (e.g. from a
1186 // prior scan that also reached EOF), skip acquisition.
1187 if guard.owns_any_lock(eof_lsn) {
1188 return Ok(());
1189 }
1190 // Non-blocking attempt first; on RangeInsert conflict we get Restart.
1191 match guard.lock(eof_lsn, LockType::RangeRead, true) {
1192 Ok(_) => Ok(()),
1193 Err(noxu_txn::TxnError::LockNotAvailable { .. }) => {
1194 guard
1195 .lock(eof_lsn, LockType::RangeRead, false)
1196 .map_err(DbiError::TxnError)?;
1197 Ok(())
1198 }
1199 Err(e) => Err(DbiError::TxnError(e)),
1200 }
1201 }
1202
1203 /// Fetches the data associated with `key` from a tree (BIN-level lookup).
1204 ///
1205 /// Returns `(data, slot_lsn)` so the caller can acquire a read lock.
1206 ///
1207 /// Data-read path in `CursorImpl.lockAndGetCurrent()`.
1208 fn get_data_from_tree(tree: &Tree, key: &[u8]) -> Option<(Vec<u8>, u64)> {
1209 use noxu_tree::tree::TreeNode;
1210 let root = tree.get_root()?;
1211 // Descend to the BIN that should contain `key` (not always the leftmost).
1212 let bin_arc = Self::find_bin_for_key(root, key)?;
1213 let guard = bin_arc.read();
1214 match &*guard {
1215 TreeNode::Bottom(bin) => {
1216 // BIN entries store compressed (suffix) keys under the BIN's
1217 // key_prefix. If the key doesn't start with the prefix,
1218 // it is not in this BIN — return None rather than panicking.
1219 if !bin.key_prefix.is_empty()
1220 && !key.starts_with(bin.key_prefix.as_slice())
1221 {
1222 return None;
1223 }
1224 let suffix = bin.compress_key(key);
1225 bin.entries
1226 .iter()
1227 .find(|e| e.key.as_slice() == suffix.as_slice())
1228 .map(|e| {
1229 (e.data.clone().unwrap_or_default(), e.lsn.as_u64())
1230 })
1231 }
1232 _ => None,
1233 }
1234 }
1235
1236 /// Finds the first entry in the tree whose key >= `key`.
1237 ///
1238 /// Returns `(key, data, slot_lsn)` so the caller can acquire a read lock.
1239 ///
1240 /// # Algorithm
1241 ///
1242 /// SearchGte is a two-step probe:
1243 ///
1244 /// 1. Locate the BIN that *should* contain `key` via
1245 /// `find_bin_for_key` and scan it for the smallest entry whose
1246 /// full key is `>= key`. The seed `key` is *not* required to
1247 /// share the BIN's learned `key_prefix` — we explicitly handle
1248 /// the three legal seed/`key_prefix` relationships:
1249 ///
1250 /// * `key.starts_with(key_prefix)` — cheap suffix comparison;
1251 /// the stored `entries[i].key` are suffixes under that prefix,
1252 /// so we compare against `&key[plen..]`.
1253 /// * `key < key_prefix` lexicographically — every full key in
1254 /// this BIN starts with `key_prefix` and is therefore strictly
1255 /// greater than `key`; the answer is `entries[0]`. This
1256 /// includes the common case of a short search seed (e.g.
1257 /// `b"K\0"`) on a BIN whose learned prefix has grown longer
1258 /// than the seed (`b"K\0bucket\0…"`).
1259 /// * `key > key_prefix` lexicographically — every full key in
1260 /// this BIN is strictly less than `key`; nothing here matches,
1261 /// fall through to step 2.
1262 ///
1263 /// 2. If step 1 returned nothing (either no entry in the chosen
1264 /// BIN satisfies `>= key`, or the BIN was empty / the seed sits
1265 /// lex-after the BIN's prefix) call `Tree::get_next_bin(key)`
1266 /// and return its first entry, which by B+tree invariants is
1267 /// strictly greater than `key`.
1268 ///
1269 /// # Why step 2's first entry is the correct answer
1270 ///
1271 /// `find_bin_for_key` descends by picking, at each internal level,
1272 /// the largest separator `<= key`. If it lands on BIN `B` reached
1273 /// via slot `p` of some ancestor, then `separator(p) <= key` and
1274 /// (when slot `p+1` exists) `separator(p+1) > key` strictly —
1275 /// otherwise descent would have picked `p+1`. By the B+tree
1276 /// key-range invariant every key in the subtree rooted at `slot(p+1)`
1277 /// is `>= separator(p+1) > key`. `Tree::get_next_bin` returns the
1278 /// leftmost BIN of exactly that next-sibling subtree, so its first
1279 /// entry is the smallest key in the whole tree that is `> key`.
1280 /// One probe, deterministically correct — no looping needed.
1281 ///
1282 /// # Locking
1283 ///
1284 /// The step-1 BIN read lock is released before step 2 fires so that
1285 /// `get_next_bin`'s own latch-coupled descent is unconstrained and
1286 /// other threads (especially writers crossing this BIN) are not
1287 /// blocked on a lock we no longer need.
1288 ///
1289 /// # Empty intermediate BINs
1290 ///
1291 /// If the chosen BIN is empty *and* `get_next_bin` returns an empty
1292 /// BIN (a transient state under delete-heavy workloads, before the
1293 /// cleaner has collapsed it), this returns `None` and the caller
1294 /// reports `NotFound`. This matches `Get::Next`'s behaviour today;
1295 /// see also the follow-up note in
1296 /// `cursor_search_gte_skips_past_empty_bin_is_pre_existing_limit`.
1297 fn find_range_entry(
1298 tree: &Tree,
1299 key: &[u8],
1300 ) -> Option<(Vec<u8>, Vec<u8>, u64, usize)> {
1301 use noxu_tree::tree::TreeNode;
1302
1303 // Step 1: scan the BIN that should contain `key`. The read lock
1304 // is dropped at the end of this block before step 2 runs.
1305 let in_current: Option<(Vec<u8>, Vec<u8>, u64, usize)> = {
1306 let root = tree.get_root()?;
1307 // Use find_bin_for_key so range searches also work for non-leftmost BINs.
1308 let bin_arc = Self::find_bin_for_key(root, key)?;
1309 let guard = bin_arc.read();
1310 match &*guard {
1311 TreeNode::Bottom(bin) => {
1312 let plen = bin.key_prefix.len();
1313
1314 if plen != 0 && !key.starts_with(bin.key_prefix.as_slice())
1315 {
1316 // Seed does not share this BIN's learned prefix.
1317 // Decide by lex-comparing seed against key_prefix;
1318 // never call compress_key (which requires `starts_with`).
1319 if key < bin.key_prefix.as_slice() {
1320 // Every key in this BIN is > seed.
1321 bin.entries.first().and_then(|e| {
1322 bin.get_full_key(0).map(|fk| {
1323 (
1324 fk,
1325 e.data.clone().unwrap_or_default(),
1326 e.lsn.as_u64(),
1327 0usize,
1328 )
1329 })
1330 })
1331 } else {
1332 // Every key in this BIN is < seed; let step 2
1333 // handle it.
1334 None
1335 }
1336 } else {
1337 // Cheap path: suffix comparison.
1338 let suffix = &key[plen..];
1339 bin.entries
1340 .iter()
1341 .enumerate()
1342 .find(|(_, e)| e.key.as_slice() >= suffix)
1343 .and_then(|(i, e)| {
1344 bin.get_full_key(i).map(|fk| {
1345 (
1346 fk,
1347 e.data.clone().unwrap_or_default(),
1348 e.lsn.as_u64(),
1349 i,
1350 )
1351 })
1352 })
1353 }
1354 }
1355 _ => None,
1356 }
1357 // bin_arc read lock dropped here.
1358 };
1359
1360 if let Some(r) = in_current {
1361 return Some(r);
1362 }
1363
1364 // Step 2: chosen BIN had nothing >= key. By B+tree invariants the
1365 // first entry of the next BIN is strictly > key, which satisfies
1366 // SearchGte. No iteration: one call, one answer.
1367 // The first entry of the next BIN is at slot index 0.
1368 let next = tree.get_next_bin(key)?;
1369 let e = next.into_iter().next()?;
1370 Some((e.key, e.data.unwrap_or_default(), e.lsn.as_u64(), 0))
1371 }
1372
1373 /// Descends from the given node to the leftmost BIN, returning its Arc.
1374 fn descend_to_bin(
1375 node: std::sync::Arc<noxu_tree::NodeRwLock<noxu_tree::tree::TreeNode>>,
1376 ) -> Option<std::sync::Arc<noxu_tree::NodeRwLock<noxu_tree::tree::TreeNode>>>
1377 {
1378 use noxu_tree::tree::TreeNode;
1379 let mut current = node;
1380 loop {
1381 let (is_bin, child) = {
1382 let g = current.read();
1383 let is_bin = g.is_bin();
1384 let child = if !is_bin {
1385 match &*g {
1386 TreeNode::Internal(n) => {
1387 n.entries.first().and_then(|e| e.child.clone())
1388 }
1389 _ => None,
1390 }
1391 } else {
1392 None
1393 };
1394 (is_bin, child)
1395 };
1396 if is_bin {
1397 return Some(current);
1398 }
1399 current = child?;
1400 }
1401 }
1402
1403 /// Descends from the given node to the rightmost BIN, returning its Arc.
1404 fn descend_to_last_bin(
1405 node: std::sync::Arc<noxu_tree::NodeRwLock<noxu_tree::tree::TreeNode>>,
1406 ) -> Option<std::sync::Arc<noxu_tree::NodeRwLock<noxu_tree::tree::TreeNode>>>
1407 {
1408 use noxu_tree::tree::TreeNode;
1409 let mut current = node;
1410 loop {
1411 let (is_bin, child) = {
1412 let g = current.read();
1413 let is_bin = g.is_bin();
1414 let child = if !is_bin {
1415 match &*g {
1416 TreeNode::Internal(n) => {
1417 n.entries.last().and_then(|e| e.child.clone())
1418 }
1419 _ => None,
1420 }
1421 } else {
1422 None
1423 };
1424 (is_bin, child)
1425 };
1426 if is_bin {
1427 return Some(current);
1428 }
1429 current = child?;
1430 }
1431 }
1432
1433 /// Positions the cursor at the first (smallest) record in the database.
1434 ///
1435 /// .
1436 ///
1437 /// Uses `Tree::get_first_node()` to descend to the leftmost BIN, then
1438 /// positions the cursor at slot 0.
1439 ///
1440 /// # Returns
1441 ///
1442 /// * `Success` if the tree is non-empty
1443 /// * `NotFound` if the tree is empty
1444 pub fn get_first(&mut self) -> Result<OperationStatus, DbiError> {
1445 self.check_state()?;
1446
1447 let result: Option<(
1448 Vec<u8>,
1449 Vec<u8>,
1450 i32,
1451 u64,
1452 std::sync::Arc<noxu_tree::NodeRwLock<noxu_tree::tree::TreeNode>>,
1453 )> = {
1454 let db = self.db_impl.read();
1455 if let Some(tree) = db.get_real_tree() {
1456 if tree.is_empty() {
1457 None
1458 } else {
1459 use noxu_tree::tree::TreeNode;
1460 tree.get_root().and_then(|r| {
1461 let bin_arc = Self::descend_to_bin(r)?;
1462 let (key, data, lsn) = {
1463 let g = bin_arc.read();
1464 match &*g {
1465 TreeNode::Bottom(bin) => {
1466 if bin.entries.is_empty() {
1467 return None;
1468 }
1469 (
1470 bin.get_full_key(0).unwrap_or_default(),
1471 bin.entries[0]
1472 .data
1473 .clone()
1474 .unwrap_or_default(),
1475 bin.entries[0].lsn.as_u64(),
1476 )
1477 }
1478 _ => return None,
1479 }
1480 };
1481 Some((key, data, 0i32, lsn, bin_arc))
1482 })
1483 }
1484 } else {
1485 None
1486 }
1487 };
1488
1489 match result {
1490 Some((key, data, idx, lsn, bin_arc)) => {
1491 self.lock_ln(lsn)?;
1492 self.current_key = Some(key);
1493 self.current_data = Some(data);
1494 self.current_lsn = lsn;
1495 self.current_index = idx;
1496 self.state = CursorState::Initialized;
1497 self.update_bin_pin(Some(bin_arc));
1498 Ok(OperationStatus::Success)
1499 }
1500 None => {
1501 // Empty tree. T-F2: for SERIALIZABLE, lock the EOF sentinel
1502 // so inserts into the (currently empty) database are blocked.
1503 self.lock_eof_for_scan()?;
1504 Ok(OperationStatus::NotFound)
1505 }
1506 }
1507 }
1508
1509 /// Positions the cursor at the last (largest) record in the database.
1510 ///
1511 /// .
1512 ///
1513 /// Uses `Tree::get_last_node()` to descend to the rightmost BIN, then
1514 /// positions the cursor at the last slot.
1515 ///
1516 /// # Returns
1517 ///
1518 /// * `Success` if the tree is non-empty
1519 /// * `NotFound` if the tree is empty
1520 pub fn get_last(&mut self) -> Result<OperationStatus, DbiError> {
1521 self.check_state()?;
1522
1523 let result: Option<(
1524 Vec<u8>,
1525 Vec<u8>,
1526 i32,
1527 u64,
1528 std::sync::Arc<noxu_tree::NodeRwLock<noxu_tree::tree::TreeNode>>,
1529 )> = {
1530 let db = self.db_impl.read();
1531 if let Some(tree) = db.get_real_tree() {
1532 if tree.is_empty() {
1533 None
1534 } else {
1535 use noxu_tree::tree::TreeNode;
1536 tree.get_root().and_then(|r| {
1537 let bin_arc = Self::descend_to_last_bin(r)?;
1538 let (key, data, last_idx, lsn) = {
1539 let g = bin_arc.read();
1540 match &*g {
1541 TreeNode::Bottom(bin) => {
1542 let n = bin.entries.len();
1543 if n == 0 {
1544 return None;
1545 }
1546 let last_idx = n - 1;
1547 (
1548 bin.get_full_key(last_idx)
1549 .unwrap_or_default(),
1550 bin.entries[last_idx]
1551 .data
1552 .clone()
1553 .unwrap_or_default(),
1554 last_idx as i32,
1555 bin.entries[last_idx].lsn.as_u64(),
1556 )
1557 }
1558 _ => return None,
1559 }
1560 };
1561 Some((key, data, last_idx, lsn, bin_arc))
1562 })
1563 }
1564 } else {
1565 None
1566 }
1567 };
1568
1569 match result {
1570 Some((key, data, idx, lsn, bin_arc)) => {
1571 self.lock_ln(lsn)?;
1572 self.current_key = Some(key);
1573 self.current_data = Some(data);
1574 self.current_lsn = lsn;
1575 self.current_index = idx;
1576 self.state = CursorState::Initialized;
1577 self.update_bin_pin(Some(bin_arc));
1578 Ok(OperationStatus::Success)
1579 }
1580 None => Ok(OperationStatus::NotFound),
1581 }
1582 }
1583
1584 /// Retrieves the current record.
1585 ///
1586 /// Returns the key and data at the cursor's current position.
1587 ///
1588 /// # Returns
1589 ///
1590 /// A tuple of (key, data) for the current record.
1591 ///
1592 /// # Errors
1593 ///
1594 /// * `CursorNotInitialized` if the cursor is not positioned on a record
1595 /// * `CursorClosed` if the cursor has been closed
1596 pub fn get_current(&self) -> Result<(Vec<u8>, Vec<u8>), DbiError> {
1597 self.check_initialized()?;
1598
1599 let raw_key =
1600 self.current_key.clone().ok_or(DbiError::CursorNotInitialized)?;
1601 let raw_data = self.current_data.clone().unwrap_or_default();
1602
1603 // For sorted-dup databases the tree stores two-part composite keys.
1604 // current_key holds the raw two-part key; split it for the caller.
1605 if self.is_sorted_dup()
1606 && let Some((pk, data)) = dup_key_data::split(&raw_key)
1607 {
1608 return Ok((pk, data));
1609 }
1610 Ok((raw_key, raw_data))
1611 }
1612
1613 /// Returns true if the slot the cursor is positioned on has been deleted
1614 /// since the cursor was last positioned.
1615 ///
1616 /// : analogous to checking KNOWN_DELETED_BIT / entry removal on
1617 /// Cursor.getCurrentLN() path — returns KEYEMPTY when the record is gone.
1618 pub fn is_current_slot_deleted(&self) -> bool {
1619 use noxu_tree::tree::TreeNode;
1620 let current_key = match &self.current_key {
1621 Some(k) => k,
1622 None => return false,
1623 };
1624 let bin_arc = match &self.current_bin_arc {
1625 Some(a) => a,
1626 None => return false,
1627 };
1628 let idx = self.current_index as usize;
1629 let guard = bin_arc.read();
1630 if let TreeNode::Bottom(bin) = &*guard {
1631 if idx >= bin.entries.len() {
1632 return true; // entry was removed
1633 }
1634 let plen = bin.key_prefix.len();
1635 let expected_suffix: &[u8] =
1636 if plen == 0 || current_key.len() <= plen {
1637 current_key.as_slice()
1638 } else {
1639 ¤t_key[plen..]
1640 };
1641 let stored = bin.entries[idx].key.as_slice();
1642 if stored != expected_suffix {
1643 return true; // different key at this index = deleted and shifted
1644 }
1645 bin.entries[idx].known_deleted
1646 } else {
1647 false
1648 }
1649 }
1650
1651 /// Moves the cursor to the next/previous record.
1652 ///
1653 /// .
1654 ///
1655 /// Advances `current_index` within the current BIN. When the BIN is
1656 /// exhausted (forward: `index >= nEntries`; backward: `index < 0`) the
1657 /// cursor moves to the adjacent BIN via `Tree::get_next_bin()` /
1658 /// `Tree::get_prev_bin()`, mirroring call to
1659 /// `tree.getNextBin(anchorBIN)` / `tree.getPrevBin(anchorBIN)`.
1660 ///
1661 /// The GetMode parameter controls direction and duplicate handling:
1662 ///
1663 /// * `Next` / `NextNoDup` / `NextDup` — move forward
1664 /// * `Prev` / `PrevNoDup` / `PrevDup` — move backward
1665 ///
1666 /// # Returns
1667 ///
1668 /// * `Success` if positioned on a new record
1669 /// * `NotFound` if there are no more records in that direction
1670 pub fn retrieve_next(
1671 &mut self,
1672 mode: GetMode,
1673 ) -> Result<OperationStatus, DbiError> {
1674 self.check_state()?;
1675
1676 if self.state == CursorState::NotInitialized {
1677 return Ok(OperationStatus::NotFound);
1678 }
1679
1680 let is_dup = self.is_sorted_dup();
1681
1682 // BDB-JE contract: NEXT_DUP / PREV_DUP advance only within the
1683 // duplicate-set of the current key. On a non-sorted-dup database
1684 // every key has exactly one record, so there can never be another
1685 // duplicate of the current position — the only correct answer is
1686 // NotFound. Without this early-return, the dup-filter below is
1687 // gated on `is_dup` and the cursor would silently degenerate into
1688 // plain Next / Prev semantics, returning the next *different* key
1689 // and violating the documented contract. See
1690 // `docs/src/internal/api-audit-2026-05-cursor.md` Finding 5.
1691 if !is_dup && matches!(mode, GetMode::NextDup | GetMode::PrevDup) {
1692 return Ok(OperationStatus::NotFound);
1693 }
1694
1695 // For NextDup/PrevDup/NextNoDup/PrevNoDup, capture the primary key of
1696 // the current position before advancing.
1697 let current_primary_key: Option<Vec<u8>> = if is_dup {
1698 self.current_key.as_ref().and_then(|raw| dup_key_data::get_key(raw))
1699 } else {
1700 None
1701 };
1702
1703 let forward = mode.is_forward();
1704 let next_index = if forward {
1705 self.current_index + 1
1706 } else {
1707 self.current_index - 1
1708 };
1709
1710 // Within-BIN traversal.
1711 //
1712 // Fast path (O(1)): use the pinned `current_bin_arc` to read
1713 // `next_index` directly, avoiding a root-to-leaf B-tree traversal on
1714 // every cursor step.
1715 //
1716 // Slow path (O(log N)): only taken when `current_bin_arc` is not yet
1717 // set (e.g. first advance after `get_first()` in an older code path).
1718 // We save the discovered arc so subsequent steps use the fast path.
1719 use noxu_tree::tree::TreeNode;
1720 let entry: Option<(Vec<u8>, Vec<u8>, i32, u64)>;
1721 let new_bin_arc: Option<
1722 std::sync::Arc<noxu_tree::NodeRwLock<noxu_tree::tree::TreeNode>>,
1723 >;
1724
1725 if let Some(bin_arc) = &self.current_bin_arc {
1726 // Fast path: pinned BIN — no tree traversal.
1727 {
1728 let g = bin_arc.read();
1729 if let TreeNode::Bottom(bin) = &*g {
1730 if next_index >= 0 && next_index < bin.entries.len() as i32
1731 {
1732 let idx = next_index as usize;
1733 entry = Some((
1734 bin.get_full_key(idx).unwrap_or_default(),
1735 bin.entries[idx].data.clone().unwrap_or_default(),
1736 next_index,
1737 bin.entries[idx].lsn.as_u64(),
1738 ));
1739 } else {
1740 entry = None; // BIN exhausted — fall through to cross-BIN
1741 }
1742 } else {
1743 entry = None;
1744 }
1745 }
1746 new_bin_arc = None;
1747 } else {
1748 // Slow path: traverse from root, then pin the discovered BIN.
1749 let current_key_slice_opt =
1750 self.current_key.as_deref().map(|s| s.to_vec());
1751 let db = self.db_impl.read();
1752 if let Some(tree) = db.get_real_tree() {
1753 if tree.is_empty() {
1754 entry = None;
1755 new_bin_arc = None;
1756 } else if let (Some(current_key), Some(root)) =
1757 (current_key_slice_opt.as_deref(), tree.get_root())
1758 {
1759 if let Some(bin_arc) =
1760 Self::find_bin_for_key(root, current_key)
1761 {
1762 // Clone so we can move the arc after the read guard is dropped.
1763 let arc_to_save = bin_arc.clone();
1764 {
1765 let g = bin_arc.read();
1766 if let TreeNode::Bottom(bin) = &*g {
1767 if next_index >= 0
1768 && next_index < bin.entries.len() as i32
1769 {
1770 let idx = next_index as usize;
1771 entry = Some((
1772 bin.get_full_key(idx)
1773 .unwrap_or_default(),
1774 bin.entries[idx]
1775 .data
1776 .clone()
1777 .unwrap_or_default(),
1778 next_index,
1779 bin.entries[idx].lsn.as_u64(),
1780 ));
1781 new_bin_arc = Some(arc_to_save);
1782 } else {
1783 entry = None;
1784 new_bin_arc = None;
1785 }
1786 } else {
1787 entry = None;
1788 new_bin_arc = None;
1789 }
1790 }
1791 } else {
1792 entry = None;
1793 new_bin_arc = None;
1794 }
1795 } else {
1796 entry = None;
1797 new_bin_arc = None;
1798 }
1799 } else {
1800 entry = None;
1801 new_bin_arc = None;
1802 }
1803 }
1804
1805 // Pin the BIN we discovered via the slow path.
1806 if new_bin_arc.is_some() {
1807 self.update_bin_pin(new_bin_arc);
1808 }
1809
1810 if let Some((key, data, idx, lsn)) = entry {
1811 // For dup-mode traversal modes, filter by primary key.
1812 if is_dup {
1813 let s = self.apply_dup_filter(
1814 key,
1815 data,
1816 idx,
1817 lsn,
1818 mode,
1819 current_primary_key.as_deref(),
1820 forward,
1821 )?;
1822 return Ok(s);
1823 }
1824 self.lock_ln(lsn)?;
1825 self.current_key = Some(key);
1826 self.current_data = Some(data);
1827 self.current_lsn = lsn;
1828 self.current_index = idx;
1829 return Ok(OperationStatus::Success);
1830 }
1831
1832 // Current BIN exhausted — cross to adjacent BIN.
1833 let anchor_key: Vec<u8> = match &self.current_key {
1834 Some(k) => k.clone(),
1835 None => return Ok(OperationStatus::NotFound),
1836 };
1837
1838 let adjacent_entries: Option<Vec<BinEntry>> = {
1839 let db = self.db_impl.read();
1840 if let Some(tree) = db.get_real_tree() {
1841 if forward {
1842 tree.get_next_bin(&anchor_key)
1843 } else {
1844 tree.get_prev_bin(&anchor_key)
1845 }
1846 } else {
1847 None
1848 }
1849 };
1850
1851 match adjacent_entries {
1852 Some(entries) if !entries.is_empty() => {
1853 let (raw_key, raw_data, idx, lsn) = if forward {
1854 let e = entries.into_iter().next().unwrap();
1855 (e.key, e.data.unwrap_or_default(), 0i32, e.lsn.as_u64())
1856 } else {
1857 let last_idx = (entries.len() - 1) as i32;
1858 let e = entries.into_iter().last().unwrap();
1859 (
1860 e.key,
1861 e.data.unwrap_or_default(),
1862 last_idx,
1863 e.lsn.as_u64(),
1864 )
1865 };
1866 if is_dup {
1867 let s = self.apply_dup_filter(
1868 raw_key,
1869 raw_data,
1870 idx,
1871 lsn,
1872 mode,
1873 current_primary_key.as_deref(),
1874 forward,
1875 )?;
1876 return Ok(s);
1877 }
1878 self.lock_ln(lsn)?;
1879 // Crossed into a new BIN — update the cursor pin.
1880 let new_key_ref = raw_key.clone();
1881 let bin_arc = {
1882 let db = self.db_impl.read();
1883 db.get_real_tree().and_then(|tree| {
1884 tree.get_root().and_then(|r| {
1885 Self::find_bin_for_key(r, &new_key_ref)
1886 })
1887 })
1888 };
1889 self.current_key = Some(raw_key);
1890 self.current_data = Some(raw_data);
1891 self.current_lsn = lsn;
1892 self.current_index = idx;
1893 self.update_bin_pin(bin_arc);
1894 Ok(OperationStatus::Success)
1895 }
1896 _ => {
1897 // Reached the end of the key space (no adjacent BIN).
1898 // T-F2: for a SERIALIZABLE forward scan, acquire RangeRead
1899 // on the per-database EOF sentinel so concurrent inserts of
1900 // keys past the current last key are blocked until this
1901 // transaction commits.
1902 if forward {
1903 self.lock_eof_for_scan()?;
1904 }
1905 Ok(OperationStatus::NotFound)
1906 }
1907 }
1908 }
1909
1910 /// Applies sorted-dup filtering rules after moving to `(raw_key, raw_data,
1911 /// idx)`.
1912 ///
1913 /// * `NextDup` / `PrevDup` — succeed only if the new entry's primary key
1914 /// equals the saved primary key; return NotFound otherwise.
1915 /// * `NextNoDup` / `PrevNoDup` — advance past all entries that share the
1916 /// same primary key as the saved position, returning the first entry with
1917 /// a DIFFERENT primary key.
1918 /// * `Next` / `Prev` — accept any entry.
1919 ///
1920 /// Wave 11-N (Bug 4): every accept site re-finds and pins the BIN that
1921 /// contains `raw_key`. Pre-fix the cross-BIN paths in this function
1922 /// updated `current_key` / `current_index` but left `current_bin_arc`
1923 /// pointing at the prior BIN, so the next `retrieve_next` fast-path
1924 /// would read `next_index = current_index + 1` from the old BIN —
1925 /// effectively re-emitting old entries and (for large secondary
1926 /// indexes) preventing the walk from terminating.
1927 fn apply_dup_filter(
1928 &mut self,
1929 mut raw_key: Vec<u8>,
1930 mut raw_data: Vec<u8>,
1931 mut idx: i32,
1932 mut lsn: u64,
1933 mode: GetMode,
1934 prev_primary_key: Option<&[u8]>,
1935 forward: bool,
1936 ) -> Result<OperationStatus, DbiError> {
1937 loop {
1938 let new_pk = dup_key_data::get_key(&raw_key);
1939 match mode {
1940 GetMode::NextDup | GetMode::PrevDup => {
1941 // Stay on the same primary key.
1942 let same = match (&new_pk, prev_primary_key) {
1943 (Some(npk), Some(ppk)) => npk.as_slice() == ppk,
1944 _ => false,
1945 };
1946 if same {
1947 self.lock_ln(lsn)?;
1948 let bin_arc = self.find_bin_arc_for_key(&raw_key);
1949 self.current_key = Some(raw_key);
1950 self.current_data = Some(raw_data);
1951 self.current_lsn = lsn;
1952 self.current_index = idx;
1953 self.update_bin_pin(bin_arc);
1954 return Ok(OperationStatus::Success);
1955 } else {
1956 return Ok(OperationStatus::NotFound);
1957 }
1958 }
1959 GetMode::NextNoDup | GetMode::PrevNoDup => {
1960 // Skip entries with the same primary key as `prev_primary_key`.
1961 let same = match (&new_pk, prev_primary_key) {
1962 (Some(npk), Some(ppk)) => npk.as_slice() == ppk,
1963 _ => false,
1964 };
1965 if !same {
1966 self.lock_ln(lsn)?;
1967 let bin_arc = self.find_bin_arc_for_key(&raw_key);
1968 self.current_key = Some(raw_key);
1969 self.current_data = Some(raw_data);
1970 self.current_lsn = lsn;
1971 self.current_index = idx;
1972 self.update_bin_pin(bin_arc);
1973 return Ok(OperationStatus::Success);
1974 }
1975 // Need to advance further.
1976 // Increment/decrement idx and try to read from the tree.
1977 if forward {
1978 idx += 1;
1979 } else {
1980 idx -= 1;
1981 }
1982 let next = {
1983 let db = self.db_impl.read();
1984 if let Some(tree) = db.get_real_tree() {
1985 if tree.is_empty() {
1986 None
1987 } else {
1988 use noxu_tree::tree::TreeNode;
1989 tree.get_root().and_then(|r| {
1990 // Use the current raw_key to find the BIN.
1991 let bin_arc =
1992 Self::find_bin_for_key(r, &raw_key)?;
1993 let g = bin_arc.read();
1994 match &*g {
1995 TreeNode::Bottom(bin) => {
1996 if idx < 0
1997 || idx
1998 >= bin.entries.len() as i32
1999 {
2000 None
2001 } else {
2002 let i = idx as usize;
2003 Some((
2004 bin.get_full_key(i)
2005 .unwrap_or_default(),
2006 bin.entries[i]
2007 .data
2008 .clone()
2009 .unwrap_or_default(),
2010 idx,
2011 bin.entries[i].lsn.as_u64(),
2012 ))
2013 }
2014 }
2015 _ => None,
2016 }
2017 })
2018 }
2019 } else {
2020 None
2021 }
2022 };
2023 match next {
2024 Some((k, d, i, l)) => {
2025 raw_key = k;
2026 raw_data = d;
2027 idx = i;
2028 lsn = l;
2029 // Loop continues.
2030 }
2031 None => {
2032 // BIN exhausted — cross to adjacent BIN.
2033 let anchor = raw_key.clone();
2034 let adj: Option<Vec<BinEntry>> = {
2035 let db = self.db_impl.read();
2036 if let Some(tree) = db.get_real_tree() {
2037 if forward {
2038 tree.get_next_bin(&anchor)
2039 } else {
2040 tree.get_prev_bin(&anchor)
2041 }
2042 } else {
2043 None
2044 }
2045 };
2046 match adj {
2047 Some(entries) if !entries.is_empty() => {
2048 let (k, d, i, l) = if forward {
2049 let e =
2050 entries.into_iter().next().unwrap();
2051 (
2052 e.key,
2053 e.data.unwrap_or_default(),
2054 0i32,
2055 e.lsn.as_u64(),
2056 )
2057 } else {
2058 let li = (entries.len() - 1) as i32;
2059 let e =
2060 entries.into_iter().last().unwrap();
2061 (
2062 e.key,
2063 e.data.unwrap_or_default(),
2064 li,
2065 e.lsn.as_u64(),
2066 )
2067 };
2068 raw_key = k;
2069 raw_data = d;
2070 idx = i;
2071 lsn = l;
2072 // Loop continues.
2073 }
2074 _ => return Ok(OperationStatus::NotFound),
2075 }
2076 }
2077 }
2078 }
2079 // Next / Prev: accept any entry.
2080 GetMode::Next | GetMode::Prev => {
2081 self.lock_ln(lsn)?;
2082 let bin_arc = self.find_bin_arc_for_key(&raw_key);
2083 self.current_key = Some(raw_key);
2084 self.current_data = Some(raw_data);
2085 self.current_lsn = lsn;
2086 self.current_index = idx;
2087 self.update_bin_pin(bin_arc);
2088 return Ok(OperationStatus::Success);
2089 }
2090 }
2091 }
2092 }
2093
2094 /// Descends from `node` to the BIN whose key range contains `key`.
2095 ///
2096 /// This mirrors the search path in `Tree::search()` — at each upper IN
2097 /// we follow the child slot with the largest key <= `key`. Returns the
2098 /// `Arc` of the matching BIN, or `None` if the tree is empty / malformed.
2099 fn find_bin_for_key(
2100 node: std::sync::Arc<noxu_tree::NodeRwLock<noxu_tree::tree::TreeNode>>,
2101 key: &[u8],
2102 ) -> Option<std::sync::Arc<noxu_tree::NodeRwLock<noxu_tree::tree::TreeNode>>>
2103 {
2104 use noxu_tree::tree::TreeNode;
2105 let mut current = node;
2106 loop {
2107 let (is_bin, child) = {
2108 let g = current.read();
2109 let is_bin = g.is_bin();
2110 let child = if !is_bin {
2111 match &*g {
2112 TreeNode::Internal(n) => {
2113 if n.entries.is_empty() {
2114 return None;
2115 }
2116 // Slot 0 carries a virtual key (-infinity); follow
2117 // the largest key <= search key (same logic as
2118 // Tree::search and Tree::insert_recursive).
2119 let mut idx = 0usize;
2120 for (i, entry) in n.entries.iter().enumerate() {
2121 if i == 0 {
2122 idx = 0;
2123 } else if entry.key.as_slice() <= key {
2124 idx = i;
2125 } else {
2126 break;
2127 }
2128 }
2129 n.entries.get(idx).and_then(|e| e.child.clone())
2130 }
2131 _ => None,
2132 }
2133 } else {
2134 None
2135 };
2136 (is_bin, child)
2137 };
2138 if is_bin {
2139 return Some(current);
2140 }
2141 current = child?;
2142 }
2143 }
2144
2145 /// Inserts or updates a record at the cursor position.
2146 ///
2147 /// Write path:
2148 ///
2149 /// 1. Checks state and, for `Current` mode, that the cursor is initialized.
2150 /// 2. For `NoOverwrite`: searches the tree; returns `KeyExist` if found.
2151 /// 3. Calls `Tree::insert(key, data, lsn)` to insert/update in the BIN.
2152 /// 4. Updates the cursor position to the newly written record.
2153 ///
2154 /// Note: locking (step 2 in the) and WAL logging (step 3 in the) are not
2155 /// yet wired here — they require LogManager integration (P0 gap).
2156 ///
2157 /// # Arguments
2158 ///
2159 /// * `key` - The key to insert/update
2160 /// * `data` - The data value
2161 /// * `put_mode` - The insertion mode
2162 ///
2163 /// # Returns
2164 ///
2165 /// * `Success` if the record was inserted/updated
2166 /// * `KeyExist` if NoOverwrite mode and key already exists
2167 pub fn put(
2168 &mut self,
2169 key: &[u8],
2170 data: &[u8],
2171 put_mode: PutMode,
2172 ) -> Result<OperationStatus, DbiError> {
2173 self.check_state()?;
2174
2175 // For sorted-dup databases: encode (key, data) as a two-part composite
2176 // key. The tree stores `combine(key, data)` with no slot data.
2177 // Dup path in 7.5.
2178 if self.is_sorted_dup() {
2179 return self.put_dup(key, data, put_mode);
2180 }
2181
2182 match put_mode {
2183 PutMode::Current => {
2184 self.check_initialized()?;
2185 let current_key = self
2186 .current_key
2187 .clone()
2188 .ok_or(DbiError::CursorNotInitialized)?;
2189 let (old_data, old_lsn) =
2190 self.get_slot_before_image(¤t_key);
2191 self.lock_write_before_log(old_lsn, ¤t_key)?;
2192 let new_lsn = self.log_ln_write(
2193 ¤t_key,
2194 Some(data),
2195 self.locker_id,
2196 )?;
2197 self.finalize_write_lock(
2198 old_lsn,
2199 new_lsn,
2200 Some(current_key.clone()),
2201 old_data,
2202 )?;
2203 self.apply_tree_insert(current_key, data.to_vec(), new_lsn);
2204 self.current_data = Some(data.to_vec());
2205 self.current_lsn = new_lsn.as_u64();
2206 Ok(OperationStatus::Success)
2207 }
2208 PutMode::NoOverwrite => {
2209 if self.key_exists_in_view(key) {
2210 return Ok(OperationStatus::KeyExist);
2211 }
2212 // New insert: old_lsn may be NULL (key did not exist
2213 // when we read the BIN above) OR may be a real LSN if
2214 // a concurrent thread inserted between our
2215 // `key_exists_in_view` check above and our
2216 // `get_slot_before_image` call here.
2217 let (old_data, old_lsn) = self.get_slot_before_image(key);
2218 // T-F2: acquire RangeInsert on the successor key's LSN so
2219 // concurrent SERIALIZABLE scanners that have already passed
2220 // this key's position are blocked until we commit. No-op
2221 // for non-serializable txns or updates (old_lsn != NULL).
2222 self.lock_range_insert(key, old_lsn)?;
2223 self.lock_write_before_log(old_lsn, key)?;
2224 // Re-check `key_exists_in_view` AFTER acquiring the
2225 // synthetic-key / per-LSN write lock. A concurrent
2226 // inserter for the same brand-new key may have
2227 // committed while we were either blocked on the
2228 // synthetic key lock (NULL_LSN insert race) OR
2229 // blocked on the slot's write lock that the other
2230 // inserter held until commit. In both cases we
2231 // must report `KeyExist` instead of overwriting,
2232 // because `NoOverwrite` semantics forbid silently
2233 // replacing an existing record. Closes the first
2234 // F12 residual end-to-end.
2235 if self.key_exists_in_view(key) {
2236 return Ok(OperationStatus::KeyExist);
2237 }
2238 let new_lsn =
2239 self.log_ln_write(key, Some(data), self.locker_id)?;
2240 self.finalize_write_lock(
2241 old_lsn,
2242 new_lsn,
2243 Some(key.to_vec()),
2244 old_data,
2245 )?;
2246 self.apply_tree_insert(key.to_vec(), data.to_vec(), new_lsn);
2247 self.current_key = Some(key.to_vec());
2248 self.current_data = Some(data.to_vec());
2249 self.current_lsn = new_lsn.as_u64();
2250 self.current_index = 0;
2251 self.state = CursorState::Initialized;
2252 Ok(OperationStatus::Success)
2253 }
2254 // NoDupData on a non-dup database behaves like NoOverwrite:
2255 // returns KeyExist if the key already exists, otherwise inserts.
2256 // `Cursor.putNoDupData()` non-dup branch.
2257 PutMode::NoDupData => {
2258 if self.key_exists_in_view(key) {
2259 return Ok(OperationStatus::KeyExist);
2260 }
2261 let (old_data, old_lsn) = self.get_slot_before_image(key);
2262 // T-F2: same as NoOverwrite path.
2263 self.lock_range_insert(key, old_lsn)?;
2264 self.lock_write_before_log(old_lsn, key)?;
2265 // See the NoOverwrite re-check above for rationale.
2266 if self.key_exists_in_view(key) {
2267 return Ok(OperationStatus::KeyExist);
2268 }
2269 let new_lsn =
2270 self.log_ln_write(key, Some(data), self.locker_id)?;
2271 self.finalize_write_lock(
2272 old_lsn,
2273 new_lsn,
2274 Some(key.to_vec()),
2275 old_data,
2276 )?;
2277 self.apply_tree_insert(key.to_vec(), data.to_vec(), new_lsn);
2278 self.current_key = Some(key.to_vec());
2279 self.current_data = Some(data.to_vec());
2280 self.current_lsn = new_lsn.as_u64();
2281 self.current_index = 0;
2282 self.state = CursorState::Initialized;
2283 Ok(OperationStatus::Success)
2284 }
2285 PutMode::Overwrite => {
2286 let (old_data, old_lsn) = self.get_slot_before_image(key);
2287 // T-F2: acquire RangeInsert if this is a brand-new key
2288 // (old_lsn == NULL_LSN). For existing-key updates the
2289 // Write lock on old_lsn already conflicts with RangeRead.
2290 self.lock_range_insert(key, old_lsn)?;
2291 self.lock_write_before_log(old_lsn, key)?;
2292 let new_lsn =
2293 self.log_ln_write(key, Some(data), self.locker_id)?;
2294 self.finalize_write_lock(
2295 old_lsn,
2296 new_lsn,
2297 Some(key.to_vec()),
2298 old_data,
2299 )?;
2300 self.apply_tree_insert(key.to_vec(), data.to_vec(), new_lsn);
2301 self.current_key = Some(key.to_vec());
2302 self.current_data = Some(data.to_vec());
2303 self.current_lsn = new_lsn.as_u64();
2304 self.current_index = 0;
2305 self.state = CursorState::Initialized;
2306 Ok(OperationStatus::Success)
2307 }
2308 }
2309 }
2310
2311 /// Sorted-dup variant of `put()`.
2312 ///
2313 /// Encodes (key, data) as a two-part composite key and stores it in the
2314 /// tree with empty slot data. The tree's custom comparator ensures
2315 /// correct ordering.
2316 ///
2317 /// Dup path from 7.5.
2318 /// Dup path from 7.5.
2319 fn put_dup(
2320 &mut self,
2321 key: &[u8],
2322 data: &[u8],
2323 put_mode: PutMode,
2324 ) -> Result<OperationStatus, DbiError> {
2325 let two_part_key = dup_key_data::combine(key, data);
2326
2327 match put_mode {
2328 // --- Current: replace the data of the currently-positioned record ---
2329 PutMode::Current => {
2330 // In dup mode, "current" is the two-part key at the cursor
2331 // position; replacing it means deleting the old two-part key
2332 // and inserting a new one (delete old, insert new).
2333 self.check_initialized()?;
2334 let old_key = self
2335 .current_key
2336 .clone()
2337 .ok_or(DbiError::CursorNotInitialized)?;
2338 let del_lsn =
2339 self.log_ln_write(&old_key, None, self.locker_id)?;
2340 self.apply_tree_delete(old_key, del_lsn);
2341 let new_lsn = self.log_ln_write(
2342 &two_part_key,
2343 Some(b""),
2344 self.locker_id,
2345 )?;
2346 self.apply_tree_insert(two_part_key.clone(), vec![], new_lsn);
2347 self.current_key = Some(two_part_key);
2348 self.current_data = None;
2349 self.current_lsn = new_lsn.as_u64();
2350 return Ok(OperationStatus::Success);
2351 }
2352 // --- Overwrite: insert or replace the exact (key, data) pair ---
2353 PutMode::Overwrite => {
2354 // SR9752 Part 2 (Wave 5): register the brand-new sorted-dup
2355 // insert with the cursor's txn / lock manager so abort-undo
2356 // can roll the dup back. Distinguish update vs. insert: if
2357 // the (key, data) pair already exists, this is a no-op for
2358 // the counter and the slot LSN moves; if the pair is new,
2359 // the abort-undo deletes the slot.
2360 let exists_old_lsn: u64 = {
2361 let db = self.db_impl.read();
2362 db.get_real_tree()
2363 .and_then(|tree| {
2364 Self::get_data_from_tree(&tree, &two_part_key)
2365 })
2366 .map(|(_, lsn)| lsn)
2367 .unwrap_or(noxu_util::NULL_LSN.as_u64())
2368 };
2369 self.lock_write_before_log(exists_old_lsn, &two_part_key)?;
2370 let new_lsn = self.log_ln_write(
2371 &two_part_key,
2372 Some(b""),
2373 self.locker_id,
2374 )?;
2375 self.finalize_write_lock(
2376 exists_old_lsn,
2377 new_lsn,
2378 Some(two_part_key.clone()),
2379 None,
2380 )?;
2381 self.apply_tree_insert(two_part_key.clone(), vec![], new_lsn);
2382 self.current_key = Some(two_part_key);
2383 self.current_data = None;
2384 self.current_lsn = new_lsn.as_u64();
2385 self.current_index = 0;
2386 self.state = CursorState::Initialized;
2387 return Ok(OperationStatus::Success);
2388 }
2389 // --- NoDupData: (key, data) pair uniqueness check ---
2390 PutMode::NoDupData => {
2391 // Return KeyExist if the exact (key, data) pair already exists.
2392 // Mirrors JE's Cursor.putNoDupData() semantics.
2393 let exists = {
2394 let db = self.db_impl.read();
2395 if let Some(tree) = db.get_real_tree() {
2396 tree.search(&two_part_key)
2397 .map(|sr| sr.exact_parent_found)
2398 .unwrap_or(false)
2399 } else {
2400 false
2401 }
2402 };
2403 if exists {
2404 return Ok(OperationStatus::KeyExist);
2405 }
2406 }
2407 // --- NoOverwrite: key-only uniqueness check (JE semantics) ---
2408 PutMode::NoOverwrite => {
2409 // JE invariant (DatabaseTest.testPutNoOverwriteInADupDb*):
2410 // once ANY (key, *) pair exists for this key, a putNoOverwrite
2411 // of the same key with ANY data value must return KEYEXIST.
2412 // This is different from NoDupData which checks (key,data).
2413 let key_exists = {
2414 let db = self.db_impl.read();
2415 if let Some(tree) = db.get_real_tree() {
2416 let lb = dup_key_data::lower_bound(key);
2417 tree.first_entry_at_or_after_with_index(&lb)
2418 .map(|(found_key, _, _, _, _)| {
2419 dup_key_data::matches_key(&found_key, key)
2420 })
2421 .unwrap_or(false)
2422 } else {
2423 false
2424 }
2425 };
2426 if key_exists {
2427 return Ok(OperationStatus::KeyExist);
2428 }
2429 }
2430 }
2431
2432 // --- Common insert path for NoDupData / NoOverwrite ---
2433 // Reached only when the existence check above passed (no early return).
2434 // v1.6 (Wave 2A): register the insert with the cursor's txn /
2435 // lock manager so abort-undo can roll back the new dup.
2436 // old_lsn is NULL_LSN: the existence check confirmed the pair is absent.
2437 let old_lsn = noxu_util::NULL_LSN.as_u64();
2438 self.lock_write_before_log(old_lsn, &two_part_key)?;
2439 let new_lsn =
2440 self.log_ln_write(&two_part_key, Some(b""), self.locker_id)?;
2441 self.finalize_write_lock(
2442 old_lsn,
2443 new_lsn,
2444 Some(two_part_key.clone()),
2445 None,
2446 )?;
2447 // Use apply_tree_insert so the per-database entry counter is bumped
2448 // on a new (key, data) pair — `Database::count()` reads this counter.
2449 self.apply_tree_insert(two_part_key.clone(), vec![], new_lsn);
2450 self.current_key = Some(two_part_key);
2451 self.current_data = None;
2452 self.current_lsn = new_lsn.as_u64();
2453 self.current_index = 0;
2454 self.state = CursorState::Initialized;
2455 Ok(OperationStatus::Success)
2456 }
2457
2458 /// Writes an LN (Leaf Node) log entry for a put or delete operation.
2459 ///
2460 /// Returns the LSN assigned to the entry, or NULL_LSN if no log manager
2461 /// is configured (e.g., read-only or test cursor).
2462 fn log_ln_write(
2463 &self,
2464 key: &[u8],
2465 data: Option<&[u8]>,
2466 txn_id: i64,
2467 ) -> Result<Lsn, DbiError> {
2468 // Deferred-write databases skip WAL logging entirely.
2469 // Data is flushed to disk only at eviction or checkpoint.
2470 // `CursorImpl.java` deferred-write check before logManager.log().
2471 if self.db_impl.read().is_deferred_write() {
2472 return Ok(noxu_util::NULL_LSN);
2473 }
2474
2475 let lm = match &self.log_manager {
2476 Some(lm) => lm,
2477 None => return Ok(noxu_util::NULL_LSN),
2478 };
2479
2480 let db_id = self.db_impl.read().get_id().id() as u64;
2481 let txn_id_opt = if txn_id != 0 { Some(txn_id) } else { None };
2482
2483 let entry = LnLogEntry::new(
2484 db_id,
2485 txn_id_opt,
2486 Lsn::from_u64(self.current_lsn), // abort_lsn: before-image LSN (current slot LSN before this write)
2487 false, // abort_known_deleted
2488 None, // abort_key
2489 None, // abort_data
2490 NULL_VLSN, // abort_vlsn
2491 0, // abort_expiration
2492 true, // embedded_ln
2493 key.to_vec(),
2494 data.map(|d| d.to_vec()),
2495 0, // expiration
2496 NULL_VLSN, // vlsn
2497 );
2498
2499 let mut buf = BytesMut::with_capacity(entry.log_size());
2500 entry.write_to_log(&mut buf);
2501
2502 let entry_type = if data.is_some() {
2503 if txn_id_opt.is_some() {
2504 LogEntryType::InsertLNTxn
2505 } else {
2506 LogEntryType::InsertLN
2507 }
2508 } else if txn_id_opt.is_some() {
2509 LogEntryType::DeleteLNTxn
2510 } else {
2511 LogEntryType::DeleteLN
2512 };
2513
2514 // Pass the previous slot LSN as old_lsn so the UtilizationTracker
2515 // marks the previous version obsolete (the: countObsoleteNode with oldLsn).
2516 let old_lsn_opt = if self.current_lsn != noxu_util::NULL_LSN.as_u64() {
2517 Some(Lsn::from_u64(self.current_lsn))
2518 } else {
2519 None
2520 };
2521
2522 lm.log_with_old_lsn(
2523 entry_type,
2524 &buf,
2525 Provisional::No,
2526 false,
2527 false,
2528 old_lsn_opt,
2529 )
2530 .map_err(DbiError::from)
2531 }
2532
2533 /// Deletes the record at the cursor position.
2534 ///
2535 /// Delete path:
2536 ///
2537 /// 1. Checks that the cursor is initialized.
2538 /// 2. Writes a DeleteLN log entry to the WAL (if log manager is present).
2539 /// 3. Calls `Tree::delete(key)` to remove the entry from the BIN.
2540 /// 4. Resets cursor to NotInitialized (matching behaviour).
2541 ///
2542 /// # Returns
2543 ///
2544 /// * `Success` if the record was deleted
2545 ///
2546 /// # Errors
2547 ///
2548 /// * `CursorNotInitialized` if cursor is not positioned
2549 /// * `CursorClosed` if cursor has been closed
2550 pub fn delete(&mut self) -> Result<OperationStatus, DbiError> {
2551 self.check_initialized()?;
2552
2553 // For sorted-dup databases, current_key IS the two-part composite key
2554 // stored in the tree. For non-dup databases it is the plain key.
2555 // In both cases current_key is the correct tree-delete key.
2556 if let Some(tree_key) = self.current_key.clone() {
2557 let (old_data, old_lsn) = self.get_slot_before_image(&tree_key);
2558 self.lock_write_before_log(old_lsn, &tree_key)?;
2559 // Wave 5: also hold a synthetic-key write lock for the
2560 // duration of the txn so concurrent readers that probe the
2561 // BIN post-physical-removal can detect contention via
2562 // `contest_synthetic_key_for_missing_read`.
2563 self.lock_synthetic_key_for_delete(&tree_key)?;
2564 let del_lsn = self.log_ln_write(&tree_key, None, self.locker_id)?;
2565 self.finalize_write_lock(
2566 old_lsn,
2567 del_lsn,
2568 Some(tree_key.clone()),
2569 old_data,
2570 )?;
2571 self.apply_tree_delete(tree_key, del_lsn);
2572 }
2573
2574 self.current_key = None;
2575 self.current_data = None;
2576 self.current_lsn = noxu_util::NULL_LSN.as_u64();
2577 self.current_index = -1;
2578 self.state = CursorState::NotInitialized;
2579
2580 Ok(OperationStatus::Success)
2581 }
2582
2583 /// Counts the number of duplicates at the current key position.
2584 ///
2585 /// For sorted-dup databases, traverses all records sharing the same
2586 /// primary key. For non-dup databases, returns 1 if positioned.
2587 ///
2588 /// 7.5.
2589 ///
2590 /// # Returns
2591 ///
2592 /// The count of duplicate records at the current key.
2593 ///
2594 /// # Errors
2595 ///
2596 /// * `CursorNotInitialized` if cursor is not positioned
2597 /// * `CursorClosed` if cursor has been closed
2598 pub fn count(&self) -> Result<i64, DbiError> {
2599 self.check_initialized()?;
2600
2601 // For sorted-dup databases, count all entries sharing the same primary
2602 // key as the current position.
2603 //
2604 // Strategy (Wave 11-N Bug 1 fix): clone the cursor at the current
2605 // position, walk backward with PrevDup until NotFound (which leaves
2606 // scratch on the FIRST dup of the primary), then walk forward with
2607 // NextDup counting successful steps. The total count is
2608 // `forward + 1` because the forward walk visits every dup *after*
2609 // the first, plus the one scratch is parked on at the start of the
2610 // forward walk.
2611 //
2612 // Pre-fix the formula was `backward + 1 + forward`, which double
2613 // counted: the backward walk left scratch on the first dup
2614 // already, so the forward walk re-traverses every dup including
2615 // the original position. The result for an N-dup primary observed
2616 // at offset `i` was `i + N` instead of `N`.
2617 if self.is_sorted_dup() {
2618 let mut scratch = self.dup(true)?;
2619 // Walk backward to the first dup of this primary. We do not
2620 // count these steps — they are pure repositioning.
2621 while let Ok(OperationStatus::Success) =
2622 scratch.retrieve_next(GetMode::PrevDup)
2623 {}
2624 // scratch is now parked on the first dup of this primary.
2625 let mut forward: i64 = 0;
2626 while let Ok(OperationStatus::Success) =
2627 scratch.retrieve_next(GetMode::NextDup)
2628 {
2629 forward += 1;
2630 }
2631 return Ok(forward + 1);
2632 }
2633
2634 Ok(1)
2635 }
2636
2637 /// Creates a duplicate of this cursor at the same position.
2638 ///
2639 /// If `same_position` is true, the new cursor is positioned at the
2640 /// same record as this cursor. Otherwise, the new cursor is created
2641 /// in the NotInitialized state.
2642 ///
2643 /// The duplicated cursor shares the same locker (transaction) as
2644 /// the original cursor.
2645 ///
2646 /// # Arguments
2647 ///
2648 /// * `same_position` - Whether to copy the current position
2649 ///
2650 /// # Returns
2651 ///
2652 /// A new CursorImpl with the same or uninitialized position.
2653 ///
2654 /// # Errors
2655 ///
2656 /// * `CursorClosed` if the cursor has been closed
2657 pub fn dup(&self, same_position: bool) -> Result<CursorImpl, DbiError> {
2658 self.check_state()?;
2659
2660 let mut new_cursor = match &self.log_manager {
2661 Some(lm) => CursorImpl::with_log_manager(
2662 self.db_impl.clone(),
2663 self.locker_id,
2664 lm.clone(),
2665 ),
2666 None => CursorImpl::new(self.db_impl.clone(), self.locker_id),
2667 };
2668 if let Some(lm) = &self.lock_manager {
2669 new_cursor.lock_manager = Some(lm.clone());
2670 }
2671
2672 if same_position && self.state == CursorState::Initialized {
2673 new_cursor.current_key = self.current_key.clone();
2674 new_cursor.current_data = self.current_data.clone();
2675 new_cursor.current_lsn = self.current_lsn;
2676 new_cursor.current_index = self.current_index;
2677 new_cursor.state = CursorState::Initialized;
2678 }
2679
2680 Ok(new_cursor)
2681 }
2682
2683 /// Closes the cursor.
2684 ///
2685 /// Releases all resources held by the cursor, including any BIN latches
2686 /// and cursor-level locks. After closing, all operations on the cursor
2687 /// will return `CursorClosed` errors.
2688 ///
2689 /// Closing a cursor multiple times is safe and has no effect after the
2690 /// Updates the cursor's BIN pin when moving to a new BIN.
2691 ///
2692 /// Decrements on the old BIN (if any) and increments it
2693 /// on (if ). No-op when the cursor stays on the same BIN
2694 /// (pointer equality checked via ).
2695 /// Re-descends the tree to find the BIN that contains `key`. Used
2696 /// by the sorted-dup cross-BIN paths in `apply_dup_filter` to
2697 /// re-pin `current_bin_arc` after a BIN boundary is crossed.
2698 fn find_bin_arc_for_key(
2699 &self,
2700 key: &[u8],
2701 ) -> Option<std::sync::Arc<noxu_tree::NodeRwLock<noxu_tree::tree::TreeNode>>>
2702 {
2703 let db = self.db_impl.read();
2704 let tree = db.get_real_tree()?;
2705 let root = tree.get_root()?;
2706 Self::find_bin_for_key(root, key)
2707 }
2708
2709 ///
2710 /// Matching /
2711 /// calls in cursor positioning.
2712 fn update_bin_pin(
2713 &mut self,
2714 new_bin: Option<
2715 std::sync::Arc<noxu_tree::NodeRwLock<noxu_tree::tree::TreeNode>>,
2716 >,
2717 ) {
2718 // Same BIN — nothing to do.
2719 match (&self.current_bin_arc, &new_bin) {
2720 (Some(old), Some(new)) if std::sync::Arc::ptr_eq(old, new) => {
2721 return;
2722 }
2723 _ => {}
2724 }
2725 // Unpin old BIN.
2726 if let Some(old_arc) = self.current_bin_arc.take() {
2727 noxu_tree::Tree::unpin_bin(&old_arc);
2728 }
2729 // Pin new BIN.
2730 if let Some(ref new_arc) = new_bin {
2731 noxu_tree::Tree::pin_bin(new_arc);
2732 }
2733 self.current_bin_arc = new_bin;
2734 }
2735
2736 /// first close.
2737 ///
2738 /// # Returns
2739 ///
2740 /// always (never fails).
2741 pub fn close(&mut self) -> Result<(), DbiError> {
2742 if self.state == CursorState::Closed {
2743 return Ok(());
2744 }
2745
2746 // Release BIN pin — prevents evictor from seeing a stale cursor_count.
2747 self.update_bin_pin(None);
2748
2749 self.current_key = None;
2750 self.current_data = None;
2751 self.current_lsn = noxu_util::NULL_LSN.as_u64();
2752 self.current_index = -1;
2753 self.state = CursorState::Closed;
2754
2755 Ok(())
2756 }
2757}
2758
2759impl Drop for CursorImpl {
2760 /// Ensures the cursor is closed when dropped.
2761 ///
2762 /// This provides automatic cleanup if the user forgets to explicitly
2763 /// close the cursor. Note that it's still better practice to call
2764 /// close() explicitly to handle potential errors.
2765 fn drop(&mut self) {
2766 if self.state != CursorState::Closed {
2767 let _ = self.close();
2768 }
2769 }
2770}
2771
2772#[cfg(test)]
2773#[expect(clippy::field_reassign_with_default)]
2774mod tests {
2775 use super::*;
2776 use crate::{DatabaseConfig, DatabaseId, DbType};
2777
2778 /// Creates a test DatabaseImpl for cursor testing.
2779 fn create_test_database() -> Arc<RwLock<DatabaseImpl>> {
2780 let db_id = DatabaseId::new(1);
2781 let config = DatabaseConfig::default();
2782 let db_impl = DatabaseImpl::new(
2783 db_id,
2784 "test_db".to_string(),
2785 DbType::User,
2786 &config,
2787 );
2788 Arc::new(RwLock::new(db_impl))
2789 }
2790
2791 #[test]
2792 fn test_new_cursor_not_initialized() {
2793 let db = create_test_database();
2794 let cursor = CursorImpl::new(db, 100);
2795
2796 assert!(!cursor.is_initialized());
2797 assert!(!cursor.is_closed());
2798 assert_eq!(cursor.get_locker_id(), 100);
2799 assert!(cursor.get_current_key().is_none());
2800 assert!(cursor.get_current_data().is_none());
2801 }
2802
2803 #[test]
2804 fn test_search_positions_cursor() {
2805 let db = create_test_database();
2806 let mut cursor = CursorImpl::new(db, 100);
2807
2808 let key = b"test_key";
2809 let data = b"test_data";
2810
2811 // Insert into tree first, then search.
2812 cursor.put(key, data, PutMode::Overwrite).unwrap();
2813 let status = cursor.search(key, Some(data), SearchMode::Set).unwrap();
2814 assert_eq!(status, OperationStatus::Success);
2815 assert!(cursor.is_initialized());
2816 assert_eq!(cursor.get_current_key(), Some(key.as_slice()));
2817 assert_eq!(cursor.get_current_data(), Some(data.as_slice()));
2818 }
2819
2820 #[test]
2821 fn test_get_current_after_search() {
2822 let db = create_test_database();
2823 let mut cursor = CursorImpl::new(db, 100);
2824
2825 let key = b"my_key";
2826 let data = b"my_data";
2827
2828 // Insert into tree first, then search.
2829 cursor.put(key, data, PutMode::Overwrite).unwrap();
2830 cursor.search(key, Some(data), SearchMode::Set).unwrap();
2831 let (ret_key, ret_data) = cursor.get_current().unwrap();
2832
2833 assert_eq!(ret_key, key);
2834 assert_eq!(ret_data, data);
2835 }
2836
2837 #[test]
2838 fn test_get_current_before_initialization() {
2839 let db = create_test_database();
2840 let cursor = CursorImpl::new(db, 100);
2841
2842 let result = cursor.get_current();
2843 assert!(matches!(result, Err(DbiError::CursorNotInitialized)));
2844 }
2845
2846 #[test]
2847 fn test_retrieve_next_from_uninitialized() {
2848 let db = create_test_database();
2849 let mut cursor = CursorImpl::new(db, 100);
2850
2851 let status = cursor.retrieve_next(GetMode::Next).unwrap();
2852 assert_eq!(status, OperationStatus::NotFound);
2853 }
2854
2855 #[test]
2856 fn test_put_overwrite() {
2857 let db = create_test_database();
2858 let mut cursor = CursorImpl::new(db, 100);
2859
2860 let key = b"key1";
2861 let data = b"data1";
2862
2863 let status = cursor.put(key, data, PutMode::Overwrite).unwrap();
2864 assert_eq!(status, OperationStatus::Success);
2865 assert!(cursor.is_initialized());
2866 assert_eq!(cursor.get_current_key(), Some(key.as_slice()));
2867 }
2868
2869 #[test]
2870 fn test_put_no_overwrite_when_key_exists() {
2871 let db = create_test_database();
2872 let mut cursor = CursorImpl::new(db, 100);
2873
2874 let key = b"key1";
2875 let data1 = b"data1";
2876 let data2 = b"data2";
2877
2878 // First put succeeds
2879 cursor.put(key, data1, PutMode::Overwrite).unwrap();
2880
2881 // Second put with NoOverwrite should return KeyExist
2882 let status = cursor.put(key, data2, PutMode::NoOverwrite).unwrap();
2883 assert_eq!(status, OperationStatus::KeyExist);
2884 }
2885
2886 #[test]
2887 fn test_put_current_requires_initialization() {
2888 let db = create_test_database();
2889 let mut cursor = CursorImpl::new(db, 100);
2890
2891 let key = b"key1";
2892 let data = b"data1";
2893
2894 let result = cursor.put(key, data, PutMode::Current);
2895 assert!(matches!(result, Err(DbiError::CursorNotInitialized)));
2896 }
2897
2898 #[test]
2899 fn test_put_current_after_initialization() {
2900 let db = create_test_database();
2901 let mut cursor = CursorImpl::new(db, 100);
2902
2903 let key = b"key1";
2904 let data1 = b"data1";
2905 let data2 = b"data2";
2906
2907 // Insert first, then search to position cursor, then update with Current mode.
2908 cursor.put(key, data1, PutMode::Overwrite).unwrap();
2909 cursor.search(key, Some(data1), SearchMode::Set).unwrap();
2910
2911 // Update with Current mode
2912 let status = cursor.put(key, data2, PutMode::Current).unwrap();
2913 assert_eq!(status, OperationStatus::Success);
2914 assert_eq!(cursor.get_current_data(), Some(data2.as_slice()));
2915 }
2916
2917 #[test]
2918 fn test_delete_requires_initialization() {
2919 let db = create_test_database();
2920 let mut cursor = CursorImpl::new(db, 100);
2921
2922 let result = cursor.delete();
2923 assert!(matches!(result, Err(DbiError::CursorNotInitialized)));
2924 }
2925
2926 #[test]
2927 fn test_delete_resets_state() {
2928 let db = create_test_database();
2929 let mut cursor = CursorImpl::new(db, 100);
2930
2931 let key = b"key1";
2932 let data = b"data1";
2933
2934 // Insert, search to position, then delete.
2935 cursor.put(key, data, PutMode::Overwrite).unwrap();
2936 cursor.search(key, Some(data), SearchMode::Set).unwrap();
2937 assert!(cursor.is_initialized());
2938
2939 // Delete
2940 let status = cursor.delete().unwrap();
2941 assert_eq!(status, OperationStatus::Success);
2942 assert!(!cursor.is_initialized());
2943 assert!(cursor.get_current_key().is_none());
2944 }
2945
2946 #[test]
2947 fn test_dup_with_same_position() {
2948 let db = create_test_database();
2949 let mut cursor = CursorImpl::new(db, 100);
2950
2951 let key = b"key1";
2952 let data = b"data1";
2953
2954 // Insert, search to position, then dup.
2955 cursor.put(key, data, PutMode::Overwrite).unwrap();
2956 cursor.search(key, Some(data), SearchMode::Set).unwrap();
2957
2958 // Duplicate with same position
2959 let dup_cursor = cursor.dup(true).unwrap();
2960 assert!(dup_cursor.is_initialized());
2961 assert_eq!(dup_cursor.get_current_key(), Some(key.as_slice()));
2962 assert_eq!(dup_cursor.get_current_data(), Some(data.as_slice()));
2963 assert_eq!(dup_cursor.get_locker_id(), 100);
2964
2965 // Should have different IDs
2966 assert_ne!(cursor.get_id(), dup_cursor.get_id());
2967 }
2968
2969 #[test]
2970 fn test_dup_without_same_position() {
2971 let db = create_test_database();
2972 let mut cursor = CursorImpl::new(db, 100);
2973
2974 let key = b"key1";
2975 let data = b"data1";
2976
2977 // Insert, search to position, then dup without position.
2978 cursor.put(key, data, PutMode::Overwrite).unwrap();
2979 cursor.search(key, Some(data), SearchMode::Set).unwrap();
2980
2981 // Duplicate without position
2982 let dup_cursor = cursor.dup(false).unwrap();
2983 assert!(!dup_cursor.is_initialized());
2984 assert!(dup_cursor.get_current_key().is_none());
2985 assert_eq!(dup_cursor.get_locker_id(), 100);
2986 }
2987
2988 #[test]
2989 fn test_close_sets_state() {
2990 let db = create_test_database();
2991 let mut cursor = CursorImpl::new(db, 100);
2992
2993 cursor.close().unwrap();
2994 assert!(cursor.is_closed());
2995 }
2996
2997 #[test]
2998 fn test_operations_after_close() {
2999 let db = create_test_database();
3000 let mut cursor = CursorImpl::new(db, 100);
3001
3002 cursor.close().unwrap();
3003
3004 // All operations should return CursorClosed
3005 assert!(matches!(
3006 cursor.search(b"key", None, SearchMode::Set),
3007 Err(DbiError::CursorClosed)
3008 ));
3009 assert!(matches!(cursor.get_current(), Err(DbiError::CursorClosed)));
3010 assert!(matches!(
3011 cursor.retrieve_next(GetMode::Next),
3012 Err(DbiError::CursorClosed)
3013 ));
3014 assert!(matches!(
3015 cursor.put(b"key", b"data", PutMode::Overwrite),
3016 Err(DbiError::CursorClosed)
3017 ));
3018 assert!(matches!(cursor.delete(), Err(DbiError::CursorClosed)));
3019 assert!(matches!(cursor.count(), Err(DbiError::CursorClosed)));
3020 assert!(matches!(cursor.dup(true), Err(DbiError::CursorClosed)));
3021 }
3022
3023 #[test]
3024 fn test_close_idempotent() {
3025 let db = create_test_database();
3026 let mut cursor = CursorImpl::new(db, 100);
3027
3028 cursor.close().unwrap();
3029 cursor.close().unwrap(); // Should not panic
3030 assert!(cursor.is_closed());
3031 }
3032
3033 #[test]
3034 fn test_drop_calls_close() {
3035 let db = create_test_database();
3036 let mut cursor = CursorImpl::new(db.clone(), 100);
3037
3038 let key = b"key1";
3039 let data = b"data1";
3040 cursor.put(key, data, PutMode::Overwrite).unwrap();
3041 cursor.search(key, Some(data), SearchMode::Set).unwrap();
3042
3043 // Drop without explicit close
3044 drop(cursor);
3045
3046 // Create another cursor to verify no issues
3047 let cursor2 = CursorImpl::new(db, 200);
3048 assert!(!cursor2.is_closed());
3049 }
3050
3051 #[test]
3052 fn test_count_returns_one() {
3053 let db = create_test_database();
3054 let mut cursor = CursorImpl::new(db, 100);
3055
3056 let key = b"key1";
3057 let data = b"data1";
3058 cursor.put(key, data, PutMode::Overwrite).unwrap();
3059 cursor.search(key, Some(data), SearchMode::Set).unwrap();
3060
3061 let count = cursor.count().unwrap();
3062 assert_eq!(count, 1);
3063 }
3064
3065 #[test]
3066 fn test_unique_cursor_ids() {
3067 let db = create_test_database();
3068 let cursor1 = CursorImpl::new(db.clone(), 100);
3069 let cursor2 = CursorImpl::new(db.clone(), 100);
3070 let cursor3 = CursorImpl::new(db, 100);
3071
3072 assert_ne!(cursor1.get_id(), cursor2.get_id());
3073 assert_ne!(cursor2.get_id(), cursor3.get_id());
3074 assert_ne!(cursor1.get_id(), cursor3.get_id());
3075 }
3076
3077 // -----------------------------------------------------------------------
3078 // New unit tests for real B-tree traversal (get_first, get_last,
3079 // retrieve_next).
3080 // -----------------------------------------------------------------------
3081
3082 /// get_first on an empty database returns NotFound.
3083 ///
3084 /// positionFirstOrLast on an empty tree.
3085 #[test]
3086 fn test_get_first_empty_tree() {
3087 let db = create_test_database();
3088 let mut cursor = CursorImpl::new(db, 100);
3089 let status = cursor.get_first().unwrap();
3090 assert_eq!(status, OperationStatus::NotFound);
3091 }
3092
3093 /// get_last on an empty database returns NotFound.
3094 #[test]
3095 fn test_get_last_empty_tree() {
3096 let db = create_test_database();
3097 let mut cursor = CursorImpl::new(db, 100);
3098 let status = cursor.get_last().unwrap();
3099 assert_eq!(status, OperationStatus::NotFound);
3100 }
3101
3102 /// get_first positions at smallest key after multiple puts.
3103 #[test]
3104 fn test_get_first_after_multiple_puts() {
3105 let db = create_test_database();
3106 let mut cursor = CursorImpl::new(db, 100);
3107
3108 cursor.put(b"mango", b"m", PutMode::Overwrite).unwrap();
3109 cursor.put(b"apple", b"a", PutMode::Overwrite).unwrap();
3110 cursor.put(b"kiwi", b"k", PutMode::Overwrite).unwrap();
3111
3112 let s = cursor.get_first().unwrap();
3113 assert_eq!(s, OperationStatus::Success);
3114 assert_eq!(cursor.get_current_key(), Some(b"apple".as_slice()));
3115 assert_eq!(cursor.get_current_data(), Some(b"a".as_slice()));
3116 }
3117
3118 /// get_last positions at largest key after multiple puts.
3119 #[test]
3120 fn test_get_last_after_multiple_puts() {
3121 let db = create_test_database();
3122 let mut cursor = CursorImpl::new(db, 100);
3123
3124 cursor.put(b"apple", b"a", PutMode::Overwrite).unwrap();
3125 cursor.put(b"mango", b"m", PutMode::Overwrite).unwrap();
3126 cursor.put(b"kiwi", b"k", PutMode::Overwrite).unwrap();
3127
3128 let s = cursor.get_last().unwrap();
3129 assert_eq!(s, OperationStatus::Success);
3130 assert_eq!(cursor.get_current_key(), Some(b"mango".as_slice()));
3131 assert_eq!(cursor.get_current_data(), Some(b"m".as_slice()));
3132 }
3133
3134 /// retrieve_next(Next) advances forward through the BIN.
3135 ///
3136 #[test]
3137 fn test_retrieve_next_forward() {
3138 let db = create_test_database();
3139 let mut cursor = CursorImpl::new(db, 100);
3140
3141 cursor.put(b"a", b"1", PutMode::Overwrite).unwrap();
3142 cursor.put(b"b", b"2", PutMode::Overwrite).unwrap();
3143 cursor.put(b"c", b"3", PutMode::Overwrite).unwrap();
3144
3145 cursor.get_first().unwrap();
3146 assert_eq!(cursor.get_current_key(), Some(b"a".as_slice()));
3147
3148 let s = cursor.retrieve_next(GetMode::Next).unwrap();
3149 assert_eq!(s, OperationStatus::Success);
3150 assert_eq!(cursor.get_current_key(), Some(b"b".as_slice()));
3151
3152 let s = cursor.retrieve_next(GetMode::Next).unwrap();
3153 assert_eq!(s, OperationStatus::Success);
3154 assert_eq!(cursor.get_current_key(), Some(b"c".as_slice()));
3155
3156 let s = cursor.retrieve_next(GetMode::Next).unwrap();
3157 assert_eq!(s, OperationStatus::NotFound, "should be exhausted");
3158 }
3159
3160 /// retrieve_next(Prev) traverses backward through the BIN.
3161 ///
3162 #[test]
3163 fn test_retrieve_next_backward() {
3164 let db = create_test_database();
3165 let mut cursor = CursorImpl::new(db, 100);
3166
3167 cursor.put(b"a", b"1", PutMode::Overwrite).unwrap();
3168 cursor.put(b"b", b"2", PutMode::Overwrite).unwrap();
3169 cursor.put(b"c", b"3", PutMode::Overwrite).unwrap();
3170
3171 cursor.get_last().unwrap();
3172 assert_eq!(cursor.get_current_key(), Some(b"c".as_slice()));
3173
3174 let s = cursor.retrieve_next(GetMode::Prev).unwrap();
3175 assert_eq!(s, OperationStatus::Success);
3176 assert_eq!(cursor.get_current_key(), Some(b"b".as_slice()));
3177
3178 let s = cursor.retrieve_next(GetMode::Prev).unwrap();
3179 assert_eq!(s, OperationStatus::Success);
3180 assert_eq!(cursor.get_current_key(), Some(b"a".as_slice()));
3181
3182 let s = cursor.retrieve_next(GetMode::Prev).unwrap();
3183 assert_eq!(s, OperationStatus::NotFound, "should be exhausted");
3184 }
3185
3186 /// A single key: get_first succeeds; retrieve_next(Next) returns NotFound.
3187 #[test]
3188 fn test_single_entry_traversal() {
3189 let db = create_test_database();
3190 let mut cursor = CursorImpl::new(db, 100);
3191
3192 cursor.put(b"only", b"val", PutMode::Overwrite).unwrap();
3193
3194 let s = cursor.get_first().unwrap();
3195 assert_eq!(s, OperationStatus::Success);
3196 assert_eq!(cursor.get_current_key(), Some(b"only".as_slice()));
3197
3198 let s = cursor.retrieve_next(GetMode::Next).unwrap();
3199 assert_eq!(s, OperationStatus::NotFound);
3200 }
3201
3202 /// retrieve_next from NotInitialized state returns NotFound (not an error).
3203 ///
3204 /// The: getNext asserts mustBeInitialized; we convert this to
3205 /// NotFound per Rust convention.
3206 #[test]
3207 fn test_retrieve_next_from_not_initialized_returns_not_found() {
3208 let db = create_test_database();
3209 let mut cursor = CursorImpl::new(db, 100);
3210
3211 let s = cursor.retrieve_next(GetMode::Next).unwrap();
3212 assert_eq!(s, OperationStatus::NotFound);
3213 }
3214
3215 /// put + NoOverwrite returns KeyExist when key is already in the tree.
3216 #[test]
3217 fn test_put_no_overwrite_tree_check() {
3218 let db = create_test_database();
3219 let mut cursor = CursorImpl::new(db, 100);
3220
3221 cursor.put(b"key", b"v1", PutMode::Overwrite).unwrap();
3222 let s = cursor.put(b"key", b"v2", PutMode::NoOverwrite).unwrap();
3223 assert_eq!(s, OperationStatus::KeyExist);
3224
3225 // Verify original value is still there.
3226 cursor.search(b"key", None, SearchMode::Set).unwrap();
3227 let (_, data) = cursor.get_current().unwrap();
3228 assert_eq!(data, b"v1");
3229 }
3230
3231 /// After delete the tree no longer contains the key (search returns NotFound).
3232 #[test]
3233 fn test_delete_removes_from_tree() {
3234 let db = create_test_database();
3235 let mut cursor = CursorImpl::new(db, 100);
3236
3237 cursor.put(b"key", b"val", PutMode::Overwrite).unwrap();
3238 cursor.search(b"key", None, SearchMode::Set).unwrap();
3239 cursor.delete().unwrap();
3240
3241 let s = cursor.search(b"key", None, SearchMode::Set).unwrap();
3242 assert_eq!(s, OperationStatus::NotFound);
3243 }
3244
3245 /// Range search: positions at the first key >= search key.
3246 #[test]
3247 fn test_search_set_range_finds_ge_key() {
3248 let db = create_test_database();
3249 let mut cursor = CursorImpl::new(db, 100);
3250
3251 cursor.put(b"aaa", b"1", PutMode::Overwrite).unwrap();
3252 cursor.put(b"bbb", b"2", PutMode::Overwrite).unwrap();
3253 cursor.put(b"ccc", b"3", PutMode::Overwrite).unwrap();
3254
3255 // Search for "bb" (not present) — should land on "bbb".
3256 let s = cursor.search(b"bb", None, SearchMode::SetRange).unwrap();
3257 assert_eq!(s, OperationStatus::Success);
3258 assert_eq!(cursor.get_current_key(), Some(b"bbb".as_slice()));
3259 }
3260
3261 /// Range search beyond all keys returns NotFound.
3262 #[test]
3263 fn test_search_set_range_beyond_all_keys() {
3264 let db = create_test_database();
3265 let mut cursor = CursorImpl::new(db, 100);
3266
3267 cursor.put(b"aaa", b"1", PutMode::Overwrite).unwrap();
3268 cursor.put(b"bbb", b"2", PutMode::Overwrite).unwrap();
3269
3270 let s = cursor.search(b"zzz", None, SearchMode::SetRange).unwrap();
3271 assert_eq!(s, OperationStatus::NotFound);
3272 }
3273
3274 // -----------------------------------------------------------------------
3275 // Sorted-duplicate key tests
3276 // -----------------------------------------------------------------------
3277
3278 fn create_dup_database() -> Arc<RwLock<DatabaseImpl>> {
3279 let db_id = DatabaseId::new(2);
3280 let mut config = DatabaseConfig::default();
3281 config.sorted_duplicates = true;
3282 let db_impl = DatabaseImpl::new(
3283 db_id,
3284 "dup_test_db".to_string(),
3285 DbType::User,
3286 &config,
3287 );
3288 Arc::new(RwLock::new(db_impl))
3289 }
3290
3291 /// Basic put + get_current round-trip for sorted-dup database.
3292 ///
3293 /// `DupKeyDataTest.testCombineSplit()`.
3294 #[test]
3295 fn test_dup_put_and_get_current() {
3296 let db = create_dup_database();
3297 let mut cursor = CursorImpl::new(db, 1);
3298
3299 let s = cursor.put(b"key", b"data", PutMode::Overwrite).unwrap();
3300 assert_eq!(s, OperationStatus::Success);
3301
3302 let (pk, d) = cursor.get_current().unwrap();
3303 assert_eq!(pk, b"key");
3304 assert_eq!(d, b"data");
3305 }
3306
3307 /// Multiple data values for the same primary key.
3308 ///
3309 /// `SortedDuplicatesTest.testMultipleDups()`.
3310 #[test]
3311 fn test_dup_multiple_data_per_key() {
3312 let db = create_dup_database();
3313 let mut cursor = CursorImpl::new(db, 1);
3314
3315 cursor.put(b"key", b"aaa", PutMode::Overwrite).unwrap();
3316 cursor.put(b"key", b"bbb", PutMode::Overwrite).unwrap();
3317 cursor.put(b"key", b"ccc", PutMode::Overwrite).unwrap();
3318
3319 // search Set: positions at the first entry for "key"
3320 let s = cursor.search(b"key", None, SearchMode::Set).unwrap();
3321 assert_eq!(s, OperationStatus::Success);
3322
3323 let (pk, d) = cursor.get_current().unwrap();
3324 assert_eq!(pk, b"key");
3325 assert_eq!(d, b"aaa", "first dup should have smallest data");
3326 }
3327
3328 /// search Both: positions at the exact (key, data) pair.
3329 ///
3330 /// `CursorImpl.searchBothExact()` dup path.
3331 #[test]
3332 fn test_dup_search_both_exact() {
3333 let db = create_dup_database();
3334 let mut cursor = CursorImpl::new(db, 1);
3335
3336 cursor.put(b"key", b"aaa", PutMode::Overwrite).unwrap();
3337 cursor.put(b"key", b"bbb", PutMode::Overwrite).unwrap();
3338 cursor.put(b"key", b"ccc", PutMode::Overwrite).unwrap();
3339
3340 let s = cursor.search(b"key", Some(b"bbb"), SearchMode::Both).unwrap();
3341 assert_eq!(s, OperationStatus::Success);
3342 let (pk, d) = cursor.get_current().unwrap();
3343 assert_eq!(pk, b"key");
3344 assert_eq!(d, b"bbb");
3345 }
3346
3347 /// search Both: returns NotFound when exact pair doesn't exist.
3348 #[test]
3349 fn test_dup_search_both_not_found() {
3350 let db = create_dup_database();
3351 let mut cursor = CursorImpl::new(db, 1);
3352
3353 cursor.put(b"key", b"aaa", PutMode::Overwrite).unwrap();
3354
3355 let s = cursor.search(b"key", Some(b"zzz"), SearchMode::Both).unwrap();
3356 assert_eq!(s, OperationStatus::NotFound);
3357 }
3358
3359 /// NoDupData returns KeyExist when exact (key, data) already stored.
3360 ///
3361 /// `SortedDuplicatesTest.testNoDupData()`.
3362 #[test]
3363 fn test_dup_no_dup_data_returns_key_exist() {
3364 let db = create_dup_database();
3365 let mut cursor = CursorImpl::new(db, 1);
3366
3367 cursor.put(b"key", b"val", PutMode::Overwrite).unwrap();
3368
3369 let s = cursor.put(b"key", b"val", PutMode::NoDupData).unwrap();
3370 assert_eq!(s, OperationStatus::KeyExist);
3371 }
3372
3373 /// NoDupData succeeds for a different data value under the same key.
3374 #[test]
3375 fn test_dup_no_dup_data_different_data_ok() {
3376 let db = create_dup_database();
3377 let mut cursor = CursorImpl::new(db, 1);
3378
3379 cursor.put(b"key", b"val1", PutMode::Overwrite).unwrap();
3380
3381 let s = cursor.put(b"key", b"val2", PutMode::NoDupData).unwrap();
3382 assert_eq!(s, OperationStatus::Success);
3383 }
3384
3385 /// NextDup traversal visits all dups of the current primary key.
3386 ///
3387 /// `CursorImpl.getNext(GetMode.NEXT_DUP)` path.
3388 #[test]
3389 fn test_dup_next_dup_traversal() {
3390 let db = create_dup_database();
3391 let mut cursor = CursorImpl::new(db, 1);
3392
3393 cursor.put(b"key", b"a", PutMode::Overwrite).unwrap();
3394 cursor.put(b"key", b"b", PutMode::Overwrite).unwrap();
3395 cursor.put(b"key", b"c", PutMode::Overwrite).unwrap();
3396 // Different primary key — should NOT appear in NextDup.
3397 cursor.put(b"zzz", b"x", PutMode::Overwrite).unwrap();
3398
3399 // Position at first dup.
3400 cursor.search(b"key", None, SearchMode::Set).unwrap();
3401 let (_, d) = cursor.get_current().unwrap();
3402 assert_eq!(d, b"a");
3403
3404 let s = cursor.retrieve_next(GetMode::NextDup).unwrap();
3405 assert_eq!(s, OperationStatus::Success);
3406 let (pk, d) = cursor.get_current().unwrap();
3407 assert_eq!(pk, b"key");
3408 assert_eq!(d, b"b");
3409
3410 let s = cursor.retrieve_next(GetMode::NextDup).unwrap();
3411 assert_eq!(s, OperationStatus::Success);
3412 let (_, d) = cursor.get_current().unwrap();
3413 assert_eq!(d, b"c");
3414
3415 // No more dups for "key".
3416 let s = cursor.retrieve_next(GetMode::NextDup).unwrap();
3417 assert_eq!(s, OperationStatus::NotFound);
3418 }
3419
3420 /// NextNoDup skips all dups of the current primary key.
3421 ///
3422 /// `CursorImpl.getNext(GetMode.NEXT_NO_DUP)`.
3423 #[test]
3424 fn test_dup_next_no_dup_skips_dups() {
3425 let db = create_dup_database();
3426 let mut cursor = CursorImpl::new(db, 1);
3427
3428 cursor.put(b"aaa", b"1", PutMode::Overwrite).unwrap();
3429 cursor.put(b"aaa", b"2", PutMode::Overwrite).unwrap();
3430 cursor.put(b"bbb", b"x", PutMode::Overwrite).unwrap();
3431
3432 // Position at first entry for "aaa".
3433 cursor.search(b"aaa", None, SearchMode::Set).unwrap();
3434 let (pk, _) = cursor.get_current().unwrap();
3435 assert_eq!(pk, b"aaa");
3436
3437 // NextNoDup should skip "aaa" dups and land on "bbb".
3438 let s = cursor.retrieve_next(GetMode::NextNoDup).unwrap();
3439 assert_eq!(s, OperationStatus::Success);
3440 let (pk, d) = cursor.get_current().unwrap();
3441 assert_eq!(pk, b"bbb");
3442 assert_eq!(d, b"x");
3443 }
3444
3445 /// Dup delete removes only the specific (key, data) pair.
3446 ///
3447 /// `SortedDuplicatesTest.testDeleteDup()`.
3448 #[test]
3449 fn test_dup_delete_specific_pair() {
3450 let db = create_dup_database();
3451 let mut cursor = CursorImpl::new(db, 1);
3452
3453 cursor.put(b"key", b"a", PutMode::Overwrite).unwrap();
3454 cursor.put(b"key", b"b", PutMode::Overwrite).unwrap();
3455
3456 // Position at "key"/"b" and delete it.
3457 cursor.search(b"key", Some(b"b"), SearchMode::Both).unwrap();
3458 cursor.delete().unwrap();
3459
3460 // "key"/"a" should still exist.
3461 let s = cursor.search(b"key", None, SearchMode::Set).unwrap();
3462 assert_eq!(s, OperationStatus::Success);
3463 let (pk, d) = cursor.get_current().unwrap();
3464 assert_eq!(pk, b"key");
3465 assert_eq!(d, b"a");
3466
3467 // "key"/"b" should be gone.
3468 let s = cursor.search(b"key", Some(b"b"), SearchMode::Both).unwrap();
3469 assert_eq!(s, OperationStatus::NotFound);
3470 }
3471
3472 /// Dup prefix-ambiguity ordering is correct.
3473 ///
3474 ///
3475 /// Key "a" data "bc" must sort before key "ab" data "c".
3476 #[test]
3477 fn test_dup_ordering_prefix_ambiguity() {
3478 let db = create_dup_database();
3479 let mut cursor = CursorImpl::new(db, 1);
3480
3481 // "ab"/"c" inserted first to stress comparator.
3482 cursor.put(b"ab", b"c", PutMode::Overwrite).unwrap();
3483 cursor.put(b"a", b"bc", PutMode::Overwrite).unwrap();
3484
3485 // Forward scan should give ("a","bc") then ("ab","c").
3486 cursor.get_first().unwrap();
3487 let (pk, d) = cursor.get_current().unwrap();
3488 assert_eq!(pk, b"a");
3489 assert_eq!(d, b"bc");
3490
3491 cursor.retrieve_next(GetMode::Next).unwrap();
3492 let (pk, d) = cursor.get_current().unwrap();
3493 assert_eq!(pk, b"ab");
3494 assert_eq!(d, b"c");
3495 }
3496
3497 // -----------------------------------------------------------------------
3498 // Cross-BIN cursor traversal test
3499 // -----------------------------------------------------------------------
3500
3501 /// Full forward scan visits all 200 entries across multiple BINs in sorted
3502 /// order.
3503 ///
3504 /// We use a DatabaseImpl whose underlying Tree is created with a small
3505 /// `max_entries_per_node` (4) so that 200 inserts force many splits and
3506 /// fill multiple BINs. The cursor must cross every BIN boundary without
3507 /// losing any entry.
3508 ///
3509 /// CursorImplTest multi-BIN scan: insert N records, open
3510 /// cursor at first, call getNext() until NotFound, assert count == N and
3511 /// keys are in ascending order.
3512 #[test]
3513 fn test_full_scan_crosses_multiple_bins() {
3514 // Build a database with a small node fanout (4) so 200 inserts force
3515 // many BIN splits. DatabaseConfig::node_max_entries controls the
3516 // Tree::max_entries_per_node passed to Tree::new().
3517 let db_id = DatabaseId::new(42);
3518 let mut config = DatabaseConfig::default();
3519 config.set_node_max_entries(4); // tiny fanout → many BINs
3520 let db_impl = DatabaseImpl::new(
3521 db_id,
3522 "scan_test".to_string(),
3523 DbType::User,
3524 &config,
3525 );
3526 let db = Arc::new(RwLock::new(db_impl));
3527
3528 const N: usize = 200;
3529
3530 // Insert 200 entries with zero-padded decimal keys so lexicographic
3531 // order == numeric order.
3532 {
3533 let mut cursor = CursorImpl::new(db.clone(), 1);
3534 for i in 0..N {
3535 let key = format!("{:08}", i).into_bytes();
3536 let val = format!("v{}", i).into_bytes();
3537 let s = cursor.put(&key, &val, PutMode::Overwrite).unwrap();
3538 assert_eq!(s, OperationStatus::Success, "put {} failed", i);
3539 }
3540 }
3541
3542 // Forward scan: get_first + repeated get_next.
3543 let mut cursor = CursorImpl::new(db.clone(), 2);
3544 let s = cursor.get_first().unwrap();
3545 assert_eq!(s, OperationStatus::Success, "get_first should succeed");
3546
3547 let mut visited: Vec<Vec<u8>> = Vec::new();
3548 visited.push(cursor.get_current_key().unwrap().to_vec());
3549
3550 loop {
3551 let s = cursor.retrieve_next(GetMode::Next).unwrap();
3552 match s {
3553 OperationStatus::Success => {
3554 visited.push(cursor.get_current_key().unwrap().to_vec());
3555 }
3556 OperationStatus::NotFound => break,
3557 other => panic!("unexpected status {:?}", other),
3558 }
3559 }
3560
3561 assert_eq!(
3562 visited.len(),
3563 N,
3564 "full scan must visit exactly {} entries, got {}",
3565 N,
3566 visited.len()
3567 );
3568
3569 // Verify keys are in ascending (sorted) order.
3570 for i in 1..visited.len() {
3571 assert!(
3572 visited[i - 1] < visited[i],
3573 "keys out of order at position {}: {:?} >= {:?}",
3574 i,
3575 std::str::from_utf8(&visited[i - 1]).unwrap_or("?"),
3576 std::str::from_utf8(&visited[i]).unwrap_or("?"),
3577 );
3578 }
3579
3580 // Backward scan: get_last + repeated get_prev.
3581 let mut cursor_back = CursorImpl::new(db, 3);
3582 let s = cursor_back.get_last().unwrap();
3583 assert_eq!(s, OperationStatus::Success, "get_last should succeed");
3584
3585 let mut visited_back: Vec<Vec<u8>> = Vec::new();
3586 visited_back.push(cursor_back.get_current_key().unwrap().to_vec());
3587
3588 loop {
3589 let s = cursor_back.retrieve_next(GetMode::Prev).unwrap();
3590 match s {
3591 OperationStatus::Success => {
3592 visited_back
3593 .push(cursor_back.get_current_key().unwrap().to_vec());
3594 }
3595 OperationStatus::NotFound => break,
3596 other => panic!("unexpected backward status {:?}", other),
3597 }
3598 }
3599
3600 assert_eq!(
3601 visited_back.len(),
3602 N,
3603 "backward scan must visit exactly {} entries, got {}",
3604 N,
3605 visited_back.len()
3606 );
3607
3608 // Backward scan should be the reverse of forward scan.
3609 let mut visited_back_rev = visited_back.clone();
3610 visited_back_rev.reverse();
3611 assert_eq!(
3612 visited_back_rev, visited,
3613 "backward scan reversed must equal forward scan"
3614 );
3615 }
3616}