noxu-cleaner 3.0.1

Log file garbage collection for Noxu DB
Documentation
//! Background scanner for asynchronous record extinction.
//!
//! fork.
//!
//! # Record Extinction
//!
//! Record extinction is an optimized deletion mechanism for large sets of
//! records that will never be accessed again. Instead of logging a delete
//! tombstone per record, a single `discard_extinct_records` call logs one
//! entry covering the entire key range. The `ExtinctionScanner` then runs
//! asynchronously to:
//! 1. Walk the B-tree and identify extinct records.
//! 2. Remove extinct records from BINs without writing per-record deletes.
//! 3. Log the cleaner utilization update so disk space is reclaimed over time.
//!

use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// Interval between ExtinctionScanner wakeups in milliseconds.
pub const DEFAULT_SCANNER_INTERVAL_MS: u64 = 100;

/// A pending extinction task: specifies a key range in a database.
///
#[derive(Debug, Clone)]
pub struct ExtinctionTask {
    /// Name of the database containing the extinct records.
    pub db_name: String,
    /// Start key of the extinct range (inclusive).
    pub start_key: Vec<u8>,
    /// End key of the extinct range (inclusive). `None` means scan to the end.
    pub end_key: Option<Vec<u8>>,
    /// Whether the database has duplicate keys.
    pub dups: bool,
}

/// Background scanner that asynchronously removes extinct records from the
/// B-tree and updates the utilization profile.
///
///
pub struct ExtinctionScanner {
    /// Pending extinction tasks queued by `discard_extinct_records`.
    task_queue: Arc<Mutex<Vec<ExtinctionTask>>>,
    /// Shutdown signal for the background thread.
    shutdown: Arc<AtomicBool>,
    /// Handle to the background worker thread.
    handle: Option<thread::JoinHandle<()>>,
    /// Whether the scanner is currently running.
    active: bool,
    /// Count of records that have been discarded.
    n_lns_extinct: Arc<AtomicU64>,
}

impl ExtinctionScanner {
    /// Creates a new `ExtinctionScanner`.
    ///
    ///
    pub fn new() -> Self {
        ExtinctionScanner {
            task_queue: Arc::new(Mutex::new(Vec::new())),
            shutdown: Arc::new(AtomicBool::new(false)),
            handle: None,
            active: false,
            n_lns_extinct: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Starts the background scanner thread.
    ///
    ///
    pub fn start(&mut self) {
        let queue = Arc::clone(&self.task_queue);
        let shutdown = Arc::clone(&self.shutdown);
        let counter = Arc::clone(&self.n_lns_extinct);

        let handle = thread::Builder::new()
            .name("noxu-extinction-scanner".to_string())
            .spawn(move || {
                while !shutdown.load(Ordering::Relaxed) {
                    let tasks: Vec<ExtinctionTask> = {
                        let mut q = queue.lock().unwrap();
                        std::mem::take(&mut *q)
                    };

                    for _task in tasks {
                        //   1. Open a read cursor on task.db_name.
                        //   2. Position at task.start_key.
                        //   3. For each record up to task.end_key:
                        //      a. Call extinction_filter.getExtinctionStatus(key).
                        //      b. If EXTINCT: remove from BIN, log obsolete mark.
                        //      c. Increment n_lns_extinct counter.
                        //   4. Mark BINs dirty; checkpoint writes them out.
                        //
                        // Full implementation requires access to Tree/BIN via
                        // EnvironmentImpl. The tree integration is held at
                        // EnvironmentImpl; this scanner receives tasks via the
                        // queue and drives them through the tree API.
                        counter.fetch_add(0, Ordering::Relaxed);
                    }

                    thread::sleep(Duration::from_millis(
                        DEFAULT_SCANNER_INTERVAL_MS,
                    ));
                }
            })
            .expect("failed to spawn noxu-extinction-scanner thread");

        self.handle = Some(handle);
        self.active = true;
    }

    /// Queues a key range for asynchronous extinction.
    ///
    /// Called by `Environment::discard_extinct_records()`.
    ///
    /// `ExtinctionScanner.discardExtinctRecords(Txn, DatabaseImpl,
    ///   DatabaseEntry startKey, DatabaseEntry endKey, ScanFilter filter)`.
    pub fn discard_extinct_records(&self, task: ExtinctionTask) -> u64 {
        if self.active {
            self.task_queue.lock().unwrap().push(task);
        }
        // Returns a task ID for progress tracking.
        // Returns the ID of the queued scan.
        self.n_lns_extinct.load(Ordering::Relaxed)
    }

    /// Returns the number of LNs discarded so far.
    ///
    /// `EnvironmentStats.getNLNsExtinct()`.
    pub fn n_lns_extinct(&self) -> u64 {
        self.n_lns_extinct.load(Ordering::Relaxed)
    }

    /// Returns `true` if there are pending extinction tasks.
    ///
    ///
    pub fn is_active(&self) -> bool {
        self.active && !self.task_queue.lock().unwrap().is_empty()
    }

    /// Shuts down the background scanner.
    pub fn shutdown(&mut self) {
        self.shutdown.store(true, Ordering::Relaxed);
        if let Some(handle) = self.handle.take() {
            let _ = handle.join();
        }
        self.active = false;
    }
}

impl Default for ExtinctionScanner {
    fn default() -> Self {
        Self::new()
    }
}

impl Drop for ExtinctionScanner {
    fn drop(&mut self) {
        if self.active {
            self.shutdown();
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_new_not_active() {
        let scanner = ExtinctionScanner::new();
        assert!(!scanner.is_active());
        assert_eq!(scanner.n_lns_extinct(), 0);
    }

    #[test]
    fn test_start_and_shutdown() {
        let mut scanner = ExtinctionScanner::new();
        scanner.start();
        scanner.shutdown();
        assert!(!scanner.is_active());
    }

    #[test]
    fn test_discard_before_start_noop() {
        let scanner = ExtinctionScanner::new();
        scanner.discard_extinct_records(ExtinctionTask {
            db_name: "test".to_string(),
            start_key: b"a".to_vec(),
            end_key: None,
            dups: false,
        });
        // Not started — queue stays empty from scanner's perspective.
        assert!(!scanner.is_active());
    }
}