atomr_cluster/
multi_dc.rs1use std::time::Duration;
14
15use crate::member::Member;
16
17pub const DC_ROLE_PREFIX: &str = "dc-";
20
21pub const DEFAULT_DC: &str = "default";
23
24pub fn member_dc(m: &Member) -> &str {
27 for role in &m.roles {
28 if let Some(rest) = role.strip_prefix(DC_ROLE_PREFIX) {
29 return rest;
30 }
31 }
32 DEFAULT_DC
33}
34
35pub fn same_dc(a: &Member, b: &Member) -> bool {
37 member_dc(a) == member_dc(b)
38}
39
40#[derive(Debug, Clone, Copy)]
42#[non_exhaustive]
43pub struct CrossDcSettings {
44 pub heartbeat_interval: Duration,
46 pub acceptable_pause: Duration,
49 pub max_monitored_peers: usize,
52}
53
54impl Default for CrossDcSettings {
55 fn default() -> Self {
56 Self {
57 heartbeat_interval: Duration::from_secs(5),
58 acceptable_pause: Duration::from_secs(30),
59 max_monitored_peers: 5,
60 }
61 }
62}
63
64pub fn heartbeat_interval_for(
68 local: &Member,
69 peer: &Member,
70 local_interval: Duration,
71 cross: &CrossDcSettings,
72) -> Duration {
73 if same_dc(local, peer) {
74 local_interval
75 } else {
76 cross.heartbeat_interval
77 }
78}
79
80pub fn partition_by_dc<'a>(local: &Member, peers: &'a [Member]) -> (Vec<&'a Member>, Vec<&'a Member>) {
83 let mut in_dc = Vec::new();
84 let mut cross = Vec::new();
85 for p in peers {
86 if same_dc(local, p) {
87 in_dc.push(p);
88 } else {
89 cross.push(p);
90 }
91 }
92 (in_dc, cross)
93}
94
95#[cfg(test)]
96mod tests {
97 use super::*;
98 use atomr_core::actor::Address;
99
100 fn member(addr: &str, roles: &[&str]) -> Member {
101 Member::new(Address::local(addr), roles.iter().map(|s| s.to_string()).collect())
102 }
103
104 #[test]
105 fn member_dc_uses_dc_role_when_present() {
106 let m = member("a", &["dc-eu-west", "compute"]);
107 assert_eq!(member_dc(&m), "eu-west");
108 }
109
110 #[test]
111 fn member_dc_defaults_when_missing() {
112 let m = member("a", &["compute"]);
113 assert_eq!(member_dc(&m), DEFAULT_DC);
114 }
115
116 #[test]
117 fn same_dc_compares_correctly() {
118 let a = member("a", &["dc-us"]);
119 let b = member("b", &["dc-us"]);
120 let c = member("c", &["dc-eu"]);
121 assert!(same_dc(&a, &b));
122 assert!(!same_dc(&a, &c));
123 }
124
125 #[test]
126 fn heartbeat_interval_picks_cross_for_other_dc() {
127 let a = member("a", &["dc-us"]);
128 let b = member("b", &["dc-eu"]);
129 let cross = CrossDcSettings::default();
130 let interval = heartbeat_interval_for(&a, &b, Duration::from_secs(1), &cross);
131 assert_eq!(interval, cross.heartbeat_interval);
132 }
133
134 #[test]
135 fn heartbeat_interval_picks_local_for_same_dc() {
136 let a = member("a", &["dc-us"]);
137 let b = member("b", &["dc-us"]);
138 let local = Duration::from_secs(1);
139 let cross = CrossDcSettings::default();
140 let interval = heartbeat_interval_for(&a, &b, local, &cross);
141 assert_eq!(interval, local);
142 }
143
144 #[test]
145 fn partition_splits_peers_correctly() {
146 let local = member("self", &["dc-us"]);
147 let peers = vec![
148 member("a", &["dc-us"]),
149 member("b", &["dc-eu"]),
150 member("c", &["dc-us"]),
151 member("d", &["dc-ap"]),
152 ];
153 let (same, cross) = partition_by_dc(&local, &peers);
154 assert_eq!(same.len(), 2);
155 assert_eq!(cross.len(), 2);
156 }
157}