Skip to main content

atomr_cluster/
multi_dc.rs

//! Multi-data-center awareness.
//!
//! Phase 6.G of `docs/full-port-plan.md`. Akka.NET parity:
//! `dc-default` / `dc-name` cluster roles. A node belongs to exactly
//! one data-center (or `default`); cross-DC heartbeats use a slow
//! path (longer interval, larger phi-accrual threshold) so transient
//! WAN latency doesn't trigger spurious downing.
//!
//! This module ships the pure helpers — DC extraction from
//! `Member.roles`, peer classification, and slow-path interval
//! selection. The wiring into `HeartbeatSender` (Phase 6.E) and
//! gossip dissemination (Phase 6.D) is a follow-on.
13
14use std::time::Duration;
15
16use crate::member::Member;
17
/// Role prefix that marks a data-center role.
///
/// By convention a member advertises its data-center through a role
/// spelled `"dc-<name>"`; Akka.NET uses the same prefix.
pub const DC_ROLE_PREFIX: &str = "dc-";
21
/// Data-center name assumed for a member that carries no `dc-*` role.
pub const DEFAULT_DC: &str = "default";
24
25/// Extract the data-center name from a member's role list. Returns
26/// [`DEFAULT_DC`] when no `dc-*` role is set.
27pub fn member_dc(m: &Member) -> &str {
28    for role in &m.roles {
29        if let Some(rest) = role.strip_prefix(DC_ROLE_PREFIX) {
30            return rest;
31        }
32    }
33    DEFAULT_DC
34}
35
36/// `true` if `a` and `b` belong to the same data-center.
37pub fn same_dc(a: &Member, b: &Member) -> bool {
38    member_dc(a) == member_dc(b)
39}
40
/// Slow-path settings used for cross-DC heartbeats / gossip.
///
/// The struct is `#[non_exhaustive]`, so code outside this crate cannot
/// build it with a struct literal. Start from
/// [`CrossDcSettings::default`] and overwrite the public fields that
/// need to change.
///
/// `PartialEq` / `Eq` are derived so two configurations can be compared
/// directly (for example with `assert_eq!`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub struct CrossDcSettings {
    /// Heartbeat interval for peers in a different DC.
    pub heartbeat_interval: Duration,
    /// Acceptable pause window for cross-DC peers (bigger than the
    /// in-DC default to absorb WAN jitter).
    pub acceptable_pause: Duration,
    /// Threshold to keep at most `n` cross-DC peers actively
    /// monitored (akka.net: `cross-data-center-connections`).
    pub max_monitored_peers: usize,
}

impl Default for CrossDcSettings {
    /// Defaults: 5 s heartbeat interval, 30 s acceptable pause, and at
    /// most 5 monitored cross-DC peers.
    fn default() -> Self {
        Self {
            heartbeat_interval: Duration::from_secs(5),
            acceptable_pause: Duration::from_secs(30),
            max_monitored_peers: 5,
        }
    }
}
64
65/// Pick the heartbeat interval to use against `peer` from the
66/// perspective of `local`: in-DC peers get `local_interval`,
67/// cross-DC peers get `cross.heartbeat_interval`.
68pub fn heartbeat_interval_for(
69    local: &Member,
70    peer: &Member,
71    local_interval: Duration,
72    cross: &CrossDcSettings,
73) -> Duration {
74    if same_dc(local, peer) {
75        local_interval
76    } else {
77        cross.heartbeat_interval
78    }
79}
80
81/// Partition a peer list into `(in_dc, cross_dc)` from `local`'s
82/// perspective.
83pub fn partition_by_dc<'a>(local: &Member, peers: &'a [Member]) -> (Vec<&'a Member>, Vec<&'a Member>) {
84    let mut in_dc = Vec::new();
85    let mut cross = Vec::new();
86    for p in peers {
87        if same_dc(local, p) {
88            in_dc.push(p);
89        } else {
90            cross.push(p);
91        }
92    }
93    (in_dc, cross)
94}
95
#[cfg(test)]
mod tests {
    use super::*;
    use atomr_core::actor::Address;

    /// Build a `Member` at the local address `addr` carrying `roles`.
    fn member(addr: &str, roles: &[&str]) -> Member {
        Member::new(Address::local(addr), roles.iter().map(|s| s.to_string()).collect())
    }

    #[test]
    fn member_dc_uses_dc_role_when_present() {
        let m = member("a", &["dc-eu-west", "compute"]);
        assert_eq!(member_dc(&m), "eu-west");
    }

    #[test]
    fn member_dc_defaults_when_missing() {
        let m = member("a", &["compute"]);
        assert_eq!(member_dc(&m), DEFAULT_DC);
    }

    #[test]
    fn same_dc_compares_correctly() {
        let a = member("a", &["dc-us"]);
        let b = member("b", &["dc-us"]);
        let c = member("c", &["dc-eu"]);
        assert!(same_dc(&a, &b));
        assert!(!same_dc(&a, &c));
    }

    /// Members without a `dc-*` role fall back to `DEFAULT_DC`, so they
    /// share a data-center with each other and with an explicit
    /// `dc-default` member, but not with a member in a named DC.
    #[test]
    fn same_dc_treats_missing_role_as_default() {
        let bare_a = member("a", &["compute"]);
        let bare_b = member("b", &[]);
        let explicit_default = member("c", &["dc-default"]);
        let named = member("d", &["dc-us"]);
        assert!(same_dc(&bare_a, &bare_b));
        assert!(same_dc(&bare_a, &explicit_default));
        assert!(!same_dc(&bare_a, &named));
    }

    #[test]
    fn heartbeat_interval_picks_cross_for_other_dc() {
        let a = member("a", &["dc-us"]);
        let b = member("b", &["dc-eu"]);
        let cross = CrossDcSettings::default();
        let interval = heartbeat_interval_for(&a, &b, Duration::from_secs(1), &cross);
        assert_eq!(interval, cross.heartbeat_interval);
    }

    #[test]
    fn heartbeat_interval_picks_local_for_same_dc() {
        let a = member("a", &["dc-us"]);
        let b = member("b", &["dc-us"]);
        let local = Duration::from_secs(1);
        let cross = CrossDcSettings::default();
        let interval = heartbeat_interval_for(&a, &b, local, &cross);
        assert_eq!(interval, local);
    }

    #[test]
    fn partition_splits_peers_correctly() {
        let local = member("self", &["dc-us"]);
        let peers = vec![
            member("a", &["dc-us"]),
            member("b", &["dc-eu"]),
            member("c", &["dc-us"]),
            member("d", &["dc-ap"]),
        ];
        let (same, cross) = partition_by_dc(&local, &peers);
        assert_eq!(same.len(), 2);
        assert_eq!(cross.len(), 2);
        // Counts alone would not catch a swapped or misclassified result,
        // so also check which data-center each bucket actually contains.
        assert!(same.iter().all(|p| member_dc(p) == "us"));
        assert!(cross.iter().all(|p| member_dc(p) != "us"));
    }

    #[test]
    fn partition_of_no_peers_is_empty() {
        let local = member("self", &["dc-us"]);
        let (same, cross) = partition_by_dc(&local, &[]);
        assert!(same.is_empty());
        assert!(cross.is_empty());
    }
}