redis_zero_protocol_parser/
lib.rs

1//! # A zero-copy redis protocol parser
2//!
3//! A zero-copy redis protocol parser
4
5#![deny(missing_docs)]
6#![deny(warnings)]
7
8#[macro_use]
9mod macros;
10
11use std::{borrow::Cow, cmp::Ordering, convert::TryInto};
12
/// Successful result of [`parse_server`].
///
/// It is a tuple with two elements. The first element is the stream of bytes
/// that is still to be processed (everything after the parsed command), and
/// the second element is the vector of parsed arguments.
pub type ServerResponse<'a> = (&'a [u8], Vec<Cow<'a, [u8]>>);
17
/// A single value of the redis protocol.
///
/// Variants that carry bytes or text borrow from the parsed buffer through
/// the `'a` lifetime.
#[derive(Clone, Debug, PartialEq)]
pub enum Value<'a> {
    /// A list of nested values
    Array(Vec<Value<'a>>),
    /// Raw binary data
    Blob(&'a [u8]),
    /// A text line. New lines are not allowed
    String(Cow<'a, str>),
    /// An error, made of the error type and the error description
    Error(Cow<'a, str>, Cow<'a, str>),
    /// A 64-bit signed integer
    Integer(i64),
    /// A boolean
    Boolean(bool),
    /// A floating point number
    Float(f64),
    /// An integer that may not fit in 64 bits
    BigInteger(i128),
    /// The absence of a value
    Null,
}
40
/// Protocol errors
///
/// The type implements [`std::fmt::Display`] and [`std::error::Error`] so it
/// can be propagated with `?` into generic error containers.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum Error {
    /// The data is incomplete. This is not an error per se, but rather a
    /// mechanism to let the caller know they should keep buffering data before
    /// calling the parser again.
    Partial,
    /// Unexpected first byte after a new line
    InvalidPrefix,
    /// Invalid data length
    InvalidLength,
    /// Parsed value is not boolean
    InvalidBoolean,
    /// Parsed data is not a number
    InvalidNumber,
    /// Protocol error. The parser reports the byte it expected first and the
    /// byte it found second.
    Protocol(u8, u8),
    /// Missing new line
    NewLine,
}

impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Error::Partial => f.write_str("incomplete data, more bytes are needed"),
            Error::InvalidPrefix => f.write_str("unexpected first byte after a new line"),
            Error::InvalidLength => f.write_str("invalid data length"),
            Error::InvalidBoolean => f.write_str("parsed value is not boolean"),
            Error::InvalidNumber => f.write_str("parsed data is not a number"),
            // Bytes are shown as escaped characters so control bytes stay readable.
            Error::Protocol(expected, found) => write!(
                f,
                "protocol error: expected {:?}, found {:?}",
                char::from(*expected),
                char::from(*found)
            ),
            Error::NewLine => f.write_str("missing new line"),
        }
    }
}

impl std::error::Error for Error {}
61
62/// Parses data in the format that redis expects
63///
64/// Redis expects an array of blobs. Although the protocol is much wider at the
65/// top level, redis expects an array of blobs.
66///
67/// The first value is returned along side with the unconsumed stream of bytes.
68pub fn parse_server(bytes: &[u8]) -> Result<ServerResponse<'_>, Error> {
69    let (new_bytes, byte) = next!(bytes);
70    match byte {
71        b'*' => parse_server_array(new_bytes),
72        b'a'..=b'z' | b'A'..=b'Z' | b'\r' | b' ' | b'\t' | b'\n' => parse_inline_proto(bytes),
73        _ => Err(Error::Protocol(b'*', byte)),
74    }
75}
76
77fn parse_inline_proto(bytes: &[u8]) -> Result<ServerResponse, Error> {
78    let mut items = vec![];
79    let len = bytes.len();
80    let mut i = 0;
81    let mut start = 0;
82    loop {
83        if i >= len {
84            return Err(Error::Partial);
85        }
86        match bytes[i] {
87            b' ' | b'\t' => {
88                if start != i {
89                    items.push(Cow::from(&bytes[start..i]));
90                }
91                start = i + 1;
92            }
93            b'"' | b'\'' => {
94                let stop_at = bytes[i];
95                let start_str = i + 1;
96                let mut has_escape = false;
97                i += 1;
98                loop {
99                    i += 1;
100                    if i >= len {
101                        return Err(Error::Partial);
102                    }
103                    if bytes[i] == b'\\' {
104                        has_escape = true;
105                        i += 1;
106                    } else if bytes[i] == stop_at {
107                        let mut v = Cow::from(&bytes[start_str..i]);
108                        if has_escape {
109                            let len = v.len();
110                            let mut old_i = 0;
111                            let mut new_i = 0;
112                            let v = v.to_mut();
113                            loop {
114                                if old_i >= len {
115                                    v.resize(new_i, 0);
116                                    break;
117                                }
118                                if v[old_i] == b'\\' {
119                                    match v.get(old_i + 1) {
120                                        Some(_) => v[new_i] = v[old_i + 1],
121                                        None => v[new_i] = b'\\',
122                                    }
123                                    old_i += 2;
124                                    new_i += 1;
125                                    continue;
126                                }
127                                if old_i != new_i {
128                                    v[new_i] = v[old_i];
129                                }
130                                new_i += 1;
131                                old_i += 1;
132                            }
133                        }
134                        items.push(v);
135                        break;
136                    }
137                }
138                start = i + 1;
139            }
140            b'\n' => {
141                if start != i {
142                    items.push(Cow::from(&bytes[start..i]));
143                }
144                i += 1;
145                break;
146            }
147            b'\r' => {
148                if bytes.get(i + 1) == Some(&b'\n') {
149                    if start != i {
150                        items.push(Cow::from(&bytes[start..i]));
151                    }
152                    i += 2;
153                    break;
154                }
155            }
156            _ => {}
157        };
158        i += 1;
159    }
160    Ok((&bytes[i..], items))
161}
162
163/// Parses an array from an steam of bytes
164///
165/// The first value is returned along side with the unconsumed stream of bytes.
166fn parse_server_array(bytes: &[u8]) -> Result<ServerResponse, Error> {
167    let (bytes, len) = read_line_number!(bytes, i32);
168    if len <= 0 {
169        return Err(Error::Protocol(b'x', b'y'));
170    }
171
172    let mut v = vec![];
173    let mut bytes = bytes;
174
175    for _i in 0..len {
176        let n = next!(bytes);
177        let r = match n.1 {
178            b'$' => parse_blob(n.0),
179            _ => Err(Error::Protocol(b'$', n.1)),
180        }?;
181        bytes = r.0;
182        v.push(match r.1 {
183            Value::Blob(x) => Ok(Cow::from(x)),
184            _ => Err(Error::Protocol(b'x', b'y')),
185        }?);
186    }
187
188    Ok((bytes, v))
189}
190
191/// Parses redis values from an stream of bytes. If the data is incomplete
192/// Err(Error::Partial) is returned.
193///
194/// The first value is returned along side with the unconsumed stream of bytes.
195pub fn parse(bytes: &[u8]) -> Result<(&[u8], Value), Error> {
196    let (bytes, byte) = next!(bytes);
197    match byte {
198        b'*' => parse_array(bytes),
199        b'$' => parse_blob(bytes),
200        b':' => parse_integer(bytes),
201        b'(' => parse_big_integer(bytes),
202        b',' => parse_float(bytes),
203        b'#' => parse_boolean(bytes),
204        b'+' => parse_str(bytes),
205        b'-' => parse_error(bytes),
206        _ => Err(Error::InvalidPrefix),
207    }
208}
209
210fn parse_error(bytes: &[u8]) -> Result<(&[u8], Value), Error> {
211    let (bytes, err_type) = read_until!(bytes, b' ');
212    let (bytes, str) = read_line!(bytes);
213    let err_type = String::from_utf8_lossy(err_type);
214    let str = String::from_utf8_lossy(str);
215    ret!(bytes, Value::Error(err_type, str))
216}
217
218fn parse_str(bytes: &[u8]) -> Result<(&[u8], Value), Error> {
219    let (bytes, str) = read_line!(bytes);
220    let str = String::from_utf8_lossy(str);
221    ret!(bytes, Value::String(str))
222}
223
224fn parse_boolean(bytes: &[u8]) -> Result<(&[u8], Value), Error> {
225    let (bytes, byte) = next!(bytes);
226    let v = match byte {
227        b't' => true,
228        b'f' => false,
229        _ => return Err(Error::InvalidBoolean),
230    };
231    ret!(bytes, Value::Boolean(v))
232}
233
234fn parse_big_integer(bytes: &[u8]) -> Result<(&[u8], Value), Error> {
235    let (bytes, number) = read_line_number!(bytes, i128);
236    ret!(bytes, Value::BigInteger(number))
237}
238
239fn parse_integer(bytes: &[u8]) -> Result<(&[u8], Value), Error> {
240    let (bytes, number) = read_line_number!(bytes, i64);
241    ret!(bytes, Value::Integer(number))
242}
243
244fn parse_float(bytes: &[u8]) -> Result<(&[u8], Value), Error> {
245    let (bytes, number) = read_line_number!(bytes, f64);
246    ret!(bytes, Value::Float(number))
247}
248
249fn parse_blob(bytes: &[u8]) -> Result<(&[u8], Value), Error> {
250    let (bytes, len) = read_line_number!(bytes, i64);
251
252    match len.cmp(&0) {
253        Ordering::Less => {
254            let bytes = assert_nl!(bytes);
255            return ret!(bytes, Value::Null);
256        }
257        Ordering::Equal => {
258            let bytes = assert_nl!(bytes);
259            return ret!(bytes, Value::Blob(b""));
260        }
261        _ => {}
262    };
263
264    let len = len.try_into().expect("Positive number");
265
266    let (bytes, blob) = read_len!(bytes, len);
267    let bytes = assert_nl!(bytes);
268
269    ret!(bytes, Value::Blob(blob))
270}
271
272fn parse_array(bytes: &[u8]) -> Result<(&[u8], Value), Error> {
273    let (bytes, len) = read_line_number!(bytes, i32);
274    if len <= 0 {
275        return ret!(bytes, Value::Null);
276    }
277
278    let mut v = vec![Value::Null; len as usize];
279    let mut bytes = bytes;
280
281    for i in 0..len {
282        let r = parse(bytes)?;
283        bytes = r.0;
284        v[i as usize] = r.1;
285    }
286
287    ret!(bytes, Value::Array(v))
288}
289
// Unit tests for the value parser (`parse`) and for the server side command
// parser (`parse_server`).
#[cfg(test)]
mod test {
    use super::*;

    /// An array header without its line terminator is incomplete.
    #[test]
    fn test_parse_partial() {
        let d = b"*-1";
        assert_eq!(Err(Error::Partial), parse(d));
    }

    /// An array header that announces 12 elements but is followed by none of
    /// them is incomplete.
    #[test]
    fn test_parse_partial_2() {
        let d = b"*12\r\n";
        assert_eq!(Err(Error::Partial), parse(d));
    }

    /// A blob shorter than its announced length is incomplete.
    #[test]
    fn test_incomplete_blob_parsing() {
        let d = b"$60\r\nfoobar\r\n";

        assert_eq!(Err(Error::Partial), parse(d));
    }

    /// A complete blob is returned without its header and terminator.
    #[test]
    fn test_complete_blob_parsing() {
        let d = b"$6\r\nfoobar\r\n";

        let r = parse(d);
        assert!(r.is_ok());

        assert_eq!(Value::Blob(b"foobar"), r.unwrap().1);
    }

    /// Only the first value is parsed, the rest of the buffer is handed back
    /// untouched.
    #[test]
    fn test_complete_blob_parsing_and_extra_buffer() {
        let d = b"$6\r\nfoobar\r\n$6\r\nfoobar\r\n";

        let r = parse(d);
        assert!(r.is_ok());

        let (buf, data) = r.unwrap();

        assert_eq!(Value::Blob(b"foobar"), data);
        assert_eq!(b"$6\r\nfoobar\r\n", buf);
    }

    /// An array of two blobs yields two elements.
    #[test]
    fn test_complete_array_parser() {
        let d = b"*2\r\n$6\r\nfoobar\r\n$3\r\nfoo\r\n";

        let r = parse(d);
        assert!(r.is_ok());

        let x = match r.unwrap().1 {
            Value::Array(x) => x,
            _ => panic!("Unxpected type"),
        };

        assert_eq!(2, x.len());
    }

    /// An array nested inside another array counts as a single element.
    #[test]
    fn test_complete_nested_array_parser() {
        let d = b"*2\r\n$6\r\nfoobar\r\n*1\r\n$3\r\nfoo\r\n";

        let r = parse(d);
        assert!(r.is_ok());

        let x = match r.unwrap().1 {
            Value::Array(x) => x,
            _ => panic!("Unxpected type"),
        };

        assert_eq!(2, x.len());
    }

    /// The `,` prefix introduces a float.
    #[test]
    fn test_parse_float() {
        let d = b",0.25887\r\n";

        let r = parse(d);
        assert!(r.is_ok());

        let x = match r.unwrap().1 {
            Value::Float(x) => x,
            _ => panic!("Unxpected type"),
        };

        assert_eq!(0.25887, x);
    }

    /// The `:` prefix introduces an integer.
    #[test]
    fn test_parse_integer() {
        let d = b":25887\r\n";

        let r = parse(d);
        assert!(r.is_ok());

        let x = match r.unwrap().1 {
            Value::Integer(x) => x,
            _ => panic!("Unxpected type"),
        };

        assert_eq!(25887, x);
    }

    /// The `(` prefix introduces a big integer.
    #[test]
    fn test_parse_big_integer() {
        let d = b"(25887\r\n";

        let r = parse(d);
        assert!(r.is_ok());

        let x = match r.unwrap().1 {
            Value::BigInteger(x) => x,
            _ => panic!("Unxpected type"),
        };

        assert_eq!(25887, x);
    }

    /// `#f` is the boolean false.
    #[test]
    fn test_parse_false() {
        let d = b"#f\r\n";

        let r = parse(d);
        assert!(r.is_ok());

        let x = match r.unwrap().1 {
            Value::Boolean(x) => x,
            _ => panic!("Unxpected type"),
        };

        assert!(!x);
    }

    /// `#t` is the boolean true.
    #[test]
    fn test_parse_true() {
        let d = b"#t\r\n";

        let r = parse(d);
        assert!(r.is_ok());

        let x = match r.unwrap().1 {
            Value::Boolean(x) => x,
            _ => panic!("Unxpected type"),
        };

        assert!(x);
    }

    /// Anything other than `t` or `f` is not a boolean.
    #[test]
    fn test_parse_boolean_unexpected() {
        let d = b"#1\r\n";

        assert_eq!(Err(Error::InvalidBoolean), parse(d));
    }

    /// The `+` prefix introduces a string that spans until the end of the
    /// line.
    #[test]
    fn test_parse_str() {
        let d = b"+hello world\r\n";

        let r = parse(d);
        assert!(r.is_ok());

        let x = match r.unwrap().1 {
            Value::String(x) => x,
            _ => panic!("Unxpected type"),
        };

        assert_eq!("hello world", x);
    }

    /// The `-` prefix introduces an error, split at the first space into its
    /// type and its description.
    #[test]
    fn test_parse_error() {
        let d = b"-ERR this is the error description\r\n";

        let r = parse(d);
        assert!(r.is_ok());

        let x = match r.unwrap().1 {
            Value::Error(a, b) => (a, b),
            _ => panic!("Unxpected type"),
        };

        assert_eq!("ERR", x.0);
        assert_eq!("this is the error description", x.1);
    }

    /// Empty blobs are valid command arguments.
    #[test]
    fn test_empty_string() {
        let data = b"*2\r\n$0\r\n\r\n$0\r\n\r\n";
        let (bytes_to_consume_next, data) = parse_server(data).unwrap();

        assert_eq!(
            vec![b"", b""],
            data.iter().map(|r| r.as_ref()).collect::<Vec<&[u8]>>()
        );
        assert_eq!(b"", bytes_to_consume_next);
    }

    /// A bare word followed by a line terminator is an inline command.
    #[test]
    fn test_parse_inline_protocol() {
        let data = b"PING\r\n";
        let (bytes_to_consume_next, data) = parse_server(data).unwrap();
        assert_eq!(
            vec![b"PING"],
            data.iter().map(|r| r.as_ref()).collect::<Vec<&[u8]>>()
        );
        assert_eq!(b"", bytes_to_consume_next);
    }

    /// Runs of tabs and spaces separate inline arguments without producing
    /// empty ones.
    #[test]
    fn test_parse_inline_protocol_2() {
        let data = b"PING\t\tfoox   barx\r\n";
        let (bytes_to_consume_next, data) = parse_server(data).unwrap();
        assert_eq!(
            vec![b"PING", b"foox", b"barx"],
            data.iter().map(|r| r.as_ref()).collect::<Vec<&[u8]>>()
        );
        assert_eq!(b"", bytes_to_consume_next);
    }

    /// Quoted inline arguments keep their inner spaces, escaped quotes are
    /// unescaped, and text glued to a closing quote is a separate argument.
    #[test]
    fn test_parse_inline_protocol_3() {
        let data = b"PINGPONGXX 'test  test' \"test\\\" test\"PINGPONGXX\r\n";
        let (bytes_to_consume_next, data) = parse_server(data).unwrap();
        assert_eq!(
            vec![b"PINGPONGXX", b"test  test", b"test\" test", b"PINGPONGXX"],
            data.iter().map(|r| r.as_ref()).collect::<Vec<&[u8]>>()
        );
        assert_eq!(b"", bytes_to_consume_next);
    }

    /// Empty `\r\n` lines between inline commands parse as commands without
    /// arguments.
    #[test]
    fn test_parse_inline_protocol_4() {
        let data = b"PING\r\n\r\n\r\nPING\r\n";
        let (bytes_to_consume_next, data) = parse_server(data).unwrap();
        assert_eq!(
            vec![b"PING"],
            data.iter().map(|r| r.as_ref()).collect::<Vec<&[u8]>>()
        );
        let (bytes_to_consume_next, data) = parse_server(bytes_to_consume_next).unwrap();
        assert_eq!(0, data.len(),);
        let (bytes_to_consume_next, data) = parse_server(bytes_to_consume_next).unwrap();
        assert_eq!(0, data.len(),);
        let (bytes_to_consume_next, data) = parse_server(bytes_to_consume_next).unwrap();
        assert_eq!(
            vec![b"PING"],
            data.iter().map(|r| r.as_ref()).collect::<Vec<&[u8]>>()
        );
        assert_eq!(b"", bytes_to_consume_next);
    }

    /// Leading spaces before an inline command are ignored.
    #[test]
    fn test_parse_inline_protocol_5() {
        let data = b"   PING\r\n";
        let (bytes_to_consume_next, data) = parse_server(data).unwrap();
        assert_eq!(
            vec![b"PING"],
            data.iter().map(|r| r.as_ref()).collect::<Vec<&[u8]>>()
        );
        assert_eq!(b"", bytes_to_consume_next);
    }

    /// Bare `\n` line terminators behave like `\r\n`: each empty line parses
    /// as a command without arguments.
    #[test]
    fn test_parse_inline_protocol_6() {
        let data = b"PING\r\n\n\nPING\r\n";
        let (bytes_to_consume_next, data) = parse_server(data).unwrap();
        assert_eq!(
            vec![b"PING"],
            data.iter().map(|r| r.as_ref()).collect::<Vec<&[u8]>>()
        );
        let (bytes_to_consume_next, data) = parse_server(bytes_to_consume_next).unwrap();
        assert_eq!(0, data.len(),);
        let (bytes_to_consume_next, data) = parse_server(bytes_to_consume_next).unwrap();
        assert_eq!(0, data.len(),);
        let (bytes_to_consume_next, data) = parse_server(bytes_to_consume_next).unwrap();
        assert_eq!(
            vec![b"PING"],
            data.iter().map(|r| r.as_ref()).collect::<Vec<&[u8]>>()
        );
        assert_eq!(b"", bytes_to_consume_next);
    }

    /// A command that ends with empty blobs is consumed completely.
    #[test]
    fn test_parse_zero() {
        let data = b"*5\r\n$4\r\nhset\r\n$6\r\nfoobar\r\n$1\r\n1\r\n$0\r\n\r\n$0\r\n\r\n";
        let (bytes_to_consume_next, _data) = parse_server(data).unwrap();
        assert_eq!(b"", bytes_to_consume_next);
    }
}