Skip to main content

noxu_db/
database.rs

1//! Database handle.
2//!
3
4use crate::cursor::Cursor;
5use crate::cursor_config::CursorConfig;
6use crate::database_config::DatabaseConfig;
7use crate::database_entry::DatabaseEntry;
8use crate::database_stats::{BtreeStats, DatabaseStats};
9use crate::error::{NoxuError, Result};
10use crate::join_config::JoinConfig;
11use crate::join_cursor::JoinCursor;
12use crate::lock_mode::LockMode;
13use crate::operation_status::OperationStatus;
14use crate::preload::{PreloadConfig, PreloadStats};
15use crate::read_options::ReadOptions;
16use crate::secondary_cursor::SecondaryCursor;
17use crate::sequence::Sequence;
18use crate::sequence_config::SequenceConfig;
19use crate::stats_config::StatsConfig;
20use crate::transaction::Transaction;
21use crate::write_options::WriteOptions;
22use bytes::Bytes;
23use noxu_dbi::{
24    CursorImpl, DatabaseImpl, EnvironmentImpl, GetMode, PutMode, SearchMode,
25    ThroughputStats,
26};
27use noxu_log::LogManager;
28use noxu_sync::{Mutex, RwLock};
29use noxu_txn::{Durability, LockManager, TxnManager, UndoRecord};
30use noxu_util::lsn::Lsn;
31use std::sync::Arc;
32use std::sync::atomic::{AtomicBool, Ordering};
33
34/// A database handle.
35///
36///
37///
38/// Database handles provide methods for inserting, retrieving, and
39/// deleting records. A database belongs to a single environment.
40///
41/// # Example
42/// ```ignore
43/// use noxu_db::{Environment, EnvironmentConfig, DatabaseConfig, DatabaseEntry};
44/// use std::path::PathBuf;
45///
46/// let env_config = EnvironmentConfig::new(PathBuf::from("/tmp/mydb"))
47///     .allow_create(true);
48/// let env = Environment::open(env_config).unwrap();
49///
50/// let db_config = DatabaseConfig::new().allow_create(true);
51/// let db = env.open_database(None, "mydb", &db_config).unwrap();
52///
53/// let key = DatabaseEntry::from_bytes(b"key1");
54/// let value = DatabaseEntry::from_bytes(b"value1");
55/// db.put( &key, &value).unwrap();
56///
57/// db.close().unwrap();
58/// env.close().unwrap();
59/// ```
60pub struct Database {
61    /// Name of this database
62    name: String,
63    /// Database ID
64    id: u64,
65    /// Configuration
66    config: DatabaseConfig,
67    /// The underlying DatabaseImpl (shared with the EnvironmentImpl).
68    pub(crate) db_impl: Arc<RwLock<DatabaseImpl>>,
69    /// Back-reference to the owning EnvironmentImpl (for close/cleanup).
70    env_impl: Arc<Mutex<EnvironmentImpl>>,
71    /// Shared open flag — same `Arc<AtomicBool>` as the environment's
72    /// `DatabaseHandle.open`, so that `Database::close()` automatically
73    /// marks the environment-side handle as closed too.
74    open: Arc<AtomicBool>,
75    /// Throughput counters for this database's operations.
76    ///
77    /// Cloned from `DatabaseImpl.throughput` at open time so that
78    /// `get()`, `put()`, `delete()` can increment stats without
79    /// locking `db_impl`.
80    throughput: Arc<ThroughputStats>,
81    /// Cached lock manager — acquired once at open, never changes.
82    /// Eliminates per-operation `env_impl.lock()` on the hot read/write path.
83    lock_manager: Arc<LockManager>,
84    /// Cached log manager — acquired once at open, None for no-WAL envs.
85    /// Eliminates per-operation `env_impl.lock()` on the hot read/write path.
86    log_manager: Option<Arc<LogManager>>,
87    /// Cached disk-limit tracker (JE: the env's `getDiskLimitViolation()`).
88    /// `Some` only for user databases; internal databases are exempt from the
89    /// limit so the cleaner/checkpointer can still write to free space.
90    /// Wired into each user cursor so the write path can refuse writes while a
91    /// disk limit is violated without locking `env_impl`.
92    disk_limit: Option<Arc<noxu_dbi::disk_limit::DiskLimitTracker>>,
93    /// Cached environment-invalidity flag (X-13).
94    ///
95    /// Cloned from `EnvironmentImpl::is_invalid_flag()` at `Database::new()`
96    /// time so `check_open()` can detect a failed environment without
97    /// acquiring `env_impl.lock()` on every read/write operation.
98    env_invalid: Arc<std::sync::atomic::AtomicBool>,
99    /// Cached cleaner throttle — acquired once at open, None when no cleaner.
100    /// Used by put() for write-path backpressure without locking env_impl.
101    cleaner_throttle: Option<Arc<noxu_cleaner::CleanerThrottle>>,
102    /// Cached cleaner file protector — acquired once at open, None when no
103    /// cleaner.  Passed to a `DiskOrderedCursor` producer so it can protect
104    /// the files it scans from cleaner deletion mid-scan (CLN-7).
105    file_protector: Option<Arc<noxu_cleaner::FileProtector>>,
106    /// Cached transaction manager — acquired once at open.
107    ///
108    /// Used by [`Self::with_auto_txn`] to allocate a synthetic auto-commit
109    /// `Txn` per `txn = None` write so the lock manager sees a typed
110    /// locker id from the explicit-txn id space (`"auto-txn:<id>"` in
111    /// deadlock messages) and the auto-commit op gets full abort-undo
112    /// semantics on any error path.  Closes the F12 residuals.
113    txn_manager: Arc<TxnManager>,
114    /// EV-15: cached evictor — acquired once at open, drives per-write
115    /// synchronous critical eviction (write back-pressure) without locking
116    /// env_impl.  Mirrors JE `EnvironmentImpl.criticalEviction` being called
117    /// before every cursor operation.
118    evictor: Arc<noxu_dbi::Evictor>,
119    /// If true, auto-commit writes skip the log flush entirely (: TXN_NO_SYNC).
120    no_sync: bool,
121    /// If true, auto-commit writes flush to OS but skip fdatasync (: TXN_WRITE_NO_SYNC).
122    write_no_sync: bool,
123    /// Registered secondary indexes that automatically maintain themselves
124    /// when this primary is written.  v1.6 (Decision 1B / audit C3 — the
125    /// associate()-style hook): every [`SecondaryDatabase`] opened against
126    /// this primary downgrades its `Arc<SecondaryHookState>` to a
127    /// `Weak<dyn SecondaryHook>` and pushes it here.  `Database::put` and
128    /// `Database::delete` walk the list under the same caller-supplied
129    /// txn so primary writes and secondary index updates commit / abort
130    /// atomically.
131    ///
132    /// Stored behind an `Arc<RwLock<…>>` (rather than directly on the
133    /// `Database` body) so registrations performed through one of the
134    /// `Arc<Mutex<Database>>` clones the user typically holds become
135    /// visible to every other clone of the same primary.
136    pub(crate) secondaries: Arc<
137        RwLock<
138            Vec<
139                std::sync::Weak<
140                    dyn crate::secondary_database::SecondaryHook + Send + Sync,
141                >,
142            >,
143        >,
144    >,
145    /// Foreign-key referrer registry: every child secondary whose
146    /// `foreign_key_database` points at *this* primary downgrades its
147    /// hook to a `Weak<dyn FkReferrer>` and pushes it here.  When this
148    /// primary is deleted, every entry is consulted to apply
149    /// `ForeignKeyDeleteAction::Abort` (v1.6 step 8) /
150    /// `Cascade` (step 9) / `Nullify` (step 10).
151    pub(crate) fk_referrers: Arc<
152        RwLock<
153            Vec<
154                std::sync::Weak<
155                    dyn crate::secondary_database::FkReferrer + Send + Sync,
156                >,
157            >,
158        >,
159    >,
160}
161
162/// State of a database handle.
163///
164///
165#[derive(Debug, Clone, Copy, PartialEq, Eq)]
166pub enum DbState {
167    /// Database is open and operational
168    Open,
169    /// Database has been closed
170    Closed,
171    /// Database is in an invalid state
172    Invalid,
173}
174
175impl Database {
176    /// Creates a CursorImpl, wired to the WAL and lock manager when the
177    /// environment has them.
178    ///
179    /// Uses cached `lock_manager` / `log_manager` to avoid acquiring
180    /// `env_impl.lock()` on every operation.
181    fn make_cursor(&self) -> CursorImpl {
182        self.make_cursor_with_locker(0)
183    }
184
185    /// Creates a CursorImpl with an explicit `locker_id`.
186    ///
187    /// Auto-commit cursors use `0`; transactional cursors must use the
188    /// owning `Transaction::id` so that the LN log entries written by
189    /// `cursor.put` / `cursor.delete` carry the txn id and recovery's
190    /// commit/abort tracking can correctly skip aborted txns.  Without
191    /// this, every LN entry was written with `txn_id = None` (the
192    /// auto-commit form) and recovery treated aborted-txn writes as
193    /// committed once `env_impl.close()` started running on `env.close()`
194    /// after a successful commit/abort (F1).
195    fn make_cursor_with_locker(&self, locker_id: i64) -> CursorImpl {
196        match &self.log_manager {
197            Some(lm) => {
198                let mut c = CursorImpl::with_log_manager(
199                    Arc::clone(&self.db_impl),
200                    locker_id,
201                    Arc::clone(lm),
202                )
203                .with_env_invalid(Arc::clone(&self.env_invalid))
204                .with_lock_manager(Arc::clone(&self.lock_manager))
205                .with_txn_manager(Arc::clone(&self.txn_manager));
206                // Gate user writes on the disk limit (None for internal DBs).
207                if let Some(dl) = &self.disk_limit {
208                    c = c.with_disk_limit(Arc::clone(dl));
209                }
210                c
211            }
212            None => CursorImpl::new(Arc::clone(&self.db_impl), locker_id)
213                .with_env_invalid(Arc::clone(&self.env_invalid))
214                .with_lock_manager(Arc::clone(&self.lock_manager)),
215        }
216    }
217
218    /// Creates a CursorImpl without a lock manager (dirty-read / read-uncommitted).
219    ///
220    /// Used by `get_with_options()` when `ReadOptions.lock_mode == ReadUncommitted`.
221    /// Skips all lock acquisition so the cursor reads directly from the BIN
222    /// without blocking on write locks — mirrors 's read-uncommitted cursor.
223    fn make_cursor_no_lock(&self) -> CursorImpl {
224        match &self.log_manager {
225            Some(lm) => CursorImpl::with_log_manager(
226                Arc::clone(&self.db_impl),
227                0,
228                Arc::clone(lm),
229            )
230            .with_env_invalid(Arc::clone(&self.env_invalid)),
231            None => CursorImpl::new(Arc::clone(&self.db_impl), 0)
232                .with_env_invalid(Arc::clone(&self.env_invalid)),
233        }
234    }
235
236    /// Creates a CursorImpl wired to the given transaction for write-lock tracking.
237    ///
238    /// Behaves like `make_cursor()` but additionally calls `.with_txn()` so
239    /// that write operations acquire locks via the transaction's `Txn` and
240    /// record abort before-images in `WriteLockInfo`.
241    ///
242    /// In which passes the
243    /// transaction's `Locker` to the new `CursorImpl`.
244    fn make_cursor_for_txn(&self, txn: &Transaction) -> CursorImpl {
245        // Use the transaction id as the cursor's locker_id so that LN
246        // log entries written under this cursor carry the txn id
247        // (recovery's analysis pass uses LN.txn_id together with
248        // TxnCommit / TxnAbort records to decide whether to redo or
249        // undo the LN). Pre-fix this was hardcoded to 0, which made
250        // every txn-LN look like an auto-commit LN and caused
251        // recovery to redo aborted writes.
252        let cursor = self.make_cursor_with_locker(txn.id() as i64);
253        if let Some(inner) = txn.get_inner_txn() {
254            cursor.with_txn(inner)
255        } else {
256            cursor
257        }
258    }
259
260    /// Allocates a synthetic auto-commit `Txn` and runs `op` under it.
261    ///
262    /// `op` receives an auto-commit-wired [`CursorImpl`] (locker_id = 0
263    /// so the LN is logged with the auto-commit `InsertLN` / `DeleteLN`
264    /// form, txn_ref set so the lock manager sees the synthetic auto-txn
265    /// as the owner).
266    ///
267    /// On `Ok(value)`:
268    ///   * Drops the cursor (closing it).
269    ///   * Calls [`Txn::commit_with_durability`] on the synthetic auto-txn
270    ///     with a [`Durability`] derived from the database's
271    ///     `no_sync` / `write_no_sync` config; this releases all locks
272    ///     and (for `CommitSync`) fsyncs up to the LN's LSN via
273    ///     `LogManager::flush_sync_if_needed` for many-to-one fsync
274    ///     coalescing under concurrent write load.
275    ///   * Records the commit in the txn manager statistics and removes
276    ///     the diagnostic locker label.
277    ///
278    /// On `Err(e)`:
279    ///   * Drops the cursor.
280    ///   * Calls [`Txn::abort_collect_undo`] to harvest before-image undo
281    ///     records WITHOUT releasing the held write locks (so a reader
282    ///     blocked on a write lock cannot observe the in-flight value
283    ///     before we restore the before-image).
284    ///   * Applies the undo records to the in-memory B-tree.
285    ///   * Calls [`Txn::release_all_locks`] to drain the held locks.
286    ///   * Records the abort in the txn manager statistics and removes
287    ///     the diagnostic locker label.
288    ///   * Returns `Err(e)`.
289    ///
290    /// Closes the first F12 residual: an auto-commit op now goes through
291    /// the same lock-tracking and abort-undo machinery as an explicit
292    /// transaction, so two concurrent inserts of the same brand-new key
293    /// serialise through the lock manager and a forced mid-write failure
294    /// rolls back the in-memory tree mutation.
295    fn with_auto_txn<F, T>(&self, op: F) -> Result<T>
296    where
297        F: FnOnce(&mut CursorImpl) -> Result<T>,
298    {
299        let auto_txn =
300            self.txn_manager.begin_auto_txn(self.log_manager.clone());
301        let synthetic_id = auto_txn.id_as_locker();
302        let auto_txn_arc = Arc::new(std::sync::Mutex::new(auto_txn));
303
304        let mut cursor = self.make_cursor();
305        cursor.attach_txn(Arc::clone(&auto_txn_arc));
306
307        let result = op(&mut cursor);
308        // Drop the cursor handle (un-pins BIN, drops Arc<Mutex<Txn>> ref).
309        drop(cursor);
310
311        // Reclaim sole ownership of the synthetic auto-txn so we can
312        // finalise it.  All cursors and their Arcs were dropped above.
313        let mut auto_txn = match Arc::try_unwrap(auto_txn_arc) {
314            Ok(m) => m.into_inner().unwrap_or_else(|p| p.into_inner()),
315            Err(_arc) => {
316                // A cursor escaped the closure with a clone of the txn
317                // Arc.  This is a caller-side bug — leak the auto-txn
318                // (its Drop calls `close()` which aborts) and surface a
319                // typed error.  No undo applied because we cannot
320                // safely take the Txn out of the shared Arc.
321                return Err(NoxuError::OperationNotAllowed(
322                    "with_auto_txn: synthetic auto-txn outlived cursor scope"
323                        .to_string(),
324                ));
325            }
326        };
327        let txn_manager = Arc::clone(&self.txn_manager);
328
329        match result {
330            Ok(value) => {
331                let durability = if self.no_sync {
332                    Durability::CommitNoSync
333                } else if self.write_no_sync {
334                    Durability::CommitWriteNoSync
335                } else {
336                    Durability::CommitSync
337                };
338                if let Err(e) = auto_txn.commit_with_durability(durability) {
339                    // Commit failed (e.g. log fsync error).  Undo the
340                    // in-memory tree write so we are not left in an
341                    // inconsistent state, then surface the error.
342                    let undo_records =
343                        auto_txn.abort_collect_undo().unwrap_or_default();
344                    self.apply_auto_txn_undo(undo_records);
345                    auto_txn.release_all_locks();
346                    txn_manager.abort_txn(synthetic_id);
347                    return Err(NoxuError::OperationNotAllowed(format!(
348                        "auto-commit fsync failed: {e}"
349                    )));
350                }
351                txn_manager.commit_txn(synthetic_id);
352                Ok(value)
353            }
354            Err(e) => {
355                // Phase 1: collect undo without releasing write locks.
356                let undo_records =
357                    auto_txn.abort_collect_undo().unwrap_or_default();
358                // Phase 2: apply undo while write locks are still held
359                // so any concurrent reader blocked on a write lock
360                // cannot observe the in-flight value.
361                self.apply_auto_txn_undo(undo_records);
362                // Phase 3: drain locks.
363                auto_txn.release_all_locks();
364                txn_manager.abort_txn(synthetic_id);
365                Err(e)
366            }
367        }
368    }
369
370    /// Applies undo records collected from a synthetic auto-txn to the
371    /// in-memory B-tree of `self`.  Mirrors the per-`undo_record` block
372    /// in `Transaction::abort()` but specialised for the
373    /// single-database `with_auto_txn` case so we do not need to thread
374    /// the env-impl in.
375    fn apply_auto_txn_undo(&self, mut undo_records: Vec<UndoRecord>) {
376        // Apply newest-LSN first so multi-step writes (delete + reinsert)
377        // unwind in reverse-operation order.  See the matching sort in
378        // `Transaction::abort()`.
379        undo_records.sort_by_key(|r| std::cmp::Reverse(r.current_lsn));
380        let db_id_match = self.id;
381        let db_guard = self.db_impl.read();
382        let Some(tree) = db_guard.get_real_tree() else { return };
383        for undo in undo_records {
384            // The synthetic auto-txn touches only this database, but be
385            // defensive in case future changes broaden the contract.
386            if undo.database_id != db_id_match {
387                continue;
388            }
389            let Some(abort_key) = undo.abort_key else { continue };
390            if undo.abort_known_deleted {
391                if tree.delete(&abort_key) {
392                    db_guard.decrement_entry_count();
393                }
394            } else if let Some(abort_data) = undo.abort_data {
395                let lsn = noxu_util::Lsn::from_u64(undo.abort_lsn);
396                if let Ok(is_new) = tree.insert(abort_key, abort_data, lsn)
397                    && is_new
398                {
399                    // Restoring a slot that the aborted txn had deleted:
400                    // re-bump the counter that the in-memory delete
401                    // already decremented.
402                    db_guard.increment_entry_count();
403                }
404            }
405        }
406    }
407
408    /// Auto-commit flush: when `txn` is `None` (auto-commit mode), flush and
409    /// fsync the log before returning to the caller.
410    ///
411    /// `write_lsn` is the LSN assigned to the write operation just performed.
412    /// Port of`LogManager.flushTo(lsn)`: if a concurrent committer already
413    /// flushed past `write_lsn`, the fdatasync is skipped entirely, giving
414    /// natural many:1 fsync coalescing under concurrent write load with no
415    /// explicit group-commit configuration required.
416    #[allow(dead_code)] // unwired helper kept for the documented coalescing path
417    fn auto_commit_sync(
418        &self,
419        txn: Option<&Transaction>,
420        write_lsn: Lsn,
421    ) -> Result<()> {
422        if txn.is_some() {
423            return Ok(()); // explicit txn handles its own commit/fsync
424        }
425        if self.no_sync {
426            return Ok(()); // : TXN_NO_SYNC — skip log flush entirely
427        }
428        if let Some(lm) = &self.log_manager {
429            if self.write_no_sync {
430                // : TXN_WRITE_NO_SYNC — flush to OS buffer, no fdatasync
431                lm.flush_no_sync().map_err(|e| {
432                    NoxuError::OperationNotAllowed(e.to_string())
433                })?;
434            } else {
435                // : flushTo(lsn) — skip if already covered by another flush.
436                lm.flush_sync_if_needed(write_lsn).map_err(|e| {
437                    NoxuError::OperationNotAllowed(e.to_string())
438                })?;
439            }
440        }
441        Ok(())
442    }
443
444    /// Creates a new database handle.
445    ///
446    /// Internal constructor called by Environment.
447    ///
448    /// `open_flag` is a shared `Arc<AtomicBool>` that is also stored in the
449    /// environment's `DatabaseHandle` for this database.  Setting it to `false`
450    /// (via `Database::close()`) simultaneously marks the env-side handle as
451    /// closed, allowing `Environment::close()` to succeed without a separate
452    /// callback.
453    pub(crate) fn new(
454        name: String,
455        id: u64,
456        config: DatabaseConfig,
457        db_impl: Arc<RwLock<DatabaseImpl>>,
458        env_impl: Arc<Mutex<EnvironmentImpl>>,
459        open_flag: Arc<AtomicBool>,
460        no_sync: bool,
461        write_no_sync: bool,
462    ) -> Self {
463        let throughput = db_impl.read().throughput.clone();
464        // Cache the manager Arcs at construction so hot-path operations
465        // (get/put/delete) never need to re-acquire env_impl.lock().
466        let (
467            lock_manager,
468            log_manager,
469            cleaner_throttle,
470            file_protector,
471            txn_manager,
472            env_invalid,
473            evictor,
474        ) = {
475            let env = env_impl.lock();
476            let lm = Arc::clone(env.get_lock_manager());
477            let logm = env.get_log_manager();
478            let ct = env.get_cleaner_throttle();
479            let fp = env.get_file_protector();
480            let txnm = Arc::clone(env.get_txn_manager());
481            let inv = env.is_invalid_flag();
482            let ev = env.get_evictor();
483            (lm, logm, ct, fp, txnm, inv, ev)
484        };
485        // Disk-limit tracker: wire it only for user databases (JE exempts
486        // internal DBs in Cursor.checkUpdatesAllowed via
487        // dbImpl.getDbType().isInternal()). Internal DBs leave this None so
488        // the cleaner/checkpointer/recovery writes through them are never
489        // blocked by the limit.
490        let disk_limit = if db_impl.read().get_db_type().is_internal() {
491            None
492        } else {
493            Some(env_impl.lock().get_disk_limit())
494        };
495        Database {
496            name,
497            id,
498            config,
499            db_impl,
500            env_impl,
501            open: open_flag,
502            throughput,
503            lock_manager,
504            log_manager,
505            disk_limit,
506            env_invalid,
507            cleaner_throttle,
508            file_protector,
509            txn_manager,
510            evictor,
511            no_sync,
512            write_no_sync,
513            secondaries: Arc::new(RwLock::new(Vec::new())),
514            fk_referrers: Arc::new(RwLock::new(Vec::new())),
515        }
516    }
517
518    /// Retrieves a record by key, auto-committing the read.
519    ///
520    /// Idiomatic Rust lookup: `Some(value)` if found, `None` if the key
521    /// is absent, `Err` only on a real failure (review P0-3). Keys accept
522    /// any `impl AsRef<[u8]>` (review P1-3) — `b"k"`, `&str`, `Vec<u8>`,
523    /// `Bytes`, etc., no `DatabaseEntry` wrapper required.
524    ///
525    /// For an explicit transaction use [`Self::get_in`]; for zero-alloc
526    /// buffer reuse or partial reads use [`Self::get_into`].
527    ///
528    /// # Errors
529    /// Returns an error if the database is closed or the environment failed.
530    pub fn get(&self, key: impl AsRef<[u8]>) -> Result<Option<Bytes>> {
531        self.get_bytes(None, key.as_ref())
532    }
533
534    /// Retrieves a record by key within an explicit transaction (review
535    /// P0-2: auto-commit vs transactional is a *named* choice, not a bare
536    /// `None`).
537    ///
538    /// # Errors
539    /// Returns an error if the database is closed, the environment failed,
540    /// or a transactional handle is used against a non-transactional DB.
541    pub fn get_in(
542        &self,
543        txn: &Transaction,
544        key: impl AsRef<[u8]>,
545    ) -> Result<Option<Bytes>> {
546        self.get_bytes(Some(txn), key.as_ref())
547    }
548
549    /// Shared `Result<Option<Bytes>>` read used by [`Self::get`] /
550    /// [`Self::get_in`] (review P0-3). Semantics are identical to the
551    /// pre-7.0 `get` out-param path — found = `Some`, NotFound = `None`,
552    /// error = `Err` — only the *shape* changed.
553    fn get_bytes(
554        &self,
555        txn: Option<&Transaction>,
556        key_bytes: &[u8],
557    ) -> Result<Option<Bytes>> {
558        self.check_open()?;
559        self.reject_txn_on_non_txnal_db(txn.is_some())?;
560        observe_span!(
561            "db_get",
562            db_name = self.name.as_str(),
563            key_size = key_bytes.len(),
564        );
565        let _obs_timer = observe_timer_start!();
566        observe_counter!("noxu_db_operations_total", "op" => "get");
567
568        let mut cursor = match txn {
569            Some(t) => self.make_cursor_for_txn(t),
570            None => self.make_cursor(),
571        };
572        match cursor
573            .search(key_bytes, None, SearchMode::Set)
574            .map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?
575        {
576            noxu_dbi::OperationStatus::Success => {
577                let (_, value) = cursor.get_current().map_err(|e| {
578                    NoxuError::OperationNotAllowed(e.to_string())
579                })?;
580                self.throughput.n_pri_searches.fetch_add(1, Ordering::Relaxed);
581                observe_timer_record!(_obs_timer, "noxu_db_operation_duration_seconds", "op" => "get");
582                Ok(Some(Bytes::from(value)))
583            }
584            _ => {
585                self.throughput
586                    .n_pri_search_fails
587                    .fetch_add(1, Ordering::Relaxed);
588                observe_timer_record!(_obs_timer, "noxu_db_operation_duration_seconds", "op" => "get");
589                Ok(None)
590            }
591        }
592    }
593
594    /// Zero-alloc / partial-read escape hatch (review P0-3, P1-3).
595    ///
596    /// Reads into a caller-owned [`DatabaseEntry`] so the buffer can be
597    /// reused across calls, and honours `data.is_partial()` for
598    /// partial reads (the offset/length machinery that `DatabaseEntry`
599    /// exists for). Returns `true` if the record was found.
600    ///
601    /// `txn` is `Option` here because this is the low-level escape hatch;
602    /// the idiomatic named-choice surface is [`Self::get`] / [`Self::get_in`].
603    ///
604    /// # Errors
605    /// Returns an error if the database is closed or the environment failed.
606    pub fn get_into(
607        &self,
608        txn: Option<&Transaction>,
609        key: impl AsRef<[u8]>,
610        data: &mut DatabaseEntry,
611    ) -> Result<bool> {
612        self.check_open()?;
613        self.reject_txn_on_non_txnal_db(txn.is_some())?;
614        let key_bytes = key.as_ref();
615
616        let mut cursor = match txn {
617            Some(t) => self.make_cursor_for_txn(t),
618            None => self.make_cursor(),
619        };
620        match cursor
621            .search(key_bytes, None, SearchMode::Set)
622            .map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?
623        {
624            noxu_dbi::OperationStatus::Success => {
625                let (_, value) = cursor.get_current().map_err(|e| {
626                    NoxuError::OperationNotAllowed(e.to_string())
627                })?;
628                // Partial get: return only the requested slice.
629                if data.is_partial() {
630                    let off = data.partial_offset();
631                    let len = data.partial_length();
632                    let end = (off + len).min(value.len());
633                    let slice =
634                        if off < value.len() { &value[off..end] } else { &[] };
635                    data.set_data(slice);
636                } else {
637                    data.set_data(&value);
638                }
639                self.throughput.n_pri_searches.fetch_add(1, Ordering::Relaxed);
640                Ok(true)
641            }
642            _ => {
643                self.throughput
644                    .n_pri_search_fails
645                    .fetch_add(1, Ordering::Relaxed);
646                Ok(false)
647            }
648        }
649    }
650
651    /// Retrieves a record with per-operation read options (escape hatch;
652    /// review P0-3 returns `Result<Option<Bytes>>`, P1-3 takes
653    /// `impl AsRef<[u8]>`).
654    ///
655    /// Mirrors `Cursor.get()` with `ReadOptions` applied:
656    /// - `LockMode::ReadUncommitted` — dirty read, no lock acquired
657    /// - `LockMode::ReadCommitted` — read-committed isolation (standard locking)
658    /// - `LockMode::Rmw` — acquire write lock for read-modify-write
659    /// - `LockMode::Default` — environment default isolation
660    ///
661    /// `CacheMode` in `ReadOptions` is advisory (accepted but not yet honored):
662    /// the per-operation hint does not reach the evictor and has no effect
663    /// today. See [`crate::CacheMode`] for the tracking note.
664    ///
665    /// # Arguments
666    /// * `txn` - Optional transaction handle
667    /// * `key` - The search key
668    /// * `opts` - Per-operation read options (isolation, cache hints)
669    ///
670    /// # Returns
671    /// `Some(value)` if found, `None` otherwise.
672    pub fn get_with_options(
673        &self,
674        txn: Option<&Transaction>,
675        key: impl AsRef<[u8]>,
676        opts: &ReadOptions,
677    ) -> Result<Option<Bytes>> {
678        self.check_open()?;
679        self.reject_txn_on_non_txnal_db(txn.is_some())?;
680        let key_bytes = key.as_ref();
681        observe_span!(
682            "db_get_with_options",
683            db_name = self.name.as_str(),
684            key_size = key_bytes.len(),
685            lock_mode = format!("{:?}", opts.lock_mode),
686        );
687        let _obs_timer = observe_timer_start!();
688        observe_counter!("noxu_db_operations_total", "op" => "get_with_options");
689
690        let mut cursor = match opts.lock_mode {
691            LockMode::ReadUncommitted => self.make_cursor_no_lock(),
692            _ => match txn {
693                Some(t) => self.make_cursor_for_txn(t),
694                None => self.make_cursor(),
695            },
696        };
697
698        match cursor
699            .search(key_bytes, None, SearchMode::Set)
700            .map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?
701        {
702            noxu_dbi::OperationStatus::Success => {
703                let (_, value) = cursor.get_current().map_err(|e| {
704                    NoxuError::OperationNotAllowed(e.to_string())
705                })?;
706                // JE LockMode.RMW (Cursor.java:5281): an RMW read takes a WRITE
707                // lock on the record so a later update in the same txn cannot
708                // deadlock and a concurrent writer blocks at read time.
709                if matches!(opts.lock_mode, LockMode::Rmw) {
710                    cursor.upgrade_current_to_write_lock().map_err(|e| {
711                        NoxuError::OperationNotAllowed(e.to_string())
712                    })?;
713                }
714                self.throughput.n_pri_searches.fetch_add(1, Ordering::Relaxed);
715                observe_timer_record!(
716                    _obs_timer,
717                    "noxu_db_operation_duration_seconds",
718                    "op" => "get_with_options"
719                );
720                Ok(Some(Bytes::from(value)))
721            }
722            _ => {
723                self.throughput
724                    .n_pri_search_fails
725                    .fetch_add(1, Ordering::Relaxed);
726                observe_timer_record!(
727                    _obs_timer,
728                    "noxu_db_operation_duration_seconds",
729                    "op" => "get_with_options"
730                );
731                Ok(None)
732            }
733        }
734    }
735
736    /// Inserts or updates a record, auto-committing the write (review
737    /// P0-2: auto-commit is the unadorned `put`; the transactional form is
738    /// the named [`Self::put_in`]). Keys and values accept any
739    /// `impl AsRef<[u8]>` (review P1-3).
740    ///
741    /// # Errors
742    /// Returns an error if the database is closed or read-only.
743    pub fn put(
744        &self,
745        key: impl AsRef<[u8]>,
746        data: impl AsRef<[u8]>,
747    ) -> Result<()> {
748        self.put_bytes(None, key.as_ref(), data.as_ref())
749    }
750
751    /// Inserts or updates a record within an explicit transaction (review
752    /// P0-2).
753    ///
754    /// # Errors
755    /// Returns an error if the database is closed, read-only, or a
756    /// transactional handle is used against a non-transactional DB.
757    pub fn put_in(
758        &self,
759        txn: &Transaction,
760        key: impl AsRef<[u8]>,
761        data: impl AsRef<[u8]>,
762    ) -> Result<()> {
763        self.put_bytes(Some(txn), key.as_ref(), data.as_ref())
764    }
765
766    /// Shared byte-slice put used by [`Self::put`] / [`Self::put_in`]
767    /// (review P0-2/P1-3). Preserves the v6 semantics exactly — secondary
768    /// maintenance, triggers, auto-commit fsync and cleaner backpressure —
769    /// only the public signature shape changed.
770    fn put_bytes(
771        &self,
772        txn: Option<&Transaction>,
773        key_bytes: &[u8],
774        data_bytes: &[u8],
775    ) -> Result<()> {
776        self.check_open()?;
777        self.reject_txn_on_non_txnal_db(txn.is_some())?;
778        self.check_writable()?;
779        // EV-15: per-write synchronous critical eviction (write back-pressure).
780        // JE EnvironmentImpl.criticalEviction is called before every cursor
781        // operation; when the cache is critically over budget this writer
782        // thread evicts a bounded batch itself before allocating more.
783        self.evictor.do_critical_eviction();
784        observe_span!(
785            "db_put",
786            db_name = self.name.as_str(),
787            key_size = key_bytes.len(),
788            data_size = data_bytes.len(),
789        );
790        let _obs_timer = observe_timer_start!();
791        observe_counter!("noxu_db_operations_total", "op" => "put");
792
793        // v1.6 (audit C3 / step 6): if any secondaries are registered,
794        // capture the pre-put value of this key (if it exists) BEFORE
795        // the overwrite so we can pass it as `old_data` to the
796        // secondary key creator below.  When the put is a fresh
797        // insert the read returns NotFound and `old_data_for_secondaries`
798        // remains `None`.  We re-read here using the caller's txn for
799        // the read so isolation is honoured.
800        let secondaries_pre = self.live_secondaries();
801        let need_old_data =
802            !secondaries_pre.is_empty() || self.has_user_triggers();
803        let old_data_for_secondaries: Option<Vec<u8>> = if !need_old_data {
804            None
805        } else {
806            self.get_bytes(txn, key_bytes)?.map(|b| b.to_vec())
807        };
808
809        match txn {
810            Some(t) => {
811                let mut cursor = self.make_cursor_for_txn(t);
812                cursor
813                    .put(key_bytes, data_bytes, PutMode::Overwrite)
814                    .map_err(NoxuError::from)?;
815            }
816            None => {
817                // Wrap the write in a synthetic auto-commit `Txn` so the
818                // lock manager sees a typed locker id ("auto-txn:<id>")
819                // and any error rolls back the in-memory tree write
820                // through `Txn::abort_collect_undo`.
821                self.with_auto_txn(|cursor| {
822                    cursor
823                        .put(key_bytes, data_bytes, PutMode::Overwrite)
824                        .map_err(NoxuError::from)?;
825                    Ok(())
826                })?;
827            }
828        }
829
830        // DB-TRIG: fire Trigger.put within the transaction, after the change
831        // is applied.  oldData = None for an insert, Some(prev) for an update;
832        // newData is the bytes just written.  JE
833        // `TriggerManager.runPutTriggers(locker, dbImpl, key, oldData,
834        // newData)`.
835        self.fire_put_triggers(
836            txn,
837            key_bytes,
838            old_data_for_secondaries.as_deref(),
839            data_bytes,
840        );
841
842        // v1.6 (audit C3 — the associate()-style hook): drive every
843        // registered secondary index under the same caller-supplied
844        // txn so the primary record and its secondary entries commit /
845        // abort together.
846        let secondaries = self.live_secondaries();
847        if !secondaries.is_empty() {
848            let key_entry = DatabaseEntry::from_bytes(key_bytes);
849            let new_entry = DatabaseEntry::from_bytes(data_bytes);
850            let old_entry: Option<DatabaseEntry> = old_data_for_secondaries
851                .as_deref()
852                .map(DatabaseEntry::from_bytes);
853            for hook in secondaries {
854                hook.maintain(
855                    txn,
856                    &key_entry,
857                    old_entry.as_ref(),
858                    Some(&new_entry),
859                )?;
860            }
861        }
862
863        // Apply cleaner write-path backpressure for auto-commit (no-txn) bulk
864        // writes: if the log write rate exceeds the cleaner's capacity, sleep
865        // briefly so the cleaner can keep up.  Transactional paths handle this
866        // in Transaction::commit_with_durability() instead.
867        if txn.is_none()
868            && let Some(delay) = self
869                .cleaner_throttle
870                .as_ref()
871                .and_then(|t| t.should_throttle_writer())
872        {
873            std::thread::sleep(delay);
874        }
875
876        self.throughput.n_pri_updates.fetch_add(1, Ordering::Relaxed);
877        observe_timer_record!(_obs_timer, "noxu_db_operation_duration_seconds", "op" => "put");
878        Ok(())
879    }
880
881    /// Partial-read/-write escape hatch (review P1-3: `DatabaseEntry` is
882    /// retained only where the offset/length actually matter).
883    ///
884    /// `data` must be configured with [`DatabaseEntry::set_partial`]; the
885    /// existing record's bytes outside `[offset, offset+length)` are
886    /// preserved and only the specified range is replaced (JE
887    /// `LN.combinePuts()`). The supplied `data` length must equal the
888    /// configured partial length.
889    ///
890    /// # Errors
891    /// Returns [`NoxuError::IllegalArgument`] if the data length does not
892    /// match the partial length, or if the database is closed / read-only.
893    pub fn put_partial(
894        &self,
895        txn: Option<&Transaction>,
896        key: impl AsRef<[u8]>,
897        data: &DatabaseEntry,
898    ) -> Result<()> {
899        self.check_open()?;
900        self.reject_txn_on_non_txnal_db(txn.is_some())?;
901        self.check_writable()?;
902        let key_bytes = key.as_ref();
903
904        // Partial put: read-modify-write using the partial offset/length.
905        // LN.combinePuts() — existing bytes outside [offset..offset+length]
906        // are preserved; only the specified range is replaced with new data.
907        // A length mismatch is rejected with a typed error rather than
908        // silently truncating (matches JE's exact-equality requirement).
909        let write_bytes: Vec<u8> = if data.is_partial() {
910            let new_bytes = data.data_opt().unwrap_or(&[]);
911            let off = data.partial_offset();
912            let len = data.partial_length();
913            if new_bytes.len() != len {
914                return Err(NoxuError::IllegalArgument(format!(
915                    "partial put: data length {} does not match \
916                     partial_length {} (partial_offset={}); JE \
917                     requires exact equality",
918                    new_bytes.len(),
919                    len,
920                    off
921                )));
922            }
923            // Fetch the existing record to splice into.
924            let existing = match self.get_bytes(txn, key_bytes)? {
925                Some(b) => b.to_vec(),
926                None => vec![0u8; off + len],
927            };
928            let total_len = (off + len).max(existing.len());
929            let mut patched = existing;
930            patched.resize(total_len, 0);
931            patched[off..off + len].copy_from_slice(new_bytes);
932            patched
933        } else {
934            data.data_opt().unwrap_or(&[]).to_vec()
935        };
936
937        self.put_bytes(txn, key_bytes, &write_bytes)
938    }
939
940    /// Inserts or updates a record with per-operation write options
941    /// (escape hatch; review P1-3 takes `impl AsRef<[u8]>`).
942    ///
943    /// Extends `put()` with `WriteOptions` support:
944    /// - `ttl` — if > 0, sets a per-record TTL expiration (hours from now); the
945    ///   record will be treated as expired and invisible after the TTL elapses.
946    /// - `update_ttl` — if true and the record already exists, refreshes its TTL.
947    /// - `cache_mode` — advisory cache hint, accepted but not yet honored (no
948    ///   effect today; see [`crate::CacheMode`]).
949    ///
950    /// # Arguments
951    /// * `txn` - Optional transaction handle
952    /// * `key` - The key to insert/update
953    /// * `data` - The data to store
954    /// * `opts` - Per-operation write options (TTL, cache hints)
955    pub fn put_with_options(
956        &self,
957        txn: Option<&Transaction>,
958        key: impl AsRef<[u8]>,
959        data: impl AsRef<[u8]>,
960        opts: &WriteOptions,
961    ) -> Result<()> {
962        let key_bytes = key.as_ref();
963        self.put_bytes(txn, key_bytes, data.as_ref())?;
964
965        // Apply TTL to the just-written BIN slot when requested.
966        //
967        // Audit database F8 (Wave 2C-4) — partial fix: the TTL update is
968        // still in-memory only; recovery does not yet replay it.  The
969        // engine cannot distinguish insert-vs-update at this layer, so we
970        // always apply when `ttl > 0` and the underlying write succeeded.
971        if opts.ttl > 0 {
972            let expiration_hours =
973                noxu_util::current_time_hours().saturating_add(opts.ttl as u32);
974            self.db_impl
975                .read()
976                .update_key_expiration(key_bytes, expiration_hours);
977        }
978
979        Ok(())
980    }
981
982    /// Inserts a record, failing if the key already exists; auto-commits
983    /// (review P0-2/P0-3/P1-3).
984    ///
985    /// Returns `Ok(true)` if the record was inserted, `Ok(false)` if the
986    /// key already existed (the prior `OperationStatus::KeyExists` collapses
987    /// to the boolean per review P0-3).
988    ///
989    /// # Errors
990    /// Returns an error if the database is closed or read-only.
991    pub fn put_no_overwrite(
992        &self,
993        key: impl AsRef<[u8]>,
994        data: impl AsRef<[u8]>,
995    ) -> Result<bool> {
996        self.put_no_overwrite_bytes(None, key.as_ref(), data.as_ref())
997    }
998
999    /// Inserts a record within an explicit transaction, failing if the key
1000    /// already exists (review P0-2). `Ok(true)` = inserted.
1001    ///
1002    /// # Errors
1003    /// Returns an error if the database is closed, read-only, or a
1004    /// transactional handle is used against a non-transactional DB.
1005    pub fn put_no_overwrite_in(
1006        &self,
1007        txn: &Transaction,
1008        key: impl AsRef<[u8]>,
1009        data: impl AsRef<[u8]>,
1010    ) -> Result<bool> {
1011        self.put_no_overwrite_bytes(Some(txn), key.as_ref(), data.as_ref())
1012    }
1013
1014    /// Shared byte-slice no-overwrite insert.  `Ok(true)` = inserted,
1015    /// `Ok(false)` = key already present.
1016    fn put_no_overwrite_bytes(
1017        &self,
1018        txn: Option<&Transaction>,
1019        key_bytes: &[u8],
1020        data_bytes: &[u8],
1021    ) -> Result<bool> {
1022        self.check_open()?;
1023        self.reject_txn_on_non_txnal_db(txn.is_some())?;
1024        self.check_writable()?;
1025
1026        let inserted = match txn {
1027            Some(t) => {
1028                let mut cursor = self.make_cursor_for_txn(t);
1029                !matches!(
1030                    cursor
1031                        .put(key_bytes, data_bytes, PutMode::NoOverwrite)
1032                        .map_err(NoxuError::from)?,
1033                    noxu_dbi::OperationStatus::KeyExist
1034                )
1035            }
1036            None => self.with_auto_txn(|cursor| {
1037                cursor
1038                    .put(key_bytes, data_bytes, PutMode::NoOverwrite)
1039                    .map_err(NoxuError::from)
1040                    .map(|s| !matches!(s, noxu_dbi::OperationStatus::KeyExist))
1041            })?,
1042        };
1043        if inserted {
1044            // DB-TRIG: a successful no-overwrite put is always an insert, so
1045            // oldData is None.  JE `TriggerManager.runPutTriggers`.
1046            self.fire_put_triggers(txn, key_bytes, None, data_bytes);
1047            self.throughput.n_pri_inserts.fetch_add(1, Ordering::Relaxed);
1048        } else {
1049            self.throughput.n_pri_insert_fails.fetch_add(1, Ordering::Relaxed);
1050        }
1051        Ok(inserted)
1052    }
1053
1054    /// Deletes a record by key, auto-committing (review P0-2/P0-3/P1-3).
1055    ///
1056    /// Returns `Ok(true)` if a record was deleted, `Ok(false)` if the key
1057    /// was absent (the prior `OperationStatus::NotFound` collapses to the
1058    /// boolean per review P0-3).
1059    ///
1060    /// # Errors
1061    /// Returns an error if the database is closed or read-only.
1062    pub fn delete(&self, key: impl AsRef<[u8]>) -> Result<bool> {
1063        self.delete_bytes(None, key.as_ref())
1064    }
1065
1066    /// Deletes a record by key within an explicit transaction (review
1067    /// P0-2). `Ok(true)` = deleted.
1068    ///
1069    /// # Errors
1070    /// Returns an error if the database is closed, read-only, or a
1071    /// transactional handle is used against a non-transactional DB.
1072    pub fn delete_in(
1073        &self,
1074        txn: &Transaction,
1075        key: impl AsRef<[u8]>,
1076    ) -> Result<bool> {
1077        self.delete_bytes(Some(txn), key.as_ref())
1078    }
1079
1080    /// Shared byte-slice delete.  `Ok(true)` = deleted, `Ok(false)` = key
1081    /// absent.  Preserves the v6 dup-handling, FK referrer, secondary and
1082    /// trigger semantics exactly.
1083    fn delete_bytes(
1084        &self,
1085        txn: Option<&Transaction>,
1086        key_bytes: &[u8],
1087    ) -> Result<bool> {
1088        self.check_open()?;
1089        self.reject_txn_on_non_txnal_db(txn.is_some())?;
1090        self.check_writable()?;
1091        // EV-15: per-write synchronous critical eviction (write back-pressure).
1092        self.evictor.do_critical_eviction();
1093        observe_span!(
1094            "db_delete",
1095            db_name = self.name.as_str(),
1096            key_size = key_bytes.len(),
1097        );
1098        let _obs_timer = observe_timer_start!();
1099        observe_counter!("noxu_db_operations_total", "op" => "delete");
1100
1101        // FK referrers and secondary hooks consume a `&DatabaseEntry` key;
1102        // build one once from the bytes (review P1-3: `DatabaseEntry` stays
1103        // an internal-plumbing detail, not the public surface).
1104        let key = DatabaseEntry::from_bytes(key_bytes);
1105
1106        // v1.6 (audit C3): if any secondaries are registered we must
1107        // capture the pre-delete primary data on each iteration so the
1108        // secondary key creator can recompute every (sec_key, pri_key)
1109        // pair to remove.  Collected outside the cursor closure so
1110        // the auto-commit and explicit-txn paths share one buffer.
1111        let secondaries = self.live_secondaries();
1112        let track_old_data =
1113            !secondaries.is_empty() || self.has_user_triggers();
1114        let mut deleted_old_values: Vec<Vec<u8>> = Vec::new();
1115
1116        // v1.6 (audit C2 / Decision 2C — step 8): consult any FK
1117        // referrers BEFORE the delete is applied.  An Abort action
1118        // raises a typed error and prevents the foreign delete from
1119        // happening at all (matching JE's `ForeignConstraintException`
1120        // semantics).  Cascade / Nullify (steps 9 / 10) mutate child
1121        // records under the same caller-supplied txn so the foreign
1122        // delete and its consequences commit / abort together.
1123        let fk_referrers = self.live_fk_referrers();
1124        if !fk_referrers.is_empty() {
1125            for referrer in &fk_referrers {
1126                referrer.on_foreign_key_deleted(txn, &key)?;
1127            }
1128        }
1129
1130        // Inner closure shared between the explicit-txn and synthetic
1131        // auto-txn paths: scans + deletes every duplicate of `key_bytes`
1132        // through the supplied `cursor`.  See comment in pre-Wave-1A
1133        // delete for the dup-loop rationale (BDB-JE
1134        // `Database.delete(key)` semantics).
1135        let mut run_delete = |cursor: &mut CursorImpl| -> Result<bool> {
1136            let mut deleted_any = false;
1137            while let noxu_dbi::OperationStatus::Success = cursor
1138                .search(key_bytes, None, SearchMode::Set)
1139                .map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?
1140            {
1141                if track_old_data {
1142                    let (_, v) = cursor.get_current().map_err(|e| {
1143                        NoxuError::OperationNotAllowed(e.to_string())
1144                    })?;
1145                    deleted_old_values.push(v);
1146                }
1147                cursor.delete().map_err(|e| {
1148                    NoxuError::OperationNotAllowed(e.to_string())
1149                })?;
1150                deleted_any = true;
1151            }
1152            Ok(deleted_any)
1153        };
1154
1155        let deleted_any = match txn {
1156            Some(t) => {
1157                let mut cursor = self.make_cursor_for_txn(t);
1158                run_delete(&mut cursor)?
1159            }
1160            None => self.with_auto_txn(&mut run_delete)?,
1161        };
1162
1163        // v1.6 (audit C3): fan out the secondary cleanup under the
1164        // caller's txn so the primary delete and every secondary
1165        // tombstone commit / abort together.
1166        if deleted_any && !secondaries.is_empty() {
1167            for old_bytes in &deleted_old_values {
1168                let old_entry = DatabaseEntry::from_bytes(old_bytes);
1169                for hook in &secondaries {
1170                    hook.maintain(txn, &key, Some(&old_entry), None)?;
1171                }
1172            }
1173        }
1174
1175        // DB-TRIG: fire Trigger.delete within the transaction for each
1176        // removed (key, data) pair, after the change is applied.  JE
1177        // `TriggerManager.runDeleteTriggers(locker, dbImpl, key, oldData)`.
1178        if deleted_any && self.has_user_triggers() {
1179            for old_bytes in &deleted_old_values {
1180                self.fire_delete_triggers(txn, key_bytes, old_bytes);
1181            }
1182        }
1183
1184        let status = if deleted_any {
1185            OperationStatus::Success
1186        } else {
1187            OperationStatus::NotFound
1188        };
1189        if status == OperationStatus::Success {
1190            self.throughput.n_pri_deletes.fetch_add(1, Ordering::Relaxed);
1191        } else {
1192            self.throughput.n_pri_delete_fails.fetch_add(1, Ordering::Relaxed);
1193        }
1194        observe_timer_record!(_obs_timer, "noxu_db_operation_duration_seconds", "op" => "delete");
1195        Ok(deleted_any)
1196    }
1197
1198    /// Opens an auto-commit cursor for iterating over database records
1199    /// (review P0-1: returns `Cursor<'_>`; review P0-2: auto-commit is the
1200    /// unadorned form, the transactional form is [`Self::open_cursor_in`]).
1201    ///
1202    /// In auto-commit mode each cursor write is its own transaction and is
1203    /// fsynced before returning.
1204    ///
1205    /// # Arguments
1206    /// * `config` - Optional cursor configuration
1207    ///
1208    /// # Errors
1209    /// Returns an error if the database is closed.
1210    pub fn open_cursor(
1211        &self,
1212        config: Option<&CursorConfig>,
1213    ) -> Result<Cursor<'static>> {
1214        self.open_cursor_internal(None, config)
1215    }
1216
1217    /// Opens a cursor bound to an explicit transaction (review P0-1/P0-2).
1218    ///
1219    /// The cursor binds to the transaction's `Locker`: every cursor `get`
1220    /// acquires shared locks tracked by the txn, every cursor `put`/`delete`
1221    /// acquires exclusive locks and is rolled back if the txn aborts.
1222    ///
1223    /// The returned `Cursor<'txn>` borrows `txn`, so the borrow checker
1224    /// rejects any attempt to commit or drop the transaction while the
1225    /// cursor is still alive — the old "close the cursor before commit"
1226    /// prose invariant is now a compile error.
1227    ///
1228    /// # Arguments
1229    /// * `txn` - the transaction the cursor participates in
1230    /// * `config` - Optional cursor configuration
1231    ///
1232    /// # Errors
1233    /// Returns an error if the database is closed or a transactional handle
1234    /// is used against a non-transactional database.
1235    pub fn open_cursor_in<'txn>(
1236        &self,
1237        txn: &'txn Transaction,
1238        config: Option<&CursorConfig>,
1239    ) -> Result<Cursor<'txn>> {
1240        self.open_cursor_internal(Some(txn), config)
1241    }
1242
1243    /// Shared cursor-open used by [`Self::open_cursor`] /
1244    /// [`Self::open_cursor_in`].  The `'txn` lifetime flows from the
1245    /// `Option<&'txn Transaction>` into the returned `Cursor<'txn>`
1246    /// (review P0-1).
1247    pub(crate) fn open_cursor_internal<'txn>(
1248        &self,
1249        txn: Option<&'txn Transaction>,
1250        config: Option<&CursorConfig>,
1251    ) -> Result<Cursor<'txn>> {
1252        self.check_open()?;
1253
1254        // JE invariant: a transactional cursor cannot be opened on a
1255        // non-transactional database.  JE throws IllegalArgumentException
1256        // in DatabaseTest.testCursor when a Transaction is supplied to
1257        // a non-txnal DB (wave-11-G, database_txn_cursor_on_non_txn_db_rejected).
1258        self.reject_txn_on_non_txnal_db(txn.is_some())?;
1259        let read_only = config.map(|c| c.read_uncommitted).unwrap_or(false)
1260            || self.config.read_only;
1261
1262        let cursor_impl = if read_only {
1263            CursorImpl::new(Arc::clone(&self.db_impl), 0)
1264                .with_env_invalid(Arc::clone(&self.env_invalid))
1265        } else {
1266            // Plumb the caller's txn through to the cursor so that
1267            // cursor reads acquire shared locks via the txn's locker and
1268            // cursor writes acquire exclusive locks (and roll back on
1269            // txn.abort()) rather than auto-committing.  See API audit
1270            // 2026-05 cursor finding C1.
1271            match txn {
1272                Some(t) => self.make_cursor_for_txn(t),
1273                None => self.make_cursor(),
1274            }
1275        };
1276
1277        Ok(Cursor::from_impl(cursor_impl, read_only))
1278    }
1279
1280    /// Returns a lazy forward iterator over all records in the database.
1281    ///
1282    /// Records are fetched one at a time (the underlying cursor advances on
1283    /// each `next()` call).  The full database is **not** eagerly materialised
1284    /// into memory.
1285    ///
1286    /// Pass `txn = Some(&txn)` to iterate within an explicit transaction;
1287    /// pass `None` for an auto-commit (non-transactional) scan.
1288    ///
1289    /// # Example
1290    ///
1291    /// ```no_run
1292    /// # use noxu_db::{Database, DatabaseConfig, DatabaseEntry,
1293    /// #              Environment, EnvironmentConfig};
1294    /// # use std::path::PathBuf;
1295    /// # fn main() -> noxu_db::Result<()> {
1296    /// # let env = Environment::open(EnvironmentConfig::new(PathBuf::from("/tmp/t")).with_allow_create(true))?;
1297    /// # let db = env.open_database(None, "d", &DatabaseConfig::new().with_allow_create(true).with_transactional(true))?;
1298    /// for result in db.iter(None)? {
1299    ///     let (key, val) = result?;
1300    ///     println!("{:?} => {:?}", key, val);
1301    /// }
1302    /// # Ok(()) }
1303    /// ```
1304    ///
1305    /// # Errors
1306    /// Returns an error if the database is closed.
1307    pub fn iter<'txn>(
1308        &self,
1309        txn: Option<&'txn Transaction>,
1310    ) -> Result<crate::db_iter::DbIter<'txn>> {
1311        let cursor = self.open_cursor_internal(txn, None)?;
1312        Ok(crate::db_iter::DbIter::new(cursor))
1313    }
1314
1315    /// Returns a lazy iterator over the records whose keys fall within `range`.
1316    ///
1317    /// The iterator is positioned at the first key that satisfies the lower
1318    /// bound (using `SearchGte`) and stops once the key exceeds the upper
1319    /// bound.  All standard `RangeBounds` variants are supported:
1320    /// `..`, `lo..`, `..=hi`, `lo..hi`, `lo..=hi`, etc.
1321    ///
1322    /// Pass `txn = Some(&txn)` to iterate within an explicit transaction;
1323    /// pass `None` for a non-transactional scan.
1324    ///
1325    /// # Example
1326    ///
1327    /// ```no_run
1328    /// # use noxu_db::{Database, DatabaseConfig, DatabaseEntry,
1329    /// #              Environment, EnvironmentConfig};
1330    /// # use std::path::PathBuf;
1331    /// # fn main() -> noxu_db::Result<()> {
1332    /// # let env = Environment::open(EnvironmentConfig::new(PathBuf::from("/tmp/t")).with_allow_create(true))?;
1333    /// # let db = env.open_database(None, "d", &DatabaseConfig::new().with_allow_create(true).with_transactional(true))?;
1334    /// let lo = b"key010";
1335    /// let hi = b"key020";
1336    /// for result in db.range(None, lo.as_ref()..=hi.as_ref())? {
1337    ///     let (key, _val) = result?;
1338    ///     assert!(key.as_slice() >= lo.as_slice());
1339    /// }
1340    /// # Ok(()) }
1341    /// ```
1342    ///
1343    /// # Errors
1344    /// Returns an error if the database is closed.
1345    pub fn range<'txn, K: AsRef<[u8]>>(
1346        &self,
1347        txn: Option<&'txn Transaction>,
1348        range: impl std::ops::RangeBounds<K>,
1349    ) -> Result<crate::db_iter::DbRange<'txn>> {
1350        use std::ops::Bound;
1351        let map_bound = |b: std::ops::Bound<&K>| -> std::ops::Bound<Vec<u8>> {
1352            match b {
1353                Bound::Included(k) => Bound::Included(k.as_ref().to_vec()),
1354                Bound::Excluded(k) => Bound::Excluded(k.as_ref().to_vec()),
1355                Bound::Unbounded => Bound::Unbounded,
1356            }
1357        };
1358        let start = map_bound(range.start_bound());
1359        let end = map_bound(range.end_bound());
1360        let cursor = self.open_cursor_internal(txn, None)?;
1361        Ok(crate::db_iter::DbRange::new(cursor, start, end))
1362    }
1363    ///
1364    ///
1365    ///
1366    /// # Arguments
1367    /// * `key`    - The database key under which the sequence record is stored.
1368    /// * `config` - Sequence configuration (use `SequenceConfig::new()` for defaults).
1369    ///
1370    /// # Errors
1371    /// Returns an error if the database is closed, the config is invalid, or
1372    /// `allow_create` is false and the sequence does not exist.
1373    pub fn open_sequence<'db>(
1374        &'db self,
1375        key: &DatabaseEntry,
1376        config: SequenceConfig,
1377    ) -> Result<Sequence<'db>> {
1378        self.check_open()?;
1379        Sequence::open(self, key, config)
1380    }
1381
1382    /// Closes the database handle.
1383    ///
1384    ///
1385    ///
1386    /// # Errors
1387    /// Returns an error if the database is already closed
1388    pub fn close(&self) -> Result<()> {
1389        if !self.open.load(Ordering::Acquire) {
1390            return Err(NoxuError::DatabaseClosed);
1391        }
1392
1393        self.open.store(false, Ordering::Release);
1394        let _ = self
1395            .env_impl
1396            .lock()
1397            .close_database(noxu_dbi::DatabaseId::new(self.id as i64));
1398        Ok(())
1399    }
1400
1401    /// Returns the database name.
1402    ///
1403    ///
1404    pub fn name(&self) -> &str {
1405        &self.name
1406    }
1407
1408    /// Returns the database configuration.
1409    ///
1410    ///
1411    pub fn config(&self) -> &DatabaseConfig {
1412        &self.config
1413    }
1414
1415    /// Returns whether this database was created with sorted duplicates.
1416    ///
1417    /// Unlike `config().sorted_duplicates` — which reflects the
1418    /// `DatabaseConfig` the caller *passed* to `open_database` — this reads
1419    /// the property stored in the opened `DatabaseImpl`, so it is correct even
1420    /// when an existing database is reopened without restating its dup-sort
1421    /// flag (as `noxu-admin dump` does).  Mirrors JE
1422    /// `Database.getConfig().getSortedDuplicates()` after
1423    /// `DbInternal.setUseExistingConfig`.
1424    pub fn sorted_duplicates(&self) -> bool {
1425        self.db_impl.read().get_sorted_duplicates()
1426    }
1427
1428    /// Returns the underlying database ID.  Used by FK cascade guards
1429    /// to disambiguate `(db, key)` frames when several databases
1430    /// participate in a cycle.
1431    pub(crate) fn db_id_for_fk_guard(&self) -> u64 {
1432        self.id
1433    }
1434
1435    /// Registers a secondary index for automatic maintenance.
1436    ///
1437    /// v1.6 (audit C3 — associate() hook): every [`SecondaryDatabase`]
1438    /// downgrades its inner `Arc<SecondaryHookState>` to a `Weak` and
1439    /// stores it here.  Subsequent `put` / `delete` calls iterate the
1440    /// list and forward the same txn to every live secondary, dropping
1441    /// dead `Weak` entries on the fly.
1442    pub(crate) fn register_secondary(
1443        &self,
1444        hook: std::sync::Weak<
1445            dyn crate::secondary_database::SecondaryHook + Send + Sync,
1446        >,
1447    ) {
1448        let mut guard = self.secondaries.write();
1449        // Compact dead Weak entries lazily on every registration so the
1450        // list does not grow unboundedly with churn.
1451        guard.retain(|w| w.strong_count() > 0);
1452        guard.push(hook);
1453    }
1454
1455    /// Returns a snapshot of every live registered secondary.  Used by
1456    /// the automatic-maintenance plumbing in `put` / `delete` to drive
1457    /// secondaries without holding the registry lock across the
1458    /// secondary call.  Dead `Weak` entries are dropped from the
1459    /// returned list (and — because we re-acquire the registry write
1460    /// lock at registration time — lazily compacted from the registry
1461    /// itself).
1462    pub(crate) fn live_secondaries(
1463        &self,
1464    ) -> Vec<Arc<dyn crate::secondary_database::SecondaryHook + Send + Sync>>
1465    {
1466        self.secondaries.read().iter().filter_map(|w| w.upgrade()).collect()
1467    }
1468
1469    /// Whether any user triggers are registered on this database (DB-TRIG).
1470    /// JE `DatabaseImpl.hasUserTriggers()` — the single fast-path check that
1471    /// keeps the no-trigger write path free of any trigger work.
1472    fn has_user_triggers(&self) -> bool {
1473        self.db_impl.read().has_user_triggers()
1474    }
1475
1476    /// Fire `Trigger.put` for every registered trigger, in registration order,
1477    /// and record this database on the transaction so its commit/abort
1478    /// triggers fire on resolution (DB-TRIG).
1479    ///
1480    /// Called after the record modification is applied, within the
1481    /// transaction — JE `Cursor.putNotify` -> `TriggerManager.runPutTriggers`,
1482    /// which also calls `Txn.noteTriggerDb`.
1483    fn fire_put_triggers(
1484        &self,
1485        txn: Option<&Transaction>,
1486        key: &[u8],
1487        old_data: Option<&[u8]>,
1488        new_data: &[u8],
1489    ) {
1490        let (db_id, triggers) = {
1491            let db = self.db_impl.read();
1492            (db.get_id().id() as u64, db.triggers().to_vec())
1493        };
1494        if triggers.is_empty() {
1495            return;
1496        }
1497        let txn_id = txn.map(Transaction::id);
1498        for trigger in &triggers {
1499            trigger.put(txn_id, key, old_data, new_data);
1500        }
1501        // JE Txn.noteTriggerDb: remember the modified DB so commit/abort
1502        // triggers fire later.  Only meaningful under an explicit txn (the
1503        // auto-commit path commits immediately and has no handle to note).
1504        if let Some(t) = txn {
1505            t.note_trigger_db(db_id, &triggers);
1506        }
1507    }
1508
1509    /// Fire `Trigger.delete` for every registered trigger, in registration
1510    /// order, and record this database on the transaction (DB-TRIG).
1511    ///
1512    /// JE `Cursor.deleteInternal` -> `TriggerManager.runDeleteTriggers` +
1513    /// `Txn.noteTriggerDb`.
1514    fn fire_delete_triggers(
1515        &self,
1516        txn: Option<&Transaction>,
1517        key: &[u8],
1518        old_data: &[u8],
1519    ) {
1520        let (db_id, triggers) = {
1521            let db = self.db_impl.read();
1522            (db.get_id().id() as u64, db.triggers().to_vec())
1523        };
1524        if triggers.is_empty() {
1525            return;
1526        }
1527        let txn_id = txn.map(Transaction::id);
1528        for trigger in &triggers {
1529            trigger.delete(txn_id, key, old_data);
1530        }
1531        if let Some(t) = txn {
1532            t.note_trigger_db(db_id, &triggers);
1533        }
1534    }
1535
1536    /// Registers an FK referrer that points at this primary as its
1537    /// foreign-key target (v1.6 audit C2 / Decision 2C — Abort hook).
1538    pub(crate) fn register_fk_referrer(
1539        &self,
1540        referrer: std::sync::Weak<
1541            dyn crate::secondary_database::FkReferrer + Send + Sync,
1542        >,
1543    ) {
1544        let mut guard = self.fk_referrers.write();
1545        guard.retain(|w| w.strong_count() > 0);
1546        guard.push(referrer);
1547    }
1548
1549    /// Snapshot of every live FK referrer.
1550    pub(crate) fn live_fk_referrers(
1551        &self,
1552    ) -> Vec<Arc<dyn crate::secondary_database::FkReferrer + Send + Sync>> {
1553        self.fk_referrers.read().iter().filter_map(|w| w.upgrade()).collect()
1554    }
1555
1556    /// Returns an approximate count of records in the database.
1557    ///
1558    /// reads the per-database `AtomicU64` entry
1559    /// counter, giving O(1) performance analogous to an O(1) counter.
1560    ///
1561    /// The counter is incremented on every new insert and decremented on every
1562    /// delete (including transaction aborts that undo inserts).
1563    ///
1564    /// # Errors
1565    /// Returns an error if the database is closed
1566    pub fn count(&self) -> Result<u64> {
1567        self.check_open()?;
1568        Ok(self.db_impl.read().entry_count())
1569    }
1570
1571    /// Returns all records as `(key_bytes, data_bytes)` pairs in key order.
1572    ///
1573    /// This is a helper for schema evolution: it uses the lower-level
1574    /// `CursorImpl` directly so each iteration yields raw `Vec<u8>` pairs
1575    /// without allocating a pair of `DatabaseEntry` values per record.
1576    ///
1577    /// # Errors
1578    /// Returns an error if the database is closed or a cursor operation fails.
1579    pub fn scan_all_kv(&self) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1580        self.check_open()?;
1581
1582        let mut cursor = CursorImpl::new(Arc::clone(&self.db_impl), 0)
1583            .with_env_invalid(Arc::clone(&self.env_invalid));
1584        let first_status = cursor
1585            .get_first()
1586            .map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?;
1587
1588        if first_status != noxu_dbi::OperationStatus::Success {
1589            return Ok(Vec::new());
1590        }
1591
1592        let mut records = Vec::new();
1593        loop {
1594            let (k, v) = cursor
1595                .get_current()
1596                .map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?;
1597            records.push((k, v));
1598
1599            let status = cursor
1600                .retrieve_next(GetMode::Next)
1601                .map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?;
1602            if status != noxu_dbi::OperationStatus::Success {
1603                break;
1604            }
1605        }
1606
1607        Ok(records)
1608    }
1609
1610    /// Returns whether the database handle is valid.
1611    ///
1612    ///
1613    pub fn is_valid(&self) -> bool {
1614        self.open.load(Ordering::Acquire)
1615    }
1616
1617    /// Returns the current state of the database handle.
1618    pub fn state(&self) -> DbState {
1619        if self.open.load(Ordering::Acquire) {
1620            DbState::Open
1621        } else {
1622            DbState::Closed
1623        }
1624    }
1625
1626    /// Flushes all pending writes for this database to stable storage.
1627    ///
1628    /// Implements `Database.sync()` — issues an fdatasync on the log file,
1629    /// ensuring that all writes made by non-transactional or deferred-sync
1630    /// operations are durable before returning.
1631    ///
1632    /// # Returns
1633    /// `Ok(())` on success. Acts as a no-op for non-transactional /
1634    /// in-memory environments where no log manager is configured.
1635    ///
1636    /// # Errors
1637    /// Returns an error if the database is closed or the underlying
1638    /// log-manager flush fails.
1639    pub fn sync(&self) -> Result<()> {
1640        self.check_open()?;
1641        if let Some(lm) = &self.log_manager {
1642            lm.flush_sync()
1643                .map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?;
1644        }
1645        Ok(())
1646    }
1647
1648    /// Preloads the database into cache by scanning the B-tree.
1649    ///
1650    /// Walks the tree, touching each internal-node and BIN level so they
1651    /// are pulled into the in-memory cache.  Useful for warming the
1652    /// cache before a workload begins.
1653    ///
1654    /// # Limitations
1655    /// * The current implementation warms the BIN/IN structure only;
1656    ///   `PreloadConfig::load_lns` therefore makes `lns_loaded` report
1657    ///   the *number of LN slots in the tree* rather than the number
1658    ///   of LNs actually fetched off disk.  Full LN warming is
1659    ///   tracked as a future-work item; the engine has no public
1660    ///   single-shot LN fetch API today, so the only way to warm an
1661    ///   LN is to position a cursor on its slot.
1662    /// * `PreloadConfig::max_millis` is honoured: the call returns
1663    ///   early once the wall-clock budget is exceeded, with the
1664    ///   partial results in the returned `PreloadStats`.
1665    ///
1666    /// # Arguments
1667    /// * `config` - Controls limits on preload duration and memory
1668    ///
1669    /// # Returns
1670    /// Statistics about what was preloaded.
1671    pub fn preload(&self, config: &PreloadConfig) -> Result<PreloadStats> {
1672        self.check_open()?;
1673        let start = std::time::Instant::now();
1674        let max_millis = config.max_millis;
1675        let mut stats =
1676            PreloadStats { bins_loaded: 0, lns_loaded: 0, elapsed_ms: 0 };
1677
1678        let guard = self.db_impl.read();
1679        if let Some(tree_stats) = guard.collect_btree_stats() {
1680            // collect_btree_stats() walks every node in the tree, which has
1681            // the side effect of pulling all BINs/INs into memory (cache).
1682            stats.bins_loaded = tree_stats.n_bins;
1683            if config.load_lns {
1684                // F9 (residual): this is the slot count, not a count of
1685                // actual LN fetches.  See the doc comment above.
1686                stats.lns_loaded = tree_stats.n_entries;
1687            }
1688        }
1689
1690        // Audit database F10 (Wave 2C-4): honour `max_millis` as a
1691        // post-walk diagnostic.  `collect_btree_stats` is currently
1692        // not interruptible, so the time bound surfaces in `stats`
1693        // (callers can detect over-budget runs by comparing
1694        // `elapsed_ms` to their config) but does not yet stop the
1695        // walk early.  Tracked for v2.0 alongside true LN warming.
1696        let elapsed_ms = start.elapsed().as_millis() as u64;
1697        if max_millis > 0 && elapsed_ms > max_millis {
1698            log::warn!(
1699                "Database::preload: walk took {elapsed_ms} ms, exceeding \
1700                 max_millis budget of {max_millis} ms (advisory until \
1701                 the BIN walker becomes interruptible)",
1702            );
1703        }
1704        stats.elapsed_ms = elapsed_ms;
1705        Ok(stats)
1706    }
1707
1708    /// Returns B-tree statistics for this database.
1709    ///
1710    /// Implements `Database.getStats(StatsConfig)`.
1711    ///
1712    /// When `config.fast` is `true`, only the O(1) entry-count is returned
1713    /// and no tree traversal is performed.  When `fast` is `false` (default),
1714    /// the full tree is walked to populate all node-count fields.
1715    ///
1716    /// # Errors
1717    /// Returns an error if the database is closed.
1718    pub fn stats(&self, config: Option<&StatsConfig>) -> Result<DatabaseStats> {
1719        self.check_open()?;
1720        let fast = config.map(|c| c.fast).unwrap_or(false);
1721
1722        let btree = if fast {
1723            // Fast path: O(1) counter only; skip tree traversal.
1724            BtreeStats {
1725                leaf_node_count: self.db_impl.read().entry_count(),
1726                ..Default::default()
1727            }
1728        } else {
1729            // Full path: walk the tree.
1730            let guard = self.db_impl.read();
1731            match guard.collect_btree_stats() {
1732                Some(ts) => BtreeStats {
1733                    leaf_node_count: ts.n_entries,
1734                    deleted_leaf_node_count: 0,
1735                    bottom_internal_node_count: ts.n_bins,
1736                    internal_node_count: ts.n_ins,
1737                    main_tree_max_depth: ts.height,
1738                },
1739                None => BtreeStats {
1740                    leaf_node_count: guard.entry_count(),
1741                    ..Default::default()
1742                },
1743            }
1744        };
1745
1746        Ok(DatabaseStats { btree })
1747    }
1748
1749    /// Verifies the structural integrity of this database's B-tree.
1750    ///
1751    /// Walks the B-tree from root to BIN leaves and checks:
1752    /// - Each upper IN's children are accessible (non-null child references).
1753    /// - Each BIN entry that is not known-deleted has a valid (non-NULL) LSN.
1754    /// - The BIN's first key is >= the parent routing key (key-range containment).
1755    ///
1756    /// Mirrors `Database.verify(VerifyConfig)` — calls `BtreeVerifier` on
1757    /// the underlying tree.
1758    ///
1759    /// # Arguments
1760    /// * `config` - Verification options (which checks to run, max errors, etc.)
1761    ///
1762    /// # Returns
1763    /// A `VerifyResult` with any structural errors and the count of records verified.
1764    ///
1765    /// # Errors
1766    /// Returns an error if the database is closed.
1767    pub fn verify(
1768        &self,
1769        config: &noxu_engine::VerifyConfig,
1770    ) -> Result<noxu_engine::VerifyResult> {
1771        self.check_open()?;
1772        let guard = self.db_impl.read();
1773        Ok(noxu_engine::verify_database_impl(&guard, config))
1774    }
1775
1776    /// Creates a join cursor that returns records matching all secondary-key
1777    /// constraints expressed by the pre-positioned `cursors`.
1778    ///
1779    /// Mirrors `Database.join(SecondaryCursor[], JoinConfig)`.
1780    ///
1781    /// Each cursor in `cursors` must already be positioned at the desired
1782    /// secondary key value (e.g. via `SecondaryCursor::get_search_key`).
1783    /// The join algorithm iterates through all candidate primary keys from
1784    /// `cursors[0]` and probes `cursors[1..n]` to confirm each candidate
1785    /// also appears in their secondary keys.  Candidates that pass all
1786    /// probes are returned by [`JoinCursor::get_next`].
1787    ///
1788    /// Unless `config.no_sort` is `true`, the cursor array is re-ordered by
1789    /// ascending duplicate-count estimate before the join starts, matching
1790    /// JE's optimisation for minimum candidate-set size.
1791    ///
1792    /// The returned `JoinCursor` owns the `cursors` for its lifetime.
1793    ///
1794    /// # Errors
1795    /// Returns an error if this database handle is closed.
1796    pub fn join<'db>(
1797        &'db self,
1798        cursors: Vec<SecondaryCursor<'db>>,
1799        config: Option<JoinConfig>,
1800    ) -> Result<JoinCursor<'db>> {
1801        self.check_open()?;
1802        JoinCursor::new(self, cursors, config)
1803    }
1804
1805    /// Checks if the database is open, returns an error if not.
1806    ///
1807    /// X-13: also checks the environment validity flags so that reads and
1808    /// writes return `EnvironmentFailure` after an fsync error or explicit
1809    /// `EnvironmentImpl::invalidate()` call rather than silently succeeding
1810    /// on stale BIN data.
1811    /// TXN-6 (JE invariant): a transactional handle must not be used against a
1812    /// non-transactional database.  JE `LockerFactory.getWritableLocker`/
1813    /// `getReadableLocker` throw `IllegalArgumentException` on EVERY operation
1814    /// (not just cursor-open) when a `Transaction` is supplied to a non-txnal DB.
1815    /// Shared by get/put/delete/get_with_options/put_with_options/open_cursor.
1816    fn reject_txn_on_non_txnal_db(&self, has_txn: bool) -> Result<()> {
1817        if has_txn && !self.config.transactional {
1818            return Err(NoxuError::IllegalArgument(
1819                "a transaction cannot be used with a \
1820                 non-transactional database"
1821                    .to_string(),
1822            ));
1823        }
1824        Ok(())
1825    }
1826
1827    fn check_open(&self) -> Result<()> {
1828        // Check environment validity first — explicit invalidation.
1829        if self.env_invalid.load(Ordering::Acquire) {
1830            return Err(NoxuError::environment_with_reason(
1831                crate::error::EnvironmentFailureReason::UnexpectedStateFatal,
1832                "environment has been invalidated".to_string(),
1833            ));
1834        }
1835        // Check I/O failure (C-2 / fsync-gate).
1836        if self
1837            .log_manager
1838            .as_ref()
1839            .is_some_and(|lm| lm.io_invalid.load(Ordering::Acquire))
1840        {
1841            return Err(NoxuError::environment_with_reason(
1842                crate::error::EnvironmentFailureReason::LogWrite,
1843                "I/O failure: environment invalidated by fsync error"
1844                    .to_string(),
1845            ));
1846        }
1847        if !self.open.load(Ordering::Acquire) {
1848            return Err(NoxuError::DatabaseClosed);
1849        }
1850        Ok(())
1851    }
1852
1853    /// Public-ish accessor for the cached log manager, used by
1854    /// [`crate::disk_ordered_cursor::open_disk_ordered_cursor_multi`].
1855    /// Returns `None` for non-WAL environments.
1856    pub(crate) fn cached_log_manager(
1857        &self,
1858    ) -> Option<&std::sync::Arc<noxu_log::LogManager>> {
1859        self.log_manager.as_ref()
1860    }
1861
1862    /// Cached cleaner `FileProtector` for this database's environment, used by
1863    /// the disk-ordered-cursor producer to protect the files it scans from
1864    /// cleaner deletion (CLN-7).  `None` when the environment has no cleaner.
1865    pub(crate) fn cached_file_protector(
1866        &self,
1867    ) -> Option<std::sync::Arc<noxu_cleaner::FileProtector>> {
1868        self.file_protector.clone()
1869    }
1870
1871    /// Public-ish accessor used by the disk-ordered-cursor helper to
1872    /// validate that the database is still open before scanning.
1873    pub(crate) fn check_open_for_doc(&self) -> Result<()> {
1874        self.check_open()
1875    }
1876
1877    /// Returns this database's `DatabaseId` for use by the disk-ordered
1878    /// cursor producer.
1879    pub(crate) fn database_id_for_doc(&self) -> noxu_dbi::DatabaseId {
1880        noxu_dbi::DatabaseId::new(self.id as i64)
1881    }
1882
1883    /// The env's `DOS_PRODUCER_QUEUE_TIMEOUT` (ms), read from the owning
1884    /// `EnvironmentImpl` at cursor-open time (not a hot path).  Passed to the
1885    /// disk-ordered-cursor producer so a lagging consumer fails the scan
1886    /// instead of hanging (JE `DOS_PRODUCER_QUEUE_TIMEOUT`).
1887    pub(crate) fn dos_producer_queue_timeout_ms(&self) -> u64 {
1888        self.env_impl.lock().get_dos_producer_queue_timeout_ms()
1889    }
1890
1891    /// Checks if the database is writable, returns an error if not.
1892    fn check_writable(&self) -> Result<()> {
1893        if self.config.read_only {
1894            return Err(NoxuError::ReadOnly);
1895        }
1896        Ok(())
1897    }
1898
1899    /// Unify the empty-key contract
1900    /// across `get` / `put` / `put_no_overwrite` / `put_with_options`
1901    /// / `delete`.  Returns the key bytes if the entry has data set
1902    /// (even if zero-length); rejects `None`-data keys with a typed
1903    /// `IllegalArgument` so the previous put-vs-get asymmetry can no
1904    /// longer black-hole records under a `None` key.
1905    #[allow(dead_code)] // documented empty-key contract helper, not yet wired
1906    fn require_key_bytes<'a>(
1907        key: &'a DatabaseEntry,
1908        op: &'static str,
1909    ) -> Result<&'a [u8]> {
1910        match key.data_opt() {
1911            Some(k) => Ok(k),
1912            None => Err(NoxuError::IllegalArgument(format!(
1913                "{op}: key DatabaseEntry has no data; \
1914                 use DatabaseEntry::from_bytes(...) or set_data(...) \
1915                 (Some(&[]) for an explicit empty key)",
1916            ))),
1917        }
1918    }
1919}
1920
1921impl Drop for Database {
1922    fn drop(&mut self) {
1923        // Best effort close on drop
1924        let _ = self.close();
1925    }
1926}
1927
1928#[cfg(test)]
1929mod tests {
1930    use super::*;
1931    use crate::environment::Environment;
1932    use crate::environment_config::EnvironmentConfig;
1933    use tempfile::TempDir;
1934
1935    fn temp_env_and_db() -> (TempDir, Environment, Database) {
1936        let temp_dir = TempDir::new().unwrap();
1937        let env_config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
1938            .with_allow_create(true)
1939            .with_transactional(true);
1940        let env = Environment::open(env_config).unwrap();
1941
1942        let db_config = DatabaseConfig::new()
1943            .with_allow_create(true)
1944            .with_transactional(true);
1945        let db = env.open_database(None, "testdb", &db_config).unwrap();
1946
1947        (temp_dir, env, db)
1948    }
1949
1950    #[test]
1951    fn test_database_name() {
1952        let (_temp_dir, _env, db) = temp_env_and_db();
1953        assert_eq!(db.name(), "testdb");
1954    }
1955
1956    #[test]
1957    fn test_put_and_get() {
1958        let (_temp_dir, _env, db) = temp_env_and_db();
1959
1960        let key = DatabaseEntry::from_bytes(b"key1");
1961        let value = DatabaseEntry::from_bytes(b"value1");
1962
1963        db.put(&key, &value).unwrap();
1964
1965        let mut retrieved = DatabaseEntry::new();
1966        let result = db.get_into(None, &key, &mut retrieved).unwrap();
1967        assert!(result);
1968        assert_eq!(retrieved.data_opt().unwrap(), b"value1");
1969    }
1970
1971    #[test]
1972    fn test_get_nonexistent() {
1973        let (_temp_dir, _env, db) = temp_env_and_db();
1974
1975        let key = DatabaseEntry::from_bytes(b"nonexistent");
1976        let mut data = DatabaseEntry::new();
1977
1978        let result = db.get_into(None, &key, &mut data).unwrap();
1979        assert!(!result);
1980    }
1981
1982    /// ("partial-put length
1983    /// mismatch silent truncation"): a partial put whose `data` slice
1984    /// differs in length from the configured partial-length must be
1985    /// rejected with a typed error instead of silently truncating or
1986    /// padding the splice.
1987    #[test]
1988    fn test_partial_put_length_mismatch_rejected() {
1989        let (_temp_dir, _env, db) = temp_env_and_db();
1990
1991        let key = DatabaseEntry::from_bytes(b"k");
1992        db.put(&key, DatabaseEntry::from_bytes(b"hello world")).unwrap();
1993
1994        // Partial offset=6, partial_length=5 ("world"), but only 3 bytes
1995        // supplied.  Used to silently truncate; now rejected.
1996        let mut patch = DatabaseEntry::from_bytes(b"abc");
1997        patch.set_partial(6, 5, true);
1998        let err = db.put_partial(None, &key, &patch).unwrap_err();
1999        assert!(
2000            matches!(err, NoxuError::IllegalArgument(_)),
2001            "expected IllegalArgument, got {err:?}"
2002        );
2003        assert!(
2004            err.to_string().contains("partial"),
2005            "expected partial-related message, got {}",
2006            err
2007        );
2008
2009        // The on-disk record is unchanged because the call returned
2010        // before any write.
2011        let mut buf = DatabaseEntry::new();
2012        let status = db.get_into(None, &key, &mut buf).unwrap();
2013        assert!(status);
2014        assert_eq!(buf.data_opt().unwrap(), b"hello world");
2015    }
2016
2017    /// Companion: when data.len() == partial_length the partial put
2018    /// patches the slice in place and other bytes are preserved.
2019    #[test]
2020    fn test_partial_put_exact_length_patches_in_place() {
2021        let (_temp_dir, _env, db) = temp_env_and_db();
2022
2023        let key = DatabaseEntry::from_bytes(b"k");
2024        db.put(&key, DatabaseEntry::from_bytes(b"hello world")).unwrap();
2025
2026        let mut patch = DatabaseEntry::from_bytes(b"WORLD");
2027        patch.set_partial(6, 5, true);
2028        db.put_partial(None, &key, &patch).unwrap();
2029
2030        let mut buf = DatabaseEntry::new();
2031        db.get_into(None, &key, &mut buf).unwrap();
2032        assert_eq!(buf.data_opt().unwrap(), b"hello WORLD");
2033    }
2034
2035    #[test]
2036    fn test_put_updates_existing() {
2037        let (_temp_dir, _env, db) = temp_env_and_db();
2038
2039        let key = DatabaseEntry::from_bytes(b"key1");
2040        let value1 = DatabaseEntry::from_bytes(b"value1");
2041        let value2 = DatabaseEntry::from_bytes(b"value2");
2042
2043        db.put(&key, &value1).unwrap();
2044        db.put(&key, &value2).unwrap();
2045
2046        let mut retrieved = DatabaseEntry::new();
2047        db.get_into(None, &key, &mut retrieved).unwrap();
2048        assert_eq!(retrieved.data_opt().unwrap(), b"value2");
2049    }
2050
2051    #[test]
2052    fn test_put_no_overwrite_success() {
2053        let (_temp_dir, _env, db) = temp_env_and_db();
2054
2055        let key = DatabaseEntry::from_bytes(b"key1");
2056        let value = DatabaseEntry::from_bytes(b"value1");
2057
2058        let result = db.put_no_overwrite(&key, &value).unwrap();
2059        assert!(result);
2060    }
2061
2062    #[test]
2063    fn test_put_no_overwrite_key_exists() {
2064        let (_temp_dir, _env, db) = temp_env_and_db();
2065
2066        let key = DatabaseEntry::from_bytes(b"key1");
2067        let value1 = DatabaseEntry::from_bytes(b"value1");
2068        let value2 = DatabaseEntry::from_bytes(b"value2");
2069
2070        db.put(&key, &value1).unwrap();
2071        let result = db.put_no_overwrite(&key, &value2).unwrap();
2072        assert!(!result);
2073
2074        // Verify original value is unchanged
2075        let mut retrieved = DatabaseEntry::new();
2076        db.get_into(None, &key, &mut retrieved).unwrap();
2077        assert_eq!(retrieved.data_opt().unwrap(), b"value1");
2078    }
2079
2080    #[test]
2081    fn test_delete() {
2082        let (_temp_dir, _env, db) = temp_env_and_db();
2083
2084        let key = DatabaseEntry::from_bytes(b"key1");
2085        let value = DatabaseEntry::from_bytes(b"value1");
2086
2087        db.put(&key, &value).unwrap();
2088        let result = db.delete(&key).unwrap();
2089        assert!(result);
2090
2091        let mut retrieved = DatabaseEntry::new();
2092        let result = db.get_into(None, &key, &mut retrieved).unwrap();
2093        assert!(!result);
2094    }
2095
2096    #[test]
2097    fn test_delete_nonexistent() {
2098        let (_temp_dir, _env, db) = temp_env_and_db();
2099
2100        let key = DatabaseEntry::from_bytes(b"nonexistent");
2101        let result = db.delete(&key).unwrap();
2102        assert!(!result);
2103    }
2104
2105    #[test]
2106    fn test_count() {
2107        let (_temp_dir, _env, db) = temp_env_and_db();
2108
2109        assert_eq!(db.count().unwrap(), 0);
2110
2111        let key1 = DatabaseEntry::from_bytes(b"key1");
2112        let value1 = DatabaseEntry::from_bytes(b"value1");
2113        db.put(&key1, &value1).unwrap();
2114        assert_eq!(db.count().unwrap(), 1);
2115
2116        let key2 = DatabaseEntry::from_bytes(b"key2");
2117        let value2 = DatabaseEntry::from_bytes(b"value2");
2118        db.put(&key2, &value2).unwrap();
2119        assert_eq!(db.count().unwrap(), 2);
2120
2121        db.delete(&key1).unwrap();
2122        assert_eq!(db.count().unwrap(), 1);
2123    }
2124
2125    #[test]
2126    fn test_close() {
2127        let (_temp_dir, _env, db) = temp_env_and_db();
2128        assert!(db.is_valid());
2129        db.close().unwrap();
2130        assert!(!db.is_valid());
2131    }
2132
2133    #[test]
2134    fn test_close_twice_fails() {
2135        let (_temp_dir, _env, db) = temp_env_and_db();
2136        db.close().unwrap();
2137        let result = db.close();
2138        assert!(result.is_err());
2139    }
2140
2141    #[test]
2142    fn test_operations_on_closed_database_fail() {
2143        let (_temp_dir, _env, db) = temp_env_and_db();
2144        db.close().unwrap();
2145
2146        let key = DatabaseEntry::from_bytes(b"key1");
2147        let value = DatabaseEntry::from_bytes(b"value1");
2148        let mut data = DatabaseEntry::new();
2149
2150        assert!(db.get_into(None, &key, &mut data).is_err());
2151        assert!(db.put(&key, &value).is_err());
2152        assert!(db.put_no_overwrite(&key, &value).is_err());
2153        assert!(db.delete(&key).is_err());
2154        assert!(db.count().is_err());
2155        assert!(db.open_cursor(None).is_err());
2156    }
2157
2158    #[test]
2159    fn test_state() {
2160        let (_temp_dir, _env, db) = temp_env_and_db();
2161        assert_eq!(db.state(), DbState::Open);
2162        db.close().unwrap();
2163        assert_eq!(db.state(), DbState::Closed);
2164    }
2165
2166    #[test]
2167    fn test_read_only_database() {
2168        let temp_dir = TempDir::new().unwrap();
2169        let env_config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
2170            .with_allow_create(true);
2171        let env = Environment::open(env_config).unwrap();
2172
2173        let db_config = DatabaseConfig::new()
2174            .with_allow_create(true)
2175            .with_transactional(true)
2176            .with_read_only(true);
2177        let db = env.open_database(None, "readonly_db", &db_config).unwrap();
2178
2179        let key = DatabaseEntry::from_bytes(b"key1");
2180        let value = DatabaseEntry::from_bytes(b"value1");
2181
2182        // Write operations should fail
2183        assert!(db.put(&key, &value).is_err());
2184        assert!(db.put_no_overwrite(&key, &value).is_err());
2185        assert!(db.delete(&key).is_err());
2186    }
2187
2188    #[test]
2189    fn test_multiple_databases() {
2190        let temp_dir = TempDir::new().unwrap();
2191        let env_config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
2192            .with_allow_create(true);
2193        let env = Environment::open(env_config).unwrap();
2194
2195        let db_config = DatabaseConfig::new()
2196            .with_allow_create(true)
2197            .with_transactional(true);
2198        let db1 = env.open_database(None, "db1", &db_config).unwrap();
2199        let db2 = env.open_database(None, "db2", &db_config).unwrap();
2200
2201        let key = DatabaseEntry::from_bytes(b"key1");
2202        let value1 = DatabaseEntry::from_bytes(b"value1");
2203        let value2 = DatabaseEntry::from_bytes(b"value2");
2204
2205        db1.put(&key, &value1).unwrap();
2206        db2.put(&key, &value2).unwrap();
2207
2208        let mut retrieved1 = DatabaseEntry::new();
2209        let mut retrieved2 = DatabaseEntry::new();
2210
2211        db1.get_into(None, &key, &mut retrieved1).unwrap();
2212        db2.get_into(None, &key, &mut retrieved2).unwrap();
2213
2214        assert_eq!(retrieved1.data_opt().unwrap(), b"value1");
2215        assert_eq!(retrieved2.data_opt().unwrap(), b"value2");
2216    }
2217
2218    #[test]
2219    fn test_empty_keys_and_values() {
2220        let (_temp_dir, _env, db) = temp_env_and_db();
2221
2222        let empty_key = DatabaseEntry::from_bytes(b"");
2223        let empty_value = DatabaseEntry::from_bytes(b"");
2224
2225        db.put(&empty_key, &empty_value).unwrap();
2226
2227        let mut retrieved = DatabaseEntry::new();
2228        let result = db.get_into(None, &empty_key, &mut retrieved).unwrap();
2229        assert!(result);
2230        assert_eq!(retrieved.data_opt().unwrap(), b"");
2231    }
2232
2233    #[test]
2234    fn test_large_keys_and_values() {
2235        let (_temp_dir, _env, db) = temp_env_and_db();
2236
2237        let large_key = DatabaseEntry::from_bytes(&vec![b'k'; 1000]);
2238        let large_value = DatabaseEntry::from_bytes(&vec![b'v'; 10000]);
2239
2240        db.put(&large_key, &large_value).unwrap();
2241
2242        let mut retrieved = DatabaseEntry::new();
2243        db.get_into(None, &large_key, &mut retrieved).unwrap();
2244        assert_eq!(retrieved.data_opt().unwrap().len(), 10000);
2245        assert!(retrieved.data_opt().unwrap().iter().all(|&b| b == b'v'));
2246    }
2247
2248    #[test]
2249    fn test_binary_keys_and_values() {
2250        let (_temp_dir, _env, db) = temp_env_and_db();
2251
2252        let binary_key = DatabaseEntry::from_bytes(&[0u8, 1, 2, 255, 254, 253]);
2253        let binary_value = DatabaseEntry::from_bytes(&[255u8, 0, 128, 64, 32]);
2254
2255        db.put(&binary_key, &binary_value).unwrap();
2256
2257        let mut retrieved = DatabaseEntry::new();
2258        db.get_into(None, &binary_key, &mut retrieved).unwrap();
2259        assert_eq!(retrieved.data_opt().unwrap(), &[255u8, 0, 128, 64, 32]);
2260    }
2261
2262    #[test]
2263    fn test_scan_all_kv_empty() {
2264        let (_temp_dir, _env, db) = temp_env_and_db();
2265        let kv = db.scan_all_kv().unwrap();
2266        assert!(kv.is_empty());
2267    }
2268
2269    #[test]
2270    fn test_scan_all_kv_returns_records() {
2271        let (_temp_dir, _env, db) = temp_env_and_db();
2272        db.put(
2273            DatabaseEntry::from_vec(vec![1]),
2274            DatabaseEntry::from_vec(vec![10]),
2275        )
2276        .unwrap();
2277        db.put(
2278            DatabaseEntry::from_vec(vec![2]),
2279            DatabaseEntry::from_vec(vec![20]),
2280        )
2281        .unwrap();
2282        let kv = db.scan_all_kv().unwrap();
2283        assert_eq!(kv.len(), 2);
2284    }
2285
2286    #[test]
2287    fn test_scan_all_kv_then_delete() {
2288        let (_temp_dir, _env, db) = temp_env_and_db();
2289        db.put(
2290            DatabaseEntry::from_vec(vec![1]),
2291            DatabaseEntry::from_vec(vec![10]),
2292        )
2293        .unwrap();
2294        db.put(
2295            DatabaseEntry::from_vec(vec![2]),
2296            DatabaseEntry::from_vec(vec![20]),
2297        )
2298        .unwrap();
2299
2300        let kv = db.scan_all_kv().unwrap();
2301        assert_eq!(kv.len(), 2);
2302
2303        for (k, _v) in &kv {
2304            let status = db.delete(DatabaseEntry::from_vec(k.clone())).unwrap();
2305            assert!(status, "delete failed for key {:?}", k);
2306        }
2307
2308        let count = db.count().unwrap();
2309        assert_eq!(count, 0, "expected 0 records after deletes, got {}", count);
2310    }
2311
2312    #[test]
2313    fn test_scan_all_kv_then_delete_u64_be_keys() {
2314        // Simulate the exact pattern used in EntityStore::evolve: big-endian u64 keys.
2315        let (_temp_dir, _env, db) = temp_env_and_db();
2316        for id in [1u64, 2u64] {
2317            let key_bytes = id.to_be_bytes().to_vec();
2318            let val_bytes = format!("user{}", id).into_bytes();
2319            db.put(
2320                DatabaseEntry::from_vec(key_bytes),
2321                DatabaseEntry::from_vec(val_bytes),
2322            )
2323            .unwrap();
2324        }
2325        assert_eq!(db.count().unwrap(), 2);
2326
2327        let records = db.scan_all_kv().unwrap();
2328        assert_eq!(records.len(), 2);
2329
2330        for (k, _v) in records {
2331            let status = db.delete(DatabaseEntry::from_vec(k.clone())).unwrap();
2332            assert!(status, "delete failed for u64 key {:?}", k);
2333        }
2334        assert_eq!(db.count().unwrap(), 0);
2335    }
2336
2337    // ========================================================================
2338    // Additional branch-coverage tests
2339    // ========================================================================
2340
2341    /// get() with a None-data DatabaseEntry returns NotFound.
2342    #[test]
2343    fn test_get_with_none_key_data_returns_not_found() {
2344        let (_temp_dir, _env, db) = temp_env_and_db();
2345        let key_none = DatabaseEntry::new(); // no data set
2346        let mut data = DatabaseEntry::new();
2347
2348        let result = db.get_into(None, &key_none, &mut data).unwrap();
2349        assert!(!result);
2350    }
2351
2352    /// delete() with a None-data DatabaseEntry returns NotFound.
2353    #[test]
2354    fn test_delete_with_none_key_data_returns_not_found() {
2355        let (_temp_dir, _env, db) = temp_env_and_db();
2356        let key_none = DatabaseEntry::new();
2357
2358        let result = db.delete(&key_none).unwrap();
2359        assert!(!result);
2360    }
2361
2362    /// open_cursor() with a CursorConfig that has read_uncommitted=true makes
2363    /// the cursor read-only.
2364    #[test]
2365    fn test_open_cursor_read_uncommitted_config_makes_read_only() {
2366        use crate::cursor_config::CursorConfig;
2367        let (_temp_dir, _env, db) = temp_env_and_db();
2368
2369        let config = CursorConfig::new().with_read_uncommitted(true);
2370        let cursor = db.open_cursor(Some(&config)).unwrap();
2371        assert!(cursor.is_read_only());
2372    }
2373
2374    /// open_cursor() with no config and a non-read-only database produces a
2375    /// writable cursor.
2376    #[test]
2377    fn test_open_cursor_no_config_writable_db_is_writable() {
2378        let (_temp_dir, _env, db) = temp_env_and_db();
2379        let cursor = db.open_cursor(None).unwrap();
2380        assert!(!cursor.is_read_only());
2381    }
2382
2383    /// scan_all_kv() on a closed database returns an error.
2384    #[test]
2385    fn test_scan_all_kv_on_closed_database_fails() {
2386        let (_temp_dir, _env, db) = temp_env_and_db();
2387        db.close().unwrap();
2388        let result = db.scan_all_kv();
2389        assert!(result.is_err());
2390    }
2391
2392    /// put_no_overwrite() on a read-only database returns an error.
2393    #[test]
2394    fn test_put_no_overwrite_on_read_only_database_fails() {
2395        let temp_dir = TempDir::new().unwrap();
2396        let env_config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
2397            .with_allow_create(true);
2398        let env = Environment::open(env_config).unwrap();
2399
2400        let db_config = DatabaseConfig::new()
2401            .with_allow_create(true)
2402            .with_transactional(true)
2403            .with_read_only(true);
2404        let db = env.open_database(None, "ro_db", &db_config).unwrap();
2405
2406        let key = DatabaseEntry::from_bytes(b"k");
2407        let val = DatabaseEntry::from_bytes(b"v");
2408        let result = db.put_no_overwrite(&key, &val);
2409        assert!(result.is_err());
2410    }
2411
2412    // =====================================================================
2413    // cursor-failure map_err coverage: use the test hook in noxu-dbi to
2414    // force cursor operations to return Err, exercising the map_err closures
2415    // in Database::get / put / put_no_overwrite / delete / count / scan_all_kv.
2416    // =====================================================================
2417
2418    /// Covers the map_err closure on `cursor.search(...)` inside `get()`.
2419    #[test]
2420    fn test_get_search_map_err_via_hook() {
2421        let (_tmp, _env, db) = temp_env_and_db();
2422        noxu_dbi::set_cursor_fail_after(1); // fail on the 1st check_state (search)
2423        let key = DatabaseEntry::from_bytes(b"any");
2424        let mut data = DatabaseEntry::new();
2425        let result = db.get_into(None, &key, &mut data);
2426        noxu_dbi::clear_cursor_fail_flag();
2427        assert!(result.is_err());
2428    }
2429
2430    /// Covers the map_err closure on `cursor.get_current()` inside `get()`.
2431    #[test]
2432    fn test_get_get_current_map_err_via_hook() {
2433        let (_tmp, _env, db) = temp_env_and_db();
2434        // Insert a key so search can succeed.
2435        db.put(
2436            DatabaseEntry::from_bytes(b"k"),
2437            DatabaseEntry::from_bytes(b"v"),
2438        )
2439        .unwrap();
2440        // fail on the 2nd check (check_initialized inside get_current).
2441        noxu_dbi::set_cursor_fail_after(2);
2442        let key = DatabaseEntry::from_bytes(b"k");
2443        let mut data = DatabaseEntry::new();
2444        let result = db.get_into(None, &key, &mut data);
2445        noxu_dbi::clear_cursor_fail_flag();
2446        assert!(result.is_err());
2447    }
2448
2449    /// Covers the map_err closure on `cursor.put(...)` inside `put()`.
2450    #[test]
2451    fn test_put_map_err_via_hook() {
2452        let (_tmp, _env, db) = temp_env_and_db();
2453        noxu_dbi::set_cursor_fail_after(1);
2454        let key = DatabaseEntry::from_bytes(b"k");
2455        let val = DatabaseEntry::from_bytes(b"v");
2456        let result = db.put(&key, &val);
2457        noxu_dbi::clear_cursor_fail_flag();
2458        assert!(result.is_err());
2459    }
2460
2461    /// Covers the map_err closure on `cursor.put(...)` inside `put_no_overwrite()`.
2462    #[test]
2463    fn test_put_no_overwrite_map_err_via_hook() {
2464        let (_tmp, _env, db) = temp_env_and_db();
2465        noxu_dbi::set_cursor_fail_after(1);
2466        let key = DatabaseEntry::from_bytes(b"k");
2467        let val = DatabaseEntry::from_bytes(b"v");
2468        let result = db.put_no_overwrite(&key, &val);
2469        noxu_dbi::clear_cursor_fail_flag();
2470        assert!(result.is_err());
2471    }
2472
2473    /// Covers the map_err closure on `cursor.search(...)` inside `delete()`.
2474    #[test]
2475    fn test_delete_search_map_err_via_hook() {
2476        let (_tmp, _env, db) = temp_env_and_db();
2477        noxu_dbi::set_cursor_fail_after(1);
2478        let key = DatabaseEntry::from_bytes(b"k");
2479        let result = db.delete(&key);
2480        noxu_dbi::clear_cursor_fail_flag();
2481        assert!(result.is_err());
2482    }
2483
2484    /// Covers the map_err closure on `cursor.delete()` inside `delete()`.
2485    #[test]
2486    fn test_delete_delete_map_err_via_hook() {
2487        let (_tmp, _env, db) = temp_env_and_db();
2488        db.put(
2489            DatabaseEntry::from_bytes(b"k"),
2490            DatabaseEntry::from_bytes(b"v"),
2491        )
2492        .unwrap();
2493        // fail on the 2nd check_state (the delete() call, after search succeeds).
2494        noxu_dbi::set_cursor_fail_after(2);
2495        let key = DatabaseEntry::from_bytes(b"k");
2496        let result = db.delete(&key);
2497        noxu_dbi::clear_cursor_fail_flag();
2498        assert!(result.is_err());
2499    }
2500
2501    /// count() uses the O(1) AtomicU64 counter; cursor-fail hooks do not affect it.
2502    /// Verify the counter is correct across insert/update/delete.
2503    #[test]
2504    fn test_count_atomic_counter_insert_update_delete() {
2505        let (_tmp, _env, db) = temp_env_and_db();
2506
2507        // Empty database starts at 0.
2508        assert_eq!(db.count().unwrap(), 0);
2509
2510        // Insert three distinct keys.
2511        db.put(
2512            DatabaseEntry::from_bytes(b"a"),
2513            DatabaseEntry::from_bytes(b"1"),
2514        )
2515        .unwrap();
2516        db.put(
2517            DatabaseEntry::from_bytes(b"b"),
2518            DatabaseEntry::from_bytes(b"2"),
2519        )
2520        .unwrap();
2521        db.put(
2522            DatabaseEntry::from_bytes(b"c"),
2523            DatabaseEntry::from_bytes(b"3"),
2524        )
2525        .unwrap();
2526        assert_eq!(db.count().unwrap(), 3);
2527
2528        // Overwrite an existing key — count must NOT change.
2529        db.put(
2530            DatabaseEntry::from_bytes(b"a"),
2531            DatabaseEntry::from_bytes(b"updated"),
2532        )
2533        .unwrap();
2534        assert_eq!(db.count().unwrap(), 3);
2535
2536        // Delete one key — count decrements.
2537        db.delete(DatabaseEntry::from_bytes(b"b")).unwrap();
2538        assert_eq!(db.count().unwrap(), 2);
2539    }
2540
2541    /// count() is O(1): verify it still works even when the cursor fail-hook
2542    /// is active (the hook only affects cursor operations, not the atomic read).
2543    #[test]
2544    fn test_count_unaffected_by_cursor_fail_hook() {
2545        let (_tmp, _env, db) = temp_env_and_db();
2546        db.put(
2547            DatabaseEntry::from_bytes(b"k"),
2548            DatabaseEntry::from_bytes(b"v"),
2549        )
2550        .unwrap();
2551        noxu_dbi::set_cursor_fail_after(1);
2552        // count() must succeed (no cursor used).
2553        let result = db.count();
2554        noxu_dbi::clear_cursor_fail_flag();
2555        assert!(result.is_ok());
2556        assert_eq!(result.unwrap(), 1);
2557    }
2558
2559    /// Covers the map_err closure on `cursor.get_first()` inside `scan_all_kv()`.
2560    #[test]
2561    fn test_scan_all_kv_get_first_map_err_via_hook() {
2562        let (_tmp, _env, db) = temp_env_and_db();
2563        noxu_dbi::set_cursor_fail_after(1);
2564        let result = db.scan_all_kv();
2565        noxu_dbi::clear_cursor_fail_flag();
2566        assert!(result.is_err());
2567    }
2568
2569    /// Covers the map_err closure on `cursor.get_current()` inside `scan_all_kv()`.
2570    #[test]
2571    fn test_scan_all_kv_get_current_map_err_via_hook() {
2572        let (_tmp, _env, db) = temp_env_and_db();
2573        db.put(
2574            DatabaseEntry::from_bytes(b"k"),
2575            DatabaseEntry::from_bytes(b"v"),
2576        )
2577        .unwrap();
2578        // fail on the 2nd check (check_initialized inside get_current, after get_first succeeds).
2579        noxu_dbi::set_cursor_fail_after(2);
2580        let result = db.scan_all_kv();
2581        noxu_dbi::clear_cursor_fail_flag();
2582        assert!(result.is_err());
2583    }
2584
2585    /// Covers the map_err closure on `cursor.retrieve_next(...)` inside `scan_all_kv()`.
2586    #[test]
2587    fn test_scan_all_kv_retrieve_next_map_err_via_hook() {
2588        let (_tmp, _env, db) = temp_env_and_db();
2589        db.put(
2590            DatabaseEntry::from_bytes(b"k"),
2591            DatabaseEntry::from_bytes(b"v"),
2592        )
2593        .unwrap();
2594        // fail on the 3rd check (retrieve_next, after get_first and get_current succeed).
2595        noxu_dbi::set_cursor_fail_after(3);
2596        let result = db.scan_all_kv();
2597        noxu_dbi::clear_cursor_fail_flag();
2598        assert!(result.is_err());
2599    }
2600
2601    #[test]
2602    fn test_sync_on_open_database_succeeds() {
2603        let (_tmp, _env, db) = temp_env_and_db();
2604        db.put(
2605            DatabaseEntry::from_bytes(b"key"),
2606            DatabaseEntry::from_bytes(b"val"),
2607        )
2608        .unwrap();
2609        assert!(db.sync().is_ok());
2610    }
2611
2612    #[test]
2613    fn test_sync_on_closed_database_fails() {
2614        let (_tmp, _env, db) = temp_env_and_db();
2615        db.close().unwrap();
2616        assert!(db.sync().is_err());
2617    }
2618
2619    // ── verify ─────────────────────────────────────────────────────────────
2620
2621    #[test]
2622    fn test_verify_empty_database_passes() {
2623        use noxu_engine::VerifyConfig;
2624        let (_tmp, _env, db) = temp_env_and_db();
2625        let config = VerifyConfig::default();
2626        let result = db.verify(&config).unwrap();
2627        assert!(result.passed, "empty db should pass: {:?}", result.errors);
2628    }
2629
2630    #[test]
2631    fn test_verify_populated_database_passes() {
2632        use noxu_engine::VerifyConfig;
2633        let (_tmp, _env, db) = temp_env_and_db();
2634        for i in 0u32..20 {
2635            let k = DatabaseEntry::from_bytes(&i.to_be_bytes());
2636            let v = DatabaseEntry::from_bytes(&(i * 2).to_be_bytes());
2637            db.put(&k, &v).unwrap();
2638        }
2639        let config = VerifyConfig::default();
2640        let result = db.verify(&config).unwrap();
2641        assert!(result.passed, "populated db should pass: {:?}", result.errors);
2642        assert!(result.records_verified > 0);
2643    }
2644
2645    #[test]
2646    fn test_verify_closed_database_fails() {
2647        use noxu_engine::VerifyConfig;
2648        let (_tmp, _env, db) = temp_env_and_db();
2649        db.close().unwrap();
2650        let config = VerifyConfig::default();
2651        assert!(db.verify(&config).is_err());
2652    }
2653
2654    // ── get_with_options / put_with_options ────────────────────────────────
2655
2656    #[test]
2657    fn test_get_with_options_default_reads_written_record() {
2658        use crate::read_options::ReadOptions;
2659        let (_tmp, _env, db) = temp_env_and_db();
2660        let key = DatabaseEntry::from_bytes(b"ropt_key");
2661        let val = DatabaseEntry::from_bytes(b"ropt_val");
2662        db.put(&key, &val).unwrap();
2663
2664        let opts = ReadOptions::new();
2665        let out = db.get_with_options(None, &key, &opts).unwrap();
2666        assert!(out.is_some());
2667        assert_eq!(out.unwrap().as_ref(), b"ropt_val");
2668    }
2669
2670    #[test]
2671    fn test_get_with_options_read_uncommitted_sees_written_record() {
2672        use crate::read_options::ReadOptions;
2673        let (_tmp, _env, db) = temp_env_and_db();
2674        let key = DatabaseEntry::from_bytes(b"ru_key");
2675        let val = DatabaseEntry::from_bytes(b"ru_val");
2676        db.put(&key, &val).unwrap();
2677
2678        let opts = ReadOptions::read_uncommitted();
2679        let out = db.get_with_options(None, &key, &opts).unwrap();
2680        assert!(out.is_some());
2681        assert_eq!(out.unwrap().as_ref(), b"ru_val");
2682    }
2683
2684    #[test]
2685    fn test_get_with_options_not_found() {
2686        use crate::read_options::ReadOptions;
2687        let (_tmp, _env, db) = temp_env_and_db();
2688        let key = DatabaseEntry::from_bytes(b"missing");
2689        let opts = ReadOptions::new();
2690        let out = db.get_with_options(None, &key, &opts).unwrap();
2691        assert!(out.is_none());
2692    }
2693
2694    #[test]
2695    fn test_put_with_options_no_ttl_behaves_like_put() {
2696        use crate::write_options::WriteOptions;
2697        let (_tmp, _env, db) = temp_env_and_db();
2698        let key = DatabaseEntry::from_bytes(b"wopt_key");
2699        let val = DatabaseEntry::from_bytes(b"wopt_val");
2700        let opts = WriteOptions::new();
2701        db.put_with_options(None, &key, &val, &opts).unwrap();
2702
2703        let mut out = DatabaseEntry::new();
2704        db.get_into(None, &key, &mut out).unwrap();
2705        assert_eq!(out.data_opt().unwrap(), b"wopt_val");
2706    }
2707
2708    #[test]
2709    fn test_put_with_options_with_ttl_stores_record() {
2710        use crate::write_options::WriteOptions;
2711        let (_tmp, _env, db) = temp_env_and_db();
2712        let key = DatabaseEntry::from_bytes(b"ttl_key");
2713        let val = DatabaseEntry::from_bytes(b"ttl_val");
2714        // TTL of 1 hour — the record is not yet expired so it should be readable
2715        let opts = WriteOptions::with_expiration(1);
2716        db.put_with_options(None, &key, &val, &opts).unwrap();
2717
2718        let mut out = DatabaseEntry::new();
2719        let read_status = db.get_into(None, &key, &mut out).unwrap();
2720        assert!(read_status);
2721        assert_eq!(out.data_opt().unwrap(), b"ttl_val");
2722    }
2723
2724    #[test]
2725    fn test_put_with_options_closed_db_fails() {
2726        use crate::write_options::WriteOptions;
2727        let (_tmp, _env, db) = temp_env_and_db();
2728        db.close().unwrap();
2729        let key = DatabaseEntry::from_bytes(b"k");
2730        let val = DatabaseEntry::from_bytes(b"v");
2731        let opts = WriteOptions::new();
2732        assert!(db.put_with_options(None, &key, &val, &opts).is_err());
2733    }
2734
2735    // ========================================================================
2736    // Audit database F11 — Wave 2C-4: reject None-data keys on writes.
2737    // ========================================================================
2738
2739    // 7.0 NOTE: the three `*_with_none_key_returns_illegal_argument` tests
2740    // were removed in the 7.0 API reshape (review P1-3).  The write surface
2741    // now takes `key: impl AsRef<[u8]>`, so a key is *always* a byte slice;
2742    // the historical "None key" (a `DatabaseEntry` with no data set, distinct
2743    // from an empty `b""`) can no longer be expressed at the call site.
2744    // An empty key is accepted on writes — see
2745    // `test_put_with_explicit_empty_key_accepted`, which the reshape kept as
2746    // the canonical behaviour.  The removed tests asserted a None-vs-empty
2747    // distinction that the new signature intentionally eliminates.
2748
2749    /// Explicit `Some(&[])` empty key is still accepted on writes.
2750    #[test]
2751    fn test_put_with_explicit_empty_key_accepted() {
2752        let (_tmp, _env, db) = temp_env_and_db();
2753        let empty_key = DatabaseEntry::from_bytes(b"");
2754        let val = DatabaseEntry::from_bytes(b"v");
2755        db.put(&empty_key, &val).unwrap();
2756    }
2757
2758    // ── X-13: env-invalidity checks propagate through check_open ──────────────
2759
2760    /// X-13: after the `io_invalid` flag is set, `db.get` must return
2761    /// `EnvironmentFailure` rather than silently reading stale BIN data.
2762    #[test]
2763    fn test_x13_io_invalid_blocks_db_get() {
2764        use std::sync::atomic::Ordering;
2765        let (_tmp, env, db) = temp_env_and_db();
2766
2767        // Write a record so there is something to read.
2768        let key = DatabaseEntry::from_bytes(b"k");
2769        let val = DatabaseEntry::from_bytes(b"v");
2770        db.put(&key, &val).unwrap();
2771
2772        // Flip io_invalid via the cached LogManager.
2773        let lm = db.log_manager.as_ref().expect("WAL env must have LogManager");
2774        lm.io_invalid.store(true, Ordering::Release);
2775
2776        // db.get must now fail.
2777        let mut out = DatabaseEntry::new();
2778        let result = db.get_into(None, &key, &mut out);
2779        assert!(
2780            matches!(result, Err(NoxuError::EnvironmentFailure { .. })),
2781            "expected EnvironmentFailure, got {result:?}"
2782        );
2783
2784        // db.put must also fail.
2785        let result2 = db.put(&key, &val);
2786        assert!(
2787            matches!(result2, Err(NoxuError::EnvironmentFailure { .. })),
2788            "expected EnvironmentFailure on put, got {result2:?}"
2789        );
2790
2791        // Restore flag so env closes cleanly.
2792        lm.io_invalid.store(false, Ordering::Release);
2793        drop(env);
2794    }
2795
2796    /// X-13: after `EnvironmentImpl::invalidate()`, cursor `get_first`
2797    /// must return `EnvironmentFailure`.
2798    #[test]
2799    fn test_x13_env_invalid_blocks_cursor_get() {
2800        use std::sync::atomic::Ordering;
2801        let (_tmp, env, db) = temp_env_and_db();
2802
2803        // Insert a record.
2804        let key = DatabaseEntry::from_bytes(b"ck");
2805        let val = DatabaseEntry::from_bytes(b"cv");
2806        db.put(&key, &val).unwrap();
2807
2808        // Open a cursor BEFORE invalidating.
2809        let mut cursor = db.open_cursor(None).unwrap();
2810
2811        // Now directly flip the env_invalid flag.
2812        db.env_invalid.store(true, Ordering::Release);
2813
2814        // The cursor's check_state should detect the flag.
2815        let mut key = DatabaseEntry::new();
2816        let mut out = DatabaseEntry::new();
2817        let result =
2818            cursor.get(&mut key, &mut out, crate::get::Get::First, None);
2819        assert!(
2820            matches!(result, Err(NoxuError::EnvironmentFailure { .. })),
2821            "expected EnvironmentFailure from cursor, got {result:?}"
2822        );
2823
2824        // Restore so env drops cleanly.
2825        db.env_invalid.store(false, Ordering::Release);
2826        drop(env);
2827    }
2828}