atomr_cluster/
multi_dc.rs1use std::time::Duration;
15
16use crate::member::Member;
17
18pub const DC_ROLE_PREFIX: &str = "dc-";
21
22pub const DEFAULT_DC: &str = "default";
24
25pub fn member_dc(m: &Member) -> &str {
28 for role in &m.roles {
29 if let Some(rest) = role.strip_prefix(DC_ROLE_PREFIX) {
30 return rest;
31 }
32 }
33 DEFAULT_DC
34}
35
36pub fn same_dc(a: &Member, b: &Member) -> bool {
38 member_dc(a) == member_dc(b)
39}
40
41#[derive(Debug, Clone, Copy)]
43#[non_exhaustive]
44pub struct CrossDcSettings {
45 pub heartbeat_interval: Duration,
47 pub acceptable_pause: Duration,
50 pub max_monitored_peers: usize,
53}
54
55impl Default for CrossDcSettings {
56 fn default() -> Self {
57 Self {
58 heartbeat_interval: Duration::from_secs(5),
59 acceptable_pause: Duration::from_secs(30),
60 max_monitored_peers: 5,
61 }
62 }
63}
64
65pub fn heartbeat_interval_for(
69 local: &Member,
70 peer: &Member,
71 local_interval: Duration,
72 cross: &CrossDcSettings,
73) -> Duration {
74 if same_dc(local, peer) {
75 local_interval
76 } else {
77 cross.heartbeat_interval
78 }
79}
80
81pub fn partition_by_dc<'a>(local: &Member, peers: &'a [Member]) -> (Vec<&'a Member>, Vec<&'a Member>) {
84 let mut in_dc = Vec::new();
85 let mut cross = Vec::new();
86 for p in peers {
87 if same_dc(local, p) {
88 in_dc.push(p);
89 } else {
90 cross.push(p);
91 }
92 }
93 (in_dc, cross)
94}
95
96#[cfg(test)]
97mod tests {
98 use super::*;
99 use atomr_core::actor::Address;
100
101 fn member(addr: &str, roles: &[&str]) -> Member {
102 Member::new(Address::local(addr), roles.iter().map(|s| s.to_string()).collect())
103 }
104
105 #[test]
106 fn member_dc_uses_dc_role_when_present() {
107 let m = member("a", &["dc-eu-west", "compute"]);
108 assert_eq!(member_dc(&m), "eu-west");
109 }
110
111 #[test]
112 fn member_dc_defaults_when_missing() {
113 let m = member("a", &["compute"]);
114 assert_eq!(member_dc(&m), DEFAULT_DC);
115 }
116
117 #[test]
118 fn same_dc_compares_correctly() {
119 let a = member("a", &["dc-us"]);
120 let b = member("b", &["dc-us"]);
121 let c = member("c", &["dc-eu"]);
122 assert!(same_dc(&a, &b));
123 assert!(!same_dc(&a, &c));
124 }
125
126 #[test]
127 fn heartbeat_interval_picks_cross_for_other_dc() {
128 let a = member("a", &["dc-us"]);
129 let b = member("b", &["dc-eu"]);
130 let cross = CrossDcSettings::default();
131 let interval = heartbeat_interval_for(&a, &b, Duration::from_secs(1), &cross);
132 assert_eq!(interval, cross.heartbeat_interval);
133 }
134
135 #[test]
136 fn heartbeat_interval_picks_local_for_same_dc() {
137 let a = member("a", &["dc-us"]);
138 let b = member("b", &["dc-us"]);
139 let local = Duration::from_secs(1);
140 let cross = CrossDcSettings::default();
141 let interval = heartbeat_interval_for(&a, &b, local, &cross);
142 assert_eq!(interval, local);
143 }
144
145 #[test]
146 fn partition_splits_peers_correctly() {
147 let local = member("self", &["dc-us"]);
148 let peers = vec![
149 member("a", &["dc-us"]),
150 member("b", &["dc-eu"]),
151 member("c", &["dc-us"]),
152 member("d", &["dc-ap"]),
153 ];
154 let (same, cross) = partition_by_dc(&local, &peers);
155 assert_eq!(same.len(), 2);
156 assert_eq!(cross.len(), 2);
157 }
158}