Skip to main content

noxu_db/
database.rs

1//! Database handle.
2//!
3
4use crate::cursor::Cursor;
5use crate::cursor_config::CursorConfig;
6use crate::database_config::DatabaseConfig;
7use crate::database_entry::DatabaseEntry;
8use crate::database_stats::{BtreeStats, DatabaseStats};
9use crate::error::{NoxuError, Result};
10use crate::join_config::JoinConfig;
11use crate::join_cursor::JoinCursor;
12use crate::lock_mode::LockMode;
13use crate::operation_status::OperationStatus;
14use crate::preload::{PreloadConfig, PreloadStats};
15use crate::read_options::ReadOptions;
16use crate::secondary_cursor::SecondaryCursor;
17use crate::sequence::Sequence;
18use crate::sequence_config::SequenceConfig;
19use crate::stats_config::StatsConfig;
20use crate::transaction::Transaction;
21use crate::write_options::WriteOptions;
22use noxu_dbi::{
23    CursorImpl, DatabaseImpl, EnvironmentImpl, GetMode, PutMode, SearchMode,
24    ThroughputStats,
25};
26use noxu_log::LogManager;
27use noxu_sync::{Mutex, RwLock};
28use noxu_txn::{Durability, LockManager, Txn, TxnManager, UndoRecord};
29use noxu_util::lsn::Lsn;
30use std::sync::Arc;
31use std::sync::atomic::{AtomicBool, Ordering};
32
33/// A database handle.
34///
35///
36///
37/// Database handles provide methods for inserting, retrieving, and
38/// deleting records. A database belongs to a single environment.
39///
40/// # Example
41/// ```ignore
42/// use noxu_db::{Environment, EnvironmentConfig, DatabaseConfig, DatabaseEntry};
43/// use std::path::PathBuf;
44///
45/// let env_config = EnvironmentConfig::new(PathBuf::from("/tmp/mydb"))
46///     .allow_create(true);
47/// let env = Environment::open(env_config).unwrap();
48///
49/// let db_config = DatabaseConfig::new().allow_create(true);
50/// let db = env.open_database(None, "mydb", &db_config).unwrap();
51///
52/// let key = DatabaseEntry::from_bytes(b"key1");
53/// let value = DatabaseEntry::from_bytes(b"value1");
54/// db.put(None, &key, &value).unwrap();
55///
56/// db.close().unwrap();
57/// env.close().unwrap();
58/// ```
59pub struct Database {
60    /// Name of this database
61    name: String,
62    /// Database ID
63    id: u64,
64    /// Configuration
65    config: DatabaseConfig,
66    /// The underlying DatabaseImpl (shared with the EnvironmentImpl).
67    pub(crate) db_impl: Arc<RwLock<DatabaseImpl>>,
68    /// Back-reference to the owning EnvironmentImpl (for close/cleanup).
69    env_impl: Arc<Mutex<EnvironmentImpl>>,
70    /// Shared open flag — same `Arc<AtomicBool>` as the environment's
71    /// `DatabaseHandle.open`, so that `Database::close()` automatically
72    /// marks the environment-side handle as closed too.
73    open: Arc<AtomicBool>,
74    /// Throughput counters for this database's operations.
75    ///
76    /// Cloned from `DatabaseImpl.throughput` at open time so that
77    /// `get()`, `put()`, `delete()` can increment stats without
78    /// locking `db_impl`.
79    throughput: Arc<ThroughputStats>,
80    /// Cached lock manager — acquired once at open, never changes.
81    /// Eliminates per-operation `env_impl.lock()` on the hot read/write path.
82    lock_manager: Arc<LockManager>,
83    /// Cached log manager — acquired once at open, None for no-WAL envs.
84    /// Eliminates per-operation `env_impl.lock()` on the hot read/write path.
85    log_manager: Option<Arc<LogManager>>,
86    /// Cached environment-invalidity flag (X-13).
87    ///
88    /// Cloned from `EnvironmentImpl::is_invalid_flag()` at `Database::new()`
89    /// time so `check_open()` can detect a failed environment without
90    /// acquiring `env_impl.lock()` on every read/write operation.
91    env_invalid: Arc<std::sync::atomic::AtomicBool>,
92    /// Cached cleaner throttle — acquired once at open, None when no cleaner.
93    /// Used by put() for write-path backpressure without locking env_impl.
94    cleaner_throttle: Option<Arc<noxu_cleaner::CleanerThrottle>>,
95    /// Cached transaction manager — acquired once at open.
96    ///
97    /// Used by [`Self::with_auto_txn`] to allocate a synthetic auto-commit
98    /// `Txn` per `txn = None` write so the lock manager sees a typed
99    /// locker id from the explicit-txn id space (`"auto-txn:<id>"` in
100    /// deadlock messages) and the auto-commit op gets full abort-undo
101    /// semantics on any error path.  Closes the F12 residuals.
102    txn_manager: Arc<TxnManager>,
103    /// If true, auto-commit writes skip the log flush entirely (: TXN_NO_SYNC).
104    no_sync: bool,
105    /// If true, auto-commit writes flush to OS but skip fdatasync (: TXN_WRITE_NO_SYNC).
106    write_no_sync: bool,
107    /// Registered secondary indexes that automatically maintain themselves
108    /// when this primary is written.  v1.6 (Decision 1B / audit C3 — the
109    /// associate()-style hook): every [`SecondaryDatabase`] opened against
110    /// this primary downgrades its `Arc<SecondaryHookState>` to a
111    /// `Weak<dyn SecondaryHook>` and pushes it here.  `Database::put` and
112    /// `Database::delete` walk the list under the same caller-supplied
113    /// txn so primary writes and secondary index updates commit / abort
114    /// atomically.
115    ///
116    /// Stored behind an `Arc<RwLock<…>>` (rather than directly on the
117    /// `Database` body) so registrations performed through one of the
118    /// `Arc<Mutex<Database>>` clones the user typically holds become
119    /// visible to every other clone of the same primary.
120    pub(crate) secondaries: Arc<
121        RwLock<
122            Vec<
123                std::sync::Weak<
124                    dyn crate::secondary_database::SecondaryHook + Send + Sync,
125                >,
126            >,
127        >,
128    >,
129    /// Foreign-key referrer registry: every child secondary whose
130    /// `foreign_key_database` points at *this* primary downgrades its
131    /// hook to a `Weak<dyn FkReferrer>` and pushes it here.  When this
132    /// primary is deleted, every entry is consulted to apply
133    /// `ForeignKeyDeleteAction::Abort` (v1.6 step 8) /
134    /// `Cascade` (step 9) / `Nullify` (step 10).
135    pub(crate) fk_referrers: Arc<
136        RwLock<
137            Vec<
138                std::sync::Weak<
139                    dyn crate::secondary_database::FkReferrer + Send + Sync,
140                >,
141            >,
142        >,
143    >,
144}
145
146/// State of a database handle.
147///
148///
149#[derive(Debug, Clone, Copy, PartialEq, Eq)]
150pub enum DbState {
151    /// Database is open and operational
152    Open,
153    /// Database has been closed
154    Closed,
155    /// Database is in an invalid state
156    Invalid,
157}
158
159impl Database {
160    /// Creates a CursorImpl, wired to the WAL and lock manager when the
161    /// environment has them.
162    ///
163    /// Uses cached `lock_manager` / `log_manager` to avoid acquiring
164    /// `env_impl.lock()` on every operation.
165    fn make_cursor(&self) -> CursorImpl {
166        self.make_cursor_with_locker(0)
167    }
168
169    /// Creates a CursorImpl with an explicit `locker_id`.
170    ///
171    /// Auto-commit cursors use `0`; transactional cursors must use the
172    /// owning `Transaction::id` so that the LN log entries written by
173    /// `cursor.put` / `cursor.delete` carry the txn id and recovery's
174    /// commit/abort tracking can correctly skip aborted txns.  Without
175    /// this, every LN entry was written with `txn_id = None` (the
176    /// auto-commit form) and recovery treated aborted-txn writes as
177    /// committed once `env_impl.close()` started running on `env.close()`
178    /// after a successful commit/abort (F1).
179    fn make_cursor_with_locker(&self, locker_id: i64) -> CursorImpl {
180        match &self.log_manager {
181            Some(lm) => CursorImpl::with_log_manager(
182                Arc::clone(&self.db_impl),
183                locker_id,
184                Arc::clone(lm),
185            )
186            .with_env_invalid(Arc::clone(&self.env_invalid))
187            .with_lock_manager(Arc::clone(&self.lock_manager)),
188            None => CursorImpl::new(Arc::clone(&self.db_impl), locker_id)
189                .with_env_invalid(Arc::clone(&self.env_invalid))
190                .with_lock_manager(Arc::clone(&self.lock_manager)),
191        }
192    }
193
194    /// Creates a CursorImpl without a lock manager (dirty-read / read-uncommitted).
195    ///
196    /// Used by `get_with_options()` when `ReadOptions.lock_mode == ReadUncommitted`.
197    /// Skips all lock acquisition so the cursor reads directly from the BIN
198    /// without blocking on write locks — mirrors 's read-uncommitted cursor.
199    fn make_cursor_no_lock(&self) -> CursorImpl {
200        match &self.log_manager {
201            Some(lm) => CursorImpl::with_log_manager(
202                Arc::clone(&self.db_impl),
203                0,
204                Arc::clone(lm),
205            )
206            .with_env_invalid(Arc::clone(&self.env_invalid)),
207            None => CursorImpl::new(Arc::clone(&self.db_impl), 0)
208                .with_env_invalid(Arc::clone(&self.env_invalid)),
209        }
210    }
211
212    /// Creates a CursorImpl wired to the given transaction for write-lock tracking.
213    ///
214    /// Behaves like `make_cursor()` but additionally calls `.with_txn()` so
215    /// that write operations acquire locks via the transaction's `Txn` and
216    /// record abort before-images in `WriteLockInfo`.
217    ///
218    /// In which passes the
219    /// transaction's `Locker` to the new `CursorImpl`.
220    #[allow(deprecated)] // uses Transaction::get_inner_txn — internal wiring
221    fn make_cursor_for_txn(&self, txn: &Transaction) -> CursorImpl {
222        // Use the transaction id as the cursor's locker_id so that LN
223        // log entries written under this cursor carry the txn id
224        // (recovery's analysis pass uses LN.txn_id together with
225        // TxnCommit / TxnAbort records to decide whether to redo or
226        // undo the LN). Pre-fix this was hardcoded to 0, which made
227        // every txn-LN look like an auto-commit LN and caused
228        // recovery to redo aborted writes.
229        let cursor = self.make_cursor_with_locker(txn.get_id() as i64);
230        if let Some(inner) = txn.get_inner_txn() {
231            cursor.with_txn(inner)
232        } else {
233            cursor
234        }
235    }
236
237    /// Allocates a synthetic auto-commit `Txn` and runs `op` under it.
238    ///
239    /// `op` receives an auto-commit-wired [`CursorImpl`] (locker_id = 0
240    /// so the LN is logged with the auto-commit `InsertLN` / `DeleteLN`
241    /// form, txn_ref set so the lock manager sees the synthetic auto-txn
242    /// as the owner).
243    ///
244    /// On `Ok(value)`:
245    ///   * Drops the cursor (closing it).
246    ///   * Calls [`Txn::commit_with_durability`] on the synthetic auto-txn
247    ///     with a [`Durability`] derived from the database's
248    ///     `no_sync` / `write_no_sync` config; this releases all locks
249    ///     and (for `CommitSync`) fsyncs up to the LN's LSN via
250    ///     `LogManager::flush_sync_if_needed` for many-to-one fsync
251    ///     coalescing under concurrent write load.
252    ///   * Records the commit in the txn manager statistics and removes
253    ///     the diagnostic locker label.
254    ///
255    /// On `Err(e)`:
256    ///   * Drops the cursor.
257    ///   * Calls [`Txn::abort_collect_undo`] to harvest before-image undo
258    ///     records WITHOUT releasing the held write locks (so a reader
259    ///     blocked on a write lock cannot observe the in-flight value
260    ///     before we restore the before-image).
261    ///   * Applies the undo records to the in-memory B-tree.
262    ///   * Calls [`Txn::release_all_locks`] to drain the held locks.
263    ///   * Records the abort in the txn manager statistics and removes
264    ///     the diagnostic locker label.
265    ///   * Returns `Err(e)`.
266    ///
267    /// Closes the first F12 residual: an auto-commit op now goes through
268    /// the same lock-tracking and abort-undo machinery as an explicit
269    /// transaction, so two concurrent inserts of the same brand-new key
270    /// serialise through the lock manager and a forced mid-write failure
271    /// rolls back the in-memory tree mutation.
272    fn with_auto_txn<F, T>(&self, op: F) -> Result<T>
273    where
274        F: FnOnce(&mut CursorImpl) -> Result<T>,
275    {
276        let auto_txn =
277            self.txn_manager.begin_auto_txn(self.log_manager.clone());
278        let synthetic_id = auto_txn.id_as_locker();
279        let auto_txn_arc = Arc::new(std::sync::Mutex::new(auto_txn));
280
281        let mut cursor = self.make_cursor();
282        cursor.attach_txn(Arc::clone(&auto_txn_arc));
283
284        let result = op(&mut cursor);
285        // Drop the cursor handle (un-pins BIN, drops Arc<Mutex<Txn>> ref).
286        drop(cursor);
287
288        // Reclaim sole ownership of the synthetic auto-txn so we can
289        // finalise it.  All cursors and their Arcs were dropped above.
290        let mut auto_txn = match Arc::try_unwrap(auto_txn_arc) {
291            Ok(m) => m.into_inner().unwrap_or_else(|p| p.into_inner()),
292            Err(_arc) => {
293                // A cursor escaped the closure with a clone of the txn
294                // Arc.  This is a caller-side bug — leak the auto-txn
295                // (its Drop calls `close()` which aborts) and surface a
296                // typed error.  No undo applied because we cannot
297                // safely take the Txn out of the shared Arc.
298                return Err(NoxuError::OperationNotAllowed(
299                    "with_auto_txn: synthetic auto-txn outlived cursor scope"
300                        .to_string(),
301                ));
302            }
303        };
304        let txn_manager = Arc::clone(&self.txn_manager);
305
306        match result {
307            Ok(value) => {
308                let durability = if self.no_sync {
309                    Durability::CommitNoSync
310                } else if self.write_no_sync {
311                    Durability::CommitWriteNoSync
312                } else {
313                    Durability::CommitSync
314                };
315                if let Err(e) = auto_txn.commit_with_durability(durability) {
316                    // Commit failed (e.g. log fsync error).  Undo the
317                    // in-memory tree write so we are not left in an
318                    // inconsistent state, then surface the error.
319                    let undo_records =
320                        auto_txn.abort_collect_undo().unwrap_or_default();
321                    self.apply_auto_txn_undo(undo_records);
322                    auto_txn.release_all_locks();
323                    txn_manager.abort_txn(synthetic_id);
324                    return Err(NoxuError::OperationNotAllowed(format!(
325                        "auto-commit fsync failed: {e}"
326                    )));
327                }
328                txn_manager.commit_txn(synthetic_id);
329                Ok(value)
330            }
331            Err(e) => {
332                // Phase 1: collect undo without releasing write locks.
333                let undo_records =
334                    auto_txn.abort_collect_undo().unwrap_or_default();
335                // Phase 2: apply undo while write locks are still held
336                // so any concurrent reader blocked on a write lock
337                // cannot observe the in-flight value.
338                self.apply_auto_txn_undo(undo_records);
339                // Phase 3: drain locks.
340                auto_txn.release_all_locks();
341                txn_manager.abort_txn(synthetic_id);
342                Err(e)
343            }
344        }
345    }
346
347    /// Applies undo records collected from a synthetic auto-txn to the
348    /// in-memory B-tree of `self`.  Mirrors the per-`undo_record` block
349    /// in `Transaction::abort()` but specialised for the
350    /// single-database `with_auto_txn` case so we do not need to thread
351    /// the env-impl in.
352    fn apply_auto_txn_undo(&self, mut undo_records: Vec<UndoRecord>) {
353        // Apply newest-LSN first so multi-step writes (delete + reinsert)
354        // unwind in reverse-operation order.  See the matching sort in
355        // `Transaction::abort()`.
356        undo_records.sort_by_key(|r| std::cmp::Reverse(r.current_lsn));
357        let db_id_match = self.id;
358        let db_guard = self.db_impl.read();
359        let Some(tree) = db_guard.get_real_tree() else { return };
360        for undo in undo_records {
361            // The synthetic auto-txn touches only this database, but be
362            // defensive in case future changes broaden the contract.
363            if undo.database_id != db_id_match {
364                continue;
365            }
366            let Some(abort_key) = undo.abort_key else { continue };
367            if undo.abort_known_deleted {
368                if tree.delete(&abort_key) {
369                    db_guard.decrement_entry_count();
370                }
371            } else if let Some(abort_data) = undo.abort_data {
372                let lsn = noxu_util::Lsn::from_u64(undo.abort_lsn);
373                if let Ok(is_new) = tree.insert(abort_key, abort_data, lsn)
374                    && is_new
375                {
376                    // Restoring a slot that the aborted txn had deleted:
377                    // re-bump the counter that the in-memory delete
378                    // already decremented.
379                    db_guard.increment_entry_count();
380                }
381            }
382        }
383    }
384
385    /// Auto-commit flush: when `txn` is `None` (auto-commit mode), flush and
386    /// fsync the log before returning to the caller.
387    ///
388    /// `write_lsn` is the LSN assigned to the write operation just performed.
389    /// Port of`LogManager.flushTo(lsn)`: if a concurrent committer already
390    /// flushed past `write_lsn`, the fdatasync is skipped entirely, giving
391    /// natural many:1 fsync coalescing under concurrent write load with no
392    /// explicit group-commit configuration required.
393    fn auto_commit_sync(
394        &self,
395        txn: Option<&Transaction>,
396        write_lsn: Lsn,
397    ) -> Result<()> {
398        if txn.is_some() {
399            return Ok(()); // explicit txn handles its own commit/fsync
400        }
401        if self.no_sync {
402            return Ok(()); // : TXN_NO_SYNC — skip log flush entirely
403        }
404        if let Some(lm) = &self.log_manager {
405            if self.write_no_sync {
406                // : TXN_WRITE_NO_SYNC — flush to OS buffer, no fdatasync
407                lm.flush_no_sync().map_err(|e| {
408                    NoxuError::OperationNotAllowed(e.to_string())
409                })?;
410            } else {
411                // : flushTo(lsn) — skip if already covered by another flush.
412                lm.flush_sync_if_needed(write_lsn).map_err(|e| {
413                    NoxuError::OperationNotAllowed(e.to_string())
414                })?;
415            }
416        }
417        Ok(())
418    }
419
420    /// Creates a new database handle.
421    ///
422    /// Internal constructor called by Environment.
423    ///
424    /// `open_flag` is a shared `Arc<AtomicBool>` that is also stored in the
425    /// environment's `DatabaseHandle` for this database.  Setting it to `false`
426    /// (via `Database::close()`) simultaneously marks the env-side handle as
427    /// closed, allowing `Environment::close()` to succeed without a separate
428    /// callback.
429    pub(crate) fn new(
430        name: String,
431        id: u64,
432        config: DatabaseConfig,
433        db_impl: Arc<RwLock<DatabaseImpl>>,
434        env_impl: Arc<Mutex<EnvironmentImpl>>,
435        open_flag: Arc<AtomicBool>,
436        no_sync: bool,
437        write_no_sync: bool,
438    ) -> Self {
439        let throughput = db_impl.read().throughput.clone();
440        // Cache the manager Arcs at construction so hot-path operations
441        // (get/put/delete) never need to re-acquire env_impl.lock().
442        let (
443            lock_manager,
444            log_manager,
445            cleaner_throttle,
446            txn_manager,
447            env_invalid,
448        ) = {
449            let env = env_impl.lock();
450            let lm = Arc::clone(env.get_lock_manager());
451            let logm = env.get_log_manager();
452            let ct = env.get_cleaner_throttle();
453            let txnm = Arc::clone(env.get_txn_manager());
454            let inv = env.is_invalid_flag();
455            (lm, logm, ct, txnm, inv)
456        };
457        Database {
458            name,
459            id,
460            config,
461            db_impl,
462            env_impl,
463            open: open_flag,
464            throughput,
465            lock_manager,
466            log_manager,
467            env_invalid,
468            cleaner_throttle,
469            txn_manager,
470            no_sync,
471            write_no_sync,
472            secondaries: Arc::new(RwLock::new(Vec::new())),
473            fk_referrers: Arc::new(RwLock::new(Vec::new())),
474        }
475    }
476
477    /// Retrieves a record by key.
478    ///
479    ///
480    ///
481    /// # Arguments
482    /// * `txn` - Optional transaction handle (used to scope locks and writes to the transaction)
483    /// * `key` - The search key
484    /// * `data` - Output parameter to receive the data
485    ///
486    /// # Returns
487    /// `OperationStatus::Success` if found, `OperationStatus::NotFound` otherwise
488    ///
489    /// # Errors
490    /// Returns an error if the database is closed
491    pub fn get(
492        &self,
493        txn: Option<&Transaction>,
494        key: &DatabaseEntry,
495        data: &mut DatabaseEntry,
496    ) -> Result<OperationStatus> {
497        self.check_open()?;
498        observe_span!(
499            "db_get",
500            db_name = self.name.as_str(),
501            key_size = key.get_data().map_or(0, |k| k.len()),
502        );
503        let _obs_timer = observe_timer_start!();
504        observe_counter!("noxu_db_operations_total", "op" => "get");
505
506        let key_bytes = match key.get_data() {
507            Some(k) => k,
508            None => return Ok(OperationStatus::NotFound),
509        };
510
511        let mut cursor = match txn {
512            Some(t) => self.make_cursor_for_txn(t),
513            None => self.make_cursor(),
514        };
515        match cursor
516            .search(key_bytes, None, SearchMode::Set)
517            .map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?
518        {
519            noxu_dbi::OperationStatus::Success => {
520                let (_, value) = cursor.get_current().map_err(|e| {
521                    NoxuError::OperationNotAllowed(e.to_string())
522                })?;
523                // Partial get: return only the requested slice.
524                // DatabaseEntry partial-read logic.
525                if data.is_partial() {
526                    let off = data.get_partial_offset();
527                    let len = data.get_partial_length();
528                    let end = (off + len).min(value.len());
529                    let slice =
530                        if off < value.len() { &value[off..end] } else { &[] };
531                    data.set_data(slice);
532                } else {
533                    data.set_data(&value);
534                }
535                self.throughput.n_pri_searches.fetch_add(1, Ordering::Relaxed);
536                observe_timer_record!(_obs_timer, "noxu_db_operation_duration_seconds", "op" => "get");
537                Ok(OperationStatus::Success)
538            }
539            _ => {
540                self.throughput
541                    .n_pri_search_fails
542                    .fetch_add(1, Ordering::Relaxed);
543                observe_timer_record!(_obs_timer, "noxu_db_operation_duration_seconds", "op" => "get");
544                Ok(OperationStatus::NotFound)
545            }
546        }
547    }
548
549    /// Retrieves a record with per-operation read options.
550    ///
551    /// Mirrors `Cursor.get()` with `ReadOptions` applied:
552    /// - `LockMode::ReadUncommitted` — dirty read, no lock acquired ( read-uncommitted)
553    /// - `LockMode::ReadCommitted` — read-committed isolation (standard locking)
554    /// - `LockMode::Rmw` — acquire write lock for read-modify-write
555    /// - `LockMode::Default` — environment default isolation
556    ///
557    /// `CacheMode` in `ReadOptions` is advisory (currently informational).
558    ///
559    /// # Arguments
560    /// * `txn` - Optional transaction handle
561    /// * `key` - The search key
562    /// * `data` - Output parameter to receive the data
563    /// * `opts` - Per-operation read options (isolation, cache hints)
564    ///
565    /// # Returns
566    /// `OperationStatus::Success` if found, `OperationStatus::NotFound` otherwise
567    pub fn get_with_options(
568        &self,
569        txn: Option<&Transaction>,
570        key: &DatabaseEntry,
571        data: &mut DatabaseEntry,
572        opts: &ReadOptions,
573    ) -> Result<OperationStatus> {
574        self.check_open()?;
575        // Wave 1C audit cleanup (database Low "asymmetric observability
576        // between *_with_options paths and the basic ones"): mirror
577        // the span / counter / timer instrumentation that
578        // `Database::get` emits so dashboards do not lose the
579        // per-operation `op = get_with_options` slice when callers
580        // opt for the richer surface.
581        observe_span!(
582            "db_get_with_options",
583            db_name = self.name.as_str(),
584            key_size = key.get_data().map_or(0, |k| k.len()),
585            lock_mode = format!("{:?}", opts.lock_mode),
586        );
587        let _obs_timer = observe_timer_start!();
588        observe_counter!("noxu_db_operations_total", "op" => "get_with_options");
589
590        let key_bytes = match key.get_data() {
591            Some(k) => k,
592            None => return Ok(OperationStatus::NotFound),
593        };
594
595        let mut cursor = match opts.lock_mode {
596            LockMode::ReadUncommitted => self.make_cursor_no_lock(),
597            _ => match txn {
598                Some(t) => self.make_cursor_for_txn(t),
599                None => self.make_cursor(),
600            },
601        };
602
603        match cursor
604            .search(key_bytes, None, SearchMode::Set)
605            .map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?
606        {
607            noxu_dbi::OperationStatus::Success => {
608                let (_, value) = cursor.get_current().map_err(|e| {
609                    NoxuError::OperationNotAllowed(e.to_string())
610                })?;
611                if data.is_partial() {
612                    let off = data.get_partial_offset();
613                    let len = data.get_partial_length();
614                    let end = (off + len).min(value.len());
615                    let slice =
616                        if off < value.len() { &value[off..end] } else { &[] };
617                    data.set_data(slice);
618                } else {
619                    data.set_data(&value);
620                }
621                self.throughput.n_pri_searches.fetch_add(1, Ordering::Relaxed);
622                observe_timer_record!(
623                    _obs_timer,
624                    "noxu_db_operation_duration_seconds",
625                    "op" => "get_with_options"
626                );
627                Ok(OperationStatus::Success)
628            }
629            _ => {
630                self.throughput
631                    .n_pri_search_fails
632                    .fetch_add(1, Ordering::Relaxed);
633                observe_timer_record!(
634                    _obs_timer,
635                    "noxu_db_operation_duration_seconds",
636                    "op" => "get_with_options"
637                );
638                Ok(OperationStatus::NotFound)
639            }
640        }
641    }
642
643    /// Inserts or updates a record.
644    ///
645    ///
646    ///
647    /// # Arguments
648    /// * `txn` - Optional transaction handle (used to scope locks and writes to the transaction)
649    /// * `key` - The key to insert/update
650    /// * `data` - The data to store
651    ///
652    /// # Returns
653    /// `OperationStatus::Success` on success
654    ///
655    /// # Errors
656    /// Returns an error if the database is closed or read-only
657    pub fn put(
658        &self,
659        txn: Option<&Transaction>,
660        key: &DatabaseEntry,
661        data: &DatabaseEntry,
662    ) -> Result<OperationStatus> {
663        self.check_open()?;
664        self.check_writable()?;
665        observe_span!(
666            "db_put",
667            db_name = self.name.as_str(),
668            key_size = key.get_data().map_or(0, |k| k.len()),
669            data_size = data.get_data().map_or(0, |d| d.len()),
670        );
671        let _obs_timer = observe_timer_start!();
672        observe_counter!("noxu_db_operations_total", "op" => "put");
673
674        // Audit database F11 (Wave 2C-4): reject `None`-data keys on
675        // write paths so we can no longer black-hole a record that is
676        // unreachable from `get` (which returns NotFound for the same
677        // input).  An explicit `Some(&[])` empty key is still accepted
678        // by the underlying engine.
679        let key_bytes = Self::require_key_bytes(key, "put")?;
680
681        // Partial put: read-modify-write using the partial offset/length.
682        // LN.combinePuts() — existing bytes outside [offset..offset+length]
683        // are preserved; only the specified range is replaced with new data.
684        //
685        // Wave 1C audit cleanup (database Low "partial-put length mismatch
686        // silent truncation"): when the user supplies a `data` value
687        // whose byte length does not match the configured partial
688        // length, JE rejects the call; the v1.5.0 implementation here
689        // silently min'd the two lengths and truncated the user's
690        // bytes.  We now return a typed [`NoxuError::IllegalArgument`]
691        // so the mismatch is visible at the call site instead of
692        // corrupting the on-disk record.
693        let write_bytes: Vec<u8>;
694        let data_bytes: &[u8] = if data.is_partial() {
695            let new_bytes = data.get_data().unwrap_or(&[]);
696            let off = data.get_partial_offset();
697            let len = data.get_partial_length();
698            if new_bytes.len() != len {
699                return Err(NoxuError::IllegalArgument(format!(
700                    "partial put: data length {} does not match \
701                     partial_length {} (partial_offset={}); JE \
702                     requires exact equality",
703                    new_bytes.len(),
704                    len,
705                    off
706                )));
707            }
708            // Fetch the existing record to splice into.
709            let existing = {
710                let mut tmp_entry = DatabaseEntry::new();
711                let mut tmp_cursor = self.make_cursor();
712                match tmp_cursor
713                    .search(key_bytes, None, noxu_dbi::SearchMode::Set)
714                    .map_err(|e| {
715                        NoxuError::OperationNotAllowed(e.to_string())
716                    })? {
717                    noxu_dbi::OperationStatus::Success => {
718                        let (_, v) = tmp_cursor.get_current().map_err(|e| {
719                            NoxuError::OperationNotAllowed(e.to_string())
720                        })?;
721                        tmp_entry.set_data(&v);
722                        tmp_entry.get_data().unwrap_or(&[]).to_vec()
723                    }
724                    _ => vec![0u8; off + len],
725                }
726            };
727            let total_len = (off + len).max(existing.len());
728            let mut patched = existing;
729            patched.resize(total_len, 0);
730            patched[off..off + len].copy_from_slice(new_bytes);
731            write_bytes = patched;
732            &write_bytes
733        } else {
734            data.get_data().unwrap_or(&[])
735        };
736
737        // v1.6 (audit C3 / step 6): if any secondaries are registered,
738        // capture the pre-put value of this key (if it exists) BEFORE
739        // the overwrite so we can pass it as `old_data` to the
740        // secondary key creator below.  When the put is a fresh
741        // insert the read returns NotFound and `old_data_for_secondaries`
742        // remains `None`.  For partial puts the pre-put value already
743        // lives in `write_bytes` indirectly; we re-read here to keep
744        // the code paths uniform and to use the caller's txn for the
745        // read so isolation is honoured.
746        let secondaries_pre = self.live_secondaries();
747        let old_data_for_secondaries: Option<Vec<u8>> =
748            if secondaries_pre.is_empty() {
749                None
750            } else {
751                let mut existing = DatabaseEntry::new();
752                match self.get(txn, key, &mut existing)? {
753                    OperationStatus::Success => {
754                        existing.get_data().map(<[u8]>::to_vec)
755                    }
756                    _ => None,
757                }
758            };
759
760        match txn {
761            Some(t) => {
762                let mut cursor = self.make_cursor_for_txn(t);
763                cursor
764                    .put(key_bytes, data_bytes, PutMode::Overwrite)
765                    .map_err(NoxuError::from)?;
766            }
767            None => {
768                // Wrap the write in a synthetic auto-commit `Txn` so the
769                // lock manager sees a typed locker id ("auto-txn:<id>")
770                // and any error rolls back the in-memory tree write
771                // through `Txn::abort_collect_undo`.
772                self.with_auto_txn(|cursor| {
773                    cursor
774                        .put(key_bytes, data_bytes, PutMode::Overwrite)
775                        .map_err(NoxuError::from)?;
776                    Ok(())
777                })?;
778            }
779        }
780
781        // v1.6 (audit C3 — the associate()-style hook): drive every
782        // registered secondary index under the same caller-supplied
783        // txn so the primary record and its secondary entries commit /
784        // abort together.
785        //
786        // Step 4 + Step 6 (this path): we capture the pre-put value
787        // before issuing the primary write so the put-existing-key
788        // update path can pass it as `old_data` and the secondary key
789        // creator can compute every stale (sec_key, pri_key) pair to
790        // delete in addition to inserting the new ones.  Pre-Step-6
791        // an update over an existing key leaked the previous
792        // secondary entries (audit C3 sub-case).
793        let secondaries = self.live_secondaries();
794        if !secondaries.is_empty() {
795            // Build a fresh DatabaseEntry around the data we just
796            // wrote so the secondary key creator sees the bytes the
797            // user asked for (and not the partial-put scratch buffer).
798            let new_entry = DatabaseEntry::from_bytes(data_bytes);
799            let old_entry: Option<DatabaseEntry> = old_data_for_secondaries
800                .as_deref()
801                .map(DatabaseEntry::from_bytes);
802            for hook in secondaries {
803                hook.maintain(txn, key, old_entry.as_ref(), Some(&new_entry))?;
804            }
805        }
806
807        // Apply cleaner write-path backpressure for auto-commit (no-txn) bulk
808        // writes: if the log write rate exceeds the cleaner's capacity, sleep
809        // briefly so the cleaner can keep up.  Transactional paths handle this
810        // in Transaction::commit_with_durability() instead.
811        if txn.is_none()
812            && let Some(delay) = self
813                .cleaner_throttle
814                .as_ref()
815                .and_then(|t| t.should_throttle_writer())
816        {
817            std::thread::sleep(delay);
818        }
819
820        self.throughput.n_pri_updates.fetch_add(1, Ordering::Relaxed);
821        observe_timer_record!(_obs_timer, "noxu_db_operation_duration_seconds", "op" => "put");
822        Ok(OperationStatus::Success)
823    }
824
825    /// Inserts or updates a record with per-operation write options.
826    ///
827    /// Extends `put()` with `WriteOptions` support:
828    /// - `ttl` — if > 0, sets a per-record TTL expiration (hours from now); the
829    ///   record will be treated as expired and invisible after the TTL elapses.
830    ///   Stored in the BIN slot as absolute hours since Unix epoch, matching
831    ///   the `BIN.expirationInHours` / `IN.entryExpiration` path.
832    /// - `update_ttl` — if true and the record already exists, refreshes its TTL
833    ///   to the new value rather than leaving the original expiration.
834    /// - `cache_mode` — advisory cache hint (currently informational).
835    ///
836    /// # Arguments
837    /// * `txn` - Optional transaction handle
838    /// * `key` - The key to insert/update
839    /// * `data` - The data to store
840    /// * `opts` - Per-operation write options (TTL, cache hints)
841    ///
842    /// # Returns
843    /// `OperationStatus::Success` on success
844    pub fn put_with_options(
845        &self,
846        txn: Option<&Transaction>,
847        key: &DatabaseEntry,
848        data: &DatabaseEntry,
849        opts: &WriteOptions,
850    ) -> Result<OperationStatus> {
851        // Reject `None`-data keys early to keep parity with `put` (audit
852        // database F11, Wave 2C-4).  We compute key_bytes here so the
853        // TTL update below can reuse it without re-validating.
854        let key_bytes = Self::require_key_bytes(key, "put_with_options")?;
855
856        let result = self.put(txn, key, data)?;
857
858        // Apply TTL to the just-written BIN slot when requested.
859        //
860        // Audit database F8 (Wave 2C-4) — partial fix:
861        //   * The TTL update is still in-memory only (see
862        //     `update_key_expiration`); recovery does not yet replay
863        //     it.  Tracked as residual F8 work alongside the
864        //     CursorImpl::put extension that would WAL-log the
865        //     expiration alongside the LN.
866        //   * `WriteOptions::update_ttl` is documented as a JE-compatible
867        //     hint; the engine cannot distinguish insert-vs-update at
868        //     this layer, so we always apply when `ttl > 0` and the
869        //     underlying write succeeded.  The flag is preserved on
870        //     `WriteOptions` for v2.0 once `CursorImpl::put` returns
871        //     whether an insert or an update occurred.
872        if opts.ttl > 0 && result == OperationStatus::Success {
873            let expiration_hours =
874                noxu_util::current_time_hours().saturating_add(opts.ttl as u32);
875            self.db_impl
876                .read()
877                .update_key_expiration(key_bytes, expiration_hours);
878        }
879
880        Ok(result)
881    }
882
883    /// Inserts a record, failing if the key already exists.
884    ///
885    ///
886    ///
887    /// # Arguments
888    /// * `txn` - Optional transaction handle (used to scope locks and writes to the transaction)
889    /// * `key` - The key to insert
890    /// * `data` - The data to store
891    ///
892    /// # Returns
893    /// `OperationStatus::Success` if inserted, `OperationStatus::KeyExists` if key already exists
894    ///
895    /// # Errors
896    /// Returns an error if the database is closed or read-only
897    pub fn put_no_overwrite(
898        &self,
899        txn: Option<&Transaction>,
900        key: &DatabaseEntry,
901        data: &DatabaseEntry,
902    ) -> Result<OperationStatus> {
903        self.check_open()?;
904        self.check_writable()?;
905
906        // Audit database F11 (Wave 2C-4): reject `None`-data keys on
907        // write paths.  See `put` for rationale.
908        let key_bytes = Self::require_key_bytes(key, "put_no_overwrite")?;
909        let data_bytes = data.get_data().unwrap_or(&[]);
910
911        let status = match txn {
912            Some(t) => {
913                let mut cursor = self.make_cursor_for_txn(t);
914                match cursor
915                    .put(key_bytes, data_bytes, PutMode::NoOverwrite)
916                    .map_err(NoxuError::from)?
917                {
918                    noxu_dbi::OperationStatus::KeyExist => {
919                        OperationStatus::KeyExists
920                    }
921                    _ => OperationStatus::Success,
922                }
923            }
924            None => self.with_auto_txn(|cursor| {
925                cursor
926                    .put(key_bytes, data_bytes, PutMode::NoOverwrite)
927                    .map_err(NoxuError::from)
928                    .map(|s| match s {
929                        noxu_dbi::OperationStatus::KeyExist => {
930                            OperationStatus::KeyExists
931                        }
932                        _ => OperationStatus::Success,
933                    })
934            })?,
935        };
936        if status == OperationStatus::Success {
937            self.throughput.n_pri_inserts.fetch_add(1, Ordering::Relaxed);
938        } else {
939            self.throughput.n_pri_insert_fails.fetch_add(1, Ordering::Relaxed);
940        }
941        Ok(status)
942    }
943
944    /// Deletes a record by key.
945    ///
946    ///
947    ///
948    /// # Arguments
949    /// * `txn` - Optional transaction handle (used to scope locks and writes to the transaction)
950    /// * `key` - The key to delete
951    ///
952    /// # Returns
953    /// `OperationStatus::Success` if deleted, `OperationStatus::NotFound` if key didn't exist
954    ///
955    /// # Errors
956    /// Returns an error if the database is closed or read-only
957    pub fn delete(
958        &self,
959        txn: Option<&Transaction>,
960        key: &DatabaseEntry,
961    ) -> Result<OperationStatus> {
962        self.check_open()?;
963        self.check_writable()?;
964        observe_span!(
965            "db_delete",
966            db_name = self.name.as_str(),
967            key_size = key.get_data().map_or(0, |k| k.len()),
968        );
969        let _obs_timer = observe_timer_start!();
970        observe_counter!("noxu_db_operations_total", "op" => "delete");
971
972        let key_bytes = match key.get_data() {
973            Some(k) => k,
974            None => return Ok(OperationStatus::NotFound),
975        };
976
977        // v1.6 (audit C3): if any secondaries are registered we must
978        // capture the pre-delete primary data on each iteration so the
979        // secondary key creator can recompute every (sec_key, pri_key)
980        // pair to remove.  Collected outside the cursor closure so
981        // the auto-commit and explicit-txn paths share one buffer.
982        let secondaries = self.live_secondaries();
983        let track_old_data = !secondaries.is_empty();
984        let mut deleted_old_values: Vec<Vec<u8>> = Vec::new();
985
986        // v1.6 (audit C2 / Decision 2C — step 8): consult any FK
987        // referrers BEFORE the delete is applied.  An Abort action
988        // raises a typed error and prevents the foreign delete from
989        // happening at all (matching JE's `ForeignConstraintException`
990        // semantics).  Cascade / Nullify (steps 9 / 10) mutate child
991        // records under the same caller-supplied txn so the foreign
992        // delete and its consequences commit / abort together.
993        let fk_referrers = self.live_fk_referrers();
994        if !fk_referrers.is_empty() {
995            for referrer in &fk_referrers {
996                referrer.on_foreign_key_deleted(txn, key)?;
997            }
998        }
999
1000        // Inner closure shared between the explicit-txn and synthetic
1001        // auto-txn paths: scans + deletes every duplicate of `key_bytes`
1002        // through the supplied `cursor`.  See comment in pre-Wave-1A
1003        // delete for the dup-loop rationale (BDB-JE
1004        // `Database.delete(key)` semantics).
1005        let mut run_delete = |cursor: &mut CursorImpl| -> Result<bool> {
1006            let mut deleted_any = false;
1007            while let noxu_dbi::OperationStatus::Success = cursor
1008                .search(key_bytes, None, SearchMode::Set)
1009                .map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?
1010            {
1011                if track_old_data {
1012                    let (_, v) = cursor.get_current().map_err(|e| {
1013                        NoxuError::OperationNotAllowed(e.to_string())
1014                    })?;
1015                    deleted_old_values.push(v);
1016                }
1017                cursor.delete().map_err(|e| {
1018                    NoxuError::OperationNotAllowed(e.to_string())
1019                })?;
1020                deleted_any = true;
1021            }
1022            Ok(deleted_any)
1023        };
1024
1025        let deleted_any = match txn {
1026            Some(t) => {
1027                let mut cursor = self.make_cursor_for_txn(t);
1028                run_delete(&mut cursor)?
1029            }
1030            None => self.with_auto_txn(&mut run_delete)?,
1031        };
1032
1033        // v1.6 (audit C3): fan out the secondary cleanup under the
1034        // caller's txn so the primary delete and every secondary
1035        // tombstone commit / abort together.
1036        if deleted_any && !secondaries.is_empty() {
1037            for old_bytes in &deleted_old_values {
1038                let old_entry = DatabaseEntry::from_bytes(old_bytes);
1039                for hook in &secondaries {
1040                    hook.maintain(txn, key, Some(&old_entry), None)?;
1041                }
1042            }
1043        }
1044
1045        let status = if deleted_any {
1046            OperationStatus::Success
1047        } else {
1048            OperationStatus::NotFound
1049        };
1050        if status == OperationStatus::Success {
1051            self.throughput.n_pri_deletes.fetch_add(1, Ordering::Relaxed);
1052        } else {
1053            self.throughput.n_pri_delete_fails.fetch_add(1, Ordering::Relaxed);
1054        }
1055        observe_timer_record!(_obs_timer, "noxu_db_operation_duration_seconds", "op" => "delete");
1056        Ok(status)
1057    }
1058
1059    /// Opens a cursor for iterating over database records.
1060    ///
1061    /// When a `Some(&txn)` is passed, the cursor binds to the transaction's
1062    /// `Locker`: every cursor `get` acquires shared locks tracked by the txn,
1063    /// every cursor `put`/`delete` acquires exclusive locks and is rolled
1064    /// back if the txn aborts.  When `txn` is `None` the cursor runs in
1065    /// auto-commit mode — each write is its own transaction and is fsynced
1066    /// before returning.
1067    ///
1068    /// **You must close the cursor before committing or aborting the
1069    /// transaction it was opened under.**
1070    ///
1071    /// # Arguments
1072    /// * `txn` - Optional transaction handle that the cursor should
1073    ///   participate in.
1074    /// * `config` - Optional cursor configuration
1075    ///
1076    /// # Returns
1077    /// A new cursor handle
1078    ///
1079    /// # Errors
1080    /// Returns an error if the database is closed
1081    pub fn open_cursor(
1082        &self,
1083        txn: Option<&Transaction>,
1084        config: Option<&CursorConfig>,
1085    ) -> Result<Cursor> {
1086        self.check_open()?;
1087
1088        // JE invariant: a transactional cursor cannot be opened on a
1089        // non-transactional database.  JE throws IllegalArgumentException
1090        // in DatabaseTest.testCursor when a Transaction is supplied to
1091        // a non-txnal DB (wave-11-G, database_txn_cursor_on_non_txn_db_rejected).
1092        if txn.is_some() && !self.config.transactional {
1093            return Err(NoxuError::IllegalArgument(
1094                "cannot open a transactional cursor on a \
1095                 non-transactional database"
1096                    .to_string(),
1097            ));
1098        }
1099        let read_only = config.map(|c| c.read_uncommitted).unwrap_or(false)
1100            || self.config.read_only;
1101
1102        let cursor_impl = if read_only {
1103            CursorImpl::new(Arc::clone(&self.db_impl), 0)
1104                .with_env_invalid(Arc::clone(&self.env_invalid))
1105        } else {
1106            // Plumb the caller's txn through to the cursor so that
1107            // cursor reads acquire shared locks via the txn's locker and
1108            // cursor writes acquire exclusive locks (and roll back on
1109            // txn.abort()) rather than auto-committing.  See API audit
1110            // 2026-05 cursor finding C1.
1111            match txn {
1112                Some(t) => self.make_cursor_for_txn(t),
1113                None => self.make_cursor(),
1114            }
1115        };
1116
1117        Ok(Cursor::from_impl(cursor_impl, read_only))
1118    }
1119
1120    /// Returns a lazy forward iterator over all records in the database.
1121    ///
1122    /// Records are fetched one at a time (the underlying cursor advances on
1123    /// each `next()` call).  The full database is **not** eagerly materialised
1124    /// into memory.
1125    ///
1126    /// Pass `txn = Some(&txn)` to iterate within an explicit transaction;
1127    /// pass `None` for an auto-commit (non-transactional) scan.
1128    ///
1129    /// # Example
1130    ///
1131    /// ```no_run
1132    /// # use noxu_db::{Database, DatabaseConfig, DatabaseEntry,
1133    /// #              Environment, EnvironmentConfig};
1134    /// # use std::path::PathBuf;
1135    /// # fn main() -> noxu_db::Result<()> {
1136    /// # let env = Environment::open(EnvironmentConfig::new(PathBuf::from("/tmp/t")).with_allow_create(true))?;
1137    /// # let db = env.open_database(None, "d", &DatabaseConfig::new().with_allow_create(true))?;
1138    /// for result in db.iter(None)? {
1139    ///     let (key, val) = result?;
1140    ///     println!("{:?} => {:?}", key, val);
1141    /// }
1142    /// # Ok(()) }
1143    /// ```
1144    ///
1145    /// # Errors
1146    /// Returns an error if the database is closed.
1147    pub fn iter<'txn>(
1148        &self,
1149        txn: Option<&'txn Transaction>,
1150    ) -> Result<crate::db_iter::DbIter<'txn>> {
1151        let cursor = self.open_cursor(txn, None)?;
1152        Ok(crate::db_iter::DbIter::new(cursor))
1153    }
1154
1155    /// Returns a lazy iterator over the records whose keys fall within `range`.
1156    ///
1157    /// The iterator is positioned at the first key that satisfies the lower
1158    /// bound (using `SearchGte`) and stops once the key exceeds the upper
1159    /// bound.  All standard `RangeBounds` variants are supported:
1160    /// `..`, `lo..`, `..=hi`, `lo..hi`, `lo..=hi`, etc.
1161    ///
1162    /// Pass `txn = Some(&txn)` to iterate within an explicit transaction;
1163    /// pass `None` for a non-transactional scan.
1164    ///
1165    /// # Example
1166    ///
1167    /// ```no_run
1168    /// # use noxu_db::{Database, DatabaseConfig, DatabaseEntry,
1169    /// #              Environment, EnvironmentConfig};
1170    /// # use std::path::PathBuf;
1171    /// # fn main() -> noxu_db::Result<()> {
1172    /// # let env = Environment::open(EnvironmentConfig::new(PathBuf::from("/tmp/t")).with_allow_create(true))?;
1173    /// # let db = env.open_database(None, "d", &DatabaseConfig::new().with_allow_create(true))?;
1174    /// let lo = b"key010";
1175    /// let hi = b"key020";
1176    /// for result in db.range(None, lo.as_ref()..=hi.as_ref())? {
1177    ///     let (key, _val) = result?;
1178    ///     assert!(key.as_slice() >= lo.as_slice());
1179    /// }
1180    /// # Ok(()) }
1181    /// ```
1182    ///
1183    /// # Errors
1184    /// Returns an error if the database is closed.
1185    pub fn range<'txn, K: AsRef<[u8]>>(
1186        &self,
1187        txn: Option<&'txn Transaction>,
1188        range: impl std::ops::RangeBounds<K>,
1189    ) -> Result<crate::db_iter::DbRange<'txn>> {
1190        use std::ops::Bound;
1191        let map_bound = |b: std::ops::Bound<&K>| -> std::ops::Bound<Vec<u8>> {
1192            match b {
1193                Bound::Included(k) => Bound::Included(k.as_ref().to_vec()),
1194                Bound::Excluded(k) => Bound::Excluded(k.as_ref().to_vec()),
1195                Bound::Unbounded => Bound::Unbounded,
1196            }
1197        };
1198        let start = map_bound(range.start_bound());
1199        let end = map_bound(range.end_bound());
1200        let cursor = self.open_cursor(txn, None)?;
1201        Ok(crate::db_iter::DbRange::new(cursor, start, end))
1202    }
1203    ///
1204    ///
1205    ///
1206    /// # Arguments
1207    /// * `key`    - The database key under which the sequence record is stored.
1208    /// * `config` - Sequence configuration (use `SequenceConfig::new()` for defaults).
1209    ///
1210    /// # Errors
1211    /// Returns an error if the database is closed, the config is invalid, or
1212    /// `allow_create` is false and the sequence does not exist.
1213    pub fn open_sequence<'db>(
1214        &'db self,
1215        key: &DatabaseEntry,
1216        config: SequenceConfig,
1217    ) -> Result<Sequence<'db>> {
1218        self.check_open()?;
1219        Sequence::open(self, key, config)
1220    }
1221
1222    /// Closes the database handle.
1223    ///
1224    ///
1225    ///
1226    /// # Errors
1227    /// Returns an error if the database is already closed
1228    pub fn close(&self) -> Result<()> {
1229        if !self.open.load(Ordering::Acquire) {
1230            return Err(NoxuError::DatabaseClosed);
1231        }
1232
1233        self.open.store(false, Ordering::Release);
1234        let _ = self
1235            .env_impl
1236            .lock()
1237            .close_database(noxu_dbi::DatabaseId::new(self.id as i64));
1238        Ok(())
1239    }
1240
1241    /// Returns the database name.
1242    ///
1243    ///
1244    pub fn get_database_name(&self) -> &str {
1245        &self.name
1246    }
1247
1248    /// Returns the database configuration.
1249    ///
1250    ///
1251    pub fn get_config(&self) -> &DatabaseConfig {
1252        &self.config
1253    }
1254
1255    /// Returns the underlying database ID.  Used by FK cascade guards
1256    /// to disambiguate `(db, key)` frames when several databases
1257    /// participate in a cycle.
1258    pub(crate) fn db_id_for_fk_guard(&self) -> u64 {
1259        self.id
1260    }
1261
1262    /// Registers a secondary index for automatic maintenance.
1263    ///
1264    /// v1.6 (audit C3 — associate() hook): every [`SecondaryDatabase`]
1265    /// downgrades its inner `Arc<SecondaryHookState>` to a `Weak` and
1266    /// stores it here.  Subsequent `put` / `delete` calls iterate the
1267    /// list and forward the same txn to every live secondary, dropping
1268    /// dead `Weak` entries on the fly.
1269    pub(crate) fn register_secondary(
1270        &self,
1271        hook: std::sync::Weak<
1272            dyn crate::secondary_database::SecondaryHook + Send + Sync,
1273        >,
1274    ) {
1275        let mut guard = self.secondaries.write();
1276        // Compact dead Weak entries lazily on every registration so the
1277        // list does not grow unboundedly with churn.
1278        guard.retain(|w| w.strong_count() > 0);
1279        guard.push(hook);
1280    }
1281
1282    /// Returns a snapshot of every live registered secondary.  Used by
1283    /// the automatic-maintenance plumbing in `put` / `delete` to drive
1284    /// secondaries without holding the registry lock across the
1285    /// secondary call.  Dead `Weak` entries are dropped from the
1286    /// returned list (and — because we re-acquire the registry write
1287    /// lock at registration time — lazily compacted from the registry
1288    /// itself).
1289    pub(crate) fn live_secondaries(
1290        &self,
1291    ) -> Vec<Arc<dyn crate::secondary_database::SecondaryHook + Send + Sync>>
1292    {
1293        self.secondaries.read().iter().filter_map(|w| w.upgrade()).collect()
1294    }
1295
1296    /// Registers an FK referrer that points at this primary as its
1297    /// foreign-key target (v1.6 audit C2 / Decision 2C — Abort hook).
1298    pub(crate) fn register_fk_referrer(
1299        &self,
1300        referrer: std::sync::Weak<
1301            dyn crate::secondary_database::FkReferrer + Send + Sync,
1302        >,
1303    ) {
1304        let mut guard = self.fk_referrers.write();
1305        guard.retain(|w| w.strong_count() > 0);
1306        guard.push(referrer);
1307    }
1308
1309    /// Snapshot of every live FK referrer.
1310    pub(crate) fn live_fk_referrers(
1311        &self,
1312    ) -> Vec<Arc<dyn crate::secondary_database::FkReferrer + Send + Sync>> {
1313        self.fk_referrers.read().iter().filter_map(|w| w.upgrade()).collect()
1314    }
1315
1316    /// Returns an approximate count of records in the database.
1317    ///
1318    /// reads the per-database `AtomicU64` entry
1319    /// counter, giving O(1) performance analogous to an O(1) counter.
1320    ///
1321    /// The counter is incremented on every new insert and decremented on every
1322    /// delete (including transaction aborts that undo inserts).
1323    ///
1324    /// # Errors
1325    /// Returns an error if the database is closed
1326    pub fn count(&self) -> Result<u64> {
1327        self.check_open()?;
1328        Ok(self.db_impl.read().entry_count())
1329    }
1330
1331    /// Returns all records as `(key_bytes, data_bytes)` pairs in key order.
1332    ///
1333    /// This is a helper for schema evolution: it uses the lower-level
1334    /// `CursorImpl` directly so each iteration yields raw `Vec<u8>` pairs
1335    /// without allocating a pair of `DatabaseEntry` values per record.
1336    ///
1337    /// # Errors
1338    /// Returns an error if the database is closed or a cursor operation fails.
1339    pub fn scan_all_kv(&self) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1340        self.check_open()?;
1341
1342        let mut cursor = CursorImpl::new(Arc::clone(&self.db_impl), 0)
1343            .with_env_invalid(Arc::clone(&self.env_invalid));
1344        let first_status = cursor
1345            .get_first()
1346            .map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?;
1347
1348        if first_status != noxu_dbi::OperationStatus::Success {
1349            return Ok(Vec::new());
1350        }
1351
1352        let mut records = Vec::new();
1353        loop {
1354            let (k, v) = cursor
1355                .get_current()
1356                .map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?;
1357            records.push((k, v));
1358
1359            let status = cursor
1360                .retrieve_next(GetMode::Next)
1361                .map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?;
1362            if status != noxu_dbi::OperationStatus::Success {
1363                break;
1364            }
1365        }
1366
1367        Ok(records)
1368    }
1369
1370    /// Returns whether the database handle is valid.
1371    ///
1372    ///
1373    pub fn is_valid(&self) -> bool {
1374        self.open.load(Ordering::Acquire)
1375    }
1376
1377    /// Returns the current state of the database handle.
1378    pub fn state(&self) -> DbState {
1379        if self.open.load(Ordering::Acquire) {
1380            DbState::Open
1381        } else {
1382            DbState::Closed
1383        }
1384    }
1385
1386    /// Flushes all pending writes for this database to stable storage.
1387    ///
1388    /// Implements `Database.sync()` — issues an fdatasync on the log file,
1389    /// ensuring that all writes made by non-transactional or deferred-sync
1390    /// operations are durable before returning.
1391    ///
1392    /// # Returns
1393    /// `Ok(())` on success. Acts as a no-op for non-transactional /
1394    /// in-memory environments where no log manager is configured.
1395    ///
1396    /// # Errors
1397    /// Returns an error if the database is closed or the underlying
1398    /// log-manager flush fails.
1399    pub fn sync(&self) -> Result<()> {
1400        self.check_open()?;
1401        if let Some(lm) = &self.log_manager {
1402            lm.flush_sync()
1403                .map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?;
1404        }
1405        Ok(())
1406    }
1407
1408    /// Preloads the database into cache by scanning the B-tree.
1409    ///
1410    /// Walks the tree, touching each internal-node and BIN level so they
1411    /// are pulled into the in-memory cache.  Useful for warming the
1412    /// cache before a workload begins.
1413    ///
1414    /// # Limitations
1415    /// * The current implementation warms the BIN/IN structure only;
1416    ///   `PreloadConfig::load_lns` therefore makes `lns_loaded` report
1417    ///   the *number of LN slots in the tree* rather than the number
1418    ///   of LNs actually fetched off disk.  Full LN warming is
1419    ///   tracked as a future-work item; the engine has no public
1420    ///   single-shot LN fetch API today, so the only way to warm an
1421    ///   LN is to position a cursor on its slot.
1422    /// * `PreloadConfig::max_millis` is honoured: the call returns
1423    ///   early once the wall-clock budget is exceeded, with the
1424    ///   partial results in the returned `PreloadStats`.
1425    ///
1426    /// # Arguments
1427    /// * `config` - Controls limits on preload duration and memory
1428    ///
1429    /// # Returns
1430    /// Statistics about what was preloaded.
1431    pub fn preload(&self, config: &PreloadConfig) -> Result<PreloadStats> {
1432        self.check_open()?;
1433        let start = std::time::Instant::now();
1434        let max_millis = config.max_millis;
1435        let mut stats =
1436            PreloadStats { bins_loaded: 0, lns_loaded: 0, elapsed_ms: 0 };
1437
1438        let guard = self.db_impl.read();
1439        if let Some(tree_stats) = guard.collect_btree_stats() {
1440            // collect_btree_stats() walks every node in the tree, which has
1441            // the side effect of pulling all BINs/INs into memory (cache).
1442            stats.bins_loaded = tree_stats.n_bins;
1443            if config.load_lns {
1444                // F9 (residual): this is the slot count, not a count of
1445                // actual LN fetches.  See the doc comment above.
1446                stats.lns_loaded = tree_stats.n_entries;
1447            }
1448        }
1449
1450        // Audit database F10 (Wave 2C-4): honour `max_millis` as a
1451        // post-walk diagnostic.  `collect_btree_stats` is currently
1452        // not interruptible, so the time bound surfaces in `stats`
1453        // (callers can detect over-budget runs by comparing
1454        // `elapsed_ms` to their config) but does not yet stop the
1455        // walk early.  Tracked for v2.0 alongside true LN warming.
1456        let elapsed_ms = start.elapsed().as_millis() as u64;
1457        if max_millis > 0 && elapsed_ms > max_millis {
1458            log::warn!(
1459                "Database::preload: walk took {elapsed_ms} ms, exceeding \
1460                 max_millis budget of {max_millis} ms (advisory until \
1461                 the BIN walker becomes interruptible)",
1462            );
1463        }
1464        stats.elapsed_ms = elapsed_ms;
1465        Ok(stats)
1466    }
1467
1468    /// Returns B-tree statistics for this database.
1469    ///
1470    /// Implements `Database.getStats(StatsConfig)`.
1471    ///
1472    /// When `config.fast` is `true`, only the O(1) entry-count is returned
1473    /// and no tree traversal is performed.  When `fast` is `false` (default),
1474    /// the full tree is walked to populate all node-count fields.
1475    ///
1476    /// # Errors
1477    /// Returns an error if the database is closed.
1478    pub fn get_stats(
1479        &self,
1480        config: Option<&StatsConfig>,
1481    ) -> Result<DatabaseStats> {
1482        self.check_open()?;
1483        let fast = config.map(|c| c.fast).unwrap_or(false);
1484
1485        let btree = if fast {
1486            // Fast path: O(1) counter only; skip tree traversal.
1487            BtreeStats {
1488                leaf_node_count: self.db_impl.read().entry_count(),
1489                ..Default::default()
1490            }
1491        } else {
1492            // Full path: walk the tree.
1493            let guard = self.db_impl.read();
1494            match guard.collect_btree_stats() {
1495                Some(ts) => BtreeStats {
1496                    leaf_node_count: ts.n_entries,
1497                    deleted_leaf_node_count: 0,
1498                    bottom_internal_node_count: ts.n_bins,
1499                    internal_node_count: ts.n_ins,
1500                    main_tree_max_depth: ts.height,
1501                },
1502                None => BtreeStats {
1503                    leaf_node_count: guard.entry_count(),
1504                    ..Default::default()
1505                },
1506            }
1507        };
1508
1509        Ok(DatabaseStats { btree })
1510    }
1511
1512    /// Verifies the structural integrity of this database's B-tree.
1513    ///
1514    /// Walks the B-tree from root to BIN leaves and checks:
1515    /// - Each upper IN's children are accessible (non-null child references).
1516    /// - Each BIN entry that is not known-deleted has a valid (non-NULL) LSN.
1517    /// - The BIN's first key is >= the parent routing key (key-range containment).
1518    ///
1519    /// Mirrors `Database.verify(VerifyConfig)` — calls `BtreeVerifier` on
1520    /// the underlying tree.
1521    ///
1522    /// # Arguments
1523    /// * `config` - Verification options (which checks to run, max errors, etc.)
1524    ///
1525    /// # Returns
1526    /// A `VerifyResult` with any structural errors and the count of records verified.
1527    ///
1528    /// # Errors
1529    /// Returns an error if the database is closed.
1530    pub fn verify(
1531        &self,
1532        config: &noxu_engine::VerifyConfig,
1533    ) -> Result<noxu_engine::VerifyResult> {
1534        self.check_open()?;
1535        let guard = self.db_impl.read();
1536        Ok(noxu_engine::verify_database_impl(&guard, config))
1537    }
1538
1539    /// Creates a join cursor that returns records matching all secondary-key
1540    /// constraints expressed by the pre-positioned `cursors`.
1541    ///
1542    /// Mirrors `Database.join(SecondaryCursor[], JoinConfig)`.
1543    ///
1544    /// Each cursor in `cursors` must already be positioned at the desired
1545    /// secondary key value (e.g. via `SecondaryCursor::get_search_key`).
1546    /// The join algorithm iterates through all candidate primary keys from
1547    /// `cursors[0]` and probes `cursors[1..n]` to confirm each candidate
1548    /// also appears in their secondary keys.  Candidates that pass all
1549    /// probes are returned by [`JoinCursor::get_next`].
1550    ///
1551    /// Unless `config.no_sort` is `true`, the cursor array is re-ordered by
1552    /// ascending duplicate-count estimate before the join starts, matching
1553    /// JE's optimisation for minimum candidate-set size.
1554    ///
1555    /// The returned `JoinCursor` owns the `cursors` for its lifetime.
1556    ///
1557    /// # Errors
1558    /// Returns an error if this database handle is closed.
1559    pub fn join<'db>(
1560        &'db self,
1561        cursors: Vec<SecondaryCursor<'db>>,
1562        config: Option<JoinConfig>,
1563    ) -> Result<JoinCursor<'db>> {
1564        self.check_open()?;
1565        JoinCursor::new(self, cursors, config)
1566    }
1567
1568    /// Checks if the database is open, returns an error if not.
1569    ///
1570    /// X-13: also checks the environment validity flags so that reads and
1571    /// writes return `EnvironmentFailure` after an fsync error or explicit
1572    /// `EnvironmentImpl::invalidate()` call rather than silently succeeding
1573    /// on stale BIN data.
1574    fn check_open(&self) -> Result<()> {
1575        // Check environment validity first — explicit invalidation.
1576        if self.env_invalid.load(Ordering::Acquire) {
1577            return Err(NoxuError::environment_with_reason(
1578                crate::error::EnvironmentFailureReason::UnexpectedStateFatal,
1579                "environment has been invalidated".to_string(),
1580            ));
1581        }
1582        // Check I/O failure (C-2 / fsync-gate).
1583        if self
1584            .log_manager
1585            .as_ref()
1586            .is_some_and(|lm| lm.io_invalid.load(Ordering::Acquire))
1587        {
1588            return Err(NoxuError::environment_with_reason(
1589                crate::error::EnvironmentFailureReason::LogWrite,
1590                "I/O failure: environment invalidated by fsync error"
1591                    .to_string(),
1592            ));
1593        }
1594        if !self.open.load(Ordering::Acquire) {
1595            return Err(NoxuError::DatabaseClosed);
1596        }
1597        Ok(())
1598    }
1599
1600    /// Public-ish accessor for the cached log manager, used by
1601    /// [`crate::disk_ordered_cursor::open_disk_ordered_cursor_multi`].
1602    /// Returns `None` for non-WAL environments.
1603    pub(crate) fn cached_log_manager(
1604        &self,
1605    ) -> Option<&std::sync::Arc<noxu_log::LogManager>> {
1606        self.log_manager.as_ref()
1607    }
1608
1609    /// Public-ish accessor used by the disk-ordered-cursor helper to
1610    /// validate that the database is still open before scanning.
1611    pub(crate) fn check_open_for_doc(&self) -> Result<()> {
1612        self.check_open()
1613    }
1614
1615    /// Returns this database's `DatabaseId` for use by the disk-ordered
1616    /// cursor producer.
1617    pub(crate) fn database_id_for_doc(&self) -> noxu_dbi::DatabaseId {
1618        noxu_dbi::DatabaseId::new(self.id as i64)
1619    }
1620
1621    /// Checks if the database is writable, returns an error if not.
1622    fn check_writable(&self) -> Result<()> {
1623        if self.config.read_only {
1624            return Err(NoxuError::ReadOnly);
1625        }
1626        Ok(())
1627    }
1628
1629    /// Unify the empty-key contract
1630    /// across `get` / `put` / `put_no_overwrite` / `put_with_options`
1631    /// / `delete`.  Returns the key bytes if the entry has data set
1632    /// (even if zero-length); rejects `None`-data keys with a typed
1633    /// `IllegalArgument` so the previous put-vs-get asymmetry can no
1634    /// longer black-hole records under a `None` key.
1635    fn require_key_bytes<'a>(
1636        key: &'a DatabaseEntry,
1637        op: &'static str,
1638    ) -> Result<&'a [u8]> {
1639        match key.get_data() {
1640            Some(k) => Ok(k),
1641            None => Err(NoxuError::IllegalArgument(format!(
1642                "{op}: key DatabaseEntry has no data; \
1643                 use DatabaseEntry::from_bytes(...) or set_data(...) \
1644                 (Some(&[]) for an explicit empty key)",
1645            ))),
1646        }
1647    }
1648}
1649
1650impl Drop for Database {
1651    fn drop(&mut self) {
1652        // Best effort close on drop
1653        let _ = self.close();
1654    }
1655}
1656
1657#[cfg(test)]
1658mod tests {
1659    use super::*;
1660    use crate::environment::Environment;
1661    use crate::environment_config::EnvironmentConfig;
1662    use tempfile::TempDir;
1663
1664    fn temp_env_and_db() -> (TempDir, Environment, Database) {
1665        let temp_dir = TempDir::new().unwrap();
1666        let env_config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
1667            .with_allow_create(true)
1668            .with_transactional(true);
1669        let env = Environment::open(env_config).unwrap();
1670
1671        let db_config = DatabaseConfig::new().with_allow_create(true);
1672        let db = env.open_database(None, "testdb", &db_config).unwrap();
1673
1674        (temp_dir, env, db)
1675    }
1676
1677    #[test]
1678    fn test_database_name() {
1679        let (_temp_dir, _env, db) = temp_env_and_db();
1680        assert_eq!(db.get_database_name(), "testdb");
1681    }
1682
1683    #[test]
1684    fn test_put_and_get() {
1685        let (_temp_dir, _env, db) = temp_env_and_db();
1686
1687        let key = DatabaseEntry::from_bytes(b"key1");
1688        let value = DatabaseEntry::from_bytes(b"value1");
1689
1690        let result = db.put(None, &key, &value).unwrap();
1691        assert_eq!(result, OperationStatus::Success);
1692
1693        let mut retrieved = DatabaseEntry::new();
1694        let result = db.get(None, &key, &mut retrieved).unwrap();
1695        assert_eq!(result, OperationStatus::Success);
1696        assert_eq!(retrieved.get_data().unwrap(), b"value1");
1697    }
1698
1699    #[test]
1700    fn test_get_nonexistent() {
1701        let (_temp_dir, _env, db) = temp_env_and_db();
1702
1703        let key = DatabaseEntry::from_bytes(b"nonexistent");
1704        let mut data = DatabaseEntry::new();
1705
1706        let result = db.get(None, &key, &mut data).unwrap();
1707        assert_eq!(result, OperationStatus::NotFound);
1708    }
1709
1710    /// ("partial-put length
1711    /// mismatch silent truncation"): a partial put whose `data` slice
1712    /// differs in length from the configured partial-length must be
1713    /// rejected with a typed error instead of silently truncating or
1714    /// padding the splice.
1715    #[test]
1716    fn test_partial_put_length_mismatch_rejected() {
1717        let (_temp_dir, _env, db) = temp_env_and_db();
1718
1719        let key = DatabaseEntry::from_bytes(b"k");
1720        db.put(None, &key, &DatabaseEntry::from_bytes(b"hello world")).unwrap();
1721
1722        // Partial offset=6, partial_length=5 ("world"), but only 3 bytes
1723        // supplied.  Used to silently truncate; now rejected.
1724        let mut patch = DatabaseEntry::from_bytes(b"abc");
1725        patch.set_partial(6, 5, true);
1726        let err = db.put(None, &key, &patch).unwrap_err();
1727        assert!(
1728            matches!(err, NoxuError::IllegalArgument(_)),
1729            "expected IllegalArgument, got {err:?}"
1730        );
1731        assert!(
1732            err.to_string().contains("partial"),
1733            "expected partial-related message, got {}",
1734            err
1735        );
1736
1737        // The on-disk record is unchanged because the call returned
1738        // before any write.
1739        let mut buf = DatabaseEntry::new();
1740        let status = db.get(None, &key, &mut buf).unwrap();
1741        assert_eq!(status, OperationStatus::Success);
1742        assert_eq!(buf.get_data().unwrap(), b"hello world");
1743    }
1744
1745    /// Companion: when data.len() == partial_length the partial put
1746    /// patches the slice in place and other bytes are preserved.
1747    #[test]
1748    fn test_partial_put_exact_length_patches_in_place() {
1749        let (_temp_dir, _env, db) = temp_env_and_db();
1750
1751        let key = DatabaseEntry::from_bytes(b"k");
1752        db.put(None, &key, &DatabaseEntry::from_bytes(b"hello world")).unwrap();
1753
1754        let mut patch = DatabaseEntry::from_bytes(b"WORLD");
1755        patch.set_partial(6, 5, true);
1756        db.put(None, &key, &patch).unwrap();
1757
1758        let mut buf = DatabaseEntry::new();
1759        db.get(None, &key, &mut buf).unwrap();
1760        assert_eq!(buf.get_data().unwrap(), b"hello WORLD");
1761    }
1762
1763    #[test]
1764    fn test_put_updates_existing() {
1765        let (_temp_dir, _env, db) = temp_env_and_db();
1766
1767        let key = DatabaseEntry::from_bytes(b"key1");
1768        let value1 = DatabaseEntry::from_bytes(b"value1");
1769        let value2 = DatabaseEntry::from_bytes(b"value2");
1770
1771        db.put(None, &key, &value1).unwrap();
1772        db.put(None, &key, &value2).unwrap();
1773
1774        let mut retrieved = DatabaseEntry::new();
1775        db.get(None, &key, &mut retrieved).unwrap();
1776        assert_eq!(retrieved.get_data().unwrap(), b"value2");
1777    }
1778
1779    #[test]
1780    fn test_put_no_overwrite_success() {
1781        let (_temp_dir, _env, db) = temp_env_and_db();
1782
1783        let key = DatabaseEntry::from_bytes(b"key1");
1784        let value = DatabaseEntry::from_bytes(b"value1");
1785
1786        let result = db.put_no_overwrite(None, &key, &value).unwrap();
1787        assert_eq!(result, OperationStatus::Success);
1788    }
1789
1790    #[test]
1791    fn test_put_no_overwrite_key_exists() {
1792        let (_temp_dir, _env, db) = temp_env_and_db();
1793
1794        let key = DatabaseEntry::from_bytes(b"key1");
1795        let value1 = DatabaseEntry::from_bytes(b"value1");
1796        let value2 = DatabaseEntry::from_bytes(b"value2");
1797
1798        db.put(None, &key, &value1).unwrap();
1799        let result = db.put_no_overwrite(None, &key, &value2).unwrap();
1800        assert_eq!(result, OperationStatus::KeyExists);
1801
1802        // Verify original value is unchanged
1803        let mut retrieved = DatabaseEntry::new();
1804        db.get(None, &key, &mut retrieved).unwrap();
1805        assert_eq!(retrieved.get_data().unwrap(), b"value1");
1806    }
1807
1808    #[test]
1809    fn test_delete() {
1810        let (_temp_dir, _env, db) = temp_env_and_db();
1811
1812        let key = DatabaseEntry::from_bytes(b"key1");
1813        let value = DatabaseEntry::from_bytes(b"value1");
1814
1815        db.put(None, &key, &value).unwrap();
1816        let result = db.delete(None, &key).unwrap();
1817        assert_eq!(result, OperationStatus::Success);
1818
1819        let mut retrieved = DatabaseEntry::new();
1820        let result = db.get(None, &key, &mut retrieved).unwrap();
1821        assert_eq!(result, OperationStatus::NotFound);
1822    }
1823
1824    #[test]
1825    fn test_delete_nonexistent() {
1826        let (_temp_dir, _env, db) = temp_env_and_db();
1827
1828        let key = DatabaseEntry::from_bytes(b"nonexistent");
1829        let result = db.delete(None, &key).unwrap();
1830        assert_eq!(result, OperationStatus::NotFound);
1831    }
1832
1833    #[test]
1834    fn test_count() {
1835        let (_temp_dir, _env, db) = temp_env_and_db();
1836
1837        assert_eq!(db.count().unwrap(), 0);
1838
1839        let key1 = DatabaseEntry::from_bytes(b"key1");
1840        let value1 = DatabaseEntry::from_bytes(b"value1");
1841        db.put(None, &key1, &value1).unwrap();
1842        assert_eq!(db.count().unwrap(), 1);
1843
1844        let key2 = DatabaseEntry::from_bytes(b"key2");
1845        let value2 = DatabaseEntry::from_bytes(b"value2");
1846        db.put(None, &key2, &value2).unwrap();
1847        assert_eq!(db.count().unwrap(), 2);
1848
1849        db.delete(None, &key1).unwrap();
1850        assert_eq!(db.count().unwrap(), 1);
1851    }
1852
1853    #[test]
1854    fn test_close() {
1855        let (_temp_dir, _env, db) = temp_env_and_db();
1856        assert!(db.is_valid());
1857        db.close().unwrap();
1858        assert!(!db.is_valid());
1859    }
1860
1861    #[test]
1862    fn test_close_twice_fails() {
1863        let (_temp_dir, _env, db) = temp_env_and_db();
1864        db.close().unwrap();
1865        let result = db.close();
1866        assert!(result.is_err());
1867    }
1868
1869    #[test]
1870    fn test_operations_on_closed_database_fail() {
1871        let (_temp_dir, _env, db) = temp_env_and_db();
1872        db.close().unwrap();
1873
1874        let key = DatabaseEntry::from_bytes(b"key1");
1875        let value = DatabaseEntry::from_bytes(b"value1");
1876        let mut data = DatabaseEntry::new();
1877
1878        assert!(db.get(None, &key, &mut data).is_err());
1879        assert!(db.put(None, &key, &value).is_err());
1880        assert!(db.put_no_overwrite(None, &key, &value).is_err());
1881        assert!(db.delete(None, &key).is_err());
1882        assert!(db.count().is_err());
1883        assert!(db.open_cursor(None, None).is_err());
1884    }
1885
1886    #[test]
1887    fn test_state() {
1888        let (_temp_dir, _env, db) = temp_env_and_db();
1889        assert_eq!(db.state(), DbState::Open);
1890        db.close().unwrap();
1891        assert_eq!(db.state(), DbState::Closed);
1892    }
1893
1894    #[test]
1895    fn test_read_only_database() {
1896        let temp_dir = TempDir::new().unwrap();
1897        let env_config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
1898            .with_allow_create(true);
1899        let env = Environment::open(env_config).unwrap();
1900
1901        let db_config =
1902            DatabaseConfig::new().with_allow_create(true).with_read_only(true);
1903        let db = env.open_database(None, "readonly_db", &db_config).unwrap();
1904
1905        let key = DatabaseEntry::from_bytes(b"key1");
1906        let value = DatabaseEntry::from_bytes(b"value1");
1907
1908        // Write operations should fail
1909        assert!(db.put(None, &key, &value).is_err());
1910        assert!(db.put_no_overwrite(None, &key, &value).is_err());
1911        assert!(db.delete(None, &key).is_err());
1912    }
1913
1914    #[test]
1915    fn test_multiple_databases() {
1916        let temp_dir = TempDir::new().unwrap();
1917        let env_config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
1918            .with_allow_create(true);
1919        let env = Environment::open(env_config).unwrap();
1920
1921        let db_config = DatabaseConfig::new().with_allow_create(true);
1922        let db1 = env.open_database(None, "db1", &db_config).unwrap();
1923        let db2 = env.open_database(None, "db2", &db_config).unwrap();
1924
1925        let key = DatabaseEntry::from_bytes(b"key1");
1926        let value1 = DatabaseEntry::from_bytes(b"value1");
1927        let value2 = DatabaseEntry::from_bytes(b"value2");
1928
1929        db1.put(None, &key, &value1).unwrap();
1930        db2.put(None, &key, &value2).unwrap();
1931
1932        let mut retrieved1 = DatabaseEntry::new();
1933        let mut retrieved2 = DatabaseEntry::new();
1934
1935        db1.get(None, &key, &mut retrieved1).unwrap();
1936        db2.get(None, &key, &mut retrieved2).unwrap();
1937
1938        assert_eq!(retrieved1.get_data().unwrap(), b"value1");
1939        assert_eq!(retrieved2.get_data().unwrap(), b"value2");
1940    }
1941
1942    #[test]
1943    fn test_empty_keys_and_values() {
1944        let (_temp_dir, _env, db) = temp_env_and_db();
1945
1946        let empty_key = DatabaseEntry::from_bytes(b"");
1947        let empty_value = DatabaseEntry::from_bytes(b"");
1948
1949        let result = db.put(None, &empty_key, &empty_value).unwrap();
1950        assert_eq!(result, OperationStatus::Success);
1951
1952        let mut retrieved = DatabaseEntry::new();
1953        let result = db.get(None, &empty_key, &mut retrieved).unwrap();
1954        assert_eq!(result, OperationStatus::Success);
1955        assert_eq!(retrieved.get_data().unwrap(), b"");
1956    }
1957
1958    #[test]
1959    fn test_large_keys_and_values() {
1960        let (_temp_dir, _env, db) = temp_env_and_db();
1961
1962        let large_key = DatabaseEntry::from_bytes(&vec![b'k'; 1000]);
1963        let large_value = DatabaseEntry::from_bytes(&vec![b'v'; 10000]);
1964
1965        db.put(None, &large_key, &large_value).unwrap();
1966
1967        let mut retrieved = DatabaseEntry::new();
1968        db.get(None, &large_key, &mut retrieved).unwrap();
1969        assert_eq!(retrieved.get_data().unwrap().len(), 10000);
1970        assert!(retrieved.get_data().unwrap().iter().all(|&b| b == b'v'));
1971    }
1972
1973    #[test]
1974    fn test_binary_keys_and_values() {
1975        let (_temp_dir, _env, db) = temp_env_and_db();
1976
1977        let binary_key = DatabaseEntry::from_bytes(&[0u8, 1, 2, 255, 254, 253]);
1978        let binary_value = DatabaseEntry::from_bytes(&[255u8, 0, 128, 64, 32]);
1979
1980        db.put(None, &binary_key, &binary_value).unwrap();
1981
1982        let mut retrieved = DatabaseEntry::new();
1983        db.get(None, &binary_key, &mut retrieved).unwrap();
1984        assert_eq!(retrieved.get_data().unwrap(), &[255u8, 0, 128, 64, 32]);
1985    }
1986
1987    #[test]
1988    fn test_scan_all_kv_empty() {
1989        let (_temp_dir, _env, db) = temp_env_and_db();
1990        let kv = db.scan_all_kv().unwrap();
1991        assert!(kv.is_empty());
1992    }
1993
1994    #[test]
1995    fn test_scan_all_kv_returns_records() {
1996        let (_temp_dir, _env, db) = temp_env_and_db();
1997        db.put(
1998            None,
1999            &DatabaseEntry::from_vec(vec![1]),
2000            &DatabaseEntry::from_vec(vec![10]),
2001        )
2002        .unwrap();
2003        db.put(
2004            None,
2005            &DatabaseEntry::from_vec(vec![2]),
2006            &DatabaseEntry::from_vec(vec![20]),
2007        )
2008        .unwrap();
2009        let kv = db.scan_all_kv().unwrap();
2010        assert_eq!(kv.len(), 2);
2011    }
2012
2013    #[test]
2014    fn test_scan_all_kv_then_delete() {
2015        let (_temp_dir, _env, db) = temp_env_and_db();
2016        db.put(
2017            None,
2018            &DatabaseEntry::from_vec(vec![1]),
2019            &DatabaseEntry::from_vec(vec![10]),
2020        )
2021        .unwrap();
2022        db.put(
2023            None,
2024            &DatabaseEntry::from_vec(vec![2]),
2025            &DatabaseEntry::from_vec(vec![20]),
2026        )
2027        .unwrap();
2028
2029        let kv = db.scan_all_kv().unwrap();
2030        assert_eq!(kv.len(), 2);
2031
2032        for (k, _v) in &kv {
2033            let status =
2034                db.delete(None, &DatabaseEntry::from_vec(k.clone())).unwrap();
2035            assert_eq!(
2036                status,
2037                OperationStatus::Success,
2038                "delete failed for key {:?}",
2039                k
2040            );
2041        }
2042
2043        let count = db.count().unwrap();
2044        assert_eq!(count, 0, "expected 0 records after deletes, got {}", count);
2045    }
2046
2047    #[test]
2048    fn test_scan_all_kv_then_delete_u64_be_keys() {
2049        // Simulate the exact pattern used in EntityStore::evolve: big-endian u64 keys.
2050        let (_temp_dir, _env, db) = temp_env_and_db();
2051        for id in [1u64, 2u64] {
2052            let key_bytes = id.to_be_bytes().to_vec();
2053            let val_bytes = format!("user{}", id).into_bytes();
2054            db.put(
2055                None,
2056                &DatabaseEntry::from_vec(key_bytes),
2057                &DatabaseEntry::from_vec(val_bytes),
2058            )
2059            .unwrap();
2060        }
2061        assert_eq!(db.count().unwrap(), 2);
2062
2063        let records = db.scan_all_kv().unwrap();
2064        assert_eq!(records.len(), 2);
2065
2066        for (k, _v) in records {
2067            let status =
2068                db.delete(None, &DatabaseEntry::from_vec(k.clone())).unwrap();
2069            assert_eq!(
2070                status,
2071                OperationStatus::Success,
2072                "delete failed for u64 key {:?}",
2073                k
2074            );
2075        }
2076        assert_eq!(db.count().unwrap(), 0);
2077    }
2078
2079    // ========================================================================
2080    // Additional branch-coverage tests
2081    // ========================================================================
2082
2083    /// get() with a None-data DatabaseEntry returns NotFound.
2084    #[test]
2085    fn test_get_with_none_key_data_returns_not_found() {
2086        let (_temp_dir, _env, db) = temp_env_and_db();
2087        let key_none = DatabaseEntry::new(); // no data set
2088        let mut data = DatabaseEntry::new();
2089
2090        let result = db.get(None, &key_none, &mut data).unwrap();
2091        assert_eq!(result, OperationStatus::NotFound);
2092    }
2093
2094    /// delete() with a None-data DatabaseEntry returns NotFound.
2095    #[test]
2096    fn test_delete_with_none_key_data_returns_not_found() {
2097        let (_temp_dir, _env, db) = temp_env_and_db();
2098        let key_none = DatabaseEntry::new();
2099
2100        let result = db.delete(None, &key_none).unwrap();
2101        assert_eq!(result, OperationStatus::NotFound);
2102    }
2103
2104    /// open_cursor() with a CursorConfig that has read_uncommitted=true makes
2105    /// the cursor read-only.
2106    #[test]
2107    fn test_open_cursor_read_uncommitted_config_makes_read_only() {
2108        use crate::cursor_config::CursorConfig;
2109        let (_temp_dir, _env, db) = temp_env_and_db();
2110
2111        let config = CursorConfig::new().with_read_uncommitted(true);
2112        let cursor = db.open_cursor(None, Some(&config)).unwrap();
2113        assert!(cursor.is_read_only());
2114    }
2115
2116    /// open_cursor() with no config and a non-read-only database produces a
2117    /// writable cursor.
2118    #[test]
2119    fn test_open_cursor_no_config_writable_db_is_writable() {
2120        let (_temp_dir, _env, db) = temp_env_and_db();
2121        let cursor = db.open_cursor(None, None).unwrap();
2122        assert!(!cursor.is_read_only());
2123    }
2124
2125    /// scan_all_kv() on a closed database returns an error.
2126    #[test]
2127    fn test_scan_all_kv_on_closed_database_fails() {
2128        let (_temp_dir, _env, db) = temp_env_and_db();
2129        db.close().unwrap();
2130        let result = db.scan_all_kv();
2131        assert!(result.is_err());
2132    }
2133
2134    /// put_no_overwrite() on a read-only database returns an error.
2135    #[test]
2136    fn test_put_no_overwrite_on_read_only_database_fails() {
2137        let temp_dir = TempDir::new().unwrap();
2138        let env_config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
2139            .with_allow_create(true);
2140        let env = Environment::open(env_config).unwrap();
2141
2142        let db_config =
2143            DatabaseConfig::new().with_allow_create(true).with_read_only(true);
2144        let db = env.open_database(None, "ro_db", &db_config).unwrap();
2145
2146        let key = DatabaseEntry::from_bytes(b"k");
2147        let val = DatabaseEntry::from_bytes(b"v");
2148        let result = db.put_no_overwrite(None, &key, &val);
2149        assert!(result.is_err());
2150    }
2151
2152    // =====================================================================
2153    // cursor-failure map_err coverage: use the test hook in noxu-dbi to
2154    // force cursor operations to return Err, exercising the map_err closures
2155    // in Database::get / put / put_no_overwrite / delete / count / scan_all_kv.
2156    // =====================================================================
2157
2158    /// Covers the map_err closure on `cursor.search(...)` inside `get()`.
2159    #[test]
2160    fn test_get_search_map_err_via_hook() {
2161        let (_tmp, _env, db) = temp_env_and_db();
2162        noxu_dbi::set_cursor_fail_after(1); // fail on the 1st check_state (search)
2163        let key = DatabaseEntry::from_bytes(b"any");
2164        let mut data = DatabaseEntry::new();
2165        let result = db.get(None, &key, &mut data);
2166        noxu_dbi::clear_cursor_fail_flag();
2167        assert!(result.is_err());
2168    }
2169
2170    /// Covers the map_err closure on `cursor.get_current()` inside `get()`.
2171    #[test]
2172    fn test_get_get_current_map_err_via_hook() {
2173        let (_tmp, _env, db) = temp_env_and_db();
2174        // Insert a key so search can succeed.
2175        db.put(
2176            None,
2177            &DatabaseEntry::from_bytes(b"k"),
2178            &DatabaseEntry::from_bytes(b"v"),
2179        )
2180        .unwrap();
2181        // fail on the 2nd check (check_initialized inside get_current).
2182        noxu_dbi::set_cursor_fail_after(2);
2183        let key = DatabaseEntry::from_bytes(b"k");
2184        let mut data = DatabaseEntry::new();
2185        let result = db.get(None, &key, &mut data);
2186        noxu_dbi::clear_cursor_fail_flag();
2187        assert!(result.is_err());
2188    }
2189
2190    /// Covers the map_err closure on `cursor.put(...)` inside `put()`.
2191    #[test]
2192    fn test_put_map_err_via_hook() {
2193        let (_tmp, _env, db) = temp_env_and_db();
2194        noxu_dbi::set_cursor_fail_after(1);
2195        let key = DatabaseEntry::from_bytes(b"k");
2196        let val = DatabaseEntry::from_bytes(b"v");
2197        let result = db.put(None, &key, &val);
2198        noxu_dbi::clear_cursor_fail_flag();
2199        assert!(result.is_err());
2200    }
2201
2202    /// Covers the map_err closure on `cursor.put(...)` inside `put_no_overwrite()`.
2203    #[test]
2204    fn test_put_no_overwrite_map_err_via_hook() {
2205        let (_tmp, _env, db) = temp_env_and_db();
2206        noxu_dbi::set_cursor_fail_after(1);
2207        let key = DatabaseEntry::from_bytes(b"k");
2208        let val = DatabaseEntry::from_bytes(b"v");
2209        let result = db.put_no_overwrite(None, &key, &val);
2210        noxu_dbi::clear_cursor_fail_flag();
2211        assert!(result.is_err());
2212    }
2213
2214    /// Covers the map_err closure on `cursor.search(...)` inside `delete()`.
2215    #[test]
2216    fn test_delete_search_map_err_via_hook() {
2217        let (_tmp, _env, db) = temp_env_and_db();
2218        noxu_dbi::set_cursor_fail_after(1);
2219        let key = DatabaseEntry::from_bytes(b"k");
2220        let result = db.delete(None, &key);
2221        noxu_dbi::clear_cursor_fail_flag();
2222        assert!(result.is_err());
2223    }
2224
2225    /// Covers the map_err closure on `cursor.delete()` inside `delete()`.
2226    #[test]
2227    fn test_delete_delete_map_err_via_hook() {
2228        let (_tmp, _env, db) = temp_env_and_db();
2229        db.put(
2230            None,
2231            &DatabaseEntry::from_bytes(b"k"),
2232            &DatabaseEntry::from_bytes(b"v"),
2233        )
2234        .unwrap();
2235        // fail on the 2nd check_state (the delete() call, after search succeeds).
2236        noxu_dbi::set_cursor_fail_after(2);
2237        let key = DatabaseEntry::from_bytes(b"k");
2238        let result = db.delete(None, &key);
2239        noxu_dbi::clear_cursor_fail_flag();
2240        assert!(result.is_err());
2241    }
2242
2243    /// count() uses the O(1) AtomicU64 counter; cursor-fail hooks do not affect it.
2244    /// Verify the counter is correct across insert/update/delete.
2245    #[test]
2246    fn test_count_atomic_counter_insert_update_delete() {
2247        let (_tmp, _env, db) = temp_env_and_db();
2248
2249        // Empty database starts at 0.
2250        assert_eq!(db.count().unwrap(), 0);
2251
2252        // Insert three distinct keys.
2253        db.put(
2254            None,
2255            &DatabaseEntry::from_bytes(b"a"),
2256            &DatabaseEntry::from_bytes(b"1"),
2257        )
2258        .unwrap();
2259        db.put(
2260            None,
2261            &DatabaseEntry::from_bytes(b"b"),
2262            &DatabaseEntry::from_bytes(b"2"),
2263        )
2264        .unwrap();
2265        db.put(
2266            None,
2267            &DatabaseEntry::from_bytes(b"c"),
2268            &DatabaseEntry::from_bytes(b"3"),
2269        )
2270        .unwrap();
2271        assert_eq!(db.count().unwrap(), 3);
2272
2273        // Overwrite an existing key — count must NOT change.
2274        db.put(
2275            None,
2276            &DatabaseEntry::from_bytes(b"a"),
2277            &DatabaseEntry::from_bytes(b"updated"),
2278        )
2279        .unwrap();
2280        assert_eq!(db.count().unwrap(), 3);
2281
2282        // Delete one key — count decrements.
2283        db.delete(None, &DatabaseEntry::from_bytes(b"b")).unwrap();
2284        assert_eq!(db.count().unwrap(), 2);
2285    }
2286
2287    /// count() is O(1): verify it still works even when the cursor fail-hook
2288    /// is active (the hook only affects cursor operations, not the atomic read).
2289    #[test]
2290    fn test_count_unaffected_by_cursor_fail_hook() {
2291        let (_tmp, _env, db) = temp_env_and_db();
2292        db.put(
2293            None,
2294            &DatabaseEntry::from_bytes(b"k"),
2295            &DatabaseEntry::from_bytes(b"v"),
2296        )
2297        .unwrap();
2298        noxu_dbi::set_cursor_fail_after(1);
2299        // count() must succeed (no cursor used).
2300        let result = db.count();
2301        noxu_dbi::clear_cursor_fail_flag();
2302        assert!(result.is_ok());
2303        assert_eq!(result.unwrap(), 1);
2304    }
2305
2306    /// Covers the map_err closure on `cursor.get_first()` inside `scan_all_kv()`.
2307    #[test]
2308    fn test_scan_all_kv_get_first_map_err_via_hook() {
2309        let (_tmp, _env, db) = temp_env_and_db();
2310        noxu_dbi::set_cursor_fail_after(1);
2311        let result = db.scan_all_kv();
2312        noxu_dbi::clear_cursor_fail_flag();
2313        assert!(result.is_err());
2314    }
2315
2316    /// Covers the map_err closure on `cursor.get_current()` inside `scan_all_kv()`.
2317    #[test]
2318    fn test_scan_all_kv_get_current_map_err_via_hook() {
2319        let (_tmp, _env, db) = temp_env_and_db();
2320        db.put(
2321            None,
2322            &DatabaseEntry::from_bytes(b"k"),
2323            &DatabaseEntry::from_bytes(b"v"),
2324        )
2325        .unwrap();
2326        // fail on the 2nd check (check_initialized inside get_current, after get_first succeeds).
2327        noxu_dbi::set_cursor_fail_after(2);
2328        let result = db.scan_all_kv();
2329        noxu_dbi::clear_cursor_fail_flag();
2330        assert!(result.is_err());
2331    }
2332
2333    /// Covers the map_err closure on `cursor.retrieve_next(...)` inside `scan_all_kv()`.
2334    #[test]
2335    fn test_scan_all_kv_retrieve_next_map_err_via_hook() {
2336        let (_tmp, _env, db) = temp_env_and_db();
2337        db.put(
2338            None,
2339            &DatabaseEntry::from_bytes(b"k"),
2340            &DatabaseEntry::from_bytes(b"v"),
2341        )
2342        .unwrap();
2343        // fail on the 3rd check (retrieve_next, after get_first and get_current succeed).
2344        noxu_dbi::set_cursor_fail_after(3);
2345        let result = db.scan_all_kv();
2346        noxu_dbi::clear_cursor_fail_flag();
2347        assert!(result.is_err());
2348    }
2349
2350    #[test]
2351    fn test_sync_on_open_database_succeeds() {
2352        let (_tmp, _env, db) = temp_env_and_db();
2353        db.put(
2354            None,
2355            &DatabaseEntry::from_bytes(b"key"),
2356            &DatabaseEntry::from_bytes(b"val"),
2357        )
2358        .unwrap();
2359        assert!(db.sync().is_ok());
2360    }
2361
2362    #[test]
2363    fn test_sync_on_closed_database_fails() {
2364        let (_tmp, _env, db) = temp_env_and_db();
2365        db.close().unwrap();
2366        assert!(db.sync().is_err());
2367    }
2368
2369    // ── verify ─────────────────────────────────────────────────────────────
2370
2371    #[test]
2372    fn test_verify_empty_database_passes() {
2373        use noxu_engine::VerifyConfig;
2374        let (_tmp, _env, db) = temp_env_and_db();
2375        let config = VerifyConfig::default();
2376        let result = db.verify(&config).unwrap();
2377        assert!(result.passed, "empty db should pass: {:?}", result.errors);
2378    }
2379
2380    #[test]
2381    fn test_verify_populated_database_passes() {
2382        use noxu_engine::VerifyConfig;
2383        let (_tmp, _env, db) = temp_env_and_db();
2384        for i in 0u32..20 {
2385            let k = DatabaseEntry::from_bytes(&i.to_be_bytes());
2386            let v = DatabaseEntry::from_bytes(&(i * 2).to_be_bytes());
2387            db.put(None, &k, &v).unwrap();
2388        }
2389        let config = VerifyConfig::default();
2390        let result = db.verify(&config).unwrap();
2391        assert!(result.passed, "populated db should pass: {:?}", result.errors);
2392        assert!(result.records_verified > 0);
2393    }
2394
2395    #[test]
2396    fn test_verify_closed_database_fails() {
2397        use noxu_engine::VerifyConfig;
2398        let (_tmp, _env, db) = temp_env_and_db();
2399        db.close().unwrap();
2400        let config = VerifyConfig::default();
2401        assert!(db.verify(&config).is_err());
2402    }
2403
2404    // ── get_with_options / put_with_options ────────────────────────────────
2405
2406    #[test]
2407    fn test_get_with_options_default_reads_written_record() {
2408        use crate::read_options::ReadOptions;
2409        let (_tmp, _env, db) = temp_env_and_db();
2410        let key = DatabaseEntry::from_bytes(b"ropt_key");
2411        let val = DatabaseEntry::from_bytes(b"ropt_val");
2412        db.put(None, &key, &val).unwrap();
2413
2414        let opts = ReadOptions::new();
2415        let mut out = DatabaseEntry::new();
2416        let status = db.get_with_options(None, &key, &mut out, &opts).unwrap();
2417        assert_eq!(status, OperationStatus::Success);
2418        assert_eq!(out.get_data().unwrap(), b"ropt_val");
2419    }
2420
2421    #[test]
2422    fn test_get_with_options_read_uncommitted_sees_written_record() {
2423        use crate::read_options::ReadOptions;
2424        let (_tmp, _env, db) = temp_env_and_db();
2425        let key = DatabaseEntry::from_bytes(b"ru_key");
2426        let val = DatabaseEntry::from_bytes(b"ru_val");
2427        db.put(None, &key, &val).unwrap();
2428
2429        let opts = ReadOptions::read_uncommitted();
2430        let mut out = DatabaseEntry::new();
2431        let status = db.get_with_options(None, &key, &mut out, &opts).unwrap();
2432        assert_eq!(status, OperationStatus::Success);
2433        assert_eq!(out.get_data().unwrap(), b"ru_val");
2434    }
2435
2436    #[test]
2437    fn test_get_with_options_not_found() {
2438        use crate::read_options::ReadOptions;
2439        let (_tmp, _env, db) = temp_env_and_db();
2440        let key = DatabaseEntry::from_bytes(b"missing");
2441        let opts = ReadOptions::new();
2442        let mut out = DatabaseEntry::new();
2443        let status = db.get_with_options(None, &key, &mut out, &opts).unwrap();
2444        assert_eq!(status, OperationStatus::NotFound);
2445    }
2446
2447    #[test]
2448    fn test_put_with_options_no_ttl_behaves_like_put() {
2449        use crate::write_options::WriteOptions;
2450        let (_tmp, _env, db) = temp_env_and_db();
2451        let key = DatabaseEntry::from_bytes(b"wopt_key");
2452        let val = DatabaseEntry::from_bytes(b"wopt_val");
2453        let opts = WriteOptions::new();
2454        let status = db.put_with_options(None, &key, &val, &opts).unwrap();
2455        assert_eq!(status, OperationStatus::Success);
2456
2457        let mut out = DatabaseEntry::new();
2458        db.get(None, &key, &mut out).unwrap();
2459        assert_eq!(out.get_data().unwrap(), b"wopt_val");
2460    }
2461
2462    #[test]
2463    fn test_put_with_options_with_ttl_stores_record() {
2464        use crate::write_options::WriteOptions;
2465        let (_tmp, _env, db) = temp_env_and_db();
2466        let key = DatabaseEntry::from_bytes(b"ttl_key");
2467        let val = DatabaseEntry::from_bytes(b"ttl_val");
2468        // TTL of 1 hour — the record is not yet expired so it should be readable
2469        let opts = WriteOptions::with_expiration(1);
2470        let status = db.put_with_options(None, &key, &val, &opts).unwrap();
2471        assert_eq!(status, OperationStatus::Success);
2472
2473        let mut out = DatabaseEntry::new();
2474        let read_status = db.get(None, &key, &mut out).unwrap();
2475        assert_eq!(read_status, OperationStatus::Success);
2476        assert_eq!(out.get_data().unwrap(), b"ttl_val");
2477    }
2478
2479    #[test]
2480    fn test_put_with_options_closed_db_fails() {
2481        use crate::write_options::WriteOptions;
2482        let (_tmp, _env, db) = temp_env_and_db();
2483        db.close().unwrap();
2484        let key = DatabaseEntry::from_bytes(b"k");
2485        let val = DatabaseEntry::from_bytes(b"v");
2486        let opts = WriteOptions::new();
2487        assert!(db.put_with_options(None, &key, &val, &opts).is_err());
2488    }
2489
2490    // ========================================================================
2491    // Audit database F11 — Wave 2C-4: reject None-data keys on writes.
2492    // ========================================================================
2493
2494    /// `put` with a `DatabaseEntry::new()` (no data set) returns
2495    /// `IllegalArgument` instead of silently writing under an empty key.
2496    #[test]
2497    fn test_put_with_none_key_returns_illegal_argument() {
2498        let (_tmp, _env, db) = temp_env_and_db();
2499        let none_key = DatabaseEntry::new();
2500        let val = DatabaseEntry::from_bytes(b"v");
2501        let result = db.put(None, &none_key, &val);
2502        assert!(matches!(result, Err(NoxuError::IllegalArgument(_))));
2503    }
2504
2505    /// `put_no_overwrite` likewise rejects `None`-data keys.
2506    #[test]
2507    fn test_put_no_overwrite_with_none_key_returns_illegal_argument() {
2508        let (_tmp, _env, db) = temp_env_and_db();
2509        let none_key = DatabaseEntry::new();
2510        let val = DatabaseEntry::from_bytes(b"v");
2511        let result = db.put_no_overwrite(None, &none_key, &val);
2512        assert!(matches!(result, Err(NoxuError::IllegalArgument(_))));
2513    }
2514
2515    /// `put_with_options` likewise rejects `None`-data keys.
2516    #[test]
2517    fn test_put_with_options_with_none_key_returns_illegal_argument() {
2518        use crate::write_options::WriteOptions;
2519        let (_tmp, _env, db) = temp_env_and_db();
2520        let none_key = DatabaseEntry::new();
2521        let val = DatabaseEntry::from_bytes(b"v");
2522        let opts = WriteOptions::new();
2523        let result = db.put_with_options(None, &none_key, &val, &opts);
2524        assert!(matches!(result, Err(NoxuError::IllegalArgument(_))));
2525    }
2526
2527    /// Explicit `Some(&[])` empty key is still accepted on writes.
2528    #[test]
2529    fn test_put_with_explicit_empty_key_accepted() {
2530        let (_tmp, _env, db) = temp_env_and_db();
2531        let empty_key = DatabaseEntry::from_bytes(b"");
2532        let val = DatabaseEntry::from_bytes(b"v");
2533        let status = db.put(None, &empty_key, &val).unwrap();
2534        assert_eq!(status, OperationStatus::Success);
2535    }
2536
2537    // ── X-13: env-invalidity checks propagate through check_open ──────────────
2538
2539    /// X-13: after the `io_invalid` flag is set, `db.get` must return
2540    /// `EnvironmentFailure` rather than silently reading stale BIN data.
2541    #[test]
2542    fn test_x13_io_invalid_blocks_db_get() {
2543        use std::sync::atomic::Ordering;
2544        let (_tmp, env, db) = temp_env_and_db();
2545
2546        // Write a record so there is something to read.
2547        let key = DatabaseEntry::from_bytes(b"k");
2548        let val = DatabaseEntry::from_bytes(b"v");
2549        db.put(None, &key, &val).unwrap();
2550
2551        // Flip io_invalid via the cached LogManager.
2552        let lm = db.log_manager.as_ref().expect("WAL env must have LogManager");
2553        lm.io_invalid.store(true, Ordering::Release);
2554
2555        // db.get must now fail.
2556        let mut out = DatabaseEntry::new();
2557        let result = db.get(None, &key, &mut out);
2558        assert!(
2559            matches!(result, Err(NoxuError::EnvironmentFailure { .. })),
2560            "expected EnvironmentFailure, got {result:?}"
2561        );
2562
2563        // db.put must also fail.
2564        let result2 = db.put(None, &key, &val);
2565        assert!(
2566            matches!(result2, Err(NoxuError::EnvironmentFailure { .. })),
2567            "expected EnvironmentFailure on put, got {result2:?}"
2568        );
2569
2570        // Restore flag so env closes cleanly.
2571        lm.io_invalid.store(false, Ordering::Release);
2572        drop(env);
2573    }
2574
2575    /// X-13: after `EnvironmentImpl::invalidate()`, cursor `get_first`
2576    /// must return `EnvironmentFailure`.
2577    #[test]
2578    fn test_x13_env_invalid_blocks_cursor_get() {
2579        use std::sync::atomic::Ordering;
2580        let (_tmp, env, db) = temp_env_and_db();
2581
2582        // Insert a record.
2583        let key = DatabaseEntry::from_bytes(b"ck");
2584        let val = DatabaseEntry::from_bytes(b"cv");
2585        db.put(None, &key, &val).unwrap();
2586
2587        // Open a cursor BEFORE invalidating.
2588        let mut cursor = db.open_cursor(None, None).unwrap();
2589
2590        // Now directly flip the env_invalid flag.
2591        db.env_invalid.store(true, Ordering::Release);
2592
2593        // The cursor's check_state should detect the flag.
2594        let mut key = DatabaseEntry::new();
2595        let mut out = DatabaseEntry::new();
2596        let result =
2597            cursor.get(&mut key, &mut out, crate::get::Get::First, None);
2598        assert!(
2599            matches!(result, Err(NoxuError::EnvironmentFailure { .. })),
2600            "expected EnvironmentFailure from cursor, got {result:?}"
2601        );
2602
2603        // Restore so env drops cleanly.
2604        db.env_invalid.store(false, Ordering::Release);
2605        drop(env);
2606    }
2607}