nodedb_cluster/swim/wire/
authenticated.rs1use std::net::SocketAddr;
41
42use sha2::{Digest, Sha256};
43
44use crate::rpc_codec::auth_envelope;
45use crate::rpc_codec::{MacKey, PeerSeqSender, PeerSeqWindow};
46use crate::swim::error::SwimError;
47
48use super::{codec, message::SwimMessage};
49
50pub fn addr_hash(addr: SocketAddr) -> u64 {
58 let mut h = Sha256::new();
59 h.update(addr.to_string().as_bytes());
60 let digest = h.finalize();
61 u64::from_le_bytes(digest[..8].try_into().expect("sha256 is 32 bytes"))
62}
63
64#[derive(Debug)]
68pub struct SwimAuth {
69 mac_key: MacKey,
70 local_addr_hash: u64,
71 seq_out: PeerSeqSender,
72 seq_in: PeerSeqWindow,
73}
74
75impl SwimAuth {
76 pub fn new(mac_key: MacKey, local_addr: SocketAddr) -> Self {
82 Self {
83 mac_key,
84 local_addr_hash: addr_hash(local_addr),
85 seq_out: PeerSeqSender::new(),
86 seq_in: PeerSeqWindow::new(),
87 }
88 }
89
90 pub fn local_addr_hash(&self) -> u64 {
93 self.local_addr_hash
94 }
95}
96
97pub fn wrap(auth: &SwimAuth, _to: SocketAddr, msg: &SwimMessage) -> Result<Vec<u8>, SwimError> {
104 let inner = codec::encode(msg)?;
105 let seq = auth.seq_out.next();
106 let mut out = Vec::with_capacity(auth_envelope::ENVELOPE_OVERHEAD + inner.len());
107 auth_envelope::write_envelope(auth.local_addr_hash, seq, &inner, &auth.mac_key, &mut out)
108 .map_err(|e| SwimError::Encode {
109 detail: format!("swim envelope: {e}"),
110 })?;
111 Ok(out)
112}
113
114pub fn unwrap(auth: &SwimAuth, from: SocketAddr, bytes: &[u8]) -> Result<SwimMessage, SwimError> {
118 let (fields, inner_frame) =
119 auth_envelope::parse_envelope(bytes, &auth.mac_key).map_err(|e| SwimError::Decode {
120 detail: format!("swim envelope: {e}"),
121 })?;
122
123 let expected = addr_hash(from);
124 if fields.from_node_id != expected {
125 return Err(SwimError::Decode {
126 detail: format!(
127 "swim envelope from {from} claimed addr_hash {}, observed hash {}",
128 fields.from_node_id, expected
129 ),
130 });
131 }
132
133 auth.seq_in
134 .accept(fields.from_node_id, fields.seq)
135 .map_err(|e| SwimError::Decode {
136 detail: format!("swim replay: {e}"),
137 })?;
138
139 codec::decode(inner_frame)
140}
141
#[cfg(test)]
mod tests {
    use super::*;
    use crate::swim::incarnation::Incarnation;
    use crate::swim::wire::probe::{Ping, ProbeId};
    use nodedb_types::NodeId;
    use std::net::{IpAddr, Ipv4Addr};

    /// Port of the sending endpoint used throughout these tests.
    const SENDER_PORT: u16 = 7001;
    /// Port of the receiving endpoint used throughout these tests.
    const RECEIVER_PORT: u16 = 7002;

    /// Loopback socket address on the given port.
    fn addr(port: u16) -> SocketAddr {
        (IpAddr::V4(Ipv4Addr::LOCALHOST), port).into()
    }

    /// A minimal ping message used as the payload in every test.
    fn sample_msg() -> SwimMessage {
        let ping = Ping {
            probe_id: ProbeId::new(1),
            from: NodeId::try_new("a").expect("test fixture"),
            incarnation: Incarnation::ZERO,
            piggyback: vec![],
        };
        SwimMessage::Ping(ping)
    }

    /// Sender and receiver endpoints that share one key filled with `fill`.
    fn shared_key_endpoints(fill: u8) -> (SwimAuth, SwimAuth) {
        let key = MacKey::from_bytes([fill; 32]);
        let sender = SwimAuth::new(key.clone(), addr(SENDER_PORT));
        let receiver = SwimAuth::new(key, addr(RECEIVER_PORT));
        (sender, receiver)
    }

    /// Wraps the sample message from `sender`, addressed to the receiver port.
    fn wrap_sample(sender: &SwimAuth) -> Vec<u8> {
        wrap(sender, addr(RECEIVER_PORT), &sample_msg()).unwrap()
    }

    #[test]
    fn addr_hash_is_deterministic() {
        let first = addr_hash(addr(7001));
        assert_eq!(first, addr_hash(addr(7001)));
        assert_ne!(first, addr_hash(addr(7002)));
    }

    #[test]
    fn roundtrip_across_independent_endpoints() {
        let (sender, receiver) = shared_key_endpoints(0x11);
        let bytes = wrap_sample(&sender);
        let msg = unwrap(&receiver, addr(SENDER_PORT), &bytes).unwrap();
        assert_eq!(msg, sample_msg());
    }

    #[test]
    fn rejects_spoofed_source_address() {
        let (real_sender, receiver) = shared_key_endpoints(0x33);
        let bytes = wrap_sample(&real_sender);
        // The datagram is presented as if it arrived from a different address.
        let err = unwrap(&receiver, addr(9999), &bytes).unwrap_err();
        assert!(err.to_string().contains("addr_hash"));
    }

    #[test]
    fn rejects_tampered_mac() {
        let (sender, receiver) = shared_key_endpoints(3);
        let mut bytes = wrap_sample(&sender);
        // Flip every bit of the byte 32 positions before the end of the frame.
        let mac_start = bytes.len() - 32;
        bytes[mac_start] ^= 0xFF;
        let err = unwrap(&receiver, addr(SENDER_PORT), &bytes).unwrap_err();
        assert!(err.to_string().contains("MAC verification failed"));
    }

    #[test]
    fn rejects_replay() {
        let (sender, receiver) = shared_key_endpoints(4);
        let bytes = wrap_sample(&sender);
        // First delivery is accepted; the identical bytes must then be refused.
        unwrap(&receiver, addr(SENDER_PORT), &bytes).unwrap();
        let err = unwrap(&receiver, addr(SENDER_PORT), &bytes).unwrap_err();
        assert!(err.to_string().contains("replayed"));
    }

    #[test]
    fn rejects_wrong_cluster_key() {
        let sender = SwimAuth::new(MacKey::from_bytes([1u8; 32]), addr(SENDER_PORT));
        let receiver = SwimAuth::new(MacKey::from_bytes([2u8; 32]), addr(RECEIVER_PORT));
        let bytes = wrap_sample(&sender);
        let err = unwrap(&receiver, addr(SENDER_PORT), &bytes).unwrap_err();
        assert!(err.to_string().contains("MAC verification failed"));
    }
}