Skip to main content

noxu_db/
environment.rs

1//! Environment handle.
2//!
3
4use crate::checkpoint_config::CheckpointConfig;
5use crate::database::Database;
6use crate::database_config::DatabaseConfig;
7use crate::environment_config::EnvironmentConfig;
8use crate::environment_mutable_config::EnvironmentMutableConfig;
9use crate::error::{NoxuError, Result};
10use crate::transaction::Transaction;
11use crate::transaction_config::TransactionConfig;
12use hashbrown::HashMap;
13use noxu_dbi::{DbiEnvConfig, EnvironmentImpl};
14use noxu_engine::EnvironmentStats;
15use noxu_engine::env_stats::{
16    EvictorStatsSnapshot, LockStatsSnapshot, LogStatsSnapshot, TxnStatsSnapshot,
17};
18use noxu_log::LogManager;
19use noxu_sync::Mutex;
20use std::path::{Path, PathBuf};
21use std::sync::Arc;
22use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
23use std::time::Instant;
24
25/// A database environment.
26///
27///
28///
29/// An Environment provides support for caching, locking, logging, and
30/// transactions. It is the top-level handle through which databases are
31/// opened and transactions are started.
32///
33/// # Example
34/// ```ignore
35/// use noxu_db::{Environment, EnvironmentConfig};
36/// use std::path::PathBuf;
37///
38/// let config = EnvironmentConfig::new(PathBuf::from("/tmp/mydb"))
39///     .allow_create(true)
40///     .transactional(true);
41/// let env = Environment::open(config).unwrap();
42/// env.close().unwrap();
43/// ```
44pub struct Environment {
45    /// Home directory path
46    home: PathBuf,
47    /// Configuration used to open this environment
48    config: EnvironmentConfig,
49    /// Open databases by name (tracks which names are currently open via this handle)
50    databases: Mutex<HashMap<String, Arc<DatabaseHandle>>>,
51    /// Active transactions registry, shared with `Transaction` so that
52    /// `Transaction::commit()` / `Transaction::abort()` can prune their
53    /// own entry on completion (F1: `mark_transaction_complete` was dead
54    /// code, so `env.close()` after `txn.commit()` always failed).
55    active_txns: Arc<ActiveTxns>,
56    /// Next transaction ID
57    next_txn_id: AtomicU64,
58    /// Whether the environment is open
59    open: AtomicBool,
60    /// Whether the environment is valid (not invalidated by a fatal error).
61    ///
62    /// Mirrors `EnvironmentImpl.isValid()` / `envInvalid` AtomicBoolean.
63    /// Set to `false` when an `EnvironmentFailure` with `invalidates_environment() == true`
64    /// is returned; all subsequent API calls check this and return `EnvironmentFailure`.
65    env_valid: AtomicBool,
66    /// The real internal environment implementation (B-tree backed).
67    env_impl: Arc<Mutex<EnvironmentImpl>>,
68    /// Cached log manager — acquired once at open; None for non-transactional envs.
69    /// Used by stat_fsync_count() to avoid env_impl.lock() on the stats hot path.
70    log_manager: Option<Arc<LogManager>>,
71    /// Bookkeeping for `Environment::checkpoint(CheckpointConfig)` so that
72    /// `force` / `k_bytes` / `minutes` can gate whether the call actually
73    /// runs a checkpoint.
74    last_checkpoint_time: Mutex<Option<Instant>>,
75    last_checkpoint_end_lsn: Mutex<noxu_util::Lsn>,
76    /// Optional replica-ack coordinator (typically a
77    /// `noxu_rep::ReplicatedEnvironment`).  When set via
78    /// [`Environment::set_replica_coordinator`], every new
79    /// `Transaction` is wired to the coordinator and its
80    /// `commit_with_durability` blocks until the configured
81    /// `ReplicaAckPolicy` is satisfied (or the configured timeout
82    /// elapses, in which case `NoxuError::InsufficientReplicas` is
83    /// returned).  Closes finding F1 of
84    /// `docs/src/internal/api-audit-2026-05-rep.md`.
85    replica_coordinator: Mutex<Option<noxu_dbi::SharedReplicaAckCoordinator>>,
86    /// Per-commit timeout for replica acknowledgments.  Mirrors
87    /// `noxu_rep::RepConfig::replica_ack_timeout`; defaults to 5s.
88    replica_ack_timeout: Mutex<std::time::Duration>,
89}
90
91/// Internal database handle state.
92struct DatabaseHandle {
93    name: String,
94    #[expect(dead_code)]
95    id: u64,
96    #[expect(dead_code)]
97    config: DatabaseConfig,
98    /// Shared open flag — same `Arc<AtomicBool>` as `Database.open` so that
99    /// `Database::close()` setting the flag to false also marks this handle
100    /// as closed, letting `Environment::close()` succeed.
101    open: Arc<AtomicBool>,
102}
103
104/// Internal transaction state.
105struct TransactionState {
106    #[expect(dead_code)]
107    id: u64,
108    #[expect(dead_code)]
109    config: TransactionConfig,
110    #[expect(dead_code)]
111    committed: AtomicBool,
112    #[expect(dead_code)]
113    aborted: AtomicBool,
114}
115
116/// Shared registry of active transactions, owned by `Environment` and
117/// referenced (via `Arc`) by every `Transaction` so that `commit()` /
118/// `abort()` can prune their own entry without a callback into
119/// `Environment` itself.
120///
121/// Resolves F1 of the May 2026 API audit: `Environment::active_txns` was
122/// previously a private `Mutex<HashMap>` that no `Transaction` could see,
123/// so `mark_transaction_complete` was dead code and `env.close()` after a
124/// commit always returned `OperationNotAllowed`.
125pub(crate) struct ActiveTxns {
126    txns: Mutex<HashMap<u64, Arc<TransactionState>>>,
127}
128
129impl ActiveTxns {
130    fn new() -> Self {
131        Self { txns: Mutex::new(HashMap::new()) }
132    }
133
134    fn insert(&self, id: u64, state: Arc<TransactionState>) {
135        self.txns.lock().insert(id, state);
136    }
137
138    /// Removes the entry for the given transaction id.
139    ///
140    /// Called by `Transaction::commit_with_durability` and `Transaction::abort`
141    /// once the transaction has reached a terminal state.
142    pub(crate) fn mark_complete(&self, id: u64) {
143        self.txns.lock().remove(&id);
144    }
145
146    fn len(&self) -> usize {
147        self.txns.lock().len()
148    }
149
150    fn is_empty(&self) -> bool {
151        self.txns.lock().is_empty()
152    }
153}
154
155impl Environment {
156    /// Opens or creates a database environment.
157    ///
158    /// Constructor.
159    ///
160    /// # Arguments
161    /// * `config` - The environment configuration
162    ///
163    /// # Returns
164    /// The opened environment handle
165    ///
166    /// # Errors
167    /// Returns an error if:
168    /// - The environment directory does not exist and `allow_create` is false
169    /// - The environment directory exists but is not writable and `read_only` is false
170    /// - Invalid configuration parameters are provided
171    pub fn open(config: EnvironmentConfig) -> Result<Self> {
172        let home = config.home.clone();
173
174        // Validate home directory
175        if !home.exists() {
176            if config.allow_create {
177                std::fs::create_dir_all(&home).map_err(|e| {
178                    NoxuError::environment(format!(
179                        "Failed to create environment directory {:?}: {}",
180                        home, e
181                    ))
182                })?;
183            } else {
184                return Err(NoxuError::environment(format!(
185                    "Environment directory {:?} does not exist and allow_create is false",
186                    home
187                )));
188            }
189        }
190
191        if !home.is_dir() {
192            return Err(NoxuError::environment(format!(
193                "Environment home {:?} is not a directory",
194                home
195            )));
196        }
197
198        // Check write permissions if not read-only
199        if !config.read_only {
200            // Test write access by creating a temp file
201            let test_file = home.join(".noxu_write_test");
202            std::fs::write(&test_file, b"test").map_err(|e| {
203                NoxuError::environment(format!(
204                    "Environment directory {:?} is not writable: {}",
205                    home, e
206                ))
207            })?;
208            let _ = std::fs::remove_file(&test_file);
209        }
210
211        // Warn about any unimplemented config parameters that have been set
212        // to non-default values (re-audit JE F-1: no silently-ignored params).
213        crate::unimplemented_params::warn_unimplemented_params(&config);
214
215        // Translate EnvironmentConfig into DbiEnvConfig (the noxu-dbi struct)
216        // to avoid a circular dependency between the two crates.
217        let buf_size = if config.log_buffer_size > 0 {
218            config.log_buffer_size
219        } else {
220            (config.log_total_buffer_bytes as usize)
221                .checked_div(config.log_num_buffers)
222                .unwrap_or(1024 * 1024)
223        };
224        let dbi_cfg = DbiEnvConfig {
225            // Core
226            read_only: config.read_only,
227            transactional: config.transactional,
228            env_is_locking: config.env_is_locking,
229            env_recovery_force_checkpoint: config.env_recovery_force_checkpoint,
230            env_recovery_force_checkpoint_field: config
231                .env_recovery_force_checkpoint,
232            env_recovery_force_new_file: config.env_recovery_force_new_file,
233            halt_on_commit_after_checksum_exception: config
234                .halt_on_commit_after_checksum_exception,
235            env_check_leaks: config.env_check_leaks,
236            env_forced_yield: config.env_forced_yield,
237            env_fair_latches: config.env_fair_latches,
238            env_latch_timeout_ms: config.env_latch_timeout_ms,
239            env_ttl_clock_tolerance_ms: config.env_ttl_clock_tolerance_ms,
240            env_expiration_enabled: config.env_expiration_enabled,
241            env_db_eviction: config.env_db_eviction,
242            // Memory
243            cache_size: config.cache_size,
244            cache_percent: config.cache_percent,
245            max_off_heap_memory: config.max_off_heap_memory,
246            max_disk: config.max_disk,
247            free_disk: config.free_disk,
248            // Log
249            log_file_max_bytes: config.log_file_max_bytes,
250            log_file_cache_size: config.log_file_cache_size,
251            log_checksum_read: config.log_checksum_read,
252            log_verify_checksums: config.log_verify_checksums,
253            log_fsync_timeout_ms: config.log_fsync_timeout_ms,
254            log_fsync_time_limit_ms: config.log_fsync_time_limit_ms,
255            log_num_buffers: config.log_num_buffers,
256            log_buffer_size: buf_size,
257            log_fault_read_size: config.log_fault_read_size,
258            log_iterator_read_size: config.log_iterator_read_size,
259            log_iterator_max_size: config.log_iterator_max_size,
260            log_n_data_directories: config.log_n_data_directories,
261            log_mem_only: config.log_mem_only,
262            log_detect_file_delete: config.log_detect_file_delete,
263            log_detect_file_delete_interval_ms: config
264                .log_detect_file_delete_interval_ms,
265            log_flush_sync_interval_ms: config.log_flush_sync_interval_ms,
266            log_flush_no_sync_interval_ms: config.log_flush_no_sync_interval_ms,
267            log_use_odsync: config.log_use_odsync,
268            log_use_write_queue: config.log_use_write_queue,
269            log_write_queue_size: config.log_write_queue_size,
270            log_group_commit_threshold: config.log_group_commit_threshold,
271            log_group_commit_interval_ms: config.log_group_commit_interval_ms,
272            // B-tree
273            node_max_entries: config.node_max_entries,
274            node_dup_tree_max_entries: config.node_dup_tree_max_entries,
275            tree_max_embedded_ln: config.tree_max_embedded_ln,
276            tree_max_delta: config.tree_max_delta,
277            tree_bin_delta: config.tree_bin_delta,
278            tree_min_memory: config.tree_min_memory,
279            tree_compact_max_key_length: config.tree_compact_max_key_length,
280            // INCompressor
281            run_in_compressor: config.run_in_compressor,
282            in_compressor_wakeup_interval_ms: config
283                .in_compressor_wakeup_interval_ms,
284            compressor_deadlock_retry: config.compressor_deadlock_retry,
285            compressor_lock_timeout_ms: config.compressor_lock_timeout_ms,
286            compressor_purge_root: config.compressor_purge_root,
287            // Cleaner
288            run_cleaner: config.run_cleaner,
289            cleaner_min_utilization: config.cleaner_min_utilization,
290            cleaner_min_file_utilization: config.cleaner_min_file_utilization,
291            cleaner_threads: config.cleaner_threads,
292            cleaner_min_file_count: config.cleaner_min_file_count,
293            cleaner_min_age: config.cleaner_min_age,
294            cleaner_bytes_interval: config.cleaner_bytes_interval,
295            cleaner_wakeup_interval_ms: config.cleaner_wakeup_interval_ms,
296            cleaner_fetch_obsolete_size: config.cleaner_fetch_obsolete_size,
297            cleaner_adjust_utilization: config.cleaner_adjust_utilization,
298            cleaner_deadlock_retry: config.cleaner_deadlock_retry,
299            cleaner_lock_timeout_ms: config.cleaner_lock_timeout_ms,
300            cleaner_expunge: config.cleaner_expunge,
301            cleaner_use_deleted_dir: config.cleaner_use_deleted_dir,
302            cleaner_max_batch_files: config.cleaner_max_batch_files,
303            cleaner_read_size: config.cleaner_read_size,
304            cleaner_detail_max_memory_percentage: config
305                .cleaner_detail_max_memory_percentage,
306            cleaner_look_ahead_cache_size: config.cleaner_look_ahead_cache_size,
307            cleaner_foreground_proactive_migration: config
308                .cleaner_foreground_proactive_migration,
309            cleaner_background_proactive_migration: config
310                .cleaner_background_proactive_migration,
311            cleaner_lazy_migration: config.cleaner_lazy_migration,
312            cleaner_expiration_enabled: config.cleaner_expiration_enabled,
313            // Checkpointer
314            run_checkpointer: config.run_checkpointer,
315            checkpointer_bytes_interval: config.checkpointer_bytes_interval,
316            checkpointer_wakeup_interval_ms: config
317                .checkpointer_wakeup_interval_ms,
318            checkpointer_deadlock_retry: config.checkpointer_deadlock_retry,
319            checkpointer_high_priority: config.checkpointer_high_priority,
320            // Evictor
321            run_evictor: config.run_evictor,
322            evictor_nodes_per_scan: config.evictor_nodes_per_scan,
323            evictor_evict_bytes: config.evictor_evict_bytes,
324            evictor_critical_percentage: config.evictor_critical_percentage,
325            evictor_lru_only: config.evictor_lru_only,
326            evictor_n_lru_lists: config.evictor_n_lru_lists,
327            evictor_deadlock_retry: config.evictor_deadlock_retry,
328            evictor_core_threads: config.evictor_core_threads,
329            evictor_max_threads: config.evictor_max_threads,
330            evictor_keep_alive_ms: config.evictor_keep_alive_ms,
331            evictor_allow_bin_deltas: config.evictor_allow_bin_deltas,
332            // Off-heap evictor
333            run_offheap_evictor: config.run_offheap_evictor,
334            offheap_evict_bytes: config.offheap_evict_bytes,
335            offheap_n_lru_lists: config.offheap_n_lru_lists,
336            offheap_checksum: config.offheap_checksum,
337            offheap_core_threads: config.offheap_core_threads,
338            offheap_max_threads: config.offheap_max_threads,
339            offheap_keep_alive_ms: config.offheap_keep_alive_ms,
340            // Locking
341            lock_timeout_ms: config.lock_timeout_ms,
342            lock_deadlock_detect: config.lock_deadlock_detect,
343            lock_deadlock_detect_delay_ms: config.lock_deadlock_detect_delay_ms,
344            // Transactions
345            txn_timeout_ms: config.txn_timeout_ms,
346            txn_serializable_isolation: config.txn_serializable_isolation,
347            txn_deadlock_stack_trace: config.txn_deadlock_stack_trace,
348            txn_dump_locks: config.txn_dump_locks,
349            // Verifier
350            run_verifier: config.run_verifier,
351            verify_log: config.verify_log,
352            verify_log_read_delay_ms: config.verify_log_read_delay_ms,
353            verify_btree: config.verify_btree,
354            verify_secondaries: config.verify_secondaries,
355            verify_data_records: config.verify_data_records,
356            verify_obsolete_records: config.verify_obsolete_records,
357            verify_btree_batch_size: config.verify_btree_batch_size,
358            verify_btree_batch_delay_ms: config.verify_btree_batch_delay_ms,
359            // Stats
360            stats_collect: config.stats_collect,
361            stats_collect_interval_secs: config.stats_collect_interval_secs,
362            // Background rate limits
363            env_background_read_limit_kb: config.env_background_read_limit_kb,
364            env_background_write_limit_kb: config.env_background_write_limit_kb,
365            env_background_sleep_interval_us: config
366                .env_background_sleep_interval_us,
367        };
368        let env_impl = EnvironmentImpl::from_dbi_config(home.clone(), &dbi_cfg)
369            .map_err(|e| NoxuError::environment(e.to_string()))?;
370
371        let log_manager = env_impl.get_log_manager();
372        let env_impl_arc = Arc::new(Mutex::new(env_impl));
373        Ok(Environment {
374            home,
375            config,
376            databases: Mutex::new(HashMap::new()),
377            active_txns: Arc::new(ActiveTxns::new()),
378            next_txn_id: AtomicU64::new(1),
379            open: AtomicBool::new(true),
380            env_valid: AtomicBool::new(true),
381            env_impl: env_impl_arc,
382            log_manager,
383            last_checkpoint_time: Mutex::new(None),
384            last_checkpoint_end_lsn: Mutex::new(noxu_util::NULL_LSN),
385            replica_coordinator: Mutex::new(None),
386            replica_ack_timeout: Mutex::new(std::time::Duration::from_secs(5)),
387        })
388    }
389
390    /// Closes the environment handle.
391    ///
392    ///
393    ///
394    /// # Errors
395    /// Returns an error if:
396    /// - The environment is already closed
397    /// - There are open database handles
398    /// - There are active transactions
399    pub fn close(&self) -> Result<()> {
400        if !self.open.load(Ordering::Acquire) {
401            return Err(NoxuError::EnvironmentClosed);
402        }
403
404        // Check for open databases
405        let databases = self.databases.lock();
406        let open_dbs: Vec<String> = databases
407            .iter()
408            .filter(|(_, db)| db.open.load(Ordering::Acquire))
409            .map(|(name, _)| name.clone())
410            .collect();
411
412        if !open_dbs.is_empty() {
413            return Err(NoxuError::OperationNotAllowed(format!(
414                "Cannot close environment with open database handles: {:?}",
415                open_dbs
416            )));
417        }
418
419        // Check for active transactions
420        if !self.active_txns.is_empty() {
421            return Err(NoxuError::OperationNotAllowed(format!(
422                "Cannot close environment with {} active transactions",
423                self.active_txns.len()
424            )));
425        }
426
427        self.open.store(false, Ordering::Release);
428        let env_impl = self.env_impl.lock();
429        let _ = env_impl.close();
430        Ok(())
431    }
432
433    /// Opens or creates a database.
434    ///
435    ///
436    ///
437    /// # Arguments
438    /// * `txn` - Optional transaction handle (currently ignored)
439    /// * `name` - Database name
440    /// * `config` - Database configuration
441    ///
442    /// # Returns
443    /// The opened database handle
444    ///
445    /// # Errors
446    /// Returns an error if:
447    /// - The environment is closed
448    /// - The database name is invalid
449    /// - The database does not exist and `allow_create` is false
450    /// - A handle for `name` is already open in this `Environment`
451    ///   (`DatabaseAlreadyExists`)
452    pub fn open_database(
453        &self,
454        txn: Option<&Transaction>,
455        name: &str,
456        config: &DatabaseConfig,
457    ) -> Result<Database> {
458        self.check_open()?;
459
460        // Audit transaction-env F5 (Wave 2C-4): on a read-only env,
461        // open_database must not create new databases nor open existing
462        // ones in writable mode.  Pre-fix the request silently created an
463        // in-memory-only database (no WAL backing) which violated the
464        // "no write operations" guarantee in the user-facing docs.
465        if self.config.read_only {
466            if config.allow_create {
467                return Err(NoxuError::OperationNotAllowed(
468                    "open_database: cannot create a database on a read-only \
469                     environment (DatabaseConfig::with_allow_create(true))"
470                        .to_string(),
471                ));
472            }
473            if !config.read_only {
474                return Err(NoxuError::OperationNotAllowed(
475                    "open_database: read-only environment requires the \
476                     database to be opened read-only \
477                     (DatabaseConfig::with_read_only(true))"
478                        .to_string(),
479                ));
480            }
481        }
482
483        if name.is_empty() {
484            return Err(NoxuError::IllegalArgument(
485                "Database name cannot be empty".to_string(),
486            ));
487        }
488
489        let mut databases = self.databases.lock();
490
491        // Check if database is already open via this environment handle
492        if let Some(db_handle) = databases.get(name)
493            && db_handle.open.load(Ordering::Acquire)
494        {
495            return Err(NoxuError::DatabaseAlreadyExists(format!(
496                "Database '{}' is already open",
497                name
498            )));
499        }
500
501        // Build the noxu-dbi config from noxu-db config
502        let mut dbi_config = noxu_dbi::DatabaseConfig::new();
503        dbi_config.set_allow_create(config.allow_create);
504        dbi_config.set_sorted_duplicates(config.sorted_duplicates);
505        dbi_config.set_read_only(config.read_only);
506        dbi_config.set_temporary(config.temporary);
507        dbi_config.set_transactional(config.transactional);
508        dbi_config.deferred_write = config.deferred_write;
509        // Audit database F7 (Wave 2C-4): plumb key_prefixing through;
510        // pre-fix the outer flag was silently dropped on the floor.
511        dbi_config.set_key_prefixing(config.key_prefixing);
512        if config.node_max_entries > 0 {
513            dbi_config.set_node_max_entries(config.node_max_entries as i32);
514        }
515
516        // Open the database via EnvironmentImpl (creates if allow_create, else errors)
517        // C-4 / JE 1-I: if a transaction is supplied and this is a new
518        // creation, use the transactional path so the name registration is
519        // deferred until commit.
520        let is_transactional_create = txn.is_some() && config.allow_create;
521        let db_impl_arc = {
522            let env_impl = self.env_impl.lock();
523            if is_transactional_create {
524                // SAFETY: is_transactional_create implies txn.is_some().
525                let txn_id = txn
526                    .expect("invariant: txn is Some when is_transactional_create")
527                    .get_id();
528                env_impl
529                    .open_database_transactional(name, &dbi_config, txn_id)
530            } else {
531                env_impl.open_database(name, &dbi_config)
532            }
533            .map_err(|e| {
534                match &e {
535                    noxu_dbi::DbiError::DatabaseNotFound(_) => {
536                        NoxuError::DatabaseNotFound(format!(
537                            "Database '{}' does not exist and allow_create is false",
538                            name
539                        ))
540                    }
541                    _ => NoxuError::environment(e.to_string()),
542                }
543            })?
544        };
545
546        // Register abort/commit callbacks on the transaction so that
547        // transactional database creation is properly rolled back or
548        // finalised when the transaction resolves (C-4 / JE 1-I).
549        if is_transactional_create {
550            let env_impl_arc = Arc::clone(&self.env_impl);
551            let db_name_abort = name.to_string();
552            let db_name_commit = name.to_string();
553            let env_impl_arc2 = Arc::clone(&self.env_impl);
554            // SAFETY: is_transactional_create implies txn.is_some().
555            let txn_ref = txn
556                .expect("invariant: txn is Some when is_transactional_create");
557            txn_ref.register_abort_callback(move || {
558                env_impl_arc.lock().abort_pending_database(&db_name_abort);
559            });
560            txn_ref.register_commit_callback(move || {
561                env_impl_arc2.lock().commit_pending_database(&db_name_commit);
562            });
563        }
564
565        let db_id = db_impl_arc.read().get_id().id() as u64;
566
567        // Shared open flag: stored in both `DatabaseHandle` and `Database`.
568        // When `Database::close()` sets it to false the env-side handle is
569        // also marked as closed, so `Environment::close()` can proceed.
570        let open_flag = Arc::new(AtomicBool::new(true));
571
572        let db_handle = Arc::new(DatabaseHandle {
573            name: name.to_string(),
574            id: db_id,
575            config: config.clone(),
576            open: Arc::clone(&open_flag),
577        });
578
579        databases.insert(name.to_string(), db_handle);
580        drop(databases);
581
582        Ok(Database::new(
583            name.to_string(),
584            db_id,
585            config.clone(),
586            db_impl_arc,
587            Arc::clone(&self.env_impl),
588            open_flag,
589            self.config.txn_no_sync,
590            self.config.txn_write_no_sync,
591        ))
592    }
593
594    /// Removes a database.
595    ///
596    ///
597    ///
598    /// # Arguments
599    /// * `txn` - Optional transaction handle (currently ignored)
600    /// * `name` - Database name
601    ///
602    /// # Errors
603    /// Returns an error if:
604    /// - The environment is closed
605    /// - The database does not exist
606    /// - The database is currently open
607    pub fn remove_database(
608        &self,
609        _txn: Option<&Transaction>,
610        name: &str,
611    ) -> Result<()> {
612        self.check_writable("remove_database")?;
613
614        let mut databases = self.databases.lock();
615        {
616            let env_impl = self.env_impl.lock();
617            env_impl.remove_database(name).map_err(|e| match &e {
618                noxu_dbi::DbiError::DatabaseNotFound(_) => {
619                    NoxuError::DatabaseNotFound(format!(
620                        "Database '{}' does not exist",
621                        name
622                    ))
623                }
624                _ => NoxuError::environment(e.to_string()),
625            })?;
626        }
627        databases.remove(name);
628
629        Ok(())
630    }
631
632    /// Truncates a database: removes all records while keeping the database
633    /// registered and any open handles valid.
634    ///
635    /// Returns the number of records that were in the database before truncation.
636    ///
637    /// Mirrors `Environment.truncateDatabase(txn, dbName, returnCount)`.
638    pub fn truncate_database(
639        &self,
640        _txn: Option<&Transaction>,
641        name: &str,
642    ) -> Result<u64> {
643        self.check_writable("truncate_database")?;
644        let env_impl = self.env_impl.lock();
645        env_impl.truncate_database(name).map_err(|e| match &e {
646            noxu_dbi::DbiError::DatabaseNotFound(_) => {
647                NoxuError::DatabaseNotFound(format!(
648                    "Database '{}' does not exist",
649                    name
650                ))
651            }
652            _ => NoxuError::environment(e.to_string()),
653        })
654    }
655
656    /// Renames a database.
657    ///
658    ///
659    ///
660    /// # Arguments
661    /// * `txn` - Optional transaction handle (currently ignored)
662    /// * `old_name` - Current database name
663    /// * `new_name` - New database name
664    ///
665    /// # Errors
666    /// Returns an error if:
667    /// - The environment is closed
668    /// - The source database does not exist
669    /// - The destination database already exists
670    /// - The source database is currently open
671    pub fn rename_database(
672        &self,
673        _txn: Option<&Transaction>,
674        old_name: &str,
675        new_name: &str,
676    ) -> Result<()> {
677        self.check_writable("rename_database")?;
678
679        if old_name == new_name {
680            return Ok(());
681        }
682
683        let mut databases = self.databases.lock();
684        {
685            let env_impl = self.env_impl.lock();
686            env_impl.rename_database(old_name, new_name).map_err(
687                |e| match &e {
688                    noxu_dbi::DbiError::DatabaseNotFound(_) => {
689                        NoxuError::DatabaseNotFound(format!(
690                            "Database '{}' does not exist",
691                            old_name
692                        ))
693                    }
694                    noxu_dbi::DbiError::DatabaseAlreadyExists(_) => {
695                        NoxuError::DatabaseAlreadyExists(format!(
696                            "Database '{}' already exists",
697                            new_name
698                        ))
699                    }
700                    _ => NoxuError::environment(e.to_string()),
701                },
702            )?;
703        }
704
705        if let Some(handle) = databases.remove(old_name) {
706            databases.insert(new_name.to_string(), handle);
707        }
708
709        Ok(())
710    }
711
712    /// Begins a new transaction.
713    ///
714    /// # Arguments
715    /// * `config` - Optional transaction configuration
716    ///
717    /// # Returns
718    /// A new transaction handle.
719    ///
720    /// # Errors
721    /// Returns an error if:
722    /// - The environment is closed
723    /// - The environment is not transactional
724    ///
725    /// # Nested transactions
726    /// Nested (child) transactions are not supported.  In v1.5 this method
727    /// took an `Option<&Transaction>` `parent` argument that was rejected
728    /// at runtime with [`NoxuError::Unsupported`] (Decision 3B in
729    /// `docs/src/internal/v1.5-decisions-2026-05.md`, audit finding F11).
730    /// In v2.0 the parameter has been removed entirely — the
731    /// type system now enforces the constraint, so what was a runtime
732    /// error is now a compile error.
733    #[allow(deprecated)] // Transaction::new / with_log_manager / with_inner_txn / with_env_impl are pub(internal)
734    pub fn begin_transaction(
735        &self,
736        config: Option<&TransactionConfig>,
737    ) -> Result<Transaction> {
738        self.check_open()?;
739
740        if !self.config.transactional {
741            return Err(NoxuError::OperationNotAllowed(
742                "Cannot begin transaction on non-transactional environment"
743                    .to_string(),
744            ));
745        }
746
747        // Audit transaction-env F5 (Wave 2C-4): on a read-only env, only
748        // explicitly read-only transactions are allowed.  A writable txn
749        // on a read-only env was previously accepted but every commit
750        // silently no-op'd because `log_manager` was None.
751        if self.config.read_only
752            && !config.map(|c| c.read_only).unwrap_or(false)
753        {
754            return Err(NoxuError::OperationNotAllowed(
755                "begin_transaction: read-only environment requires the \
756                 transaction to be read-only \
757                 (TransactionConfig::with_read_only(true))"
758                    .to_string(),
759            ));
760        }
761
762        let txn_id = self.next_txn_id.fetch_add(1, Ordering::Relaxed);
763        // F3: when the caller does not supply a TransactionConfig, the
764        // environment-level `Durability` default (`EnvironmentConfig::durability`,
765        // settable via `EnvironmentConfig::with_durability`) must be
766        // honoured.  Pre-fix `unwrap_or_default()` produced a config with
767        // `Durability::COMMIT_SYNC` regardless of the env setting, so a
768        // user opening with `.with_durability(COMMIT_NO_SYNC)` and then
769        // calling `begin_transaction(None)` still fsynced on every
770        // commit.
771        // Audit transaction-env F4 (Wave 2C-4): the env-level
772        // `txn_no_sync` / `txn_write_no_sync` flags now apply to explicit
773        // commits as well as auto-commit.  When neither config nor
774        // env-default sets a durability override, derive one from the
775        // boolean flags.  An explicit `with_durability(...)` on the
776        // TransactionConfig still wins.
777        let mut txn_config = match config.cloned() {
778            Some(c) => c,
779            None => TransactionConfig::default()
780                .with_durability(self.config.durability),
781        };
782        if config.is_none() {
783            // No caller config: env flags can override the inherited
784            // durability if they request a less-strict sync policy.
785            let derived = match (
786                self.config.txn_no_sync,
787                self.config.txn_write_no_sync,
788            ) {
789                (true, _) => {
790                    Some(crate::durability::Durability::COMMIT_NO_SYNC)
791                }
792                (_, true) => {
793                    Some(crate::durability::Durability::COMMIT_WRITE_NO_SYNC)
794                }
795                _ => None,
796            };
797            if let Some(d) = derived {
798                txn_config = txn_config.with_durability(d);
799            }
800        }
801
802        let txn_state = Arc::new(TransactionState {
803            id: txn_id,
804            config: txn_config.clone(),
805            committed: AtomicBool::new(false),
806            aborted: AtomicBool::new(false),
807        });
808
809        let mut active_txns = self.active_txns.txns.lock();
810        active_txns.insert(txn_id, txn_state);
811        drop(active_txns);
812
813        // Wire the transaction to the WAL so commit/abort write log entries.
814        // Also create an inner Txn for per-record lock management.
815        let env_guard = self.env_impl.lock();
816        let inner_txn = env_guard
817            .begin_txn()
818            .map(|mut t| {
819                // Propagate all relevant TransactionConfig fields into the
820                // inner Txn for lock management and isolation behavior.
821                if txn_config.read_committed {
822                    t.set_read_committed_isolation(true);
823                }
824                if txn_config.read_uncommitted {
825                    // F2: previously this branch was missing, so the
826                    // user-set `with_read_uncommitted(true)` flag was
827                    // silently dropped and dirty reads were impossible
828                    // at the txn level.
829                    t.set_read_uncommitted_default(true);
830                }
831                if txn_config.serializable_isolation {
832                    t.set_serializable_isolation(true);
833                }
834                if txn_config.importunate {
835                    t.set_importunate(true);
836                }
837                if txn_config.no_wait {
838                    t.set_no_wait(true);
839                }
840                if txn_config.lock_timeout_ms > 0 {
841                    t.set_lock_timeout(txn_config.lock_timeout_ms);
842                }
843                if txn_config.txn_timeout_ms > 0 {
844                    t.set_txn_timeout(txn_config.txn_timeout_ms);
845                }
846                Arc::new(std::sync::Mutex::new(t))
847            })
848            .ok();
849        let txn = if let Some(lm) = env_guard.get_log_manager() {
850            Transaction::with_log_manager(txn_id, txn_config, lm)
851        } else {
852            Transaction::new(txn_id, txn_config)
853        };
854        drop(env_guard);
855
856        let txn =
857            if let Some(it) = inner_txn { txn.with_inner_txn(it) } else { txn };
858
859        // Wire env_impl so Transaction::abort() can apply undo records.
860        // Txn environment reference during construction.
861        let txn = txn.with_env_impl(Arc::clone(&self.env_impl));
862
863        // Wire the active-txns registry so commit/abort can prune their
864        // own entry (F1).  Without this, every successful txn left an
865        // entry in `active_txns` and `env.close()` returned
866        // `OperationNotAllowed`.
867        let txn = txn.with_active_txns(Arc::clone(&self.active_txns));
868
869        // F1: if a replica-ack coordinator has been installed (via
870        // `set_replica_coordinator`), wire it into the transaction so
871        // that `commit_with_durability` blocks until the configured
872        // `ReplicaAckPolicy` is satisfied.
873        let txn = if let Some(coord) = self.replica_coordinator.lock().clone() {
874            let timeout = *self.replica_ack_timeout.lock();
875            txn.with_replica_coordinator(coord, timeout)
876        } else {
877            txn
878        };
879
880        Ok(txn)
881    }
882
883    /// Returns a list of database names.
884    ///
885    ///
886    ///
887    /// # Returns
888    /// A vector of database names
889    ///
890    /// # Errors
891    /// Returns an error if the environment is closed
892    pub fn get_database_names(&self) -> Result<Vec<String>> {
893        self.check_open()?;
894        let env_impl = self.env_impl.lock();
895        Ok(env_impl.get_database_names())
896    }
897
898    /// Install a replica-ack coordinator on this environment.
899    ///
900    /// After this call, every transaction begun on this environment
901    /// will consult the coordinator on `commit_with_durability` and
902    /// block until the configured `ReplicaAckPolicy` is satisfied (or
903    /// until `replica_ack_timeout` elapses, in which case
904    /// `NoxuError::InsufficientReplicas` is returned).
905    ///
906    /// `noxu_rep::ReplicatedEnvironment` implements
907    /// `noxu_dbi::ReplicaAckCoordinator`; users typically wire it as:
908    ///
909    /// ```ignore
910    /// let rep_env = Arc::new(ReplicatedEnvironment::new(rep_config)?);
911    /// env.set_replica_coordinator(rep_env.clone());
912    /// rep_env.with_environment(env_impl);
913    /// ```
914    ///
915    /// Closes finding F1 of `docs/src/internal/api-audit-2026-05-rep.md`.
916    pub fn set_replica_coordinator(
917        &self,
918        coord: noxu_dbi::SharedReplicaAckCoordinator,
919    ) {
920        *self.replica_coordinator.lock() = Some(coord);
921    }
922
923    /// Clear any installed replica-ack coordinator.
924    ///
925    /// Subsequent `commit_with_durability` calls revert to local-only
926    /// durability semantics.
927    pub fn clear_replica_coordinator(&self) {
928        *self.replica_coordinator.lock() = None;
929    }
930
931    /// Set the per-commit timeout used when waiting for replica
932    /// acknowledgments.
933    ///
934    /// Default is 5 seconds.  Mirrors
935    /// `noxu_rep::RepConfig::replica_ack_timeout`.
936    pub fn set_replica_ack_timeout(&self, timeout: std::time::Duration) {
937        *self.replica_ack_timeout.lock() = timeout;
938    }
939
940    /// Returns the per-commit replica-ack timeout.
941    pub fn get_replica_ack_timeout(&self) -> std::time::Duration {
942        *self.replica_ack_timeout.lock()
943    }
944
945    /// Returns the home directory path.
946    ///
947    ///
948    pub fn get_home(&self) -> &Path {
949        &self.home
950    }
951
952    /// Returns the environment configuration.
953    ///
954    ///
955    pub fn get_config(&self) -> &EnvironmentConfig {
956        &self.config
957    }
958
959    /// Returns the mutable subset of environment configuration.
960    ///
961    /// Mirrors `Environment.getMutableConfig()`.  The returned struct reflects the
962    /// current runtime values; pass it (modified) to `set_mutable_config()` to
963    /// apply changes without re-opening the environment.
964    pub fn get_mutable_config(&self) -> Result<EnvironmentMutableConfig> {
965        self.check_open()?;
966        Ok(EnvironmentMutableConfig {
967            cache_size: Some(self.config.cache_size as usize),
968            durability: None,
969            txn_no_sync: self.config.txn_no_sync,
970            txn_write_no_sync: self.config.txn_write_no_sync,
971            run_cleaner: Some(self.config.run_cleaner),
972            run_checkpointer: Some(self.config.run_checkpointer),
973            run_evictor: Some(self.config.run_evictor),
974            lock_timeout_ms: Some(self.config.lock_timeout_ms),
975            txn_timeout_ms: Some(self.config.txn_timeout_ms),
976        })
977    }
978
979    /// Applies a set of mutable configuration changes to the running environment.
980    ///
981    /// Mirrors `Environment.setMutableConfig(EnvironmentMutableConfig)`.
982    /// Only the fields that differ from their sentinel "no-change" values are
983    /// applied (`None` means unchanged).  `Some(0)` for a timeout clears it
984    /// (matches JE: 0 = no timeout).
985    ///
986    /// # Errors
987    /// Returns an error if the environment is closed or invalidated.
988    pub fn set_mutable_config(
989        &mut self,
990        cfg: EnvironmentMutableConfig,
991    ) -> Result<()> {
992        self.check_open()?;
993        if let Some(sz) = cfg.cache_size {
994            self.config.cache_size = sz as u64;
995            // Audit transaction-env F7 (Wave 2C-4): push the cache-size
996            // change to the evictor's Arbiter so it actually takes
997            // effect at runtime; pre-fix the value was only recorded in
998            // `self.config`.
999            let env_impl = self.env_impl.lock();
1000            let evictor = env_impl.get_evictor();
1001            evictor.get_arbiter().set_max_memory(sz as i64);
1002        }
1003        if let Some(ms) = cfg.lock_timeout_ms {
1004            self.config.lock_timeout_ms = ms;
1005            // Push the new default to the live LockManager.
1006            let env_impl = self.env_impl.lock();
1007            env_impl.get_lock_manager().set_lock_timeout(ms);
1008        }
1009        if let Some(ms) = cfg.txn_timeout_ms {
1010            self.config.txn_timeout_ms = ms;
1011            // The TxnManager does not currently track a default txn
1012            // timeout (each Txn snapshots the value at `begin_txn` from
1013            // its own TransactionConfig).  We record the new env-level
1014            // default here so that future `begin_transaction` calls that
1015            // rely on the env default pick it up; live txns keep their
1016            // original timeout.  Tracked under transaction-env F7
1017            // residual; pushing into running txns requires a TxnManager
1018            // API change beyond Wave 2C-4.
1019        }
1020        self.config.txn_no_sync = cfg.txn_no_sync;
1021        self.config.txn_write_no_sync = cfg.txn_write_no_sync;
1022        // Daemon enable/disable flags are advisory at runtime; dbi-level wiring
1023        // for live daemon pause/resume is future work (mirrors where
1024        // setMutableConfig re-reads the flag on next daemon wakeup).
1025        if let Some(v) = cfg.run_cleaner {
1026            self.config.run_cleaner = v;
1027        }
1028        if let Some(v) = cfg.run_checkpointer {
1029            self.config.run_checkpointer = v;
1030        }
1031        if let Some(v) = cfg.run_evictor {
1032            self.config.run_evictor = v;
1033        }
1034        Ok(())
1035    }
1036
1037    /// Runs a checkpoint.
1038    ///
1039    /// Mirrors `Environment.checkpoint(CheckpointConfig)`.  If the environment has
1040    /// no checkpointer (e.g. non-transactional or in-memory), this is a no-op.
1041    ///
1042    /// # Arguments
1043    /// * `config` - Optional checkpoint options (force, thresholds, etc.)
1044    ///
1045    /// # Errors
1046    /// Returns an error if the environment is closed, invalidated, or if the
1047    /// checkpoint itself fails (e.g. disk write error).
1048    pub fn checkpoint(&self, config: Option<&CheckpointConfig>) -> Result<()> {
1049        self.check_open()?;
1050
1051        // Audit transaction-env F6 (Wave 2C-4): honour `force` /
1052        // `k_bytes` / `minutes` / `minimize_recovery_time` in
1053        // `CheckpointConfig`.  Pre-fix the entire config was a no-op.
1054        // Threshold gating happens in the wrapper layer; the underlying
1055        // `noxu_recovery::Checkpointer::do_checkpoint` is invoker-only.
1056        let cfg = config.cloned().unwrap_or_default();
1057
1058        if !cfg.force {
1059            // k_bytes: skip the checkpoint if not enough log bytes have
1060            // been written since the last successful checkpoint.
1061            if cfg.k_bytes > 0 {
1062                let cur_lsn = self
1063                    .log_manager
1064                    .as_ref()
1065                    .map(|lm| lm.get_end_of_log())
1066                    .unwrap_or(noxu_util::NULL_LSN);
1067                let last = *self.last_checkpoint_end_lsn.lock();
1068                let bytes_written =
1069                    cur_lsn.as_u64().saturating_sub(last.as_u64());
1070                let threshold = (cfg.k_bytes as u64) * 1024;
1071                if bytes_written < threshold {
1072                    log::debug!(
1073                        "checkpoint: skipping (k_bytes threshold {} not \
1074                         met, only {} bytes since last checkpoint)",
1075                        threshold,
1076                        bytes_written,
1077                    );
1078                    return Ok(());
1079                }
1080            }
1081
1082            // minutes: skip the checkpoint if not enough wall-clock time
1083            // has elapsed since the last successful checkpoint.
1084            if cfg.minutes > 0 {
1085                let last_at = *self.last_checkpoint_time.lock();
1086                if let Some(at) = last_at {
1087                    let elapsed = at.elapsed();
1088                    let threshold =
1089                        std::time::Duration::from_secs(cfg.minutes as u64 * 60);
1090                    if elapsed < threshold {
1091                        log::debug!(
1092                            "checkpoint: skipping (minutes threshold {:?} \
1093                             not met, only {:?} since last checkpoint)",
1094                            threshold,
1095                            elapsed,
1096                        );
1097                        return Ok(());
1098                    }
1099                }
1100            }
1101        }
1102
1103        // `minimize_recovery_time` is currently advisory — the recovery
1104        // checkpointer always writes the full set of dirty BINs; the
1105        // "minimal" path requires a pluggable BIN-flush filter that is
1106        // outside the scope of Wave 2C-4.  We surface the request in the
1107        // invoker label so it shows up in structured logs.
1108        let invoker = match (cfg.force, cfg.minimize_recovery_time) {
1109            (true, true) => "manual_force_full",
1110            (true, false) => "manual_force",
1111            (false, true) => "manual_full",
1112            (false, false) => "manual",
1113        };
1114
1115        let env_impl = self.env_impl.lock();
1116        env_impl
1117            .run_checkpoint_with_invoker(invoker)
1118            .map_err(|e| NoxuError::environment(e.to_string()))?;
1119        drop(env_impl);
1120
1121        // Update bookkeeping so subsequent threshold-gated calls can
1122        // honour `k_bytes` / `minutes`.
1123        *self.last_checkpoint_time.lock() = Some(Instant::now());
1124        if let Some(lm) = &self.log_manager {
1125            *self.last_checkpoint_end_lsn.lock() = lm.get_end_of_log();
1126        }
1127        Ok(())
1128    }
1129
1130    /// Returns `true` if the environment is open and has not been invalidated by a fatal error.
1131    ///
1132    /// Mirrors `Environment.isValid()`.  Returns `false` after the environment is closed
1133    /// or after an `EnvironmentFailure` whose `reason.invalidates_environment()` returns
1134    /// `true` (e.g. `LogChecksum`, `BtreeCorruption`, `DiskLimit`).
1135    /// Once invalidated the environment must be closed and re-opened.
1136    pub fn is_valid(&self) -> bool {
1137        self.open.load(Ordering::Acquire)
1138            && self.env_valid.load(Ordering::Acquire)
1139    }
1140
1141    /// Invalidates the environment in response to a fatal error.
1142    ///
1143    /// Called internally when an `EnvironmentFailure` with
1144    /// `reason.invalidates_environment() == true` propagates out of a
1145    /// background daemon.  After invalidation `is_valid()` returns `false`
1146    /// and all subsequent public API calls return `EnvironmentFailure`.
1147    pub fn invalidate(&self) {
1148        self.env_valid.store(false, Ordering::Release);
1149    }
1150
1151    /// Returns whether the environment is transactional.
1152    ///
1153    /// Via environment.
1154    pub fn is_transactional(&self) -> bool {
1155        self.config.transactional
1156    }
1157
1158    /// Returns whether the environment is read-only.
1159    ///
1160    /// Via environment.
1161    pub fn is_read_only(&self) -> bool {
1162        self.config.read_only
1163    }
1164
1165    /// Returns a snapshot of environment statistics from all subsystems.
1166    ///
1167    /// Mirrors `Environment.getStats(StatsConfig)`.
1168    pub fn get_stats(&self) -> Result<EnvironmentStats> {
1169        self.check_open()?;
1170        let env_impl = self.env_impl.lock();
1171        let n_databases = env_impl.n_databases() as u32;
1172        // Use cached log_manager for the log stats to avoid double-locking.
1173        let log = self
1174            .log_manager
1175            .as_ref()
1176            .map(|lm| LogStatsSnapshot::from(&lm.get_stats()))
1177            .unwrap_or_default();
1178        let lock =
1179            LockStatsSnapshot::from(&env_impl.get_lock_manager().get_stats());
1180        let txn =
1181            TxnStatsSnapshot::from(&env_impl.get_txn_manager().get_stats());
1182        let throughput = env_impl.get_throughput_snapshot();
1183        let evictor =
1184            EvictorStatsSnapshot::from(env_impl.get_evictor().get_stats());
1185        let cleaner = env_impl
1186            .get_cleaner()
1187            .map(|c| c.get_stats().snapshot())
1188            .unwrap_or_default();
1189        let checkpoint = env_impl
1190            .get_checkpointer()
1191            .map(|cp| cp.get_stats().snapshot())
1192            .unwrap_or_default();
1193        Ok(EnvironmentStats {
1194            cache_size: self.config.cache_size,
1195            cache_usage: 0,
1196            n_databases,
1197            log,
1198            lock,
1199            txn,
1200            throughput,
1201            evictor,
1202            cleaner,
1203            checkpoint,
1204        })
1205    }
1206
1207    /// Returns the total number of fdatasync calls performed by the log manager.
1208    ///
1209    /// Useful for benchmarking
1210    /// and for verifying that group commit is working (fewer fsyncs than commits).
1211    /// Returns 0 if the environment is non-transactional (no log manager).
1212    pub fn stat_fsync_count(&self) -> u64 {
1213        self.log_manager.as_ref().map(|lm| lm.fsync_count()).unwrap_or(0)
1214    }
1215
1216    // -------------------------------------------------------------------
1217    // Wave 3-2: XA crash-durable two-phase commit support
1218    // -------------------------------------------------------------------
1219
1220    /// Returns the list of XA in-doubt prepared transactions surfaced by
1221    /// the most recent recovery pass.
1222    ///
1223    /// The XA layer (`noxu_xa::XaEnvironment::xa_recover`) reads this
1224    /// list to populate its return value with XIDs that completed phase
1225    /// 1 of two-phase commit but were not committed or aborted before
1226    /// the previous shutdown / crash.  An empty `Vec` means there are
1227    /// no in-doubt transactions to resolve.
1228    ///
1229    /// See `noxu_xa` for XA two-phase commit.
1230    pub fn recovered_prepared_txns(
1231        &self,
1232    ) -> Vec<noxu_recovery::PreparedTxnInfo> {
1233        let env_impl = self.env_impl.lock();
1234        env_impl.recovered_prepared_txns()
1235    }
1236
1237    /// Removes and returns the LN replay list for a recovered prepared
1238    /// transaction.
1239    ///
1240    /// Used by `xa_commit(xid)` after locating the txn id from
1241    /// [`Self::recovered_prepared_txns`].  The XA layer iterates the
1242    /// returned list and applies each LN to the in-memory tree before
1243    /// writing the `TxnCommit` WAL frame.
1244    pub fn take_recovered_prepared_lns(
1245        &self,
1246        txn_id: u64,
1247    ) -> Vec<noxu_recovery::PreparedLnReplay> {
1248        let env_impl = self.env_impl.lock();
1249        env_impl.take_recovered_prepared_lns(txn_id)
1250    }
1251
1252    /// Removes a recovered prepared txn entry after the XA layer has
1253    /// successfully resolved it (`xa_commit` or `xa_rollback`).
1254    /// Idempotent.
1255    pub fn forget_recovered_prepared_txn(&self, txn_id: u64) {
1256        let env_impl = self.env_impl.lock();
1257        env_impl.forget_recovered_prepared_txn(txn_id);
1258    }
1259
1260    /// Writes a `TxnCommit` WAL frame for `txn_id` and fsyncs.
1261    ///
1262    /// Used by `xa_commit(xid)` to durably resolve a recovered prepared
1263    /// transaction without requiring an in-memory `Txn` (which the
1264    /// crash destroyed).  The caller must have already replayed any
1265    /// LNs into the in-memory tree via
1266    /// [`Self::take_recovered_prepared_lns`] and applied them.
1267    ///
1268    /// See `noxu_xa` for XA two-phase commit.
1269    pub fn write_txn_commit_for_recovered(&self, txn_id: u64) -> Result<()> {
1270        let lm = match &self.log_manager {
1271            Some(lm) => lm,
1272            None => return Ok(()), // Non-transactional env (shouldn't happen).
1273        };
1274        // R-3: pre-allocate the VLSN BEFORE writing the WAL entry so the
1275        // TxnCommit record carries it.  On a second crash, the X-14 VLSN
1276        // rebuild scans TxnCommit records with non-NULL dtvlsn and includes
1277        // them — fixing the double-crash VLSN loss reported in Keith R-3.
1278        let pre_vlsn =
1279            if let Some(coord) = self.replica_coordinator.lock().as_ref() {
1280                coord.pre_alloc_vlsn_for_recovered_commit()
1281            } else {
1282                0
1283            };
1284
1285        let commit_lsn = write_txn_end_for_recovered(
1286            lm, txn_id, true, /* is_commit */
1287            true, /* fsync */
1288            true, /* flush */
1289            pre_vlsn,
1290        )?;
1291
1292        // Register the pre-allocated VLSN in the VlsnIndex now that we have
1293        // the actual commit LSN.  Also keep the legacy alloc path for any
1294        // coordinator that doesn't implement pre_alloc (returns 0).
1295        if let Some(coord) = self.replica_coordinator.lock().as_ref() {
1296            if pre_vlsn > 0 {
1297                coord.register_recovered_commit_vlsn(pre_vlsn, commit_lsn);
1298                log::debug!(
1299                    "write_txn_commit_for_recovered: txn_id={} commit_lsn={:?} \
1300                     embedded+registered vlsn={} (R-3)",
1301                    txn_id,
1302                    commit_lsn,
1303                    pre_vlsn
1304                );
1305            } else {
1306                // Fallback: coordinator returned 0 for pre_alloc (non-master
1307                // or non-replicated); try the legacy single-step allocator.
1308                let vlsn = coord.alloc_vlsn_for_recovered_commit(commit_lsn);
1309                if vlsn > 0 {
1310                    log::debug!(
1311                        "write_txn_commit_for_recovered: txn_id={} commit_lsn={:?} \
1312                         assigned vlsn={} (X-3 legacy path)",
1313                        txn_id,
1314                        commit_lsn,
1315                        vlsn
1316                    );
1317                }
1318            }
1319        }
1320        Ok(())
1321    }
1322
1323    /// Writes a `TxnAbort` WAL frame for `txn_id`.  Used by `xa_rollback(xid)`
1324    /// to durably resolve a recovered prepared transaction.
1325    pub fn write_txn_abort_for_recovered(&self, txn_id: u64) -> Result<()> {
1326        let lm = match &self.log_manager {
1327            Some(lm) => lm,
1328            None => return Ok(()),
1329        };
1330        write_txn_end_for_recovered(
1331            lm, txn_id, false, /* is_commit */
1332            false, /* fsync */
1333            false, /* flush */
1334            0,     /* vlsn: NULL_VLSN for abort */
1335        )
1336        .map(|_| ())
1337    }
1338
1339    /// Replays a recovered prepared transaction’s LNs into the in-memory
1340    /// tree at `xa_commit` resolution time.
1341    ///
1342    /// Iterates the LN list (already removed from the recovered map by
1343    /// the caller) and applies each insert/update/delete to the
1344    /// matching `DatabaseImpl`'s tree.  This makes the prepared writes
1345    /// observable to subsequent reads in the same process — without
1346    /// this step, a recovered+committed XA branch's writes would only
1347    /// become visible after a second recovery on the next reopen.
1348    pub fn apply_recovered_prepared_lns(
1349        &self,
1350        lns: &[noxu_recovery::PreparedLnReplay],
1351    ) -> Result<()> {
1352        let env_impl = self.env_impl.lock();
1353        for ln in lns {
1354            let db_id = noxu_dbi::DatabaseId::new(ln.db_id as i64);
1355            let Some(db_arc) = env_impl.get_database_by_id(db_id) else {
1356                continue;
1357            };
1358            let db_guard = db_arc.read();
1359            let Some(tree) = db_guard.get_real_tree() else {
1360                continue;
1361            };
1362            match ln.operation {
1363                noxu_recovery::PreparedLnOperation::Insert
1364                | noxu_recovery::PreparedLnOperation::Update => {
1365                    if let Some(data) = &ln.data {
1366                        let _ = tree.insert(
1367                            ln.key.clone(),
1368                            data.clone(),
1369                            ln.original_lsn,
1370                        );
1371                    }
1372                }
1373                noxu_recovery::PreparedLnOperation::Delete => {
1374                    if tree.delete(&ln.key) {
1375                        db_guard.decrement_entry_count();
1376                    }
1377                }
1378            }
1379        }
1380        Ok(())
1381    }
1382
1383    /// Verifies the structural integrity of all databases in this environment.
1384    ///
1385    /// Iterates every open `DatabaseImpl` in the environment's db_map and
1386    /// calls `verify_database_impl()` on each one (B-tree key-order checks,
1387    /// LSN validity, child-pointer completeness).  Results are merged into a
1388    /// single `VerifyResult`.
1389    ///
1390    /// Mirrors `Environment.verify(VerifyConfig, PrintStream)` in
1391    /// creates a `BtreeVerifier` and calls `verifier.verifyAll()`.
1392    ///
1393    /// # Arguments
1394    /// * `config` - Verification options (btree, log, checksums, max_errors).
1395    ///
1396    /// # Returns
1397    /// A combined `VerifyResult` over all databases.
1398    ///
1399    /// # Errors
1400    /// Returns an error if the environment is closed or invalidated.
1401    pub fn verify(
1402        &self,
1403        config: &noxu_engine::VerifyConfig,
1404    ) -> Result<noxu_engine::VerifyResult> {
1405        self.check_open()?;
1406        let env_impl = self.env_impl.lock();
1407        let all_dbs = env_impl.get_all_database_impls();
1408        drop(env_impl);
1409
1410        let mut merged = noxu_engine::VerifyResult::new();
1411        for db_arc in &all_dbs {
1412            let guard = db_arc.read();
1413            let result = noxu_engine::verify_database_impl(&guard, config);
1414            merged.databases_verified += result.databases_verified;
1415            merged.records_verified += result.records_verified;
1416            for err in result.errors {
1417                merged.add_error(err);
1418                if merged.error_count() >= config.max_errors as usize {
1419                    return Ok(merged);
1420                }
1421            }
1422            for w in result.warnings {
1423                merged.add_warning(w);
1424            }
1425        }
1426        Ok(merged)
1427    }
1428
1429    /// Explicitly trigger BIN compression for all open databases.
1430    ///
1431    /// Mirrors `Environment.compress()` in JE (`Environment.java:1887`).
1432    /// Synchronously runs one pass of the INCompressor logic: finds every
1433    /// BIN with known-deleted slots and compresses them.  Useful in tests
1434    /// to drain the compressor queue before taking a checkpoint, and for
1435    /// applications that want deterministic memory reclamation after bulk
1436    /// deletes.
1437    ///
1438    /// Returns `Ok(n)` where `n` is the number of BINs compressed.  Returns
1439    /// `Err` if the environment is closed or invalid.
1440    pub fn compress(&self) -> Result<usize> {
1441        self.check_open()?;
1442        let env_impl = self.env_impl.lock();
1443        let n = env_impl.compress_all();
1444        Ok(n)
1445    }
1446
1447    /// Explicitly trigger the memory evictor.
1448    ///
1449    /// Mirrors `Environment.evictMemory()` in JE (`Environment.java:1860`).
1450    /// Requests that the cache evictor free cache pages down toward the
1451    /// configured cache size.  Useful after bulk inserts to reclaim memory
1452    /// proactively rather than waiting for the background daemon.
1453    ///
1454    /// Returns `Ok(bytes_evicted)`.  Returns `Err` if the environment is
1455    /// closed or invalid.
1456    pub fn evict_memory(&self) -> Result<usize> {
1457        self.check_open()?;
1458        let env_impl = self.env_impl.lock();
1459        let bytes = env_impl.evict_memory();
1460        Ok(bytes)
1461    }
1462    ///
1463    /// Called by Database::close().
1464    pub(crate) fn mark_database_closed(&self, name: &str) {
1465        let databases = self.databases.lock();
1466        if let Some(db_handle) = databases.get(name) {
1467            db_handle.open.store(false, Ordering::Release);
1468        }
1469    }
1470
1471    /// Internal method to mark a transaction as complete.
1472    ///
1473    /// Historically a no-op call site; now superseded by
1474    /// `Transaction::commit` / `Transaction::abort` calling
1475    /// `ActiveTxns::mark_complete` directly via the shared `Arc<ActiveTxns>`.
1476    /// Kept for backwards compatibility with internal tests.
1477    pub(crate) fn mark_transaction_complete(&self, txn_id: u64) {
1478        self.active_txns.mark_complete(txn_id);
1479    }
1480
1481    fn check_open(&self) -> Result<()> {
1482        if !self.open.load(Ordering::Acquire) {
1483            return Err(NoxuError::EnvironmentClosed);
1484        }
1485        if !self.env_valid.load(Ordering::Acquire) {
1486            return Err(NoxuError::environment_with_reason(
1487                crate::error::EnvironmentFailureReason::ForcedShutdown,
1488                "environment has been invalidated due to a prior fatal error"
1489                    .to_string(),
1490            ));
1491        }
1492        Ok(())
1493    }
1494
1495    /// Every mutating env-layer
1496    /// operation funnels through this helper so a `read_only=true`
1497    /// environment cannot create / remove / rename / truncate databases
1498    /// nor begin a (writable) transaction.
1499    fn check_writable(&self, what: &str) -> Result<()> {
1500        self.check_open()?;
1501        if self.config.read_only {
1502            return Err(NoxuError::OperationNotAllowed(format!(
1503                "{what}: environment is read-only",
1504            )));
1505        }
1506        Ok(())
1507    }
1508}
1509
1510/// Helper used by `Environment::write_txn_commit_for_recovered` and
1511/// `write_txn_abort_for_recovered` to write a `TxnCommit` / `TxnAbort` WAL
1512/// frame for a transaction id that has no in-memory `Txn` (the original
1513/// process crashed before it could commit; recovery surfaced it via
1514/// `recovered_prepared_txns`).
1515///
1516/// `vlsn` is the pre-allocated VLSN to embed in the `dtvlsn` field of the
1517/// TxnEndEntry payload.  Pass `NULL_VLSN` (0) for non-replicated environments.
1518/// The R-3 fix requires the VLSN to be embedded so the X-14 VLSN rebuild
1519/// on a second crash can reconstruct the VLSN index from TxnCommit records.
1520///
1521/// XA two-phase commit support.
1522fn write_txn_end_for_recovered(
1523    lm: &LogManager,
1524    txn_id: u64,
1525    is_commit: bool,
1526    fsync: bool,
1527    flush: bool,
1528    vlsn: u64,
1529) -> Result<noxu_util::Lsn> {
1530    use bytes::BytesMut;
1531    use noxu_log::{LogEntryType, Provisional, entry::TxnEndEntry};
1532    use noxu_util::{
1533        lsn::NULL_LSN,
1534        vlsn::{NULL_VLSN, Vlsn},
1535    };
1536
1537    let timestamp = std::time::SystemTime::now()
1538        .duration_since(std::time::UNIX_EPOCH)
1539        .unwrap_or_default()
1540        .as_millis() as u64;
1541
1542    // R-3: embed the pre-allocated VLSN (if any) in the dtvlsn field so the
1543    // X-14 VLSN rebuild on second crash can find it in TxnCommit records.
1544    let dtvlsn = if vlsn > 0 { Vlsn::new(vlsn as i64) } else { NULL_VLSN };
1545
1546    let entry = if is_commit {
1547        TxnEndEntry::new_commit(txn_id as i64, NULL_LSN, timestamp, 0, dtvlsn)
1548    } else {
1549        TxnEndEntry::new_abort(txn_id as i64, NULL_LSN, timestamp, 0, NULL_VLSN)
1550    };
1551
1552    let entry_type = if is_commit {
1553        LogEntryType::TxnCommit
1554    } else {
1555        LogEntryType::TxnAbort
1556    };
1557
1558    let mut buf = BytesMut::with_capacity(entry.log_size());
1559    entry.write_to_log(&mut buf);
1560
1561    lm.log(entry_type, &buf, Provisional::No, flush, fsync).map_err(|e| {
1562        NoxuError::environment_with_reason(
1563            crate::error::EnvironmentFailureReason::LogWrite,
1564            e.to_string(),
1565        )
1566    })
1567}
1568
1569impl Drop for Environment {
1570    fn drop(&mut self) {
1571        // Best effort close on drop
1572        let _ = self.close();
1573    }
1574}
1575
1576#[cfg(test)]
1577mod tests {
1578    use super::*;
1579    use tempfile::TempDir;
1580
1581    fn temp_env_config() -> (TempDir, EnvironmentConfig) {
1582        let temp_dir = TempDir::new().unwrap();
1583        let config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
1584            .with_allow_create(true)
1585            .with_transactional(true);
1586        (temp_dir, config)
1587    }
1588
1589    #[test]
1590    fn test_open_environment() {
1591        let (temp_dir, config) = temp_env_config();
1592        let env = Environment::open(config).unwrap();
1593        assert!(env.is_valid());
1594        assert_eq!(env.get_home(), temp_dir.path());
1595        env.close().unwrap();
1596    }
1597
1598    #[test]
1599    fn test_open_creates_directory() {
1600        let temp_dir = TempDir::new().unwrap();
1601        let home = temp_dir.path().join("subdir");
1602        let config =
1603            EnvironmentConfig::new(home.clone()).with_allow_create(true);
1604
1605        let env = Environment::open(config).unwrap();
1606        assert!(home.exists());
1607        assert!(home.is_dir());
1608        env.close().unwrap();
1609    }
1610
1611    #[test]
1612    fn test_open_fails_without_allow_create() {
1613        let temp_dir = TempDir::new().unwrap();
1614        let home = temp_dir.path().join("nonexistent");
1615        let config = EnvironmentConfig::new(home).with_allow_create(false);
1616
1617        let result = Environment::open(config);
1618        assert!(result.is_err());
1619    }
1620
1621    #[test]
1622    fn test_close_environment() {
1623        let (_temp_dir, config) = temp_env_config();
1624        let env = Environment::open(config).unwrap();
1625        assert!(env.is_valid());
1626        env.close().unwrap();
1627        assert!(!env.is_valid());
1628    }
1629
1630    #[test]
1631    fn test_close_twice_fails() {
1632        let (_temp_dir, config) = temp_env_config();
1633        let env = Environment::open(config).unwrap();
1634        env.close().unwrap();
1635        let result = env.close();
1636        assert!(result.is_err());
1637    }
1638
1639    #[test]
1640    fn test_close_with_open_database_fails() {
1641        let (_temp_dir, config) = temp_env_config();
1642        let env = Environment::open(config).unwrap();
1643
1644        let db_config = DatabaseConfig::new().with_allow_create(true);
1645        let _db = env.open_database(None, "testdb", &db_config).unwrap();
1646
1647        let result = env.close();
1648        assert!(result.is_err());
1649    }
1650
1651    #[test]
1652    fn test_open_database() {
1653        let (_temp_dir, config) = temp_env_config();
1654        let env = Environment::open(config).unwrap();
1655
1656        let db_config = DatabaseConfig::new().with_allow_create(true);
1657        let db = env.open_database(None, "testdb", &db_config).unwrap();
1658        assert_eq!(db.get_database_name(), "testdb");
1659        assert!(db.is_valid());
1660    }
1661
1662    #[test]
1663    fn test_open_database_twice_fails() {
1664        let (_temp_dir, config) = temp_env_config();
1665        let env = Environment::open(config).unwrap();
1666
1667        let db_config = DatabaseConfig::new().with_allow_create(true);
1668        let _db1 = env.open_database(None, "testdb", &db_config).unwrap();
1669        let result = env.open_database(None, "testdb", &db_config);
1670        assert!(result.is_err());
1671    }
1672
1673    #[test]
1674    fn test_open_database_without_create_fails() {
1675        let (_temp_dir, config) = temp_env_config();
1676        let env = Environment::open(config).unwrap();
1677
1678        let db_config = DatabaseConfig::new().with_allow_create(false);
1679        let result = env.open_database(None, "nonexistent", &db_config);
1680        assert!(result.is_err());
1681    }
1682
1683    #[test]
1684    fn test_open_database_empty_name_fails() {
1685        let (_temp_dir, config) = temp_env_config();
1686        let env = Environment::open(config).unwrap();
1687
1688        let db_config = DatabaseConfig::new().with_allow_create(true);
1689        let result = env.open_database(None, "", &db_config);
1690        assert!(result.is_err());
1691    }
1692
1693    #[test]
1694    fn test_remove_database() {
1695        let (_temp_dir, config) = temp_env_config();
1696        let env = Environment::open(config).unwrap();
1697
1698        let db_config = DatabaseConfig::new().with_allow_create(true);
1699        let db = env.open_database(None, "testdb", &db_config).unwrap();
1700        db.close().unwrap();
1701
1702        env.remove_database(None, "testdb").unwrap();
1703        let names = env.get_database_names().unwrap();
1704        assert!(!names.contains(&"testdb".to_string()));
1705    }
1706
1707    #[test]
1708    fn test_remove_open_database_fails() {
1709        let (_temp_dir, config) = temp_env_config();
1710        let env = Environment::open(config).unwrap();
1711
1712        let db_config = DatabaseConfig::new().with_allow_create(true);
1713        let _db = env.open_database(None, "testdb", &db_config).unwrap();
1714
1715        let result = env.remove_database(None, "testdb");
1716        assert!(result.is_err());
1717    }
1718
1719    #[test]
1720    fn test_remove_nonexistent_database_fails() {
1721        let (_temp_dir, config) = temp_env_config();
1722        let env = Environment::open(config).unwrap();
1723
1724        let result = env.remove_database(None, "nonexistent");
1725        assert!(result.is_err());
1726    }
1727
1728    #[test]
1729    fn test_rename_database() {
1730        let (_temp_dir, config) = temp_env_config();
1731        let env = Environment::open(config).unwrap();
1732
1733        let db_config = DatabaseConfig::new().with_allow_create(true);
1734        let db = env.open_database(None, "oldname", &db_config).unwrap();
1735        db.close().unwrap();
1736
1737        env.rename_database(None, "oldname", "newname").unwrap();
1738
1739        let names = env.get_database_names().unwrap();
1740        assert!(!names.contains(&"oldname".to_string()));
1741        assert!(names.contains(&"newname".to_string()));
1742    }
1743
1744    #[test]
1745    fn test_rename_to_same_name() {
1746        let (_temp_dir, config) = temp_env_config();
1747        let env = Environment::open(config).unwrap();
1748
1749        let db_config = DatabaseConfig::new().with_allow_create(true);
1750        let db = env.open_database(None, "testdb", &db_config).unwrap();
1751        db.close().unwrap();
1752
1753        env.rename_database(None, "testdb", "testdb").unwrap();
1754    }
1755
1756    #[test]
1757    fn test_rename_open_database_fails() {
1758        let (_temp_dir, config) = temp_env_config();
1759        let env = Environment::open(config).unwrap();
1760
1761        let db_config = DatabaseConfig::new().with_allow_create(true);
1762        let _db = env.open_database(None, "testdb", &db_config).unwrap();
1763
1764        let result = env.rename_database(None, "testdb", "newname");
1765        assert!(result.is_err());
1766    }
1767
1768    #[test]
1769    fn test_rename_nonexistent_database_fails() {
1770        let (_temp_dir, config) = temp_env_config();
1771        let env = Environment::open(config).unwrap();
1772
1773        let result = env.rename_database(None, "nonexistent", "newname");
1774        assert!(result.is_err());
1775    }
1776
1777    #[test]
1778    fn test_rename_to_existing_database_fails() {
1779        let (_temp_dir, config) = temp_env_config();
1780        let env = Environment::open(config).unwrap();
1781
1782        let db_config = DatabaseConfig::new().with_allow_create(true);
1783        let db1 = env.open_database(None, "db1", &db_config).unwrap();
1784        let db2 = env.open_database(None, "db2", &db_config).unwrap();
1785        db1.close().unwrap();
1786        db2.close().unwrap();
1787
1788        let result = env.rename_database(None, "db1", "db2");
1789        assert!(result.is_err());
1790    }
1791
1792    #[test]
1793    fn test_get_database_names() {
1794        let (_temp_dir, config) = temp_env_config();
1795        let env = Environment::open(config).unwrap();
1796
1797        let db_config = DatabaseConfig::new().with_allow_create(true);
1798        let _db1 = env.open_database(None, "db1", &db_config).unwrap();
1799        let _db2 = env.open_database(None, "db2", &db_config).unwrap();
1800
1801        let names = env.get_database_names().unwrap();
1802        assert_eq!(names.len(), 2);
1803        assert!(names.contains(&"db1".to_string()));
1804        assert!(names.contains(&"db2".to_string()));
1805    }
1806
1807    /// C-4 / JE 1-I: a database opened inside an explicit transaction that is
1808    /// subsequently aborted must NOT persist after env close + reopen.
1809    #[test]
1810    fn test_transactional_open_database_abort_removes_db() {
1811        let (temp_dir, config) = temp_env_config();
1812        {
1813            let env = Environment::open(config).unwrap();
1814            let txn = env.begin_transaction(None).unwrap();
1815            let db_config = DatabaseConfig::new()
1816                .with_allow_create(true)
1817                .with_transactional(true);
1818            let _db = env
1819                .open_database(Some(&txn), "aborted_db", &db_config)
1820                .unwrap();
1821            txn.abort().unwrap();
1822            // After abort the database must not appear in the committed list.
1823            let names = env.get_database_names().unwrap();
1824            assert!(
1825                !names.contains(&"aborted_db".to_string()),
1826                "aborted database must not appear in get_database_names() \
1827                 (C-4 committed-only semantics), got: {:?}",
1828                names
1829            );
1830            drop(env);
1831        }
1832        // Reopen: the aborted database must NOT have been written to the WAL.
1833        let env2 = Environment::open(
1834            EnvironmentConfig::new(temp_dir.path().to_path_buf())
1835                .with_allow_create(false)
1836                .with_transactional(true),
1837        )
1838        .unwrap();
1839        let names2 = env2.get_database_names().unwrap();
1840        assert!(
1841            !names2.contains(&"aborted_db".to_string()),
1842            "after env reopen, aborted database must not appear: {:?}",
1843            names2
1844        );
1845    }
1846
1847    /// C-4 / JE 1-I: `get_database_names()` must NOT return a database that
1848    /// was opened inside a concurrent uncommitted transaction.
1849    #[test]
1850    fn test_get_database_names_excludes_uncommitted() {
1851        let (_temp_dir, config) = temp_env_config();
1852        let env = Environment::open(config).unwrap();
1853
1854        let db_config = DatabaseConfig::new()
1855            .with_allow_create(true)
1856            .with_transactional(true);
1857        let txn = env.begin_transaction(None).unwrap();
1858        let _db =
1859            env.open_database(Some(&txn), "pending_db", &db_config).unwrap();
1860
1861        // While txn is still uncommitted, another observer must not see
1862        // the database in the committed-names list.
1863        let names = env.get_database_names().unwrap();
1864        assert!(
1865            !names.contains(&"pending_db".to_string()),
1866            "uncommitted database must be invisible to get_database_names() \
1867             (C-4 / JE 1-J): got {:?}",
1868            names
1869        );
1870
1871        // After commit the database must appear.
1872        txn.commit().unwrap();
1873        let names_after = env.get_database_names().unwrap();
1874        assert!(
1875            names_after.contains(&"pending_db".to_string()),
1876            "committed database must appear in get_database_names()"
1877        );
1878    }
1879
1880    #[test]
1881    fn test_begin_transaction() {
1882        let temp_dir = TempDir::new().unwrap();
1883        let config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
1884            .with_allow_create(true)
1885            .with_transactional(false);
1886        let env = Environment::open(config).unwrap();
1887
1888        let result = env.begin_transaction(None);
1889        assert!(result.is_err());
1890    }
1891
1892    #[test]
1893    fn test_is_transactional() {
1894        let (_temp_dir, config) = temp_env_config();
1895        let env = Environment::open(config).unwrap();
1896        assert!(env.is_transactional());
1897    }
1898
1899    #[test]
1900    fn test_is_not_transactional() {
1901        let temp_dir = TempDir::new().unwrap();
1902        let config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
1903            .with_allow_create(true)
1904            .with_transactional(false);
1905        let env = Environment::open(config).unwrap();
1906        assert!(!env.is_transactional());
1907    }
1908
1909    #[test]
1910    fn test_is_read_only() {
1911        let temp_dir = TempDir::new().unwrap();
1912        let config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
1913            .with_allow_create(true)
1914            .with_read_only(true);
1915        let env = Environment::open(config).unwrap();
1916        assert!(env.is_read_only());
1917    }
1918
1919    #[test]
1920    fn test_operations_on_closed_environment_fail() {
1921        let (_temp_dir, config) = temp_env_config();
1922        let env = Environment::open(config).unwrap();
1923        env.close().unwrap();
1924
1925        let db_config = DatabaseConfig::new().with_allow_create(true);
1926        assert!(env.open_database(None, "test", &db_config).is_err());
1927        assert!(env.remove_database(None, "test").is_err());
1928        assert!(env.rename_database(None, "a", "b").is_err());
1929        assert!(env.begin_transaction(None).is_err());
1930        assert!(env.get_database_names().is_err());
1931    }
1932
1933    // ========================================================================
1934    // Additional branch-coverage tests
1935    // ========================================================================
1936
1937    /// open() with a path that points to a file (not a directory) fails.
1938    #[test]
1939    fn test_open_fails_if_home_is_a_file() {
1940        use std::io::Write;
1941        let temp_dir = TempDir::new().unwrap();
1942        let file_path = temp_dir.path().join("not_a_dir.txt");
1943        let mut f = std::fs::File::create(&file_path).unwrap();
1944        writeln!(f, "data").unwrap();
1945        drop(f);
1946
1947        let config = EnvironmentConfig::new(file_path).with_allow_create(false);
1948        // The path exists but is not a directory — must fail.
1949        let result = Environment::open(config);
1950        assert!(result.is_err());
1951    }
1952
1953    /// open_database() with node_max_entries > 0 hits the set_node_max_entries branch.
1954    #[test]
1955    fn test_open_database_with_node_max_entries() {
1956        let (_temp_dir, config) = temp_env_config();
1957        let env = Environment::open(config).unwrap();
1958
1959        let mut db_config = DatabaseConfig::new().with_allow_create(true);
1960        db_config.set_node_max_entries(64);
1961        let db = env.open_database(None, "testdb_entries", &db_config).unwrap();
1962        assert!(db.is_valid());
1963    }
1964
1965    /// begin_transaction() with an explicit TransactionConfig.
1966    #[test]
1967    fn test_begin_transaction_with_explicit_config() {
1968        use crate::transaction_config::TransactionConfig;
1969        let (_temp_dir, config) = temp_env_config();
1970        let env = Environment::open(config).unwrap();
1971
1972        let txn_config = TransactionConfig::new();
1973        let txn = env.begin_transaction(Some(&txn_config)).unwrap();
1974        assert!(txn.is_valid());
1975    }
1976
1977    /// rename_database() when the old name is not in the databases map
1978    /// (handle was never registered) still succeeds at the env_impl level and
1979    /// the missing-handle branch (`if let Some(...)` => false) is taken.
1980    #[test]
1981    fn test_rename_database_handle_not_in_map() {
1982        let (_temp_dir, config) = temp_env_config();
1983        let env = Environment::open(config).unwrap();
1984
1985        // Create the DB using env_impl directly (bypassing Environment::open_database
1986        // so the handle is NOT in the databases map), then immediately close it
1987        // so that reference_count returns to 0 (no open user handles).
1988        {
1989            let env_impl = env.env_impl.lock();
1990            let mut dbi_config = noxu_dbi::DatabaseConfig::new();
1991            dbi_config.set_allow_create(true);
1992            let db_arc =
1993                env_impl.open_database("ghost_db", &dbi_config).unwrap();
1994            let db_id = db_arc.read().get_id();
1995            env_impl.close_database(db_id).unwrap();
1996        }
1997
1998        // rename_database should succeed and hit the `if let Some(handle)` false branch.
1999        env.rename_database(None, "ghost_db", "ghost_db_renamed").unwrap();
2000
2001        let names = env.get_database_names().unwrap();
2002        assert!(names.contains(&"ghost_db_renamed".to_string()));
2003        assert!(!names.contains(&"ghost_db".to_string()));
2004    }
2005
2006    /// close() with active transactions returns an error.
2007    #[test]
2008    fn test_close_with_active_transactions_fails() {
2009        let (_temp_dir, config) = temp_env_config();
2010        let env = Environment::open(config).unwrap();
2011
2012        let _txn = env.begin_transaction(None).unwrap();
2013
2014        let result = env.close();
2015        assert!(result.is_err());
2016    }
2017
2018    /// get_config() and get_home() return the correct values.
2019    #[test]
2020    fn test_get_config_and_home() {
2021        let (temp_dir, config) = temp_env_config();
2022        let env = Environment::open(config).unwrap();
2023
2024        assert!(env.get_config().allow_create);
2025        assert_eq!(env.get_home(), temp_dir.path());
2026        env.close().unwrap();
2027    }
2028
2029    /// mark_database_closed() when the database is in the map.
2030    #[test]
2031    fn test_mark_database_closed_known_name() {
2032        let (_temp_dir, config) = temp_env_config();
2033        let env = Environment::open(config).unwrap();
2034
2035        let db_config = DatabaseConfig::new().with_allow_create(true);
2036        let db = env.open_database(None, "mydb", &db_config).unwrap();
2037        // db is open — mark it closed via the internal API.
2038        env.mark_database_closed("mydb");
2039        // The database handle is now marked closed in the map; close() should succeed.
2040        let _ = db.is_valid(); // just use the variable
2041        env.close().unwrap();
2042    }
2043
2044    /// mark_database_closed() for an unknown name is a no-op.
2045    #[test]
2046    fn test_mark_database_closed_unknown_name_is_noop() {
2047        let (_temp_dir, config) = temp_env_config();
2048        let env = Environment::open(config).unwrap();
2049        // No database named "ghost" — should not panic.
2050        env.mark_database_closed("ghost");
2051        env.close().unwrap();
2052    }
2053
2054    /// mark_transaction_complete() removes the transaction from the active set.
2055    #[test]
2056    fn test_mark_transaction_complete_allows_env_close() {
2057        let (_temp_dir, config) = temp_env_config();
2058        let env = Environment::open(config).unwrap();
2059
2060        let txn = env.begin_transaction(None).unwrap();
2061        let txn_id = txn.get_id();
2062
2063        // Without removing the txn, close would fail.
2064        // Remove it via the internal API.
2065        env.mark_transaction_complete(txn_id);
2066
2067        // Now close should succeed.
2068        env.close().unwrap();
2069    }
2070
2071    // ── verify ─────────────────────────────────────────────────────────────
2072
2073    #[test]
2074    fn test_verify_empty_environment_passes() {
2075        use crate::VerifyConfig;
2076        let (_tmp, config) = temp_env_config();
2077        let env = Environment::open(config).unwrap();
2078        let verify_cfg = VerifyConfig::default();
2079        let result = env.verify(&verify_cfg).unwrap();
2080        assert!(result.passed, "empty env should pass: {:?}", result.errors);
2081    }
2082
2083    #[test]
2084    fn test_verify_environment_with_data_passes() {
2085        use crate::{DatabaseConfig, DatabaseEntry, VerifyConfig};
2086        let (_tmp, config) = temp_env_config();
2087        let env = Environment::open(config).unwrap();
2088
2089        let mut db_config = DatabaseConfig::new();
2090        db_config.set_allow_create(true);
2091        let db = env.open_database(None, "vtest", &db_config).unwrap();
2092        for i in 0u32..10 {
2093            let k = DatabaseEntry::from_bytes(&i.to_be_bytes());
2094            let v = DatabaseEntry::from_bytes(&(i * 3).to_be_bytes());
2095            db.put(None, &k, &v).unwrap();
2096        }
2097
2098        let verify_cfg = VerifyConfig::default();
2099        let result = env.verify(&verify_cfg).unwrap();
2100        assert!(
2101            result.passed,
2102            "env with data should pass: {:?}",
2103            result.errors
2104        );
2105        assert!(result.records_verified >= 10);
2106        db.close().unwrap();
2107        env.close().unwrap();
2108    }
2109
2110    #[test]
2111    fn test_verify_closed_environment_fails() {
2112        use crate::VerifyConfig;
2113        let (_tmp, config) = temp_env_config();
2114        let env = Environment::open(config).unwrap();
2115        env.close().unwrap();
2116        let verify_cfg = VerifyConfig::default();
2117        assert!(env.verify(&verify_cfg).is_err());
2118    }
2119
2120    // ── checkpoint ──────────────────────────────────────────────────────────
2121
2122    #[test]
2123    fn test_checkpoint_default_succeeds() {
2124        let (_tmp, config) = temp_env_config();
2125        let env = Environment::open(config).unwrap();
2126        // Transactional env has a checkpointer; call with no config.
2127        env.checkpoint(None).unwrap();
2128        env.close().unwrap();
2129    }
2130
2131    #[test]
2132    fn test_checkpoint_with_config_succeeds() {
2133        let (_tmp, config) = temp_env_config();
2134        let env = Environment::open(config).unwrap();
2135        let ckpt_cfg = CheckpointConfig {
2136            force: true,
2137            k_bytes: 0,
2138            minutes: 0,
2139            minimize_recovery_time: false,
2140        };
2141        env.checkpoint(Some(&ckpt_cfg)).unwrap();
2142        env.close().unwrap();
2143    }
2144
2145    #[test]
2146    fn test_checkpoint_closed_env_fails() {
2147        let (_tmp, config) = temp_env_config();
2148        let env = Environment::open(config).unwrap();
2149        env.close().unwrap();
2150        assert!(env.checkpoint(None).is_err());
2151    }
2152
2153    // ── get_mutable_config / set_mutable_config ──────────────────────────────
2154
2155    #[test]
2156    fn test_get_mutable_config_returns_current_values() {
2157        let (_tmp, config) = temp_env_config();
2158        let env = Environment::open(config).unwrap();
2159        let mc = env.get_mutable_config().unwrap();
2160        // cache_size should be Some() with the default value.
2161        assert!(mc.cache_size.is_some());
2162        assert!(mc.run_cleaner.is_some());
2163        assert!(mc.run_checkpointer.is_some());
2164        assert!(mc.run_evictor.is_some());
2165        env.close().unwrap();
2166    }
2167
2168    #[test]
2169    fn test_get_mutable_config_closed_env_fails() {
2170        let (_tmp, config) = temp_env_config();
2171        let env = Environment::open(config).unwrap();
2172        env.close().unwrap();
2173        assert!(env.get_mutable_config().is_err());
2174    }
2175
2176    #[test]
2177    fn test_set_mutable_config_updates_cache_size() {
2178        let (_tmp, config) = temp_env_config();
2179        let mut env = Environment::open(config).unwrap();
2180        let new_size: usize = 128 * 1024 * 1024; // 128 MiB
2181        let mc = EnvironmentMutableConfig::new().with_cache_size(new_size);
2182        env.set_mutable_config(mc).unwrap();
2183        let updated = env.get_mutable_config().unwrap();
2184        assert_eq!(updated.cache_size.unwrap(), new_size);
2185        env.close().unwrap();
2186    }
2187
2188    #[test]
2189    fn test_set_mutable_config_updates_timeouts() {
2190        let (_tmp, config) = temp_env_config();
2191        let mut env = Environment::open(config).unwrap();
2192        let mc = EnvironmentMutableConfig {
2193            lock_timeout_ms: Some(5_000),
2194            txn_timeout_ms: Some(10_000),
2195            ..EnvironmentMutableConfig::default()
2196        };
2197        env.set_mutable_config(mc).unwrap();
2198        // After setting, values should be reflected (lock_timeout_ms is advisory at
2199        // the config layer; verify via get_mutable_config).
2200        let updated = env.get_mutable_config().unwrap();
2201        assert_eq!(updated.lock_timeout_ms, Some(5_000));
2202        assert_eq!(updated.txn_timeout_ms, Some(10_000));
2203        env.close().unwrap();
2204    }
2205
2206    #[test]
2207    fn test_set_mutable_config_none_timeout_unchanged() {
2208        let (_tmp, config) = temp_env_config();
2209        let mut env = Environment::open(config).unwrap();
2210        let original = env.get_mutable_config().unwrap();
2211        // None means "unchanged".  See Wave 1C audit cleanup
2212        // (Transaction-Env F19/F20): the previous implementation used
2213        // 0 as the sentinel which prevented users from clearing a
2214        // timeout.
2215        let mc = EnvironmentMutableConfig {
2216            lock_timeout_ms: None,
2217            txn_timeout_ms: None,
2218            ..EnvironmentMutableConfig::default()
2219        };
2220        env.set_mutable_config(mc).unwrap();
2221        let updated = env.get_mutable_config().unwrap();
2222        assert_eq!(updated.lock_timeout_ms, original.lock_timeout_ms);
2223        assert_eq!(updated.txn_timeout_ms, original.txn_timeout_ms);
2224        env.close().unwrap();
2225    }
2226
2227    #[test]
2228    fn test_set_mutable_config_closed_env_fails() {
2229        let (_tmp, config) = temp_env_config();
2230        let mut env = Environment::open(config).unwrap();
2231        env.close().unwrap();
2232        let mc = EnvironmentMutableConfig::new();
2233        assert!(env.set_mutable_config(mc).is_err());
2234    }
2235
2236    // ========================================================================
2237    // Audit transaction-env F4 / F5 / F6 / F7 / F10 — Wave 2C-4
2238    // ========================================================================
2239
/// F5 — opening a database with `allow_create` on a read-only
/// environment must be rejected.
#[test]
fn test_read_only_env_rejects_create_database() {
    let temp_dir = TempDir::new().unwrap();
    let home = temp_dir.path().to_path_buf();

    // Open the environment writably once so the directory is populated,
    // then let that handle go before re-opening.
    let writable = EnvironmentConfig::new(home.clone())
        .with_allow_create(true)
        .with_transactional(true);
    drop(Environment::open(writable).unwrap());

    // Second open: same home, read-only this time.
    let read_only = EnvironmentConfig::new(home)
        .with_read_only(true)
        .with_transactional(true);
    let env = Environment::open(read_only).unwrap();

    let db_cfg = DatabaseConfig::new().with_allow_create(true);
    assert!(
        env.open_database(None, "new", &db_cfg).is_err(),
        "open_database with allow_create on read-only env must fail",
    );
}
2264
/// F5 — a read-only environment rejects the database-mutating
/// operations `remove_database`, `truncate_database` and
/// `rename_database`.
///
/// NOTE(review): the writable phase below creates no databases, so the
/// names "test" and "a" do not exist when the read-only handle is used.
/// Each call may therefore be failing with a "not found" style error
/// rather than a read-only rejection — confirm the read-only check runs
/// first, or pre-create the databases to make the test discriminating.
#[test]
fn test_read_only_env_rejects_remove_database() {
    let temp_dir = TempDir::new().unwrap();
    {
        // Create the environment writably so the directory exists; the
        // handle is released at the end of this scope.
        let config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
            .with_allow_create(true)
            .with_transactional(true);
        let _env = Environment::open(config).unwrap();
    }
    // Re-open the same home read-only.
    let ro_config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
        .with_read_only(true)
        .with_transactional(true);
    let env = Environment::open(ro_config).unwrap();

    // Each assertion carries its own message so a failure names the
    // operation that was wrongly accepted.
    assert!(
        env.remove_database(None, "test").is_err(),
        "remove_database on read-only env must fail",
    );
    assert!(
        env.truncate_database(None, "test").is_err(),
        "truncate_database on read-only env must fail",
    );
    assert!(
        env.rename_database(None, "a", "b").is_err(),
        "rename_database on read-only env must fail",
    );
}
2284
/// F5 — on a read-only environment a default (writable) transaction is
/// refused, while an explicitly read-only transaction is accepted.
#[test]
fn test_read_only_env_rejects_writable_txn() {
    let temp_dir = TempDir::new().unwrap();
    let home = temp_dir.path().to_path_buf();

    // Populate the directory with a writable open, then release it.
    let writable = EnvironmentConfig::new(home.clone())
        .with_allow_create(true)
        .with_transactional(true);
    drop(Environment::open(writable).unwrap());

    let read_only = EnvironmentConfig::new(home)
        .with_read_only(true)
        .with_transactional(true);
    let env = Environment::open(read_only).unwrap();

    // Passing no config yields a writable txn, which must be rejected.
    assert!(
        env.begin_transaction(None).is_err(),
        "writable txn on read-only env must fail"
    );

    // A txn configured as read-only must be allowed.
    let ro_txn_cfg = TransactionConfig::default().with_read_only(true);
    let _ro_txn = env
        .begin_transaction(Some(&ro_txn_cfg))
        .expect("read-only txn on read-only env must succeed");
}
2310
/// F6 — a non-forced `checkpoint()` with a `minutes` threshold is meant
/// to be skipped when a checkpoint has only just run; a forced one runs
/// regardless.  This test can only verify that every call succeeds.
#[test]
fn test_checkpoint_minutes_threshold_skips() {
    let (_tmp, config) = temp_env_config();
    let env = Environment::open(config).unwrap();

    // 1) No prior checkpoint exists, so this one runs.
    env.checkpoint(None).unwrap();

    // 2) minutes=60, force=false: expected to be skipped.  The skip has
    //    no effect we can observe from here, so we only require that
    //    the call returns without error.
    let throttled = CheckpointConfig::default().with_minutes(60);
    env.checkpoint(Some(&throttled)).unwrap();

    // 3) force=true overrides the minutes threshold.
    let forced = CheckpointConfig::default().with_force(true).with_minutes(60);
    env.checkpoint(Some(&forced)).unwrap();

    env.close().unwrap();
}
2332
/// F7 — a `cache_size` supplied through `set_mutable_config` must be
/// visible as the max memory of the evictor's Arbiter.
#[test]
fn test_set_mutable_config_pushes_cache_size_to_evictor() {
    let (_tmp, config) = temp_env_config();
    let mut env = Environment::open(config).unwrap();

    let resized = EnvironmentMutableConfig {
        cache_size: Some(64 * 1024 * 1024),
        ..EnvironmentMutableConfig::default()
    };
    env.set_mutable_config(resized).unwrap();

    // Read the limit back through env_impl -> evictor -> arbiter while
    // holding the env_impl lock.
    let guard = env.env_impl.lock();
    let max_memory = guard.get_evictor().get_arbiter().get_max_memory();
    assert_eq!(
        max_memory,
        64 * 1024 * 1024,
        "set_mutable_config(cache_size) must push to Arbiter",
    );
}
2354
/// F4 — with env-level `txn_no_sync = true`, a transaction begun
/// without a `TransactionConfig` must report COMMIT_NO_SYNC as its
/// durability.
#[test]
#[allow(deprecated)] // tests the deprecated txn_no_sync flag
fn test_env_txn_no_sync_applies_to_explicit_txn() {
    use crate::durability::Durability;

    let temp_dir = TempDir::new().unwrap();
    let home = temp_dir.path().to_path_buf();
    let config = EnvironmentConfig::new(home)
        .with_allow_create(true)
        .with_transactional(true)
        .with_txn_no_sync(true);
    let env = Environment::open(config).unwrap();

    // No per-txn config: durability has to come from the environment.
    let txn = env.begin_transaction(None).unwrap();
    let inherited = txn.get_durability().expect("durability must be set");
    assert_eq!(
        inherited,
        Durability::COMMIT_NO_SYNC,
        "env txn_no_sync=true must propagate to explicit-txn durability",
    );

    txn.commit().unwrap();
    env.close().unwrap();
}
2379
/// F10 — a transaction that is dropped without commit/abort must be
/// removed from the environment's active-transaction registry, so the
/// environment can still be closed afterwards.
#[test]
fn test_drop_aborts_open_transaction() {
    let (_tmp, config) = temp_env_config();
    let env = Environment::open(config).unwrap();

    let baseline = env.active_txns.len();

    // Beginning a txn registers exactly one new entry.
    let txn = env.begin_transaction(None).unwrap();
    assert_eq!(env.active_txns.len(), baseline + 1);

    // Discard the handle without calling commit() or abort().
    drop(txn);

    // The registry must be back to its starting size.
    assert_eq!(
        env.active_txns.len(),
        baseline,
        "Transaction::Drop must abort and prune from active_txns",
    );

    // With no txns left registered, close() must succeed.
    env.close().unwrap();
}
2402
// ── X-3: recovered XA commit assigns real VLSN ─────────────────────

/// X-3: `write_txn_commit_for_recovered` on an environment that has a
/// replica coordinator installed must invoke the coordinator's
/// `alloc_vlsn_for_recovered_commit`.  The test detects the call by
/// checking that the LSN the mock recorded is no longer its initial 0.
#[test]
fn test_x3_recovered_commit_calls_alloc_vlsn() {
    use noxu_dbi::{
        AckWaitError, ReplicaAckCoordinator, ReplicaAckPolicyKind,
    };
    use noxu_util::Lsn;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering as AO};
    use std::time::Duration;

    /// Test double that remembers the raw LSN handed to
    /// `alloc_vlsn_for_recovered_commit`.
    struct RecordingCoord {
        seen_lsn: AtomicU64,
    }

    impl ReplicaAckCoordinator for RecordingCoord {
        // Ack waiting is irrelevant here: report zero acks, no error.
        fn await_replica_acks(
            &self,
            _policy: ReplicaAckPolicyKind,
            _timeout: Duration,
        ) -> std::result::Result<u32, AckWaitError> {
            Ok(0)
        }

        fn alloc_vlsn_for_recovered_commit(&self, lsn: Lsn) -> u64 {
            self.seen_lsn.store(lsn.as_u64(), AO::SeqCst);
            // Fake VLSN derived from the file number; the +1 keeps it
            // non-zero (non-zero = success).
            lsn.file_number() as u64 + 1
        }
    }

    let (tmp_dir, config) = temp_env_config();
    let env = Environment::open(config).unwrap();

    // Install the recording coordinator; keep our own handle so the
    // recorded value can be inspected afterwards.
    let coord = Arc::new(RecordingCoord { seen_lsn: AtomicU64::new(0) });
    env.set_replica_coordinator(coord.clone());

    // Emit a commit for a made-up txn_id=42, simulating a recovered XA
    // transaction.
    env.write_txn_commit_for_recovered(42).unwrap();

    // A value other than the initial 0 proves the hook was invoked.
    let recorded = coord.seen_lsn.load(AO::SeqCst);
    assert_ne!(
        recorded, 0,
        "X-3: alloc_vlsn_for_recovered_commit must be called with the commit LSN"
    );

    env.close().unwrap();
    drop(tmp_dir);
}
2457}