rio_rs/cluster/membership_protocol/peer_to_peer.rs

//! ClusterProvider that uses peer-to-peer communication to identify which
//! nodes are healthy.
//!
//! This is a gossip-based protocol, similar to the one described in Microsoft Orleans.

use async_trait::async_trait;
use chrono::Utc;
use log::debug;
use std::time::{Duration, SystemTime};
use std::{net::SocketAddr, str::FromStr};

use crate::client::Client;
use crate::cluster::membership_protocol::ClusterProvider;
use crate::cluster::storage::local::LocalStorage;
use crate::cluster::storage::{Member, MembershipStorage};
use crate::errors::ClusterProviderServeError;
/// Configuration for [PeerToPeerClusterProvider].
///
/// A node is marked as inactive once it accumulates `num_failures_threshold`
/// or more failures in the past `interval_secs_threshold` seconds.
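///
/// A minimal sketch of tuning the configuration (the values here are
/// illustrative, not recommendations):
///
/// ```ignore
/// let config = PeerToPeerClusterConfig {
///     // Run the health-check loop every 5 seconds
///     interval_secs: 5,
///     // Two failures within 30 seconds mark a member as inactive
///     num_failures_threshold: 2,
///     interval_secs_threshold: 30,
///     ..PeerToPeerClusterConfig::default()
/// };
/// ```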
#[derive(Clone)]
pub struct PeerToPeerClusterConfig {
    /// How often (in seconds) the health-check loop runs
    pub interval_secs: u64,
    /// Number of recent failures that marks a member as inactive
    pub num_failures_threshold: u64,
    /// Sliding window (in seconds) over which failures are counted
    pub interval_secs_threshold: u64,
    /// Maximum number of members each node monitors (`None` means all of them)
    pub limit_monitored_members: Option<usize>,
    /// Remove members that have been inactive for longer than this many seconds
    pub drop_inactive_after_secs: Option<usize>,
}

impl Default for PeerToPeerClusterConfig {
    fn default() -> Self {
        PeerToPeerClusterConfig {
            interval_secs: 10,
            num_failures_threshold: 3,
            interval_secs_threshold: 60,
            limit_monitored_members: None,
            drop_inactive_after_secs: None,
        }
    }
}

impl PeerToPeerClusterConfig {
    pub fn new() -> PeerToPeerClusterConfig {
        Self::default()
    }
}

/// Gossip-based [ClusterProvider]
#[derive(Clone)]
pub struct PeerToPeerClusterProvider<T>
where
    T: MembershipStorage,
{
    members_storage: T,
    config: PeerToPeerClusterConfig,
}

impl<T> PeerToPeerClusterProvider<T>
where
    T: MembershipStorage,
{
    pub fn new(
        members_storage: T,
        config: PeerToPeerClusterConfig,
    ) -> PeerToPeerClusterProvider<T> {
        PeerToPeerClusterProvider {
            members_storage,
            config,
        }
    }

    /// Fetches all members from the storage, sorted by address so that every
    /// node sees the same ordering
    async fn get_sorted_members(&self) -> Result<Vec<Member>, ClusterProviderServeError> {
        let mut members = self.members_storage().members().await?;
        members.sort_by_key(|x| x.address());
        Ok(members)
    }

    /// Returns the subset of `sorted_members` this node should monitor: every
    /// member except itself, capped at `config.limit_monitored_members` when set
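    ///
    /// A sketch of the selection, assuming a `provider` built over a storage
    /// with three members (the addresses are illustrative):
    ///
    /// ```ignore
    /// // With members sorted as ["0.0.0.0:5000", "0.0.0.0:5001", "0.0.0.0:5002"]
    /// // and limit_monitored_members = Some(1), node "0.0.0.0:5001" skips
    /// // itself and the cap leaves only "0.0.0.0:5000".
    /// let monitored = provider.get_members_to_monitor("0.0.0.0:5001", &members);
    /// assert_eq!(monitored.len(), 1);
    /// ```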
    fn get_members_to_monitor(&self, address: &str, sorted_members: &[Member]) -> Vec<Member> {
        let limit = self
            .config
            .limit_monitored_members
            .unwrap_or(sorted_members.len());

        sorted_members
            .iter()
            .filter(|x| x.address() != address)
            .take(limit)
            .cloned()
            .collect()
    }

    /// Pings `member` and records a failure in the [MembershipStorage] if the
    /// ping does not succeed
    ///
    /// TODO remove very old servers from the list
    async fn test_member(&self, member: &Member) -> Result<(), ClusterProviderServeError> {
        // Client needs a MembershipStorage, so we create an in-memory one
        // for local use only
        let local_storage = LocalStorage::default();
        local_storage.push(member.clone()).await?;
        let mut client = Client::new(local_storage);

        let ping = client.ping().await;
        if ping.is_err() {
            self.members_storage()
                .notify_failure(member.ip(), member.port())
                .await?;
        }
        Ok(())
    }

    /// Returns whether a node should be considered broken: true if it has
    /// accumulated at least `num_failures_threshold` failures in the past
    /// `interval_secs_threshold` seconds
    ///
    /// TODO review number conversions inside
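    ///
    /// A sketch of the threshold logic with the default configuration
    /// (`num_failures_threshold = 3`, `interval_secs_threshold = 60`); the
    /// `provider` and `member` bindings are assumed:
    ///
    /// ```ignore
    /// // Three failures recorded just now all fall inside the 60s window
    /// storage.notify_failure(member.ip(), member.port()).await?;
    /// storage.notify_failure(member.ip(), member.port()).await?;
    /// storage.notify_failure(member.ip(), member.port()).await?;
    /// assert!(provider.is_broken(&member).await?); // 3 >= num_failures_threshold
    /// ```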
    async fn is_broken(&self, member: &Member) -> Result<bool, ClusterProviderServeError> {
        let t0 = Utc::now() - chrono::Duration::seconds(self.config.interval_secs_threshold as i64);

        let failures = self
            .members_storage()
            .member_failures(member.ip(), member.port())
            .await?;

        let failures_over_threshold = failures.iter().filter(|&time| time > &t0).count() as u64
            >= self.config.num_failures_threshold;
        Ok(failures_over_threshold)
    }
}

#[async_trait]
impl<T> ClusterProvider<T> for PeerToPeerClusterProvider<T>
where
    T: MembershipStorage,
{
    fn members_storage(&self) -> &T {
        &self.members_storage
    }

    /// Membership Algorithm
    ///
    /// Every `self.config.interval_secs` seconds the server runs a check against
    /// each other server in the cluster.
    ///
    /// It creates a task for each cluster member; each task tests connectivity
    /// for a given member and updates its state in the storage.
    ///
    /// If the test fails `self.config.num_failures_threshold` times within the
    /// configured window, the member is flagged as inactive in the
    /// `MembershipStorage`.
    ///
    /// <div class="warning">
    ///
    /// # TODO
    ///
    /// 1. _If communication with MembershipStorage fails, this server should be able to keep running_
    /// 1. _It shouldn't bring dead servers back to life_
    ///
    /// </div>
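    ///
    /// A minimal sketch of running the provider (the [LocalStorage] backend
    /// and the address are illustrative):
    ///
    /// ```ignore
    /// let storage = LocalStorage::default();
    /// let provider =
    ///     PeerToPeerClusterProvider::new(storage, PeerToPeerClusterConfig::default());
    /// // Registers this node and loops forever unless an error is returned
    /// provider.serve("0.0.0.0:5000").await?;
    /// ```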
    async fn serve(&self, address: &str) -> Result<(), ClusterProviderServeError> {
        let sleep_period = std::time::Duration::from_secs(self.config.interval_secs);
        let socket_address = SocketAddr::from_str(address)
            .map_err(|_| ClusterProviderServeError::SocketAddrParsingError)?;
        let ip = socket_address.ip().to_string();
        let port = socket_address.port().to_string();

        let mut self_member = Member::new(ip, port);
        self_member.set_active(true);
        self_member.set_last_seen();
        self.members_storage().push(self_member).await?;

        loop {
            let members = self.get_sorted_members().await?;
            let test_members = self.get_members_to_monitor(address, &members);
            let t0 = SystemTime::now();

            // Tests reachability and talks to the MembershipStorage to set
            // servers as active or inactive
            let mut future_member_tests = vec![];

            for test_member in test_members.into_iter() {
                let fut = async move {
                    self.test_member(&test_member).await?;
                    let is_broken = self.is_broken(&test_member).await?;

                    if is_broken {
                        self.members_storage()
                            .set_inactive(test_member.ip(), test_member.port())
                            .await?;

                        if let Some(drop_inactive_after_secs) = self.config.drop_inactive_after_secs
                        {
                            let now = Utc::now();
                            let drop_threshold =
                                now - Duration::from_secs(drop_inactive_after_secs as u64);

                            if test_member.last_seen() < &drop_threshold {
                                self.members_storage()
                                    .remove(test_member.ip(), test_member.port())
                                    .await?;
                            }
                        }

                        return Ok::<_, ClusterProviderServeError>((test_member, false));
                    } else if !test_member.active() {
                        self.members_storage()
                            .set_active(test_member.ip(), test_member.port())
                            .await?;
                    }
                    debug!("[{}] {:?} is OK", address, test_member.address());
                    Ok::<_, ClusterProviderServeError>((test_member, true))
                };
                future_member_tests.push(fut);
            }
            let states = futures::future::join_all(future_member_tests).await;
            debug!("[{}] STATES={:?}", address, states);

            // Sleep for the remainder of `config.interval_secs`
            let elapsed = t0.elapsed().expect("Failed to get elapsed time");
            let remaining_sleep_period = sleep_period.saturating_sub(elapsed);
            debug!("[{}] - Time delta {:?}", address, remaining_sleep_period);
            if remaining_sleep_period > Duration::ZERO {
                tokio::time::sleep(remaining_sleep_period).await;
            }
        }
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use std::collections::HashMap;

    type TestResult = Result<(), Box<dyn std::error::Error>>;

    async fn storage() -> impl MembershipStorage {
        let storage = LocalStorage::default();
        for (ip, port) in [
            ("0.0.0.0", "5000"),
            ("0.0.0.0", "5001"),
            ("0.0.0.0", "5002"),
            ("0.0.0.0", "5003"),
            ("0.0.0.0", "5004"),
            ("0.0.0.0", "5005"),
        ] {
            storage
                .push(Member::new(ip.to_string(), port.to_string()))
                .await
                .unwrap();
        }
        storage
    }

    #[tokio::test]
    async fn test_member_records_member_failure() -> TestResult {
        let storage = LocalStorage::default();
        let membership =
            PeerToPeerClusterProvider::new(storage, PeerToPeerClusterConfig::default());
        let failures = membership
            .members_storage()
            .member_failures("0.0.0.0", "-1")
            .await?;
        assert_eq!(failures.len(), 0);

        membership
            .test_member(&Member::new("0.0.0.0".to_string(), "-1".to_string()))
            .await?;
        let failures = membership
            .members_storage()
            .member_failures("0.0.0.0", "-1")
            .await?;
        assert_eq!(failures.len(), 1);

        membership
            .test_member(&Member::new("0.0.0.0".to_string(), "-1".to_string()))
            .await?;
        let failures = membership
            .members_storage()
            .member_failures("0.0.0.0", "-1")
            .await?;
        assert_eq!(failures.len(), 2);
        Ok(())
    }

    #[tokio::test]
    async fn test_is_broken() -> TestResult {
        let storage = LocalStorage::default();
        storage.notify_failure("0.0.0.0", "5000").await?;
        storage.notify_failure("0.0.0.0", "5000").await?;
        storage.notify_failure("0.0.0.0", "5001").await?;

        // Two recent failures are below the default threshold of three
        let mut config = PeerToPeerClusterConfig::default();
        let membership = PeerToPeerClusterProvider::new(storage.clone(), config.clone());

        let is_broken = membership
            .is_broken(&Member::new("0.0.0.0".to_string(), "5000".to_string()))
            .await;
        assert!(!is_broken?);

        // Lowering the threshold to one failure makes the same member broken
        config.num_failures_threshold = 1;
        let membership = PeerToPeerClusterProvider::new(storage.clone(), config.clone());

        let is_broken = membership
            .is_broken(&Member::new("0.0.0.0".to_string(), "5000".to_string()))
            .await;
        assert!(is_broken?);

        // A zero-second window means no failure is recent enough to count
        config.interval_secs_threshold = 0;
        let membership = PeerToPeerClusterProvider::new(storage.clone(), config.clone());

        let is_broken = membership
            .is_broken(&Member::new("0.0.0.0".to_string(), "5000".to_string()))
            .await;
        assert!(!is_broken?);
        Ok(())
    }

    #[tokio::test]
    async fn get_members_to_monitor_as_configured() -> TestResult {
        let storage = storage().await;
        let items = storage.members().await?;

        let config = PeerToPeerClusterConfig {
            limit_monitored_members: Some(3),
            ..PeerToPeerClusterConfig::default()
        };
        let membership = PeerToPeerClusterProvider::new(storage, config);

        // Counts how many members each server (the tester) monitors
        let mut monitored_counter: HashMap<String, usize> = HashMap::new();
        for member in items.iter() {
            let monitored = membership.get_members_to_monitor(&member.address(), &items);
            monitored_counter.insert(member.address().clone(), monitored.len());
        }

        assert_eq!(monitored_counter.len(), 6);
        for monitored in monitored_counter.values() {
            // The same value as in `limit_monitored_members`
            assert_eq!(monitored, &3);
        }
        Ok(())
    }

    #[tokio::test]
    async fn get_members_to_monitor_few_members() -> TestResult {
        let storage = LocalStorage::default();
        storage
            .push(Member::new("0.0.0.0".to_string(), "5000".to_string()))
            .await?;
        let items = storage.members().await?;
        let membership =
            PeerToPeerClusterProvider::new(storage, PeerToPeerClusterConfig::default());

        // The only member is the node itself, so there is nothing to monitor
        let members = membership.get_members_to_monitor("0.0.0.0:5000", &items);
        assert_eq!(members.len(), 0, "{:?}", members);
        Ok(())
    }
}