Skip to main content

ax_cache/
maintenance.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::mpsc;
4use std::thread::{self, JoinHandle};
5use std::time::Duration;
6
7use crate::shard::Shard;
8
9#[derive(Debug, Clone)]
10pub struct MaintenanceConfig {
11    pub sweep_interval: Duration,
12    pub max_sweep_per_shard: usize,
13}
14
15impl Default for MaintenanceConfig {
16    fn default() -> Self {
17        Self {
18            sweep_interval: Duration::from_millis(500),
19            max_sweep_per_shard: 64,
20        }
21    }
22}
23
24pub(crate) struct MaintenanceHandle {
25    stop: Arc<AtomicBool>,
26    thread: Option<JoinHandle<()>>,
27}
28
29impl Drop for MaintenanceHandle {
30    fn drop(&mut self) {
31        self.stop.store(true, Ordering::Relaxed);
32        if let Some(h) = self.thread.take() {
33            let (tx, rx) = mpsc::channel();
34            std::thread::spawn(move || {
35                let _ = h.join();
36                let _ = tx.send(());
37            });
38            let _ = rx.recv_timeout(Duration::from_secs(5));
39        }
40    }
41}
42
43pub(crate) fn spawn_worker<K, V, F>(
44    shards: Arc<[Shard<K, V>]>,
45    config: MaintenanceConfig,
46    now_fn: F,
47) -> MaintenanceHandle
48where
49    K: Eq + std::hash::Hash + Clone + Send + Sync + 'static,
50    V: Clone + Send + Sync + 'static,
51    F: Fn() -> u32 + Send + Sync + 'static,
52{
53    let stop = Arc::new(AtomicBool::new(false));
54    let stop_clone = Arc::clone(&stop);
55
56    let thread = thread::Builder::new()
57        .name("ax-cache-maintenance".into())
58        .spawn(move || {
59            while !stop_clone.load(Ordering::Relaxed) {
60                thread::sleep(config.sweep_interval);
61                if stop_clone.load(Ordering::Relaxed) {
62                    break;
63                }
64                let now = now_fn();
65                for shard in shards.iter() {
66                    shard.sweep_expired(now, config.max_sweep_per_shard);
67                }
68            }
69        })
70        .expect("failed to spawn maintenance thread");
71
72    MaintenanceHandle {
73        stop,
74        thread: Some(thread),
75    }
76}