rio_rs/cluster/storage/
mod.rs1use std::fmt::Debug;
2
3use async_trait::async_trait;
4use chrono::{DateTime, TimeZone, Utc};
5
6use crate::errors::MembershipError;
7
8#[cfg(feature = "http")]
9pub mod http;
10#[cfg(feature = "local")]
11pub mod local;
12#[cfg(feature = "postgres")]
13pub mod postgres;
14#[cfg(feature = "redis")]
15pub mod redis;
16#[cfg(feature = "sqlite")]
17pub mod sqlite;
18
19#[derive(Clone, Debug)]
21pub struct Member {
22 ip: String,
23 port: String,
24 active: bool,
25 last_seen: DateTime<Utc>,
26}
27
28impl Member {
29 pub fn new(ip: String, port: String) -> Member {
30 Member {
31 ip,
32 port,
33 active: false,
34 last_seen: Utc.timestamp_opt(0, 0).unwrap(),
35 }
36 }
37 pub fn ip(&self) -> &str {
38 &self.ip
39 }
40 pub fn port(&self) -> &str {
41 &self.port
42 }
43 pub fn active(&self) -> bool {
44 self.active
45 }
46 pub fn set_active(&mut self, active: bool) {
47 self.active = active;
48 }
49 pub fn last_seen(&self) -> &DateTime<Utc> {
50 &self.last_seen
51 }
52 pub fn set_last_seen(&mut self) {
53 let now = Utc::now();
54 self.last_seen = now
55 }
56 pub fn address(&self) -> String {
57 format!("{}:{}", self.ip, self.port)
58 }
59}
60
61pub type MembershipResult<T> = Result<T, MembershipError>;
62pub type MembershipUnitResult = Result<(), MembershipError>;
63
64#[async_trait]
70pub trait MembershipStorage: Send + Sync + Clone + Debug {
71 async fn prepare(&self) {}
72
73 async fn push(&self, member: Member) -> MembershipUnitResult;
75
76 async fn remove(&self, ip: &str, port: &str) -> MembershipUnitResult;
78
79 async fn set_is_active(&self, ip: &str, port: &str, is_active: bool) -> MembershipUnitResult;
81
82 async fn members(&self) -> MembershipResult<Vec<Member>>;
84
85 async fn notify_failure(&self, ip: &str, port: &str) -> MembershipUnitResult;
88
89 async fn member_failures(&self, ip: &str, port: &str) -> MembershipResult<Vec<DateTime<Utc>>>;
93
94 async fn active_members(&self) -> MembershipResult<Vec<Member>> {
96 let mut members = self.members().await?;
97 members.retain(|x| x.active);
98 Ok(members)
99 }
100
101 async fn is_active(&self, ip: &str, port: &str) -> MembershipResult<bool> {
103 let active_members = self.active_members().await?;
104 for member in active_members {
105 if member.ip == ip && member.port == port {
106 return Ok(true);
107 }
108 }
109 Ok(false)
110 }
111
112 async fn set_inactive(&self, ip: &str, port: &str) -> MembershipUnitResult {
114 self.set_is_active(ip, port, false).await
115 }
116
117 async fn set_active(&self, ip: &str, port: &str) -> MembershipUnitResult {
119 self.set_is_active(ip, port, true).await
120 }
121}