Skip to main content

noxu_db/
environment.rs

1//! Environment handle.
2//!
3
4use crate::checkpoint_config::CheckpointConfig;
5use crate::database::Database;
6use crate::database_config::DatabaseConfig;
7use crate::environment_config::EnvironmentConfig;
8use crate::environment_mutable_config::EnvironmentMutableConfig;
9use crate::error::{NoxuError, Result};
10use crate::transaction::Transaction;
11use crate::transaction_config::TransactionConfig;
12use hashbrown::HashMap;
13use noxu_dbi::{DbiEnvConfig, EnvironmentImpl};
14use noxu_engine::EnvironmentStats;
15use noxu_engine::env_stats::{
16    EvictorStatsSnapshot, LockStatsSnapshot, LogStatsSnapshot, TxnStatsSnapshot,
17};
18use noxu_log::LogManager;
19use noxu_sync::Mutex;
20use std::path::{Path, PathBuf};
21use std::sync::Arc;
22use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
23use std::time::Instant;
24
25/// Build an [`EnvironmentStats`] snapshot from a locked [`EnvironmentImpl`].
26///
27/// Shared by [`Environment::stats`] and the periodic stats-file dumper
28/// ([`crate::stats_file`]) so both produce byte-identical snapshots.  The
29/// caller holds the `EnvironmentImpl` lock; `log_manager` is the cached handle
30/// (avoids a second lock on the stats hot path).
31pub(crate) fn build_environment_stats(
32    env_impl: &EnvironmentImpl,
33    log_manager: Option<&LogManager>,
34    cache_size: u64,
35) -> EnvironmentStats {
36    let n_databases = env_impl.n_databases() as u32;
37    let log = log_manager
38        .map(|lm| LogStatsSnapshot::from(&lm.get_stats()))
39        .unwrap_or_default();
40    let lock =
41        LockStatsSnapshot::from(&env_impl.get_lock_manager().get_stats());
42    let txn = TxnStatsSnapshot::from(&env_impl.get_txn_manager().get_stats());
43    let throughput = env_impl.get_throughput_snapshot();
44    let evictor =
45        EvictorStatsSnapshot::from(env_impl.get_evictor().get_stats());
46    let cleaner = env_impl
47        .get_cleaner()
48        .map(|c| c.get_stats().snapshot())
49        .unwrap_or_default();
50    let checkpoint = env_impl
51        .get_checkpointer()
52        .map(|cp| cp.get_stats().snapshot())
53        .unwrap_or_default();
54    EnvironmentStats {
55        cache_size,
56        cache_usage: env_impl.get_cache_usage().max(0) as u64,
57        n_databases,
58        log,
59        lock,
60        txn,
61        throughput,
62        evictor,
63        cleaner,
64        checkpoint,
65    }
66}
67
68/// A database environment.
69///
70///
71///
72/// An Environment provides support for caching, locking, logging, and
73/// transactions. It is the top-level handle through which databases are
74/// opened and transactions are started.
75///
76/// # Example
77/// ```ignore
78/// use noxu_db::{Environment, EnvironmentConfig};
79/// use std::path::PathBuf;
80///
81/// let config = EnvironmentConfig::new(PathBuf::from("/tmp/mydb"))
82///     .allow_create(true)
83///     .transactional(true);
84/// let env = Environment::open(config).unwrap();
85/// env.close().unwrap();
86/// ```
87pub struct Environment {
88    /// Home directory path
89    home: PathBuf,
90    /// Configuration used to open this environment
91    config: EnvironmentConfig,
92    /// Open databases by name (tracks which names are currently open via this handle)
93    databases: Mutex<HashMap<String, Arc<DatabaseHandle>>>,
94    /// Active transactions registry, shared with `Transaction` so that
95    /// `Transaction::commit()` / `Transaction::abort()` can prune their
96    /// own entry on completion (F1: `mark_transaction_complete` was dead
97    /// code, so `env.close()` after `txn.commit()` always failed).
98    active_txns: Arc<ActiveTxns>,
99    /// Next transaction ID
100    next_txn_id: AtomicU64,
101    /// Whether the environment is open
102    open: AtomicBool,
103    /// Whether the environment is valid (not invalidated by a fatal error).
104    ///
105    /// Mirrors `EnvironmentImpl.isValid()` / `envInvalid` AtomicBoolean.
106    /// Set to `false` when an `EnvironmentFailure` with `invalidates_environment() == true`
107    /// is returned; all subsequent API calls check this and return `EnvironmentFailure`.
108    env_valid: AtomicBool,
109    /// The real internal environment implementation (B-tree backed).
110    env_impl: Arc<Mutex<EnvironmentImpl>>,
111    /// Cached log manager — acquired once at open; None for non-transactional envs.
112    /// Used by stat_fsync_count() to avoid env_impl.lock() on the stats hot path.
113    log_manager: Option<Arc<LogManager>>,
114    /// Bookkeeping for `Environment::checkpoint(CheckpointConfig)` so that
115    /// `force` / `k_bytes` / `minutes` can gate whether the call actually
116    /// runs a checkpoint.
117    last_checkpoint_time: Mutex<Option<Instant>>,
118    last_checkpoint_end_lsn: Mutex<noxu_util::Lsn>,
119    /// Optional replica-ack coordinator (typically a
120    /// `noxu_rep::ReplicatedEnvironment`).  When set via
121    /// [`Environment::set_replica_coordinator`], every new
122    /// `Transaction` is wired to the coordinator and its
123    /// `commit_with_durability` blocks until the configured
124    /// `ReplicaAckPolicy` is satisfied (or the configured timeout
125    /// elapses, in which case `NoxuError::InsufficientReplicas` is
126    /// returned).  Closes finding F1 of
127    /// the 2026 review.
128    replica_coordinator: Mutex<Option<noxu_dbi::SharedReplicaAckCoordinator>>,
129    /// Per-commit timeout for replica acknowledgments.  Mirrors
130    /// `noxu_rep::RepConfig::replica_ack_timeout`; defaults to 5s.
131    replica_ack_timeout: Mutex<std::time::Duration>,
132    /// Background stats-file dumper (STATS_FILE_*), started at open when
133    /// `stats_collect` is enabled and a stats-file destination is configured.
134    /// `None` when stats collection is off.  Stopped (joined) at `close`.
135    stats_dumper: Mutex<Option<crate::stats_file::StatsFileDumper>>,
136
137    /// Background B-tree verifier daemon (`VERIFY_SCHEDULE` /
138    /// `ENV_RUN_VERIFIER`).  `Some` only when `run_verifier` is true AND a
139    /// non-empty, well-formed `verify_schedule` was supplied; `None`
140    /// otherwise (the default — no daemon, unchanged behaviour).  Stopped
141    /// (joined) at `close`.
142    verify_daemon: Mutex<Option<crate::verify_daemon::VerifyDaemon>>,
143}
144
145/// Internal database handle state.
146struct DatabaseHandle {
147    #[expect(dead_code)]
148    name: String,
149    #[expect(dead_code)]
150    id: u64,
151    #[expect(dead_code)]
152    config: DatabaseConfig,
153    /// Shared open flag — same `Arc<AtomicBool>` as `Database.open` so that
154    /// `Database::close()` setting the flag to false also marks this handle
155    /// as closed, letting `Environment::close()` succeed.
156    open: Arc<AtomicBool>,
157}
158
159/// Internal transaction state.
160struct TransactionState {
161    #[expect(dead_code)]
162    id: u64,
163    #[expect(dead_code)]
164    config: TransactionConfig,
165    #[expect(dead_code)]
166    committed: AtomicBool,
167    #[expect(dead_code)]
168    aborted: AtomicBool,
169}
170
171/// Shared registry of active transactions, owned by `Environment` and
172/// referenced (via `Arc`) by every `Transaction` so that `commit()` /
173/// `abort()` can prune their own entry without a callback into
174/// `Environment` itself.
175///
176/// Resolves F1 of the May 2026 API audit: `Environment::active_txns` was
177/// previously a private `Mutex<HashMap>` that no `Transaction` could see,
178/// so `mark_transaction_complete` was dead code and `env.close()` after a
179/// commit always returned `OperationNotAllowed`.
180pub(crate) struct ActiveTxns {
181    txns: Mutex<HashMap<u64, Arc<TransactionState>>>,
182}
183
184impl ActiveTxns {
185    fn new() -> Self {
186        Self { txns: Mutex::new(HashMap::new()) }
187    }
188
189    #[allow(dead_code)] // convenience wrapper; the hot path locks `txns` directly
190    fn insert(&self, id: u64, state: Arc<TransactionState>) {
191        self.txns.lock().insert(id, state);
192    }
193
194    /// Removes the entry for the given transaction id.
195    ///
196    /// Called by `Transaction::commit_with_durability` and `Transaction::abort`
197    /// once the transaction has reached a terminal state.
198    pub(crate) fn mark_complete(&self, id: u64) {
199        self.txns.lock().remove(&id);
200    }
201
202    fn len(&self) -> usize {
203        self.txns.lock().len()
204    }
205
206    fn is_empty(&self) -> bool {
207        self.txns.lock().is_empty()
208    }
209}
210
211impl Environment {
212    /// Opens or creates a database environment.
213    ///
214    /// Constructor.
215    ///
216    /// # Arguments
217    /// * `config` - The environment configuration
218    ///
219    /// # Returns
220    /// The opened environment handle
221    ///
222    /// # Errors
223    /// Returns an error if:
224    /// - The environment directory does not exist and `allow_create` is false
225    /// - The environment directory exists but is not writable and `read_only` is false
226    /// - Invalid configuration parameters are provided
227    pub fn open(config: EnvironmentConfig) -> Result<Self> {
228        let open_start = std::time::Instant::now();
229        let home = config.home.clone();
230
231        // Validate home directory
232        if !home.exists() {
233            if config.allow_create {
234                std::fs::create_dir_all(&home).map_err(|e| {
235                    NoxuError::environment(format!(
236                        "Failed to create environment directory {:?}: {}",
237                        home, e
238                    ))
239                })?;
240            } else {
241                return Err(NoxuError::environment(format!(
242                    "Environment directory {:?} does not exist and allow_create is false",
243                    home
244                )));
245            }
246        }
247
248        if !home.is_dir() {
249            return Err(NoxuError::environment(format!(
250                "Environment home {:?} is not a directory",
251                home
252            )));
253        }
254
255        // Check write permissions if not read-only
256        if !config.read_only {
257            // Test write access by creating a temp file
258            let test_file = home.join(".noxu_write_test");
259            std::fs::write(&test_file, b"test").map_err(|e| {
260                NoxuError::environment(format!(
261                    "Environment directory {:?} is not writable: {}",
262                    home, e
263                ))
264            })?;
265            let _ = std::fs::remove_file(&test_file);
266        }
267
268        // Warn about any unimplemented config parameters that have been set
269        // to non-default values (re-audit JE F-1: no silently-ignored params).
270        crate::unimplemented_params::warn_unimplemented_params(&config);
271
272        // Translate EnvironmentConfig into DbiEnvConfig (the noxu-dbi struct)
273        // to avoid a circular dependency between the two crates.
274        let buf_size = if config.log_buffer_size > 0 {
275            config.log_buffer_size
276        } else {
277            (config.log_total_buffer_bytes as usize)
278                .checked_div(config.log_num_buffers)
279                .unwrap_or(1024 * 1024)
280        };
281        let dbi_cfg = DbiEnvConfig {
282            // Core
283            read_only: config.read_only,
284            transactional: config.transactional,
285            env_is_locking: config.env_is_locking,
286            env_recovery_force_checkpoint: config.env_recovery_force_checkpoint,
287            env_recovery_force_checkpoint_field: config
288                .env_recovery_force_checkpoint,
289            env_recovery_force_new_file: config.env_recovery_force_new_file,
290            halt_on_commit_after_checksum_exception: config
291                .halt_on_commit_after_checksum_exception,
292            env_check_leaks: config.env_check_leaks,
293            env_forced_yield: config.env_forced_yield,
294            env_fair_latches: config.env_fair_latches,
295            env_latch_timeout_ms: config.env_latch_timeout_ms,
296            env_ttl_clock_tolerance_ms: config.env_ttl_clock_tolerance_ms,
297            env_expiration_enabled: config.env_expiration_enabled,
298            dos_producer_queue_timeout_ms: config.dos_producer_queue_timeout_ms,
299            env_db_eviction: config.env_db_eviction,
300            // Memory
301            cache_size: config.cache_size,
302            cache_percent: config.cache_percent,
303            shared_cache: config.shared_cache,
304            max_off_heap_memory: config.max_off_heap_memory,
305            max_disk: config.max_disk,
306            free_disk: config.free_disk,
307            reserved_disk: config.reserved_disk,
308            // Log
309            log_file_max_bytes: config.log_file_max_bytes,
310            log_file_cache_size: config.log_file_cache_size,
311            log_checksum_read: config.log_checksum_read,
312            log_verify_checksums: config.log_verify_checksums,
313            log_fsync_timeout_ms: config.log_fsync_timeout_ms,
314            log_fsync_time_limit_ms: config.log_fsync_time_limit_ms,
315            log_num_buffers: config.log_num_buffers,
316            log_buffer_size: buf_size,
317            log_fault_read_size: config.log_fault_read_size,
318            log_iterator_read_size: config.log_iterator_read_size,
319            log_iterator_max_size: config.log_iterator_max_size,
320            log_n_data_directories: config.log_n_data_directories,
321            log_mem_only: config.log_mem_only,
322            log_detect_file_delete: config.log_detect_file_delete,
323            log_detect_file_delete_interval_ms: config
324                .log_detect_file_delete_interval_ms,
325            log_flush_sync_interval_ms: config.log_flush_sync_interval_ms,
326            log_flush_no_sync_interval_ms: config.log_flush_no_sync_interval_ms,
327            log_use_odsync: config.log_use_odsync,
328            log_use_write_queue: config.log_use_write_queue,
329            log_write_queue_size: config.log_write_queue_size,
330            log_group_commit_threshold: config.log_group_commit_threshold,
331            log_group_commit_interval_ms: config.log_group_commit_interval_ms,
332            // B-tree
333            node_max_entries: config.node_max_entries,
334            node_dup_tree_max_entries: config.node_dup_tree_max_entries,
335            tree_max_embedded_ln: config.tree_max_embedded_ln,
336            tree_max_delta: config.tree_max_delta,
337            tree_bin_delta: config.tree_bin_delta,
338            tree_bin_delta_percent: config.tree_bin_delta_percent,
339            tree_min_memory: config.tree_min_memory,
340            tree_compact_max_key_length: config.tree_compact_max_key_length,
341            // INCompressor
342            run_in_compressor: config.run_in_compressor,
343            in_compressor_wakeup_interval_ms: config
344                .in_compressor_wakeup_interval_ms,
345            compressor_deadlock_retry: config.compressor_deadlock_retry,
346            compressor_lock_timeout_ms: config.compressor_lock_timeout_ms,
347            compressor_purge_root: config.compressor_purge_root,
348            // Cleaner
349            run_cleaner: config.run_cleaner,
350            cleaner_min_utilization: config.cleaner_min_utilization,
351
352            cleaner_two_pass_gap: config.cleaner_two_pass_gap,
353
354            cleaner_two_pass_threshold: config.cleaner_two_pass_threshold,
355            cleaner_min_file_utilization: config.cleaner_min_file_utilization,
356            cleaner_threads: config.cleaner_threads,
357            cleaner_min_file_count: config.cleaner_min_file_count,
358            cleaner_min_age: config.cleaner_min_age,
359            cleaner_bytes_interval: config.cleaner_bytes_interval,
360            cleaner_wakeup_interval_ms: config.cleaner_wakeup_interval_ms,
361            cleaner_fetch_obsolete_size: config.cleaner_fetch_obsolete_size,
362            cleaner_adjust_utilization: config.cleaner_adjust_utilization,
363            cleaner_deadlock_retry: config.cleaner_deadlock_retry,
364            cleaner_lock_timeout_ms: config.cleaner_lock_timeout_ms,
365            cleaner_expunge: config.cleaner_expunge,
366            cleaner_use_deleted_dir: config.cleaner_use_deleted_dir,
367            cleaner_max_batch_files: config.cleaner_max_batch_files,
368            cleaner_read_size: config.cleaner_read_size,
369            cleaner_detail_max_memory_percentage: config
370                .cleaner_detail_max_memory_percentage,
371            cleaner_look_ahead_cache_size: config.cleaner_look_ahead_cache_size,
372            cleaner_foreground_proactive_migration: config
373                .cleaner_foreground_proactive_migration,
374            cleaner_background_proactive_migration: config
375                .cleaner_background_proactive_migration,
376            cleaner_lazy_migration: config.cleaner_lazy_migration,
377            cleaner_expiration_enabled: config.cleaner_expiration_enabled,
378            // Checkpointer
379            run_checkpointer: config.run_checkpointer,
380            checkpointer_bytes_interval: config.checkpointer_bytes_interval,
381            checkpointer_wakeup_interval_ms: config
382                .checkpointer_wakeup_interval_ms,
383            checkpointer_deadlock_retry: config.checkpointer_deadlock_retry,
384            checkpointer_high_priority: config.checkpointer_high_priority,
385            // Evictor
386            run_evictor: config.run_evictor,
387            evictor_nodes_per_scan: config.evictor_nodes_per_scan,
388            evictor_algorithm: config.evictor_algorithm.clone(),
389            evictor_evict_bytes: config.evictor_evict_bytes,
390            evictor_critical_percentage: config.evictor_critical_percentage,
391            evictor_lru_only: config.evictor_lru_only,
392            evictor_use_dirty_lru: config.evictor_use_dirty_lru,
393            evictor_mutate_bins: config.evictor_mutate_bins,
394            evictor_n_lru_lists: config.evictor_n_lru_lists,
395            evictor_deadlock_retry: config.evictor_deadlock_retry,
396            evictor_core_threads: config.evictor_core_threads,
397            evictor_max_threads: config.evictor_max_threads,
398            evictor_keep_alive_ms: config.evictor_keep_alive_ms,
399            evictor_allow_bin_deltas: config.evictor_allow_bin_deltas,
400            // Off-heap evictor
401            run_offheap_evictor: config.run_offheap_evictor,
402            offheap_evict_bytes: config.offheap_evict_bytes,
403            offheap_n_lru_lists: config.offheap_n_lru_lists,
404            offheap_checksum: config.offheap_checksum,
405            offheap_core_threads: config.offheap_core_threads,
406            offheap_max_threads: config.offheap_max_threads,
407            offheap_keep_alive_ms: config.offheap_keep_alive_ms,
408            // Locking
409            lock_timeout_ms: config.lock_timeout_ms,
410            lock_deadlock_detect: config.lock_deadlock_detect,
411            lock_deadlock_detect_delay_ms: config.lock_deadlock_detect_delay_ms,
412            n_lock_tables: config.lock_n_lock_tables as usize,
413            // Transactions
414            txn_timeout_ms: config.txn_timeout_ms,
415            txn_serializable_isolation: config.txn_serializable_isolation,
416            txn_deadlock_stack_trace: config.txn_deadlock_stack_trace,
417            txn_dump_locks: config.txn_dump_locks,
418            // Verifier
419            run_verifier: config.run_verifier,
420            verify_log: config.verify_log,
421            verify_log_read_delay_ms: config.verify_log_read_delay_ms,
422            verify_btree: config.verify_btree,
423            verify_secondaries: config.verify_secondaries,
424            verify_data_records: config.verify_data_records,
425            verify_obsolete_records: config.verify_obsolete_records,
426            verify_btree_batch_size: config.verify_btree_batch_size,
427            verify_btree_batch_delay_ms: config.verify_btree_batch_delay_ms,
428            // Stats
429            stats_collect: config.stats_collect,
430            stats_collect_interval_secs: config.stats_collect_interval_secs,
431            // Background rate limits
432            env_background_read_limit_kb: config.env_background_read_limit_kb,
433            env_background_write_limit_kb: config.env_background_write_limit_kb,
434            env_background_sleep_interval_us: config
435                .env_background_sleep_interval_us,
436        };
437        let env_impl = EnvironmentImpl::from_dbi_config(home.clone(), &dbi_cfg)
438            .map_err(|e| NoxuError::environment(e.to_string()))?;
439
440        // exception_listener (JE ExceptionListener): if the application
441        // registered a listener, install a daemon exception sink that adapts
442        // the daemon's (source, message) callback into the public
443        // `ExceptionEvent`.  Installed here — before any daemon performs work
444        // (each daemon sleeps its wakeup interval first) — so no async daemon
445        // failure is missed.  Without a listener the sink is never installed
446        // and dispatch is a no-op.
447        if let Some(listener) = config.exception_listener() {
448            env_impl.set_exception_sink(std::sync::Arc::new(
449                move |source: &str, message: &str| {
450                    let src = match source {
451                        "Checkpointer" => {
452                            crate::error::ExceptionSource::Checkpointer
453                        }
454                        "Cleaner" => crate::error::ExceptionSource::Cleaner,
455                        "Evictor" => crate::error::ExceptionSource::Evictor,
456                        "INCompressor" => {
457                            crate::error::ExceptionSource::INCompressor
458                        }
459                        "Verifier" => crate::error::ExceptionSource::Verifier,
460                        other => crate::error::ExceptionSource::Unknown(
461                            other.to_string(),
462                        ),
463                    };
464                    let thread_name = std::thread::current()
465                        .name()
466                        .unwrap_or("<unnamed>")
467                        .to_string();
468                    let event = crate::error::ExceptionEvent::new(
469                        message.to_string(),
470                        src,
471                        thread_name,
472                    );
473                    listener.exception_event(&event);
474                },
475            ));
476        }
477
478        let log_manager = env_impl.get_log_manager();
479        let env_impl_arc = Arc::new(Mutex::new(env_impl));
480
481        // STATS_FILE_*: start the periodic stats-file dumper (JE StatCapture)
482        // when stats collection is enabled.  The dumper samples the same
483        // snapshot `stats()` returns and appends a rotating CSV to
484        // `stats_file_directory` (default: the env home).  Off by default
485        // (stats_collect = false).
486        let stats_dumper = if config.stats_collect {
487            let dir = config
488                .stats_file_directory
489                .clone()
490                .unwrap_or_else(|| home.clone());
491            let interval = std::time::Duration::from_secs(
492                config.stats_collect_interval_secs.max(1),
493            );
494            Some(crate::stats_file::StatsFileDumper::start(
495                Arc::clone(&env_impl_arc),
496                log_manager.clone(),
497                config.cache_size,
498                dir,
499                interval,
500                config.stats_file_row_count,
501                config.stats_max_files,
502            ))
503        } else {
504            None
505        };
506
507        // VERIFY_SCHEDULE / ENV_RUN_VERIFIER: start the background B-tree
508        // verifier daemon ONLY when the operator explicitly enabled it
509        // (run_verifier = true, default false) AND supplied a non-empty,
510        // well-formed cron schedule.  The default (run_verifier = false)
511        // spawns nothing, so behaviour is unchanged.  The daemon re-uses the
512        // same public verification walk `Environment::verify` runs (it does
513        // NOT touch verify's internals) on the shared EnvironmentImpl.  It is
514        // env-owned (not registered with the engine DaemonManager) so it
515        // cannot perturb the daemon-manager shutdown ordering.
516        // JE ref: EnvironmentParams.ENV_RUN_VERIFIER / VERIFY_SCHEDULE,
517        // com.sleepycat.je.dbi.DataVerifier.
518        let verify_daemon = if config.run_verifier
519            && !config.verify_schedule.is_empty()
520        {
521            match crate::verify_daemon::CronSchedule::parse(
522                &config.verify_schedule,
523            ) {
524                Some(schedule) => {
525                    let vconfig = noxu_engine::VerifyConfig::new()
526                        .with_btree_verification(true);
527                    Some(crate::verify_daemon::VerifyDaemon::start(
528                        Arc::clone(&env_impl_arc),
529                        schedule,
530                        vconfig,
531                    ))
532                }
533                None => {
534                    log::warn!(
535                        "verify_schedule={:?} is not a valid 5-field cron \
536                         expression; the background verifier will NOT run",
537                        config.verify_schedule,
538                    );
539                    None
540                }
541            }
542        } else {
543            None
544        };
545
546        let env = Environment {
547            home,
548            config,
549            databases: Mutex::new(HashMap::new()),
550            active_txns: Arc::new(ActiveTxns::new()),
551            next_txn_id: AtomicU64::new(1),
552            open: AtomicBool::new(true),
553            env_valid: AtomicBool::new(true),
554            env_impl: env_impl_arc,
555            log_manager,
556            last_checkpoint_time: Mutex::new(None),
557            last_checkpoint_end_lsn: Mutex::new(noxu_util::NULL_LSN),
558            replica_coordinator: Mutex::new(None),
559            replica_ack_timeout: Mutex::new(std::time::Duration::from_secs(5)),
560            stats_dumper: Mutex::new(stats_dumper),
561            verify_daemon: Mutex::new(verify_daemon),
562        };
563
564        // STARTUP_DUMP_THRESHOLD: if opening the environment (dominated by
565        // crash-recovery: the analysis + redo + undo passes) took longer than
566        // the configured threshold, log a one-line startup performance summary
567        // plus a stats snapshot so operators can see WHY a slow start happened.
568        // A threshold of 0 (the default) disables the dump.
569        // JE ref: EnvironmentParams.STARTUP_DUMP_THRESHOLD / EnvironmentImpl
570        // dumping the startup RecoveryInfo + EnvironmentStats when startup is
571        // slow.
572        let threshold_ms = env.config.startup_dump_threshold_ms;
573        if threshold_ms > 0 {
574            let elapsed_ms = open_start.elapsed().as_millis() as u64;
575            if Self::startup_dump_triggered(elapsed_ms, threshold_ms) {
576                match env.stats() {
577                    Ok(stats) => log::warn!(
578                        "startup dump: Environment::open took {elapsed_ms} ms \
579                         (threshold {threshold_ms} ms). Startup is dominated by \
580                         crash recovery. Stats snapshot after open: {stats:?}"
581                    ),
582                    Err(e) => log::warn!(
583                        "startup dump: Environment::open took {elapsed_ms} ms \
584                         (threshold {threshold_ms} ms); stats unavailable: {e}"
585                    ),
586                }
587            }
588        }
589
590        Ok(env)
591    }
592
593    /// STARTUP_DUMP_THRESHOLD predicate: returns `true` when the measured
594    /// `Environment::open` elapsed time (ms) meets or exceeds a non-zero
595    /// `threshold_ms`.  Extracted so the decision is unit-testable without a
596    /// log-capture harness.  A threshold of 0 disables the dump.
597    #[inline]
598    fn startup_dump_triggered(elapsed_ms: u64, threshold_ms: u64) -> bool {
599        threshold_ms > 0 && elapsed_ms >= threshold_ms
600    }
601
602    /// Closes the environment handle.
603    ///
604    /// # Errors
605    /// Returns an error if:
606    /// - The environment is already closed
607    /// - There are open database handles
608    /// - There are active transactions
609    pub fn close(&self) -> Result<()> {
610        if !self.open.load(Ordering::Acquire) {
611            return Err(NoxuError::EnvironmentClosed);
612        }
613
614        // Check for open databases
615        let databases = self.databases.lock();
616        let open_dbs: Vec<String> = databases
617            .iter()
618            .filter(|(_, db)| db.open.load(Ordering::Acquire))
619            .map(|(name, _)| name.clone())
620            .collect();
621
622        if !open_dbs.is_empty() {
623            return Err(NoxuError::OperationNotAllowed(format!(
624                "Cannot close environment with open database handles: {:?}",
625                open_dbs
626            )));
627        }
628
629        // Check for active transactions
630        if !self.active_txns.is_empty() {
631            return Err(NoxuError::OperationNotAllowed(format!(
632                "Cannot close environment with {} active transactions",
633                self.active_txns.len()
634            )));
635        }
636
637        self.open.store(false, Ordering::Release);
638        // Stop the stats-file dumper (join its thread) before tearing down the
639        // engine, so it cannot sample a closing EnvironmentImpl.
640        if let Some(dumper) = self.stats_dumper.lock().take() {
641            dumper.stop();
642        }
643        // Stop the background verifier daemon (join its thread) before tearing
644        // down the engine, so it cannot walk a closing EnvironmentImpl.
645        if let Some(verifier) = self.verify_daemon.lock().take() {
646            verifier.stop();
647        }
648        let env_impl = self.env_impl.lock();
649        // env_check_leaks (default true): at a clean close all user
650        // transactions have released their locks; any lock still held with an
651        // owner indicates an application leak (a dropped Transaction, a cursor
652        // held open, ...).  Report it (JE EnvironmentImpl leak checking).  This
653        // is diagnostic only — it warns, it does not fail the close.
654        if self.config.env_check_leaks {
655            let leaks = env_impl.get_lock_manager().report_leaked_locks();
656            if !leaks.is_empty() {
657                let total_locks: usize =
658                    leaks.iter().map(|(_, owners)| owners.len()).sum();
659                log::warn!(
660                    "env_check_leaks: {} lock(s) still held by {} locker(s) at \
661                     Environment::close — an application likely leaked a \
662                     transaction or cursor. Leaked (lsn, owners): {:?}",
663                    leaks.len(),
664                    total_locks,
665                    leaks,
666                );
667            }
668        }
669        let _ = env_impl.close();
670        Ok(())
671    }
672
673    /// Opens or creates a database.
674    ///
675    ///
676    ///
677    /// # Arguments
678    /// * `txn` - Optional transaction handle (currently ignored)
679    /// * `name` - Database name
680    /// * `config` - Database configuration
681    ///
682    /// # Returns
683    /// The opened database handle
684    ///
685    /// # Errors
686    /// Returns an error if:
687    /// - The environment is closed
688    /// - The database name is invalid
689    /// - The database does not exist and `allow_create` is false
690    /// - A handle for `name` is already open in this `Environment`
691    ///   (`DatabaseAlreadyExists`)
692    pub fn open_database(
693        &self,
694        txn: Option<&Transaction>,
695        name: &str,
696        config: &DatabaseConfig,
697    ) -> Result<Database> {
698        self.check_open()?;
699
700        // Audit transaction-env F5 (Wave 2C-4): on a read-only env,
701        // open_database must not create new databases nor open existing
702        // ones in writable mode.  Pre-fix the request silently created an
703        // in-memory-only database (no WAL backing) which violated the
704        // "no write operations" guarantee in the user-facing docs.
705        if self.config.read_only {
706            if config.allow_create {
707                return Err(NoxuError::OperationNotAllowed(
708                    "open_database: cannot create a database on a read-only \
709                     environment (DatabaseConfig::with_allow_create(true))"
710                        .to_string(),
711                ));
712            }
713            if !config.read_only {
714                return Err(NoxuError::OperationNotAllowed(
715                    "open_database: read-only environment requires the \
716                     database to be opened read-only \
717                     (DatabaseConfig::with_read_only(true))"
718                        .to_string(),
719                ));
720            }
721        }
722
723        if name.is_empty() {
724            return Err(NoxuError::IllegalArgument(
725                "Database name cannot be empty".to_string(),
726            ));
727        }
728
729        let mut databases = self.databases.lock();
730
731        // Check if database is already open via this environment handle
732        if let Some(db_handle) = databases.get(name)
733            && db_handle.open.load(Ordering::Acquire)
734        {
735            return Err(NoxuError::DatabaseAlreadyExists(format!(
736                "Database '{}' is already open",
737                name
738            )));
739        }
740
741        // Build the noxu-dbi config from noxu-db config
742        let mut dbi_config = noxu_dbi::DatabaseConfig::new();
743        dbi_config.set_allow_create(config.allow_create);
744        dbi_config.set_sorted_duplicates(config.sorted_duplicates);
745        dbi_config.set_read_only(config.read_only);
746        dbi_config.set_temporary(config.temporary);
747        dbi_config.set_transactional(config.transactional);
748        dbi_config.deferred_write = config.deferred_write;
749        // Audit database F7 (Wave 2C-4): plumb key_prefixing through;
750        // pre-fix the outer flag was silently dropped on the floor.
751        dbi_config.set_key_prefixing(config.key_prefixing);
752        // DBI-14: thread user comparators (identity + closure) through to
753        // DatabaseImpl so the tree honours them on every key comparison.
754        dbi_config.btree_comparator =
755            config.btree_comparator.as_ref().map(|c| {
756                noxu_dbi::ConfigComparator {
757                    identity: c.identity().to_string(),
758                    func: c.func(),
759                }
760            });
761        dbi_config.duplicate_comparator = config
762            .duplicate_comparator
763            .as_ref()
764            .map(|c| noxu_dbi::ConfigComparator {
765                identity: c.identity().to_string(),
766                func: c.func(),
767            });
768        dbi_config.override_btree_comparator = config.override_btree_comparator;
769        dbi_config.override_duplicate_comparator =
770            config.override_duplicate_comparator;
771        // DB-TRIG: thread user triggers (runtime-registered, not persisted)
772        // through to DatabaseImpl so put/delete/commit/abort fire.  JE
773        // `DatabaseConfig.getTriggers` -> `DatabaseImpl.triggers`.
774        dbi_config.triggers = config.triggers.0.clone();
775        if config.node_max_entries > 0 {
776            dbi_config.set_node_max_entries(config.node_max_entries as i32);
777        }
778
779        // Open the database via EnvironmentImpl (creates if allow_create, else errors)
780        // C-4 / JE 1-I: if a transaction is supplied and this is a new
781        // creation, use the transactional path so the name registration is
782        // deferred until commit.
783        let is_transactional_create = txn.is_some() && config.allow_create;
784        let db_impl_arc = {
785            let env_impl = self.env_impl.lock();
786            if is_transactional_create {
787                // SAFETY: is_transactional_create implies txn.is_some().
788                let txn_id = txn
789                    .expect("invariant: txn is Some when is_transactional_create")
790                    .id();
791                env_impl
792                    .open_database_transactional(name, &dbi_config, txn_id)
793            } else {
794                env_impl.open_database(name, &dbi_config)
795            }
796            .map_err(|e| {
797                match &e {
798                    noxu_dbi::DbiError::DatabaseNotFound(_) => {
799                        NoxuError::DatabaseNotFound(format!(
800                            "Database '{}' does not exist and allow_create is false",
801                            name
802                        ))
803                    }
804                    _ => NoxuError::environment(e.to_string()),
805                }
806            })?
807        };
808
809        // Register abort/commit callbacks on the transaction so that
810        // transactional database creation is properly rolled back or
811        // finalised when the transaction resolves (C-4 / JE 1-I).
812        if is_transactional_create {
813            let env_impl_arc = Arc::clone(&self.env_impl);
814            let db_name_abort = name.to_string();
815            let db_name_commit = name.to_string();
816            let env_impl_arc2 = Arc::clone(&self.env_impl);
817            // SAFETY: is_transactional_create implies txn.is_some().
818            let txn_ref = txn
819                .expect("invariant: txn is Some when is_transactional_create");
820            txn_ref.register_abort_callback(move || {
821                env_impl_arc.lock().abort_pending_database(&db_name_abort);
822            });
823            txn_ref.register_commit_callback(move || {
824                env_impl_arc2.lock().commit_pending_database(&db_name_commit);
825            });
826        }
827
828        let db_id = db_impl_arc.read().get_id().id() as u64;
829
830        // Shared open flag: stored in both `DatabaseHandle` and `Database`.
831        // When `Database::close()` sets it to false the env-side handle is
832        // also marked as closed, so `Environment::close()` can proceed.
833        let open_flag = Arc::new(AtomicBool::new(true));
834
835        let db_handle = Arc::new(DatabaseHandle {
836            name: name.to_string(),
837            id: db_id,
838            config: config.clone(),
839            open: Arc::clone(&open_flag),
840        });
841
842        databases.insert(name.to_string(), db_handle);
843        drop(databases);
844
845        Ok(Database::new(
846            name.to_string(),
847            db_id,
848            config.clone(),
849            db_impl_arc,
850            Arc::clone(&self.env_impl),
851            open_flag,
852            self.config.txn_no_sync,
853            self.config.txn_write_no_sync,
854        ))
855    }
856
857    /// Removes a database.
858    ///
859    ///
860    ///
861    /// # Arguments
862    /// * `txn` - Optional transaction handle (currently ignored)
863    /// * `name` - Database name
864    ///
865    /// # Errors
866    /// Returns an error if:
867    /// - The environment is closed
868    /// - The database does not exist
869    /// - The database is currently open
870    pub fn remove_database(
871        &self,
872        _txn: Option<&Transaction>,
873        name: &str,
874    ) -> Result<()> {
875        self.check_writable("remove_database")?;
876
877        let mut databases = self.databases.lock();
878        {
879            let env_impl = self.env_impl.lock();
880            env_impl.remove_database(name).map_err(|e| match &e {
881                noxu_dbi::DbiError::DatabaseNotFound(_) => {
882                    NoxuError::DatabaseNotFound(format!(
883                        "Database '{}' does not exist",
884                        name
885                    ))
886                }
887                _ => NoxuError::environment(e.to_string()),
888            })?;
889        }
890        databases.remove(name);
891
892        Ok(())
893    }
894
895    /// Truncates a database: removes all records while keeping the database
896    /// registered and any open handles valid.
897    ///
898    /// Returns the number of records that were in the database before truncation.
899    ///
900    /// Mirrors `Environment.truncateDatabase(txn, dbName, returnCount)`.
901    pub fn truncate_database(
902        &self,
903        _txn: Option<&Transaction>,
904        name: &str,
905    ) -> Result<u64> {
906        self.check_writable("truncate_database")?;
907        let env_impl = self.env_impl.lock();
908        env_impl.truncate_database(name).map_err(|e| match &e {
909            noxu_dbi::DbiError::DatabaseNotFound(_) => {
910                NoxuError::DatabaseNotFound(format!(
911                    "Database '{}' does not exist",
912                    name
913                ))
914            }
915            _ => NoxuError::environment(e.to_string()),
916        })
917    }
918
919    /// Renames a database.
920    ///
921    ///
922    ///
923    /// # Arguments
924    /// * `txn` - Optional transaction handle (currently ignored)
925    /// * `old_name` - Current database name
926    /// * `new_name` - New database name
927    ///
928    /// # Errors
929    /// Returns an error if:
930    /// - The environment is closed
931    /// - The source database does not exist
932    /// - The destination database already exists
933    /// - The source database is currently open
934    pub fn rename_database(
935        &self,
936        _txn: Option<&Transaction>,
937        old_name: &str,
938        new_name: &str,
939    ) -> Result<()> {
940        self.check_writable("rename_database")?;
941
942        if old_name == new_name {
943            return Ok(());
944        }
945
946        let mut databases = self.databases.lock();
947        {
948            let env_impl = self.env_impl.lock();
949            env_impl.rename_database(old_name, new_name).map_err(
950                |e| match &e {
951                    noxu_dbi::DbiError::DatabaseNotFound(_) => {
952                        NoxuError::DatabaseNotFound(format!(
953                            "Database '{}' does not exist",
954                            old_name
955                        ))
956                    }
957                    noxu_dbi::DbiError::DatabaseAlreadyExists(_) => {
958                        NoxuError::DatabaseAlreadyExists(format!(
959                            "Database '{}' already exists",
960                            new_name
961                        ))
962                    }
963                    _ => NoxuError::environment(e.to_string()),
964                },
965            )?;
966        }
967
968        if let Some(handle) = databases.remove(old_name) {
969            databases.insert(new_name.to_string(), handle);
970        }
971
972        Ok(())
973    }
974
975    /// Begins a new transaction.
976    ///
977    /// # Arguments
978    /// * `config` - Optional transaction configuration
979    ///
980    /// # Returns
981    /// A new transaction handle.
982    ///
983    /// # Errors
984    /// Returns an error if:
985    /// - The environment is closed
986    /// - The environment is not transactional
987    ///
988    /// # Nested transactions
989    /// Nested (child) transactions are not supported.  In v1.5 this method
990    /// took an `Option<&Transaction>` `parent` argument that was rejected
991    /// at runtime with [`NoxuError::Unsupported`] (Decision 3B in
992    /// the 2026 review, audit finding F11).
993    /// In v2.0 the parameter has been removed entirely — the
994    /// type system now enforces the constraint, so what was a runtime
995    /// error is now a compile error.
996    pub fn begin_transaction(
997        &self,
998        config: Option<&TransactionConfig>,
999    ) -> Result<Transaction> {
1000        self.check_open()?;
1001
1002        if !self.config.transactional {
1003            return Err(NoxuError::OperationNotAllowed(
1004                "Cannot begin transaction on non-transactional environment"
1005                    .to_string(),
1006            ));
1007        }
1008
1009        // Audit transaction-env F5 (Wave 2C-4): on a read-only env, only
1010        // explicitly read-only transactions are allowed.  A writable txn
1011        // on a read-only env was previously accepted but every commit
1012        // silently no-op'd because `log_manager` was None.
1013        if self.config.read_only
1014            && !config.map(|c| c.read_only).unwrap_or(false)
1015        {
1016            return Err(NoxuError::OperationNotAllowed(
1017                "begin_transaction: read-only environment requires the \
1018                 transaction to be read-only \
1019                 (TransactionConfig::with_read_only(true))"
1020                    .to_string(),
1021            ));
1022        }
1023
1024        let txn_id = self.next_txn_id.fetch_add(1, Ordering::Relaxed);
1025        // F3: when the caller does not supply a TransactionConfig, the
1026        // environment-level `Durability` default (`EnvironmentConfig::durability`,
1027        // settable via `EnvironmentConfig::with_durability`) must be
1028        // honoured.  Pre-fix `unwrap_or_default()` produced a config with
1029        // `Durability::COMMIT_SYNC` regardless of the env setting, so a
1030        // user opening with `.with_durability(COMMIT_NO_SYNC)` and then
1031        // calling `begin_transaction(None)` still fsynced on every
1032        // commit.
1033        // Audit transaction-env F4 (Wave 2C-4): the env-level
1034        // `txn_no_sync` / `txn_write_no_sync` flags now apply to explicit
1035        // commits as well as auto-commit.  When neither config nor
1036        // env-default sets a durability override, derive one from the
1037        // boolean flags.  An explicit `with_durability(...)` on the
1038        // TransactionConfig still wins.
1039        let mut txn_config = match config.cloned() {
1040            Some(c) => c,
1041            None => TransactionConfig::default()
1042                .with_durability(self.config.durability),
1043        };
1044        if config.is_none() {
1045            // No caller config: env flags can override the inherited
1046            // durability if they request a less-strict sync policy.
1047            let derived = match (
1048                self.config.txn_no_sync,
1049                self.config.txn_write_no_sync,
1050            ) {
1051                (true, _) => {
1052                    Some(crate::durability::Durability::COMMIT_NO_SYNC)
1053                }
1054                (_, true) => {
1055                    Some(crate::durability::Durability::COMMIT_WRITE_NO_SYNC)
1056                }
1057                _ => None,
1058            };
1059            if let Some(d) = derived {
1060                txn_config = txn_config.with_durability(d);
1061            }
1062        }
1063
1064        let txn_state = Arc::new(TransactionState {
1065            id: txn_id,
1066            config: txn_config.clone(),
1067            committed: AtomicBool::new(false),
1068            aborted: AtomicBool::new(false),
1069        });
1070
1071        let mut active_txns = self.active_txns.txns.lock();
1072        active_txns.insert(txn_id, txn_state);
1073        drop(active_txns);
1074
1075        // Wire the transaction to the WAL so commit/abort write log entries.
1076        // Also create an inner Txn for per-record lock management.
1077        let env_guard = self.env_impl.lock();
1078        let inner_txn = env_guard
1079            .begin_txn()
1080            .map(|mut t| {
1081                // Propagate all relevant TransactionConfig fields into the
1082                // inner Txn for lock management and isolation behavior.
1083                if txn_config.read_committed {
1084                    t.set_read_committed_isolation(true);
1085                }
1086                if txn_config.read_uncommitted {
1087                    // F2: previously this branch was missing, so the
1088                    // user-set `with_read_uncommitted(true)` flag was
1089                    // silently dropped and dirty reads were impossible
1090                    // at the txn level.
1091                    t.set_read_uncommitted_default(true);
1092                }
1093                if txn_config.serializable_isolation {
1094                    t.set_serializable_isolation(true);
1095                }
1096                if txn_config.importunate {
1097                    t.set_importunate(true);
1098                }
1099                if txn_config.no_wait {
1100                    t.set_no_wait(true);
1101                }
1102                if txn_config.lock_timeout_ms > 0 {
1103                    t.set_lock_timeout(txn_config.lock_timeout_ms);
1104                }
1105                if txn_config.txn_timeout_ms > 0 {
1106                    t.set_txn_timeout(txn_config.txn_timeout_ms);
1107                }
1108                Arc::new(std::sync::Mutex::new(t))
1109            })
1110            .ok();
1111        // TXN-2 (JE TxnManager.registerTxn `nActiveSerializable` path):
1112        // Increment the serializable counter exactly once per serializable
1113        // explicit txn.  The counterpart decrement is in
1114        // `Transaction::unregister_inner_txn`, which is called from every
1115        // terminal path (commit, abort, resolved_commit/abort_after_prepare).
1116        // Guard on `inner_txn.is_some()` so read-only or env-less contexts
1117        // (which have no TxnManager) are not counted.
1118        if txn_config.serializable_isolation && inner_txn.is_some() {
1119            env_guard.get_txn_manager().register_serializable();
1120        }
1121        let txn = if let Some(lm) = env_guard.get_log_manager() {
1122            Transaction::with_log_manager(txn_id, txn_config, lm)
1123        } else {
1124            Transaction::new(txn_id, txn_config)
1125        };
1126        drop(env_guard);
1127
1128        let txn =
1129            if let Some(it) = inner_txn { txn.with_inner_txn(it) } else { txn };
1130
1131        // Wire env_impl so Transaction::abort() can apply undo records.
1132        // Txn environment reference during construction.
1133        let txn = txn.with_env_impl(Arc::clone(&self.env_impl));
1134
1135        // Wire the active-txns registry so commit/abort can prune their
1136        // own entry (F1).  Without this, every successful txn left an
1137        // entry in `active_txns` and `env.close()` returned
1138        // `OperationNotAllowed`.
1139        let txn = txn.with_active_txns(Arc::clone(&self.active_txns));
1140
1141        // F1: if a replica-ack coordinator has been installed (via
1142        // `set_replica_coordinator`), wire it into the transaction so
1143        // that `commit_with_durability` blocks until the configured
1144        // `ReplicaAckPolicy` is satisfied.
1145        let txn = if let Some(coord) = self.replica_coordinator.lock().clone() {
1146            let timeout = *self.replica_ack_timeout.lock();
1147            txn.with_replica_coordinator(coord, timeout)
1148        } else {
1149            txn
1150        };
1151
1152        Ok(txn)
1153    }
1154
1155    /// Returns a list of database names.
1156    ///
1157    ///
1158    ///
1159    /// # Returns
1160    /// A vector of database names
1161    ///
1162    /// # Errors
1163    /// Returns an error if the environment is closed
1164    pub fn database_names(&self) -> Result<Vec<String>> {
1165        self.check_open()?;
1166        let env_impl = self.env_impl.lock();
1167        Ok(env_impl.get_database_names())
1168    }
1169
1170    /// Install a replica-ack coordinator on this environment.
1171    ///
1172    /// After this call, every transaction begun on this environment
1173    /// will consult the coordinator on `commit_with_durability` and
1174    /// block until the configured `ReplicaAckPolicy` is satisfied (or
1175    /// until `replica_ack_timeout` elapses, in which case
1176    /// `NoxuError::InsufficientReplicas` is returned).
1177    ///
1178    /// `noxu_rep::ReplicatedEnvironment` implements
1179    /// `noxu_dbi::ReplicaAckCoordinator`; users typically wire it as:
1180    ///
1181    /// ```ignore
1182    /// let rep_env = Arc::new(ReplicatedEnvironment::new(rep_config)?);
1183    /// env.set_replica_coordinator(rep_env.clone());
1184    /// rep_env.with_environment(env_impl);
1185    /// ```
1186    ///
1187    /// Closes finding F1 of the 2026 review.
1188    pub fn set_replica_coordinator(
1189        &self,
1190        coord: noxu_dbi::SharedReplicaAckCoordinator,
1191    ) {
1192        *self.replica_coordinator.lock() = Some(coord);
1193    }
1194
1195    /// Clear any installed replica-ack coordinator.
1196    ///
1197    /// Subsequent `commit_with_durability` calls revert to local-only
1198    /// durability semantics.
1199    pub fn clear_replica_coordinator(&self) {
1200        *self.replica_coordinator.lock() = None;
1201    }
1202
1203    /// Set the per-commit timeout used when waiting for replica
1204    /// acknowledgments.
1205    ///
1206    /// Default is 5 seconds.  Mirrors
1207    /// `noxu_rep::RepConfig::replica_ack_timeout`.
1208    pub fn set_replica_ack_timeout(&self, timeout: std::time::Duration) {
1209        *self.replica_ack_timeout.lock() = timeout;
1210    }
1211
1212    /// Returns the per-commit replica-ack timeout.
1213    pub fn replica_ack_timeout(&self) -> std::time::Duration {
1214        *self.replica_ack_timeout.lock()
1215    }
1216
1217    /// Returns the home directory path.
1218    ///
1219    ///
1220    pub fn home(&self) -> &Path {
1221        &self.home
1222    }
1223
1224    /// Returns the environment configuration.
1225    ///
1226    ///
1227    pub fn config(&self) -> &EnvironmentConfig {
1228        &self.config
1229    }
1230
1231    /// Returns the mutable subset of environment configuration.
1232    ///
1233    /// Mirrors `Environment.getMutableConfig()`.  The returned struct reflects the
1234    /// current runtime values; pass it (modified) to `set_mutable_config()` to
1235    /// apply changes without re-opening the environment.
1236    pub fn mutable_config(&self) -> Result<EnvironmentMutableConfig> {
1237        self.check_open()?;
1238        Ok(EnvironmentMutableConfig {
1239            cache_size: Some(self.config.cache_size as usize),
1240            durability: None,
1241            txn_no_sync: self.config.txn_no_sync,
1242            txn_write_no_sync: self.config.txn_write_no_sync,
1243            run_cleaner: Some(self.config.run_cleaner),
1244            run_checkpointer: Some(self.config.run_checkpointer),
1245            run_evictor: Some(self.config.run_evictor),
1246            lock_timeout_ms: Some(self.config.lock_timeout_ms),
1247            txn_timeout_ms: Some(self.config.txn_timeout_ms),
1248            cleaner_min_utilization: Some(
1249                self.config.cleaner_min_utilization as u32,
1250            ),
1251        })
1252    }
1253
1254    /// Applies a set of mutable configuration changes to the running environment.
1255    ///
1256    /// Mirrors `Environment.setMutableConfig(EnvironmentMutableConfig)`.
1257    /// Only the fields that differ from their sentinel "no-change" values are
1258    /// applied (`None` means unchanged).  `Some(0)` for a timeout clears it
1259    /// (matches JE: 0 = no timeout).
1260    ///
1261    /// # Errors
1262    /// Returns an error if the environment is closed or invalidated.
1263    pub fn set_mutable_config(
1264        &mut self,
1265        cfg: EnvironmentMutableConfig,
1266    ) -> Result<()> {
1267        self.check_open()?;
1268        if let Some(sz) = cfg.cache_size {
1269            self.config.cache_size = sz as u64;
1270            // Audit transaction-env F7 (Wave 2C-4): push the cache-size
1271            // change to the evictor's Arbiter so it actually takes
1272            // effect at runtime; pre-fix the value was only recorded in
1273            // `self.config`.
1274            let env_impl = self.env_impl.lock();
1275            let evictor = env_impl.get_evictor();
1276            evictor.get_arbiter().set_max_memory(sz as i64);
1277        }
1278        if let Some(ms) = cfg.lock_timeout_ms {
1279            self.config.lock_timeout_ms = ms;
1280            // Push the new default to the live LockManager.
1281            let env_impl = self.env_impl.lock();
1282            env_impl.get_lock_manager().set_lock_timeout(ms);
1283        }
1284        if let Some(ms) = cfg.txn_timeout_ms {
1285            self.config.txn_timeout_ms = ms;
1286            // The TxnManager does not currently track a default txn
1287            // timeout (each Txn snapshots the value at `begin_txn` from
1288            // its own TransactionConfig).  We record the new env-level
1289            // default here so that future `begin_transaction` calls that
1290            // rely on the env default pick it up; live txns keep their
1291            // original timeout.  Tracked under transaction-env F7
1292            // residual; pushing into running txns requires a TxnManager
1293            // API change beyond Wave 2C-4.
1294        }
1295        // DBI-10 / JE EnvConfigObserver: push the mutable cleaner
1296        // minUtilization to the running cleaner (noxu.cleaner.minUtilization
1297        // has mutable=true). Mirrors how JE's Cleaner re-reads
1298        // CLEANER_MIN_UTILIZATION on envConfigUpdate.
1299        if let Some(pct) = cfg.cleaner_min_utilization {
1300            self.config.cleaner_min_utilization = pct.min(100) as u8;
1301            let env_impl = self.env_impl.lock();
1302            if let Some(cleaner) = env_impl.get_cleaner() {
1303                cleaner.set_min_utilization(pct);
1304            }
1305        }
1306        self.config.txn_no_sync = cfg.txn_no_sync;
1307        self.config.txn_write_no_sync = cfg.txn_write_no_sync;
1308        // Daemon enable/disable flags are advisory at runtime; dbi-level wiring
1309        // for live daemon pause/resume is future work (mirrors where
1310        // setMutableConfig re-reads the flag on next daemon wakeup).
1311        if let Some(v) = cfg.run_cleaner {
1312            self.config.run_cleaner = v;
1313        }
1314        if let Some(v) = cfg.run_checkpointer {
1315            self.config.run_checkpointer = v;
1316        }
1317        if let Some(v) = cfg.run_evictor {
1318            self.config.run_evictor = v;
1319        }
1320        Ok(())
1321    }
1322
1323    /// Runs a checkpoint.
1324    ///
1325    /// Mirrors `Environment.checkpoint(CheckpointConfig)`.  If the environment has
1326    /// no checkpointer (e.g. non-transactional or in-memory), this is a no-op.
1327    ///
1328    /// # Arguments
1329    /// * `config` - Optional checkpoint options (force, thresholds, etc.)
1330    ///
1331    /// # Errors
1332    /// Returns an error if the environment is closed, invalidated, or if the
1333    /// checkpoint itself fails (e.g. disk write error).
1334    pub fn checkpoint(&self, config: Option<&CheckpointConfig>) -> Result<()> {
1335        self.check_open()?;
1336
1337        // Audit transaction-env F6 (Wave 2C-4): honour `force` /
1338        // `k_bytes` / `minutes` / `minimize_recovery_time` in
1339        // `CheckpointConfig`.  Pre-fix the entire config was a no-op.
1340        // Threshold gating happens in the wrapper layer; the underlying
1341        // `noxu_recovery::Checkpointer::do_checkpoint` is invoker-only.
1342        let cfg = config.cloned().unwrap_or_default();
1343
1344        if !cfg.force {
1345            // k_bytes: skip the checkpoint if not enough log bytes have
1346            // been written since the last successful checkpoint.
1347            if cfg.k_bytes > 0 {
1348                let cur_lsn = self
1349                    .log_manager
1350                    .as_ref()
1351                    .map(|lm| lm.get_end_of_log())
1352                    .unwrap_or(noxu_util::NULL_LSN);
1353                let last = *self.last_checkpoint_end_lsn.lock();
1354                let bytes_written =
1355                    cur_lsn.as_u64().saturating_sub(last.as_u64());
1356                let threshold = (cfg.k_bytes as u64) * 1024;
1357                if bytes_written < threshold {
1358                    log::debug!(
1359                        "checkpoint: skipping (k_bytes threshold {} not \
1360                         met, only {} bytes since last checkpoint)",
1361                        threshold,
1362                        bytes_written,
1363                    );
1364                    return Ok(());
1365                }
1366            }
1367
1368            // minutes: skip the checkpoint if not enough wall-clock time
1369            // has elapsed since the last successful checkpoint.
1370            if cfg.minutes > 0 {
1371                let last_at = *self.last_checkpoint_time.lock();
1372                if let Some(at) = last_at {
1373                    let elapsed = at.elapsed();
1374                    let threshold =
1375                        std::time::Duration::from_secs(cfg.minutes as u64 * 60);
1376                    if elapsed < threshold {
1377                        log::debug!(
1378                            "checkpoint: skipping (minutes threshold {:?} \
1379                             not met, only {:?} since last checkpoint)",
1380                            threshold,
1381                            elapsed,
1382                        );
1383                        return Ok(());
1384                    }
1385                }
1386            }
1387        }
1388
1389        // `minimize_recovery_time` is currently advisory — the recovery
1390        // checkpointer always writes the full set of dirty BINs; the
1391        // "minimal" path requires a pluggable BIN-flush filter that is
1392        // outside the scope of Wave 2C-4.  We surface the request in the
1393        // invoker label so it shows up in structured logs.
1394        let invoker = match (cfg.force, cfg.minimize_recovery_time) {
1395            (true, true) => "manual_force_full",
1396            (true, false) => "manual_force",
1397            (false, true) => "manual_full",
1398            (false, false) => "manual",
1399        };
1400
1401        let env_impl = self.env_impl.lock();
1402        env_impl
1403            .run_checkpoint_with_invoker(invoker)
1404            .map_err(|e| NoxuError::environment(e.to_string()))?;
1405        drop(env_impl);
1406
1407        // Update bookkeeping so subsequent threshold-gated calls can
1408        // honour `k_bytes` / `minutes`.
1409        *self.last_checkpoint_time.lock() = Some(Instant::now());
1410        if let Some(lm) = &self.log_manager {
1411            *self.last_checkpoint_end_lsn.lock() = lm.get_end_of_log();
1412        }
1413        Ok(())
1414    }
1415
1416    /// Returns `true` if the environment is open and has not been invalidated by a fatal error.
1417    ///
1418    /// Mirrors `Environment.isValid()`.  Returns `false` after the environment is closed
1419    /// or after an `EnvironmentFailure` whose `reason.invalidates_environment()` returns
1420    /// `true` (e.g. `LogChecksum`, `BtreeCorruption`, `DiskLimit`).
1421    /// Once invalidated the environment must be closed and re-opened.
1422    pub fn is_valid(&self) -> bool {
1423        self.open.load(Ordering::Acquire)
1424            && self.env_valid.load(Ordering::Acquire)
1425    }
1426
1427    /// Invalidates the environment in response to a fatal error.
1428    ///
1429    /// Called internally when an `EnvironmentFailure` with
1430    /// `reason.invalidates_environment() == true` propagates out of a
1431    /// background daemon.  After invalidation `is_valid()` returns `false`
1432    /// and all subsequent public API calls return `EnvironmentFailure`.
1433    pub fn invalidate(&self) {
1434        self.env_valid.store(false, Ordering::Release);
1435    }
1436
1437    /// Returns whether the environment is transactional.
1438    ///
1439    /// Via environment.
1440    pub fn is_transactional(&self) -> bool {
1441        self.config.transactional
1442    }
1443
1444    /// Returns whether the environment is read-only.
1445    ///
1446    /// Via environment.
1447    pub fn is_read_only(&self) -> bool {
1448        self.config.read_only
1449    }
1450
1451    /// Returns a snapshot of environment statistics from all subsystems.
1452    ///
1453    /// Mirrors `Environment.getStats(StatsConfig)`.
1454    pub fn stats(&self) -> Result<EnvironmentStats> {
1455        self.check_open()?;
1456        let env_impl = self.env_impl.lock();
1457        Ok(build_environment_stats(
1458            &env_impl,
1459            self.log_manager.as_deref(),
1460            self.config.cache_size,
1461        ))
1462    }
1463
1464    /// Returns the total number of fdatasync calls performed by the log manager.
1465    ///
1466    /// Useful for benchmarking
1467    /// and for verifying that group commit is working (fewer fsyncs than commits).
1468    /// Returns 0 if the environment is non-transactional (no log manager).
1469    pub fn stat_fsync_count(&self) -> u64 {
1470        self.log_manager.as_ref().map(|lm| lm.fsync_count()).unwrap_or(0)
1471    }
1472
1473    // -------------------------------------------------------------------
1474    // Wave 3-2: XA crash-durable two-phase commit support
1475    // -------------------------------------------------------------------
1476
1477    /// Returns the list of XA in-doubt prepared transactions surfaced by
1478    /// the most recent recovery pass.
1479    ///
1480    /// The XA layer (`noxu_xa::XaEnvironment::xa_recover`) reads this
1481    /// list to populate its return value with XIDs that completed phase
1482    /// 1 of two-phase commit but were not committed or aborted before
1483    /// the previous shutdown / crash.  An empty `Vec` means there are
1484    /// no in-doubt transactions to resolve.
1485    ///
1486    /// See `noxu_xa` for XA two-phase commit.
1487    pub fn recovered_prepared_txns(
1488        &self,
1489    ) -> Vec<noxu_recovery::PreparedTxnInfo> {
1490        let env_impl = self.env_impl.lock();
1491        env_impl.recovered_prepared_txns()
1492    }
1493
1494    /// Removes and returns the LN replay list for a recovered prepared
1495    /// transaction.
1496    ///
1497    /// Used by `xa_commit(xid)` after locating the txn id from
1498    /// [`Self::recovered_prepared_txns`].  The XA layer iterates the
1499    /// returned list and applies each LN to the in-memory tree before
1500    /// writing the `TxnCommit` WAL frame.
1501    pub fn take_recovered_prepared_lns(
1502        &self,
1503        txn_id: u64,
1504    ) -> Vec<noxu_recovery::PreparedLnReplay> {
1505        let env_impl = self.env_impl.lock();
1506        env_impl.take_recovered_prepared_lns(txn_id)
1507    }
1508
1509    /// Removes a recovered prepared txn entry after the XA layer has
1510    /// successfully resolved it (`xa_commit` or `xa_rollback`).
1511    /// Idempotent.
1512    pub fn forget_recovered_prepared_txn(&self, txn_id: u64) {
1513        let env_impl = self.env_impl.lock();
1514        env_impl.forget_recovered_prepared_txn(txn_id);
1515    }
1516
1517    /// Writes a `TxnCommit` WAL frame for `txn_id` and fsyncs.
1518    ///
1519    /// Used by `xa_commit(xid)` to durably resolve a recovered prepared
1520    /// transaction without requiring an in-memory `Txn` (which the
1521    /// crash destroyed).  The caller must have already replayed any
1522    /// LNs into the in-memory tree via
1523    /// [`Self::take_recovered_prepared_lns`] and applied them.
1524    ///
1525    /// See `noxu_xa` for XA two-phase commit.
1526    pub fn write_txn_commit_for_recovered(&self, txn_id: u64) -> Result<()> {
1527        let lm = match &self.log_manager {
1528            Some(lm) => lm,
1529            None => return Ok(()), // Non-transactional env (shouldn't happen).
1530        };
1531        // R-3: pre-allocate the VLSN BEFORE writing the WAL entry so the
1532        // TxnCommit record carries it.  On a second crash, the X-14 VLSN
1533        // rebuild scans TxnCommit records with non-NULL dtvlsn and includes
1534        // them — fixing the double-crash VLSN loss reported in Keith R-3.
1535        let pre_vlsn =
1536            if let Some(coord) = self.replica_coordinator.lock().as_ref() {
1537                coord.pre_alloc_vlsn_for_recovered_commit()
1538            } else {
1539                0
1540            };
1541
1542        let commit_lsn = write_txn_end_for_recovered(
1543            lm, txn_id, true, /* is_commit */
1544            true, /* fsync */
1545            true, /* flush */
1546            pre_vlsn,
1547        )?;
1548
1549        // Register the pre-allocated VLSN in the VlsnIndex now that we have
1550        // the actual commit LSN.  Also keep the legacy alloc path for any
1551        // coordinator that doesn't implement pre_alloc (returns 0).
1552        if let Some(coord) = self.replica_coordinator.lock().as_ref() {
1553            if pre_vlsn > 0 {
1554                coord.register_recovered_commit_vlsn(pre_vlsn, commit_lsn);
1555                log::debug!(
1556                    "write_txn_commit_for_recovered: txn_id={} commit_lsn={:?} \
1557                     embedded+registered vlsn={} (R-3)",
1558                    txn_id,
1559                    commit_lsn,
1560                    pre_vlsn
1561                );
1562            } else {
1563                // Fallback: coordinator returned 0 for pre_alloc (non-master
1564                // or non-replicated); try the legacy single-step allocator.
1565                let vlsn = coord.alloc_vlsn_for_recovered_commit(commit_lsn);
1566                if vlsn > 0 {
1567                    log::debug!(
1568                        "write_txn_commit_for_recovered: txn_id={} commit_lsn={:?} \
1569                         assigned vlsn={} (X-3 legacy path)",
1570                        txn_id,
1571                        commit_lsn,
1572                        vlsn
1573                    );
1574                }
1575            }
1576        }
1577        Ok(())
1578    }
1579
1580    /// Writes a `TxnAbort` WAL frame for `txn_id`.  Used by `xa_rollback(xid)`
1581    /// to durably resolve a recovered prepared transaction.
1582    pub fn write_txn_abort_for_recovered(&self, txn_id: u64) -> Result<()> {
1583        let lm = match &self.log_manager {
1584            Some(lm) => lm,
1585            None => return Ok(()),
1586        };
1587        write_txn_end_for_recovered(
1588            lm, txn_id, false, /* is_commit */
1589            false, /* fsync */
1590            false, /* flush */
1591            0,     /* vlsn: NULL_VLSN for abort */
1592        )
1593        .map(|_| ())
1594    }
1595
1596    /// Replays a recovered prepared transaction’s LNs into the in-memory
1597    /// tree at `xa_commit` resolution time.
1598    ///
1599    /// Iterates the LN list (already removed from the recovered map by
1600    /// the caller) and applies each insert/update/delete to the
1601    /// matching `DatabaseImpl`'s tree.  This makes the prepared writes
1602    /// observable to subsequent reads in the same process — without
1603    /// this step, a recovered+committed XA branch's writes would only
1604    /// become visible after a second recovery on the next reopen.
1605    pub fn apply_recovered_prepared_lns(
1606        &self,
1607        lns: &[noxu_recovery::PreparedLnReplay],
1608    ) -> Result<()> {
1609        let env_impl = self.env_impl.lock();
1610        for ln in lns {
1611            let db_id = noxu_dbi::DatabaseId::new(ln.db_id as i64);
1612            let Some(db_arc) = env_impl.get_database_by_id(db_id) else {
1613                continue;
1614            };
1615            let db_guard = db_arc.read();
1616            let Some(tree) = db_guard.get_real_tree() else {
1617                continue;
1618            };
1619            match ln.operation {
1620                noxu_recovery::PreparedLnOperation::Insert
1621                | noxu_recovery::PreparedLnOperation::Update => {
1622                    if let Some(data) = &ln.data {
1623                        let _ = tree.insert(
1624                            ln.key.clone(),
1625                            data.clone(),
1626                            ln.original_lsn,
1627                        );
1628                    }
1629                }
1630                noxu_recovery::PreparedLnOperation::Delete => {
1631                    if tree.delete(&ln.key) {
1632                        db_guard.decrement_entry_count();
1633                    }
1634                }
1635            }
1636        }
1637        Ok(())
1638    }
1639
1640    /// Verifies the structural integrity of all databases in this environment.
1641    ///
1642    /// Iterates every open `DatabaseImpl` in the environment's db_map and
1643    /// calls `verify_database_impl()` on each one (B-tree key-order checks,
1644    /// LSN validity, child-pointer completeness).  Results are merged into a
1645    /// single `VerifyResult`.
1646    ///
1647    /// Mirrors `Environment.verify(VerifyConfig, PrintStream)` in
1648    /// creates a `BtreeVerifier` and calls `verifier.verifyAll()`.
1649    ///
1650    /// # Arguments
1651    /// * `config` - Verification options (btree, log, checksums, max_errors).
1652    ///
1653    /// # Returns
1654    /// A combined `VerifyResult` over all databases.
1655    ///
1656    /// # Errors
1657    /// Returns an error if the environment is closed or invalidated.
1658    pub fn verify(
1659        &self,
1660        config: &noxu_engine::VerifyConfig,
1661    ) -> Result<noxu_engine::VerifyResult> {
1662        self.check_open()?;
1663        let env_impl = self.env_impl.lock();
1664        let all_dbs = env_impl.get_all_database_impls();
1665        // Lock the live utilization tracker for the checkLsns half
1666        // (VerifyUtils.checkLsns): live tree LSNs must be DISJOINT from the
1667        // obsolete LSNs recorded in the UtilizationProfile/tracker. Held
1668        // across the loop (no clone); read-only envs have no tracker.
1669        let tracker_guard =
1670            env_impl.get_utilization_tracker().map(|t| t.lock());
1671
1672        let mut merged = noxu_engine::VerifyResult::new();
1673        for db_arc in &all_dbs {
1674            let guard = db_arc.read();
1675            let result = noxu_engine::verify_database_impl(&guard, config);
1676            merged.databases_verified += result.databases_verified;
1677            merged.records_verified += result.records_verified;
1678            for err in result.errors {
1679                merged.add_error(err);
1680                if merged.error_count() >= config.max_errors as usize {
1681                    return Ok(merged);
1682                }
1683            }
1684            for w in result.warnings {
1685                merged.add_warning(w);
1686            }
1687            // CLN-2 / VerifyUtils.checkLsns: assert live-tree LSNs are
1688            // disjoint from the obsolete set.
1689            if let Some(ref t) = tracker_guard {
1690                noxu_engine::check_lsns_against_tracker(&guard, t, &mut merged);
1691                if merged.error_count() >= config.max_errors as usize {
1692                    return Ok(merged);
1693                }
1694            }
1695        }
1696        Ok(merged)
1697    }
1698
1699    /// Explicitly trigger BIN compression for all open databases.
1700    ///
1701    /// Mirrors `Environment.compress()` in JE (`Environment.java:1887`).
1702    /// Synchronously runs one pass of the INCompressor logic: finds every
1703    /// BIN with known-deleted slots and compresses them.  Useful in tests
1704    /// to drain the compressor queue before taking a checkpoint, and for
1705    /// applications that want deterministic memory reclamation after bulk
1706    /// deletes.
1707    ///
1708    /// Returns `Ok(n)` where `n` is the number of BINs compressed.  Returns
1709    /// `Err` if the environment is closed or invalid.
1710    pub fn compress(&self) -> Result<usize> {
1711        self.check_open()?;
1712        let env_impl = self.env_impl.lock();
1713        let n = env_impl.compress_all();
1714        Ok(n)
1715    }
1716
1717    /// Explicitly clean (garbage-collect) log files, returning the number of
1718    /// files that were cleaned.
1719    ///
1720    /// Mirrors `Environment.cleanLog()` in JE (`Environment.java`). Drives the
1721    /// log cleaner synchronously to migrate live LNs forward and mark
1722    /// low-utilization files for deletion. This is the manual counterpart to
1723    /// the background cleaner daemon, used to reclaim space on demand.
1724    ///
1725    /// Unlike the throttled daemon pass, this performs a forced cleaning pass
1726    /// (JE's `cleanLog()` likewise cleans regardless of the daemon's
1727    /// utilization budget), so callers can deterministically reclaim obsolete
1728    /// files in tests and batch maintenance.
1729    ///
1730    /// Returns `Ok(files_cleaned)`. Returns `Err` if the environment is closed
1731    /// or read-only (no cleaner is available).
1732    pub fn clean_log(&self) -> Result<u32> {
1733        self.check_open()?;
1734        let env_impl = self.env_impl.lock();
1735        let result = env_impl
1736            .run_cleaner(u32::MAX, true)
1737            .map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?;
1738        Ok(result.files_cleaned)
1739    }
1740
1741    /// Recomputes the cached disk-limit violation state immediately (JE:
1742    /// `Cleaner.freshenLogSizeStats`).
1743    ///
1744    /// The background checkpointer daemon refreshes this on its interval and
1745    /// the cleaner refreshes after each pass, so most callers never need this.
1746    /// It is useful in tests and for tools that want the disk-limit decision
1747    /// to reflect the current log size / free space right now rather than at
1748    /// the next daemon wakeup.  Cheap no-op when `MAX_DISK`/`FREE_DISK` are 0.
1749    pub fn refresh_disk_limit(&self) -> Result<()> {
1750        self.check_open()?;
1751        self.env_impl.lock().refresh_disk_limit();
1752        Ok(())
1753    }
1754
1755    /// Explicitly trigger the memory evictor.
1756    ///
1757    /// Mirrors `Environment.evictMemory()` in JE (`Environment.java:1860`).
1758    /// Requests that the cache evictor free cache pages down toward the
1759    /// configured cache size.  Useful after bulk inserts to reclaim memory
1760    /// proactively rather than waiting for the background daemon.
1761    ///
1762    /// Returns `Ok(bytes_evicted)`.  Returns `Err` if the environment is
1763    /// closed or invalid.
1764    pub fn evict_memory(&self) -> Result<usize> {
1765        self.check_open()?;
1766        let env_impl = self.env_impl.lock();
1767        let bytes = env_impl.evict_memory();
1768        Ok(bytes)
1769    }
1770
1771    /// The cache eviction algorithm in effect ("LRU"|"Clock"|"ARC"|"CAR"|"LIRS").
1772    /// Reflects the runtime-selected policy, used to verify `EVICTOR_ALGORITHM`
1773    /// wiring took effect.
1774    pub fn evictor_algorithm_name(&self) -> Result<&'static str> {
1775        self.check_open()?;
1776        Ok(self.env_impl.lock().evictor_algorithm_name())
1777    }
1778
1779    /// Current cache memory in use (bytes), as tracked by the evictor arbiter.
1780    /// Unlike [`stats`](Self::stats)`.cache_usage` (a placeholder), this
1781    /// reflects the live tree-memory budget the evictor drives down.
1782    pub fn cache_usage_bytes(&self) -> Result<i64> {
1783        self.check_open()?;
1784        Ok(self.env_impl.lock().get_cache_usage())
1785    }
1786    ///
1787    /// Called by Database::close().
1788    #[allow(dead_code)] // exercised by in-crate tests; close() path lives in DBI
1789    pub(crate) fn mark_database_closed(&self, name: &str) {
1790        let databases = self.databases.lock();
1791        if let Some(db_handle) = databases.get(name) {
1792            db_handle.open.store(false, Ordering::Release);
1793        }
1794    }
1795
1796    /// Internal method to mark a transaction as complete.
1797    ///
1798    /// Historically a no-op call site; now superseded by
1799    /// `Transaction::commit` / `Transaction::abort` calling
1800    /// `ActiveTxns::mark_complete` directly via the shared `Arc<ActiveTxns>`.
1801    /// Kept for backwards compatibility with internal tests.
1802    #[allow(dead_code)] // exercised by in-crate tests
1803    pub(crate) fn mark_transaction_complete(&self, txn_id: u64) {
1804        self.active_txns.mark_complete(txn_id);
1805    }
1806
1807    fn check_open(&self) -> Result<()> {
1808        if !self.open.load(Ordering::Acquire) {
1809            return Err(NoxuError::EnvironmentClosed);
1810        }
1811        if !self.env_valid.load(Ordering::Acquire) {
1812            return Err(NoxuError::environment_with_reason(
1813                crate::error::EnvironmentFailureReason::ForcedShutdown,
1814                "environment has been invalidated due to a prior fatal error"
1815                    .to_string(),
1816            ));
1817        }
1818        Ok(())
1819    }
1820
1821    /// Every mutating env-layer
1822    /// operation funnels through this helper so a `read_only=true`
1823    /// environment cannot create / remove / rename / truncate databases
1824    /// nor begin a (writable) transaction.
1825    fn check_writable(&self, what: &str) -> Result<()> {
1826        self.check_open()?;
1827        if self.config.read_only {
1828            return Err(NoxuError::OperationNotAllowed(format!(
1829                "{what}: environment is read-only",
1830            )));
1831        }
1832        Ok(())
1833    }
1834}
1835
1836/// Helper used by `Environment::write_txn_commit_for_recovered` and
1837/// `write_txn_abort_for_recovered` to write a `TxnCommit` / `TxnAbort` WAL
1838/// frame for a transaction id that has no in-memory `Txn` (the original
1839/// process crashed before it could commit; recovery surfaced it via
1840/// `recovered_prepared_txns`).
1841///
1842/// `vlsn` is the pre-allocated VLSN to embed in the `dtvlsn` field of the
1843/// TxnEndEntry payload.  Pass `NULL_VLSN` (0) for non-replicated environments.
1844/// The R-3 fix requires the VLSN to be embedded so the X-14 VLSN rebuild
1845/// on a second crash can reconstruct the VLSN index from TxnCommit records.
1846///
1847/// XA two-phase commit support.
1848fn write_txn_end_for_recovered(
1849    lm: &LogManager,
1850    txn_id: u64,
1851    is_commit: bool,
1852    fsync: bool,
1853    flush: bool,
1854    vlsn: u64,
1855) -> Result<noxu_util::Lsn> {
1856    use bytes::BytesMut;
1857    use noxu_log::{LogEntryType, Provisional, entry::TxnEndEntry};
1858    use noxu_util::{
1859        lsn::NULL_LSN,
1860        vlsn::{NULL_VLSN, Vlsn},
1861    };
1862
1863    let timestamp = std::time::SystemTime::now()
1864        .duration_since(std::time::UNIX_EPOCH)
1865        .unwrap_or_default()
1866        .as_millis() as u64;
1867
1868    // R-3: embed the pre-allocated VLSN (if any) in the dtvlsn field so the
1869    // X-14 VLSN rebuild on second crash can find it in TxnCommit records.
1870    let dtvlsn = if vlsn > 0 { Vlsn::new(vlsn as i64) } else { NULL_VLSN };
1871
1872    let entry = if is_commit {
1873        TxnEndEntry::new_commit(txn_id as i64, NULL_LSN, timestamp, 0, dtvlsn)
1874    } else {
1875        TxnEndEntry::new_abort(txn_id as i64, NULL_LSN, timestamp, 0, NULL_VLSN)
1876    };
1877
1878    let entry_type = if is_commit {
1879        LogEntryType::TxnCommit
1880    } else {
1881        LogEntryType::TxnAbort
1882    };
1883
1884    let mut buf = BytesMut::with_capacity(entry.log_size());
1885    entry.write_to_log(&mut buf);
1886
1887    lm.log(entry_type, &buf, Provisional::No, flush, fsync).map_err(|e| {
1888        NoxuError::environment_with_reason(
1889            crate::error::EnvironmentFailureReason::LogWrite,
1890            e.to_string(),
1891        )
1892    })
1893}
1894
1895impl Drop for Environment {
1896    fn drop(&mut self) {
1897        // Best effort close on drop
1898        let _ = self.close();
1899    }
1900}
1901
1902#[cfg(test)]
1903mod tests {
1904    use super::*;
1905    use tempfile::TempDir;
1906
1907    fn temp_env_config() -> (TempDir, EnvironmentConfig) {
1908        let temp_dir = TempDir::new().unwrap();
1909        let config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
1910            .with_allow_create(true)
1911            .with_transactional(true);
1912        (temp_dir, config)
1913    }
1914
1915    #[test]
1916    fn test_open_environment() {
1917        let (temp_dir, config) = temp_env_config();
1918        let env = Environment::open(config).unwrap();
1919        assert!(env.is_valid());
1920        assert_eq!(env.home(), temp_dir.path());
1921        env.close().unwrap();
1922    }
1923
1924    #[test]
1925    fn test_env_check_leaks_reports_leaked_lock() {
1926        use noxu_txn::LockType;
1927        let (_temp_dir, config) = temp_env_config();
1928        // env_check_leaks defaults to true.
1929        assert!(config.env_check_leaks);
1930        let env = Environment::open(config).unwrap();
1931
1932        // Deliberately leak a lock directly on the shared lock manager: a
1933        // locker (id 999) acquires a write lock and never releases it. This is
1934        // exactly the leak env_check_leaks is meant to surface (an application
1935        // that dropped a transaction/cursor without releasing its locks).
1936        {
1937            let env_impl = env.env_impl.lock();
1938            let lm = env_impl.get_lock_manager();
1939            lm.lock(0xDEAD_BEEF, 999, LockType::Write, false, false).unwrap();
1940            // The leak is observable before close.
1941            let leaks = lm.report_leaked_locks();
1942            assert!(
1943                leaks.iter().any(|(lsn, owners)| *lsn == 0xDEAD_BEEF
1944                    && owners.contains(&999)),
1945                "the deliberately-leaked lock must be reported"
1946            );
1947        }
1948
1949        // close() must still succeed; env_check_leaks only warns (side effect).
1950        // There are no *tracked* active transactions, so the active-txn guard
1951        // does not block the close.
1952        env.close().unwrap();
1953    }
1954
1955    #[test]
1956    fn test_env_check_leaks_clean_close_no_leak() {
1957        let (_temp_dir, config) = temp_env_config();
1958        let env = Environment::open(config).unwrap();
1959        // A clean env with no outstanding locks reports zero leaks.
1960        {
1961            let env_impl = env.env_impl.lock();
1962            assert!(
1963                env_impl.get_lock_manager().report_leaked_locks().is_empty(),
1964                "a freshly opened env must have no leaked locks"
1965            );
1966        }
1967        env.close().unwrap();
1968    }
1969
1970    // exception_listener (JE ExceptionListener)
1971    #[test]
1972    fn test_exception_listener_fires_on_daemon_error() {
1973        use crate::error::{
1974            ExceptionEvent, ExceptionListener, ExceptionSource,
1975        };
1976        use std::sync::Arc as StdArc;
1977        use std::sync::Mutex as StdMutex;
1978
1979        // A listener that records every event it receives.
1980        #[derive(Default)]
1981        struct Recorder {
1982            events: StdMutex<Vec<ExceptionEvent>>,
1983        }
1984        impl ExceptionListener for Recorder {
1985            fn exception_event(&self, event: &ExceptionEvent) {
1986                self.events.lock().unwrap().push(event.clone());
1987            }
1988        }
1989
1990        let recorder = StdArc::new(Recorder::default());
1991        let (_temp_dir, config) = temp_env_config();
1992        let config = config.with_exception_listener(recorder.clone());
1993        let env = Environment::open(config).unwrap();
1994
1995        // Drive the SAME dispatch path a background daemon uses when its
1996        // do_checkpoint()/do_clean() returns an error.  This exercises the
1997        // full production wiring: config listener -> set_exception_sink ->
1998        // ExceptionDispatcher::dispatch -> adapter -> ExceptionListener.
1999        {
2000            let env_impl = env.env_impl.lock();
2001            let d = env_impl.exception_dispatcher();
2002            assert!(
2003                d.is_installed(),
2004                "the registered listener must install a daemon sink"
2005            );
2006            d.dispatch("Cleaner", "simulated background cleaner failure");
2007            d.dispatch("Checkpointer", "simulated checkpoint failure");
2008        }
2009
2010        let events = recorder.events.lock().unwrap();
2011        assert_eq!(events.len(), 2, "listener must receive both daemon errors");
2012        assert_eq!(events[0].source, ExceptionSource::Cleaner);
2013        assert!(events[0].message.contains("cleaner failure"));
2014        assert_eq!(events[1].source, ExceptionSource::Checkpointer);
2015        assert!(events[1].message.contains("checkpoint failure"));
2016        drop(events);
2017
2018        env.close().unwrap();
2019    }
2020
2021    #[test]
2022    fn test_no_exception_listener_leaves_sink_uninstalled() {
2023        let (_temp_dir, config) = temp_env_config();
2024        // No listener registered (the default).
2025        let env = Environment::open(config).unwrap();
2026        {
2027            let env_impl = env.env_impl.lock();
2028            assert!(
2029                !env_impl.exception_dispatcher().is_installed(),
2030                "no listener means no sink installed; dispatch is a no-op"
2031            );
2032            // dispatch must be a safe no-op with no sink.
2033            env_impl.exception_dispatcher().dispatch("Cleaner", "ignored");
2034        }
2035        env.close().unwrap();
2036    }
2037
2038    // STARTUP_DUMP_THRESHOLD (startup_dump_threshold_ms)
2039    #[test]
2040    fn test_startup_dump_triggered_predicate() {
2041        // Disabled: threshold 0 never triggers regardless of elapsed.
2042        assert!(!Environment::startup_dump_triggered(1_000_000, 0));
2043        // Below threshold: no dump.
2044        assert!(!Environment::startup_dump_triggered(9, 10));
2045        // At threshold: dump (>=).
2046        assert!(Environment::startup_dump_triggered(10, 10));
2047        // Above threshold: dump.
2048        assert!(Environment::startup_dump_triggered(50, 10));
2049    }
2050
2051    #[test]
2052    fn test_open_with_startup_dump_threshold_smoke() {
2053        // A 1 ms threshold makes any real open exceed it, exercising the
2054        // dump path (stats snapshot + warn!).  We only assert open still
2055        // succeeds and the env is valid; the log line is a side effect.
2056        let temp_dir = TempDir::new().unwrap();
2057        let config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
2058            .with_allow_create(true)
2059            .with_transactional(true)
2060            .with_startup_dump_threshold_ms(1);
2061        let env = Environment::open(config).unwrap();
2062        assert!(env.is_valid());
2063        env.close().unwrap();
2064    }
2065
2066    #[test]
2067    fn test_open_creates_directory() {
2068        let temp_dir = TempDir::new().unwrap();
2069        let home = temp_dir.path().join("subdir");
2070        let config =
2071            EnvironmentConfig::new(home.clone()).with_allow_create(true);
2072
2073        let env = Environment::open(config).unwrap();
2074        assert!(home.exists());
2075        assert!(home.is_dir());
2076        env.close().unwrap();
2077    }
2078
2079    #[test]
2080    fn test_open_fails_without_allow_create() {
2081        let temp_dir = TempDir::new().unwrap();
2082        let home = temp_dir.path().join("nonexistent");
2083        let config = EnvironmentConfig::new(home).with_allow_create(false);
2084
2085        let result = Environment::open(config);
2086        assert!(result.is_err());
2087    }
2088
2089    #[test]
2090    fn test_close_environment() {
2091        let (_temp_dir, config) = temp_env_config();
2092        let env = Environment::open(config).unwrap();
2093        assert!(env.is_valid());
2094        env.close().unwrap();
2095        assert!(!env.is_valid());
2096    }
2097
2098    #[test]
2099    fn test_close_twice_fails() {
2100        let (_temp_dir, config) = temp_env_config();
2101        let env = Environment::open(config).unwrap();
2102        env.close().unwrap();
2103        let result = env.close();
2104        assert!(result.is_err());
2105    }
2106
2107    #[test]
2108    fn test_close_with_open_database_fails() {
2109        let (_temp_dir, config) = temp_env_config();
2110        let env = Environment::open(config).unwrap();
2111
2112        let db_config = DatabaseConfig::new()
2113            .with_allow_create(true)
2114            .with_transactional(true);
2115        let _db = env.open_database(None, "testdb", &db_config).unwrap();
2116
2117        let result = env.close();
2118        assert!(result.is_err());
2119    }
2120
2121    #[test]
2122    fn test_open_database() {
2123        let (_temp_dir, config) = temp_env_config();
2124        let env = Environment::open(config).unwrap();
2125
2126        let db_config = DatabaseConfig::new()
2127            .with_allow_create(true)
2128            .with_transactional(true);
2129        let db = env.open_database(None, "testdb", &db_config).unwrap();
2130        assert_eq!(db.name(), "testdb");
2131        assert!(db.is_valid());
2132    }
2133
2134    #[test]
2135    fn test_open_database_twice_fails() {
2136        let (_temp_dir, config) = temp_env_config();
2137        let env = Environment::open(config).unwrap();
2138
2139        let db_config = DatabaseConfig::new()
2140            .with_allow_create(true)
2141            .with_transactional(true);
2142        let _db1 = env.open_database(None, "testdb", &db_config).unwrap();
2143        let result = env.open_database(None, "testdb", &db_config);
2144        assert!(result.is_err());
2145    }
2146
2147    #[test]
2148    fn test_open_database_without_create_fails() {
2149        let (_temp_dir, config) = temp_env_config();
2150        let env = Environment::open(config).unwrap();
2151
2152        let db_config = DatabaseConfig::new().with_allow_create(false);
2153        let result = env.open_database(None, "nonexistent", &db_config);
2154        assert!(result.is_err());
2155    }
2156
2157    #[test]
2158    fn test_open_database_empty_name_fails() {
2159        let (_temp_dir, config) = temp_env_config();
2160        let env = Environment::open(config).unwrap();
2161
2162        let db_config = DatabaseConfig::new()
2163            .with_allow_create(true)
2164            .with_transactional(true);
2165        let result = env.open_database(None, "", &db_config);
2166        assert!(result.is_err());
2167    }
2168
2169    #[test]
2170    fn test_remove_database() {
2171        let (_temp_dir, config) = temp_env_config();
2172        let env = Environment::open(config).unwrap();
2173
2174        let db_config = DatabaseConfig::new()
2175            .with_allow_create(true)
2176            .with_transactional(true);
2177        let db = env.open_database(None, "testdb", &db_config).unwrap();
2178        db.close().unwrap();
2179
2180        env.remove_database(None, "testdb").unwrap();
2181        let names = env.database_names().unwrap();
2182        assert!(!names.contains(&"testdb".to_string()));
2183    }
2184
2185    #[test]
2186    fn test_remove_open_database_fails() {
2187        let (_temp_dir, config) = temp_env_config();
2188        let env = Environment::open(config).unwrap();
2189
2190        let db_config = DatabaseConfig::new()
2191            .with_allow_create(true)
2192            .with_transactional(true);
2193        let _db = env.open_database(None, "testdb", &db_config).unwrap();
2194
2195        let result = env.remove_database(None, "testdb");
2196        assert!(result.is_err());
2197    }
2198
2199    #[test]
2200    fn test_remove_nonexistent_database_fails() {
2201        let (_temp_dir, config) = temp_env_config();
2202        let env = Environment::open(config).unwrap();
2203
2204        let result = env.remove_database(None, "nonexistent");
2205        assert!(result.is_err());
2206    }
2207
2208    #[test]
2209    fn test_rename_database() {
2210        let (_temp_dir, config) = temp_env_config();
2211        let env = Environment::open(config).unwrap();
2212
2213        let db_config = DatabaseConfig::new()
2214            .with_allow_create(true)
2215            .with_transactional(true);
2216        let db = env.open_database(None, "oldname", &db_config).unwrap();
2217        db.close().unwrap();
2218
2219        env.rename_database(None, "oldname", "newname").unwrap();
2220
2221        let names = env.database_names().unwrap();
2222        assert!(!names.contains(&"oldname".to_string()));
2223        assert!(names.contains(&"newname".to_string()));
2224    }
2225
2226    #[test]
2227    fn test_rename_to_same_name() {
2228        let (_temp_dir, config) = temp_env_config();
2229        let env = Environment::open(config).unwrap();
2230
2231        let db_config = DatabaseConfig::new()
2232            .with_allow_create(true)
2233            .with_transactional(true);
2234        let db = env.open_database(None, "testdb", &db_config).unwrap();
2235        db.close().unwrap();
2236
2237        env.rename_database(None, "testdb", "testdb").unwrap();
2238    }
2239
2240    #[test]
2241    fn test_rename_open_database_fails() {
2242        let (_temp_dir, config) = temp_env_config();
2243        let env = Environment::open(config).unwrap();
2244
2245        let db_config = DatabaseConfig::new()
2246            .with_allow_create(true)
2247            .with_transactional(true);
2248        let _db = env.open_database(None, "testdb", &db_config).unwrap();
2249
2250        let result = env.rename_database(None, "testdb", "newname");
2251        assert!(result.is_err());
2252    }
2253
2254    #[test]
2255    fn test_rename_nonexistent_database_fails() {
2256        let (_temp_dir, config) = temp_env_config();
2257        let env = Environment::open(config).unwrap();
2258
2259        let result = env.rename_database(None, "nonexistent", "newname");
2260        assert!(result.is_err());
2261    }
2262
2263    #[test]
2264    fn test_rename_to_existing_database_fails() {
2265        let (_temp_dir, config) = temp_env_config();
2266        let env = Environment::open(config).unwrap();
2267
2268        let db_config = DatabaseConfig::new()
2269            .with_allow_create(true)
2270            .with_transactional(true);
2271        let db1 = env.open_database(None, "db1", &db_config).unwrap();
2272        let db2 = env.open_database(None, "db2", &db_config).unwrap();
2273        db1.close().unwrap();
2274        db2.close().unwrap();
2275
2276        let result = env.rename_database(None, "db1", "db2");
2277        assert!(result.is_err());
2278    }
2279
2280    #[test]
2281    fn test_database_names() {
2282        let (_temp_dir, config) = temp_env_config();
2283        let env = Environment::open(config).unwrap();
2284
2285        let db_config = DatabaseConfig::new()
2286            .with_allow_create(true)
2287            .with_transactional(true);
2288        let _db1 = env.open_database(None, "db1", &db_config).unwrap();
2289        let _db2 = env.open_database(None, "db2", &db_config).unwrap();
2290
2291        let names = env.database_names().unwrap();
2292        assert_eq!(names.len(), 2);
2293        assert!(names.contains(&"db1".to_string()));
2294        assert!(names.contains(&"db2".to_string()));
2295    }
2296
2297    /// C-4 / JE 1-I: a database opened inside an explicit transaction that is
2298    /// subsequently aborted must NOT persist after env close + reopen.
2299    #[test]
2300    fn test_transactional_open_database_abort_removes_db() {
2301        let (temp_dir, config) = temp_env_config();
2302        {
2303            let env = Environment::open(config).unwrap();
2304            let txn = env.begin_transaction(None).unwrap();
2305            let db_config = DatabaseConfig::new()
2306                .with_allow_create(true)
2307                .with_transactional(true);
2308            let _db = env
2309                .open_database(Some(&txn), "aborted_db", &db_config)
2310                .unwrap();
2311            txn.abort().unwrap();
2312            // After abort the database must not appear in the committed list.
2313            let names = env.database_names().unwrap();
2314            assert!(
2315                !names.contains(&"aborted_db".to_string()),
2316                "aborted database must not appear in database_names() \
2317                 (C-4 committed-only semantics), got: {:?}",
2318                names
2319            );
2320            drop(env);
2321        }
2322        // Reopen: the aborted database must NOT have been written to the WAL.
2323        let env2 = Environment::open(
2324            EnvironmentConfig::new(temp_dir.path().to_path_buf())
2325                .with_allow_create(false)
2326                .with_transactional(true),
2327        )
2328        .unwrap();
2329        let names2 = env2.database_names().unwrap();
2330        assert!(
2331            !names2.contains(&"aborted_db".to_string()),
2332            "after env reopen, aborted database must not appear: {:?}",
2333            names2
2334        );
2335    }
2336
2337    /// C-4 / JE 1-I: `database_names()` must NOT return a database that
2338    /// was opened inside a concurrent uncommitted transaction.
2339    #[test]
2340    fn test_get_database_names_excludes_uncommitted() {
2341        let (_temp_dir, config) = temp_env_config();
2342        let env = Environment::open(config).unwrap();
2343
2344        let db_config = DatabaseConfig::new()
2345            .with_allow_create(true)
2346            .with_transactional(true);
2347        let txn = env.begin_transaction(None).unwrap();
2348        let _db =
2349            env.open_database(Some(&txn), "pending_db", &db_config).unwrap();
2350
2351        // While txn is still uncommitted, another observer must not see
2352        // the database in the committed-names list.
2353        let names = env.database_names().unwrap();
2354        assert!(
2355            !names.contains(&"pending_db".to_string()),
2356            "uncommitted database must be invisible to database_names() \
2357             (C-4 / JE 1-J): got {:?}",
2358            names
2359        );
2360
2361        // After commit the database must appear.
2362        txn.commit().unwrap();
2363        let names_after = env.database_names().unwrap();
2364        assert!(
2365            names_after.contains(&"pending_db".to_string()),
2366            "committed database must appear in database_names()"
2367        );
2368    }
2369
2370    #[test]
2371    fn test_begin_transaction() {
2372        let temp_dir = TempDir::new().unwrap();
2373        let config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
2374            .with_allow_create(true)
2375            .with_transactional(false);
2376        let env = Environment::open(config).unwrap();
2377
2378        let result = env.begin_transaction(None);
2379        assert!(result.is_err());
2380    }
2381
2382    #[test]
2383    fn test_is_transactional() {
2384        let (_temp_dir, config) = temp_env_config();
2385        let env = Environment::open(config).unwrap();
2386        assert!(env.is_transactional());
2387    }
2388
2389    #[test]
2390    fn test_is_not_transactional() {
2391        let temp_dir = TempDir::new().unwrap();
2392        let config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
2393            .with_allow_create(true)
2394            .with_transactional(false);
2395        let env = Environment::open(config).unwrap();
2396        assert!(!env.is_transactional());
2397    }
2398
2399    #[test]
2400    fn test_is_read_only() {
2401        let temp_dir = TempDir::new().unwrap();
2402        let config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
2403            .with_allow_create(true)
2404            .with_read_only(true);
2405        let env = Environment::open(config).unwrap();
2406        assert!(env.is_read_only());
2407    }
2408
2409    #[test]
2410    fn test_operations_on_closed_environment_fail() {
2411        let (_temp_dir, config) = temp_env_config();
2412        let env = Environment::open(config).unwrap();
2413        env.close().unwrap();
2414
2415        let db_config = DatabaseConfig::new()
2416            .with_allow_create(true)
2417            .with_transactional(true);
2418        assert!(env.open_database(None, "test", &db_config).is_err());
2419        assert!(env.remove_database(None, "test").is_err());
2420        assert!(env.rename_database(None, "a", "b").is_err());
2421        assert!(env.begin_transaction(None).is_err());
2422        assert!(env.database_names().is_err());
2423    }
2424
2425    // ========================================================================
2426    // Additional branch-coverage tests
2427    // ========================================================================
2428
2429    /// open() with a path that points to a file (not a directory) fails.
2430    #[test]
2431    fn test_open_fails_if_home_is_a_file() {
2432        use std::io::Write;
2433        let temp_dir = TempDir::new().unwrap();
2434        let file_path = temp_dir.path().join("not_a_dir.txt");
2435        let mut f = std::fs::File::create(&file_path).unwrap();
2436        writeln!(f, "data").unwrap();
2437        drop(f);
2438
2439        let config = EnvironmentConfig::new(file_path).with_allow_create(false);
2440        // The path exists but is not a directory — must fail.
2441        let result = Environment::open(config);
2442        assert!(result.is_err());
2443    }
2444
2445    /// open_database() with node_max_entries > 0 hits the set_node_max_entries branch.
2446    #[test]
2447    fn test_open_database_with_node_max_entries() {
2448        let (_temp_dir, config) = temp_env_config();
2449        let env = Environment::open(config).unwrap();
2450
2451        let mut db_config = DatabaseConfig::new()
2452            .with_allow_create(true)
2453            .with_transactional(true);
2454        db_config.set_node_max_entries(64);
2455        let db = env.open_database(None, "testdb_entries", &db_config).unwrap();
2456        assert!(db.is_valid());
2457    }
2458
2459    /// begin_transaction() with an explicit TransactionConfig.
2460    #[test]
2461    fn test_begin_transaction_with_explicit_config() {
2462        use crate::transaction_config::TransactionConfig;
2463        let (_temp_dir, config) = temp_env_config();
2464        let env = Environment::open(config).unwrap();
2465
2466        let txn_config = TransactionConfig::new();
2467        let txn = env.begin_transaction(Some(&txn_config)).unwrap();
2468        assert!(txn.is_valid());
2469    }
2470
2471    /// rename_database() when the old name is not in the databases map
2472    /// (handle was never registered) still succeeds at the env_impl level and
2473    /// the missing-handle branch (`if let Some(...)` => false) is taken.
2474    #[test]
2475    fn test_rename_database_handle_not_in_map() {
2476        let (_temp_dir, config) = temp_env_config();
2477        let env = Environment::open(config).unwrap();
2478
2479        // Create the DB using env_impl directly (bypassing Environment::open_database
2480        // so the handle is NOT in the databases map), then immediately close it
2481        // so that reference_count returns to 0 (no open user handles).
2482        {
2483            let env_impl = env.env_impl.lock();
2484            let mut dbi_config = noxu_dbi::DatabaseConfig::new();
2485            dbi_config.set_allow_create(true);
2486            let db_arc =
2487                env_impl.open_database("ghost_db", &dbi_config).unwrap();
2488            let db_id = db_arc.read().get_id();
2489            env_impl.close_database(db_id).unwrap();
2490        }
2491
2492        // rename_database should succeed and hit the `if let Some(handle)` false branch.
2493        env.rename_database(None, "ghost_db", "ghost_db_renamed").unwrap();
2494
2495        let names = env.database_names().unwrap();
2496        assert!(names.contains(&"ghost_db_renamed".to_string()));
2497        assert!(!names.contains(&"ghost_db".to_string()));
2498    }
2499
2500    /// close() with active transactions returns an error.
2501    #[test]
2502    fn test_close_with_active_transactions_fails() {
2503        let (_temp_dir, config) = temp_env_config();
2504        let env = Environment::open(config).unwrap();
2505
2506        let _txn = env.begin_transaction(None).unwrap();
2507
2508        let result = env.close();
2509        assert!(result.is_err());
2510    }
2511
2512    /// config() and home() return the correct values.
2513    #[test]
2514    fn test_get_config_and_home() {
2515        let (temp_dir, config) = temp_env_config();
2516        let env = Environment::open(config).unwrap();
2517
2518        assert!(env.config().allow_create);
2519        assert_eq!(env.home(), temp_dir.path());
2520        env.close().unwrap();
2521    }
2522
2523    /// mark_database_closed() when the database is in the map.
2524    #[test]
2525    fn test_mark_database_closed_known_name() {
2526        let (_temp_dir, config) = temp_env_config();
2527        let env = Environment::open(config).unwrap();
2528
2529        let db_config = DatabaseConfig::new()
2530            .with_allow_create(true)
2531            .with_transactional(true);
2532        let db = env.open_database(None, "mydb", &db_config).unwrap();
2533        // db is open — mark it closed via the internal API.
2534        env.mark_database_closed("mydb");
2535        // The database handle is now marked closed in the map; close() should succeed.
2536        let _ = db.is_valid(); // just use the variable
2537        env.close().unwrap();
2538    }
2539
2540    /// mark_database_closed() for an unknown name is a no-op.
2541    #[test]
2542    fn test_mark_database_closed_unknown_name_is_noop() {
2543        let (_temp_dir, config) = temp_env_config();
2544        let env = Environment::open(config).unwrap();
2545        // No database named "ghost" — should not panic.
2546        env.mark_database_closed("ghost");
2547        env.close().unwrap();
2548    }
2549
2550    /// mark_transaction_complete() removes the transaction from the active set.
2551    #[test]
2552    fn test_mark_transaction_complete_allows_env_close() {
2553        let (_temp_dir, config) = temp_env_config();
2554        let env = Environment::open(config).unwrap();
2555
2556        let txn = env.begin_transaction(None).unwrap();
2557        let txn_id = txn.id();
2558
2559        // Without removing the txn, close would fail.
2560        // Remove it via the internal API.
2561        env.mark_transaction_complete(txn_id);
2562
2563        // Now close should succeed.
2564        env.close().unwrap();
2565    }
2566
2567    // ── verify ─────────────────────────────────────────────────────────────
2568
2569    #[test]
2570    fn test_verify_empty_environment_passes() {
2571        use crate::VerifyConfig;
2572        let (_tmp, config) = temp_env_config();
2573        let env = Environment::open(config).unwrap();
2574        let verify_cfg = VerifyConfig::default();
2575        let result = env.verify(&verify_cfg).unwrap();
2576        assert!(result.passed, "empty env should pass: {:?}", result.errors);
2577    }
2578
2579    #[test]
2580    fn test_verify_environment_with_data_passes() {
2581        use crate::{DatabaseConfig, DatabaseEntry, VerifyConfig};
2582        let (_tmp, config) = temp_env_config();
2583        let env = Environment::open(config).unwrap();
2584
2585        let mut db_config = DatabaseConfig::new();
2586        db_config.set_allow_create(true);
2587        let db = env.open_database(None, "vtest", &db_config).unwrap();
2588        for i in 0u32..10 {
2589            let k = DatabaseEntry::from_bytes(&i.to_be_bytes());
2590            let v = DatabaseEntry::from_bytes(&(i * 3).to_be_bytes());
2591            db.put(&k, &v).unwrap();
2592        }
2593
2594        let verify_cfg = VerifyConfig::default();
2595        let result = env.verify(&verify_cfg).unwrap();
2596        assert!(
2597            result.passed,
2598            "env with data should pass: {:?}",
2599            result.errors
2600        );
2601        assert!(result.records_verified >= 10);
2602        db.close().unwrap();
2603        env.close().unwrap();
2604    }
2605
2606    #[test]
2607    fn test_verify_closed_environment_fails() {
2608        use crate::VerifyConfig;
2609        let (_tmp, config) = temp_env_config();
2610        let env = Environment::open(config).unwrap();
2611        env.close().unwrap();
2612        let verify_cfg = VerifyConfig::default();
2613        assert!(env.verify(&verify_cfg).is_err());
2614    }
2615
2616    // ── checkpoint ──────────────────────────────────────────────────────────
2617
2618    #[test]
2619    fn test_checkpoint_default_succeeds() {
2620        let (_tmp, config) = temp_env_config();
2621        let env = Environment::open(config).unwrap();
2622        // Transactional env has a checkpointer; call with no config.
2623        env.checkpoint(None).unwrap();
2624        env.close().unwrap();
2625    }
2626
2627    #[test]
2628    fn test_checkpoint_with_config_succeeds() {
2629        let (_tmp, config) = temp_env_config();
2630        let env = Environment::open(config).unwrap();
2631        let ckpt_cfg = CheckpointConfig {
2632            force: true,
2633            k_bytes: 0,
2634            minutes: 0,
2635            minimize_recovery_time: false,
2636        };
2637        env.checkpoint(Some(&ckpt_cfg)).unwrap();
2638        env.close().unwrap();
2639    }
2640
2641    #[test]
2642    fn test_checkpoint_closed_env_fails() {
2643        let (_tmp, config) = temp_env_config();
2644        let env = Environment::open(config).unwrap();
2645        env.close().unwrap();
2646        assert!(env.checkpoint(None).is_err());
2647    }
2648
2649    // ── get_mutable_config / set_mutable_config ──────────────────────────────
2650
2651    #[test]
2652    fn test_get_mutable_config_returns_current_values() {
2653        let (_tmp, config) = temp_env_config();
2654        let env = Environment::open(config).unwrap();
2655        let mc = env.mutable_config().unwrap();
2656        // cache_size should be Some() with the default value.
2657        assert!(mc.cache_size.is_some());
2658        assert!(mc.run_cleaner.is_some());
2659        assert!(mc.run_checkpointer.is_some());
2660        assert!(mc.run_evictor.is_some());
2661        env.close().unwrap();
2662    }
2663
2664    #[test]
2665    fn test_get_mutable_config_closed_env_fails() {
2666        let (_tmp, config) = temp_env_config();
2667        let env = Environment::open(config).unwrap();
2668        env.close().unwrap();
2669        assert!(env.mutable_config().is_err());
2670    }
2671
2672    #[test]
2673    fn test_set_mutable_config_updates_cache_size() {
2674        let (_tmp, config) = temp_env_config();
2675        let mut env = Environment::open(config).unwrap();
2676        let new_size: usize = 128 * 1024 * 1024; // 128 MiB
2677        let mc = EnvironmentMutableConfig::new().with_cache_size(new_size);
2678        env.set_mutable_config(mc).unwrap();
2679        let updated = env.mutable_config().unwrap();
2680        assert_eq!(updated.cache_size.unwrap(), new_size);
2681        env.close().unwrap();
2682    }
2683
2684    #[test]
2685    fn test_set_mutable_config_updates_timeouts() {
2686        let (_tmp, config) = temp_env_config();
2687        let mut env = Environment::open(config).unwrap();
2688        let mc = EnvironmentMutableConfig {
2689            lock_timeout_ms: Some(5_000),
2690            txn_timeout_ms: Some(10_000),
2691            ..EnvironmentMutableConfig::default()
2692        };
2693        env.set_mutable_config(mc).unwrap();
2694        // After setting, values should be reflected (lock_timeout_ms is advisory at
2695        // the config layer; verify via get_mutable_config).
2696        let updated = env.mutable_config().unwrap();
2697        assert_eq!(updated.lock_timeout_ms, Some(5_000));
2698        assert_eq!(updated.txn_timeout_ms, Some(10_000));
2699        env.close().unwrap();
2700    }
2701
2702    #[test]
2703    fn test_set_mutable_config_none_timeout_unchanged() {
2704        let (_tmp, config) = temp_env_config();
2705        let mut env = Environment::open(config).unwrap();
2706        let original = env.mutable_config().unwrap();
2707        // None means "unchanged".  See Wave 1C audit cleanup
2708        // (Transaction-Env F19/F20): the previous implementation used
2709        // 0 as the sentinel which prevented users from clearing a
2710        // timeout.
2711        let mc = EnvironmentMutableConfig {
2712            lock_timeout_ms: None,
2713            txn_timeout_ms: None,
2714            ..EnvironmentMutableConfig::default()
2715        };
2716        env.set_mutable_config(mc).unwrap();
2717        let updated = env.mutable_config().unwrap();
2718        assert_eq!(updated.lock_timeout_ms, original.lock_timeout_ms);
2719        assert_eq!(updated.txn_timeout_ms, original.txn_timeout_ms);
2720        env.close().unwrap();
2721    }
2722
2723    #[test]
2724    fn test_set_mutable_config_closed_env_fails() {
2725        let (_tmp, config) = temp_env_config();
2726        let mut env = Environment::open(config).unwrap();
2727        env.close().unwrap();
2728        let mc = EnvironmentMutableConfig::new();
2729        assert!(env.set_mutable_config(mc).is_err());
2730    }
2731
2732    // ========================================================================
2733    // Audit transaction-env F4 / F5 / F6 / F7 / F10 — Wave 2C-4
2734    // ========================================================================
2735
2736    /// F5 — read-only env rejects database creation.
2737    #[test]
2738    fn test_read_only_env_rejects_create_database() {
2739        // First create the env writably so the directory exists.
2740        let temp_dir = TempDir::new().unwrap();
2741        {
2742            let config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
2743                .with_allow_create(true)
2744                .with_transactional(true);
2745            let _env = Environment::open(config).unwrap();
2746        }
2747        // Re-open read-only.
2748        let ro_config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
2749            .with_read_only(true)
2750            .with_transactional(true);
2751        let env = Environment::open(ro_config).unwrap();
2752
2753        let db_cfg = DatabaseConfig::new()
2754            .with_allow_create(true)
2755            .with_transactional(true);
2756        let result = env.open_database(None, "new", &db_cfg);
2757        assert!(
2758            result.is_err(),
2759            "open_database with allow_create on read-only env must fail",
2760        );
2761    }
2762
2763    /// F5 — read-only env rejects remove_database.
2764    #[test]
2765    fn test_read_only_env_rejects_remove_database() {
2766        let temp_dir = TempDir::new().unwrap();
2767        {
2768            let config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
2769                .with_allow_create(true)
2770                .with_transactional(true);
2771            let _env = Environment::open(config).unwrap();
2772        }
2773        let ro_config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
2774            .with_read_only(true)
2775            .with_transactional(true);
2776        let env = Environment::open(ro_config).unwrap();
2777
2778        assert!(env.remove_database(None, "test").is_err());
2779        assert!(env.truncate_database(None, "test").is_err());
2780        assert!(env.rename_database(None, "a", "b").is_err());
2781    }
2782
2783    /// F5 — read-only env rejects writable transactions.
2784    #[test]
2785    fn test_read_only_env_rejects_writable_txn() {
2786        let temp_dir = TempDir::new().unwrap();
2787        {
2788            let config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
2789                .with_allow_create(true)
2790                .with_transactional(true);
2791            let _env = Environment::open(config).unwrap();
2792        }
2793        let ro_config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
2794            .with_read_only(true)
2795            .with_transactional(true);
2796        let env = Environment::open(ro_config).unwrap();
2797
2798        // Default txn config is writable — must be rejected.
2799        let result = env.begin_transaction(None);
2800        assert!(result.is_err(), "writable txn on read-only env must fail");
2801
2802        // Read-only txn must be allowed.
2803        let ro_txn_cfg = TransactionConfig::default().with_read_only(true);
2804        let _txn = env
2805            .begin_transaction(Some(&ro_txn_cfg))
2806            .expect("read-only txn on read-only env must succeed");
2807    }
2808
2809    /// F6 — checkpoint() with `force=false` and a fresh `minutes`
2810    /// threshold skips the checkpoint when it has just run.
2811    #[test]
2812    fn test_checkpoint_minutes_threshold_skips() {
2813        let (_tmp, config) = temp_env_config();
2814        let env = Environment::open(config).unwrap();
2815
2816        // First call: runs (no prior checkpoint).
2817        env.checkpoint(None).unwrap();
2818
2819        // Second call with minutes=60 and force=false: should skip.
2820        let cfg = CheckpointConfig::default().with_minutes(60);
2821        env.checkpoint(Some(&cfg)).unwrap();
2822        // No assertion-able effect we can read here, but the call must
2823        // succeed and not error.
2824
2825        // Third call with force=true must run regardless.
2826        let cfg = CheckpointConfig::default().with_force(true).with_minutes(60);
2827        env.checkpoint(Some(&cfg)).unwrap();
2828        env.close().unwrap();
2829    }
2830
2831    /// F7 — set_mutable_config(cache_size) pushes through to the
2832    /// evictor's Arbiter.
2833    #[test]
2834    fn test_set_mutable_config_pushes_cache_size_to_evictor() {
2835        let (_tmp, config) = temp_env_config();
2836        let mut env = Environment::open(config).unwrap();
2837
2838        let mc = EnvironmentMutableConfig {
2839            cache_size: Some(64 * 1024 * 1024),
2840            ..EnvironmentMutableConfig::default()
2841        };
2842        env.set_mutable_config(mc).unwrap();
2843
2844        let env_impl = env.env_impl.lock();
2845        let evictor = env_impl.get_evictor();
2846        assert_eq!(
2847            evictor.get_arbiter().get_max_memory(),
2848            64 * 1024 * 1024,
2849            "set_mutable_config(cache_size) must push to Arbiter",
2850        );
2851    }
2852
2853    /// F4 — env-level `txn_no_sync = true` makes explicit-txn commits
2854    /// inherit COMMIT_NO_SYNC when the caller does not specify a
2855    /// TransactionConfig.
2856    #[test]
2857    #[allow(deprecated)] // tests the deprecated txn_no_sync flag
2858    fn test_env_txn_no_sync_applies_to_explicit_txn() {
2859        let temp_dir = TempDir::new().unwrap();
2860        let config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
2861            .with_allow_create(true)
2862            .with_transactional(true)
2863            .with_txn_no_sync(true);
2864        let env = Environment::open(config).unwrap();
2865
2866        let txn = env.begin_transaction(None).unwrap();
2867        // The transaction must have inherited COMMIT_NO_SYNC.
2868        let dur = txn.durability().expect("durability must be set");
2869        assert_eq!(
2870            dur,
2871            crate::durability::Durability::COMMIT_NO_SYNC,
2872            "env txn_no_sync=true must propagate to explicit-txn durability",
2873        );
2874        txn.commit().unwrap();
2875        env.close().unwrap();
2876    }
2877
2878    /// F10 — dropping an open transaction performs an actual abort,
2879    /// releasing locks instead of leaking them.
2880    #[test]
2881    fn test_drop_aborts_open_transaction() {
2882        let (_tmp, config) = temp_env_config();
2883        let env = Environment::open(config).unwrap();
2884
2885        let initial_active = env.active_txns.len();
2886        {
2887            let _txn = env.begin_transaction(None).unwrap();
2888            assert_eq!(env.active_txns.len(), initial_active + 1);
2889            // Drop _txn at scope exit without commit/abort.
2890        }
2891        // After drop, the active-txns registry must have pruned the entry.
2892        assert_eq!(
2893            env.active_txns.len(),
2894            initial_active,
2895            "Transaction::Drop must abort and prune from active_txns",
2896        );
2897        // close() must succeed because no txns remain registered.
2898        env.close().unwrap();
2899    }
2900
2901    // ── X-3: recovered XA commit assigns real VLSN ─────────────────────
2902
2903    /// X-3: after calling `write_txn_commit_for_recovered` on an environment
2904    /// wired with a mock replica coordinator, the coordinator's
2905    /// `alloc_vlsn_for_recovered_commit` must be called with a non-NULL LSN.
2906    #[test]
2907    fn test_x3_recovered_commit_calls_alloc_vlsn() {
2908        use noxu_dbi::{
2909            AckWaitError, ReplicaAckCoordinator, ReplicaAckPolicyKind,
2910        };
2911        use noxu_util::Lsn;
2912        use std::sync::Arc;
2913        use std::sync::atomic::{AtomicU64, Ordering as AO};
2914        use std::time::Duration;
2915
2916        // Mock coordinator that records the LSN passed to alloc_vlsn_for_recovered_commit.
2917        struct MockCoord {
2918            last_lsn: AtomicU64,
2919        }
2920        impl ReplicaAckCoordinator for MockCoord {
2921            fn await_replica_acks(
2922                &self,
2923                _policy: ReplicaAckPolicyKind,
2924                _timeout: Duration,
2925            ) -> std::result::Result<u32, AckWaitError> {
2926                Ok(0)
2927            }
2928            fn alloc_vlsn_for_recovered_commit(&self, lsn: Lsn) -> u64 {
2929                self.last_lsn.store(lsn.as_u64(), AO::SeqCst);
2930                // Return the file_number as a fake VLSN (non-zero = success).
2931                lsn.file_number() as u64 + 1
2932            }
2933        }
2934
2935        let (tmp, config) = temp_env_config();
2936        let env = Environment::open(config).unwrap();
2937
2938        // Wire the mock coordinator.
2939        let coord = Arc::new(MockCoord { last_lsn: AtomicU64::new(0) });
2940        env.set_replica_coordinator(coord.clone());
2941
2942        // Write a fake txn_id=42 commit (simulates recovered XA).
2943        env.write_txn_commit_for_recovered(42).unwrap();
2944
2945        // The coordinator must have been called with a non-zero LSN.
2946        let recorded_lsn = coord.last_lsn.load(AO::SeqCst);
2947        assert_ne!(
2948            recorded_lsn, 0,
2949            "X-3: alloc_vlsn_for_recovered_commit must be called with the commit LSN"
2950        );
2951
2952        env.close().unwrap();
2953        drop(tmp);
2954    }
2955
2956    /// CLN-2 / VerifyUtils.checkLsns wired into Environment::verify.
2957    ///
2958    /// A healthy env (live tree LSNs disjoint from the obsolete set) passes
2959    /// verify; after seeding one live LSN into the obsolete tracker the same
2960    /// verify FAILS with the "Obsolete LSN set contains valid LSN" error.
2961    #[test]
2962    fn test_verify_checklsns_detects_live_in_obsolete() {
2963        use crate::database_entry::DatabaseEntry;
2964        let (_tmp, config) = temp_env_config();
2965        let env = Environment::open(config).unwrap();
2966
2967        let db_config = DatabaseConfig::new()
2968            .with_allow_create(true)
2969            .with_transactional(true);
2970        let db = env.open_database(None, "cln2db", &db_config).unwrap();
2971        db.put(
2972            DatabaseEntry::from_bytes(b"alpha"),
2973            DatabaseEntry::from_bytes(b"1"),
2974        )
2975        .unwrap();
2976        db.put(
2977            DatabaseEntry::from_bytes(b"beta"),
2978            DatabaseEntry::from_bytes(b"2"),
2979        )
2980        .unwrap();
2981        db.put(
2982            DatabaseEntry::from_bytes(b"gamma"),
2983            DatabaseEntry::from_bytes(b"3"),
2984        )
2985        .unwrap();
2986
2987        let cfg = noxu_engine::VerifyConfig::default();
2988
2989        // Healthy: live LSNs disjoint from obsolete -> verify passes.
2990        let healthy = env.verify(&cfg).unwrap();
2991        assert!(
2992            healthy.is_passed(),
2993            "healthy env must pass checkLsns: {:?}",
2994            healthy.errors
2995        );
2996
2997        // Grab a real live LSN from the db's tree.
2998        let live_lsn = {
2999            let guard = db.db_impl.read();
3000            let tree = guard
3001                .get_real_tree()
3002                .expect("invariant: populated db has a real tree");
3003            noxu_engine::gather_tree_lsns(&tree)
3004                .into_iter()
3005                .next()
3006                .expect("invariant: at least one live LSN")
3007        };
3008
3009        // Seed the bug: record that exact live LSN as obsolete in the live
3010        // tracker (mislabels live data obsolete -> the cleaner would delete
3011        // live data).
3012        {
3013            let env_impl = env.env_impl.lock();
3014            let tracker = env_impl
3015                .get_utilization_tracker()
3016                .expect("invariant: rw env has a tracker");
3017            tracker.lock().count_obsolete_node(
3018                live_lsn.file_number(),
3019                live_lsn.file_offset(),
3020                10,
3021                true,
3022                None,
3023            );
3024        }
3025
3026        // Now verify must DETECT the overlap.
3027        let bad = env.verify(&cfg).unwrap();
3028        assert!(
3029            !bad.is_passed(),
3030            "verify must detect live LSN in obsolete set"
3031        );
3032        assert!(
3033            bad.errors.iter().any(|e| matches!(
3034                e,
3035                noxu_engine::VerifyError::DataInconsistency { description }
3036                    if description.contains("Obsolete LSN set contains valid LSN")
3037            )),
3038            "expected checkLsns error, got: {:?}",
3039            bad.errors
3040        );
3041
3042        drop(db);
3043        env.close().unwrap();
3044    }
3045
3046    /// DBI-10 / JE EnvConfigObserver + MemoryBudget.envConfigUpdate: a runtime
3047    /// cache-size change must reach the live evictor Arbiter's max-memory.
3048    #[test]
3049    fn test_set_mutable_config_pushes_cache_size_to_arbiter() {
3050        let (_tmp, config) = temp_env_config();
3051        let mut env = Environment::open(config).unwrap();
3052
3053        let new_cache = 256 * 1024 * 1024_usize; // 256 MiB
3054        let mc = EnvironmentMutableConfig::new().with_cache_size(new_cache);
3055        env.set_mutable_config(mc).unwrap();
3056
3057        let arbiter_max = env.env_impl.lock().get_arbiter_max_memory();
3058        assert_eq!(
3059            arbiter_max, new_cache as i64,
3060            "Arbiter max-memory must reflect the new cache size"
3061        );
3062        env.close().unwrap();
3063    }
3064
3065    /// DBI-10 / JE EnvConfigObserver: a runtime cleaner minUtilization change
3066    /// must reach the running cleaner (noxu.cleaner.minUtilization is
3067    /// mutable). FAILS on main (no propagation); PASSES after the push.
3068    #[test]
3069    fn test_set_mutable_config_pushes_cleaner_min_utilization() {
3070        let (_tmp, config) = temp_env_config();
3071        let mut env = Environment::open(config).unwrap();
3072
3073        let cleaner = env
3074            .env_impl
3075            .lock()
3076            .get_cleaner()
3077            .expect("invariant: transactional env has a cleaner");
3078        let before = cleaner.get_min_utilization();
3079
3080        let new_pct = if before == 70 { 40 } else { 70 };
3081        let mc = EnvironmentMutableConfig::new()
3082            .with_cleaner_min_utilization(new_pct);
3083        env.set_mutable_config(mc).unwrap();
3084
3085        assert_eq!(
3086            cleaner.get_min_utilization(),
3087            new_pct,
3088            "running cleaner must reflect the new minUtilization"
3089        );
3090        env.close().unwrap();
3091    }
3092}