ax-cache 0.1.0

Hardware-aware concurrent cache engine. Sharded SwissTable with S3-FIFO eviction, fueled by axhash.
Documentation
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

use crate::shard::Shard;

#[derive(Debug, Clone)]
pub struct MaintenanceConfig {
    pub sweep_interval: Duration,
    pub max_sweep_per_shard: usize,
}

impl Default for MaintenanceConfig {
    fn default() -> Self {
        Self {
            sweep_interval: Duration::from_millis(500),
            max_sweep_per_shard: 64,
        }
    }
}

pub(crate) struct MaintenanceHandle {
    stop: Arc<AtomicBool>,
    thread: Option<JoinHandle<()>>,
}

impl Drop for MaintenanceHandle {
    fn drop(&mut self) {
        self.stop.store(true, Ordering::Relaxed);
        if let Some(h) = self.thread.take() {
            let (tx, rx) = mpsc::channel();
            std::thread::spawn(move || {
                let _ = h.join();
                let _ = tx.send(());
            });
            let _ = rx.recv_timeout(Duration::from_secs(5));
        }
    }
}

pub(crate) fn spawn_worker<K, V, F>(
    shards: Arc<[Shard<K, V>]>,
    config: MaintenanceConfig,
    now_fn: F,
) -> MaintenanceHandle
where
    K: Eq + std::hash::Hash + Clone + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
    F: Fn() -> u32 + Send + Sync + 'static,
{
    let stop = Arc::new(AtomicBool::new(false));
    let stop_clone = Arc::clone(&stop);

    let thread = thread::Builder::new()
        .name("ax-cache-maintenance".into())
        .spawn(move || {
            while !stop_clone.load(Ordering::Relaxed) {
                thread::sleep(config.sweep_interval);
                if stop_clone.load(Ordering::Relaxed) {
                    break;
                }
                let now = now_fn();
                for shard in shards.iter() {
                    shard.sweep_expired(now, config.max_sweep_per_shard);
                }
            }
        })
        .expect("failed to spawn maintenance thread");

    MaintenanceHandle {
        stop,
        thread: Some(thread),
    }
}