redis_protocol/resp3/
types.rs

1use crate::{
2  error::{RedisProtocolError, RedisProtocolErrorKind},
3  resp3::utils as resp3_utils,
4  types::{_Range, PATTERN_PUBSUB_PREFIX, PUBSUB_PREFIX, PUBSUB_PUSH_PREFIX, SHARD_PUBSUB_PREFIX},
5  utils,
6};
7use alloc::{
8  collections::VecDeque,
9  format,
10  string::{String, ToString},
11  vec::Vec,
12};
13use core::{
14  fmt::Debug,
15  hash::{Hash, Hasher},
16  mem,
17  str,
18};
19
20#[cfg(feature = "convert")]
21use crate::convert::FromResp3;
22#[cfg(feature = "bytes")]
23use bytes::{Bytes, BytesMut};
24#[cfg(feature = "bytes")]
25use bytes_utils::Str;
26
27#[cfg(feature = "hashbrown")]
28use hashbrown::{HashMap, HashSet};
29#[cfg(feature = "index-map")]
30use indexmap::{IndexMap, IndexSet};
31#[cfg(feature = "std")]
32use std::collections::{HashMap, HashSet};
33
/// Leading byte (`+`) of a simple string frame.
pub const SIMPLE_STRING_BYTE: u8 = b'+';
/// Leading byte (`-`) of a simple error frame.
pub const SIMPLE_ERROR_BYTE: u8 = b'-';
/// Leading byte (`:`) of a number frame.
pub const NUMBER_BYTE: u8 = b':';
/// Leading byte (`,`) of a double frame.
pub const DOUBLE_BYTE: u8 = b',';
/// Leading byte (`$`) of a blob string frame.
pub const BLOB_STRING_BYTE: u8 = b'$';
/// Leading byte (`!`) of a blob error frame.
pub const BLOB_ERROR_BYTE: u8 = b'!';
/// Leading byte (`#`) of a boolean frame.
pub const BOOLEAN_BYTE: u8 = b'#';
/// Leading byte (`=`) of a verbatim string frame.
pub const VERBATIM_STRING_BYTE: u8 = b'=';
/// Leading byte (`(`) of a big number frame.
pub const BIG_NUMBER_BYTE: u8 = b'(';
/// Leading byte (`*`) of an array frame.
pub const ARRAY_BYTE: u8 = b'*';
/// Leading byte (`%`) of a map frame.
pub const MAP_BYTE: u8 = b'%';
/// Leading byte (`~`) of a set frame.
pub const SET_BYTE: u8 = b'~';
/// Leading byte (`|`) of an attribute frame.
pub const ATTRIBUTE_BYTE: u8 = b'|';
/// Leading byte (`>`) of a push frame.
pub const PUSH_BYTE: u8 = b'>';
/// Leading byte (`_`) of a NULL value.
pub const NULL_BYTE: u8 = b'_';
/// Separator byte (`:`) between the format prefix of a verbatim string and its contents.
pub const VERBATIM_FORMAT_BYTE: u8 = b':';
/// Byte (`;`) that represents a chunked string.
pub const CHUNKED_STRING_BYTE: u8 = b';';
/// Byte (`.`) that signals the end of a stream.
pub const END_STREAM_BYTE: u8 = b'.';
70
/// Byte (`?`) that follows the type byte of a streamed frame in place of a length.
pub const STREAMED_LENGTH_BYTE: u8 = b'?';
/// Terminator written when a streamed blob string is finished.
pub const END_STREAM_STRING_BYTES: &str = ";0\r\n";
/// Terminator written when a streamed aggregate type is finished.
pub const END_STREAM_AGGREGATE_BYTES: &str = ".\r\n";

/// The encoded form of NULL in RESP3.
pub const NULL: &str = "_\r\n";

/// Textual form of positive infinity.
pub const INFINITY: &str = "inf";
/// Textual form of negative infinity.
pub const NEG_INFINITY: &str = "-inf";
/// Textual form of NaN.
pub const NAN: &str = "nan";

/// The `HELLO` keyword.
pub const HELLO: &str = "HELLO";
/// The encoded form of the boolean `true`.
pub const BOOL_TRUE_BYTES: &str = "#t\r\n";
/// The encoded form of the boolean `false`.
pub const BOOL_FALSE_BYTES: &str = "#f\r\n";
/// A single space character.
pub const EMPTY_SPACE: &str = " ";
/// The `AUTH` keyword.
pub const AUTH: &str = "AUTH";
/// The `SETNAME` keyword.
pub const SETNAME: &str = "SETNAME";
100
/// Map container used by frames.
///
/// Without the `index-map` feature this is whichever `HashMap` is in scope (`hashbrown::HashMap` or
/// `std::collections::HashMap`, depending on the enabled feature flags).
#[cfg(not(feature = "index-map"))]
pub type FrameMap<K, V> = HashMap<K, V>;
/// Map container used by frames.
///
/// With the `index-map` feature enabled this is `indexmap::IndexMap`.
#[cfg(feature = "index-map")]
pub type FrameMap<K, V> = IndexMap<K, V>;

/// Set container used by frames.
///
/// Without the `index-map` feature this is whichever `HashSet` is in scope (`hashbrown::HashSet` or
/// `std::collections::HashSet`, depending on the enabled feature flags).
#[cfg(not(feature = "index-map"))]
pub type FrameSet<T> = HashSet<T>;
/// Set container used by frames.
///
/// With the `index-map` feature enabled this is `indexmap::IndexSet`.
#[cfg(feature = "index-map")]
pub type FrameSet<T> = IndexSet<T>;
117
118/// Additional information returned alongside a [BytesFrame].
119#[cfg(feature = "bytes")]
120#[cfg_attr(docsrs, doc(cfg(feature = "bytes")))]
121pub type BytesAttributes = FrameMap<BytesFrame, BytesFrame>;
122/// Additional information returned alongside an [OwnedFrame].
123pub type OwnedAttributes = FrameMap<OwnedFrame, OwnedFrame>;
124/// Additional information returned alongside an [RangeFrame].
125pub type RangeAttributes = FrameMap<RangeFrame, RangeFrame>;
126
/// The RESP version used in the `HELLO` request.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum RespVersion {
  /// Protocol version 2.
  RESP2,
  /// Protocol version 3.
  RESP3,
}

impl RespVersion {
  /// Return the ASCII digit (`b'2'` or `b'3'`) that names this version.
  pub fn to_byte(&self) -> u8 {
    match self {
      Self::RESP2 => b'2',
      Self::RESP3 => b'3',
    }
  }
}
142
/// The format of a verbatim string frame.
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub enum VerbatimStringFormat {
  /// Plain text, written as `txt`.
  Text,
  /// Markdown, written as `mkd`.
  Markdown,
}

impl VerbatimStringFormat {
  /// The three-character tag for this format.
  pub(crate) fn to_str(&self) -> &'static str {
    match self {
      Self::Text => "txt",
      Self::Markdown => "mkd",
    }
  }

  /// Number of bytes taken by the format tag, including the trailing colon.
  pub(crate) fn encode_len(&self) -> usize {
    // Both tags are three bytes long, so this is always 4 once the colon is counted.
    self.to_str().len() + 1
  }
}
166
167/// The type of frame without any associated data.
168#[derive(Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd, Copy)]
169pub enum FrameKind {
170  Array,
171  BlobString,
172  SimpleString,
173  SimpleError,
174  Number,
175  Null,
176  Double,
177  Boolean,
178  BlobError,
179  VerbatimString,
180  Map,
181  Set,
182  Attribute,
183  Push,
184  Hello,
185  BigNumber,
186  ChunkedString,
187  EndStream,
188}
189
190impl FrameKind {
191  /// Whether the frame is an aggregate type (array, set, map).
192  pub fn is_aggregate_type(&self) -> bool {
193    matches!(self, FrameKind::Array | FrameKind::Set | FrameKind::Map)
194  }
195
196  /// Whether the frame is an aggregate type or blob string.
197  pub fn is_streaming_type(&self) -> bool {
198    matches!(
199      self,
200      FrameKind::Array | FrameKind::Set | FrameKind::Map | FrameKind::BlobString
201    )
202  }
203
204  /// Whether the frame can be used as a key in a `HashMap` or `HashSet`.
205  ///
206  /// Not all frame types can be hashed, and trying to do so can panic. This function can be used to handle this
207  /// gracefully.
208  pub fn can_hash(&self) -> bool {
209    matches!(
210      self,
211      FrameKind::BlobString
212        | FrameKind::BlobError
213        | FrameKind::SimpleString
214        | FrameKind::SimpleError
215        | FrameKind::Number
216        | FrameKind::Double
217        | FrameKind::Boolean
218        | FrameKind::Null
219        | FrameKind::ChunkedString
220        | FrameKind::EndStream
221        | FrameKind::VerbatimString
222        | FrameKind::BigNumber
223    )
224  }
225
226  /// A function used to differentiate data types that may have the same inner binary representation when hashing.
227  pub fn hash_prefix(&self) -> &'static str {
228    use self::FrameKind::*;
229
230    match *self {
231      Array => "a",
232      BlobString => "b",
233      SimpleString => "s",
234      SimpleError => "e",
235      Number => "n",
236      Null => "N",
237      Double => "d",
238      Boolean => "B",
239      BlobError => "E",
240      VerbatimString => "v",
241      Map => "m",
242      Set => "S",
243      Attribute => "A",
244      Push => "p",
245      Hello => "h",
246      BigNumber => "bn",
247      ChunkedString => "cs",
248      EndStream => "es",
249    }
250  }
251
252  /// Attempt to detect the type of the frame from the first byte.
253  pub fn from_byte(d: u8) -> Option<FrameKind> {
254    use self::FrameKind::*;
255
256    match d {
257      SIMPLE_STRING_BYTE => Some(SimpleString),
258      SIMPLE_ERROR_BYTE => Some(SimpleError),
259      NUMBER_BYTE => Some(Number),
260      DOUBLE_BYTE => Some(Double),
261      BLOB_STRING_BYTE => Some(BlobString),
262      BLOB_ERROR_BYTE => Some(BlobError),
263      BOOLEAN_BYTE => Some(Boolean),
264      VERBATIM_STRING_BYTE => Some(VerbatimString),
265      BIG_NUMBER_BYTE => Some(BigNumber),
266      ARRAY_BYTE => Some(Array),
267      MAP_BYTE => Some(Map),
268      SET_BYTE => Some(Set),
269      ATTRIBUTE_BYTE => Some(Attribute),
270      PUSH_BYTE => Some(Push),
271      NULL_BYTE => Some(Null),
272      CHUNKED_STRING_BYTE => Some(ChunkedString),
273      END_STREAM_BYTE => Some(EndStream),
274      _ => None,
275    }
276  }
277
278  /// Read the byte prefix for the associated frame type.
279  pub fn to_byte(&self) -> u8 {
280    use self::FrameKind::*;
281
282    match *self {
283      SimpleString => SIMPLE_STRING_BYTE,
284      SimpleError => SIMPLE_ERROR_BYTE,
285      Number => NUMBER_BYTE,
286      Double => DOUBLE_BYTE,
287      BlobString => BLOB_STRING_BYTE,
288      BlobError => BLOB_ERROR_BYTE,
289      Boolean => BOOLEAN_BYTE,
290      VerbatimString => VERBATIM_STRING_BYTE,
291      BigNumber => BIG_NUMBER_BYTE,
292      Array => ARRAY_BYTE,
293      Map => MAP_BYTE,
294      Set => SET_BYTE,
295      Attribute => ATTRIBUTE_BYTE,
296      Push => PUSH_BYTE,
297      Null => NULL_BYTE,
298      ChunkedString => CHUNKED_STRING_BYTE,
299      EndStream => END_STREAM_BYTE,
300      Hello => panic!("HELLO does not have a byte prefix."),
301    }
302  }
303}
304
305/// A borrowed attributes type, typically used with `BorrowedFrame` for encoding use cases.
306#[derive(Debug, PartialEq, Eq)]
307pub enum BorrowedAttrs<'a> {
308  Owned(&'a OwnedAttributes),
309  #[cfg(feature = "bytes")]
310  #[cfg_attr(docsrs, doc(cfg(feature = "bytes")))]
311  Bytes(&'a BytesAttributes),
312}
313
314impl<'a> Clone for BorrowedAttrs<'a> {
315  fn clone(self: &BorrowedAttrs<'a>) -> BorrowedAttrs<'a> {
316    match self {
317      BorrowedAttrs::Owned(owned) => BorrowedAttrs::Owned(owned),
318      #[cfg(feature = "bytes")]
319      BorrowedAttrs::Bytes(bytes) => BorrowedAttrs::Bytes(bytes),
320    }
321  }
322}
323
324impl<'a> From<&'a BorrowedAttrs<'a>> for BorrowedAttrs<'a> {
325  fn from(value: &'a BorrowedAttrs) -> Self {
326    value.clone()
327  }
328}
329
330impl<'a> From<&'a OwnedAttributes> for BorrowedAttrs<'a> {
331  fn from(attr: &'a OwnedAttributes) -> BorrowedAttrs<'a> {
332    BorrowedAttrs::Owned(attr)
333  }
334}
335
#[cfg(feature = "bytes")]
#[cfg_attr(docsrs, doc(cfg(feature = "bytes")))]
impl<'a> From<&'a BytesAttributes> for BorrowedAttrs<'a> {
  /// Wrap a reference to a `Bytes`-backed attribute map.
  fn from(attr: &'a BytesAttributes) -> Self {
    Self::Bytes(attr)
  }
}
343
344/// A borrowed frame variant, typically used for encoding use cases.
345#[derive(Debug, PartialEq)]
346pub enum BorrowedFrame<'a> {
347  /// A blob of bytes.
348  BlobString {
349    data:       &'a [u8],
350    attributes: Option<BorrowedAttrs<'a>>,
351  },
352  /// A blob representing an error.
353  BlobError {
354    data:       &'a [u8],
355    attributes: Option<BorrowedAttrs<'a>>,
356  },
357  /// A small string.
358  SimpleString {
359    data:       &'a [u8],
360    attributes: Option<BorrowedAttrs<'a>>,
361  },
362  /// A small string representing an error.
363  SimpleError {
364    data:       &'a str,
365    attributes: Option<BorrowedAttrs<'a>>,
366  },
367  /// A boolean type.
368  Boolean {
369    data:       bool,
370    attributes: Option<BorrowedAttrs<'a>>,
371  },
372  /// A null type.
373  Null,
374  /// A signed 64-bit integer.
375  Number {
376    data:       i64,
377    attributes: Option<BorrowedAttrs<'a>>,
378  },
379  /// A signed 64-bit floating point number.
380  Double {
381    data:       f64,
382    attributes: Option<BorrowedAttrs<'a>>,
383  },
384  /// A large number not representable as a `Number` or `Double`.
385  BigNumber {
386    data:       &'a [u8],
387    attributes: Option<BorrowedAttrs<'a>>,
388  },
389  /// A string to be displayed without any escaping or filtering.
390  VerbatimString {
391    data:       &'a [u8],
392    format:     VerbatimStringFormat,
393    attributes: Option<BorrowedAttrs<'a>>,
394  },
395  /// An array of frames.
396  Array {
397    data:       &'a [BorrowedFrame<'a>],
398    attributes: Option<BorrowedAttrs<'a>>,
399  },
400  /// An unordered map of key-value pairs.
401  Map {
402    data:       &'a FrameMap<BorrowedFrame<'a>, BorrowedFrame<'a>>,
403    attributes: Option<BorrowedAttrs<'a>>,
404  },
405  /// An unordered collection of other frames with a uniqueness constraint.
406  Set {
407    data:       &'a FrameSet<BorrowedFrame<'a>>,
408    attributes: Option<BorrowedAttrs<'a>>,
409  },
410  /// Out-of-band data.
411  Push {
412    data:       &'a [BorrowedFrame<'a>],
413    attributes: Option<BorrowedAttrs<'a>>,
414  },
415  /// A special frame type used when first connecting to the server to describe the protocol version and optional
416  /// credentials.
417  Hello {
418    version: RespVersion,
419    auth:    Option<(&'a str, &'a str)>,
420    setname: Option<&'a str>,
421  },
422  /// One chunk of a streaming blob.
423  ChunkedString(&'a [u8]),
424}
425
426impl<'a> Clone for BorrowedFrame<'a> {
427  fn clone(self: &BorrowedFrame<'a>) -> BorrowedFrame<'a> {
428    use self::BorrowedFrame::*;
429
430    match self {
431      Array { data, attributes } => Array {
432        data,
433        attributes: attributes.clone(),
434      },
435      Push { data, attributes } => Push {
436        data,
437        attributes: attributes.clone(),
438      },
439      Set { data, attributes } => Set {
440        data,
441        attributes: attributes.clone(),
442      },
443      Map { data, attributes } => Map {
444        data,
445        attributes: attributes.clone(),
446      },
447      BlobString { data, attributes } => BlobString {
448        data,
449        attributes: attributes.clone(),
450      },
451      SimpleString { data, attributes } => SimpleString {
452        data,
453        attributes: attributes.clone(),
454      },
455      SimpleError { data, attributes } => SimpleError {
456        data,
457        attributes: attributes.clone(),
458      },
459      Number { data, attributes } => Number {
460        data:       *data,
461        attributes: attributes.clone(),
462      },
463      Null => Null,
464      Double { data, attributes } => Double {
465        data:       *data,
466        attributes: attributes.clone(),
467      },
468      Boolean { data, attributes } => Boolean {
469        data:       *data,
470        attributes: attributes.clone(),
471      },
472      BlobError { data, attributes } => BlobError {
473        data,
474        attributes: attributes.clone(),
475      },
476      VerbatimString {
477        data,
478        format,
479        attributes,
480      } => VerbatimString {
481        data,
482        format: format.clone(),
483        attributes: attributes.clone(),
484      },
485      ChunkedString(data) => ChunkedString(data),
486      BigNumber { data, attributes } => BigNumber {
487        data,
488        attributes: attributes.clone(),
489      },
490      Hello { auth, setname, version } => Hello {
491        version: version.clone(),
492        setname: *setname,
493        auth:    *auth,
494      },
495    }
496  }
497}
498
499impl Eq for BorrowedFrame<'_> {}
500
501impl<'a> Hash for BorrowedFrame<'a> {
502  fn hash<H: Hasher>(&self, state: &mut H) {
503    use self::BorrowedFrame::*;
504    self.kind().hash_prefix().hash(state);
505
506    match self {
507      BlobString { data, .. } => data.hash(state),
508      SimpleString { data, .. } => data.hash(state),
509      SimpleError { data, .. } => data.hash(state),
510      Number { data, .. } => data.hash(state),
511      Null => NULL.hash(state),
512      Double { data, .. } => data.to_string().hash(state),
513      Boolean { data, .. } => data.hash(state),
514      BlobError { data, .. } => data.hash(state),
515      VerbatimString { data, format, .. } => {
516        format.hash(state);
517        data.hash(state);
518      },
519      ChunkedString(data) => data.hash(state),
520      BigNumber { data, .. } => data.hash(state),
521      _ => panic!("Invalid RESP3 data type to use as hash key."),
522    };
523  }
524}
525
526impl<'a> BorrowedFrame<'a> {
527  /// Read the `FrameKind` value for this frame.
528  pub fn kind(&self) -> FrameKind {
529    match self {
530      BorrowedFrame::Array { .. } => FrameKind::Array,
531      BorrowedFrame::BlobString { .. } => FrameKind::BlobString,
532      BorrowedFrame::SimpleString { .. } => FrameKind::SimpleString,
533      BorrowedFrame::SimpleError { .. } => FrameKind::SimpleError,
534      BorrowedFrame::Number { .. } => FrameKind::Number,
535      BorrowedFrame::Null => FrameKind::Null,
536      BorrowedFrame::Double { .. } => FrameKind::Double,
537      BorrowedFrame::BlobError { .. } => FrameKind::BlobError,
538      BorrowedFrame::VerbatimString { .. } => FrameKind::VerbatimString,
539      BorrowedFrame::Boolean { .. } => FrameKind::Boolean,
540      BorrowedFrame::Map { .. } => FrameKind::Map,
541      BorrowedFrame::Set { .. } => FrameKind::Set,
542      BorrowedFrame::Push { .. } => FrameKind::Push,
543      BorrowedFrame::Hello { .. } => FrameKind::Hello,
544      BorrowedFrame::BigNumber { .. } => FrameKind::BigNumber,
545      BorrowedFrame::ChunkedString(inner) => {
546        if inner.is_empty() {
547          FrameKind::EndStream
548        } else {
549          FrameKind::ChunkedString
550        }
551      },
552    }
553  }
554
555  /// Return the number of byte necessary to encode this frame.
556  pub fn encode_len(&self, int_as_blobstring: bool) -> usize {
557    resp3_utils::borrowed_encode_len(self, int_as_blobstring)
558  }
559}
560
561/// A reference-free frame type representing ranges into an associated buffer, typically used to implement zero-copy
562/// parsing.
563#[derive(Clone, Debug, PartialEq)]
564pub enum RangeFrame {
565  /// A blob of bytes.
566  BlobString {
567    data:       _Range,
568    attributes: Option<RangeAttributes>,
569  },
570  /// A blob representing an error.
571  BlobError {
572    data:       _Range,
573    attributes: Option<RangeAttributes>,
574  },
575  /// A small string.
576  SimpleString {
577    data:       _Range,
578    attributes: Option<RangeAttributes>,
579  },
580  /// A small string representing an error.
581  SimpleError {
582    data:       _Range,
583    attributes: Option<RangeAttributes>,
584  },
585  /// A boolean type.
586  Boolean {
587    data:       bool,
588    attributes: Option<RangeAttributes>,
589  },
590  /// A null type.
591  Null,
592  /// A signed 64-bit integer.
593  Number {
594    data:       i64,
595    attributes: Option<RangeAttributes>,
596  },
597  /// A signed 64-bit floating point number.
598  Double {
599    data:       f64,
600    attributes: Option<RangeAttributes>,
601  },
602  /// A large number not representable as a `Number` or `Double`.
603  BigNumber {
604    data:       _Range,
605    attributes: Option<RangeAttributes>,
606  },
607  /// A string to be displayed without any escaping or filtering.
608  VerbatimString {
609    data:       _Range,
610    format:     VerbatimStringFormat,
611    attributes: Option<RangeAttributes>,
612  },
613  /// An array of frames.
614  Array {
615    data:       Vec<RangeFrame>,
616    attributes: Option<RangeAttributes>,
617  },
618  /// An unordered map of key-value pairs.
619  Map {
620    data:       FrameMap<RangeFrame, RangeFrame>,
621    attributes: Option<RangeAttributes>,
622  },
623  /// An unordered collection of other frames with a uniqueness constraint.
624  Set {
625    data:       FrameSet<RangeFrame>,
626    attributes: Option<RangeAttributes>,
627  },
628  /// Out-of-band data.
629  Push {
630    data:       Vec<RangeFrame>,
631    attributes: Option<RangeAttributes>,
632  },
633  /// A special frame type used when first connecting to the server to describe the protocol version and optional
634  /// credentials.
635  Hello {
636    version:  RespVersion,
637    username: Option<_Range>,
638    password: Option<_Range>,
639    setname:  Option<_Range>,
640  },
641  /// One chunk of a streaming blob.
642  ChunkedString(_Range),
643}
644
645impl Eq for RangeFrame {}
646
647impl RangeFrame {
648  /// Read the associated `FrameKind`.
649  pub fn kind(&self) -> FrameKind {
650    match self {
651      RangeFrame::Array { .. } => FrameKind::Array,
652      RangeFrame::BlobString { .. } => FrameKind::BlobString,
653      RangeFrame::SimpleString { .. } => FrameKind::SimpleString,
654      RangeFrame::SimpleError { .. } => FrameKind::SimpleError,
655      RangeFrame::Number { .. } => FrameKind::Number,
656      RangeFrame::Null => FrameKind::Null,
657      RangeFrame::Double { .. } => FrameKind::Double,
658      RangeFrame::BlobError { .. } => FrameKind::BlobError,
659      RangeFrame::VerbatimString { .. } => FrameKind::VerbatimString,
660      RangeFrame::Boolean { .. } => FrameKind::Boolean,
661      RangeFrame::Map { .. } => FrameKind::Map,
662      RangeFrame::Set { .. } => FrameKind::Set,
663      RangeFrame::Push { .. } => FrameKind::Push,
664      RangeFrame::Hello { .. } => FrameKind::Hello,
665      RangeFrame::BigNumber { .. } => FrameKind::BigNumber,
666      RangeFrame::ChunkedString(inner) => {
667        if inner.1 == 0 && inner.0 == 0 {
668          FrameKind::EndStream
669        } else {
670          FrameKind::ChunkedString
671        }
672      },
673    }
674  }
675
676  /// A context-aware length function that returns the length of the inner frame contents.
677  ///
678  /// This does not return the encoded length, but rather the length of the contents of the frame such as the number
679  /// of elements in an array, the size of any inner buffers, etc.
680  ///
681  /// Note: `Null` has a length of 0 and `Hello`, `Number`, `Double`, and `Boolean` have a length of 1.
682  pub fn len(&self) -> usize {
683    match self {
684      RangeFrame::Array { data, .. } | RangeFrame::Push { data, .. } => data.len(),
685      RangeFrame::BlobString { data, .. }
686      | RangeFrame::BlobError { data, .. }
687      | RangeFrame::BigNumber { data, .. }
688      | RangeFrame::ChunkedString(data) => data.1 - data.0,
689      RangeFrame::SimpleString { data, .. } => data.1 - data.0,
690      RangeFrame::SimpleError { data, .. } => data.1 - data.0,
691      RangeFrame::Number { .. } | RangeFrame::Double { .. } | RangeFrame::Boolean { .. } => 1,
692      RangeFrame::Null => 0,
693      RangeFrame::VerbatimString { data, .. } => data.1 - data.0,
694      RangeFrame::Map { data, .. } => data.len(),
695      RangeFrame::Set { data, .. } => data.len(),
696      RangeFrame::Hello { .. } => 1,
697    }
698  }
699
700  /// Add attributes to the frame.
701  pub fn add_attributes(&mut self, attributes: RangeAttributes) -> Result<(), RedisProtocolError> {
702    let _attributes = match self {
703      RangeFrame::Array { attributes, .. } => attributes,
704      RangeFrame::Push { attributes, .. } => attributes,
705      RangeFrame::BlobString { attributes, .. } => attributes,
706      RangeFrame::BlobError { attributes, .. } => attributes,
707      RangeFrame::BigNumber { attributes, .. } => attributes,
708      RangeFrame::Boolean { attributes, .. } => attributes,
709      RangeFrame::Number { attributes, .. } => attributes,
710      RangeFrame::Double { attributes, .. } => attributes,
711      RangeFrame::VerbatimString { attributes, .. } => attributes,
712      RangeFrame::SimpleError { attributes, .. } => attributes,
713      RangeFrame::SimpleString { attributes, .. } => attributes,
714      RangeFrame::Set { attributes, .. } => attributes,
715      RangeFrame::Map { attributes, .. } => attributes,
716      RangeFrame::Null | RangeFrame::ChunkedString(_) | RangeFrame::Hello { .. } => {
717        return Err(RedisProtocolError::new(
718          RedisProtocolErrorKind::Unknown,
719          format!("{:?} cannot have attributes.", self.kind()),
720        ))
721      },
722    };
723
724    if let Some(_attributes) = _attributes.as_mut() {
725      _attributes.extend(attributes);
726    } else {
727      *_attributes = Some(attributes);
728    }
729
730    Ok(())
731  }
732
733  pub fn new_end_stream() -> Self {
734    RangeFrame::ChunkedString((0, 0))
735  }
736}
737
738impl Hash for RangeFrame {
739  fn hash<H: Hasher>(&self, state: &mut H) {
740    use self::RangeFrame::*;
741    self.kind().hash_prefix().hash(state);
742
743    match self {
744      BlobString { data, .. } => resp3_utils::hash_tuple(state, data),
745      SimpleString { data, .. } => resp3_utils::hash_tuple(state, data),
746      SimpleError { data, .. } => resp3_utils::hash_tuple(state, data),
747      Number { data, .. } => data.hash(state),
748      Null => NULL.hash(state),
749      Double { data, .. } => data.to_string().hash(state),
750      Boolean { data, .. } => data.hash(state),
751      BlobError { data, .. } => resp3_utils::hash_tuple(state, data),
752      VerbatimString { data, format, .. } => {
753        format.hash(state);
754        resp3_utils::hash_tuple(state, data);
755      },
756      ChunkedString(data) => resp3_utils::hash_tuple(state, data),
757      BigNumber { data, .. } => resp3_utils::hash_tuple(state, data),
758      _ => panic!("Invalid RESP3 data type to use as hash key."),
759    };
760  }
761}
762
763/// A frame type representing ranges into an associated buffer that contains the starting portion of a streaming
764/// frame.
765#[derive(Debug)]
766pub struct StreamedRangeFrame {
767  pub kind:       FrameKind,
768  pub attributes: Option<RangeAttributes>,
769}
770
771impl StreamedRangeFrame {
772  /// Create a new streamed frame with the provided frame type.
773  pub fn new(kind: FrameKind) -> Self {
774    StreamedRangeFrame { kind, attributes: None }
775  }
776
777  /// Add range attributes to the frame.
778  pub fn add_attributes(&mut self, attributes: RangeAttributes) {
779    if let Some(_attributes) = self.attributes.as_mut() {
780      _attributes.extend(attributes);
781    } else {
782      self.attributes = Some(attributes);
783    }
784  }
785}
786
787/// A reference-free frame type representing ranges into an associated buffer, typically used to implement zero-copy
788/// parsing.
789#[derive(Debug)]
790pub enum DecodedRangeFrame {
791  Complete(RangeFrame),
792  Streaming(StreamedRangeFrame),
793}
794
795impl DecodedRangeFrame {
796  /// Add attributes to the decoded frame, if possible.
797  pub fn add_attributes(&mut self, attributes: RangeAttributes) -> Result<(), RedisProtocolError> {
798    match self {
799      DecodedRangeFrame::Streaming(inner) => {
800        inner.add_attributes(attributes);
801        Ok(())
802      },
803      DecodedRangeFrame::Complete(inner) => inner.add_attributes(attributes),
804    }
805  }
806
807  /// Convert the decoded frame to a complete frame, returning an error if a streaming variant is found.
808  pub fn into_complete_frame(self) -> Result<RangeFrame, RedisProtocolError> {
809    match self {
810      DecodedRangeFrame::Complete(frame) => Ok(frame),
811      DecodedRangeFrame::Streaming(_) => Err(RedisProtocolError::new(
812        RedisProtocolErrorKind::DecodeError,
813        "Expected complete frame.",
814      )),
815    }
816  }
817}
818
819/// Generic operations on a RESP3 frame.
820pub trait Resp3Frame: Debug + Hash + Eq + Sized {
821  type Attributes;
822
823  /// Create the target aggregate type based on a buffered set of chunked frames.
824  fn from_buffer(
825    target: FrameKind,
826    buf: impl IntoIterator<Item = Self>,
827    attributes: Option<Self::Attributes>,
828  ) -> Result<Self, RedisProtocolError>;
829
830  /// Read the attributes attached to the frame.
831  fn attributes(&self) -> Option<&Self::Attributes>;
832
833  /// Take the attributes off this frame.
834  fn take_attributes(&mut self) -> Option<Self::Attributes>;
835
836  /// Read a mutable reference to any attributes attached to the frame.
837  fn attributes_mut(&mut self) -> Option<&mut Self::Attributes>;
838
839  /// Attempt to add attributes to the frame, extending the existing attributes if needed.
840  fn add_attributes(&mut self, attributes: Self::Attributes) -> Result<(), RedisProtocolError>;
841
842  /// Create a new frame that terminates a stream.
843  fn new_end_stream() -> Self;
844
845  /// Create a new empty frame with attribute support.
846  fn new_empty() -> Self;
847
848  /// A context-aware length function that returns the length of the inner frame contents.
849  ///
850  /// This does not return the encoded length, but rather the length of the contents of the frame such as the number
851  /// of elements in an array, the size of any inner buffers, etc.
852  ///
853  /// Note: `Null` has a length of 0 and `Hello`, `Number`, `Double`, and `Boolean` have a length of 1.
854  ///
855  /// See [encode_len](Self::encode_len) to read the number of bytes necessary to encode the frame.
856  fn len(&self) -> usize;
857
858  /// Replace `self` with Null, returning the original value.
859  fn take(&mut self) -> Self;
860
861  /// Read the associated `FrameKind`.
862  fn kind(&self) -> FrameKind;
863
864  /// Whether the frame is an empty chunked string, signifying the end of a chunked string stream.
865  fn is_end_stream_frame(&self) -> bool;
866
867  /// If the frame is a verbatim string then read the associated format.
868  fn verbatim_string_format(&self) -> Option<&VerbatimStringFormat>;
869
870  /// Read the frame as a string slice if it can be parsed as a UTF-8 string without allocating.
871  fn as_str(&self) -> Option<&str>;
872
873  /// Attempt to convert the frame to a bool.
874  fn as_bool(&self) -> Option<bool>;
875
876  /// Read the frame as a `String` if it can be parsed as a UTF-8 string.
877  fn to_string(&self) -> Option<String>;
878
879  /// Attempt to read the frame as a byte slice.
880  fn as_bytes(&self) -> Option<&[u8]>;
881
882  /// Read the number of bytes necessary to represent the frame and any associated attributes.
883  fn encode_len(&self, int_as_blobstring: bool) -> usize;
884
885  /// Whether the frame is a message from a `subscribe` call.
886  fn is_normal_pubsub_message(&self) -> bool;
887
888  /// Whether the frame is a message from a `psubscribe` call.
889  fn is_pattern_pubsub_message(&self) -> bool;
890
891  /// Whether the frame is a message from a `ssubscribe` call.
892  fn is_shard_pubsub_message(&self) -> bool;
893
894  /// Whether the frame is a `MOVED` or `ASK` redirection.
895  fn is_redirection(&self) -> bool {
896    self.as_str().map(utils::is_redirection).unwrap_or(false)
897  }
898
899  /// Convert the frame to a double.
900  fn as_f64(&self) -> Option<f64> {
901    self.as_str().and_then(|s| s.parse::<f64>().ok())
902  }
903
904  /// Convert the frame to another type.
905  #[cfg(feature = "convert")]
906  #[cfg_attr(docsrs, doc(cfg(feature = "convert")))]
907  fn convert<T>(self) -> Result<T, RedisProtocolError>
908  where
909    Self: Sized,
910    T: FromResp3<Self>,
911  {
912    T::from_frame(self)
913  }
914
915  /// Whether frame is a bulk array with a single element.
916  #[cfg(feature = "convert")]
917  #[cfg_attr(docsrs, doc(cfg(feature = "convert")))]
918  fn is_single_element_vec(&self) -> bool;
919
  /// Consume the frame, popping an element from the inner array or returning the original frame.
  ///
  /// This function is intended to be used after checking [Self::is_single_element_vec]. Implementations may panic
  /// when that precondition does not hold.
  #[cfg(feature = "convert")]
  #[cfg_attr(docsrs, doc(cfg(feature = "convert")))]
  fn pop_or_take(self) -> Self;
926}
927
/// An enum describing a RESP3 frame that uses owned byte containers.
///
/// Every variant other than `Null`, `Hello`, and `ChunkedString` stores an optional set of attributes next to its
/// payload.
///
/// <https://github.com/antirez/RESP3/blob/master/spec.md>
#[derive(Clone, Debug, PartialEq)]
pub enum OwnedFrame {
  /// A blob of bytes.
  BlobString {
    data:       Vec<u8>,
    attributes: Option<OwnedAttributes>,
  },
  /// A blob of bytes representing an error.
  BlobError {
    data:       Vec<u8>,
    attributes: Option<OwnedAttributes>,
  },
  /// A small string.
  ///
  /// The internal data type is `Vec<u8>` in order to support callers that use this interface to parse a `MONITOR`
  /// stream.
  SimpleString {
    data:       Vec<u8>,
    attributes: Option<OwnedAttributes>,
  },
  /// A small string representing an error.
  SimpleError {
    data:       String,
    attributes: Option<OwnedAttributes>,
  },
  /// A boolean type.
  Boolean {
    data:       bool,
    attributes: Option<OwnedAttributes>,
  },
  /// A null type. This variant carries neither data nor attributes.
  Null,
  /// A signed 64-bit integer.
  Number {
    data:       i64,
    attributes: Option<OwnedAttributes>,
  },
  /// A 64-bit floating point number.
  Double {
    data:       f64,
    attributes: Option<OwnedAttributes>,
  },
  /// A large number not representable as a `Number` or `Double`.
  ///
  /// This library does not attempt to parse this; the raw bytes are stored as-is.
  BigNumber {
    data:       Vec<u8>,
    attributes: Option<OwnedAttributes>,
  },
  /// A string to be displayed without any escaping or filtering, along with its declared format.
  VerbatimString {
    data:       Vec<u8>,
    format:     VerbatimStringFormat,
    attributes: Option<OwnedAttributes>,
  },
  /// An array of frames.
  Array {
    data:       Vec<OwnedFrame>,
    attributes: Option<OwnedAttributes>,
  },
  /// An unordered map of key-value pairs.
  ///
  /// According to the spec keys can be any other RESP3 data type. However, it doesn't make sense to implement `Hash`
  /// for certain aggregate types. The [can_hash](crate::resp3::types::FrameKind::can_hash) function can be used to
  /// detect this.
  Map {
    data:       FrameMap<OwnedFrame, OwnedFrame>,
    attributes: Option<OwnedAttributes>,
  },
  /// An unordered collection of other frames with a uniqueness constraint.
  Set {
    data:       FrameSet<OwnedFrame>,
    attributes: Option<OwnedAttributes>,
  },
  /// Out-of-band data.
  Push {
    data:       Vec<OwnedFrame>,
    attributes: Option<OwnedAttributes>,
  },
  /// A special frame type used when first connecting to the server to describe the protocol version and optional
  /// credentials.
  Hello {
    version: RespVersion,
    // NOTE(review): presumably a (username, password) pair - confirm against the encoder.
    auth:    Option<(String, String)>,
    setname: Option<String>,
  },
  /// One chunk of a streaming blob.
  ///
  /// An empty chunk marks the end of a stream.
  ChunkedString(Vec<u8>),
}
1020
// `Eq` is asserted manually since it cannot be derived for an enum that stores an `f64` (the `Double` variant).
// NOTE(review): the derived `PartialEq` compares that `f64` with `==`, so a `Double` holding NaN does not equal
// itself.
impl Eq for OwnedFrame {}
1022
1023impl Hash for OwnedFrame {
1024  fn hash<H: Hasher>(&self, state: &mut H) {
1025    use self::OwnedFrame::*;
1026    self.kind().hash_prefix().hash(state);
1027
1028    match self {
1029      BlobString { data, .. } => data.hash(state),
1030      SimpleString { data, .. } => data.hash(state),
1031      SimpleError { data, .. } => data.hash(state),
1032      Number { data, .. } => data.hash(state),
1033      Null => NULL.hash(state),
1034      Double { data, .. } => data.to_string().hash(state),
1035      Boolean { data, .. } => data.hash(state),
1036      BlobError { data, .. } => data.hash(state),
1037      VerbatimString { data, format, .. } => {
1038        format.hash(state);
1039        data.hash(state);
1040      },
1041      ChunkedString(data) => data.hash(state),
1042      BigNumber { data, .. } => data.hash(state),
1043      _ => panic!("Invalid RESP3 data type to use as hash key."),
1044    };
1045  }
1046}
1047
1048impl Resp3Frame for OwnedFrame {
1049  type Attributes = OwnedAttributes;
1050
1051  fn from_buffer(
1052    target: FrameKind,
1053    buf: impl IntoIterator<Item = Self>,
1054    attributes: Option<Self::Attributes>,
1055  ) -> Result<Self, RedisProtocolError> {
1056    let mut data: Vec<_> = buf.into_iter().collect();
1057
1058    Ok(match target {
1059      FrameKind::BlobString => {
1060        let total_len = data.iter().fold(0, |m, f| m + f.len());
1061        let mut buf = Vec::with_capacity(total_len);
1062        for frame in data.into_iter() {
1063          buf.extend(match frame {
1064            OwnedFrame::ChunkedString(chunk) => chunk,
1065            OwnedFrame::BlobString { data, .. } => data,
1066            _ => {
1067              return Err(RedisProtocolError::new(
1068                RedisProtocolErrorKind::DecodeError,
1069                "Expected chunked or blob string.",
1070              ));
1071            },
1072          });
1073        }
1074
1075        OwnedFrame::BlobString { data: buf, attributes }
1076      },
1077      FrameKind::Map => OwnedFrame::Map {
1078        attributes,
1079        data: data
1080          .chunks_exact_mut(2)
1081          .map(|chunk| (chunk[0].take(), chunk[1].take()))
1082          .collect(),
1083      },
1084      FrameKind::Set => OwnedFrame::Set {
1085        attributes,
1086        data: data.into_iter().collect(),
1087      },
1088      FrameKind::Array => OwnedFrame::Array { attributes, data },
1089      _ => {
1090        return Err(RedisProtocolError::new(
1091          RedisProtocolErrorKind::DecodeError,
1092          "Streaming frames only supported for blob strings, maps, sets, and arrays.",
1093        ))
1094      },
1095    })
1096  }
1097
1098  fn add_attributes(&mut self, attributes: Self::Attributes) -> Result<(), RedisProtocolError> {
1099    let _attributes = match self {
1100      OwnedFrame::Array { attributes, .. } => attributes,
1101      OwnedFrame::Push { attributes, .. } => attributes,
1102      OwnedFrame::BlobString { attributes, .. } => attributes,
1103      OwnedFrame::BlobError { attributes, .. } => attributes,
1104      OwnedFrame::BigNumber { attributes, .. } => attributes,
1105      OwnedFrame::Boolean { attributes, .. } => attributes,
1106      OwnedFrame::Number { attributes, .. } => attributes,
1107      OwnedFrame::Double { attributes, .. } => attributes,
1108      OwnedFrame::VerbatimString { attributes, .. } => attributes,
1109      OwnedFrame::SimpleError { attributes, .. } => attributes,
1110      OwnedFrame::SimpleString { attributes, .. } => attributes,
1111      OwnedFrame::Set { attributes, .. } => attributes,
1112      OwnedFrame::Map { attributes, .. } => attributes,
1113      OwnedFrame::Null | OwnedFrame::ChunkedString(_) | OwnedFrame::Hello { .. } => {
1114        return Err(RedisProtocolError::new(
1115          RedisProtocolErrorKind::Unknown,
1116          format!("{:?} cannot have attributes.", self.kind()),
1117        ))
1118      },
1119    };
1120
1121    if let Some(_attributes) = _attributes.as_mut() {
1122      _attributes.extend(attributes);
1123    } else {
1124      *_attributes = Some(attributes);
1125    }
1126
1127    Ok(())
1128  }
1129
1130  fn attributes(&self) -> Option<&Self::Attributes> {
1131    let attributes = match self {
1132      OwnedFrame::Array { attributes, .. } => attributes,
1133      OwnedFrame::Push { attributes, .. } => attributes,
1134      OwnedFrame::BlobString { attributes, .. } => attributes,
1135      OwnedFrame::BlobError { attributes, .. } => attributes,
1136      OwnedFrame::BigNumber { attributes, .. } => attributes,
1137      OwnedFrame::Boolean { attributes, .. } => attributes,
1138      OwnedFrame::Number { attributes, .. } => attributes,
1139      OwnedFrame::Double { attributes, .. } => attributes,
1140      OwnedFrame::VerbatimString { attributes, .. } => attributes,
1141      OwnedFrame::SimpleError { attributes, .. } => attributes,
1142      OwnedFrame::SimpleString { attributes, .. } => attributes,
1143      OwnedFrame::Set { attributes, .. } => attributes,
1144      OwnedFrame::Map { attributes, .. } => attributes,
1145      OwnedFrame::Null | OwnedFrame::ChunkedString(_) | OwnedFrame::Hello { .. } => return None,
1146    };
1147
1148    attributes.as_ref()
1149  }
1150
1151  fn attributes_mut(&mut self) -> Option<&mut Self::Attributes> {
1152    let attributes = match self {
1153      OwnedFrame::Array { attributes, .. } => attributes,
1154      OwnedFrame::Push { attributes, .. } => attributes,
1155      OwnedFrame::BlobString { attributes, .. } => attributes,
1156      OwnedFrame::BlobError { attributes, .. } => attributes,
1157      OwnedFrame::BigNumber { attributes, .. } => attributes,
1158      OwnedFrame::Boolean { attributes, .. } => attributes,
1159      OwnedFrame::Number { attributes, .. } => attributes,
1160      OwnedFrame::Double { attributes, .. } => attributes,
1161      OwnedFrame::VerbatimString { attributes, .. } => attributes,
1162      OwnedFrame::SimpleError { attributes, .. } => attributes,
1163      OwnedFrame::SimpleString { attributes, .. } => attributes,
1164      OwnedFrame::Set { attributes, .. } => attributes,
1165      OwnedFrame::Map { attributes, .. } => attributes,
1166      OwnedFrame::Null | OwnedFrame::ChunkedString(_) | OwnedFrame::Hello { .. } => return None,
1167    };
1168
1169    attributes.as_mut()
1170  }
1171
1172  fn take_attributes(&mut self) -> Option<Self::Attributes> {
1173    let attributes = match self {
1174      OwnedFrame::Array { attributes, .. } => attributes,
1175      OwnedFrame::Push { attributes, .. } => attributes,
1176      OwnedFrame::BlobString { attributes, .. } => attributes,
1177      OwnedFrame::BlobError { attributes, .. } => attributes,
1178      OwnedFrame::BigNumber { attributes, .. } => attributes,
1179      OwnedFrame::Boolean { attributes, .. } => attributes,
1180      OwnedFrame::Number { attributes, .. } => attributes,
1181      OwnedFrame::Double { attributes, .. } => attributes,
1182      OwnedFrame::VerbatimString { attributes, .. } => attributes,
1183      OwnedFrame::SimpleError { attributes, .. } => attributes,
1184      OwnedFrame::SimpleString { attributes, .. } => attributes,
1185      OwnedFrame::Set { attributes, .. } => attributes,
1186      OwnedFrame::Map { attributes, .. } => attributes,
1187      OwnedFrame::Null | OwnedFrame::ChunkedString(_) | OwnedFrame::Hello { .. } => return None,
1188    };
1189
1190    attributes.take()
1191  }
1192
1193  fn new_end_stream() -> Self {
1194    OwnedFrame::ChunkedString(Vec::new())
1195  }
1196
1197  fn new_empty() -> Self {
1198    OwnedFrame::Number {
1199      data:       0,
1200      attributes: None,
1201    }
1202  }
1203
1204  fn len(&self) -> usize {
1205    match self {
1206      OwnedFrame::Array { data, .. } | OwnedFrame::Push { data, .. } => data.len(),
1207      OwnedFrame::BlobString { data, .. }
1208      | OwnedFrame::BlobError { data, .. }
1209      | OwnedFrame::BigNumber { data, .. }
1210      | OwnedFrame::ChunkedString(data) => data.len(),
1211      OwnedFrame::SimpleString { data, .. } => data.len(),
1212      OwnedFrame::SimpleError { data, .. } => data.len(),
1213      OwnedFrame::Number { .. } | OwnedFrame::Double { .. } | OwnedFrame::Boolean { .. } => 1,
1214      OwnedFrame::Null => 0,
1215      OwnedFrame::VerbatimString { data, .. } => data.len(),
1216      OwnedFrame::Map { data, .. } => data.len(),
1217      OwnedFrame::Set { data, .. } => data.len(),
1218      OwnedFrame::Hello { .. } => 1,
1219    }
1220  }
1221
1222  fn take(&mut self) -> Self {
1223    mem::replace(self, OwnedFrame::Null)
1224  }
1225
1226  fn kind(&self) -> FrameKind {
1227    match self {
1228      OwnedFrame::Array { .. } => FrameKind::Array,
1229      OwnedFrame::BlobString { .. } => FrameKind::BlobString,
1230      OwnedFrame::SimpleString { .. } => FrameKind::SimpleString,
1231      OwnedFrame::SimpleError { .. } => FrameKind::SimpleError,
1232      OwnedFrame::Number { .. } => FrameKind::Number,
1233      OwnedFrame::Null => FrameKind::Null,
1234      OwnedFrame::Double { .. } => FrameKind::Double,
1235      OwnedFrame::BlobError { .. } => FrameKind::BlobError,
1236      OwnedFrame::VerbatimString { .. } => FrameKind::VerbatimString,
1237      OwnedFrame::Boolean { .. } => FrameKind::Boolean,
1238      OwnedFrame::Map { .. } => FrameKind::Map,
1239      OwnedFrame::Set { .. } => FrameKind::Set,
1240      OwnedFrame::Push { .. } => FrameKind::Push,
1241      OwnedFrame::Hello { .. } => FrameKind::Hello,
1242      OwnedFrame::BigNumber { .. } => FrameKind::BigNumber,
1243      OwnedFrame::ChunkedString(inner) => {
1244        if inner.is_empty() {
1245          FrameKind::EndStream
1246        } else {
1247          FrameKind::ChunkedString
1248        }
1249      },
1250    }
1251  }
1252
1253  fn is_end_stream_frame(&self) -> bool {
1254    match self {
1255      OwnedFrame::ChunkedString(s) => s.is_empty(),
1256      _ => false,
1257    }
1258  }
1259
1260  fn verbatim_string_format(&self) -> Option<&VerbatimStringFormat> {
1261    match self {
1262      OwnedFrame::VerbatimString { format, .. } => Some(format),
1263      _ => None,
1264    }
1265  }
1266
1267  fn as_str(&self) -> Option<&str> {
1268    match self {
1269      OwnedFrame::SimpleError { data, .. } => Some(data),
1270      OwnedFrame::SimpleString { data, .. } => str::from_utf8(data).ok(),
1271      OwnedFrame::BlobError { data, .. }
1272      | OwnedFrame::BlobString { data, .. }
1273      | OwnedFrame::BigNumber { data, .. } => str::from_utf8(data).ok(),
1274      OwnedFrame::VerbatimString { data, .. } => str::from_utf8(data).ok(),
1275      OwnedFrame::ChunkedString(data) => str::from_utf8(data).ok(),
1276      _ => None,
1277    }
1278  }
1279
1280  fn as_bool(&self) -> Option<bool> {
1281    match self {
1282      OwnedFrame::SimpleString { data, .. }
1283      | OwnedFrame::BlobString { data, .. }
1284      | OwnedFrame::VerbatimString { data, .. } => utils::bytes_to_bool(data),
1285      OwnedFrame::ChunkedString(data) => utils::bytes_to_bool(data),
1286      OwnedFrame::Boolean { data, .. } => Some(*data),
1287      OwnedFrame::Number { data, .. } => match data {
1288        0 => Some(false),
1289        1 => Some(true),
1290        _ => None,
1291      },
1292      _ => None,
1293    }
1294  }
1295
1296  fn to_string(&self) -> Option<String> {
1297    match self {
1298      OwnedFrame::SimpleError { data, .. } => Some(data.to_string()),
1299      OwnedFrame::SimpleString { data, .. } => String::from_utf8(data.to_vec()).ok(),
1300      OwnedFrame::BlobError { data, .. }
1301      | OwnedFrame::BlobString { data, .. }
1302      | OwnedFrame::BigNumber { data, .. } => String::from_utf8(data.to_vec()).ok(),
1303      OwnedFrame::VerbatimString { data, .. } => String::from_utf8(data.to_vec()).ok(),
1304      OwnedFrame::ChunkedString(b) => String::from_utf8(b.to_vec()).ok(),
1305      OwnedFrame::Double { data, .. } => Some(data.to_string()),
1306      OwnedFrame::Number { data, .. } => Some(data.to_string()),
1307      _ => None,
1308    }
1309  }
1310
1311  fn as_bytes(&self) -> Option<&[u8]> {
1312    match self {
1313      OwnedFrame::SimpleError { data, .. } => Some(data.as_bytes()),
1314      OwnedFrame::SimpleString { data, .. } => Some(data),
1315      OwnedFrame::BlobError { data, .. }
1316      | OwnedFrame::BlobString { data, .. }
1317      | OwnedFrame::BigNumber { data, .. } => Some(data),
1318      OwnedFrame::VerbatimString { data, .. } => Some(data),
1319      OwnedFrame::ChunkedString(b) => Some(b),
1320      _ => None,
1321    }
1322  }
1323
1324  fn encode_len(&self, int_as_blobstring: bool) -> usize {
1325    resp3_utils::owned_encode_len(self, int_as_blobstring)
1326  }
1327
1328  fn is_normal_pubsub_message(&self) -> bool {
1329    // see the `BytesFrame::is_normal_pubsub_message` comments
1330    // format should be ["pubsub", "message", <channel>, <message>]
1331    match self {
1332      OwnedFrame::Array { data, .. } | OwnedFrame::Push { data, .. } => {
1333        (data.len() == 3 && data[0].as_str().map(|s| s == PUBSUB_PREFIX).unwrap_or(false))
1334          || (data.len() == 4
1335            && data[0].as_str().map(|s| s == PUBSUB_PUSH_PREFIX).unwrap_or(false)
1336            && data[1].as_str().map(|s| s == PUBSUB_PREFIX).unwrap_or(false))
1337      },
1338      _ => false,
1339    }
1340  }
1341
1342  fn is_pattern_pubsub_message(&self) -> bool {
1343    // see the `BytesFrame::is_normal_pubsub_message` comments
1344    // format should be ["pubsub", "pmessage", <pattern>, <channel>, <message>]
1345    match self {
1346      OwnedFrame::Array { data, .. } | OwnedFrame::Push { data, .. } => {
1347        (data.len() == 4 && data[0].as_str().map(|s| s == PATTERN_PUBSUB_PREFIX).unwrap_or(false))
1348          || (data.len() == 5
1349            && data[0].as_str().map(|s| s == PUBSUB_PUSH_PREFIX).unwrap_or(false)
1350            && data[1].as_str().map(|s| s == PATTERN_PUBSUB_PREFIX).unwrap_or(false))
1351      },
1352      _ => false,
1353    }
1354  }
1355
1356  fn is_shard_pubsub_message(&self) -> bool {
1357    // see the `BytesFrame::is_normal_pubsub_message` comments
1358    // format should be ["pubsub", "smessage", <channel>, <message>]
1359    match self {
1360      OwnedFrame::Array { data, .. } | OwnedFrame::Push { data, .. } => {
1361        (data.len() == 3 && data[0].as_str().map(|s| s == SHARD_PUBSUB_PREFIX).unwrap_or(false))
1362          || (data.len() == 4
1363            && data[0].as_str().map(|s| s == PUBSUB_PUSH_PREFIX).unwrap_or(false)
1364            && data[1].as_str().map(|s| s == SHARD_PUBSUB_PREFIX).unwrap_or(false))
1365      },
1366      _ => false,
1367    }
1368  }
1369
1370  #[cfg(feature = "convert")]
1371  #[cfg_attr(docsrs, doc(cfg(feature = "convert")))]
1372  fn is_single_element_vec(&self) -> bool {
1373    match self {
1374      OwnedFrame::Array { data, .. } | OwnedFrame::Push { data, .. } => data.len() == 1,
1375      _ => false,
1376    }
1377  }
1378
1379  #[cfg(feature = "convert")]
1380  #[cfg_attr(docsrs, doc(cfg(feature = "convert")))]
1381  fn pop_or_take(self) -> Self {
1382    match self {
1383      OwnedFrame::Array { mut data, .. } | OwnedFrame::Push { mut data, .. } => data.pop().unwrap(),
1384      _ => self,
1385    }
1386  }
1387}
1388
impl OwnedFrame {
  /// Move the frame contents into a new [BytesFrame].
  ///
  /// This consumes `self` and delegates the conversion to `resp3_utils::owned_to_bytes_frame`.
  #[cfg(feature = "bytes")]
  #[cfg_attr(docsrs, doc(cfg(feature = "bytes")))]
  pub fn into_bytes_frame(self) -> BytesFrame {
    resp3_utils::owned_to_bytes_frame(self)
  }
}
1397
1398impl<B: Into<Vec<u8>>> TryFrom<(FrameKind, B)> for OwnedFrame {
1399  type Error = RedisProtocolError;
1400
1401  fn try_from((kind, buf): (FrameKind, B)) -> Result<Self, Self::Error> {
1402    Ok(match kind {
1403      FrameKind::SimpleString => OwnedFrame::SimpleString {
1404        data:       buf.into(),
1405        attributes: None,
1406      },
1407      FrameKind::SimpleError => OwnedFrame::SimpleError {
1408        data:       String::from_utf8(buf.into())?,
1409        attributes: None,
1410      },
1411      FrameKind::BlobString => OwnedFrame::BlobString {
1412        data:       buf.into(),
1413        attributes: None,
1414      },
1415      FrameKind::BlobError => OwnedFrame::BlobError {
1416        data:       buf.into(),
1417        attributes: None,
1418      },
1419      FrameKind::BigNumber => OwnedFrame::BigNumber {
1420        data:       buf.into(),
1421        attributes: None,
1422      },
1423      FrameKind::ChunkedString => OwnedFrame::ChunkedString(buf.into()),
1424      FrameKind::Null => OwnedFrame::Null,
1425      _ => {
1426        return Err(RedisProtocolError::new(
1427          RedisProtocolErrorKind::Unknown,
1428          "Cannot convert to frame.",
1429        ))
1430      },
1431    })
1432  }
1433}
1434
1435impl From<i64> for OwnedFrame {
1436  fn from(value: i64) -> Self {
1437    OwnedFrame::Number {
1438      data:       value,
1439      attributes: None,
1440    }
1441  }
1442}
1443
1444impl From<bool> for OwnedFrame {
1445  fn from(value: bool) -> Self {
1446    OwnedFrame::Boolean {
1447      data:       value,
1448      attributes: None,
1449    }
1450  }
1451}
1452
1453impl From<f64> for OwnedFrame {
1454  fn from(value: f64) -> Self {
1455    OwnedFrame::Double {
1456      data:       value,
1457      attributes: None,
1458    }
1459  }
1460}
1461
/// A RESP3 frame that uses [Bytes](bytes::Bytes) and [Str](bytes_utils::Str) as the underlying buffer type.
///
/// Every variant other than `Null`, `Hello`, and `ChunkedString` stores an optional set of attributes next to its
/// payload.
///
/// <https://github.com/antirez/RESP3/blob/master/spec.md>
#[cfg(feature = "bytes")]
#[cfg_attr(docsrs, doc(cfg(feature = "bytes")))]
#[derive(Clone, Debug, PartialEq)]
pub enum BytesFrame {
  /// A blob of bytes.
  BlobString {
    data:       Bytes,
    attributes: Option<BytesAttributes>,
  },
  /// A blob of bytes representing an error.
  BlobError {
    data:       Bytes,
    attributes: Option<BytesAttributes>,
  },
  /// A small string.
  ///
  /// The internal data type is `Bytes` in order to support callers that use this interface to parse a `MONITOR`
  /// stream.
  SimpleString {
    data:       Bytes,
    attributes: Option<BytesAttributes>,
  },
  /// A small string representing an error.
  SimpleError {
    data:       Str,
    attributes: Option<BytesAttributes>,
  },
  /// A boolean type.
  Boolean {
    data:       bool,
    attributes: Option<BytesAttributes>,
  },
  /// A null type. This variant carries neither data nor attributes.
  Null,
  /// A signed 64-bit integer.
  Number {
    data:       i64,
    attributes: Option<BytesAttributes>,
  },
  /// A 64-bit floating point number.
  Double {
    data:       f64,
    attributes: Option<BytesAttributes>,
  },
  /// A large number not representable as a `Number` or `Double`.
  ///
  /// This library does not attempt to parse this; the raw bytes are stored as-is.
  BigNumber {
    data:       Bytes,
    attributes: Option<BytesAttributes>,
  },
  /// A string to be displayed without any escaping or filtering, along with its declared format.
  VerbatimString {
    data:       Bytes,
    format:     VerbatimStringFormat,
    attributes: Option<BytesAttributes>,
  },
  /// An array of frames.
  Array {
    data:       Vec<BytesFrame>,
    attributes: Option<BytesAttributes>,
  },
  /// An unordered map of key-value pairs.
  ///
  /// According to the spec keys can be any other RESP3 data type. However, it doesn't make sense to implement `Hash`
  /// for certain aggregate types. The [can_hash](crate::resp3::types::FrameKind::can_hash) function can be used to
  /// detect this.
  Map {
    data:       FrameMap<BytesFrame, BytesFrame>,
    attributes: Option<BytesAttributes>,
  },
  /// An unordered collection of other frames with a uniqueness constraint.
  Set {
    data:       FrameSet<BytesFrame>,
    attributes: Option<BytesAttributes>,
  },
  /// Out-of-band data.
  Push {
    data:       Vec<BytesFrame>,
    attributes: Option<BytesAttributes>,
  },
  /// A special frame type used when first connecting to the server to describe the protocol version and optional
  /// credentials.
  Hello {
    version: RespVersion,
    // NOTE(review): presumably a (username, password) pair - confirm against the encoder.
    auth:    Option<(Str, Str)>,
    setname: Option<Str>,
  },
  /// One chunk of a streaming blob.
  ///
  /// An empty chunk is used as the end-of-stream marker.
  ChunkedString(Bytes),
}
1556
#[cfg(feature = "bytes")]
impl<B: Into<Bytes>> TryFrom<(FrameKind, B)> for BytesFrame {
  type Error = RedisProtocolError;

  /// Build a frame of the provided kind from a byte buffer.
  ///
  /// Only string-like kinds and `Null` are supported; the buffer is ignored for `Null`. Simple errors must be valid
  /// UTF-8. Any other kind results in an error.
  fn try_from((kind, buf): (FrameKind, B)) -> Result<Self, Self::Error> {
    let frame = match kind {
      FrameKind::Null => Self::Null,
      FrameKind::ChunkedString => Self::ChunkedString(buf.into()),
      FrameKind::SimpleString => Self::SimpleString {
        attributes: None,
        data:       buf.into(),
      },
      FrameKind::SimpleError => {
        let message = Str::from_inner(buf.into())?;
        Self::SimpleError {
          attributes: None,
          data:       message,
        }
      },
      FrameKind::BlobString => Self::BlobString {
        attributes: None,
        data:       buf.into(),
      },
      FrameKind::BlobError => Self::BlobError {
        attributes: None,
        data:       buf.into(),
      },
      FrameKind::BigNumber => Self::BigNumber {
        attributes: None,
        data:       buf.into(),
      },
      _ => {
        return Err(RedisProtocolError::new(
          RedisProtocolErrorKind::Unknown,
          "Cannot convert to frame.",
        ))
      },
    };

    Ok(frame)
  }
}
1594
#[cfg(feature = "bytes")]
impl From<i64> for BytesFrame {
  /// Wrap the integer in a `Number` frame without attributes.
  fn from(value: i64) -> Self {
    Self::Number {
      attributes: None,
      data:       value,
    }
  }
}
1604
#[cfg(feature = "bytes")]
impl From<bool> for BytesFrame {
  /// Wrap the flag in a `Boolean` frame without attributes.
  fn from(value: bool) -> Self {
    Self::Boolean {
      attributes: None,
      data:       value,
    }
  }
}
1614
#[cfg(feature = "bytes")]
impl From<f64> for BytesFrame {
  /// Wrap the float in a `Double` frame without attributes.
  fn from(value: f64) -> Self {
    Self::Double {
      attributes: None,
      data:       value,
    }
  }
}
1624
#[cfg(feature = "bytes")]
impl Hash for BytesFrame {
  /// Feed the frame kind's hash prefix followed by the frame payload into `state`.
  ///
  /// Attributes do not participate in the hash.
  ///
  /// # Panics
  ///
  /// Panics if the frame is an aggregate or `Hello` frame, none of which can be used as a hash key.
  fn hash<H: Hasher>(&self, state: &mut H) {
    use self::BytesFrame::*;
    self.kind().hash_prefix().hash(state);

    match self {
      BlobString { data, .. } => data.hash(state),
      SimpleString { data, .. } => data.hash(state),
      SimpleError { data, .. } => data.hash(state),
      Number { data, .. } => data.hash(state),
      Null => NULL.hash(state),
      Double { data, .. } => {
        // `0.0 == -0.0` under `PartialEq`, so both zeros must feed identical bits to the hasher in order to uphold
        // the `Hash`/`Eq` contract. Hashing the bit pattern also avoids allocating a `String` per call.
        let normalized = if *data == 0.0 { 0.0_f64 } else { *data };
        normalized.to_bits().hash(state);
      },
      Boolean { data, .. } => data.hash(state),
      BlobError { data, .. } => data.hash(state),
      VerbatimString { data, format, .. } => {
        format.hash(state);
        data.hash(state);
      },
      ChunkedString(data) => data.hash(state),
      BigNumber { data, .. } => data.hash(state),
      Array { .. } | Map { .. } | Set { .. } | Push { .. } | Hello { .. } => {
        panic!("Invalid RESP3 data type to use as hash key.")
      },
    };
  }
}
1650
// `Eq` is asserted manually since it cannot be derived for an enum that stores an `f64` (the `Double` variant).
// NOTE(review): the derived `PartialEq` compares that `f64` with `==`, so a `Double` holding NaN does not equal
// itself.
#[cfg(feature = "bytes")]
impl Eq for BytesFrame {}
1653
1654#[cfg(feature = "bytes")]
1655impl Resp3Frame for BytesFrame {
1656  type Attributes = BytesAttributes;
1657
1658  fn from_buffer(
1659    target: FrameKind,
1660    buf: impl IntoIterator<Item = Self>,
1661    attributes: Option<Self::Attributes>,
1662  ) -> Result<Self, RedisProtocolError> {
1663    let mut data: Vec<_> = buf.into_iter().collect();
1664
1665    Ok(match target {
1666      FrameKind::BlobString => {
1667        let total_len = data.iter().fold(0, |m, f| m + f.len());
1668        let mut buf = BytesMut::with_capacity(total_len);
1669        for frame in data.into_iter() {
1670          buf.extend(match frame {
1671            BytesFrame::ChunkedString(chunk) => chunk,
1672            BytesFrame::BlobString { data, .. } => data,
1673            _ => {
1674              return Err(RedisProtocolError::new(
1675                RedisProtocolErrorKind::DecodeError,
1676                "Expected chunked or blob string.",
1677              ));
1678            },
1679          });
1680        }
1681
1682        BytesFrame::BlobString {
1683          data: buf.freeze(),
1684          attributes,
1685        }
1686      },
1687      FrameKind::Map => BytesFrame::Map {
1688        attributes,
1689        data: data
1690          .chunks_exact_mut(2)
1691          .map(|chunk| (chunk[0].take(), chunk[1].take()))
1692          .collect(),
1693      },
1694      FrameKind::Set => BytesFrame::Set {
1695        attributes,
1696        data: data.into_iter().collect(),
1697      },
1698      FrameKind::Array => BytesFrame::Array { attributes, data },
1699      _ => {
1700        return Err(RedisProtocolError::new(
1701          RedisProtocolErrorKind::DecodeError,
1702          "Streaming frames only supported for blob strings, maps, sets, and arrays.",
1703        ))
1704      },
1705    })
1706  }
1707
1708  fn attributes(&self) -> Option<&Self::Attributes> {
1709    let attributes = match self {
1710      BytesFrame::Array { attributes, .. } => attributes,
1711      BytesFrame::Push { attributes, .. } => attributes,
1712      BytesFrame::BlobString { attributes, .. } => attributes,
1713      BytesFrame::BlobError { attributes, .. } => attributes,
1714      BytesFrame::BigNumber { attributes, .. } => attributes,
1715      BytesFrame::Boolean { attributes, .. } => attributes,
1716      BytesFrame::Number { attributes, .. } => attributes,
1717      BytesFrame::Double { attributes, .. } => attributes,
1718      BytesFrame::VerbatimString { attributes, .. } => attributes,
1719      BytesFrame::SimpleError { attributes, .. } => attributes,
1720      BytesFrame::SimpleString { attributes, .. } => attributes,
1721      BytesFrame::Set { attributes, .. } => attributes,
1722      BytesFrame::Map { attributes, .. } => attributes,
1723      BytesFrame::Null | BytesFrame::ChunkedString(_) | BytesFrame::Hello { .. } => return None,
1724    };
1725
1726    attributes.as_ref()
1727  }
1728
1729  fn take_attributes(&mut self) -> Option<Self::Attributes> {
1730    let attributes = match self {
1731      BytesFrame::Array { attributes, .. } => attributes,
1732      BytesFrame::Push { attributes, .. } => attributes,
1733      BytesFrame::BlobString { attributes, .. } => attributes,
1734      BytesFrame::BlobError { attributes, .. } => attributes,
1735      BytesFrame::BigNumber { attributes, .. } => attributes,
1736      BytesFrame::Boolean { attributes, .. } => attributes,
1737      BytesFrame::Number { attributes, .. } => attributes,
1738      BytesFrame::Double { attributes, .. } => attributes,
1739      BytesFrame::VerbatimString { attributes, .. } => attributes,
1740      BytesFrame::SimpleError { attributes, .. } => attributes,
1741      BytesFrame::SimpleString { attributes, .. } => attributes,
1742      BytesFrame::Set { attributes, .. } => attributes,
1743      BytesFrame::Map { attributes, .. } => attributes,
1744      BytesFrame::Null | BytesFrame::ChunkedString(_) | BytesFrame::Hello { .. } => return None,
1745    };
1746
1747    attributes.take()
1748  }
1749
1750  fn attributes_mut(&mut self) -> Option<&mut Self::Attributes> {
1751    let attributes = match self {
1752      BytesFrame::Array { attributes, .. } => attributes,
1753      BytesFrame::Push { attributes, .. } => attributes,
1754      BytesFrame::BlobString { attributes, .. } => attributes,
1755      BytesFrame::BlobError { attributes, .. } => attributes,
1756      BytesFrame::BigNumber { attributes, .. } => attributes,
1757      BytesFrame::Boolean { attributes, .. } => attributes,
1758      BytesFrame::Number { attributes, .. } => attributes,
1759      BytesFrame::Double { attributes, .. } => attributes,
1760      BytesFrame::VerbatimString { attributes, .. } => attributes,
1761      BytesFrame::SimpleError { attributes, .. } => attributes,
1762      BytesFrame::SimpleString { attributes, .. } => attributes,
1763      BytesFrame::Set { attributes, .. } => attributes,
1764      BytesFrame::Map { attributes, .. } => attributes,
1765      BytesFrame::Null | BytesFrame::ChunkedString(_) | BytesFrame::Hello { .. } => return None,
1766    };
1767
1768    attributes.as_mut()
1769  }
1770
1771  /// Attempt to add attributes to the frame, extending the existing attributes if needed.
1772  fn add_attributes(&mut self, attributes: Self::Attributes) -> Result<(), RedisProtocolError> {
1773    let _attributes = match self {
1774      BytesFrame::Array { attributes, .. } => attributes,
1775      BytesFrame::Push { attributes, .. } => attributes,
1776      BytesFrame::BlobString { attributes, .. } => attributes,
1777      BytesFrame::BlobError { attributes, .. } => attributes,
1778      BytesFrame::BigNumber { attributes, .. } => attributes,
1779      BytesFrame::Boolean { attributes, .. } => attributes,
1780      BytesFrame::Number { attributes, .. } => attributes,
1781      BytesFrame::Double { attributes, .. } => attributes,
1782      BytesFrame::VerbatimString { attributes, .. } => attributes,
1783      BytesFrame::SimpleError { attributes, .. } => attributes,
1784      BytesFrame::SimpleString { attributes, .. } => attributes,
1785      BytesFrame::Set { attributes, .. } => attributes,
1786      BytesFrame::Map { attributes, .. } => attributes,
1787      BytesFrame::Null | BytesFrame::ChunkedString(_) | BytesFrame::Hello { .. } => {
1788        return Err(RedisProtocolError::new(
1789          RedisProtocolErrorKind::Unknown,
1790          format!("{:?} cannot have attributes.", self.kind()),
1791        ))
1792      },
1793    };
1794
1795    if let Some(_attributes) = _attributes.as_mut() {
1796      _attributes.extend(attributes);
1797    } else {
1798      *_attributes = Some(attributes);
1799    }
1800
1801    Ok(())
1802  }
1803
1804  fn new_empty() -> Self {
1805    BytesFrame::Number {
1806      data:       0,
1807      attributes: None,
1808    }
1809  }
1810
1811  fn new_end_stream() -> Self {
1812    BytesFrame::ChunkedString(Bytes::new())
1813  }
1814
1815  fn len(&self) -> usize {
1816    match self {
1817      BytesFrame::Array { data, .. } | BytesFrame::Push { data, .. } => data.len(),
1818      BytesFrame::BlobString { data, .. }
1819      | BytesFrame::BlobError { data, .. }
1820      | BytesFrame::BigNumber { data, .. }
1821      | BytesFrame::ChunkedString(data) => data.len(),
1822      BytesFrame::SimpleString { data, .. } => data.len(),
1823      BytesFrame::SimpleError { data, .. } => data.len(),
1824      BytesFrame::Number { .. } | BytesFrame::Double { .. } | BytesFrame::Boolean { .. } => 1,
1825      BytesFrame::Null => 0,
1826      BytesFrame::VerbatimString { data, .. } => data.len(),
1827      BytesFrame::Map { data, .. } => data.len(),
1828      BytesFrame::Set { data, .. } => data.len(),
1829      BytesFrame::Hello { .. } => 1,
1830    }
1831  }
1832
1833  fn take(&mut self) -> BytesFrame {
1834    mem::replace(self, BytesFrame::Null)
1835  }
1836
1837  fn kind(&self) -> FrameKind {
1838    match self {
1839      BytesFrame::Array { .. } => FrameKind::Array,
1840      BytesFrame::BlobString { .. } => FrameKind::BlobString,
1841      BytesFrame::SimpleString { .. } => FrameKind::SimpleString,
1842      BytesFrame::SimpleError { .. } => FrameKind::SimpleError,
1843      BytesFrame::Number { .. } => FrameKind::Number,
1844      BytesFrame::Null => FrameKind::Null,
1845      BytesFrame::Double { .. } => FrameKind::Double,
1846      BytesFrame::BlobError { .. } => FrameKind::BlobError,
1847      BytesFrame::VerbatimString { .. } => FrameKind::VerbatimString,
1848      BytesFrame::Boolean { .. } => FrameKind::Boolean,
1849      BytesFrame::Map { .. } => FrameKind::Map,
1850      BytesFrame::Set { .. } => FrameKind::Set,
1851      BytesFrame::Push { .. } => FrameKind::Push,
1852      BytesFrame::Hello { .. } => FrameKind::Hello,
1853      BytesFrame::BigNumber { .. } => FrameKind::BigNumber,
1854      BytesFrame::ChunkedString(inner) => {
1855        if inner.is_empty() {
1856          FrameKind::EndStream
1857        } else {
1858          FrameKind::ChunkedString
1859        }
1860      },
1861    }
1862  }
1863
1864  fn is_end_stream_frame(&self) -> bool {
1865    match self {
1866      BytesFrame::ChunkedString(s) => s.is_empty(),
1867      _ => false,
1868    }
1869  }
1870
1871  fn verbatim_string_format(&self) -> Option<&VerbatimStringFormat> {
1872    match self {
1873      BytesFrame::VerbatimString { format, .. } => Some(format),
1874      _ => None,
1875    }
1876  }
1877
1878  fn as_str(&self) -> Option<&str> {
1879    match self {
1880      BytesFrame::SimpleError { data, .. } => Some(data),
1881      BytesFrame::SimpleString { data, .. }
1882      | BytesFrame::BlobError { data, .. }
1883      | BytesFrame::BlobString { data, .. }
1884      | BytesFrame::BigNumber { data, .. } => str::from_utf8(data).ok(),
1885      BytesFrame::VerbatimString { data, .. } => str::from_utf8(data).ok(),
1886      BytesFrame::ChunkedString(data) => str::from_utf8(data).ok(),
1887      _ => None,
1888    }
1889  }
1890
1891  fn as_bool(&self) -> Option<bool> {
1892    match self {
1893      BytesFrame::SimpleString { data, .. }
1894      | BytesFrame::BlobString { data, .. }
1895      | BytesFrame::VerbatimString { data, .. } => utils::bytes_to_bool(data),
1896      BytesFrame::ChunkedString(data) => utils::bytes_to_bool(data),
1897      BytesFrame::Boolean { data, .. } => Some(*data),
1898      BytesFrame::Number { data, .. } => match data {
1899        0 => Some(false),
1900        1 => Some(true),
1901        _ => None,
1902      },
1903      _ => None,
1904    }
1905  }
1906
1907  fn to_string(&self) -> Option<String> {
1908    match self {
1909      BytesFrame::SimpleError { data, .. } => Some(data.to_string()),
1910      BytesFrame::SimpleString { data, .. }
1911      | BytesFrame::BlobError { data, .. }
1912      | BytesFrame::BlobString { data, .. }
1913      | BytesFrame::BigNumber { data, .. } => String::from_utf8(data.to_vec()).ok(),
1914      BytesFrame::VerbatimString { data, .. } => String::from_utf8(data.to_vec()).ok(),
1915      BytesFrame::ChunkedString(b) => String::from_utf8(b.to_vec()).ok(),
1916      BytesFrame::Double { data, .. } => Some(data.to_string()),
1917      BytesFrame::Number { data, .. } => Some(data.to_string()),
1918      _ => None,
1919    }
1920  }
1921
1922  fn as_bytes(&self) -> Option<&[u8]> {
1923    match self {
1924      BytesFrame::SimpleError { data, .. } => Some(data.as_bytes()),
1925      BytesFrame::SimpleString { data, .. } => Some(data),
1926      BytesFrame::BlobError { data, .. }
1927      | BytesFrame::BlobString { data, .. }
1928      | BytesFrame::BigNumber { data, .. } => Some(data),
1929      BytesFrame::VerbatimString { data, .. } => Some(data),
1930      BytesFrame::ChunkedString(b) => Some(b),
1931      _ => None,
1932    }
1933  }
1934
1935  fn encode_len(&self, int_as_blobstring: bool) -> usize {
1936    resp3_utils::bytes_encode_len(self, int_as_blobstring)
1937  }
1938
1939  fn is_normal_pubsub_message(&self) -> bool {
1940    // All of these `is_pubsub_message` variants currently require this hack to get around some differences between the RESP3 spec (https://github.com/antirez/RESP3/blob/master/spec.md#push-type)
1941    // and what Redis does in practice. According to the spec pubsub events are sent as Push frames with either a
1942    // `pubsub` or `monitor` prefix frame in the inner array. However, in practice Redis (v7 at least) doesn't send
1943    // this prefix frame. Instead, the inner contents typically begin with the operation (subscribe, unsubscribe,
1944    // pmessage, message, smessage, etc). The RESP2 `into_resp3()` function adds the `pubsub` prefix, so this code
1945    // needs to handle both formats.
1946
1947    // format should be ["pubsub", "message", <channel>, <message>]
1948    match self {
1949      BytesFrame::Array { data, .. } | BytesFrame::Push { data, .. } => {
1950        (data.len() == 3 && data[0].as_str().map(|s| s == PUBSUB_PREFIX).unwrap_or(false))
1951          || (data.len() == 4
1952            && data[0].as_str().map(|s| s == PUBSUB_PUSH_PREFIX).unwrap_or(false)
1953            && data[1].as_str().map(|s| s == PUBSUB_PREFIX).unwrap_or(false))
1954      },
1955      _ => false,
1956    }
1957  }
1958
1959  fn is_pattern_pubsub_message(&self) -> bool {
1960    // see the `BytesFrame::is_normal_pubsub_message` comments
1961    // format should be ["pubsub", "pmessage", <pattern>, <channel>, <message>]
1962    match self {
1963      BytesFrame::Array { data, .. } | BytesFrame::Push { data, .. } => {
1964        (data.len() == 4 && data[0].as_str().map(|s| s == PATTERN_PUBSUB_PREFIX).unwrap_or(false))
1965          || (data.len() == 5
1966            && data[0].as_str().map(|s| s == PUBSUB_PUSH_PREFIX).unwrap_or(false)
1967            && data[1].as_str().map(|s| s == PATTERN_PUBSUB_PREFIX).unwrap_or(false))
1968      },
1969      _ => false,
1970    }
1971  }
1972
1973  fn is_shard_pubsub_message(&self) -> bool {
1974    // see the `BytesFrame::is_normal_pubsub_message` comments
1975    // format should be ["pubsub", "smessage", <channel>, <message>]
1976    match self {
1977      BytesFrame::Array { data, .. } | BytesFrame::Push { data, .. } => {
1978        (data.len() == 3 && data[0].as_str().map(|s| s == SHARD_PUBSUB_PREFIX).unwrap_or(false))
1979          || (data.len() == 4
1980            && data[0].as_str().map(|s| s == PUBSUB_PUSH_PREFIX).unwrap_or(false)
1981            && data[1].as_str().map(|s| s == SHARD_PUBSUB_PREFIX).unwrap_or(false))
1982      },
1983      _ => false,
1984    }
1985  }
1986
1987  #[cfg(feature = "convert")]
1988  #[cfg_attr(docsrs, doc(cfg(feature = "convert")))]
1989  fn is_single_element_vec(&self) -> bool {
1990    match self {
1991      BytesFrame::Array { data, .. } | BytesFrame::Push { data, .. } => data.len() == 1,
1992      _ => false,
1993    }
1994  }
1995
1996  #[cfg(feature = "convert")]
1997  #[cfg_attr(docsrs, doc(cfg(feature = "convert")))]
1998  fn pop_or_take(self) -> Self {
1999    match self {
2000      BytesFrame::Array { mut data, .. } | BytesFrame::Push { mut data, .. } => data.pop().unwrap(),
2001      _ => self,
2002    }
2003  }
2004}
2005
#[cfg(feature = "bytes")]
impl BytesFrame {
  /// Build an [OwnedFrame] from this frame via `resp3_utils::bytes_to_owned_frame`, leaving `self` untouched.
  pub fn to_owned_frame(&self) -> OwnedFrame {
    let frame: &BytesFrame = self;
    resp3_utils::bytes_to_owned_frame(frame)
  }
}
2013
/// An enum describing the possible return types from the stream decoding interface.
#[derive(Debug, Eq, PartialEq)]
pub enum DecodedFrame<T: Resp3Frame> {
  /// The start of a streamed frame. Use [StreamedFrame] to buffer the frames that follow.
  Streaming(StreamedFrame<T>),
  /// A complete frame.
  Complete(T),
}
2020
2021impl<T: Resp3Frame> DecodedFrame<T> {
2022  /// Add attributes to the decoded frame, if possible.
2023  pub fn add_attributes(&mut self, attributes: T::Attributes) -> Result<(), RedisProtocolError> {
2024    match self {
2025      DecodedFrame::Streaming(inner) => inner.add_attributes(attributes),
2026      DecodedFrame::Complete(inner) => inner.add_attributes(attributes),
2027    }
2028  }
2029
2030  /// Convert the decoded frame to a complete frame, returning an error if a streaming variant is found.
2031  pub fn into_complete_frame(self) -> Result<T, RedisProtocolError> {
2032    match self {
2033      DecodedFrame::Complete(frame) => Ok(frame),
2034      DecodedFrame::Streaming(_) => Err(RedisProtocolError::new(
2035        RedisProtocolErrorKind::DecodeError,
2036        "Expected complete frame.",
2037      )),
2038    }
2039  }
2040
2041  /// Convert the decoded frame into a streaming frame, returning an error if a complete variant is found.
2042  pub fn into_streaming_frame(self) -> Result<StreamedFrame<T>, RedisProtocolError> {
2043    match self {
2044      DecodedFrame::Streaming(frame) => Ok(frame),
2045      DecodedFrame::Complete(_) => Err(RedisProtocolError::new(
2046        RedisProtocolErrorKind::DecodeError,
2047        "Expected streamed frame.",
2048      )),
2049    }
2050  }
2051
2052  /// Whether the decoded frame starts a stream.
2053  pub fn is_streaming(&self) -> bool {
2054    matches!(self, DecodedFrame::Streaming(_))
2055  }
2056
2057  /// Whether the decoded frame is a complete frame.
2058  pub fn is_complete(&self) -> bool {
2059    matches!(self, DecodedFrame::Complete(_))
2060  }
2061}
2062
/// A helper struct for reading and managing streaming data types.
///
/// Frames are appended with [StreamedFrame::add_frame] until [StreamedFrame::is_finished] reports that the last
/// buffered frame is the end-of-stream frame. [StreamedFrame::take] then converts the buffer into a single frame
/// matching `kind`.
///
/// ```rust
/// use redis_protocol::resp3::decode::streaming;
///
/// fn example() {
///   // decode the streamed array `[1,2]` one element at a time
///   let parts: Vec<Vec<u8>> = vec![
///     "*?\r\n".into(),
///     ":1\r\n".into(),
///     ":2\r\n".into(),
///     ".\r\n".into(),
///   ];
///
///   let (frame, _) = streaming::decode(&parts[0]).unwrap().unwrap();
///   assert!(frame.is_streaming());
///   let mut streaming = frame.into_streaming_frame().unwrap();
///   println!("Reading streaming {:?}", streaming.kind);
///
///   let (frame, _) = streaming::decode(&parts[1]).unwrap().unwrap();
///   assert!(frame.is_complete());
///   // add frames to the buffer until we reach the terminating byte sequence
///   streaming.add_frame(frame.into_complete_frame().unwrap());
///
///   let (frame, _) = streaming::decode(&parts[2]).unwrap().unwrap();
///   assert!(frame.is_complete());
///   streaming.add_frame(frame.into_complete_frame().unwrap());
///
///   let (frame, _) = streaming::decode(&parts[3]).unwrap().unwrap();
///   assert!(frame.is_complete());
///   streaming.add_frame(frame.into_complete_frame().unwrap());
///
///   assert!(streaming.is_finished());
///   // convert the buffer into one frame
///   let result = streaming.take().unwrap();
///
///   println!("{:?}", result); // OwnedFrame::Array { data: [1, 2], attributes: None }
/// }
/// ```
#[derive(Debug, Eq, PartialEq)]
pub struct StreamedFrame<T: Resp3Frame> {
  /// The frames buffered so far, in the order they were added.
  buffer:          VecDeque<T>,
  /// A placeholder frame that holds any leading attributes before the stream starts.
  attribute_frame: T,
  /// The data type being streamed.
  pub kind:        FrameKind,
}
2111
2112impl<T: Resp3Frame> StreamedFrame<T> {
2113  /// Create a new `StreamedFrame` from the first section of data in a streaming response.
2114  pub fn new(kind: FrameKind) -> Self {
2115    let buffer = VecDeque::new();
2116    StreamedFrame {
2117      buffer,
2118      kind,
2119      attribute_frame: T::new_empty(),
2120    }
2121  }
2122
2123  /// Add the provided attributes to the frame buffer.
2124  pub fn add_attributes(&mut self, attributes: T::Attributes) -> Result<(), RedisProtocolError> {
2125    self.attribute_frame.add_attributes(attributes)
2126  }
2127
2128  /// Convert the internal buffer into one frame matching `self.kind`, clearing the internal buffer.
2129  pub fn take(&mut self) -> Result<T, RedisProtocolError> {
2130    if !self.kind.is_streaming_type() {
2131      // try to catch invalid type errors early so the caller can modify the frame before we clear the buffer
2132      return Err(RedisProtocolError::new(
2133        RedisProtocolErrorKind::DecodeError,
2134        "Only blob strings, sets, maps, and arrays can be streamed.",
2135      ));
2136    }
2137
2138    if self.is_finished() {
2139      // the last frame is an empty chunked string when the stream is finished
2140      self.buffer.pop_back();
2141    }
2142    let buffer = mem::take(&mut self.buffer);
2143    let attributes = self.attribute_frame.take_attributes();
2144    T::from_buffer(self.kind, buffer, attributes)
2145  }
2146
2147  /// Add a frame to the internal buffer.
2148  pub fn add_frame(&mut self, frame: T) {
2149    self.buffer.push_back(frame);
2150  }
2151
2152  /// Whether the last frame represents the terminating sequence at the end of a frame stream.
2153  pub fn is_finished(&self) -> bool {
2154    self.buffer.back().map(|f| f.is_end_stream_frame()).unwrap_or(false)
2155  }
2156}
2157
#[cfg(test)]
#[cfg(feature = "bytes")]
mod tests {
  use super::*;
  use crate::resp3::utils::new_map;

  /// Append each chunk as a `ChunkedString` frame, then the end-of-stream frame.
  fn push_chunks(stream: &mut StreamedFrame<BytesFrame>, chunks: &[&'static str]) {
    for chunk in chunks.iter() {
      stream.add_frame((FrameKind::ChunkedString, *chunk).try_into().unwrap());
    }
    stream.add_frame(BytesFrame::new_end_stream());
  }

  #[test]
  fn should_convert_basic_streaming_buffer_to_frame() {
    let mut streaming_buf: StreamedFrame<BytesFrame> = StreamedFrame::new(FrameKind::BlobString);
    push_chunks(&mut streaming_buf, &["foo", "bar", "baz"]);

    let frame = streaming_buf.take().expect("Failed to build frame from chunked stream");
    assert_eq!(frame.as_str(), Some("foobarbaz"));
  }

  #[test]
  fn should_convert_basic_streaming_buffer_to_frame_with_attributes() {
    let mut attributes = new_map(0);
    attributes.insert((FrameKind::SimpleString, "a").try_into().unwrap(), 1.into());
    attributes.insert((FrameKind::SimpleString, "b").try_into().unwrap(), 2.into());
    attributes.insert((FrameKind::SimpleString, "c").try_into().unwrap(), 3.into());

    let mut streaming_buf: StreamedFrame<BytesFrame> = StreamedFrame::new(FrameKind::BlobString);
    streaming_buf.add_attributes(attributes.clone()).unwrap();
    push_chunks(&mut streaming_buf, &["foo", "bar", "baz"]);

    let frame = streaming_buf.take().expect("Failed to build frame from chunked stream");
    assert_eq!(frame.as_str(), Some("foobarbaz"));
    assert_eq!(frame.attributes(), Some(&attributes));
  }
}