Skip to main content

ember_cluster/
message.rs

1//! Binary wire format for cluster gossip messages.
2//!
3//! Uses a compact binary encoding for efficiency over the network.
4//! All multi-byte integers are little-endian.
5
6use std::io::{self, Read};
7use std::net::SocketAddr;
8
9use bytes::{Buf, BufMut, Bytes, BytesMut};
10
11use crate::{NodeId, SlotRange};
12
13/// Message types for the SWIM gossip protocol.
14#[derive(Debug, Clone, PartialEq)]
15pub enum GossipMessage {
16    /// Direct probe to check if a node is alive.
17    Ping {
18        seq: u64,
19        sender: NodeId,
20        /// Piggybacked state updates.
21        updates: Vec<NodeUpdate>,
22    },
23
24    /// Request another node to probe a target on our behalf.
25    PingReq {
26        seq: u64,
27        sender: NodeId,
28        target: NodeId,
29        target_addr: SocketAddr,
30    },
31
32    /// Response to a Ping or forwarded PingReq.
33    Ack {
34        seq: u64,
35        sender: NodeId,
36        /// Piggybacked state updates.
37        updates: Vec<NodeUpdate>,
38    },
39
40    /// Join request from a new node.
41    Join {
42        sender: NodeId,
43        sender_addr: SocketAddr,
44    },
45
46    /// Welcome response with current cluster state.
47    Welcome {
48        sender: NodeId,
49        members: Vec<MemberInfo>,
50    },
51}
52
53/// A state update about a node, piggybacked on protocol messages.
54#[derive(Debug, Clone, PartialEq)]
55pub enum NodeUpdate {
56    /// Node is alive with given incarnation number.
57    Alive {
58        node: NodeId,
59        addr: SocketAddr,
60        incarnation: u64,
61    },
62    /// Node is suspected to be failing.
63    Suspect { node: NodeId, incarnation: u64 },
64    /// Node has been confirmed dead.
65    Dead { node: NodeId, incarnation: u64 },
66    /// Node left the cluster gracefully.
67    Left { node: NodeId },
68}
69
70/// Information about a cluster member.
71#[derive(Debug, Clone, PartialEq)]
72pub struct MemberInfo {
73    pub id: NodeId,
74    pub addr: SocketAddr,
75    pub incarnation: u64,
76    pub is_primary: bool,
77    pub slots: Vec<SlotRange>,
78}
79
// Wire format constants.
//
// Message tags: the first byte of every encoded `GossipMessage`.
const MSG_PING: u8 = 1;
const MSG_PING_REQ: u8 = 2;
const MSG_ACK: u8 = 3;
const MSG_JOIN: u8 = 4;
const MSG_WELCOME: u8 = 5;

// Update tags: the first byte of every encoded `NodeUpdate`.
const UPDATE_ALIVE: u8 = 1;
const UPDATE_SUSPECT: u8 = 2;
const UPDATE_DEAD: u8 = 3;
const UPDATE_LEFT: u8 = 4;
91
92impl GossipMessage {
93    /// Serializes the message to bytes.
94    pub fn encode(&self) -> Bytes {
95        let mut buf = BytesMut::with_capacity(256);
96        self.encode_into(&mut buf);
97        buf.freeze()
98    }
99
100    /// Serializes the message into the given buffer.
101    pub fn encode_into(&self, buf: &mut BytesMut) {
102        match self {
103            GossipMessage::Ping {
104                seq,
105                sender,
106                updates,
107            } => {
108                buf.put_u8(MSG_PING);
109                buf.put_u64_le(*seq);
110                encode_node_id(buf, sender);
111                encode_updates(buf, updates);
112            }
113            GossipMessage::PingReq {
114                seq,
115                sender,
116                target,
117                target_addr,
118            } => {
119                buf.put_u8(MSG_PING_REQ);
120                buf.put_u64_le(*seq);
121                encode_node_id(buf, sender);
122                encode_node_id(buf, target);
123                encode_socket_addr(buf, target_addr);
124            }
125            GossipMessage::Ack {
126                seq,
127                sender,
128                updates,
129            } => {
130                buf.put_u8(MSG_ACK);
131                buf.put_u64_le(*seq);
132                encode_node_id(buf, sender);
133                encode_updates(buf, updates);
134            }
135            GossipMessage::Join {
136                sender,
137                sender_addr,
138            } => {
139                buf.put_u8(MSG_JOIN);
140                encode_node_id(buf, sender);
141                encode_socket_addr(buf, sender_addr);
142            }
143            GossipMessage::Welcome { sender, members } => {
144                buf.put_u8(MSG_WELCOME);
145                encode_node_id(buf, sender);
146                buf.put_u16_le(members.len() as u16);
147                for member in members {
148                    encode_member_info(buf, member);
149                }
150            }
151        }
152    }
153
154    /// Deserializes a message from bytes.
155    pub fn decode(mut buf: &[u8]) -> io::Result<Self> {
156        if buf.is_empty() {
157            return Err(io::Error::new(
158                io::ErrorKind::UnexpectedEof,
159                "empty message",
160            ));
161        }
162
163        let msg_type = buf.get_u8();
164        match msg_type {
165            MSG_PING => {
166                let seq = buf.get_u64_le();
167                let sender = decode_node_id(&mut buf)?;
168                let updates = decode_updates(&mut buf)?;
169                Ok(GossipMessage::Ping {
170                    seq,
171                    sender,
172                    updates,
173                })
174            }
175            MSG_PING_REQ => {
176                let seq = buf.get_u64_le();
177                let sender = decode_node_id(&mut buf)?;
178                let target = decode_node_id(&mut buf)?;
179                let target_addr = decode_socket_addr(&mut buf)?;
180                Ok(GossipMessage::PingReq {
181                    seq,
182                    sender,
183                    target,
184                    target_addr,
185                })
186            }
187            MSG_ACK => {
188                let seq = buf.get_u64_le();
189                let sender = decode_node_id(&mut buf)?;
190                let updates = decode_updates(&mut buf)?;
191                Ok(GossipMessage::Ack {
192                    seq,
193                    sender,
194                    updates,
195                })
196            }
197            MSG_JOIN => {
198                let sender = decode_node_id(&mut buf)?;
199                let sender_addr = decode_socket_addr(&mut buf)?;
200                Ok(GossipMessage::Join {
201                    sender,
202                    sender_addr,
203                })
204            }
205            MSG_WELCOME => {
206                let sender = decode_node_id(&mut buf)?;
207                let count = buf.get_u16_le() as usize;
208                let mut members = Vec::with_capacity(count);
209                for _ in 0..count {
210                    members.push(decode_member_info(&mut buf)?);
211                }
212                Ok(GossipMessage::Welcome { sender, members })
213            }
214            other => Err(io::Error::new(
215                io::ErrorKind::InvalidData,
216                format!("unknown message type: {other}"),
217            )),
218        }
219    }
220}
221
222fn encode_node_id(buf: &mut BytesMut, id: &NodeId) {
223    buf.put_slice(id.0.as_bytes());
224}
225
226fn decode_node_id(buf: &mut &[u8]) -> io::Result<NodeId> {
227    if buf.len() < 16 {
228        return Err(io::Error::new(
229            io::ErrorKind::UnexpectedEof,
230            "not enough bytes for node id",
231        ));
232    }
233    let mut bytes = [0u8; 16];
234    buf.read_exact(&mut bytes)?;
235    Ok(NodeId(uuid::Uuid::from_bytes(bytes)))
236}
237
238fn encode_socket_addr(buf: &mut BytesMut, addr: &SocketAddr) {
239    match addr {
240        SocketAddr::V4(v4) => {
241            buf.put_u8(4);
242            buf.put_slice(&v4.ip().octets());
243            buf.put_u16_le(v4.port());
244        }
245        SocketAddr::V6(v6) => {
246            buf.put_u8(6);
247            buf.put_slice(&v6.ip().octets());
248            buf.put_u16_le(v6.port());
249        }
250    }
251}
252
/// Reads a socket address from the front of `buf` and advances past it.
///
/// The expected layout is a one-byte family tag (`4` or `6`), then 4 or 16
/// address octets, then the port as a little-endian `u16`.
///
/// # Errors
///
/// Returns `UnexpectedEof` when `buf` is empty or too short for the tagged
/// family, and `InvalidData` for an unrecognised tag. In both of the latter
/// cases the tag byte has already been consumed.
fn decode_socket_addr(buf: &mut &[u8]) -> io::Result<SocketAddr> {
    let truncated = |what: &'static str| io::Error::new(io::ErrorKind::UnexpectedEof, what);

    // Copy the inner slice reference out so the pieces keep the input lifetime.
    let input: &[u8] = *buf;
    let (family, rest) = match input.split_first() {
        Some((&family, rest)) => (family, rest),
        None => return Err(truncated("not enough bytes for address type")),
    };
    *buf = rest;

    match family {
        4 => {
            if rest.len() < 6 {
                return Err(truncated("not enough bytes for ipv4 address"));
            }
            let (ip, tail) = rest.split_at(4);
            let mut octets = [0u8; 4];
            octets.copy_from_slice(ip);
            let port = u16::from_le_bytes([tail[0], tail[1]]);
            *buf = &tail[2..];
            Ok(SocketAddr::from((octets, port)))
        }
        6 => {
            if rest.len() < 18 {
                return Err(truncated("not enough bytes for ipv6 address"));
            }
            let (ip, tail) = rest.split_at(16);
            let mut octets = [0u8; 16];
            octets.copy_from_slice(ip);
            let port = u16::from_le_bytes([tail[0], tail[1]]);
            *buf = &tail[2..];
            Ok(SocketAddr::from((octets, port)))
        }
        other => Err(io::Error::new(
            io::ErrorKind::InvalidData,
            format!("unknown address type: {other}"),
        )),
    }
}
292
293fn encode_updates(buf: &mut BytesMut, updates: &[NodeUpdate]) {
294    buf.put_u16_le(updates.len() as u16);
295    for update in updates {
296        encode_update(buf, update);
297    }
298}
299
300fn encode_update(buf: &mut BytesMut, update: &NodeUpdate) {
301    match update {
302        NodeUpdate::Alive {
303            node,
304            addr,
305            incarnation,
306        } => {
307            buf.put_u8(UPDATE_ALIVE);
308            encode_node_id(buf, node);
309            encode_socket_addr(buf, addr);
310            buf.put_u64_le(*incarnation);
311        }
312        NodeUpdate::Suspect { node, incarnation } => {
313            buf.put_u8(UPDATE_SUSPECT);
314            encode_node_id(buf, node);
315            buf.put_u64_le(*incarnation);
316        }
317        NodeUpdate::Dead { node, incarnation } => {
318            buf.put_u8(UPDATE_DEAD);
319            encode_node_id(buf, node);
320            buf.put_u64_le(*incarnation);
321        }
322        NodeUpdate::Left { node } => {
323            buf.put_u8(UPDATE_LEFT);
324            encode_node_id(buf, node);
325        }
326    }
327}
328
329fn decode_updates(buf: &mut &[u8]) -> io::Result<Vec<NodeUpdate>> {
330    if buf.len() < 2 {
331        return Err(io::Error::new(
332            io::ErrorKind::UnexpectedEof,
333            "not enough bytes for update count",
334        ));
335    }
336    let count = buf.get_u16_le() as usize;
337    let mut updates = Vec::with_capacity(count);
338    for _ in 0..count {
339        updates.push(decode_update(buf)?);
340    }
341    Ok(updates)
342}
343
344fn decode_update(buf: &mut &[u8]) -> io::Result<NodeUpdate> {
345    if buf.is_empty() {
346        return Err(io::Error::new(
347            io::ErrorKind::UnexpectedEof,
348            "not enough bytes for update type",
349        ));
350    }
351    let update_type = buf.get_u8();
352    match update_type {
353        UPDATE_ALIVE => {
354            let node = decode_node_id(buf)?;
355            let addr = decode_socket_addr(buf)?;
356            let incarnation = buf.get_u64_le();
357            Ok(NodeUpdate::Alive {
358                node,
359                addr,
360                incarnation,
361            })
362        }
363        UPDATE_SUSPECT => {
364            let node = decode_node_id(buf)?;
365            let incarnation = buf.get_u64_le();
366            Ok(NodeUpdate::Suspect { node, incarnation })
367        }
368        UPDATE_DEAD => {
369            let node = decode_node_id(buf)?;
370            let incarnation = buf.get_u64_le();
371            Ok(NodeUpdate::Dead { node, incarnation })
372        }
373        UPDATE_LEFT => {
374            let node = decode_node_id(buf)?;
375            Ok(NodeUpdate::Left { node })
376        }
377        other => Err(io::Error::new(
378            io::ErrorKind::InvalidData,
379            format!("unknown update type: {other}"),
380        )),
381    }
382}
383
384fn encode_member_info(buf: &mut BytesMut, member: &MemberInfo) {
385    encode_node_id(buf, &member.id);
386    encode_socket_addr(buf, &member.addr);
387    buf.put_u64_le(member.incarnation);
388    buf.put_u8(if member.is_primary { 1 } else { 0 });
389    buf.put_u16_le(member.slots.len() as u16);
390    for slot in &member.slots {
391        buf.put_u16_le(slot.start);
392        buf.put_u16_le(slot.end);
393    }
394}
395
396fn decode_member_info(buf: &mut &[u8]) -> io::Result<MemberInfo> {
397    let id = decode_node_id(buf)?;
398    let addr = decode_socket_addr(buf)?;
399    let incarnation = buf.get_u64_le();
400    let is_primary = buf.get_u8() != 0;
401    let slot_count = buf.get_u16_le() as usize;
402    let mut slots = Vec::with_capacity(slot_count);
403    for _ in 0..slot_count {
404        let start = buf.get_u16_le();
405        let end = buf.get_u16_le();
406        slots.push(SlotRange::new(start, end));
407    }
408    Ok(MemberInfo {
409        id,
410        addr,
411        incarnation,
412        is_primary,
413        slots,
414    })
415}
416
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{Ipv4Addr, Ipv6Addr};

    /// IPv4 loopback address used by most tests.
    fn test_addr() -> SocketAddr {
        SocketAddr::from((Ipv4Addr::LOCALHOST, 6379))
    }

    /// IPv6 loopback address, to exercise the 16-octet encoding.
    fn test_addr_v6() -> SocketAddr {
        SocketAddr::from((Ipv6Addr::LOCALHOST, 6379))
    }

    /// Encodes `msg`, decodes the bytes again, and asserts both are equal.
    #[track_caller]
    fn assert_roundtrip(msg: GossipMessage) {
        let wire = msg.encode();
        let parsed = GossipMessage::decode(&wire).unwrap();
        assert_eq!(msg, parsed);
    }

    #[test]
    fn ping_roundtrip() {
        assert_roundtrip(GossipMessage::Ping {
            seq: 42,
            sender: NodeId::new(),
            updates: vec![],
        });
    }

    #[test]
    fn ping_with_updates() {
        let node1 = NodeId::new();
        let node2 = NodeId::new();
        let alive = NodeUpdate::Alive {
            node: node2,
            addr: test_addr(),
            incarnation: 5,
        };
        let suspect = NodeUpdate::Suspect {
            node: node1,
            incarnation: 3,
        };
        assert_roundtrip(GossipMessage::Ping {
            seq: 100,
            sender: node1,
            updates: vec![alive, suspect],
        });
    }

    #[test]
    fn ping_req_roundtrip() {
        assert_roundtrip(GossipMessage::PingReq {
            seq: 99,
            sender: NodeId::new(),
            target: NodeId::new(),
            target_addr: test_addr(),
        });
    }

    #[test]
    fn ack_roundtrip() {
        let dead = NodeUpdate::Dead {
            node: NodeId::new(),
            incarnation: 10,
        };
        assert_roundtrip(GossipMessage::Ack {
            seq: 42,
            sender: NodeId::new(),
            updates: vec![dead],
        });
    }

    #[test]
    fn join_roundtrip() {
        assert_roundtrip(GossipMessage::Join {
            sender: NodeId::new(),
            sender_addr: test_addr(),
        });
    }

    #[test]
    fn welcome_roundtrip() {
        let primary = MemberInfo {
            id: NodeId::new(),
            addr: test_addr(),
            incarnation: 1,
            is_primary: true,
            slots: vec![SlotRange::new(0, 5460)],
        };
        let replica = MemberInfo {
            id: NodeId::new(),
            addr: test_addr(),
            incarnation: 2,
            is_primary: false,
            slots: vec![],
        };
        assert_roundtrip(GossipMessage::Welcome {
            sender: NodeId::new(),
            members: vec![primary, replica],
        });
    }

    #[test]
    fn ipv6_address() {
        assert_roundtrip(GossipMessage::Join {
            sender: NodeId::new(),
            sender_addr: test_addr_v6(),
        });
    }

    #[test]
    fn all_update_types() {
        let node = NodeId::new();
        let updates = vec![
            NodeUpdate::Alive {
                node,
                addr: test_addr(),
                incarnation: 1,
            },
            NodeUpdate::Suspect {
                node,
                incarnation: 2,
            },
            NodeUpdate::Dead {
                node,
                incarnation: 3,
            },
            NodeUpdate::Left { node },
        ];
        assert_roundtrip(GossipMessage::Ping {
            seq: 1,
            sender: node,
            updates,
        });
    }

    #[test]
    fn empty_message_error() {
        assert!(GossipMessage::decode(&[]).is_err());
    }

    #[test]
    fn unknown_message_type_error() {
        assert!(GossipMessage::decode(&[255]).is_err());
    }
}