datacake_chitchat_fork/
message.rs1use std::io::BufRead;
2
3use anyhow::Context;
4
5use crate::delta::Delta;
6use crate::digest::Digest;
7use crate::serialize::Serializable;
8
9#[derive(Debug, PartialEq, Eq)]
16pub enum ChitchatMessage {
17 Syn { cluster_id: String, digest: Digest },
19 SynAck { digest: Digest, delta: Delta },
23 Ack { delta: Delta },
25 BadCluster,
28}
29
/// One-byte tag identifying which kind of message follows on the wire.
///
/// The discriminants *are* the wire encoding: [`MessageType::to_code`] is a plain cast, so
/// changing a discriminant changes the byte that gets written. Keep them in sync with
/// [`MessageType::from_code`].
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[repr(u8)]
enum MessageType {
    Syn = 0,
    SynAck = 1,
    Ack = 2,
    BadCluster = 3,
}

impl MessageType {
    /// Decodes a wire tag.
    ///
    /// Returns `None` for any byte that does not correspond to a known message type.
    pub fn from_code(code: u8) -> Option<Self> {
        match code {
            0 => Some(Self::Syn),
            1 => Some(Self::SynAck),
            2 => Some(Self::Ack),
            3 => Some(Self::BadCluster),
            _ => None,
        }
    }

    /// Returns the wire tag for this message type (its `repr(u8)` discriminant).
    pub fn to_code(self) -> u8 {
        self as u8
    }
}
53
54impl Serializable for ChitchatMessage {
55 fn serialize(&self, buf: &mut Vec<u8>) {
56 match self {
57 ChitchatMessage::Syn { cluster_id, digest } => {
58 buf.push(MessageType::Syn.to_code());
59 digest.serialize(buf);
60 cluster_id.serialize(buf);
61 }
62 ChitchatMessage::SynAck { digest, delta } => {
63 buf.push(MessageType::SynAck.to_code());
64 digest.serialize(buf);
65 delta.serialize(buf);
66 }
67 ChitchatMessage::Ack { delta } => {
68 buf.push(MessageType::Ack.to_code());
69 delta.serialize(buf);
70 }
71 ChitchatMessage::BadCluster => {
72 buf.push(MessageType::BadCluster.to_code());
73 }
74 }
75 }
76
77 fn deserialize(buf: &mut &[u8]) -> anyhow::Result<Self> {
78 let code = buf
79 .first()
80 .cloned()
81 .and_then(MessageType::from_code)
82 .context("Invalid message type")?;
83 buf.consume(1);
84 match code {
85 MessageType::Syn => {
86 let digest = Digest::deserialize(buf)?;
87 let cluster_id = String::deserialize(buf)?;
88 Ok(Self::Syn { cluster_id, digest })
89 }
90 MessageType::SynAck => {
91 let digest = Digest::deserialize(buf)?;
92 let delta = Delta::deserialize(buf)?;
93 Ok(Self::SynAck { digest, delta })
94 }
95 MessageType::Ack => {
96 let delta = Delta::deserialize(buf)?;
97 Ok(Self::Ack { delta })
98 }
99 MessageType::BadCluster => Ok(Self::BadCluster),
100 }
101 }
102
103 fn serialized_len(&self) -> usize {
104 match self {
105 ChitchatMessage::Syn { cluster_id, digest } => {
106 1 + cluster_id.serialized_len() + digest.serialized_len()
107 }
108 ChitchatMessage::SynAck { digest, delta } => syn_ack_serialized_len(digest, delta),
109 ChitchatMessage::Ack { delta } => 1 + delta.serialized_len(),
110 ChitchatMessage::BadCluster => 1,
111 }
112 }
113}
114
115pub(crate) fn syn_ack_serialized_len(digest: &Digest, delta: &Delta) -> usize {
116 1 + digest.serialized_len() + delta.serialized_len()
117}
118
#[cfg(test)]
mod tests {
    use crate::serialize::test_serdeser_aux;
    use crate::{ChitchatMessage, Digest, NodeId};

    /// Exercises a `Syn` message carrying a two-node digest through `test_serdeser_aux`.
    #[test]
    fn test_syn() {
        let first_node = NodeId::for_test_localhost(10_001);
        let second_node = NodeId::for_test_localhost(10_002);

        let mut digest = Digest::default();
        digest.add_node(first_node, 1);
        digest.add_node(second_node, 2);

        let message = ChitchatMessage::Syn {
            cluster_id: String::from("cluster-a"),
            digest,
        };
        // NOTE(review): 68 is presumably the expected encoded length in bytes; confirm
        // against `test_serdeser_aux`.
        test_serdeser_aux(&message, 68);
    }

    /// Exercises the payload-free `BadCluster` message through `test_serdeser_aux`.
    #[test]
    fn test_bad_cluster() {
        let message = ChitchatMessage::BadCluster;
        test_serdeser_aux(&message, 1);
    }
}