Skip to main content

nodedb_cluster/swim/wire/
probe.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! SWIM probe message structs.
4//!
5//! These are the four datagram types the failure detector exchanges
6//! over the network. They are pure data types with `serde` derives —
7//! no I/O, no validation beyond what the type system enforces.
8//!
9//! ## Message flow (reference)
10//!
11//! ```text
12//!            ┌──────── Ping ───────┐
13//! sender A ──┤                     ├── target B
14//!            └──── Ack / timeout ──┘
15//!                       │
16//!                     (timeout)
17//!                       ▼
18//!            ┌──── PingReq ────┐
19//! sender A ──┤                 ├── helper C ──── Ping ───► target B
20//!            └─── Ack / Nack ──┘                           │
21//!                                   ◄─── Ack / timeout ────┘
22//! ```
23//!
24//! Every message carries a bounded `piggyback: Vec<MemberUpdate>` slot
25//! used for gossip-style dissemination of membership deltas.
26
27use nodedb_types::NodeId;
28use serde::{Deserialize, Serialize};
29
30use crate::swim::incarnation::Incarnation;
31use crate::swim::member::record::MemberUpdate;
32
33/// Monotonic per-sender probe identifier. Used to correlate `Ack`/`Nack`
34/// with the originating `Ping`/`PingReq`.
35#[derive(
36    Debug,
37    Clone,
38    Copy,
39    PartialEq,
40    Eq,
41    Hash,
42    PartialOrd,
43    Ord,
44    Serialize,
45    Deserialize,
46    zerompk::ToMessagePack,
47    zerompk::FromMessagePack,
48)]
49pub struct ProbeId(u64);
50
51impl ProbeId {
52    /// The smallest probe id. The first probe a sender emits after boot.
53    pub const ZERO: ProbeId = ProbeId(0);
54
55    /// Construct from the raw `u64`. Public for tests and decode paths.
56    pub const fn new(v: u64) -> Self {
57        Self(v)
58    }
59
60    /// Raw value.
61    pub const fn get(self) -> u64 {
62        self.0
63    }
64
65    /// Advance by one, saturating at `u64::MAX`. A sender that issued
66    /// 2^64 probes without restart would freeze at the max — SWIM does
67    /// not reuse probe ids within a single incarnation.
68    pub fn bump(self) -> Self {
69        ProbeId(self.0.saturating_add(1))
70    }
71}
72
73/// Why a helper returned `Nack` instead of a forwarded `Ack`.
74#[derive(
75    Debug,
76    Clone,
77    Copy,
78    PartialEq,
79    Eq,
80    Serialize,
81    Deserialize,
82    zerompk::ToMessagePack,
83    zerompk::FromMessagePack,
84)]
85pub enum NackReason {
86    /// Helper tried to contact the target and did not receive an ack
87    /// within its own probe timeout.
88    TargetUnreachable,
89    /// Helper already considers the target `Dead` or `Left`.
90    TargetDead,
91    /// Helper refused to forward the probe due to rate limiting.
92    RateLimited,
93}
94
95/// Direct probe. Sender A asks target B "are you alive?".
96#[derive(
97    Debug,
98    Clone,
99    PartialEq,
100    Eq,
101    Serialize,
102    Deserialize,
103    zerompk::ToMessagePack,
104    zerompk::FromMessagePack,
105)]
106pub struct Ping {
107    pub probe_id: ProbeId,
108    pub from: NodeId,
109    /// Sender's current incarnation. Receiver uses this for merge logic.
110    pub incarnation: Incarnation,
111    pub piggyback: Vec<MemberUpdate>,
112}
113
114/// Indirect probe. Sender A asks helper C to probe target B on A's
115/// behalf after A's direct ping to B timed out.
116#[derive(
117    Debug,
118    Clone,
119    PartialEq,
120    Eq,
121    Serialize,
122    Deserialize,
123    zerompk::ToMessagePack,
124    zerompk::FromMessagePack,
125)]
126pub struct PingReq {
127    pub probe_id: ProbeId,
128    pub from: NodeId,
129    pub target: NodeId,
130    /// Target's last-known socket address in string form (e.g.
131    /// `"10.0.0.7:7000"`). Stored as `String` because `SocketAddr` has no
132    /// zerompk impl; the helper parses before connecting.
133    pub target_addr: String,
134    pub piggyback: Vec<MemberUpdate>,
135}
136
137/// Positive response to a `Ping` or a helper-forwarded `PingReq`.
138#[derive(
139    Debug,
140    Clone,
141    PartialEq,
142    Eq,
143    Serialize,
144    Deserialize,
145    zerompk::ToMessagePack,
146    zerompk::FromMessagePack,
147)]
148pub struct Ack {
149    pub probe_id: ProbeId,
150    pub from: NodeId,
151    /// Responder's incarnation at the moment of ack. If the responder
152    /// refuted a self-`Suspect` rumour during this probe round, the
153    /// bumped incarnation is propagated here.
154    pub incarnation: Incarnation,
155    pub piggyback: Vec<MemberUpdate>,
156}
157
158/// Negative response from a helper that could not ack on behalf of the
159/// original target.
160#[derive(
161    Debug,
162    Clone,
163    PartialEq,
164    Eq,
165    Serialize,
166    Deserialize,
167    zerompk::ToMessagePack,
168    zerompk::FromMessagePack,
169)]
170pub struct Nack {
171    pub probe_id: ProbeId,
172    pub from: NodeId,
173    pub reason: NackReason,
174    pub piggyback: Vec<MemberUpdate>,
175}
176
#[cfg(test)]
mod tests {
    use super::*;

    /// `bump` yields the immediate successor for ids below the ceiling.
    #[test]
    fn probe_id_bump_is_monotonic() {
        let after_zero = ProbeId::ZERO.bump();
        assert_eq!(after_zero, ProbeId::new(1));

        let after_forty_two = ProbeId::new(42).bump();
        assert_eq!(after_forty_two, ProbeId::new(43));
    }

    /// At `u64::MAX` a further `bump` leaves the id where it is.
    #[test]
    fn probe_id_saturates_at_u64_max() {
        let ceiling = ProbeId::new(u64::MAX);
        let bumped = ceiling.bump();
        assert_eq!(bumped, ceiling);
    }

    /// Ordering of ids follows the wrapped integer.
    #[test]
    fn probe_id_total_order() {
        let (one, two) = (ProbeId::new(1), ProbeId::new(2));
        assert!(one < two);
        assert!(ProbeId::ZERO < one);
    }

    /// Variants compare equal to themselves and unequal to each other.
    #[test]
    fn nack_reason_equality() {
        let dead = NackReason::TargetDead;
        assert_eq!(dead, NackReason::TargetDead);
        assert_ne!(dead, NackReason::RateLimited);
    }
}
204}