Skip to main content

orlando_cluster/
failure_detector.rs

1use std::sync::Arc;
2
3use arc_swap::ArcSwap;
4use std::time::Duration;
5
6use tokio::sync::broadcast;
7
8use crate::connection_pool::ConnectionPool;
9use crate::hash_ring::{HashRing, SiloAddress};
10use crate::swim::{self, SwimState};
11
12/// A membership change event broadcast to subscribers (e.g., the rebalancer).
13#[derive(Clone, Debug)]
14#[non_exhaustive]
15pub enum MembershipChange {
16    SiloJoined(SiloAddress),
17    SiloLeft(SiloAddress),
18}
19
/// Configuration for the SWIM-based failure detector.
///
/// Replaces the old simple ping-count model with the full SWIM protocol:
/// protocol periods, indirect pings, suspicion with timeout, and gossip
/// piggybacking. Settings written for the old model can be converted with
/// [`FailureDetectorConfig::from_legacy`].
#[derive(Clone, Debug)]
pub struct FailureDetectorConfig {
    /// How often the protocol runs a probe cycle. Default: 2 s.
    pub protocol_period: Duration,
    /// Timeout for a direct ping response. Default: 1 s.
    pub ping_timeout: Duration,
    /// Number of random peers to ask for an indirect ping when a direct ping
    /// fails. Default: 3.
    pub ping_req_count: usize,
    /// How long a member stays in "suspect" state before being declared dead.
    /// Default: 5 s.
    pub suspect_timeout: Duration,
    /// Maximum number of gossip updates piggybacked per protocol message.
    /// Default: 6.
    pub gossip_fanout: usize,
}

impl Default for FailureDetectorConfig {
    fn default() -> Self {
        Self {
            protocol_period: Duration::from_secs(2),
            ping_timeout: Duration::from_secs(1),
            ping_req_count: 3,
            suspect_timeout: Duration::from_secs(5),
            gossip_fanout: 6,
        }
    }
}

impl FailureDetectorConfig {
    /// Migrate from the pre-SWIM config fields.
    ///
    /// Maps old field names to the SWIM equivalents:
    /// - `ping_interval` -> `protocol_period`
    /// - `ping_timeout` -> `ping_timeout` (unchanged)
    /// - `max_missed_pings` -> `suspect_timeout` (computed as `ping_interval * max_missed_pings`)
    ///
    /// The multiplication saturates at [`Duration::MAX`] instead of panicking on
    /// overflow. A `max_missed_pings` of `0` yields a zero `suspect_timeout`.
    ///
    /// New SWIM-specific fields (`ping_req_count`, `gossip_fanout`) use defaults.
    pub fn from_legacy(
        ping_interval: Duration,
        ping_timeout: Duration,
        max_missed_pings: u32,
    ) -> Self {
        Self {
            protocol_period: ping_interval,
            ping_timeout,
            // `Duration * u32` panics on overflow; an extreme legacy value
            // should produce the longest representable timeout, not a crash.
            suspect_timeout: ping_interval.saturating_mul(max_missed_pings),
            ..Default::default()
        }
    }
}
73
74/// SWIM-based failure detector. Runs as a background task that periodically
75/// probes random cluster members using direct pings, indirect pings, and
76/// a suspicion mechanism before declaring members dead.
77pub struct FailureDetector {
78    config: FailureDetectorConfig,
79    ring: Arc<ArcSwap<HashRing>>,
80    pool: Arc<ConnectionPool>,
81    change_tx: broadcast::Sender<MembershipChange>,
82    swim_state: Arc<tokio::sync::Mutex<SwimState>>,
83    shutdown_rx: tokio::sync::watch::Receiver<bool>,
84}
85
86impl FailureDetector {
87    pub fn new(
88        config: FailureDetectorConfig,
89        ring: Arc<ArcSwap<HashRing>>,
90        pool: Arc<ConnectionPool>,
91        local_addr: SiloAddress,
92        change_tx: broadcast::Sender<MembershipChange>,
93        shutdown_rx: tokio::sync::watch::Receiver<bool>,
94    ) -> Self {
95        let swim_state = Arc::new(tokio::sync::Mutex::new(SwimState::new(local_addr)));
96        Self {
97            config,
98            ring,
99            pool,
100            change_tx,
101            swim_state,
102            shutdown_rx,
103        }
104    }
105
106    /// Create with an existing shared SWIM state.
107    pub fn with_state(
108        config: FailureDetectorConfig,
109        ring: Arc<ArcSwap<HashRing>>,
110        pool: Arc<ConnectionPool>,
111        change_tx: broadcast::Sender<MembershipChange>,
112        swim_state: Arc<tokio::sync::Mutex<SwimState>>,
113        shutdown_rx: tokio::sync::watch::Receiver<bool>,
114    ) -> Self {
115        Self {
116            config,
117            ring,
118            pool,
119            change_tx,
120            swim_state,
121            shutdown_rx,
122        }
123    }
124
125    /// Get a reference to the shared SWIM state for use by the membership service.
126    pub fn swim_state(&self) -> Arc<tokio::sync::Mutex<SwimState>> {
127        self.swim_state.clone()
128    }
129
130    /// Run the SWIM protocol loop (consumes self).
131    pub async fn run(self) {
132        swim::run_swim_protocol(
133            self.config,
134            self.swim_state,
135            self.ring,
136            self.pool,
137            self.change_tx,
138            self.shutdown_rx,
139        )
140        .await;
141    }
142}