nodedb_cluster/swim/wire/
message.rs1use serde::{Deserialize, Serialize};
12
13use super::probe::{Ack, Nack, Ping, PingReq};
14use crate::swim::member::record::MemberUpdate;
15
16#[derive(
18 Debug,
19 Clone,
20 PartialEq,
21 Eq,
22 Serialize,
23 Deserialize,
24 zerompk::ToMessagePack,
25 zerompk::FromMessagePack,
26)]
27pub enum SwimMessage {
28 Ping(Ping),
29 PingReq(PingReq),
30 Ack(Ack),
31 Nack(Nack),
32}
33
34impl SwimMessage {
35 pub fn piggyback_mut(&mut self) -> &mut Vec<MemberUpdate> {
39 match self {
40 SwimMessage::Ping(m) => &mut m.piggyback,
41 SwimMessage::PingReq(m) => &mut m.piggyback,
42 SwimMessage::Ack(m) => &mut m.piggyback,
43 SwimMessage::Nack(m) => &mut m.piggyback,
44 }
45 }
46
47 pub fn piggyback(&self) -> &[MemberUpdate] {
49 match self {
50 SwimMessage::Ping(m) => &m.piggyback,
51 SwimMessage::PingReq(m) => &m.piggyback,
52 SwimMessage::Ack(m) => &m.piggyback,
53 SwimMessage::Nack(m) => &m.piggyback,
54 }
55 }
56
57 pub fn truncate_piggyback(&mut self, max: usize) {
62 let slot = self.piggyback_mut();
63 if slot.len() > max {
64 slot.truncate(max);
65 }
66 }
67}
68
69#[cfg(test)]
70mod tests {
71 use super::super::probe::{NackReason, ProbeId};
72 use super::*;
73 use crate::swim::incarnation::Incarnation;
74 use crate::swim::member::MemberState;
75 use nodedb_types::NodeId;
76 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
77
78 fn mk_update(id: &str) -> MemberUpdate {
79 MemberUpdate {
80 node_id: NodeId::try_new(id).expect("test fixture"),
81 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 7000).to_string(),
82 state: MemberState::Alive,
83 incarnation: Incarnation::ZERO,
84 }
85 }
86
87 fn ping_with_piggyback(n: usize) -> SwimMessage {
88 SwimMessage::Ping(Ping {
89 probe_id: ProbeId::new(1),
90 from: NodeId::try_new("a").expect("test fixture"),
91 incarnation: Incarnation::new(2),
92 piggyback: (0..n).map(|i| mk_update(&format!("n{i}"))).collect(),
93 })
94 }
95
96 #[test]
97 fn piggyback_accessor_returns_variant_slot() {
98 let msg = ping_with_piggyback(3);
99 assert_eq!(msg.piggyback().len(), 3);
100 }
101
102 #[test]
103 fn truncate_bounds_piggyback() {
104 let mut msg = ping_with_piggyback(10);
105 msg.truncate_piggyback(4);
106 assert_eq!(msg.piggyback().len(), 4);
107 }
108
109 #[test]
110 fn truncate_is_noop_when_under_limit() {
111 let mut msg = ping_with_piggyback(2);
112 msg.truncate_piggyback(16);
113 assert_eq!(msg.piggyback().len(), 2);
114 }
115
116 #[test]
117 fn piggyback_mut_accessor_for_every_variant() {
118 let mut variants: Vec<SwimMessage> = vec![
119 ping_with_piggyback(0),
120 SwimMessage::PingReq(PingReq {
121 probe_id: ProbeId::ZERO,
122 from: NodeId::try_new("a").expect("test fixture"),
123 target: NodeId::try_new("b").expect("test fixture"),
124 target_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 7001).to_string(),
125 piggyback: vec![],
126 }),
127 SwimMessage::Ack(Ack {
128 probe_id: ProbeId::ZERO,
129 from: NodeId::try_new("b").expect("test fixture"),
130 incarnation: Incarnation::ZERO,
131 piggyback: vec![],
132 }),
133 SwimMessage::Nack(Nack {
134 probe_id: ProbeId::ZERO,
135 from: NodeId::try_new("c").expect("test fixture"),
136 reason: NackReason::TargetUnreachable,
137 piggyback: vec![],
138 }),
139 ];
140 for m in &mut variants {
141 m.piggyback_mut().push(mk_update("extra"));
142 assert_eq!(m.piggyback().len(), 1);
143 }
144 }
145}