Skip to main content

atomr_cluster/
multi_dc.rs

1//! Multi-data-center awareness.
2//!
3//! Nodes carry `dc-default` / `dc-<name>` cluster roles. A node belongs to
4//! exactly one data-center (or `default`); cross-DC heartbeats use a slow
5//! path (longer interval, larger phi-accrual threshold) so transient WAN
6//! latency doesn't trigger spurious downing.
7//!
//! This module ships the pure helpers — DC extraction from
//! `Member.roles`, peer classification, and slow-path interval
//! selection. Wiring them into `HeartbeatSender` (Phase 6.E) and
//! gossip dissemination (Phase 6.D) is follow-on work.
12
13use std::time::Duration;
14
15use crate::member::Member;
16
/// Role prefix that encodes a member's data-center.
///
/// By convention a member's data-center is carried as a cluster role of
/// the form `"dc-<name>"`; the default data-center is therefore the role
/// `"dc-default"`. Akka Cluster's multi-DC support uses the same prefix.
pub const DC_ROLE_PREFIX: &str = "dc-";

/// Default DC name used when no `dc-*` role is present.
pub const DEFAULT_DC: &str = "default";
23
24/// Extract the data-center name from a member's role list. Returns
25/// [`DEFAULT_DC`] when no `dc-*` role is set.
26pub fn member_dc(m: &Member) -> &str {
27    for role in &m.roles {
28        if let Some(rest) = role.strip_prefix(DC_ROLE_PREFIX) {
29            return rest;
30        }
31    }
32    DEFAULT_DC
33}
34
35/// `true` if `a` and `b` belong to the same data-center.
36pub fn same_dc(a: &Member, b: &Member) -> bool {
37    member_dc(a) == member_dc(b)
38}
39
/// Slow-path settings used for cross-DC heartbeats / gossip.
///
/// [`Default`] yields a 5 s heartbeat interval, a 30 s acceptable pause
/// and at most 5 actively monitored cross-DC peers. Because the struct is
/// `#[non_exhaustive]`, code outside this crate cannot use a struct
/// literal. It should start from `CrossDcSettings::default()` and
/// overwrite individual fields.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub struct CrossDcSettings {
    /// Heartbeat interval for peers in a different DC.
    pub heartbeat_interval: Duration,
    /// Acceptable pause window for cross-DC peers. It is meant to be
    /// larger than the in-DC value to absorb WAN jitter.
    pub acceptable_pause: Duration,
    /// Maximum number of cross-DC peers to monitor actively at once.
    pub max_monitored_peers: usize,
}

impl Default for CrossDcSettings {
    fn default() -> Self {
        Self {
            heartbeat_interval: Duration::from_secs(5),
            acceptable_pause: Duration::from_secs(30),
            max_monitored_peers: 5,
        }
    }
}
63
64/// Pick the heartbeat interval to use against `peer` from the
65/// perspective of `local`: in-DC peers get `local_interval`,
66/// cross-DC peers get `cross.heartbeat_interval`.
67pub fn heartbeat_interval_for(
68    local: &Member,
69    peer: &Member,
70    local_interval: Duration,
71    cross: &CrossDcSettings,
72) -> Duration {
73    if same_dc(local, peer) {
74        local_interval
75    } else {
76        cross.heartbeat_interval
77    }
78}
79
80/// Partition a peer list into `(in_dc, cross_dc)` from `local`'s
81/// perspective.
82pub fn partition_by_dc<'a>(local: &Member, peers: &'a [Member]) -> (Vec<&'a Member>, Vec<&'a Member>) {
83    let mut in_dc = Vec::new();
84    let mut cross = Vec::new();
85    for p in peers {
86        if same_dc(local, p) {
87            in_dc.push(p);
88        } else {
89            cross.push(p);
90        }
91    }
92    (in_dc, cross)
93}
94
#[cfg(test)]
mod tests {
    use super::*;
    use atomr_core::actor::Address;

    /// Build a member at `addr` carrying the given cluster roles.
    fn member(addr: &str, roles: &[&str]) -> Member {
        Member::new(Address::local(addr), roles.iter().map(|s| s.to_string()).collect())
    }

    #[test]
    fn member_dc_uses_dc_role_when_present() {
        let m = member("a", &["dc-eu-west", "compute"]);
        assert_eq!(member_dc(&m), "eu-west");
    }

    #[test]
    fn member_dc_defaults_when_missing() {
        let m = member("a", &["compute"]);
        assert_eq!(member_dc(&m), DEFAULT_DC);
    }

    #[test]
    fn same_dc_compares_correctly() {
        let a = member("a", &["dc-us"]);
        let b = member("b", &["dc-us"]);
        let c = member("c", &["dc-eu"]);
        assert!(same_dc(&a, &b));
        assert!(!same_dc(&a, &c));
    }

    #[test]
    fn member_without_dc_role_shares_explicit_default_dc() {
        // A role-less member and an explicit `dc-default` member must land
        // in the same data-center.
        let implicit = member("a", &["compute"]);
        let explicit = member("b", &["dc-default"]);
        assert!(same_dc(&implicit, &explicit));
    }

    #[test]
    fn heartbeat_interval_picks_cross_for_other_dc() {
        let a = member("a", &["dc-us"]);
        let b = member("b", &["dc-eu"]);
        let cross = CrossDcSettings::default();
        let interval = heartbeat_interval_for(&a, &b, Duration::from_secs(1), &cross);
        assert_eq!(interval, cross.heartbeat_interval);
    }

    #[test]
    fn heartbeat_interval_picks_local_for_same_dc() {
        let a = member("a", &["dc-us"]);
        let b = member("b", &["dc-us"]);
        let local = Duration::from_secs(1);
        let cross = CrossDcSettings::default();
        let interval = heartbeat_interval_for(&a, &b, local, &cross);
        assert_eq!(interval, local);
    }

    #[test]
    fn partition_splits_peers_correctly() {
        let local = member("self", &["dc-us"]);
        let peers = vec![
            member("a", &["dc-us"]),
            member("b", &["dc-eu"]),
            member("c", &["dc-us"]),
            member("d", &["dc-ap"]),
        ];
        let (same, cross) = partition_by_dc(&local, &peers);
        assert_eq!(same.len(), 2);
        assert_eq!(cross.len(), 2);
        // Every in-DC peer really is in `us`, and cross-DC peers keep
        // their input order.
        assert!(same.iter().all(|m| member_dc(m) == "us"));
        let cross_dcs: Vec<&str> = cross.iter().copied().map(member_dc).collect();
        assert_eq!(cross_dcs, ["eu", "ap"]);
    }

    #[test]
    fn partition_of_empty_peer_list_is_empty() {
        let local = member("self", &["dc-us"]);
        let (same, cross) = partition_by_dc(&local, &[]);
        assert!(same.is_empty());
        assert!(cross.is_empty());
    }
}