redis_protocol/resp3/
decode.rs

1//! Functions for decoding the RESP3 protocol into frames.
2//!
3//! <https://github.com/antirez/RESP3/blob/master/spec.md>
4
5use crate::nom_bytes::NomBytes;
6use crate::resp3::types::*;
7use crate::resp3::utils as resp3_utils;
8use crate::types::*;
9use crate::utils;
10use alloc::format;
11use alloc::vec::Vec;
12use bytes_utils::Str;
13use nom::bytes::streaming::{take as nom_take, take_until as nom_take_until};
14use nom::combinator::{map as nom_map, map_res as nom_map_res, opt as nom_opt};
15use nom::multi::count as nom_count;
16use nom::number::streaming::be_u8;
17use nom::sequence::terminated as nom_terminated;
18use nom::{Err as NomErr, IResult};
19use core::fmt::Debug;
20use core::str;
21
22fn map_complete_frame(frame: Frame) -> DecodedFrame {
23  DecodedFrame::Complete(frame)
24}
25
26pub(crate) fn unwrap_complete_frame(frame: DecodedFrame) -> Result<Frame, RedisParseError<NomBytes>> {
27  frame
28    .into_complete_frame()
29    .map_err(|e| RedisParseError::new_custom("unwrap_complete_frame", format!("{:?}", e)))
30}
31
32pub(crate) fn to_usize<T>(s: &[u8]) -> Result<usize, RedisParseError<T>> {
33  str::from_utf8(s)?
34    .parse::<usize>()
35    .map_err(|e| RedisParseError::new_custom("to_usize", format!("{:?}", e)))
36}
37
38pub(crate) fn to_isize<T>(s: &[u8]) -> Result<isize, RedisParseError<T>> {
39  let s = str::from_utf8(s)?;
40
41  if s == "?" {
42    Ok(-1)
43  } else {
44    s.parse::<isize>()
45      .map_err(|e| RedisParseError::new_custom("to_isize", format!("{:?}", e)))
46  }
47}
48
49pub(crate) fn isize_to_usize<T>(n: isize) -> Result<usize, RedisParseError<T>> {
50  if n.is_negative() {
51    Err(RedisParseError::new_custom("isize_to_usize", "Invalid prefix length."))
52  } else {
53    Ok(n as usize)
54  }
55}
56
57pub(crate) fn to_i64<T>(s: &[u8]) -> Result<i64, RedisParseError<T>> {
58  str::from_utf8(s)?
59    .parse::<i64>()
60    .map_err(|e| RedisParseError::new_custom("to_i64", format!("{:?}", e)))
61}
62
63pub(crate) fn to_f64<T>(s: &[u8]) -> Result<f64, RedisParseError<T>> {
64  str::from_utf8(s)?
65    .parse::<f64>()
66    .map_err(|e| RedisParseError::new_custom("to_f64", format!("{:?}", e)))
67}
68
69pub(crate) fn to_bool<T>(s: &[u8]) -> Result<bool, RedisParseError<T>> {
70  match str::from_utf8(s)?.as_ref() {
71    "t" => Ok(true),
72    "f" => Ok(false),
73    _ => Err(RedisParseError::new_custom("to_bool", "Invalid boolean value.")),
74  }
75}
76
77pub(crate) fn to_verbatimstring_format<T>(s: &[u8]) -> Result<VerbatimStringFormat, RedisParseError<T>> {
78  match str::from_utf8(s)?.as_ref() {
79    "txt" => Ok(VerbatimStringFormat::Text),
80    "mkd" => Ok(VerbatimStringFormat::Markdown),
81    _ => Err(RedisParseError::new_custom(
82      "to_verbatimstring_format",
83      "Invalid format.",
84    )),
85  }
86}
87
88pub(crate) fn to_hello<T>(version: u8, auth: Option<(Str, Str)>) -> Result<Frame, RedisParseError<T>> {
89  let version = match version {
90    2 => RespVersion::RESP2,
91    3 => RespVersion::RESP3,
92    _ => {
93      return Err(RedisParseError::new_custom("parse_hello", "Invalid RESP version."));
94    }
95  };
96  let auth = if let Some((username, password)) = auth {
97    Some(Auth { username, password })
98  } else {
99    None
100  };
101
102  Ok(Frame::Hello { version, auth })
103}
104
105fn to_map(mut data: Vec<Frame>) -> Result<FrameMap, RedisParseError<NomBytes>> {
106  if data.len() % 2 != 0 {
107    return Err(RedisParseError::new_custom("to_map", "Invalid hashmap frame length."));
108  }
109
110  let mut out = resp3_utils::new_map(Some(data.len() / 2));
111  while data.len() >= 2 {
112    let value = data.pop().unwrap();
113    let key = data.pop().unwrap();
114
115    out.insert(key, value);
116  }
117
118  Ok(out)
119}
120
121fn to_set(data: Vec<Frame>) -> Result<FrameSet, RedisParseError<NomBytes>> {
122  let mut out = resp3_utils::new_set(Some(data.len()));
123
124  for frame in data.into_iter() {
125    out.insert(frame);
126  }
127
128  Ok(out)
129}
130
131fn attach_attributes(
132  attributes: Attributes,
133  mut frame: DecodedFrame,
134) -> Result<DecodedFrame, RedisParseError<NomBytes>> {
135  if let Err(e) = frame.add_attributes(attributes) {
136    Err(RedisParseError::new_custom("attach_attributes", format!("{:?}", e)))
137  } else {
138    Ok(frame)
139  }
140}
141
142fn d_read_to_crlf<T>(input: T) -> IResult<NomBytes, NomBytes, RedisParseError<NomBytes>>
143where
144  T: AsRef<NomBytes> + Debug,
145{
146  let input_ref = input.as_ref();
147  decode_log!(input_ref, "Parsing to CRLF. Remaining: {:?}", input_ref);
148  nom_terminated(nom_take_until(CRLF.as_bytes()), nom_take(2_usize))(input_ref.clone())
149}
150
151// this returns bytes instead of str because MONITOR frames will use this prefix and they can contain bulk strings
152fn d_read_to_crlf_s<T>(input: T) -> IResult<NomBytes, NomBytes, RedisParseError<NomBytes>>
153where
154  T: AsRef<NomBytes> + Debug,
155{
156  let (input, data) = d_read_to_crlf(input)?;
157  decode_log!(data, "Parsing as str: Data: {:?}", data);
158  Ok((input, data))
159}
160
161fn d_read_prefix_len<T>(input: T) -> IResult<NomBytes, usize, RedisParseError<NomBytes>>
162where
163  T: AsRef<NomBytes> + Debug,
164{
165  let (input, data) = d_read_to_crlf_s(input)?;
166  decode_log!("Reading prefix len. Data: {:?}", str::from_utf8(data));
167  Ok((input, etry!(to_usize(&data))))
168}
169
170fn d_read_prefix_len_signed<T>(input: T) -> IResult<NomBytes, isize, RedisParseError<NomBytes>>
171where
172  T: AsRef<NomBytes> + Debug,
173{
174  let (input, data) = d_read_to_crlf_s(input)?;
175  decode_log!("Reading prefix len. Data: {:?}", str::from_utf8(data));
176  Ok((input, etry!(to_isize(&data))))
177}
178
179fn d_frame_type<T>(input: T) -> IResult<NomBytes, FrameKind, RedisParseError<NomBytes>>
180where
181  T: AsRef<NomBytes> + Debug,
182{
183  let (input, byte) = be_u8(input.as_ref().clone())?;
184  let kind = match FrameKind::from_byte(byte) {
185    Some(k) => k,
186    None => e!(RedisParseError::new_custom("frame_type", "Invalid frame type prefix.")),
187  };
188  decode_log!(input, "Parsed frame type {:?}, remaining: {:?}", kind, input);
189
190  Ok((input, kind))
191}
192
193fn d_parse_simplestring<T>(input: T) -> IResult<NomBytes, Frame, RedisParseError<NomBytes>>
194where
195  T: AsRef<NomBytes> + Debug,
196{
197  let (input, data) = d_read_to_crlf(input)?;
198  decode_log!(input, "Parsed simplestring {:?}, remaining: {:?}", data, input);
199  Ok((
200    input,
201    Frame::SimpleString {
202      data: data.into_bytes(),
203      attributes: None,
204    },
205  ))
206}
207
208fn d_parse_simpleerror<T>(input: T) -> IResult<NomBytes, Frame, RedisParseError<NomBytes>>
209where
210  T: AsRef<NomBytes> + Debug,
211{
212  let (input, data) = d_read_to_crlf_s(input)?;
213  let data = etry!(Str::from_inner(data.into_bytes()));
214  decode_log!(input, "Parsed simpleerror {:?}, remaining: {:?}", data, input);
215  Ok((input, Frame::SimpleError { data, attributes: None }))
216}
217
218fn d_parse_number<T>(input: T) -> IResult<NomBytes, Frame, RedisParseError<NomBytes>>
219where
220  T: AsRef<NomBytes> + Debug,
221{
222  let (input, data) = d_read_to_crlf_s(input)?;
223  let parsed = etry!(to_i64(&data));
224  decode_log!(input, "Parsed number {}, remaining {:?}", parsed, input);
225  Ok((
226    input,
227    Frame::Number {
228      data: parsed,
229      attributes: None,
230    },
231  ))
232}
233
234fn d_parse_double<T>(input: T) -> IResult<NomBytes, Frame, RedisParseError<NomBytes>>
235where
236  T: AsRef<NomBytes> + Debug,
237{
238  let (input, data) = d_read_to_crlf_s(input)?;
239  let parsed = etry!(to_f64(&data));
240  decode_log!(input, "Parsed double {}, remaining {:?}", parsed, input);
241  Ok((
242    input,
243    Frame::Double {
244      data: parsed,
245      attributes: None,
246    },
247  ))
248}
249
250fn d_parse_boolean<T>(input: T) -> IResult<NomBytes, Frame, RedisParseError<NomBytes>>
251where
252  T: AsRef<NomBytes> + Debug,
253{
254  let (input, data) = d_read_to_crlf_s(input)?;
255  let parsed = etry!(to_bool(&data));
256  decode_log!(input, "Parsed bool {}, remaining: {:?}", parsed, input);
257  Ok((
258    input,
259    Frame::Boolean {
260      data: parsed,
261      attributes: None,
262    },
263  ))
264}
265
266fn d_parse_null<T>(input: T) -> IResult<NomBytes, Frame, RedisParseError<NomBytes>>
267where
268  T: AsRef<NomBytes> + Debug,
269{
270  let (input, _) = d_read_to_crlf_s(input)?;
271  Ok((input, Frame::Null))
272}
273
274fn d_parse_blobstring<T>(input: T, len: usize) -> IResult<NomBytes, Frame, RedisParseError<NomBytes>>
275where
276  T: AsRef<NomBytes> + Debug,
277{
278  let (input, data) = nom_terminated(nom_take(len), nom_take(2_usize))(input.as_ref().clone())?;
279
280  Ok((
281    input,
282    Frame::BlobString {
283      data: data.into_bytes(),
284      attributes: None,
285    },
286  ))
287}
288
289fn d_parse_bloberror<T>(input: T) -> IResult<NomBytes, Frame, RedisParseError<NomBytes>>
290where
291  T: AsRef<NomBytes> + Debug,
292{
293  let (input, len) = d_read_prefix_len(input)?;
294  let (input, data) = nom_terminated(nom_take(len), nom_take(2_usize))(input)?;
295
296  Ok((
297    input,
298    Frame::BlobError {
299      data: data.into_bytes(),
300      attributes: None,
301    },
302  ))
303}
304
305fn d_parse_verbatimstring<T>(input: T) -> IResult<NomBytes, Frame, RedisParseError<NomBytes>>
306where
307  T: AsRef<NomBytes> + Debug,
308{
309  let (input, len) = d_read_prefix_len(input)?;
310  let (input, format_bytes) = nom_terminated(nom_take(3_usize), nom_take(1_usize))(input)?;
311  let format = etry!(to_verbatimstring_format(&format_bytes));
312  let (input, data) = nom_terminated(nom_take(len - 4), nom_take(2_usize))(input)?;
313
314  Ok((
315    input,
316    Frame::VerbatimString {
317      data: data.into_bytes(),
318      format,
319      attributes: None,
320    },
321  ))
322}
323
324fn d_parse_bignumber<T>(input: T) -> IResult<NomBytes, Frame, RedisParseError<NomBytes>>
325where
326  T: AsRef<NomBytes> + Debug,
327{
328  let (input, data) = d_read_to_crlf(input)?;
329
330  Ok((
331    input,
332    Frame::BigNumber {
333      data: data.into_bytes(),
334      attributes: None,
335    },
336  ))
337}
338
339fn d_parse_array_frames<T>(input: T, len: usize) -> IResult<NomBytes, Vec<Frame>, RedisParseError<NomBytes>>
340where
341  T: AsRef<NomBytes> + Debug,
342{
343  nom_count(nom_map_res(d_parse_frame_or_attribute, unwrap_complete_frame), len)(input.as_ref().clone())
344}
345
346fn d_parse_kv_pairs<T>(input: T, len: usize) -> IResult<NomBytes, FrameMap, RedisParseError<NomBytes>>
347where
348  T: AsRef<NomBytes> + Debug,
349{
350  nom_map_res(
351    nom_count(nom_map_res(d_parse_frame_or_attribute, unwrap_complete_frame), len * 2),
352    to_map,
353  )(input.as_ref().clone())
354}
355
356fn d_parse_array<T>(input: T, len: usize) -> IResult<NomBytes, Frame, RedisParseError<NomBytes>>
357where
358  T: AsRef<NomBytes> + Debug,
359{
360  let (input, data) = d_parse_array_frames(input, len)?;
361  Ok((input, Frame::Array { data, attributes: None }))
362}
363
364fn d_parse_push<T>(input: T) -> IResult<NomBytes, Frame, RedisParseError<NomBytes>>
365where
366  T: AsRef<NomBytes> + Debug,
367{
368  let (input, len) = d_read_prefix_len(input)?;
369  let (input, data) = d_parse_array_frames(input, len)?;
370  Ok((input, Frame::Push { data, attributes: None }))
371}
372
373fn d_parse_set<T>(input: T, len: usize) -> IResult<NomBytes, Frame, RedisParseError<NomBytes>>
374where
375  T: AsRef<NomBytes> + Debug,
376{
377  let (input, frames) = d_parse_array_frames(input, len)?;
378  let set = etry!(to_set(frames));
379
380  Ok((
381    input,
382    Frame::Set {
383      data: set,
384      attributes: None,
385    },
386  ))
387}
388
389fn d_parse_map<T>(input: T, len: usize) -> IResult<NomBytes, Frame, RedisParseError<NomBytes>>
390where
391  T: AsRef<NomBytes> + Debug,
392{
393  let (input, frames) = d_parse_kv_pairs(input, len)?;
394
395  Ok((
396    input,
397    Frame::Map {
398      data: frames,
399      attributes: None,
400    },
401  ))
402}
403
404fn d_parse_attribute<T>(input: T) -> IResult<NomBytes, Attributes, RedisParseError<NomBytes>>
405where
406  T: AsRef<NomBytes> + Debug,
407{
408  let (input, len) = d_read_prefix_len(input)?;
409  let (input, attributes) = d_parse_kv_pairs(input, len)?;
410
411  Ok((input, attributes))
412}
413
414fn d_parse_hello<T>(input: T) -> IResult<NomBytes, Frame, RedisParseError<NomBytes>>
415where
416  T: AsRef<NomBytes> + Debug,
417{
418  let (input, _) = nom_map_res(
419    nom_terminated(nom_take_until(HELLO.as_bytes()), nom_take(1_usize)),
420    utils::to_byte_str,
421  )(input.as_ref().clone())?;
422  let (input, version) = be_u8(input)?;
423  let (input, auth) = nom_opt(nom_map_res(
424    nom_terminated(nom_take_until(AUTH.as_bytes()), nom_take(1_usize)),
425    utils::to_byte_str,
426  ))(input)?;
427
428  let (input, auth) = if auth.is_some() {
429    let (input, username) = nom_map_res(
430      nom_terminated(nom_take_until(EMPTY_SPACE.as_bytes()), nom_take(1_usize)),
431      utils::to_byte_str,
432    )(input)?;
433    let (input, password) = nom_map_res(nom_take_until(CRLF.as_bytes()), utils::to_byte_str)(input)?;
434
435    (input, Some((username, password)))
436  } else {
437    (input, None)
438  };
439
440  Ok((input, etry!(to_hello(version, auth))))
441}
442
443/// Check for a streaming variant of a frame, and if found then return the prefix bytes only, otherwise return the complete frame.
444///
445/// Only supported for arrays, sets, maps, and blob strings.
446fn d_check_streaming<T>(input: T, kind: FrameKind) -> IResult<NomBytes, DecodedFrame, RedisParseError<NomBytes>>
447where
448  T: AsRef<NomBytes> + Debug,
449{
450  let (input, len) = d_read_prefix_len_signed(input)?;
451  let (input, frame) = if len == -1 {
452    (input, DecodedFrame::Streaming(StreamedFrame::new(kind)))
453  } else {
454    let len = etry!(isize_to_usize(len));
455    let (input, frame) = match kind {
456      FrameKind::Array => d_parse_array(input, len)?,
457      FrameKind::Set => d_parse_set(input, len)?,
458      FrameKind::Map => d_parse_map(input, len)?,
459      FrameKind::BlobString => d_parse_blobstring(input, len)?,
460      _ => e!(RedisParseError::new_custom(
461        "check_streaming",
462        format!("Invalid frame type: {:?}", kind)
463      )),
464    };
465
466    (input, DecodedFrame::Complete(frame))
467  };
468
469  Ok((input, frame))
470}
471
472fn d_parse_chunked_string<T>(input: T) -> IResult<NomBytes, DecodedFrame, RedisParseError<NomBytes>>
473where
474  T: AsRef<NomBytes> + Debug,
475{
476  let (input, len) = d_read_prefix_len(input)?;
477  let (input, frame) = if len == 0 {
478    (input, Frame::new_end_stream())
479  } else {
480    let (input, contents) = nom_terminated(nom_take(len), nom_take(2_usize))(input)?;
481    (input, Frame::ChunkedString(contents.into_bytes()))
482  };
483
484  Ok((input, DecodedFrame::Complete(frame)))
485}
486
487fn d_return_end_stream<T>(input: T) -> IResult<NomBytes, DecodedFrame, RedisParseError<NomBytes>>
488where
489  T: AsRef<NomBytes> + Debug,
490{
491  let (input, _) = d_read_to_crlf(input)?;
492  Ok((input, DecodedFrame::Complete(Frame::new_end_stream())))
493}
494
495fn d_parse_non_attribute_frame<T>(
496  input: T,
497  kind: FrameKind,
498) -> IResult<NomBytes, DecodedFrame, RedisParseError<NomBytes>>
499where
500  T: AsRef<NomBytes> + Debug,
501{
502  let input = input.as_ref().clone();
503
504  let (input, frame) = match kind {
505    FrameKind::Array => d_check_streaming(input, kind)?,
506    FrameKind::BlobString => d_check_streaming(input, kind)?,
507    FrameKind::Map => d_check_streaming(input, kind)?,
508    FrameKind::Set => d_check_streaming(input, kind)?,
509    FrameKind::SimpleString => nom_map(d_parse_simplestring, map_complete_frame)(input)?,
510    FrameKind::SimpleError => nom_map(d_parse_simpleerror, map_complete_frame)(input)?,
511    FrameKind::Number => nom_map(d_parse_number, map_complete_frame)(input)?,
512    FrameKind::Null => nom_map(d_parse_null, map_complete_frame)(input)?,
513    FrameKind::Double => nom_map(d_parse_double, map_complete_frame)(input)?,
514    FrameKind::Boolean => nom_map(d_parse_boolean, map_complete_frame)(input)?,
515    FrameKind::BlobError => nom_map(d_parse_bloberror, map_complete_frame)(input)?,
516    FrameKind::VerbatimString => nom_map(d_parse_verbatimstring, map_complete_frame)(input)?,
517    FrameKind::Push => nom_map(d_parse_push, map_complete_frame)(input)?,
518    FrameKind::BigNumber => nom_map(d_parse_bignumber, map_complete_frame)(input)?,
519    FrameKind::Hello => nom_map(d_parse_hello, map_complete_frame)(input)?,
520    FrameKind::ChunkedString => d_parse_chunked_string(input)?,
521    FrameKind::EndStream => d_return_end_stream(input)?,
522    FrameKind::Attribute => {
523      error!("Found unexpected attribute frame.");
524      e!(RedisParseError::new_custom(
525        "parse_non_attribute_frame",
526        "Unexpected attribute frame.",
527      ));
528    }
529  };
530
531  Ok((input, frame))
532}
533
534fn d_parse_attribute_and_frame<T>(input: T) -> IResult<NomBytes, DecodedFrame, RedisParseError<NomBytes>>
535where
536  T: AsRef<NomBytes> + Debug,
537{
538  let (input, attributes) = d_parse_attribute(input)?;
539  let (input, kind) = d_frame_type(input)?;
540  let (input, next_frame) = d_parse_non_attribute_frame(input, kind)?;
541  let frame = etry!(attach_attributes(attributes, next_frame));
542
543  Ok((input, frame))
544}
545
546fn d_parse_frame_or_attribute<T>(input: T) -> IResult<NomBytes, DecodedFrame, RedisParseError<NomBytes>>
547where
548  T: AsRef<NomBytes> + Debug,
549{
550  let (input, kind) = d_frame_type(input)?;
551  let (input, frame) = if let FrameKind::Attribute = kind {
552    d_parse_attribute_and_frame(input)?
553  } else {
554    d_parse_non_attribute_frame(input, kind)?
555  };
556
557  Ok((input, frame))
558}
559
560/// Decoding functions for complete frames. **If a streamed frame is detected it will result in an error.**
561pub mod complete {
562  use super::*;
563  use bytes::Bytes;
564
565  /// Attempt to parse the contents of `buf`, returning the first valid frame and the number of bytes consumed.
566  ///
567  /// If the byte slice contains an incomplete frame then `None` is returned.
568  pub fn decode(buf: &Bytes) -> Result<Option<(Frame, usize)>, RedisProtocolError> {
569    let len = buf.len();
570    let buf: NomBytes = buf.into();
571
572    match d_parse_frame_or_attribute(&buf) {
573      Ok((remaining, frame)) => Ok(Some((frame.into_complete_frame()?, len - remaining.len()))),
574      Err(NomErr::Incomplete(_)) => Ok(None),
575      Err(e) => Err(RedisParseError::from(e).into()),
576    }
577  }
578
579  #[cfg(feature = "decode-mut")]
580  #[cfg_attr(docsrs, doc(cfg(feature = "decode-mut")))]
581  pub use crate::decode_mut::resp3::complete::decode_mut;
582}
583
584/// Decoding structs and functions that support streaming frames.
585///
586/// The caller is responsible for managing any returned state for streaming frames.
587pub mod streaming {
588  use super::*;
589  use bytes::Bytes;
590
591  /// Attempt to parse the contents of `buf`, returning the first valid frame and the number of bytes consumed.
592  ///
593  /// If the byte slice contains an incomplete frame then `None` is returned.
594  pub fn decode(buf: &Bytes) -> Result<Option<(DecodedFrame, usize)>, RedisProtocolError> {
595    let len = buf.len();
596    let buf: NomBytes = buf.clone().into();
597
598    match d_parse_frame_or_attribute(&buf) {
599      Ok((remaining, frame)) => Ok(Some((frame, len - remaining.len()))),
600      Err(NomErr::Incomplete(_)) => Ok(None),
601      Err(e) => Err(RedisParseError::from(e).into()),
602    }
603  }
604
605  #[cfg(feature = "decode-mut")]
606  #[cfg_attr(docsrs, doc(cfg(feature = "decode-mut")))]
607  pub use crate::decode_mut::resp3::streaming::decode_mut;
608}
609
610#[cfg(test)]
611pub mod tests {
612  use super::*;
613  use crate::resp3::decode::complete::decode;
614  use crate::resp3::decode::streaming::decode as stream_decode;
615  use bytes::{Bytes, BytesMut};
616  use nom::AsBytes;
617  use std::str;
618
619  pub const PADDING: &'static str = "FOOBARBAZ";
620
621  pub fn pretty_print_panic(e: RedisProtocolError) {
622    panic!("{:?}", e);
623  }
624
625  pub fn panic_no_decode() {
626    panic!("Failed to decode bytes. None returned.")
627  }
628
629  fn decode_and_verify_some(bytes: &Bytes, expected: &(Option<Frame>, usize)) {
630    let (frame, len) = match complete::decode(&bytes) {
631      Ok(Some((f, l))) => (Some(f), l),
632      Ok(None) => return panic_no_decode(),
633      Err(e) => return pretty_print_panic(e),
634    };
635
636    assert_eq!(frame, expected.0, "decoded frame matched");
637    assert_eq!(len, expected.1, "decoded frame len matched");
638  }
639
640  fn decode_and_verify_padded_some(bytes: &Bytes, expected: &(Option<Frame>, usize)) {
641    let mut bytes = BytesMut::from(bytes.as_bytes());
642    bytes.extend_from_slice(PADDING.as_bytes());
643    let bytes = bytes.freeze();
644
645    let (frame, len) = match complete::decode(&bytes) {
646      Ok(Some((f, l))) => (Some(f), l),
647      Ok(None) => return panic_no_decode(),
648      Err(e) => return pretty_print_panic(e),
649    };
650
651    assert_eq!(frame, expected.0, "decoded frame matched");
652    assert_eq!(len, expected.1, "decoded frame len matched");
653  }
654
655  fn decode_and_verify_none(bytes: &Bytes) {
656    let (frame, len) = match complete::decode(&bytes) {
657      Ok(Some((f, l))) => (Some(f), l),
658      Ok(None) => (None, 0),
659      Err(e) => return pretty_print_panic(e),
660    };
661
662    assert!(frame.is_none());
663    assert_eq!(len, 0);
664  }
665
666  // ----------------------- tests adapted from RESP2 ------------------------
667
668  #[test]
669  fn should_decode_llen_res_example() {
670    let expected = (
671      Some(Frame::Number {
672        data: 48293,
673        attributes: None,
674      }),
675      8,
676    );
677    let bytes: Bytes = ":48293\r\n".into();
678
679    decode_and_verify_some(&bytes, &expected);
680    decode_and_verify_padded_some(&bytes, &expected);
681  }
682
683  #[test]
684  fn should_decode_simple_string() {
685    let expected = (
686      Some(Frame::SimpleString {
687        data: "string".into(),
688        attributes: None,
689      }),
690      9,
691    );
692    let bytes: Bytes = "+string\r\n".into();
693
694    decode_and_verify_some(&bytes, &expected);
695    decode_and_verify_padded_some(&bytes, &expected);
696  }
697
698  #[test]
699  #[should_panic]
700  fn should_decode_simple_string_incomplete() {
701    let expected = (
702      Some(Frame::SimpleString {
703        data: "string".into(),
704        attributes: None,
705      }),
706      9,
707    );
708    let bytes: Bytes = "+stri".into();
709
710    decode_and_verify_some(&bytes, &expected);
711    decode_and_verify_padded_some(&bytes, &expected);
712  }
713
714  #[test]
715  fn should_decode_blob_string() {
716    let expected = (
717      Some(Frame::BlobString {
718        data: "foo".into(),
719        attributes: None,
720      }),
721      9,
722    );
723    let bytes: Bytes = "$3\r\nfoo\r\n".into();
724
725    decode_and_verify_some(&bytes, &expected);
726    decode_and_verify_padded_some(&bytes, &expected);
727  }
728
729  #[test]
730  #[should_panic]
731  fn should_decode_blob_string_incomplete() {
732    let expected = (
733      Some(Frame::BlobString {
734        data: "foo".into(),
735        attributes: None,
736      }),
737      9,
738    );
739    let bytes: Bytes = "$3\r\nfo".into();
740
741    decode_and_verify_some(&bytes, &expected);
742    decode_and_verify_padded_some(&bytes, &expected);
743  }
744
745  #[test]
746  fn should_decode_array_no_nulls() {
747    let expected = (
748      Some(Frame::Array {
749        data: vec![
750          Frame::SimpleString {
751            data: "Foo".into(),
752            attributes: None,
753          },
754          Frame::SimpleString {
755            data: "Bar".into(),
756            attributes: None,
757          },
758        ],
759        attributes: None,
760      }),
761      16,
762    );
763    let bytes: Bytes = "*2\r\n+Foo\r\n+Bar\r\n".into();
764
765    decode_and_verify_some(&bytes, &expected);
766    decode_and_verify_padded_some(&bytes, &expected);
767  }
768
769  #[test]
770  fn should_decode_array_nulls() {
771    let bytes: Bytes = "*3\r\n$3\r\nFoo\r\n_\r\n$3\r\nBar\r\n".into();
772
773    let expected = (
774      Some(Frame::Array {
775        data: vec![
776          Frame::BlobString {
777            data: "Foo".into(),
778            attributes: None,
779          },
780          Frame::Null,
781          Frame::BlobString {
782            data: "Bar".into(),
783            attributes: None,
784          },
785        ],
786        attributes: None,
787      }),
788      bytes.len(),
789    );
790
791    decode_and_verify_some(&bytes, &expected);
792    decode_and_verify_padded_some(&bytes, &expected);
793  }
794
795  #[test]
796  fn should_decode_normal_error() {
797    let bytes: Bytes = "-WRONGTYPE Operation against a key holding the wrong kind of value\r\n".into();
798    let expected = (
799      Some(Frame::SimpleError {
800        data: "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
801        attributes: None,
802      }),
803      bytes.len(),
804    );
805
806    decode_and_verify_some(&bytes, &expected);
807    decode_and_verify_padded_some(&bytes, &expected);
808  }
809
810  #[test]
811  fn should_decode_moved_error() {
812    let bytes: Bytes = "-MOVED 3999 127.0.0.1:6381\r\n".into();
813    let expected = (
814      Some(Frame::SimpleError {
815        data: "MOVED 3999 127.0.0.1:6381".into(),
816        attributes: None,
817      }),
818      bytes.len(),
819    );
820
821    decode_and_verify_some(&bytes, &expected);
822    decode_and_verify_padded_some(&bytes, &expected);
823  }
824
825  #[test]
826  fn should_decode_ask_error() {
827    let bytes: Bytes = "-ASK 3999 127.0.0.1:6381\r\n".into();
828    let expected = (
829      Some(Frame::SimpleError {
830        data: "ASK 3999 127.0.0.1:6381".into(),
831        attributes: None,
832      }),
833      bytes.len(),
834    );
835
836    decode_and_verify_some(&bytes, &expected);
837    decode_and_verify_padded_some(&bytes, &expected);
838  }
839
840  #[test]
841  fn should_decode_incomplete() {
842    let bytes: Bytes = "*3\r\n$3\r\nFoo\r\n_\r\n$3\r\nBar".into();
843    decode_and_verify_none(&bytes);
844  }
845
846  #[test]
847  #[should_panic]
848  fn should_error_on_junk() {
849    let bytes: Bytes = "foobarbazwibblewobble".into();
850    let _ = complete::decode(&bytes).map_err(|e| pretty_print_panic(e));
851  }
852
853  // ----------------- end tests adapted from RESP2 ------------------------
854
855  #[test]
856  fn should_decode_blob_error() {
857    let expected = (
858      Some(Frame::BlobError {
859        data: "foo".into(),
860        attributes: None,
861      }),
862      9,
863    );
864    let bytes: Bytes = "!3\r\nfoo\r\n".into();
865
866    decode_and_verify_some(&bytes, &expected);
867    decode_and_verify_padded_some(&bytes, &expected);
868  }
869
870  #[test]
871  #[should_panic]
872  fn should_decode_blob_error_incomplete() {
873    let expected = (
874      Some(Frame::BlobError {
875        data: "foo".into(),
876        attributes: None,
877      }),
878      9,
879    );
880    let bytes: Bytes = "!3\r\nfo".into();
881
882    decode_and_verify_some(&bytes, &expected);
883    decode_and_verify_padded_some(&bytes, &expected);
884  }
885
886  #[test]
887  fn should_decode_simple_error() {
888    let expected = (
889      Some(Frame::SimpleError {
890        data: "string".into(),
891        attributes: None,
892      }),
893      9,
894    );
895    let bytes: Bytes = "-string\r\n".into();
896
897    decode_and_verify_some(&bytes, &expected);
898    decode_and_verify_padded_some(&bytes, &expected);
899  }
900
901  #[test]
902  #[should_panic]
903  fn should_decode_simple_error_incomplete() {
904    let expected = (
905      Some(Frame::SimpleError {
906        data: "string".into(),
907        attributes: None,
908      }),
909      9,
910    );
911    let bytes: Bytes = "-strin".into();
912
913    decode_and_verify_some(&bytes, &expected);
914    decode_and_verify_padded_some(&bytes, &expected);
915  }
916
917  #[test]
918  fn should_decode_boolean_true() {
919    let expected = (
920      Some(Frame::Boolean {
921        data: true,
922        attributes: None,
923      }),
924      4,
925    );
926    let bytes: Bytes = "#t\r\n".into();
927
928    decode_and_verify_some(&bytes, &expected);
929    decode_and_verify_padded_some(&bytes, &expected);
930  }
931
932  #[test]
933  fn should_decode_boolean_false() {
934    let expected = (
935      Some(Frame::Boolean {
936        data: false,
937        attributes: None,
938      }),
939      4,
940    );
941    let bytes: Bytes = "#f\r\n".into();
942
943    decode_and_verify_some(&bytes, &expected);
944    decode_and_verify_padded_some(&bytes, &expected);
945  }
946
947  #[test]
948  fn should_decode_number() {
949    let expected = (
950      Some(Frame::Number {
951        data: 42,
952        attributes: None,
953      }),
954      5,
955    );
956    let bytes: Bytes = ":42\r\n".into();
957
958    decode_and_verify_some(&bytes, &expected);
959    decode_and_verify_padded_some(&bytes, &expected);
960  }
961
962  #[test]
963  fn should_decode_double_inf() {
964    let expected = (
965      Some(Frame::Double {
966        data: f64::INFINITY,
967        attributes: None,
968      }),
969      6,
970    );
971    let bytes: Bytes = ",inf\r\n".into();
972
973    decode_and_verify_some(&bytes, &expected);
974    decode_and_verify_padded_some(&bytes, &expected);
975  }
976
977  #[test]
978  fn should_decode_double_neg_inf() {
979    let expected = (
980      Some(Frame::Double {
981        data: f64::NEG_INFINITY,
982        attributes: None,
983      }),
984      7,
985    );
986    let bytes: Bytes = ",-inf\r\n".into();
987
988    decode_and_verify_some(&bytes, &expected);
989    decode_and_verify_padded_some(&bytes, &expected);
990  }
991
992  #[test]
993  #[should_panic]
994  fn should_decode_double_nan() {
995    let expected = (
996      Some(Frame::Double {
997        data: f64::NAN,
998        attributes: None,
999      }),
1000      7,
1001    );
1002    let bytes: Bytes = ",foo\r\n".into();
1003
1004    decode_and_verify_some(&bytes, &expected);
1005    decode_and_verify_padded_some(&bytes, &expected);
1006  }
1007
1008  #[test]
1009  fn should_decode_double() {
1010    let expected = (
1011      Some(Frame::Double {
1012        data: 4.59193,
1013        attributes: None,
1014      }),
1015      10,
1016    );
1017    let bytes: Bytes = ",4.59193\r\n".into();
1018
1019    decode_and_verify_some(&bytes, &expected);
1020    decode_and_verify_padded_some(&bytes, &expected);
1021
1022    let expected = (
1023      Some(Frame::Double {
1024        data: 4_f64,
1025        attributes: None,
1026      }),
1027      4,
1028    );
1029    let bytes: Bytes = ",4\r\n".into();
1030
1031    decode_and_verify_some(&bytes, &expected);
1032    decode_and_verify_padded_some(&bytes, &expected);
1033  }
1034
1035  #[test]
1036  fn should_decode_bignumber() {
1037    let expected = (
1038      Some(Frame::BigNumber {
1039        data: "3492890328409238509324850943850943825024385".into(),
1040        attributes: None,
1041      }),
1042      46,
1043    );
1044    let bytes: Bytes = "(3492890328409238509324850943850943825024385\r\n".into();
1045
1046    decode_and_verify_some(&bytes, &expected);
1047    decode_and_verify_padded_some(&bytes, &expected);
1048  }
1049
1050  #[test]
1051  fn should_decode_null() {
1052    let expected = (Some(Frame::Null), 3);
1053    let bytes: Bytes = "_\r\n".into();
1054
1055    decode_and_verify_some(&bytes, &expected);
1056    decode_and_verify_padded_some(&bytes, &expected);
1057  }
1058
1059  #[test]
1060  fn should_decode_verbatim_string_mkd() {
1061    let expected = (
1062      Some(Frame::VerbatimString {
1063        data: "Some string".into(),
1064        format: VerbatimStringFormat::Markdown,
1065        attributes: None,
1066      }),
1067      22,
1068    );
1069    let bytes: Bytes = "=15\r\nmkd:Some string\r\n".into();
1070
1071    decode_and_verify_some(&bytes, &expected);
1072    decode_and_verify_padded_some(&bytes, &expected);
1073  }
1074
1075  #[test]
1076  fn should_decode_verbatim_string_txt() {
1077    let expected = (
1078      Some(Frame::VerbatimString {
1079        data: "Some string".into(),
1080        format: VerbatimStringFormat::Text,
1081        attributes: None,
1082      }),
1083      22,
1084    );
1085    let bytes: Bytes = "=15\r\ntxt:Some string\r\n".into();
1086
1087    decode_and_verify_some(&bytes, &expected);
1088    decode_and_verify_padded_some(&bytes, &expected);
1089  }
1090
1091  #[test]
1092  fn should_decode_map_no_nulls() {
1093    let k1 = Frame::SimpleString {
1094      data: "first".into(),
1095      attributes: None,
1096    };
1097    let v1 = Frame::Number {
1098      data: 1,
1099      attributes: None,
1100    };
1101    let k2 = Frame::BlobString {
1102      data: "second".into(),
1103      attributes: None,
1104    };
1105    let v2 = Frame::Double {
1106      data: 4.2,
1107      attributes: None,
1108    };
1109
1110    let mut expected_map = resp3_utils::new_map(None);
1111    expected_map.insert(k1, v1);
1112    expected_map.insert(k2, v2);
1113    let expected = (
1114      Some(Frame::Map {
1115        data: expected_map,
1116        attributes: None,
1117      }),
1118      34,
1119    );
1120    let bytes: Bytes = "%2\r\n+first\r\n:1\r\n$6\r\nsecond\r\n,4.2\r\n".into();
1121
1122    decode_and_verify_some(&bytes, &expected);
1123    decode_and_verify_padded_some(&bytes, &expected);
1124  }
1125
1126  #[test]
1127  fn should_decode_map_with_nulls() {
1128    let k1 = Frame::SimpleString {
1129      data: "first".into(),
1130      attributes: None,
1131    };
1132    let v1 = Frame::Number {
1133      data: 1,
1134      attributes: None,
1135    };
1136    let k2 = Frame::Number {
1137      data: 2,
1138      attributes: None,
1139    };
1140    let v2 = Frame::Null;
1141    let k3 = Frame::BlobString {
1142      data: "second".into(),
1143      attributes: None,
1144    };
1145    let v3 = Frame::Double {
1146      data: 4.2,
1147      attributes: None,
1148    };
1149
1150    let mut expected_map = resp3_utils::new_map(None);
1151    expected_map.insert(k1, v1);
1152    expected_map.insert(k2, v2);
1153    expected_map.insert(k3, v3);
1154    let expected = (
1155      Some(Frame::Map {
1156        data: expected_map,
1157        attributes: None,
1158      }),
1159      41,
1160    );
1161    let bytes: Bytes = "%3\r\n+first\r\n:1\r\n:2\r\n_\r\n$6\r\nsecond\r\n,4.2\r\n".into();
1162
1163    decode_and_verify_some(&bytes, &expected);
1164    decode_and_verify_padded_some(&bytes, &expected);
1165  }
1166
1167  #[test]
1168  fn should_decode_set_no_nulls() {
1169    let mut expected_set = resp3_utils::new_set(None);
1170    expected_set.insert(Frame::Number {
1171      data: 1,
1172      attributes: None,
1173    });
1174    expected_set.insert(Frame::SimpleString {
1175      data: "2".into(),
1176      attributes: None,
1177    });
1178    expected_set.insert(Frame::BlobString {
1179      data: "foobar".into(),
1180      attributes: None,
1181    });
1182    expected_set.insert(Frame::Double {
1183      data: 4.2,
1184      attributes: None,
1185    });
1186    let expected = (
1187      Some(Frame::Set {
1188        data: expected_set,
1189        attributes: None,
1190      }),
1191      30,
1192    );
1193    let bytes: Bytes = "~4\r\n:1\r\n+2\r\n$6\r\nfoobar\r\n,4.2\r\n".into();
1194
1195    decode_and_verify_some(&bytes, &expected);
1196    decode_and_verify_padded_some(&bytes, &expected);
1197  }
1198
1199  #[test]
1200  fn should_decode_set_with_nulls() {
1201    let mut expected_set = resp3_utils::new_set(None);
1202    expected_set.insert(Frame::Number {
1203      data: 1,
1204      attributes: None,
1205    });
1206    expected_set.insert(Frame::SimpleString {
1207      data: "2".into(),
1208      attributes: None,
1209    });
1210    expected_set.insert(Frame::Null);
1211    expected_set.insert(Frame::Double {
1212      data: 4.2,
1213      attributes: None,
1214    });
1215    let expected = (
1216      Some(Frame::Set {
1217        data: expected_set,
1218        attributes: None,
1219      }),
1220      21,
1221    );
1222    let bytes: Bytes = "~4\r\n:1\r\n+2\r\n_\r\n,4.2\r\n".into();
1223
1224    decode_and_verify_some(&bytes, &expected);
1225    decode_and_verify_padded_some(&bytes, &expected);
1226  }
1227
1228  #[test]
1229  fn should_decode_push_pubsub() {
1230    let expected = (
1231      Some(Frame::Push {
1232        data: vec![
1233          Frame::SimpleString {
1234            data: "pubsub".into(),
1235            attributes: None,
1236          },
1237          Frame::SimpleString {
1238            data: "message".into(),
1239            attributes: None,
1240          },
1241          Frame::SimpleString {
1242            data: "somechannel".into(),
1243            attributes: None,
1244          },
1245          Frame::SimpleString {
1246            data: "this is the message".into(),
1247            attributes: None,
1248          },
1249        ],
1250        attributes: None,
1251      }),
1252      59,
1253    );
1254    let bytes: Bytes = ">4\r\n+pubsub\r\n+message\r\n+somechannel\r\n+this is the message\r\n".into();
1255
1256    decode_and_verify_some(&bytes, &expected);
1257    decode_and_verify_padded_some(&bytes, &expected);
1258
1259    let (frame, _) = decode(&bytes).unwrap().unwrap();
1260    assert!(frame.is_pubsub_message());
1261    assert!(frame.is_normal_pubsub());
1262  }
1263
1264  #[test]
1265  fn should_decode_push_pattern_pubsub() {
1266    let expected = (
1267      Some(Frame::Push {
1268        data: vec![
1269          Frame::SimpleString {
1270            data: "pubsub".into(),
1271            attributes: None,
1272          },
1273          Frame::SimpleString {
1274            data: "pmessage".into(),
1275            attributes: None,
1276          },
1277          Frame::SimpleString {
1278            data: "somechannel".into(),
1279            attributes: None,
1280          },
1281          Frame::SimpleString {
1282            data: "this is the message".into(),
1283            attributes: None,
1284          },
1285        ],
1286        attributes: None,
1287      }),
1288      60,
1289    );
1290    let bytes: Bytes = ">4\r\n+pubsub\r\n+pmessage\r\n+somechannel\r\n+this is the message\r\n".into();
1291
1292    decode_and_verify_some(&bytes, &expected);
1293    decode_and_verify_padded_some(&bytes, &expected);
1294
1295    let (frame, _) = decode(&bytes).unwrap().unwrap();
1296    assert!(frame.is_pattern_pubsub_message());
1297    assert!(frame.is_pubsub_message());
1298  }
1299
1300  #[test]
1301  fn should_decode_keyevent_message() {
1302    let expected = (
1303      Some(Frame::Push {
1304        data: vec![
1305          Frame::SimpleString {
1306            data: "pubsub".into(),
1307            attributes: None,
1308          },
1309          Frame::SimpleString {
1310            data: "pmessage".into(),
1311            attributes: None,
1312          },
1313          Frame::SimpleString {
1314            data: "__key*".into(),
1315            attributes: None,
1316          },
1317          Frame::SimpleString {
1318            data: "__keyevent@0__:set".into(),
1319            attributes: None,
1320          },
1321          Frame::SimpleString {
1322            data: "foo".into(),
1323            attributes: None,
1324          },
1325        ],
1326        attributes: None,
1327      }),
1328      60,
1329    );
1330    let bytes: Bytes = ">5\r\n+pubsub\r\n+pmessage\r\n+__key*\r\n+__keyevent@0__:set\r\n+foo\r\n".into();
1331
1332    decode_and_verify_some(&bytes, &expected);
1333    decode_and_verify_padded_some(&bytes, &expected);
1334
1335    let (frame, _) = decode(&bytes).unwrap().unwrap();
1336    assert!(frame.is_pattern_pubsub_message());
1337    assert!(frame.is_pubsub_message());
1338  }
1339
1340  #[test]
1341  fn should_parse_outer_attributes() {
1342    let mut expected_inner_attrs = resp3_utils::new_map(None);
1343    expected_inner_attrs.insert(
1344      Frame::BlobString {
1345        data: "a".into(),
1346        attributes: None,
1347      },
1348      Frame::Double {
1349        data: 0.1923,
1350        attributes: None,
1351      },
1352    );
1353    expected_inner_attrs.insert(
1354      Frame::BlobString {
1355        data: "b".into(),
1356        attributes: None,
1357      },
1358      Frame::Double {
1359        data: 0.0012,
1360        attributes: None,
1361      },
1362    );
1363    let expected_inner_attrs = Frame::Map {
1364      data: expected_inner_attrs,
1365      attributes: None,
1366    };
1367
1368    let mut expected_attrs = resp3_utils::new_map(None);
1369    expected_attrs.insert(
1370      Frame::SimpleString {
1371        data: "key-popularity".into(),
1372        attributes: None,
1373      },
1374      expected_inner_attrs,
1375    );
1376
1377    let expected = (
1378      Some(Frame::Array {
1379        data: vec![
1380          Frame::Number {
1381            data: 2039123,
1382            attributes: None,
1383          },
1384          Frame::Number {
1385            data: 9543892,
1386            attributes: None,
1387          },
1388        ],
1389        attributes: Some(expected_attrs.into()),
1390      }),
1391      81,
1392    );
1393
1394    let bytes: Bytes =
1395      "|1\r\n+key-popularity\r\n%2\r\n$1\r\na\r\n,0.1923\r\n$1\r\nb\r\n,0.0012\r\n*2\r\n:2039123\r\n:9543892\r\n"
1396        .into();
1397
1398    decode_and_verify_some(&bytes, &expected);
1399    decode_and_verify_padded_some(&bytes, &expected);
1400  }
1401
1402  #[test]
1403  fn should_parse_inner_attributes() {
1404    let mut expected_attrs = resp3_utils::new_map(None);
1405    expected_attrs.insert(
1406      Frame::SimpleString {
1407        data: "ttl".into(),
1408        attributes: None,
1409      },
1410      Frame::Number {
1411        data: 3600,
1412        attributes: None,
1413      },
1414    );
1415
1416    let expected = (
1417      Some(Frame::Array {
1418        data: vec![
1419          Frame::Number {
1420            data: 1,
1421            attributes: None,
1422          },
1423          Frame::Number {
1424            data: 2,
1425            attributes: None,
1426          },
1427          Frame::Number {
1428            data: 3,
1429            attributes: Some(expected_attrs),
1430          },
1431        ],
1432        attributes: None,
1433      }),
1434      33,
1435    );
1436    let bytes: Bytes = "*3\r\n:1\r\n:2\r\n|1\r\n+ttl\r\n:3600\r\n:3\r\n".into();
1437
1438    decode_and_verify_some(&bytes, &expected);
1439    decode_and_verify_padded_some(&bytes, &expected);
1440  }
1441
1442  #[test]
1443  fn should_decode_end_stream() {
1444    let bytes: Bytes = ";0\r\n".into();
1445    let (frame, _) = stream_decode(&bytes).unwrap().unwrap();
1446    assert_eq!(frame, DecodedFrame::Complete(Frame::new_end_stream()))
1447  }
1448
1449  #[test]
1450  fn should_decode_streaming_string() {
1451    let mut bytes: Bytes = "$?\r\n;4\r\nHell\r\n;6\r\no worl\r\n;1\r\nd\r\n;0\r\n".into();
1452
1453    let (frame, amt) = stream_decode(&bytes).unwrap().unwrap();
1454    assert_eq!(
1455      frame,
1456      DecodedFrame::Streaming(StreamedFrame::new(FrameKind::BlobString))
1457    );
1458    assert_eq!(amt, 4);
1459    let _ = bytes.split_to(amt);
1460
1461    let (frame, amt) = stream_decode(&bytes).unwrap().unwrap();
1462    assert_eq!(frame, DecodedFrame::Complete(Frame::ChunkedString("Hell".into())));
1463    assert_eq!(amt, 10);
1464    let _ = bytes.split_to(amt);
1465
1466    let (frame, amt) = stream_decode(&bytes).unwrap().unwrap();
1467    assert_eq!(frame, DecodedFrame::Complete(Frame::ChunkedString("o worl".into())));
1468    assert_eq!(amt, 12);
1469    let _ = bytes.split_to(amt);
1470
1471    let (frame, amt) = stream_decode(&bytes).unwrap().unwrap();
1472    assert_eq!(frame, DecodedFrame::Complete(Frame::ChunkedString("d".into())));
1473    assert_eq!(amt, 7);
1474    let _ = bytes.split_to(amt);
1475
1476    let (frame, amt) = stream_decode(&bytes).unwrap().unwrap();
1477    assert_eq!(frame, DecodedFrame::Complete(Frame::new_end_stream()));
1478    assert_eq!(amt, 4);
1479  }
1480
1481  #[test]
1482  fn should_decode_streaming_array() {
1483    let mut bytes: Bytes = "*?\r\n:1\r\n:2\r\n:3\r\n.\r\n".into();
1484
1485    let (frame, amt) = stream_decode(&bytes).unwrap().unwrap();
1486    assert_eq!(frame, DecodedFrame::Streaming(StreamedFrame::new(FrameKind::Array)));
1487    assert_eq!(amt, 4);
1488    let _ = bytes.split_to(amt);
1489    let mut streamed = frame.into_streaming_frame().unwrap();
1490
1491    let (frame, amt) = stream_decode(&bytes).unwrap().unwrap();
1492    assert_eq!(
1493      frame,
1494      DecodedFrame::Complete(Frame::Number {
1495        data: 1,
1496        attributes: None
1497      })
1498    );
1499    assert_eq!(amt, 4);
1500    let _ = bytes.split_to(amt);
1501    streamed.add_frame(frame.into_complete_frame().unwrap());
1502
1503    let (frame, amt) = stream_decode(&bytes).unwrap().unwrap();
1504    assert_eq!(
1505      frame,
1506      DecodedFrame::Complete(Frame::Number {
1507        data: 2,
1508        attributes: None
1509      })
1510    );
1511    assert_eq!(amt, 4);
1512    let _ = bytes.split_to(amt);
1513    streamed.add_frame(frame.into_complete_frame().unwrap());
1514
1515    let (frame, amt) = stream_decode(&bytes).unwrap().unwrap();
1516    assert_eq!(
1517      frame,
1518      DecodedFrame::Complete(Frame::Number {
1519        data: 3,
1520        attributes: None
1521      })
1522    );
1523    assert_eq!(amt, 4);
1524    let _ = bytes.split_to(amt);
1525    streamed.add_frame(frame.into_complete_frame().unwrap());
1526
1527    let (frame, amt) = stream_decode(&bytes).unwrap().unwrap();
1528    assert_eq!(frame, DecodedFrame::Complete(Frame::new_end_stream()));
1529    assert_eq!(amt, 3);
1530    streamed.add_frame(frame.into_complete_frame().unwrap());
1531
1532    assert!(streamed.is_finished());
1533    let actual = streamed.into_frame().unwrap();
1534    let expected = Frame::Array {
1535      data: vec![
1536        Frame::Number {
1537          data: 1,
1538          attributes: None,
1539        },
1540        Frame::Number {
1541          data: 2,
1542          attributes: None,
1543        },
1544        Frame::Number {
1545          data: 3,
1546          attributes: None,
1547        },
1548      ],
1549      attributes: None,
1550    };
1551
1552    assert_eq!(actual, expected);
1553  }
1554
1555  #[test]
1556  fn should_decode_streaming_set() {
1557    let mut bytes: Bytes = "~?\r\n:1\r\n:2\r\n:3\r\n.\r\n".into();
1558
1559    let (frame, amt) = stream_decode(&bytes).unwrap().unwrap();
1560    assert_eq!(frame, DecodedFrame::Streaming(StreamedFrame::new(FrameKind::Set)));
1561    assert_eq!(amt, 4);
1562    let _ = bytes.split_to(amt);
1563    let mut streamed = frame.into_streaming_frame().unwrap();
1564
1565    let (frame, amt) = stream_decode(&bytes).unwrap().unwrap();
1566    assert_eq!(
1567      frame,
1568      DecodedFrame::Complete(Frame::Number {
1569        data: 1,
1570        attributes: None
1571      })
1572    );
1573    assert_eq!(amt, 4);
1574    let _ = bytes.split_to(amt);
1575    streamed.add_frame(frame.into_complete_frame().unwrap());
1576
1577    let (frame, amt) = stream_decode(&bytes).unwrap().unwrap();
1578    assert_eq!(
1579      frame,
1580      DecodedFrame::Complete(Frame::Number {
1581        data: 2,
1582        attributes: None
1583      })
1584    );
1585    assert_eq!(amt, 4);
1586    let _ = bytes.split_to(amt);
1587    streamed.add_frame(frame.into_complete_frame().unwrap());
1588
1589    let (frame, amt) = stream_decode(&bytes).unwrap().unwrap();
1590    assert_eq!(
1591      frame,
1592      DecodedFrame::Complete(Frame::Number {
1593        data: 3,
1594        attributes: None
1595      })
1596    );
1597    assert_eq!(amt, 4);
1598    let _ = bytes.split_to(amt);
1599    streamed.add_frame(frame.into_complete_frame().unwrap());
1600
1601    let (frame, amt) = stream_decode(&bytes).unwrap().unwrap();
1602    assert_eq!(frame, DecodedFrame::Complete(Frame::new_end_stream()));
1603    assert_eq!(amt, 3);
1604    streamed.add_frame(frame.into_complete_frame().unwrap());
1605
1606    assert!(streamed.is_finished());
1607    let actual = streamed.into_frame().unwrap();
1608    let mut expected_result = resp3_utils::new_set(None);
1609    expected_result.insert(Frame::Number {
1610      data: 1,
1611      attributes: None,
1612    });
1613    expected_result.insert(Frame::Number {
1614      data: 2,
1615      attributes: None,
1616    });
1617    expected_result.insert(Frame::Number {
1618      data: 3,
1619      attributes: None,
1620    });
1621
1622    let expected = Frame::Set {
1623      data: expected_result,
1624      attributes: None,
1625    };
1626
1627    assert_eq!(actual, expected);
1628  }
1629
1630  #[test]
1631  fn should_decode_streaming_map() {
1632    let mut bytes: Bytes = "%?\r\n+a\r\n:1\r\n+b\r\n:2\r\n.\r\n".into();
1633
1634    let (frame, amt) = stream_decode(&bytes).unwrap().unwrap();
1635    assert_eq!(frame, DecodedFrame::Streaming(StreamedFrame::new(FrameKind::Map)));
1636    assert_eq!(amt, 4);
1637    let _ = bytes.split_to(amt);
1638    let mut streamed = frame.into_streaming_frame().unwrap();
1639
1640    let (frame, amt) = stream_decode(&bytes).unwrap().unwrap();
1641    assert_eq!(
1642      frame,
1643      DecodedFrame::Complete(Frame::SimpleString {
1644        data: "a".into(),
1645        attributes: None
1646      })
1647    );
1648    assert_eq!(amt, 4);
1649    let _ = bytes.split_to(amt);
1650    streamed.add_frame(frame.into_complete_frame().unwrap());
1651
1652    let (frame, amt) = stream_decode(&bytes).unwrap().unwrap();
1653    assert_eq!(
1654      frame,
1655      DecodedFrame::Complete(Frame::Number {
1656        data: 1.into(),
1657        attributes: None
1658      })
1659    );
1660    assert_eq!(amt, 4);
1661    let _ = bytes.split_to(amt);
1662    streamed.add_frame(frame.into_complete_frame().unwrap());
1663
1664    let (frame, amt) = stream_decode(&bytes).unwrap().unwrap();
1665    assert_eq!(
1666      frame,
1667      DecodedFrame::Complete(Frame::SimpleString {
1668        data: "b".into(),
1669        attributes: None
1670      })
1671    );
1672    assert_eq!(amt, 4);
1673    let _ = bytes.split_to(amt);
1674    streamed.add_frame(frame.into_complete_frame().unwrap());
1675
1676    let (frame, amt) = stream_decode(&bytes).unwrap().unwrap();
1677    assert_eq!(
1678      frame,
1679      DecodedFrame::Complete(Frame::Number {
1680        data: 2.into(),
1681        attributes: None
1682      })
1683    );
1684    assert_eq!(amt, 4);
1685    let _ = bytes.split_to(amt);
1686    streamed.add_frame(frame.into_complete_frame().unwrap());
1687
1688    let (frame, amt) = stream_decode(&bytes).unwrap().unwrap();
1689    assert_eq!(frame, DecodedFrame::Complete(Frame::new_end_stream()));
1690    assert_eq!(amt, 3);
1691    streamed.add_frame(frame.into_complete_frame().unwrap());
1692
1693    assert!(streamed.is_finished());
1694    let actual = streamed.into_frame().unwrap();
1695    let mut expected_result = resp3_utils::new_map(None);
1696    expected_result.insert(
1697      Frame::SimpleString {
1698        data: "a".into(),
1699        attributes: None,
1700      },
1701      Frame::Number {
1702        data: 1,
1703        attributes: None,
1704      },
1705    );
1706    expected_result.insert(
1707      Frame::SimpleString {
1708        data: "b".into(),
1709        attributes: None,
1710      },
1711      Frame::Number {
1712        data: 2,
1713        attributes: None,
1714      },
1715    );
1716    let expected = Frame::Map {
1717      data: expected_result,
1718      attributes: None,
1719    };
1720
1721    assert_eq!(actual, expected);
1722  }
1723}