nodedb_cluster/swim/wire/
authenticated.rs1use std::net::SocketAddr;
39
40use sha2::{Digest, Sha256};
41
42use crate::rpc_codec::auth_envelope;
43use crate::rpc_codec::{MacKey, PeerSeqSender, PeerSeqWindow};
44use crate::swim::error::SwimError;
45
46use super::{codec, message::SwimMessage};
47
48pub fn addr_hash(addr: SocketAddr) -> u64 {
56 let mut h = Sha256::new();
57 h.update(addr.to_string().as_bytes());
58 let digest = h.finalize();
59 u64::from_le_bytes(digest[..8].try_into().expect("sha256 is 32 bytes"))
60}
61
62#[derive(Debug)]
66pub struct SwimAuth {
67 mac_key: MacKey,
68 local_addr_hash: u64,
69 seq_out: PeerSeqSender,
70 seq_in: PeerSeqWindow,
71}
72
73impl SwimAuth {
74 pub fn new(mac_key: MacKey, local_addr: SocketAddr) -> Self {
80 Self {
81 mac_key,
82 local_addr_hash: addr_hash(local_addr),
83 seq_out: PeerSeqSender::new(),
84 seq_in: PeerSeqWindow::new(),
85 }
86 }
87
88 pub fn local_addr_hash(&self) -> u64 {
91 self.local_addr_hash
92 }
93}
94
95pub fn wrap(auth: &SwimAuth, _to: SocketAddr, msg: &SwimMessage) -> Result<Vec<u8>, SwimError> {
102 let inner = codec::encode(msg)?;
103 let seq = auth.seq_out.next();
104 let mut out = Vec::with_capacity(auth_envelope::ENVELOPE_OVERHEAD + inner.len());
105 auth_envelope::write_envelope(auth.local_addr_hash, seq, &inner, &auth.mac_key, &mut out)
106 .map_err(|e| SwimError::Encode {
107 detail: format!("swim envelope: {e}"),
108 })?;
109 Ok(out)
110}
111
112pub fn unwrap(auth: &SwimAuth, from: SocketAddr, bytes: &[u8]) -> Result<SwimMessage, SwimError> {
116 let (fields, inner_frame) =
117 auth_envelope::parse_envelope(bytes, &auth.mac_key).map_err(|e| SwimError::Decode {
118 detail: format!("swim envelope: {e}"),
119 })?;
120
121 let expected = addr_hash(from);
122 if fields.from_node_id != expected {
123 return Err(SwimError::Decode {
124 detail: format!(
125 "swim envelope from {from} claimed addr_hash {}, observed hash {}",
126 fields.from_node_id, expected
127 ),
128 });
129 }
130
131 auth.seq_in
132 .accept(fields.from_node_id, fields.seq)
133 .map_err(|e| SwimError::Decode {
134 detail: format!("swim replay: {e}"),
135 })?;
136
137 codec::decode(inner_frame)
138}
139
#[cfg(test)]
mod tests {
    use super::*;
    use crate::swim::incarnation::Incarnation;
    use crate::swim::wire::probe::{Ping, ProbeId};
    use nodedb_types::NodeId;
    use std::net::{IpAddr, Ipv4Addr};

    /// Port of the endpoint that wraps messages in these tests.
    const SENDER_PORT: u16 = 7001;
    /// Port of the endpoint that unwraps messages in these tests.
    const RECEIVER_PORT: u16 = 7002;

    /// Loopback socket address on the given port.
    fn addr(port: u16) -> SocketAddr {
        let ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
        SocketAddr::new(ip, port)
    }

    /// A minimal ping used as the payload in every test.
    fn sample_msg() -> SwimMessage {
        let ping = Ping {
            probe_id: ProbeId::new(1),
            from: NodeId::new("a"),
            incarnation: Incarnation::ZERO,
            piggyback: vec![],
        };
        SwimMessage::Ping(ping)
    }

    /// Sender and receiver endpoints that share one key.
    fn endpoints(key: MacKey) -> (SwimAuth, SwimAuth) {
        let sender = SwimAuth::new(key.clone(), addr(SENDER_PORT));
        let receiver = SwimAuth::new(key, addr(RECEIVER_PORT));
        (sender, receiver)
    }

    #[test]
    fn addr_hash_is_deterministic() {
        // Same address hashes identically; a different port hashes differently.
        assert_eq!(addr_hash(addr(SENDER_PORT)), addr_hash(addr(SENDER_PORT)));
        assert_ne!(addr_hash(addr(SENDER_PORT)), addr_hash(addr(RECEIVER_PORT)));
    }

    #[test]
    fn roundtrip_across_independent_endpoints() {
        let (sender, receiver) = endpoints(MacKey::from_bytes([0x11u8; 32]));
        let bytes = wrap(&sender, addr(RECEIVER_PORT), &sample_msg()).unwrap();
        let msg = unwrap(&receiver, addr(SENDER_PORT), &bytes).unwrap();
        assert_eq!(msg, sample_msg());
    }

    #[test]
    fn rejects_spoofed_source_address() {
        let (real_sender, receiver) = endpoints(MacKey::from_bytes([0x33u8; 32]));
        let bytes = wrap(&real_sender, addr(RECEIVER_PORT), &sample_msg()).unwrap();
        // Deliver the frame as if it had arrived from an unrelated address.
        let err = unwrap(&receiver, addr(9999), &bytes).unwrap_err();
        assert!(err.to_string().contains("addr_hash"));
    }

    #[test]
    fn rejects_tampered_mac() {
        let (sender, receiver) = endpoints(MacKey::from_bytes([3u8; 32]));
        let mut bytes = wrap(&sender, addr(RECEIVER_PORT), &sample_msg()).unwrap();
        // Flip every bit of the first byte of the trailing 32-byte region.
        let mac_start = bytes.len() - 32;
        bytes[mac_start] ^= 0xFF;
        let err = unwrap(&receiver, addr(SENDER_PORT), &bytes).unwrap_err();
        assert!(err.to_string().contains("MAC verification failed"));
    }

    #[test]
    fn rejects_replay() {
        let (sender, receiver) = endpoints(MacKey::from_bytes([4u8; 32]));
        let bytes = wrap(&sender, addr(RECEIVER_PORT), &sample_msg()).unwrap();
        // First delivery is accepted; the identical frame must then be refused.
        unwrap(&receiver, addr(SENDER_PORT), &bytes).unwrap();
        let err = unwrap(&receiver, addr(SENDER_PORT), &bytes).unwrap_err();
        assert!(err.to_string().contains("replayed"));
    }

    #[test]
    fn rejects_wrong_cluster_key() {
        // The two endpoints deliberately hold different keys.
        let sender = SwimAuth::new(MacKey::from_bytes([1u8; 32]), addr(SENDER_PORT));
        let receiver = SwimAuth::new(MacKey::from_bytes([2u8; 32]), addr(RECEIVER_PORT));
        let bytes = wrap(&sender, addr(RECEIVER_PORT), &sample_msg()).unwrap();
        let err = unwrap(&receiver, addr(SENDER_PORT), &bytes).unwrap_err();
        assert!(err.to_string().contains("MAC verification failed"));
    }
}