Skip to main content

rio_rs/cluster/storage/
mod.rs

1use std::fmt::Debug;
2
3use async_trait::async_trait;
4use chrono::{DateTime, TimeZone, Utc};
5
6use crate::errors::MembershipError;
7
8#[cfg(feature = "http")]
9pub mod http;
10#[cfg(feature = "local")]
11pub mod local;
12#[cfg(feature = "postgres")]
13pub mod postgres;
14#[cfg(feature = "redis")]
15pub mod redis;
16#[cfg(feature = "sqlite")]
17pub mod sqlite;
18
19/// Represents a running [Server](crate::server::Server).
20#[derive(Clone, Debug)]
21pub struct Member {
22    ip: String,
23    port: String,
24    active: bool,
25    last_seen: DateTime<Utc>,
26}
27
28impl Member {
29    pub fn new(ip: String, port: String) -> Member {
30        Member {
31            ip,
32            port,
33            active: false,
34            last_seen: Utc.timestamp_opt(0, 0).unwrap(),
35        }
36    }
37    pub fn ip(&self) -> &str {
38        &self.ip
39    }
40    pub fn port(&self) -> &str {
41        &self.port
42    }
43    pub fn active(&self) -> bool {
44        self.active
45    }
46    pub fn set_active(&mut self, active: bool) {
47        self.active = active;
48    }
49    pub fn last_seen(&self) -> &DateTime<Utc> {
50        &self.last_seen
51    }
52    pub fn set_last_seen(&mut self) {
53        let now = Utc::now();
54        self.last_seen = now
55    }
56    pub fn address(&self) -> String {
57        format!("{}:{}", self.ip, self.port)
58    }
59}
60
61pub type MembershipResult<T> = Result<T, MembershipError>;
62pub type MembershipUnitResult = Result<(), MembershipError>;
63
64/// `MembersshipStorage` is a trait describing how to manage a list of servers and their respective
65/// status.
66///
67/// It is **not** reponsible for asserting the nodes' state, only to store their state.
68/// This is done by the [crate::cluster::membership_protocol::ClusterProvider]
69#[async_trait]
70pub trait MembershipStorage: Send + Sync + Clone + Debug {
71    async fn prepare(&self) {}
72
73    /// Saves a new member to the storage
74    async fn push(&self, member: Member) -> MembershipUnitResult;
75
76    /// Remove a member by its public ip + port identification
77    async fn remove(&self, ip: &str, port: &str) -> MembershipUnitResult;
78
79    /// Changes status for a given Member (lookup by public ip + port)
80    async fn set_is_active(&self, ip: &str, port: &str, is_active: bool) -> MembershipUnitResult;
81
82    /// List all members in the storage
83    async fn members(&self) -> MembershipResult<Vec<Member>>;
84
85    /// Flag a failure to a given member. Note this method doesn't change the member's activity
86    /// status
87    async fn notify_failure(&self, ip: &str, port: &str) -> MembershipUnitResult;
88
89    /// List all failures of a given member
90    ///
91    /// TODO: Limit
92    async fn member_failures(&self, ip: &str, port: &str) -> MembershipResult<Vec<DateTime<Utc>>>;
93
94    /// List of active members only
95    async fn active_members(&self) -> MembershipResult<Vec<Member>> {
96        let mut members = self.members().await?;
97        members.retain(|x| x.active);
98        Ok(members)
99    }
100
101    /// Tests a member inactive (loopkup by ip + port)
102    async fn is_active(&self, ip: &str, port: &str) -> MembershipResult<bool> {
103        let active_members = self.active_members().await?;
104        for member in active_members {
105            if member.ip == ip && member.port == port {
106                return Ok(true);
107            }
108        }
109        Ok(false)
110    }
111
112    /// Sets a member inactive (loopkup by ip + port)
113    async fn set_inactive(&self, ip: &str, port: &str) -> MembershipUnitResult {
114        self.set_is_active(ip, port, false).await
115    }
116
117    /// Sets a member active (loopkup by ip + port)
118    async fn set_active(&self, ip: &str, port: &str) -> MembershipUnitResult {
119        self.set_is_active(ip, port, true).await
120    }
121}