redis_protocol/resp3/
types.rs

1use crate::resp3::utils as resp3_utils;
2use crate::types::{Redirection, RedisProtocolError, RedisProtocolErrorKind};
3use crate::utils;
4use alloc::collections::VecDeque;
5use alloc::format;
6use alloc::string::{String, ToString};
7use alloc::vec::Vec;
8use bytes::Bytes;
9use bytes_utils::Str;
10use core::convert::TryFrom;
11use core::hash::{Hash, Hasher};
12use core::mem;
13use core::str;
14
15#[cfg(feature = "std")]
16use std::collections::{HashMap, HashSet};
17
18#[cfg(feature = "hashbrown")]
19use hashbrown::{HashMap, HashSet};
20
21#[cfg(feature = "index-map")]
22use indexmap::{IndexMap, IndexSet};
23
24#[cfg(test)]
25use std::convert::TryInto;
26
27/// Byte prefix before a simple string type.
28pub const SIMPLE_STRING_BYTE: u8 = b'+';
29/// Byte prefix before a simple error type.
30pub const SIMPLE_ERROR_BYTE: u8 = b'-';
31/// Byte prefix before a Number type.
32pub const NUMBER_BYTE: u8 = b':';
33/// Byte prefix before a Double type.
34pub const DOUBLE_BYTE: u8 = b',';
35/// Byte prefix before a blob string type.
36pub const BLOB_STRING_BYTE: u8 = b'$';
37/// Byte prefix before a blob error type.
38pub const BLOB_ERROR_BYTE: u8 = b'!';
39/// Byte prefix before a boolean type.
40pub const BOOLEAN_BYTE: u8 = b'#';
41/// Byte prefix before a verbatim string type.
42pub const VERBATIM_STRING_BYTE: u8 = b'=';
43/// Byte prefix before a big number type.
44pub const BIG_NUMBER_BYTE: u8 = b'(';
45/// Byte prefix before an array type.
46pub const ARRAY_BYTE: u8 = b'*';
47/// Byte prefix before a map type.
48pub const MAP_BYTE: u8 = b'%';
49/// Byte prefix before a set type.
50pub const SET_BYTE: u8 = b'~';
51/// Byte prefix before an attribute type.
52pub const ATTRIBUTE_BYTE: u8 = b'|';
53/// Byte prefix before a push type.
54pub const PUSH_BYTE: u8 = b'>';
55/// Byte prefix before a NULL value.
56pub const NULL_BYTE: u8 = b'_';
57/// Byte used to separate the verbatim string from the format prefix.
58pub const VERBATIM_FORMAT_BYTE: u8 = b':';
59/// Byte representation of a chunked string.
60pub const CHUNKED_STRING_BYTE: u8 = b';';
61/// Byte used to signify the end of a stream.
62pub const END_STREAM_BYTE: u8 = b'.';
63
64/// Byte prefix on a streamed type, following the byte that identifies the type.
65pub const STREAMED_LENGTH_BYTE: u8 = b'?';
66/// The terminating bytes when a streaming operation is done from a blob string.
67pub const END_STREAM_STRING_BYTES: &'static str = ";0\r\n";
68/// The terminating bytes when a streaming operation is done from an aggregate type.
69pub const END_STREAM_AGGREGATE_BYTES: &'static str = ".\r\n";
70
71/// The binary representation of NULL in RESP3.
72pub const NULL: &'static str = "_\r\n";
73
74/// Byte representation of positive infinity.
75pub const INFINITY: &'static str = "inf";
76/// Byte representation of negative infinity.
77pub const NEG_INFINITY: &'static str = "-inf";
78
79/// Byte representation of HELLO.
80pub const HELLO: &'static str = "HELLO";
81/// Byte representation of `true`.
82pub const BOOL_TRUE_BYTES: &'static str = "#t\r\n";
83/// Byte representation of `false`.
84pub const BOOL_FALSE_BYTES: &'static str = "#f\r\n";
85/// Byte representation of an empty space.
86pub const EMPTY_SPACE: &'static str = " ";
87/// Byte representation of `AUTH`.
88pub const AUTH: &'static str = "AUTH";
89
90pub use crate::utils::{PATTERN_PUBSUB_PREFIX, PUBSUB_PREFIX, PUBSUB_PUSH_PREFIX};
91
92/// A map struct for frames.
93#[cfg(not(feature = "index-map"))]
94pub type FrameMap = HashMap<Frame, Frame>;
95/// A set struct for frames.
96#[cfg(not(feature = "index-map"))]
97pub type FrameSet = HashSet<Frame>;
98/// A map struct for frames.
99#[cfg(feature = "index-map")]
100pub type FrameMap = IndexMap<Frame, Frame>;
101/// A set struct for frames.
102#[cfg(feature = "index-map")]
103pub type FrameSet = IndexSet<Frame>;
104
105/// Additional information returned alongside a frame.
106pub type Attributes = FrameMap;
107
108/// The RESP version used in the `HELLO` request.
109#[derive(Clone, Debug, Eq, PartialEq)]
110pub enum RespVersion {
111  RESP2,
112  RESP3,
113}
114
115impl RespVersion {
116  pub fn to_byte(&self) -> u8 {
117    match *self {
118      RespVersion::RESP2 => b'2',
119      RespVersion::RESP3 => b'3',
120    }
121  }
122}
123
124/// Authentication information used in the `HELLO` request.
125#[derive(Clone, Debug, Eq, PartialEq)]
126pub struct Auth {
127  pub username: Str,
128  pub password: Str,
129}
130
131impl Auth {
132  /// Create an [Auth] struct using the "default" user with the provided password.
133  pub fn from_password<S: Into<Str>>(password: S) -> Auth {
134    Auth {
135      username: Str::from("default"),
136      password: password.into(),
137    }
138  }
139}
140
141/// The format of a verbatim string frame.
142#[derive(Clone, Debug, Eq, PartialEq, Hash)]
143pub enum VerbatimStringFormat {
144  Text,
145  Markdown,
146}
147
148impl VerbatimStringFormat {
149  pub(crate) fn to_str(&self) -> &'static str {
150    match *self {
151      VerbatimStringFormat::Text => "txt",
152      VerbatimStringFormat::Markdown => "mkd",
153    }
154  }
155
156  pub(crate) fn encode_len(&self) -> usize {
157    match *self {
158      // the trailing colon is included here
159      VerbatimStringFormat::Text => 4,
160      VerbatimStringFormat::Markdown => 4,
161    }
162  }
163}
164
165/// The type of frame without any associated data.
166#[derive(Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd, Copy)]
167pub enum FrameKind {
168  Array,
169  BlobString,
170  SimpleString,
171  SimpleError,
172  Number,
173  Null,
174  Double,
175  Boolean,
176  BlobError,
177  VerbatimString,
178  Map,
179  Set,
180  Attribute,
181  Push,
182  Hello,
183  BigNumber,
184  ChunkedString,
185  EndStream,
186}
187
188impl FrameKind {
189  /// Whether or not the frame is an aggregate type (array, set, map).
190  pub fn is_aggregate_type(&self) -> bool {
191    match *self {
192      FrameKind::Array | FrameKind::Set | FrameKind::Map => true,
193      _ => false,
194    }
195  }
196
197  /// Whether or not the frame is an aggregate type or blob string.
198  pub fn is_streaming_type(&self) -> bool {
199    match *self {
200      FrameKind::Array | FrameKind::Set | FrameKind::Map | FrameKind::BlobString => true,
201      _ => false,
202    }
203  }
204
205  /// A function used to differentiate data types that may have the same inner binary representation when hashing a `Frame`.
206  pub fn hash_prefix(&self) -> &'static str {
207    use self::FrameKind::*;
208
209    match *self {
210      Array => "a",
211      BlobString => "b",
212      SimpleString => "s",
213      SimpleError => "e",
214      Number => "n",
215      Null => "N",
216      Double => "d",
217      Boolean => "B",
218      BlobError => "E",
219      VerbatimString => "v",
220      Map => "m",
221      Set => "S",
222      Attribute => "A",
223      Push => "p",
224      Hello => "h",
225      BigNumber => "bn",
226      ChunkedString => "cs",
227      EndStream => "es",
228    }
229  }
230
231  /// Attempt to detect the type of the frame from the first byte.
232  pub fn from_byte(d: u8) -> Option<FrameKind> {
233    use self::FrameKind::*;
234
235    match d {
236      SIMPLE_STRING_BYTE => Some(SimpleString),
237      SIMPLE_ERROR_BYTE => Some(SimpleError),
238      NUMBER_BYTE => Some(Number),
239      DOUBLE_BYTE => Some(Double),
240      BLOB_STRING_BYTE => Some(BlobString),
241      BLOB_ERROR_BYTE => Some(BlobError),
242      BOOLEAN_BYTE => Some(Boolean),
243      VERBATIM_STRING_BYTE => Some(VerbatimString),
244      BIG_NUMBER_BYTE => Some(BigNumber),
245      ARRAY_BYTE => Some(Array),
246      MAP_BYTE => Some(Map),
247      SET_BYTE => Some(Set),
248      ATTRIBUTE_BYTE => Some(Attribute),
249      PUSH_BYTE => Some(Push),
250      NULL_BYTE => Some(Null),
251      CHUNKED_STRING_BYTE => Some(ChunkedString),
252      END_STREAM_BYTE => Some(EndStream),
253      _ => None,
254    }
255  }
256
257  /// Read the byte prefix for the associated frame type.
258  pub fn to_byte(&self) -> u8 {
259    use self::FrameKind::*;
260
261    match *self {
262      SimpleString => SIMPLE_STRING_BYTE,
263      SimpleError => SIMPLE_ERROR_BYTE,
264      Number => NUMBER_BYTE,
265      Double => DOUBLE_BYTE,
266      BlobString => BLOB_STRING_BYTE,
267      BlobError => BLOB_ERROR_BYTE,
268      Boolean => BOOLEAN_BYTE,
269      VerbatimString => VERBATIM_STRING_BYTE,
270      BigNumber => BIG_NUMBER_BYTE,
271      Array => ARRAY_BYTE,
272      Map => MAP_BYTE,
273      Set => SET_BYTE,
274      Attribute => ATTRIBUTE_BYTE,
275      Push => PUSH_BYTE,
276      Null => NULL_BYTE,
277      ChunkedString => CHUNKED_STRING_BYTE,
278      EndStream => END_STREAM_BYTE,
279      Hello => panic!("HELLO does not have a byte prefix."),
280    }
281  }
282
283  /// Whether or not the frame type is for a `HELLO` frame.
284  ///
285  /// `HELLO` is encoded differently than other frames so this is used to prevent panics in [to_byte](Self::to_byte).
286  pub fn is_hello(&self) -> bool {
287    match *self {
288      FrameKind::Hello => true,
289      _ => false,
290    }
291  }
292}
293
294/// An enum describing the possible data types in RESP3 along with the corresponding Rust data type to represent the payload.
295///
296/// <https://github.com/antirez/RESP3/blob/master/spec.md>
297#[derive(Clone, Debug)]
298pub enum Frame {
299  /// A binary-safe blob.
300  BlobString {
301    data: Bytes,
302    attributes: Option<Attributes>,
303  },
304  /// A binary-safe blob representing an error.
305  BlobError {
306    data: Bytes,
307    attributes: Option<Attributes>,
308  },
309  /// A small non binary-safe string.
310  ///
311  /// The internal data type is `Bytes` in order to support callers that use this interface to parse a `MONITOR` stream.
312  SimpleString {
313    data: Bytes,
314    attributes: Option<Attributes>,
315  },
316  /// A small non binary-safe string representing an error.
317  SimpleError { data: Str, attributes: Option<Attributes> },
318  /// A boolean type.
319  Boolean { data: bool, attributes: Option<Attributes> },
320  /// A null type.
321  Null,
322  /// A signed 64 bit integer.
323  Number { data: i64, attributes: Option<Attributes> },
324  /// A signed 64 bit floating point number.
325  Double { data: f64, attributes: Option<Attributes> },
326  /// A large number not representable as a `Number` or `Double`.
327  ///
328  /// This library does not attempt to parse this, nor does it offer any utilities to do so.
329  BigNumber {
330    data: Bytes,
331    attributes: Option<Attributes>,
332  },
333  /// A binary-safe string to be displayed without any escaping or filtering.
334  VerbatimString {
335    data: Bytes,
336    format: VerbatimStringFormat,
337    attributes: Option<Attributes>,
338  },
339  /// An array of frames, arbitrarily nested.
340  Array {
341    data: Vec<Frame>,
342    attributes: Option<Attributes>,
343  },
344  /// An unordered map of key-value pairs.
345  ///
346  /// According to the spec keys can be any other RESP3 data type. However, it doesn't make sense to implement `Hash` for certain Rust data types like
347  /// `HashMap`, `Vec`, `HashSet`, etc, so this library limits the possible data types for keys to only those that can be hashed in a semi-sane way.
348  ///
349  /// For example, attempting to create a `Frame::Map<HashMap<Frame::Set<HashSet<Frame>>, Frame::Foo>>` from bytes will panic.
350  Map {
351    data: FrameMap,
352    attributes: Option<Attributes>,
353  },
354  /// An unordered collection of other frames with a uniqueness constraint.
355  Set {
356    data: FrameSet,
357    attributes: Option<Attributes>,
358  },
359  /// Out-of-band data to be returned to the caller if necessary.
360  Push {
361    data: Vec<Frame>,
362    attributes: Option<Attributes>,
363  },
364  /// A special frame type used when first connecting to the server to describe the protocol version and optional credentials.
365  Hello { version: RespVersion, auth: Option<Auth> },
366  /// One chunk of a streaming string.
367  ChunkedString(Bytes),
368}
369
370impl Hash for Frame {
371  fn hash<H: Hasher>(&self, state: &mut H) {
372    use self::Frame::*;
373    self.kind().hash_prefix().hash(state);
374
375    match *self {
376      BlobString { ref data, .. } => data.hash(state),
377      SimpleString { ref data, .. } => data.hash(state),
378      SimpleError { ref data, .. } => data.hash(state),
379      Number { ref data, .. } => data.hash(state),
380      Null => NULL.hash(state),
381      Double { ref data, .. } => data.to_string().hash(state),
382      Boolean { ref data, .. } => data.hash(state),
383      BlobError { ref data, .. } => data.hash(state),
384      VerbatimString {
385        ref data, ref format, ..
386      } => {
387        format.hash(state);
388        data.hash(state);
389      }
390      ChunkedString(ref data) => data.hash(state),
391      BigNumber { ref data, .. } => data.hash(state),
392      _ => panic!("Invalid RESP3 data type to use as hash key."),
393    };
394  }
395}
396
397impl PartialEq for Frame {
398  fn eq(&self, other: &Self) -> bool {
399    use self::Frame::*;
400
401    match *self {
402      ChunkedString(ref b) => match *other {
403        ChunkedString(ref _b) => b == _b,
404        _ => false,
405      },
406      Array {
407        ref data,
408        ref attributes,
409      } => {
410        let (_data, _attributes) = (data, attributes);
411        match *other {
412          Array {
413            ref data,
414            ref attributes,
415          } => data == _data && attributes == _attributes,
416          _ => false,
417        }
418      }
419      BlobString {
420        ref data,
421        ref attributes,
422      } => {
423        let (_data, _attributes) = (data, attributes);
424        match *other {
425          BlobString {
426            ref data,
427            ref attributes,
428          } => data == _data && attributes == _attributes,
429          _ => false,
430        }
431      }
432      SimpleString {
433        ref data,
434        ref attributes,
435      } => {
436        let (_data, _attributes) = (data, attributes);
437        match *other {
438          SimpleString {
439            ref data,
440            ref attributes,
441          } => data == _data && attributes == _attributes,
442          _ => false,
443        }
444      }
445      SimpleError {
446        ref data,
447        ref attributes,
448      } => {
449        let (_data, _attributes) = (data, attributes);
450        match *other {
451          SimpleError {
452            ref data,
453            ref attributes,
454          } => data == _data && attributes == _attributes,
455          _ => false,
456        }
457      }
458      Number {
459        ref data,
460        ref attributes,
461      } => {
462        let (_data, _attributes) = (data, attributes);
463        match *other {
464          Number {
465            ref data,
466            ref attributes,
467          } => data == _data && attributes == _attributes,
468          _ => false,
469        }
470      }
471      Null => match *other {
472        Null => true,
473        _ => false,
474      },
475      Boolean {
476        ref data,
477        ref attributes,
478      } => {
479        let (_data, _attributes) = (data, attributes);
480        match *other {
481          Boolean {
482            ref data,
483            ref attributes,
484          } => data == _data && attributes == _attributes,
485          _ => false,
486        }
487      }
488      Double {
489        ref data,
490        ref attributes,
491      } => {
492        let (_data, _attributes) = (data, attributes);
493        match *other {
494          Double {
495            ref data,
496            ref attributes,
497          } => data == _data && attributes == _attributes,
498          _ => false,
499        }
500      }
501      BlobError {
502        ref data,
503        ref attributes,
504      } => {
505        let (_data, _attributes) = (data, attributes);
506        match *other {
507          BlobError {
508            ref data,
509            ref attributes,
510          } => data == _data && attributes == _attributes,
511          _ => false,
512        }
513      }
514      VerbatimString {
515        ref data,
516        ref format,
517        ref attributes,
518      } => {
519        let (_data, _format, _attributes) = (data, format, attributes);
520        match *other {
521          VerbatimString {
522            ref data,
523            ref format,
524            ref attributes,
525          } => _data == data && _format == format && attributes == _attributes,
526          _ => false,
527        }
528      }
529      Map {
530        ref data,
531        ref attributes,
532      } => {
533        let (_data, _attributes) = (data, attributes);
534        match *other {
535          Map {
536            ref data,
537            ref attributes,
538          } => data == _data && attributes == _attributes,
539          _ => false,
540        }
541      }
542      Set {
543        ref data,
544        ref attributes,
545      } => {
546        let (_data, _attributes) = (data, attributes);
547        match *other {
548          Set {
549            ref data,
550            ref attributes,
551          } => data == _data && attributes == _attributes,
552          _ => false,
553        }
554      }
555      Push {
556        ref data,
557        ref attributes,
558      } => {
559        let (_data, _attributes) = (data, attributes);
560        match *other {
561          Push {
562            ref data,
563            ref attributes,
564          } => data == _data && attributes == _attributes,
565          _ => false,
566        }
567      }
568      Hello { ref version, ref auth } => {
569        let (_version, _auth) = (version, auth);
570        match *other {
571          Hello { ref version, ref auth } => _version == version && _auth == auth,
572          _ => false,
573        }
574      }
575      BigNumber {
576        ref data,
577        ref attributes,
578      } => {
579        let (_data, _attributes) = (data, attributes);
580        match *other {
581          BigNumber {
582            ref data,
583            ref attributes,
584          } => data == _data && attributes == _attributes,
585          _ => false,
586        }
587      }
588    }
589  }
590}
591
592impl Eq for Frame {}
593
594#[cfg(test)]
595impl TryFrom<(FrameKind, Vec<u8>)> for Frame {
596  type Error = RedisProtocolError;
597
598  fn try_from((kind, value): (FrameKind, Vec<u8>)) -> Result<Self, Self::Error> {
599    let frame = match kind {
600      FrameKind::BlobString => Frame::BlobString {
601        data: value.into(),
602        attributes: None,
603      },
604      FrameKind::BlobError => Frame::BlobError {
605        data: value.into(),
606        attributes: None,
607      },
608      FrameKind::BigNumber => Frame::BigNumber {
609        data: value.into(),
610        attributes: None,
611      },
612      FrameKind::ChunkedString => Frame::ChunkedString(value.into()),
613      _ => {
614        return Err(RedisProtocolError::new(
615          RedisProtocolErrorKind::Unknown,
616          "Cannot convert to Frame.",
617        ))
618      }
619    };
620
621    Ok(frame)
622  }
623}
624
625#[cfg(test)]
626impl TryFrom<(FrameKind, Vec<Frame>)> for Frame {
627  type Error = RedisProtocolError;
628
629  fn try_from((kind, data): (FrameKind, Vec<Frame>)) -> Result<Self, Self::Error> {
630    let frame = match kind {
631      FrameKind::Array => Frame::Array { data, attributes: None },
632      FrameKind::Push => Frame::Push { data, attributes: None },
633      _ => {
634        return Err(RedisProtocolError::new(
635          RedisProtocolErrorKind::Unknown,
636          "Cannot convert to Frame.",
637        ))
638      }
639    };
640
641    Ok(frame)
642  }
643}
644
645#[cfg(test)]
646impl TryFrom<(FrameKind, VecDeque<Frame>)> for Frame {
647  type Error = RedisProtocolError;
648
649  fn try_from((kind, value): (FrameKind, VecDeque<Frame>)) -> Result<Self, Self::Error> {
650    let data: Vec<Frame> = value.into_iter().collect();
651
652    let frame = match kind {
653      FrameKind::Array => Frame::Array { data, attributes: None },
654      FrameKind::Push => Frame::Push { data, attributes: None },
655      _ => {
656        return Err(RedisProtocolError::new(
657          RedisProtocolErrorKind::Unknown,
658          "Cannot convert to Frame.",
659        ))
660      }
661    };
662
663    Ok(frame)
664  }
665}
666
667impl TryFrom<HashMap<Frame, Frame>> for Frame {
668  type Error = RedisProtocolError;
669
670  fn try_from(value: HashMap<Frame, Frame>) -> Result<Self, Self::Error> {
671    Ok(Frame::Map {
672      data: resp3_utils::hashmap_to_frame_map(value),
673      attributes: None,
674    })
675  }
676}
677
678impl TryFrom<HashSet<Frame>> for Frame {
679  type Error = RedisProtocolError;
680
681  fn try_from(value: HashSet<Frame>) -> Result<Self, Self::Error> {
682    Ok(Frame::Set {
683      data: resp3_utils::hashset_to_frame_set(value),
684      attributes: None,
685    })
686  }
687}
688
689#[cfg(feature = "index-map")]
690impl TryFrom<IndexMap<Frame, Frame>> for Frame {
691  type Error = RedisProtocolError;
692
693  fn try_from(value: IndexMap<Frame, Frame>) -> Result<Self, Self::Error> {
694    Ok(Frame::Map {
695      data: value,
696      attributes: None,
697    })
698  }
699}
700
701#[cfg(feature = "index-map")]
702impl TryFrom<IndexSet<Frame>> for Frame {
703  type Error = RedisProtocolError;
704
705  fn try_from(value: IndexSet<Frame>) -> Result<Self, Self::Error> {
706    Ok(Frame::Set {
707      data: value,
708      attributes: None,
709    })
710  }
711}
712
713impl From<i64> for Frame {
714  fn from(value: i64) -> Self {
715    Frame::Number {
716      data: value,
717      attributes: None,
718    }
719  }
720}
721
722impl From<bool> for Frame {
723  fn from(value: bool) -> Self {
724    Frame::Boolean {
725      data: value,
726      attributes: None,
727    }
728  }
729}
730
731impl TryFrom<f64> for Frame {
732  type Error = RedisProtocolError;
733
734  fn try_from(value: f64) -> Result<Self, Self::Error> {
735    if value.is_nan() {
736      Err(RedisProtocolError::new(
737        RedisProtocolErrorKind::Unknown,
738        "Cannot cast NaN to double.",
739      ))
740    } else {
741      Ok(Frame::Double {
742        data: value,
743        attributes: None,
744      })
745    }
746  }
747}
748
749#[cfg(test)]
750impl TryFrom<(FrameKind, String)> for Frame {
751  type Error = RedisProtocolError;
752
753  fn try_from((kind, value): (FrameKind, String)) -> Result<Self, Self::Error> {
754    let frame = match kind {
755      FrameKind::SimpleError => Frame::SimpleError {
756        data: value.into(),
757        attributes: None,
758      },
759      FrameKind::SimpleString => Frame::SimpleString {
760        data: value.into(),
761        attributes: None,
762      },
763      FrameKind::BlobError => Frame::BlobError {
764        data: value.into(),
765        attributes: None,
766      },
767      FrameKind::BlobString => Frame::BlobString {
768        data: value.into(),
769        attributes: None,
770      },
771      FrameKind::BigNumber => Frame::BigNumber {
772        data: value.into(),
773        attributes: None,
774      },
775      FrameKind::ChunkedString => Frame::ChunkedString(value.into()),
776      _ => {
777        return Err(RedisProtocolError::new(
778          RedisProtocolErrorKind::Unknown,
779          "Cannot convert to Frame.",
780        ))
781      }
782    };
783
784    Ok(frame)
785  }
786}
787
788#[cfg(test)]
789impl<'a> TryFrom<(FrameKind, &'a str)> for Frame {
790  type Error = RedisProtocolError;
791
792  fn try_from((kind, value): (FrameKind, &'a str)) -> Result<Self, Self::Error> {
793    (kind, value.to_owned()).try_into()
794  }
795}
796
797impl Frame {
798  /// Whether or not the frame can be used as a key in a `HashMap` or `HashSet`.
799  ///
800  /// Not all frame types can be hashed, and trying to do so can panic. This function can be used to handle this gracefully.
801  pub fn can_hash(&self) -> bool {
802    match self.kind() {
803      FrameKind::BlobString
804      | FrameKind::BlobError
805      | FrameKind::SimpleString
806      | FrameKind::SimpleError
807      | FrameKind::Number
808      | FrameKind::Double
809      | FrameKind::Boolean
810      | FrameKind::Null
811      | FrameKind::ChunkedString
812      | FrameKind::EndStream
813      | FrameKind::VerbatimString
814      | FrameKind::BigNumber => true,
815      _ => false,
816    }
817  }
818
819  /// Read the attributes attached to the frame.
820  pub fn attributes(&self) -> Option<&Attributes> {
821    let attributes = match *self {
822      Frame::Array { ref attributes, .. } => attributes,
823      Frame::Push { ref attributes, .. } => attributes,
824      Frame::BlobString { ref attributes, .. } => attributes,
825      Frame::BlobError { ref attributes, .. } => attributes,
826      Frame::BigNumber { ref attributes, .. } => attributes,
827      Frame::Boolean { ref attributes, .. } => attributes,
828      Frame::Number { ref attributes, .. } => attributes,
829      Frame::Double { ref attributes, .. } => attributes,
830      Frame::VerbatimString { ref attributes, .. } => attributes,
831      Frame::SimpleError { ref attributes, .. } => attributes,
832      Frame::SimpleString { ref attributes, .. } => attributes,
833      Frame::Set { ref attributes, .. } => attributes,
834      Frame::Map { ref attributes, .. } => attributes,
835      Frame::Null | Frame::ChunkedString(_) | Frame::Hello { .. } => return None,
836    };
837
838    attributes.as_ref()
839  }
840
841  /// Take the attributes off this frame.
842  pub fn take_attributes(&mut self) -> Option<Attributes> {
843    let attributes = match *self {
844      Frame::Array { ref mut attributes, .. } => attributes,
845      Frame::Push { ref mut attributes, .. } => attributes,
846      Frame::BlobString { ref mut attributes, .. } => attributes,
847      Frame::BlobError { ref mut attributes, .. } => attributes,
848      Frame::BigNumber { ref mut attributes, .. } => attributes,
849      Frame::Boolean { ref mut attributes, .. } => attributes,
850      Frame::Number { ref mut attributes, .. } => attributes,
851      Frame::Double { ref mut attributes, .. } => attributes,
852      Frame::VerbatimString { ref mut attributes, .. } => attributes,
853      Frame::SimpleError { ref mut attributes, .. } => attributes,
854      Frame::SimpleString { ref mut attributes, .. } => attributes,
855      Frame::Set { ref mut attributes, .. } => attributes,
856      Frame::Map { ref mut attributes, .. } => attributes,
857      Frame::Null | Frame::ChunkedString(_) | Frame::Hello { .. } => return None,
858    };
859
860    attributes.take()
861  }
862
863  /// Read a mutable reference to any attributes attached to the frame.
864  pub fn attributes_mut(&mut self) -> Option<&mut Attributes> {
865    let attributes = match *self {
866      Frame::Array { ref mut attributes, .. } => attributes,
867      Frame::Push { ref mut attributes, .. } => attributes,
868      Frame::BlobString { ref mut attributes, .. } => attributes,
869      Frame::BlobError { ref mut attributes, .. } => attributes,
870      Frame::BigNumber { ref mut attributes, .. } => attributes,
871      Frame::Boolean { ref mut attributes, .. } => attributes,
872      Frame::Number { ref mut attributes, .. } => attributes,
873      Frame::Double { ref mut attributes, .. } => attributes,
874      Frame::VerbatimString { ref mut attributes, .. } => attributes,
875      Frame::SimpleError { ref mut attributes, .. } => attributes,
876      Frame::SimpleString { ref mut attributes, .. } => attributes,
877      Frame::Set { ref mut attributes, .. } => attributes,
878      Frame::Map { ref mut attributes, .. } => attributes,
879      Frame::Null | Frame::ChunkedString(_) | Frame::Hello { .. } => return None,
880    };
881
882    attributes.as_mut()
883  }
884
885  /// Attempt to add attributes to the frame, extending the existing attributes if needed.
886  pub fn add_attributes(&mut self, attributes: Attributes) -> Result<(), RedisProtocolError> {
887    let _attributes = match *self {
888      Frame::Array { ref mut attributes, .. } => attributes,
889      Frame::Push { ref mut attributes, .. } => attributes,
890      Frame::BlobString { ref mut attributes, .. } => attributes,
891      Frame::BlobError { ref mut attributes, .. } => attributes,
892      Frame::BigNumber { ref mut attributes, .. } => attributes,
893      Frame::Boolean { ref mut attributes, .. } => attributes,
894      Frame::Number { ref mut attributes, .. } => attributes,
895      Frame::Double { ref mut attributes, .. } => attributes,
896      Frame::VerbatimString { ref mut attributes, .. } => attributes,
897      Frame::SimpleError { ref mut attributes, .. } => attributes,
898      Frame::SimpleString { ref mut attributes, .. } => attributes,
899      Frame::Set { ref mut attributes, .. } => attributes,
900      Frame::Map { ref mut attributes, .. } => attributes,
901      Frame::Null | Frame::ChunkedString(_) | Frame::Hello { .. } => {
902        return Err(RedisProtocolError::new(
903          RedisProtocolErrorKind::Unknown,
904          format!("{:?} cannot have attributes.", self.kind()),
905        ))
906      }
907    };
908
909    if let Some(_attributes) = _attributes.as_mut() {
910      _attributes.extend(attributes.into_iter());
911    } else {
912      *_attributes = Some(attributes);
913    }
914
915    Ok(())
916  }
917
918  /// Create a new `Frame` that terminates a stream.
919  pub fn new_end_stream() -> Self {
920    Frame::ChunkedString(Bytes::new())
921  }
922
923  /// A context-aware length function that returns the length of the inner frame contents.
924  ///
925  /// This does not return the encoded length, but rather the length of the contents of the frame such as the number of elements in an array, the size of any inner buffers, etc.
926  ///
927  /// Note: `Null` has a length of 0 and `Hello`, `Number`, `Double`, and `Boolean` have a length of 1.
928  ///
929  /// See [encode_len](Self::encode_len) to read the number of bytes necessary to encode the frame.
930  pub fn len(&self) -> usize {
931    use self::Frame::*;
932
933    match *self {
934      Array { ref data, .. } | Push { ref data, .. } => data.len(),
935      BlobString { ref data, .. }
936      | BlobError { ref data, .. }
937      | BigNumber { ref data, .. }
938      | ChunkedString(ref data) => data.len(),
939      SimpleString { ref data, .. } => data.len(),
940      SimpleError { ref data, .. } => data.len(),
941      Number { .. } | Double { .. } | Boolean { .. } => 1,
942      Null => 0,
943      VerbatimString { ref data, .. } => data.len(),
944      Map { ref data, .. } => data.len(),
945      Set { ref data, .. } => data.len(),
946      Hello { .. } => 1,
947    }
948  }
949
950  /// Replace `self` with Null, returning the original value.
951  pub fn take(&mut self) -> Frame {
952    mem::replace(self, Frame::Null)
953  }
954
955  /// Read the associated `FrameKind`.
956  pub fn kind(&self) -> FrameKind {
957    use self::Frame::*;
958
959    match *self {
960      Array { .. } => FrameKind::Array,
961      BlobString { .. } => FrameKind::BlobString,
962      SimpleString { .. } => FrameKind::SimpleString,
963      SimpleError { .. } => FrameKind::SimpleError,
964      Number { .. } => FrameKind::Number,
965      Null => FrameKind::Null,
966      Double { .. } => FrameKind::Double,
967      BlobError { .. } => FrameKind::BlobError,
968      VerbatimString { .. } => FrameKind::VerbatimString,
969      Boolean { .. } => FrameKind::Boolean,
970      Map { .. } => FrameKind::Map,
971      Set { .. } => FrameKind::Set,
972      Push { .. } => FrameKind::Push,
973      Hello { .. } => FrameKind::Hello,
974      BigNumber { .. } => FrameKind::BigNumber,
975      ChunkedString(ref inner) => {
976        if inner.is_empty() {
977          FrameKind::EndStream
978        } else {
979          FrameKind::ChunkedString
980        }
981      }
982    }
983  }
984
985  /// Whether or not the frame is a `Null` variant.
986  pub fn is_null(&self) -> bool {
987    match *self {
988      Frame::Null => true,
989      _ => false,
990    }
991  }
992
993  /// Whether or not the frame represents an array of frames.
994  pub fn is_array(&self) -> bool {
995    match *self {
996      Frame::Array { .. } => true,
997      _ => false,
998    }
999  }
1000
1001  /// Whether or not the frame represents data pushed to the client from the server.
1002  pub fn is_push(&self) -> bool {
1003    match *self {
1004      Frame::Push { .. } => true,
1005      _ => false,
1006    }
1007  }
1008
1009  /// Whether or not the frame is a boolean value.
1010  pub fn is_boolean(&self) -> bool {
1011    match *self {
1012      Frame::Boolean { .. } => true,
1013      _ => false,
1014    }
1015  }
1016
1017  /// Whether or not the frame represents an error.
1018  pub fn is_error(&self) -> bool {
1019    match *self {
1020      Frame::BlobError { .. } | Frame::SimpleError { .. } => true,
1021      _ => false,
1022    }
1023  }
1024
1025  /// Whether or not the frame is an array, map, or set.
1026  pub fn is_aggregate_type(&self) -> bool {
1027    match *self {
1028      Frame::Map { .. } | Frame::Set { .. } | Frame::Array { .. } => true,
1029      _ => false,
1030    }
1031  }
1032
1033  /// Whether or not the frame represents a `BlobString` or `BlobError`.
1034  pub fn is_blob(&self) -> bool {
1035    match *self {
1036      Frame::BlobString { .. } | Frame::BlobError { .. } => true,
1037      _ => false,
1038    }
1039  }
1040
1041  /// Whether or not the frame represents a chunked string.
1042  pub fn is_chunked_string(&self) -> bool {
1043    match *self {
1044      Frame::ChunkedString(_) => true,
1045      _ => false,
1046    }
1047  }
1048
1049  /// Whether or not the frame is an empty chunked string, signifying the end of a chunked string stream.
1050  pub fn is_end_stream_frame(&self) -> bool {
1051    match *self {
1052      Frame::ChunkedString(ref s) => s.is_empty(),
1053      _ => false,
1054    }
1055  }
1056
1057  /// Whether or not the frame is a verbatim string.
1058  pub fn is_verbatim_string(&self) -> bool {
1059    match *self {
1060      Frame::VerbatimString { .. } => true,
1061      _ => false,
1062    }
1063  }
1064
1065  /// If the frame is a verbatim string then read the associated format.
1066  pub fn verbatim_string_format(&self) -> Option<&VerbatimStringFormat> {
1067    match *self {
1068      Frame::VerbatimString { ref format, .. } => Some(format),
1069      _ => None,
1070    }
1071  }
1072
1073  /// Read the frame as a string slice if it can be parsed as a UTF-8 string without allocating.
1074  ///
1075  /// Numbers and Doubles will not be cast to a string since that would require allocating.
1076  pub fn as_str(&self) -> Option<&str> {
1077    match *self {
1078      Frame::SimpleError { ref data, .. } => Some(data),
1079      Frame::SimpleString { ref data, .. } => str::from_utf8(data).ok(),
1080      Frame::BlobError { ref data, .. } | Frame::BlobString { ref data, .. } | Frame::BigNumber { ref data, .. } => {
1081        str::from_utf8(data).ok()
1082      }
1083      Frame::VerbatimString { ref data, .. } => str::from_utf8(data).ok(),
1084      Frame::ChunkedString(ref data) => str::from_utf8(data).ok(),
1085      _ => None,
1086    }
1087  }
1088
1089  /// Read the frame as a `String` if it can be parsed as a UTF-8 string.
1090  pub fn to_string(&self) -> Option<String> {
1091    match *self {
1092      Frame::SimpleError { ref data, .. } => Some(data.to_string()),
1093      Frame::SimpleString { ref data, .. } => String::from_utf8(data.to_vec()).ok(),
1094      Frame::BlobError { ref data, .. } | Frame::BlobString { ref data, .. } | Frame::BigNumber { ref data, .. } => {
1095        String::from_utf8(data.to_vec()).ok()
1096      }
1097      Frame::VerbatimString { ref data, .. } => String::from_utf8(data.to_vec()).ok(),
1098      Frame::ChunkedString(ref b) => String::from_utf8(b.to_vec()).ok(),
1099      Frame::Double { ref data, .. } => Some(data.to_string()),
1100      Frame::Number { ref data, .. } => Some(data.to_string()),
1101      _ => None,
1102    }
1103  }
1104
1105  /// Attempt to read the frame as a byte slice.
1106  pub fn as_bytes(&self) -> Option<&[u8]> {
1107    match *self {
1108      Frame::SimpleError { ref data, .. } => Some(data.as_bytes()),
1109      Frame::SimpleString { ref data, .. } => Some(&data),
1110      Frame::BlobError { ref data, .. } | Frame::BlobString { ref data, .. } | Frame::BigNumber { ref data, .. } => {
1111        Some(data)
1112      }
1113      Frame::VerbatimString { ref data, .. } => Some(data),
1114      Frame::ChunkedString(ref b) => Some(b),
1115      _ => None,
1116    }
1117  }
1118
1119  /// Attempt to read the frame as an `i64`.
1120  pub fn as_i64(&self) -> Option<i64> {
1121    match *self {
1122      Frame::Number { ref data, .. } => Some(*data),
1123      Frame::Double { ref data, .. } => Some(*data as i64),
1124      Frame::BlobString { ref data, .. } => str::from_utf8(data).ok().and_then(|s| s.parse::<i64>().ok()),
1125      Frame::SimpleString { ref data, .. } => str::from_utf8(data).ok().and_then(|s| s.parse::<i64>().ok()),
1126      _ => None,
1127    }
1128  }
1129
1130  /// Attempt to read the frame as an `f64`.
1131  pub fn as_f64(&self) -> Option<f64> {
1132    match *self {
1133      Frame::Double { ref data, .. } => Some(*data),
1134      Frame::Number { ref data, .. } => Some(*data as f64),
1135      Frame::BlobString { ref data, .. } => str::from_utf8(data).ok().and_then(|s| s.parse::<f64>().ok()),
1136      Frame::SimpleString { ref data, .. } => str::from_utf8(data).ok().and_then(|s| s.parse::<f64>().ok()),
1137      _ => None,
1138    }
1139  }
1140
1141  /// Whether or not the frame represents a MOVED or ASK error.
1142  pub fn is_moved_or_ask_error(&self) -> bool {
1143    match *self {
1144      Frame::SimpleError { ref data, .. } => utils::is_cluster_error(data),
1145      _ => false,
1146    }
1147  }
1148
1149  /// Attempt to parse the frame as a cluster redirection error.
1150  pub fn to_redirection(&self) -> Option<Redirection> {
1151    match *self {
1152      Frame::SimpleError { ref data, .. } => utils::read_cluster_error(data),
1153      _ => None,
1154    }
1155  }
1156
1157  /// Whether or not the frame represents a publish-subscribe message, but not a pattern publish-subscribe message.
1158  pub fn is_normal_pubsub(&self) -> bool {
1159    if let Frame::Push { ref data, .. } = *self {
1160      resp3_utils::is_normal_pubsub(data)
1161    } else {
1162      false
1163    }
1164  }
1165
1166  /// Whether or not the frame represents a message on a publish-subscribe channel.
1167  pub fn is_pubsub_message(&self) -> bool {
1168    if let Frame::Push { ref data, .. } = *self {
1169      resp3_utils::is_normal_pubsub(data) || resp3_utils::is_pattern_pubsub(data)
1170    } else {
1171      false
1172    }
1173  }
1174
1175  /// Whether or not the frame represents a message on a publish-subscribe channel matched against a pattern subscription.
1176  pub fn is_pattern_pubsub_message(&self) -> bool {
1177    if let Frame::Push { ref data, .. } = *self {
1178      resp3_utils::is_pattern_pubsub(data)
1179    } else {
1180      false
1181    }
1182  }
1183
1184  /// Attempt to parse the frame as a publish-subscribe message, returning the `(channel, message)` tuple
1185  /// if successful, or the original frame if the inner data is not a publish-subscribe message.
1186  pub fn parse_as_pubsub(self) -> Result<(Frame, Frame), Self> {
1187    if self.is_pubsub_message() {
1188      if let Frame::Push { mut data, .. } = self {
1189        // array len checked in `is_pubsub_message`
1190        let message = data.pop().unwrap();
1191        let channel = data.pop().unwrap();
1192
1193        Ok((channel, message))
1194      } else {
1195        warn!("Invalid pubsub frame. Expected a Push frame.");
1196        Err(self)
1197      }
1198    } else {
1199      Err(self)
1200    }
1201  }
1202
1203  /// Attempt to read the number of bytes needed to encode the frame.
1204  pub fn encode_len(&self) -> Result<usize, RedisProtocolError> {
1205    resp3_utils::encode_len(self).map_err(|e| e.into())
1206  }
1207}
1208
1209/// A helper struct for reading and managing streaming data types.
1210///
1211/// ```rust edition2018
1212/// # use bytes::Bytes;
1213/// use redis_protocol::resp3::decode::streaming::decode;
1214///
1215/// fn main() {
1216///   let parts: Vec<Bytes> = vec!["*?\r\n".into(), ":1\r\n".into(), ":2\r\n".into(), ".\r\n".into()];
1217///
1218///   let (frame, _) = decode(&parts[0]).unwrap().unwrap();
1219///   assert!(frame.is_streaming());
1220///   let mut streaming = frame.into_streaming_frame().unwrap();
1221///   println!("Reading streaming {:?}", streaming.kind);
1222///
1223///   let (frame, _) = decode(&parts[1]).unwrap().unwrap();
1224///   assert!(frame.is_complete());
1225///   // add frames to the buffer until we reach the terminating byte sequence
1226///   streaming.add_frame(frame.into_complete_frame().unwrap());
1227///
1228///   let (frame, _) = decode(&parts[2]).unwrap().unwrap();
1229///   assert!(frame.is_complete());
1230///   streaming.add_frame(frame.into_complete_frame().unwrap());
1231///
1232///   let (frame, _) = decode(&parts[3]).unwrap().unwrap();
1233///   assert!(frame.is_complete());
1234///   streaming.add_frame(frame.into_complete_frame().unwrap());
1235///
1236///   assert!(streaming.is_finished());
1237///   // convert the buffer into one frame
1238///   let result = streaming.into_frame().unwrap();
1239///
1240///   // Frame::Array { data: [1, 2], attributes: None }
1241///   println!("{:?}", result);
1242/// }
1243/// ```
1244#[derive(Debug, Eq, PartialEq)]
1245pub struct StreamedFrame {
1246  /// The internal buffer of frames and attributes.
1247  buffer: VecDeque<Frame>,
1248  /// Any leading attributes before the stream starts.
1249  pub attributes: Option<Attributes>,
1250  /// The data type being streamed.  
1251  pub kind: FrameKind,
1252}
1253
1254impl StreamedFrame {
1255  /// Create a new `StreamedFrame` from the first section of data in a streaming response.
1256  pub fn new(kind: FrameKind) -> Self {
1257    let buffer = VecDeque::new();
1258    StreamedFrame {
1259      buffer,
1260      kind,
1261      attributes: None,
1262    }
1263  }
1264
1265  /// Convert the internal buffer into one frame matching `self.kind`, clearing the internal buffer.
1266  pub fn into_frame(&mut self) -> Result<Frame, RedisProtocolError> {
1267    if !self.kind.is_streaming_type() {
1268      // try to catch invalid type errors early so the caller can modify the frame before we clear the buffer
1269      return Err(RedisProtocolError::new(
1270        RedisProtocolErrorKind::DecodeError,
1271        "Only blob strings, sets, maps, and arrays can be streamed.",
1272      ));
1273    }
1274
1275    if self.is_finished() {
1276      // the last frame is an empty chunked string when the stream is finished
1277      let _ = self.buffer.pop_back();
1278    }
1279    let buffer = mem::replace(&mut self.buffer, VecDeque::new());
1280    let attributes = self.attributes.take();
1281
1282    let frame = match self.kind {
1283      FrameKind::BlobString => resp3_utils::reconstruct_blobstring(buffer, attributes)?,
1284      FrameKind::Map => resp3_utils::reconstruct_map(buffer, attributes)?,
1285      FrameKind::Set => resp3_utils::reconstruct_set(buffer, attributes)?,
1286      FrameKind::Array => resp3_utils::reconstruct_array(buffer, attributes)?,
1287      _ => {
1288        return Err(RedisProtocolError::new(
1289          RedisProtocolErrorKind::DecodeError,
1290          "Streaming frames only supported for blob strings, maps, sets, and arrays.",
1291        ))
1292      }
1293    };
1294
1295    Ok(frame)
1296  }
1297
1298  /// Add a frame to the internal buffer.
1299  pub fn add_frame(&mut self, data: Frame) {
1300    self.buffer.push_back(data);
1301  }
1302
1303  /// Whether or not the last frame represents the terminating sequence at the end of a frame stream.
1304  pub fn is_finished(&self) -> bool {
1305    self.buffer.back().map(|f| f.is_end_stream_frame()).unwrap_or(false)
1306  }
1307}
1308
1309/// Wrapper enum around a decoded frame that supports streaming frames.
1310#[derive(Debug, Eq, PartialEq)]
1311pub enum DecodedFrame {
1312  Streaming(StreamedFrame),
1313  Complete(Frame),
1314}
1315
1316impl DecodedFrame {
1317  /// Add attributes to the decoded frame, if possible.
1318  pub fn add_attributes(&mut self, attributes: Attributes) -> Result<(), RedisProtocolError> {
1319    let _ = match *self {
1320      DecodedFrame::Streaming(ref mut inner) => inner.attributes = Some(attributes),
1321      DecodedFrame::Complete(ref mut inner) => inner.add_attributes(attributes)?,
1322    };
1323
1324    Ok(())
1325  }
1326
1327  /// Convert the decoded frame to a complete frame, returning an error if a streaming variant is found.
1328  pub fn into_complete_frame(self) -> Result<Frame, RedisProtocolError> {
1329    match self {
1330      DecodedFrame::Complete(frame) => Ok(frame),
1331      DecodedFrame::Streaming(_) => Err(RedisProtocolError::new(
1332        RedisProtocolErrorKind::DecodeError,
1333        "Expected complete frame.",
1334      )),
1335    }
1336  }
1337
1338  /// Convert the decoded frame into a streaming frame, returning an error if a complete variant is found.
1339  pub fn into_streaming_frame(self) -> Result<StreamedFrame, RedisProtocolError> {
1340    match self {
1341      DecodedFrame::Streaming(frame) => Ok(frame),
1342      DecodedFrame::Complete(_) => Err(RedisProtocolError::new(
1343        RedisProtocolErrorKind::DecodeError,
1344        "Expected streamed frame.",
1345      )),
1346    }
1347  }
1348
1349  /// Whether or not the decoded frame starts a stream.
1350  pub fn is_streaming(&self) -> bool {
1351    match *self {
1352      DecodedFrame::Streaming(_) => true,
1353      _ => false,
1354    }
1355  }
1356
1357  /// Whether or not the decoded frame is a complete frame.
1358  pub fn is_complete(&self) -> bool {
1359    !self.is_streaming()
1360  }
1361}
1362
1363#[cfg(test)]
1364mod tests {
1365  use super::*;
1366  use crate::resp3::utils::new_map;
1367
1368  #[test]
1369  fn should_convert_basic_streaming_buffer_to_frame() {
1370    let mut streaming_buf = StreamedFrame::new(FrameKind::BlobString);
1371    streaming_buf.add_frame((FrameKind::ChunkedString, "foo").try_into().unwrap());
1372    streaming_buf.add_frame((FrameKind::ChunkedString, "bar").try_into().unwrap());
1373    streaming_buf.add_frame((FrameKind::ChunkedString, "baz").try_into().unwrap());
1374    streaming_buf.add_frame(Frame::new_end_stream());
1375    let frame = streaming_buf
1376      .into_frame()
1377      .expect("Failed to build frame from chunked stream");
1378
1379    assert_eq!(frame.as_str(), Some("foobarbaz"));
1380  }
1381
1382  #[test]
1383  fn should_convert_basic_streaming_buffer_to_frame_with_attributes() {
1384    let mut attributes = new_map(None);
1385    attributes.insert((FrameKind::SimpleString, "a").try_into().unwrap(), 1.into());
1386    attributes.insert((FrameKind::SimpleString, "b").try_into().unwrap(), 2.into());
1387    attributes.insert((FrameKind::SimpleString, "c").try_into().unwrap(), 3.into());
1388
1389    let mut streaming_buf = StreamedFrame::new(FrameKind::BlobString);
1390    streaming_buf.attributes = Some(attributes.clone());
1391
1392    streaming_buf.add_frame((FrameKind::ChunkedString, "foo").try_into().unwrap());
1393    streaming_buf.add_frame((FrameKind::ChunkedString, "bar").try_into().unwrap());
1394    streaming_buf.add_frame((FrameKind::ChunkedString, "baz").try_into().unwrap());
1395    streaming_buf.add_frame(Frame::new_end_stream());
1396
1397    let frame = streaming_buf
1398      .into_frame()
1399      .expect("Failed to build frame from chunked stream");
1400
1401    assert_eq!(frame.as_str(), Some("foobarbaz"));
1402    assert_eq!(frame.attributes(), Some(&attributes));
1403  }
1404}