// nodedb_cluster/swim/bootstrap.rs

//! SWIM subsystem bootstrap.
//!
//! [`spawn`] is the one-stop entry point callers (cluster startup or
//! tests) use to stand up a running failure detector:
//!
//! 1. Validates [`SwimConfig`], rejecting an invalid config before
//!    anything is spawned.
//! 2. Constructs a [`MembershipList`] containing the local node at its
//!    configured initial incarnation.
//! 3. Seeds the list with an `Alive` entry for every address in
//!    `seeds`, using a synthetic `NodeId` of the form `"seed:<addr>"`.
//!    The first successful probe replaces the placeholder with the
//!    peer's real node id via the normal merge path.
//! 4. Constructs a [`FailureDetector`], primes its dissemination queue
//!    with the local node's `Alive` record, and spawns the run loop on
//!    a fresh tokio task.
//! 5. Returns a [`SwimHandle`] the caller can use to read membership,
//!    access the dissemination queue, and shut the detector down.
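//!
//! A minimal usage sketch (assuming the in-memory [`TransportFabric`]
//! this module's tests use; production callers would bind their UDP
//! transport instead):
//!
//! ```ignore
//! let fabric = TransportFabric::new();
//! let transport: Arc<dyn Transport> = Arc::new(fabric.bind(local_addr).await);
//! let handle = spawn(cfg, local_id, local_addr, seeds, transport).await?;
//! // ... read membership via handle.membership(), enqueue rumours ...
//! handle.shutdown().await; // always shut down explicitly
//! ```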

use std::net::SocketAddr;
use std::sync::Arc;

use nodedb_types::NodeId;
use tokio::sync::watch;
use tokio::task::JoinHandle;

use super::config::SwimConfig;
use super::detector::{FailureDetector, ProbeScheduler, Transport};
use super::dissemination::DisseminationQueue;
use super::error::SwimError;
use super::incarnation::Incarnation;
use super::member::MemberState;
use super::member::record::MemberUpdate;
use super::membership::MembershipList;
/// Owns a running SWIM detector and its shutdown plumbing.
///
/// Dropping `SwimHandle` detaches the background task without stopping
/// it; callers should always invoke [`SwimHandle::shutdown`] to request
/// a graceful drain.
pub struct SwimHandle {
    detector: Arc<FailureDetector>,
    membership: Arc<MembershipList>,
    shutdown_tx: watch::Sender<bool>,
    join: JoinHandle<()>,
}

impl SwimHandle {
    /// Shared reference to the detector (for metrics, debugging, or
    /// injecting synthetic rumours in tests).
    pub fn detector(&self) -> &Arc<FailureDetector> {
        &self.detector
    }

    /// Shared reference to the membership list. Cheap to clone; the
    /// underlying `Arc` is the same one the detector holds.
    pub fn membership(&self) -> &Arc<MembershipList> {
        &self.membership
    }

    /// Shared reference to the dissemination queue. Used by callers
    /// that want to enqueue rumours from outside SWIM (e.g. the raft
    /// layer announcing a conf change), as sketched below.
    pub fn dissemination(&self) -> &Arc<DisseminationQueue> {
        self.detector.dissemination()
    }

    /// Signal the detector to shut down and wait for its task to
    /// finish. Join errors (e.g. a panic in the run loop) are ignored.
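    ///
    /// A hedged sketch of how the run loop might consume the signal
    /// (an assumption about the shape of [`FailureDetector::run`], not
    /// its actual code):
    ///
    /// ```ignore
    /// loop {
    ///     tokio::select! {
    ///         _ = interval.tick() => { /* run one probe round */ }
    ///         res = shutdown_rx.changed() => {
    ///             // Stop on a `true` signal or a closed channel.
    ///             if res.is_err() || *shutdown_rx.borrow() { break; }
    ///         }
    ///     }
    /// }
    /// ```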
    pub async fn shutdown(self) {
        let _ = self.shutdown_tx.send(true);
        let _ = self.join.await;
    }
}

/// Bring up a SWIM failure detector.
///
/// * `cfg` — validated [`SwimConfig`]. An invalid config returns
///   [`SwimError::InvalidConfig`] before any task is spawned.
/// * `local_id` — this node's canonical id.
/// * `local_addr` — the socket address the transport is already bound
///   to. The membership list stores it verbatim for peers to echo back
///   in probe responses.
/// * `seeds` — initial peer addresses. An empty list is legal and
///   yields a solo-cluster detector that does nothing interesting until
///   a peer arrives via an external join.
/// * `transport` — any [`Transport`] impl (UDP in production, the
///   in-memory fabric in tests).
pub async fn spawn(
    cfg: SwimConfig,
    local_id: NodeId,
    local_addr: SocketAddr,
    seeds: Vec<SocketAddr>,
    transport: Arc<dyn Transport>,
) -> Result<SwimHandle, SwimError> {
    cfg.validate()?;

    let membership = Arc::new(MembershipList::new_local(
        local_id.clone(),
        local_addr,
        cfg.initial_incarnation,
    ));
    // Seed the membership table so the first probe round has somewhere
    // to go. Placeholder ids are replaced on the first ack.
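    // e.g. a seed at 10.1.2.3:7946 enters as NodeId "seed:10.1.2.3:7946"
    // until that peer's first ack carries its canonical id.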
    for seed_addr in &seeds {
        if *seed_addr == local_addr {
            continue;
        }
        membership.apply(&MemberUpdate {
            node_id: NodeId::new(format!("seed:{seed_addr}")),
            addr: seed_addr.to_string(),
            state: MemberState::Alive,
            incarnation: Incarnation::ZERO,
        });
    }

    let initial_inc = cfg.initial_incarnation;
    let detector = Arc::new(FailureDetector::new(
        cfg,
        Arc::clone(&membership),
        transport,
        ProbeScheduler::new(),
    ));

    // Prime the dissemination queue with our own Alive record so the
    // first outgoing probes advertise our canonical NodeId + addr to
    // every seed. Without this, seed placeholders would never be
    // replaced with real ids until some peer independently learned
    // our identity — which is not reliable from seed bootstrap alone.
    detector.dissemination().enqueue(MemberUpdate {
        node_id: local_id,
        addr: local_addr.to_string(),
        state: MemberState::Alive,
        incarnation: initial_inc,
    });

    let (shutdown_tx, shutdown_rx) = watch::channel(false);
    let join = tokio::spawn({
        let detector = Arc::clone(&detector);
        async move { detector.run(shutdown_rx).await }
    });

    Ok(SwimHandle {
        detector,
        membership,
        shutdown_tx,
        join,
    })
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::swim::detector::TransportFabric;
    use std::net::{IpAddr, Ipv4Addr};
    use std::time::Duration;

    fn addr(p: u16) -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), p)
    }

    fn cfg() -> SwimConfig {
        SwimConfig {
            probe_interval: Duration::from_millis(100),
            probe_timeout: Duration::from_millis(40),
            indirect_probes: 2,
            suspicion_mult: 4,
            min_suspicion: Duration::from_millis(500),
            initial_incarnation: Incarnation::ZERO,
            max_piggyback: 6,
            fanout_lambda: 3,
        }
    }

    #[tokio::test]
    async fn spawn_solo_cluster_has_only_local() {
        let fab = TransportFabric::new();
        let transport: Arc<dyn Transport> = Arc::new(fab.bind(addr(7100)).await);
        let handle = spawn(cfg(), NodeId::new("a"), addr(7100), vec![], transport)
            .await
            .expect("spawn");
        assert_eq!(handle.membership().len(), 1);
        assert!(handle.membership().is_solo());
        handle.shutdown().await;
    }

    #[tokio::test]
    async fn spawn_seeds_populates_membership() {
        let fab = TransportFabric::new();
        let transport: Arc<dyn Transport> = Arc::new(fab.bind(addr(7110)).await);
        let handle = spawn(
            cfg(),
            NodeId::new("a"),
            addr(7110),
            vec![addr(7111), addr(7112)],
            transport,
        )
        .await
        .expect("spawn");
        assert_eq!(handle.membership().len(), 3);
        handle.shutdown().await;
    }

    #[tokio::test]
    async fn spawn_skips_local_addr_in_seeds() {
        let fab = TransportFabric::new();
        let transport: Arc<dyn Transport> = Arc::new(fab.bind(addr(7120)).await);
        let handle = spawn(
            cfg(),
            NodeId::new("a"),
            addr(7120),
            vec![addr(7120), addr(7121)],
            transport,
        )
        .await
        .expect("spawn");
        // Local + one real seed = 2.
        assert_eq!(handle.membership().len(), 2);
        handle.shutdown().await;
    }

    #[tokio::test]
    async fn invalid_config_rejected_before_task_spawned() {
        let fab = TransportFabric::new();
        let transport: Arc<dyn Transport> = Arc::new(fab.bind(addr(7130)).await);
        let mut bad = cfg();
        bad.probe_timeout = bad.probe_interval; // violates probe_timeout < probe_interval
        let res = spawn(bad, NodeId::new("a"), addr(7130), vec![], transport).await;
        match res {
            Err(SwimError::InvalidConfig { .. }) => {}
            Err(other) => panic!("expected InvalidConfig, got {other:?}"),
            Ok(_) => panic!("expected InvalidConfig error"),
        }
    }

    #[tokio::test]
    async fn shutdown_joins_promptly() {
        let fab = TransportFabric::new();
        let transport: Arc<dyn Transport> = Arc::new(fab.bind(addr(7140)).await);
        let handle = spawn(cfg(), NodeId::new("a"), addr(7140), vec![], transport)
            .await
            .expect("spawn");
        // The timeout itself enforces the 500ms budget.
        tokio::time::timeout(Duration::from_millis(500), handle.shutdown())
            .await
            .expect("shutdown did not join within budget");
    }
}