nodedb_cluster/swim/wire/probe.rs
1// SPDX-License-Identifier: BUSL-1.1
2
//! SWIM probe message structs.
//!
//! These are the four datagram types the failure detector exchanges
//! over the network. They are pure data types with `serde` and `zerompk`
//! derives — no I/O, no validation beyond what the type system enforces.
//!
//! ## Message flow (reference)
//!
//! ```text
//!            ┌──────── Ping ───────┐
//! sender A ──┤                     ├── target B
//!            └──── Ack / timeout ──┘
//!                       │
//!                   (timeout)
//!                       ▼
//!            ┌──── PingReq ────┐
//! sender A ──┤                 ├── helper C ──── Ping ───► target B
//!            └─── Ack / Nack ──┘       │
//!                                      ◄─── Ack / timeout ────┘
//! ```
//!
//! Every message carries a bounded `piggyback: Vec<MemberUpdate>` slot
//! used for gossip-style dissemination of membership deltas.
26
27use nodedb_types::NodeId;
28use serde::{Deserialize, Serialize};
29
30use crate::swim::incarnation::Incarnation;
31use crate::swim::member::record::MemberUpdate;
32
33/// Monotonic per-sender probe identifier. Used to correlate `Ack`/`Nack`
34/// with the originating `Ping`/`PingReq`.
35#[derive(
36 Debug,
37 Clone,
38 Copy,
39 PartialEq,
40 Eq,
41 Hash,
42 PartialOrd,
43 Ord,
44 Serialize,
45 Deserialize,
46 zerompk::ToMessagePack,
47 zerompk::FromMessagePack,
48)]
49pub struct ProbeId(u64);
50
51impl ProbeId {
52 /// The smallest probe id. The first probe a sender emits after boot.
53 pub const ZERO: ProbeId = ProbeId(0);
54
55 /// Construct from the raw `u64`. Public for tests and decode paths.
56 pub const fn new(v: u64) -> Self {
57 Self(v)
58 }
59
60 /// Raw value.
61 pub const fn get(self) -> u64 {
62 self.0
63 }
64
65 /// Advance by one, saturating at `u64::MAX`. A sender that issued
66 /// 2^64 probes without restart would freeze at the max — SWIM does
67 /// not reuse probe ids within a single incarnation.
68 pub fn bump(self) -> Self {
69 ProbeId(self.0.saturating_add(1))
70 }
71}
72
73/// Why a helper returned `Nack` instead of a forwarded `Ack`.
74#[derive(
75 Debug,
76 Clone,
77 Copy,
78 PartialEq,
79 Eq,
80 Serialize,
81 Deserialize,
82 zerompk::ToMessagePack,
83 zerompk::FromMessagePack,
84)]
85pub enum NackReason {
86 /// Helper tried to contact the target and did not receive an ack
87 /// within its own probe timeout.
88 TargetUnreachable,
89 /// Helper already considers the target `Dead` or `Left`.
90 TargetDead,
91 /// Helper refused to forward the probe due to rate limiting.
92 RateLimited,
93}
94
95/// Direct probe. Sender A asks target B "are you alive?".
96#[derive(
97 Debug,
98 Clone,
99 PartialEq,
100 Eq,
101 Serialize,
102 Deserialize,
103 zerompk::ToMessagePack,
104 zerompk::FromMessagePack,
105)]
106pub struct Ping {
107 pub probe_id: ProbeId,
108 pub from: NodeId,
109 /// Sender's current incarnation. Receiver uses this for merge logic.
110 pub incarnation: Incarnation,
111 pub piggyback: Vec<MemberUpdate>,
112}
113
114/// Indirect probe. Sender A asks helper C to probe target B on A's
115/// behalf after A's direct ping to B timed out.
116#[derive(
117 Debug,
118 Clone,
119 PartialEq,
120 Eq,
121 Serialize,
122 Deserialize,
123 zerompk::ToMessagePack,
124 zerompk::FromMessagePack,
125)]
126pub struct PingReq {
127 pub probe_id: ProbeId,
128 pub from: NodeId,
129 pub target: NodeId,
130 /// Target's last-known socket address in string form (e.g.
131 /// `"10.0.0.7:7000"`). Stored as `String` because `SocketAddr` has no
132 /// zerompk impl; the helper parses before connecting.
133 pub target_addr: String,
134 pub piggyback: Vec<MemberUpdate>,
135}
136
137/// Positive response to a `Ping` or a helper-forwarded `PingReq`.
138#[derive(
139 Debug,
140 Clone,
141 PartialEq,
142 Eq,
143 Serialize,
144 Deserialize,
145 zerompk::ToMessagePack,
146 zerompk::FromMessagePack,
147)]
148pub struct Ack {
149 pub probe_id: ProbeId,
150 pub from: NodeId,
151 /// Responder's incarnation at the moment of ack. If the responder
152 /// refuted a self-`Suspect` rumour during this probe round, the
153 /// bumped incarnation is propagated here.
154 pub incarnation: Incarnation,
155 pub piggyback: Vec<MemberUpdate>,
156}
157
158/// Negative response from a helper that could not ack on behalf of the
159/// original target.
160#[derive(
161 Debug,
162 Clone,
163 PartialEq,
164 Eq,
165 Serialize,
166 Deserialize,
167 zerompk::ToMessagePack,
168 zerompk::FromMessagePack,
169)]
170pub struct Nack {
171 pub probe_id: ProbeId,
172 pub from: NodeId,
173 pub reason: NackReason,
174 pub piggyback: Vec<MemberUpdate>,
175}
176
#[cfg(test)]
mod tests {
    use super::*;

    /// `bump` yields the immediate successor of the id it is called on.
    #[test]
    fn probe_id_bump_is_monotonic() {
        for (start, expected) in [(ProbeId::ZERO, 1_u64), (ProbeId::new(42), 43)] {
            assert_eq!(start.bump(), ProbeId::new(expected));
        }
    }

    /// At `u64::MAX` a further `bump` returns the same id.
    #[test]
    fn probe_id_saturates_at_u64_max() {
        let ceiling = ProbeId::new(u64::MAX);
        let bumped = ceiling.bump();
        assert_eq!(bumped, ceiling);
    }

    /// Ids compare in the same order as their raw values.
    #[test]
    fn probe_id_total_order() {
        let (zero, one, two) = (ProbeId::ZERO, ProbeId::new(1), ProbeId::new(2));
        assert!(one < two);
        assert!(zero < one);
    }

    /// A variant equals itself and differs from another variant.
    #[test]
    fn nack_reason_equality() {
        let dead = NackReason::TargetDead;
        assert_eq!(dead, NackReason::TargetDead);
        assert_ne!(dead, NackReason::RateLimited);
    }
}