nodedb_cluster/swim/wire/
message.rs1use serde::{Deserialize, Serialize};
10
11use super::probe::{Ack, Nack, Ping, PingReq};
12use crate::swim::member::record::MemberUpdate;
13
14#[derive(
16 Debug,
17 Clone,
18 PartialEq,
19 Eq,
20 Serialize,
21 Deserialize,
22 zerompk::ToMessagePack,
23 zerompk::FromMessagePack,
24)]
25pub enum SwimMessage {
26 Ping(Ping),
27 PingReq(PingReq),
28 Ack(Ack),
29 Nack(Nack),
30}
31
32impl SwimMessage {
33 pub fn piggyback_mut(&mut self) -> &mut Vec<MemberUpdate> {
37 match self {
38 SwimMessage::Ping(m) => &mut m.piggyback,
39 SwimMessage::PingReq(m) => &mut m.piggyback,
40 SwimMessage::Ack(m) => &mut m.piggyback,
41 SwimMessage::Nack(m) => &mut m.piggyback,
42 }
43 }
44
45 pub fn piggyback(&self) -> &[MemberUpdate] {
47 match self {
48 SwimMessage::Ping(m) => &m.piggyback,
49 SwimMessage::PingReq(m) => &m.piggyback,
50 SwimMessage::Ack(m) => &m.piggyback,
51 SwimMessage::Nack(m) => &m.piggyback,
52 }
53 }
54
55 pub fn truncate_piggyback(&mut self, max: usize) {
60 let slot = self.piggyback_mut();
61 if slot.len() > max {
62 slot.truncate(max);
63 }
64 }
65}
66
67#[cfg(test)]
68mod tests {
69 use super::super::probe::{NackReason, ProbeId};
70 use super::*;
71 use crate::swim::incarnation::Incarnation;
72 use crate::swim::member::MemberState;
73 use nodedb_types::NodeId;
74 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
75
76 fn mk_update(id: &str) -> MemberUpdate {
77 MemberUpdate {
78 node_id: NodeId::new(id),
79 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 7000).to_string(),
80 state: MemberState::Alive,
81 incarnation: Incarnation::ZERO,
82 }
83 }
84
85 fn ping_with_piggyback(n: usize) -> SwimMessage {
86 SwimMessage::Ping(Ping {
87 probe_id: ProbeId::new(1),
88 from: NodeId::new("a"),
89 incarnation: Incarnation::new(2),
90 piggyback: (0..n).map(|i| mk_update(&format!("n{i}"))).collect(),
91 })
92 }
93
94 #[test]
95 fn piggyback_accessor_returns_variant_slot() {
96 let msg = ping_with_piggyback(3);
97 assert_eq!(msg.piggyback().len(), 3);
98 }
99
100 #[test]
101 fn truncate_bounds_piggyback() {
102 let mut msg = ping_with_piggyback(10);
103 msg.truncate_piggyback(4);
104 assert_eq!(msg.piggyback().len(), 4);
105 }
106
107 #[test]
108 fn truncate_is_noop_when_under_limit() {
109 let mut msg = ping_with_piggyback(2);
110 msg.truncate_piggyback(16);
111 assert_eq!(msg.piggyback().len(), 2);
112 }
113
114 #[test]
115 fn piggyback_mut_accessor_for_every_variant() {
116 let mut variants: Vec<SwimMessage> = vec![
117 ping_with_piggyback(0),
118 SwimMessage::PingReq(PingReq {
119 probe_id: ProbeId::ZERO,
120 from: NodeId::new("a"),
121 target: NodeId::new("b"),
122 target_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 7001).to_string(),
123 piggyback: vec![],
124 }),
125 SwimMessage::Ack(Ack {
126 probe_id: ProbeId::ZERO,
127 from: NodeId::new("b"),
128 incarnation: Incarnation::ZERO,
129 piggyback: vec![],
130 }),
131 SwimMessage::Nack(Nack {
132 probe_id: ProbeId::ZERO,
133 from: NodeId::new("c"),
134 reason: NackReason::TargetUnreachable,
135 piggyback: vec![],
136 }),
137 ];
138 for m in &mut variants {
139 m.piggyback_mut().push(mk_update("extra"));
140 assert_eq!(m.piggyback().len(), 1);
141 }
142 }
143}