1use crate::checkpoint_config::CheckpointConfig;
5use crate::database::Database;
6use crate::database_config::DatabaseConfig;
7use crate::environment_config::EnvironmentConfig;
8use crate::environment_mutable_config::EnvironmentMutableConfig;
9use crate::error::{NoxuError, Result};
10use crate::transaction::Transaction;
11use crate::transaction_config::TransactionConfig;
12use hashbrown::HashMap;
13use noxu_dbi::{DbiEnvConfig, EnvironmentImpl};
14use noxu_engine::EnvironmentStats;
15use noxu_engine::env_stats::{
16 EvictorStatsSnapshot, LockStatsSnapshot, LogStatsSnapshot, TxnStatsSnapshot,
17};
18use noxu_log::LogManager;
19use noxu_sync::Mutex;
20use std::path::{Path, PathBuf};
21use std::sync::Arc;
22use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
23use std::time::Instant;
24
25pub(crate) fn build_environment_stats(
32 env_impl: &EnvironmentImpl,
33 log_manager: Option<&LogManager>,
34 cache_size: u64,
35) -> EnvironmentStats {
36 let n_databases = env_impl.n_databases() as u32;
37 let log = log_manager
38 .map(|lm| LogStatsSnapshot::from(&lm.get_stats()))
39 .unwrap_or_default();
40 let lock =
41 LockStatsSnapshot::from(&env_impl.get_lock_manager().get_stats());
42 let txn = TxnStatsSnapshot::from(&env_impl.get_txn_manager().get_stats());
43 let throughput = env_impl.get_throughput_snapshot();
44 let evictor =
45 EvictorStatsSnapshot::from(env_impl.get_evictor().get_stats());
46 let cleaner = env_impl
47 .get_cleaner()
48 .map(|c| c.get_stats().snapshot())
49 .unwrap_or_default();
50 let checkpoint = env_impl
51 .get_checkpointer()
52 .map(|cp| cp.get_stats().snapshot())
53 .unwrap_or_default();
54 EnvironmentStats {
55 cache_size,
56 cache_usage: env_impl.get_cache_usage().max(0) as u64,
57 n_databases,
58 log,
59 lock,
60 txn,
61 throughput,
62 evictor,
63 cleaner,
64 checkpoint,
65 }
66}
67
/// Handle to an opened storage environment.
///
/// Created by [`Environment::open`]; owns the engine handle, the registry of
/// opened databases and the bookkeeping for active transactions.
pub struct Environment {
    /// Home directory, copied from `config.home` when the environment is opened.
    home: PathBuf,
    /// Configuration supplied to `open`; some fields are later overwritten by
    /// `set_mutable_config`.
    config: EnvironmentConfig,
    /// Database name → handle. Filled by `open_database`, pruned by
    /// `remove_database`, re-keyed by `rename_database`.
    databases: Mutex<HashMap<String, Arc<DatabaseHandle>>>,
    /// Registry of transactions begun through this environment; a clone of
    /// the `Arc` is attached to every `Transaction` returned by
    /// `begin_transaction`.
    active_txns: Arc<ActiveTxns>,
    /// Next transaction id to hand out; starts at 1.
    next_txn_id: AtomicU64,
    /// `true` from `open` until a successful `close`.
    open: AtomicBool,
    /// `true` until `invalidate` is called.
    env_valid: AtomicBool,
    /// Engine implementation, shared with databases, transactions and the
    /// background helpers started in `open`.
    env_impl: Arc<Mutex<EnvironmentImpl>>,
    /// Log manager obtained from the engine at open time, if it has one.
    log_manager: Option<Arc<LogManager>>,
    /// When `checkpoint` last actually ran a checkpoint; `None` before that.
    last_checkpoint_time: Mutex<Option<Instant>>,
    /// End-of-log LSN recorded after the last checkpoint run by `checkpoint`;
    /// starts as `NULL_LSN`.
    last_checkpoint_end_lsn: Mutex<noxu_util::Lsn>,
    /// Optional replica-ack coordinator attached to new transactions.
    replica_coordinator: Mutex<Option<noxu_dbi::SharedReplicaAckCoordinator>>,
    /// Timeout handed to transactions together with the coordinator
    /// (initialised to 5 seconds in `open`).
    replica_ack_timeout: Mutex<std::time::Duration>,
    /// Stats-file dumper, present when `config.stats_collect` was set at
    /// open; taken and stopped by `close`.
    stats_dumper: Mutex<Option<crate::stats_file::StatsFileDumper>>,

    /// Scheduled verifier, present when `run_verifier` was set and
    /// `verify_schedule` parsed at open; taken and stopped by `close`.
    verify_daemon: Mutex<Option<crate::verify_daemon::VerifyDaemon>>,
}
144
/// Per-database bookkeeping entry kept in `Environment::databases`.
struct DatabaseHandle {
    /// Name the database was opened under (currently not read back).
    #[expect(dead_code)]
    name: String,
    /// Numeric id reported by the engine for this database (currently not
    /// read back).
    #[expect(dead_code)]
    id: u64,
    /// Copy of the configuration passed to `open_database` (currently not
    /// read back).
    #[expect(dead_code)]
    config: DatabaseConfig,
    /// Open flag shared with the `Database` returned by `open_database`;
    /// `Environment::close` refuses to proceed while it reads `true`.
    open: Arc<AtomicBool>,
}
158
/// Record stored in [`ActiveTxns`] for every transaction begun through the
/// environment. All fields are currently write-only, hence the
/// `dead_code` expectations.
struct TransactionState {
    /// Transaction id allocated from `Environment::next_txn_id`.
    #[expect(dead_code)]
    id: u64,
    /// Effective configuration the transaction was started with.
    #[expect(dead_code)]
    config: TransactionConfig,
    /// Initialised to `false` by `begin_transaction`.
    #[expect(dead_code)]
    committed: AtomicBool,
    /// Initialised to `false` by `begin_transaction`.
    #[expect(dead_code)]
    aborted: AtomicBool,
}
170
/// Mutex-protected registry of transactions that have been begun and not yet
/// reported complete via [`ActiveTxns::mark_complete`].
pub(crate) struct ActiveTxns {
    /// Transaction id → state record.
    txns: Mutex<HashMap<u64, Arc<TransactionState>>>,
}
183
184impl ActiveTxns {
185 fn new() -> Self {
186 Self { txns: Mutex::new(HashMap::new()) }
187 }
188
189 #[allow(dead_code)] fn insert(&self, id: u64, state: Arc<TransactionState>) {
191 self.txns.lock().insert(id, state);
192 }
193
194 pub(crate) fn mark_complete(&self, id: u64) {
199 self.txns.lock().remove(&id);
200 }
201
202 fn len(&self) -> usize {
203 self.txns.lock().len()
204 }
205
206 fn is_empty(&self) -> bool {
207 self.txns.lock().is_empty()
208 }
209}
210
211impl Environment {
212 pub fn open(config: EnvironmentConfig) -> Result<Self> {
228 let open_start = std::time::Instant::now();
229 let home = config.home.clone();
230
231 if !home.exists() {
233 if config.allow_create {
234 std::fs::create_dir_all(&home).map_err(|e| {
235 NoxuError::environment(format!(
236 "Failed to create environment directory {:?}: {}",
237 home, e
238 ))
239 })?;
240 } else {
241 return Err(NoxuError::environment(format!(
242 "Environment directory {:?} does not exist and allow_create is false",
243 home
244 )));
245 }
246 }
247
248 if !home.is_dir() {
249 return Err(NoxuError::environment(format!(
250 "Environment home {:?} is not a directory",
251 home
252 )));
253 }
254
255 if !config.read_only {
257 let test_file = home.join(".noxu_write_test");
259 std::fs::write(&test_file, b"test").map_err(|e| {
260 NoxuError::environment(format!(
261 "Environment directory {:?} is not writable: {}",
262 home, e
263 ))
264 })?;
265 let _ = std::fs::remove_file(&test_file);
266 }
267
268 crate::unimplemented_params::warn_unimplemented_params(&config);
271
272 let buf_size = if config.log_buffer_size > 0 {
275 config.log_buffer_size
276 } else {
277 (config.log_total_buffer_bytes as usize)
278 .checked_div(config.log_num_buffers)
279 .unwrap_or(1024 * 1024)
280 };
281 let dbi_cfg = DbiEnvConfig {
282 read_only: config.read_only,
284 transactional: config.transactional,
285 env_is_locking: config.env_is_locking,
286 env_recovery_force_checkpoint: config.env_recovery_force_checkpoint,
287 env_recovery_force_checkpoint_field: config
288 .env_recovery_force_checkpoint,
289 env_recovery_force_new_file: config.env_recovery_force_new_file,
290 halt_on_commit_after_checksum_exception: config
291 .halt_on_commit_after_checksum_exception,
292 env_check_leaks: config.env_check_leaks,
293 env_forced_yield: config.env_forced_yield,
294 env_fair_latches: config.env_fair_latches,
295 env_latch_timeout_ms: config.env_latch_timeout_ms,
296 env_ttl_clock_tolerance_ms: config.env_ttl_clock_tolerance_ms,
297 env_expiration_enabled: config.env_expiration_enabled,
298 dos_producer_queue_timeout_ms: config.dos_producer_queue_timeout_ms,
299 env_db_eviction: config.env_db_eviction,
300 cache_size: config.cache_size,
302 cache_percent: config.cache_percent,
303 shared_cache: config.shared_cache,
304 max_off_heap_memory: config.max_off_heap_memory,
305 max_disk: config.max_disk,
306 free_disk: config.free_disk,
307 reserved_disk: config.reserved_disk,
308 log_file_max_bytes: config.log_file_max_bytes,
310 log_file_cache_size: config.log_file_cache_size,
311 log_checksum_read: config.log_checksum_read,
312 log_verify_checksums: config.log_verify_checksums,
313 log_fsync_timeout_ms: config.log_fsync_timeout_ms,
314 log_fsync_time_limit_ms: config.log_fsync_time_limit_ms,
315 log_num_buffers: config.log_num_buffers,
316 log_buffer_size: buf_size,
317 log_fault_read_size: config.log_fault_read_size,
318 log_iterator_read_size: config.log_iterator_read_size,
319 log_iterator_max_size: config.log_iterator_max_size,
320 log_n_data_directories: config.log_n_data_directories,
321 log_mem_only: config.log_mem_only,
322 log_detect_file_delete: config.log_detect_file_delete,
323 log_detect_file_delete_interval_ms: config
324 .log_detect_file_delete_interval_ms,
325 log_flush_sync_interval_ms: config.log_flush_sync_interval_ms,
326 log_flush_no_sync_interval_ms: config.log_flush_no_sync_interval_ms,
327 log_use_odsync: config.log_use_odsync,
328 log_use_write_queue: config.log_use_write_queue,
329 log_write_queue_size: config.log_write_queue_size,
330 log_group_commit_threshold: config.log_group_commit_threshold,
331 log_group_commit_interval_ms: config.log_group_commit_interval_ms,
332 node_max_entries: config.node_max_entries,
334 node_dup_tree_max_entries: config.node_dup_tree_max_entries,
335 tree_max_embedded_ln: config.tree_max_embedded_ln,
336 tree_max_delta: config.tree_max_delta,
337 tree_bin_delta: config.tree_bin_delta,
338 tree_bin_delta_percent: config.tree_bin_delta_percent,
339 tree_min_memory: config.tree_min_memory,
340 tree_compact_max_key_length: config.tree_compact_max_key_length,
341 run_in_compressor: config.run_in_compressor,
343 in_compressor_wakeup_interval_ms: config
344 .in_compressor_wakeup_interval_ms,
345 compressor_deadlock_retry: config.compressor_deadlock_retry,
346 compressor_lock_timeout_ms: config.compressor_lock_timeout_ms,
347 compressor_purge_root: config.compressor_purge_root,
348 run_cleaner: config.run_cleaner,
350 cleaner_min_utilization: config.cleaner_min_utilization,
351
352 cleaner_two_pass_gap: config.cleaner_two_pass_gap,
353
354 cleaner_two_pass_threshold: config.cleaner_two_pass_threshold,
355 cleaner_min_file_utilization: config.cleaner_min_file_utilization,
356 cleaner_threads: config.cleaner_threads,
357 cleaner_min_file_count: config.cleaner_min_file_count,
358 cleaner_min_age: config.cleaner_min_age,
359 cleaner_bytes_interval: config.cleaner_bytes_interval,
360 cleaner_wakeup_interval_ms: config.cleaner_wakeup_interval_ms,
361 cleaner_fetch_obsolete_size: config.cleaner_fetch_obsolete_size,
362 cleaner_adjust_utilization: config.cleaner_adjust_utilization,
363 cleaner_deadlock_retry: config.cleaner_deadlock_retry,
364 cleaner_lock_timeout_ms: config.cleaner_lock_timeout_ms,
365 cleaner_expunge: config.cleaner_expunge,
366 cleaner_use_deleted_dir: config.cleaner_use_deleted_dir,
367 cleaner_max_batch_files: config.cleaner_max_batch_files,
368 cleaner_read_size: config.cleaner_read_size,
369 cleaner_detail_max_memory_percentage: config
370 .cleaner_detail_max_memory_percentage,
371 cleaner_look_ahead_cache_size: config.cleaner_look_ahead_cache_size,
372 cleaner_foreground_proactive_migration: config
373 .cleaner_foreground_proactive_migration,
374 cleaner_background_proactive_migration: config
375 .cleaner_background_proactive_migration,
376 cleaner_lazy_migration: config.cleaner_lazy_migration,
377 cleaner_expiration_enabled: config.cleaner_expiration_enabled,
378 run_checkpointer: config.run_checkpointer,
380 checkpointer_bytes_interval: config.checkpointer_bytes_interval,
381 checkpointer_wakeup_interval_ms: config
382 .checkpointer_wakeup_interval_ms,
383 checkpointer_deadlock_retry: config.checkpointer_deadlock_retry,
384 checkpointer_high_priority: config.checkpointer_high_priority,
385 run_evictor: config.run_evictor,
387 evictor_nodes_per_scan: config.evictor_nodes_per_scan,
388 evictor_algorithm: config.evictor_algorithm.clone(),
389 evictor_evict_bytes: config.evictor_evict_bytes,
390 evictor_critical_percentage: config.evictor_critical_percentage,
391 evictor_lru_only: config.evictor_lru_only,
392 evictor_use_dirty_lru: config.evictor_use_dirty_lru,
393 evictor_mutate_bins: config.evictor_mutate_bins,
394 evictor_n_lru_lists: config.evictor_n_lru_lists,
395 evictor_deadlock_retry: config.evictor_deadlock_retry,
396 evictor_core_threads: config.evictor_core_threads,
397 evictor_max_threads: config.evictor_max_threads,
398 evictor_keep_alive_ms: config.evictor_keep_alive_ms,
399 evictor_allow_bin_deltas: config.evictor_allow_bin_deltas,
400 run_offheap_evictor: config.run_offheap_evictor,
402 offheap_evict_bytes: config.offheap_evict_bytes,
403 offheap_n_lru_lists: config.offheap_n_lru_lists,
404 offheap_checksum: config.offheap_checksum,
405 offheap_core_threads: config.offheap_core_threads,
406 offheap_max_threads: config.offheap_max_threads,
407 offheap_keep_alive_ms: config.offheap_keep_alive_ms,
408 lock_timeout_ms: config.lock_timeout_ms,
410 lock_deadlock_detect: config.lock_deadlock_detect,
411 lock_deadlock_detect_delay_ms: config.lock_deadlock_detect_delay_ms,
412 n_lock_tables: config.lock_n_lock_tables as usize,
413 txn_timeout_ms: config.txn_timeout_ms,
415 txn_serializable_isolation: config.txn_serializable_isolation,
416 txn_deadlock_stack_trace: config.txn_deadlock_stack_trace,
417 txn_dump_locks: config.txn_dump_locks,
418 run_verifier: config.run_verifier,
420 verify_log: config.verify_log,
421 verify_log_read_delay_ms: config.verify_log_read_delay_ms,
422 verify_btree: config.verify_btree,
423 verify_secondaries: config.verify_secondaries,
424 verify_data_records: config.verify_data_records,
425 verify_obsolete_records: config.verify_obsolete_records,
426 verify_btree_batch_size: config.verify_btree_batch_size,
427 verify_btree_batch_delay_ms: config.verify_btree_batch_delay_ms,
428 stats_collect: config.stats_collect,
430 stats_collect_interval_secs: config.stats_collect_interval_secs,
431 env_background_read_limit_kb: config.env_background_read_limit_kb,
433 env_background_write_limit_kb: config.env_background_write_limit_kb,
434 env_background_sleep_interval_us: config
435 .env_background_sleep_interval_us,
436 };
437 let env_impl = EnvironmentImpl::from_dbi_config(home.clone(), &dbi_cfg)
438 .map_err(|e| NoxuError::environment(e.to_string()))?;
439
440 if let Some(listener) = config.exception_listener() {
448 env_impl.set_exception_sink(std::sync::Arc::new(
449 move |source: &str, message: &str| {
450 let src = match source {
451 "Checkpointer" => {
452 crate::error::ExceptionSource::Checkpointer
453 }
454 "Cleaner" => crate::error::ExceptionSource::Cleaner,
455 "Evictor" => crate::error::ExceptionSource::Evictor,
456 "INCompressor" => {
457 crate::error::ExceptionSource::INCompressor
458 }
459 "Verifier" => crate::error::ExceptionSource::Verifier,
460 other => crate::error::ExceptionSource::Unknown(
461 other.to_string(),
462 ),
463 };
464 let thread_name = std::thread::current()
465 .name()
466 .unwrap_or("<unnamed>")
467 .to_string();
468 let event = crate::error::ExceptionEvent::new(
469 message.to_string(),
470 src,
471 thread_name,
472 );
473 listener.exception_event(&event);
474 },
475 ));
476 }
477
478 let log_manager = env_impl.get_log_manager();
479 let env_impl_arc = Arc::new(Mutex::new(env_impl));
480
481 let stats_dumper = if config.stats_collect {
487 let dir = config
488 .stats_file_directory
489 .clone()
490 .unwrap_or_else(|| home.clone());
491 let interval = std::time::Duration::from_secs(
492 config.stats_collect_interval_secs.max(1),
493 );
494 Some(crate::stats_file::StatsFileDumper::start(
495 Arc::clone(&env_impl_arc),
496 log_manager.clone(),
497 config.cache_size,
498 dir,
499 interval,
500 config.stats_file_row_count,
501 config.stats_max_files,
502 ))
503 } else {
504 None
505 };
506
507 let verify_daemon = if config.run_verifier
519 && !config.verify_schedule.is_empty()
520 {
521 match crate::verify_daemon::CronSchedule::parse(
522 &config.verify_schedule,
523 ) {
524 Some(schedule) => {
525 let vconfig = noxu_engine::VerifyConfig::new()
526 .with_btree_verification(true);
527 Some(crate::verify_daemon::VerifyDaemon::start(
528 Arc::clone(&env_impl_arc),
529 schedule,
530 vconfig,
531 ))
532 }
533 None => {
534 log::warn!(
535 "verify_schedule={:?} is not a valid 5-field cron \
536 expression; the background verifier will NOT run",
537 config.verify_schedule,
538 );
539 None
540 }
541 }
542 } else {
543 None
544 };
545
546 let env = Environment {
547 home,
548 config,
549 databases: Mutex::new(HashMap::new()),
550 active_txns: Arc::new(ActiveTxns::new()),
551 next_txn_id: AtomicU64::new(1),
552 open: AtomicBool::new(true),
553 env_valid: AtomicBool::new(true),
554 env_impl: env_impl_arc,
555 log_manager,
556 last_checkpoint_time: Mutex::new(None),
557 last_checkpoint_end_lsn: Mutex::new(noxu_util::NULL_LSN),
558 replica_coordinator: Mutex::new(None),
559 replica_ack_timeout: Mutex::new(std::time::Duration::from_secs(5)),
560 stats_dumper: Mutex::new(stats_dumper),
561 verify_daemon: Mutex::new(verify_daemon),
562 };
563
564 let threshold_ms = env.config.startup_dump_threshold_ms;
573 if threshold_ms > 0 {
574 let elapsed_ms = open_start.elapsed().as_millis() as u64;
575 if Self::startup_dump_triggered(elapsed_ms, threshold_ms) {
576 match env.stats() {
577 Ok(stats) => log::warn!(
578 "startup dump: Environment::open took {elapsed_ms} ms \
579 (threshold {threshold_ms} ms). Startup is dominated by \
580 crash recovery. Stats snapshot after open: {stats:?}"
581 ),
582 Err(e) => log::warn!(
583 "startup dump: Environment::open took {elapsed_ms} ms \
584 (threshold {threshold_ms} ms); stats unavailable: {e}"
585 ),
586 }
587 }
588 }
589
590 Ok(env)
591 }
592
593 #[inline]
598 fn startup_dump_triggered(elapsed_ms: u64, threshold_ms: u64) -> bool {
599 threshold_ms > 0 && elapsed_ms >= threshold_ms
600 }
601
602 pub fn close(&self) -> Result<()> {
610 if !self.open.load(Ordering::Acquire) {
611 return Err(NoxuError::EnvironmentClosed);
612 }
613
614 let databases = self.databases.lock();
616 let open_dbs: Vec<String> = databases
617 .iter()
618 .filter(|(_, db)| db.open.load(Ordering::Acquire))
619 .map(|(name, _)| name.clone())
620 .collect();
621
622 if !open_dbs.is_empty() {
623 return Err(NoxuError::OperationNotAllowed(format!(
624 "Cannot close environment with open database handles: {:?}",
625 open_dbs
626 )));
627 }
628
629 if !self.active_txns.is_empty() {
631 return Err(NoxuError::OperationNotAllowed(format!(
632 "Cannot close environment with {} active transactions",
633 self.active_txns.len()
634 )));
635 }
636
637 self.open.store(false, Ordering::Release);
638 if let Some(dumper) = self.stats_dumper.lock().take() {
641 dumper.stop();
642 }
643 if let Some(verifier) = self.verify_daemon.lock().take() {
646 verifier.stop();
647 }
648 let env_impl = self.env_impl.lock();
649 if self.config.env_check_leaks {
655 let leaks = env_impl.get_lock_manager().report_leaked_locks();
656 if !leaks.is_empty() {
657 let total_locks: usize =
658 leaks.iter().map(|(_, owners)| owners.len()).sum();
659 log::warn!(
660 "env_check_leaks: {} lock(s) still held by {} locker(s) at \
661 Environment::close — an application likely leaked a \
662 transaction or cursor. Leaked (lsn, owners): {:?}",
663 leaks.len(),
664 total_locks,
665 leaks,
666 );
667 }
668 }
669 let _ = env_impl.close();
670 Ok(())
671 }
672
673 pub fn open_database(
693 &self,
694 txn: Option<&Transaction>,
695 name: &str,
696 config: &DatabaseConfig,
697 ) -> Result<Database> {
698 self.check_open()?;
699
700 if self.config.read_only {
706 if config.allow_create {
707 return Err(NoxuError::OperationNotAllowed(
708 "open_database: cannot create a database on a read-only \
709 environment (DatabaseConfig::with_allow_create(true))"
710 .to_string(),
711 ));
712 }
713 if !config.read_only {
714 return Err(NoxuError::OperationNotAllowed(
715 "open_database: read-only environment requires the \
716 database to be opened read-only \
717 (DatabaseConfig::with_read_only(true))"
718 .to_string(),
719 ));
720 }
721 }
722
723 if name.is_empty() {
724 return Err(NoxuError::IllegalArgument(
725 "Database name cannot be empty".to_string(),
726 ));
727 }
728
729 let mut databases = self.databases.lock();
730
731 if let Some(db_handle) = databases.get(name)
733 && db_handle.open.load(Ordering::Acquire)
734 {
735 return Err(NoxuError::DatabaseAlreadyExists(format!(
736 "Database '{}' is already open",
737 name
738 )));
739 }
740
741 let mut dbi_config = noxu_dbi::DatabaseConfig::new();
743 dbi_config.set_allow_create(config.allow_create);
744 dbi_config.set_sorted_duplicates(config.sorted_duplicates);
745 dbi_config.set_read_only(config.read_only);
746 dbi_config.set_temporary(config.temporary);
747 dbi_config.set_transactional(config.transactional);
748 dbi_config.deferred_write = config.deferred_write;
749 dbi_config.set_key_prefixing(config.key_prefixing);
752 dbi_config.btree_comparator =
755 config.btree_comparator.as_ref().map(|c| {
756 noxu_dbi::ConfigComparator {
757 identity: c.identity().to_string(),
758 func: c.func(),
759 }
760 });
761 dbi_config.duplicate_comparator = config
762 .duplicate_comparator
763 .as_ref()
764 .map(|c| noxu_dbi::ConfigComparator {
765 identity: c.identity().to_string(),
766 func: c.func(),
767 });
768 dbi_config.override_btree_comparator = config.override_btree_comparator;
769 dbi_config.override_duplicate_comparator =
770 config.override_duplicate_comparator;
771 dbi_config.triggers = config.triggers.0.clone();
775 if config.node_max_entries > 0 {
776 dbi_config.set_node_max_entries(config.node_max_entries as i32);
777 }
778
779 let is_transactional_create = txn.is_some() && config.allow_create;
784 let db_impl_arc = {
785 let env_impl = self.env_impl.lock();
786 if is_transactional_create {
787 let txn_id = txn
789 .expect("invariant: txn is Some when is_transactional_create")
790 .id();
791 env_impl
792 .open_database_transactional(name, &dbi_config, txn_id)
793 } else {
794 env_impl.open_database(name, &dbi_config)
795 }
796 .map_err(|e| {
797 match &e {
798 noxu_dbi::DbiError::DatabaseNotFound(_) => {
799 NoxuError::DatabaseNotFound(format!(
800 "Database '{}' does not exist and allow_create is false",
801 name
802 ))
803 }
804 _ => NoxuError::environment(e.to_string()),
805 }
806 })?
807 };
808
809 if is_transactional_create {
813 let env_impl_arc = Arc::clone(&self.env_impl);
814 let db_name_abort = name.to_string();
815 let db_name_commit = name.to_string();
816 let env_impl_arc2 = Arc::clone(&self.env_impl);
817 let txn_ref = txn
819 .expect("invariant: txn is Some when is_transactional_create");
820 txn_ref.register_abort_callback(move || {
821 env_impl_arc.lock().abort_pending_database(&db_name_abort);
822 });
823 txn_ref.register_commit_callback(move || {
824 env_impl_arc2.lock().commit_pending_database(&db_name_commit);
825 });
826 }
827
828 let db_id = db_impl_arc.read().get_id().id() as u64;
829
830 let open_flag = Arc::new(AtomicBool::new(true));
834
835 let db_handle = Arc::new(DatabaseHandle {
836 name: name.to_string(),
837 id: db_id,
838 config: config.clone(),
839 open: Arc::clone(&open_flag),
840 });
841
842 databases.insert(name.to_string(), db_handle);
843 drop(databases);
844
845 Ok(Database::new(
846 name.to_string(),
847 db_id,
848 config.clone(),
849 db_impl_arc,
850 Arc::clone(&self.env_impl),
851 open_flag,
852 self.config.txn_no_sync,
853 self.config.txn_write_no_sync,
854 ))
855 }
856
857 pub fn remove_database(
871 &self,
872 _txn: Option<&Transaction>,
873 name: &str,
874 ) -> Result<()> {
875 self.check_writable("remove_database")?;
876
877 let mut databases = self.databases.lock();
878 {
879 let env_impl = self.env_impl.lock();
880 env_impl.remove_database(name).map_err(|e| match &e {
881 noxu_dbi::DbiError::DatabaseNotFound(_) => {
882 NoxuError::DatabaseNotFound(format!(
883 "Database '{}' does not exist",
884 name
885 ))
886 }
887 _ => NoxuError::environment(e.to_string()),
888 })?;
889 }
890 databases.remove(name);
891
892 Ok(())
893 }
894
895 pub fn truncate_database(
902 &self,
903 _txn: Option<&Transaction>,
904 name: &str,
905 ) -> Result<u64> {
906 self.check_writable("truncate_database")?;
907 let env_impl = self.env_impl.lock();
908 env_impl.truncate_database(name).map_err(|e| match &e {
909 noxu_dbi::DbiError::DatabaseNotFound(_) => {
910 NoxuError::DatabaseNotFound(format!(
911 "Database '{}' does not exist",
912 name
913 ))
914 }
915 _ => NoxuError::environment(e.to_string()),
916 })
917 }
918
919 pub fn rename_database(
935 &self,
936 _txn: Option<&Transaction>,
937 old_name: &str,
938 new_name: &str,
939 ) -> Result<()> {
940 self.check_writable("rename_database")?;
941
942 if old_name == new_name {
943 return Ok(());
944 }
945
946 let mut databases = self.databases.lock();
947 {
948 let env_impl = self.env_impl.lock();
949 env_impl.rename_database(old_name, new_name).map_err(
950 |e| match &e {
951 noxu_dbi::DbiError::DatabaseNotFound(_) => {
952 NoxuError::DatabaseNotFound(format!(
953 "Database '{}' does not exist",
954 old_name
955 ))
956 }
957 noxu_dbi::DbiError::DatabaseAlreadyExists(_) => {
958 NoxuError::DatabaseAlreadyExists(format!(
959 "Database '{}' already exists",
960 new_name
961 ))
962 }
963 _ => NoxuError::environment(e.to_string()),
964 },
965 )?;
966 }
967
968 if let Some(handle) = databases.remove(old_name) {
969 databases.insert(new_name.to_string(), handle);
970 }
971
972 Ok(())
973 }
974
975 pub fn begin_transaction(
997 &self,
998 config: Option<&TransactionConfig>,
999 ) -> Result<Transaction> {
1000 self.check_open()?;
1001
1002 if !self.config.transactional {
1003 return Err(NoxuError::OperationNotAllowed(
1004 "Cannot begin transaction on non-transactional environment"
1005 .to_string(),
1006 ));
1007 }
1008
1009 if self.config.read_only
1014 && !config.map(|c| c.read_only).unwrap_or(false)
1015 {
1016 return Err(NoxuError::OperationNotAllowed(
1017 "begin_transaction: read-only environment requires the \
1018 transaction to be read-only \
1019 (TransactionConfig::with_read_only(true))"
1020 .to_string(),
1021 ));
1022 }
1023
1024 let txn_id = self.next_txn_id.fetch_add(1, Ordering::Relaxed);
1025 let mut txn_config = match config.cloned() {
1040 Some(c) => c,
1041 None => TransactionConfig::default()
1042 .with_durability(self.config.durability),
1043 };
1044 if config.is_none() {
1045 let derived = match (
1048 self.config.txn_no_sync,
1049 self.config.txn_write_no_sync,
1050 ) {
1051 (true, _) => {
1052 Some(crate::durability::Durability::COMMIT_NO_SYNC)
1053 }
1054 (_, true) => {
1055 Some(crate::durability::Durability::COMMIT_WRITE_NO_SYNC)
1056 }
1057 _ => None,
1058 };
1059 if let Some(d) = derived {
1060 txn_config = txn_config.with_durability(d);
1061 }
1062 }
1063
1064 let txn_state = Arc::new(TransactionState {
1065 id: txn_id,
1066 config: txn_config.clone(),
1067 committed: AtomicBool::new(false),
1068 aborted: AtomicBool::new(false),
1069 });
1070
1071 let mut active_txns = self.active_txns.txns.lock();
1072 active_txns.insert(txn_id, txn_state);
1073 drop(active_txns);
1074
1075 let env_guard = self.env_impl.lock();
1078 let inner_txn = env_guard
1079 .begin_txn()
1080 .map(|mut t| {
1081 if txn_config.read_committed {
1084 t.set_read_committed_isolation(true);
1085 }
1086 if txn_config.read_uncommitted {
1087 t.set_read_uncommitted_default(true);
1092 }
1093 if txn_config.serializable_isolation {
1094 t.set_serializable_isolation(true);
1095 }
1096 if txn_config.importunate {
1097 t.set_importunate(true);
1098 }
1099 if txn_config.no_wait {
1100 t.set_no_wait(true);
1101 }
1102 if txn_config.lock_timeout_ms > 0 {
1103 t.set_lock_timeout(txn_config.lock_timeout_ms);
1104 }
1105 if txn_config.txn_timeout_ms > 0 {
1106 t.set_txn_timeout(txn_config.txn_timeout_ms);
1107 }
1108 Arc::new(std::sync::Mutex::new(t))
1109 })
1110 .ok();
1111 if txn_config.serializable_isolation && inner_txn.is_some() {
1119 env_guard.get_txn_manager().register_serializable();
1120 }
1121 let txn = if let Some(lm) = env_guard.get_log_manager() {
1122 Transaction::with_log_manager(txn_id, txn_config, lm)
1123 } else {
1124 Transaction::new(txn_id, txn_config)
1125 };
1126 drop(env_guard);
1127
1128 let txn =
1129 if let Some(it) = inner_txn { txn.with_inner_txn(it) } else { txn };
1130
1131 let txn = txn.with_env_impl(Arc::clone(&self.env_impl));
1134
1135 let txn = txn.with_active_txns(Arc::clone(&self.active_txns));
1140
1141 let txn = if let Some(coord) = self.replica_coordinator.lock().clone() {
1146 let timeout = *self.replica_ack_timeout.lock();
1147 txn.with_replica_coordinator(coord, timeout)
1148 } else {
1149 txn
1150 };
1151
1152 Ok(txn)
1153 }
1154
1155 pub fn database_names(&self) -> Result<Vec<String>> {
1165 self.check_open()?;
1166 let env_impl = self.env_impl.lock();
1167 Ok(env_impl.get_database_names())
1168 }
1169
1170 pub fn set_replica_coordinator(
1189 &self,
1190 coord: noxu_dbi::SharedReplicaAckCoordinator,
1191 ) {
1192 *self.replica_coordinator.lock() = Some(coord);
1193 }
1194
1195 pub fn clear_replica_coordinator(&self) {
1200 *self.replica_coordinator.lock() = None;
1201 }
1202
1203 pub fn set_replica_ack_timeout(&self, timeout: std::time::Duration) {
1209 *self.replica_ack_timeout.lock() = timeout;
1210 }
1211
1212 pub fn replica_ack_timeout(&self) -> std::time::Duration {
1214 *self.replica_ack_timeout.lock()
1215 }
1216
1217 pub fn home(&self) -> &Path {
1221 &self.home
1222 }
1223
    /// Returns the environment's configuration, including any values
    /// overwritten by `set_mutable_config`.
    pub fn config(&self) -> &EnvironmentConfig {
        &self.config
    }
1230
1231 pub fn mutable_config(&self) -> Result<EnvironmentMutableConfig> {
1237 self.check_open()?;
1238 Ok(EnvironmentMutableConfig {
1239 cache_size: Some(self.config.cache_size as usize),
1240 durability: None,
1241 txn_no_sync: self.config.txn_no_sync,
1242 txn_write_no_sync: self.config.txn_write_no_sync,
1243 run_cleaner: Some(self.config.run_cleaner),
1244 run_checkpointer: Some(self.config.run_checkpointer),
1245 run_evictor: Some(self.config.run_evictor),
1246 lock_timeout_ms: Some(self.config.lock_timeout_ms),
1247 txn_timeout_ms: Some(self.config.txn_timeout_ms),
1248 cleaner_min_utilization: Some(
1249 self.config.cleaner_min_utilization as u32,
1250 ),
1251 })
1252 }
1253
1254 pub fn set_mutable_config(
1264 &mut self,
1265 cfg: EnvironmentMutableConfig,
1266 ) -> Result<()> {
1267 self.check_open()?;
1268 if let Some(sz) = cfg.cache_size {
1269 self.config.cache_size = sz as u64;
1270 let env_impl = self.env_impl.lock();
1275 let evictor = env_impl.get_evictor();
1276 evictor.get_arbiter().set_max_memory(sz as i64);
1277 }
1278 if let Some(ms) = cfg.lock_timeout_ms {
1279 self.config.lock_timeout_ms = ms;
1280 let env_impl = self.env_impl.lock();
1282 env_impl.get_lock_manager().set_lock_timeout(ms);
1283 }
1284 if let Some(ms) = cfg.txn_timeout_ms {
1285 self.config.txn_timeout_ms = ms;
1286 }
1295 if let Some(pct) = cfg.cleaner_min_utilization {
1300 self.config.cleaner_min_utilization = pct.min(100) as u8;
1301 let env_impl = self.env_impl.lock();
1302 if let Some(cleaner) = env_impl.get_cleaner() {
1303 cleaner.set_min_utilization(pct);
1304 }
1305 }
1306 self.config.txn_no_sync = cfg.txn_no_sync;
1307 self.config.txn_write_no_sync = cfg.txn_write_no_sync;
1308 if let Some(v) = cfg.run_cleaner {
1312 self.config.run_cleaner = v;
1313 }
1314 if let Some(v) = cfg.run_checkpointer {
1315 self.config.run_checkpointer = v;
1316 }
1317 if let Some(v) = cfg.run_evictor {
1318 self.config.run_evictor = v;
1319 }
1320 Ok(())
1321 }
1322
1323 pub fn checkpoint(&self, config: Option<&CheckpointConfig>) -> Result<()> {
1335 self.check_open()?;
1336
1337 let cfg = config.cloned().unwrap_or_default();
1343
1344 if !cfg.force {
1345 if cfg.k_bytes > 0 {
1348 let cur_lsn = self
1349 .log_manager
1350 .as_ref()
1351 .map(|lm| lm.get_end_of_log())
1352 .unwrap_or(noxu_util::NULL_LSN);
1353 let last = *self.last_checkpoint_end_lsn.lock();
1354 let bytes_written =
1355 cur_lsn.as_u64().saturating_sub(last.as_u64());
1356 let threshold = (cfg.k_bytes as u64) * 1024;
1357 if bytes_written < threshold {
1358 log::debug!(
1359 "checkpoint: skipping (k_bytes threshold {} not \
1360 met, only {} bytes since last checkpoint)",
1361 threshold,
1362 bytes_written,
1363 );
1364 return Ok(());
1365 }
1366 }
1367
1368 if cfg.minutes > 0 {
1371 let last_at = *self.last_checkpoint_time.lock();
1372 if let Some(at) = last_at {
1373 let elapsed = at.elapsed();
1374 let threshold =
1375 std::time::Duration::from_secs(cfg.minutes as u64 * 60);
1376 if elapsed < threshold {
1377 log::debug!(
1378 "checkpoint: skipping (minutes threshold {:?} \
1379 not met, only {:?} since last checkpoint)",
1380 threshold,
1381 elapsed,
1382 );
1383 return Ok(());
1384 }
1385 }
1386 }
1387 }
1388
1389 let invoker = match (cfg.force, cfg.minimize_recovery_time) {
1395 (true, true) => "manual_force_full",
1396 (true, false) => "manual_force",
1397 (false, true) => "manual_full",
1398 (false, false) => "manual",
1399 };
1400
1401 let env_impl = self.env_impl.lock();
1402 env_impl
1403 .run_checkpoint_with_invoker(invoker)
1404 .map_err(|e| NoxuError::environment(e.to_string()))?;
1405 drop(env_impl);
1406
1407 *self.last_checkpoint_time.lock() = Some(Instant::now());
1410 if let Some(lm) = &self.log_manager {
1411 *self.last_checkpoint_end_lsn.lock() = lm.get_end_of_log();
1412 }
1413 Ok(())
1414 }
1415
1416 pub fn is_valid(&self) -> bool {
1423 self.open.load(Ordering::Acquire)
1424 && self.env_valid.load(Ordering::Acquire)
1425 }
1426
    /// Marks the environment as invalid; `is_valid` returns `false` from now
    /// on. Nothing in the visible code resets the flag.
    pub fn invalidate(&self) {
        self.env_valid.store(false, Ordering::Release);
    }
1436
    /// Returns the `transactional` flag from the environment configuration.
    pub fn is_transactional(&self) -> bool {
        self.config.transactional
    }
1443
    /// Returns the `read_only` flag from the environment configuration.
    pub fn is_read_only(&self) -> bool {
        self.config.read_only
    }
1450
1451 pub fn stats(&self) -> Result<EnvironmentStats> {
1455 self.check_open()?;
1456 let env_impl = self.env_impl.lock();
1457 Ok(build_environment_stats(
1458 &env_impl,
1459 self.log_manager.as_deref(),
1460 self.config.cache_size,
1461 ))
1462 }
1463
1464 pub fn stat_fsync_count(&self) -> u64 {
1470 self.log_manager.as_ref().map(|lm| lm.fsync_count()).unwrap_or(0)
1471 }
1472
1473 pub fn recovered_prepared_txns(
1488 &self,
1489 ) -> Vec<noxu_recovery::PreparedTxnInfo> {
1490 let env_impl = self.env_impl.lock();
1491 env_impl.recovered_prepared_txns()
1492 }
1493
1494 pub fn take_recovered_prepared_lns(
1502 &self,
1503 txn_id: u64,
1504 ) -> Vec<noxu_recovery::PreparedLnReplay> {
1505 let env_impl = self.env_impl.lock();
1506 env_impl.take_recovered_prepared_lns(txn_id)
1507 }
1508
1509 pub fn forget_recovered_prepared_txn(&self, txn_id: u64) {
1513 let env_impl = self.env_impl.lock();
1514 env_impl.forget_recovered_prepared_txn(txn_id);
1515 }
1516
    /// Writes a commit record for a recovered transaction and records its
    /// VLSN with the replica coordinator, if one is installed.
    ///
    /// Steps, in order:
    /// 1. If there is no log manager, return `Ok(())` without writing.
    /// 2. Ask the replica coordinator (if any) for a pre-allocated VLSN;
    ///    `0` is used when no coordinator is installed.
    /// 3. Write the commit entry through `write_txn_end_for_recovered` with
    ///    `is_commit`, `fsync` and `flush` all `true`.
    /// 4. If a coordinator is installed, either register the pre-allocated
    ///    VLSN against the commit LSN, or (when the pre-allocated value is
    ///    `0`) ask the coordinator to allocate one for the commit LSN.
    ///
    /// # Errors
    ///
    /// Propagates the error from `write_txn_end_for_recovered`.
    pub fn write_txn_commit_for_recovered(&self, txn_id: u64) -> Result<()> {
        let lm = match &self.log_manager {
            Some(lm) => lm,
            // No log manager: nothing is written and the call succeeds.
            None => return Ok(()),
        };
        // The coordinator lock is only held for this statement; it is
        // released before the log write below.
        let pre_vlsn =
            if let Some(coord) = self.replica_coordinator.lock().as_ref() {
                coord.pre_alloc_vlsn_for_recovered_commit()
            } else {
                0
            };

        // Arguments after `txn_id` are is_commit, fsync, flush, vlsn.
        let commit_lsn = write_txn_end_for_recovered(
            lm, txn_id, true, true, true, pre_vlsn,
        )?;

        // The coordinator lock is re-acquired here.
        // NOTE(review): the coordinator could in principle change between
        // the two lock acquisitions — confirm this cannot happen in
        // practice.
        if let Some(coord) = self.replica_coordinator.lock().as_ref() {
            if pre_vlsn > 0 {
                // A VLSN was pre-allocated and passed to the writer;
                // associate it with the LSN the commit landed at.
                coord.register_recovered_commit_vlsn(pre_vlsn, commit_lsn);
                log::debug!(
                    "write_txn_commit_for_recovered: txn_id={} commit_lsn={:?} \
                     embedded+registered vlsn={} (R-3)",
                    txn_id,
                    commit_lsn,
                    pre_vlsn
                );
            } else {
                // Nothing was pre-allocated: allocate after the fact.
                let vlsn = coord.alloc_vlsn_for_recovered_commit(commit_lsn);
                if vlsn > 0 {
                    log::debug!(
                        "write_txn_commit_for_recovered: txn_id={} commit_lsn={:?} \
                         assigned vlsn={} (X-3 legacy path)",
                        txn_id,
                        commit_lsn,
                        vlsn
                    );
                }
            }
        }
        Ok(())
    }
1579
1580 pub fn write_txn_abort_for_recovered(&self, txn_id: u64) -> Result<()> {
1583 let lm = match &self.log_manager {
1584 Some(lm) => lm,
1585 None => return Ok(()),
1586 };
1587 write_txn_end_for_recovered(
1588 lm, txn_id, false, false, false, 0, )
1593 .map(|_| ())
1594 }
1595
    /// Replays recovered prepared LN records into the in-memory trees.
    ///
    /// For each record the target database is looked up by id and its tree
    /// is fetched; records whose database or tree cannot be found are
    /// skipped. `Insert` and `Update` records that carry data are inserted
    /// into the tree; `Delete` records remove the key and decrement the
    /// database's entry count when the tree reports a deletion.
    ///
    /// The `env_impl` lock is held for the whole loop. This method always
    /// returns `Ok(())` and does not call `check_open`.
    pub fn apply_recovered_prepared_lns(
        &self,
        lns: &[noxu_recovery::PreparedLnReplay],
    ) -> Result<()> {
        let env_impl = self.env_impl.lock();
        for ln in lns {
            let db_id = noxu_dbi::DatabaseId::new(ln.db_id as i64);
            // Unknown database id: skip this record.
            let Some(db_arc) = env_impl.get_database_by_id(db_id) else {
                continue;
            };
            let db_guard = db_arc.read();
            // Database without a real tree: skip this record.
            let Some(tree) = db_guard.get_real_tree() else {
                continue;
            };
            match ln.operation {
                noxu_recovery::PreparedLnOperation::Insert
                | noxu_recovery::PreparedLnOperation::Update => {
                    // Records without data are silently ignored.
                    if let Some(data) = &ln.data {
                        // NOTE(review): the insert result is discarded and
                        // the entry count is not adjusted here, unlike the
                        // delete branch — confirm this is intentional.
                        let _ = tree.insert(
                            ln.key.clone(),
                            data.clone(),
                            ln.original_lsn,
                        );
                    }
                }
                noxu_recovery::PreparedLnOperation::Delete => {
                    // Only adjust the count when the tree reports a delete.
                    if tree.delete(&ln.key) {
                        db_guard.decrement_entry_count();
                    }
                }
            }
        }
        Ok(())
    }
1639
1640 pub fn verify(
1659 &self,
1660 config: &noxu_engine::VerifyConfig,
1661 ) -> Result<noxu_engine::VerifyResult> {
1662 self.check_open()?;
1663 let env_impl = self.env_impl.lock();
1664 let all_dbs = env_impl.get_all_database_impls();
1665 let tracker_guard =
1670 env_impl.get_utilization_tracker().map(|t| t.lock());
1671
1672 let mut merged = noxu_engine::VerifyResult::new();
1673 for db_arc in &all_dbs {
1674 let guard = db_arc.read();
1675 let result = noxu_engine::verify_database_impl(&guard, config);
1676 merged.databases_verified += result.databases_verified;
1677 merged.records_verified += result.records_verified;
1678 for err in result.errors {
1679 merged.add_error(err);
1680 if merged.error_count() >= config.max_errors as usize {
1681 return Ok(merged);
1682 }
1683 }
1684 for w in result.warnings {
1685 merged.add_warning(w);
1686 }
1687 if let Some(ref t) = tracker_guard {
1690 noxu_engine::check_lsns_against_tracker(&guard, t, &mut merged);
1691 if merged.error_count() >= config.max_errors as usize {
1692 return Ok(merged);
1693 }
1694 }
1695 }
1696 Ok(merged)
1697 }
1698
1699 pub fn compress(&self) -> Result<usize> {
1711 self.check_open()?;
1712 let env_impl = self.env_impl.lock();
1713 let n = env_impl.compress_all();
1714 Ok(n)
1715 }
1716
1717 pub fn clean_log(&self) -> Result<u32> {
1733 self.check_open()?;
1734 let env_impl = self.env_impl.lock();
1735 let result = env_impl
1736 .run_cleaner(u32::MAX, true)
1737 .map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?;
1738 Ok(result.files_cleaned)
1739 }
1740
1741 pub fn refresh_disk_limit(&self) -> Result<()> {
1750 self.check_open()?;
1751 self.env_impl.lock().refresh_disk_limit();
1752 Ok(())
1753 }
1754
1755 pub fn evict_memory(&self) -> Result<usize> {
1765 self.check_open()?;
1766 let env_impl = self.env_impl.lock();
1767 let bytes = env_impl.evict_memory();
1768 Ok(bytes)
1769 }
1770
1771 pub fn evictor_algorithm_name(&self) -> Result<&'static str> {
1775 self.check_open()?;
1776 Ok(self.env_impl.lock().evictor_algorithm_name())
1777 }
1778
1779 pub fn cache_usage_bytes(&self) -> Result<i64> {
1783 self.check_open()?;
1784 Ok(self.env_impl.lock().get_cache_usage())
1785 }
1786 #[allow(dead_code)] pub(crate) fn mark_database_closed(&self, name: &str) {
1790 let databases = self.databases.lock();
1791 if let Some(db_handle) = databases.get(name) {
1792 db_handle.open.store(false, Ordering::Release);
1793 }
1794 }
1795
1796 #[allow(dead_code)] pub(crate) fn mark_transaction_complete(&self, txn_id: u64) {
1804 self.active_txns.mark_complete(txn_id);
1805 }
1806
1807 fn check_open(&self) -> Result<()> {
1808 if !self.open.load(Ordering::Acquire) {
1809 return Err(NoxuError::EnvironmentClosed);
1810 }
1811 if !self.env_valid.load(Ordering::Acquire) {
1812 return Err(NoxuError::environment_with_reason(
1813 crate::error::EnvironmentFailureReason::ForcedShutdown,
1814 "environment has been invalidated due to a prior fatal error"
1815 .to_string(),
1816 ));
1817 }
1818 Ok(())
1819 }
1820
1821 fn check_writable(&self, what: &str) -> Result<()> {
1826 self.check_open()?;
1827 if self.config.read_only {
1828 return Err(NoxuError::OperationNotAllowed(format!(
1829 "{what}: environment is read-only",
1830 )));
1831 }
1832 Ok(())
1833 }
1834}
1835
1836fn write_txn_end_for_recovered(
1849 lm: &LogManager,
1850 txn_id: u64,
1851 is_commit: bool,
1852 fsync: bool,
1853 flush: bool,
1854 vlsn: u64,
1855) -> Result<noxu_util::Lsn> {
1856 use bytes::BytesMut;
1857 use noxu_log::{LogEntryType, Provisional, entry::TxnEndEntry};
1858 use noxu_util::{
1859 lsn::NULL_LSN,
1860 vlsn::{NULL_VLSN, Vlsn},
1861 };
1862
1863 let timestamp = std::time::SystemTime::now()
1864 .duration_since(std::time::UNIX_EPOCH)
1865 .unwrap_or_default()
1866 .as_millis() as u64;
1867
1868 let dtvlsn = if vlsn > 0 { Vlsn::new(vlsn as i64) } else { NULL_VLSN };
1871
1872 let entry = if is_commit {
1873 TxnEndEntry::new_commit(txn_id as i64, NULL_LSN, timestamp, 0, dtvlsn)
1874 } else {
1875 TxnEndEntry::new_abort(txn_id as i64, NULL_LSN, timestamp, 0, NULL_VLSN)
1876 };
1877
1878 let entry_type = if is_commit {
1879 LogEntryType::TxnCommit
1880 } else {
1881 LogEntryType::TxnAbort
1882 };
1883
1884 let mut buf = BytesMut::with_capacity(entry.log_size());
1885 entry.write_to_log(&mut buf);
1886
1887 lm.log(entry_type, &buf, Provisional::No, flush, fsync).map_err(|e| {
1888 NoxuError::environment_with_reason(
1889 crate::error::EnvironmentFailureReason::LogWrite,
1890 e.to_string(),
1891 )
1892 })
1893}
1894
/// Closes the environment when the handle goes out of scope.
impl Drop for Environment {
    /// Calls `close()` and deliberately discards its result: `drop` cannot
    /// report an error to the caller, so this is best-effort only.
    fn drop(&mut self) {
        let _ = self.close();
    }
}
1901
1902#[cfg(test)]
1903mod tests {
1904 use super::*;
1905 use tempfile::TempDir;
1906
1907 fn temp_env_config() -> (TempDir, EnvironmentConfig) {
1908 let temp_dir = TempDir::new().unwrap();
1909 let config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
1910 .with_allow_create(true)
1911 .with_transactional(true);
1912 (temp_dir, config)
1913 }
1914
1915 #[test]
1916 fn test_open_environment() {
1917 let (temp_dir, config) = temp_env_config();
1918 let env = Environment::open(config).unwrap();
1919 assert!(env.is_valid());
1920 assert_eq!(env.home(), temp_dir.path());
1921 env.close().unwrap();
1922 }
1923
1924 #[test]
1925 fn test_env_check_leaks_reports_leaked_lock() {
1926 use noxu_txn::LockType;
1927 let (_temp_dir, config) = temp_env_config();
1928 assert!(config.env_check_leaks);
1930 let env = Environment::open(config).unwrap();
1931
1932 {
1937 let env_impl = env.env_impl.lock();
1938 let lm = env_impl.get_lock_manager();
1939 lm.lock(0xDEAD_BEEF, 999, LockType::Write, false, false).unwrap();
1940 let leaks = lm.report_leaked_locks();
1942 assert!(
1943 leaks.iter().any(|(lsn, owners)| *lsn == 0xDEAD_BEEF
1944 && owners.contains(&999)),
1945 "the deliberately-leaked lock must be reported"
1946 );
1947 }
1948
1949 env.close().unwrap();
1953 }
1954
1955 #[test]
1956 fn test_env_check_leaks_clean_close_no_leak() {
1957 let (_temp_dir, config) = temp_env_config();
1958 let env = Environment::open(config).unwrap();
1959 {
1961 let env_impl = env.env_impl.lock();
1962 assert!(
1963 env_impl.get_lock_manager().report_leaked_locks().is_empty(),
1964 "a freshly opened env must have no leaked locks"
1965 );
1966 }
1967 env.close().unwrap();
1968 }
1969
1970 #[test]
1972 fn test_exception_listener_fires_on_daemon_error() {
1973 use crate::error::{
1974 ExceptionEvent, ExceptionListener, ExceptionSource,
1975 };
1976 use std::sync::Arc as StdArc;
1977 use std::sync::Mutex as StdMutex;
1978
1979 #[derive(Default)]
1981 struct Recorder {
1982 events: StdMutex<Vec<ExceptionEvent>>,
1983 }
1984 impl ExceptionListener for Recorder {
1985 fn exception_event(&self, event: &ExceptionEvent) {
1986 self.events.lock().unwrap().push(event.clone());
1987 }
1988 }
1989
1990 let recorder = StdArc::new(Recorder::default());
1991 let (_temp_dir, config) = temp_env_config();
1992 let config = config.with_exception_listener(recorder.clone());
1993 let env = Environment::open(config).unwrap();
1994
1995 {
2000 let env_impl = env.env_impl.lock();
2001 let d = env_impl.exception_dispatcher();
2002 assert!(
2003 d.is_installed(),
2004 "the registered listener must install a daemon sink"
2005 );
2006 d.dispatch("Cleaner", "simulated background cleaner failure");
2007 d.dispatch("Checkpointer", "simulated checkpoint failure");
2008 }
2009
2010 let events = recorder.events.lock().unwrap();
2011 assert_eq!(events.len(), 2, "listener must receive both daemon errors");
2012 assert_eq!(events[0].source, ExceptionSource::Cleaner);
2013 assert!(events[0].message.contains("cleaner failure"));
2014 assert_eq!(events[1].source, ExceptionSource::Checkpointer);
2015 assert!(events[1].message.contains("checkpoint failure"));
2016 drop(events);
2017
2018 env.close().unwrap();
2019 }
2020
2021 #[test]
2022 fn test_no_exception_listener_leaves_sink_uninstalled() {
2023 let (_temp_dir, config) = temp_env_config();
2024 let env = Environment::open(config).unwrap();
2026 {
2027 let env_impl = env.env_impl.lock();
2028 assert!(
2029 !env_impl.exception_dispatcher().is_installed(),
2030 "no listener means no sink installed; dispatch is a no-op"
2031 );
2032 env_impl.exception_dispatcher().dispatch("Cleaner", "ignored");
2034 }
2035 env.close().unwrap();
2036 }
2037
2038 #[test]
2040 fn test_startup_dump_triggered_predicate() {
2041 assert!(!Environment::startup_dump_triggered(1_000_000, 0));
2043 assert!(!Environment::startup_dump_triggered(9, 10));
2045 assert!(Environment::startup_dump_triggered(10, 10));
2047 assert!(Environment::startup_dump_triggered(50, 10));
2049 }
2050
2051 #[test]
2052 fn test_open_with_startup_dump_threshold_smoke() {
2053 let temp_dir = TempDir::new().unwrap();
2057 let config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
2058 .with_allow_create(true)
2059 .with_transactional(true)
2060 .with_startup_dump_threshold_ms(1);
2061 let env = Environment::open(config).unwrap();
2062 assert!(env.is_valid());
2063 env.close().unwrap();
2064 }
2065
2066 #[test]
2067 fn test_open_creates_directory() {
2068 let temp_dir = TempDir::new().unwrap();
2069 let home = temp_dir.path().join("subdir");
2070 let config =
2071 EnvironmentConfig::new(home.clone()).with_allow_create(true);
2072
2073 let env = Environment::open(config).unwrap();
2074 assert!(home.exists());
2075 assert!(home.is_dir());
2076 env.close().unwrap();
2077 }
2078
2079 #[test]
2080 fn test_open_fails_without_allow_create() {
2081 let temp_dir = TempDir::new().unwrap();
2082 let home = temp_dir.path().join("nonexistent");
2083 let config = EnvironmentConfig::new(home).with_allow_create(false);
2084
2085 let result = Environment::open(config);
2086 assert!(result.is_err());
2087 }
2088
2089 #[test]
2090 fn test_close_environment() {
2091 let (_temp_dir, config) = temp_env_config();
2092 let env = Environment::open(config).unwrap();
2093 assert!(env.is_valid());
2094 env.close().unwrap();
2095 assert!(!env.is_valid());
2096 }
2097
2098 #[test]
2099 fn test_close_twice_fails() {
2100 let (_temp_dir, config) = temp_env_config();
2101 let env = Environment::open(config).unwrap();
2102 env.close().unwrap();
2103 let result = env.close();
2104 assert!(result.is_err());
2105 }
2106
2107 #[test]
2108 fn test_close_with_open_database_fails() {
2109 let (_temp_dir, config) = temp_env_config();
2110 let env = Environment::open(config).unwrap();
2111
2112 let db_config = DatabaseConfig::new()
2113 .with_allow_create(true)
2114 .with_transactional(true);
2115 let _db = env.open_database(None, "testdb", &db_config).unwrap();
2116
2117 let result = env.close();
2118 assert!(result.is_err());
2119 }
2120
2121 #[test]
2122 fn test_open_database() {
2123 let (_temp_dir, config) = temp_env_config();
2124 let env = Environment::open(config).unwrap();
2125
2126 let db_config = DatabaseConfig::new()
2127 .with_allow_create(true)
2128 .with_transactional(true);
2129 let db = env.open_database(None, "testdb", &db_config).unwrap();
2130 assert_eq!(db.name(), "testdb");
2131 assert!(db.is_valid());
2132 }
2133
2134 #[test]
2135 fn test_open_database_twice_fails() {
2136 let (_temp_dir, config) = temp_env_config();
2137 let env = Environment::open(config).unwrap();
2138
2139 let db_config = DatabaseConfig::new()
2140 .with_allow_create(true)
2141 .with_transactional(true);
2142 let _db1 = env.open_database(None, "testdb", &db_config).unwrap();
2143 let result = env.open_database(None, "testdb", &db_config);
2144 assert!(result.is_err());
2145 }
2146
2147 #[test]
2148 fn test_open_database_without_create_fails() {
2149 let (_temp_dir, config) = temp_env_config();
2150 let env = Environment::open(config).unwrap();
2151
2152 let db_config = DatabaseConfig::new().with_allow_create(false);
2153 let result = env.open_database(None, "nonexistent", &db_config);
2154 assert!(result.is_err());
2155 }
2156
2157 #[test]
2158 fn test_open_database_empty_name_fails() {
2159 let (_temp_dir, config) = temp_env_config();
2160 let env = Environment::open(config).unwrap();
2161
2162 let db_config = DatabaseConfig::new()
2163 .with_allow_create(true)
2164 .with_transactional(true);
2165 let result = env.open_database(None, "", &db_config);
2166 assert!(result.is_err());
2167 }
2168
2169 #[test]
2170 fn test_remove_database() {
2171 let (_temp_dir, config) = temp_env_config();
2172 let env = Environment::open(config).unwrap();
2173
2174 let db_config = DatabaseConfig::new()
2175 .with_allow_create(true)
2176 .with_transactional(true);
2177 let db = env.open_database(None, "testdb", &db_config).unwrap();
2178 db.close().unwrap();
2179
2180 env.remove_database(None, "testdb").unwrap();
2181 let names = env.database_names().unwrap();
2182 assert!(!names.contains(&"testdb".to_string()));
2183 }
2184
2185 #[test]
2186 fn test_remove_open_database_fails() {
2187 let (_temp_dir, config) = temp_env_config();
2188 let env = Environment::open(config).unwrap();
2189
2190 let db_config = DatabaseConfig::new()
2191 .with_allow_create(true)
2192 .with_transactional(true);
2193 let _db = env.open_database(None, "testdb", &db_config).unwrap();
2194
2195 let result = env.remove_database(None, "testdb");
2196 assert!(result.is_err());
2197 }
2198
2199 #[test]
2200 fn test_remove_nonexistent_database_fails() {
2201 let (_temp_dir, config) = temp_env_config();
2202 let env = Environment::open(config).unwrap();
2203
2204 let result = env.remove_database(None, "nonexistent");
2205 assert!(result.is_err());
2206 }
2207
2208 #[test]
2209 fn test_rename_database() {
2210 let (_temp_dir, config) = temp_env_config();
2211 let env = Environment::open(config).unwrap();
2212
2213 let db_config = DatabaseConfig::new()
2214 .with_allow_create(true)
2215 .with_transactional(true);
2216 let db = env.open_database(None, "oldname", &db_config).unwrap();
2217 db.close().unwrap();
2218
2219 env.rename_database(None, "oldname", "newname").unwrap();
2220
2221 let names = env.database_names().unwrap();
2222 assert!(!names.contains(&"oldname".to_string()));
2223 assert!(names.contains(&"newname".to_string()));
2224 }
2225
2226 #[test]
2227 fn test_rename_to_same_name() {
2228 let (_temp_dir, config) = temp_env_config();
2229 let env = Environment::open(config).unwrap();
2230
2231 let db_config = DatabaseConfig::new()
2232 .with_allow_create(true)
2233 .with_transactional(true);
2234 let db = env.open_database(None, "testdb", &db_config).unwrap();
2235 db.close().unwrap();
2236
2237 env.rename_database(None, "testdb", "testdb").unwrap();
2238 }
2239
2240 #[test]
2241 fn test_rename_open_database_fails() {
2242 let (_temp_dir, config) = temp_env_config();
2243 let env = Environment::open(config).unwrap();
2244
2245 let db_config = DatabaseConfig::new()
2246 .with_allow_create(true)
2247 .with_transactional(true);
2248 let _db = env.open_database(None, "testdb", &db_config).unwrap();
2249
2250 let result = env.rename_database(None, "testdb", "newname");
2251 assert!(result.is_err());
2252 }
2253
2254 #[test]
2255 fn test_rename_nonexistent_database_fails() {
2256 let (_temp_dir, config) = temp_env_config();
2257 let env = Environment::open(config).unwrap();
2258
2259 let result = env.rename_database(None, "nonexistent", "newname");
2260 assert!(result.is_err());
2261 }
2262
2263 #[test]
2264 fn test_rename_to_existing_database_fails() {
2265 let (_temp_dir, config) = temp_env_config();
2266 let env = Environment::open(config).unwrap();
2267
2268 let db_config = DatabaseConfig::new()
2269 .with_allow_create(true)
2270 .with_transactional(true);
2271 let db1 = env.open_database(None, "db1", &db_config).unwrap();
2272 let db2 = env.open_database(None, "db2", &db_config).unwrap();
2273 db1.close().unwrap();
2274 db2.close().unwrap();
2275
2276 let result = env.rename_database(None, "db1", "db2");
2277 assert!(result.is_err());
2278 }
2279
2280 #[test]
2281 fn test_database_names() {
2282 let (_temp_dir, config) = temp_env_config();
2283 let env = Environment::open(config).unwrap();
2284
2285 let db_config = DatabaseConfig::new()
2286 .with_allow_create(true)
2287 .with_transactional(true);
2288 let _db1 = env.open_database(None, "db1", &db_config).unwrap();
2289 let _db2 = env.open_database(None, "db2", &db_config).unwrap();
2290
2291 let names = env.database_names().unwrap();
2292 assert_eq!(names.len(), 2);
2293 assert!(names.contains(&"db1".to_string()));
2294 assert!(names.contains(&"db2".to_string()));
2295 }
2296
2297 #[test]
2300 fn test_transactional_open_database_abort_removes_db() {
2301 let (temp_dir, config) = temp_env_config();
2302 {
2303 let env = Environment::open(config).unwrap();
2304 let txn = env.begin_transaction(None).unwrap();
2305 let db_config = DatabaseConfig::new()
2306 .with_allow_create(true)
2307 .with_transactional(true);
2308 let _db = env
2309 .open_database(Some(&txn), "aborted_db", &db_config)
2310 .unwrap();
2311 txn.abort().unwrap();
2312 let names = env.database_names().unwrap();
2314 assert!(
2315 !names.contains(&"aborted_db".to_string()),
2316 "aborted database must not appear in database_names() \
2317 (C-4 committed-only semantics), got: {:?}",
2318 names
2319 );
2320 drop(env);
2321 }
2322 let env2 = Environment::open(
2324 EnvironmentConfig::new(temp_dir.path().to_path_buf())
2325 .with_allow_create(false)
2326 .with_transactional(true),
2327 )
2328 .unwrap();
2329 let names2 = env2.database_names().unwrap();
2330 assert!(
2331 !names2.contains(&"aborted_db".to_string()),
2332 "after env reopen, aborted database must not appear: {:?}",
2333 names2
2334 );
2335 }
2336
2337 #[test]
2340 fn test_get_database_names_excludes_uncommitted() {
2341 let (_temp_dir, config) = temp_env_config();
2342 let env = Environment::open(config).unwrap();
2343
2344 let db_config = DatabaseConfig::new()
2345 .with_allow_create(true)
2346 .with_transactional(true);
2347 let txn = env.begin_transaction(None).unwrap();
2348 let _db =
2349 env.open_database(Some(&txn), "pending_db", &db_config).unwrap();
2350
2351 let names = env.database_names().unwrap();
2354 assert!(
2355 !names.contains(&"pending_db".to_string()),
2356 "uncommitted database must be invisible to database_names() \
2357 (C-4 / JE 1-J): got {:?}",
2358 names
2359 );
2360
2361 txn.commit().unwrap();
2363 let names_after = env.database_names().unwrap();
2364 assert!(
2365 names_after.contains(&"pending_db".to_string()),
2366 "committed database must appear in database_names()"
2367 );
2368 }
2369
2370 #[test]
2371 fn test_begin_transaction() {
2372 let temp_dir = TempDir::new().unwrap();
2373 let config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
2374 .with_allow_create(true)
2375 .with_transactional(false);
2376 let env = Environment::open(config).unwrap();
2377
2378 let result = env.begin_transaction(None);
2379 assert!(result.is_err());
2380 }
2381
2382 #[test]
2383 fn test_is_transactional() {
2384 let (_temp_dir, config) = temp_env_config();
2385 let env = Environment::open(config).unwrap();
2386 assert!(env.is_transactional());
2387 }
2388
2389 #[test]
2390 fn test_is_not_transactional() {
2391 let temp_dir = TempDir::new().unwrap();
2392 let config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
2393 .with_allow_create(true)
2394 .with_transactional(false);
2395 let env = Environment::open(config).unwrap();
2396 assert!(!env.is_transactional());
2397 }
2398
2399 #[test]
2400 fn test_is_read_only() {
2401 let temp_dir = TempDir::new().unwrap();
2402 let config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
2403 .with_allow_create(true)
2404 .with_read_only(true);
2405 let env = Environment::open(config).unwrap();
2406 assert!(env.is_read_only());
2407 }
2408
2409 #[test]
2410 fn test_operations_on_closed_environment_fail() {
2411 let (_temp_dir, config) = temp_env_config();
2412 let env = Environment::open(config).unwrap();
2413 env.close().unwrap();
2414
2415 let db_config = DatabaseConfig::new()
2416 .with_allow_create(true)
2417 .with_transactional(true);
2418 assert!(env.open_database(None, "test", &db_config).is_err());
2419 assert!(env.remove_database(None, "test").is_err());
2420 assert!(env.rename_database(None, "a", "b").is_err());
2421 assert!(env.begin_transaction(None).is_err());
2422 assert!(env.database_names().is_err());
2423 }
2424
2425 #[test]
2431 fn test_open_fails_if_home_is_a_file() {
2432 use std::io::Write;
2433 let temp_dir = TempDir::new().unwrap();
2434 let file_path = temp_dir.path().join("not_a_dir.txt");
2435 let mut f = std::fs::File::create(&file_path).unwrap();
2436 writeln!(f, "data").unwrap();
2437 drop(f);
2438
2439 let config = EnvironmentConfig::new(file_path).with_allow_create(false);
2440 let result = Environment::open(config);
2442 assert!(result.is_err());
2443 }
2444
2445 #[test]
2447 fn test_open_database_with_node_max_entries() {
2448 let (_temp_dir, config) = temp_env_config();
2449 let env = Environment::open(config).unwrap();
2450
2451 let mut db_config = DatabaseConfig::new()
2452 .with_allow_create(true)
2453 .with_transactional(true);
2454 db_config.set_node_max_entries(64);
2455 let db = env.open_database(None, "testdb_entries", &db_config).unwrap();
2456 assert!(db.is_valid());
2457 }
2458
2459 #[test]
2461 fn test_begin_transaction_with_explicit_config() {
2462 use crate::transaction_config::TransactionConfig;
2463 let (_temp_dir, config) = temp_env_config();
2464 let env = Environment::open(config).unwrap();
2465
2466 let txn_config = TransactionConfig::new();
2467 let txn = env.begin_transaction(Some(&txn_config)).unwrap();
2468 assert!(txn.is_valid());
2469 }
2470
2471 #[test]
2475 fn test_rename_database_handle_not_in_map() {
2476 let (_temp_dir, config) = temp_env_config();
2477 let env = Environment::open(config).unwrap();
2478
2479 {
2483 let env_impl = env.env_impl.lock();
2484 let mut dbi_config = noxu_dbi::DatabaseConfig::new();
2485 dbi_config.set_allow_create(true);
2486 let db_arc =
2487 env_impl.open_database("ghost_db", &dbi_config).unwrap();
2488 let db_id = db_arc.read().get_id();
2489 env_impl.close_database(db_id).unwrap();
2490 }
2491
2492 env.rename_database(None, "ghost_db", "ghost_db_renamed").unwrap();
2494
2495 let names = env.database_names().unwrap();
2496 assert!(names.contains(&"ghost_db_renamed".to_string()));
2497 assert!(!names.contains(&"ghost_db".to_string()));
2498 }
2499
2500 #[test]
2502 fn test_close_with_active_transactions_fails() {
2503 let (_temp_dir, config) = temp_env_config();
2504 let env = Environment::open(config).unwrap();
2505
2506 let _txn = env.begin_transaction(None).unwrap();
2507
2508 let result = env.close();
2509 assert!(result.is_err());
2510 }
2511
2512 #[test]
2514 fn test_get_config_and_home() {
2515 let (temp_dir, config) = temp_env_config();
2516 let env = Environment::open(config).unwrap();
2517
2518 assert!(env.config().allow_create);
2519 assert_eq!(env.home(), temp_dir.path());
2520 env.close().unwrap();
2521 }
2522
2523 #[test]
2525 fn test_mark_database_closed_known_name() {
2526 let (_temp_dir, config) = temp_env_config();
2527 let env = Environment::open(config).unwrap();
2528
2529 let db_config = DatabaseConfig::new()
2530 .with_allow_create(true)
2531 .with_transactional(true);
2532 let db = env.open_database(None, "mydb", &db_config).unwrap();
2533 env.mark_database_closed("mydb");
2535 let _ = db.is_valid(); env.close().unwrap();
2538 }
2539
2540 #[test]
2542 fn test_mark_database_closed_unknown_name_is_noop() {
2543 let (_temp_dir, config) = temp_env_config();
2544 let env = Environment::open(config).unwrap();
2545 env.mark_database_closed("ghost");
2547 env.close().unwrap();
2548 }
2549
2550 #[test]
2552 fn test_mark_transaction_complete_allows_env_close() {
2553 let (_temp_dir, config) = temp_env_config();
2554 let env = Environment::open(config).unwrap();
2555
2556 let txn = env.begin_transaction(None).unwrap();
2557 let txn_id = txn.id();
2558
2559 env.mark_transaction_complete(txn_id);
2562
2563 env.close().unwrap();
2565 }
2566
2567 #[test]
2570 fn test_verify_empty_environment_passes() {
2571 use crate::VerifyConfig;
2572 let (_tmp, config) = temp_env_config();
2573 let env = Environment::open(config).unwrap();
2574 let verify_cfg = VerifyConfig::default();
2575 let result = env.verify(&verify_cfg).unwrap();
2576 assert!(result.passed, "empty env should pass: {:?}", result.errors);
2577 }
2578
2579 #[test]
2580 fn test_verify_environment_with_data_passes() {
2581 use crate::{DatabaseConfig, DatabaseEntry, VerifyConfig};
2582 let (_tmp, config) = temp_env_config();
2583 let env = Environment::open(config).unwrap();
2584
2585 let mut db_config = DatabaseConfig::new();
2586 db_config.set_allow_create(true);
2587 let db = env.open_database(None, "vtest", &db_config).unwrap();
2588 for i in 0u32..10 {
2589 let k = DatabaseEntry::from_bytes(&i.to_be_bytes());
2590 let v = DatabaseEntry::from_bytes(&(i * 3).to_be_bytes());
2591 db.put(&k, &v).unwrap();
2592 }
2593
2594 let verify_cfg = VerifyConfig::default();
2595 let result = env.verify(&verify_cfg).unwrap();
2596 assert!(
2597 result.passed,
2598 "env with data should pass: {:?}",
2599 result.errors
2600 );
2601 assert!(result.records_verified >= 10);
2602 db.close().unwrap();
2603 env.close().unwrap();
2604 }
2605
2606 #[test]
2607 fn test_verify_closed_environment_fails() {
2608 use crate::VerifyConfig;
2609 let (_tmp, config) = temp_env_config();
2610 let env = Environment::open(config).unwrap();
2611 env.close().unwrap();
2612 let verify_cfg = VerifyConfig::default();
2613 assert!(env.verify(&verify_cfg).is_err());
2614 }
2615
2616 #[test]
2619 fn test_checkpoint_default_succeeds() {
2620 let (_tmp, config) = temp_env_config();
2621 let env = Environment::open(config).unwrap();
2622 env.checkpoint(None).unwrap();
2624 env.close().unwrap();
2625 }
2626
2627 #[test]
2628 fn test_checkpoint_with_config_succeeds() {
2629 let (_tmp, config) = temp_env_config();
2630 let env = Environment::open(config).unwrap();
2631 let ckpt_cfg = CheckpointConfig {
2632 force: true,
2633 k_bytes: 0,
2634 minutes: 0,
2635 minimize_recovery_time: false,
2636 };
2637 env.checkpoint(Some(&ckpt_cfg)).unwrap();
2638 env.close().unwrap();
2639 }
2640
2641 #[test]
2642 fn test_checkpoint_closed_env_fails() {
2643 let (_tmp, config) = temp_env_config();
2644 let env = Environment::open(config).unwrap();
2645 env.close().unwrap();
2646 assert!(env.checkpoint(None).is_err());
2647 }
2648
2649 #[test]
2652 fn test_get_mutable_config_returns_current_values() {
2653 let (_tmp, config) = temp_env_config();
2654 let env = Environment::open(config).unwrap();
2655 let mc = env.mutable_config().unwrap();
2656 assert!(mc.cache_size.is_some());
2658 assert!(mc.run_cleaner.is_some());
2659 assert!(mc.run_checkpointer.is_some());
2660 assert!(mc.run_evictor.is_some());
2661 env.close().unwrap();
2662 }
2663
2664 #[test]
2665 fn test_get_mutable_config_closed_env_fails() {
2666 let (_tmp, config) = temp_env_config();
2667 let env = Environment::open(config).unwrap();
2668 env.close().unwrap();
2669 assert!(env.mutable_config().is_err());
2670 }
2671
2672 #[test]
2673 fn test_set_mutable_config_updates_cache_size() {
2674 let (_tmp, config) = temp_env_config();
2675 let mut env = Environment::open(config).unwrap();
2676 let new_size: usize = 128 * 1024 * 1024; let mc = EnvironmentMutableConfig::new().with_cache_size(new_size);
2678 env.set_mutable_config(mc).unwrap();
2679 let updated = env.mutable_config().unwrap();
2680 assert_eq!(updated.cache_size.unwrap(), new_size);
2681 env.close().unwrap();
2682 }
2683
2684 #[test]
2685 fn test_set_mutable_config_updates_timeouts() {
2686 let (_tmp, config) = temp_env_config();
2687 let mut env = Environment::open(config).unwrap();
2688 let mc = EnvironmentMutableConfig {
2689 lock_timeout_ms: Some(5_000),
2690 txn_timeout_ms: Some(10_000),
2691 ..EnvironmentMutableConfig::default()
2692 };
2693 env.set_mutable_config(mc).unwrap();
2694 let updated = env.mutable_config().unwrap();
2697 assert_eq!(updated.lock_timeout_ms, Some(5_000));
2698 assert_eq!(updated.txn_timeout_ms, Some(10_000));
2699 env.close().unwrap();
2700 }
2701
2702 #[test]
2703 fn test_set_mutable_config_none_timeout_unchanged() {
2704 let (_tmp, config) = temp_env_config();
2705 let mut env = Environment::open(config).unwrap();
2706 let original = env.mutable_config().unwrap();
2707 let mc = EnvironmentMutableConfig {
2712 lock_timeout_ms: None,
2713 txn_timeout_ms: None,
2714 ..EnvironmentMutableConfig::default()
2715 };
2716 env.set_mutable_config(mc).unwrap();
2717 let updated = env.mutable_config().unwrap();
2718 assert_eq!(updated.lock_timeout_ms, original.lock_timeout_ms);
2719 assert_eq!(updated.txn_timeout_ms, original.txn_timeout_ms);
2720 env.close().unwrap();
2721 }
2722
/// Checks that `set_mutable_config` returns an error once the environment has
/// been closed.
#[test]
fn test_set_mutable_config_closed_env_fails() {
    let (_tmp, config) = temp_env_config();
    let mut env = Environment::open(config).unwrap();
    env.close().unwrap();

    let outcome = env.set_mutable_config(EnvironmentMutableConfig::new());
    assert!(outcome.is_err());
}
2731
/// Checks that a read-only environment refuses `open_database` when the
/// database config asks for creation.
#[test]
fn test_read_only_env_rejects_create_database() {
    let temp_dir = TempDir::new().unwrap();
    let home = temp_dir.path();

    // Open the directory writable once and drop the handle immediately;
    // presumably this leaves an environment on disk for the read-only open
    // below to find.
    let rw_config = EnvironmentConfig::new(home.to_path_buf())
        .with_allow_create(true)
        .with_transactional(true);
    drop(Environment::open(rw_config).unwrap());

    let env = Environment::open(
        EnvironmentConfig::new(home.to_path_buf())
            .with_read_only(true)
            .with_transactional(true),
    )
    .unwrap();

    let creating_cfg = DatabaseConfig::new()
        .with_allow_create(true)
        .with_transactional(true);
    assert!(
        env.open_database(None, "new", &creating_cfg).is_err(),
        "open_database with allow_create on read-only env must fail",
    );
}
2762
/// Checks that `remove_database`, `truncate_database` and `rename_database`
/// all return errors on an environment opened read-only.
///
/// NOTE(review): this test never creates "test" or "a", so the errors may
/// come from the databases being absent rather than from the read-only
/// check — confirm the intended coverage.
#[test]
fn test_read_only_env_rejects_remove_database() {
    let temp_dir = TempDir::new().unwrap();
    let home = temp_dir.path();

    // Writable open, dropped straight away, before reopening read-only.
    let rw_config = EnvironmentConfig::new(home.to_path_buf())
        .with_allow_create(true)
        .with_transactional(true);
    drop(Environment::open(rw_config).unwrap());

    let env = Environment::open(
        EnvironmentConfig::new(home.to_path_buf())
            .with_read_only(true)
            .with_transactional(true),
    )
    .unwrap();

    let removed = env.remove_database(None, "test");
    assert!(removed.is_err());

    let truncated = env.truncate_database(None, "test");
    assert!(truncated.is_err());

    let renamed = env.rename_database(None, "a", "b");
    assert!(renamed.is_err());
}
2782
/// Checks that a read-only environment rejects `begin_transaction(None)` but
/// accepts a transaction whose config has `read_only` set.
#[test]
fn test_read_only_env_rejects_writable_txn() {
    let temp_dir = TempDir::new().unwrap();
    let home = temp_dir.path();

    // Writable open, dropped straight away, before reopening read-only.
    let rw_config = EnvironmentConfig::new(home.to_path_buf())
        .with_allow_create(true)
        .with_transactional(true);
    drop(Environment::open(rw_config).unwrap());

    let env = Environment::open(
        EnvironmentConfig::new(home.to_path_buf())
            .with_read_only(true)
            .with_transactional(true),
    )
    .unwrap();

    assert!(
        env.begin_transaction(None).is_err(),
        "writable txn on read-only env must fail"
    );

    // Kept in a named binding so the transaction stays alive until the end
    // of the test, as before.
    let read_only_cfg = TransactionConfig::default().with_read_only(true);
    let _read_only_txn = env
        .begin_transaction(Some(&read_only_cfg))
        .expect("read-only txn on read-only env must succeed");
}
2808
/// Runs three checkpoints back to back — default config, a 60-minute
/// threshold, and a 60-minute threshold with `force` — and requires every
/// call to return `Ok`.
///
/// NOTE(review): the name suggests the second call is skipped by the minutes
/// threshold; only the `Ok` result is asserted here, not the skip itself.
#[test]
fn test_checkpoint_minutes_threshold_skips() {
    let (_tmp, config) = temp_env_config();
    let env = Environment::open(config).unwrap();

    env.checkpoint(None).unwrap();

    let thresholded = CheckpointConfig::default().with_minutes(60);
    env.checkpoint(Some(&thresholded)).unwrap();

    let forced = CheckpointConfig::default().with_force(true).with_minutes(60);
    env.checkpoint(Some(&forced)).unwrap();

    env.close().unwrap();
}
2830
/// Checks that a cache size set through `set_mutable_config` is visible as
/// the max-memory value of the evictor's arbiter.
#[test]
fn test_set_mutable_config_pushes_cache_size_to_evictor() {
    let (_tmp, config) = temp_env_config();
    let mut env = Environment::open(config).unwrap();

    env.set_mutable_config(EnvironmentMutableConfig {
        cache_size: Some(64 * 1024 * 1024),
        ..Default::default()
    })
    .unwrap();

    // The guard stays alive to the end of the test, as in the original.
    let guard = env.env_impl.lock();
    let arbiter_max = guard.get_evictor().get_arbiter().get_max_memory();
    assert_eq!(
        arbiter_max,
        64 * 1024 * 1024,
        "set_mutable_config(cache_size) must push to Arbiter",
    );
}
2852
/// Checks that opening the environment with `with_txn_no_sync(true)` makes an
/// explicitly begun transaction report `Durability::COMMIT_NO_SYNC`.
#[test]
// NOTE(review): presumably required because `with_txn_no_sync` is
// deprecated — confirm which call triggers the lint.
#[allow(deprecated)]
fn test_env_txn_no_sync_applies_to_explicit_txn() {
    let temp_dir = TempDir::new().unwrap();
    let env = Environment::open(
        EnvironmentConfig::new(temp_dir.path().to_path_buf())
            .with_allow_create(true)
            .with_transactional(true)
            .with_txn_no_sync(true),
    )
    .unwrap();

    let txn = env.begin_transaction(None).unwrap();
    let actual = txn.durability().expect("durability must be set");
    assert_eq!(
        actual,
        crate::durability::Durability::COMMIT_NO_SYNC,
        "env txn_no_sync=true must propagate to explicit-txn durability",
    );

    txn.commit().unwrap();
    env.close().unwrap();
}
2877
/// Checks that dropping a transaction without committing it brings
/// `active_txns.len()` back to its value from before the transaction began.
#[test]
fn test_drop_aborts_open_transaction() {
    let (_tmp, config) = temp_env_config();
    let env = Environment::open(config).unwrap();
    let baseline = env.active_txns.len();

    let txn = env.begin_transaction(None).unwrap();
    assert_eq!(env.active_txns.len(), baseline + 1);
    // Neither committed nor aborted explicitly: only `Drop` runs here.
    drop(txn);

    assert_eq!(
        env.active_txns.len(),
        baseline,
        "Transaction::Drop must abort and prune from active_txns",
    );
    env.close().unwrap();
}
2900
/// Checks that `write_txn_commit_for_recovered` hands a non-zero LSN to the
/// installed coordinator's `alloc_vlsn_for_recovered_commit`.
#[test]
fn test_x3_recovered_commit_calls_alloc_vlsn() {
    use noxu_dbi::{
        AckWaitError, ReplicaAckCoordinator, ReplicaAckPolicyKind,
    };
    use noxu_util::Lsn;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
    use std::time::Duration;

    /// Coordinator stub that remembers the last LSN it was handed.
    struct RecordingCoordinator {
        seen_lsn: AtomicU64,
    }

    impl ReplicaAckCoordinator for RecordingCoordinator {
        fn await_replica_acks(
            &self,
            _policy: ReplicaAckPolicyKind,
            _timeout: Duration,
        ) -> std::result::Result<u32, AckWaitError> {
            Ok(0)
        }

        fn alloc_vlsn_for_recovered_commit(&self, lsn: Lsn) -> u64 {
            let raw = lsn.as_u64();
            self.seen_lsn.store(raw, AtomicOrdering::SeqCst);
            lsn.file_number() as u64 + 1
        }
    }

    let (tmp, config) = temp_env_config();
    let env = Environment::open(config).unwrap();

    // The stored value starts at 0, so any non-zero reading afterwards shows
    // that the callback ran.
    let recorder = Arc::new(RecordingCoordinator {
        seen_lsn: AtomicU64::new(0),
    });
    env.set_replica_coordinator(recorder.clone());

    env.write_txn_commit_for_recovered(42).unwrap();

    assert_ne!(
        recorder.seen_lsn.load(AtomicOrdering::SeqCst),
        0,
        "X-3: alloc_vlsn_for_recovered_commit must be called with the commit LSN"
    );

    env.close().unwrap();
    drop(tmp);
}
2955
/// Checks that `verify` passes on a freshly populated environment and then
/// reports a `DataInconsistency` once an LSN gathered from the live tree has
/// been counted as obsolete in the utilization tracker.
#[test]
fn test_verify_checklsns_detects_live_in_obsolete() {
    use crate::database_entry::DatabaseEntry;

    let (_tmp, config) = temp_env_config();
    let env = Environment::open(config).unwrap();

    let create_cfg = DatabaseConfig::new()
        .with_allow_create(true)
        .with_transactional(true);
    let db = env.open_database(None, "cln2db", &create_cfg).unwrap();

    // Three records, written in this order.
    let records: [(&[u8], &[u8]); 3] =
        [(b"alpha", b"1"), (b"beta", b"2"), (b"gamma", b"3")];
    for &(key, value) in &records {
        db.put(
            DatabaseEntry::from_bytes(key),
            DatabaseEntry::from_bytes(value),
        )
        .unwrap();
    }

    let verify_cfg = noxu_engine::VerifyConfig::default();

    // Baseline: nothing has been tampered with yet.
    let baseline = env.verify(&verify_cfg).unwrap();
    assert!(
        baseline.is_passed(),
        "healthy env must pass checkLsns: {:?}",
        baseline.errors
    );

    // Take the first LSN that the tree reports.
    let live_lsn = {
        let db_guard = db.db_impl.read();
        let tree = db_guard
            .get_real_tree()
            .expect("invariant: populated db has a real tree");
        let mut lsns = noxu_engine::gather_tree_lsns(&tree).into_iter();
        lsns.next().expect("invariant: at least one live LSN")
    };

    // Count that LSN as obsolete; the scope releases the locks before the
    // second verify.
    {
        let env_guard = env.env_impl.lock();
        let tracker = env_guard
            .get_utilization_tracker()
            .expect("invariant: rw env has a tracker");
        let file_number = live_lsn.file_number();
        let file_offset = live_lsn.file_offset();
        tracker.lock().count_obsolete_node(
            file_number,
            file_offset,
            10,
            true,
            None,
        );
    }

    let corrupted = env.verify(&verify_cfg).unwrap();
    assert!(
        !corrupted.is_passed(),
        "verify must detect live LSN in obsolete set"
    );

    let has_check_lsns_error = corrupted.errors.iter().any(|err| {
        matches!(
            err,
            noxu_engine::VerifyError::DataInconsistency { description }
                if description.contains("Obsolete LSN set contains valid LSN")
        )
    });
    assert!(
        has_check_lsns_error,
        "expected checkLsns error, got: {:?}",
        corrupted.errors
    );

    drop(db);
    env.close().unwrap();
}
3045
/// Checks that a cache size set through `set_mutable_config` is the value
/// returned by `get_arbiter_max_memory` on the environment implementation.
#[test]
fn test_set_mutable_config_pushes_cache_size_to_arbiter() {
    let (_tmp, config) = temp_env_config();
    let mut env = Environment::open(config).unwrap();

    let requested: usize = 256 * 1024 * 1024;
    env.set_mutable_config(
        EnvironmentMutableConfig::new().with_cache_size(requested),
    )
    .unwrap();

    // Read in its own statement so the lock guard is gone before `close`.
    let reported = env.env_impl.lock().get_arbiter_max_memory();
    assert_eq!(
        reported,
        requested as i64,
        "Arbiter max-memory must reflect the new cache size"
    );
    env.close().unwrap();
}
3064
/// Checks that a `cleaner_min_utilization` value set through
/// `set_mutable_config` is observed on the cleaner handle obtained before the
/// change.
#[test]
fn test_set_mutable_config_pushes_cleaner_min_utilization() {
    let (_tmp, config) = temp_env_config();
    let mut env = Environment::open(config).unwrap();

    let cleaner = env
        .env_impl
        .lock()
        .get_cleaner()
        .expect("invariant: transactional env has a cleaner");

    // Pick a target that always differs from the current setting.
    let target = match cleaner.get_min_utilization() {
        70 => 40,
        _ => 70,
    };
    env.set_mutable_config(
        EnvironmentMutableConfig::new().with_cleaner_min_utilization(target),
    )
    .unwrap();

    assert_eq!(
        cleaner.get_min_utilization(),
        target,
        "running cleaner must reflect the new minUtilization"
    );
    env.close().unwrap();
}
3092}