1use crate::checkpoint_config::CheckpointConfig;
5use crate::database::Database;
6use crate::database_config::DatabaseConfig;
7use crate::environment_config::EnvironmentConfig;
8use crate::environment_mutable_config::EnvironmentMutableConfig;
9use crate::error::{NoxuError, Result};
10use crate::transaction::Transaction;
11use crate::transaction_config::TransactionConfig;
12use hashbrown::HashMap;
13use noxu_dbi::{DbiEnvConfig, EnvironmentImpl};
14use noxu_engine::EnvironmentStats;
15use noxu_engine::env_stats::{
16 EvictorStatsSnapshot, LockStatsSnapshot, LogStatsSnapshot, TxnStatsSnapshot,
17};
18use noxu_log::LogManager;
19use noxu_sync::Mutex;
20use std::path::{Path, PathBuf};
21use std::sync::Arc;
22use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
23use std::time::Instant;
24
/// Handle to an opened environment rooted at a home directory.
///
/// Instances are produced by [`Environment::open`]. The handle owns the
/// registry of opened databases, the set of in-flight transactions, and a
/// shared reference to the underlying `EnvironmentImpl`.
pub struct Environment {
    /// Home directory, cloned from `EnvironmentConfig::home` when opening.
    home: PathBuf,
    /// Configuration supplied to `open`; selected fields are updated by
    /// `set_mutable_config`.
    config: EnvironmentConfig,
    /// Database handles registered by `open_database`, keyed by name.
    databases: Mutex<HashMap<String, Arc<DatabaseHandle>>>,
    /// Transactions started by `begin_transaction` that have not yet been
    /// marked complete; shared with each `Transaction`.
    active_txns: Arc<ActiveTxns>,
    /// Source of transaction ids; starts at 1 and is bumped once per
    /// `begin_transaction` call.
    next_txn_id: AtomicU64,
    /// `true` from `open` until a successful `close`.
    open: AtomicBool,
    /// `true` until `invalidate` is called.
    env_valid: AtomicBool,
    /// Underlying engine implementation, shared with databases and
    /// transactions created from this handle.
    env_impl: Arc<Mutex<EnvironmentImpl>>,
    /// Log manager captured from `env_impl.get_log_manager()` at open time;
    /// `None` when the implementation does not provide one.
    log_manager: Option<Arc<LogManager>>,
    /// When `checkpoint` last ran a checkpoint; `None` until the first one.
    last_checkpoint_time: Mutex<Option<Instant>>,
    /// End-of-log LSN recorded right after the last checkpoint run by
    /// `checkpoint`; starts as `NULL_LSN`.
    last_checkpoint_end_lsn: Mutex<noxu_util::Lsn>,
    /// Optional replica acknowledgement coordinator attached to every
    /// transaction begun while it is set.
    replica_coordinator: Mutex<Option<noxu_dbi::SharedReplicaAckCoordinator>>,
    /// Timeout handed to transactions together with the replica
    /// coordinator; initialised to 5 seconds.
    replica_ack_timeout: Mutex<std::time::Duration>,
}
90
/// Registry entry the environment keeps for each database opened through
/// `Environment::open_database`.
struct DatabaseHandle {
    /// Name the database was opened under.
    name: String,
    /// Numeric id read from the underlying database implementation.
    #[expect(dead_code)]
    id: u64,
    /// Copy of the `DatabaseConfig` passed to `open_database`.
    #[expect(dead_code)]
    config: DatabaseConfig,
    /// Open flag shared with the `Database` returned to the caller;
    /// `mark_database_closed` clears it.
    open: Arc<AtomicBool>,
}
103
/// Bookkeeping record stored in `ActiveTxns` for each transaction started by
/// `Environment::begin_transaction`.
///
/// Every field is currently marked `#[expect(dead_code)]`: the record is
/// stored but its fields are never read in this file.
struct TransactionState {
    /// Transaction id allocated from `Environment::next_txn_id`.
    #[expect(dead_code)]
    id: u64,
    /// Effective configuration the transaction was started with.
    #[expect(dead_code)]
    config: TransactionConfig,
    /// Initialised to `false` by `begin_transaction`.
    #[expect(dead_code)]
    committed: AtomicBool,
    /// Initialised to `false` by `begin_transaction`.
    #[expect(dead_code)]
    aborted: AtomicBool,
}
115
/// Mutex-protected map of in-flight transactions, keyed by transaction id.
///
/// Shared (via `Arc`) between the `Environment` and the transactions it
/// creates so that a transaction can deregister itself with `mark_complete`.
pub(crate) struct ActiveTxns {
    /// Transaction id -> state record for every transaction not yet completed.
    txns: Mutex<HashMap<u64, Arc<TransactionState>>>,
}
128
129impl ActiveTxns {
130 fn new() -> Self {
131 Self { txns: Mutex::new(HashMap::new()) }
132 }
133
134 fn insert(&self, id: u64, state: Arc<TransactionState>) {
135 self.txns.lock().insert(id, state);
136 }
137
138 pub(crate) fn mark_complete(&self, id: u64) {
143 self.txns.lock().remove(&id);
144 }
145
146 fn len(&self) -> usize {
147 self.txns.lock().len()
148 }
149
150 fn is_empty(&self) -> bool {
151 self.txns.lock().is_empty()
152 }
153}
154
155impl Environment {
    /// Opens the environment located at `config.home`.
    ///
    /// The home directory is created first when it is missing and
    /// `config.allow_create` is set. Unless the environment is read-only, a
    /// small probe file is written to (and then removed from) the directory
    /// to confirm it is writable. The public configuration is then copied
    /// field-by-field into a `DbiEnvConfig` and handed to
    /// `EnvironmentImpl::from_dbi_config`.
    ///
    /// # Errors
    ///
    /// Returns an environment error when:
    /// * the home directory does not exist and `allow_create` is `false`,
    /// * the home directory cannot be created,
    /// * the home path exists but is not a directory,
    /// * the directory is not writable (read-write environments only),
    /// * the underlying `EnvironmentImpl` fails to initialise.
    pub fn open(config: EnvironmentConfig) -> Result<Self> {
        let home = config.home.clone();

        if !home.exists() {
            // Missing home: create it only when the caller opted in.
            if config.allow_create {
                std::fs::create_dir_all(&home).map_err(|e| {
                    NoxuError::environment(format!(
                        "Failed to create environment directory {:?}: {}",
                        home, e
                    ))
                })?;
            } else {
                return Err(NoxuError::environment(format!(
                    "Environment directory {:?} does not exist and allow_create is false",
                    home
                )));
            }
        }

        if !home.is_dir() {
            return Err(NoxuError::environment(format!(
                "Environment home {:?} is not a directory",
                home
            )));
        }

        if !config.read_only {
            // Writability probe: write a throw-away file into the home directory.
            let test_file = home.join(".noxu_write_test");
            std::fs::write(&test_file, b"test").map_err(|e| {
                NoxuError::environment(format!(
                    "Environment directory {:?} is not writable: {}",
                    home, e
                ))
            })?;
            // Best-effort cleanup; a failure to remove the probe is ignored.
            let _ = std::fs::remove_file(&test_file);
        }

        crate::unimplemented_params::warn_unimplemented_params(&config);

        // An explicit per-buffer size wins. Otherwise split the total buffer
        // budget evenly across the buffers; `checked_div` yields `None` only
        // when `log_num_buffers` is zero, in which case 1 MiB is used.
        let buf_size = if config.log_buffer_size > 0 {
            config.log_buffer_size
        } else {
            (config.log_total_buffer_bytes as usize)
                .checked_div(config.log_num_buffers)
                .unwrap_or(1024 * 1024)
        };
        // Straight field-by-field projection of the public configuration onto
        // the engine-level configuration; only `log_buffer_size` is derived.
        let dbi_cfg = DbiEnvConfig {
            read_only: config.read_only,
            transactional: config.transactional,
            env_is_locking: config.env_is_locking,
            env_recovery_force_checkpoint: config.env_recovery_force_checkpoint,
            // NOTE(review): fed from the same source field as the line above —
            // confirm this second `_field` entry is intentional.
            env_recovery_force_checkpoint_field: config
                .env_recovery_force_checkpoint,
            env_recovery_force_new_file: config.env_recovery_force_new_file,
            halt_on_commit_after_checksum_exception: config
                .halt_on_commit_after_checksum_exception,
            env_check_leaks: config.env_check_leaks,
            env_forced_yield: config.env_forced_yield,
            env_fair_latches: config.env_fair_latches,
            env_latch_timeout_ms: config.env_latch_timeout_ms,
            env_ttl_clock_tolerance_ms: config.env_ttl_clock_tolerance_ms,
            env_expiration_enabled: config.env_expiration_enabled,
            env_db_eviction: config.env_db_eviction,
            cache_size: config.cache_size,
            cache_percent: config.cache_percent,
            max_off_heap_memory: config.max_off_heap_memory,
            max_disk: config.max_disk,
            free_disk: config.free_disk,
            log_file_max_bytes: config.log_file_max_bytes,
            log_file_cache_size: config.log_file_cache_size,
            log_checksum_read: config.log_checksum_read,
            log_verify_checksums: config.log_verify_checksums,
            log_fsync_timeout_ms: config.log_fsync_timeout_ms,
            log_fsync_time_limit_ms: config.log_fsync_time_limit_ms,
            log_num_buffers: config.log_num_buffers,
            log_buffer_size: buf_size,
            log_fault_read_size: config.log_fault_read_size,
            log_iterator_read_size: config.log_iterator_read_size,
            log_iterator_max_size: config.log_iterator_max_size,
            log_n_data_directories: config.log_n_data_directories,
            log_mem_only: config.log_mem_only,
            log_detect_file_delete: config.log_detect_file_delete,
            log_detect_file_delete_interval_ms: config
                .log_detect_file_delete_interval_ms,
            log_flush_sync_interval_ms: config.log_flush_sync_interval_ms,
            log_flush_no_sync_interval_ms: config.log_flush_no_sync_interval_ms,
            log_use_odsync: config.log_use_odsync,
            log_use_write_queue: config.log_use_write_queue,
            log_write_queue_size: config.log_write_queue_size,
            log_group_commit_threshold: config.log_group_commit_threshold,
            log_group_commit_interval_ms: config.log_group_commit_interval_ms,
            node_max_entries: config.node_max_entries,
            node_dup_tree_max_entries: config.node_dup_tree_max_entries,
            tree_max_embedded_ln: config.tree_max_embedded_ln,
            tree_max_delta: config.tree_max_delta,
            tree_bin_delta: config.tree_bin_delta,
            tree_min_memory: config.tree_min_memory,
            tree_compact_max_key_length: config.tree_compact_max_key_length,
            run_in_compressor: config.run_in_compressor,
            in_compressor_wakeup_interval_ms: config
                .in_compressor_wakeup_interval_ms,
            compressor_deadlock_retry: config.compressor_deadlock_retry,
            compressor_lock_timeout_ms: config.compressor_lock_timeout_ms,
            compressor_purge_root: config.compressor_purge_root,
            run_cleaner: config.run_cleaner,
            cleaner_min_utilization: config.cleaner_min_utilization,
            cleaner_min_file_utilization: config.cleaner_min_file_utilization,
            cleaner_threads: config.cleaner_threads,
            cleaner_min_file_count: config.cleaner_min_file_count,
            cleaner_min_age: config.cleaner_min_age,
            cleaner_bytes_interval: config.cleaner_bytes_interval,
            cleaner_wakeup_interval_ms: config.cleaner_wakeup_interval_ms,
            cleaner_fetch_obsolete_size: config.cleaner_fetch_obsolete_size,
            cleaner_adjust_utilization: config.cleaner_adjust_utilization,
            cleaner_deadlock_retry: config.cleaner_deadlock_retry,
            cleaner_lock_timeout_ms: config.cleaner_lock_timeout_ms,
            cleaner_expunge: config.cleaner_expunge,
            cleaner_use_deleted_dir: config.cleaner_use_deleted_dir,
            cleaner_max_batch_files: config.cleaner_max_batch_files,
            cleaner_read_size: config.cleaner_read_size,
            cleaner_detail_max_memory_percentage: config
                .cleaner_detail_max_memory_percentage,
            cleaner_look_ahead_cache_size: config.cleaner_look_ahead_cache_size,
            cleaner_foreground_proactive_migration: config
                .cleaner_foreground_proactive_migration,
            cleaner_background_proactive_migration: config
                .cleaner_background_proactive_migration,
            cleaner_lazy_migration: config.cleaner_lazy_migration,
            cleaner_expiration_enabled: config.cleaner_expiration_enabled,
            run_checkpointer: config.run_checkpointer,
            checkpointer_bytes_interval: config.checkpointer_bytes_interval,
            checkpointer_wakeup_interval_ms: config
                .checkpointer_wakeup_interval_ms,
            checkpointer_deadlock_retry: config.checkpointer_deadlock_retry,
            checkpointer_high_priority: config.checkpointer_high_priority,
            run_evictor: config.run_evictor,
            evictor_nodes_per_scan: config.evictor_nodes_per_scan,
            evictor_evict_bytes: config.evictor_evict_bytes,
            evictor_critical_percentage: config.evictor_critical_percentage,
            evictor_lru_only: config.evictor_lru_only,
            evictor_n_lru_lists: config.evictor_n_lru_lists,
            evictor_deadlock_retry: config.evictor_deadlock_retry,
            evictor_core_threads: config.evictor_core_threads,
            evictor_max_threads: config.evictor_max_threads,
            evictor_keep_alive_ms: config.evictor_keep_alive_ms,
            evictor_allow_bin_deltas: config.evictor_allow_bin_deltas,
            run_offheap_evictor: config.run_offheap_evictor,
            offheap_evict_bytes: config.offheap_evict_bytes,
            offheap_n_lru_lists: config.offheap_n_lru_lists,
            offheap_checksum: config.offheap_checksum,
            offheap_core_threads: config.offheap_core_threads,
            offheap_max_threads: config.offheap_max_threads,
            offheap_keep_alive_ms: config.offheap_keep_alive_ms,
            lock_timeout_ms: config.lock_timeout_ms,
            lock_deadlock_detect: config.lock_deadlock_detect,
            lock_deadlock_detect_delay_ms: config.lock_deadlock_detect_delay_ms,
            txn_timeout_ms: config.txn_timeout_ms,
            txn_serializable_isolation: config.txn_serializable_isolation,
            txn_deadlock_stack_trace: config.txn_deadlock_stack_trace,
            txn_dump_locks: config.txn_dump_locks,
            run_verifier: config.run_verifier,
            verify_log: config.verify_log,
            verify_log_read_delay_ms: config.verify_log_read_delay_ms,
            verify_btree: config.verify_btree,
            verify_secondaries: config.verify_secondaries,
            verify_data_records: config.verify_data_records,
            verify_obsolete_records: config.verify_obsolete_records,
            verify_btree_batch_size: config.verify_btree_batch_size,
            verify_btree_batch_delay_ms: config.verify_btree_batch_delay_ms,
            stats_collect: config.stats_collect,
            stats_collect_interval_secs: config.stats_collect_interval_secs,
            env_background_read_limit_kb: config.env_background_read_limit_kb,
            env_background_write_limit_kb: config.env_background_write_limit_kb,
            env_background_sleep_interval_us: config
                .env_background_sleep_interval_us,
        };
        let env_impl = EnvironmentImpl::from_dbi_config(home.clone(), &dbi_cfg)
            .map_err(|e| NoxuError::environment(e.to_string()))?;

        // Capture the log manager before the implementation is moved behind
        // the shared mutex.
        let log_manager = env_impl.get_log_manager();
        let env_impl_arc = Arc::new(Mutex::new(env_impl));
        Ok(Environment {
            home,
            config,
            databases: Mutex::new(HashMap::new()),
            active_txns: Arc::new(ActiveTxns::new()),
            next_txn_id: AtomicU64::new(1),
            open: AtomicBool::new(true),
            env_valid: AtomicBool::new(true),
            env_impl: env_impl_arc,
            log_manager,
            last_checkpoint_time: Mutex::new(None),
            last_checkpoint_end_lsn: Mutex::new(noxu_util::NULL_LSN),
            replica_coordinator: Mutex::new(None),
            replica_ack_timeout: Mutex::new(std::time::Duration::from_secs(5)),
        })
    }
389
390 pub fn close(&self) -> Result<()> {
400 if !self.open.load(Ordering::Acquire) {
401 return Err(NoxuError::EnvironmentClosed);
402 }
403
404 let databases = self.databases.lock();
406 let open_dbs: Vec<String> = databases
407 .iter()
408 .filter(|(_, db)| db.open.load(Ordering::Acquire))
409 .map(|(name, _)| name.clone())
410 .collect();
411
412 if !open_dbs.is_empty() {
413 return Err(NoxuError::OperationNotAllowed(format!(
414 "Cannot close environment with open database handles: {:?}",
415 open_dbs
416 )));
417 }
418
419 if !self.active_txns.is_empty() {
421 return Err(NoxuError::OperationNotAllowed(format!(
422 "Cannot close environment with {} active transactions",
423 self.active_txns.len()
424 )));
425 }
426
427 self.open.store(false, Ordering::Release);
428 let env_impl = self.env_impl.lock();
429 let _ = env_impl.close();
430 Ok(())
431 }
432
433 pub fn open_database(
453 &self,
454 txn: Option<&Transaction>,
455 name: &str,
456 config: &DatabaseConfig,
457 ) -> Result<Database> {
458 self.check_open()?;
459
460 if self.config.read_only {
466 if config.allow_create {
467 return Err(NoxuError::OperationNotAllowed(
468 "open_database: cannot create a database on a read-only \
469 environment (DatabaseConfig::with_allow_create(true))"
470 .to_string(),
471 ));
472 }
473 if !config.read_only {
474 return Err(NoxuError::OperationNotAllowed(
475 "open_database: read-only environment requires the \
476 database to be opened read-only \
477 (DatabaseConfig::with_read_only(true))"
478 .to_string(),
479 ));
480 }
481 }
482
483 if name.is_empty() {
484 return Err(NoxuError::IllegalArgument(
485 "Database name cannot be empty".to_string(),
486 ));
487 }
488
489 let mut databases = self.databases.lock();
490
491 if let Some(db_handle) = databases.get(name)
493 && db_handle.open.load(Ordering::Acquire)
494 {
495 return Err(NoxuError::DatabaseAlreadyExists(format!(
496 "Database '{}' is already open",
497 name
498 )));
499 }
500
501 let mut dbi_config = noxu_dbi::DatabaseConfig::new();
503 dbi_config.set_allow_create(config.allow_create);
504 dbi_config.set_sorted_duplicates(config.sorted_duplicates);
505 dbi_config.set_read_only(config.read_only);
506 dbi_config.set_temporary(config.temporary);
507 dbi_config.set_transactional(config.transactional);
508 dbi_config.deferred_write = config.deferred_write;
509 dbi_config.set_key_prefixing(config.key_prefixing);
512 if config.node_max_entries > 0 {
513 dbi_config.set_node_max_entries(config.node_max_entries as i32);
514 }
515
516 let is_transactional_create = txn.is_some() && config.allow_create;
521 let db_impl_arc = {
522 let env_impl = self.env_impl.lock();
523 if is_transactional_create {
524 let txn_id = txn
526 .expect("invariant: txn is Some when is_transactional_create")
527 .get_id();
528 env_impl
529 .open_database_transactional(name, &dbi_config, txn_id)
530 } else {
531 env_impl.open_database(name, &dbi_config)
532 }
533 .map_err(|e| {
534 match &e {
535 noxu_dbi::DbiError::DatabaseNotFound(_) => {
536 NoxuError::DatabaseNotFound(format!(
537 "Database '{}' does not exist and allow_create is false",
538 name
539 ))
540 }
541 _ => NoxuError::environment(e.to_string()),
542 }
543 })?
544 };
545
546 if is_transactional_create {
550 let env_impl_arc = Arc::clone(&self.env_impl);
551 let db_name_abort = name.to_string();
552 let db_name_commit = name.to_string();
553 let env_impl_arc2 = Arc::clone(&self.env_impl);
554 let txn_ref = txn
556 .expect("invariant: txn is Some when is_transactional_create");
557 txn_ref.register_abort_callback(move || {
558 env_impl_arc.lock().abort_pending_database(&db_name_abort);
559 });
560 txn_ref.register_commit_callback(move || {
561 env_impl_arc2.lock().commit_pending_database(&db_name_commit);
562 });
563 }
564
565 let db_id = db_impl_arc.read().get_id().id() as u64;
566
567 let open_flag = Arc::new(AtomicBool::new(true));
571
572 let db_handle = Arc::new(DatabaseHandle {
573 name: name.to_string(),
574 id: db_id,
575 config: config.clone(),
576 open: Arc::clone(&open_flag),
577 });
578
579 databases.insert(name.to_string(), db_handle);
580 drop(databases);
581
582 Ok(Database::new(
583 name.to_string(),
584 db_id,
585 config.clone(),
586 db_impl_arc,
587 Arc::clone(&self.env_impl),
588 open_flag,
589 self.config.txn_no_sync,
590 self.config.txn_write_no_sync,
591 ))
592 }
593
594 pub fn remove_database(
608 &self,
609 _txn: Option<&Transaction>,
610 name: &str,
611 ) -> Result<()> {
612 self.check_writable("remove_database")?;
613
614 let mut databases = self.databases.lock();
615 {
616 let env_impl = self.env_impl.lock();
617 env_impl.remove_database(name).map_err(|e| match &e {
618 noxu_dbi::DbiError::DatabaseNotFound(_) => {
619 NoxuError::DatabaseNotFound(format!(
620 "Database '{}' does not exist",
621 name
622 ))
623 }
624 _ => NoxuError::environment(e.to_string()),
625 })?;
626 }
627 databases.remove(name);
628
629 Ok(())
630 }
631
632 pub fn truncate_database(
639 &self,
640 _txn: Option<&Transaction>,
641 name: &str,
642 ) -> Result<u64> {
643 self.check_writable("truncate_database")?;
644 let env_impl = self.env_impl.lock();
645 env_impl.truncate_database(name).map_err(|e| match &e {
646 noxu_dbi::DbiError::DatabaseNotFound(_) => {
647 NoxuError::DatabaseNotFound(format!(
648 "Database '{}' does not exist",
649 name
650 ))
651 }
652 _ => NoxuError::environment(e.to_string()),
653 })
654 }
655
656 pub fn rename_database(
672 &self,
673 _txn: Option<&Transaction>,
674 old_name: &str,
675 new_name: &str,
676 ) -> Result<()> {
677 self.check_writable("rename_database")?;
678
679 if old_name == new_name {
680 return Ok(());
681 }
682
683 let mut databases = self.databases.lock();
684 {
685 let env_impl = self.env_impl.lock();
686 env_impl.rename_database(old_name, new_name).map_err(
687 |e| match &e {
688 noxu_dbi::DbiError::DatabaseNotFound(_) => {
689 NoxuError::DatabaseNotFound(format!(
690 "Database '{}' does not exist",
691 old_name
692 ))
693 }
694 noxu_dbi::DbiError::DatabaseAlreadyExists(_) => {
695 NoxuError::DatabaseAlreadyExists(format!(
696 "Database '{}' already exists",
697 new_name
698 ))
699 }
700 _ => NoxuError::environment(e.to_string()),
701 },
702 )?;
703 }
704
705 if let Some(handle) = databases.remove(old_name) {
706 databases.insert(new_name.to_string(), handle);
707 }
708
709 Ok(())
710 }
711
712 #[allow(deprecated)] pub fn begin_transaction(
735 &self,
736 config: Option<&TransactionConfig>,
737 ) -> Result<Transaction> {
738 self.check_open()?;
739
740 if !self.config.transactional {
741 return Err(NoxuError::OperationNotAllowed(
742 "Cannot begin transaction on non-transactional environment"
743 .to_string(),
744 ));
745 }
746
747 if self.config.read_only
752 && !config.map(|c| c.read_only).unwrap_or(false)
753 {
754 return Err(NoxuError::OperationNotAllowed(
755 "begin_transaction: read-only environment requires the \
756 transaction to be read-only \
757 (TransactionConfig::with_read_only(true))"
758 .to_string(),
759 ));
760 }
761
762 let txn_id = self.next_txn_id.fetch_add(1, Ordering::Relaxed);
763 let mut txn_config = match config.cloned() {
778 Some(c) => c,
779 None => TransactionConfig::default()
780 .with_durability(self.config.durability),
781 };
782 if config.is_none() {
783 let derived = match (
786 self.config.txn_no_sync,
787 self.config.txn_write_no_sync,
788 ) {
789 (true, _) => {
790 Some(crate::durability::Durability::COMMIT_NO_SYNC)
791 }
792 (_, true) => {
793 Some(crate::durability::Durability::COMMIT_WRITE_NO_SYNC)
794 }
795 _ => None,
796 };
797 if let Some(d) = derived {
798 txn_config = txn_config.with_durability(d);
799 }
800 }
801
802 let txn_state = Arc::new(TransactionState {
803 id: txn_id,
804 config: txn_config.clone(),
805 committed: AtomicBool::new(false),
806 aborted: AtomicBool::new(false),
807 });
808
809 let mut active_txns = self.active_txns.txns.lock();
810 active_txns.insert(txn_id, txn_state);
811 drop(active_txns);
812
813 let env_guard = self.env_impl.lock();
816 let inner_txn = env_guard
817 .begin_txn()
818 .map(|mut t| {
819 if txn_config.read_committed {
822 t.set_read_committed_isolation(true);
823 }
824 if txn_config.read_uncommitted {
825 t.set_read_uncommitted_default(true);
830 }
831 if txn_config.serializable_isolation {
832 t.set_serializable_isolation(true);
833 }
834 if txn_config.importunate {
835 t.set_importunate(true);
836 }
837 if txn_config.no_wait {
838 t.set_no_wait(true);
839 }
840 if txn_config.lock_timeout_ms > 0 {
841 t.set_lock_timeout(txn_config.lock_timeout_ms);
842 }
843 if txn_config.txn_timeout_ms > 0 {
844 t.set_txn_timeout(txn_config.txn_timeout_ms);
845 }
846 Arc::new(std::sync::Mutex::new(t))
847 })
848 .ok();
849 let txn = if let Some(lm) = env_guard.get_log_manager() {
850 Transaction::with_log_manager(txn_id, txn_config, lm)
851 } else {
852 Transaction::new(txn_id, txn_config)
853 };
854 drop(env_guard);
855
856 let txn =
857 if let Some(it) = inner_txn { txn.with_inner_txn(it) } else { txn };
858
859 let txn = txn.with_env_impl(Arc::clone(&self.env_impl));
862
863 let txn = txn.with_active_txns(Arc::clone(&self.active_txns));
868
869 let txn = if let Some(coord) = self.replica_coordinator.lock().clone() {
874 let timeout = *self.replica_ack_timeout.lock();
875 txn.with_replica_coordinator(coord, timeout)
876 } else {
877 txn
878 };
879
880 Ok(txn)
881 }
882
883 pub fn get_database_names(&self) -> Result<Vec<String>> {
893 self.check_open()?;
894 let env_impl = self.env_impl.lock();
895 Ok(env_impl.get_database_names())
896 }
897
898 pub fn set_replica_coordinator(
917 &self,
918 coord: noxu_dbi::SharedReplicaAckCoordinator,
919 ) {
920 *self.replica_coordinator.lock() = Some(coord);
921 }
922
923 pub fn clear_replica_coordinator(&self) {
928 *self.replica_coordinator.lock() = None;
929 }
930
931 pub fn set_replica_ack_timeout(&self, timeout: std::time::Duration) {
937 *self.replica_ack_timeout.lock() = timeout;
938 }
939
940 pub fn get_replica_ack_timeout(&self) -> std::time::Duration {
942 *self.replica_ack_timeout.lock()
943 }
944
945 pub fn get_home(&self) -> &Path {
949 &self.home
950 }
951
    /// Returns the configuration held by this environment (as supplied to
    /// `open`, plus any changes applied through `set_mutable_config`).
    pub fn get_config(&self) -> &EnvironmentConfig {
        &self.config
    }
958
959 pub fn get_mutable_config(&self) -> Result<EnvironmentMutableConfig> {
965 self.check_open()?;
966 Ok(EnvironmentMutableConfig {
967 cache_size: Some(self.config.cache_size as usize),
968 durability: None,
969 txn_no_sync: self.config.txn_no_sync,
970 txn_write_no_sync: self.config.txn_write_no_sync,
971 run_cleaner: Some(self.config.run_cleaner),
972 run_checkpointer: Some(self.config.run_checkpointer),
973 run_evictor: Some(self.config.run_evictor),
974 lock_timeout_ms: Some(self.config.lock_timeout_ms),
975 txn_timeout_ms: Some(self.config.txn_timeout_ms),
976 })
977 }
978
979 pub fn set_mutable_config(
989 &mut self,
990 cfg: EnvironmentMutableConfig,
991 ) -> Result<()> {
992 self.check_open()?;
993 if let Some(sz) = cfg.cache_size {
994 self.config.cache_size = sz as u64;
995 let env_impl = self.env_impl.lock();
1000 let evictor = env_impl.get_evictor();
1001 evictor.get_arbiter().set_max_memory(sz as i64);
1002 }
1003 if let Some(ms) = cfg.lock_timeout_ms {
1004 self.config.lock_timeout_ms = ms;
1005 let env_impl = self.env_impl.lock();
1007 env_impl.get_lock_manager().set_lock_timeout(ms);
1008 }
1009 if let Some(ms) = cfg.txn_timeout_ms {
1010 self.config.txn_timeout_ms = ms;
1011 }
1020 self.config.txn_no_sync = cfg.txn_no_sync;
1021 self.config.txn_write_no_sync = cfg.txn_write_no_sync;
1022 if let Some(v) = cfg.run_cleaner {
1026 self.config.run_cleaner = v;
1027 }
1028 if let Some(v) = cfg.run_checkpointer {
1029 self.config.run_checkpointer = v;
1030 }
1031 if let Some(v) = cfg.run_evictor {
1032 self.config.run_evictor = v;
1033 }
1034 Ok(())
1035 }
1036
1037 pub fn checkpoint(&self, config: Option<&CheckpointConfig>) -> Result<()> {
1049 self.check_open()?;
1050
1051 let cfg = config.cloned().unwrap_or_default();
1057
1058 if !cfg.force {
1059 if cfg.k_bytes > 0 {
1062 let cur_lsn = self
1063 .log_manager
1064 .as_ref()
1065 .map(|lm| lm.get_end_of_log())
1066 .unwrap_or(noxu_util::NULL_LSN);
1067 let last = *self.last_checkpoint_end_lsn.lock();
1068 let bytes_written =
1069 cur_lsn.as_u64().saturating_sub(last.as_u64());
1070 let threshold = (cfg.k_bytes as u64) * 1024;
1071 if bytes_written < threshold {
1072 log::debug!(
1073 "checkpoint: skipping (k_bytes threshold {} not \
1074 met, only {} bytes since last checkpoint)",
1075 threshold,
1076 bytes_written,
1077 );
1078 return Ok(());
1079 }
1080 }
1081
1082 if cfg.minutes > 0 {
1085 let last_at = *self.last_checkpoint_time.lock();
1086 if let Some(at) = last_at {
1087 let elapsed = at.elapsed();
1088 let threshold =
1089 std::time::Duration::from_secs(cfg.minutes as u64 * 60);
1090 if elapsed < threshold {
1091 log::debug!(
1092 "checkpoint: skipping (minutes threshold {:?} \
1093 not met, only {:?} since last checkpoint)",
1094 threshold,
1095 elapsed,
1096 );
1097 return Ok(());
1098 }
1099 }
1100 }
1101 }
1102
1103 let invoker = match (cfg.force, cfg.minimize_recovery_time) {
1109 (true, true) => "manual_force_full",
1110 (true, false) => "manual_force",
1111 (false, true) => "manual_full",
1112 (false, false) => "manual",
1113 };
1114
1115 let env_impl = self.env_impl.lock();
1116 env_impl
1117 .run_checkpoint_with_invoker(invoker)
1118 .map_err(|e| NoxuError::environment(e.to_string()))?;
1119 drop(env_impl);
1120
1121 *self.last_checkpoint_time.lock() = Some(Instant::now());
1124 if let Some(lm) = &self.log_manager {
1125 *self.last_checkpoint_end_lsn.lock() = lm.get_end_of_log();
1126 }
1127 Ok(())
1128 }
1129
1130 pub fn is_valid(&self) -> bool {
1137 self.open.load(Ordering::Acquire)
1138 && self.env_valid.load(Ordering::Acquire)
1139 }
1140
    /// Marks the environment as invalid.
    ///
    /// After this call `is_valid` returns `false` and every operation guarded
    /// by `check_open` fails with a forced-shutdown environment error.
    pub fn invalidate(&self) {
        self.env_valid.store(false, Ordering::Release);
    }
1150
    /// Returns the `transactional` flag from the environment configuration.
    pub fn is_transactional(&self) -> bool {
        self.config.transactional
    }
1157
    /// Returns the `read_only` flag from the environment configuration.
    pub fn is_read_only(&self) -> bool {
        self.config.read_only
    }
1164
1165 pub fn get_stats(&self) -> Result<EnvironmentStats> {
1169 self.check_open()?;
1170 let env_impl = self.env_impl.lock();
1171 let n_databases = env_impl.n_databases() as u32;
1172 let log = self
1174 .log_manager
1175 .as_ref()
1176 .map(|lm| LogStatsSnapshot::from(&lm.get_stats()))
1177 .unwrap_or_default();
1178 let lock =
1179 LockStatsSnapshot::from(&env_impl.get_lock_manager().get_stats());
1180 let txn =
1181 TxnStatsSnapshot::from(&env_impl.get_txn_manager().get_stats());
1182 let throughput = env_impl.get_throughput_snapshot();
1183 let evictor =
1184 EvictorStatsSnapshot::from(env_impl.get_evictor().get_stats());
1185 let cleaner = env_impl
1186 .get_cleaner()
1187 .map(|c| c.get_stats().snapshot())
1188 .unwrap_or_default();
1189 let checkpoint = env_impl
1190 .get_checkpointer()
1191 .map(|cp| cp.get_stats().snapshot())
1192 .unwrap_or_default();
1193 Ok(EnvironmentStats {
1194 cache_size: self.config.cache_size,
1195 cache_usage: 0,
1196 n_databases,
1197 log,
1198 lock,
1199 txn,
1200 throughput,
1201 evictor,
1202 cleaner,
1203 checkpoint,
1204 })
1205 }
1206
1207 pub fn stat_fsync_count(&self) -> u64 {
1213 self.log_manager.as_ref().map(|lm| lm.fsync_count()).unwrap_or(0)
1214 }
1215
1216 pub fn recovered_prepared_txns(
1231 &self,
1232 ) -> Vec<noxu_recovery::PreparedTxnInfo> {
1233 let env_impl = self.env_impl.lock();
1234 env_impl.recovered_prepared_txns()
1235 }
1236
1237 pub fn take_recovered_prepared_lns(
1245 &self,
1246 txn_id: u64,
1247 ) -> Vec<noxu_recovery::PreparedLnReplay> {
1248 let env_impl = self.env_impl.lock();
1249 env_impl.take_recovered_prepared_lns(txn_id)
1250 }
1251
1252 pub fn forget_recovered_prepared_txn(&self, txn_id: u64) {
1256 let env_impl = self.env_impl.lock();
1257 env_impl.forget_recovered_prepared_txn(txn_id);
1258 }
1259
1260 pub fn write_txn_commit_for_recovered(&self, txn_id: u64) -> Result<()> {
1270 let lm = match &self.log_manager {
1271 Some(lm) => lm,
1272 None => return Ok(()), };
1274 let pre_vlsn =
1279 if let Some(coord) = self.replica_coordinator.lock().as_ref() {
1280 coord.pre_alloc_vlsn_for_recovered_commit()
1281 } else {
1282 0
1283 };
1284
1285 let commit_lsn = write_txn_end_for_recovered(
1286 lm, txn_id, true, true, true, pre_vlsn,
1290 )?;
1291
1292 if let Some(coord) = self.replica_coordinator.lock().as_ref() {
1296 if pre_vlsn > 0 {
1297 coord.register_recovered_commit_vlsn(pre_vlsn, commit_lsn);
1298 log::debug!(
1299 "write_txn_commit_for_recovered: txn_id={} commit_lsn={:?} \
1300 embedded+registered vlsn={} (R-3)",
1301 txn_id,
1302 commit_lsn,
1303 pre_vlsn
1304 );
1305 } else {
1306 let vlsn = coord.alloc_vlsn_for_recovered_commit(commit_lsn);
1309 if vlsn > 0 {
1310 log::debug!(
1311 "write_txn_commit_for_recovered: txn_id={} commit_lsn={:?} \
1312 assigned vlsn={} (X-3 legacy path)",
1313 txn_id,
1314 commit_lsn,
1315 vlsn
1316 );
1317 }
1318 }
1319 }
1320 Ok(())
1321 }
1322
1323 pub fn write_txn_abort_for_recovered(&self, txn_id: u64) -> Result<()> {
1326 let lm = match &self.log_manager {
1327 Some(lm) => lm,
1328 None => return Ok(()),
1329 };
1330 write_txn_end_for_recovered(
1331 lm, txn_id, false, false, false, 0, )
1336 .map(|_| ())
1337 }
1338
1339 pub fn apply_recovered_prepared_lns(
1349 &self,
1350 lns: &[noxu_recovery::PreparedLnReplay],
1351 ) -> Result<()> {
1352 let env_impl = self.env_impl.lock();
1353 for ln in lns {
1354 let db_id = noxu_dbi::DatabaseId::new(ln.db_id as i64);
1355 let Some(db_arc) = env_impl.get_database_by_id(db_id) else {
1356 continue;
1357 };
1358 let db_guard = db_arc.read();
1359 let Some(tree) = db_guard.get_real_tree() else {
1360 continue;
1361 };
1362 match ln.operation {
1363 noxu_recovery::PreparedLnOperation::Insert
1364 | noxu_recovery::PreparedLnOperation::Update => {
1365 if let Some(data) = &ln.data {
1366 let _ = tree.insert(
1367 ln.key.clone(),
1368 data.clone(),
1369 ln.original_lsn,
1370 );
1371 }
1372 }
1373 noxu_recovery::PreparedLnOperation::Delete => {
1374 if tree.delete(&ln.key) {
1375 db_guard.decrement_entry_count();
1376 }
1377 }
1378 }
1379 }
1380 Ok(())
1381 }
1382
1383 pub fn verify(
1402 &self,
1403 config: &noxu_engine::VerifyConfig,
1404 ) -> Result<noxu_engine::VerifyResult> {
1405 self.check_open()?;
1406 let env_impl = self.env_impl.lock();
1407 let all_dbs = env_impl.get_all_database_impls();
1408 drop(env_impl);
1409
1410 let mut merged = noxu_engine::VerifyResult::new();
1411 for db_arc in &all_dbs {
1412 let guard = db_arc.read();
1413 let result = noxu_engine::verify_database_impl(&guard, config);
1414 merged.databases_verified += result.databases_verified;
1415 merged.records_verified += result.records_verified;
1416 for err in result.errors {
1417 merged.add_error(err);
1418 if merged.error_count() >= config.max_errors as usize {
1419 return Ok(merged);
1420 }
1421 }
1422 for w in result.warnings {
1423 merged.add_warning(w);
1424 }
1425 }
1426 Ok(merged)
1427 }
1428
1429 pub fn compress(&self) -> Result<usize> {
1441 self.check_open()?;
1442 let env_impl = self.env_impl.lock();
1443 let n = env_impl.compress_all();
1444 Ok(n)
1445 }
1446
1447 pub fn evict_memory(&self) -> Result<usize> {
1457 self.check_open()?;
1458 let env_impl = self.env_impl.lock();
1459 let bytes = env_impl.evict_memory();
1460 Ok(bytes)
1461 }
1462 pub(crate) fn mark_database_closed(&self, name: &str) {
1465 let databases = self.databases.lock();
1466 if let Some(db_handle) = databases.get(name) {
1467 db_handle.open.store(false, Ordering::Release);
1468 }
1469 }
1470
    /// Removes `txn_id` from the set of active transactions so it no longer
    /// blocks `close`.
    pub(crate) fn mark_transaction_complete(&self, txn_id: u64) {
        self.active_txns.mark_complete(txn_id);
    }
1480
1481 fn check_open(&self) -> Result<()> {
1482 if !self.open.load(Ordering::Acquire) {
1483 return Err(NoxuError::EnvironmentClosed);
1484 }
1485 if !self.env_valid.load(Ordering::Acquire) {
1486 return Err(NoxuError::environment_with_reason(
1487 crate::error::EnvironmentFailureReason::ForcedShutdown,
1488 "environment has been invalidated due to a prior fatal error"
1489 .to_string(),
1490 ));
1491 }
1492 Ok(())
1493 }
1494
1495 fn check_writable(&self, what: &str) -> Result<()> {
1500 self.check_open()?;
1501 if self.config.read_only {
1502 return Err(NoxuError::OperationNotAllowed(format!(
1503 "{what}: environment is read-only",
1504 )));
1505 }
1506 Ok(())
1507 }
1508}
1509
1510fn write_txn_end_for_recovered(
1523 lm: &LogManager,
1524 txn_id: u64,
1525 is_commit: bool,
1526 fsync: bool,
1527 flush: bool,
1528 vlsn: u64,
1529) -> Result<noxu_util::Lsn> {
1530 use bytes::BytesMut;
1531 use noxu_log::{LogEntryType, Provisional, entry::TxnEndEntry};
1532 use noxu_util::{
1533 lsn::NULL_LSN,
1534 vlsn::{NULL_VLSN, Vlsn},
1535 };
1536
1537 let timestamp = std::time::SystemTime::now()
1538 .duration_since(std::time::UNIX_EPOCH)
1539 .unwrap_or_default()
1540 .as_millis() as u64;
1541
1542 let dtvlsn = if vlsn > 0 { Vlsn::new(vlsn as i64) } else { NULL_VLSN };
1545
1546 let entry = if is_commit {
1547 TxnEndEntry::new_commit(txn_id as i64, NULL_LSN, timestamp, 0, dtvlsn)
1548 } else {
1549 TxnEndEntry::new_abort(txn_id as i64, NULL_LSN, timestamp, 0, NULL_VLSN)
1550 };
1551
1552 let entry_type = if is_commit {
1553 LogEntryType::TxnCommit
1554 } else {
1555 LogEntryType::TxnAbort
1556 };
1557
1558 let mut buf = BytesMut::with_capacity(entry.log_size());
1559 entry.write_to_log(&mut buf);
1560
1561 lm.log(entry_type, &buf, Provisional::No, flush, fsync).map_err(|e| {
1562 NoxuError::environment_with_reason(
1563 crate::error::EnvironmentFailureReason::LogWrite,
1564 e.to_string(),
1565 )
1566 })
1567}
1568
1569impl Drop for Environment {
1570 fn drop(&mut self) {
1571 let _ = self.close();
1573 }
1574}
1575
1576#[cfg(test)]
1577mod tests {
1578 use super::*;
1579 use tempfile::TempDir;
1580
1581 fn temp_env_config() -> (TempDir, EnvironmentConfig) {
1582 let temp_dir = TempDir::new().unwrap();
1583 let config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
1584 .with_allow_create(true)
1585 .with_transactional(true);
1586 (temp_dir, config)
1587 }
1588
1589 #[test]
1590 fn test_open_environment() {
1591 let (temp_dir, config) = temp_env_config();
1592 let env = Environment::open(config).unwrap();
1593 assert!(env.is_valid());
1594 assert_eq!(env.get_home(), temp_dir.path());
1595 env.close().unwrap();
1596 }
1597
1598 #[test]
1599 fn test_open_creates_directory() {
1600 let temp_dir = TempDir::new().unwrap();
1601 let home = temp_dir.path().join("subdir");
1602 let config =
1603 EnvironmentConfig::new(home.clone()).with_allow_create(true);
1604
1605 let env = Environment::open(config).unwrap();
1606 assert!(home.exists());
1607 assert!(home.is_dir());
1608 env.close().unwrap();
1609 }
1610
1611 #[test]
1612 fn test_open_fails_without_allow_create() {
1613 let temp_dir = TempDir::new().unwrap();
1614 let home = temp_dir.path().join("nonexistent");
1615 let config = EnvironmentConfig::new(home).with_allow_create(false);
1616
1617 let result = Environment::open(config);
1618 assert!(result.is_err());
1619 }
1620
1621 #[test]
1622 fn test_close_environment() {
1623 let (_temp_dir, config) = temp_env_config();
1624 let env = Environment::open(config).unwrap();
1625 assert!(env.is_valid());
1626 env.close().unwrap();
1627 assert!(!env.is_valid());
1628 }
1629
1630 #[test]
1631 fn test_close_twice_fails() {
1632 let (_temp_dir, config) = temp_env_config();
1633 let env = Environment::open(config).unwrap();
1634 env.close().unwrap();
1635 let result = env.close();
1636 assert!(result.is_err());
1637 }
1638
1639 #[test]
1640 fn test_close_with_open_database_fails() {
1641 let (_temp_dir, config) = temp_env_config();
1642 let env = Environment::open(config).unwrap();
1643
1644 let db_config = DatabaseConfig::new().with_allow_create(true);
1645 let _db = env.open_database(None, "testdb", &db_config).unwrap();
1646
1647 let result = env.close();
1648 assert!(result.is_err());
1649 }
1650
1651 #[test]
1652 fn test_open_database() {
1653 let (_temp_dir, config) = temp_env_config();
1654 let env = Environment::open(config).unwrap();
1655
1656 let db_config = DatabaseConfig::new().with_allow_create(true);
1657 let db = env.open_database(None, "testdb", &db_config).unwrap();
1658 assert_eq!(db.get_database_name(), "testdb");
1659 assert!(db.is_valid());
1660 }
1661
1662 #[test]
1663 fn test_open_database_twice_fails() {
1664 let (_temp_dir, config) = temp_env_config();
1665 let env = Environment::open(config).unwrap();
1666
1667 let db_config = DatabaseConfig::new().with_allow_create(true);
1668 let _db1 = env.open_database(None, "testdb", &db_config).unwrap();
1669 let result = env.open_database(None, "testdb", &db_config);
1670 assert!(result.is_err());
1671 }
1672
1673 #[test]
1674 fn test_open_database_without_create_fails() {
1675 let (_temp_dir, config) = temp_env_config();
1676 let env = Environment::open(config).unwrap();
1677
1678 let db_config = DatabaseConfig::new().with_allow_create(false);
1679 let result = env.open_database(None, "nonexistent", &db_config);
1680 assert!(result.is_err());
1681 }
1682
1683 #[test]
1684 fn test_open_database_empty_name_fails() {
1685 let (_temp_dir, config) = temp_env_config();
1686 let env = Environment::open(config).unwrap();
1687
1688 let db_config = DatabaseConfig::new().with_allow_create(true);
1689 let result = env.open_database(None, "", &db_config);
1690 assert!(result.is_err());
1691 }
1692
1693 #[test]
1694 fn test_remove_database() {
1695 let (_temp_dir, config) = temp_env_config();
1696 let env = Environment::open(config).unwrap();
1697
1698 let db_config = DatabaseConfig::new().with_allow_create(true);
1699 let db = env.open_database(None, "testdb", &db_config).unwrap();
1700 db.close().unwrap();
1701
1702 env.remove_database(None, "testdb").unwrap();
1703 let names = env.get_database_names().unwrap();
1704 assert!(!names.contains(&"testdb".to_string()));
1705 }
1706
1707 #[test]
1708 fn test_remove_open_database_fails() {
1709 let (_temp_dir, config) = temp_env_config();
1710 let env = Environment::open(config).unwrap();
1711
1712 let db_config = DatabaseConfig::new().with_allow_create(true);
1713 let _db = env.open_database(None, "testdb", &db_config).unwrap();
1714
1715 let result = env.remove_database(None, "testdb");
1716 assert!(result.is_err());
1717 }
1718
1719 #[test]
1720 fn test_remove_nonexistent_database_fails() {
1721 let (_temp_dir, config) = temp_env_config();
1722 let env = Environment::open(config).unwrap();
1723
1724 let result = env.remove_database(None, "nonexistent");
1725 assert!(result.is_err());
1726 }
1727
1728 #[test]
1729 fn test_rename_database() {
1730 let (_temp_dir, config) = temp_env_config();
1731 let env = Environment::open(config).unwrap();
1732
1733 let db_config = DatabaseConfig::new().with_allow_create(true);
1734 let db = env.open_database(None, "oldname", &db_config).unwrap();
1735 db.close().unwrap();
1736
1737 env.rename_database(None, "oldname", "newname").unwrap();
1738
1739 let names = env.get_database_names().unwrap();
1740 assert!(!names.contains(&"oldname".to_string()));
1741 assert!(names.contains(&"newname".to_string()));
1742 }
1743
1744 #[test]
1745 fn test_rename_to_same_name() {
1746 let (_temp_dir, config) = temp_env_config();
1747 let env = Environment::open(config).unwrap();
1748
1749 let db_config = DatabaseConfig::new().with_allow_create(true);
1750 let db = env.open_database(None, "testdb", &db_config).unwrap();
1751 db.close().unwrap();
1752
1753 env.rename_database(None, "testdb", "testdb").unwrap();
1754 }
1755
1756 #[test]
1757 fn test_rename_open_database_fails() {
1758 let (_temp_dir, config) = temp_env_config();
1759 let env = Environment::open(config).unwrap();
1760
1761 let db_config = DatabaseConfig::new().with_allow_create(true);
1762 let _db = env.open_database(None, "testdb", &db_config).unwrap();
1763
1764 let result = env.rename_database(None, "testdb", "newname");
1765 assert!(result.is_err());
1766 }
1767
1768 #[test]
1769 fn test_rename_nonexistent_database_fails() {
1770 let (_temp_dir, config) = temp_env_config();
1771 let env = Environment::open(config).unwrap();
1772
1773 let result = env.rename_database(None, "nonexistent", "newname");
1774 assert!(result.is_err());
1775 }
1776
1777 #[test]
1778 fn test_rename_to_existing_database_fails() {
1779 let (_temp_dir, config) = temp_env_config();
1780 let env = Environment::open(config).unwrap();
1781
1782 let db_config = DatabaseConfig::new().with_allow_create(true);
1783 let db1 = env.open_database(None, "db1", &db_config).unwrap();
1784 let db2 = env.open_database(None, "db2", &db_config).unwrap();
1785 db1.close().unwrap();
1786 db2.close().unwrap();
1787
1788 let result = env.rename_database(None, "db1", "db2");
1789 assert!(result.is_err());
1790 }
1791
1792 #[test]
1793 fn test_get_database_names() {
1794 let (_temp_dir, config) = temp_env_config();
1795 let env = Environment::open(config).unwrap();
1796
1797 let db_config = DatabaseConfig::new().with_allow_create(true);
1798 let _db1 = env.open_database(None, "db1", &db_config).unwrap();
1799 let _db2 = env.open_database(None, "db2", &db_config).unwrap();
1800
1801 let names = env.get_database_names().unwrap();
1802 assert_eq!(names.len(), 2);
1803 assert!(names.contains(&"db1".to_string()));
1804 assert!(names.contains(&"db2".to_string()));
1805 }
1806
1807 #[test]
1810 fn test_transactional_open_database_abort_removes_db() {
1811 let (temp_dir, config) = temp_env_config();
1812 {
1813 let env = Environment::open(config).unwrap();
1814 let txn = env.begin_transaction(None).unwrap();
1815 let db_config = DatabaseConfig::new()
1816 .with_allow_create(true)
1817 .with_transactional(true);
1818 let _db = env
1819 .open_database(Some(&txn), "aborted_db", &db_config)
1820 .unwrap();
1821 txn.abort().unwrap();
1822 let names = env.get_database_names().unwrap();
1824 assert!(
1825 !names.contains(&"aborted_db".to_string()),
1826 "aborted database must not appear in get_database_names() \
1827 (C-4 committed-only semantics), got: {:?}",
1828 names
1829 );
1830 drop(env);
1831 }
1832 let env2 = Environment::open(
1834 EnvironmentConfig::new(temp_dir.path().to_path_buf())
1835 .with_allow_create(false)
1836 .with_transactional(true),
1837 )
1838 .unwrap();
1839 let names2 = env2.get_database_names().unwrap();
1840 assert!(
1841 !names2.contains(&"aborted_db".to_string()),
1842 "after env reopen, aborted database must not appear: {:?}",
1843 names2
1844 );
1845 }
1846
1847 #[test]
1850 fn test_get_database_names_excludes_uncommitted() {
1851 let (_temp_dir, config) = temp_env_config();
1852 let env = Environment::open(config).unwrap();
1853
1854 let db_config = DatabaseConfig::new()
1855 .with_allow_create(true)
1856 .with_transactional(true);
1857 let txn = env.begin_transaction(None).unwrap();
1858 let _db =
1859 env.open_database(Some(&txn), "pending_db", &db_config).unwrap();
1860
1861 let names = env.get_database_names().unwrap();
1864 assert!(
1865 !names.contains(&"pending_db".to_string()),
1866 "uncommitted database must be invisible to get_database_names() \
1867 (C-4 / JE 1-J): got {:?}",
1868 names
1869 );
1870
1871 txn.commit().unwrap();
1873 let names_after = env.get_database_names().unwrap();
1874 assert!(
1875 names_after.contains(&"pending_db".to_string()),
1876 "committed database must appear in get_database_names()"
1877 );
1878 }
1879
1880 #[test]
1881 fn test_begin_transaction() {
1882 let temp_dir = TempDir::new().unwrap();
1883 let config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
1884 .with_allow_create(true)
1885 .with_transactional(false);
1886 let env = Environment::open(config).unwrap();
1887
1888 let result = env.begin_transaction(None);
1889 assert!(result.is_err());
1890 }
1891
1892 #[test]
1893 fn test_is_transactional() {
1894 let (_temp_dir, config) = temp_env_config();
1895 let env = Environment::open(config).unwrap();
1896 assert!(env.is_transactional());
1897 }
1898
1899 #[test]
1900 fn test_is_not_transactional() {
1901 let temp_dir = TempDir::new().unwrap();
1902 let config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
1903 .with_allow_create(true)
1904 .with_transactional(false);
1905 let env = Environment::open(config).unwrap();
1906 assert!(!env.is_transactional());
1907 }
1908
1909 #[test]
1910 fn test_is_read_only() {
1911 let temp_dir = TempDir::new().unwrap();
1912 let config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
1913 .with_allow_create(true)
1914 .with_read_only(true);
1915 let env = Environment::open(config).unwrap();
1916 assert!(env.is_read_only());
1917 }
1918
1919 #[test]
1920 fn test_operations_on_closed_environment_fail() {
1921 let (_temp_dir, config) = temp_env_config();
1922 let env = Environment::open(config).unwrap();
1923 env.close().unwrap();
1924
1925 let db_config = DatabaseConfig::new().with_allow_create(true);
1926 assert!(env.open_database(None, "test", &db_config).is_err());
1927 assert!(env.remove_database(None, "test").is_err());
1928 assert!(env.rename_database(None, "a", "b").is_err());
1929 assert!(env.begin_transaction(None).is_err());
1930 assert!(env.get_database_names().is_err());
1931 }
1932
1933 #[test]
1939 fn test_open_fails_if_home_is_a_file() {
1940 use std::io::Write;
1941 let temp_dir = TempDir::new().unwrap();
1942 let file_path = temp_dir.path().join("not_a_dir.txt");
1943 let mut f = std::fs::File::create(&file_path).unwrap();
1944 writeln!(f, "data").unwrap();
1945 drop(f);
1946
1947 let config = EnvironmentConfig::new(file_path).with_allow_create(false);
1948 let result = Environment::open(config);
1950 assert!(result.is_err());
1951 }
1952
1953 #[test]
1955 fn test_open_database_with_node_max_entries() {
1956 let (_temp_dir, config) = temp_env_config();
1957 let env = Environment::open(config).unwrap();
1958
1959 let mut db_config = DatabaseConfig::new().with_allow_create(true);
1960 db_config.set_node_max_entries(64);
1961 let db = env.open_database(None, "testdb_entries", &db_config).unwrap();
1962 assert!(db.is_valid());
1963 }
1964
1965 #[test]
1967 fn test_begin_transaction_with_explicit_config() {
1968 use crate::transaction_config::TransactionConfig;
1969 let (_temp_dir, config) = temp_env_config();
1970 let env = Environment::open(config).unwrap();
1971
1972 let txn_config = TransactionConfig::new();
1973 let txn = env.begin_transaction(Some(&txn_config)).unwrap();
1974 assert!(txn.is_valid());
1975 }
1976
1977 #[test]
1981 fn test_rename_database_handle_not_in_map() {
1982 let (_temp_dir, config) = temp_env_config();
1983 let env = Environment::open(config).unwrap();
1984
1985 {
1989 let env_impl = env.env_impl.lock();
1990 let mut dbi_config = noxu_dbi::DatabaseConfig::new();
1991 dbi_config.set_allow_create(true);
1992 let db_arc =
1993 env_impl.open_database("ghost_db", &dbi_config).unwrap();
1994 let db_id = db_arc.read().get_id();
1995 env_impl.close_database(db_id).unwrap();
1996 }
1997
1998 env.rename_database(None, "ghost_db", "ghost_db_renamed").unwrap();
2000
2001 let names = env.get_database_names().unwrap();
2002 assert!(names.contains(&"ghost_db_renamed".to_string()));
2003 assert!(!names.contains(&"ghost_db".to_string()));
2004 }
2005
2006 #[test]
2008 fn test_close_with_active_transactions_fails() {
2009 let (_temp_dir, config) = temp_env_config();
2010 let env = Environment::open(config).unwrap();
2011
2012 let _txn = env.begin_transaction(None).unwrap();
2013
2014 let result = env.close();
2015 assert!(result.is_err());
2016 }
2017
2018 #[test]
2020 fn test_get_config_and_home() {
2021 let (temp_dir, config) = temp_env_config();
2022 let env = Environment::open(config).unwrap();
2023
2024 assert!(env.get_config().allow_create);
2025 assert_eq!(env.get_home(), temp_dir.path());
2026 env.close().unwrap();
2027 }
2028
2029 #[test]
2031 fn test_mark_database_closed_known_name() {
2032 let (_temp_dir, config) = temp_env_config();
2033 let env = Environment::open(config).unwrap();
2034
2035 let db_config = DatabaseConfig::new().with_allow_create(true);
2036 let db = env.open_database(None, "mydb", &db_config).unwrap();
2037 env.mark_database_closed("mydb");
2039 let _ = db.is_valid(); env.close().unwrap();
2042 }
2043
2044 #[test]
2046 fn test_mark_database_closed_unknown_name_is_noop() {
2047 let (_temp_dir, config) = temp_env_config();
2048 let env = Environment::open(config).unwrap();
2049 env.mark_database_closed("ghost");
2051 env.close().unwrap();
2052 }
2053
2054 #[test]
2056 fn test_mark_transaction_complete_allows_env_close() {
2057 let (_temp_dir, config) = temp_env_config();
2058 let env = Environment::open(config).unwrap();
2059
2060 let txn = env.begin_transaction(None).unwrap();
2061 let txn_id = txn.get_id();
2062
2063 env.mark_transaction_complete(txn_id);
2066
2067 env.close().unwrap();
2069 }
2070
2071 #[test]
2074 fn test_verify_empty_environment_passes() {
2075 use crate::VerifyConfig;
2076 let (_tmp, config) = temp_env_config();
2077 let env = Environment::open(config).unwrap();
2078 let verify_cfg = VerifyConfig::default();
2079 let result = env.verify(&verify_cfg).unwrap();
2080 assert!(result.passed, "empty env should pass: {:?}", result.errors);
2081 }
2082
2083 #[test]
2084 fn test_verify_environment_with_data_passes() {
2085 use crate::{DatabaseConfig, DatabaseEntry, VerifyConfig};
2086 let (_tmp, config) = temp_env_config();
2087 let env = Environment::open(config).unwrap();
2088
2089 let mut db_config = DatabaseConfig::new();
2090 db_config.set_allow_create(true);
2091 let db = env.open_database(None, "vtest", &db_config).unwrap();
2092 for i in 0u32..10 {
2093 let k = DatabaseEntry::from_bytes(&i.to_be_bytes());
2094 let v = DatabaseEntry::from_bytes(&(i * 3).to_be_bytes());
2095 db.put(None, &k, &v).unwrap();
2096 }
2097
2098 let verify_cfg = VerifyConfig::default();
2099 let result = env.verify(&verify_cfg).unwrap();
2100 assert!(
2101 result.passed,
2102 "env with data should pass: {:?}",
2103 result.errors
2104 );
2105 assert!(result.records_verified >= 10);
2106 db.close().unwrap();
2107 env.close().unwrap();
2108 }
2109
2110 #[test]
2111 fn test_verify_closed_environment_fails() {
2112 use crate::VerifyConfig;
2113 let (_tmp, config) = temp_env_config();
2114 let env = Environment::open(config).unwrap();
2115 env.close().unwrap();
2116 let verify_cfg = VerifyConfig::default();
2117 assert!(env.verify(&verify_cfg).is_err());
2118 }
2119
2120 #[test]
2123 fn test_checkpoint_default_succeeds() {
2124 let (_tmp, config) = temp_env_config();
2125 let env = Environment::open(config).unwrap();
2126 env.checkpoint(None).unwrap();
2128 env.close().unwrap();
2129 }
2130
2131 #[test]
2132 fn test_checkpoint_with_config_succeeds() {
2133 let (_tmp, config) = temp_env_config();
2134 let env = Environment::open(config).unwrap();
2135 let ckpt_cfg = CheckpointConfig {
2136 force: true,
2137 k_bytes: 0,
2138 minutes: 0,
2139 minimize_recovery_time: false,
2140 };
2141 env.checkpoint(Some(&ckpt_cfg)).unwrap();
2142 env.close().unwrap();
2143 }
2144
2145 #[test]
2146 fn test_checkpoint_closed_env_fails() {
2147 let (_tmp, config) = temp_env_config();
2148 let env = Environment::open(config).unwrap();
2149 env.close().unwrap();
2150 assert!(env.checkpoint(None).is_err());
2151 }
2152
2153 #[test]
2156 fn test_get_mutable_config_returns_current_values() {
2157 let (_tmp, config) = temp_env_config();
2158 let env = Environment::open(config).unwrap();
2159 let mc = env.get_mutable_config().unwrap();
2160 assert!(mc.cache_size.is_some());
2162 assert!(mc.run_cleaner.is_some());
2163 assert!(mc.run_checkpointer.is_some());
2164 assert!(mc.run_evictor.is_some());
2165 env.close().unwrap();
2166 }
2167
2168 #[test]
2169 fn test_get_mutable_config_closed_env_fails() {
2170 let (_tmp, config) = temp_env_config();
2171 let env = Environment::open(config).unwrap();
2172 env.close().unwrap();
2173 assert!(env.get_mutable_config().is_err());
2174 }
2175
2176 #[test]
2177 fn test_set_mutable_config_updates_cache_size() {
2178 let (_tmp, config) = temp_env_config();
2179 let mut env = Environment::open(config).unwrap();
2180 let new_size: usize = 128 * 1024 * 1024; let mc = EnvironmentMutableConfig::new().with_cache_size(new_size);
2182 env.set_mutable_config(mc).unwrap();
2183 let updated = env.get_mutable_config().unwrap();
2184 assert_eq!(updated.cache_size.unwrap(), new_size);
2185 env.close().unwrap();
2186 }
2187
2188 #[test]
2189 fn test_set_mutable_config_updates_timeouts() {
2190 let (_tmp, config) = temp_env_config();
2191 let mut env = Environment::open(config).unwrap();
2192 let mc = EnvironmentMutableConfig {
2193 lock_timeout_ms: Some(5_000),
2194 txn_timeout_ms: Some(10_000),
2195 ..EnvironmentMutableConfig::default()
2196 };
2197 env.set_mutable_config(mc).unwrap();
2198 let updated = env.get_mutable_config().unwrap();
2201 assert_eq!(updated.lock_timeout_ms, Some(5_000));
2202 assert_eq!(updated.txn_timeout_ms, Some(10_000));
2203 env.close().unwrap();
2204 }
2205
2206 #[test]
2207 fn test_set_mutable_config_none_timeout_unchanged() {
2208 let (_tmp, config) = temp_env_config();
2209 let mut env = Environment::open(config).unwrap();
2210 let original = env.get_mutable_config().unwrap();
2211 let mc = EnvironmentMutableConfig {
2216 lock_timeout_ms: None,
2217 txn_timeout_ms: None,
2218 ..EnvironmentMutableConfig::default()
2219 };
2220 env.set_mutable_config(mc).unwrap();
2221 let updated = env.get_mutable_config().unwrap();
2222 assert_eq!(updated.lock_timeout_ms, original.lock_timeout_ms);
2223 assert_eq!(updated.txn_timeout_ms, original.txn_timeout_ms);
2224 env.close().unwrap();
2225 }
2226
2227 #[test]
2228 fn test_set_mutable_config_closed_env_fails() {
2229 let (_tmp, config) = temp_env_config();
2230 let mut env = Environment::open(config).unwrap();
2231 env.close().unwrap();
2232 let mc = EnvironmentMutableConfig::new();
2233 assert!(env.set_mutable_config(mc).is_err());
2234 }
2235
2236 #[test]
2242 fn test_read_only_env_rejects_create_database() {
2243 let temp_dir = TempDir::new().unwrap();
2245 {
2246 let config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
2247 .with_allow_create(true)
2248 .with_transactional(true);
2249 let _env = Environment::open(config).unwrap();
2250 }
2251 let ro_config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
2253 .with_read_only(true)
2254 .with_transactional(true);
2255 let env = Environment::open(ro_config).unwrap();
2256
2257 let db_cfg = DatabaseConfig::new().with_allow_create(true);
2258 let result = env.open_database(None, "new", &db_cfg);
2259 assert!(
2260 result.is_err(),
2261 "open_database with allow_create on read-only env must fail",
2262 );
2263 }
2264
2265 #[test]
2267 fn test_read_only_env_rejects_remove_database() {
2268 let temp_dir = TempDir::new().unwrap();
2269 {
2270 let config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
2271 .with_allow_create(true)
2272 .with_transactional(true);
2273 let _env = Environment::open(config).unwrap();
2274 }
2275 let ro_config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
2276 .with_read_only(true)
2277 .with_transactional(true);
2278 let env = Environment::open(ro_config).unwrap();
2279
2280 assert!(env.remove_database(None, "test").is_err());
2281 assert!(env.truncate_database(None, "test").is_err());
2282 assert!(env.rename_database(None, "a", "b").is_err());
2283 }
2284
2285 #[test]
2287 fn test_read_only_env_rejects_writable_txn() {
2288 let temp_dir = TempDir::new().unwrap();
2289 {
2290 let config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
2291 .with_allow_create(true)
2292 .with_transactional(true);
2293 let _env = Environment::open(config).unwrap();
2294 }
2295 let ro_config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
2296 .with_read_only(true)
2297 .with_transactional(true);
2298 let env = Environment::open(ro_config).unwrap();
2299
2300 let result = env.begin_transaction(None);
2302 assert!(result.is_err(), "writable txn on read-only env must fail");
2303
2304 let ro_txn_cfg = TransactionConfig::default().with_read_only(true);
2306 let _txn = env
2307 .begin_transaction(Some(&ro_txn_cfg))
2308 .expect("read-only txn on read-only env must succeed");
2309 }
2310
2311 #[test]
2314 fn test_checkpoint_minutes_threshold_skips() {
2315 let (_tmp, config) = temp_env_config();
2316 let env = Environment::open(config).unwrap();
2317
2318 env.checkpoint(None).unwrap();
2320
2321 let cfg = CheckpointConfig::default().with_minutes(60);
2323 env.checkpoint(Some(&cfg)).unwrap();
2324 let cfg = CheckpointConfig::default().with_force(true).with_minutes(60);
2329 env.checkpoint(Some(&cfg)).unwrap();
2330 env.close().unwrap();
2331 }
2332
2333 #[test]
2336 fn test_set_mutable_config_pushes_cache_size_to_evictor() {
2337 let (_tmp, config) = temp_env_config();
2338 let mut env = Environment::open(config).unwrap();
2339
2340 let mc = EnvironmentMutableConfig {
2341 cache_size: Some(64 * 1024 * 1024),
2342 ..EnvironmentMutableConfig::default()
2343 };
2344 env.set_mutable_config(mc).unwrap();
2345
2346 let env_impl = env.env_impl.lock();
2347 let evictor = env_impl.get_evictor();
2348 assert_eq!(
2349 evictor.get_arbiter().get_max_memory(),
2350 64 * 1024 * 1024,
2351 "set_mutable_config(cache_size) must push to Arbiter",
2352 );
2353 }
2354
2355 #[test]
2359 #[allow(deprecated)] fn test_env_txn_no_sync_applies_to_explicit_txn() {
2361 let temp_dir = TempDir::new().unwrap();
2362 let config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
2363 .with_allow_create(true)
2364 .with_transactional(true)
2365 .with_txn_no_sync(true);
2366 let env = Environment::open(config).unwrap();
2367
2368 let txn = env.begin_transaction(None).unwrap();
2369 let dur = txn.get_durability().expect("durability must be set");
2371 assert_eq!(
2372 dur,
2373 crate::durability::Durability::COMMIT_NO_SYNC,
2374 "env txn_no_sync=true must propagate to explicit-txn durability",
2375 );
2376 txn.commit().unwrap();
2377 env.close().unwrap();
2378 }
2379
2380 #[test]
2383 fn test_drop_aborts_open_transaction() {
2384 let (_tmp, config) = temp_env_config();
2385 let env = Environment::open(config).unwrap();
2386
2387 let initial_active = env.active_txns.len();
2388 {
2389 let _txn = env.begin_transaction(None).unwrap();
2390 assert_eq!(env.active_txns.len(), initial_active + 1);
2391 }
2393 assert_eq!(
2395 env.active_txns.len(),
2396 initial_active,
2397 "Transaction::Drop must abort and prune from active_txns",
2398 );
2399 env.close().unwrap();
2401 }
2402
2403 #[test]
2409 fn test_x3_recovered_commit_calls_alloc_vlsn() {
2410 use noxu_dbi::{
2411 AckWaitError, ReplicaAckCoordinator, ReplicaAckPolicyKind,
2412 };
2413 use noxu_util::Lsn;
2414 use std::sync::Arc;
2415 use std::sync::atomic::{AtomicU64, Ordering as AO};
2416 use std::time::Duration;
2417
2418 struct MockCoord {
2420 last_lsn: AtomicU64,
2421 }
2422 impl ReplicaAckCoordinator for MockCoord {
2423 fn await_replica_acks(
2424 &self,
2425 _policy: ReplicaAckPolicyKind,
2426 _timeout: Duration,
2427 ) -> std::result::Result<u32, AckWaitError> {
2428 Ok(0)
2429 }
2430 fn alloc_vlsn_for_recovered_commit(&self, lsn: Lsn) -> u64 {
2431 self.last_lsn.store(lsn.as_u64(), AO::SeqCst);
2432 lsn.file_number() as u64 + 1
2434 }
2435 }
2436
2437 let (tmp, config) = temp_env_config();
2438 let env = Environment::open(config).unwrap();
2439
2440 let coord = Arc::new(MockCoord { last_lsn: AtomicU64::new(0) });
2442 env.set_replica_coordinator(coord.clone());
2443
2444 env.write_txn_commit_for_recovered(42).unwrap();
2446
2447 let recorded_lsn = coord.last_lsn.load(AO::SeqCst);
2449 assert_ne!(
2450 recorded_lsn, 0,
2451 "X-3: alloc_vlsn_for_recovered_commit must be called with the commit LSN"
2452 );
2453
2454 env.close().unwrap();
2455 drop(tmp);
2456 }
2457}