Skip to main content

kevy_elect/
wire.rs

1//! Wire encode + decode for [`crate::Message`]. Uses kevy-resp's
2//! RESP2 multi-bulk array shape, identical to the keyspace plane —
3//! tcpdump-friendly text frames, single decoder path workspace-wide.
4
5use kevy_resp::{ArgvBorrowed, parse_command_borrowed};
6
7use crate::message::{Message, Role};
8
9/// Encode a [`Message`] as a RESP2 multi-bulk array.
10///
11/// Numeric fields ride as decimal bulk strings — matches kevy-
12/// replicate's `REPLICATE FROM <offset> ID <id>` handshake
13/// convention. Pre-sized: every message ≤ 6 fields ≤ 32 bytes
14/// each, so a 256-byte buffer suffices for every variant.
15pub fn encode(msg: &Message) -> Vec<u8> {
16    let mut out = Vec::with_capacity(256);
17    match msg {
18        Message::Hb {
19            epoch,
20            node_id,
21            role,
22            repl_offset,
23        } => {
24            push_bulk_array(&mut out, 5);
25            push_bulk(&mut out, b"HB");
26            push_bulk(&mut out, epoch.to_string().as_bytes());
27            push_bulk(&mut out, node_id.as_bytes());
28            push_bulk(&mut out, role.as_str().as_bytes());
29            push_bulk(&mut out, repl_offset.to_string().as_bytes());
30        }
31        Message::Offer {
32            new_epoch,
33            candidate_id,
34            repl_offset,
35        } => {
36            push_bulk_array(&mut out, 4);
37            push_bulk(&mut out, b"OFFER");
38            push_bulk(&mut out, new_epoch.to_string().as_bytes());
39            push_bulk(&mut out, candidate_id.as_bytes());
40            push_bulk(&mut out, repl_offset.to_string().as_bytes());
41        }
42        Message::Accept { epoch, accepter_id } => {
43            push_bulk_array(&mut out, 3);
44            push_bulk(&mut out, b"ACCEPT");
45            push_bulk(&mut out, epoch.to_string().as_bytes());
46            push_bulk(&mut out, accepter_id.as_bytes());
47        }
48        Message::Announce {
49            epoch,
50            new_primary_id,
51            new_primary_addr,
52        } => {
53            push_bulk_array(&mut out, 4);
54            push_bulk(&mut out, b"ANNOUNCE");
55            push_bulk(&mut out, epoch.to_string().as_bytes());
56            push_bulk(&mut out, new_primary_id.as_bytes());
57            push_bulk(&mut out, new_primary_addr.as_bytes());
58        }
59    }
60    out
61}
62
/// Failure modes reported by `decode`.
#[derive(Debug, Eq, PartialEq)]
pub enum DecodeError {
    /// The buffer ends before a complete frame is available. The
    /// caller should read more bytes from the socket and try again.
    Truncated,
    /// The bytes are not a well-formed RESP multi-bulk envelope.
    Bad,
    /// The verb is absent or unrecognised, or the element count does
    /// not match what the verb requires (for example `HB` carrying
    /// 3 elements rather than 5).
    WrongShape,
    /// A numeric field (epoch / offset) could not be read as a
    /// decimal `u64`.
    BadNumeric,
    /// The `role` field of an `HB` frame is none of `primary`,
    /// `replica` or `candidate`.
    BadRole,
}
81
82/// Decode one [`Message`] off the front of `buf`. Returns the
83/// decoded message and the number of bytes consumed. The caller
84/// advances its read cursor by `consumed` on success, retries with
85/// more bytes on `Truncated`, and drops the connection on every
86/// other variant.
87pub fn decode(buf: &[u8]) -> Result<(Message, usize), DecodeError> {
88    let (argv, used) = match parse_command_borrowed(buf) {
89        Ok(Some(pair)) => pair,
90        Ok(None) => return Err(DecodeError::Truncated),
91        Err(_) => return Err(DecodeError::Bad),
92    };
93    let verb = argv.first().ok_or(DecodeError::WrongShape)?;
94    let msg = parse_argv_for_verb(verb, &argv)?;
95    Ok((msg, used))
96}
97
98fn parse_argv_for_verb(verb: &[u8], argv: &ArgvBorrowed<'_>) -> Result<Message, DecodeError> {
99    if verb.eq_ignore_ascii_case(b"HB") {
100        if argv.len() != 5 {
101            return Err(DecodeError::WrongShape);
102        }
103        Ok(Message::Hb {
104            epoch: parse_u64(&argv[1])?,
105            node_id: parse_string(&argv[2]),
106            role: Role::parse(&argv[3]).ok_or(DecodeError::BadRole)?,
107            repl_offset: parse_u64(&argv[4])?,
108        })
109    } else if verb.eq_ignore_ascii_case(b"OFFER") {
110        if argv.len() != 4 {
111            return Err(DecodeError::WrongShape);
112        }
113        Ok(Message::Offer {
114            new_epoch: parse_u64(&argv[1])?,
115            candidate_id: parse_string(&argv[2]),
116            repl_offset: parse_u64(&argv[3])?,
117        })
118    } else if verb.eq_ignore_ascii_case(b"ACCEPT") {
119        if argv.len() != 3 {
120            return Err(DecodeError::WrongShape);
121        }
122        Ok(Message::Accept {
123            epoch: parse_u64(&argv[1])?,
124            accepter_id: parse_string(&argv[2]),
125        })
126    } else if verb.eq_ignore_ascii_case(b"ANNOUNCE") {
127        if argv.len() != 4 {
128            return Err(DecodeError::WrongShape);
129        }
130        Ok(Message::Announce {
131            epoch: parse_u64(&argv[1])?,
132            new_primary_id: parse_string(&argv[2]),
133            new_primary_addr: parse_string(&argv[3]),
134        })
135    } else {
136        Err(DecodeError::WrongShape)
137    }
138}
139
140fn parse_u64(b: &[u8]) -> Result<u64, DecodeError> {
141    std::str::from_utf8(b)
142        .ok()
143        .and_then(|s| s.parse::<u64>().ok())
144        .ok_or(DecodeError::BadNumeric)
145}
146
/// Convert `b` into an owned `String`, never failing.
///
/// Valid UTF-8 is copied as-is; invalid sequences are replaced with
/// U+FFFD by `String::from_utf8_lossy`.
fn parse_string(b: &[u8]) -> String {
    match String::from_utf8_lossy(b) {
        // Input was already valid UTF-8: copy the borrowed text.
        std::borrow::Cow::Borrowed(valid) => valid.to_owned(),
        // Input needed repair: the lossy conversion already allocated.
        std::borrow::Cow::Owned(repaired) => repaired,
    }
}
150
/// Append a RESP array header — `*`, the decimal element count `n`,
/// then CRLF — to `out`.
fn push_bulk_array(out: &mut Vec<u8>, n: usize) {
    out.push(b'*');
    out.extend_from_slice(n.to_string().as_bytes());
    out.extend_from_slice(b"\r\n");
}
154
/// Append `data` to `out` as one RESP bulk string: `$`, the decimal
/// byte length, CRLF, the raw bytes, and a closing CRLF.
fn push_bulk(out: &mut Vec<u8>, data: &[u8]) {
    let len_txt = data.len().to_string();
    out.push(b'$');
    out.extend_from_slice(len_txt.as_bytes());
    out.extend_from_slice(b"\r\n");
    out.extend_from_slice(data);
    out.extend_from_slice(b"\r\n");
}
160
#[cfg(test)]
mod tests {
    use super::*;

    /// Encode `msg`, decode the bytes again, and check that the decoder
    /// reported consuming exactly the encoded frame.
    fn round_trip(msg: Message) -> Message {
        let frame = encode(&msg);
        let (back, consumed) = decode(&frame).expect("decode");
        assert_eq!(consumed, frame.len(), "decode must consume the whole frame");
        back
    }

    /// Hand-build a five-element heartbeat frame with caller-chosen verb,
    /// epoch and role text; node id is `node-x` and the offset is `0`.
    fn raw_hb(verb: &[u8], epoch: &[u8], role: &[u8]) -> Vec<u8> {
        let mut frame = Vec::new();
        push_bulk_array(&mut frame, 5);
        push_bulk(&mut frame, verb);
        push_bulk(&mut frame, epoch);
        push_bulk(&mut frame, b"node-x");
        push_bulk(&mut frame, role);
        push_bulk(&mut frame, b"0");
        frame
    }

    #[test]
    fn hb_round_trip() {
        let sent = Message::Hb {
            epoch: 42,
            node_id: "primary-east".to_string(),
            role: Role::Primary,
            repl_offset: 1_234_567,
        };
        match round_trip(sent) {
            Message::Hb {
                epoch,
                node_id,
                role,
                repl_offset,
            } => {
                assert_eq!(
                    (epoch, node_id.as_str(), role, repl_offset),
                    (42, "primary-east", Role::Primary, 1_234_567)
                );
            }
            other => panic!("got {other:?}"),
        }
    }

    #[test]
    fn offer_round_trip() {
        let sent = Message::Offer {
            new_epoch: 7,
            candidate_id: "replica-1".to_string(),
            repl_offset: 99,
        };
        match round_trip(sent) {
            Message::Offer {
                new_epoch,
                candidate_id,
                repl_offset,
            } => {
                assert_eq!(
                    (new_epoch, candidate_id.as_str(), repl_offset),
                    (7, "replica-1", 99)
                );
            }
            other => panic!("got {other:?}"),
        }
    }

    #[test]
    fn accept_round_trip() {
        let sent = Message::Accept {
            epoch: 7,
            accepter_id: "replica-2".to_string(),
        };
        match round_trip(sent) {
            Message::Accept { epoch, accepter_id } => {
                assert_eq!((epoch, accepter_id.as_str()), (7, "replica-2"));
            }
            other => panic!("got {other:?}"),
        }
    }

    #[test]
    fn announce_round_trip() {
        let sent = Message::Announce {
            epoch: 7,
            new_primary_id: "replica-1".to_string(),
            new_primary_addr: "10.0.0.42:6004".to_string(),
        };
        match round_trip(sent) {
            Message::Announce {
                epoch,
                new_primary_id,
                new_primary_addr,
            } => {
                assert_eq!(
                    (epoch, new_primary_id.as_str(), new_primary_addr.as_str()),
                    (7, "replica-1", "10.0.0.42:6004")
                );
            }
            other => panic!("got {other:?}"),
        }
    }

    #[test]
    fn decode_truncated_returns_more() {
        // Only the first half of a frame is available: the decoder must
        // answer Truncated so the caller knows to read more bytes.
        let frame = encode(&Message::Accept {
            epoch: 1,
            accepter_id: "x".to_string(),
        });
        let cut = frame.len() / 2;
        assert!(matches!(decode(&frame[..cut]), Err(DecodeError::Truncated)));
    }

    #[test]
    fn decode_unknown_verb_errs() {
        // Well-formed RESP whose verb is not part of this protocol.
        let bytes = b"*2\r\n$4\r\nPING\r\n$2\r\nok\r\n";
        assert!(matches!(decode(bytes), Err(DecodeError::WrongShape)));
    }

    #[test]
    fn decode_hb_wrong_arity_errs() {
        // `HB` carrying 3 elements where 5 are required.
        let bytes = b"*3\r\n$2\r\nHB\r\n$1\r\n1\r\n$4\r\nnode\r\n";
        assert!(matches!(decode(bytes), Err(DecodeError::WrongShape)));
    }

    #[test]
    fn decode_hb_bad_role_errs() {
        // `leader` is not one of primary / replica / candidate.
        let frame = raw_hb(b"HB", b"1", b"leader");
        assert!(matches!(decode(&frame), Err(DecodeError::BadRole)));
    }

    #[test]
    fn decode_hb_bad_numeric_errs() {
        // The epoch field is not a decimal number.
        let frame = raw_hb(b"HB", b"NaN", b"primary");
        assert!(matches!(decode(&frame), Err(DecodeError::BadNumeric)));
    }

    #[test]
    fn verbs_are_case_insensitive_on_decode() {
        // Lowercase verb must still be recognised as a heartbeat.
        let frame = raw_hb(b"hb", b"1", b"primary");
        let (msg, _) = decode(&frame).expect("decode");
        assert!(matches!(msg, Message::Hb { .. }));
    }
}
303}