datacake_chitchat_fork/
message.rs

1use std::io::BufRead;
2
3use anyhow::Context;
4
5use crate::delta::Delta;
6use crate::digest::Digest;
7use crate::serialize::Serializable;
8
9/// Chitchat message.
10///
11/// Each variant represents a step of the gossip "handshake"
12/// between node A and node B.
13/// The names {Syn, SynAck, Ack} of the different steps are borrowed from
14/// TCP Handshake.
15#[derive(Debug, PartialEq, Eq)]
16pub enum ChitchatMessage {
17    /// Node A initiates handshakes.
18    Syn { cluster_id: String, digest: Digest },
19    /// Node B returns a partial update as described
20    /// in the scuttlebutt reconcialiation algorithm,
21    /// and returns its own checksum.
22    SynAck { digest: Digest, delta: Delta },
23    /// Node A returns a partial update for B.
24    Ack { delta: Delta },
25    /// Node B rejects the Syn message because of a
26    /// cluster name mismatch between the peers.
27    BadCluster,
28}
29
30#[derive(Copy, Clone)]
31#[repr(u8)]
32enum MessageType {
33    Syn = 0,
34    SynAck = 1u8,
35    Ack = 2u8,
36    BadCluster = 3u8,
37}
38
39impl MessageType {
40    pub fn from_code(code: u8) -> Option<Self> {
41        match code {
42            0 => Some(Self::Syn),
43            1 => Some(Self::SynAck),
44            2 => Some(Self::Ack),
45            3 => Some(Self::BadCluster),
46            _ => None,
47        }
48    }
49    pub fn to_code(self) -> u8 {
50        self as u8
51    }
52}
53
54impl Serializable for ChitchatMessage {
55    fn serialize(&self, buf: &mut Vec<u8>) {
56        match self {
57            ChitchatMessage::Syn { cluster_id, digest } => {
58                buf.push(MessageType::Syn.to_code());
59                digest.serialize(buf);
60                cluster_id.serialize(buf);
61            }
62            ChitchatMessage::SynAck { digest, delta } => {
63                buf.push(MessageType::SynAck.to_code());
64                digest.serialize(buf);
65                delta.serialize(buf);
66            }
67            ChitchatMessage::Ack { delta } => {
68                buf.push(MessageType::Ack.to_code());
69                delta.serialize(buf);
70            }
71            ChitchatMessage::BadCluster => {
72                buf.push(MessageType::BadCluster.to_code());
73            }
74        }
75    }
76
77    fn deserialize(buf: &mut &[u8]) -> anyhow::Result<Self> {
78        let code = buf
79            .first()
80            .cloned()
81            .and_then(MessageType::from_code)
82            .context("Invalid message type")?;
83        buf.consume(1);
84        match code {
85            MessageType::Syn => {
86                let digest = Digest::deserialize(buf)?;
87                let cluster_id = String::deserialize(buf)?;
88                Ok(Self::Syn { cluster_id, digest })
89            }
90            MessageType::SynAck => {
91                let digest = Digest::deserialize(buf)?;
92                let delta = Delta::deserialize(buf)?;
93                Ok(Self::SynAck { digest, delta })
94            }
95            MessageType::Ack => {
96                let delta = Delta::deserialize(buf)?;
97                Ok(Self::Ack { delta })
98            }
99            MessageType::BadCluster => Ok(Self::BadCluster),
100        }
101    }
102
103    fn serialized_len(&self) -> usize {
104        match self {
105            ChitchatMessage::Syn { cluster_id, digest } => {
106                1 + cluster_id.serialized_len() + digest.serialized_len()
107            }
108            ChitchatMessage::SynAck { digest, delta } => syn_ack_serialized_len(digest, delta),
109            ChitchatMessage::Ack { delta } => 1 + delta.serialized_len(),
110            ChitchatMessage::BadCluster => 1,
111        }
112    }
113}
114
115pub(crate) fn syn_ack_serialized_len(digest: &Digest, delta: &Delta) -> usize {
116    1 + digest.serialized_len() + delta.serialized_len()
117}
118
119#[cfg(test)]
120mod tests {
121    use crate::serialize::test_serdeser_aux;
122    use crate::{ChitchatMessage, Digest, NodeId};
123
124    #[test]
125    fn test_syn() {
126        let mut digest = Digest::default();
127        digest.add_node(NodeId::for_test_localhost(10_001), 1);
128        digest.add_node(NodeId::for_test_localhost(10_002), 2);
129        let syn = ChitchatMessage::Syn {
130            cluster_id: "cluster-a".to_string(),
131            digest,
132        };
133        test_serdeser_aux(&syn, 68);
134    }
135
136    #[test]
137    fn test_bad_cluster() {
138        test_serdeser_aux(&ChitchatMessage::BadCluster, 1);
139    }
140}