Skip to main content

atomr_cluster/
reachability.rs

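//! Reachability.
//!
//! Records, per (observer, subject) pair, whether the observer considers the
//! subject reachable. A node is considered unreachable if any observer
//! reports it as such; a terminated report from any observer takes
//! precedence over both and is never downgraded.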
5
6use std::collections::HashMap;
7
8use atomr_core::actor::Address;
9use serde::{Deserialize, Serialize};
10
11#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
12#[non_exhaustive]
13pub enum ReachabilityStatus {
14    Reachable,
15    Unreachable,
16    Terminated,
17}
18
19#[derive(Debug, Default, Clone, Serialize, Deserialize)]
20pub struct Reachability {
21    pub records: HashMap<(Address, Address), ReachabilityStatus>,
22}
23
24impl Reachability {
25    pub fn new() -> Self {
26        Self::default()
27    }
28
29    pub fn unreachable(&mut self, observer: Address, subject: Address) {
30        let key = (observer, subject);
31        // Terminated is monotonic — never downgrade.
32        if matches!(self.records.get(&key), Some(ReachabilityStatus::Terminated)) {
33            return;
34        }
35        self.records.insert(key, ReachabilityStatus::Unreachable);
36    }
37
38    pub fn reachable(&mut self, observer: Address, subject: Address) {
39        let key = (observer, subject);
40        if matches!(self.records.get(&key), Some(ReachabilityStatus::Terminated)) {
41            return;
42        }
43        self.records.insert(key, ReachabilityStatus::Reachable);
44    }
45
46    pub fn terminated(&mut self, observer: Address, subject: Address) {
47        // Terminated is final and overrides any prior status.
48        self.records.insert((observer, subject), ReachabilityStatus::Terminated);
49    }
50
51    pub fn status(&self, subject: &Address) -> ReachabilityStatus {
52        let mut any_unreachable = false;
53        for ((_, s), st) in &self.records {
54            if s == subject {
55                match st {
56                    ReachabilityStatus::Terminated => return ReachabilityStatus::Terminated,
57                    ReachabilityStatus::Unreachable => any_unreachable = true,
58                    ReachabilityStatus::Reachable => {}
59                }
60            }
61        }
62        if any_unreachable {
63            ReachabilityStatus::Unreachable
64        } else {
65            ReachabilityStatus::Reachable
66        }
67    }
68
69    pub fn is_reachable(&self, subject: &Address) -> bool {
70        matches!(self.status(subject), ReachabilityStatus::Reachable)
71    }
72}
73
#[cfg(test)]
mod tests {
    use super::*;

    /// An unreachable report followed by a reachable report from the same
    /// observer leaves the subject reachable again.
    #[test]
    fn marks_unreachable_then_reachable() {
        let observer = Address::local("A");
        let subject = Address::local("B");
        let mut table = Reachability::new();

        table.unreachable(observer.clone(), subject.clone());
        assert!(!table.is_reachable(&subject));

        table.reachable(observer, subject.clone());
        assert!(table.is_reachable(&subject));
    }
}