1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::mpsc;
4use std::thread::{self, JoinHandle};
5use std::time::Duration;
6
7use crate::shard::Shard;
8
9#[derive(Debug, Clone)]
10pub struct MaintenanceConfig {
11 pub sweep_interval: Duration,
12 pub max_sweep_per_shard: usize,
13}
14
15impl Default for MaintenanceConfig {
16 fn default() -> Self {
17 Self {
18 sweep_interval: Duration::from_millis(500),
19 max_sweep_per_shard: 64,
20 }
21 }
22}
23
24pub(crate) struct MaintenanceHandle {
25 stop: Arc<AtomicBool>,
26 thread: Option<JoinHandle<()>>,
27}
28
29impl Drop for MaintenanceHandle {
30 fn drop(&mut self) {
31 self.stop.store(true, Ordering::Relaxed);
32 if let Some(h) = self.thread.take() {
33 let (tx, rx) = mpsc::channel();
34 std::thread::spawn(move || {
35 let _ = h.join();
36 let _ = tx.send(());
37 });
38 let _ = rx.recv_timeout(Duration::from_secs(5));
39 }
40 }
41}
42
43pub(crate) fn spawn_worker<K, V, F>(
44 shards: Arc<[Shard<K, V>]>,
45 config: MaintenanceConfig,
46 now_fn: F,
47) -> MaintenanceHandle
48where
49 K: Eq + std::hash::Hash + Clone + Send + Sync + 'static,
50 V: Clone + Send + Sync + 'static,
51 F: Fn() -> u32 + Send + Sync + 'static,
52{
53 let stop = Arc::new(AtomicBool::new(false));
54 let stop_clone = Arc::clone(&stop);
55
56 let thread = thread::Builder::new()
57 .name("ax-cache-maintenance".into())
58 .spawn(move || {
59 while !stop_clone.load(Ordering::Relaxed) {
60 thread::sleep(config.sweep_interval);
61 if stop_clone.load(Ordering::Relaxed) {
62 break;
63 }
64 let now = now_fn();
65 for shard in shards.iter() {
66 shard.sweep_expired(now, config.max_sweep_per_shard);
67 }
68 }
69 })
70 .expect("failed to spawn maintenance thread");
71
72 MaintenanceHandle {
73 stop,
74 thread: Some(thread),
75 }
76}