// redis_protocol/resp2/types.rs

1use crate::resp2::utils as resp2_utils;
2use crate::types::{Redirection, RedisProtocolError};
3use crate::utils;
4use alloc::string::String;
5use alloc::vec::Vec;
6use bytes::Bytes;
7use bytes_utils::Str;
8use core::mem;
9use core::str;
10
/// Byte prefix before a simple string type.
pub const SIMPLESTRING_BYTE: u8 = b'+';
/// Byte prefix before an error type.
pub const ERROR_BYTE: u8 = b'-';
/// Byte prefix before an integer type.
pub const INTEGER_BYTE: u8 = b':';
/// Byte prefix before a bulk string type.
pub const BULKSTRING_BYTE: u8 = b'$';
/// Byte prefix before an array type.
pub const ARRAY_BYTE: u8 = b'*';

/// The wire representation of NULL in RESP2, encoded as a null bulk string (a bulk string with length `-1`).
// `const` items are implicitly `'static`, so the lifetime does not need to be spelled out.
pub const NULL: &str = "$-1\r\n";
24
25pub use crate::utils::{PATTERN_PUBSUB_PREFIX, PUBSUB_PREFIX};
26
/// An enum representing the kind of a Frame without references to any inner data.
///
/// The enum carries no payload, so it is `Copy` and `Hash` in addition to the comparison traits. Kinds can
/// therefore be passed by value and used as map or set keys without cloning.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum FrameKind {
  /// The kind of a simple string frame.
  SimpleString,
  /// The kind of an error frame.
  Error,
  /// The kind of an integer frame.
  Integer,
  /// The kind of a bulk string frame.
  BulkString,
  /// The kind of an array frame.
  Array,
  /// The kind of a null frame.
  Null,
}
37
38impl FrameKind {
39  pub fn from_byte(d: u8) -> Option<FrameKind> {
40    use self::FrameKind::*;
41
42    match d {
43      SIMPLESTRING_BYTE => Some(SimpleString),
44      ERROR_BYTE => Some(Error),
45      INTEGER_BYTE => Some(Integer),
46      BULKSTRING_BYTE => Some(BulkString),
47      ARRAY_BYTE => Some(Array),
48      _ => None,
49    }
50  }
51
52  pub fn to_byte(&self) -> u8 {
53    use self::FrameKind::*;
54
55    match *self {
56      SimpleString => SIMPLESTRING_BYTE,
57      Error => ERROR_BYTE,
58      Integer => INTEGER_BYTE,
59      BulkString | Null => BULKSTRING_BYTE,
60      Array => ARRAY_BYTE,
61    }
62  }
63}
64
65/// An enum representing a Frame of data.
66#[derive(Clone, Debug, Eq, PartialEq)]
67pub enum Frame {
68  /// A short non binary-safe string.
69  SimpleString(Bytes),
70  /// A short non binary-safe string representing an error.
71  Error(Str),
72  /// A signed 64 bit integer.
73  Integer(i64),
74  /// A binary-safe string.
75  BulkString(Bytes),
76  /// An array of frames, arbitrarily nested.
77  Array(Vec<Frame>),
78  /// A null value.
79  Null,
80}
81
82impl Frame {
83  /// Replace `self` with Null, returning the original value.
84  pub fn take(&mut self) -> Frame {
85    mem::replace(self, Frame::Null)
86  }
87
88  /// Whether or not the frame is an error.
89  pub fn is_error(&self) -> bool {
90    match self.kind() {
91      FrameKind::Error => true,
92      _ => false,
93    }
94  }
95
96  /// Whether or not the frame represents a publish-subscribe message, but not a pattern publish-subscribe message.
97  pub fn is_normal_pubsub(&self) -> bool {
98    if let Frame::Array(ref frames) = *self {
99      resp2_utils::is_normal_pubsub(frames)
100    } else {
101      false
102    }
103  }
104
105  /// Whether or not the frame represents a message on a publish-subscribe channel.
106  pub fn is_pubsub_message(&self) -> bool {
107    if let Frame::Array(ref frames) = *self {
108      resp2_utils::is_normal_pubsub(frames) || resp2_utils::is_pattern_pubsub(frames)
109    } else {
110      false
111    }
112  }
113
114  /// Whether or not the frame represents a message on a publish-subscribe channel matched against a pattern subscription.
115  pub fn is_pattern_pubsub_message(&self) -> bool {
116    if let Frame::Array(ref frames) = *self {
117      resp2_utils::is_pattern_pubsub(frames)
118    } else {
119      false
120    }
121  }
122
123  /// Read the `FrameKind` value for this frame.
124  pub fn kind(&self) -> FrameKind {
125    match *self {
126      Frame::SimpleString(_) => FrameKind::SimpleString,
127      Frame::Error(_) => FrameKind::Error,
128      Frame::Integer(_) => FrameKind::Integer,
129      Frame::BulkString(_) => FrameKind::BulkString,
130      Frame::Array(_) => FrameKind::Array,
131      Frame::Null => FrameKind::Null,
132    }
133  }
134
135  /// Attempt to read the frame value as a string slice without allocating.
136  pub fn as_str(&self) -> Option<&str> {
137    match *self {
138      Frame::BulkString(ref b) => str::from_utf8(b).ok(),
139      Frame::SimpleString(ref s) => str::from_utf8(s).ok(),
140      Frame::Error(ref s) => Some(s),
141      _ => None,
142    }
143  }
144
145  /// Whether or not the frame is a simple string or bulk string.
146  pub fn is_string(&self) -> bool {
147    match *self {
148      Frame::SimpleString(_) | Frame::BulkString(_) => true,
149      _ => false,
150    }
151  }
152
153  /// Whether or not the frame is Null.
154  pub fn is_null(&self) -> bool {
155    match *self {
156      Frame::Null => true,
157      _ => false,
158    }
159  }
160
161  /// Whether or not the frame is an array of frames.
162  pub fn is_array(&self) -> bool {
163    match *self {
164      Frame::Array(_) => true,
165      _ => false,
166    }
167  }
168
169  /// Whether or not the frame is an integer.
170  pub fn is_integer(&self) -> bool {
171    match *self {
172      Frame::Integer(_) => true,
173      _ => false,
174    }
175  }
176
177  /// Whether or not the framed is a a Moved or Ask error.
178  pub fn is_moved_or_ask_error(&self) -> bool {
179    match *self {
180      Frame::Error(ref s) => utils::is_cluster_error(s),
181      _ => false,
182    }
183  }
184
185  // Copy and read the inner value as a string, if possible.
186  pub fn to_string(&self) -> Option<String> {
187    match *self {
188      Frame::BulkString(ref b) | Frame::SimpleString(ref b) => String::from_utf8(b.to_vec()).ok(),
189      _ => None,
190    }
191  }
192
193  /// Attempt to parse the frame as a publish-subscribe message, returning the `(channel, message)` tuple
194  /// if successful, or the original frame if the inner data is not a publish-subscribe message.
195  pub fn parse_as_pubsub(self) -> Result<(String, String), Self> {
196    if self.is_pubsub_message() {
197      // if `is_pubsub_message` returns true but this panics then there's a bug in `is_pubsub_message`, so this fails loudly
198      let (message, channel, _) = match self {
199        Frame::Array(mut frames) => (
200          resp2_utils::opt_frame_to_string_panic(frames.pop(), "Expected pubsub payload. This is a bug."),
201          resp2_utils::opt_frame_to_string_panic(frames.pop(), "Expected pubsub channel. This is a bug."),
202          resp2_utils::opt_frame_to_string_panic(frames.pop(), "Expected pubsub message kind. This is a bug."),
203        ),
204        _ => panic!("Unreachable 1. This is a bug."),
205      };
206
207      Ok((channel, message))
208    } else {
209      Err(self)
210    }
211  }
212
213  /// Attempt to parse the frame as a cluster redirection.
214  pub fn to_redirection(&self) -> Option<Redirection> {
215    match *self {
216      Frame::Error(ref s) => utils::read_cluster_error(s),
217      _ => None,
218    }
219  }
220
221  /// Attempt to read the number of bytes needed to encode this frame.
222  pub fn encode_len(&self) -> Result<usize, RedisProtocolError> {
223    resp2_utils::encode_len(self).map_err(|e| e.into())
224  }
225}
226
227impl From<Redirection> for Frame {
228  fn from(redirection: Redirection) -> Self {
229    redirection.to_resp2_frame()
230  }
231}
232
233impl<'a> From<&'a Redirection> for Frame {
234  fn from(redirection: &'a Redirection) -> Self {
235    redirection.to_resp2_frame()
236  }
237}
238
#[cfg(test)]
mod tests {
  use super::*;

  /// Evaluate every type predicate on `frame`, in the order
  /// `(is_null, is_string, is_error, is_array, is_integer, is_moved_or_ask_error)`.
  fn type_flags(frame: &Frame) -> (bool, bool, bool, bool, bool, bool) {
    (
      frame.is_null(),
      frame.is_string(),
      frame.is_error(),
      frame.is_array(),
      frame.is_integer(),
      frame.is_moved_or_ask_error(),
    )
  }

  #[test]
  fn should_convert_ask_redirection_to_frame() {
    let redirection = Redirection::Ask {
      slot: 3999,
      server: "127.0.0.1:6381".into(),
    };
    let frame = Frame::Error("ASK 3999 127.0.0.1:6381".into());

    // Both the borrowed and the owned conversions must agree.
    assert_eq!(Frame::from(&redirection), frame);
    assert_eq!(Frame::from(redirection), frame);
  }

  #[test]
  fn should_convert_moved_redirection_to_frame() {
    let redirection = Redirection::Moved {
      slot: 3999,
      server: "127.0.0.1:6381".into(),
    };
    let frame = Frame::Error("MOVED 3999 127.0.0.1:6381".into());

    assert_eq!(Frame::from(&redirection), frame);
    assert_eq!(Frame::from(redirection), frame);
  }

  #[test]
  fn should_convert_frame_to_redirection_moved() {
    let redirection = Redirection::Moved {
      slot: 3999,
      server: "127.0.0.1:6381".into(),
    };
    let frame = Frame::Error("MOVED 3999 127.0.0.1:6381".into());

    assert_eq!(frame.to_redirection(), Some(redirection));
  }

  #[test]
  fn should_convert_frame_to_redirection_ask() {
    let redirection = Redirection::Ask {
      slot: 3999,
      server: "127.0.0.1:6381".into(),
    };
    let frame = Frame::Error("ASK 3999 127.0.0.1:6381".into());

    assert_eq!(frame.to_redirection(), Some(redirection));
  }

  // The negative cases below assert on the returned `None` directly rather than relying on `#[should_panic]`
  // around `unwrap`, which would also pass if the code under test panicked for an unrelated reason.

  #[test]
  fn should_convert_frame_to_redirection_error() {
    // Only error frames can describe a redirection, even when the payload looks like one.
    let frame = Frame::BulkString("ASK 3999 127.0.0.1:6381".into());

    assert!(frame.to_redirection().is_none());
  }

  #[test]
  fn should_convert_frame_to_redirection_error_invalid_1() {
    let f1 = Frame::Error("abc def".into());

    assert!(f1.to_redirection().is_none());
  }

  #[test]
  fn should_convert_frame_to_redirection_error_invalid_2() {
    let f2 = Frame::Error("abc def ghi".into());

    assert!(f2.to_redirection().is_none());
  }

  #[test]
  fn should_convert_frame_to_redirection_error_invalid_3() {
    let f3 = Frame::Error("MOVED abc def".into());

    assert!(f3.to_redirection().is_none());
  }

  #[test]
  fn should_parse_pattern_pubsub_message() {
    let frames = vec![
      Frame::BulkString("pmessage".into()),
      Frame::BulkString("fo*".into()),
      Frame::BulkString("foo".into()),
      Frame::BulkString("bar".into()),
    ];
    assert!(resp2_utils::is_pattern_pubsub(&frames));
    let frame = Frame::Array(frames);
    assert!(frame.is_pubsub_message());
    assert!(frame.is_pattern_pubsub_message());

    let (channel, message) = frame.parse_as_pubsub().expect("Expected pubsub frames");

    assert_eq!(channel, "foo");
    assert_eq!(message, "bar");
  }

  #[test]
  fn should_parse_pubsub_message() {
    let frames = vec![
      Frame::BulkString("message".into()),
      Frame::BulkString("foo".into()),
      Frame::BulkString("bar".into()),
    ];
    assert!(!resp2_utils::is_pattern_pubsub(&frames));
    assert!(resp2_utils::is_normal_pubsub(&frames));
    let frame = Frame::Array(frames);
    assert!(frame.is_pubsub_message());
    assert!(frame.is_normal_pubsub());
    assert!(!frame.is_pattern_pubsub_message());

    let (channel, message) = frame.parse_as_pubsub().expect("Expected pubsub frames");

    assert_eq!(channel, "foo");
    assert_eq!(message, "bar");
  }

  #[test]
  fn should_fail_parsing_non_pubsub_message() {
    let frame = Frame::Array(vec![Frame::BulkString("baz".into()), Frame::BulkString("foo".into())]);
    assert!(!frame.is_pubsub_message());

    // The original frame must be handed back untouched.
    assert_eq!(frame.clone().parse_as_pubsub(), Err(frame));
  }

  #[test]
  fn should_fail_parsing_non_array_as_pubsub() {
    let frame = Frame::BulkString("message".into());

    assert_eq!(frame.clone().parse_as_pubsub(), Err(frame));
  }

  #[test]
  fn should_check_frame_types() {
    let f = Frame::Null;
    assert_eq!(type_flags(&f), (true, false, false, false, false, false));

    let f = Frame::BulkString("foo".as_bytes().into());
    assert_eq!(type_flags(&f), (false, true, false, false, false, false));

    let f = Frame::SimpleString("foo".into());
    assert_eq!(type_flags(&f), (false, true, false, false, false, false));

    let f = Frame::Error("foo".into());
    assert_eq!(type_flags(&f), (false, false, true, false, false, false));

    let f = Frame::Array(vec![Frame::SimpleString("foo".into())]);
    assert_eq!(type_flags(&f), (false, false, false, true, false, false));

    let f = Frame::Integer(10);
    assert_eq!(type_flags(&f), (false, false, false, false, true, false));
  }

  #[test]
  fn should_read_frame_kind() {
    assert_eq!(Frame::SimpleString("foo".into()).kind(), FrameKind::SimpleString);
    assert_eq!(Frame::Error("foo".into()).kind(), FrameKind::Error);
    assert_eq!(Frame::Integer(10).kind(), FrameKind::Integer);
    assert_eq!(Frame::BulkString("foo".into()).kind(), FrameKind::BulkString);
    assert_eq!(Frame::Array(vec![Frame::Null]).kind(), FrameKind::Array);
    assert_eq!(Frame::Null.kind(), FrameKind::Null);
  }

  #[test]
  fn should_take_frame_leaving_null() {
    let mut frame = Frame::Integer(10);

    assert_eq!(frame.take(), Frame::Integer(10));
    assert!(frame.is_null());
  }

  #[test]
  fn should_read_frames_as_strings() {
    let bulk = Frame::BulkString("foo".into());
    assert_eq!(bulk.as_str(), Some("foo"));
    assert_eq!(bulk.to_string(), Some(String::from("foo")));

    let simple = Frame::SimpleString("foo".into());
    assert_eq!(simple.as_str(), Some("foo"));
    assert_eq!(simple.to_string(), Some(String::from("foo")));

    // Error frames can be borrowed as a string slice but are not copied by `to_string`.
    let error = Frame::Error("foo".into());
    assert_eq!(error.as_str(), Some("foo"));
    assert_eq!(error.to_string(), None);

    // Bytes that are not valid UTF-8 cannot be read as a string either way.
    let binary = Frame::BulkString(Bytes::from_static(b"\xff"));
    assert_eq!(binary.as_str(), None);
    assert_eq!(binary.to_string(), None);

    assert_eq!(Frame::Integer(10).as_str(), None);
    assert_eq!(Frame::Null.to_string(), None);
  }

  #[test]
  fn should_decode_frame_kind_byte() {
    assert_eq!(FrameKind::from_byte(SIMPLESTRING_BYTE), Some(FrameKind::SimpleString));
    assert_eq!(FrameKind::from_byte(ERROR_BYTE), Some(FrameKind::Error));
    assert_eq!(FrameKind::from_byte(BULKSTRING_BYTE), Some(FrameKind::BulkString));
    assert_eq!(FrameKind::from_byte(INTEGER_BYTE), Some(FrameKind::Integer));
    assert_eq!(FrameKind::from_byte(ARRAY_BYTE), Some(FrameKind::Array));
    // Any byte that is not a known prefix is rejected.
    assert_eq!(FrameKind::from_byte(b'?'), None);
  }

  #[test]
  fn should_encode_frame_kind_byte() {
    assert_eq!(FrameKind::SimpleString.to_byte(), SIMPLESTRING_BYTE);
    assert_eq!(FrameKind::Error.to_byte(), ERROR_BYTE);
    assert_eq!(FrameKind::BulkString.to_byte(), BULKSTRING_BYTE);
    assert_eq!(FrameKind::Integer.to_byte(), INTEGER_BYTE);
    assert_eq!(FrameKind::Array.to_byte(), ARRAY_BYTE);
    // Null shares the bulk string prefix.
    assert_eq!(FrameKind::Null.to_byte(), BULKSTRING_BYTE);
  }
}