orlando_cluster/
failure_detector.rs1use std::sync::Arc;
2
3use arc_swap::ArcSwap;
4use std::time::Duration;
5
6use tokio::sync::broadcast;
7
8use crate::connection_pool::ConnectionPool;
9use crate::hash_ring::{HashRing, SiloAddress};
10use crate::swim::{self, SwimState};
11
12#[derive(Clone, Debug)]
14#[non_exhaustive]
15pub enum MembershipChange {
16 SiloJoined(SiloAddress),
17 SiloLeft(SiloAddress),
18}
19
20#[derive(Clone, Debug)]
26pub struct FailureDetectorConfig {
27 pub protocol_period: Duration,
29 pub ping_timeout: Duration,
31 pub ping_req_count: usize,
33 pub suspect_timeout: Duration,
35 pub gossip_fanout: usize,
37}
38
39impl Default for FailureDetectorConfig {
40 fn default() -> Self {
41 Self {
42 protocol_period: Duration::from_secs(2),
43 ping_timeout: Duration::from_secs(1),
44 ping_req_count: 3,
45 suspect_timeout: Duration::from_secs(5),
46 gossip_fanout: 6,
47 }
48 }
49}
50
51impl FailureDetectorConfig {
52 pub fn from_legacy(
61 ping_interval: Duration,
62 ping_timeout: Duration,
63 max_missed_pings: u32,
64 ) -> Self {
65 Self {
66 protocol_period: ping_interval,
67 ping_timeout,
68 suspect_timeout: ping_interval * max_missed_pings,
69 ..Default::default()
70 }
71 }
72}
73
74pub struct FailureDetector {
78 config: FailureDetectorConfig,
79 ring: Arc<ArcSwap<HashRing>>,
80 pool: Arc<ConnectionPool>,
81 change_tx: broadcast::Sender<MembershipChange>,
82 swim_state: Arc<tokio::sync::Mutex<SwimState>>,
83 shutdown_rx: tokio::sync::watch::Receiver<bool>,
84}
85
86impl FailureDetector {
87 pub fn new(
88 config: FailureDetectorConfig,
89 ring: Arc<ArcSwap<HashRing>>,
90 pool: Arc<ConnectionPool>,
91 local_addr: SiloAddress,
92 change_tx: broadcast::Sender<MembershipChange>,
93 shutdown_rx: tokio::sync::watch::Receiver<bool>,
94 ) -> Self {
95 let swim_state = Arc::new(tokio::sync::Mutex::new(SwimState::new(local_addr)));
96 Self {
97 config,
98 ring,
99 pool,
100 change_tx,
101 swim_state,
102 shutdown_rx,
103 }
104 }
105
106 pub fn with_state(
108 config: FailureDetectorConfig,
109 ring: Arc<ArcSwap<HashRing>>,
110 pool: Arc<ConnectionPool>,
111 change_tx: broadcast::Sender<MembershipChange>,
112 swim_state: Arc<tokio::sync::Mutex<SwimState>>,
113 shutdown_rx: tokio::sync::watch::Receiver<bool>,
114 ) -> Self {
115 Self {
116 config,
117 ring,
118 pool,
119 change_tx,
120 swim_state,
121 shutdown_rx,
122 }
123 }
124
125 pub fn swim_state(&self) -> Arc<tokio::sync::Mutex<SwimState>> {
127 self.swim_state.clone()
128 }
129
130 pub async fn run(self) {
132 swim::run_swim_protocol(
133 self.config,
134 self.swim_state,
135 self.ring,
136 self.pool,
137 self.change_tx,
138 self.shutdown_rx,
139 )
140 .await;
141 }
142}