noxu_db/database.rs
1//! Database handle.
2//!
3
4use crate::cursor::Cursor;
5use crate::cursor_config::CursorConfig;
6use crate::database_config::DatabaseConfig;
7use crate::database_entry::DatabaseEntry;
8use crate::database_stats::{BtreeStats, DatabaseStats};
9use crate::error::{NoxuError, Result};
10use crate::join_config::JoinConfig;
11use crate::join_cursor::JoinCursor;
12use crate::lock_mode::LockMode;
13use crate::operation_status::OperationStatus;
14use crate::preload::{PreloadConfig, PreloadStats};
15use crate::read_options::ReadOptions;
16use crate::secondary_cursor::SecondaryCursor;
17use crate::sequence::Sequence;
18use crate::sequence_config::SequenceConfig;
19use crate::stats_config::StatsConfig;
20use crate::transaction::Transaction;
21use crate::write_options::WriteOptions;
22use bytes::Bytes;
23use noxu_dbi::{
24 CursorImpl, DatabaseImpl, EnvironmentImpl, GetMode, PutMode, SearchMode,
25 ThroughputStats,
26};
27use noxu_log::LogManager;
28use noxu_sync::{Mutex, RwLock};
29use noxu_txn::{Durability, LockManager, TxnManager, UndoRecord};
30use noxu_util::lsn::Lsn;
31use std::sync::Arc;
32use std::sync::atomic::{AtomicBool, Ordering};
33
34/// A database handle.
35///
36///
37///
38/// Database handles provide methods for inserting, retrieving, and
39/// deleting records. A database belongs to a single environment.
40///
41/// # Example
42/// ```ignore
43/// use noxu_db::{Environment, EnvironmentConfig, DatabaseConfig, DatabaseEntry};
44/// use std::path::PathBuf;
45///
46/// let env_config = EnvironmentConfig::new(PathBuf::from("/tmp/mydb"))
47/// .allow_create(true);
48/// let env = Environment::open(env_config).unwrap();
49///
50/// let db_config = DatabaseConfig::new().allow_create(true);
51/// let db = env.open_database(None, "mydb", &db_config).unwrap();
52///
53/// let key = DatabaseEntry::from_bytes(b"key1");
54/// let value = DatabaseEntry::from_bytes(b"value1");
55/// db.put( &key, &value).unwrap();
56///
57/// db.close().unwrap();
58/// env.close().unwrap();
59/// ```
60pub struct Database {
61 /// Name of this database
62 name: String,
63 /// Database ID
64 id: u64,
65 /// Configuration
66 config: DatabaseConfig,
67 /// The underlying DatabaseImpl (shared with the EnvironmentImpl).
68 pub(crate) db_impl: Arc<RwLock<DatabaseImpl>>,
69 /// Back-reference to the owning EnvironmentImpl (for close/cleanup).
70 env_impl: Arc<Mutex<EnvironmentImpl>>,
71 /// Shared open flag — same `Arc<AtomicBool>` as the environment's
72 /// `DatabaseHandle.open`, so that `Database::close()` automatically
73 /// marks the environment-side handle as closed too.
74 open: Arc<AtomicBool>,
75 /// Throughput counters for this database's operations.
76 ///
77 /// Cloned from `DatabaseImpl.throughput` at open time so that
78 /// `get()`, `put()`, `delete()` can increment stats without
79 /// locking `db_impl`.
80 throughput: Arc<ThroughputStats>,
81 /// Cached lock manager — acquired once at open, never changes.
82 /// Eliminates per-operation `env_impl.lock()` on the hot read/write path.
83 lock_manager: Arc<LockManager>,
84 /// Cached log manager — acquired once at open, None for no-WAL envs.
85 /// Eliminates per-operation `env_impl.lock()` on the hot read/write path.
86 log_manager: Option<Arc<LogManager>>,
87 /// Cached disk-limit tracker (JE: the env's `getDiskLimitViolation()`).
88 /// `Some` only for user databases; internal databases are exempt from the
89 /// limit so the cleaner/checkpointer can still write to free space.
90 /// Wired into each user cursor so the write path can refuse writes while a
91 /// disk limit is violated without locking `env_impl`.
92 disk_limit: Option<Arc<noxu_dbi::disk_limit::DiskLimitTracker>>,
93 /// Cached environment-invalidity flag (X-13).
94 ///
95 /// Cloned from `EnvironmentImpl::is_invalid_flag()` at `Database::new()`
96 /// time so `check_open()` can detect a failed environment without
97 /// acquiring `env_impl.lock()` on every read/write operation.
98 env_invalid: Arc<std::sync::atomic::AtomicBool>,
99 /// Cached cleaner throttle — acquired once at open, None when no cleaner.
100 /// Used by put() for write-path backpressure without locking env_impl.
101 cleaner_throttle: Option<Arc<noxu_cleaner::CleanerThrottle>>,
102 /// Cached cleaner file protector — acquired once at open, None when no
103 /// cleaner. Passed to a `DiskOrderedCursor` producer so it can protect
104 /// the files it scans from cleaner deletion mid-scan (CLN-7).
105 file_protector: Option<Arc<noxu_cleaner::FileProtector>>,
106 /// Cached transaction manager — acquired once at open.
107 ///
108 /// Used by [`Self::with_auto_txn`] to allocate a synthetic auto-commit
109 /// `Txn` per `txn = None` write so the lock manager sees a typed
110 /// locker id from the explicit-txn id space (`"auto-txn:<id>"` in
111 /// deadlock messages) and the auto-commit op gets full abort-undo
112 /// semantics on any error path. Closes the F12 residuals.
113 txn_manager: Arc<TxnManager>,
114 /// EV-15: cached evictor — acquired once at open, drives per-write
115 /// synchronous critical eviction (write back-pressure) without locking
116 /// env_impl. Mirrors JE `EnvironmentImpl.criticalEviction` being called
117 /// before every cursor operation.
118 evictor: Arc<noxu_dbi::Evictor>,
119 /// If true, auto-commit writes skip the log flush entirely (: TXN_NO_SYNC).
120 no_sync: bool,
121 /// If true, auto-commit writes flush to OS but skip fdatasync (: TXN_WRITE_NO_SYNC).
122 write_no_sync: bool,
123 /// Registered secondary indexes that automatically maintain themselves
124 /// when this primary is written. v1.6 (Decision 1B / audit C3 — the
125 /// associate()-style hook): every [`SecondaryDatabase`] opened against
126 /// this primary downgrades its `Arc<SecondaryHookState>` to a
127 /// `Weak<dyn SecondaryHook>` and pushes it here. `Database::put` and
128 /// `Database::delete` walk the list under the same caller-supplied
129 /// txn so primary writes and secondary index updates commit / abort
130 /// atomically.
131 ///
132 /// Stored behind an `Arc<RwLock<…>>` (rather than directly on the
133 /// `Database` body) so registrations performed through one of the
134 /// `Arc<Mutex<Database>>` clones the user typically holds become
135 /// visible to every other clone of the same primary.
136 pub(crate) secondaries: Arc<
137 RwLock<
138 Vec<
139 std::sync::Weak<
140 dyn crate::secondary_database::SecondaryHook + Send + Sync,
141 >,
142 >,
143 >,
144 >,
145 /// Foreign-key referrer registry: every child secondary whose
146 /// `foreign_key_database` points at *this* primary downgrades its
147 /// hook to a `Weak<dyn FkReferrer>` and pushes it here. When this
148 /// primary is deleted, every entry is consulted to apply
149 /// `ForeignKeyDeleteAction::Abort` (v1.6 step 8) /
150 /// `Cascade` (step 9) / `Nullify` (step 10).
151 pub(crate) fk_referrers: Arc<
152 RwLock<
153 Vec<
154 std::sync::Weak<
155 dyn crate::secondary_database::FkReferrer + Send + Sync,
156 >,
157 >,
158 >,
159 >,
160}
161
/// Lifecycle state of a database handle.
///
/// The type is a plain, field-less enum, so it is cheap to copy and compare.
/// It also derives `Hash`, which lets a state be used as a key in hashed
/// collections (for example when bucketing handles by state).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DbState {
    /// The database is open and operational.
    Open,
    /// The database has been closed.
    Closed,
    /// The database is in an invalid state.
    Invalid,
}
174
175impl Database {
176 /// Creates a CursorImpl, wired to the WAL and lock manager when the
177 /// environment has them.
178 ///
179 /// Uses cached `lock_manager` / `log_manager` to avoid acquiring
180 /// `env_impl.lock()` on every operation.
181 fn make_cursor(&self) -> CursorImpl {
182 self.make_cursor_with_locker(0)
183 }
184
185 /// Creates a CursorImpl with an explicit `locker_id`.
186 ///
187 /// Auto-commit cursors use `0`; transactional cursors must use the
188 /// owning `Transaction::id` so that the LN log entries written by
189 /// `cursor.put` / `cursor.delete` carry the txn id and recovery's
190 /// commit/abort tracking can correctly skip aborted txns. Without
191 /// this, every LN entry was written with `txn_id = None` (the
192 /// auto-commit form) and recovery treated aborted-txn writes as
193 /// committed once `env_impl.close()` started running on `env.close()`
194 /// after a successful commit/abort (F1).
195 fn make_cursor_with_locker(&self, locker_id: i64) -> CursorImpl {
196 match &self.log_manager {
197 Some(lm) => {
198 let mut c = CursorImpl::with_log_manager(
199 Arc::clone(&self.db_impl),
200 locker_id,
201 Arc::clone(lm),
202 )
203 .with_env_invalid(Arc::clone(&self.env_invalid))
204 .with_lock_manager(Arc::clone(&self.lock_manager))
205 .with_txn_manager(Arc::clone(&self.txn_manager));
206 // Gate user writes on the disk limit (None for internal DBs).
207 if let Some(dl) = &self.disk_limit {
208 c = c.with_disk_limit(Arc::clone(dl));
209 }
210 c
211 }
212 None => CursorImpl::new(Arc::clone(&self.db_impl), locker_id)
213 .with_env_invalid(Arc::clone(&self.env_invalid))
214 .with_lock_manager(Arc::clone(&self.lock_manager)),
215 }
216 }
217
218 /// Creates a CursorImpl without a lock manager (dirty-read / read-uncommitted).
219 ///
220 /// Used by `get_with_options()` when `ReadOptions.lock_mode == ReadUncommitted`.
221 /// Skips all lock acquisition so the cursor reads directly from the BIN
222 /// without blocking on write locks — mirrors 's read-uncommitted cursor.
223 fn make_cursor_no_lock(&self) -> CursorImpl {
224 match &self.log_manager {
225 Some(lm) => CursorImpl::with_log_manager(
226 Arc::clone(&self.db_impl),
227 0,
228 Arc::clone(lm),
229 )
230 .with_env_invalid(Arc::clone(&self.env_invalid)),
231 None => CursorImpl::new(Arc::clone(&self.db_impl), 0)
232 .with_env_invalid(Arc::clone(&self.env_invalid)),
233 }
234 }
235
236 /// Creates a CursorImpl wired to the given transaction for write-lock tracking.
237 ///
238 /// Behaves like `make_cursor()` but additionally calls `.with_txn()` so
239 /// that write operations acquire locks via the transaction's `Txn` and
240 /// record abort before-images in `WriteLockInfo`.
241 ///
242 /// In which passes the
243 /// transaction's `Locker` to the new `CursorImpl`.
244 fn make_cursor_for_txn(&self, txn: &Transaction) -> CursorImpl {
245 // Use the transaction id as the cursor's locker_id so that LN
246 // log entries written under this cursor carry the txn id
247 // (recovery's analysis pass uses LN.txn_id together with
248 // TxnCommit / TxnAbort records to decide whether to redo or
249 // undo the LN). Pre-fix this was hardcoded to 0, which made
250 // every txn-LN look like an auto-commit LN and caused
251 // recovery to redo aborted writes.
252 let cursor = self.make_cursor_with_locker(txn.id() as i64);
253 if let Some(inner) = txn.get_inner_txn() {
254 cursor.with_txn(inner)
255 } else {
256 cursor
257 }
258 }
259
260 /// Allocates a synthetic auto-commit `Txn` and runs `op` under it.
261 ///
262 /// `op` receives an auto-commit-wired [`CursorImpl`] (locker_id = 0
263 /// so the LN is logged with the auto-commit `InsertLN` / `DeleteLN`
264 /// form, txn_ref set so the lock manager sees the synthetic auto-txn
265 /// as the owner).
266 ///
267 /// On `Ok(value)`:
268 /// * Drops the cursor (closing it).
269 /// * Calls [`Txn::commit_with_durability`] on the synthetic auto-txn
270 /// with a [`Durability`] derived from the database's
271 /// `no_sync` / `write_no_sync` config; this releases all locks
272 /// and (for `CommitSync`) fsyncs up to the LN's LSN via
273 /// `LogManager::flush_sync_if_needed` for many-to-one fsync
274 /// coalescing under concurrent write load.
275 /// * Records the commit in the txn manager statistics and removes
276 /// the diagnostic locker label.
277 ///
278 /// On `Err(e)`:
279 /// * Drops the cursor.
280 /// * Calls [`Txn::abort_collect_undo`] to harvest before-image undo
281 /// records WITHOUT releasing the held write locks (so a reader
282 /// blocked on a write lock cannot observe the in-flight value
283 /// before we restore the before-image).
284 /// * Applies the undo records to the in-memory B-tree.
285 /// * Calls [`Txn::release_all_locks`] to drain the held locks.
286 /// * Records the abort in the txn manager statistics and removes
287 /// the diagnostic locker label.
288 /// * Returns `Err(e)`.
289 ///
290 /// Closes the first F12 residual: an auto-commit op now goes through
291 /// the same lock-tracking and abort-undo machinery as an explicit
292 /// transaction, so two concurrent inserts of the same brand-new key
293 /// serialise through the lock manager and a forced mid-write failure
294 /// rolls back the in-memory tree mutation.
295 fn with_auto_txn<F, T>(&self, op: F) -> Result<T>
296 where
297 F: FnOnce(&mut CursorImpl) -> Result<T>,
298 {
299 let auto_txn =
300 self.txn_manager.begin_auto_txn(self.log_manager.clone());
301 let synthetic_id = auto_txn.id_as_locker();
302 let auto_txn_arc = Arc::new(std::sync::Mutex::new(auto_txn));
303
304 let mut cursor = self.make_cursor();
305 cursor.attach_txn(Arc::clone(&auto_txn_arc));
306
307 let result = op(&mut cursor);
308 // Drop the cursor handle (un-pins BIN, drops Arc<Mutex<Txn>> ref).
309 drop(cursor);
310
311 // Reclaim sole ownership of the synthetic auto-txn so we can
312 // finalise it. All cursors and their Arcs were dropped above.
313 let mut auto_txn = match Arc::try_unwrap(auto_txn_arc) {
314 Ok(m) => m.into_inner().unwrap_or_else(|p| p.into_inner()),
315 Err(_arc) => {
316 // A cursor escaped the closure with a clone of the txn
317 // Arc. This is a caller-side bug — leak the auto-txn
318 // (its Drop calls `close()` which aborts) and surface a
319 // typed error. No undo applied because we cannot
320 // safely take the Txn out of the shared Arc.
321 return Err(NoxuError::OperationNotAllowed(
322 "with_auto_txn: synthetic auto-txn outlived cursor scope"
323 .to_string(),
324 ));
325 }
326 };
327 let txn_manager = Arc::clone(&self.txn_manager);
328
329 match result {
330 Ok(value) => {
331 let durability = if self.no_sync {
332 Durability::CommitNoSync
333 } else if self.write_no_sync {
334 Durability::CommitWriteNoSync
335 } else {
336 Durability::CommitSync
337 };
338 if let Err(e) = auto_txn.commit_with_durability(durability) {
339 // Commit failed (e.g. log fsync error). Undo the
340 // in-memory tree write so we are not left in an
341 // inconsistent state, then surface the error.
342 let undo_records =
343 auto_txn.abort_collect_undo().unwrap_or_default();
344 self.apply_auto_txn_undo(undo_records);
345 auto_txn.release_all_locks();
346 txn_manager.abort_txn(synthetic_id);
347 return Err(NoxuError::OperationNotAllowed(format!(
348 "auto-commit fsync failed: {e}"
349 )));
350 }
351 txn_manager.commit_txn(synthetic_id);
352 Ok(value)
353 }
354 Err(e) => {
355 // Phase 1: collect undo without releasing write locks.
356 let undo_records =
357 auto_txn.abort_collect_undo().unwrap_or_default();
358 // Phase 2: apply undo while write locks are still held
359 // so any concurrent reader blocked on a write lock
360 // cannot observe the in-flight value.
361 self.apply_auto_txn_undo(undo_records);
362 // Phase 3: drain locks.
363 auto_txn.release_all_locks();
364 txn_manager.abort_txn(synthetic_id);
365 Err(e)
366 }
367 }
368 }
369
370 /// Applies undo records collected from a synthetic auto-txn to the
371 /// in-memory B-tree of `self`. Mirrors the per-`undo_record` block
372 /// in `Transaction::abort()` but specialised for the
373 /// single-database `with_auto_txn` case so we do not need to thread
374 /// the env-impl in.
375 fn apply_auto_txn_undo(&self, mut undo_records: Vec<UndoRecord>) {
376 // Apply newest-LSN first so multi-step writes (delete + reinsert)
377 // unwind in reverse-operation order. See the matching sort in
378 // `Transaction::abort()`.
379 undo_records.sort_by_key(|r| std::cmp::Reverse(r.current_lsn));
380 let db_id_match = self.id;
381 let db_guard = self.db_impl.read();
382 let Some(tree) = db_guard.get_real_tree() else { return };
383 for undo in undo_records {
384 // The synthetic auto-txn touches only this database, but be
385 // defensive in case future changes broaden the contract.
386 if undo.database_id != db_id_match {
387 continue;
388 }
389 let Some(abort_key) = undo.abort_key else { continue };
390 if undo.abort_known_deleted {
391 if tree.delete(&abort_key) {
392 db_guard.decrement_entry_count();
393 }
394 } else if let Some(abort_data) = undo.abort_data {
395 let lsn = noxu_util::Lsn::from_u64(undo.abort_lsn);
396 if let Ok(is_new) = tree.insert(abort_key, abort_data, lsn)
397 && is_new
398 {
399 // Restoring a slot that the aborted txn had deleted:
400 // re-bump the counter that the in-memory delete
401 // already decremented.
402 db_guard.increment_entry_count();
403 }
404 }
405 }
406 }
407
408 /// Auto-commit flush: when `txn` is `None` (auto-commit mode), flush and
409 /// fsync the log before returning to the caller.
410 ///
411 /// `write_lsn` is the LSN assigned to the write operation just performed.
412 /// Port of`LogManager.flushTo(lsn)`: if a concurrent committer already
413 /// flushed past `write_lsn`, the fdatasync is skipped entirely, giving
414 /// natural many:1 fsync coalescing under concurrent write load with no
415 /// explicit group-commit configuration required.
416 #[allow(dead_code)] // unwired helper kept for the documented coalescing path
417 fn auto_commit_sync(
418 &self,
419 txn: Option<&Transaction>,
420 write_lsn: Lsn,
421 ) -> Result<()> {
422 if txn.is_some() {
423 return Ok(()); // explicit txn handles its own commit/fsync
424 }
425 if self.no_sync {
426 return Ok(()); // : TXN_NO_SYNC — skip log flush entirely
427 }
428 if let Some(lm) = &self.log_manager {
429 if self.write_no_sync {
430 // : TXN_WRITE_NO_SYNC — flush to OS buffer, no fdatasync
431 lm.flush_no_sync().map_err(|e| {
432 NoxuError::OperationNotAllowed(e.to_string())
433 })?;
434 } else {
435 // : flushTo(lsn) — skip if already covered by another flush.
436 lm.flush_sync_if_needed(write_lsn).map_err(|e| {
437 NoxuError::OperationNotAllowed(e.to_string())
438 })?;
439 }
440 }
441 Ok(())
442 }
443
444 /// Creates a new database handle.
445 ///
446 /// Internal constructor called by Environment.
447 ///
448 /// `open_flag` is a shared `Arc<AtomicBool>` that is also stored in the
449 /// environment's `DatabaseHandle` for this database. Setting it to `false`
450 /// (via `Database::close()`) simultaneously marks the env-side handle as
451 /// closed, allowing `Environment::close()` to succeed without a separate
452 /// callback.
453 pub(crate) fn new(
454 name: String,
455 id: u64,
456 config: DatabaseConfig,
457 db_impl: Arc<RwLock<DatabaseImpl>>,
458 env_impl: Arc<Mutex<EnvironmentImpl>>,
459 open_flag: Arc<AtomicBool>,
460 no_sync: bool,
461 write_no_sync: bool,
462 ) -> Self {
463 let throughput = db_impl.read().throughput.clone();
464 // Cache the manager Arcs at construction so hot-path operations
465 // (get/put/delete) never need to re-acquire env_impl.lock().
466 let (
467 lock_manager,
468 log_manager,
469 cleaner_throttle,
470 file_protector,
471 txn_manager,
472 env_invalid,
473 evictor,
474 ) = {
475 let env = env_impl.lock();
476 let lm = Arc::clone(env.get_lock_manager());
477 let logm = env.get_log_manager();
478 let ct = env.get_cleaner_throttle();
479 let fp = env.get_file_protector();
480 let txnm = Arc::clone(env.get_txn_manager());
481 let inv = env.is_invalid_flag();
482 let ev = env.get_evictor();
483 (lm, logm, ct, fp, txnm, inv, ev)
484 };
485 // Disk-limit tracker: wire it only for user databases (JE exempts
486 // internal DBs in Cursor.checkUpdatesAllowed via
487 // dbImpl.getDbType().isInternal()). Internal DBs leave this None so
488 // the cleaner/checkpointer/recovery writes through them are never
489 // blocked by the limit.
490 let disk_limit = if db_impl.read().get_db_type().is_internal() {
491 None
492 } else {
493 Some(env_impl.lock().get_disk_limit())
494 };
495 Database {
496 name,
497 id,
498 config,
499 db_impl,
500 env_impl,
501 open: open_flag,
502 throughput,
503 lock_manager,
504 log_manager,
505 disk_limit,
506 env_invalid,
507 cleaner_throttle,
508 file_protector,
509 txn_manager,
510 evictor,
511 no_sync,
512 write_no_sync,
513 secondaries: Arc::new(RwLock::new(Vec::new())),
514 fk_referrers: Arc::new(RwLock::new(Vec::new())),
515 }
516 }
517
518 /// Retrieves a record by key, auto-committing the read.
519 ///
520 /// Idiomatic Rust lookup: `Some(value)` if found, `None` if the key
521 /// is absent, `Err` only on a real failure (review P0-3). Keys accept
522 /// any `impl AsRef<[u8]>` (review P1-3) — `b"k"`, `&str`, `Vec<u8>`,
523 /// `Bytes`, etc., no `DatabaseEntry` wrapper required.
524 ///
525 /// For an explicit transaction use [`Self::get_in`]; for zero-alloc
526 /// buffer reuse or partial reads use [`Self::get_into`].
527 ///
528 /// # Errors
529 /// Returns an error if the database is closed or the environment failed.
530 pub fn get(&self, key: impl AsRef<[u8]>) -> Result<Option<Bytes>> {
531 self.get_bytes(None, key.as_ref())
532 }
533
534 /// Retrieves a record by key within an explicit transaction (review
535 /// P0-2: auto-commit vs transactional is a *named* choice, not a bare
536 /// `None`).
537 ///
538 /// # Errors
539 /// Returns an error if the database is closed, the environment failed,
540 /// or a transactional handle is used against a non-transactional DB.
541 pub fn get_in(
542 &self,
543 txn: &Transaction,
544 key: impl AsRef<[u8]>,
545 ) -> Result<Option<Bytes>> {
546 self.get_bytes(Some(txn), key.as_ref())
547 }
548
549 /// Shared `Result<Option<Bytes>>` read used by [`Self::get`] /
550 /// [`Self::get_in`] (review P0-3). Semantics are identical to the
551 /// pre-7.0 `get` out-param path — found = `Some`, NotFound = `None`,
552 /// error = `Err` — only the *shape* changed.
553 fn get_bytes(
554 &self,
555 txn: Option<&Transaction>,
556 key_bytes: &[u8],
557 ) -> Result<Option<Bytes>> {
558 self.check_open()?;
559 self.reject_txn_on_non_txnal_db(txn.is_some())?;
560 observe_span!(
561 "db_get",
562 db_name = self.name.as_str(),
563 key_size = key_bytes.len(),
564 );
565 let _obs_timer = observe_timer_start!();
566 observe_counter!("noxu_db_operations_total", "op" => "get");
567
568 let mut cursor = match txn {
569 Some(t) => self.make_cursor_for_txn(t),
570 None => self.make_cursor(),
571 };
572 match cursor
573 .search(key_bytes, None, SearchMode::Set)
574 .map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?
575 {
576 noxu_dbi::OperationStatus::Success => {
577 let (_, value) = cursor.get_current().map_err(|e| {
578 NoxuError::OperationNotAllowed(e.to_string())
579 })?;
580 self.throughput.n_pri_searches.fetch_add(1, Ordering::Relaxed);
581 observe_timer_record!(_obs_timer, "noxu_db_operation_duration_seconds", "op" => "get");
582 Ok(Some(Bytes::from(value)))
583 }
584 _ => {
585 self.throughput
586 .n_pri_search_fails
587 .fetch_add(1, Ordering::Relaxed);
588 observe_timer_record!(_obs_timer, "noxu_db_operation_duration_seconds", "op" => "get");
589 Ok(None)
590 }
591 }
592 }
593
594 /// Zero-alloc / partial-read escape hatch (review P0-3, P1-3).
595 ///
596 /// Reads into a caller-owned [`DatabaseEntry`] so the buffer can be
597 /// reused across calls, and honours `data.is_partial()` for
598 /// partial reads (the offset/length machinery that `DatabaseEntry`
599 /// exists for). Returns `true` if the record was found.
600 ///
601 /// `txn` is `Option` here because this is the low-level escape hatch;
602 /// the idiomatic named-choice surface is [`Self::get`] / [`Self::get_in`].
603 ///
604 /// # Errors
605 /// Returns an error if the database is closed or the environment failed.
606 pub fn get_into(
607 &self,
608 txn: Option<&Transaction>,
609 key: impl AsRef<[u8]>,
610 data: &mut DatabaseEntry,
611 ) -> Result<bool> {
612 self.check_open()?;
613 self.reject_txn_on_non_txnal_db(txn.is_some())?;
614 let key_bytes = key.as_ref();
615
616 let mut cursor = match txn {
617 Some(t) => self.make_cursor_for_txn(t),
618 None => self.make_cursor(),
619 };
620 match cursor
621 .search(key_bytes, None, SearchMode::Set)
622 .map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?
623 {
624 noxu_dbi::OperationStatus::Success => {
625 let (_, value) = cursor.get_current().map_err(|e| {
626 NoxuError::OperationNotAllowed(e.to_string())
627 })?;
628 // Partial get: return only the requested slice.
629 if data.is_partial() {
630 let off = data.partial_offset();
631 let len = data.partial_length();
632 let end = (off + len).min(value.len());
633 let slice =
634 if off < value.len() { &value[off..end] } else { &[] };
635 data.set_data(slice);
636 } else {
637 data.set_data(&value);
638 }
639 self.throughput.n_pri_searches.fetch_add(1, Ordering::Relaxed);
640 Ok(true)
641 }
642 _ => {
643 self.throughput
644 .n_pri_search_fails
645 .fetch_add(1, Ordering::Relaxed);
646 Ok(false)
647 }
648 }
649 }
650
651 /// Retrieves a record with per-operation read options (escape hatch;
652 /// review P0-3 returns `Result<Option<Bytes>>`, P1-3 takes
653 /// `impl AsRef<[u8]>`).
654 ///
655 /// Mirrors `Cursor.get()` with `ReadOptions` applied:
656 /// - `LockMode::ReadUncommitted` — dirty read, no lock acquired
657 /// - `LockMode::ReadCommitted` — read-committed isolation (standard locking)
658 /// - `LockMode::Rmw` — acquire write lock for read-modify-write
659 /// - `LockMode::Default` — environment default isolation
660 ///
661 /// `CacheMode` in `ReadOptions` is advisory (accepted but not yet honored):
662 /// the per-operation hint does not reach the evictor and has no effect
663 /// today. See [`crate::CacheMode`] for the tracking note.
664 ///
665 /// # Arguments
666 /// * `txn` - Optional transaction handle
667 /// * `key` - The search key
668 /// * `opts` - Per-operation read options (isolation, cache hints)
669 ///
670 /// # Returns
671 /// `Some(value)` if found, `None` otherwise.
672 pub fn get_with_options(
673 &self,
674 txn: Option<&Transaction>,
675 key: impl AsRef<[u8]>,
676 opts: &ReadOptions,
677 ) -> Result<Option<Bytes>> {
678 self.check_open()?;
679 self.reject_txn_on_non_txnal_db(txn.is_some())?;
680 let key_bytes = key.as_ref();
681 observe_span!(
682 "db_get_with_options",
683 db_name = self.name.as_str(),
684 key_size = key_bytes.len(),
685 lock_mode = format!("{:?}", opts.lock_mode),
686 );
687 let _obs_timer = observe_timer_start!();
688 observe_counter!("noxu_db_operations_total", "op" => "get_with_options");
689
690 let mut cursor = match opts.lock_mode {
691 LockMode::ReadUncommitted => self.make_cursor_no_lock(),
692 _ => match txn {
693 Some(t) => self.make_cursor_for_txn(t),
694 None => self.make_cursor(),
695 },
696 };
697
698 match cursor
699 .search(key_bytes, None, SearchMode::Set)
700 .map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?
701 {
702 noxu_dbi::OperationStatus::Success => {
703 let (_, value) = cursor.get_current().map_err(|e| {
704 NoxuError::OperationNotAllowed(e.to_string())
705 })?;
706 // JE LockMode.RMW (Cursor.java:5281): an RMW read takes a WRITE
707 // lock on the record so a later update in the same txn cannot
708 // deadlock and a concurrent writer blocks at read time.
709 if matches!(opts.lock_mode, LockMode::Rmw) {
710 cursor.upgrade_current_to_write_lock().map_err(|e| {
711 NoxuError::OperationNotAllowed(e.to_string())
712 })?;
713 }
714 self.throughput.n_pri_searches.fetch_add(1, Ordering::Relaxed);
715 observe_timer_record!(
716 _obs_timer,
717 "noxu_db_operation_duration_seconds",
718 "op" => "get_with_options"
719 );
720 Ok(Some(Bytes::from(value)))
721 }
722 _ => {
723 self.throughput
724 .n_pri_search_fails
725 .fetch_add(1, Ordering::Relaxed);
726 observe_timer_record!(
727 _obs_timer,
728 "noxu_db_operation_duration_seconds",
729 "op" => "get_with_options"
730 );
731 Ok(None)
732 }
733 }
734 }
735
736 /// Inserts or updates a record, auto-committing the write (review
737 /// P0-2: auto-commit is the unadorned `put`; the transactional form is
738 /// the named [`Self::put_in`]). Keys and values accept any
739 /// `impl AsRef<[u8]>` (review P1-3).
740 ///
741 /// # Errors
742 /// Returns an error if the database is closed or read-only.
743 pub fn put(
744 &self,
745 key: impl AsRef<[u8]>,
746 data: impl AsRef<[u8]>,
747 ) -> Result<()> {
748 self.put_bytes(None, key.as_ref(), data.as_ref())
749 }
750
751 /// Inserts or updates a record within an explicit transaction (review
752 /// P0-2).
753 ///
754 /// # Errors
755 /// Returns an error if the database is closed, read-only, or a
756 /// transactional handle is used against a non-transactional DB.
757 pub fn put_in(
758 &self,
759 txn: &Transaction,
760 key: impl AsRef<[u8]>,
761 data: impl AsRef<[u8]>,
762 ) -> Result<()> {
763 self.put_bytes(Some(txn), key.as_ref(), data.as_ref())
764 }
765
766 /// Shared byte-slice put used by [`Self::put`] / [`Self::put_in`]
767 /// (review P0-2/P1-3). Preserves the v6 semantics exactly — secondary
768 /// maintenance, triggers, auto-commit fsync and cleaner backpressure —
769 /// only the public signature shape changed.
770 fn put_bytes(
771 &self,
772 txn: Option<&Transaction>,
773 key_bytes: &[u8],
774 data_bytes: &[u8],
775 ) -> Result<()> {
776 self.check_open()?;
777 self.reject_txn_on_non_txnal_db(txn.is_some())?;
778 self.check_writable()?;
779 // EV-15: per-write synchronous critical eviction (write back-pressure).
780 // JE EnvironmentImpl.criticalEviction is called before every cursor
781 // operation; when the cache is critically over budget this writer
782 // thread evicts a bounded batch itself before allocating more.
783 self.evictor.do_critical_eviction();
784 observe_span!(
785 "db_put",
786 db_name = self.name.as_str(),
787 key_size = key_bytes.len(),
788 data_size = data_bytes.len(),
789 );
790 let _obs_timer = observe_timer_start!();
791 observe_counter!("noxu_db_operations_total", "op" => "put");
792
793 // v1.6 (audit C3 / step 6): if any secondaries are registered,
794 // capture the pre-put value of this key (if it exists) BEFORE
795 // the overwrite so we can pass it as `old_data` to the
796 // secondary key creator below. When the put is a fresh
797 // insert the read returns NotFound and `old_data_for_secondaries`
798 // remains `None`. We re-read here using the caller's txn for
799 // the read so isolation is honoured.
800 let secondaries_pre = self.live_secondaries();
801 let need_old_data =
802 !secondaries_pre.is_empty() || self.has_user_triggers();
803 let old_data_for_secondaries: Option<Vec<u8>> = if !need_old_data {
804 None
805 } else {
806 self.get_bytes(txn, key_bytes)?.map(|b| b.to_vec())
807 };
808
809 match txn {
810 Some(t) => {
811 let mut cursor = self.make_cursor_for_txn(t);
812 cursor
813 .put(key_bytes, data_bytes, PutMode::Overwrite)
814 .map_err(NoxuError::from)?;
815 }
816 None => {
817 // Wrap the write in a synthetic auto-commit `Txn` so the
818 // lock manager sees a typed locker id ("auto-txn:<id>")
819 // and any error rolls back the in-memory tree write
820 // through `Txn::abort_collect_undo`.
821 self.with_auto_txn(|cursor| {
822 cursor
823 .put(key_bytes, data_bytes, PutMode::Overwrite)
824 .map_err(NoxuError::from)?;
825 Ok(())
826 })?;
827 }
828 }
829
830 // DB-TRIG: fire Trigger.put within the transaction, after the change
831 // is applied. oldData = None for an insert, Some(prev) for an update;
832 // newData is the bytes just written. JE
833 // `TriggerManager.runPutTriggers(locker, dbImpl, key, oldData,
834 // newData)`.
835 self.fire_put_triggers(
836 txn,
837 key_bytes,
838 old_data_for_secondaries.as_deref(),
839 data_bytes,
840 );
841
842 // v1.6 (audit C3 — the associate()-style hook): drive every
843 // registered secondary index under the same caller-supplied
844 // txn so the primary record and its secondary entries commit /
845 // abort together.
846 let secondaries = self.live_secondaries();
847 if !secondaries.is_empty() {
848 let key_entry = DatabaseEntry::from_bytes(key_bytes);
849 let new_entry = DatabaseEntry::from_bytes(data_bytes);
850 let old_entry: Option<DatabaseEntry> = old_data_for_secondaries
851 .as_deref()
852 .map(DatabaseEntry::from_bytes);
853 for hook in secondaries {
854 hook.maintain(
855 txn,
856 &key_entry,
857 old_entry.as_ref(),
858 Some(&new_entry),
859 )?;
860 }
861 }
862
863 // Apply cleaner write-path backpressure for auto-commit (no-txn) bulk
864 // writes: if the log write rate exceeds the cleaner's capacity, sleep
865 // briefly so the cleaner can keep up. Transactional paths handle this
866 // in Transaction::commit_with_durability() instead.
867 if txn.is_none()
868 && let Some(delay) = self
869 .cleaner_throttle
870 .as_ref()
871 .and_then(|t| t.should_throttle_writer())
872 {
873 std::thread::sleep(delay);
874 }
875
876 self.throughput.n_pri_updates.fetch_add(1, Ordering::Relaxed);
877 observe_timer_record!(_obs_timer, "noxu_db_operation_duration_seconds", "op" => "put");
878 Ok(())
879 }
880
881 /// Partial-read/-write escape hatch (review P1-3: `DatabaseEntry` is
882 /// retained only where the offset/length actually matter).
883 ///
884 /// `data` must be configured with [`DatabaseEntry::set_partial`]; the
885 /// existing record's bytes outside `[offset, offset+length)` are
886 /// preserved and only the specified range is replaced (JE
887 /// `LN.combinePuts()`). The supplied `data` length must equal the
888 /// configured partial length.
889 ///
890 /// # Errors
891 /// Returns [`NoxuError::IllegalArgument`] if the data length does not
892 /// match the partial length, or if the database is closed / read-only.
893 pub fn put_partial(
894 &self,
895 txn: Option<&Transaction>,
896 key: impl AsRef<[u8]>,
897 data: &DatabaseEntry,
898 ) -> Result<()> {
899 self.check_open()?;
900 self.reject_txn_on_non_txnal_db(txn.is_some())?;
901 self.check_writable()?;
902 let key_bytes = key.as_ref();
903
904 // Partial put: read-modify-write using the partial offset/length.
905 // LN.combinePuts() — existing bytes outside [offset..offset+length]
906 // are preserved; only the specified range is replaced with new data.
907 // A length mismatch is rejected with a typed error rather than
908 // silently truncating (matches JE's exact-equality requirement).
909 let write_bytes: Vec<u8> = if data.is_partial() {
910 let new_bytes = data.data_opt().unwrap_or(&[]);
911 let off = data.partial_offset();
912 let len = data.partial_length();
913 if new_bytes.len() != len {
914 return Err(NoxuError::IllegalArgument(format!(
915 "partial put: data length {} does not match \
916 partial_length {} (partial_offset={}); JE \
917 requires exact equality",
918 new_bytes.len(),
919 len,
920 off
921 )));
922 }
923 // Fetch the existing record to splice into.
924 let existing = match self.get_bytes(txn, key_bytes)? {
925 Some(b) => b.to_vec(),
926 None => vec![0u8; off + len],
927 };
928 let total_len = (off + len).max(existing.len());
929 let mut patched = existing;
930 patched.resize(total_len, 0);
931 patched[off..off + len].copy_from_slice(new_bytes);
932 patched
933 } else {
934 data.data_opt().unwrap_or(&[]).to_vec()
935 };
936
937 self.put_bytes(txn, key_bytes, &write_bytes)
938 }
939
940 /// Inserts or updates a record with per-operation write options
941 /// (escape hatch; review P1-3 takes `impl AsRef<[u8]>`).
942 ///
943 /// Extends `put()` with `WriteOptions` support:
944 /// - `ttl` — if > 0, sets a per-record TTL expiration (hours from now); the
945 /// record will be treated as expired and invisible after the TTL elapses.
946 /// - `update_ttl` — if true and the record already exists, refreshes its TTL.
947 /// - `cache_mode` — advisory cache hint, accepted but not yet honored (no
948 /// effect today; see [`crate::CacheMode`]).
949 ///
950 /// # Arguments
951 /// * `txn` - Optional transaction handle
952 /// * `key` - The key to insert/update
953 /// * `data` - The data to store
954 /// * `opts` - Per-operation write options (TTL, cache hints)
955 pub fn put_with_options(
956 &self,
957 txn: Option<&Transaction>,
958 key: impl AsRef<[u8]>,
959 data: impl AsRef<[u8]>,
960 opts: &WriteOptions,
961 ) -> Result<()> {
962 let key_bytes = key.as_ref();
963 self.put_bytes(txn, key_bytes, data.as_ref())?;
964
965 // Apply TTL to the just-written BIN slot when requested.
966 //
967 // Audit database F8 (Wave 2C-4) — partial fix: the TTL update is
968 // still in-memory only; recovery does not yet replay it. The
969 // engine cannot distinguish insert-vs-update at this layer, so we
970 // always apply when `ttl > 0` and the underlying write succeeded.
971 if opts.ttl > 0 {
972 let expiration_hours =
973 noxu_util::current_time_hours().saturating_add(opts.ttl as u32);
974 self.db_impl
975 .read()
976 .update_key_expiration(key_bytes, expiration_hours);
977 }
978
979 Ok(())
980 }
981
982 /// Inserts a record, failing if the key already exists; auto-commits
983 /// (review P0-2/P0-3/P1-3).
984 ///
985 /// Returns `Ok(true)` if the record was inserted, `Ok(false)` if the
986 /// key already existed (the prior `OperationStatus::KeyExists` collapses
987 /// to the boolean per review P0-3).
988 ///
989 /// # Errors
990 /// Returns an error if the database is closed or read-only.
991 pub fn put_no_overwrite(
992 &self,
993 key: impl AsRef<[u8]>,
994 data: impl AsRef<[u8]>,
995 ) -> Result<bool> {
996 self.put_no_overwrite_bytes(None, key.as_ref(), data.as_ref())
997 }
998
999 /// Inserts a record within an explicit transaction, failing if the key
1000 /// already exists (review P0-2). `Ok(true)` = inserted.
1001 ///
1002 /// # Errors
1003 /// Returns an error if the database is closed, read-only, or a
1004 /// transactional handle is used against a non-transactional DB.
1005 pub fn put_no_overwrite_in(
1006 &self,
1007 txn: &Transaction,
1008 key: impl AsRef<[u8]>,
1009 data: impl AsRef<[u8]>,
1010 ) -> Result<bool> {
1011 self.put_no_overwrite_bytes(Some(txn), key.as_ref(), data.as_ref())
1012 }
1013
1014 /// Shared byte-slice no-overwrite insert. `Ok(true)` = inserted,
1015 /// `Ok(false)` = key already present.
1016 fn put_no_overwrite_bytes(
1017 &self,
1018 txn: Option<&Transaction>,
1019 key_bytes: &[u8],
1020 data_bytes: &[u8],
1021 ) -> Result<bool> {
1022 self.check_open()?;
1023 self.reject_txn_on_non_txnal_db(txn.is_some())?;
1024 self.check_writable()?;
1025
1026 let inserted = match txn {
1027 Some(t) => {
1028 let mut cursor = self.make_cursor_for_txn(t);
1029 !matches!(
1030 cursor
1031 .put(key_bytes, data_bytes, PutMode::NoOverwrite)
1032 .map_err(NoxuError::from)?,
1033 noxu_dbi::OperationStatus::KeyExist
1034 )
1035 }
1036 None => self.with_auto_txn(|cursor| {
1037 cursor
1038 .put(key_bytes, data_bytes, PutMode::NoOverwrite)
1039 .map_err(NoxuError::from)
1040 .map(|s| !matches!(s, noxu_dbi::OperationStatus::KeyExist))
1041 })?,
1042 };
1043 if inserted {
1044 // DB-TRIG: a successful no-overwrite put is always an insert, so
1045 // oldData is None. JE `TriggerManager.runPutTriggers`.
1046 self.fire_put_triggers(txn, key_bytes, None, data_bytes);
1047 self.throughput.n_pri_inserts.fetch_add(1, Ordering::Relaxed);
1048 } else {
1049 self.throughput.n_pri_insert_fails.fetch_add(1, Ordering::Relaxed);
1050 }
1051 Ok(inserted)
1052 }
1053
1054 /// Deletes a record by key, auto-committing (review P0-2/P0-3/P1-3).
1055 ///
1056 /// Returns `Ok(true)` if a record was deleted, `Ok(false)` if the key
1057 /// was absent (the prior `OperationStatus::NotFound` collapses to the
1058 /// boolean per review P0-3).
1059 ///
1060 /// # Errors
1061 /// Returns an error if the database is closed or read-only.
1062 pub fn delete(&self, key: impl AsRef<[u8]>) -> Result<bool> {
1063 self.delete_bytes(None, key.as_ref())
1064 }
1065
1066 /// Deletes a record by key within an explicit transaction (review
1067 /// P0-2). `Ok(true)` = deleted.
1068 ///
1069 /// # Errors
1070 /// Returns an error if the database is closed, read-only, or a
1071 /// transactional handle is used against a non-transactional DB.
1072 pub fn delete_in(
1073 &self,
1074 txn: &Transaction,
1075 key: impl AsRef<[u8]>,
1076 ) -> Result<bool> {
1077 self.delete_bytes(Some(txn), key.as_ref())
1078 }
1079
1080 /// Shared byte-slice delete. `Ok(true)` = deleted, `Ok(false)` = key
1081 /// absent. Preserves the v6 dup-handling, FK referrer, secondary and
1082 /// trigger semantics exactly.
1083 fn delete_bytes(
1084 &self,
1085 txn: Option<&Transaction>,
1086 key_bytes: &[u8],
1087 ) -> Result<bool> {
1088 self.check_open()?;
1089 self.reject_txn_on_non_txnal_db(txn.is_some())?;
1090 self.check_writable()?;
1091 // EV-15: per-write synchronous critical eviction (write back-pressure).
1092 self.evictor.do_critical_eviction();
1093 observe_span!(
1094 "db_delete",
1095 db_name = self.name.as_str(),
1096 key_size = key_bytes.len(),
1097 );
1098 let _obs_timer = observe_timer_start!();
1099 observe_counter!("noxu_db_operations_total", "op" => "delete");
1100
1101 // FK referrers and secondary hooks consume a `&DatabaseEntry` key;
1102 // build one once from the bytes (review P1-3: `DatabaseEntry` stays
1103 // an internal-plumbing detail, not the public surface).
1104 let key = DatabaseEntry::from_bytes(key_bytes);
1105
1106 // v1.6 (audit C3): if any secondaries are registered we must
1107 // capture the pre-delete primary data on each iteration so the
1108 // secondary key creator can recompute every (sec_key, pri_key)
1109 // pair to remove. Collected outside the cursor closure so
1110 // the auto-commit and explicit-txn paths share one buffer.
1111 let secondaries = self.live_secondaries();
1112 let track_old_data =
1113 !secondaries.is_empty() || self.has_user_triggers();
1114 let mut deleted_old_values: Vec<Vec<u8>> = Vec::new();
1115
1116 // v1.6 (audit C2 / Decision 2C — step 8): consult any FK
1117 // referrers BEFORE the delete is applied. An Abort action
1118 // raises a typed error and prevents the foreign delete from
1119 // happening at all (matching JE's `ForeignConstraintException`
1120 // semantics). Cascade / Nullify (steps 9 / 10) mutate child
1121 // records under the same caller-supplied txn so the foreign
1122 // delete and its consequences commit / abort together.
1123 let fk_referrers = self.live_fk_referrers();
1124 if !fk_referrers.is_empty() {
1125 for referrer in &fk_referrers {
1126 referrer.on_foreign_key_deleted(txn, &key)?;
1127 }
1128 }
1129
1130 // Inner closure shared between the explicit-txn and synthetic
1131 // auto-txn paths: scans + deletes every duplicate of `key_bytes`
1132 // through the supplied `cursor`. See comment in pre-Wave-1A
1133 // delete for the dup-loop rationale (BDB-JE
1134 // `Database.delete(key)` semantics).
1135 let mut run_delete = |cursor: &mut CursorImpl| -> Result<bool> {
1136 let mut deleted_any = false;
1137 while let noxu_dbi::OperationStatus::Success = cursor
1138 .search(key_bytes, None, SearchMode::Set)
1139 .map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?
1140 {
1141 if track_old_data {
1142 let (_, v) = cursor.get_current().map_err(|e| {
1143 NoxuError::OperationNotAllowed(e.to_string())
1144 })?;
1145 deleted_old_values.push(v);
1146 }
1147 cursor.delete().map_err(|e| {
1148 NoxuError::OperationNotAllowed(e.to_string())
1149 })?;
1150 deleted_any = true;
1151 }
1152 Ok(deleted_any)
1153 };
1154
1155 let deleted_any = match txn {
1156 Some(t) => {
1157 let mut cursor = self.make_cursor_for_txn(t);
1158 run_delete(&mut cursor)?
1159 }
1160 None => self.with_auto_txn(&mut run_delete)?,
1161 };
1162
1163 // v1.6 (audit C3): fan out the secondary cleanup under the
1164 // caller's txn so the primary delete and every secondary
1165 // tombstone commit / abort together.
1166 if deleted_any && !secondaries.is_empty() {
1167 for old_bytes in &deleted_old_values {
1168 let old_entry = DatabaseEntry::from_bytes(old_bytes);
1169 for hook in &secondaries {
1170 hook.maintain(txn, &key, Some(&old_entry), None)?;
1171 }
1172 }
1173 }
1174
1175 // DB-TRIG: fire Trigger.delete within the transaction for each
1176 // removed (key, data) pair, after the change is applied. JE
1177 // `TriggerManager.runDeleteTriggers(locker, dbImpl, key, oldData)`.
1178 if deleted_any && self.has_user_triggers() {
1179 for old_bytes in &deleted_old_values {
1180 self.fire_delete_triggers(txn, key_bytes, old_bytes);
1181 }
1182 }
1183
1184 let status = if deleted_any {
1185 OperationStatus::Success
1186 } else {
1187 OperationStatus::NotFound
1188 };
1189 if status == OperationStatus::Success {
1190 self.throughput.n_pri_deletes.fetch_add(1, Ordering::Relaxed);
1191 } else {
1192 self.throughput.n_pri_delete_fails.fetch_add(1, Ordering::Relaxed);
1193 }
1194 observe_timer_record!(_obs_timer, "noxu_db_operation_duration_seconds", "op" => "delete");
1195 Ok(deleted_any)
1196 }
1197
1198 /// Opens an auto-commit cursor for iterating over database records
1199 /// (review P0-1: returns `Cursor<'_>`; review P0-2: auto-commit is the
1200 /// unadorned form, the transactional form is [`Self::open_cursor_in`]).
1201 ///
1202 /// In auto-commit mode each cursor write is its own transaction and is
1203 /// fsynced before returning.
1204 ///
1205 /// # Arguments
1206 /// * `config` - Optional cursor configuration
1207 ///
1208 /// # Errors
1209 /// Returns an error if the database is closed.
1210 pub fn open_cursor(
1211 &self,
1212 config: Option<&CursorConfig>,
1213 ) -> Result<Cursor<'static>> {
1214 self.open_cursor_internal(None, config)
1215 }
1216
1217 /// Opens a cursor bound to an explicit transaction (review P0-1/P0-2).
1218 ///
1219 /// The cursor binds to the transaction's `Locker`: every cursor `get`
1220 /// acquires shared locks tracked by the txn, every cursor `put`/`delete`
1221 /// acquires exclusive locks and is rolled back if the txn aborts.
1222 ///
1223 /// The returned `Cursor<'txn>` borrows `txn`, so the borrow checker
1224 /// rejects any attempt to commit or drop the transaction while the
1225 /// cursor is still alive — the old "close the cursor before commit"
1226 /// prose invariant is now a compile error.
1227 ///
1228 /// # Arguments
1229 /// * `txn` - the transaction the cursor participates in
1230 /// * `config` - Optional cursor configuration
1231 ///
1232 /// # Errors
1233 /// Returns an error if the database is closed or a transactional handle
1234 /// is used against a non-transactional database.
1235 pub fn open_cursor_in<'txn>(
1236 &self,
1237 txn: &'txn Transaction,
1238 config: Option<&CursorConfig>,
1239 ) -> Result<Cursor<'txn>> {
1240 self.open_cursor_internal(Some(txn), config)
1241 }
1242
1243 /// Shared cursor-open used by [`Self::open_cursor`] /
1244 /// [`Self::open_cursor_in`]. The `'txn` lifetime flows from the
1245 /// `Option<&'txn Transaction>` into the returned `Cursor<'txn>`
1246 /// (review P0-1).
1247 pub(crate) fn open_cursor_internal<'txn>(
1248 &self,
1249 txn: Option<&'txn Transaction>,
1250 config: Option<&CursorConfig>,
1251 ) -> Result<Cursor<'txn>> {
1252 self.check_open()?;
1253
1254 // JE invariant: a transactional cursor cannot be opened on a
1255 // non-transactional database. JE throws IllegalArgumentException
1256 // in DatabaseTest.testCursor when a Transaction is supplied to
1257 // a non-txnal DB (wave-11-G, database_txn_cursor_on_non_txn_db_rejected).
1258 self.reject_txn_on_non_txnal_db(txn.is_some())?;
1259 let read_only = config.map(|c| c.read_uncommitted).unwrap_or(false)
1260 || self.config.read_only;
1261
1262 let cursor_impl = if read_only {
1263 CursorImpl::new(Arc::clone(&self.db_impl), 0)
1264 .with_env_invalid(Arc::clone(&self.env_invalid))
1265 } else {
1266 // Plumb the caller's txn through to the cursor so that
1267 // cursor reads acquire shared locks via the txn's locker and
1268 // cursor writes acquire exclusive locks (and roll back on
1269 // txn.abort()) rather than auto-committing. See API audit
1270 // 2026-05 cursor finding C1.
1271 match txn {
1272 Some(t) => self.make_cursor_for_txn(t),
1273 None => self.make_cursor(),
1274 }
1275 };
1276
1277 Ok(Cursor::from_impl(cursor_impl, read_only))
1278 }
1279
1280 /// Returns a lazy forward iterator over all records in the database.
1281 ///
1282 /// Records are fetched one at a time (the underlying cursor advances on
1283 /// each `next()` call). The full database is **not** eagerly materialised
1284 /// into memory.
1285 ///
1286 /// Pass `txn = Some(&txn)` to iterate within an explicit transaction;
1287 /// pass `None` for an auto-commit (non-transactional) scan.
1288 ///
1289 /// # Example
1290 ///
1291 /// ```no_run
1292 /// # use noxu_db::{Database, DatabaseConfig, DatabaseEntry,
1293 /// # Environment, EnvironmentConfig};
1294 /// # use std::path::PathBuf;
1295 /// # fn main() -> noxu_db::Result<()> {
1296 /// # let env = Environment::open(EnvironmentConfig::new(PathBuf::from("/tmp/t")).with_allow_create(true))?;
1297 /// # let db = env.open_database(None, "d", &DatabaseConfig::new().with_allow_create(true).with_transactional(true))?;
1298 /// for result in db.iter(None)? {
1299 /// let (key, val) = result?;
1300 /// println!("{:?} => {:?}", key, val);
1301 /// }
1302 /// # Ok(()) }
1303 /// ```
1304 ///
1305 /// # Errors
1306 /// Returns an error if the database is closed.
1307 pub fn iter<'txn>(
1308 &self,
1309 txn: Option<&'txn Transaction>,
1310 ) -> Result<crate::db_iter::DbIter<'txn>> {
1311 let cursor = self.open_cursor_internal(txn, None)?;
1312 Ok(crate::db_iter::DbIter::new(cursor))
1313 }
1314
1315 /// Returns a lazy iterator over the records whose keys fall within `range`.
1316 ///
1317 /// The iterator is positioned at the first key that satisfies the lower
1318 /// bound (using `SearchGte`) and stops once the key exceeds the upper
1319 /// bound. All standard `RangeBounds` variants are supported:
1320 /// `..`, `lo..`, `..=hi`, `lo..hi`, `lo..=hi`, etc.
1321 ///
1322 /// Pass `txn = Some(&txn)` to iterate within an explicit transaction;
1323 /// pass `None` for a non-transactional scan.
1324 ///
1325 /// # Example
1326 ///
1327 /// ```no_run
1328 /// # use noxu_db::{Database, DatabaseConfig, DatabaseEntry,
1329 /// # Environment, EnvironmentConfig};
1330 /// # use std::path::PathBuf;
1331 /// # fn main() -> noxu_db::Result<()> {
1332 /// # let env = Environment::open(EnvironmentConfig::new(PathBuf::from("/tmp/t")).with_allow_create(true))?;
1333 /// # let db = env.open_database(None, "d", &DatabaseConfig::new().with_allow_create(true).with_transactional(true))?;
1334 /// let lo = b"key010";
1335 /// let hi = b"key020";
1336 /// for result in db.range(None, lo.as_ref()..=hi.as_ref())? {
1337 /// let (key, _val) = result?;
1338 /// assert!(key.as_slice() >= lo.as_slice());
1339 /// }
1340 /// # Ok(()) }
1341 /// ```
1342 ///
1343 /// # Errors
1344 /// Returns an error if the database is closed.
1345 pub fn range<'txn, K: AsRef<[u8]>>(
1346 &self,
1347 txn: Option<&'txn Transaction>,
1348 range: impl std::ops::RangeBounds<K>,
1349 ) -> Result<crate::db_iter::DbRange<'txn>> {
1350 use std::ops::Bound;
1351 let map_bound = |b: std::ops::Bound<&K>| -> std::ops::Bound<Vec<u8>> {
1352 match b {
1353 Bound::Included(k) => Bound::Included(k.as_ref().to_vec()),
1354 Bound::Excluded(k) => Bound::Excluded(k.as_ref().to_vec()),
1355 Bound::Unbounded => Bound::Unbounded,
1356 }
1357 };
1358 let start = map_bound(range.start_bound());
1359 let end = map_bound(range.end_bound());
1360 let cursor = self.open_cursor_internal(txn, None)?;
1361 Ok(crate::db_iter::DbRange::new(cursor, start, end))
1362 }
1363 ///
1364 ///
1365 ///
1366 /// # Arguments
1367 /// * `key` - The database key under which the sequence record is stored.
1368 /// * `config` - Sequence configuration (use `SequenceConfig::new()` for defaults).
1369 ///
1370 /// # Errors
1371 /// Returns an error if the database is closed, the config is invalid, or
1372 /// `allow_create` is false and the sequence does not exist.
1373 pub fn open_sequence<'db>(
1374 &'db self,
1375 key: &DatabaseEntry,
1376 config: SequenceConfig,
1377 ) -> Result<Sequence<'db>> {
1378 self.check_open()?;
1379 Sequence::open(self, key, config)
1380 }
1381
1382 /// Closes the database handle.
1383 ///
1384 ///
1385 ///
1386 /// # Errors
1387 /// Returns an error if the database is already closed
1388 pub fn close(&self) -> Result<()> {
1389 if !self.open.load(Ordering::Acquire) {
1390 return Err(NoxuError::DatabaseClosed);
1391 }
1392
1393 self.open.store(false, Ordering::Release);
1394 let _ = self
1395 .env_impl
1396 .lock()
1397 .close_database(noxu_dbi::DatabaseId::new(self.id as i64));
1398 Ok(())
1399 }
1400
1401 /// Returns the database name.
1402 ///
1403 ///
1404 pub fn name(&self) -> &str {
1405 &self.name
1406 }
1407
1408 /// Returns the database configuration.
1409 ///
1410 ///
1411 pub fn config(&self) -> &DatabaseConfig {
1412 &self.config
1413 }
1414
1415 /// Returns whether this database was created with sorted duplicates.
1416 ///
1417 /// Unlike `config().sorted_duplicates` — which reflects the
1418 /// `DatabaseConfig` the caller *passed* to `open_database` — this reads
1419 /// the property stored in the opened `DatabaseImpl`, so it is correct even
1420 /// when an existing database is reopened without restating its dup-sort
1421 /// flag (as `noxu-admin dump` does). Mirrors JE
1422 /// `Database.getConfig().getSortedDuplicates()` after
1423 /// `DbInternal.setUseExistingConfig`.
1424 pub fn sorted_duplicates(&self) -> bool {
1425 self.db_impl.read().get_sorted_duplicates()
1426 }
1427
1428 /// Returns the underlying database ID. Used by FK cascade guards
1429 /// to disambiguate `(db, key)` frames when several databases
1430 /// participate in a cycle.
1431 pub(crate) fn db_id_for_fk_guard(&self) -> u64 {
1432 self.id
1433 }
1434
1435 /// Registers a secondary index for automatic maintenance.
1436 ///
1437 /// v1.6 (audit C3 — associate() hook): every [`SecondaryDatabase`]
1438 /// downgrades its inner `Arc<SecondaryHookState>` to a `Weak` and
1439 /// stores it here. Subsequent `put` / `delete` calls iterate the
1440 /// list and forward the same txn to every live secondary, dropping
1441 /// dead `Weak` entries on the fly.
1442 pub(crate) fn register_secondary(
1443 &self,
1444 hook: std::sync::Weak<
1445 dyn crate::secondary_database::SecondaryHook + Send + Sync,
1446 >,
1447 ) {
1448 let mut guard = self.secondaries.write();
1449 // Compact dead Weak entries lazily on every registration so the
1450 // list does not grow unboundedly with churn.
1451 guard.retain(|w| w.strong_count() > 0);
1452 guard.push(hook);
1453 }
1454
1455 /// Returns a snapshot of every live registered secondary. Used by
1456 /// the automatic-maintenance plumbing in `put` / `delete` to drive
1457 /// secondaries without holding the registry lock across the
1458 /// secondary call. Dead `Weak` entries are dropped from the
1459 /// returned list (and — because we re-acquire the registry write
1460 /// lock at registration time — lazily compacted from the registry
1461 /// itself).
1462 pub(crate) fn live_secondaries(
1463 &self,
1464 ) -> Vec<Arc<dyn crate::secondary_database::SecondaryHook + Send + Sync>>
1465 {
1466 self.secondaries.read().iter().filter_map(|w| w.upgrade()).collect()
1467 }
1468
1469 /// Whether any user triggers are registered on this database (DB-TRIG).
1470 /// JE `DatabaseImpl.hasUserTriggers()` — the single fast-path check that
1471 /// keeps the no-trigger write path free of any trigger work.
1472 fn has_user_triggers(&self) -> bool {
1473 self.db_impl.read().has_user_triggers()
1474 }
1475
1476 /// Fire `Trigger.put` for every registered trigger, in registration order,
1477 /// and record this database on the transaction so its commit/abort
1478 /// triggers fire on resolution (DB-TRIG).
1479 ///
1480 /// Called after the record modification is applied, within the
1481 /// transaction — JE `Cursor.putNotify` -> `TriggerManager.runPutTriggers`,
1482 /// which also calls `Txn.noteTriggerDb`.
1483 fn fire_put_triggers(
1484 &self,
1485 txn: Option<&Transaction>,
1486 key: &[u8],
1487 old_data: Option<&[u8]>,
1488 new_data: &[u8],
1489 ) {
1490 let (db_id, triggers) = {
1491 let db = self.db_impl.read();
1492 (db.get_id().id() as u64, db.triggers().to_vec())
1493 };
1494 if triggers.is_empty() {
1495 return;
1496 }
1497 let txn_id = txn.map(Transaction::id);
1498 for trigger in &triggers {
1499 trigger.put(txn_id, key, old_data, new_data);
1500 }
1501 // JE Txn.noteTriggerDb: remember the modified DB so commit/abort
1502 // triggers fire later. Only meaningful under an explicit txn (the
1503 // auto-commit path commits immediately and has no handle to note).
1504 if let Some(t) = txn {
1505 t.note_trigger_db(db_id, &triggers);
1506 }
1507 }
1508
1509 /// Fire `Trigger.delete` for every registered trigger, in registration
1510 /// order, and record this database on the transaction (DB-TRIG).
1511 ///
1512 /// JE `Cursor.deleteInternal` -> `TriggerManager.runDeleteTriggers` +
1513 /// `Txn.noteTriggerDb`.
1514 fn fire_delete_triggers(
1515 &self,
1516 txn: Option<&Transaction>,
1517 key: &[u8],
1518 old_data: &[u8],
1519 ) {
1520 let (db_id, triggers) = {
1521 let db = self.db_impl.read();
1522 (db.get_id().id() as u64, db.triggers().to_vec())
1523 };
1524 if triggers.is_empty() {
1525 return;
1526 }
1527 let txn_id = txn.map(Transaction::id);
1528 for trigger in &triggers {
1529 trigger.delete(txn_id, key, old_data);
1530 }
1531 if let Some(t) = txn {
1532 t.note_trigger_db(db_id, &triggers);
1533 }
1534 }
1535
1536 /// Registers an FK referrer that points at this primary as its
1537 /// foreign-key target (v1.6 audit C2 / Decision 2C — Abort hook).
1538 pub(crate) fn register_fk_referrer(
1539 &self,
1540 referrer: std::sync::Weak<
1541 dyn crate::secondary_database::FkReferrer + Send + Sync,
1542 >,
1543 ) {
1544 let mut guard = self.fk_referrers.write();
1545 guard.retain(|w| w.strong_count() > 0);
1546 guard.push(referrer);
1547 }
1548
1549 /// Snapshot of every live FK referrer.
1550 pub(crate) fn live_fk_referrers(
1551 &self,
1552 ) -> Vec<Arc<dyn crate::secondary_database::FkReferrer + Send + Sync>> {
1553 self.fk_referrers.read().iter().filter_map(|w| w.upgrade()).collect()
1554 }
1555
1556 /// Returns an approximate count of records in the database.
1557 ///
1558 /// reads the per-database `AtomicU64` entry
1559 /// counter, giving O(1) performance analogous to an O(1) counter.
1560 ///
1561 /// The counter is incremented on every new insert and decremented on every
1562 /// delete (including transaction aborts that undo inserts).
1563 ///
1564 /// # Errors
1565 /// Returns an error if the database is closed
1566 pub fn count(&self) -> Result<u64> {
1567 self.check_open()?;
1568 Ok(self.db_impl.read().entry_count())
1569 }
1570
1571 /// Returns all records as `(key_bytes, data_bytes)` pairs in key order.
1572 ///
1573 /// This is a helper for schema evolution: it uses the lower-level
1574 /// `CursorImpl` directly so each iteration yields raw `Vec<u8>` pairs
1575 /// without allocating a pair of `DatabaseEntry` values per record.
1576 ///
1577 /// # Errors
1578 /// Returns an error if the database is closed or a cursor operation fails.
1579 pub fn scan_all_kv(&self) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1580 self.check_open()?;
1581
1582 let mut cursor = CursorImpl::new(Arc::clone(&self.db_impl), 0)
1583 .with_env_invalid(Arc::clone(&self.env_invalid));
1584 let first_status = cursor
1585 .get_first()
1586 .map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?;
1587
1588 if first_status != noxu_dbi::OperationStatus::Success {
1589 return Ok(Vec::new());
1590 }
1591
1592 let mut records = Vec::new();
1593 loop {
1594 let (k, v) = cursor
1595 .get_current()
1596 .map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?;
1597 records.push((k, v));
1598
1599 let status = cursor
1600 .retrieve_next(GetMode::Next)
1601 .map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?;
1602 if status != noxu_dbi::OperationStatus::Success {
1603 break;
1604 }
1605 }
1606
1607 Ok(records)
1608 }
1609
1610 /// Returns whether the database handle is valid.
1611 ///
1612 ///
1613 pub fn is_valid(&self) -> bool {
1614 self.open.load(Ordering::Acquire)
1615 }
1616
1617 /// Returns the current state of the database handle.
1618 pub fn state(&self) -> DbState {
1619 if self.open.load(Ordering::Acquire) {
1620 DbState::Open
1621 } else {
1622 DbState::Closed
1623 }
1624 }
1625
1626 /// Flushes all pending writes for this database to stable storage.
1627 ///
1628 /// Implements `Database.sync()` — issues an fdatasync on the log file,
1629 /// ensuring that all writes made by non-transactional or deferred-sync
1630 /// operations are durable before returning.
1631 ///
1632 /// # Returns
1633 /// `Ok(())` on success. Acts as a no-op for non-transactional /
1634 /// in-memory environments where no log manager is configured.
1635 ///
1636 /// # Errors
1637 /// Returns an error if the database is closed or the underlying
1638 /// log-manager flush fails.
1639 pub fn sync(&self) -> Result<()> {
1640 self.check_open()?;
1641 if let Some(lm) = &self.log_manager {
1642 lm.flush_sync()
1643 .map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?;
1644 }
1645 Ok(())
1646 }
1647
1648 /// Preloads the database into cache by scanning the B-tree.
1649 ///
1650 /// Walks the tree, touching each internal-node and BIN level so they
1651 /// are pulled into the in-memory cache. Useful for warming the
1652 /// cache before a workload begins.
1653 ///
1654 /// # Limitations
1655 /// * The current implementation warms the BIN/IN structure only;
1656 /// `PreloadConfig::load_lns` therefore makes `lns_loaded` report
1657 /// the *number of LN slots in the tree* rather than the number
1658 /// of LNs actually fetched off disk. Full LN warming is
1659 /// tracked as a future-work item; the engine has no public
1660 /// single-shot LN fetch API today, so the only way to warm an
1661 /// LN is to position a cursor on its slot.
1662 /// * `PreloadConfig::max_millis` is honoured: the call returns
1663 /// early once the wall-clock budget is exceeded, with the
1664 /// partial results in the returned `PreloadStats`.
1665 ///
1666 /// # Arguments
1667 /// * `config` - Controls limits on preload duration and memory
1668 ///
1669 /// # Returns
1670 /// Statistics about what was preloaded.
1671 pub fn preload(&self, config: &PreloadConfig) -> Result<PreloadStats> {
1672 self.check_open()?;
1673 let start = std::time::Instant::now();
1674 let max_millis = config.max_millis;
1675 let mut stats =
1676 PreloadStats { bins_loaded: 0, lns_loaded: 0, elapsed_ms: 0 };
1677
1678 let guard = self.db_impl.read();
1679 if let Some(tree_stats) = guard.collect_btree_stats() {
1680 // collect_btree_stats() walks every node in the tree, which has
1681 // the side effect of pulling all BINs/INs into memory (cache).
1682 stats.bins_loaded = tree_stats.n_bins;
1683 if config.load_lns {
1684 // F9 (residual): this is the slot count, not a count of
1685 // actual LN fetches. See the doc comment above.
1686 stats.lns_loaded = tree_stats.n_entries;
1687 }
1688 }
1689
1690 // Audit database F10 (Wave 2C-4): honour `max_millis` as a
1691 // post-walk diagnostic. `collect_btree_stats` is currently
1692 // not interruptible, so the time bound surfaces in `stats`
1693 // (callers can detect over-budget runs by comparing
1694 // `elapsed_ms` to their config) but does not yet stop the
1695 // walk early. Tracked for v2.0 alongside true LN warming.
1696 let elapsed_ms = start.elapsed().as_millis() as u64;
1697 if max_millis > 0 && elapsed_ms > max_millis {
1698 log::warn!(
1699 "Database::preload: walk took {elapsed_ms} ms, exceeding \
1700 max_millis budget of {max_millis} ms (advisory until \
1701 the BIN walker becomes interruptible)",
1702 );
1703 }
1704 stats.elapsed_ms = elapsed_ms;
1705 Ok(stats)
1706 }
1707
1708 /// Returns B-tree statistics for this database.
1709 ///
1710 /// Implements `Database.getStats(StatsConfig)`.
1711 ///
1712 /// When `config.fast` is `true`, only the O(1) entry-count is returned
1713 /// and no tree traversal is performed. When `fast` is `false` (default),
1714 /// the full tree is walked to populate all node-count fields.
1715 ///
1716 /// # Errors
1717 /// Returns an error if the database is closed.
1718 pub fn stats(&self, config: Option<&StatsConfig>) -> Result<DatabaseStats> {
1719 self.check_open()?;
1720 let fast = config.map(|c| c.fast).unwrap_or(false);
1721
1722 let btree = if fast {
1723 // Fast path: O(1) counter only; skip tree traversal.
1724 BtreeStats {
1725 leaf_node_count: self.db_impl.read().entry_count(),
1726 ..Default::default()
1727 }
1728 } else {
1729 // Full path: walk the tree.
1730 let guard = self.db_impl.read();
1731 match guard.collect_btree_stats() {
1732 Some(ts) => BtreeStats {
1733 leaf_node_count: ts.n_entries,
1734 deleted_leaf_node_count: 0,
1735 bottom_internal_node_count: ts.n_bins,
1736 internal_node_count: ts.n_ins,
1737 main_tree_max_depth: ts.height,
1738 },
1739 None => BtreeStats {
1740 leaf_node_count: guard.entry_count(),
1741 ..Default::default()
1742 },
1743 }
1744 };
1745
1746 Ok(DatabaseStats { btree })
1747 }
1748
    /// Verifies the structural integrity of this database's B-tree.
    ///
    /// Walks the B-tree from root to BIN leaves and checks:
    /// - Each upper IN's children are accessible (non-null child references).
    /// - Each BIN entry that is not known-deleted has a valid (non-NULL) LSN.
    /// - The BIN's first key is >= the parent routing key (key-range containment).
    ///
    /// Mirrors `Database.verify(VerifyConfig)`. The work is delegated to
    /// `noxu_engine::verify_database_impl`, which is run against the
    /// underlying tree while a read guard on it is held.
    ///
    /// # Arguments
    /// * `config` - Verification options (which checks to run, max errors, etc.)
    ///
    /// # Returns
    /// A `VerifyResult` with any structural errors and the count of records verified.
    /// Structural problems are reported inside the `Ok` value, not as `Err`.
    ///
    /// # Errors
    /// Returns an error if the database is closed or its environment has
    /// been invalidated.
    pub fn verify(
        &self,
        config: &noxu_engine::VerifyConfig,
    ) -> Result<noxu_engine::VerifyResult> {
        self.check_open()?;
        // Hold the read guard for the duration of the verification.
        let guard = self.db_impl.read();
        Ok(noxu_engine::verify_database_impl(&guard, config))
    }
1775
1776 /// Creates a join cursor that returns records matching all secondary-key
1777 /// constraints expressed by the pre-positioned `cursors`.
1778 ///
1779 /// Mirrors `Database.join(SecondaryCursor[], JoinConfig)`.
1780 ///
1781 /// Each cursor in `cursors` must already be positioned at the desired
1782 /// secondary key value (e.g. via `SecondaryCursor::get_search_key`).
1783 /// The join algorithm iterates through all candidate primary keys from
1784 /// `cursors[0]` and probes `cursors[1..n]` to confirm each candidate
1785 /// also appears in their secondary keys. Candidates that pass all
1786 /// probes are returned by [`JoinCursor::get_next`].
1787 ///
1788 /// Unless `config.no_sort` is `true`, the cursor array is re-ordered by
1789 /// ascending duplicate-count estimate before the join starts, matching
1790 /// JE's optimisation for minimum candidate-set size.
1791 ///
1792 /// The returned `JoinCursor` owns the `cursors` for its lifetime.
1793 ///
1794 /// # Errors
1795 /// Returns an error if this database handle is closed.
1796 pub fn join<'db>(
1797 &'db self,
1798 cursors: Vec<SecondaryCursor<'db>>,
1799 config: Option<JoinConfig>,
1800 ) -> Result<JoinCursor<'db>> {
1801 self.check_open()?;
1802 JoinCursor::new(self, cursors, config)
1803 }
1804
    /// TXN-6 (JE invariant): a transactional handle must not be used against a
    /// non-transactional database. JE `LockerFactory.getWritableLocker`/
    /// `getReadableLocker` throw `IllegalArgumentException` on EVERY operation
    /// (not just cursor-open) when a `Transaction` is supplied to a non-txnal DB.
    /// Shared by get/put/delete/get_with_options/put_with_options/open_cursor.
    ///
    /// # Arguments
    /// * `has_txn` - `true` when the caller supplied a transaction.
    ///
    /// # Errors
    /// Returns `NoxuError::IllegalArgument` when `has_txn` is `true` and this
    /// database's config is not transactional.
    fn reject_txn_on_non_txnal_db(&self, has_txn: bool) -> Result<()> {
        if has_txn && !self.config.transactional {
            return Err(NoxuError::IllegalArgument(
                "a transaction cannot be used with a \
                 non-transactional database"
                    .to_string(),
            ));
        }
        Ok(())
    }

    /// Checks if the database is open, returns an error if not.
    ///
    /// X-13: also checks the environment validity flags so that reads and
    /// writes return `EnvironmentFailure` after an fsync error or explicit
    /// `EnvironmentImpl::invalidate()` call rather than silently succeeding
    /// on stale BIN data.
    ///
    /// The checks run in a fixed order, and the first one that fails decides
    /// the error: explicit environment invalidation, then log-manager I/O
    /// invalidation, then the handle's own open flag.
    ///
    /// # Errors
    /// * An environment failure with reason `UnexpectedStateFatal` when the
    ///   environment has been invalidated.
    /// * An environment failure with reason `LogWrite` when a log manager is
    ///   present and its `io_invalid` flag is set.
    /// * `NoxuError::DatabaseClosed` when this handle is no longer open.
    fn check_open(&self) -> Result<()> {
        // Check environment validity first — explicit invalidation.
        if self.env_invalid.load(Ordering::Acquire) {
            return Err(NoxuError::environment_with_reason(
                crate::error::EnvironmentFailureReason::UnexpectedStateFatal,
                "environment has been invalidated".to_string(),
            ));
        }
        // Check I/O failure (C-2 / fsync-gate).
        // Handles without a cached log manager skip this check.
        if self
            .log_manager
            .as_ref()
            .is_some_and(|lm| lm.io_invalid.load(Ordering::Acquire))
        {
            return Err(NoxuError::environment_with_reason(
                crate::error::EnvironmentFailureReason::LogWrite,
                "I/O failure: environment invalidated by fsync error"
                    .to_string(),
            ));
        }
        // Finally, the handle itself must still be open.
        if !self.open.load(Ordering::Acquire) {
            return Err(NoxuError::DatabaseClosed);
        }
        Ok(())
    }
1852
    /// Crate-internal accessor for the cached log manager, used by
    /// [`crate::disk_ordered_cursor::open_disk_ordered_cursor_multi`].
    ///
    /// Borrows the handle's `log_manager` field without cloning the `Arc`.
    /// Returns `None` for non-WAL environments.
    pub(crate) fn cached_log_manager(
        &self,
    ) -> Option<&std::sync::Arc<noxu_log::LogManager>> {
        self.log_manager.as_ref()
    }
1861
    /// Cached cleaner `FileProtector` for this database's environment, used by
    /// the disk-ordered-cursor producer to protect the files it scans from
    /// cleaner deletion (CLN-7). `None` when the environment has no cleaner.
    ///
    /// Returns a clone of the stored `Option<Arc<..>>`, so the caller gets
    /// its own handle to the shared protector rather than a borrow of `self`.
    pub(crate) fn cached_file_protector(
        &self,
    ) -> Option<std::sync::Arc<noxu_cleaner::FileProtector>> {
        self.file_protector.clone()
    }
1870
    /// Crate-internal accessor used by the disk-ordered-cursor helper to
    /// validate that the database is still open before scanning.
    ///
    /// Thin wrapper that exposes the private `check_open` to the rest of the
    /// crate.
    ///
    /// # Errors
    /// Returns whatever `check_open` returns.
    pub(crate) fn check_open_for_doc(&self) -> Result<()> {
        self.check_open()
    }
1876
    /// Returns this database's `DatabaseId` for use by the disk-ordered
    /// cursor producer.
    ///
    /// The id is built from the handle's `id` field converted to `i64`.
    // NOTE(review): `as i64` is an unchecked cast; the type of `self.id` is
    // not visible here — confirm it can never exceed the `i64` range.
    pub(crate) fn database_id_for_doc(&self) -> noxu_dbi::DatabaseId {
        noxu_dbi::DatabaseId::new(self.id as i64)
    }
1882
    /// The env's `DOS_PRODUCER_QUEUE_TIMEOUT` (ms), read from the owning
    /// `EnvironmentImpl` at cursor-open time (not a hot path). Passed to the
    /// disk-ordered-cursor producer so a lagging consumer fails the scan
    /// instead of hanging (JE `DOS_PRODUCER_QUEUE_TIMEOUT`).
    ///
    /// Takes the `env_impl` lock for the duration of the read.
    pub(crate) fn dos_producer_queue_timeout_ms(&self) -> u64 {
        self.env_impl.lock().get_dos_producer_queue_timeout_ms()
    }
1890
1891 /// Checks if the database is writable, returns an error if not.
1892 fn check_writable(&self) -> Result<()> {
1893 if self.config.read_only {
1894 return Err(NoxuError::ReadOnly);
1895 }
1896 Ok(())
1897 }
1898
1899 /// Unify the empty-key contract
1900 /// across `get` / `put` / `put_no_overwrite` / `put_with_options`
1901 /// / `delete`. Returns the key bytes if the entry has data set
1902 /// (even if zero-length); rejects `None`-data keys with a typed
1903 /// `IllegalArgument` so the previous put-vs-get asymmetry can no
1904 /// longer black-hole records under a `None` key.
1905 #[allow(dead_code)] // documented empty-key contract helper, not yet wired
1906 fn require_key_bytes<'a>(
1907 key: &'a DatabaseEntry,
1908 op: &'static str,
1909 ) -> Result<&'a [u8]> {
1910 match key.data_opt() {
1911 Some(k) => Ok(k),
1912 None => Err(NoxuError::IllegalArgument(format!(
1913 "{op}: key DatabaseEntry has no data; \
1914 use DatabaseEntry::from_bytes(...) or set_data(...) \
1915 (Some(&[]) for an explicit empty key)",
1916 ))),
1917 }
1918 }
1919}
1920
/// Closes the database handle automatically when it goes out of scope.
impl Drop for Database {
    fn drop(&mut self) {
        // Best effort close on drop: the result is deliberately discarded
        // because `drop` cannot report errors. Closing an already-closed
        // handle returns an error (see `test_close_twice_fails`), so this is
        // also the path taken after an explicit `close()`.
        let _ = self.close();
    }
}
1927
1928#[cfg(test)]
1929mod tests {
1930 use super::*;
1931 use crate::environment::Environment;
1932 use crate::environment_config::EnvironmentConfig;
1933 use tempfile::TempDir;
1934
1935 fn temp_env_and_db() -> (TempDir, Environment, Database) {
1936 let temp_dir = TempDir::new().unwrap();
1937 let env_config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
1938 .with_allow_create(true)
1939 .with_transactional(true);
1940 let env = Environment::open(env_config).unwrap();
1941
1942 let db_config = DatabaseConfig::new()
1943 .with_allow_create(true)
1944 .with_transactional(true);
1945 let db = env.open_database(None, "testdb", &db_config).unwrap();
1946
1947 (temp_dir, env, db)
1948 }
1949
1950 #[test]
1951 fn test_database_name() {
1952 let (_temp_dir, _env, db) = temp_env_and_db();
1953 assert_eq!(db.name(), "testdb");
1954 }
1955
1956 #[test]
1957 fn test_put_and_get() {
1958 let (_temp_dir, _env, db) = temp_env_and_db();
1959
1960 let key = DatabaseEntry::from_bytes(b"key1");
1961 let value = DatabaseEntry::from_bytes(b"value1");
1962
1963 db.put(&key, &value).unwrap();
1964
1965 let mut retrieved = DatabaseEntry::new();
1966 let result = db.get_into(None, &key, &mut retrieved).unwrap();
1967 assert!(result);
1968 assert_eq!(retrieved.data_opt().unwrap(), b"value1");
1969 }
1970
1971 #[test]
1972 fn test_get_nonexistent() {
1973 let (_temp_dir, _env, db) = temp_env_and_db();
1974
1975 let key = DatabaseEntry::from_bytes(b"nonexistent");
1976 let mut data = DatabaseEntry::new();
1977
1978 let result = db.get_into(None, &key, &mut data).unwrap();
1979 assert!(!result);
1980 }
1981
1982 /// ("partial-put length
1983 /// mismatch silent truncation"): a partial put whose `data` slice
1984 /// differs in length from the configured partial-length must be
1985 /// rejected with a typed error instead of silently truncating or
1986 /// padding the splice.
1987 #[test]
1988 fn test_partial_put_length_mismatch_rejected() {
1989 let (_temp_dir, _env, db) = temp_env_and_db();
1990
1991 let key = DatabaseEntry::from_bytes(b"k");
1992 db.put(&key, DatabaseEntry::from_bytes(b"hello world")).unwrap();
1993
1994 // Partial offset=6, partial_length=5 ("world"), but only 3 bytes
1995 // supplied. Used to silently truncate; now rejected.
1996 let mut patch = DatabaseEntry::from_bytes(b"abc");
1997 patch.set_partial(6, 5, true);
1998 let err = db.put_partial(None, &key, &patch).unwrap_err();
1999 assert!(
2000 matches!(err, NoxuError::IllegalArgument(_)),
2001 "expected IllegalArgument, got {err:?}"
2002 );
2003 assert!(
2004 err.to_string().contains("partial"),
2005 "expected partial-related message, got {}",
2006 err
2007 );
2008
2009 // The on-disk record is unchanged because the call returned
2010 // before any write.
2011 let mut buf = DatabaseEntry::new();
2012 let status = db.get_into(None, &key, &mut buf).unwrap();
2013 assert!(status);
2014 assert_eq!(buf.data_opt().unwrap(), b"hello world");
2015 }
2016
2017 /// Companion: when data.len() == partial_length the partial put
2018 /// patches the slice in place and other bytes are preserved.
2019 #[test]
2020 fn test_partial_put_exact_length_patches_in_place() {
2021 let (_temp_dir, _env, db) = temp_env_and_db();
2022
2023 let key = DatabaseEntry::from_bytes(b"k");
2024 db.put(&key, DatabaseEntry::from_bytes(b"hello world")).unwrap();
2025
2026 let mut patch = DatabaseEntry::from_bytes(b"WORLD");
2027 patch.set_partial(6, 5, true);
2028 db.put_partial(None, &key, &patch).unwrap();
2029
2030 let mut buf = DatabaseEntry::new();
2031 db.get_into(None, &key, &mut buf).unwrap();
2032 assert_eq!(buf.data_opt().unwrap(), b"hello WORLD");
2033 }
2034
2035 #[test]
2036 fn test_put_updates_existing() {
2037 let (_temp_dir, _env, db) = temp_env_and_db();
2038
2039 let key = DatabaseEntry::from_bytes(b"key1");
2040 let value1 = DatabaseEntry::from_bytes(b"value1");
2041 let value2 = DatabaseEntry::from_bytes(b"value2");
2042
2043 db.put(&key, &value1).unwrap();
2044 db.put(&key, &value2).unwrap();
2045
2046 let mut retrieved = DatabaseEntry::new();
2047 db.get_into(None, &key, &mut retrieved).unwrap();
2048 assert_eq!(retrieved.data_opt().unwrap(), b"value2");
2049 }
2050
2051 #[test]
2052 fn test_put_no_overwrite_success() {
2053 let (_temp_dir, _env, db) = temp_env_and_db();
2054
2055 let key = DatabaseEntry::from_bytes(b"key1");
2056 let value = DatabaseEntry::from_bytes(b"value1");
2057
2058 let result = db.put_no_overwrite(&key, &value).unwrap();
2059 assert!(result);
2060 }
2061
2062 #[test]
2063 fn test_put_no_overwrite_key_exists() {
2064 let (_temp_dir, _env, db) = temp_env_and_db();
2065
2066 let key = DatabaseEntry::from_bytes(b"key1");
2067 let value1 = DatabaseEntry::from_bytes(b"value1");
2068 let value2 = DatabaseEntry::from_bytes(b"value2");
2069
2070 db.put(&key, &value1).unwrap();
2071 let result = db.put_no_overwrite(&key, &value2).unwrap();
2072 assert!(!result);
2073
2074 // Verify original value is unchanged
2075 let mut retrieved = DatabaseEntry::new();
2076 db.get_into(None, &key, &mut retrieved).unwrap();
2077 assert_eq!(retrieved.data_opt().unwrap(), b"value1");
2078 }
2079
2080 #[test]
2081 fn test_delete() {
2082 let (_temp_dir, _env, db) = temp_env_and_db();
2083
2084 let key = DatabaseEntry::from_bytes(b"key1");
2085 let value = DatabaseEntry::from_bytes(b"value1");
2086
2087 db.put(&key, &value).unwrap();
2088 let result = db.delete(&key).unwrap();
2089 assert!(result);
2090
2091 let mut retrieved = DatabaseEntry::new();
2092 let result = db.get_into(None, &key, &mut retrieved).unwrap();
2093 assert!(!result);
2094 }
2095
2096 #[test]
2097 fn test_delete_nonexistent() {
2098 let (_temp_dir, _env, db) = temp_env_and_db();
2099
2100 let key = DatabaseEntry::from_bytes(b"nonexistent");
2101 let result = db.delete(&key).unwrap();
2102 assert!(!result);
2103 }
2104
2105 #[test]
2106 fn test_count() {
2107 let (_temp_dir, _env, db) = temp_env_and_db();
2108
2109 assert_eq!(db.count().unwrap(), 0);
2110
2111 let key1 = DatabaseEntry::from_bytes(b"key1");
2112 let value1 = DatabaseEntry::from_bytes(b"value1");
2113 db.put(&key1, &value1).unwrap();
2114 assert_eq!(db.count().unwrap(), 1);
2115
2116 let key2 = DatabaseEntry::from_bytes(b"key2");
2117 let value2 = DatabaseEntry::from_bytes(b"value2");
2118 db.put(&key2, &value2).unwrap();
2119 assert_eq!(db.count().unwrap(), 2);
2120
2121 db.delete(&key1).unwrap();
2122 assert_eq!(db.count().unwrap(), 1);
2123 }
2124
2125 #[test]
2126 fn test_close() {
2127 let (_temp_dir, _env, db) = temp_env_and_db();
2128 assert!(db.is_valid());
2129 db.close().unwrap();
2130 assert!(!db.is_valid());
2131 }
2132
2133 #[test]
2134 fn test_close_twice_fails() {
2135 let (_temp_dir, _env, db) = temp_env_and_db();
2136 db.close().unwrap();
2137 let result = db.close();
2138 assert!(result.is_err());
2139 }
2140
2141 #[test]
2142 fn test_operations_on_closed_database_fail() {
2143 let (_temp_dir, _env, db) = temp_env_and_db();
2144 db.close().unwrap();
2145
2146 let key = DatabaseEntry::from_bytes(b"key1");
2147 let value = DatabaseEntry::from_bytes(b"value1");
2148 let mut data = DatabaseEntry::new();
2149
2150 assert!(db.get_into(None, &key, &mut data).is_err());
2151 assert!(db.put(&key, &value).is_err());
2152 assert!(db.put_no_overwrite(&key, &value).is_err());
2153 assert!(db.delete(&key).is_err());
2154 assert!(db.count().is_err());
2155 assert!(db.open_cursor(None).is_err());
2156 }
2157
2158 #[test]
2159 fn test_state() {
2160 let (_temp_dir, _env, db) = temp_env_and_db();
2161 assert_eq!(db.state(), DbState::Open);
2162 db.close().unwrap();
2163 assert_eq!(db.state(), DbState::Closed);
2164 }
2165
2166 #[test]
2167 fn test_read_only_database() {
2168 let temp_dir = TempDir::new().unwrap();
2169 let env_config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
2170 .with_allow_create(true);
2171 let env = Environment::open(env_config).unwrap();
2172
2173 let db_config = DatabaseConfig::new()
2174 .with_allow_create(true)
2175 .with_transactional(true)
2176 .with_read_only(true);
2177 let db = env.open_database(None, "readonly_db", &db_config).unwrap();
2178
2179 let key = DatabaseEntry::from_bytes(b"key1");
2180 let value = DatabaseEntry::from_bytes(b"value1");
2181
2182 // Write operations should fail
2183 assert!(db.put(&key, &value).is_err());
2184 assert!(db.put_no_overwrite(&key, &value).is_err());
2185 assert!(db.delete(&key).is_err());
2186 }
2187
2188 #[test]
2189 fn test_multiple_databases() {
2190 let temp_dir = TempDir::new().unwrap();
2191 let env_config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
2192 .with_allow_create(true);
2193 let env = Environment::open(env_config).unwrap();
2194
2195 let db_config = DatabaseConfig::new()
2196 .with_allow_create(true)
2197 .with_transactional(true);
2198 let db1 = env.open_database(None, "db1", &db_config).unwrap();
2199 let db2 = env.open_database(None, "db2", &db_config).unwrap();
2200
2201 let key = DatabaseEntry::from_bytes(b"key1");
2202 let value1 = DatabaseEntry::from_bytes(b"value1");
2203 let value2 = DatabaseEntry::from_bytes(b"value2");
2204
2205 db1.put(&key, &value1).unwrap();
2206 db2.put(&key, &value2).unwrap();
2207
2208 let mut retrieved1 = DatabaseEntry::new();
2209 let mut retrieved2 = DatabaseEntry::new();
2210
2211 db1.get_into(None, &key, &mut retrieved1).unwrap();
2212 db2.get_into(None, &key, &mut retrieved2).unwrap();
2213
2214 assert_eq!(retrieved1.data_opt().unwrap(), b"value1");
2215 assert_eq!(retrieved2.data_opt().unwrap(), b"value2");
2216 }
2217
2218 #[test]
2219 fn test_empty_keys_and_values() {
2220 let (_temp_dir, _env, db) = temp_env_and_db();
2221
2222 let empty_key = DatabaseEntry::from_bytes(b"");
2223 let empty_value = DatabaseEntry::from_bytes(b"");
2224
2225 db.put(&empty_key, &empty_value).unwrap();
2226
2227 let mut retrieved = DatabaseEntry::new();
2228 let result = db.get_into(None, &empty_key, &mut retrieved).unwrap();
2229 assert!(result);
2230 assert_eq!(retrieved.data_opt().unwrap(), b"");
2231 }
2232
2233 #[test]
2234 fn test_large_keys_and_values() {
2235 let (_temp_dir, _env, db) = temp_env_and_db();
2236
2237 let large_key = DatabaseEntry::from_bytes(&vec![b'k'; 1000]);
2238 let large_value = DatabaseEntry::from_bytes(&vec![b'v'; 10000]);
2239
2240 db.put(&large_key, &large_value).unwrap();
2241
2242 let mut retrieved = DatabaseEntry::new();
2243 db.get_into(None, &large_key, &mut retrieved).unwrap();
2244 assert_eq!(retrieved.data_opt().unwrap().len(), 10000);
2245 assert!(retrieved.data_opt().unwrap().iter().all(|&b| b == b'v'));
2246 }
2247
2248 #[test]
2249 fn test_binary_keys_and_values() {
2250 let (_temp_dir, _env, db) = temp_env_and_db();
2251
2252 let binary_key = DatabaseEntry::from_bytes(&[0u8, 1, 2, 255, 254, 253]);
2253 let binary_value = DatabaseEntry::from_bytes(&[255u8, 0, 128, 64, 32]);
2254
2255 db.put(&binary_key, &binary_value).unwrap();
2256
2257 let mut retrieved = DatabaseEntry::new();
2258 db.get_into(None, &binary_key, &mut retrieved).unwrap();
2259 assert_eq!(retrieved.data_opt().unwrap(), &[255u8, 0, 128, 64, 32]);
2260 }
2261
2262 #[test]
2263 fn test_scan_all_kv_empty() {
2264 let (_temp_dir, _env, db) = temp_env_and_db();
2265 let kv = db.scan_all_kv().unwrap();
2266 assert!(kv.is_empty());
2267 }
2268
2269 #[test]
2270 fn test_scan_all_kv_returns_records() {
2271 let (_temp_dir, _env, db) = temp_env_and_db();
2272 db.put(
2273 DatabaseEntry::from_vec(vec![1]),
2274 DatabaseEntry::from_vec(vec![10]),
2275 )
2276 .unwrap();
2277 db.put(
2278 DatabaseEntry::from_vec(vec![2]),
2279 DatabaseEntry::from_vec(vec![20]),
2280 )
2281 .unwrap();
2282 let kv = db.scan_all_kv().unwrap();
2283 assert_eq!(kv.len(), 2);
2284 }
2285
2286 #[test]
2287 fn test_scan_all_kv_then_delete() {
2288 let (_temp_dir, _env, db) = temp_env_and_db();
2289 db.put(
2290 DatabaseEntry::from_vec(vec![1]),
2291 DatabaseEntry::from_vec(vec![10]),
2292 )
2293 .unwrap();
2294 db.put(
2295 DatabaseEntry::from_vec(vec![2]),
2296 DatabaseEntry::from_vec(vec![20]),
2297 )
2298 .unwrap();
2299
2300 let kv = db.scan_all_kv().unwrap();
2301 assert_eq!(kv.len(), 2);
2302
2303 for (k, _v) in &kv {
2304 let status = db.delete(DatabaseEntry::from_vec(k.clone())).unwrap();
2305 assert!(status, "delete failed for key {:?}", k);
2306 }
2307
2308 let count = db.count().unwrap();
2309 assert_eq!(count, 0, "expected 0 records after deletes, got {}", count);
2310 }
2311
2312 #[test]
2313 fn test_scan_all_kv_then_delete_u64_be_keys() {
2314 // Simulate the exact pattern used in EntityStore::evolve: big-endian u64 keys.
2315 let (_temp_dir, _env, db) = temp_env_and_db();
2316 for id in [1u64, 2u64] {
2317 let key_bytes = id.to_be_bytes().to_vec();
2318 let val_bytes = format!("user{}", id).into_bytes();
2319 db.put(
2320 DatabaseEntry::from_vec(key_bytes),
2321 DatabaseEntry::from_vec(val_bytes),
2322 )
2323 .unwrap();
2324 }
2325 assert_eq!(db.count().unwrap(), 2);
2326
2327 let records = db.scan_all_kv().unwrap();
2328 assert_eq!(records.len(), 2);
2329
2330 for (k, _v) in records {
2331 let status = db.delete(DatabaseEntry::from_vec(k.clone())).unwrap();
2332 assert!(status, "delete failed for u64 key {:?}", k);
2333 }
2334 assert_eq!(db.count().unwrap(), 0);
2335 }
2336
2337 // ========================================================================
2338 // Additional branch-coverage tests
2339 // ========================================================================
2340
2341 /// get() with a None-data DatabaseEntry returns NotFound.
2342 #[test]
2343 fn test_get_with_none_key_data_returns_not_found() {
2344 let (_temp_dir, _env, db) = temp_env_and_db();
2345 let key_none = DatabaseEntry::new(); // no data set
2346 let mut data = DatabaseEntry::new();
2347
2348 let result = db.get_into(None, &key_none, &mut data).unwrap();
2349 assert!(!result);
2350 }
2351
2352 /// delete() with a None-data DatabaseEntry returns NotFound.
2353 #[test]
2354 fn test_delete_with_none_key_data_returns_not_found() {
2355 let (_temp_dir, _env, db) = temp_env_and_db();
2356 let key_none = DatabaseEntry::new();
2357
2358 let result = db.delete(&key_none).unwrap();
2359 assert!(!result);
2360 }
2361
2362 /// open_cursor() with a CursorConfig that has read_uncommitted=true makes
2363 /// the cursor read-only.
2364 #[test]
2365 fn test_open_cursor_read_uncommitted_config_makes_read_only() {
2366 use crate::cursor_config::CursorConfig;
2367 let (_temp_dir, _env, db) = temp_env_and_db();
2368
2369 let config = CursorConfig::new().with_read_uncommitted(true);
2370 let cursor = db.open_cursor(Some(&config)).unwrap();
2371 assert!(cursor.is_read_only());
2372 }
2373
2374 /// open_cursor() with no config and a non-read-only database produces a
2375 /// writable cursor.
2376 #[test]
2377 fn test_open_cursor_no_config_writable_db_is_writable() {
2378 let (_temp_dir, _env, db) = temp_env_and_db();
2379 let cursor = db.open_cursor(None).unwrap();
2380 assert!(!cursor.is_read_only());
2381 }
2382
2383 /// scan_all_kv() on a closed database returns an error.
2384 #[test]
2385 fn test_scan_all_kv_on_closed_database_fails() {
2386 let (_temp_dir, _env, db) = temp_env_and_db();
2387 db.close().unwrap();
2388 let result = db.scan_all_kv();
2389 assert!(result.is_err());
2390 }
2391
2392 /// put_no_overwrite() on a read-only database returns an error.
2393 #[test]
2394 fn test_put_no_overwrite_on_read_only_database_fails() {
2395 let temp_dir = TempDir::new().unwrap();
2396 let env_config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
2397 .with_allow_create(true);
2398 let env = Environment::open(env_config).unwrap();
2399
2400 let db_config = DatabaseConfig::new()
2401 .with_allow_create(true)
2402 .with_transactional(true)
2403 .with_read_only(true);
2404 let db = env.open_database(None, "ro_db", &db_config).unwrap();
2405
2406 let key = DatabaseEntry::from_bytes(b"k");
2407 let val = DatabaseEntry::from_bytes(b"v");
2408 let result = db.put_no_overwrite(&key, &val);
2409 assert!(result.is_err());
2410 }
2411
    // =====================================================================
    // cursor-failure map_err coverage: use the test hook in noxu-dbi to
    // force cursor operations to return Err, exercising the map_err closures
    // in Database::get / put / put_no_overwrite / delete / scan_all_kv.
    // count() does not go through a cursor, so the hook must leave it
    // unaffected; that is asserted separately below.
    // =====================================================================
2417
    /// Covers the map_err closure on `cursor.search(...)` inside `get()`.
    ///
    /// Arms the noxu-dbi cursor fail hook so the first cursor check fails,
    /// then asserts that the lookup surfaces an error.
    #[test]
    fn test_get_search_map_err_via_hook() {
        let (_tmp, _env, db) = temp_env_and_db();
        noxu_dbi::set_cursor_fail_after(1); // fail on the 1st check_state (search)
        let key = DatabaseEntry::from_bytes(b"any");
        let mut data = DatabaseEntry::new();
        let result = db.get_into(None, &key, &mut data);
        // Disarm the hook before asserting so a failed assertion cannot
        // leave it set.
        noxu_dbi::clear_cursor_fail_flag();
        assert!(result.is_err());
    }
2429
    /// Covers the map_err closure on `cursor.get_current()` inside `get()`.
    ///
    /// The record is written before the hook is armed, so the search step
    /// succeeds and the failure lands on the second cursor check.
    #[test]
    fn test_get_get_current_map_err_via_hook() {
        let (_tmp, _env, db) = temp_env_and_db();
        // Insert a key so search can succeed.
        db.put(
            DatabaseEntry::from_bytes(b"k"),
            DatabaseEntry::from_bytes(b"v"),
        )
        .unwrap();
        // fail on the 2nd check (check_initialized inside get_current).
        noxu_dbi::set_cursor_fail_after(2);
        let key = DatabaseEntry::from_bytes(b"k");
        let mut data = DatabaseEntry::new();
        let result = db.get_into(None, &key, &mut data);
        // Disarm the hook before asserting.
        noxu_dbi::clear_cursor_fail_flag();
        assert!(result.is_err());
    }
2448
    /// Covers the map_err closure on `cursor.put(...)` inside `put()`.
    ///
    /// Arms the cursor fail hook for the first check and asserts that the
    /// write surfaces an error.
    #[test]
    fn test_put_map_err_via_hook() {
        let (_tmp, _env, db) = temp_env_and_db();
        // Fail on the first cursor check.
        noxu_dbi::set_cursor_fail_after(1);
        let key = DatabaseEntry::from_bytes(b"k");
        let val = DatabaseEntry::from_bytes(b"v");
        let result = db.put(&key, &val);
        // Disarm the hook before asserting.
        noxu_dbi::clear_cursor_fail_flag();
        assert!(result.is_err());
    }
2460
    /// Covers the map_err closure on `cursor.put(...)` inside `put_no_overwrite()`.
    ///
    /// Arms the cursor fail hook for the first check and asserts that the
    /// write surfaces an error.
    #[test]
    fn test_put_no_overwrite_map_err_via_hook() {
        let (_tmp, _env, db) = temp_env_and_db();
        // Fail on the first cursor check.
        noxu_dbi::set_cursor_fail_after(1);
        let key = DatabaseEntry::from_bytes(b"k");
        let val = DatabaseEntry::from_bytes(b"v");
        let result = db.put_no_overwrite(&key, &val);
        // Disarm the hook before asserting.
        noxu_dbi::clear_cursor_fail_flag();
        assert!(result.is_err());
    }
2472
    /// Covers the map_err closure on `cursor.search(...)` inside `delete()`.
    ///
    /// Arms the cursor fail hook for the first check and asserts that the
    /// delete surfaces an error.
    #[test]
    fn test_delete_search_map_err_via_hook() {
        let (_tmp, _env, db) = temp_env_and_db();
        // Fail on the first cursor check.
        noxu_dbi::set_cursor_fail_after(1);
        let key = DatabaseEntry::from_bytes(b"k");
        let result = db.delete(&key);
        // Disarm the hook before asserting.
        noxu_dbi::clear_cursor_fail_flag();
        assert!(result.is_err());
    }
2483
    /// Covers the map_err closure on `cursor.delete()` inside `delete()`.
    ///
    /// The record is written before the hook is armed, so the search step
    /// succeeds and the failure lands on the second cursor check.
    #[test]
    fn test_delete_delete_map_err_via_hook() {
        let (_tmp, _env, db) = temp_env_and_db();
        // Insert the record first so the search inside delete() can succeed.
        db.put(
            DatabaseEntry::from_bytes(b"k"),
            DatabaseEntry::from_bytes(b"v"),
        )
        .unwrap();
        // fail on the 2nd check_state (the delete() call, after search succeeds).
        noxu_dbi::set_cursor_fail_after(2);
        let key = DatabaseEntry::from_bytes(b"k");
        let result = db.delete(&key);
        // Disarm the hook before asserting.
        noxu_dbi::clear_cursor_fail_flag();
        assert!(result.is_err());
    }
2500
2501 /// count() uses the O(1) AtomicU64 counter; cursor-fail hooks do not affect it.
2502 /// Verify the counter is correct across insert/update/delete.
2503 #[test]
2504 fn test_count_atomic_counter_insert_update_delete() {
2505 let (_tmp, _env, db) = temp_env_and_db();
2506
2507 // Empty database starts at 0.
2508 assert_eq!(db.count().unwrap(), 0);
2509
2510 // Insert three distinct keys.
2511 db.put(
2512 DatabaseEntry::from_bytes(b"a"),
2513 DatabaseEntry::from_bytes(b"1"),
2514 )
2515 .unwrap();
2516 db.put(
2517 DatabaseEntry::from_bytes(b"b"),
2518 DatabaseEntry::from_bytes(b"2"),
2519 )
2520 .unwrap();
2521 db.put(
2522 DatabaseEntry::from_bytes(b"c"),
2523 DatabaseEntry::from_bytes(b"3"),
2524 )
2525 .unwrap();
2526 assert_eq!(db.count().unwrap(), 3);
2527
2528 // Overwrite an existing key — count must NOT change.
2529 db.put(
2530 DatabaseEntry::from_bytes(b"a"),
2531 DatabaseEntry::from_bytes(b"updated"),
2532 )
2533 .unwrap();
2534 assert_eq!(db.count().unwrap(), 3);
2535
2536 // Delete one key — count decrements.
2537 db.delete(DatabaseEntry::from_bytes(b"b")).unwrap();
2538 assert_eq!(db.count().unwrap(), 2);
2539 }
2540
    /// count() is O(1): verify it still works even when the cursor fail-hook
    /// is active (the hook only affects cursor operations, not the atomic read).
    #[test]
    fn test_count_unaffected_by_cursor_fail_hook() {
        let (_tmp, _env, db) = temp_env_and_db();
        // Store one record before arming the hook so the write itself is
        // not affected by it.
        db.put(
            DatabaseEntry::from_bytes(b"k"),
            DatabaseEntry::from_bytes(b"v"),
        )
        .unwrap();
        noxu_dbi::set_cursor_fail_after(1);
        // count() must succeed (no cursor used).
        let result = db.count();
        // Disarm the hook before asserting.
        noxu_dbi::clear_cursor_fail_flag();
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 1);
    }
2558
    /// Covers the map_err closure on `cursor.get_first()` inside `scan_all_kv()`.
    ///
    /// Arms the cursor fail hook for the first check and asserts that the
    /// scan surfaces an error.
    #[test]
    fn test_scan_all_kv_get_first_map_err_via_hook() {
        let (_tmp, _env, db) = temp_env_and_db();
        // Fail on the first cursor check.
        noxu_dbi::set_cursor_fail_after(1);
        let result = db.scan_all_kv();
        // Disarm the hook before asserting.
        noxu_dbi::clear_cursor_fail_flag();
        assert!(result.is_err());
    }
2568
    /// Covers the map_err closure on `cursor.get_current()` inside `scan_all_kv()`.
    ///
    /// One record is written before the hook is armed, so positioning on the
    /// first record succeeds and the failure lands on the second cursor check.
    #[test]
    fn test_scan_all_kv_get_current_map_err_via_hook() {
        let (_tmp, _env, db) = temp_env_and_db();
        // Store one record so the scan has something to position on.
        db.put(
            DatabaseEntry::from_bytes(b"k"),
            DatabaseEntry::from_bytes(b"v"),
        )
        .unwrap();
        // fail on the 2nd check (check_initialized inside get_current, after get_first succeeds).
        noxu_dbi::set_cursor_fail_after(2);
        let result = db.scan_all_kv();
        // Disarm the hook before asserting.
        noxu_dbi::clear_cursor_fail_flag();
        assert!(result.is_err());
    }
2584
    /// Covers the map_err closure on `cursor.retrieve_next(...)` inside `scan_all_kv()`.
    ///
    /// One record is written before the hook is armed, so the first two
    /// cursor checks succeed and the failure lands on the third.
    #[test]
    fn test_scan_all_kv_retrieve_next_map_err_via_hook() {
        let (_tmp, _env, db) = temp_env_and_db();
        // Store one record so the scan reaches the advance step.
        db.put(
            DatabaseEntry::from_bytes(b"k"),
            DatabaseEntry::from_bytes(b"v"),
        )
        .unwrap();
        // fail on the 3rd check (retrieve_next, after get_first and get_current succeed).
        noxu_dbi::set_cursor_fail_after(3);
        let result = db.scan_all_kv();
        // Disarm the hook before asserting.
        noxu_dbi::clear_cursor_fail_flag();
        assert!(result.is_err());
    }
2600
2601 #[test]
2602 fn test_sync_on_open_database_succeeds() {
2603 let (_tmp, _env, db) = temp_env_and_db();
2604 db.put(
2605 DatabaseEntry::from_bytes(b"key"),
2606 DatabaseEntry::from_bytes(b"val"),
2607 )
2608 .unwrap();
2609 assert!(db.sync().is_ok());
2610 }
2611
2612 #[test]
2613 fn test_sync_on_closed_database_fails() {
2614 let (_tmp, _env, db) = temp_env_and_db();
2615 db.close().unwrap();
2616 assert!(db.sync().is_err());
2617 }
2618
2619 // ── verify ─────────────────────────────────────────────────────────────
2620
2621 #[test]
2622 fn test_verify_empty_database_passes() {
2623 use noxu_engine::VerifyConfig;
2624 let (_tmp, _env, db) = temp_env_and_db();
2625 let config = VerifyConfig::default();
2626 let result = db.verify(&config).unwrap();
2627 assert!(result.passed, "empty db should pass: {:?}", result.errors);
2628 }
2629
/// Verifying a database holding 20 records must pass and must report a
/// non-zero `records_verified` count.
#[test]
fn test_verify_populated_database_passes() {
    use noxu_engine::VerifyConfig;

    let (_tmp, _env, db) = temp_env_and_db();

    // Keys are the big-endian bytes of 0..20; each value is twice its key.
    (0u32..20).for_each(|n| {
        let key_bytes = n.to_be_bytes();
        let val_bytes = (n * 2).to_be_bytes();
        let k = DatabaseEntry::from_bytes(&key_bytes);
        let v = DatabaseEntry::from_bytes(&val_bytes);
        db.put(&k, &v).unwrap();
    });

    let result = db.verify(&VerifyConfig::default()).unwrap();
    assert!(result.passed, "populated db should pass: {:?}", result.errors);
    assert!(result.records_verified > 0);
}
2644
/// `verify()` must return an error when called on a closed handle.
#[test]
fn test_verify_closed_database_fails() {
    use noxu_engine::VerifyConfig;

    let (_tmp, _env, db) = temp_env_and_db();
    db.close().unwrap();

    let verify_outcome = db.verify(&VerifyConfig::default());
    assert!(verify_outcome.is_err());
}
2653
2654 // ── get_with_options / put_with_options ────────────────────────────────
2655
/// A record written with `put` is returned by `get_with_options` when the
/// options come from `ReadOptions::new()`.
#[test]
fn test_get_with_options_default_reads_written_record() {
    use crate::read_options::ReadOptions;

    let (_tmp, _env, db) = temp_env_and_db();
    let key = DatabaseEntry::from_bytes(b"ropt_key");
    db.put(&key, &DatabaseEntry::from_bytes(b"ropt_val")).unwrap();

    let fetched = db.get_with_options(None, &key, &ReadOptions::new()).unwrap();
    assert!(fetched.is_some());
    assert_eq!(fetched.unwrap().as_ref(), b"ropt_val");
}
2669
/// A record written with `put` is returned by `get_with_options` when the
/// options come from `ReadOptions::read_uncommitted()`.
#[test]
fn test_get_with_options_read_uncommitted_sees_written_record() {
    use crate::read_options::ReadOptions;

    let (_tmp, _env, db) = temp_env_and_db();
    let key = DatabaseEntry::from_bytes(b"ru_key");
    db.put(&key, &DatabaseEntry::from_bytes(b"ru_val")).unwrap();

    let fetched = db
        .get_with_options(None, &key, &ReadOptions::read_uncommitted())
        .unwrap();
    assert!(fetched.is_some());
    assert_eq!(fetched.unwrap().as_ref(), b"ru_val");
}
2683
/// Looking up a key that was never written yields `Ok(None)` from
/// `get_with_options`.
#[test]
fn test_get_with_options_not_found() {
    use crate::read_options::ReadOptions;

    let (_tmp, _env, db) = temp_env_and_db();
    let absent_key = DatabaseEntry::from_bytes(b"missing");

    let fetched = db
        .get_with_options(None, &absent_key, &ReadOptions::new())
        .unwrap();
    assert!(fetched.is_none());
}
2693
/// `put_with_options` with options from `WriteOptions::new()` stores a
/// record that `get_into` reads back unchanged.
#[test]
fn test_put_with_options_no_ttl_behaves_like_put() {
    use crate::write_options::WriteOptions;

    let (_tmp, _env, db) = temp_env_and_db();
    let key = DatabaseEntry::from_bytes(b"wopt_key");
    let val = DatabaseEntry::from_bytes(b"wopt_val");
    db.put_with_options(None, &key, &val, &WriteOptions::new())
        .unwrap();

    let mut read_back = DatabaseEntry::new();
    db.get_into(None, &key, &mut read_back).unwrap();
    assert_eq!(read_back.data_opt().unwrap(), b"wopt_val");
}
2707
/// A record written through `put_with_options` with an expiration set is
/// stored and can be read back while it has not yet expired.
#[test]
fn test_put_with_options_with_ttl_stores_record() {
    use crate::write_options::WriteOptions;

    let (_tmp, _env, db) = temp_env_and_db();
    let key = DatabaseEntry::from_bytes(b"ttl_key");
    let val = DatabaseEntry::from_bytes(b"ttl_val");

    // TTL of 1 hour — the record has not expired yet, so it must be readable.
    let ttl_opts = WriteOptions::with_expiration(1);
    db.put_with_options(None, &key, &val, &ttl_opts).unwrap();

    let mut read_back = DatabaseEntry::new();
    // `get_into` yields `true` here, which the test treats as "record found".
    assert!(db.get_into(None, &key, &mut read_back).unwrap());
    assert_eq!(read_back.data_opt().unwrap(), b"ttl_val");
}
2723
/// `put_with_options` must return an error once the handle is closed.
#[test]
fn test_put_with_options_closed_db_fails() {
    use crate::write_options::WriteOptions;

    let (_tmp, _env, db) = temp_env_and_db();
    db.close().unwrap();

    let key = DatabaseEntry::from_bytes(b"k");
    let val = DatabaseEntry::from_bytes(b"v");
    let write_outcome = db.put_with_options(None, &key, &val, &WriteOptions::new());
    assert!(write_outcome.is_err());
}
2734
2735 // ========================================================================
2736 // Audit database F11 — Wave 2C-4: reject None-data keys on writes.
2737 // ========================================================================
2738
2739 // 7.0 NOTE: the three `*_with_none_key_returns_illegal_argument` tests
2740 // were removed in the 7.0 API reshape (review P1-3). The write surface
2741 // now takes `key: impl AsRef<[u8]>`, so a key is *always* a byte slice;
2742 // the historical "None key" (a `DatabaseEntry` with no data set, distinct
2743 // from an empty `b""`) can no longer be expressed at the call site.
2744 // An empty key is accepted on writes — see
2745 // `test_put_with_explicit_empty_key_accepted`, which the reshape kept as
2746 // the canonical behaviour. The removed tests asserted a None-vs-empty
2747 // distinction that the new signature intentionally eliminates.
2748
/// An explicitly empty key (`b""`) is still accepted on writes.
#[test]
fn test_put_with_explicit_empty_key_accepted() {
    let (_tmp, _env, db) = temp_env_and_db();

    let zero_len_key = DatabaseEntry::from_bytes(b"");
    let payload = DatabaseEntry::from_bytes(b"v");

    // Only the write itself is checked: it must not return an error.
    db.put(&zero_len_key, &payload).unwrap();
}
2757
2758 // ── X-13: env-invalidity checks propagate through check_open ──────────────
2759
/// X-13: once the `io_invalid` flag is set, both `db.get_into` and `db.put`
/// must return `EnvironmentFailure` instead of silently reading stale BIN
/// data.
#[test]
fn test_x13_io_invalid_blocks_db_get() {
    use std::sync::atomic::Ordering;

    let (_tmp, env, db) = temp_env_and_db();

    // Seed one record so a read would otherwise have something to return.
    let key = DatabaseEntry::from_bytes(b"k");
    let val = DatabaseEntry::from_bytes(b"v");
    db.put(&key, &val).unwrap();

    // Raise io_invalid through the LogManager cached on the handle.
    let log_mgr = db.log_manager.as_ref().expect("WAL env must have LogManager");
    log_mgr.io_invalid.store(true, Ordering::Release);

    // Reads must now be rejected.
    let mut read_buf = DatabaseEntry::new();
    let result = db.get_into(None, &key, &mut read_buf);
    let read_blocked = matches!(result, Err(NoxuError::EnvironmentFailure { .. }));
    assert!(read_blocked, "expected EnvironmentFailure, got {result:?}");

    // Writes must be rejected as well.
    let result2 = db.put(&key, &val);
    let write_blocked = matches!(result2, Err(NoxuError::EnvironmentFailure { .. }));
    assert!(
        write_blocked,
        "expected EnvironmentFailure on put, got {result2:?}"
    );

    // Lower the flag again so the environment can close cleanly.
    log_mgr.io_invalid.store(false, Ordering::Release);
    drop(env);
}
2795
/// X-13: with the database's `env_invalid` flag set (flipped directly here
/// rather than through `EnvironmentImpl::invalidate()`), a cursor
/// `Get::First` must return `EnvironmentFailure`.
#[test]
fn test_x13_env_invalid_blocks_cursor_get() {
    use std::sync::atomic::Ordering;

    let (_tmp, env, db) = temp_env_and_db();

    // Seed one record.
    let seed_key = DatabaseEntry::from_bytes(b"ck");
    let seed_val = DatabaseEntry::from_bytes(b"cv");
    db.put(&seed_key, &seed_val).unwrap();

    // The cursor is opened while the environment is still valid ...
    let mut cursor = db.open_cursor(None).unwrap();

    // ... and only afterwards is the invalid flag raised.
    db.env_invalid.store(true, Ordering::Release);

    // The next cursor operation is expected to notice the flag.
    let mut found_key = DatabaseEntry::new();
    let mut found_val = DatabaseEntry::new();
    let result = cursor.get(&mut found_key, &mut found_val, crate::get::Get::First, None);
    let cursor_blocked = matches!(result, Err(NoxuError::EnvironmentFailure { .. }));
    assert!(
        cursor_blocked,
        "expected EnvironmentFailure from cursor, got {result:?}"
    );

    // Lower the flag again so the environment drops cleanly.
    db.env_invalid.store(false, Ordering::Release);
    drop(env);
}
2828}