redis_protocol/resp2/
decode.rs

1//! Functions for decoding the RESP2 protocol into frames.
2//!
3//! <https://redis.io/topics/protocol#resp-protocol-description>
4
5use crate::nom_bytes::NomBytes;
6use crate::resp2::types::*;
7use crate::types::*;
8use crate::utils;
9use alloc::vec::Vec;
10use bytes::Bytes;
11use nom::bytes::streaming::{take as nom_take, take_until as nom_take_until};
12use nom::multi::count as nom_count;
13use nom::number::streaming::be_u8;
14use nom::sequence::terminated as nom_terminated;
15use nom::{Err as NomErr, IResult};
16use core::str;
17
/// Length prefix value that the bulk string and array parsers in this module treat as a null frame.
pub(crate) const NULL_LEN: isize = -1;
19
20fn to_isize(s: &NomBytes) -> Result<isize, RedisParseError<NomBytes>> {
21  str::from_utf8(s)?
22    .parse::<isize>()
23    .map_err(|_| RedisParseError::new_custom("to_isize", "Failed to parse as integer."))
24}
25
26fn to_i64(s: &NomBytes) -> Result<i64, RedisParseError<NomBytes>> {
27  str::from_utf8(s)?
28    .parse::<i64>()
29    .map_err(|_| RedisParseError::new_custom("to_i64", "Failed to parse as integer."))
30}
31
32pub fn isize_to_usize<'a, T>(s: isize) -> Result<usize, RedisParseError<T>> {
33  if s >= 0 {
34    Ok(s as usize)
35  } else {
36    Err(RedisParseError::new_custom("isize_to_usize", "Invalid length."))
37  }
38}
39
40fn d_read_to_crlf(input: &NomBytes) -> IResult<NomBytes, NomBytes, RedisParseError<NomBytes>> {
41  decode_log!(input, "Parsing to CRLF. Remaining: {:?}", input);
42  nom_terminated(nom_take_until(CRLF.as_bytes()), nom_take(2_usize))(input.clone())
43}
44
45fn d_read_to_crlf_s(input: &NomBytes) -> IResult<NomBytes, NomBytes, RedisParseError<NomBytes>> {
46  let (input, data) = d_read_to_crlf(input)?;
47  decode_log!(data, "Parsing to StrMut. Data: {:?}", data);
48  Ok((input, data))
49}
50
51fn d_read_prefix_len(input: &NomBytes) -> IResult<NomBytes, isize, RedisParseError<NomBytes>> {
52  let (input, data) = d_read_to_crlf_s(input)?;
53  decode_log!("Reading prefix len. Data: {:?}", str::from_utf8(&data));
54  Ok((input, etry!(to_isize(&data))))
55}
56
57fn d_frame_type(input: &NomBytes) -> IResult<NomBytes, FrameKind, RedisParseError<NomBytes>> {
58  let (input, byte) = be_u8(input.clone())?;
59  decode_log!(
60    input,
61    "Reading frame type. Kind byte: {:?}, remaining: {:?}",
62    byte,
63    input
64  );
65  let kind = match byte {
66    SIMPLESTRING_BYTE => FrameKind::SimpleString,
67    ERROR_BYTE => FrameKind::Error,
68    INTEGER_BYTE => FrameKind::Integer,
69    BULKSTRING_BYTE => FrameKind::BulkString,
70    ARRAY_BYTE => FrameKind::Array,
71    _ => e!(RedisParseError::new_custom("frame_type", "Invalid frame type.")),
72  };
73
74  Ok((input, kind))
75}
76
77fn d_parse_simplestring(input: &NomBytes) -> IResult<NomBytes, Frame, RedisParseError<NomBytes>> {
78  let (input, data) = d_read_to_crlf_s(input)?;
79  Ok((input, Frame::SimpleString(data.into_bytes())))
80}
81
82fn d_parse_integer(input: &NomBytes) -> IResult<NomBytes, Frame, RedisParseError<NomBytes>> {
83  let (input, data) = d_read_to_crlf_s(input)?;
84  let parsed = etry!(to_i64(&data));
85  Ok((input, Frame::Integer(parsed)))
86}
87
88// assumes the '$-1\r\n' has been consumed already, since nulls look like bulk strings until the length prefix is parsed,
89// and parsing the length prefix consumes the trailing \r\n in the underlying `terminated!` call
90fn d_parse_null(input: &NomBytes) -> IResult<NomBytes, Frame, RedisParseError<NomBytes>> {
91  Ok((input.clone(), Frame::Null))
92}
93
94fn d_parse_error(input: &NomBytes) -> IResult<NomBytes, Frame, RedisParseError<NomBytes>> {
95  let (input, data) = d_read_to_crlf_s(input)?;
96  let data = etry!(utils::to_byte_str(data));
97  Ok((input, Frame::Error(data)))
98}
99
100fn d_parse_bulkstring(input: &NomBytes, len: usize) -> IResult<NomBytes, Frame, RedisParseError<NomBytes>> {
101  let (input, data) = nom_terminated(nom_take(len), nom_take(2_usize))(input.clone())?;
102  Ok((input.clone(), Frame::BulkString(data.into_bytes())))
103}
104
105fn d_parse_bulkstring_or_null(input: &NomBytes) -> IResult<NomBytes, Frame, RedisParseError<NomBytes>> {
106  let (input, len) = d_read_prefix_len(input)?;
107  decode_log!(input, "Parsing bulkstring, Length: {:?}, remaining: {:?}", len, input);
108
109  if len == NULL_LEN {
110    d_parse_null(&input)
111  } else {
112    d_parse_bulkstring(&input, etry!(isize_to_usize(len)))
113  }
114}
115
116fn d_parse_array_frames<T>(input: T, len: usize) -> IResult<NomBytes, Vec<Frame>, RedisParseError<NomBytes>>
117where
118  T: AsRef<NomBytes>,
119{
120  let input_ref = input.as_ref();
121  decode_log!(
122    input_ref,
123    "Parsing array frames. Length: {:?}, remaining: {:?}",
124    len,
125    input_ref
126  );
127  nom_count(d_parse_frame, len)(input_ref.clone())
128}
129
130fn d_parse_array(input: &NomBytes) -> IResult<NomBytes, Frame, RedisParseError<NomBytes>> {
131  let (input, len) = d_read_prefix_len(input)?;
132  decode_log!(input, "Parsing array. Length: {:?}, remaining: {:?}", len, input);
133
134  if len == NULL_LEN {
135    d_parse_null(&input)
136  } else {
137    let len = etry!(isize_to_usize(len));
138    let (input, frames) = d_parse_array_frames(&input, len)?;
139    Ok((input, Frame::Array(frames)))
140  }
141}
142
143fn d_parse_frame<T>(input: T) -> IResult<NomBytes, Frame, RedisParseError<NomBytes>>
144where
145  T: AsRef<NomBytes>,
146{
147  let (input, kind) = d_frame_type(input.as_ref())?;
148  decode_log!(input, "Parsed kind: {:?}, remaining: {:?}", kind, input);
149
150  match kind {
151    FrameKind::SimpleString => d_parse_simplestring(&input),
152    FrameKind::Error => d_parse_error(&input),
153    FrameKind::Integer => d_parse_integer(&input),
154    FrameKind::BulkString => d_parse_bulkstring_or_null(&input),
155    FrameKind::Array => d_parse_array(&input),
156    _ => e!(RedisParseError::new_custom("parse_frame", "Invalid frame kind.")),
157  }
158}
159
160/// Attempt to parse the contents of `buf`, returning the first valid frame and the number of bytes consumed.
161///
162/// If the byte slice contains an incomplete frame then `None` is returned.
163pub fn decode(buf: &Bytes) -> Result<Option<(Frame, usize)>, RedisProtocolError> {
164  let len = buf.len();
165  let buffer: NomBytes = buf.into();
166
167  match d_parse_frame(buffer) {
168    Ok((remaining, frame)) => Ok(Some((frame, len - remaining.len()))),
169    Err(NomErr::Incomplete(_)) => Ok(None),
170    Err(NomErr::Error(e)) => Err(e.into()),
171    Err(NomErr::Failure(e)) => Err(e.into()),
172  }
173}
174
175#[cfg(feature = "decode-mut")]
176#[cfg_attr(docsrs, doc(cfg(feature = "decode-mut")))]
177pub use crate::decode_mut::resp2::decode_mut;
178
#[cfg(test)]
pub mod tests {
  use super::*;
  use bytes::BytesMut;
  use nom::AsBytes;
  use std::str;

  /// Bytes appended after a complete frame to check that decoding stops at the frame boundary.
  pub const PADDING: &str = "FOOBARBAZ";

  /// Fail the current test, printing the protocol error.
  pub fn pretty_print_panic(e: RedisProtocolError) {
    panic!("{:?}", e);
  }

  /// Fail the current test because `decode` reported an incomplete frame.
  pub fn panic_no_decode() {
    panic!("Failed to decode bytes. None returned");
  }

  /// Decode `bytes` and check both the frame and the consumed byte count against `expected`.
  fn decode_and_verify_some(bytes: &Bytes, expected: &(Option<Frame>, usize)) {
    let (frame, len) = match decode(bytes) {
      Ok(Some((f, l))) => (Some(f), l),
      Ok(None) => return panic_no_decode(),
      Err(e) => return pretty_print_panic(e),
    };

    assert_eq!(frame, expected.0, "decoded frame matched");
    assert_eq!(len, expected.1, "decoded frame len matched");
  }

  /// Same as `decode_and_verify_some`, but with `PADDING` appended so that trailing bytes must be left unconsumed.
  fn decode_and_verify_padded_some(bytes: &Bytes, expected: &(Option<Frame>, usize)) {
    let mut buf = BytesMut::from(bytes.as_bytes());
    buf.extend_from_slice(PADDING.as_bytes());

    decode_and_verify_some(&buf.freeze(), expected);
  }

  /// Decode `bytes` and require that the decoder reports an incomplete frame (`Ok(None)`).
  fn decode_and_verify_none(bytes: &Bytes) {
    match decode(bytes) {
      Ok(None) => {},
      Ok(Some((frame, len))) => panic!("Expected an incomplete frame, decoded {:?} ({} bytes)", frame, len),
      Err(e) => pretty_print_panic(e),
    }
  }

  #[test]
  fn should_decode_llen_res_example() {
    let expected = (Some(Frame::Integer(48293)), 8);
    let bytes: Bytes = ":48293\r\n".into();

    decode_and_verify_some(&bytes, &expected);
    decode_and_verify_padded_some(&bytes, &expected);
  }

  #[test]
  fn should_decode_simple_string() {
    let expected = (Some(Frame::SimpleString("string".into())), 9);
    let bytes: Bytes = "+string\r\n".into();

    decode_and_verify_some(&bytes, &expected);
    decode_and_verify_padded_some(&bytes, &expected);
  }

  #[test]
  fn should_decode_simple_string_incomplete() {
    // No CRLF yet, so the decoder must ask for more data rather than fail.
    let bytes: Bytes = "+stri".into();

    decode_and_verify_none(&bytes);
  }

  #[test]
  fn should_decode_bulk_string() {
    let expected = (Some(Frame::BulkString("foo".into())), 9);
    let bytes: Bytes = "$3\r\nfoo\r\n".into();

    decode_and_verify_some(&bytes, &expected);
    decode_and_verify_padded_some(&bytes, &expected);
  }

  #[test]
  fn should_decode_bulk_string_incomplete() {
    // Only two of the three payload bytes are present. The padded variant is intentionally not used here: the
    // padding would supply the missing payload and terminator bytes.
    let bytes: Bytes = "$3\r\nfo".into();

    decode_and_verify_none(&bytes);
  }

  #[test]
  fn should_decode_array_no_nulls() {
    let expected = (
      Some(Frame::Array(vec![
        Frame::SimpleString("Foo".into()),
        Frame::SimpleString("Bar".into()),
      ])),
      16,
    );
    let bytes: Bytes = "*2\r\n+Foo\r\n+Bar\r\n".into();

    decode_and_verify_some(&bytes, &expected);
    decode_and_verify_padded_some(&bytes, &expected);
  }

  #[test]
  fn should_decode_array_nulls() {
    let bytes: Bytes = "*3\r\n$3\r\nFoo\r\n$-1\r\n$3\r\nBar\r\n".into();

    let expected = (
      Some(Frame::Array(vec![
        Frame::BulkString("Foo".into()),
        Frame::Null,
        Frame::BulkString("Bar".into()),
      ])),
      bytes.len(),
    );

    decode_and_verify_some(&bytes, &expected);
    decode_and_verify_padded_some(&bytes, &expected);
  }

  #[test]
  fn should_decode_normal_error() {
    let bytes: Bytes = "-WRONGTYPE Operation against a key holding the wrong kind of value\r\n".into();
    let expected = (
      Some(Frame::Error(
        "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
      )),
      bytes.len(),
    );

    decode_and_verify_some(&bytes, &expected);
    decode_and_verify_padded_some(&bytes, &expected);
  }

  #[test]
  fn should_decode_moved_error() {
    let bytes: Bytes = "-MOVED 3999 127.0.0.1:6381\r\n".into();
    let expected = (Some(Frame::Error("MOVED 3999 127.0.0.1:6381".into())), bytes.len());

    decode_and_verify_some(&bytes, &expected);
    decode_and_verify_padded_some(&bytes, &expected);
  }

  #[test]
  fn should_decode_ask_error() {
    let bytes: Bytes = "-ASK 3999 127.0.0.1:6381\r\n".into();
    let expected = (Some(Frame::Error("ASK 3999 127.0.0.1:6381".into())), bytes.len());

    decode_and_verify_some(&bytes, &expected);
    decode_and_verify_padded_some(&bytes, &expected);
  }

  #[test]
  fn should_decode_incomplete() {
    let bytes: Bytes = "*3\r\n$3\r\nFoo\r\n$-1\r\n$3\r\nBar".into();
    decode_and_verify_none(&bytes);
  }

  #[test]
  fn should_error_on_junk() {
    // The first byte is not a valid frame prefix, so decoding must return an error.
    let bytes: Bytes = "foobarbazwibblewobble".into();
    assert!(decode(&bytes).is_err(), "junk input should be rejected");
  }
}