nodedb_cluster/swim/detector/runner.rs

// SPDX-License-Identifier: BUSL-1.1

//! `FailureDetector` — the SWIM runtime task.
//!
//! One instance per node. Owns the membership list (shared via `Arc`),
//! the probe scheduler, the suspicion timer, the inflight-probe registry,
//! and the async transport. Drives a `tokio::select!` loop over three
//! arms: shutdown, probe tick, and inbound datagram; suspicion expiry is
//! drained at the top of each tick.

use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

use tokio::sync::{Mutex, watch};
use tokio::time::{Instant, interval};

use crate::swim::config::SwimConfig;
use crate::swim::dissemination::{DisseminationQueue, apply_and_disseminate};
use crate::swim::error::SwimError;
use crate::swim::incarnation::Incarnation;
use crate::swim::member::MemberState;
use crate::swim::member::record::MemberUpdate;
use crate::swim::membership::{MembershipList, MergeOutcome};
use crate::swim::subscriber::MembershipSubscriber;
use crate::swim::wire::{Ack, Ping, PingReq, ProbeId, SwimMessage};

use super::probe_round::{InflightProbes, ProbeOutcome, ProbeRound};
use super::scheduler::ProbeScheduler;
use super::suspicion::SuspicionTimer;
use super::transport::Transport;

/// Top-level failure detector handle.
///
/// Construct with [`FailureDetector::new`], then call
/// [`FailureDetector::run`] on a dedicated tokio task. The run loop
/// returns when `shutdown` flips to `true`.
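///
/// A minimal wiring sketch (names like `cfg`, `membership`, `transport`,
/// and `scheduler` are placeholders for values built elsewhere):
///
/// ```ignore
/// let detector = Arc::new(FailureDetector::new(cfg, membership, transport, scheduler));
/// let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
/// let handle = tokio::spawn(Arc::clone(&detector).run(shutdown_rx));
/// // ... later, to stop the detector:
/// let _ = shutdown_tx.send(true);
/// handle.await.expect("detector task panicked");
/// ```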
pub struct FailureDetector {
    cfg: SwimConfig,
    membership: Arc<MembershipList>,
    transport: Arc<dyn Transport>,
    scheduler: Mutex<ProbeScheduler>,
    suspicion: Mutex<SuspicionTimer>,
    inflight: Arc<InflightProbes>,
    dissemination: Arc<DisseminationQueue>,
    probe_counter: AtomicU64,
    local_incarnation: Mutex<Incarnation>,
    subscribers: Vec<Arc<dyn MembershipSubscriber>>,
}

impl FailureDetector {
    /// Construct. Does not spawn anything — the caller is responsible
    /// for driving [`Self::run`] on a tokio task.
    pub fn new(
        cfg: SwimConfig,
        membership: Arc<MembershipList>,
        transport: Arc<dyn Transport>,
        scheduler: ProbeScheduler,
    ) -> Self {
        Self::with_subscribers(cfg, membership, transport, scheduler, Vec::new())
    }

    /// Construct with a list of [`MembershipSubscriber`]s that will be
    /// notified on every member state transition.
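    ///
    /// A sketch of a subscriber (the `on_state_change` signature is
    /// inferred from the call in `apply_and_notify` below):
    ///
    /// ```ignore
    /// struct LogSubscriber;
    /// impl MembershipSubscriber for LogSubscriber {
    ///     fn on_state_change(&self, node: &NodeId, old: Option<MemberState>, new: MemberState) {
    ///         eprintln!("{node:?}: {old:?} -> {new:?}");
    ///     }
    /// }
    /// ```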
    pub fn with_subscribers(
        cfg: SwimConfig,
        membership: Arc<MembershipList>,
        transport: Arc<dyn Transport>,
        scheduler: ProbeScheduler,
        subscribers: Vec<Arc<dyn MembershipSubscriber>>,
    ) -> Self {
        let initial_inc = cfg.initial_incarnation;
        Self {
            cfg,
            membership,
            transport,
            scheduler: Mutex::new(scheduler),
            suspicion: Mutex::new(SuspicionTimer::new()),
            inflight: Arc::new(InflightProbes::new()),
            dissemination: Arc::new(DisseminationQueue::new()),
            probe_counter: AtomicU64::new(0),
            local_incarnation: Mutex::new(initial_inc),
            subscribers,
        }
    }

    /// Apply an update via [`apply_and_disseminate`] while notifying
    /// every subscriber of any resulting state transition. Returns the
    /// raw [`MergeOutcome`] so callers can still react to
    /// `SelfRefute` etc.
    fn apply_and_notify(&self, update: &MemberUpdate) -> MergeOutcome {
        let old_state = self.membership.get(&update.node_id).map(|m| m.state);
        let outcome = apply_and_disseminate(&self.membership, &self.dissemination, update);
        if self.subscribers.is_empty() {
            return outcome;
        }
        let new_state = match self.membership.get(&update.node_id) {
            Some(m) => m.state,
            None => return outcome,
        };
        if old_state != Some(new_state) {
            for sub in &self.subscribers {
                sub.on_state_change(&update.node_id, old_state, new_state);
            }
        }
        outcome
    }

    /// Shared reference to the dissemination queue. Tests use it to
    /// enqueue synthetic rumours without constructing a full message.
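    ///
    /// ```ignore
    /// // As the piggyback test below does:
    /// det.dissemination().enqueue(MemberUpdate {
    ///     node_id,
    ///     addr,
    ///     state: MemberState::Alive,
    ///     incarnation,
    /// });
    /// ```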
    pub fn dissemination(&self) -> &Arc<DisseminationQueue> {
        &self.dissemination
    }

    /// Ingest every piggyback entry attached to an inbound datagram.
    /// Applies each update to the membership list via
    /// [`apply_and_disseminate`] and, on a self-refutation, bumps the
    /// local incarnation so subsequent probes advertise the new value.
    async fn ingest_piggyback(&self, piggyback: &[MemberUpdate]) {
        for update in piggyback {
            let outcome = self.apply_and_notify(update);
            if let MergeOutcome::SelfRefute { new_incarnation } = outcome {
                let mut guard = self.local_incarnation.lock().await;
                if new_incarnation > *guard {
                    *guard = new_incarnation;
                }
            }
        }
    }

    /// Exposed for tests that need to route a synthetic message into the
    /// inflight table without going through the transport.
    #[cfg(test)]
    pub fn inflight(&self) -> &Arc<InflightProbes> {
        &self.inflight
    }

    fn next_probe_id(&self) -> ProbeId {
        ProbeId::new(self.probe_counter.fetch_add(1, Ordering::Relaxed))
    }

    /// Main loop. Returns when `shutdown` receives `true`.
    pub async fn run(self: Arc<Self>, mut shutdown: watch::Receiver<bool>) {
        let mut tick = interval(self.cfg.probe_interval);
        tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        // Consume the first immediate tick so the first probe aligns
        // with a full interval from start.
        tick.tick().await;
        loop {
            tokio::select! {
                biased;
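                // `biased` polls the arms in declaration order, so a
                // pending shutdown is always observed before the next
                // tick or datagram is handled.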
                changed = shutdown.changed() => {
                    match changed {
                        Ok(()) if *shutdown.borrow() => break,
                        Ok(()) => {}
                        // Sender dropped: treat as shutdown so the loop
                        // doesn't spin on an immediately-ready Err arm.
                        Err(_) => break,
                    }
                }
                _ = tick.tick() => {
                    self.on_tick().await;
                }
                recv = self.transport.recv() => {
                    match recv {
                        Ok((from_addr, msg)) => self.on_incoming(from_addr, msg).await,
                        Err(SwimError::TransportClosed) => break,
                        Err(_) => {}
                    }
                }
            }
        }
    }

    async fn on_tick(&self) {
        // Expire suspect members that have waited out their timeout.
        let now = Instant::now();
        let expired = self.suspicion.lock().await.drain_expired(now);
        for node_id in expired {
            if let Some(member) = self.membership.get(&node_id) {
                let dead_update = MemberUpdate {
                    node_id: node_id.clone(),
                    addr: member.addr.to_string(),
                    state: MemberState::Dead,
                    incarnation: member.incarnation,
                };
                self.apply_and_notify(&dead_update);
            }
        }

        // Execute one probe round against the next target.
        let local_inc = *self.local_incarnation.lock().await;
        let mut sched = self.scheduler.lock().await;
        let outcome = ProbeRound {
            scheduler: &mut sched,
            membership: &self.membership,
            transport: &self.transport,
            inflight: &self.inflight,
            dissemination: &self.dissemination,
            probe_timeout: self.cfg.probe_timeout,
            k_indirect: self.cfg.indirect_probes as usize,
            max_piggyback: self.cfg.max_piggyback,
            fanout_lambda: self.cfg.fanout_lambda,
            next_probe_id: || self.next_probe_id(),
            local_incarnation: local_inc,
        }
        .execute()
        .await;
        drop(sched);

        match outcome {
            Ok(ProbeOutcome::Idle) | Ok(ProbeOutcome::Acked { .. }) => {}
            Ok(ProbeOutcome::Suspect { target }) => {
                if let Some(member) = self.membership.get(&target) {
                    let suspect_update = MemberUpdate {
                        node_id: target.clone(),
                        addr: member.addr.to_string(),
                        state: MemberState::Suspect,
                        incarnation: member.incarnation,
                    };
                    self.apply_and_notify(&suspect_update);
                    let cluster_size = self.membership.len();
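                    // How the timeout scales with `suspicion_mult`,
                    // cluster size, and `min_suspicion` is
                    // `SuspicionTimer::arm`'s concern (classic SWIM uses
                    // roughly mult * log(n) probe intervals; assumed, not
                    // restated here).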
                    self.suspicion.lock().await.arm(
                        target,
                        Instant::now(),
                        &self.cfg,
                        cluster_size,
                    );
                }
            }
            Err(_) => {}
        }
    }

    async fn on_incoming(&self, from_addr: SocketAddr, msg: SwimMessage) {
        // Every datagram carries piggyback; ingest before dispatching so
        // a self-refutation bump is reflected in the outgoing Ack below.
        self.ingest_piggyback(msg.piggyback()).await;
        match msg {
            SwimMessage::Ping(ping) => self.handle_ping(from_addr, ping).await,
            SwimMessage::PingReq(req) => self.handle_ping_req(from_addr, req).await,
            SwimMessage::Ack(ack) => {
                self.inflight
                    .resolve(ack.probe_id, SwimMessage::Ack(ack))
                    .await
            }
            SwimMessage::Nack(nack) => {
                self.inflight
                    .resolve(nack.probe_id, SwimMessage::Nack(nack))
                    .await
            }
        }
    }

    async fn handle_ping(&self, from_addr: SocketAddr, ping: Ping) {
        let local_inc = *self.local_incarnation.lock().await;
        let fanout =
            DisseminationQueue::fanout_threshold(self.membership.len(), self.cfg.fanout_lambda);
        let ack = SwimMessage::Ack(Ack {
            probe_id: ping.probe_id,
            from: self.membership.local_node_id().clone(),
            incarnation: local_inc,
            piggyback: self
                .dissemination
                .take_for_message(self.cfg.max_piggyback, fanout),
        });
        let _ = self.transport.send(from_addr, ack).await;
    }

    async fn handle_ping_req(&self, requester_addr: SocketAddr, req: PingReq) {
        let Ok(target_sock) = req.target_addr.parse::<SocketAddr>() else {
            return;
        };

        // Register a nested probe id; when the forwarded ack arrives
        // we rewrap it with the original probe id and relay to the
        // requester. The relay runs on a dedicated task so the detector
        // run-loop stays responsive.
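        //
        // Flow: requester --PingReq--> us --Ping(forward_id)--> target,
        //       then target --Ack(forward_id)--> us, rewrapped as
        //       Ack(original_probe_id) back to the requester.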
        let forward_id = self.next_probe_id();
        let Ok(forward_rx) = self.inflight.register(forward_id).await else {
            return;
        };

        let local_node = self.membership.local_node_id().clone();
        let local_inc = *self.local_incarnation.lock().await;
        let transport = Arc::clone(&self.transport);
        let inflight = Arc::clone(&self.inflight);
        let dissemination = Arc::clone(&self.dissemination);
        let timeout_dur = self.cfg.probe_timeout;
        let max_piggyback = self.cfg.max_piggyback;
        let fanout =
            DisseminationQueue::fanout_threshold(self.membership.len(), self.cfg.fanout_lambda);
        let original_probe_id = req.probe_id;

        tokio::spawn(async move {
            let send_res = transport
                .send(
                    target_sock,
                    SwimMessage::Ping(Ping {
                        probe_id: forward_id,
                        from: local_node.clone(),
                        incarnation: local_inc,
                        piggyback: dissemination.take_for_message(max_piggyback, fanout),
                    }),
                )
                .await;
            if send_res.is_err() {
                inflight.forget(forward_id).await;
                return;
            }
            match tokio::time::timeout(timeout_dur, forward_rx).await {
                Ok(Ok(SwimMessage::Ack(ack))) => {
                    let relay = SwimMessage::Ack(Ack {
                        probe_id: original_probe_id,
                        from: ack.from,
                        incarnation: ack.incarnation,
                        piggyback: dissemination.take_for_message(max_piggyback, fanout),
                    });
                    let _ = transport.send(requester_addr, relay).await;
                }
                _ => {
                    inflight.forget(forward_id).await;
                }
            }
        });
    }

    /// Bump the local incarnation past `past`, as refuting a
    /// self-suspect rumour does. Exposed for tests that assert the
    /// monotonic bump directly; in production [`Self::ingest_piggyback`]
    /// performs the same bump whenever a merge reports
    /// `MergeOutcome::SelfRefute`.
    #[cfg(test)]
    pub async fn bump_local_incarnation(&self, past: Incarnation) -> Incarnation {
        let mut guard = self.local_incarnation.lock().await;
        *guard = guard.refute(past);
        *guard
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::swim::detector::transport::TransportFabric;
    use crate::swim::member::MemberState;
    use crate::swim::wire::ProbeId;
    use nodedb_types::NodeId;
    use std::net::{IpAddr, Ipv4Addr};
    use std::time::Duration;

    fn addr(p: u16) -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), p)
    }

    fn cfg() -> SwimConfig {
        SwimConfig {
            probe_interval: Duration::from_millis(100),
            probe_timeout: Duration::from_millis(40),
            indirect_probes: 2,
            suspicion_mult: 4,
            min_suspicion: Duration::from_millis(500),
            initial_incarnation: Incarnation::ZERO,
            max_piggyback: 6,
            fanout_lambda: 3,
        }
    }

    async fn spawn_node(
        fab: &Arc<TransportFabric>,
        id: &str,
        port: u16,
        peers: &[(String, u16)],
    ) -> (
        Arc<FailureDetector>,
        watch::Sender<bool>,
        tokio::task::JoinHandle<()>,
    ) {
        let transport: Arc<dyn Transport> = Arc::new(fab.bind(addr(port)).await);
        let list = Arc::new(MembershipList::new_local(
            NodeId::try_new(id).expect("test fixture"),
            addr(port),
            Incarnation::ZERO,
        ));
        for (peer_id, peer_port) in peers {
            list.apply(&MemberUpdate {
                node_id: NodeId::try_new(peer_id.as_str()).expect("test fixture"),
                addr: addr(*peer_port).to_string(),
                state: MemberState::Alive,
                incarnation: Incarnation::new(1),
            });
        }
        let detector = Arc::new(FailureDetector::new(
            cfg(),
            list,
            transport,
            ProbeScheduler::with_seed(port as u64),
        ));
        let (tx, rx) = watch::channel(false);
        let handle = tokio::spawn({
            let det = Arc::clone(&detector);
            async move { det.run(rx).await }
        });
        (detector, tx, handle)
    }

    #[tokio::test(start_paused = true)]
    async fn three_node_mesh_converges_when_target_partitioned() {
        let fab = TransportFabric::new();
        let peers_of = |me: &str| {
            ["a", "b", "c"]
                .iter()
                .filter(|p| **p != me)
                .map(|p| {
                    let port = match *p {
                        "a" => 7010,
                        "b" => 7011,
                        "c" => 7012,
                        _ => unreachable!(),
                    };
                    (p.to_string(), port)
                })
                .collect::<Vec<_>>()
        };
        let (det_a, sd_a, h_a) = spawn_node(&fab, "a", 7010, &peers_of("a")).await;
        let (_det_b, sd_b, h_b) = spawn_node(&fab, "b", 7011, &peers_of("b")).await;
        let (_det_c, sd_c, h_c) = spawn_node(&fab, "c", 7012, &peers_of("c")).await;

        // Partition b from everything (both directions).
        fab.drop_edge(addr(7010), addr(7011)).await;
        fab.drop_edge(addr(7011), addr(7010)).await;
        fab.drop_edge(addr(7012), addr(7011)).await;
        fab.drop_edge(addr(7011), addr(7012)).await;

        // Give the detector a few probe intervals to converge. Use
        // advance() in a loop so timers, inflight probes, and suspicion
        // expiry all get a chance to fire.
        for _ in 0..30 {
            tokio::time::advance(cfg().probe_interval).await;
            tokio::task::yield_now().await;
        }

        // A's view must have at least marked b Suspect; once the
        // suspicion timeout has elapsed it transitions to Dead.
        let m = det_a
            .membership
            .get(&NodeId::try_new("b").expect("test fixture"))
            .expect("b in list");
        assert!(
            matches!(m.state, MemberState::Suspect | MemberState::Dead),
            "expected Suspect or Dead, got {:?}",
            m.state
        );

        // Shutdown.
        let _ = sd_a.send(true);
        let _ = sd_b.send(true);
        let _ = sd_c.send(true);
        let _ = tokio::time::timeout(Duration::from_millis(200), h_a).await;
        let _ = tokio::time::timeout(Duration::from_millis(200), h_b).await;
        let _ = tokio::time::timeout(Duration::from_millis(200), h_c).await;
    }

    #[tokio::test(start_paused = true)]
    async fn ping_triggers_ack_reply() {
        let fab = TransportFabric::new();
        let (_det_a, sd_a, h_a) = spawn_node(&fab, "a", 7020, &[]).await;
        let probe_addr = addr(7021);
        let probe_transport = Arc::new(fab.bind(probe_addr).await);

        // Send a raw Ping from probe → a and wait for the Ack.
        probe_transport
            .send(
                addr(7020),
                SwimMessage::Ping(Ping {
                    probe_id: ProbeId::new(42),
                    from: NodeId::try_new("probe").expect("test fixture"),
                    incarnation: Incarnation::ZERO,
                    piggyback: vec![],
                }),
            )
            .await
            .unwrap();

        // Let the detector's recv arm fire.
        for _ in 0..5 {
            tokio::task::yield_now().await;
        }

        let (from, msg) = tokio::time::timeout(Duration::from_millis(50), probe_transport.recv())
            .await
            .expect("recv timed out")
            .expect("recv");
        assert_eq!(from, addr(7020));
        match msg {
            SwimMessage::Ack(ack) => assert_eq!(ack.probe_id, ProbeId::new(42)),
            other => panic!("expected Ack, got {other:?}"),
        }

        let _ = sd_a.send(true);
        let _ = tokio::time::timeout(Duration::from_millis(100), h_a).await;
    }

    #[tokio::test(start_paused = true)]
    async fn shutdown_terminates_loop_promptly() {
        let fab = TransportFabric::new();
        let (_det_a, sd_a, h_a) = spawn_node(&fab, "a", 7030, &[]).await;
        let _ = sd_a.send(true);
        let joined = tokio::time::timeout(Duration::from_millis(100), h_a).await;
        assert!(joined.is_ok(), "detector did not shut down in time");
    }

    #[tokio::test(start_paused = true)]
    async fn bump_local_incarnation_is_monotonic() {
        let fab = TransportFabric::new();
        let (det_a, sd_a, h_a) = spawn_node(&fab, "a", 7040, &[]).await;
        let bumped = det_a.bump_local_incarnation(Incarnation::new(5)).await;
        assert!(bumped > Incarnation::new(5));
        let _ = sd_a.send(true);
        let _ = tokio::time::timeout(Duration::from_millis(100), h_a).await;
    }

    /// Enqueue a synthetic rumour about a never-probed peer on node A's
    /// dissemination queue, then let the 3-node mesh run a few probe
    /// rounds. Nodes B and C must observe the delta via piggyback.
    #[tokio::test(start_paused = true)]
    async fn piggyback_propagates_delta_to_peers() {
        let fab = TransportFabric::new();
        let peers_of = |me: &str| {
            ["a", "b", "c"]
                .iter()
                .filter(|p| **p != me)
                .map(|p| {
                    let port = match *p {
                        "a" => 7050,
                        "b" => 7051,
                        "c" => 7052,
                        _ => unreachable!(),
                    };
                    (p.to_string(), port)
                })
                .collect::<Vec<_>>()
        };
        let (det_a, sd_a, h_a) = spawn_node(&fab, "a", 7050, &peers_of("a")).await;
        let (det_b, sd_b, h_b) = spawn_node(&fab, "b", 7051, &peers_of("b")).await;
        let (det_c, sd_c, h_c) = spawn_node(&fab, "c", 7052, &peers_of("c")).await;

        // Synthetic rumour: "ghost" is an Alive peer A learned about
        // out of band. It is NOT in B or C's membership initially.
        det_a.dissemination().enqueue(MemberUpdate {
            node_id: NodeId::try_new("ghost").expect("test fixture"),
            addr: "127.0.0.1:9999".to_string(),
            state: MemberState::Alive,
            incarnation: Incarnation::new(1),
        });
        // Mirror the rumour into A's own membership list so A's local
        // view matches what it gossips.
        det_a.membership.apply(&MemberUpdate {
            node_id: NodeId::try_new("ghost").expect("test fixture"),
            addr: "127.0.0.1:9999".to_string(),
            state: MemberState::Alive,
            incarnation: Incarnation::new(1),
        });

        // Run enough probe rounds for gossip to reach B and C.
        for _ in 0..20 {
            tokio::time::advance(cfg().probe_interval).await;
            tokio::task::yield_now().await;
        }

        assert!(
            det_b
                .membership
                .get(&NodeId::try_new("ghost").expect("test fixture"))
                .is_some(),
            "B must learn about ghost via piggyback"
        );
        assert!(
            det_c
                .membership
                .get(&NodeId::try_new("ghost").expect("test fixture"))
                .is_some(),
            "C must learn about ghost via piggyback"
        );

        let _ = sd_a.send(true);
        let _ = sd_b.send(true);
        let _ = sd_c.send(true);
        let _ = tokio::time::timeout(Duration::from_millis(200), h_a).await;
        let _ = tokio::time::timeout(Duration::from_millis(200), h_b).await;
        let _ = tokio::time::timeout(Duration::from_millis(200), h_c).await;
    }

    /// A receives a Ping whose piggyback claims A is Suspect. A must
    /// bump its own incarnation and enqueue an Alive refutation.
    #[tokio::test(start_paused = true)]
    async fn self_refute_bumps_incarnation_via_piggyback() {
        let fab = TransportFabric::new();
        let (det_a, sd_a, h_a) = spawn_node(&fab, "a", 7060, &[]).await;
        let probe = Arc::new(fab.bind(addr(7061)).await);

        // Send a ping whose piggyback suspects "a" at inc 7.
        probe
            .send(
                addr(7060),
                SwimMessage::Ping(Ping {
                    probe_id: ProbeId::new(1),
                    from: NodeId::try_new("probe").expect("test fixture"),
                    incarnation: Incarnation::ZERO,
                    piggyback: vec![MemberUpdate {
                        node_id: NodeId::try_new("a").expect("test fixture"),
                        addr: addr(7060).to_string(),
                        state: MemberState::Suspect,
                        incarnation: Incarnation::new(7),
                    }],
                }),
            )
            .await
            .unwrap();

        // Drain the Ack so the detector actually processes recv.
        let (_from, _ack) = tokio::time::timeout(Duration::from_millis(50), probe.recv())
            .await
            .expect("recv timed out")
            .expect("recv");

        // A's local incarnation must now be > 7.
        let bumped = *det_a.local_incarnation.lock().await;
        assert!(
            bumped > Incarnation::new(7),
            "local incarnation {bumped:?} did not refute rumoured Suspect(7)"
        );
        // A's membership view for itself is Alive at the bumped value.
        let me = det_a
            .membership
            .get(&NodeId::try_new("a").expect("test fixture"))
            .expect("self");
        assert_eq!(me.state, MemberState::Alive);
        assert!(me.incarnation > Incarnation::new(7));

        let _ = sd_a.send(true);
        let _ = tokio::time::timeout(Duration::from_millis(100), h_a).await;
    }
}