1use crate::daemon_manager::DaemonManager;
4use crate::engine_config::EngineConfig;
5use crate::env_stats::{
6 EnvironmentStats, EvictorStatsSnapshot, LockStatsSnapshot,
7 LogStatsSnapshot, TxnStatsSnapshot,
8};
9use crate::error::{EngineError, Result};
10use noxu_cleaner::{CleanResult, Cleaner};
11use noxu_dbi::EnvironmentImpl;
12use noxu_evictor::{Arbiter, EvictResult, EvictionSource, Evictor};
13use noxu_recovery::{
14 CheckpointConfig, CheckpointResult, Checkpointer, RecoveryManager,
15 log_scanner::InMemoryLogScanner,
16};
17use noxu_sync::Mutex;
18use std::sync::Arc;
19use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
20
21pub struct Engine {
33 config: EngineConfig,
35
36 env_impl: Arc<Mutex<EnvironmentImpl>>,
38
39 evictor: Arc<Evictor>,
41
42 cleaner: Arc<Cleaner>,
44
45 checkpointer: Arc<Checkpointer>,
47
48 daemon_manager: Mutex<DaemonManager>,
50
51 open: AtomicBool,
53
54 cache_usage: Arc<AtomicI64>,
56}
57
58impl Engine {
59 pub fn open(config: EngineConfig) -> Result<Self> {
77 config.validate().map_err(EngineError::InvalidConfig)?;
79
80 if config.allow_create && !config.home.exists() {
82 std::fs::create_dir_all(&config.home)?;
83 }
84
85 if !config.home.exists() {
87 return Err(EngineError::InvalidConfig(format!(
88 "environment directory does not exist: {}",
89 config.home.display()
90 )));
91 }
92
93 let env_impl = EnvironmentImpl::new(
95 &config.home,
96 config.read_only,
97 config.transactional,
98 )?;
99 let env_impl = Arc::new(Mutex::new(env_impl));
100
101 let cache_usage = Arc::new(AtomicI64::new(0));
103
104 let arbiter = Arbiter::new(
106 config.cache_size as i64,
107 Arc::clone(&cache_usage),
108 (config.cache_size / 10) as i64, (config.cache_size / 5) as i64, );
111
112 let evictor = Arc::new(Evictor::new(arbiter, 100, false));
114
115 let cleaner = Arc::new(Cleaner::new(
117 config.cleaner_min_utilization,
118 config.cleaner_min_file_count,
119 0, ));
121
122 let checkpoint_config = CheckpointConfig::default()
124 .bytes_interval(config.checkpoint_bytes_interval);
125 let checkpointer = Arc::new(Checkpointer::new(checkpoint_config));
126
127 let mut recovery_manager = RecoveryManager::new();
131 let mut scanner = InMemoryLogScanner::new();
132 log::info!("Running recovery...");
133 let recovery_info =
134 recovery_manager.recover(&mut scanner, None, true)?;
135 log::info!(
136 "Recovery completed: last_used_lsn={:?}, checkpoint_start_lsn={:?}",
137 recovery_info.last_used_lsn,
138 recovery_info.checkpoint_start_lsn
139 );
140
141 let mut daemon_manager = DaemonManager::new(&config);
143
144 daemon_manager.start_daemons(
146 Arc::clone(&evictor),
147 Arc::clone(&cleaner),
148 Arc::clone(&checkpointer),
149 );
150
151 let engine = Engine {
152 config,
153 env_impl,
154 evictor,
155 cleaner,
156 checkpointer,
157 daemon_manager: Mutex::new(daemon_manager),
158 open: AtomicBool::new(true),
159 cache_usage,
160 };
161
162 log::info!("Engine opened successfully");
163 Ok(engine)
164 }
165
166 pub fn close(&self) -> Result<()> {
175 if !self.is_open() {
176 return Err(EngineError::EnvironmentClosed);
177 }
178
179 log::info!("Closing engine...");
180
181 self.open.store(false, Ordering::Relaxed);
183
184 self.daemon_manager.lock().shutdown();
186
187 if !self.config.read_only
189 && self.config.checkpointer_enabled
190 && let Err(e) = self.checkpointer.do_checkpoint("close")
191 {
192 log::warn!("Final checkpoint failed: {}", e);
193 }
194
195 log::info!("Engine closed successfully");
199 Ok(())
200 }
201
202 pub fn is_open(&self) -> bool {
204 self.open.load(Ordering::Relaxed)
205 }
206
207 pub fn get_env_impl(&self) -> &Arc<Mutex<EnvironmentImpl>> {
209 &self.env_impl
210 }
211
212 pub fn get_evictor(&self) -> &Arc<Evictor> {
214 &self.evictor
215 }
216
217 pub fn get_cleaner(&self) -> &Arc<Cleaner> {
219 &self.cleaner
220 }
221
222 pub fn get_checkpointer(&self) -> &Arc<Checkpointer> {
224 &self.checkpointer
225 }
226
227 pub fn get_config(&self) -> &EngineConfig {
229 &self.config
230 }
231
232 pub fn checkpoint(&self, invoker: &str) -> Result<CheckpointResult> {
246 if !self.is_open() {
247 return Err(EngineError::EnvironmentClosed);
248 }
249
250 if self.config.read_only {
251 return Err(EngineError::InvalidConfig(
252 "cannot checkpoint read-only environment".to_string(),
253 ));
254 }
255
256 let result = self.checkpointer.do_checkpoint(invoker)?;
257 Ok(result)
258 }
259
260 pub fn clean(&self, n_files: u32) -> Result<CleanResult> {
274 if !self.is_open() {
275 return Err(EngineError::EnvironmentClosed);
276 }
277
278 if self.config.read_only {
279 return Err(EngineError::InvalidConfig(
280 "cannot clean read-only environment".to_string(),
281 ));
282 }
283
284 let result = self
285 .cleaner
286 .do_clean(n_files, false)
287 .map_err(EngineError::DatabaseError)?;
288 Ok(result)
289 }
290
291 pub fn clean_adaptive(&self) -> Result<(CleanResult, u64)> {
305 if !self.is_open() {
306 return Err(EngineError::EnvironmentClosed);
307 }
308 if self.config.read_only {
309 return Err(EngineError::InvalidConfig(
310 "cannot clean read-only environment".to_string(),
311 ));
312 }
313
314 let bytes_written = {
316 let env_impl = self.env_impl.lock();
317 env_impl
318 .get_log_manager()
319 .map(|lm| lm.get_stats().n_sequential_write_bytes)
320 .unwrap_or(0)
321 };
322
323 let cleaning_needed =
325 self.cleaner.get_file_selector().lock().has_files_to_clean();
326
327 let (sleep_ms, n_files) =
328 self.cleaner.throttle.update(bytes_written, cleaning_needed);
329
330 let result = self
331 .cleaner
332 .do_clean(n_files, false)
333 .map_err(EngineError::DatabaseError)?;
334
335 Ok((result, sleep_ms))
336 }
337
338 pub fn evict(&self) -> Result<EvictResult> {
346 if !self.is_open() {
347 return Err(EngineError::EnvironmentClosed);
348 }
349
350 let result = self.evictor.do_evict(EvictionSource::Manual);
351 Ok(result)
352 }
353
354 pub fn get_stats(&self) -> EnvironmentStats {
358 let evictor_stats = self.evictor.get_stats();
359 let cleaner_stats = self.cleaner.get_stats();
360 let checkpoint_stats = self.checkpointer.get_stats();
361
362 let env_impl = self.env_impl.lock();
363 let n_databases = env_impl.n_databases() as u32;
364 let log_stats = env_impl.get_log_manager().map(|lm| lm.get_stats());
365 let lock_stats = env_impl.get_lock_manager().get_stats();
366 let real_n_lock_tables =
367 env_impl.get_lock_manager().n_lock_tables() as u64;
368 let txn_stats = env_impl.get_txn_manager().get_stats();
369 let throughput = env_impl.get_throughput_snapshot();
370 drop(env_impl);
371
372 EnvironmentStats {
373 cache_size: self.config.cache_size,
374 cache_usage: self.cache_usage.load(Ordering::Relaxed) as u64,
375 n_databases,
376 evictor: EvictorStatsSnapshot::from(evictor_stats),
377 log: log_stats
378 .as_ref()
379 .map(LogStatsSnapshot::from)
380 .unwrap_or_default(),
381 lock: LockStatsSnapshot {
382 n_lock_tables: real_n_lock_tables,
386 ..LockStatsSnapshot::from(&lock_stats)
387 },
388 txn: TxnStatsSnapshot::from(&txn_stats),
389 cleaner: cleaner_stats.snapshot(),
390 checkpoint: checkpoint_stats.snapshot(),
391 throughput,
392 }
393 }
394
395 pub fn get_cache_usage(&self) -> u64 {
397 self.cache_usage.load(Ordering::Relaxed) as u64
398 }
399
400 pub fn get_cache_budget(&self) -> u64 {
402 self.config.cache_size
403 }
404
405 pub fn is_cache_over_budget(&self) -> bool {
407 self.get_cache_usage() > self.get_cache_budget()
408 }
409}
410
411impl Drop for Engine {
412 fn drop(&mut self) {
413 if self.is_open()
414 && let Err(e) = self.close()
415 {
416 log::error!("Error closing engine in drop: {}", e);
417 }
418 }
419}
420
421#[cfg(test)]
422mod tests {
423 use super::*;
424 use tempfile::TempDir;
425
426 fn temp_config() -> (TempDir, EngineConfig) {
427 let dir = TempDir::new().unwrap();
428 let config = EngineConfig::new(dir.path())
429 .allow_create(true)
430 .cache_size(10 * 1024 * 1024)
431 .evictor_wakeup_interval_ms(100)
432 .cleaner_wakeup_interval_ms(100)
433 .checkpointer_wakeup_interval_ms(100);
434 (dir, config)
435 }
436
437 #[test]
438 fn test_engine_open_and_close() {
439 let (_dir, config) = temp_config();
440 let engine = Engine::open(config).unwrap();
441 assert!(engine.is_open());
442
443 engine.close().unwrap();
444 assert!(!engine.is_open());
445 }
446
447 #[test]
448 fn test_engine_open_creates_directory() {
449 let dir = TempDir::new().unwrap();
450 let home = dir.path().join("newdb");
451 let config = EngineConfig::new(&home).allow_create(true);
452
453 assert!(!home.exists());
454 let engine = Engine::open(config).unwrap();
455 assert!(home.exists());
456 assert!(engine.is_open());
457 }
458
459 #[test]
460 fn test_engine_open_fails_without_create() {
461 let dir = TempDir::new().unwrap();
462 let home = dir.path().join("nonexistent");
463 let config = EngineConfig::new(&home).allow_create(false);
464
465 let result = Engine::open(config);
466 assert!(result.is_err());
467 }
468
469 #[test]
470 fn test_engine_invalid_config() {
471 let dir = TempDir::new().unwrap();
472 let config = EngineConfig::new(dir.path()).cache_size(1024); let result = Engine::open(config);
475 assert!(result.is_err());
476 match result {
477 Err(EngineError::InvalidConfig(_)) => { }
478 _ => panic!("Expected InvalidConfig error"),
479 }
480 }
481
482 #[test]
483 fn test_engine_double_close() {
484 let (_dir, config) = temp_config();
485 let engine = Engine::open(config).unwrap();
486
487 engine.close().unwrap();
488 let result = engine.close();
489 assert!(result.is_err());
490 assert!(matches!(result.unwrap_err(), EngineError::EnvironmentClosed));
491 }
492
493 #[test]
494 fn test_engine_get_subsystems() {
495 let (_dir, config) = temp_config();
496 let engine = Engine::open(config).unwrap();
497
498 assert!(engine.get_env_impl().lock().is_open());
499 assert_eq!(
500 engine.get_evictor().get_lru_sizes().0
501 + engine.get_evictor().get_lru_sizes().1,
502 0
503 );
504 let _ = engine.get_cleaner();
506 let _ = engine.get_checkpointer();
507 }
508
509 #[test]
510 fn test_engine_checkpoint() {
511 let (_dir, config) = temp_config();
512 let engine = Engine::open(config).unwrap();
513
514 let result = engine.checkpoint("test");
515 assert!(result.is_ok());
516 }
517
518 #[test]
519 fn test_engine_checkpoint_readonly() {
520 let dir = TempDir::new().unwrap();
521 let config = EngineConfig::new(dir.path())
522 .allow_create(true)
523 .cache_size(10 * 1024 * 1024)
524 .read_only(true)
525 .cleaner_enabled(false)
526 .checkpointer_enabled(false)
527 .evictor_wakeup_interval_ms(100);
528 let engine = Engine::open(config).unwrap();
529
530 let result = engine.checkpoint("test");
531 assert!(result.is_err());
532 }
533
534 #[test]
535 fn test_engine_clean() {
536 let (_dir, config) = temp_config();
537 let engine = Engine::open(config).unwrap();
538
539 let result = engine.clean(5);
540 assert!(result.is_ok());
541 }
542
543 #[test]
544 fn test_engine_clean_readonly() {
545 let dir = TempDir::new().unwrap();
546 let config = EngineConfig::new(dir.path())
547 .allow_create(true)
548 .cache_size(10 * 1024 * 1024)
549 .read_only(true)
550 .cleaner_enabled(false)
551 .checkpointer_enabled(false)
552 .evictor_wakeup_interval_ms(100);
553 let engine = Engine::open(config).unwrap();
554
555 let result = engine.clean(5);
556 assert!(result.is_err());
557 }
558
559 #[test]
560 fn test_engine_evict() {
561 let (_dir, config) = temp_config();
562 let engine = Engine::open(config).unwrap();
563
564 let result = engine.evict();
565 assert!(result.is_ok());
566 let _evict_result = result.unwrap();
567 }
569
570 #[test]
571 fn test_engine_get_stats() {
572 let (_dir, config) = temp_config();
573 let engine = Engine::open(config).unwrap();
574
575 let stats = engine.get_stats();
576 assert_eq!(stats.cache_size, 10 * 1024 * 1024);
577 assert_eq!(stats.lock.n_lock_tables, 64);
580 }
581
582 #[test]
583 fn test_engine_cache_budget() {
584 let (_dir, config) = temp_config();
585 let engine = Engine::open(config).unwrap();
586
587 assert_eq!(engine.get_cache_budget(), 10 * 1024 * 1024);
588 assert_eq!(engine.get_cache_usage(), 0); assert!(!engine.is_cache_over_budget());
590 }
591
592 #[test]
593 fn test_engine_operations_after_close() {
594 let (_dir, config) = temp_config();
595 let engine = Engine::open(config).unwrap();
596 engine.close().unwrap();
597
598 assert!(engine.checkpoint("test").is_err());
599 assert!(engine.clean(5).is_err());
600 assert!(engine.evict().is_err());
601 }
602
603 #[test]
604 fn test_engine_drop_closes() {
605 let (_dir, config) = temp_config();
606 let engine = Engine::open(config).unwrap();
607 assert!(engine.is_open());
608 drop(engine);
609 }
611
612 #[test]
613 fn test_engine_readonly() {
614 let dir = TempDir::new().unwrap();
615 let config = EngineConfig::new(dir.path())
616 .allow_create(true)
617 .cache_size(10 * 1024 * 1024)
618 .read_only(true)
619 .cleaner_enabled(false)
620 .checkpointer_enabled(false)
621 .evictor_wakeup_interval_ms(100);
622 let engine = Engine::open(config).unwrap();
623
624 assert!(engine.is_open());
625 assert!(engine.get_config().read_only);
626
627 let stats = engine.get_stats();
629 assert_eq!(stats.cache_size, 10 * 1024 * 1024);
630
631 assert!(engine.checkpoint("test").is_err());
633 assert!(engine.clean(5).is_err());
634 }
635}