Skip to main content

ember_cluster/
message.rs

1//! Binary wire format for cluster gossip messages.
2//!
3//! Uses a compact binary encoding for efficiency over the network.
4//! All multi-byte integers are little-endian.
5
6use std::io::{self, Read};
7use std::net::SocketAddr;
8
9use bytes::{Buf, BufMut, Bytes, BytesMut};
10
11use crate::auth::{ClusterSecret, TAG_LEN};
12use crate::{NodeId, SlotRange};
13
/// Upper bound on the number of elements in any encoded list: updates in a
/// `Ping`/`Ack`, members in a `Welcome`, and slot ranges in slot-carrying
/// messages and member records.
///
/// Encoders silently truncate longer lists to this length; decoders reject any
/// declared count above it, which bounds allocations driven by counts taken
/// from untrusted input.
const MAX_COLLECTION_COUNT: usize = 1024;

// List counts are written as a `u16`; fail the build if the cap ever outgrows it.
const _: () = assert!(MAX_COLLECTION_COUNT <= u16::MAX as usize);

/// Wire tag for an IPv4 socket address (mnemonic: IP version 4).
/// Followed by 4 address octets and a little-endian `u16` port.
const ADDR_IPV4: u8 = 4;
/// Wire tag for an IPv6 socket address (mnemonic: IP version 6).
/// Followed by 16 address octets (eight 16-bit groups) and a little-endian
/// `u16` port; flow info and scope id are not encoded.
const ADDR_IPV6: u8 = 6;
22
// Bounds-checked little-endian readers. Each one either consumes exactly the
// bytes it decodes or, when `buf` is too short, leaves `buf` untouched and
// returns `io::ErrorKind::UnexpectedEof` instead of panicking.

/// Splits the first `N` bytes off the front of `buf`.
///
/// On success `buf` is advanced past them. On short input `buf` is left as-is
/// and an `UnexpectedEof` error carrying `msg` is returned.
fn take_array<const N: usize>(buf: &mut &[u8], msg: &'static str) -> io::Result<[u8; N]> {
    let whole: &[u8] = *buf;
    if whole.len() < N {
        return Err(io::Error::new(io::ErrorKind::UnexpectedEof, msg));
    }
    let (head, rest) = whole.split_at(N);
    let mut out = [0u8; N];
    out.copy_from_slice(head);
    *buf = rest;
    Ok(out)
}

/// Reads a single byte.
fn safe_get_u8(buf: &mut &[u8]) -> io::Result<u8> {
    let [byte] = take_array::<1>(buf, "need 1 byte")?;
    Ok(byte)
}

/// Reads a little-endian `u16`.
fn safe_get_u16_le(buf: &mut &[u8]) -> io::Result<u16> {
    take_array::<2>(buf, "need 2 bytes").map(u16::from_le_bytes)
}

/// Reads a little-endian `u64`.
fn safe_get_u64_le(buf: &mut &[u8]) -> io::Result<u64> {
    take_array::<8>(buf, "need 8 bytes").map(u64::from_le_bytes)
}
45
/// Message types for the SWIM gossip protocol.
///
/// On the wire each variant is a one-byte `MSG_*` tag followed by its fields
/// in declaration order (see [`GossipMessage::encode_into`] and
/// [`GossipMessage::decode`]).
#[derive(Debug, Clone, PartialEq)]
pub enum GossipMessage {
    /// Direct probe to check if a node is alive.
    Ping {
        /// Probe sequence number. NOTE(review): presumably echoed back in the
        /// matching `Ack` for correlation — confirm in the failure detector.
        seq: u64,
        /// Node that sent this probe.
        sender: NodeId,
        /// Piggybacked state updates.
        ///
        /// At most `MAX_COLLECTION_COUNT` are encoded; extras are silently dropped.
        updates: Vec<NodeUpdate>,
    },

    /// Request another node to probe a target on our behalf.
    PingReq {
        /// Sequence number of the indirect probe.
        seq: u64,
        /// Node asking for the indirect probe.
        sender: NodeId,
        /// Node to be probed.
        target: NodeId,
        /// Address at which `target` should be probed. For IPv6 only the IP and
        /// port are encoded (flow info and scope id are dropped).
        target_addr: SocketAddr,
    },

    /// Response to a Ping or forwarded PingReq.
    Ack {
        /// Sequence number being acknowledged. NOTE(review): presumably copied
        /// from the probe being answered — confirm.
        seq: u64,
        /// Node sending the acknowledgement.
        sender: NodeId,
        /// Piggybacked state updates.
        ///
        /// At most `MAX_COLLECTION_COUNT` are encoded; extras are silently dropped.
        updates: Vec<NodeUpdate>,
    },

    /// Join request from a new node.
    Join {
        /// The joining node.
        sender: NodeId,
        /// Address the joining node advertises. For IPv6 only the IP and port
        /// are encoded (flow info and scope id are dropped).
        sender_addr: SocketAddr,
    },

    /// Welcome response with current cluster state.
    Welcome {
        /// Node sending the welcome.
        sender: NodeId,
        /// Cluster members reported by the sender.
        ///
        /// At most `MAX_COLLECTION_COUNT` are encoded; extras are silently dropped.
        members: Vec<MemberInfo>,
    },

    /// Direct slot announcement: the sender is telling peers its current slot
    /// ownership without going through the normal piggybacking mechanism.
    /// Used by `broadcast_local_slots` to eagerly push slot state to all peers.
    SlotsAnnounce {
        /// Node announcing its slots.
        sender: NodeId,
        /// The sender's incarnation number carried with the announcement.
        incarnation: u64,
        /// Slot ranges owned by the sender.
        ///
        /// At most `MAX_COLLECTION_COUNT` ranges are encoded; each decoded range
        /// is validated with `SlotRange::try_new`.
        slots: Vec<SlotRange>,
    },
}
94
/// A state update about a node, piggybacked on protocol messages.
///
/// On the wire each variant is a one-byte `UPDATE_*` tag followed by its
/// fields in declaration order (see `encode_update` / `decode_update`).
#[derive(Debug, Clone, PartialEq)]
pub enum NodeUpdate {
    /// Node is alive with given incarnation number.
    Alive {
        /// The node this update is about.
        node: NodeId,
        /// The node's address. For IPv6 only the IP and port are encoded
        /// (flow info and scope id are dropped).
        addr: SocketAddr,
        /// Incarnation number the node is alive at.
        incarnation: u64,
    },
    /// Node is suspected to be failing.
    Suspect { node: NodeId, incarnation: u64 },
    /// Node has been confirmed dead.
    Dead { node: NodeId, incarnation: u64 },
    /// Node left the cluster gracefully.
    Left { node: NodeId },
    /// Node's slot ownership changed.
    SlotsChanged {
        /// The node whose slots changed.
        node: NodeId,
        /// Incarnation number carried with the change.
        incarnation: u64,
        /// Slot ranges reported for the node. NOTE(review): presumably the
        /// complete ownership set rather than a delta — confirm with the
        /// consumer. At most `MAX_COLLECTION_COUNT` ranges are encoded.
        slots: Vec<SlotRange>,
    },
    /// Node's role changed (primary ↔ replica).
    ///
    /// The codec does not cross-check `is_primary` against `replicates`; any
    /// combination round-trips.
    RoleChanged {
        /// The node whose role changed.
        node: NodeId,
        /// Incarnation number carried with the change.
        incarnation: u64,
        /// `true` if the node is now a primary, `false` if replica.
        is_primary: bool,
        /// The primary this node replicates from, if it is a replica.
        /// Encoded as a presence byte followed by the id when present.
        replicates: Option<NodeId>,
    },
    /// A replica is requesting votes to take over a failed primary.
    VoteRequest {
        /// The candidate requesting votes.
        candidate: NodeId,
        /// Config epoch this election is contesting.
        epoch: u64,
        /// Candidate's replication offset; higher value signals the most up-to-date replica.
        offset: u64,
    },
    /// A primary is granting its vote to a candidate replica.
    VoteGranted {
        /// The primary casting the vote.
        from: NodeId,
        /// The candidate receiving the vote.
        candidate: NodeId,
        /// The epoch this vote is for.
        epoch: u64,
    },
}
144
/// Information about a cluster member, as carried in a [`GossipMessage::Welcome`].
///
/// Encoded as: id, address, incarnation, `is_primary` byte, then a
/// `u16`-count-prefixed list of slot ranges.
#[derive(Debug, Clone, PartialEq)]
pub struct MemberInfo {
    /// The member's node id.
    pub id: NodeId,
    /// The member's address. For IPv6 only the IP and port are encoded (flow
    /// info and scope id are dropped).
    pub addr: SocketAddr,
    /// The member's incarnation number.
    pub incarnation: u64,
    /// `true` if the member is a primary, `false` if a replica.
    pub is_primary: bool,
    /// Slot ranges owned by the member; at most `MAX_COLLECTION_COUNT` are encoded.
    pub slots: Vec<SlotRange>,
}
154
// Wire tags. Every encoded `GossipMessage` begins with one `MSG_*` byte and
// every encoded `NodeUpdate` with one `UPDATE_*` byte. The numeric values are
// part of the protocol: changing or reusing one makes messages misdecode on
// peers built with the old numbering.

/// Tag byte for `GossipMessage::Ping`.
const MSG_PING: u8 = 0x01;
/// Tag byte for `GossipMessage::PingReq`.
const MSG_PING_REQ: u8 = 0x02;
/// Tag byte for `GossipMessage::Ack`.
const MSG_ACK: u8 = 0x03;
/// Tag byte for `GossipMessage::Join`.
const MSG_JOIN: u8 = 0x04;
/// Tag byte for `GossipMessage::Welcome`.
const MSG_WELCOME: u8 = 0x05;
/// Tag byte for `GossipMessage::SlotsAnnounce`.
const MSG_SLOTS_ANNOUNCE: u8 = 0x06;

/// Tag byte for `NodeUpdate::Alive`.
const UPDATE_ALIVE: u8 = 0x01;
/// Tag byte for `NodeUpdate::Suspect`.
const UPDATE_SUSPECT: u8 = 0x02;
/// Tag byte for `NodeUpdate::Dead`.
const UPDATE_DEAD: u8 = 0x03;
/// Tag byte for `NodeUpdate::Left`.
const UPDATE_LEFT: u8 = 0x04;
/// Tag byte for `NodeUpdate::SlotsChanged`.
const UPDATE_SLOTS_CHANGED: u8 = 0x05;
/// Tag byte for `NodeUpdate::RoleChanged`.
const UPDATE_ROLE_CHANGED: u8 = 0x06;
/// Tag byte for `NodeUpdate::VoteRequest`.
const UPDATE_VOTE_REQUEST: u8 = 0x07;
/// Tag byte for `NodeUpdate::VoteGranted`.
const UPDATE_VOTE_GRANTED: u8 = 0x08;
171
172/// Writes a count-capped slice into `buf` using a per-item encoder.
173///
174/// Writes `min(items.len(), MAX_COLLECTION_COUNT)` as a u16, then calls
175/// `encode_one` for each item in the truncated slice. Used everywhere a
176/// repeated list is encoded to avoid duplicating the truncation logic.
177fn encode_slice<T, F>(buf: &mut BytesMut, items: &[T], encode_one: F)
178where
179    F: Fn(&mut BytesMut, &T),
180{
181    let count = items.len().min(MAX_COLLECTION_COUNT);
182    buf.put_u16_le(count as u16);
183    for item in &items[..count] {
184        encode_one(buf, item);
185    }
186}
187
188impl GossipMessage {
189    /// Serializes the message to bytes.
190    pub fn encode(&self) -> Bytes {
191        let mut buf = BytesMut::with_capacity(256);
192        self.encode_into(&mut buf);
193        buf.freeze()
194    }
195
196    /// Serializes the message into the given buffer.
197    pub fn encode_into(&self, buf: &mut BytesMut) {
198        match self {
199            GossipMessage::Ping {
200                seq,
201                sender,
202                updates,
203            } => {
204                buf.put_u8(MSG_PING);
205                buf.put_u64_le(*seq);
206                encode_node_id(buf, sender);
207                encode_updates(buf, updates);
208            }
209            GossipMessage::PingReq {
210                seq,
211                sender,
212                target,
213                target_addr,
214            } => {
215                buf.put_u8(MSG_PING_REQ);
216                buf.put_u64_le(*seq);
217                encode_node_id(buf, sender);
218                encode_node_id(buf, target);
219                encode_socket_addr(buf, target_addr);
220            }
221            GossipMessage::Ack {
222                seq,
223                sender,
224                updates,
225            } => {
226                buf.put_u8(MSG_ACK);
227                buf.put_u64_le(*seq);
228                encode_node_id(buf, sender);
229                encode_updates(buf, updates);
230            }
231            GossipMessage::Join {
232                sender,
233                sender_addr,
234            } => {
235                buf.put_u8(MSG_JOIN);
236                encode_node_id(buf, sender);
237                encode_socket_addr(buf, sender_addr);
238            }
239            GossipMessage::Welcome { sender, members } => {
240                buf.put_u8(MSG_WELCOME);
241                encode_node_id(buf, sender);
242                encode_slice(buf, members, encode_member_info);
243            }
244            GossipMessage::SlotsAnnounce {
245                sender,
246                incarnation,
247                slots,
248            } => {
249                buf.put_u8(MSG_SLOTS_ANNOUNCE);
250                encode_node_id(buf, sender);
251                buf.put_u64_le(*incarnation);
252                encode_slice(buf, slots, |b, slot| {
253                    b.put_u16_le(slot.start);
254                    b.put_u16_le(slot.end);
255                });
256            }
257        }
258    }
259
260    /// Deserializes a message from bytes.
261    pub fn decode(mut buf: &[u8]) -> io::Result<Self> {
262        if buf.is_empty() {
263            return Err(io::Error::new(
264                io::ErrorKind::UnexpectedEof,
265                "empty message",
266            ));
267        }
268
269        let msg_type = safe_get_u8(&mut buf)?;
270        match msg_type {
271            MSG_PING => {
272                let seq = safe_get_u64_le(&mut buf)?;
273                let sender = decode_node_id(&mut buf)?;
274                let updates = decode_updates(&mut buf)?;
275                Ok(GossipMessage::Ping {
276                    seq,
277                    sender,
278                    updates,
279                })
280            }
281            MSG_PING_REQ => {
282                let seq = safe_get_u64_le(&mut buf)?;
283                let sender = decode_node_id(&mut buf)?;
284                let target = decode_node_id(&mut buf)?;
285                let target_addr = decode_socket_addr(&mut buf)?;
286                Ok(GossipMessage::PingReq {
287                    seq,
288                    sender,
289                    target,
290                    target_addr,
291                })
292            }
293            MSG_ACK => {
294                let seq = safe_get_u64_le(&mut buf)?;
295                let sender = decode_node_id(&mut buf)?;
296                let updates = decode_updates(&mut buf)?;
297                Ok(GossipMessage::Ack {
298                    seq,
299                    sender,
300                    updates,
301                })
302            }
303            MSG_JOIN => {
304                let sender = decode_node_id(&mut buf)?;
305                let sender_addr = decode_socket_addr(&mut buf)?;
306                Ok(GossipMessage::Join {
307                    sender,
308                    sender_addr,
309                })
310            }
311            MSG_WELCOME => {
312                let sender = decode_node_id(&mut buf)?;
313                let count = safe_get_u16_le(&mut buf)? as usize;
314                if count > MAX_COLLECTION_COUNT {
315                    return Err(io::Error::new(
316                        io::ErrorKind::InvalidData,
317                        format!("member count {count} exceeds limit"),
318                    ));
319                }
320                let mut members = Vec::with_capacity(count);
321                for _ in 0..count {
322                    members.push(decode_member_info(&mut buf)?);
323                }
324                Ok(GossipMessage::Welcome { sender, members })
325            }
326            MSG_SLOTS_ANNOUNCE => {
327                let sender = decode_node_id(&mut buf)?;
328                let incarnation = safe_get_u64_le(&mut buf)?;
329                let count = safe_get_u16_le(&mut buf)? as usize;
330                if count > MAX_COLLECTION_COUNT {
331                    return Err(io::Error::new(
332                        io::ErrorKind::InvalidData,
333                        format!("slot range count {count} exceeds limit"),
334                    ));
335                }
336                let mut slots = Vec::with_capacity(count);
337                for _ in 0..count {
338                    let start = safe_get_u16_le(&mut buf)?;
339                    let end = safe_get_u16_le(&mut buf)?;
340                    slots.push(SlotRange::try_new(start, end)?);
341                }
342                Ok(GossipMessage::SlotsAnnounce {
343                    sender,
344                    incarnation,
345                    slots,
346                })
347            }
348            other => Err(io::Error::new(
349                io::ErrorKind::InvalidData,
350                format!("unknown message type: {other}"),
351            )),
352        }
353    }
354
355    /// Encodes the message with a trailing HMAC-SHA256 tag.
356    pub fn encode_authenticated(&self, secret: &ClusterSecret) -> Bytes {
357        let mut buf = BytesMut::with_capacity(256);
358        self.encode_into(&mut buf);
359        let tag = secret.sign(&buf);
360        buf.extend_from_slice(&tag);
361        buf.freeze()
362    }
363
364    /// Decodes a message, verifying the trailing HMAC-SHA256 tag first.
365    ///
366    /// Returns an error if the buffer is too short or the tag doesn't match.
367    pub fn decode_authenticated(buf: &[u8], secret: &ClusterSecret) -> io::Result<Self> {
368        if buf.len() < TAG_LEN {
369            return Err(io::Error::new(
370                io::ErrorKind::InvalidData,
371                "message too short for auth tag",
372            ));
373        }
374        let (payload, tag) = buf.split_at(buf.len() - TAG_LEN);
375        if !secret.verify(payload, tag) {
376            return Err(io::Error::new(
377                io::ErrorKind::PermissionDenied,
378                "cluster auth failed",
379            ));
380        }
381        Self::decode(payload)
382    }
383}
384
385fn encode_node_id(buf: &mut BytesMut, id: &NodeId) {
386    buf.put_slice(id.0.as_bytes());
387}
388
389fn decode_node_id(buf: &mut &[u8]) -> io::Result<NodeId> {
390    if buf.len() < 16 {
391        return Err(io::Error::new(
392            io::ErrorKind::UnexpectedEof,
393            "not enough bytes for node id",
394        ));
395    }
396    let mut bytes = [0u8; 16];
397    buf.read_exact(&mut bytes)?;
398    Ok(NodeId(uuid::Uuid::from_bytes(bytes)))
399}
400
401fn encode_socket_addr(buf: &mut BytesMut, addr: &SocketAddr) {
402    match addr {
403        SocketAddr::V4(v4) => {
404            buf.put_u8(ADDR_IPV4);
405            buf.put_slice(&v4.ip().octets());
406            buf.put_u16_le(v4.port());
407        }
408        SocketAddr::V6(v6) => {
409            buf.put_u8(ADDR_IPV6);
410            buf.put_slice(&v6.ip().octets());
411            buf.put_u16_le(v6.port());
412        }
413    }
414}
415
416fn decode_socket_addr(buf: &mut &[u8]) -> io::Result<SocketAddr> {
417    if buf.is_empty() {
418        return Err(io::Error::new(
419            io::ErrorKind::UnexpectedEof,
420            "not enough bytes for address type",
421        ));
422    }
423    let addr_type = buf.get_u8();
424    match addr_type {
425        ADDR_IPV4 => {
426            // 4 octets + 2-byte port = 6 bytes
427            if buf.len() < 6 {
428                return Err(io::Error::new(
429                    io::ErrorKind::UnexpectedEof,
430                    "not enough bytes for ipv4 address",
431                ));
432            }
433            let mut octets = [0u8; 4];
434            buf.read_exact(&mut octets)?;
435            let port = buf.get_u16_le();
436            Ok(SocketAddr::from((octets, port)))
437        }
438        ADDR_IPV6 => {
439            // 16 octets + 2-byte port = 18 bytes
440            if buf.len() < 18 {
441                return Err(io::Error::new(
442                    io::ErrorKind::UnexpectedEof,
443                    "not enough bytes for ipv6 address",
444                ));
445            }
446            let mut octets = [0u8; 16];
447            buf.read_exact(&mut octets)?;
448            let port = buf.get_u16_le();
449            Ok(SocketAddr::from((octets, port)))
450        }
451        other => Err(io::Error::new(
452            io::ErrorKind::InvalidData,
453            format!("unknown address type: {other}"),
454        )),
455    }
456}
457
458fn encode_updates(buf: &mut BytesMut, updates: &[NodeUpdate]) {
459    encode_slice(buf, updates, encode_update);
460}
461
462fn encode_update(buf: &mut BytesMut, update: &NodeUpdate) {
463    match update {
464        NodeUpdate::Alive {
465            node,
466            addr,
467            incarnation,
468        } => {
469            buf.put_u8(UPDATE_ALIVE);
470            encode_node_id(buf, node);
471            encode_socket_addr(buf, addr);
472            buf.put_u64_le(*incarnation);
473        }
474        NodeUpdate::Suspect { node, incarnation } => {
475            buf.put_u8(UPDATE_SUSPECT);
476            encode_node_id(buf, node);
477            buf.put_u64_le(*incarnation);
478        }
479        NodeUpdate::Dead { node, incarnation } => {
480            buf.put_u8(UPDATE_DEAD);
481            encode_node_id(buf, node);
482            buf.put_u64_le(*incarnation);
483        }
484        NodeUpdate::Left { node } => {
485            buf.put_u8(UPDATE_LEFT);
486            encode_node_id(buf, node);
487        }
488        NodeUpdate::SlotsChanged {
489            node,
490            incarnation,
491            slots,
492        } => {
493            buf.put_u8(UPDATE_SLOTS_CHANGED);
494            encode_node_id(buf, node);
495            buf.put_u64_le(*incarnation);
496            encode_slice(buf, slots, |b, slot| {
497                b.put_u16_le(slot.start);
498                b.put_u16_le(slot.end);
499            });
500        }
501        NodeUpdate::RoleChanged {
502            node,
503            incarnation,
504            is_primary,
505            replicates,
506        } => {
507            buf.put_u8(UPDATE_ROLE_CHANGED);
508            encode_node_id(buf, node);
509            buf.put_u64_le(*incarnation);
510            buf.put_u8(if *is_primary { 1 } else { 0 });
511            match replicates {
512                Some(primary_id) => {
513                    buf.put_u8(1);
514                    encode_node_id(buf, primary_id);
515                }
516                None => {
517                    buf.put_u8(0);
518                }
519            }
520        }
521        NodeUpdate::VoteRequest {
522            candidate,
523            epoch,
524            offset,
525        } => {
526            buf.put_u8(UPDATE_VOTE_REQUEST);
527            encode_node_id(buf, candidate);
528            buf.put_u64_le(*epoch);
529            buf.put_u64_le(*offset);
530        }
531        NodeUpdate::VoteGranted {
532            from,
533            candidate,
534            epoch,
535        } => {
536            buf.put_u8(UPDATE_VOTE_GRANTED);
537            encode_node_id(buf, from);
538            encode_node_id(buf, candidate);
539            buf.put_u64_le(*epoch);
540        }
541    }
542}
543
544fn decode_updates(buf: &mut &[u8]) -> io::Result<Vec<NodeUpdate>> {
545    let count = safe_get_u16_le(buf)? as usize;
546    if count > MAX_COLLECTION_COUNT {
547        return Err(io::Error::new(
548            io::ErrorKind::InvalidData,
549            format!("update count {count} exceeds limit"),
550        ));
551    }
552    let mut updates = Vec::with_capacity(count);
553    for _ in 0..count {
554        updates.push(decode_update(buf)?);
555    }
556    Ok(updates)
557}
558
559fn decode_update(buf: &mut &[u8]) -> io::Result<NodeUpdate> {
560    let update_type = safe_get_u8(buf)?;
561    match update_type {
562        UPDATE_ALIVE => {
563            let node = decode_node_id(buf)?;
564            let addr = decode_socket_addr(buf)?;
565            let incarnation = safe_get_u64_le(buf)?;
566            Ok(NodeUpdate::Alive {
567                node,
568                addr,
569                incarnation,
570            })
571        }
572        UPDATE_SUSPECT => {
573            let node = decode_node_id(buf)?;
574            let incarnation = safe_get_u64_le(buf)?;
575            Ok(NodeUpdate::Suspect { node, incarnation })
576        }
577        UPDATE_DEAD => {
578            let node = decode_node_id(buf)?;
579            let incarnation = safe_get_u64_le(buf)?;
580            Ok(NodeUpdate::Dead { node, incarnation })
581        }
582        UPDATE_LEFT => {
583            let node = decode_node_id(buf)?;
584            Ok(NodeUpdate::Left { node })
585        }
586        UPDATE_SLOTS_CHANGED => {
587            let node = decode_node_id(buf)?;
588            let incarnation = safe_get_u64_le(buf)?;
589            let count = safe_get_u16_le(buf)? as usize;
590            if count > MAX_COLLECTION_COUNT {
591                return Err(io::Error::new(
592                    io::ErrorKind::InvalidData,
593                    format!("slot range count {count} exceeds limit"),
594                ));
595            }
596            let mut slots = Vec::with_capacity(count);
597            for _ in 0..count {
598                let start = safe_get_u16_le(buf)?;
599                let end = safe_get_u16_le(buf)?;
600                slots.push(SlotRange::try_new(start, end)?);
601            }
602            Ok(NodeUpdate::SlotsChanged {
603                node,
604                incarnation,
605                slots,
606            })
607        }
608        UPDATE_ROLE_CHANGED => {
609            let node = decode_node_id(buf)?;
610            let incarnation = safe_get_u64_le(buf)?;
611            let is_primary = safe_get_u8(buf)? != 0;
612            let has_replicates = safe_get_u8(buf)? != 0;
613            let replicates = if has_replicates {
614                Some(decode_node_id(buf)?)
615            } else {
616                None
617            };
618            Ok(NodeUpdate::RoleChanged {
619                node,
620                incarnation,
621                is_primary,
622                replicates,
623            })
624        }
625        UPDATE_VOTE_REQUEST => {
626            let candidate = decode_node_id(buf)?;
627            let epoch = safe_get_u64_le(buf)?;
628            let offset = safe_get_u64_le(buf)?;
629            Ok(NodeUpdate::VoteRequest {
630                candidate,
631                epoch,
632                offset,
633            })
634        }
635        UPDATE_VOTE_GRANTED => {
636            let from = decode_node_id(buf)?;
637            let candidate = decode_node_id(buf)?;
638            let epoch = safe_get_u64_le(buf)?;
639            Ok(NodeUpdate::VoteGranted {
640                from,
641                candidate,
642                epoch,
643            })
644        }
645        other => Err(io::Error::new(
646            io::ErrorKind::InvalidData,
647            format!("unknown update type: {other}"),
648        )),
649    }
650}
651
652fn encode_member_info(buf: &mut BytesMut, member: &MemberInfo) {
653    encode_node_id(buf, &member.id);
654    encode_socket_addr(buf, &member.addr);
655    buf.put_u64_le(member.incarnation);
656    buf.put_u8(if member.is_primary { 1 } else { 0 });
657    encode_slice(buf, &member.slots, |b, slot| {
658        b.put_u16_le(slot.start);
659        b.put_u16_le(slot.end);
660    });
661}
662
663fn decode_member_info(buf: &mut &[u8]) -> io::Result<MemberInfo> {
664    let id = decode_node_id(buf)?;
665    let addr = decode_socket_addr(buf)?;
666    let incarnation = safe_get_u64_le(buf)?;
667    let is_primary = safe_get_u8(buf)? != 0;
668    let slot_count = safe_get_u16_le(buf)? as usize;
669    if slot_count > MAX_COLLECTION_COUNT {
670        return Err(io::Error::new(
671            io::ErrorKind::InvalidData,
672            format!("slot range count {slot_count} exceeds limit"),
673        ));
674    }
675    let mut slots = Vec::with_capacity(slot_count);
676    for _ in 0..slot_count {
677        let start = safe_get_u16_le(buf)?;
678        let end = safe_get_u16_le(buf)?;
679        slots.push(SlotRange::try_new(start, end)?);
680    }
681    Ok(MemberInfo {
682        id,
683        addr,
684        incarnation,
685        is_primary,
686        slots,
687    })
688}
689
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{Ipv4Addr, Ipv6Addr};

    fn test_addr() -> SocketAddr {
        SocketAddr::from((Ipv4Addr::new(127, 0, 0, 1), 6379))
    }

    fn test_addr_v6() -> SocketAddr {
        SocketAddr::from((Ipv6Addr::LOCALHOST, 6379))
    }

    /// Encodes `msg`, decodes the bytes, and asserts the result equals `msg`.
    fn assert_roundtrip(msg: &GossipMessage) {
        let encoded = msg.encode();
        let decoded = GossipMessage::decode(&encoded).expect("roundtrip should succeed");
        assert_eq!(*msg, decoded);
    }

    /// Hand-builds a Welcome with one member carrying a single raw
    /// `(start, end)` slot range. This bypasses `SlotRange` validation on the
    /// encode side so decoder validation can be exercised.
    fn welcome_with_raw_slot(start: u16, end: u16) -> BytesMut {
        let mut buf = BytesMut::new();
        buf.put_u8(MSG_WELCOME);
        encode_node_id(&mut buf, &NodeId::new());
        buf.put_u16_le(1); // 1 member
        encode_node_id(&mut buf, &NodeId::new());
        encode_socket_addr(&mut buf, &test_addr());
        buf.put_u64_le(1); // incarnation
        buf.put_u8(1); // is_primary
        buf.put_u16_le(1); // 1 slot range
        buf.put_u16_le(start);
        buf.put_u16_le(end);
        buf
    }

    #[test]
    fn ping_roundtrip() {
        assert_roundtrip(&GossipMessage::Ping {
            seq: 42,
            sender: NodeId::new(),
            updates: vec![],
        });
    }

    #[test]
    fn ping_with_updates() {
        let node1 = NodeId::new();
        let node2 = NodeId::new();
        assert_roundtrip(&GossipMessage::Ping {
            seq: 100,
            sender: node1,
            updates: vec![
                NodeUpdate::Alive {
                    node: node2,
                    addr: test_addr(),
                    incarnation: 5,
                },
                NodeUpdate::Suspect {
                    node: node1,
                    incarnation: 3,
                },
            ],
        });
    }

    #[test]
    fn ping_req_roundtrip() {
        assert_roundtrip(&GossipMessage::PingReq {
            seq: 99,
            sender: NodeId::new(),
            target: NodeId::new(),
            target_addr: test_addr(),
        });
    }

    #[test]
    fn ack_roundtrip() {
        assert_roundtrip(&GossipMessage::Ack {
            seq: 42,
            sender: NodeId::new(),
            updates: vec![NodeUpdate::Dead {
                node: NodeId::new(),
                incarnation: 10,
            }],
        });
    }

    #[test]
    fn join_roundtrip() {
        assert_roundtrip(&GossipMessage::Join {
            sender: NodeId::new(),
            sender_addr: test_addr(),
        });
    }

    #[test]
    fn welcome_roundtrip() {
        assert_roundtrip(&GossipMessage::Welcome {
            sender: NodeId::new(),
            members: vec![
                MemberInfo {
                    id: NodeId::new(),
                    addr: test_addr(),
                    incarnation: 1,
                    is_primary: true,
                    slots: vec![SlotRange::new(0, 5460)],
                },
                MemberInfo {
                    id: NodeId::new(),
                    addr: test_addr_v6(),
                    incarnation: 2,
                    is_primary: false,
                    slots: vec![],
                },
            ],
        });
    }

    #[test]
    fn ipv6_address() {
        assert_roundtrip(&GossipMessage::Join {
            sender: NodeId::new(),
            sender_addr: test_addr_v6(),
        });
    }

    #[test]
    fn slots_announce_roundtrip() {
        assert_roundtrip(&GossipMessage::SlotsAnnounce {
            sender: NodeId::new(),
            incarnation: 9,
            slots: vec![SlotRange::new(0, 5460), SlotRange::new(10923, 16383)],
        });
        assert_roundtrip(&GossipMessage::SlotsAnnounce {
            sender: NodeId::new(),
            incarnation: 0,
            slots: vec![],
        });
    }

    #[test]
    fn role_changed_roundtrip() {
        let node = NodeId::new();
        let primary = NodeId::new();

        // replica variant
        assert_roundtrip(&GossipMessage::Ping {
            seq: 1,
            sender: node,
            updates: vec![NodeUpdate::RoleChanged {
                node,
                incarnation: 7,
                is_primary: false,
                replicates: Some(primary),
            }],
        });

        // primary variant (no replicates field)
        assert_roundtrip(&GossipMessage::Ping {
            seq: 2,
            sender: node,
            updates: vec![NodeUpdate::RoleChanged {
                node,
                incarnation: 8,
                is_primary: true,
                replicates: None,
            }],
        });
    }

    #[test]
    fn all_update_types() {
        let node = NodeId::new();
        let other = NodeId::new();
        let updates = vec![
            NodeUpdate::Alive {
                node,
                addr: test_addr(),
                incarnation: 1,
            },
            NodeUpdate::Suspect {
                node,
                incarnation: 2,
            },
            NodeUpdate::Dead {
                node,
                incarnation: 3,
            },
            NodeUpdate::Left { node },
            NodeUpdate::SlotsChanged {
                node,
                incarnation: 4,
                slots: vec![SlotRange::new(0, 5460)],
            },
            NodeUpdate::RoleChanged {
                node,
                incarnation: 5,
                is_primary: false,
                replicates: Some(other),
            },
            NodeUpdate::VoteRequest {
                candidate: node,
                epoch: 6,
                offset: 7,
            },
            NodeUpdate::VoteGranted {
                from: other,
                candidate: node,
                epoch: 6,
            },
        ];
        assert_roundtrip(&GossipMessage::Ping {
            seq: 1,
            sender: node,
            updates,
        });
    }

    #[test]
    fn slots_changed_empty_roundtrip() {
        let node = NodeId::new();
        assert_roundtrip(&GossipMessage::Ping {
            seq: 1,
            sender: node,
            updates: vec![NodeUpdate::SlotsChanged {
                node,
                incarnation: 1,
                slots: vec![],
            }],
        });
    }

    #[test]
    fn slots_changed_multiple_ranges_roundtrip() {
        let node = NodeId::new();
        assert_roundtrip(&GossipMessage::Ping {
            seq: 1,
            sender: node,
            updates: vec![NodeUpdate::SlotsChanged {
                node,
                incarnation: 5,
                slots: vec![
                    SlotRange::new(0, 5460),
                    SlotRange::new(5461, 10922),
                    SlotRange::new(10923, 16383),
                ],
            }],
        });
    }

    #[test]
    fn vote_request_roundtrip() {
        let candidate = NodeId::new();
        assert_roundtrip(&GossipMessage::Ping {
            seq: 1,
            sender: candidate,
            updates: vec![NodeUpdate::VoteRequest {
                candidate,
                epoch: 5,
                offset: 1234,
            }],
        });
    }

    #[test]
    fn vote_granted_roundtrip() {
        let primary = NodeId::new();
        let candidate = NodeId::new();
        assert_roundtrip(&GossipMessage::Ack {
            seq: 2,
            sender: primary,
            updates: vec![NodeUpdate::VoteGranted {
                from: primary,
                candidate,
                epoch: 5,
            }],
        });
    }

    // -- malformed input --

    #[test]
    fn empty_message_error() {
        let result = GossipMessage::decode(&[]);
        assert!(result.is_err());
    }

    #[test]
    fn unknown_message_type_error() {
        let result = GossipMessage::decode(&[255]);
        assert!(result.is_err());
    }

    #[test]
    fn invalid_slot_range_in_welcome_rejected() {
        // end < start — invalid
        let result = GossipMessage::decode(&welcome_with_raw_slot(5000, 100));
        assert!(result.is_err(), "should reject inverted slot range");
    }

    #[test]
    fn out_of_range_slot_in_welcome_rejected() {
        let result = GossipMessage::decode(&welcome_with_raw_slot(0, 16384));
        assert!(result.is_err(), "should reject slot >= 16384");
    }

    #[test]
    fn truncated_messages_rejected() {
        // Every strict prefix of a valid encoding must fail to decode, never panic.
        let node = NodeId::new();
        let messages = [
            GossipMessage::Ping {
                seq: 7,
                sender: node,
                updates: vec![
                    NodeUpdate::Alive {
                        node,
                        addr: test_addr_v6(),
                        incarnation: 1,
                    },
                    NodeUpdate::RoleChanged {
                        node,
                        incarnation: 2,
                        is_primary: false,
                        replicates: Some(NodeId::new()),
                    },
                ],
            },
            GossipMessage::PingReq {
                seq: 8,
                sender: node,
                target: NodeId::new(),
                target_addr: test_addr(),
            },
            GossipMessage::Welcome {
                sender: node,
                members: vec![MemberInfo {
                    id: NodeId::new(),
                    addr: test_addr(),
                    incarnation: 3,
                    is_primary: true,
                    slots: vec![SlotRange::new(0, 5460)],
                }],
            },
            GossipMessage::SlotsAnnounce {
                sender: node,
                incarnation: 4,
                slots: vec![SlotRange::new(10923, 16383)],
            },
        ];
        for msg in &messages {
            let encoded = msg.encode();
            for len in 0..encoded.len() {
                assert!(
                    GossipMessage::decode(&encoded[..len]).is_err(),
                    "prefix of length {len} of {msg:?} should be rejected"
                );
            }
        }
    }

    #[test]
    fn update_count_over_limit_rejected() {
        let mut buf = BytesMut::new();
        buf.put_u8(MSG_PING);
        buf.put_u64_le(1);
        encode_node_id(&mut buf, &NodeId::new());
        buf.put_u16_le((MAX_COLLECTION_COUNT + 1) as u16);
        let err = GossipMessage::decode(&buf).expect_err("over-limit count should be rejected");
        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
    }

    #[test]
    fn encode_caps_collections_at_limit() {
        let node = NodeId::new();
        let msg = GossipMessage::Ping {
            seq: 1,
            sender: node,
            updates: vec![NodeUpdate::Left { node }; MAX_COLLECTION_COUNT + 1],
        };
        let decoded = GossipMessage::decode(&msg.encode()).expect("capped message should decode");
        match decoded {
            GossipMessage::Ping { updates, .. } => {
                assert_eq!(updates.len(), MAX_COLLECTION_COUNT)
            }
            other => panic!("unexpected message: {other:?}"),
        }
    }

    #[test]
    fn unknown_address_type_rejected() {
        let mut buf = BytesMut::new();
        buf.put_u8(MSG_JOIN);
        encode_node_id(&mut buf, &NodeId::new());
        buf.put_u8(5); // neither ADDR_IPV4 nor ADDR_IPV6
        buf.put_slice(&[0u8; 18]);
        let err = GossipMessage::decode(&buf).expect_err("unknown address type should be rejected");
        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
    }

    #[test]
    fn unknown_update_type_rejected() {
        let mut buf = BytesMut::new();
        buf.put_u8(MSG_ACK);
        buf.put_u64_le(1);
        encode_node_id(&mut buf, &NodeId::new());
        buf.put_u16_le(1); // 1 update
        buf.put_u8(0xFF); // not a known UPDATE_* tag
        let err = GossipMessage::decode(&buf).expect_err("unknown update type should be rejected");
        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
    }

    // -- authenticated encode/decode tests --

    #[test]
    fn authenticated_roundtrip() {
        let secret = ClusterSecret::from_password("cluster-pass");
        let msg = GossipMessage::Ping {
            seq: 42,
            sender: NodeId::new(),
            updates: vec![],
        };
        let encoded = msg.encode_authenticated(&secret);
        let decoded =
            GossipMessage::decode_authenticated(&encoded, &secret).expect("auth roundtrip");
        assert_eq!(msg, decoded);
    }

    #[test]
    fn wrong_secret_rejected() {
        let s1 = ClusterSecret::from_password("secret-a");
        let s2 = ClusterSecret::from_password("secret-b");
        let msg = GossipMessage::Ping {
            seq: 1,
            sender: NodeId::new(),
            updates: vec![],
        };
        let encoded = msg.encode_authenticated(&s1);
        let result = GossipMessage::decode_authenticated(&encoded, &s2);
        assert!(result.is_err());
    }

    #[test]
    fn tampered_payload_rejected() {
        let secret = ClusterSecret::from_password("cluster-pass");
        let msg = GossipMessage::Join {
            sender: NodeId::new(),
            sender_addr: test_addr(),
        };
        let mut tampered = msg.encode_authenticated(&secret).to_vec();
        tampered[1] ^= 0x01; // flip a bit inside the sender id
        let err = GossipMessage::decode_authenticated(&tampered, &secret)
            .expect_err("tampered payload must fail auth");
        assert_eq!(err.kind(), io::ErrorKind::PermissionDenied);
    }

    #[test]
    fn truncated_auth_message_rejected() {
        let secret = ClusterSecret::from_password("test");
        // one byte too short to even contain the auth tag
        let short = vec![0u8; TAG_LEN - 1];
        let err = GossipMessage::decode_authenticated(&short, &secret)
            .expect_err("too-short message must be rejected");
        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
    }
}