Skip to main content

atomr_remote/
failure_detector_registry.rs

1//! Per-`Address` failure detector registry.
2
3use std::sync::Arc;
4
5use dashmap::DashMap;
6
7use atomr_core::actor::Address;
8
9use crate::failure_detector::FailureDetector;
10
11/// Factory closure that produces a fresh detector for a newly-tracked
12/// address. Default implementations build a `PhiAccrualFailureDetector`
13/// with conservative thresholds.
14type DetectorFactory = Arc<dyn Fn() -> Arc<dyn FailureDetector> + Send + Sync>;
15
16#[derive(Clone)]
17pub struct FailureDetectorRegistry {
18    factory: DetectorFactory,
19    detectors: Arc<DashMap<String, Arc<dyn FailureDetector>>>,
20}
21
22impl FailureDetectorRegistry {
23    pub fn new(factory: DetectorFactory) -> Self {
24        Self { factory, detectors: Arc::new(DashMap::new()) }
25    }
26
27    /// Default detector: `PhiAccrualFailureDetector` with phi threshold 8,
28    /// 1000-sample window, ~100ms heartbeat, 3s acceptable pause, 1s warm-up.
29    pub fn default_phi() -> Self {
30        Self::new(Arc::new(|| {
31            Arc::new(crate::phi_accrual::PhiAccrualFailureDetector::new(
32                8.0,
33                1000,
34                std::time::Duration::from_millis(100),
35                std::time::Duration::from_secs(3),
36                std::time::Duration::from_secs(1),
37            ))
38        }))
39    }
40
41    pub fn heartbeat(&self, from: &Address) {
42        let key = from.to_string();
43        let entry = self.detectors.entry(key).or_insert_with(|| (self.factory)());
44        entry.heartbeat();
45    }
46
47    pub fn is_available(&self, address: &Address) -> bool {
48        self.detectors.get(&address.to_string()).map(|d| d.is_available()).unwrap_or(true)
49    }
50
51    pub fn remove(&self, address: &Address) {
52        self.detectors.remove(&address.to_string());
53    }
54
55    pub fn addresses(&self) -> Vec<String> {
56        self.detectors.iter().map(|e| e.key().clone()).collect()
57    }
58}