1use crate::daemon_manager::DaemonManager;
4use crate::engine_config::EngineConfig;
5use crate::env_stats::{
6 EnvironmentStats, EvictorStatsSnapshot, LockStatsSnapshot,
7 LogStatsSnapshot, TxnStatsSnapshot,
8};
9use crate::error::{EngineError, Result};
10use noxu_cleaner::{CleanResult, Cleaner};
11use noxu_dbi::EnvironmentImpl;
12use noxu_evictor::{Arbiter, EvictResult, EvictionSource, Evictor};
13use noxu_recovery::{
14 CheckpointConfig, CheckpointResult, Checkpointer, RecoveryManager,
15 log_scanner::InMemoryLogScanner,
16};
17use noxu_sync::Mutex;
18use std::sync::Arc;
19use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
20
21pub struct Engine {
33 config: EngineConfig,
35
36 env_impl: Arc<Mutex<EnvironmentImpl>>,
38
39 evictor: Arc<Evictor>,
41
42 cleaner: Arc<Cleaner>,
44
45 checkpointer: Arc<Checkpointer>,
47
48 daemon_manager: Mutex<DaemonManager>,
50
51 open: AtomicBool,
53
54 cache_usage: Arc<AtomicI64>,
56}
57
58impl Engine {
59 pub fn open(config: EngineConfig) -> Result<Self> {
77 config.validate().map_err(EngineError::InvalidConfig)?;
79
80 if config.allow_create && !config.home.exists() {
82 std::fs::create_dir_all(&config.home)?;
83 }
84
85 if !config.home.exists() {
87 return Err(EngineError::InvalidConfig(format!(
88 "environment directory does not exist: {}",
89 config.home.display()
90 )));
91 }
92
93 let env_impl = EnvironmentImpl::new(
95 &config.home,
96 config.read_only,
97 config.transactional,
98 )?;
99 let env_impl = Arc::new(Mutex::new(env_impl));
100
101 let cache_usage = Arc::new(AtomicI64::new(0));
103
104 let arbiter = Arbiter::new(
106 config.cache_size as i64,
107 Arc::clone(&cache_usage),
108 (config.cache_size / 10) as i64, (config.cache_size / 5) as i64, );
111
112 let evictor = Arc::new(Evictor::new(arbiter, 100, false));
114
115 let cleaner = Arc::new(Cleaner::new(
117 config.cleaner_min_utilization,
118 config.cleaner_min_file_count,
119 0, ));
121
122 let checkpoint_config = CheckpointConfig::default()
124 .bytes_interval(config.checkpoint_bytes_interval);
125 let checkpointer = Arc::new(Checkpointer::new(checkpoint_config));
126
127 let mut recovery_manager = RecoveryManager::new();
131 let mut scanner = InMemoryLogScanner::new();
132 log::info!("Running recovery...");
133 let recovery_info =
134 recovery_manager.recover(&mut scanner, None, true)?;
135 log::info!(
136 "Recovery completed: last_used_lsn={:?}, checkpoint_start_lsn={:?}",
137 recovery_info.last_used_lsn,
138 recovery_info.checkpoint_start_lsn
139 );
140
141 let mut daemon_manager = DaemonManager::new(&config);
143
144 daemon_manager.start_daemons(
146 Arc::clone(&evictor),
147 Arc::clone(&cleaner),
148 Arc::clone(&checkpointer),
149 );
150
151 let engine = Engine {
152 config,
153 env_impl,
154 evictor,
155 cleaner,
156 checkpointer,
157 daemon_manager: Mutex::new(daemon_manager),
158 open: AtomicBool::new(true),
159 cache_usage,
160 };
161
162 log::info!("Engine opened successfully");
163 Ok(engine)
164 }
165
166 pub fn close(&self) -> Result<()> {
179 if !self.is_open() {
180 return Err(EngineError::EnvironmentClosed);
181 }
182
183 log::info!("Closing engine...");
184
185 self.open.store(false, Ordering::Relaxed);
187
188 self.daemon_manager.lock().shutdown();
190
191 if !self.config.read_only
193 && self.config.checkpointer_enabled
194 && let Err(e) = self.checkpointer.do_checkpoint("close")
195 {
196 log::warn!("Final checkpoint failed: {}", e);
197 }
198
199 if let Err(e) = self.env_impl.lock().close() {
203 log::warn!("EnvironmentImpl close failed: {}", e);
204 }
205
206 log::info!("Engine closed successfully");
207 Ok(())
208 }
209
210 pub fn is_open(&self) -> bool {
212 self.open.load(Ordering::Relaxed)
213 }
214
215 pub fn get_env_impl(&self) -> &Arc<Mutex<EnvironmentImpl>> {
217 &self.env_impl
218 }
219
220 pub fn get_evictor(&self) -> &Arc<Evictor> {
222 &self.evictor
223 }
224
225 pub fn get_cleaner(&self) -> &Arc<Cleaner> {
227 &self.cleaner
228 }
229
230 pub fn get_checkpointer(&self) -> &Arc<Checkpointer> {
232 &self.checkpointer
233 }
234
235 pub fn get_config(&self) -> &EngineConfig {
237 &self.config
238 }
239
240 pub fn checkpoint(&self, invoker: &str) -> Result<CheckpointResult> {
254 if !self.is_open() {
255 return Err(EngineError::EnvironmentClosed);
256 }
257
258 if self.config.read_only {
259 return Err(EngineError::InvalidConfig(
260 "cannot checkpoint read-only environment".to_string(),
261 ));
262 }
263
264 let result = self.checkpointer.do_checkpoint(invoker)?;
265 Ok(result)
266 }
267
268 pub fn clean(&self, n_files: u32) -> Result<CleanResult> {
282 if !self.is_open() {
283 return Err(EngineError::EnvironmentClosed);
284 }
285
286 if self.config.read_only {
287 return Err(EngineError::InvalidConfig(
288 "cannot clean read-only environment".to_string(),
289 ));
290 }
291
292 let result = self
293 .cleaner
294 .do_clean(n_files, false)
295 .map_err(EngineError::DatabaseError)?;
296 Ok(result)
297 }
298
299 pub fn clean_adaptive(&self) -> Result<(CleanResult, u64)> {
313 if !self.is_open() {
314 return Err(EngineError::EnvironmentClosed);
315 }
316 if self.config.read_only {
317 return Err(EngineError::InvalidConfig(
318 "cannot clean read-only environment".to_string(),
319 ));
320 }
321
322 let bytes_written = {
324 let env_impl = self.env_impl.lock();
325 env_impl
326 .get_log_manager()
327 .map(|lm| lm.get_stats().n_sequential_write_bytes)
328 .unwrap_or(0)
329 };
330
331 let cleaning_needed =
333 self.cleaner.get_file_selector().lock().has_files_to_clean();
334
335 let (sleep_ms, n_files) =
336 self.cleaner.throttle.update(bytes_written, cleaning_needed);
337
338 let result = self
339 .cleaner
340 .do_clean(n_files, false)
341 .map_err(EngineError::DatabaseError)?;
342
343 Ok((result, sleep_ms))
344 }
345
346 pub fn evict(&self) -> Result<EvictResult> {
354 if !self.is_open() {
355 return Err(EngineError::EnvironmentClosed);
356 }
357
358 let result = self.evictor.do_evict(EvictionSource::Manual);
359 Ok(result)
360 }
361
362 pub fn get_stats(&self) -> EnvironmentStats {
366 let evictor_stats = self.evictor.get_stats();
367 let cleaner_stats = self.cleaner.get_stats();
368 let checkpoint_stats = self.checkpointer.get_stats();
369
370 let env_impl = self.env_impl.lock();
371 let n_databases = env_impl.n_databases() as u32;
372 let log_stats = env_impl.get_log_manager().map(|lm| lm.get_stats());
373 let lock_stats = env_impl.get_lock_manager().get_stats();
374 let real_n_lock_tables =
375 env_impl.get_lock_manager().n_lock_tables() as u64;
376 let txn_stats = env_impl.get_txn_manager().get_stats();
377 let throughput = env_impl.get_throughput_snapshot();
378 drop(env_impl);
379
380 EnvironmentStats {
381 cache_size: self.config.cache_size,
382 cache_usage: self.cache_usage.load(Ordering::Relaxed) as u64,
383 n_databases,
384 evictor: EvictorStatsSnapshot::from(evictor_stats),
385 log: log_stats
386 .as_ref()
387 .map(LogStatsSnapshot::from)
388 .unwrap_or_default(),
389 lock: LockStatsSnapshot {
390 n_lock_tables: real_n_lock_tables,
394 ..LockStatsSnapshot::from(&lock_stats)
395 },
396 txn: TxnStatsSnapshot::from(&txn_stats),
397 cleaner: cleaner_stats.snapshot(),
398 checkpoint: checkpoint_stats.snapshot(),
399 throughput,
400 }
401 }
402
403 pub fn get_cache_usage(&self) -> u64 {
405 self.cache_usage.load(Ordering::Relaxed) as u64
406 }
407
408 pub fn get_cache_budget(&self) -> u64 {
410 self.config.cache_size
411 }
412
413 pub fn is_cache_over_budget(&self) -> bool {
415 self.get_cache_usage() > self.get_cache_budget()
416 }
417}
418
419impl Drop for Engine {
420 fn drop(&mut self) {
421 if self.is_open()
422 && let Err(e) = self.close()
423 {
424 log::error!("Error closing engine in drop: {}", e);
425 }
426 }
427}
428
429#[cfg(test)]
430mod tests {
431 use super::*;
432 use tempfile::TempDir;
433
434 fn temp_config() -> (TempDir, EngineConfig) {
435 let dir = TempDir::new().unwrap();
436 let config = EngineConfig::new(dir.path())
437 .allow_create(true)
438 .cache_size(10 * 1024 * 1024)
439 .evictor_wakeup_interval_ms(100)
440 .cleaner_wakeup_interval_ms(100)
441 .checkpointer_wakeup_interval_ms(100);
442 (dir, config)
443 }
444
445 #[test]
446 fn test_engine_open_and_close() {
447 let (_dir, config) = temp_config();
448 let engine = Engine::open(config).unwrap();
449 assert!(engine.is_open());
450 assert!(engine.get_env_impl().lock().is_open());
451
452 engine.close().unwrap();
453 assert!(!engine.is_open());
454 assert!(
457 !engine.get_env_impl().lock().is_open(),
458 "EnvironmentImpl must be closed after Engine::close"
459 );
460 }
461
462 #[test]
463 fn test_engine_open_creates_directory() {
464 let dir = TempDir::new().unwrap();
465 let home = dir.path().join("newdb");
466 let config = EngineConfig::new(&home).allow_create(true);
467
468 assert!(!home.exists());
469 let engine = Engine::open(config).unwrap();
470 assert!(home.exists());
471 assert!(engine.is_open());
472 }
473
474 #[test]
475 fn test_engine_open_fails_without_create() {
476 let dir = TempDir::new().unwrap();
477 let home = dir.path().join("nonexistent");
478 let config = EngineConfig::new(&home).allow_create(false);
479
480 let result = Engine::open(config);
481 assert!(result.is_err());
482 }
483
484 #[test]
485 fn test_engine_invalid_config() {
486 let dir = TempDir::new().unwrap();
487 let config = EngineConfig::new(dir.path()).cache_size(1024); let result = Engine::open(config);
490 assert!(result.is_err());
491 match result {
492 Err(EngineError::InvalidConfig(_)) => { }
493 _ => panic!("Expected InvalidConfig error"),
494 }
495 }
496
497 #[test]
498 fn test_engine_double_close() {
499 let (_dir, config) = temp_config();
500 let engine = Engine::open(config).unwrap();
501
502 engine.close().unwrap();
503 let result = engine.close();
504 assert!(result.is_err());
505 assert!(matches!(result.unwrap_err(), EngineError::EnvironmentClosed));
506 }
507
508 #[test]
509 fn test_engine_get_subsystems() {
510 let (_dir, config) = temp_config();
511 let engine = Engine::open(config).unwrap();
512
513 assert!(engine.get_env_impl().lock().is_open());
514 assert_eq!(
515 engine.get_evictor().get_lru_sizes().0
516 + engine.get_evictor().get_lru_sizes().1,
517 0
518 );
519 let _ = engine.get_cleaner();
521 let _ = engine.get_checkpointer();
522 }
523
524 #[test]
525 fn test_engine_checkpoint() {
526 let (_dir, config) = temp_config();
527 let engine = Engine::open(config).unwrap();
528
529 let result = engine.checkpoint("test");
530 assert!(result.is_ok());
531 }
532
533 #[test]
534 fn test_engine_checkpoint_readonly() {
535 let dir = TempDir::new().unwrap();
536 let config = EngineConfig::new(dir.path())
537 .allow_create(true)
538 .cache_size(10 * 1024 * 1024)
539 .read_only(true)
540 .cleaner_enabled(false)
541 .checkpointer_enabled(false)
542 .evictor_wakeup_interval_ms(100);
543 let engine = Engine::open(config).unwrap();
544
545 let result = engine.checkpoint("test");
546 assert!(result.is_err());
547 }
548
549 #[test]
550 fn test_engine_clean() {
551 let (_dir, config) = temp_config();
552 let engine = Engine::open(config).unwrap();
553
554 let result = engine.clean(5);
555 assert!(result.is_ok());
556 }
557
558 #[test]
559 fn test_engine_clean_readonly() {
560 let dir = TempDir::new().unwrap();
561 let config = EngineConfig::new(dir.path())
562 .allow_create(true)
563 .cache_size(10 * 1024 * 1024)
564 .read_only(true)
565 .cleaner_enabled(false)
566 .checkpointer_enabled(false)
567 .evictor_wakeup_interval_ms(100);
568 let engine = Engine::open(config).unwrap();
569
570 let result = engine.clean(5);
571 assert!(result.is_err());
572 }
573
574 #[test]
575 fn test_engine_evict() {
576 let (_dir, config) = temp_config();
577 let engine = Engine::open(config).unwrap();
578
579 let result = engine.evict();
580 assert!(result.is_ok());
581 let _evict_result = result.unwrap();
582 }
584
585 #[test]
586 fn test_engine_get_stats() {
587 let (_dir, config) = temp_config();
588 let engine = Engine::open(config).unwrap();
589
590 let stats = engine.get_stats();
591 assert_eq!(stats.cache_size, 10 * 1024 * 1024);
592 assert_eq!(stats.lock.n_lock_tables, 64);
595 }
596
597 #[test]
598 fn test_engine_cache_budget() {
599 let (_dir, config) = temp_config();
600 let engine = Engine::open(config).unwrap();
601
602 assert_eq!(engine.get_cache_budget(), 10 * 1024 * 1024);
603 assert_eq!(engine.get_cache_usage(), 0); assert!(!engine.is_cache_over_budget());
605 }
606
607 #[test]
608 fn test_engine_operations_after_close() {
609 let (_dir, config) = temp_config();
610 let engine = Engine::open(config).unwrap();
611 engine.close().unwrap();
612
613 assert!(engine.checkpoint("test").is_err());
614 assert!(engine.clean(5).is_err());
615 assert!(engine.evict().is_err());
616 }
617
618 #[test]
619 fn test_engine_drop_closes() {
620 let (_dir, config) = temp_config();
621 let engine = Engine::open(config).unwrap();
622 assert!(engine.is_open());
623 drop(engine);
624 }
626
627 #[test]
628 fn test_engine_readonly() {
629 let dir = TempDir::new().unwrap();
630 let config = EngineConfig::new(dir.path())
631 .allow_create(true)
632 .cache_size(10 * 1024 * 1024)
633 .read_only(true)
634 .cleaner_enabled(false)
635 .checkpointer_enabled(false)
636 .evictor_wakeup_interval_ms(100);
637 let engine = Engine::open(config).unwrap();
638
639 assert!(engine.is_open());
640 assert!(engine.get_config().read_only);
641
642 let stats = engine.get_stats();
644 assert_eq!(stats.cache_size, 10 * 1024 * 1024);
645
646 assert!(engine.checkpoint("test").is_err());
648 assert!(engine.clean(5).is_err());
649 }
650}