1use std::net::SocketAddr;
11use std::sync::Arc;
12use std::sync::atomic::{AtomicU64, Ordering};
13
14use tokio::sync::{Mutex, watch};
15use tokio::time::{Instant, interval};
16
17use crate::swim::config::SwimConfig;
18use crate::swim::dissemination::{DisseminationQueue, apply_and_disseminate};
19use crate::swim::error::SwimError;
20use crate::swim::incarnation::Incarnation;
21use crate::swim::member::MemberState;
22use crate::swim::member::record::MemberUpdate;
23use crate::swim::membership::{MembershipList, MergeOutcome};
24use crate::swim::subscriber::MembershipSubscriber;
25use crate::swim::wire::{Ack, Ping, PingReq, ProbeId, SwimMessage};
26
27use super::probe_round::{InflightProbes, ProbeOutcome, ProbeRound};
28use super::scheduler::ProbeScheduler;
29use super::suspicion::SuspicionTimer;
30use super::transport::Transport;
31
32pub struct FailureDetector {
38 cfg: SwimConfig,
39 membership: Arc<MembershipList>,
40 transport: Arc<dyn Transport>,
41 scheduler: Mutex<ProbeScheduler>,
42 suspicion: Mutex<SuspicionTimer>,
43 inflight: Arc<InflightProbes>,
44 dissemination: Arc<DisseminationQueue>,
45 probe_counter: AtomicU64,
46 local_incarnation: Mutex<Incarnation>,
47 subscribers: Vec<Arc<dyn MembershipSubscriber>>,
48}
49
50impl FailureDetector {
51 pub fn new(
54 cfg: SwimConfig,
55 membership: Arc<MembershipList>,
56 transport: Arc<dyn Transport>,
57 scheduler: ProbeScheduler,
58 ) -> Self {
59 Self::with_subscribers(cfg, membership, transport, scheduler, Vec::new())
60 }
61
62 pub fn with_subscribers(
65 cfg: SwimConfig,
66 membership: Arc<MembershipList>,
67 transport: Arc<dyn Transport>,
68 scheduler: ProbeScheduler,
69 subscribers: Vec<Arc<dyn MembershipSubscriber>>,
70 ) -> Self {
71 let initial_inc = cfg.initial_incarnation;
72 Self {
73 cfg,
74 membership,
75 transport,
76 scheduler: Mutex::new(scheduler),
77 suspicion: Mutex::new(SuspicionTimer::new()),
78 inflight: Arc::new(InflightProbes::new()),
79 dissemination: Arc::new(DisseminationQueue::new()),
80 probe_counter: AtomicU64::new(0),
81 local_incarnation: Mutex::new(initial_inc),
82 subscribers,
83 }
84 }
85
86 fn apply_and_notify(&self, update: &MemberUpdate) -> MergeOutcome {
91 let old_state = self.membership.get(&update.node_id).map(|m| m.state);
92 let outcome = apply_and_disseminate(&self.membership, &self.dissemination, update);
93 if self.subscribers.is_empty() {
94 return outcome;
95 }
96 let new_state = match self.membership.get(&update.node_id) {
97 Some(m) => m.state,
98 None => return outcome,
99 };
100 if old_state != Some(new_state) {
101 for sub in &self.subscribers {
102 sub.on_state_change(&update.node_id, old_state, new_state);
103 }
104 }
105 outcome
106 }
107
108 pub fn dissemination(&self) -> &Arc<DisseminationQueue> {
111 &self.dissemination
112 }
113
114 async fn ingest_piggyback(&self, piggyback: &[MemberUpdate]) {
119 for update in piggyback {
120 let outcome = self.apply_and_notify(update);
121 if let MergeOutcome::SelfRefute { new_incarnation } = outcome {
122 let mut guard = self.local_incarnation.lock().await;
123 if new_incarnation > *guard {
124 *guard = new_incarnation;
125 }
126 }
127 }
128 }
129
130 #[cfg(test)]
133 pub fn inflight(&self) -> &Arc<InflightProbes> {
134 &self.inflight
135 }
136
137 fn next_probe_id(&self) -> ProbeId {
138 ProbeId::new(self.probe_counter.fetch_add(1, Ordering::Relaxed))
139 }
140
141 pub async fn run(self: Arc<Self>, mut shutdown: watch::Receiver<bool>) {
143 let mut tick = interval(self.cfg.probe_interval);
144 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
145 tick.tick().await;
148 loop {
149 tokio::select! {
150 biased;
151 changed = shutdown.changed() => {
152 if changed.is_ok() && *shutdown.borrow() {
153 break;
154 }
155 }
156 _ = tick.tick() => {
157 self.on_tick().await;
158 }
159 recv = self.transport.recv() => {
160 match recv {
161 Ok((from_addr, msg)) => self.on_incoming(from_addr, msg).await,
162 Err(SwimError::TransportClosed) => break,
163 Err(_) => {}
164 }
165 }
166 }
167 }
168 }
169
170 async fn on_tick(&self) {
171 let now = Instant::now();
173 let expired = self.suspicion.lock().await.drain_expired(now);
174 for node_id in expired {
175 if let Some(member) = self.membership.get(&node_id) {
176 let dead_update = MemberUpdate {
177 node_id: node_id.clone(),
178 addr: member.addr.to_string(),
179 state: MemberState::Dead,
180 incarnation: member.incarnation,
181 };
182 self.apply_and_notify(&dead_update);
183 }
184 }
185
186 let local_inc = *self.local_incarnation.lock().await;
188 let mut sched = self.scheduler.lock().await;
189 let outcome = ProbeRound {
190 scheduler: &mut sched,
191 membership: &self.membership,
192 transport: &self.transport,
193 inflight: &self.inflight,
194 dissemination: &self.dissemination,
195 probe_timeout: self.cfg.probe_timeout,
196 k_indirect: self.cfg.indirect_probes as usize,
197 max_piggyback: self.cfg.max_piggyback,
198 fanout_lambda: self.cfg.fanout_lambda,
199 next_probe_id: || self.next_probe_id(),
200 local_incarnation: local_inc,
201 }
202 .execute()
203 .await;
204 drop(sched);
205
206 match outcome {
207 Ok(ProbeOutcome::Idle) | Ok(ProbeOutcome::Acked { .. }) => {}
208 Ok(ProbeOutcome::Suspect { target }) => {
209 if let Some(member) = self.membership.get(&target) {
210 let suspect_update = MemberUpdate {
211 node_id: target.clone(),
212 addr: member.addr.to_string(),
213 state: MemberState::Suspect,
214 incarnation: member.incarnation,
215 };
216 self.apply_and_notify(&suspect_update);
217 let cluster_size = self.membership.len();
218 self.suspicion.lock().await.arm(
219 target,
220 Instant::now(),
221 &self.cfg,
222 cluster_size,
223 );
224 }
225 }
226 Err(_) => {}
227 }
228 }
229
230 async fn on_incoming(&self, from_addr: SocketAddr, msg: SwimMessage) {
231 self.ingest_piggyback(msg.piggyback()).await;
234 match msg {
235 SwimMessage::Ping(ping) => self.handle_ping(from_addr, ping).await,
236 SwimMessage::PingReq(req) => self.handle_ping_req(from_addr, req).await,
237 SwimMessage::Ack(ack) => {
238 self.inflight
239 .resolve(ack.probe_id, SwimMessage::Ack(ack))
240 .await
241 }
242 SwimMessage::Nack(nack) => {
243 self.inflight
244 .resolve(nack.probe_id, SwimMessage::Nack(nack))
245 .await
246 }
247 }
248 }
249
250 async fn handle_ping(&self, from_addr: SocketAddr, ping: Ping) {
251 let local_inc = *self.local_incarnation.lock().await;
252 let fanout =
253 DisseminationQueue::fanout_threshold(self.membership.len(), self.cfg.fanout_lambda);
254 let ack = SwimMessage::Ack(Ack {
255 probe_id: ping.probe_id,
256 from: self.membership.local_node_id().clone(),
257 incarnation: local_inc,
258 piggyback: self
259 .dissemination
260 .take_for_message(self.cfg.max_piggyback, fanout),
261 });
262 let _ = self.transport.send(from_addr, ack).await;
263 }
264
265 async fn handle_ping_req(&self, requester_addr: SocketAddr, req: PingReq) {
266 let Ok(target_sock) = req.target_addr.parse::<SocketAddr>() else {
267 return;
268 };
269
270 let forward_id = self.next_probe_id();
275 let Ok(forward_rx) = self.inflight.register(forward_id).await else {
276 return;
277 };
278
279 let local_node = self.membership.local_node_id().clone();
280 let local_inc = *self.local_incarnation.lock().await;
281 let transport = Arc::clone(&self.transport);
282 let inflight = Arc::clone(&self.inflight);
283 let dissemination = Arc::clone(&self.dissemination);
284 let timeout_dur = self.cfg.probe_timeout;
285 let max_piggyback = self.cfg.max_piggyback;
286 let fanout =
287 DisseminationQueue::fanout_threshold(self.membership.len(), self.cfg.fanout_lambda);
288 let original_probe_id = req.probe_id;
289
290 tokio::spawn(async move {
291 let send_res = transport
292 .send(
293 target_sock,
294 SwimMessage::Ping(Ping {
295 probe_id: forward_id,
296 from: local_node.clone(),
297 incarnation: local_inc,
298 piggyback: dissemination.take_for_message(max_piggyback, fanout),
299 }),
300 )
301 .await;
302 if send_res.is_err() {
303 inflight.forget(forward_id).await;
304 return;
305 }
306 match tokio::time::timeout(timeout_dur, forward_rx).await {
307 Ok(Ok(SwimMessage::Ack(ack))) => {
308 let relay = SwimMessage::Ack(Ack {
309 probe_id: original_probe_id,
310 from: ack.from,
311 incarnation: ack.incarnation,
312 piggyback: dissemination.take_for_message(max_piggyback, fanout),
313 });
314 let _ = transport.send(requester_addr, relay).await;
315 }
316 _ => {
317 inflight.forget(forward_id).await;
318 }
319 }
320 });
321 }
322
323 #[cfg(test)]
328 pub async fn bump_local_incarnation(&self, past: Incarnation) -> Incarnation {
329 let mut guard = self.local_incarnation.lock().await;
330 *guard = guard.refute(past);
331 *guard
332 }
333}
334
#[cfg(test)]
mod tests {
    use super::*;
    use crate::swim::detector::transport::TransportFabric;
    use crate::swim::member::MemberState;
    use crate::swim::wire::ProbeId;
    use nodedb_types::NodeId;
    use std::net::{IpAddr, Ipv4Addr};
    use std::time::Duration;

    /// Loopback address used by every in-memory test node.
    fn addr(port: u16) -> SocketAddr {
        SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), port)
    }

    /// Parses a fixture node id; fixture ids are always valid.
    fn node(id: &str) -> NodeId {
        NodeId::try_new(id).expect("test fixture")
    }

    /// Fast timings so a few virtual-time intervals cover a full
    /// probe / suspect / confirm cycle.
    fn cfg() -> SwimConfig {
        SwimConfig {
            probe_interval: Duration::from_millis(100),
            probe_timeout: Duration::from_millis(40),
            indirect_probes: 2,
            suspicion_mult: 4,
            min_suspicion: Duration::from_millis(500),
            initial_incarnation: Incarnation::ZERO,
            max_piggyback: 6,
            fanout_lambda: 3,
        }
    }

    /// Peers of `me` in a full mesh over `nodes`, preserving `nodes` order.
    fn mesh_peers(me: &str, nodes: &[(&str, u16)]) -> Vec<(String, u16)> {
        nodes
            .iter()
            .filter(|&&(name, _)| name != me)
            .map(|&(name, port)| (name.to_string(), port))
            .collect()
    }

    /// Advances paused virtual time by `rounds` probe intervals, yielding
    /// once after each step so spawned tasks get a chance to react.
    async fn run_intervals(rounds: usize) {
        for _ in 0..rounds {
            tokio::time::advance(cfg().probe_interval).await;
            tokio::task::yield_now().await;
        }
    }

    /// Signals every node to stop, then waits up to `grace` for each loop to
    /// exit, in order. Senders stay alive until every handle was awaited.
    async fn shutdown_all(
        nodes: Vec<(watch::Sender<bool>, tokio::task::JoinHandle<()>)>,
        grace: Duration,
    ) {
        let (senders, handles): (Vec<_>, Vec<_>) = nodes.into_iter().unzip();
        for tx in &senders {
            let _ = tx.send(true);
        }
        for handle in handles {
            let _ = tokio::time::timeout(grace, handle).await;
        }
    }

    /// Binds a transport on `port`, seeds the membership list with `peers`
    /// (all Alive at incarnation 1) and starts the detector loop.
    async fn spawn_node(
        fab: &Arc<TransportFabric>,
        id: &str,
        port: u16,
        peers: &[(String, u16)],
    ) -> (
        Arc<FailureDetector>,
        watch::Sender<bool>,
        tokio::task::JoinHandle<()>,
    ) {
        let local = addr(port);
        let transport: Arc<dyn Transport> = Arc::new(fab.bind(local).await);
        let list = Arc::new(MembershipList::new_local(
            node(id),
            local,
            Incarnation::ZERO,
        ));
        for (peer_id, peer_port) in peers {
            list.apply(&MemberUpdate {
                node_id: node(peer_id),
                addr: addr(*peer_port).to_string(),
                state: MemberState::Alive,
                incarnation: Incarnation::new(1),
            });
        }
        let detector = Arc::new(FailureDetector::new(
            cfg(),
            list,
            transport,
            ProbeScheduler::with_seed(u64::from(port)),
        ));
        let (stop_tx, stop_rx) = watch::channel(false);
        let runner = Arc::clone(&detector);
        let handle = tokio::spawn(async move { runner.run(stop_rx).await });
        (detector, stop_tx, handle)
    }

    #[tokio::test(start_paused = true)]
    async fn three_node_mesh_converges_when_target_partitioned() {
        const MESH: [(&str, u16); 3] = [("a", 7010), ("b", 7011), ("c", 7012)];
        let fab = TransportFabric::new();
        let (det_a, sd_a, h_a) = spawn_node(&fab, "a", 7010, &mesh_peers("a", &MESH)).await;
        let (_det_b, sd_b, h_b) = spawn_node(&fab, "b", 7011, &mesh_peers("b", &MESH)).await;
        let (_det_c, sd_c, h_c) = spawn_node(&fab, "c", 7012, &mesh_peers("c", &MESH)).await;

        // Isolate b from both a and c, in both directions.
        fab.drop_edge(addr(7010), addr(7011)).await;
        fab.drop_edge(addr(7011), addr(7010)).await;
        fab.drop_edge(addr(7012), addr(7011)).await;
        fab.drop_edge(addr(7011), addr(7012)).await;

        run_intervals(30).await;

        let b_as_seen_by_a = det_a.membership.get(&node("b")).expect("b in list");
        assert!(
            matches!(b_as_seen_by_a.state, MemberState::Suspect | MemberState::Dead),
            "expected Suspect or Dead, got {:?}",
            b_as_seen_by_a.state
        );

        shutdown_all(
            vec![(sd_a, h_a), (sd_b, h_b), (sd_c, h_c)],
            Duration::from_millis(200),
        )
        .await;
    }

    #[tokio::test(start_paused = true)]
    async fn ping_triggers_ack_reply() {
        let fab = TransportFabric::new();
        let (_det_a, sd_a, h_a) = spawn_node(&fab, "a", 7020, &[]).await;
        let probe_addr = addr(7021);
        let probe_transport = Arc::new(fab.bind(probe_addr).await);

        let ping = SwimMessage::Ping(Ping {
            probe_id: ProbeId::new(42),
            from: node("probe"),
            incarnation: Incarnation::ZERO,
            piggyback: vec![],
        });
        probe_transport.send(addr(7020), ping).await.unwrap();

        for _ in 0..5 {
            tokio::task::yield_now().await;
        }

        let (from, msg) = tokio::time::timeout(Duration::from_millis(50), probe_transport.recv())
            .await
            .expect("recv did not time out")
            .expect("recv");
        assert_eq!(from, addr(7020));
        let ack = match msg {
            SwimMessage::Ack(ack) => ack,
            other => panic!("expected Ack, got {other:?}"),
        };
        assert_eq!(ack.probe_id, ProbeId::new(42));

        shutdown_all(vec![(sd_a, h_a)], Duration::from_millis(100)).await;
    }

    #[tokio::test(start_paused = true)]
    async fn shutdown_terminates_loop_promptly() {
        let fab = TransportFabric::new();
        let (_det_a, sd_a, h_a) = spawn_node(&fab, "a", 7030, &[]).await;
        let _ = sd_a.send(true);
        let joined = tokio::time::timeout(Duration::from_millis(100), h_a).await;
        assert!(joined.is_ok(), "detector did not shut down in time");
    }

    #[tokio::test(start_paused = true)]
    async fn bump_local_incarnation_is_monotonic() {
        let fab = TransportFabric::new();
        let (det_a, sd_a, h_a) = spawn_node(&fab, "a", 7040, &[]).await;
        let rumoured = Incarnation::new(5);
        let bumped = det_a.bump_local_incarnation(rumoured).await;
        assert!(bumped > rumoured);
        shutdown_all(vec![(sd_a, h_a)], Duration::from_millis(100)).await;
    }

    #[tokio::test(start_paused = true)]
    async fn piggyback_propagates_delta_to_peers() {
        const MESH: [(&str, u16); 3] = [("a", 7050), ("b", 7051), ("c", 7052)];
        let fab = TransportFabric::new();
        let (det_a, sd_a, h_a) = spawn_node(&fab, "a", 7050, &mesh_peers("a", &MESH)).await;
        let (det_b, sd_b, h_b) = spawn_node(&fab, "b", 7051, &mesh_peers("b", &MESH)).await;
        let (det_c, sd_c, h_c) = spawn_node(&fab, "c", 7052, &mesh_peers("c", &MESH)).await;

        let ghost = || MemberUpdate {
            node_id: node("ghost"),
            addr: "127.0.0.1:9999".to_string(),
            state: MemberState::Alive,
            incarnation: Incarnation::new(1),
        };
        // Queue the rumour for gossip and make it known locally on A.
        det_a.dissemination().enqueue(ghost());
        det_a.membership.apply(&ghost());

        run_intervals(20).await;

        assert!(
            det_b.membership.get(&node("ghost")).is_some(),
            "B must learn about ghost via piggyback"
        );
        assert!(
            det_c.membership.get(&node("ghost")).is_some(),
            "C must learn about ghost via piggyback"
        );

        shutdown_all(
            vec![(sd_a, h_a), (sd_b, h_b), (sd_c, h_c)],
            Duration::from_millis(200),
        )
        .await;
    }

    #[tokio::test(start_paused = true)]
    async fn self_refute_bumps_incarnation_via_piggyback() {
        let fab = TransportFabric::new();
        let (det_a, sd_a, h_a) = spawn_node(&fab, "a", 7060, &[]).await;
        let probe = Arc::new(fab.bind(addr(7061)).await);

        // Gossip claiming that "a" itself is Suspect at incarnation 7.
        let rumour = MemberUpdate {
            node_id: node("a"),
            addr: addr(7060).to_string(),
            state: MemberState::Suspect,
            incarnation: Incarnation::new(7),
        };
        let ping = SwimMessage::Ping(Ping {
            probe_id: ProbeId::new(1),
            from: node("probe"),
            incarnation: Incarnation::ZERO,
            piggyback: vec![rumour],
        });
        probe.send(addr(7060), ping).await.unwrap();

        // The detector merges piggybacked gossip before replying, so the
        // reply's arrival means the rumour has been processed.
        let (_from, _reply) = tokio::time::timeout(Duration::from_millis(50), probe.recv())
            .await
            .expect("did not time out")
            .expect("recv");

        let rumoured = Incarnation::new(7);
        let bumped = *det_a.local_incarnation.lock().await;
        assert!(
            bumped > rumoured,
            "local incarnation {bumped:?} did not refute rumoured Suspect(7)"
        );
        let me = det_a.membership.get(&node("a")).expect("self");
        assert_eq!(me.state, MemberState::Alive);
        assert!(me.incarnation > rumoured);

        shutdown_all(vec![(sd_a, h_a)], Duration::from_millis(100)).await;
    }
}
638}