1use std::io::{self, Read};
7use std::net::SocketAddr;
8
9use bytes::{Buf, BufMut, Bytes, BytesMut};
10
11use crate::auth::{ClusterSecret, TAG_LEN};
12use crate::{NodeId, SlotRange};
13
14const MAX_COLLECTION_COUNT: usize = 1024;
17
18const ADDR_IPV4: u8 = 4;
20const ADDR_IPV6: u8 = 6;
22
23fn safe_get_u8(buf: &mut &[u8]) -> io::Result<u8> {
26 if buf.is_empty() {
27 return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "need 1 byte"));
28 }
29 Ok(buf.get_u8())
30}
31
32fn safe_get_u16_le(buf: &mut &[u8]) -> io::Result<u16> {
33 if buf.len() < 2 {
34 return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "need 2 bytes"));
35 }
36 Ok(buf.get_u16_le())
37}
38
39fn safe_get_u64_le(buf: &mut &[u8]) -> io::Result<u64> {
40 if buf.len() < 8 {
41 return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "need 8 bytes"));
42 }
43 Ok(buf.get_u64_le())
44}
45
46#[derive(Debug, Clone, PartialEq)]
48pub enum GossipMessage {
49 Ping {
51 seq: u64,
52 sender: NodeId,
53 updates: Vec<NodeUpdate>,
55 },
56
57 PingReq {
59 seq: u64,
60 sender: NodeId,
61 target: NodeId,
62 target_addr: SocketAddr,
63 },
64
65 Ack {
67 seq: u64,
68 sender: NodeId,
69 updates: Vec<NodeUpdate>,
71 },
72
73 Join {
75 sender: NodeId,
76 sender_addr: SocketAddr,
77 },
78
79 Welcome {
81 sender: NodeId,
82 members: Vec<MemberInfo>,
83 },
84
85 SlotsAnnounce {
89 sender: NodeId,
90 incarnation: u64,
91 slots: Vec<SlotRange>,
92 },
93}
94
95#[derive(Debug, Clone, PartialEq)]
97pub enum NodeUpdate {
98 Alive {
100 node: NodeId,
101 addr: SocketAddr,
102 incarnation: u64,
103 },
104 Suspect { node: NodeId, incarnation: u64 },
106 Dead { node: NodeId, incarnation: u64 },
108 Left { node: NodeId },
110 SlotsChanged {
112 node: NodeId,
113 incarnation: u64,
114 slots: Vec<SlotRange>,
115 },
116 RoleChanged {
118 node: NodeId,
119 incarnation: u64,
120 is_primary: bool,
122 replicates: Option<NodeId>,
124 },
125 VoteRequest {
127 candidate: NodeId,
129 epoch: u64,
131 offset: u64,
133 },
134 VoteGranted {
136 from: NodeId,
138 candidate: NodeId,
140 epoch: u64,
142 },
143}
144
145#[derive(Debug, Clone, PartialEq)]
147pub struct MemberInfo {
148 pub id: NodeId,
149 pub addr: SocketAddr,
150 pub incarnation: u64,
151 pub is_primary: bool,
152 pub slots: Vec<SlotRange>,
153}
154
155const MSG_PING: u8 = 1;
157const MSG_PING_REQ: u8 = 2;
158const MSG_ACK: u8 = 3;
159const MSG_JOIN: u8 = 4;
160const MSG_WELCOME: u8 = 5;
161const MSG_SLOTS_ANNOUNCE: u8 = 6;
162
163const UPDATE_ALIVE: u8 = 1;
164const UPDATE_SUSPECT: u8 = 2;
165const UPDATE_DEAD: u8 = 3;
166const UPDATE_LEFT: u8 = 4;
167const UPDATE_SLOTS_CHANGED: u8 = 5;
168const UPDATE_ROLE_CHANGED: u8 = 6;
169const UPDATE_VOTE_REQUEST: u8 = 7;
170const UPDATE_VOTE_GRANTED: u8 = 8;
171
172fn encode_slice<T, F>(buf: &mut BytesMut, items: &[T], encode_one: F)
178where
179 F: Fn(&mut BytesMut, &T),
180{
181 let count = items.len().min(MAX_COLLECTION_COUNT);
182 buf.put_u16_le(count as u16);
183 for item in &items[..count] {
184 encode_one(buf, item);
185 }
186}
187
188impl GossipMessage {
189 pub fn encode(&self) -> Bytes {
191 let mut buf = BytesMut::with_capacity(256);
192 self.encode_into(&mut buf);
193 buf.freeze()
194 }
195
196 pub fn encode_into(&self, buf: &mut BytesMut) {
198 match self {
199 GossipMessage::Ping {
200 seq,
201 sender,
202 updates,
203 } => {
204 buf.put_u8(MSG_PING);
205 buf.put_u64_le(*seq);
206 encode_node_id(buf, sender);
207 encode_updates(buf, updates);
208 }
209 GossipMessage::PingReq {
210 seq,
211 sender,
212 target,
213 target_addr,
214 } => {
215 buf.put_u8(MSG_PING_REQ);
216 buf.put_u64_le(*seq);
217 encode_node_id(buf, sender);
218 encode_node_id(buf, target);
219 encode_socket_addr(buf, target_addr);
220 }
221 GossipMessage::Ack {
222 seq,
223 sender,
224 updates,
225 } => {
226 buf.put_u8(MSG_ACK);
227 buf.put_u64_le(*seq);
228 encode_node_id(buf, sender);
229 encode_updates(buf, updates);
230 }
231 GossipMessage::Join {
232 sender,
233 sender_addr,
234 } => {
235 buf.put_u8(MSG_JOIN);
236 encode_node_id(buf, sender);
237 encode_socket_addr(buf, sender_addr);
238 }
239 GossipMessage::Welcome { sender, members } => {
240 buf.put_u8(MSG_WELCOME);
241 encode_node_id(buf, sender);
242 encode_slice(buf, members, encode_member_info);
243 }
244 GossipMessage::SlotsAnnounce {
245 sender,
246 incarnation,
247 slots,
248 } => {
249 buf.put_u8(MSG_SLOTS_ANNOUNCE);
250 encode_node_id(buf, sender);
251 buf.put_u64_le(*incarnation);
252 encode_slice(buf, slots, |b, slot| {
253 b.put_u16_le(slot.start);
254 b.put_u16_le(slot.end);
255 });
256 }
257 }
258 }
259
260 pub fn decode(mut buf: &[u8]) -> io::Result<Self> {
262 if buf.is_empty() {
263 return Err(io::Error::new(
264 io::ErrorKind::UnexpectedEof,
265 "empty message",
266 ));
267 }
268
269 let msg_type = safe_get_u8(&mut buf)?;
270 match msg_type {
271 MSG_PING => {
272 let seq = safe_get_u64_le(&mut buf)?;
273 let sender = decode_node_id(&mut buf)?;
274 let updates = decode_updates(&mut buf)?;
275 Ok(GossipMessage::Ping {
276 seq,
277 sender,
278 updates,
279 })
280 }
281 MSG_PING_REQ => {
282 let seq = safe_get_u64_le(&mut buf)?;
283 let sender = decode_node_id(&mut buf)?;
284 let target = decode_node_id(&mut buf)?;
285 let target_addr = decode_socket_addr(&mut buf)?;
286 Ok(GossipMessage::PingReq {
287 seq,
288 sender,
289 target,
290 target_addr,
291 })
292 }
293 MSG_ACK => {
294 let seq = safe_get_u64_le(&mut buf)?;
295 let sender = decode_node_id(&mut buf)?;
296 let updates = decode_updates(&mut buf)?;
297 Ok(GossipMessage::Ack {
298 seq,
299 sender,
300 updates,
301 })
302 }
303 MSG_JOIN => {
304 let sender = decode_node_id(&mut buf)?;
305 let sender_addr = decode_socket_addr(&mut buf)?;
306 Ok(GossipMessage::Join {
307 sender,
308 sender_addr,
309 })
310 }
311 MSG_WELCOME => {
312 let sender = decode_node_id(&mut buf)?;
313 let count = safe_get_u16_le(&mut buf)? as usize;
314 if count > MAX_COLLECTION_COUNT {
315 return Err(io::Error::new(
316 io::ErrorKind::InvalidData,
317 format!("member count {count} exceeds limit"),
318 ));
319 }
320 let mut members = Vec::with_capacity(count);
321 for _ in 0..count {
322 members.push(decode_member_info(&mut buf)?);
323 }
324 Ok(GossipMessage::Welcome { sender, members })
325 }
326 MSG_SLOTS_ANNOUNCE => {
327 let sender = decode_node_id(&mut buf)?;
328 let incarnation = safe_get_u64_le(&mut buf)?;
329 let count = safe_get_u16_le(&mut buf)? as usize;
330 if count > MAX_COLLECTION_COUNT {
331 return Err(io::Error::new(
332 io::ErrorKind::InvalidData,
333 format!("slot range count {count} exceeds limit"),
334 ));
335 }
336 let mut slots = Vec::with_capacity(count);
337 for _ in 0..count {
338 let start = safe_get_u16_le(&mut buf)?;
339 let end = safe_get_u16_le(&mut buf)?;
340 slots.push(SlotRange::try_new(start, end)?);
341 }
342 Ok(GossipMessage::SlotsAnnounce {
343 sender,
344 incarnation,
345 slots,
346 })
347 }
348 other => Err(io::Error::new(
349 io::ErrorKind::InvalidData,
350 format!("unknown message type: {other}"),
351 )),
352 }
353 }
354
355 pub fn encode_authenticated(&self, secret: &ClusterSecret) -> Bytes {
357 let mut buf = BytesMut::with_capacity(256);
358 self.encode_into(&mut buf);
359 let tag = secret.sign(&buf);
360 buf.extend_from_slice(&tag);
361 buf.freeze()
362 }
363
364 pub fn decode_authenticated(buf: &[u8], secret: &ClusterSecret) -> io::Result<Self> {
368 if buf.len() < TAG_LEN {
369 return Err(io::Error::new(
370 io::ErrorKind::InvalidData,
371 "message too short for auth tag",
372 ));
373 }
374 let (payload, tag) = buf.split_at(buf.len() - TAG_LEN);
375 if !secret.verify(payload, tag) {
376 return Err(io::Error::new(
377 io::ErrorKind::PermissionDenied,
378 "cluster auth failed",
379 ));
380 }
381 Self::decode(payload)
382 }
383}
384
385fn encode_node_id(buf: &mut BytesMut, id: &NodeId) {
386 buf.put_slice(id.0.as_bytes());
387}
388
389fn decode_node_id(buf: &mut &[u8]) -> io::Result<NodeId> {
390 if buf.len() < 16 {
391 return Err(io::Error::new(
392 io::ErrorKind::UnexpectedEof,
393 "not enough bytes for node id",
394 ));
395 }
396 let mut bytes = [0u8; 16];
397 buf.read_exact(&mut bytes)?;
398 Ok(NodeId(uuid::Uuid::from_bytes(bytes)))
399}
400
401fn encode_socket_addr(buf: &mut BytesMut, addr: &SocketAddr) {
402 match addr {
403 SocketAddr::V4(v4) => {
404 buf.put_u8(ADDR_IPV4);
405 buf.put_slice(&v4.ip().octets());
406 buf.put_u16_le(v4.port());
407 }
408 SocketAddr::V6(v6) => {
409 buf.put_u8(ADDR_IPV6);
410 buf.put_slice(&v6.ip().octets());
411 buf.put_u16_le(v6.port());
412 }
413 }
414}
415
416fn decode_socket_addr(buf: &mut &[u8]) -> io::Result<SocketAddr> {
417 if buf.is_empty() {
418 return Err(io::Error::new(
419 io::ErrorKind::UnexpectedEof,
420 "not enough bytes for address type",
421 ));
422 }
423 let addr_type = buf.get_u8();
424 match addr_type {
425 ADDR_IPV4 => {
426 if buf.len() < 6 {
428 return Err(io::Error::new(
429 io::ErrorKind::UnexpectedEof,
430 "not enough bytes for ipv4 address",
431 ));
432 }
433 let mut octets = [0u8; 4];
434 buf.read_exact(&mut octets)?;
435 let port = buf.get_u16_le();
436 Ok(SocketAddr::from((octets, port)))
437 }
438 ADDR_IPV6 => {
439 if buf.len() < 18 {
441 return Err(io::Error::new(
442 io::ErrorKind::UnexpectedEof,
443 "not enough bytes for ipv6 address",
444 ));
445 }
446 let mut octets = [0u8; 16];
447 buf.read_exact(&mut octets)?;
448 let port = buf.get_u16_le();
449 Ok(SocketAddr::from((octets, port)))
450 }
451 other => Err(io::Error::new(
452 io::ErrorKind::InvalidData,
453 format!("unknown address type: {other}"),
454 )),
455 }
456}
457
458fn encode_updates(buf: &mut BytesMut, updates: &[NodeUpdate]) {
459 encode_slice(buf, updates, encode_update);
460}
461
462fn encode_update(buf: &mut BytesMut, update: &NodeUpdate) {
463 match update {
464 NodeUpdate::Alive {
465 node,
466 addr,
467 incarnation,
468 } => {
469 buf.put_u8(UPDATE_ALIVE);
470 encode_node_id(buf, node);
471 encode_socket_addr(buf, addr);
472 buf.put_u64_le(*incarnation);
473 }
474 NodeUpdate::Suspect { node, incarnation } => {
475 buf.put_u8(UPDATE_SUSPECT);
476 encode_node_id(buf, node);
477 buf.put_u64_le(*incarnation);
478 }
479 NodeUpdate::Dead { node, incarnation } => {
480 buf.put_u8(UPDATE_DEAD);
481 encode_node_id(buf, node);
482 buf.put_u64_le(*incarnation);
483 }
484 NodeUpdate::Left { node } => {
485 buf.put_u8(UPDATE_LEFT);
486 encode_node_id(buf, node);
487 }
488 NodeUpdate::SlotsChanged {
489 node,
490 incarnation,
491 slots,
492 } => {
493 buf.put_u8(UPDATE_SLOTS_CHANGED);
494 encode_node_id(buf, node);
495 buf.put_u64_le(*incarnation);
496 encode_slice(buf, slots, |b, slot| {
497 b.put_u16_le(slot.start);
498 b.put_u16_le(slot.end);
499 });
500 }
501 NodeUpdate::RoleChanged {
502 node,
503 incarnation,
504 is_primary,
505 replicates,
506 } => {
507 buf.put_u8(UPDATE_ROLE_CHANGED);
508 encode_node_id(buf, node);
509 buf.put_u64_le(*incarnation);
510 buf.put_u8(if *is_primary { 1 } else { 0 });
511 match replicates {
512 Some(primary_id) => {
513 buf.put_u8(1);
514 encode_node_id(buf, primary_id);
515 }
516 None => {
517 buf.put_u8(0);
518 }
519 }
520 }
521 NodeUpdate::VoteRequest {
522 candidate,
523 epoch,
524 offset,
525 } => {
526 buf.put_u8(UPDATE_VOTE_REQUEST);
527 encode_node_id(buf, candidate);
528 buf.put_u64_le(*epoch);
529 buf.put_u64_le(*offset);
530 }
531 NodeUpdate::VoteGranted {
532 from,
533 candidate,
534 epoch,
535 } => {
536 buf.put_u8(UPDATE_VOTE_GRANTED);
537 encode_node_id(buf, from);
538 encode_node_id(buf, candidate);
539 buf.put_u64_le(*epoch);
540 }
541 }
542}
543
544fn decode_updates(buf: &mut &[u8]) -> io::Result<Vec<NodeUpdate>> {
545 let count = safe_get_u16_le(buf)? as usize;
546 if count > MAX_COLLECTION_COUNT {
547 return Err(io::Error::new(
548 io::ErrorKind::InvalidData,
549 format!("update count {count} exceeds limit"),
550 ));
551 }
552 let mut updates = Vec::with_capacity(count);
553 for _ in 0..count {
554 updates.push(decode_update(buf)?);
555 }
556 Ok(updates)
557}
558
559fn decode_update(buf: &mut &[u8]) -> io::Result<NodeUpdate> {
560 let update_type = safe_get_u8(buf)?;
561 match update_type {
562 UPDATE_ALIVE => {
563 let node = decode_node_id(buf)?;
564 let addr = decode_socket_addr(buf)?;
565 let incarnation = safe_get_u64_le(buf)?;
566 Ok(NodeUpdate::Alive {
567 node,
568 addr,
569 incarnation,
570 })
571 }
572 UPDATE_SUSPECT => {
573 let node = decode_node_id(buf)?;
574 let incarnation = safe_get_u64_le(buf)?;
575 Ok(NodeUpdate::Suspect { node, incarnation })
576 }
577 UPDATE_DEAD => {
578 let node = decode_node_id(buf)?;
579 let incarnation = safe_get_u64_le(buf)?;
580 Ok(NodeUpdate::Dead { node, incarnation })
581 }
582 UPDATE_LEFT => {
583 let node = decode_node_id(buf)?;
584 Ok(NodeUpdate::Left { node })
585 }
586 UPDATE_SLOTS_CHANGED => {
587 let node = decode_node_id(buf)?;
588 let incarnation = safe_get_u64_le(buf)?;
589 let count = safe_get_u16_le(buf)? as usize;
590 if count > MAX_COLLECTION_COUNT {
591 return Err(io::Error::new(
592 io::ErrorKind::InvalidData,
593 format!("slot range count {count} exceeds limit"),
594 ));
595 }
596 let mut slots = Vec::with_capacity(count);
597 for _ in 0..count {
598 let start = safe_get_u16_le(buf)?;
599 let end = safe_get_u16_le(buf)?;
600 slots.push(SlotRange::try_new(start, end)?);
601 }
602 Ok(NodeUpdate::SlotsChanged {
603 node,
604 incarnation,
605 slots,
606 })
607 }
608 UPDATE_ROLE_CHANGED => {
609 let node = decode_node_id(buf)?;
610 let incarnation = safe_get_u64_le(buf)?;
611 let is_primary = safe_get_u8(buf)? != 0;
612 let has_replicates = safe_get_u8(buf)? != 0;
613 let replicates = if has_replicates {
614 Some(decode_node_id(buf)?)
615 } else {
616 None
617 };
618 Ok(NodeUpdate::RoleChanged {
619 node,
620 incarnation,
621 is_primary,
622 replicates,
623 })
624 }
625 UPDATE_VOTE_REQUEST => {
626 let candidate = decode_node_id(buf)?;
627 let epoch = safe_get_u64_le(buf)?;
628 let offset = safe_get_u64_le(buf)?;
629 Ok(NodeUpdate::VoteRequest {
630 candidate,
631 epoch,
632 offset,
633 })
634 }
635 UPDATE_VOTE_GRANTED => {
636 let from = decode_node_id(buf)?;
637 let candidate = decode_node_id(buf)?;
638 let epoch = safe_get_u64_le(buf)?;
639 Ok(NodeUpdate::VoteGranted {
640 from,
641 candidate,
642 epoch,
643 })
644 }
645 other => Err(io::Error::new(
646 io::ErrorKind::InvalidData,
647 format!("unknown update type: {other}"),
648 )),
649 }
650}
651
652fn encode_member_info(buf: &mut BytesMut, member: &MemberInfo) {
653 encode_node_id(buf, &member.id);
654 encode_socket_addr(buf, &member.addr);
655 buf.put_u64_le(member.incarnation);
656 buf.put_u8(if member.is_primary { 1 } else { 0 });
657 encode_slice(buf, &member.slots, |b, slot| {
658 b.put_u16_le(slot.start);
659 b.put_u16_le(slot.end);
660 });
661}
662
663fn decode_member_info(buf: &mut &[u8]) -> io::Result<MemberInfo> {
664 let id = decode_node_id(buf)?;
665 let addr = decode_socket_addr(buf)?;
666 let incarnation = safe_get_u64_le(buf)?;
667 let is_primary = safe_get_u8(buf)? != 0;
668 let slot_count = safe_get_u16_le(buf)? as usize;
669 if slot_count > MAX_COLLECTION_COUNT {
670 return Err(io::Error::new(
671 io::ErrorKind::InvalidData,
672 format!("slot range count {slot_count} exceeds limit"),
673 ));
674 }
675 let mut slots = Vec::with_capacity(slot_count);
676 for _ in 0..slot_count {
677 let start = safe_get_u16_le(buf)?;
678 let end = safe_get_u16_le(buf)?;
679 slots.push(SlotRange::try_new(start, end)?);
680 }
681 Ok(MemberInfo {
682 id,
683 addr,
684 incarnation,
685 is_primary,
686 slots,
687 })
688}
689
690#[cfg(test)]
691mod tests {
692 use super::*;
693 use std::net::{Ipv4Addr, Ipv6Addr};
694
695 fn test_addr() -> SocketAddr {
696 SocketAddr::from((Ipv4Addr::new(127, 0, 0, 1), 6379))
697 }
698
699 fn test_addr_v6() -> SocketAddr {
700 SocketAddr::from((Ipv6Addr::LOCALHOST, 6379))
701 }
702
703 #[test]
704 fn ping_roundtrip() {
705 let msg = GossipMessage::Ping {
706 seq: 42,
707 sender: NodeId::new(),
708 updates: vec![],
709 };
710 let encoded = msg.encode();
711 let decoded = GossipMessage::decode(&encoded).expect("roundtrip should succeed");
712 assert_eq!(msg, decoded);
713 }
714
715 #[test]
716 fn ping_with_updates() {
717 let node1 = NodeId::new();
718 let node2 = NodeId::new();
719 let msg = GossipMessage::Ping {
720 seq: 100,
721 sender: node1,
722 updates: vec![
723 NodeUpdate::Alive {
724 node: node2,
725 addr: test_addr(),
726 incarnation: 5,
727 },
728 NodeUpdate::Suspect {
729 node: node1,
730 incarnation: 3,
731 },
732 ],
733 };
734 let encoded = msg.encode();
735 let decoded = GossipMessage::decode(&encoded).expect("roundtrip should succeed");
736 assert_eq!(msg, decoded);
737 }
738
739 #[test]
740 fn ping_req_roundtrip() {
741 let msg = GossipMessage::PingReq {
742 seq: 99,
743 sender: NodeId::new(),
744 target: NodeId::new(),
745 target_addr: test_addr(),
746 };
747 let encoded = msg.encode();
748 let decoded = GossipMessage::decode(&encoded).expect("roundtrip should succeed");
749 assert_eq!(msg, decoded);
750 }
751
752 #[test]
753 fn ack_roundtrip() {
754 let msg = GossipMessage::Ack {
755 seq: 42,
756 sender: NodeId::new(),
757 updates: vec![NodeUpdate::Dead {
758 node: NodeId::new(),
759 incarnation: 10,
760 }],
761 };
762 let encoded = msg.encode();
763 let decoded = GossipMessage::decode(&encoded).expect("roundtrip should succeed");
764 assert_eq!(msg, decoded);
765 }
766
767 #[test]
768 fn join_roundtrip() {
769 let msg = GossipMessage::Join {
770 sender: NodeId::new(),
771 sender_addr: test_addr(),
772 };
773 let encoded = msg.encode();
774 let decoded = GossipMessage::decode(&encoded).expect("roundtrip should succeed");
775 assert_eq!(msg, decoded);
776 }
777
778 #[test]
779 fn welcome_roundtrip() {
780 let msg = GossipMessage::Welcome {
781 sender: NodeId::new(),
782 members: vec![
783 MemberInfo {
784 id: NodeId::new(),
785 addr: test_addr(),
786 incarnation: 1,
787 is_primary: true,
788 slots: vec![SlotRange::new(0, 5460)],
789 },
790 MemberInfo {
791 id: NodeId::new(),
792 addr: test_addr(),
793 incarnation: 2,
794 is_primary: false,
795 slots: vec![],
796 },
797 ],
798 };
799 let encoded = msg.encode();
800 let decoded = GossipMessage::decode(&encoded).expect("roundtrip should succeed");
801 assert_eq!(msg, decoded);
802 }
803
804 #[test]
805 fn ipv6_address() {
806 let msg = GossipMessage::Join {
807 sender: NodeId::new(),
808 sender_addr: test_addr_v6(),
809 };
810 let encoded = msg.encode();
811 let decoded = GossipMessage::decode(&encoded).expect("roundtrip should succeed");
812 assert_eq!(msg, decoded);
813 }
814
815 #[test]
816 fn role_changed_roundtrip() {
817 let node = NodeId::new();
818 let primary = NodeId::new();
819
820 let msg = GossipMessage::Ping {
822 seq: 1,
823 sender: node,
824 updates: vec![NodeUpdate::RoleChanged {
825 node,
826 incarnation: 7,
827 is_primary: false,
828 replicates: Some(primary),
829 }],
830 };
831 let encoded = msg.encode();
832 let decoded = GossipMessage::decode(&encoded).expect("roundtrip should succeed");
833 assert_eq!(msg, decoded);
834
835 let msg2 = GossipMessage::Ping {
837 seq: 2,
838 sender: node,
839 updates: vec![NodeUpdate::RoleChanged {
840 node,
841 incarnation: 8,
842 is_primary: true,
843 replicates: None,
844 }],
845 };
846 let encoded2 = msg2.encode();
847 let decoded2 = GossipMessage::decode(&encoded2).expect("roundtrip should succeed");
848 assert_eq!(msg2, decoded2);
849 }
850
851 #[test]
852 fn all_update_types() {
853 let node = NodeId::new();
854 let updates = vec![
855 NodeUpdate::Alive {
856 node,
857 addr: test_addr(),
858 incarnation: 1,
859 },
860 NodeUpdate::Suspect {
861 node,
862 incarnation: 2,
863 },
864 NodeUpdate::Dead {
865 node,
866 incarnation: 3,
867 },
868 NodeUpdate::Left { node },
869 NodeUpdate::SlotsChanged {
870 node,
871 incarnation: 4,
872 slots: vec![SlotRange::new(0, 5460)],
873 },
874 ];
875 let msg = GossipMessage::Ping {
876 seq: 1,
877 sender: node,
878 updates,
879 };
880 let encoded = msg.encode();
881 let decoded = GossipMessage::decode(&encoded).expect("roundtrip should succeed");
882 assert_eq!(msg, decoded);
883 }
884
885 #[test]
886 fn slots_changed_empty_roundtrip() {
887 let node = NodeId::new();
888 let msg = GossipMessage::Ping {
889 seq: 1,
890 sender: node,
891 updates: vec![NodeUpdate::SlotsChanged {
892 node,
893 incarnation: 1,
894 slots: vec![],
895 }],
896 };
897 let encoded = msg.encode();
898 let decoded = GossipMessage::decode(&encoded).expect("roundtrip should succeed");
899 assert_eq!(msg, decoded);
900 }
901
902 #[test]
903 fn slots_changed_multiple_ranges_roundtrip() {
904 let node = NodeId::new();
905 let msg = GossipMessage::Ping {
906 seq: 1,
907 sender: node,
908 updates: vec![NodeUpdate::SlotsChanged {
909 node,
910 incarnation: 5,
911 slots: vec![
912 SlotRange::new(0, 5460),
913 SlotRange::new(5461, 10922),
914 SlotRange::new(10923, 16383),
915 ],
916 }],
917 };
918 let encoded = msg.encode();
919 let decoded = GossipMessage::decode(&encoded).expect("roundtrip should succeed");
920 assert_eq!(msg, decoded);
921 }
922
923 #[test]
924 fn empty_message_error() {
925 let result = GossipMessage::decode(&[]);
926 assert!(result.is_err());
927 }
928
929 #[test]
930 fn unknown_message_type_error() {
931 let result = GossipMessage::decode(&[255]);
932 assert!(result.is_err());
933 }
934
935 #[test]
936 fn invalid_slot_range_in_welcome_rejected() {
937 let mut buf = BytesMut::new();
939 buf.put_u8(MSG_WELCOME);
940 encode_node_id(&mut buf, &NodeId::new());
941 buf.put_u16_le(1); encode_node_id(&mut buf, &NodeId::new());
943 encode_socket_addr(&mut buf, &test_addr());
944 buf.put_u64_le(1); buf.put_u8(1); buf.put_u16_le(1); buf.put_u16_le(5000); buf.put_u16_le(100); let result = GossipMessage::decode(&buf);
951 assert!(result.is_err(), "should reject inverted slot range");
952 }
953
954 #[test]
955 fn vote_request_roundtrip() {
956 let candidate = NodeId::new();
957 let msg = GossipMessage::Ping {
958 seq: 1,
959 sender: candidate,
960 updates: vec![NodeUpdate::VoteRequest {
961 candidate,
962 epoch: 5,
963 offset: 1234,
964 }],
965 };
966 let encoded = msg.encode();
967 let decoded = GossipMessage::decode(&encoded).expect("roundtrip should succeed");
968 assert_eq!(msg, decoded);
969 }
970
971 #[test]
972 fn vote_granted_roundtrip() {
973 let primary = NodeId::new();
974 let candidate = NodeId::new();
975 let msg = GossipMessage::Ack {
976 seq: 2,
977 sender: primary,
978 updates: vec![NodeUpdate::VoteGranted {
979 from: primary,
980 candidate,
981 epoch: 5,
982 }],
983 };
984 let encoded = msg.encode();
985 let decoded = GossipMessage::decode(&encoded).expect("roundtrip should succeed");
986 assert_eq!(msg, decoded);
987 }
988
989 #[test]
990 fn out_of_range_slot_in_welcome_rejected() {
991 let mut buf = BytesMut::new();
993 buf.put_u8(MSG_WELCOME);
994 encode_node_id(&mut buf, &NodeId::new());
995 buf.put_u16_le(1);
996 encode_node_id(&mut buf, &NodeId::new());
997 encode_socket_addr(&mut buf, &test_addr());
998 buf.put_u64_le(1);
999 buf.put_u8(1);
1000 buf.put_u16_le(1); buf.put_u16_le(0);
1002 buf.put_u16_le(16384); let result = GossipMessage::decode(&buf);
1005 assert!(result.is_err(), "should reject slot >= 16384");
1006 }
1007
1008 #[test]
1011 fn authenticated_roundtrip() {
1012 let secret = ClusterSecret::from_password("cluster-pass");
1013 let msg = GossipMessage::Ping {
1014 seq: 42,
1015 sender: NodeId::new(),
1016 updates: vec![],
1017 };
1018 let encoded = msg.encode_authenticated(&secret);
1019 let decoded =
1020 GossipMessage::decode_authenticated(&encoded, &secret).expect("auth roundtrip");
1021 assert_eq!(msg, decoded);
1022 }
1023
1024 #[test]
1025 fn wrong_secret_rejected() {
1026 let s1 = ClusterSecret::from_password("secret-a");
1027 let s2 = ClusterSecret::from_password("secret-b");
1028 let msg = GossipMessage::Ping {
1029 seq: 1,
1030 sender: NodeId::new(),
1031 updates: vec![],
1032 };
1033 let encoded = msg.encode_authenticated(&s1);
1034 let result = GossipMessage::decode_authenticated(&encoded, &s2);
1035 assert!(result.is_err());
1036 }
1037
1038 #[test]
1039 fn truncated_auth_message_rejected() {
1040 let secret = ClusterSecret::from_password("test");
1041 let result = GossipMessage::decode_authenticated(&[0u8; 16], &secret);
1043 assert!(result.is_err());
1044 }
1045}