redis_protocol/resp2/
decode.rs

1//! Functions for decoding the RESP2 protocol into frames.
2//!
3//! <https://redis.io/topics/protocol#resp-protocol-description>
4
5use crate::{
6  error::{RedisParseError, RedisProtocolError},
7  resp2::{types::*, utils::build_owned_frame},
8  types::*,
9  utils,
10};
11use alloc::vec::Vec;
12use core::str;
13use nom::{
14  bytes::streaming::{take as nom_take, take_until as nom_take_until},
15  multi::count as nom_count,
16  number::streaming::be_u8,
17  sequence::terminated as nom_terminated,
18  Err as NomErr,
19};
20
21#[cfg(feature = "bytes")]
22use crate::resp2::utils::{build_bytes_frame, freeze_parse};
23#[cfg(feature = "bytes")]
24use bytes::{Bytes, BytesMut};
25
26pub(crate) const NULL_LEN: isize = -1;
27
28fn to_isize(s: &[u8]) -> Result<isize, RedisParseError<&[u8]>> {
29  str::from_utf8(s)?
30    .parse::<isize>()
31    .map_err(|_| RedisParseError::new_custom("to_isize", "Failed to parse as integer."))
32}
33
34fn to_i64(s: &[u8]) -> Result<i64, RedisParseError<&[u8]>> {
35  str::from_utf8(s)?
36    .parse::<i64>()
37    .map_err(|_| RedisParseError::new_custom("to_i64", "Failed to parse as integer."))
38}
39
40fn d_read_to_crlf(input: (&[u8], usize)) -> DResult<usize> {
41  decode_log_str!(input.0, _input, "Parsing to CRLF. Remaining: {:?}", _input);
42  let (input_bytes, data) = nom_terminated(nom_take_until(CRLF.as_bytes()), nom_take(2_usize))(input.0)?;
43  Ok(((input_bytes, input.1 + data.len() + 2), data.len()))
44}
45
46fn d_read_to_crlf_take(input: (&[u8], usize)) -> DResult<&[u8]> {
47  decode_log_str!(input.0, _input, "Parsing to CRLF. Remaining: {:?}", _input);
48  let (input_bytes, data) = nom_terminated(nom_take_until(CRLF.as_bytes()), nom_take(2_usize))(input.0)?;
49  Ok(((input_bytes, input.1 + data.len() + 2), data))
50}
51
52fn d_read_prefix_len(input: (&[u8], usize)) -> DResult<isize> {
53  let (input, data) = d_read_to_crlf_take(input)?;
54  decode_log!("Reading prefix len. Data: {:?}", str::from_utf8(data));
55  Ok((input, etry!(to_isize(data))))
56}
57
58fn d_frame_type(input: (&[u8], usize)) -> DResult<FrameKind> {
59  let (input_bytes, byte) = be_u8(input.0)?;
60  decode_log_str!(
61    input_bytes,
62    _input,
63    "Reading frame type. Kind byte: {:?}, remaining: {:?}",
64    byte,
65    _input
66  );
67
68  let kind = match byte {
69    SIMPLESTRING_BYTE => FrameKind::SimpleString,
70    ERROR_BYTE => FrameKind::Error,
71    INTEGER_BYTE => FrameKind::Integer,
72    BULKSTRING_BYTE => FrameKind::BulkString,
73    ARRAY_BYTE => FrameKind::Array,
74    _ => e!(RedisParseError::new_custom("frame_type", "Invalid frame type.")),
75  };
76  Ok(((input_bytes, input.1 + 1), kind))
77}
78
79fn d_parse_simplestring(input: (&[u8], usize)) -> DResult<RangeFrame> {
80  let offset = input.1;
81  let ((input, next_offset), len) = d_read_to_crlf(input)?;
82  Ok(((input, next_offset), RangeFrame::SimpleString((offset, offset + len))))
83}
84
85fn d_parse_integer(input: (&[u8], usize)) -> DResult<RangeFrame> {
86  let ((input, next_offset), data) = d_read_to_crlf_take(input)?;
87  let parsed = etry!(to_i64(data));
88  Ok(((input, next_offset), RangeFrame::Integer(parsed)))
89}
90
91// assumes the '$-1\r\n' has been consumed already, since nulls look like bulk strings until the length prefix is
92// parsed, and parsing the length prefix consumes the trailing \r\n in the underlying `terminated!` call
93fn d_parse_null(input: (&[u8], usize)) -> DResult<RangeFrame> {
94  Ok((input, RangeFrame::Null))
95}
96
97fn d_parse_error(input: (&[u8], usize)) -> DResult<RangeFrame> {
98  let offset = input.1;
99  let ((input, next_offset), len) = d_read_to_crlf(input)?;
100  Ok(((input, next_offset), RangeFrame::Error((offset, offset + len))))
101}
102
103fn d_parse_bulkstring(input: (&[u8], usize), len: usize) -> DResult<RangeFrame> {
104  let offset = input.1;
105  let (input, data) = nom_terminated(nom_take(len), nom_take(2_usize))(input.0)?;
106  Ok((
107    (input, offset + len + 2),
108    RangeFrame::BulkString((offset, offset + data.len())),
109  ))
110}
111
112fn d_parse_bulkstring_or_null(input: (&[u8], usize)) -> DResult<RangeFrame> {
113  let ((input, offset), len) = d_read_prefix_len(input)?;
114  decode_log_str!(
115    input,
116    _input,
117    "Parsing bulkstring, Length: {:?}, remaining: {:?}",
118    len,
119    _input
120  );
121
122  if len == NULL_LEN {
123    d_parse_null((input, offset))
124  } else {
125    d_parse_bulkstring((input, offset), etry!(utils::isize_to_usize(len)))
126  }
127}
128
129fn d_parse_array_frames(input: (&[u8], usize), len: usize) -> DResult<Vec<RangeFrame>> {
130  decode_log_str!(
131    input.0,
132    _input,
133    "Parsing array frames. Length: {:?}, remaining: {:?}",
134    len,
135    _input
136  );
137  nom_count(d_parse_frame, len)(input)
138}
139
140fn d_parse_array(input: (&[u8], usize)) -> DResult<RangeFrame> {
141  let ((input, offset), len) = d_read_prefix_len(input)?;
142  decode_log_str!(
143    input,
144    _input,
145    "Parsing array. Length: {:?}, remaining: {:?}",
146    len,
147    _input
148  );
149
150  if len == NULL_LEN {
151    d_parse_null((input, offset))
152  } else {
153    let len = etry!(utils::isize_to_usize(len));
154    let ((input, offset), frames) = d_parse_array_frames((input, offset), len)?;
155    Ok(((input, offset), RangeFrame::Array(frames)))
156  }
157}
158
159fn d_parse_frame(input: (&[u8], usize)) -> DResult<RangeFrame> {
160  let ((input, offset), kind) = d_frame_type(input)?;
161  decode_log_str!(input, _input, "Parsed kind: {:?}, remaining: {:?}", kind, _input);
162
163  match kind {
164    FrameKind::SimpleString => d_parse_simplestring((input, offset)),
165    FrameKind::Error => d_parse_error((input, offset)),
166    FrameKind::Integer => d_parse_integer((input, offset)),
167    FrameKind::BulkString => d_parse_bulkstring_or_null((input, offset)),
168    FrameKind::Array => d_parse_array((input, offset)),
169    _ => e!(RedisParseError::new_custom("parse_frame", "Invalid frame kind.")),
170  }
171}
172
173/// Attempt to the decode the contents of `buf`, returning frames that reference ranges into the provided buffer.
174///
175/// This is the generic interface behind the zero-copy interface and can be used to implement zero-copy
176/// deserialization into other types.
177pub fn decode_range(buf: &[u8]) -> Result<Option<(RangeFrame, usize)>, RedisProtocolError> {
178  let (offset, _len) = (0, buf.len());
179
180  match d_parse_frame((buf, offset)) {
181    Ok(((_remaining, amt), frame)) => {
182      #[cfg(feature = "std")]
183      debug_assert_eq!(amt, _len - _remaining.len(), "returned offset doesn't match");
184      Ok(Some((frame, amt)))
185    },
186    Err(NomErr::Incomplete(_)) => Ok(None),
187    Err(NomErr::Error(e)) => Err(e.into()),
188    Err(NomErr::Failure(e)) => Err(e.into()),
189  }
190}
191
192/// Attempt to decode the contents of `buf`, returning the first valid frame and the number of bytes consumed.
193pub fn decode(buf: &[u8]) -> Result<Option<(OwnedFrame, usize)>, RedisProtocolError> {
194  let (frame, amt) = match decode_range(buf)? {
195    Some(result) => result,
196    None => return Ok(None),
197  };
198
199  Ok(Some((build_owned_frame(buf, &frame)?, amt)))
200}
201
202/// Attempt to decode the provided buffer without moving or copying the inner buffer contents.
203///
204/// The returned frame(s) will hold owned views into the original buffer via [slice](bytes::Bytes::slice).
205///
206/// Unlike [decode_bytes_mut], this function will not modify the input buffer.
207#[cfg(feature = "bytes")]
208#[cfg_attr(docsrs, doc(cfg(feature = "bytes")))]
209pub fn decode_bytes(buf: &Bytes) -> Result<Option<(BytesFrame, usize)>, RedisProtocolError> {
210  let (frame, amt) = match decode_range(buf)? {
211    Some(result) => result,
212    None => return Ok(None),
213  };
214
215  Ok(Some((build_bytes_frame(buf, &frame)?, amt)))
216}
217
218/// Attempt to decode and [split](bytes::BytesMut::split_to) the provided buffer without moving or copying the inner
219/// buffer contents.
220///
221/// The returned frame(s) will hold owned views into the original buffer.
222///
223/// This function is designed to work best with a [codec](tokio_util::codec) interface.
224#[cfg(feature = "bytes")]
225#[cfg_attr(docsrs, doc(cfg(feature = "bytes")))]
226pub fn decode_bytes_mut(buf: &mut BytesMut) -> Result<Option<(BytesFrame, usize, Bytes)>, RedisProtocolError> {
227  let (frame, amt) = match decode_range(&*buf)? {
228    Some(result) => result,
229    None => return Ok(None),
230  };
231  let (frame, buf) = freeze_parse(buf, &frame, amt)?;
232
233  Ok(Some((frame, amt, buf)))
234}
235
236// Regression tests duplicated for each frame type.
237
238#[cfg(test)]
239mod owned_tests {
240  use super::*;
241
242  pub const PADDING: &str = "FOOBARBAZ";
243
244  fn decode_and_verify_some(bytes: &[u8], expected: &(Option<OwnedFrame>, usize)) {
245    let bytes = bytes.to_vec();
246
247    let (frame, len) = match decode(&bytes) {
248      Ok(Some((f, l))) => (Some(f), l),
249      Ok(None) => panic!("Failed to decode bytes. None returned"),
250      Err(e) => panic!("{:?}", e),
251    };
252
253    assert_eq!(frame, expected.0, "decoded frame matched");
254    assert_eq!(len, expected.1, "decoded frame len matched");
255  }
256
257  fn decode_and_verify_padded_some(bytes: &[u8], expected: &(Option<OwnedFrame>, usize)) {
258    let mut bytes = bytes.to_vec();
259    bytes.extend_from_slice(PADDING.as_bytes());
260
261    let (frame, len) = match decode(&bytes) {
262      Ok(Some((f, l))) => (Some(f), l),
263      Ok(None) => panic!("Failed to decode bytes. None returned"),
264      Err(e) => panic!("{:?}", e),
265    };
266
267    assert_eq!(frame, expected.0, "decoded frame matched");
268    assert_eq!(len, expected.1, "decoded frame len matched");
269  }
270
271  fn decode_and_verify_none(bytes: &[u8]) {
272    let bytes = bytes.to_vec();
273    let (frame, len) = match decode(&bytes) {
274      Ok(Some((f, l))) => (Some(f), l),
275      Ok(None) => (None, 0),
276      Err(e) => panic!("{:?}", e),
277    };
278
279    assert!(frame.is_none());
280    assert_eq!(len, 0);
281  }
282
283  #[test]
284  fn should_decode_llen_res_example() {
285    let expected = (Some(OwnedFrame::Integer(48293)), 8);
286    let bytes: Vec<u8> = ":48293\r\n".into();
287
288    decode_and_verify_some(&bytes, &expected);
289    decode_and_verify_padded_some(&bytes, &expected);
290  }
291
292  #[test]
293  fn should_decode_simple_string() {
294    let expected = (Some(OwnedFrame::SimpleString("string".into())), 9);
295    let bytes: Vec<u8> = "+string\r\n".into();
296
297    decode_and_verify_some(&bytes, &expected);
298    decode_and_verify_padded_some(&bytes, &expected);
299  }
300
301  #[test]
302  #[should_panic]
303  fn should_decode_simple_string_incomplete() {
304    let expected = (Some(OwnedFrame::SimpleString("string".into())), 9);
305    let bytes: Vec<u8> = "+stri".into();
306
307    decode_and_verify_some(&bytes, &expected);
308    decode_and_verify_padded_some(&bytes, &expected);
309  }
310
311  #[test]
312  fn should_decode_bulk_string() {
313    let expected = (Some(OwnedFrame::BulkString("foo".into())), 9);
314    let bytes: Vec<u8> = "$3\r\nfoo\r\n".into();
315
316    decode_and_verify_some(&bytes, &expected);
317    decode_and_verify_padded_some(&bytes, &expected);
318  }
319
320  #[test]
321  #[should_panic]
322  fn should_decode_bulk_string_incomplete() {
323    let expected = (Some(OwnedFrame::BulkString("foo".into())), 9);
324    let bytes: Vec<u8> = "$3\r\nfo".into();
325
326    decode_and_verify_some(&bytes, &expected);
327    decode_and_verify_padded_some(&bytes, &expected);
328  }
329
330  #[test]
331  fn should_decode_array_no_nulls() {
332    let expected = (
333      Some(OwnedFrame::Array(vec![
334        OwnedFrame::SimpleString("Foo".into()),
335        OwnedFrame::SimpleString("Bar".into()),
336      ])),
337      16,
338    );
339    let bytes: Vec<u8> = "*2\r\n+Foo\r\n+Bar\r\n".into();
340
341    decode_and_verify_some(&bytes, &expected);
342    decode_and_verify_padded_some(&bytes, &expected);
343  }
344
345  #[test]
346  fn should_decode_array_nulls() {
347    let bytes: Vec<u8> = "*3\r\n$3\r\nFoo\r\n$-1\r\n$3\r\nBar\r\n".into();
348
349    let expected = (
350      Some(OwnedFrame::Array(vec![
351        OwnedFrame::BulkString("Foo".into()),
352        OwnedFrame::Null,
353        OwnedFrame::BulkString("Bar".into()),
354      ])),
355      bytes.len(),
356    );
357
358    decode_and_verify_some(&bytes, &expected);
359    decode_and_verify_padded_some(&bytes, &expected);
360  }
361
362  #[test]
363  fn should_decode_normal_error() {
364    let bytes: Vec<u8> = "-WRONGTYPE Operation against a key holding the wrong kind of value\r\n".into();
365    let expected = (
366      Some(OwnedFrame::Error(
367        "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
368      )),
369      bytes.len(),
370    );
371
372    decode_and_verify_some(&bytes, &expected);
373    decode_and_verify_padded_some(&bytes, &expected);
374  }
375
376  #[test]
377  fn should_decode_moved_error() {
378    let bytes: Vec<u8> = "-MOVED 3999 127.0.0.1:6381\r\n".into();
379    let expected = (Some(OwnedFrame::Error("MOVED 3999 127.0.0.1:6381".into())), bytes.len());
380
381    decode_and_verify_some(&bytes, &expected);
382    decode_and_verify_padded_some(&bytes, &expected);
383  }
384
385  #[test]
386  fn should_decode_ask_error() {
387    let bytes: Vec<u8> = "-ASK 3999 127.0.0.1:6381\r\n".into();
388    let expected = (Some(OwnedFrame::Error("ASK 3999 127.0.0.1:6381".into())), bytes.len());
389
390    decode_and_verify_some(&bytes, &expected);
391    decode_and_verify_padded_some(&bytes, &expected);
392  }
393
394  #[test]
395  fn should_decode_incomplete() {
396    let bytes: Vec<u8> = "*3\r\n$3\r\nFoo\r\n$-1\r\n$3\r\nBar".into();
397    decode_and_verify_none(&bytes);
398  }
399
400  #[test]
401  #[should_panic]
402  fn should_error_on_junk() {
403    decode("foobarbazwibblewobble".as_bytes()).unwrap();
404  }
405}
406
407#[cfg(test)]
408#[cfg(feature = "bytes")]
409mod bytes_tests {
410  use super::*;
411  use nom::AsBytes;
412
413  pub const PADDING: &str = "FOOBARBAZ";
414
415  fn decode_and_verify_some(bytes: &Bytes, expected: &(Option<BytesFrame>, usize)) {
416    let mut bytes = BytesMut::from(bytes.as_bytes());
417    let total_len = bytes.len();
418
419    let (frame, len, buf) = match decode_bytes_mut(&mut bytes) {
420      Ok(Some((f, l, b))) => (Some(f), l, b),
421      Ok(None) => panic!("Failed to decode bytes. None returned"),
422      Err(e) => panic!("{:?}", e),
423    };
424
425    assert_eq!(frame, expected.0, "decoded frame matched");
426    assert_eq!(len, expected.1, "decoded frame len matched");
427    assert_eq!(buf.len(), expected.1, "output buffer len matched");
428    assert_eq!(buf.len() + bytes.len(), total_len, "total len matched");
429  }
430
431  fn decode_and_verify_padded_some(bytes: &Bytes, expected: &(Option<BytesFrame>, usize)) {
432    let mut bytes = BytesMut::from(bytes.as_bytes());
433    bytes.extend_from_slice(PADDING.as_bytes());
434    let total_len = bytes.len();
435
436    let (frame, len, buf) = match decode_bytes_mut(&mut bytes) {
437      Ok(Some((f, l, b))) => (Some(f), l, b),
438      Ok(None) => panic!("Failed to decode bytes. None returned"),
439      Err(e) => panic!("{:?}", e),
440    };
441
442    assert_eq!(frame, expected.0, "decoded frame matched");
443    assert_eq!(len, expected.1, "decoded frame len matched");
444    assert_eq!(buf.len(), expected.1, "output buffer len matched");
445    assert_eq!(buf.len() + bytes.len(), total_len, "total len matched");
446  }
447
448  fn decode_and_verify_none(bytes: &Bytes) {
449    let mut bytes = BytesMut::from(bytes.as_bytes());
450    let (frame, len, buf) = match decode_bytes_mut(&mut bytes) {
451      Ok(Some((f, l, b))) => (Some(f), l, b),
452      Ok(None) => (None, 0, Bytes::new()),
453      Err(e) => panic!("{:?}", e),
454    };
455
456    assert!(frame.is_none());
457    assert_eq!(len, 0);
458    assert!(buf.is_empty());
459  }
460
461  #[test]
462  fn should_decode_llen_res_example() {
463    let expected = (Some(BytesFrame::Integer(48293)), 8);
464    let bytes: Bytes = ":48293\r\n".into();
465
466    decode_and_verify_some(&bytes, &expected);
467    decode_and_verify_padded_some(&bytes, &expected);
468  }
469
470  #[test]
471  fn should_decode_simple_string() {
472    let expected = (Some(BytesFrame::SimpleString("string".into())), 9);
473    let bytes: Bytes = "+string\r\n".into();
474
475    decode_and_verify_some(&bytes, &expected);
476    decode_and_verify_padded_some(&bytes, &expected);
477  }
478
479  #[test]
480  #[should_panic]
481  fn should_decode_simple_string_incomplete() {
482    let expected = (Some(BytesFrame::SimpleString("string".into())), 9);
483    let bytes: Bytes = "+stri".into();
484
485    decode_and_verify_some(&bytes, &expected);
486    decode_and_verify_padded_some(&bytes, &expected);
487  }
488
489  #[test]
490  fn should_decode_bulk_string() {
491    let expected = (Some(BytesFrame::BulkString("foo".into())), 9);
492    let bytes: Bytes = "$3\r\nfoo\r\n".into();
493
494    decode_and_verify_some(&bytes, &expected);
495    decode_and_verify_padded_some(&bytes, &expected);
496  }
497
498  #[test]
499  #[should_panic]
500  fn should_decode_bulk_string_incomplete() {
501    let expected = (Some(BytesFrame::BulkString("foo".into())), 9);
502    let bytes: Bytes = "$3\r\nfo".into();
503
504    decode_and_verify_some(&bytes, &expected);
505    decode_and_verify_padded_some(&bytes, &expected);
506  }
507
508  #[test]
509  fn should_decode_array_no_nulls() {
510    let expected = (
511      Some(BytesFrame::Array(vec![
512        BytesFrame::SimpleString("Foo".into()),
513        BytesFrame::SimpleString("Bar".into()),
514      ])),
515      16,
516    );
517    let bytes: Bytes = "*2\r\n+Foo\r\n+Bar\r\n".into();
518
519    decode_and_verify_some(&bytes, &expected);
520    decode_and_verify_padded_some(&bytes, &expected);
521  }
522
523  #[test]
524  fn should_decode_array_nulls() {
525    let bytes: Bytes = "*3\r\n$3\r\nFoo\r\n$-1\r\n$3\r\nBar\r\n".into();
526
527    let expected = (
528      Some(BytesFrame::Array(vec![
529        BytesFrame::BulkString("Foo".into()),
530        BytesFrame::Null,
531        BytesFrame::BulkString("Bar".into()),
532      ])),
533      bytes.len(),
534    );
535
536    decode_and_verify_some(&bytes, &expected);
537    decode_and_verify_padded_some(&bytes, &expected);
538  }
539
540  #[test]
541  fn should_decode_normal_error() {
542    let bytes: Bytes = "-WRONGTYPE Operation against a key holding the wrong kind of value\r\n".into();
543    let expected = (
544      Some(BytesFrame::Error(
545        "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
546      )),
547      bytes.len(),
548    );
549
550    decode_and_verify_some(&bytes, &expected);
551    decode_and_verify_padded_some(&bytes, &expected);
552  }
553
554  #[test]
555  fn should_decode_moved_error() {
556    let bytes: Bytes = "-MOVED 3999 127.0.0.1:6381\r\n".into();
557    let expected = (Some(BytesFrame::Error("MOVED 3999 127.0.0.1:6381".into())), bytes.len());
558
559    decode_and_verify_some(&bytes, &expected);
560    decode_and_verify_padded_some(&bytes, &expected);
561  }
562
563  #[test]
564  fn should_decode_ask_error() {
565    let bytes: Bytes = "-ASK 3999 127.0.0.1:6381\r\n".into();
566    let expected = (Some(BytesFrame::Error("ASK 3999 127.0.0.1:6381".into())), bytes.len());
567
568    decode_and_verify_some(&bytes, &expected);
569    decode_and_verify_padded_some(&bytes, &expected);
570  }
571
572  #[test]
573  fn should_decode_incomplete() {
574    let bytes: Bytes = "*3\r\n$3\r\nFoo\r\n$-1\r\n$3\r\nBar".into();
575    decode_and_verify_none(&bytes);
576  }
577
578  #[test]
579  #[should_panic]
580  fn should_error_on_junk() {
581    let mut bytes: BytesMut = "foobarbazwibblewobble".into();
582    decode_bytes_mut(&mut bytes).unwrap();
583  }
584}