Skip to main content

kevy_resp/
reply_parse.rs

//! Reply-side parser (client perspective): parse server responses into a
//! [`Reply`] enum. Mirror of the encoders in [`crate::reply_encode`] and
//! [`crate::reply_encode_resp3`].
//!
//! Speaks RESP2 (the five legacy type prefixes `+`/`-`/`:`/`$`/`*`, plus the
//! two null sentinels `$-1`/`*-1`) and the additive RESP3 set (`%` map,
//! `~` set, `,` double, `#` boolean, `=` verbatim string, `(` big number,
//! `_` null, `>` push, `!` blob error, `|` attributes). RESP2-only callers
//! can still ignore the new [`Reply`] variants — the parser only produces
//! them when the server speaks RESP3.
10
11use crate::error::ProtocolError;
12use crate::request::{find_crlf, parse_int};
13
/// A parsed RESP reply (server → client) — the client-side counterpart of
/// the crate's `encode_*` functions (server-side encoders).
///
/// Variants whose doc starts with **Resp3:** are only ever produced by
/// a server speaking RESP3; an `HELLO 2` (or no `HELLO`) session sees the
/// RESP2 subset (`Simple` / `Error` / `Int` / `Bulk` / `Nil` / `Array`)
/// exclusively.
///
/// The enum is deliberately NOT marked `#[non_exhaustive]`: an exhaustive
/// `match` on `Reply` has to list every variant, so code that handles
/// replies stays compile-checked for completeness. The flip side is that
/// adding a variant is a compile error for downstream exhaustive matches
/// (without a wildcard arm) rather than a silently ignored case.
#[derive(Debug, Clone, PartialEq)]
pub enum Reply {
    /// `+OK`
    Simple(Vec<u8>),
    /// `-ERR ...`
    Error(Vec<u8>),
    /// `:42`
    Int(i64),
    /// `$5\r\nhello\r\n`
    Bulk(Vec<u8>),
    /// `$-1` or `*-1` — the RESP2 null sentinel; in RESP3 the dedicated
    /// [`Reply::Null`] (`_\r\n`) is used instead. Both round-trip here.
    Nil,
    /// `*N ...`
    Array(Vec<Reply>),
    /// **Resp3:** `%N\r\n<key1><value1>...<keyN><valueN>` — N pairs (the
    /// header count is the pair count, NOT the element count, so a map of
    /// 3 pairs is `%3` plus 6 sub-replies). Parsed/exposed as a Vec of
    /// pairs so duplicate keys + insertion order are preserved.
    Map(Vec<(Reply, Reply)>),
    /// **Resp3:** `~N\r\n<item1>...<itemN>` — set semantics on the wire;
    /// dedup is the application's job (RESP3 doesn't require it).
    Set(Vec<Reply>),
    /// **Resp3:** `,1.5\r\n` — double. `inf` / `-inf` / `nan` are valid
    /// payloads per the RESP3 spec and survive the round-trip.
    Double(f64),
    /// **Resp3:** `#t\r\n` / `#f\r\n` — boolean.
    Boolean(bool),
    /// **Resp3:** `=15\r\ntxt:Some string\r\n` — verbatim string carrying
    /// a 3-char format tag (`txt` / `mkd` / etc.) + raw bytes. The declared
    /// length covers the tag, the colon and the payload (3 + 1 + 11 = 15 in
    /// the example). The colon separator is part of the wire encoding but
    /// not part of `data`.
    Verbatim {
        /// 3-char format tag (e.g. `b"txt"` for plain text, `b"mkd"` for markdown).
        fmt: [u8; 3],
        /// Payload bytes following the `:` separator.
        data: Vec<u8>,
    },
    /// **Resp3:** `(170141183460469231731687303715884105727\r\n` — arbitrary-
    /// precision integer; carried as the raw digit bytes since we don't
    /// pull in a bignum crate (charter: zero deps).
    BigNumber(Vec<u8>),
    /// **Resp3:** `_\r\n` — true null. RESP2 falls back to [`Reply::Nil`].
    Null,
    /// **Resp3:** `>N\r\n...` — like [`Reply::Array`] but tagged as an
    /// out-of-band server-push frame (pub/sub messages in RESP3). The
    /// client must dispatch these separately from regular replies.
    Push(Vec<Reply>),
    /// **Resp3:** `!8\r\nERR ohno\r\n` — error carried as a length-prefixed
    /// bulk (handles errors containing CRLF that the simple-string `-`
    /// shape can't encode).
    BlobError(Vec<u8>),
}
77
78/// Parse one RESP reply from the front of `buf`. Speaks RESP2 + RESP3.
79///
80/// - `Ok(Some((reply, consumed)))` — a complete reply.
81/// - `Ok(None)` — need more bytes.
82/// - `Err(_)` — malformed.
83///
84/// Attributes (`|N\r\n…<reply>`) are transparently consumed and
85/// discarded — they decorate the *next* reply but the parser surfaces
86/// only the underlying reply, matching what every RESP3 client library
87/// does today. Exposing them is a future addition once a real consumer
88/// (e.g. CLIENT TRACE) ships.
89pub fn parse_reply(buf: &[u8]) -> Result<Option<(Reply, usize)>, ProtocolError> {
90    let Some(&tag) = buf.first() else {
91        return Ok(None);
92    };
93    match tag {
94        b'+' => Ok(reply_line(buf).map(|(b, used)| (Reply::Simple(b.to_vec()), used))),
95        b'-' => Ok(reply_line(buf).map(|(b, used)| (Reply::Error(b.to_vec()), used))),
96        b':' => match reply_line(buf) {
97            None => Ok(None),
98            Some((b, used)) => {
99                let n = parse_int(b).ok_or(ProtocolError::Malformed("bad integer reply"))?;
100                Ok(Some((Reply::Int(n), used)))
101            }
102        },
103        b'$' => parse_bulk_reply(buf),
104        b'*' => parse_array_reply(buf, false),
105        // ── RESP3 additions ──────────────────────────────────────────
106        b'%' => parse_map_reply(buf),
107        b'~' => parse_set_reply(buf),
108        b',' => parse_double_reply(buf),
109        b'#' => parse_boolean_reply(buf),
110        b'=' => parse_verbatim_reply(buf),
111        b'(' => match reply_line(buf) {
112            None => Ok(None),
113            Some((b, used)) => Ok(Some((Reply::BigNumber(b.to_vec()), used))),
114        },
115        b'_' => parse_null_reply(buf),
116        b'>' => parse_array_reply(buf, true),
117        b'!' => parse_blob_error_reply(buf),
118        b'|' => parse_attributed_reply(buf),
119        _ => Err(ProtocolError::Malformed("unknown reply type")),
120    }
121}
122
123/// The CRLF-terminated payload after the type byte, plus bytes consumed.
124fn reply_line(buf: &[u8]) -> Option<(&[u8], usize)> {
125    find_crlf(buf, 1).map(|eol| (&buf[1..eol], eol + 2))
126}
127
128fn parse_bulk_reply(buf: &[u8]) -> Result<Option<(Reply, usize)>, ProtocolError> {
129    let Some(hdr_end) = find_crlf(buf, 1) else {
130        return Ok(None);
131    };
132    let len = parse_int(&buf[1..hdr_end]).ok_or(ProtocolError::Malformed("bad bulk length"))?;
133    if len < 0 {
134        return Ok(Some((Reply::Nil, hdr_end + 2)));
135    }
136    let data_start = hdr_end + 2;
137    let data_end = data_start + len as usize;
138    if buf.len() < data_end + 2 {
139        return Ok(None);
140    }
141    Ok(Some((
142        Reply::Bulk(buf[data_start..data_end].to_vec()),
143        data_end + 2,
144    )))
145}
146
147/// Shared parser for `*` (array, RESP2) and `>` (push, RESP3) — both
148/// are length-prefixed sequences of replies. `push=true` wraps the
149/// result in `Reply::Push`, otherwise `Reply::Array` (or `Reply::Nil`
150/// for the RESP2 `*-1` shape, which RESP3 push frames never emit).
151fn parse_array_reply(buf: &[u8], push: bool) -> Result<Option<(Reply, usize)>, ProtocolError> {
152    let Some(hdr_end) = find_crlf(buf, 1) else {
153        return Ok(None);
154    };
155    let count = parse_int(&buf[1..hdr_end]).ok_or(ProtocolError::Malformed("bad array length"))?;
156    if count < 0 {
157        if push {
158            return Err(ProtocolError::Malformed("push frame cannot be null"));
159        }
160        return Ok(Some((Reply::Nil, hdr_end + 2)));
161    }
162    let mut pos = hdr_end + 2;
163    // Cap initial capacity by remaining buffer bytes — an attacker-controlled
164    // `*999999999999\r\n` header would otherwise panic via `Vec::with_capacity`'s
165    // capacity overflow. Each item costs ≥ 1 byte (a CRLF for Nil/Int/Simple),
166    // so a real array of N items needs ≥ N bytes left. Push will grow the vec
167    // amortized if the genuine count is higher but bytes are present. Found by
168    // cargo-fuzz against crash-4c4ee6777903d009f93289eb428b3b371d027137
169    // (2026-05-26).
170    let cap = (count as usize).min(buf.len().saturating_sub(pos));
171    let mut items = Vec::with_capacity(cap);
172    for _ in 0..count {
173        match parse_reply(&buf[pos..])? {
174            None => return Ok(None),
175            Some((r, used)) => {
176                items.push(r);
177                pos += used;
178            }
179        }
180    }
181    let reply = if push { Reply::Push(items) } else { Reply::Array(items) };
182    Ok(Some((reply, pos)))
183}
184
185/// `%N\r\n` followed by 2N sub-replies (N key/value pairs).
186fn parse_map_reply(buf: &[u8]) -> Result<Option<(Reply, usize)>, ProtocolError> {
187    let Some(hdr_end) = find_crlf(buf, 1) else {
188        return Ok(None);
189    };
190    let count = parse_int(&buf[1..hdr_end]).ok_or(ProtocolError::Malformed("bad map length"))?;
191    if count < 0 {
192        return Err(ProtocolError::Malformed("map length cannot be negative"));
193    }
194    let mut pos = hdr_end + 2;
195    // Same fuzz-driven cap as parse_array_reply — each pair costs ≥ 2 bytes.
196    let cap = (count as usize).min(buf.len().saturating_sub(pos) / 2);
197    let mut pairs: Vec<(Reply, Reply)> = Vec::with_capacity(cap);
198    for _ in 0..count {
199        let Some((k, used_k)) = parse_reply(&buf[pos..])? else {
200            return Ok(None);
201        };
202        pos += used_k;
203        let Some((v, used_v)) = parse_reply(&buf[pos..])? else {
204            return Ok(None);
205        };
206        pos += used_v;
207        pairs.push((k, v));
208    }
209    Ok(Some((Reply::Map(pairs), pos)))
210}
211
212/// `~N\r\n` followed by N sub-replies — set on the wire, no dedup.
213fn parse_set_reply(buf: &[u8]) -> Result<Option<(Reply, usize)>, ProtocolError> {
214    let Some(hdr_end) = find_crlf(buf, 1) else {
215        return Ok(None);
216    };
217    let count = parse_int(&buf[1..hdr_end]).ok_or(ProtocolError::Malformed("bad set length"))?;
218    if count < 0 {
219        return Err(ProtocolError::Malformed("set length cannot be negative"));
220    }
221    let mut pos = hdr_end + 2;
222    let cap = (count as usize).min(buf.len().saturating_sub(pos));
223    let mut items = Vec::with_capacity(cap);
224    for _ in 0..count {
225        match parse_reply(&buf[pos..])? {
226            None => return Ok(None),
227            Some((r, used)) => {
228                items.push(r);
229                pos += used;
230            }
231        }
232    }
233    Ok(Some((Reply::Set(items), pos)))
234}
235
236/// `,N\r\n` — double. RESP3 spec carries `inf` / `-inf` / `nan` as
237/// literal byte strings; `f64::from_str` already handles all three.
238fn parse_double_reply(buf: &[u8]) -> Result<Option<(Reply, usize)>, ProtocolError> {
239    let Some((bytes, used)) = reply_line(buf) else {
240        return Ok(None);
241    };
242    let s = std::str::from_utf8(bytes).map_err(|_| ProtocolError::Malformed("bad double utf8"))?;
243    let v: f64 = s.parse().map_err(|_| ProtocolError::Malformed("bad double"))?;
244    Ok(Some((Reply::Double(v), used)))
245}
246
247/// `#t\r\n` / `#f\r\n` — boolean. Any other payload is malformed.
248fn parse_boolean_reply(buf: &[u8]) -> Result<Option<(Reply, usize)>, ProtocolError> {
249    let Some((bytes, used)) = reply_line(buf) else {
250        return Ok(None);
251    };
252    let v = match bytes {
253        b"t" => true,
254        b"f" => false,
255        _ => return Err(ProtocolError::Malformed("bad boolean payload")),
256    };
257    Ok(Some((Reply::Boolean(v), used)))
258}
259
260/// `=N\r\n<fmt>:<data>\r\n` — verbatim string. The 3-char `fmt` tag +
261/// `:` separator are inside the N-byte body.
262fn parse_verbatim_reply(buf: &[u8]) -> Result<Option<(Reply, usize)>, ProtocolError> {
263    let Some(hdr_end) = find_crlf(buf, 1) else {
264        return Ok(None);
265    };
266    let len = parse_int(&buf[1..hdr_end])
267        .ok_or(ProtocolError::Malformed("bad verbatim length"))?;
268    if len < 4 {
269        return Err(ProtocolError::Malformed("verbatim length < 4 (fmt + ':')"));
270    }
271    let data_start = hdr_end + 2;
272    let data_end = data_start + len as usize;
273    if buf.len() < data_end + 2 {
274        return Ok(None);
275    }
276    let body = &buf[data_start..data_end];
277    if body[3] != b':' {
278        return Err(ProtocolError::Malformed("verbatim missing fmt:data separator"));
279    }
280    let mut fmt = [0u8; 3];
281    fmt.copy_from_slice(&body[..3]);
282    let data = body[4..].to_vec();
283    Ok(Some((Reply::Verbatim { fmt, data }, data_end + 2)))
284}
285
286/// `_\r\n` — RESP3 true null (5 bytes counting the `_` and CRLF).
287fn parse_null_reply(buf: &[u8]) -> Result<Option<(Reply, usize)>, ProtocolError> {
288    if buf.len() < 3 {
289        return Ok(None);
290    }
291    if &buf[..3] != b"_\r\n" {
292        return Err(ProtocolError::Malformed("bad null payload"));
293    }
294    Ok(Some((Reply::Null, 3)))
295}
296
297/// `!N\r\n<error>\r\n` — length-prefixed error (carries CRLF safely).
298fn parse_blob_error_reply(buf: &[u8]) -> Result<Option<(Reply, usize)>, ProtocolError> {
299    let Some(hdr_end) = find_crlf(buf, 1) else {
300        return Ok(None);
301    };
302    let len = parse_int(&buf[1..hdr_end])
303        .ok_or(ProtocolError::Malformed("bad blob error length"))?;
304    if len < 0 {
305        return Err(ProtocolError::Malformed("blob error length cannot be negative"));
306    }
307    let data_start = hdr_end + 2;
308    let data_end = data_start + len as usize;
309    if buf.len() < data_end + 2 {
310        return Ok(None);
311    }
312    Ok(Some((Reply::BlobError(buf[data_start..data_end].to_vec()), data_end + 2)))
313}
314
315/// `|N\r\n<map of N pairs><reply>` — attributes decorate the next reply.
316/// We parse the attribute map then transparently return the decorated
317/// reply, mirroring what RESP3 client libraries do today. The attributes
318/// themselves are dropped (see [`parse_reply`] docs).
319fn parse_attributed_reply(buf: &[u8]) -> Result<Option<(Reply, usize)>, ProtocolError> {
320    // Re-use the map parser but throw away the result; then parse the
321    // actual reply that follows.
322    let Some((_attrs, used_attrs)) = parse_map_reply(buf)? else {
323        return Ok(None);
324    };
325    match parse_reply(&buf[used_attrs..])? {
326        None => Ok(None),
327        Some((r, used)) => Ok(Some((r, used_attrs + used))),
328    }
329}
330
#[cfg(test)]
mod tests {
    use super::*;

    /// Parse `wire` as one complete, well-formed reply and return just the
    /// reply (panics if the input is malformed or incomplete).
    fn complete(wire: &[u8]) -> Reply {
        let (reply, _used) = parse_reply(wire).unwrap().unwrap();
        reply
    }

    #[test]
    fn parse_replies() {
        // RESP2 scalars and both null sentinels.
        assert_eq!(complete(b"+OK\r\n"), Reply::Simple(b"OK".to_vec()));
        assert_eq!(complete(b"-ERR bad\r\n"), Reply::Error(b"ERR bad".to_vec()));
        assert_eq!(complete(b":42\r\n"), Reply::Int(42));
        assert_eq!(complete(b"$5\r\nhello\r\n"), Reply::Bulk(b"hello".to_vec()));
        assert_eq!(complete(b"$-1\r\n"), Reply::Nil);
        assert_eq!(complete(b"*-1\r\n"), Reply::Nil);

        // Array of mixed elements, including the consumed byte count.
        let (array, consumed) = parse_reply(b"*2\r\n:1\r\n$2\r\nhi\r\n").unwrap().unwrap();
        let expected = Reply::Array(vec![Reply::Int(1), Reply::Bulk(b"hi".to_vec())]);
        assert_eq!(array, expected);
        assert_eq!(consumed, 16);

        // Truncated input asks for more bytes rather than failing.
        assert_eq!(parse_reply(b"$5\r\nhel").unwrap(), None);
        assert_eq!(parse_reply(b"*2\r\n:1\r\n").unwrap(), None);
        // `!` is a valid RESP3 prefix (blob error), so the unknown-prefix
        // check uses a byte that no RESP version assigns.
        assert!(parse_reply(b"@huh\r\n").is_err());
    }

    #[test]
    fn parse_resp3_scalars() {
        assert_eq!(complete(b"_\r\n"), Reply::Null);
        assert_eq!(complete(b"#t\r\n"), Reply::Boolean(true));
        assert_eq!(complete(b"#f\r\n"), Reply::Boolean(false));
        assert_eq!(complete(b",1.5\r\n"), Reply::Double(1.5));
        assert_eq!(complete(b",inf\r\n"), Reply::Double(f64::INFINITY));
        assert_eq!(complete(b",-inf\r\n"), Reply::Double(f64::NEG_INFINITY));
        // NaN never compares equal to itself, so inspect the value directly.
        match complete(b",nan\r\n") {
            Reply::Double(v) => assert!(v.is_nan()),
            other => panic!("expected Double(nan), got {other:?}"),
        }
        let big = complete(b"(170141183460469231731687303715884105727\r\n");
        assert_eq!(
            big,
            Reply::BigNumber(b"170141183460469231731687303715884105727".to_vec())
        );
        let blob = complete(b"!11\r\nERR bad cmd\r\n");
        assert_eq!(blob, Reply::BlobError(b"ERR bad cmd".to_vec()));
    }

    #[test]
    fn parse_resp3_verbatim() {
        let parsed = complete(b"=15\r\ntxt:Some string\r\n");
        let expected = Reply::Verbatim {
            fmt: *b"txt",
            data: b"Some string".to_vec(),
        };
        assert_eq!(parsed, expected);
        // A length below 4 leaves no room for the tag plus ':'.
        assert!(parse_reply(b"=3\r\ntxt\r\n").is_err());
        // The byte after the tag must be ':'.
        assert!(parse_reply(b"=7\r\ntxt+abc\r\n").is_err());
    }

    #[test]
    fn parse_resp3_map_and_set() {
        // %2\r\n :1\r\n $1\r\n a\r\n :2\r\n $1\r\n b\r\n
        let map = complete(b"%2\r\n:1\r\n$1\r\na\r\n:2\r\n$1\r\nb\r\n");
        let expected_pairs = vec![
            (Reply::Int(1), Reply::Bulk(b"a".to_vec())),
            (Reply::Int(2), Reply::Bulk(b"b".to_vec())),
        ];
        assert_eq!(map, Reply::Map(expected_pairs));
        // ~3\r\n :1\r\n :2\r\n :3\r\n
        let set = complete(b"~3\r\n:1\r\n:2\r\n:3\r\n");
        let expected_items = vec![Reply::Int(1), Reply::Int(2), Reply::Int(3)];
        assert_eq!(set, Reply::Set(expected_items));
        // Zero-length aggregates are valid.
        assert_eq!(complete(b"%0\r\n"), Reply::Map(vec![]));
        assert_eq!(complete(b"~0\r\n"), Reply::Set(vec![]));
        // Only `*` / `$` use -1 as a nil marker; here it is malformed.
        assert!(parse_reply(b"%-1\r\n").is_err());
        assert!(parse_reply(b"~-1\r\n").is_err());
    }

    #[test]
    fn parse_resp3_push_frame() {
        let frame = complete(b">3\r\n+message\r\n$4\r\nnews\r\n$5\r\nhello\r\n");
        let expected = Reply::Push(vec![
            Reply::Simple(b"message".to_vec()),
            Reply::Bulk(b"news".to_vec()),
            Reply::Bulk(b"hello".to_vec()),
        ]);
        assert_eq!(frame, expected);
        // There is no null push frame.
        assert!(parse_reply(b">-1\r\n").is_err());
    }

    #[test]
    fn parse_resp3_attributes_are_skipped() {
        // Attribute frame:
        //   |1\r\n +key-popularity\r\n %2\r\n $1\r\n a\r\n ,0.5\r\n $1\r\n b\r\n ,0.3\r\n
        // followed by the decorated reply:
        //   *2\r\n :1\r\n :2\r\n
        let wire =
            b"|1\r\n+key-popularity\r\n%2\r\n$1\r\na\r\n,0.5\r\n$1\r\nb\r\n,0.3\r\n*2\r\n:1\r\n:2\r\n";
        let (reply, consumed) = parse_reply(wire).unwrap().unwrap();
        assert_eq!(reply, Reply::Array(vec![Reply::Int(1), Reply::Int(2)]));
        assert_eq!(consumed, wire.len());
    }

    #[test]
    fn parse_resp3_partial_returns_none() {
        // Single-line shapes cut short before their CRLF is complete.
        let truncated: [&[u8]; 4] = [b"_", b"_\r", b"#t", b"#t\r"];
        for wire in truncated {
            assert_eq!(parse_reply(wire).unwrap(), None);
        }
        // Verbatim body cut short.
        assert_eq!(parse_reply(b"=15\r\ntxt:Some str").unwrap(), None);
        // Map cut after the second key.
        assert_eq!(parse_reply(b"%2\r\n:1\r\n$1\r\na\r\n:2\r\n").unwrap(), None);
    }
}