1use kevy_resp::{ArgvBorrowed, parse_command_borrowed};
6
7use crate::message::{Message, Role};
8
9pub fn encode(msg: &Message) -> Vec<u8> {
16 let mut out = Vec::with_capacity(256);
17 match msg {
18 Message::Hb {
19 epoch,
20 node_id,
21 role,
22 repl_offset,
23 } => {
24 push_bulk_array(&mut out, 5);
25 push_bulk(&mut out, b"HB");
26 push_bulk(&mut out, epoch.to_string().as_bytes());
27 push_bulk(&mut out, node_id.as_bytes());
28 push_bulk(&mut out, role.as_str().as_bytes());
29 push_bulk(&mut out, repl_offset.to_string().as_bytes());
30 }
31 Message::Offer {
32 new_epoch,
33 candidate_id,
34 repl_offset,
35 } => {
36 push_bulk_array(&mut out, 4);
37 push_bulk(&mut out, b"OFFER");
38 push_bulk(&mut out, new_epoch.to_string().as_bytes());
39 push_bulk(&mut out, candidate_id.as_bytes());
40 push_bulk(&mut out, repl_offset.to_string().as_bytes());
41 }
42 Message::Accept { epoch, accepter_id } => {
43 push_bulk_array(&mut out, 3);
44 push_bulk(&mut out, b"ACCEPT");
45 push_bulk(&mut out, epoch.to_string().as_bytes());
46 push_bulk(&mut out, accepter_id.as_bytes());
47 }
48 Message::Announce {
49 epoch,
50 new_primary_id,
51 new_primary_addr,
52 } => {
53 push_bulk_array(&mut out, 4);
54 push_bulk(&mut out, b"ANNOUNCE");
55 push_bulk(&mut out, epoch.to_string().as_bytes());
56 push_bulk(&mut out, new_primary_id.as_bytes());
57 push_bulk(&mut out, new_primary_addr.as_bytes());
58 }
59 }
60 out
61}
62
/// Reasons a buffer could not be decoded into a `Message`.
#[derive(Debug, PartialEq, Eq)]
pub enum DecodeError {
    /// The parser reported that the buffer does not yet hold a complete
    /// frame; the caller should retry once more bytes are available.
    Truncated,
    /// The parser rejected the bytes as malformed.
    Bad,
    /// The frame parsed, but it was empty, named an unknown verb, or carried
    /// the wrong number of arguments for its verb.
    WrongShape,
    /// A numeric field was not valid UTF-8 or did not parse as a `u64`.
    BadNumeric,
    /// The role field of an `HB` message was not recognized.
    BadRole,
}

impl std::fmt::Display for DecodeError {
    /// Writes a short, human-readable description of the failure.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            DecodeError::Truncated => "incomplete frame: more bytes are needed",
            DecodeError::Bad => "malformed frame",
            DecodeError::WrongShape => "unknown verb or wrong number of arguments",
            DecodeError::BadNumeric => "numeric field is not a valid unsigned 64-bit integer",
            DecodeError::BadRole => "unrecognized role",
        })
    }
}

// Lets callers propagate the error through `Box<dyn Error>` and similar
// error-handling machinery.
impl std::error::Error for DecodeError {}
81
82pub fn decode(buf: &[u8]) -> Result<(Message, usize), DecodeError> {
88 let (argv, used) = match parse_command_borrowed(buf) {
89 Ok(Some(pair)) => pair,
90 Ok(None) => return Err(DecodeError::Truncated),
91 Err(_) => return Err(DecodeError::Bad),
92 };
93 let verb = argv.first().ok_or(DecodeError::WrongShape)?;
94 let msg = parse_argv_for_verb(verb, &argv)?;
95 Ok((msg, used))
96}
97
98fn parse_argv_for_verb(verb: &[u8], argv: &ArgvBorrowed<'_>) -> Result<Message, DecodeError> {
99 if verb.eq_ignore_ascii_case(b"HB") {
100 if argv.len() != 5 {
101 return Err(DecodeError::WrongShape);
102 }
103 Ok(Message::Hb {
104 epoch: parse_u64(&argv[1])?,
105 node_id: parse_string(&argv[2]),
106 role: Role::parse(&argv[3]).ok_or(DecodeError::BadRole)?,
107 repl_offset: parse_u64(&argv[4])?,
108 })
109 } else if verb.eq_ignore_ascii_case(b"OFFER") {
110 if argv.len() != 4 {
111 return Err(DecodeError::WrongShape);
112 }
113 Ok(Message::Offer {
114 new_epoch: parse_u64(&argv[1])?,
115 candidate_id: parse_string(&argv[2]),
116 repl_offset: parse_u64(&argv[3])?,
117 })
118 } else if verb.eq_ignore_ascii_case(b"ACCEPT") {
119 if argv.len() != 3 {
120 return Err(DecodeError::WrongShape);
121 }
122 Ok(Message::Accept {
123 epoch: parse_u64(&argv[1])?,
124 accepter_id: parse_string(&argv[2]),
125 })
126 } else if verb.eq_ignore_ascii_case(b"ANNOUNCE") {
127 if argv.len() != 4 {
128 return Err(DecodeError::WrongShape);
129 }
130 Ok(Message::Announce {
131 epoch: parse_u64(&argv[1])?,
132 new_primary_id: parse_string(&argv[2]),
133 new_primary_addr: parse_string(&argv[3]),
134 })
135 } else {
136 Err(DecodeError::WrongShape)
137 }
138}
139
140fn parse_u64(b: &[u8]) -> Result<u64, DecodeError> {
141 std::str::from_utf8(b)
142 .ok()
143 .and_then(|s| s.parse::<u64>().ok())
144 .ok_or(DecodeError::BadNumeric)
145}
146
/// Converts raw bytes into an owned `String`.
///
/// The conversion is lossy: invalid UTF-8 sequences are replaced with
/// U+FFFD rather than reported as an error.
fn parse_string(b: &[u8]) -> String {
    match String::from_utf8_lossy(b) {
        // Input was already valid UTF-8; copy it into an owned string.
        std::borrow::Cow::Borrowed(valid) => valid.to_owned(),
        // Input needed replacement characters; the repaired string is already owned.
        std::borrow::Cow::Owned(repaired) => repaired,
    }
}
150
/// Appends an array header announcing `n` elements: `*<n>\r\n`.
fn push_bulk_array(out: &mut Vec<u8>, n: usize) {
    out.push(b'*');
    out.extend_from_slice(n.to_string().as_bytes());
    out.extend_from_slice(b"\r\n");
}
154
/// Appends `data` as a length-prefixed bulk string:
/// `$<len>\r\n<data>\r\n`, where `<len>` is the byte length in decimal.
fn push_bulk(out: &mut Vec<u8>, data: &[u8]) {
    const CRLF: &[u8] = b"\r\n";

    // Length header.
    out.push(b'$');
    out.extend_from_slice(data.len().to_string().as_bytes());
    out.extend_from_slice(CRLF);

    // Payload, copied verbatim, then the terminator.
    out.extend_from_slice(data);
    out.extend_from_slice(CRLF);
}
160
#[cfg(test)]
mod tests {
    use super::*;

    /// Assembles a frame from raw fields using the module's own framing
    /// helpers, so tests can craft inputs `encode` would never produce.
    fn frame(fields: &[&[u8]]) -> Vec<u8> {
        let mut buf = Vec::new();
        push_bulk_array(&mut buf, fields.len());
        for field in fields {
            push_bulk(&mut buf, field);
        }
        buf
    }

    /// Encodes `msg`, decodes the bytes again, and checks that decoding
    /// consumed the entire encoded buffer.
    fn round_trip(msg: Message) -> Message {
        let wire = encode(&msg);
        let (back, consumed) = decode(&wire).expect("decode");
        assert_eq!(consumed, wire.len(), "decode must consume the whole frame");
        back
    }

    #[test]
    fn hb_round_trip() {
        let sent = Message::Hb {
            epoch: 42,
            node_id: String::from("primary-east"),
            role: Role::Primary,
            repl_offset: 1_234_567,
        };
        match round_trip(sent) {
            Message::Hb {
                epoch,
                node_id,
                role,
                repl_offset,
            } => {
                assert_eq!(epoch, 42);
                assert_eq!(node_id, "primary-east");
                assert_eq!(role, Role::Primary);
                assert_eq!(repl_offset, 1_234_567);
            }
            other => panic!("got {other:?}"),
        }
    }

    #[test]
    fn offer_round_trip() {
        let sent = Message::Offer {
            new_epoch: 7,
            candidate_id: String::from("replica-1"),
            repl_offset: 99,
        };
        match round_trip(sent) {
            Message::Offer {
                new_epoch,
                candidate_id,
                repl_offset,
            } => {
                assert_eq!(new_epoch, 7);
                assert_eq!(candidate_id, "replica-1");
                assert_eq!(repl_offset, 99);
            }
            other => panic!("got {other:?}"),
        }
    }

    #[test]
    fn accept_round_trip() {
        let sent = Message::Accept {
            epoch: 7,
            accepter_id: String::from("replica-2"),
        };
        match round_trip(sent) {
            Message::Accept { epoch, accepter_id } => {
                assert_eq!(epoch, 7);
                assert_eq!(accepter_id, "replica-2");
            }
            other => panic!("got {other:?}"),
        }
    }

    #[test]
    fn announce_round_trip() {
        let sent = Message::Announce {
            epoch: 7,
            new_primary_id: String::from("replica-1"),
            new_primary_addr: String::from("10.0.0.42:6004"),
        };
        match round_trip(sent) {
            Message::Announce {
                epoch,
                new_primary_id,
                new_primary_addr,
            } => {
                assert_eq!(epoch, 7);
                assert_eq!(new_primary_id, "replica-1");
                assert_eq!(new_primary_addr, "10.0.0.42:6004");
            }
            other => panic!("got {other:?}"),
        }
    }

    #[test]
    fn decode_truncated_returns_more() {
        // Feed only the first half of a valid frame.
        let wire = encode(&Message::Accept {
            epoch: 1,
            accepter_id: String::from("x"),
        });
        let cut = wire.len() / 2;
        assert_eq!(decode(&wire[..cut]).err(), Some(DecodeError::Truncated));
    }

    #[test]
    fn decode_unknown_verb_errs() {
        // A well-formed frame whose verb is not part of this protocol.
        let wire = b"*2\r\n$4\r\nPING\r\n$2\r\nok\r\n";
        assert_eq!(decode(wire).err(), Some(DecodeError::WrongShape));
    }

    #[test]
    fn decode_hb_wrong_arity_errs() {
        // HB with three elements instead of the required five.
        let wire = b"*3\r\n$2\r\nHB\r\n$1\r\n1\r\n$4\r\nnode\r\n";
        assert_eq!(decode(wire).err(), Some(DecodeError::WrongShape));
    }

    #[test]
    fn decode_hb_bad_role_errs() {
        let wire = frame(&[b"HB", b"1", b"node-x", b"leader", b"0"]);
        assert_eq!(decode(&wire).err(), Some(DecodeError::BadRole));
    }

    #[test]
    fn decode_hb_bad_numeric_errs() {
        let wire = frame(&[b"HB", b"NaN", b"node-x", b"primary", b"0"]);
        assert_eq!(decode(&wire).err(), Some(DecodeError::BadNumeric));
    }

    #[test]
    fn verbs_are_case_insensitive_on_decode() {
        let wire = frame(&[b"hb", b"1", b"node-x", b"primary", b"0"]);
        let (msg, _) = decode(&wire).expect("decode");
        assert!(matches!(msg, Message::Hb { .. }));
    }
}