rio_rs/cluster/membership_protocol/
peer_to_peer.rs1use async_trait::async_trait;
7use chrono::Utc;
8use log::debug;
9use std::time::{Duration, SystemTime};
10use std::{net::SocketAddr, str::FromStr};
11
12use crate::client::Client;
13use crate::cluster::membership_protocol::ClusterProvider;
14use crate::cluster::storage::local::LocalStorage;
15use crate::cluster::storage::{Member, MembershipStorage};
16use crate::errors::ClusterProviderServeError;
17
/// Tuning knobs for the peer-to-peer membership protocol.
///
/// All durations are expressed in whole seconds.
#[derive(Debug, Clone)]
pub struct PeerToPeerClusterConfig {
    /// Target length of one monitoring round; the provider sleeps for
    /// whatever is left of this interval after a round completes.
    pub interval_secs: u64,
    /// Number of recent failures at which a member is considered broken.
    pub num_failures_threshold: u64,
    /// Length of the look-back window in which failures count towards
    /// `num_failures_threshold`.
    pub interval_secs_threshold: u64,
    /// Maximum number of members a single node monitors; `None` means every
    /// other member is monitored.
    pub limit_monitored_members: Option<usize>,
    /// When set, a broken member that was last seen more than this many
    /// seconds ago is removed from the membership storage.
    pub drop_inactive_after_secs: Option<usize>,
}
28
29impl Default for PeerToPeerClusterConfig {
30 fn default() -> Self {
31 PeerToPeerClusterConfig {
32 interval_secs: 10,
33 num_failures_threshold: 3,
34 interval_secs_threshold: 60,
35 limit_monitored_members: None,
36 drop_inactive_after_secs: None,
37 }
38 }
39}
40
41impl PeerToPeerClusterConfig {
42 pub fn new() -> PeerToPeerClusterConfig {
43 Self::default()
44 }
45}
46
47#[derive(Clone)]
49pub struct PeerToPeerClusterProvider<T>
50where
51 T: MembershipStorage,
52{
53 members_storage: T,
54 config: PeerToPeerClusterConfig,
55}
56
57impl<T> PeerToPeerClusterProvider<T>
58where
59 T: MembershipStorage,
60{
61 pub fn new(
62 members_storage: T,
63 config: PeerToPeerClusterConfig,
64 ) -> PeerToPeerClusterProvider<T> {
65 PeerToPeerClusterProvider {
66 members_storage,
67 config,
68 }
69 }
70
71 async fn get_sorted_members(&self) -> Result<Vec<Member>, ClusterProviderServeError> {
72 let mut members = self.members_storage().members().await?;
73 members.sort_by_key(|x| x.address());
74 Ok(members)
75 }
76
77 fn get_members_to_monitor(&self, address: &str, sorted_members: &[Member]) -> Vec<Member> {
79 let limit_monitored_members = self.config.limit_monitored_members;
80 let mut visited = 0;
81
82 sorted_members
83 .iter()
84 .filter(|x| x.address() != address)
85 .map_while(|x| {
86 if let Some(amount_to_monitor) = limit_monitored_members {
87 if visited >= amount_to_monitor {
88 None
89 } else {
90 visited += 1;
91 Some(x.clone())
92 }
93 } else {
94 visited += 1;
95 Some(x.clone())
96 }
97 })
98 .collect()
99 }
100
101 async fn test_member(&self, member: &Member) -> Result<(), ClusterProviderServeError> {
103 let local_storage = LocalStorage::default();
106 local_storage.push(member.clone()).await?;
107 let mut client = Client::new(local_storage);
108
109 let ping = client.ping().await;
110 if ping.is_err() {
111 self.members_storage()
112 .notify_failure(member.ip(), member.port())
113 .await?;
114 }
115 Ok(())
116 }
117
118 async fn is_broken(&self, member: &Member) -> Result<bool, ClusterProviderServeError> {
123 let t0 = Utc::now() - chrono::Duration::seconds(self.config.interval_secs_threshold as i64);
124
125 let failures = self
126 .members_storage()
127 .member_failures(member.ip(), member.port())
128 .await?;
129
130 let failures_over_threshold = failures.iter().filter(|&time| time > &t0).count() as u64
131 >= self.config.num_failures_threshold;
132 Ok(failures_over_threshold)
133 }
134}
135
136#[async_trait]
137impl<T> ClusterProvider<T> for PeerToPeerClusterProvider<T>
138where
139 T: MembershipStorage,
140{
141 fn members_storage(&self) -> &T {
142 &self.members_storage
143 }
144
145 async fn serve(&self, address: &str) -> Result<(), ClusterProviderServeError> {
166 let sleep_period = std::time::Duration::from_secs(self.config.interval_secs);
167 let socket_address = SocketAddr::from_str(address)
168 .or(Err(ClusterProviderServeError::SocketAddrParsingError))?;
169 let ip = socket_address.ip().to_string();
170 let port = socket_address.port().to_string();
171
172 let mut self_member = Member::new(ip, port);
173 self_member.set_active(true);
174 self_member.set_last_seen();
175 self.members_storage().push(self_member).await?;
176
177 loop {
178 let members = self.get_sorted_members().await?;
179 let test_members = self.get_members_to_monitor(address, &members);
180 let t0 = SystemTime::now();
181
182 let mut future_member_tests = vec![];
185
186 for test_member in test_members.clone().into_iter() {
187 let fut = async move {
188 self.test_member(&test_member).await?;
189 let is_broken = self.is_broken(&test_member).await?;
190
191 if is_broken {
192 self.members_storage()
193 .set_inactive(test_member.ip(), test_member.port())
194 .await?;
195
196 if let Some(drop_inactive_after_secs) = self.config.drop_inactive_after_secs
197 {
198 let now = Utc::now();
199 let drop_threshold =
200 now - Duration::from_secs(drop_inactive_after_secs as u64);
201
202 if test_member.last_seen() < &drop_threshold {
203 self.members_storage()
204 .remove(test_member.ip(), test_member.port())
205 .await?;
206 }
207 }
208
209 return Ok::<_, ClusterProviderServeError>((test_member, false));
210 } else if !test_member.active() {
211 self.members_storage()
212 .set_active(test_member.ip(), test_member.port())
213 .await?;
214 }
215 debug!("[{}] {:?} is OK ", address, test_member.address());
216 Ok::<_, ClusterProviderServeError>((test_member, true))
217 };
218 future_member_tests.push(fut);
219 }
220 let states = futures::future::join_all(future_member_tests).await;
221 debug!("[{}] STATES={:?}", address, states);
222
223 let elapsed = t0.elapsed().expect("Fail to get elapsed time");
225 let remaning_sleep_period = sleep_period.saturating_sub(elapsed);
226 debug!("[{}] - Time delta {:?}", address, remaning_sleep_period);
227 if remaning_sleep_period > Duration::ZERO {
228 tokio::time::sleep(remaning_sleep_period).await;
229 }
230 }
231 }
232}
233
#[cfg(test)]
mod test {
    use super::*;
    use std::collections::HashMap;

    type TestResult = Result<(), Box<dyn std::error::Error>>;

    /// Builds a member from borrowed ip and port strings.
    fn member(ip: &str, port: &str) -> Member {
        Member::new(ip.to_string(), port.to_string())
    }

    /// Storage seeded with six members on `0.0.0.0`, ports 5000 to 5005.
    async fn storage() -> impl MembershipStorage {
        let seeded = LocalStorage::default();
        for port in ["5000", "5001", "5002", "5003", "5004", "5005"] {
            seeded.push(member("0.0.0.0", port)).await.unwrap();
        }
        seeded
    }

    /// Every failed ping must add exactly one failure record for the member.
    #[tokio::test]
    async fn test_member_records_member_failure() -> TestResult {
        let membership = PeerToPeerClusterProvider::new(
            LocalStorage::default(),
            PeerToPeerClusterConfig::default(),
        );

        // Nothing has been pinged yet, so no failure is on record.
        let failures = membership
            .members_storage()
            .member_failures("0.0.0.0", "-1")
            .await
            .unwrap();
        assert_eq!(failures.len(), 0);

        // Port "-1" can never be reached: each attempt adds one failure.
        for expected_failures in [1, 2] {
            membership.test_member(&member("0.0.0.0", "-1")).await?;
            let failures = membership
                .members_storage()
                .member_failures("0.0.0.0", "-1")
                .await?;
            assert_eq!(failures.len(), expected_failures);
        }
        Ok(())
    }

    /// `is_broken` must honour both the failure count and the time window.
    #[tokio::test]
    async fn test_is_broken() -> TestResult {
        let storage = LocalStorage::default();
        for port in ["5000", "5000", "5001"] {
            storage.notify_failure("0.0.0.0", port).await?;
        }
        let target = member("0.0.0.0", "5000");
        let mut config = PeerToPeerClusterConfig::default();

        // Default threshold: the two failures on record are not enough.
        let membership = PeerToPeerClusterProvider::new(storage.clone(), config.clone());
        assert!(!membership.is_broken(&target).await?);

        // With a threshold of one failure the member is broken.
        config.num_failures_threshold = 1;
        let membership = PeerToPeerClusterProvider::new(storage.clone(), config.clone());
        assert!(membership.is_broken(&target).await?);

        // A zero-length window leaves no failure recent enough to count.
        config.interval_secs_threshold = 0;
        let membership = PeerToPeerClusterProvider::new(storage.clone(), config.clone());
        assert!(!membership.is_broken(&target).await?);
        Ok(())
    }

    /// With a limit of three, every one of the six members monitors three peers.
    #[tokio::test]
    async fn get_members_to_monitor_as_configured() -> TestResult {
        let seeded = storage().await;
        let items = seeded.members().await?;

        let config = PeerToPeerClusterConfig {
            limit_monitored_members: Some(3),
            ..PeerToPeerClusterConfig::default()
        };
        let membership = PeerToPeerClusterProvider::new(seeded, config);

        let monitored_counter: HashMap<String, usize> = items
            .iter()
            .map(|item| {
                let address = item.address();
                let monitored = membership.get_members_to_monitor(&address, &items).len();
                (address, monitored)
            })
            .collect();

        assert_eq!(monitored_counter.len(), 6);
        for &monitored in monitored_counter.values() {
            assert_eq!(monitored, 3);
        }
        Ok(())
    }

    /// A node that is alone in the cluster has nobody to monitor.
    #[tokio::test]
    async fn get_members_to_monitor_few_members() -> TestResult {
        let lonely = LocalStorage::default();
        lonely.push(member("0.0.0.0", "5000")).await?;
        let items = lonely.members().await?;
        let membership =
            PeerToPeerClusterProvider::new(lonely, PeerToPeerClusterConfig::default());

        let members = membership.get_members_to_monitor("0.0.0.0:5000", &items);
        assert_eq!(members.len(), 0, "{:?}", members);
        Ok(())
    }
}