1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
use std::sync::Arc;
use arc_swap::ArcSwap;
use std::time::Duration;
use tokio::sync::broadcast;
use crate::connection_pool::ConnectionPool;
use crate::hash_ring::{HashRing, SiloAddress};
use crate::swim::{self, SwimState};
/// Event describing a change in cluster membership, broadcast to subscribers
/// (e.g., the rebalancer).
///
/// Marked `#[non_exhaustive]`, so matches outside this crate need a wildcard arm.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum MembershipChange {
    /// The silo at the carried address joined the membership.
    SiloJoined(SiloAddress),
    /// The silo at the carried address left the membership.
    // NOTE(review): presumably also covers members declared dead by the
    // failure detector — confirm at the emission site.
    SiloLeft(SiloAddress),
}
/// Configuration for the SWIM-based failure detector.
///
/// Replaces the old simple ping-count model with the full SWIM protocol:
/// protocol periods, indirect pings, suspicion with timeout, and gossip
/// piggybacking.
#[derive(Clone, Debug)]
pub struct FailureDetectorConfig {
    /// How often the protocol runs a probe cycle.
    pub protocol_period: Duration,
    /// Timeout for a direct ping response.
    pub ping_timeout: Duration,
    /// Number of random peers to ask for an indirect ping when a direct ping fails.
    pub ping_req_count: usize,
    /// How long a member stays in "suspect" state before being declared dead.
    pub suspect_timeout: Duration,
    /// Maximum number of gossip updates piggybacked per protocol message.
    pub gossip_fanout: usize,
}

impl Default for FailureDetectorConfig {
    /// Returns the stock configuration: a 2 s protocol period, a 1 s ping
    /// timeout, 3 indirect-ping peers, a 5 s suspect timeout and a gossip
    /// fan-out of 6.
    fn default() -> Self {
        Self {
            protocol_period: Duration::from_secs(2),
            ping_timeout: Duration::from_secs(1),
            ping_req_count: 3,
            suspect_timeout: Duration::from_secs(5),
            gossip_fanout: 6,
        }
    }
}

impl FailureDetectorConfig {
    /// Migrate from the pre-SWIM config fields.
    ///
    /// Maps old field names to the SWIM equivalents:
    /// - `ping_interval` -> `protocol_period`
    /// - `ping_timeout` -> `ping_timeout` (unchanged)
    /// - `max_missed_pings` -> `suspect_timeout` (computed as `ping_interval * max_missed_pings`)
    ///
    /// New SWIM-specific fields (`ping_req_count`, `gossip_fanout`) use defaults.
    ///
    /// The `suspect_timeout` product saturates at [`Duration::MAX`] instead of
    /// panicking when it would overflow. A `max_missed_pings` of `0` yields a
    /// zero `suspect_timeout`.
    pub fn from_legacy(
        ping_interval: Duration,
        ping_timeout: Duration,
        max_missed_pings: u32,
    ) -> Self {
        // `Duration * u32` panics on overflow; legacy values come from
        // configuration, so clamp rather than abort.
        let suspect_timeout = ping_interval.saturating_mul(max_missed_pings);
        Self {
            protocol_period: ping_interval,
            ping_timeout,
            suspect_timeout,
            ..Default::default()
        }
    }
}
/// SWIM-based failure detector. Runs as a background task that periodically
/// probes random cluster members using direct pings, indirect pings, and
/// a suspicion mechanism before declaring members dead.
///
/// The detector itself only bundles the inputs; the protocol loop lives in
/// `swim::run_swim_protocol`, which [`FailureDetector::run`] hands them to.
pub struct FailureDetector {
    /// Protocol tuning parameters passed to the SWIM loop.
    config: FailureDetectorConfig,
    /// Shared, atomically swappable handle to the hash ring.
    ring: Arc<ArcSwap<HashRing>>,
    /// Shared connection pool passed to the SWIM loop.
    pool: Arc<ConnectionPool>,
    /// Broadcast sender for [`MembershipChange`] events.
    change_tx: broadcast::Sender<MembershipChange>,
    /// SWIM state behind an async mutex; may be shared with other holders.
    swim_state: Arc<tokio::sync::Mutex<SwimState>>,
    /// Watch receiver carrying a `bool`, passed to the SWIM loop.
    // NOTE(review): presumably `true` signals shutdown — confirm in `swim`.
    shutdown_rx: tokio::sync::watch::Receiver<bool>,
}

impl FailureDetector {
    /// Build a detector with a fresh SWIM state created from `local_addr`.
    pub fn new(
        config: FailureDetectorConfig,
        ring: Arc<ArcSwap<HashRing>>,
        pool: Arc<ConnectionPool>,
        local_addr: SiloAddress,
        change_tx: broadcast::Sender<MembershipChange>,
        shutdown_rx: tokio::sync::watch::Receiver<bool>,
    ) -> Self {
        let fresh_state = tokio::sync::Mutex::new(SwimState::new(local_addr));
        Self::with_state(
            config,
            ring,
            pool,
            change_tx,
            Arc::new(fresh_state),
            shutdown_rx,
        )
    }

    /// Build a detector around an already existing, shared SWIM state.
    pub fn with_state(
        config: FailureDetectorConfig,
        ring: Arc<ArcSwap<HashRing>>,
        pool: Arc<ConnectionPool>,
        change_tx: broadcast::Sender<MembershipChange>,
        swim_state: Arc<tokio::sync::Mutex<SwimState>>,
        shutdown_rx: tokio::sync::watch::Receiver<bool>,
    ) -> Self {
        Self {
            shutdown_rx,
            swim_state,
            change_tx,
            pool,
            ring,
            config,
        }
    }

    /// Return another handle to the shared SWIM state (an `Arc` clone, not a
    /// copy of the state) for use by the membership service.
    pub fn swim_state(&self) -> Arc<tokio::sync::Mutex<SwimState>> {
        Arc::clone(&self.swim_state)
    }

    /// Run the SWIM protocol loop (consumes self).
    ///
    /// Completes when `swim::run_swim_protocol` completes.
    pub async fn run(self) {
        let Self {
            config,
            ring,
            pool,
            change_tx,
            swim_state,
            shutdown_rx,
        } = self;

        let protocol_loop =
            swim::run_swim_protocol(config, swim_state, ring, pool, change_tx, shutdown_rx);
        protocol_loop.await;
    }
}