use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
pub const DEFAULT_SCANNER_INTERVAL_MS: u64 = 100;
#[derive(Debug, Clone)]
pub struct ExtinctionTask {
pub db_name: String,
pub start_key: Vec<u8>,
pub end_key: Option<Vec<u8>>,
pub dups: bool,
}
pub struct ExtinctionScanner {
task_queue: Arc<Mutex<Vec<ExtinctionTask>>>,
shutdown: Arc<AtomicBool>,
handle: Option<thread::JoinHandle<()>>,
active: bool,
n_lns_extinct: Arc<AtomicU64>,
}
impl ExtinctionScanner {
pub fn new() -> Self {
ExtinctionScanner {
task_queue: Arc::new(Mutex::new(Vec::new())),
shutdown: Arc::new(AtomicBool::new(false)),
handle: None,
active: false,
n_lns_extinct: Arc::new(AtomicU64::new(0)),
}
}
pub fn start(&mut self) {
let queue = Arc::clone(&self.task_queue);
let shutdown = Arc::clone(&self.shutdown);
let counter = Arc::clone(&self.n_lns_extinct);
let handle = thread::Builder::new()
.name("noxu-extinction-scanner".to_string())
.spawn(move || {
while !shutdown.load(Ordering::Relaxed) {
let tasks: Vec<ExtinctionTask> = {
let mut q = queue.lock().unwrap();
std::mem::take(&mut *q)
};
for _task in tasks {
counter.fetch_add(0, Ordering::Relaxed);
}
thread::sleep(Duration::from_millis(
DEFAULT_SCANNER_INTERVAL_MS,
));
}
})
.expect("failed to spawn noxu-extinction-scanner thread");
self.handle = Some(handle);
self.active = true;
}
pub fn discard_extinct_records(&self, task: ExtinctionTask) -> u64 {
if self.active {
self.task_queue.lock().unwrap().push(task);
}
self.n_lns_extinct.load(Ordering::Relaxed)
}
pub fn n_lns_extinct(&self) -> u64 {
self.n_lns_extinct.load(Ordering::Relaxed)
}
pub fn is_active(&self) -> bool {
self.active && !self.task_queue.lock().unwrap().is_empty()
}
pub fn shutdown(&mut self) {
self.shutdown.store(true, Ordering::Relaxed);
if let Some(handle) = self.handle.take() {
let _ = handle.join();
}
self.active = false;
}
}
impl Default for ExtinctionScanner {
fn default() -> Self {
Self::new()
}
}
impl Drop for ExtinctionScanner {
fn drop(&mut self) {
if self.active {
self.shutdown();
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_new_not_active() {
let scanner = ExtinctionScanner::new();
assert!(!scanner.is_active());
assert_eq!(scanner.n_lns_extinct(), 0);
}
#[test]
fn test_start_and_shutdown() {
let mut scanner = ExtinctionScanner::new();
scanner.start();
scanner.shutdown();
assert!(!scanner.is_active());
}
#[test]
fn test_discard_before_start_noop() {
let scanner = ExtinctionScanner::new();
scanner.discard_extinct_records(ExtinctionTask {
db_name: "test".to_string(),
start_key: b"a".to_vec(),
end_key: None,
dups: false,
});
assert!(!scanner.is_active());
}
}