atomr_remote/
failure_detector_registry.rs1use std::sync::Arc;
4
5use dashmap::DashMap;
6
7use atomr_core::actor::Address;
8
9use crate::failure_detector::FailureDetector;
10
11type DetectorFactory = Arc<dyn Fn() -> Arc<dyn FailureDetector> + Send + Sync>;
15
16#[derive(Clone)]
17pub struct FailureDetectorRegistry {
18 factory: DetectorFactory,
19 detectors: Arc<DashMap<String, Arc<dyn FailureDetector>>>,
20}
21
22impl FailureDetectorRegistry {
23 pub fn new(factory: DetectorFactory) -> Self {
24 Self { factory, detectors: Arc::new(DashMap::new()) }
25 }
26
27 pub fn default_phi() -> Self {
30 Self::new(Arc::new(|| {
31 Arc::new(crate::phi_accrual::PhiAccrualFailureDetector::new(
32 8.0,
33 1000,
34 std::time::Duration::from_millis(100),
35 std::time::Duration::from_secs(3),
36 std::time::Duration::from_secs(1),
37 ))
38 }))
39 }
40
41 pub fn heartbeat(&self, from: &Address) {
42 let key = from.to_string();
43 let entry = self.detectors.entry(key).or_insert_with(|| (self.factory)());
44 entry.heartbeat();
45 }
46
47 pub fn is_available(&self, address: &Address) -> bool {
48 self.detectors.get(&address.to_string()).map(|d| d.is_available()).unwrap_or(true)
49 }
50
51 pub fn remove(&self, address: &Address) {
52 self.detectors.remove(&address.to_string());
53 }
54
55 pub fn addresses(&self) -> Vec<String> {
56 self.detectors.iter().map(|e| e.key().clone()).collect()
57 }
58}