// noxu_engine/engine.rs

1//! Main engine implementation for Noxu DB.
2
3use crate::daemon_manager::DaemonManager;
4use crate::engine_config::EngineConfig;
5use crate::env_stats::{
6    EnvironmentStats, EvictorStatsSnapshot, LockStatsSnapshot,
7    LogStatsSnapshot, TxnStatsSnapshot,
8};
9use crate::error::{EngineError, Result};
10use noxu_cleaner::{CleanResult, Cleaner};
11use noxu_dbi::EnvironmentImpl;
12use noxu_evictor::{Arbiter, EvictResult, EvictionSource, Evictor};
13use noxu_recovery::{
14    CheckpointConfig, CheckpointResult, Checkpointer, RecoveryManager,
15    log_scanner::InMemoryLogScanner,
16};
17use noxu_sync::Mutex;
18use std::sync::Arc;
19use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
20
21/// The Noxu DB engine.
22///
23/// Wires together all internal subsystems:
24/// - EnvironmentImpl (dbi layer)
25/// - Evictor (cache management)
26/// - Cleaner (log GC)
27/// - Checkpointer (durability)
28/// - DaemonManager (background threads)
29///
30/// This is the internal engine that `noxu-db` wraps. It coordinates
31/// all subsystems and provides a unified interface for database operations.
32pub struct Engine {
33    /// Engine configuration.
34    config: EngineConfig,
35
36    /// The internal environment implementation (dbi layer).
37    env_impl: Arc<Mutex<EnvironmentImpl>>,
38
39    /// The evictor for cache management.
40    evictor: Arc<Evictor>,
41
42    /// The cleaner for log garbage collection.
43    cleaner: Arc<Cleaner>,
44
45    /// The checkpointer for durability.
46    checkpointer: Arc<Checkpointer>,
47
48    /// The daemon manager for background threads.
49    daemon_manager: Mutex<DaemonManager>,
50
51    /// Whether the engine is open.
52    open: AtomicBool,
53
54    /// Memory budget tracker (shared with arbiter).
55    cache_usage: Arc<AtomicI64>,
56}
57
58impl Engine {
59    /// Opens a Noxu DB environment with the given configuration.
60    ///
61    /// This is the main entry point for opening an environment. It:
62    /// 1. Validates configuration
63    /// 2. Creates the environment directory if needed
64    /// 3. Creates EnvironmentImpl (dbi layer)
65    /// 4. Creates Evictor, Cleaner, Checkpointer
66    /// 5. Runs recovery (RecoveryManager)
67    /// 6. Starts daemon threads
68    /// 7. Returns the Engine
69    ///
70    /// # Errors
71    /// Returns an error if:
72    /// - Configuration is invalid
73    /// - Environment directory cannot be created
74    /// - Recovery fails
75    /// - Any subsystem initialization fails
76    pub fn open(config: EngineConfig) -> Result<Self> {
77        // Validate configuration
78        config.validate().map_err(EngineError::InvalidConfig)?;
79
80        // Create environment directory if needed
81        if config.allow_create && !config.home.exists() {
82            std::fs::create_dir_all(&config.home)?;
83        }
84
85        // Verify directory exists
86        if !config.home.exists() {
87            return Err(EngineError::InvalidConfig(format!(
88                "environment directory does not exist: {}",
89                config.home.display()
90            )));
91        }
92
93        // Create EnvironmentImpl (dbi layer)
94        let env_impl = EnvironmentImpl::new(
95            &config.home,
96            config.read_only,
97            config.transactional,
98        )?;
99        let env_impl = Arc::new(Mutex::new(env_impl));
100
101        // Create cache usage tracker
102        let cache_usage = Arc::new(AtomicI64::new(0));
103
104        // Create arbiter for eviction decisions
105        let arbiter = Arbiter::new(
106            config.cache_size as i64,
107            Arc::clone(&cache_usage),
108            (config.cache_size / 10) as i64, // 10% eviction pledge
109            (config.cache_size / 5) as i64,  // 20% critical threshold
110        );
111
112        // Create evictor
113        let evictor = Arc::new(Evictor::new(arbiter, 100, false));
114
115        // Create cleaner
116        let cleaner = Arc::new(Cleaner::new(
117            config.cleaner_min_utilization,
118            config.cleaner_min_file_count,
119            0, // min age
120        ));
121
122        // Create checkpointer
123        let checkpoint_config = CheckpointConfig::default()
124            .bytes_interval(config.checkpoint_bytes_interval);
125        let checkpointer = Arc::new(Checkpointer::new(checkpoint_config));
126
127        // Run recovery using an empty in-memory scanner (no log files yet on
128        // fresh open; a real LogFileScanner will replace this once the log
129        // manager is wired through the engine).
130        let mut recovery_manager = RecoveryManager::new();
131        let mut scanner = InMemoryLogScanner::new();
132        log::info!("Running recovery...");
133        let recovery_info =
134            recovery_manager.recover(&mut scanner, None, true)?;
135        log::info!(
136            "Recovery completed: last_used_lsn={:?}, checkpoint_start_lsn={:?}",
137            recovery_info.last_used_lsn,
138            recovery_info.checkpoint_start_lsn
139        );
140
141        // Create daemon manager
142        let mut daemon_manager = DaemonManager::new(&config);
143
144        // Start daemons
145        daemon_manager.start_daemons(
146            Arc::clone(&evictor),
147            Arc::clone(&cleaner),
148            Arc::clone(&checkpointer),
149        );
150
151        let engine = Engine {
152            config,
153            env_impl,
154            evictor,
155            cleaner,
156            checkpointer,
157            daemon_manager: Mutex::new(daemon_manager),
158            open: AtomicBool::new(true),
159            cache_usage,
160        };
161
162        log::info!("Engine opened successfully");
163        Ok(engine)
164    }
165
166    /// Closes the environment.
167    ///
168    /// Performs orderly shutdown:
169    /// 1. Stop daemon threads
170    /// 2. Flush final checkpoint
171    /// 3. Close EnvironmentImpl
172    ///
173    /// After close(), the Engine cannot be used.
174    pub fn close(&self) -> Result<()> {
175        if !self.is_open() {
176            return Err(EngineError::EnvironmentClosed);
177        }
178
179        log::info!("Closing engine...");
180
181        // Mark as closed
182        self.open.store(false, Ordering::Relaxed);
183
184        // Stop daemon threads
185        self.daemon_manager.lock().shutdown();
186
187        // Flush final checkpoint
188        if !self.config.read_only
189            && self.config.checkpointer_enabled
190            && let Err(e) = self.checkpointer.do_checkpoint("close")
191        {
192            log::warn!("Final checkpoint failed: {}", e);
193        }
194
195        // Close environment impl
196        // (EnvironmentImpl doesn't have explicit close yet - would be added in full implementation)
197
198        log::info!("Engine closed successfully");
199        Ok(())
200    }
201
202    /// Returns whether the engine is open.
203    pub fn is_open(&self) -> bool {
204        self.open.load(Ordering::Relaxed)
205    }
206
207    /// Gets a reference to the EnvironmentImpl.
208    pub fn get_env_impl(&self) -> &Arc<Mutex<EnvironmentImpl>> {
209        &self.env_impl
210    }
211
212    /// Gets a reference to the Evictor.
213    pub fn get_evictor(&self) -> &Arc<Evictor> {
214        &self.evictor
215    }
216
217    /// Gets a reference to the Cleaner.
218    pub fn get_cleaner(&self) -> &Arc<Cleaner> {
219        &self.cleaner
220    }
221
222    /// Gets a reference to the Checkpointer.
223    pub fn get_checkpointer(&self) -> &Arc<Checkpointer> {
224        &self.checkpointer
225    }
226
227    /// Gets the engine configuration.
228    pub fn get_config(&self) -> &EngineConfig {
229        &self.config
230    }
231
232    /// Performs a checkpoint.
233    ///
234    /// # Arguments
235    /// * `invoker` - Description of who invoked the checkpoint (for logging)
236    ///
237    /// # Returns
238    /// Information about the checkpoint that was performed.
239    ///
240    /// # Errors
241    /// * [`EngineError::EnvironmentClosed`] if the engine has been closed.
242    /// * [`EngineError::InvalidConfig`] if the engine is opened read-only.
243    /// * Any error returned by the underlying checkpointer
244    ///   (e.g. [`EngineError::DatabaseError`] propagated from log/tree I/O).
245    pub fn checkpoint(&self, invoker: &str) -> Result<CheckpointResult> {
246        if !self.is_open() {
247            return Err(EngineError::EnvironmentClosed);
248        }
249
250        if self.config.read_only {
251            return Err(EngineError::InvalidConfig(
252                "cannot checkpoint read-only environment".to_string(),
253            ));
254        }
255
256        let result = self.checkpointer.do_checkpoint(invoker)?;
257        Ok(result)
258    }
259
260    /// Performs log cleaning.
261    ///
262    /// # Arguments
263    /// * `n_files` - Maximum number of files to clean
264    ///
265    /// # Returns
266    /// Information about the cleaning operation.
267    ///
268    /// # Errors
269    /// * [`EngineError::EnvironmentClosed`] if the engine has been closed.
270    /// * [`EngineError::InvalidConfig`] if the engine is opened read-only.
271    /// * [`EngineError::DatabaseError`] for any I/O or tree error returned
272    ///   by the underlying cleaner.
273    pub fn clean(&self, n_files: u32) -> Result<CleanResult> {
274        if !self.is_open() {
275            return Err(EngineError::EnvironmentClosed);
276        }
277
278        if self.config.read_only {
279            return Err(EngineError::InvalidConfig(
280                "cannot clean read-only environment".to_string(),
281            ));
282        }
283
284        let result = self
285            .cleaner
286            .do_clean(n_files, false)
287            .map_err(EngineError::DatabaseError)?;
288        Ok(result)
289    }
290
291    /// Throttle-driven cleaning pass for use by the cleaner daemon.
292    ///
293    /// Reads the current log write-byte counter, updates the throttle, then
294    /// cleans `n_files` recommended by the throttle.
295    ///
296    /// Returns `(CleanResult, sleep_ms)` — the daemon should sleep
297    /// `sleep_ms` milliseconds before its next pass.
298    ///
299    /// # Errors
300    /// * [`EngineError::EnvironmentClosed`] if the engine has been closed.
301    /// * [`EngineError::InvalidConfig`] if the engine is opened read-only.
302    /// * [`EngineError::DatabaseError`] for any I/O or tree error returned
303    ///   by the underlying cleaner.
304    pub fn clean_adaptive(&self) -> Result<(CleanResult, u64)> {
305        if !self.is_open() {
306            return Err(EngineError::EnvironmentClosed);
307        }
308        if self.config.read_only {
309            return Err(EngineError::InvalidConfig(
310                "cannot clean read-only environment".to_string(),
311            ));
312        }
313
314        // Read current write byte count from log manager stats.
315        let bytes_written = {
316            let env_impl = self.env_impl.lock();
317            env_impl
318                .get_log_manager()
319                .map(|lm| lm.get_stats().n_sequential_write_bytes)
320                .unwrap_or(0)
321        };
322
323        // Determine if cleaning is needed (any file below min utilization).
324        let cleaning_needed =
325            self.cleaner.get_file_selector().lock().has_files_to_clean();
326
327        let (sleep_ms, n_files) =
328            self.cleaner.throttle.update(bytes_written, cleaning_needed);
329
330        let result = self
331            .cleaner
332            .do_clean(n_files, false)
333            .map_err(EngineError::DatabaseError)?;
334
335        Ok((result, sleep_ms))
336    }
337
338    /// Performs cache eviction.
339    ///
340    /// # Returns
341    /// Information about the eviction operation.
342    ///
343    /// # Errors
344    /// * [`EngineError::EnvironmentClosed`] if the engine has been closed.
345    pub fn evict(&self) -> Result<EvictResult> {
346        if !self.is_open() {
347            return Err(EngineError::EnvironmentClosed);
348        }
349
350        let result = self.evictor.do_evict(EvictionSource::Manual);
351        Ok(result)
352    }
353
354    /// Collects environment statistics.
355    ///
356    /// Returns a snapshot of statistics from all subsystems.
357    pub fn get_stats(&self) -> EnvironmentStats {
358        let evictor_stats = self.evictor.get_stats();
359        let cleaner_stats = self.cleaner.get_stats();
360        let checkpoint_stats = self.checkpointer.get_stats();
361
362        let env_impl = self.env_impl.lock();
363        let n_databases = env_impl.n_databases() as u32;
364        let log_stats = env_impl.get_log_manager().map(|lm| lm.get_stats());
365        let lock_stats = env_impl.get_lock_manager().get_stats();
366        let real_n_lock_tables =
367            env_impl.get_lock_manager().n_lock_tables() as u64;
368        let txn_stats = env_impl.get_txn_manager().get_stats();
369        let throughput = env_impl.get_throughput_snapshot();
370        drop(env_impl);
371
372        EnvironmentStats {
373            cache_size: self.config.cache_size,
374            cache_usage: self.cache_usage.load(Ordering::Relaxed) as u64,
375            n_databases,
376            evictor: EvictorStatsSnapshot::from(evictor_stats),
377            log: log_stats
378                .as_ref()
379                .map(LogStatsSnapshot::from)
380                .unwrap_or_default(),
381            lock: LockStatsSnapshot {
382                // Report the ACTUAL shard count from the live LockManager, not
383                // a decoupled config echo (was reporting config.lock_table_count
384                // which the LockManager never received). See JE-fidelity DRIFT-2.
385                n_lock_tables: real_n_lock_tables,
386                ..LockStatsSnapshot::from(&lock_stats)
387            },
388            txn: TxnStatsSnapshot::from(&txn_stats),
389            cleaner: cleaner_stats.snapshot(),
390            checkpoint: checkpoint_stats.snapshot(),
391            throughput,
392        }
393    }
394
395    /// Gets the current cache usage in bytes.
396    pub fn get_cache_usage(&self) -> u64 {
397        self.cache_usage.load(Ordering::Relaxed) as u64
398    }
399
400    /// Gets the cache budget in bytes.
401    pub fn get_cache_budget(&self) -> u64 {
402        self.config.cache_size
403    }
404
405    /// Checks if the cache is over budget.
406    pub fn is_cache_over_budget(&self) -> bool {
407        self.get_cache_usage() > self.get_cache_budget()
408    }
409}
410
411impl Drop for Engine {
412    fn drop(&mut self) {
413        if self.is_open()
414            && let Err(e) = self.close()
415        {
416            log::error!("Error closing engine in drop: {}", e);
417        }
418    }
419}
420
#[cfg(test)]
mod tests {
    //! Tests for `Engine`. Every test runs against a fresh temporary
    //! directory; the `TempDir` guard is kept alive for the duration of the
    //! test so the directory is not removed while the engine is using it.

    use super::*;
    use tempfile::TempDir;

    /// Read-write configuration with a 10 MiB cache and 100 ms daemon
    /// wakeup intervals.
    fn temp_config() -> (TempDir, EngineConfig) {
        let dir = TempDir::new().unwrap();
        let config = EngineConfig::new(dir.path())
            .allow_create(true)
            .cache_size(10 * 1024 * 1024)
            .evictor_wakeup_interval_ms(100)
            .cleaner_wakeup_interval_ms(100)
            .checkpointer_wakeup_interval_ms(100);
        (dir, config)
    }

    /// Read-only configuration with the cleaner and checkpointer disabled.
    /// Shared by all read-only tests so they cannot drift apart.
    fn readonly_config() -> (TempDir, EngineConfig) {
        let dir = TempDir::new().unwrap();
        let config = EngineConfig::new(dir.path())
            .allow_create(true)
            .cache_size(10 * 1024 * 1024)
            .read_only(true)
            .cleaner_enabled(false)
            .checkpointer_enabled(false)
            .evictor_wakeup_interval_ms(100);
        (dir, config)
    }

    #[test]
    fn test_engine_open_and_close() {
        let (_dir, config) = temp_config();
        let engine = Engine::open(config).unwrap();
        assert!(engine.is_open());

        engine.close().unwrap();
        assert!(!engine.is_open());
    }

    #[test]
    fn test_engine_open_creates_directory() {
        let dir = TempDir::new().unwrap();
        let home = dir.path().join("newdb");
        let config = EngineConfig::new(&home).allow_create(true);

        assert!(!home.exists());
        let engine = Engine::open(config).unwrap();
        assert!(home.exists());
        assert!(engine.is_open());
    }

    #[test]
    fn test_engine_open_fails_without_create() {
        let dir = TempDir::new().unwrap();
        let home = dir.path().join("nonexistent");
        let config = EngineConfig::new(&home).allow_create(false);

        assert!(Engine::open(config).is_err());
    }

    #[test]
    fn test_engine_invalid_config() {
        let dir = TempDir::new().unwrap();
        let config = EngineConfig::new(dir.path()).cache_size(1024); // Too small

        let result = Engine::open(config);
        assert!(
            matches!(result, Err(EngineError::InvalidConfig(_))),
            "Expected InvalidConfig error"
        );
    }

    #[test]
    fn test_engine_double_close() {
        let (_dir, config) = temp_config();
        let engine = Engine::open(config).unwrap();

        engine.close().unwrap();
        // The second close must be rejected, not silently accepted.
        assert!(matches!(
            engine.close(),
            Err(EngineError::EnvironmentClosed)
        ));
    }

    #[test]
    fn test_engine_get_subsystems() {
        let (_dir, config) = temp_config();
        let engine = Engine::open(config).unwrap();

        assert!(engine.get_env_impl().lock().is_open());

        // A freshly opened engine has nothing on either LRU list.
        let lru_sizes = engine.get_evictor().get_lru_sizes();
        assert_eq!(lru_sizes.0 + lru_sizes.1, 0);

        // Cleaner and checkpointer don't have simple accessors but we can verify they exist
        let _ = engine.get_cleaner();
        let _ = engine.get_checkpointer();
    }

    #[test]
    fn test_engine_checkpoint() {
        let (_dir, config) = temp_config();
        let engine = Engine::open(config).unwrap();

        assert!(engine.checkpoint("test").is_ok());
    }

    #[test]
    fn test_engine_checkpoint_readonly() {
        let (_dir, config) = readonly_config();
        let engine = Engine::open(config).unwrap();

        assert!(engine.checkpoint("test").is_err());
    }

    #[test]
    fn test_engine_clean() {
        let (_dir, config) = temp_config();
        let engine = Engine::open(config).unwrap();

        assert!(engine.clean(5).is_ok());
    }

    #[test]
    fn test_engine_clean_readonly() {
        let (_dir, config) = readonly_config();
        let engine = Engine::open(config).unwrap();

        assert!(engine.clean(5).is_err());
    }

    #[test]
    fn test_engine_evict() {
        let (_dir, config) = temp_config();
        let engine = Engine::open(config).unwrap();

        // May or may not evict anything depending on cache state; only the
        // call itself is required to succeed.
        let _evict_result = engine.evict().unwrap();
    }

    #[test]
    fn test_engine_get_stats() {
        let (_dir, config) = temp_config();
        let engine = Engine::open(config).unwrap();

        let stats = engine.get_stats();
        assert_eq!(stats.cache_size, 10 * 1024 * 1024);
        // The engine reports the LIVE LockManager shard count (default 64),
        // not the decoupled engine_config.lock_table_count (DRIFT-2 fix).
        assert_eq!(stats.lock.n_lock_tables, 64);
    }

    #[test]
    fn test_engine_cache_budget() {
        let (_dir, config) = temp_config();
        let engine = Engine::open(config).unwrap();

        assert_eq!(engine.get_cache_budget(), 10 * 1024 * 1024);
        assert_eq!(engine.get_cache_usage(), 0); // Empty initially
        assert!(!engine.is_cache_over_budget());
    }

    #[test]
    fn test_engine_operations_after_close() {
        let (_dir, config) = temp_config();
        let engine = Engine::open(config).unwrap();
        engine.close().unwrap();

        assert!(engine.checkpoint("test").is_err());
        assert!(engine.clean(5).is_err());
        assert!(engine.evict().is_err());
    }

    #[test]
    fn test_engine_drop_closes() {
        let (_dir, config) = temp_config();
        let engine = Engine::open(config).unwrap();
        assert!(engine.is_open());
        drop(engine);
        // Engine should have closed cleanly in drop
    }

    #[test]
    fn test_engine_readonly() {
        let (_dir, config) = readonly_config();
        let engine = Engine::open(config).unwrap();

        assert!(engine.is_open());
        assert!(engine.get_config().read_only);

        // Read-only operations should work
        let stats = engine.get_stats();
        assert_eq!(stats.cache_size, 10 * 1024 * 1024);

        // Write operations should fail
        assert!(engine.checkpoint("test").is_err());
        assert!(engine.clean(5).is_err());
    }
}