redis_protocol/resp2/
types.rs

1use crate::{
2  digits_in_usize,
3  resp2::utils::{bulkstring_encode_len, error_encode_len, integer_encode_len, simplestring_encode_len},
4  resp3::types::OwnedFrame as Resp3OwnedFrame,
5  types::{_Range, PATTERN_PUBSUB_PREFIX, PUBSUB_PREFIX, PUBSUB_PUSH_PREFIX, SHARD_PUBSUB_PREFIX},
6  utils,
7};
8use alloc::{
9  string::{String, ToString},
10  vec::Vec,
11};
12use core::{
13  fmt::Debug,
14  hash::{Hash, Hasher},
15  mem,
16  str,
17};
18
19#[cfg(feature = "convert")]
20use crate::convert::FromResp2;
21#[cfg(any(feature = "bytes", feature = "convert"))]
22use crate::error::RedisProtocolError;
23#[cfg(feature = "bytes")]
24use crate::error::RedisProtocolErrorKind;
25#[cfg(feature = "bytes")]
26use crate::resp3::types::BytesFrame as Resp3BytesFrame;
27#[cfg(feature = "bytes")]
28use bytes::Bytes;
29#[cfg(feature = "bytes")]
30use bytes_utils::Str;
31
/// Type marker byte (`+`) that begins an encoded simple string.
pub const SIMPLESTRING_BYTE: u8 = b'+';
/// Type marker byte (`-`) that begins an encoded error.
pub const ERROR_BYTE: u8 = b'-';
/// Type marker byte (`:`) that begins an encoded integer.
pub const INTEGER_BYTE: u8 = b':';
/// Type marker byte (`$`) that begins an encoded bulk string.
pub const BULKSTRING_BYTE: u8 = b'$';
/// Type marker byte (`*`) that begins an encoded array.
pub const ARRAY_BYTE: u8 = b'*';

/// The encoded form of a RESP2 null value.
pub const NULL: &str = "$-1\r\n";
45
/// An enum representing the type of RESP frame.
///
/// The enum carries no data, so it is `Copy` and can be used as a key in hashed collections.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum FrameKind {
  /// A simple string.
  SimpleString,
  /// An error.
  Error,
  /// A signed integer.
  Integer,
  /// A bulk string.
  BulkString,
  /// An array of frames.
  Array,
  /// A null value.
  Null,
}
56
57impl FrameKind {
58  /// Whether the frame is an error.
59  pub fn is_error(&self) -> bool {
60    matches!(self, FrameKind::Error)
61  }
62
63  /// Read the frame type from the RESP2 byte prefix.
64  pub fn from_byte(d: u8) -> Option<FrameKind> {
65    use self::FrameKind::*;
66
67    match d {
68      SIMPLESTRING_BYTE => Some(SimpleString),
69      ERROR_BYTE => Some(Error),
70      INTEGER_BYTE => Some(Integer),
71      BULKSTRING_BYTE => Some(BulkString),
72      ARRAY_BYTE => Some(Array),
73      _ => None,
74    }
75  }
76
77  /// Convert to the associated RESP2 byte prefix.
78  pub fn to_byte(&self) -> u8 {
79    use self::FrameKind::*;
80
81    match *self {
82      SimpleString => SIMPLESTRING_BYTE,
83      Error => ERROR_BYTE,
84      Integer => INTEGER_BYTE,
85      BulkString | Null => BULKSTRING_BYTE,
86      Array => ARRAY_BYTE,
87    }
88  }
89
90  /// A function used to differentiate data types that may have the same inner binary representation when hashing.
91  pub fn hash_prefix(&self) -> &'static str {
92    use self::FrameKind::*;
93
94    match *self {
95      Array => "a",
96      BulkString => "b",
97      SimpleString => "s",
98      Error => "e",
99      Integer => "i",
100      Null => "N",
101    }
102  }
103}
104
/// A borrowed frame variant, typically used for encoding use cases.
///
/// Every variant holds either a shared reference or a plain integer, so cloning never copies the referenced data
/// and the type is `Copy`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BorrowedFrame<'a> {
  /// A RESP2 simple string.
  SimpleString(&'a [u8]),
  /// A short string representing an error.
  Error(&'a str),
  /// A signed 64-bit integer.
  Integer(i64),
  /// A byte array.
  BulkString(&'a [u8]),
  /// An array of frames.
  Array(&'a [BorrowedFrame<'a>]),
  /// A null value.
  Null,
}
134
135impl<'a> Hash for BorrowedFrame<'a> {
136  fn hash<H: Hasher>(&self, state: &mut H) {
137    self.kind().hash_prefix().hash(state);
138
139    match self {
140      BorrowedFrame::SimpleString(b) | BorrowedFrame::BulkString(b) => b.hash(state),
141      BorrowedFrame::Error(s) => s.hash(state),
142      BorrowedFrame::Integer(i) => i.hash(state),
143      BorrowedFrame::Array(f) => f.iter().for_each(|f| f.hash(state)),
144      BorrowedFrame::Null => NULL.hash(state),
145    }
146  }
147}
148
149impl<'a> BorrowedFrame<'a> {
150  /// Read the `FrameKind` value for this frame.
151  pub fn kind(&self) -> FrameKind {
152    match self {
153      BorrowedFrame::SimpleString(_) => FrameKind::SimpleString,
154      BorrowedFrame::Error(_) => FrameKind::Error,
155      BorrowedFrame::Integer(_) => FrameKind::Integer,
156      BorrowedFrame::BulkString(_) => FrameKind::BulkString,
157      BorrowedFrame::Array(_) => FrameKind::Array,
158      BorrowedFrame::Null => FrameKind::Null,
159    }
160  }
161
162  /// Return the number of bytes needed to encode this frame.
163  ///
164  /// The `int_as_bulkstring` argument controls whether integers should be encoded as bulk strings, which is how Redis
165  /// expects integers in practice.
166  pub fn encode_len(&self, int_as_bulkstring: bool) -> usize {
167    match self {
168      BorrowedFrame::BulkString(b) => bulkstring_encode_len(b),
169      BorrowedFrame::Array(frames) => frames.iter().fold(1 + digits_in_usize(frames.len()) + 2, |m, f| {
170        m + f.encode_len(int_as_bulkstring)
171      }),
172      BorrowedFrame::Null => NULL.as_bytes().len(),
173      BorrowedFrame::SimpleString(s) => simplestring_encode_len(s),
174      BorrowedFrame::Error(s) => error_encode_len(s),
175      BorrowedFrame::Integer(i) => integer_encode_len(*i, int_as_bulkstring),
176    }
177  }
178}
179
180/// A reference-free frame type representing ranges into an associated buffer, typically used to implement zero-copy
181/// parsing.
182#[derive(Clone, Debug, Eq, PartialEq)]
183pub enum RangeFrame {
184  /// A RESP2 simple string.
185  SimpleString(_Range),
186  /// A short string representing an error.
187  Error(_Range),
188  /// A signed 64-bit integer.
189  Integer(i64),
190  /// A byte array.
191  BulkString(_Range),
192  /// An array of frames.
193  Array(Vec<RangeFrame>),
194  /// A null value.
195  Null,
196}
197
198/// Generic operations on a RESP2 frame.
199pub trait Resp2Frame: Debug + Hash + Eq {
200  /// Replace `self` with Null, returning the original value.
201  fn take(&mut self) -> Self;
202
203  /// Read the `FrameKind` value for this frame.
204  fn kind(&self) -> FrameKind;
205
206  /// Attempt to read the frame value as a string slice.
207  fn as_str(&self) -> Option<&str>;
208
209  /// Convert the value to a boolean.
210  fn as_bool(&self) -> Option<bool>;
211
212  /// Attempt to read the frame value as a byte slice.
213  fn as_bytes(&self) -> Option<&[u8]>;
214
215  /// Copy and read the inner value as a string, if possible.
216  fn to_string(&self) -> Option<String>;
217
218  /// Read the number of bytes needed to encode this frame.
219  fn encode_len(&self, int_as_bulkstring: bool) -> usize;
220
221  /// Whether the frame is a message from a `subscribe` call.
222  fn is_normal_pubsub_message(&self) -> bool;
223
224  /// Whether the frame is a message from a `psubscribe` call.
225  fn is_pattern_pubsub_message(&self) -> bool;
226
227  /// Whether the frame is a message from a `ssubscribe` call.
228  fn is_shard_pubsub_message(&self) -> bool;
229
230  /// Whether the frame is a `MOVED` or `ASK` redirection.
231  fn is_redirection(&self) -> bool {
232    self.as_str().map(utils::is_redirection).unwrap_or(false)
233  }
234
235  /// Convert the frame to a double.
236  fn as_f64(&self) -> Option<f64> {
237    self.as_str().and_then(|s| s.parse::<f64>().ok())
238  }
239
240  /// Convert the frame to another type.
241  #[cfg(feature = "convert")]
242  #[cfg_attr(docsrs, doc(cfg(feature = "convert")))]
243  fn convert<T>(self) -> Result<T, RedisProtocolError>
244  where
245    Self: Sized,
246    T: FromResp2<Self>,
247  {
248    T::from_frame(self)
249  }
250
251  /// Whether frame is a bulk array with a single element.
252  #[cfg(feature = "convert")]
253  #[cfg_attr(docsrs, doc(cfg(feature = "convert")))]
254  fn is_single_element_vec(&self) -> bool;
255
256  /// Pop an element from the inner array or return the original frame.
257  ///
258  /// This function is intended to be used with [Self::is_single_element_vec] and may panic.
259  #[cfg(feature = "convert")]
260  #[cfg_attr(docsrs, doc(cfg(feature = "convert")))]
261  fn pop_or_take(self) -> Self;
262}
263
/// A RESP2 frame that owns all of its contents.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum OwnedFrame {
  /// A RESP2 simple string, stored as raw bytes.
  SimpleString(Vec<u8>),
  /// A short string representing an error.
  Error(String),
  /// A signed 64-bit integer.
  Integer(i64),
  /// A byte array.
  BulkString(Vec<u8>),
  /// An array of nested frames.
  Array(Vec<OwnedFrame>),
  /// A null value.
  Null,
}
280
281impl OwnedFrame {
282  /// Move the frame contents into a new [BytesFrame].
283  #[cfg(feature = "bytes")]
284  #[cfg_attr(docsrs, doc(cfg(feature = "bytes")))]
285  pub fn into_bytes_frame(self) -> BytesFrame {
286    match self {
287      OwnedFrame::SimpleString(s) => BytesFrame::SimpleString(s.into()),
288      OwnedFrame::Error(e) => BytesFrame::Error(e.into()),
289      OwnedFrame::Integer(i) => BytesFrame::Integer(i),
290      OwnedFrame::BulkString(b) => BytesFrame::BulkString(b.into()),
291      OwnedFrame::Array(frames) => {
292        BytesFrame::Array(frames.into_iter().map(|frame| frame.into_bytes_frame()).collect())
293      },
294      OwnedFrame::Null => BytesFrame::Null,
295    }
296  }
297
298  /// Convert the frame to RESP3.
299  ///
300  /// For the most part RESP3 is a superset of RESP2, but there is one notable difference with publish-subscribe
301  /// messages - in RESP3 pubsub arrays have an additional prefix frame with the SimpleString value "pubsub". This
302  /// function will add this frame.
303  pub fn into_resp3(self) -> Resp3OwnedFrame {
304    let is_pubsub =
305      self.is_normal_pubsub_message() || self.is_pattern_pubsub_message() || self.is_shard_pubsub_message();
306    if is_pubsub {
307      let prefix = Resp3OwnedFrame::SimpleString {
308        data:       PUBSUB_PUSH_PREFIX.as_bytes().to_vec(),
309        attributes: None,
310      };
311      let frames = match self {
312        OwnedFrame::Array(inner) => {
313          let mut buf = Vec::with_capacity(inner.len() + 1);
314          buf.push(prefix);
315          buf.extend(inner.into_iter().map(|f| f.into_resp3()));
316          buf
317        },
318        _ => unreachable!(),
319      };
320
321      Resp3OwnedFrame::Push {
322        data:       frames,
323        attributes: None,
324      }
325    } else {
326      match self {
327        OwnedFrame::SimpleString(data) => Resp3OwnedFrame::SimpleString { data, attributes: None },
328        OwnedFrame::Error(data) => Resp3OwnedFrame::SimpleError { data, attributes: None },
329        OwnedFrame::Integer(data) => Resp3OwnedFrame::Number { data, attributes: None },
330        OwnedFrame::BulkString(data) => Resp3OwnedFrame::BlobString { data, attributes: None },
331        OwnedFrame::Array(frames) => Resp3OwnedFrame::Array {
332          data:       frames.into_iter().map(|f| f.into_resp3()).collect(),
333          attributes: None,
334        },
335        OwnedFrame::Null => Resp3OwnedFrame::Null,
336      }
337    }
338  }
339}
340
341impl Hash for OwnedFrame {
342  fn hash<H: Hasher>(&self, state: &mut H) {
343    self.kind().hash_prefix().hash(state);
344
345    match self {
346      OwnedFrame::SimpleString(b) | OwnedFrame::BulkString(b) => b.hash(state),
347      OwnedFrame::Error(s) => s.hash(state),
348      OwnedFrame::Integer(i) => i.hash(state),
349      OwnedFrame::Array(f) => f.iter().for_each(|f| f.hash(state)),
350      OwnedFrame::Null => NULL.hash(state),
351    }
352  }
353}
354
355impl Resp2Frame for OwnedFrame {
356  fn encode_len(&self, int_as_bulkstring: bool) -> usize {
357    match self {
358      OwnedFrame::BulkString(b) => bulkstring_encode_len(b),
359      OwnedFrame::Array(frames) => frames.iter().fold(1 + digits_in_usize(frames.len()) + 2, |m, f| {
360        m + f.encode_len(int_as_bulkstring)
361      }),
362      OwnedFrame::Null => NULL.as_bytes().len(),
363      OwnedFrame::SimpleString(s) => simplestring_encode_len(s),
364      OwnedFrame::Error(s) => error_encode_len(s),
365      OwnedFrame::Integer(i) => integer_encode_len(*i, int_as_bulkstring),
366    }
367  }
368
369  fn take(&mut self) -> OwnedFrame {
370    mem::replace(self, OwnedFrame::Null)
371  }
372
373  fn kind(&self) -> FrameKind {
374    match self {
375      OwnedFrame::SimpleString(_) => FrameKind::SimpleString,
376      OwnedFrame::Error(_) => FrameKind::Error,
377      OwnedFrame::Integer(_) => FrameKind::Integer,
378      OwnedFrame::BulkString(_) => FrameKind::BulkString,
379      OwnedFrame::Array(_) => FrameKind::Array,
380      OwnedFrame::Null => FrameKind::Null,
381    }
382  }
383
384  fn as_str(&self) -> Option<&str> {
385    match self {
386      OwnedFrame::BulkString(b) => str::from_utf8(b).ok(),
387      OwnedFrame::SimpleString(s) => str::from_utf8(s).ok(),
388      OwnedFrame::Error(s) => Some(s),
389      _ => None,
390    }
391  }
392
393  fn as_bool(&self) -> Option<bool> {
394    match self {
395      OwnedFrame::BulkString(b) | OwnedFrame::SimpleString(b) => utils::bytes_to_bool(b),
396      OwnedFrame::Integer(0) => Some(false),
397      OwnedFrame::Integer(1) => Some(true),
398      OwnedFrame::Null => Some(false),
399      _ => None,
400    }
401  }
402
403  fn as_bytes(&self) -> Option<&[u8]> {
404    Some(match self {
405      OwnedFrame::BulkString(b) => b,
406      OwnedFrame::SimpleString(s) => s,
407      OwnedFrame::Error(s) => s.as_bytes(),
408      _ => return None,
409    })
410  }
411
412  fn to_string(&self) -> Option<String> {
413    match self {
414      OwnedFrame::BulkString(b) | OwnedFrame::SimpleString(b) => String::from_utf8(b.to_vec()).ok(),
415      OwnedFrame::Error(b) => Some(b.clone()),
416      OwnedFrame::Integer(i) => Some(i.to_string()),
417      _ => None,
418    }
419  }
420
421  fn is_normal_pubsub_message(&self) -> bool {
422    // format is ["message", <channel>, <message>]
423    match self {
424      OwnedFrame::Array(data) => {
425        data.len() == 3
426          && data[0].kind() == FrameKind::BulkString
427          && data[0].as_str().map(|s| s == PUBSUB_PREFIX).unwrap_or(false)
428      },
429      _ => false,
430    }
431  }
432
433  fn is_pattern_pubsub_message(&self) -> bool {
434    // format is ["pmessage", <pattern>, <channel>, <message>]
435    match self {
436      OwnedFrame::Array(data) => {
437        data.len() == 4
438          && data[0].kind() == FrameKind::BulkString
439          && data[0].as_str().map(|s| s == PATTERN_PUBSUB_PREFIX).unwrap_or(false)
440      },
441      _ => false,
442    }
443  }
444
445  fn is_shard_pubsub_message(&self) -> bool {
446    // format is ["smessage", <channel>, <message>]
447    match self {
448      OwnedFrame::Array(data) => {
449        data.len() == 3
450          && data[0].kind() == FrameKind::BulkString
451          && data[0].as_str().map(|s| s == SHARD_PUBSUB_PREFIX).unwrap_or(false)
452      },
453      _ => false,
454    }
455  }
456
457  #[cfg(feature = "convert")]
458  #[cfg_attr(docsrs, doc(cfg(feature = "convert")))]
459  fn is_single_element_vec(&self) -> bool {
460    if let OwnedFrame::Array(values) = self {
461      values.len() == 1
462    } else {
463      false
464    }
465  }
466
467  #[cfg(feature = "convert")]
468  #[cfg_attr(docsrs, doc(cfg(feature = "convert")))]
469  fn pop_or_take(self) -> Self {
470    if let OwnedFrame::Array(mut values) = self {
471      values.pop().unwrap().pop_or_take()
472    } else {
473      self
474    }
475  }
476}
477
/// A RESP2 frame backed by [Bytes](bytes::Bytes) and [Str](bytes_utils::Str) buffers instead of `Vec<u8>` and
/// `String`.
#[cfg(feature = "bytes")]
#[cfg_attr(docsrs, doc(cfg(feature = "bytes")))]
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum BytesFrame {
  /// A RESP2 simple string, stored as raw bytes.
  SimpleString(Bytes),
  /// A short string representing an error.
  Error(Str),
  /// A signed 64-bit integer.
  Integer(i64),
  /// A byte array.
  BulkString(Bytes),
  /// An array of nested frames.
  Array(Vec<BytesFrame>),
  /// A null value.
  Null,
}
496
#[cfg(feature = "bytes")]
impl Hash for BytesFrame {
  /// Hash the kind's prefix tag first, then the frame contents.
  fn hash<H: Hasher>(&self, state: &mut H) {
    let prefix = self.kind().hash_prefix();
    prefix.hash(state);

    match self {
      BytesFrame::Null => NULL.hash(state),
      BytesFrame::Integer(value) => value.hash(state),
      BytesFrame::Error(message) => message.hash(state),
      BytesFrame::SimpleString(bytes) | BytesFrame::BulkString(bytes) => bytes.hash(state),
      BytesFrame::Array(frames) => {
        // Each element feeds its own prefix and contents into the same hasher, in order.
        for frame in frames.iter() {
          frame.hash(state);
        }
      },
    }
  }
}
511
#[cfg(feature = "bytes")]
impl Resp2Frame for BytesFrame {
  /// The number of bytes needed to encode this frame, including every nested frame of an array.
  ///
  /// `int_as_bulkstring` is forwarded to the integer length calculation and to nested frames.
  fn encode_len(&self, int_as_bulkstring: bool) -> usize {
    match self {
      BytesFrame::BulkString(b) => bulkstring_encode_len(b),
      BytesFrame::Array(frames) => frames.iter().fold(1 + digits_in_usize(frames.len()) + 2, |m, f| {
        m + f.encode_len(int_as_bulkstring)
      }),
      BytesFrame::Null => NULL.as_bytes().len(),
      BytesFrame::SimpleString(s) => simplestring_encode_len(s),
      BytesFrame::Error(s) => error_encode_len(s),
      BytesFrame::Integer(i) => integer_encode_len(*i, int_as_bulkstring),
    }
  }

  /// Replace `self` with `Null`, returning the original value.
  fn take(&mut self) -> BytesFrame {
    mem::replace(self, BytesFrame::Null)
  }

  /// The [FrameKind] that corresponds to this frame's variant.
  fn kind(&self) -> FrameKind {
    match self {
      BytesFrame::SimpleString(_) => FrameKind::SimpleString,
      BytesFrame::Error(_) => FrameKind::Error,
      BytesFrame::Integer(_) => FrameKind::Integer,
      BytesFrame::BulkString(_) => FrameKind::BulkString,
      BytesFrame::Array(_) => FrameKind::Array,
      BytesFrame::Null => FrameKind::Null,
    }
  }

  /// Borrow the contents as a string slice.
  ///
  /// Bulk and simple strings must be valid UTF-8; integers, arrays, and nulls return `None`.
  fn as_str(&self) -> Option<&str> {
    match self {
      BytesFrame::BulkString(b) => str::from_utf8(b).ok(),
      BytesFrame::SimpleString(s) => str::from_utf8(s).ok(),
      BytesFrame::Error(s) => Some(s),
      _ => None,
    }
  }

  /// Interpret the frame as a boolean.
  ///
  /// Bulk and simple strings are delegated to `utils::bytes_to_bool`, the integers `0` and `1` map to `false` and
  /// `true`, and `Null` is `false`. Everything else returns `None`.
  fn as_bool(&self) -> Option<bool> {
    match self {
      BytesFrame::BulkString(b) | BytesFrame::SimpleString(b) => utils::bytes_to_bool(b),
      BytesFrame::Integer(0) => Some(false),
      BytesFrame::Integer(1) => Some(true),
      BytesFrame::Null => Some(false),
      _ => None,
    }
  }

  /// Borrow the contents of a bulk string, simple string, or error as bytes. Other variants return `None`.
  fn as_bytes(&self) -> Option<&[u8]> {
    Some(match self {
      BytesFrame::BulkString(b) => b,
      BytesFrame::SimpleString(s) => s,
      BytesFrame::Error(s) => s.as_bytes(),
      _ => return None,
    })
  }

  /// Copy the contents into a new `String`.
  ///
  /// Bulk and simple strings must be valid UTF-8 and integers are formatted in decimal; arrays and nulls return
  /// `None`.
  fn to_string(&self) -> Option<String> {
    match self {
      BytesFrame::BulkString(b) | BytesFrame::SimpleString(b) => String::from_utf8(b.to_vec()).ok(),
      BytesFrame::Error(b) => Some(b.to_string()),
      BytesFrame::Integer(i) => Some(i.to_string()),
      _ => None,
    }
  }

  /// Whether the frame is a 3-element array whose first element is a bulk string equal to `PUBSUB_PREFIX`.
  fn is_normal_pubsub_message(&self) -> bool {
    // format is ["message", <channel>, <message>]
    match self {
      BytesFrame::Array(data) => {
        // The length check runs first, so indexing the first element cannot go out of bounds.
        data.len() == 3
          && data[0].kind() == FrameKind::BulkString
          && data[0].as_str().map(|s| s == PUBSUB_PREFIX).unwrap_or(false)
      },
      _ => false,
    }
  }

  /// Whether the frame is a 4-element array whose first element is a bulk string equal to
  /// `PATTERN_PUBSUB_PREFIX`.
  fn is_pattern_pubsub_message(&self) -> bool {
    // format is ["pmessage", <pattern>, <channel>, <message>]
    match self {
      BytesFrame::Array(data) => {
        data.len() == 4
          && data[0].kind() == FrameKind::BulkString
          && data[0].as_str().map(|s| s == PATTERN_PUBSUB_PREFIX).unwrap_or(false)
      },
      _ => false,
    }
  }

  /// Whether the frame is a 3-element array whose first element is a bulk string equal to `SHARD_PUBSUB_PREFIX`.
  fn is_shard_pubsub_message(&self) -> bool {
    // format is ["smessage", <channel>, <message>]
    match self {
      BytesFrame::Array(data) => {
        data.len() == 3
          && data[0].kind() == FrameKind::BulkString
          && data[0].as_str().map(|s| s == SHARD_PUBSUB_PREFIX).unwrap_or(false)
      },
      _ => false,
    }
  }

  /// Whether the frame is an array holding exactly one element.
  #[cfg(feature = "convert")]
  #[cfg_attr(docsrs, doc(cfg(feature = "convert")))]
  fn is_single_element_vec(&self) -> bool {
    matches!(self, BytesFrame::Array(values) if values.len() == 1)
  }

  /// Pop the last element from the inner array, repeating on the popped value, or return the frame unchanged if
  /// it is not an array.
  ///
  /// An empty array is returned as-is rather than panicking. This matters even when callers check
  /// `is_single_element_vec` first, since the recursion can reach a nested empty array such as `[[]]`.
  #[cfg(feature = "convert")]
  #[cfg_attr(docsrs, doc(cfg(feature = "convert")))]
  fn pop_or_take(self) -> Self {
    match self {
      BytesFrame::Array(mut values) => match values.pop() {
        Some(last) => last.pop_or_take(),
        // Nothing to pop: hand the (empty) array back to the caller.
        None => BytesFrame::Array(values),
      },
      other => other,
    }
  }
}
635
#[cfg(feature = "bytes")]
impl BytesFrame {
  /// Copy the contents, including all nested frames, into a new [OwnedFrame].
  pub fn to_owned_frame(&self) -> OwnedFrame {
    match self {
      BytesFrame::Null => OwnedFrame::Null,
      BytesFrame::Integer(value) => OwnedFrame::Integer(*value),
      BytesFrame::Error(message) => OwnedFrame::Error(message.to_string()),
      BytesFrame::SimpleString(bytes) => OwnedFrame::SimpleString(bytes.to_vec()),
      BytesFrame::BulkString(bytes) => OwnedFrame::BulkString(bytes.to_vec()),
      BytesFrame::Array(frames) => {
        let copied: Vec<OwnedFrame> = frames.iter().map(BytesFrame::to_owned_frame).collect();
        OwnedFrame::Array(copied)
      },
    }
  }

  /// Convert the frame to RESP3.
  ///
  /// For the most part RESP3 is a superset of RESP2, but there is one notable difference with publish-subscribe
  /// messages - in RESP3 pubsub arrays have an additional prefix frame. When this frame is recognized as a normal,
  /// pattern, or shard pubsub message it becomes a `Push` frame whose first element is a SimpleString holding
  /// `PUBSUB_PUSH_PREFIX`, followed by the converted elements of the original array.
  pub fn into_resp3(self) -> Resp3BytesFrame {
    let is_pubsub =
      self.is_normal_pubsub_message() || self.is_pattern_pubsub_message() || self.is_shard_pubsub_message();

    match self {
      // The pubsub checks only ever succeed for arrays, so the flag is only consulted here.
      BytesFrame::Array(inner) if is_pubsub => {
        let prefix = Resp3BytesFrame::SimpleString {
          data:       Bytes::from_static(PUBSUB_PUSH_PREFIX.as_bytes()),
          attributes: None,
        };

        let mut data = Vec::with_capacity(inner.len() + 1);
        data.push(prefix);
        data.extend(inner.into_iter().map(BytesFrame::into_resp3));

        Resp3BytesFrame::Push { data, attributes: None }
      },
      BytesFrame::Array(inner) => Resp3BytesFrame::Array {
        data:       inner.into_iter().map(BytesFrame::into_resp3).collect(),
        attributes: None,
      },
      BytesFrame::BulkString(data) => Resp3BytesFrame::BlobString { data, attributes: None },
      BytesFrame::SimpleString(data) => Resp3BytesFrame::SimpleString { data, attributes: None },
      BytesFrame::Error(data) => Resp3BytesFrame::SimpleError { data, attributes: None },
      BytesFrame::Integer(data) => Resp3BytesFrame::Number { data, attributes: None },
      BytesFrame::Null => Resp3BytesFrame::Null,
    }
  }
}
692
/// Build a frame of the requested kind from a single buffer.
///
/// Simple strings, errors, and bulk strings wrap the buffer (errors go through `Str::from_inner`, which can
/// fail), `Null` ignores the buffer, and integer or array kinds are rejected with an `Unknown` error.
#[cfg(feature = "bytes")]
impl<B: Into<Bytes>> TryFrom<(FrameKind, B)> for BytesFrame {
  type Error = RedisProtocolError;

  fn try_from((kind, buf): (FrameKind, B)) -> Result<Self, RedisProtocolError> {
    match kind {
      FrameKind::Null => Ok(BytesFrame::Null),
      FrameKind::SimpleString => Ok(BytesFrame::SimpleString(buf.into())),
      FrameKind::BulkString => Ok(BytesFrame::BulkString(buf.into())),
      FrameKind::Error => {
        let message = Str::from_inner(buf.into())?;
        Ok(BytesFrame::Error(message))
      },
      // These kinds cannot be built from a single buffer.
      FrameKind::Integer | FrameKind::Array => Err(RedisProtocolError::new(
        RedisProtocolErrorKind::Unknown,
        "Cannot convert to frame.",
      )),
    }
  }
}
712
#[cfg(test)]
#[cfg(feature = "bytes")]
mod tests {
  use super::*;

  /// Shorthand for a bulk string frame built from a static string.
  fn bulk(value: &'static str) -> BytesFrame {
    BytesFrame::BulkString(value.into())
  }

  #[test]
  fn should_parse_pattern_pubsub_message() {
    let parts = vec![bulk("pmessage"), bulk("fo*"), bulk("foo"), bulk("bar")];
    assert!(BytesFrame::Array(parts).is_pattern_pubsub_message());
  }

  #[test]
  fn should_parse_normal_pubsub_message() {
    let parts = vec![bulk("message"), bulk("foo"), bulk("bar")];
    assert!(BytesFrame::Array(parts).is_normal_pubsub_message());
  }

  #[test]
  fn should_parse_shard_pubsub_message() {
    let parts = vec![bulk("smessage"), bulk("foo"), bulk("bar")];
    assert!(BytesFrame::Array(parts).is_shard_pubsub_message());
  }

  #[test]
  fn should_decode_frame_kind_byte() {
    let cases = [
      (SIMPLESTRING_BYTE, FrameKind::SimpleString),
      (ERROR_BYTE, FrameKind::Error),
      (BULKSTRING_BYTE, FrameKind::BulkString),
      (INTEGER_BYTE, FrameKind::Integer),
      (ARRAY_BYTE, FrameKind::Array),
    ];

    for (byte, kind) in cases {
      assert_eq!(FrameKind::from_byte(byte), Some(kind));
    }
  }

  #[test]
  fn should_encode_frame_kind_byte() {
    let cases = [
      (FrameKind::SimpleString, SIMPLESTRING_BYTE),
      (FrameKind::Error, ERROR_BYTE),
      (FrameKind::BulkString, BULKSTRING_BYTE),
      (FrameKind::Integer, INTEGER_BYTE),
      (FrameKind::Array, ARRAY_BYTE),
    ];

    for (kind, byte) in cases {
      assert_eq!(kind.to_byte(), byte);
    }
  }
}