1use crate::resp2::utils as resp2_utils;
2use crate::types::{Redirection, RedisProtocolError};
3use crate::utils;
4use alloc::string::String;
5use alloc::vec::Vec;
6use bytes::Bytes;
7use bytes_utils::Str;
8use core::mem;
9use core::str;
10
/// Prefix byte that introduces a simple string frame.
pub const SIMPLESTRING_BYTE: u8 = b'+';
/// Prefix byte that introduces an error frame.
pub const ERROR_BYTE: u8 = b'-';
/// Prefix byte that introduces an integer frame.
pub const INTEGER_BYTE: u8 = b':';
/// Prefix byte that introduces a bulk string frame.
pub const BULKSTRING_BYTE: u8 = b'$';
/// Prefix byte that introduces an array frame.
pub const ARRAY_BYTE: u8 = b'*';

/// Textual form of a null value: the bulk string prefix followed by a length of `-1` and CRLF.
// `const` items are implicitly `'static`, so the lifetime does not need to be spelled out.
pub const NULL: &str = "$-1\r\n";
24
25pub use crate::utils::{PATTERN_PUBSUB_PREFIX, PUBSUB_PREFIX};
26
27#[derive(Clone, Debug, Eq, PartialEq)]
29pub enum FrameKind {
30 SimpleString,
31 Error,
32 Integer,
33 BulkString,
34 Array,
35 Null,
36}
37
38impl FrameKind {
39 pub fn from_byte(d: u8) -> Option<FrameKind> {
40 use self::FrameKind::*;
41
42 match d {
43 SIMPLESTRING_BYTE => Some(SimpleString),
44 ERROR_BYTE => Some(Error),
45 INTEGER_BYTE => Some(Integer),
46 BULKSTRING_BYTE => Some(BulkString),
47 ARRAY_BYTE => Some(Array),
48 _ => None,
49 }
50 }
51
52 pub fn to_byte(&self) -> u8 {
53 use self::FrameKind::*;
54
55 match *self {
56 SimpleString => SIMPLESTRING_BYTE,
57 Error => ERROR_BYTE,
58 Integer => INTEGER_BYTE,
59 BulkString | Null => BULKSTRING_BYTE,
60 Array => ARRAY_BYTE,
61 }
62 }
63}
64
65#[derive(Clone, Debug, Eq, PartialEq)]
67pub enum Frame {
68 SimpleString(Bytes),
70 Error(Str),
72 Integer(i64),
74 BulkString(Bytes),
76 Array(Vec<Frame>),
78 Null,
80}
81
82impl Frame {
83 pub fn take(&mut self) -> Frame {
85 mem::replace(self, Frame::Null)
86 }
87
88 pub fn is_error(&self) -> bool {
90 match self.kind() {
91 FrameKind::Error => true,
92 _ => false,
93 }
94 }
95
96 pub fn is_normal_pubsub(&self) -> bool {
98 if let Frame::Array(ref frames) = *self {
99 resp2_utils::is_normal_pubsub(frames)
100 } else {
101 false
102 }
103 }
104
105 pub fn is_pubsub_message(&self) -> bool {
107 if let Frame::Array(ref frames) = *self {
108 resp2_utils::is_normal_pubsub(frames) || resp2_utils::is_pattern_pubsub(frames)
109 } else {
110 false
111 }
112 }
113
114 pub fn is_pattern_pubsub_message(&self) -> bool {
116 if let Frame::Array(ref frames) = *self {
117 resp2_utils::is_pattern_pubsub(frames)
118 } else {
119 false
120 }
121 }
122
123 pub fn kind(&self) -> FrameKind {
125 match *self {
126 Frame::SimpleString(_) => FrameKind::SimpleString,
127 Frame::Error(_) => FrameKind::Error,
128 Frame::Integer(_) => FrameKind::Integer,
129 Frame::BulkString(_) => FrameKind::BulkString,
130 Frame::Array(_) => FrameKind::Array,
131 Frame::Null => FrameKind::Null,
132 }
133 }
134
135 pub fn as_str(&self) -> Option<&str> {
137 match *self {
138 Frame::BulkString(ref b) => str::from_utf8(b).ok(),
139 Frame::SimpleString(ref s) => str::from_utf8(s).ok(),
140 Frame::Error(ref s) => Some(s),
141 _ => None,
142 }
143 }
144
145 pub fn is_string(&self) -> bool {
147 match *self {
148 Frame::SimpleString(_) | Frame::BulkString(_) => true,
149 _ => false,
150 }
151 }
152
153 pub fn is_null(&self) -> bool {
155 match *self {
156 Frame::Null => true,
157 _ => false,
158 }
159 }
160
161 pub fn is_array(&self) -> bool {
163 match *self {
164 Frame::Array(_) => true,
165 _ => false,
166 }
167 }
168
169 pub fn is_integer(&self) -> bool {
171 match *self {
172 Frame::Integer(_) => true,
173 _ => false,
174 }
175 }
176
177 pub fn is_moved_or_ask_error(&self) -> bool {
179 match *self {
180 Frame::Error(ref s) => utils::is_cluster_error(s),
181 _ => false,
182 }
183 }
184
185 pub fn to_string(&self) -> Option<String> {
187 match *self {
188 Frame::BulkString(ref b) | Frame::SimpleString(ref b) => String::from_utf8(b.to_vec()).ok(),
189 _ => None,
190 }
191 }
192
193 pub fn parse_as_pubsub(self) -> Result<(String, String), Self> {
196 if self.is_pubsub_message() {
197 let (message, channel, _) = match self {
199 Frame::Array(mut frames) => (
200 resp2_utils::opt_frame_to_string_panic(frames.pop(), "Expected pubsub payload. This is a bug."),
201 resp2_utils::opt_frame_to_string_panic(frames.pop(), "Expected pubsub channel. This is a bug."),
202 resp2_utils::opt_frame_to_string_panic(frames.pop(), "Expected pubsub message kind. This is a bug."),
203 ),
204 _ => panic!("Unreachable 1. This is a bug."),
205 };
206
207 Ok((channel, message))
208 } else {
209 Err(self)
210 }
211 }
212
213 pub fn to_redirection(&self) -> Option<Redirection> {
215 match *self {
216 Frame::Error(ref s) => utils::read_cluster_error(s),
217 _ => None,
218 }
219 }
220
221 pub fn encode_len(&self) -> Result<usize, RedisProtocolError> {
223 resp2_utils::encode_len(self).map_err(|e| e.into())
224 }
225}
226
227impl From<Redirection> for Frame {
228 fn from(redirection: Redirection) -> Self {
229 redirection.to_resp2_frame()
230 }
231}
232
233impl<'a> From<&'a Redirection> for Frame {
234 fn from(redirection: &'a Redirection) -> Self {
235 redirection.to_resp2_frame()
236 }
237}
238
#[cfg(test)]
mod tests {
    use super::*;

    /// `ASK` redirection fixture shared by the conversion tests.
    fn ask_redirection() -> Redirection {
        Redirection::Ask {
            slot: 3999,
            server: "127.0.0.1:6381".into(),
        }
    }

    /// `MOVED` redirection fixture shared by the conversion tests.
    fn moved_redirection() -> Redirection {
        Redirection::Moved {
            slot: 3999,
            server: "127.0.0.1:6381".into(),
        }
    }

    /// Results of every type predicate, in the order:
    /// null, string, error, array, integer, moved-or-ask error.
    fn type_flags(frame: &Frame) -> [bool; 6] {
        [
            frame.is_null(),
            frame.is_string(),
            frame.is_error(),
            frame.is_array(),
            frame.is_integer(),
            frame.is_moved_or_ask_error(),
        ]
    }

    #[test]
    fn should_convert_ask_redirection_to_frame() {
        let expected = Frame::Error("ASK 3999 127.0.0.1:6381".into());

        assert_eq!(Frame::from(&ask_redirection()), expected);
        assert_eq!(Frame::from(ask_redirection()), expected);
    }

    #[test]
    fn should_convert_moved_redirection_to_frame() {
        let expected = Frame::Error("MOVED 3999 127.0.0.1:6381".into());

        assert_eq!(Frame::from(&moved_redirection()), expected);
        assert_eq!(Frame::from(moved_redirection()), expected);
    }

    #[test]
    fn should_convert_frame_to_redirection_moved() {
        let frame = Frame::Error("MOVED 3999 127.0.0.1:6381".into());

        assert_eq!(frame.to_redirection(), Some(moved_redirection()));
    }

    #[test]
    fn should_convert_frame_to_redirection_ask() {
        let frame = Frame::Error("ASK 3999 127.0.0.1:6381".into());

        assert_eq!(frame.to_redirection(), Some(ask_redirection()));
    }

    #[test]
    fn should_convert_frame_to_redirection_error() {
        // Only error frames are ever parsed as redirections, so a bulk string with the same text
        // must yield `None`. Asserting that directly (rather than `#[should_panic]` on `unwrap`)
        // keeps an unrelated panic from making this test pass.
        let frame = Frame::BulkString("ASK 3999 127.0.0.1:6381".into());

        assert!(frame.to_redirection().is_none());
    }

    // NOTE(review): the three tests below depend on how `utils::read_cluster_error` rejects
    // malformed text, which is not visible from this module. They keep `#[should_panic]` so
    // they pass whether the helper returns `None` or panics; tighten them to `is_none()` once
    // the helper's contract is confirmed.

    #[test]
    #[should_panic]
    fn should_convert_frame_to_redirection_error_invalid_1() {
        let frame = Frame::Error("abc def".into());
        let _ = frame.to_redirection().unwrap();
    }

    #[test]
    #[should_panic]
    fn should_convert_frame_to_redirection_error_invalid_2() {
        let frame = Frame::Error("abc def ghi".into());
        let _ = frame.to_redirection().unwrap();
    }

    #[test]
    #[should_panic]
    fn should_convert_frame_to_redirection_error_invalid_3() {
        let frame = Frame::Error("MOVED abc def".into());
        let _ = frame.to_redirection().unwrap();
    }

    #[test]
    fn should_parse_pattern_pubsub_message() {
        let frames = vec![
            Frame::BulkString("pmessage".into()),
            Frame::BulkString("fo*".into()),
            Frame::BulkString("foo".into()),
            Frame::BulkString("bar".into()),
        ];
        assert!(resp2_utils::is_pattern_pubsub(&frames));

        let frame = Frame::Array(frames);
        assert!(frame.is_pattern_pubsub_message());
        assert!(frame.is_pubsub_message());

        let (channel, message) = frame.parse_as_pubsub().expect("Expected pubsub frames");
        assert_eq!(channel, "foo");
        assert_eq!(message, "bar");
    }

    #[test]
    fn should_parse_pubsub_message() {
        let frames = vec![
            Frame::BulkString("message".into()),
            Frame::BulkString("foo".into()),
            Frame::BulkString("bar".into()),
        ];
        assert!(!resp2_utils::is_pattern_pubsub(&frames));
        assert!(resp2_utils::is_normal_pubsub(&frames));

        let frame = Frame::Array(frames);
        assert!(frame.is_normal_pubsub());
        assert!(frame.is_pubsub_message());

        let (channel, message) = frame.parse_as_pubsub().expect("Expected pubsub frames");
        assert_eq!(channel, "foo");
        assert_eq!(message, "bar");
    }

    #[test]
    fn should_fail_parsing_non_pubsub_message() {
        let frame = Frame::Array(vec![Frame::BulkString("baz".into()), Frame::BulkString("foo".into())]);

        // The frame must be rejected and handed back untouched; a panic here would be a bug.
        assert!(!frame.is_pubsub_message());
        assert_eq!(frame.clone().parse_as_pubsub(), Err(frame));
    }

    #[test]
    fn should_check_frame_types() {
        assert_eq!(type_flags(&Frame::Null), [true, false, false, false, false, false]);
        assert_eq!(
            type_flags(&Frame::BulkString("foo".as_bytes().into())),
            [false, true, false, false, false, false]
        );
        assert_eq!(
            type_flags(&Frame::SimpleString("foo".into())),
            [false, true, false, false, false, false]
        );
        assert_eq!(
            type_flags(&Frame::Error("foo".into())),
            [false, false, true, false, false, false]
        );
        assert_eq!(
            type_flags(&Frame::Array(vec![Frame::SimpleString("foo".into())])),
            [false, false, false, true, false, false]
        );
        assert_eq!(type_flags(&Frame::Integer(10)), [false, false, false, false, true, false]);
    }

    #[test]
    fn should_take_frame_leaving_null() {
        let mut frame = Frame::Integer(10);

        assert_eq!(frame.take(), Frame::Integer(10));
        assert!(frame.is_null());
    }

    #[test]
    fn should_convert_frames_to_strings() {
        let bulk = Frame::BulkString("foo".into());
        assert_eq!(bulk.as_str(), Some("foo"));
        assert_eq!(bulk.to_string(), Some("foo".into()));

        // `as_str` exposes error text, `to_string` deliberately does not.
        let error = Frame::Error("foo".into());
        assert_eq!(error.as_str(), Some("foo"));
        assert_eq!(error.to_string(), None);

        assert_eq!(Frame::Integer(10).as_str(), None);
        assert_eq!(Frame::Null.to_string(), None);
    }

    #[test]
    fn should_decode_frame_kind_byte() {
        assert_eq!(FrameKind::from_byte(SIMPLESTRING_BYTE), Some(FrameKind::SimpleString));
        assert_eq!(FrameKind::from_byte(ERROR_BYTE), Some(FrameKind::Error));
        assert_eq!(FrameKind::from_byte(BULKSTRING_BYTE), Some(FrameKind::BulkString));
        assert_eq!(FrameKind::from_byte(INTEGER_BYTE), Some(FrameKind::Integer));
        assert_eq!(FrameKind::from_byte(ARRAY_BYTE), Some(FrameKind::Array));
        assert_eq!(FrameKind::from_byte(b'x'), None);
    }

    #[test]
    fn should_encode_frame_kind_byte() {
        assert_eq!(FrameKind::SimpleString.to_byte(), SIMPLESTRING_BYTE);
        assert_eq!(FrameKind::Error.to_byte(), ERROR_BYTE);
        assert_eq!(FrameKind::BulkString.to_byte(), BULKSTRING_BYTE);
        assert_eq!(FrameKind::Integer.to_byte(), INTEGER_BYTE);
        assert_eq!(FrameKind::Array.to_byte(), ARRAY_BYTE);
        // Null has no prefix of its own and is written with the bulk string prefix.
        assert_eq!(FrameKind::Null.to_byte(), BULKSTRING_BYTE);
    }
}