1use std::io::{self, Read};
7use std::net::SocketAddr;
8
9use bytes::{Buf, BufMut, Bytes, BytesMut};
10
11use crate::{NodeId, SlotRange};
12
13#[derive(Debug, Clone, PartialEq)]
15pub enum GossipMessage {
16 Ping {
18 seq: u64,
19 sender: NodeId,
20 updates: Vec<NodeUpdate>,
22 },
23
24 PingReq {
26 seq: u64,
27 sender: NodeId,
28 target: NodeId,
29 target_addr: SocketAddr,
30 },
31
32 Ack {
34 seq: u64,
35 sender: NodeId,
36 updates: Vec<NodeUpdate>,
38 },
39
40 Join {
42 sender: NodeId,
43 sender_addr: SocketAddr,
44 },
45
46 Welcome {
48 sender: NodeId,
49 members: Vec<MemberInfo>,
50 },
51}
52
53#[derive(Debug, Clone, PartialEq)]
55pub enum NodeUpdate {
56 Alive {
58 node: NodeId,
59 addr: SocketAddr,
60 incarnation: u64,
61 },
62 Suspect { node: NodeId, incarnation: u64 },
64 Dead { node: NodeId, incarnation: u64 },
66 Left { node: NodeId },
68}
69
70#[derive(Debug, Clone, PartialEq)]
72pub struct MemberInfo {
73 pub id: NodeId,
74 pub addr: SocketAddr,
75 pub incarnation: u64,
76 pub is_primary: bool,
77 pub slots: Vec<SlotRange>,
78}
79
80const MSG_PING: u8 = 1;
82const MSG_PING_REQ: u8 = 2;
83const MSG_ACK: u8 = 3;
84const MSG_JOIN: u8 = 4;
85const MSG_WELCOME: u8 = 5;
86
87const UPDATE_ALIVE: u8 = 1;
88const UPDATE_SUSPECT: u8 = 2;
89const UPDATE_DEAD: u8 = 3;
90const UPDATE_LEFT: u8 = 4;
91
92impl GossipMessage {
93 pub fn encode(&self) -> Bytes {
95 let mut buf = BytesMut::with_capacity(256);
96 self.encode_into(&mut buf);
97 buf.freeze()
98 }
99
100 pub fn encode_into(&self, buf: &mut BytesMut) {
102 match self {
103 GossipMessage::Ping {
104 seq,
105 sender,
106 updates,
107 } => {
108 buf.put_u8(MSG_PING);
109 buf.put_u64_le(*seq);
110 encode_node_id(buf, sender);
111 encode_updates(buf, updates);
112 }
113 GossipMessage::PingReq {
114 seq,
115 sender,
116 target,
117 target_addr,
118 } => {
119 buf.put_u8(MSG_PING_REQ);
120 buf.put_u64_le(*seq);
121 encode_node_id(buf, sender);
122 encode_node_id(buf, target);
123 encode_socket_addr(buf, target_addr);
124 }
125 GossipMessage::Ack {
126 seq,
127 sender,
128 updates,
129 } => {
130 buf.put_u8(MSG_ACK);
131 buf.put_u64_le(*seq);
132 encode_node_id(buf, sender);
133 encode_updates(buf, updates);
134 }
135 GossipMessage::Join {
136 sender,
137 sender_addr,
138 } => {
139 buf.put_u8(MSG_JOIN);
140 encode_node_id(buf, sender);
141 encode_socket_addr(buf, sender_addr);
142 }
143 GossipMessage::Welcome { sender, members } => {
144 buf.put_u8(MSG_WELCOME);
145 encode_node_id(buf, sender);
146 buf.put_u16_le(members.len() as u16);
147 for member in members {
148 encode_member_info(buf, member);
149 }
150 }
151 }
152 }
153
154 pub fn decode(mut buf: &[u8]) -> io::Result<Self> {
156 if buf.is_empty() {
157 return Err(io::Error::new(
158 io::ErrorKind::UnexpectedEof,
159 "empty message",
160 ));
161 }
162
163 let msg_type = buf.get_u8();
164 match msg_type {
165 MSG_PING => {
166 let seq = buf.get_u64_le();
167 let sender = decode_node_id(&mut buf)?;
168 let updates = decode_updates(&mut buf)?;
169 Ok(GossipMessage::Ping {
170 seq,
171 sender,
172 updates,
173 })
174 }
175 MSG_PING_REQ => {
176 let seq = buf.get_u64_le();
177 let sender = decode_node_id(&mut buf)?;
178 let target = decode_node_id(&mut buf)?;
179 let target_addr = decode_socket_addr(&mut buf)?;
180 Ok(GossipMessage::PingReq {
181 seq,
182 sender,
183 target,
184 target_addr,
185 })
186 }
187 MSG_ACK => {
188 let seq = buf.get_u64_le();
189 let sender = decode_node_id(&mut buf)?;
190 let updates = decode_updates(&mut buf)?;
191 Ok(GossipMessage::Ack {
192 seq,
193 sender,
194 updates,
195 })
196 }
197 MSG_JOIN => {
198 let sender = decode_node_id(&mut buf)?;
199 let sender_addr = decode_socket_addr(&mut buf)?;
200 Ok(GossipMessage::Join {
201 sender,
202 sender_addr,
203 })
204 }
205 MSG_WELCOME => {
206 let sender = decode_node_id(&mut buf)?;
207 let count = buf.get_u16_le() as usize;
208 let mut members = Vec::with_capacity(count);
209 for _ in 0..count {
210 members.push(decode_member_info(&mut buf)?);
211 }
212 Ok(GossipMessage::Welcome { sender, members })
213 }
214 other => Err(io::Error::new(
215 io::ErrorKind::InvalidData,
216 format!("unknown message type: {other}"),
217 )),
218 }
219 }
220}
221
222fn encode_node_id(buf: &mut BytesMut, id: &NodeId) {
223 buf.put_slice(id.0.as_bytes());
224}
225
226fn decode_node_id(buf: &mut &[u8]) -> io::Result<NodeId> {
227 if buf.len() < 16 {
228 return Err(io::Error::new(
229 io::ErrorKind::UnexpectedEof,
230 "not enough bytes for node id",
231 ));
232 }
233 let mut bytes = [0u8; 16];
234 buf.read_exact(&mut bytes)?;
235 Ok(NodeId(uuid::Uuid::from_bytes(bytes)))
236}
237
238fn encode_socket_addr(buf: &mut BytesMut, addr: &SocketAddr) {
239 match addr {
240 SocketAddr::V4(v4) => {
241 buf.put_u8(4);
242 buf.put_slice(&v4.ip().octets());
243 buf.put_u16_le(v4.port());
244 }
245 SocketAddr::V6(v6) => {
246 buf.put_u8(6);
247 buf.put_slice(&v6.ip().octets());
248 buf.put_u16_le(v6.port());
249 }
250 }
251}
252
253fn decode_socket_addr(buf: &mut &[u8]) -> io::Result<SocketAddr> {
254 if buf.is_empty() {
255 return Err(io::Error::new(
256 io::ErrorKind::UnexpectedEof,
257 "not enough bytes for address type",
258 ));
259 }
260 let addr_type = buf.get_u8();
261 match addr_type {
262 4 => {
263 if buf.len() < 6 {
264 return Err(io::Error::new(
265 io::ErrorKind::UnexpectedEof,
266 "not enough bytes for ipv4 address",
267 ));
268 }
269 let mut octets = [0u8; 4];
270 buf.read_exact(&mut octets)?;
271 let port = buf.get_u16_le();
272 Ok(SocketAddr::from((octets, port)))
273 }
274 6 => {
275 if buf.len() < 18 {
276 return Err(io::Error::new(
277 io::ErrorKind::UnexpectedEof,
278 "not enough bytes for ipv6 address",
279 ));
280 }
281 let mut octets = [0u8; 16];
282 buf.read_exact(&mut octets)?;
283 let port = buf.get_u16_le();
284 Ok(SocketAddr::from((octets, port)))
285 }
286 other => Err(io::Error::new(
287 io::ErrorKind::InvalidData,
288 format!("unknown address type: {other}"),
289 )),
290 }
291}
292
293fn encode_updates(buf: &mut BytesMut, updates: &[NodeUpdate]) {
294 buf.put_u16_le(updates.len() as u16);
295 for update in updates {
296 encode_update(buf, update);
297 }
298}
299
300fn encode_update(buf: &mut BytesMut, update: &NodeUpdate) {
301 match update {
302 NodeUpdate::Alive {
303 node,
304 addr,
305 incarnation,
306 } => {
307 buf.put_u8(UPDATE_ALIVE);
308 encode_node_id(buf, node);
309 encode_socket_addr(buf, addr);
310 buf.put_u64_le(*incarnation);
311 }
312 NodeUpdate::Suspect { node, incarnation } => {
313 buf.put_u8(UPDATE_SUSPECT);
314 encode_node_id(buf, node);
315 buf.put_u64_le(*incarnation);
316 }
317 NodeUpdate::Dead { node, incarnation } => {
318 buf.put_u8(UPDATE_DEAD);
319 encode_node_id(buf, node);
320 buf.put_u64_le(*incarnation);
321 }
322 NodeUpdate::Left { node } => {
323 buf.put_u8(UPDATE_LEFT);
324 encode_node_id(buf, node);
325 }
326 }
327}
328
329fn decode_updates(buf: &mut &[u8]) -> io::Result<Vec<NodeUpdate>> {
330 if buf.len() < 2 {
331 return Err(io::Error::new(
332 io::ErrorKind::UnexpectedEof,
333 "not enough bytes for update count",
334 ));
335 }
336 let count = buf.get_u16_le() as usize;
337 let mut updates = Vec::with_capacity(count);
338 for _ in 0..count {
339 updates.push(decode_update(buf)?);
340 }
341 Ok(updates)
342}
343
344fn decode_update(buf: &mut &[u8]) -> io::Result<NodeUpdate> {
345 if buf.is_empty() {
346 return Err(io::Error::new(
347 io::ErrorKind::UnexpectedEof,
348 "not enough bytes for update type",
349 ));
350 }
351 let update_type = buf.get_u8();
352 match update_type {
353 UPDATE_ALIVE => {
354 let node = decode_node_id(buf)?;
355 let addr = decode_socket_addr(buf)?;
356 let incarnation = buf.get_u64_le();
357 Ok(NodeUpdate::Alive {
358 node,
359 addr,
360 incarnation,
361 })
362 }
363 UPDATE_SUSPECT => {
364 let node = decode_node_id(buf)?;
365 let incarnation = buf.get_u64_le();
366 Ok(NodeUpdate::Suspect { node, incarnation })
367 }
368 UPDATE_DEAD => {
369 let node = decode_node_id(buf)?;
370 let incarnation = buf.get_u64_le();
371 Ok(NodeUpdate::Dead { node, incarnation })
372 }
373 UPDATE_LEFT => {
374 let node = decode_node_id(buf)?;
375 Ok(NodeUpdate::Left { node })
376 }
377 other => Err(io::Error::new(
378 io::ErrorKind::InvalidData,
379 format!("unknown update type: {other}"),
380 )),
381 }
382}
383
384fn encode_member_info(buf: &mut BytesMut, member: &MemberInfo) {
385 encode_node_id(buf, &member.id);
386 encode_socket_addr(buf, &member.addr);
387 buf.put_u64_le(member.incarnation);
388 buf.put_u8(if member.is_primary { 1 } else { 0 });
389 buf.put_u16_le(member.slots.len() as u16);
390 for slot in &member.slots {
391 buf.put_u16_le(slot.start);
392 buf.put_u16_le(slot.end);
393 }
394}
395
396fn decode_member_info(buf: &mut &[u8]) -> io::Result<MemberInfo> {
397 let id = decode_node_id(buf)?;
398 let addr = decode_socket_addr(buf)?;
399 let incarnation = buf.get_u64_le();
400 let is_primary = buf.get_u8() != 0;
401 let slot_count = buf.get_u16_le() as usize;
402 let mut slots = Vec::with_capacity(slot_count);
403 for _ in 0..slot_count {
404 let start = buf.get_u16_le();
405 let end = buf.get_u16_le();
406 slots.push(SlotRange::new(start, end));
407 }
408 Ok(MemberInfo {
409 id,
410 addr,
411 incarnation,
412 is_primary,
413 slots,
414 })
415}
416
417#[cfg(test)]
418mod tests {
419 use super::*;
420 use std::net::{Ipv4Addr, Ipv6Addr};
421
422 fn test_addr() -> SocketAddr {
423 SocketAddr::from((Ipv4Addr::new(127, 0, 0, 1), 6379))
424 }
425
426 fn test_addr_v6() -> SocketAddr {
427 SocketAddr::from((Ipv6Addr::LOCALHOST, 6379))
428 }
429
430 #[test]
431 fn ping_roundtrip() {
432 let msg = GossipMessage::Ping {
433 seq: 42,
434 sender: NodeId::new(),
435 updates: vec![],
436 };
437 let encoded = msg.encode();
438 let decoded = GossipMessage::decode(&encoded).unwrap();
439 assert_eq!(msg, decoded);
440 }
441
442 #[test]
443 fn ping_with_updates() {
444 let node1 = NodeId::new();
445 let node2 = NodeId::new();
446 let msg = GossipMessage::Ping {
447 seq: 100,
448 sender: node1,
449 updates: vec![
450 NodeUpdate::Alive {
451 node: node2,
452 addr: test_addr(),
453 incarnation: 5,
454 },
455 NodeUpdate::Suspect {
456 node: node1,
457 incarnation: 3,
458 },
459 ],
460 };
461 let encoded = msg.encode();
462 let decoded = GossipMessage::decode(&encoded).unwrap();
463 assert_eq!(msg, decoded);
464 }
465
466 #[test]
467 fn ping_req_roundtrip() {
468 let msg = GossipMessage::PingReq {
469 seq: 99,
470 sender: NodeId::new(),
471 target: NodeId::new(),
472 target_addr: test_addr(),
473 };
474 let encoded = msg.encode();
475 let decoded = GossipMessage::decode(&encoded).unwrap();
476 assert_eq!(msg, decoded);
477 }
478
479 #[test]
480 fn ack_roundtrip() {
481 let msg = GossipMessage::Ack {
482 seq: 42,
483 sender: NodeId::new(),
484 updates: vec![NodeUpdate::Dead {
485 node: NodeId::new(),
486 incarnation: 10,
487 }],
488 };
489 let encoded = msg.encode();
490 let decoded = GossipMessage::decode(&encoded).unwrap();
491 assert_eq!(msg, decoded);
492 }
493
494 #[test]
495 fn join_roundtrip() {
496 let msg = GossipMessage::Join {
497 sender: NodeId::new(),
498 sender_addr: test_addr(),
499 };
500 let encoded = msg.encode();
501 let decoded = GossipMessage::decode(&encoded).unwrap();
502 assert_eq!(msg, decoded);
503 }
504
505 #[test]
506 fn welcome_roundtrip() {
507 let msg = GossipMessage::Welcome {
508 sender: NodeId::new(),
509 members: vec![
510 MemberInfo {
511 id: NodeId::new(),
512 addr: test_addr(),
513 incarnation: 1,
514 is_primary: true,
515 slots: vec![SlotRange::new(0, 5460)],
516 },
517 MemberInfo {
518 id: NodeId::new(),
519 addr: test_addr(),
520 incarnation: 2,
521 is_primary: false,
522 slots: vec![],
523 },
524 ],
525 };
526 let encoded = msg.encode();
527 let decoded = GossipMessage::decode(&encoded).unwrap();
528 assert_eq!(msg, decoded);
529 }
530
531 #[test]
532 fn ipv6_address() {
533 let msg = GossipMessage::Join {
534 sender: NodeId::new(),
535 sender_addr: test_addr_v6(),
536 };
537 let encoded = msg.encode();
538 let decoded = GossipMessage::decode(&encoded).unwrap();
539 assert_eq!(msg, decoded);
540 }
541
542 #[test]
543 fn all_update_types() {
544 let node = NodeId::new();
545 let updates = vec![
546 NodeUpdate::Alive {
547 node,
548 addr: test_addr(),
549 incarnation: 1,
550 },
551 NodeUpdate::Suspect {
552 node,
553 incarnation: 2,
554 },
555 NodeUpdate::Dead {
556 node,
557 incarnation: 3,
558 },
559 NodeUpdate::Left { node },
560 ];
561 let msg = GossipMessage::Ping {
562 seq: 1,
563 sender: node,
564 updates,
565 };
566 let encoded = msg.encode();
567 let decoded = GossipMessage::decode(&encoded).unwrap();
568 assert_eq!(msg, decoded);
569 }
570
571 #[test]
572 fn empty_message_error() {
573 let result = GossipMessage::decode(&[]);
574 assert!(result.is_err());
575 }
576
577 #[test]
578 fn unknown_message_type_error() {
579 let result = GossipMessage::decode(&[255]);
580 assert!(result.is_err());
581 }
582}