Skip to main content

noxu_engine/
engine.rs

1//! Main engine implementation for Noxu DB.
2
3use crate::daemon_manager::DaemonManager;
4use crate::engine_config::EngineConfig;
5use crate::env_stats::{
6    EnvironmentStats, EvictorStatsSnapshot, LockStatsSnapshot,
7    LogStatsSnapshot, TxnStatsSnapshot,
8};
9use crate::error::{EngineError, Result};
10use noxu_cleaner::{CleanResult, Cleaner};
11use noxu_dbi::EnvironmentImpl;
12use noxu_evictor::{Arbiter, EvictResult, EvictionSource, Evictor};
13use noxu_recovery::{
14    CheckpointConfig, CheckpointResult, Checkpointer, RecoveryManager,
15    log_scanner::InMemoryLogScanner,
16};
17use noxu_sync::Mutex;
18use std::sync::Arc;
19use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
20
/// The Noxu DB engine.
///
/// Wires together all internal subsystems:
/// - `EnvironmentImpl` (dbi layer)
/// - `Evictor` (cache management)
/// - `Cleaner` (log GC)
/// - `Checkpointer` (durability)
/// - `DaemonManager` (background threads)
///
/// This is the internal engine that `noxu-db` wraps. It coordinates
/// all subsystems and provides a unified interface for database operations.
///
/// Lifecycle: all fields are private, so outside this module an `Engine` is
/// obtained through `Engine::open` and shut down through `Engine::close`.
/// The `Drop` impl calls `close` if the engine is still open.
///
/// Note: Rust drops struct fields in declaration order, so the order of the
/// fields below is also the order in which these handles are released after
/// `Drop::drop` has run.
pub struct Engine {
    /// Engine configuration (validated in `Engine::open` before any subsystem
    /// is created).
    config: EngineConfig,

    /// The internal environment implementation (dbi layer). Wrapped in a
    /// mutex and exposed to callers via `get_env_impl`; `close` calls
    /// `EnvironmentImpl::close` through this lock.
    env_impl: Arc<Mutex<EnvironmentImpl>>,

    /// The evictor for cache management. Shared (`Arc`) with the daemon
    /// manager, which receives a clone in `open`.
    evictor: Arc<Evictor>,

    /// The cleaner for log garbage collection. Shared (`Arc`) with the daemon
    /// manager, which receives a clone in `open`.
    cleaner: Arc<Cleaner>,

    /// The checkpointer for durability. Shared (`Arc`) with the daemon
    /// manager, which receives a clone in `open`; also used directly for the
    /// final checkpoint in `close`.
    checkpointer: Arc<Checkpointer>,

    /// The daemon manager for background threads. Daemons are started in
    /// `open` and stopped via `shutdown()` in `close`.
    daemon_manager: Mutex<DaemonManager>,

    /// Whether the engine is open. Initialised to `true` in `open` and
    /// cleared in `close`; `checkpoint`, `clean`, `clean_adaptive` and
    /// `evict` refuse to run once it is `false`.
    open: AtomicBool,

    /// Memory budget tracker (shared with arbiter). Holds the current cache
    /// usage in bytes as a signed counter; read by `get_cache_usage` and
    /// `get_stats`.
    cache_usage: Arc<AtomicI64>,
}
57
58impl Engine {
59    /// Opens a Noxu DB environment with the given configuration.
60    ///
61    /// This is the main entry point for opening an environment. It:
62    /// 1. Validates configuration
63    /// 2. Creates the environment directory if needed
64    /// 3. Creates EnvironmentImpl (dbi layer)
65    /// 4. Creates Evictor, Cleaner, Checkpointer
66    /// 5. Runs recovery (RecoveryManager)
67    /// 6. Starts daemon threads
68    /// 7. Returns the Engine
69    ///
70    /// # Errors
71    /// Returns an error if:
72    /// - Configuration is invalid
73    /// - Environment directory cannot be created
74    /// - Recovery fails
75    /// - Any subsystem initialization fails
76    pub fn open(config: EngineConfig) -> Result<Self> {
77        // Validate configuration
78        config.validate().map_err(EngineError::InvalidConfig)?;
79
80        // Create environment directory if needed
81        if config.allow_create && !config.home.exists() {
82            std::fs::create_dir_all(&config.home)?;
83        }
84
85        // Verify directory exists
86        if !config.home.exists() {
87            return Err(EngineError::InvalidConfig(format!(
88                "environment directory does not exist: {}",
89                config.home.display()
90            )));
91        }
92
93        // Create EnvironmentImpl (dbi layer)
94        let env_impl = EnvironmentImpl::new(
95            &config.home,
96            config.read_only,
97            config.transactional,
98        )?;
99        let env_impl = Arc::new(Mutex::new(env_impl));
100
101        // Create cache usage tracker
102        let cache_usage = Arc::new(AtomicI64::new(0));
103
104        // Create arbiter for eviction decisions
105        let arbiter = Arbiter::new(
106            config.cache_size as i64,
107            Arc::clone(&cache_usage),
108            (config.cache_size / 10) as i64, // 10% eviction pledge
109            (config.cache_size / 5) as i64,  // 20% critical threshold
110        );
111
112        // Create evictor
113        let evictor = Arc::new(Evictor::new(arbiter, 100, false));
114
115        // Create cleaner
116        let cleaner = Arc::new(Cleaner::new(
117            config.cleaner_min_utilization,
118            config.cleaner_min_file_count,
119            0, // min age
120        ));
121
122        // Create checkpointer
123        let checkpoint_config = CheckpointConfig::default()
124            .bytes_interval(config.checkpoint_bytes_interval);
125        let checkpointer = Arc::new(Checkpointer::new(checkpoint_config));
126
127        // Run recovery using an empty in-memory scanner (no log files yet on
128        // fresh open; a real LogFileScanner will replace this once the log
129        // manager is wired through the engine).
130        let mut recovery_manager = RecoveryManager::new();
131        let mut scanner = InMemoryLogScanner::new();
132        log::info!("Running recovery...");
133        let recovery_info =
134            recovery_manager.recover(&mut scanner, None, true)?;
135        log::info!(
136            "Recovery completed: last_used_lsn={:?}, checkpoint_start_lsn={:?}",
137            recovery_info.last_used_lsn,
138            recovery_info.checkpoint_start_lsn
139        );
140
141        // Create daemon manager
142        let mut daemon_manager = DaemonManager::new(&config);
143
144        // Start daemons
145        daemon_manager.start_daemons(
146            Arc::clone(&evictor),
147            Arc::clone(&cleaner),
148            Arc::clone(&checkpointer),
149        );
150
151        let engine = Engine {
152            config,
153            env_impl,
154            evictor,
155            cleaner,
156            checkpointer,
157            daemon_manager: Mutex::new(daemon_manager),
158            open: AtomicBool::new(true),
159            cache_usage,
160        };
161
162        log::info!("Engine opened successfully");
163        Ok(engine)
164    }
165
166    /// Closes the environment.
167    ///
168    /// Performs orderly shutdown:
169    /// 1. Stop engine daemon threads
170    /// 2. Flush final checkpoint
171    /// 3. Close `EnvironmentImpl` (stops its own daemons, forces a final
172    ///    checkpoint, and fsyncs the WAL)
173    ///
174    /// After close(), the Engine cannot be used. Dropping the `Engine`
175    /// afterwards relies on `EnvironmentImpl`'s own RAII `Drop` as a backstop;
176    /// `EnvironmentImpl::close` is idempotent, so the explicit call here and
177    /// the eventual `Drop` do not conflict.
178    pub fn close(&self) -> Result<()> {
179        if !self.is_open() {
180            return Err(EngineError::EnvironmentClosed);
181        }
182
183        log::info!("Closing engine...");
184
185        // Mark as closed
186        self.open.store(false, Ordering::Relaxed);
187
188        // Stop daemon threads
189        self.daemon_manager.lock().shutdown();
190
191        // Flush final checkpoint
192        if !self.config.read_only
193            && self.config.checkpointer_enabled
194            && let Err(e) = self.checkpointer.do_checkpoint("close")
195        {
196            log::warn!("Final checkpoint failed: {}", e);
197        }
198
199        // Close the environment impl: stops the dbi-layer daemons, forces a
200        // final checkpoint, and fsyncs the WAL. Idempotent and best-effort on
201        // shutdown; log (don't propagate) so the rest of close still runs.
202        if let Err(e) = self.env_impl.lock().close() {
203            log::warn!("EnvironmentImpl close failed: {}", e);
204        }
205
206        log::info!("Engine closed successfully");
207        Ok(())
208    }
209
210    /// Returns whether the engine is open.
211    pub fn is_open(&self) -> bool {
212        self.open.load(Ordering::Relaxed)
213    }
214
215    /// Gets a reference to the EnvironmentImpl.
216    pub fn get_env_impl(&self) -> &Arc<Mutex<EnvironmentImpl>> {
217        &self.env_impl
218    }
219
220    /// Gets a reference to the Evictor.
221    pub fn get_evictor(&self) -> &Arc<Evictor> {
222        &self.evictor
223    }
224
225    /// Gets a reference to the Cleaner.
226    pub fn get_cleaner(&self) -> &Arc<Cleaner> {
227        &self.cleaner
228    }
229
230    /// Gets a reference to the Checkpointer.
231    pub fn get_checkpointer(&self) -> &Arc<Checkpointer> {
232        &self.checkpointer
233    }
234
235    /// Gets the engine configuration.
236    pub fn get_config(&self) -> &EngineConfig {
237        &self.config
238    }
239
240    /// Performs a checkpoint.
241    ///
242    /// # Arguments
243    /// * `invoker` - Description of who invoked the checkpoint (for logging)
244    ///
245    /// # Returns
246    /// Information about the checkpoint that was performed.
247    ///
248    /// # Errors
249    /// * [`EngineError::EnvironmentClosed`] if the engine has been closed.
250    /// * [`EngineError::InvalidConfig`] if the engine is opened read-only.
251    /// * Any error returned by the underlying checkpointer
252    ///   (e.g. [`EngineError::DatabaseError`] propagated from log/tree I/O).
253    pub fn checkpoint(&self, invoker: &str) -> Result<CheckpointResult> {
254        if !self.is_open() {
255            return Err(EngineError::EnvironmentClosed);
256        }
257
258        if self.config.read_only {
259            return Err(EngineError::InvalidConfig(
260                "cannot checkpoint read-only environment".to_string(),
261            ));
262        }
263
264        let result = self.checkpointer.do_checkpoint(invoker)?;
265        Ok(result)
266    }
267
268    /// Performs log cleaning.
269    ///
270    /// # Arguments
271    /// * `n_files` - Maximum number of files to clean
272    ///
273    /// # Returns
274    /// Information about the cleaning operation.
275    ///
276    /// # Errors
277    /// * [`EngineError::EnvironmentClosed`] if the engine has been closed.
278    /// * [`EngineError::InvalidConfig`] if the engine is opened read-only.
279    /// * [`EngineError::DatabaseError`] for any I/O or tree error returned
280    ///   by the underlying cleaner.
281    pub fn clean(&self, n_files: u32) -> Result<CleanResult> {
282        if !self.is_open() {
283            return Err(EngineError::EnvironmentClosed);
284        }
285
286        if self.config.read_only {
287            return Err(EngineError::InvalidConfig(
288                "cannot clean read-only environment".to_string(),
289            ));
290        }
291
292        let result = self
293            .cleaner
294            .do_clean(n_files, false)
295            .map_err(EngineError::DatabaseError)?;
296        Ok(result)
297    }
298
299    /// Throttle-driven cleaning pass for use by the cleaner daemon.
300    ///
301    /// Reads the current log write-byte counter, updates the throttle, then
302    /// cleans `n_files` recommended by the throttle.
303    ///
304    /// Returns `(CleanResult, sleep_ms)` — the daemon should sleep
305    /// `sleep_ms` milliseconds before its next pass.
306    ///
307    /// # Errors
308    /// * [`EngineError::EnvironmentClosed`] if the engine has been closed.
309    /// * [`EngineError::InvalidConfig`] if the engine is opened read-only.
310    /// * [`EngineError::DatabaseError`] for any I/O or tree error returned
311    ///   by the underlying cleaner.
312    pub fn clean_adaptive(&self) -> Result<(CleanResult, u64)> {
313        if !self.is_open() {
314            return Err(EngineError::EnvironmentClosed);
315        }
316        if self.config.read_only {
317            return Err(EngineError::InvalidConfig(
318                "cannot clean read-only environment".to_string(),
319            ));
320        }
321
322        // Read current write byte count from log manager stats.
323        let bytes_written = {
324            let env_impl = self.env_impl.lock();
325            env_impl
326                .get_log_manager()
327                .map(|lm| lm.get_stats().n_sequential_write_bytes)
328                .unwrap_or(0)
329        };
330
331        // Determine if cleaning is needed (any file below min utilization).
332        let cleaning_needed =
333            self.cleaner.get_file_selector().lock().has_files_to_clean();
334
335        let (sleep_ms, n_files) =
336            self.cleaner.throttle.update(bytes_written, cleaning_needed);
337
338        let result = self
339            .cleaner
340            .do_clean(n_files, false)
341            .map_err(EngineError::DatabaseError)?;
342
343        Ok((result, sleep_ms))
344    }
345
346    /// Performs cache eviction.
347    ///
348    /// # Returns
349    /// Information about the eviction operation.
350    ///
351    /// # Errors
352    /// * [`EngineError::EnvironmentClosed`] if the engine has been closed.
353    pub fn evict(&self) -> Result<EvictResult> {
354        if !self.is_open() {
355            return Err(EngineError::EnvironmentClosed);
356        }
357
358        let result = self.evictor.do_evict(EvictionSource::Manual);
359        Ok(result)
360    }
361
362    /// Collects environment statistics.
363    ///
364    /// Returns a snapshot of statistics from all subsystems.
365    pub fn get_stats(&self) -> EnvironmentStats {
366        let evictor_stats = self.evictor.get_stats();
367        let cleaner_stats = self.cleaner.get_stats();
368        let checkpoint_stats = self.checkpointer.get_stats();
369
370        let env_impl = self.env_impl.lock();
371        let n_databases = env_impl.n_databases() as u32;
372        let log_stats = env_impl.get_log_manager().map(|lm| lm.get_stats());
373        let lock_stats = env_impl.get_lock_manager().get_stats();
374        let real_n_lock_tables =
375            env_impl.get_lock_manager().n_lock_tables() as u64;
376        let txn_stats = env_impl.get_txn_manager().get_stats();
377        let throughput = env_impl.get_throughput_snapshot();
378        drop(env_impl);
379
380        EnvironmentStats {
381            cache_size: self.config.cache_size,
382            cache_usage: self.cache_usage.load(Ordering::Relaxed) as u64,
383            n_databases,
384            evictor: EvictorStatsSnapshot::from(evictor_stats),
385            log: log_stats
386                .as_ref()
387                .map(LogStatsSnapshot::from)
388                .unwrap_or_default(),
389            lock: LockStatsSnapshot {
390                // Report the ACTUAL shard count from the live LockManager, not
391                // a decoupled config echo (was reporting config.lock_table_count
392                // which the LockManager never received). See JE-fidelity DRIFT-2.
393                n_lock_tables: real_n_lock_tables,
394                ..LockStatsSnapshot::from(&lock_stats)
395            },
396            txn: TxnStatsSnapshot::from(&txn_stats),
397            cleaner: cleaner_stats.snapshot(),
398            checkpoint: checkpoint_stats.snapshot(),
399            throughput,
400        }
401    }
402
403    /// Gets the current cache usage in bytes.
404    pub fn get_cache_usage(&self) -> u64 {
405        self.cache_usage.load(Ordering::Relaxed) as u64
406    }
407
408    /// Gets the cache budget in bytes.
409    pub fn get_cache_budget(&self) -> u64 {
410        self.config.cache_size
411    }
412
413    /// Checks if the cache is over budget.
414    pub fn is_cache_over_budget(&self) -> bool {
415        self.get_cache_usage() > self.get_cache_budget()
416    }
417}
418
419impl Drop for Engine {
420    fn drop(&mut self) {
421        if self.is_open()
422            && let Err(e) = self.close()
423        {
424            log::error!("Error closing engine in drop: {}", e);
425        }
426    }
427}
428
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Writable configuration rooted in a fresh temp dir with short daemon
    /// wakeup intervals. The `TempDir` must be kept alive for the test.
    fn temp_config() -> (TempDir, EngineConfig) {
        let dir = TempDir::new().unwrap();
        let config = EngineConfig::new(dir.path())
            .allow_create(true)
            .cache_size(10 * 1024 * 1024)
            .evictor_wakeup_interval_ms(100)
            .cleaner_wakeup_interval_ms(100)
            .checkpointer_wakeup_interval_ms(100);
        (dir, config)
    }

    /// Read-only configuration rooted at `dir`, with the cleaner and
    /// checkpointer disabled (shared by all read-only tests).
    fn read_only_config(dir: &TempDir) -> EngineConfig {
        EngineConfig::new(dir.path())
            .allow_create(true)
            .cache_size(10 * 1024 * 1024)
            .read_only(true)
            .cleaner_enabled(false)
            .checkpointer_enabled(false)
            .evictor_wakeup_interval_ms(100)
    }

    #[test]
    fn test_engine_open_and_close() {
        let (_dir, config) = temp_config();
        let engine = Engine::open(config).unwrap();
        assert!(engine.is_open());
        assert!(engine.get_env_impl().lock().is_open());

        engine.close().unwrap();
        assert!(!engine.is_open());
        // Item 2: Engine::close must actually close the EnvironmentImpl, not
        // leave it running and rely solely on a later Drop.
        assert!(
            !engine.get_env_impl().lock().is_open(),
            "EnvironmentImpl must be closed after Engine::close"
        );
    }

    #[test]
    fn test_engine_open_creates_directory() {
        let dir = TempDir::new().unwrap();
        let home = dir.path().join("newdb");
        let config = EngineConfig::new(&home).allow_create(true);

        assert!(!home.exists());
        let engine = Engine::open(config).unwrap();
        assert!(home.exists());
        assert!(engine.is_open());
    }

    #[test]
    fn test_engine_open_fails_without_create() {
        let dir = TempDir::new().unwrap();
        let home = dir.path().join("nonexistent");
        let config = EngineConfig::new(&home).allow_create(false);

        let result = Engine::open(config);
        assert!(
            matches!(result, Err(EngineError::InvalidConfig(_))),
            "opening a missing home without allow_create must be InvalidConfig"
        );
        assert!(
            !home.exists(),
            "home must not be created when allow_create is false"
        );
    }

    #[test]
    fn test_engine_invalid_config() {
        let dir = TempDir::new().unwrap();
        let config = EngineConfig::new(dir.path()).cache_size(1024); // Too small

        let result = Engine::open(config);
        assert!(
            matches!(result, Err(EngineError::InvalidConfig(_))),
            "Expected InvalidConfig error"
        );
    }

    #[test]
    fn test_engine_double_close() {
        let (_dir, config) = temp_config();
        let engine = Engine::open(config).unwrap();

        engine.close().unwrap();
        assert!(matches!(engine.close(), Err(EngineError::EnvironmentClosed)));
    }

    #[test]
    fn test_engine_get_subsystems() {
        let (_dir, config) = temp_config();
        let engine = Engine::open(config).unwrap();

        assert!(engine.get_env_impl().lock().is_open());
        let lru_sizes = engine.get_evictor().get_lru_sizes();
        assert_eq!(lru_sizes.0 + lru_sizes.1, 0);
        // Cleaner and checkpointer don't have simple accessors but we can verify they exist
        let _ = engine.get_cleaner();
        let _ = engine.get_checkpointer();
    }

    #[test]
    fn test_engine_checkpoint() {
        let (_dir, config) = temp_config();
        let engine = Engine::open(config).unwrap();

        assert!(engine.checkpoint("test").is_ok());
    }

    #[test]
    fn test_engine_checkpoint_readonly() {
        let dir = TempDir::new().unwrap();
        let engine = Engine::open(read_only_config(&dir)).unwrap();

        assert!(matches!(
            engine.checkpoint("test"),
            Err(EngineError::InvalidConfig(_))
        ));
    }

    #[test]
    fn test_engine_clean() {
        let (_dir, config) = temp_config();
        let engine = Engine::open(config).unwrap();

        assert!(engine.clean(5).is_ok());
    }

    #[test]
    fn test_engine_clean_readonly() {
        let dir = TempDir::new().unwrap();
        let engine = Engine::open(read_only_config(&dir)).unwrap();

        assert!(matches!(
            engine.clean(5),
            Err(EngineError::InvalidConfig(_))
        ));
    }

    #[test]
    fn test_engine_evict() {
        let (_dir, config) = temp_config();
        let engine = Engine::open(config).unwrap();

        // May or may not evict anything depending on cache state; only the
        // call itself must succeed on an open engine.
        assert!(engine.evict().is_ok());
    }

    #[test]
    fn test_engine_get_stats() {
        let (_dir, config) = temp_config();
        let engine = Engine::open(config).unwrap();

        let stats = engine.get_stats();
        assert_eq!(stats.cache_size, 10 * 1024 * 1024);
        // The engine reports the LIVE LockManager shard count (default 64),
        // not the decoupled engine_config.lock_table_count (DRIFT-2 fix).
        assert_eq!(stats.lock.n_lock_tables, 64);
    }

    #[test]
    fn test_engine_cache_budget() {
        let (_dir, config) = temp_config();
        let engine = Engine::open(config).unwrap();

        assert_eq!(engine.get_cache_budget(), 10 * 1024 * 1024);
        assert_eq!(engine.get_cache_usage(), 0); // Empty initially
        assert!(!engine.is_cache_over_budget());
    }

    #[test]
    fn test_engine_operations_after_close() {
        let (_dir, config) = temp_config();
        let engine = Engine::open(config).unwrap();
        engine.close().unwrap();

        assert!(matches!(
            engine.checkpoint("test"),
            Err(EngineError::EnvironmentClosed)
        ));
        assert!(matches!(engine.clean(5), Err(EngineError::EnvironmentClosed)));
        assert!(matches!(engine.evict(), Err(EngineError::EnvironmentClosed)));
    }

    #[test]
    fn test_engine_drop_closes() {
        let (_dir, config) = temp_config();
        let engine = Engine::open(config).unwrap();
        assert!(engine.is_open());

        // Keep the dbi layer alive past the Engine so we can observe that
        // Drop really ran the close path.
        let env_impl = Arc::clone(engine.get_env_impl());
        drop(engine);
        assert!(
            !env_impl.lock().is_open(),
            "dropping an open Engine must close its EnvironmentImpl"
        );
    }

    #[test]
    fn test_engine_readonly() {
        let dir = TempDir::new().unwrap();
        let engine = Engine::open(read_only_config(&dir)).unwrap();

        assert!(engine.is_open());
        assert!(engine.get_config().read_only);

        // Read-only operations should work
        let stats = engine.get_stats();
        assert_eq!(stats.cache_size, 10 * 1024 * 1024);

        // Write operations should fail with InvalidConfig
        assert!(matches!(
            engine.checkpoint("test"),
            Err(EngineError::InvalidConfig(_))
        ));
        assert!(matches!(
            engine.clean(5),
            Err(EngineError::InvalidConfig(_))
        ));
    }
}