resp_async/
resp.rs

1use std::convert::{From, TryFrom};
2use std::str;
3
4use bytes::{Buf, BufMut, BytesMut};
5
6use crate::error::{invalid_data, to_error, Error, Result};
7
/// A single RESP (REdis Serialization Protocol) value.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Value {
    /// Simple string; written with a `+` marker.
    String(String),
    /// Error message; written with a `-` marker.
    Error(String),
    /// Signed 64-bit integer; written with a `:` marker.
    Integer(i64),
    /// Binary-safe bulk string; written as `$<len>\r\n<bytes>\r\n`.
    Bulk(Vec<u8>),
    /// Absent value; written as `$-1\r\n`.
    Null,
    /// Sequence of values; written as `*<count>\r\n` followed by each element.
    Array(Vec<Value>),
    /// Error message borrowed from a `'static` string; same wire form as `Error`.
    StaticError(&'static str),
    /// Simple string borrowed from a `'static` string; same wire form as `String`.
    StaticString(&'static str),
}
19
20impl Value {
21    pub(crate) fn encode(&self, buf: &mut BytesMut) {
22        ValueEncoder::encode(buf, self);
23    }
24
25    pub fn as_integer(&self) -> Result<i64> {
26        match self {
27            Value::Integer(i) => Ok(*i),
28            Value::String(s) => s.parse().map_err(to_error),
29            Value::Bulk(v) => str::from_utf8(v)
30                .map_err(to_error)
31                .and_then(|s| s.parse().map_err(to_error)),
32            _ => inconvertible(self, "Integer"),
33        }
34    }
35
36    pub fn to_integer(self) -> Result<i64> {
37        self.as_integer()
38    }
39
40    pub fn as_float(&self) -> Result<f64> {
41        match self {
42            Value::Integer(i) => Ok((*i) as f64),
43            Value::String(s) => s.parse().map_err(to_error),
44            Value::Bulk(v) => str::from_utf8(v)
45                .map_err(to_error)
46                .and_then(|s| s.parse().map_err(to_error)),
47            _ => inconvertible(self, "Float"),
48        }
49    }
50
51    pub fn to_float(self) -> Result<f64> {
52        self.as_float()
53    }
54
55    pub fn as_str(&self) -> Result<&str> {
56        match self {
57            Value::Bulk(v) => str::from_utf8(v.as_slice()).map_err(to_error),
58            Value::String(s) => Ok(s.as_str()),
59            _ => inconvertible(self, "&str"),
60        }
61    }
62
63    pub fn to_string(self) -> Result<String> {
64        match self {
65            Value::Bulk(b) => Ok(String::from_utf8(b).map_err(to_error)?),
66            Value::Integer(i) => Ok(i.to_string()),
67            Value::String(s) => Ok(s),
68            Value::Null => Ok(String::new()),
69            _ => inconvertible(&self, "String"),
70        }
71    }
72
73    pub fn as_slice(&self) -> Result<&[u8]> {
74        match self {
75            Value::Bulk(v) => Ok(v.as_slice()),
76            Value::String(s) => Ok(s.as_bytes()),
77            _ => inconvertible(self, "&[u8]"),
78        }
79    }
80
81    pub fn to_bytes(self) -> Result<Vec<u8>> {
82        match self {
83            Value::Bulk(b) => Ok(b),
84            Value::String(s) => Ok(s.into_bytes()),
85            Value::Integer(i) => Ok(i.to_string().into_bytes()),
86            Value::Null => Ok(Vec::new()),
87            _ => inconvertible(&self, "Vec<u8>"),
88        }
89    }
90}
91
92fn inconvertible<A>(from: &Value, target: &str) -> Result<A> {
93    invalid_data(format!("'{:?}' is not convertible to '{}'", from, target))
94}
95
96impl TryFrom<&Value> for String {
97    type Error = Error;
98    fn try_from(val: &Value) -> Result<Self> {
99        val.as_str().map(ToOwned::to_owned)
100    }
101}
102
103impl TryFrom<Value> for String {
104    type Error = Error;
105    fn try_from(val: Value) -> Result<Self> {
106        val.to_string()
107    }
108}
109
110impl TryFrom<&Value> for Vec<u8> {
111    type Error = Error;
112
113    fn try_from(val: &Value) -> Result<Self> {
114        val.as_slice().map(ToOwned::to_owned)
115    }
116}
117
118impl TryFrom<Value> for Vec<u8> {
119    type Error = Error;
120
121    fn try_from(val: Value) -> Result<Self> {
122        val.to_bytes()
123    }
124}
125
126impl TryFrom<&Value> for Vec<String> {
127    type Error = Error;
128
129    fn try_from(val: &Value) -> Result<Self> {
130        if let Value::Array(array) = val {
131            array.iter().map(TryInto::try_into).collect()
132        } else {
133            inconvertible(val, "Vec<String>")
134        }
135    }
136}
137
138impl TryFrom<Value> for Vec<String> {
139    type Error = Error;
140
141    fn try_from(val: Value) -> Result<Self> {
142        if let Value::Array(array) = val {
143            array.into_iter().map(Value::to_string).collect()
144        } else {
145            inconvertible(&val, "Vec<String>")
146        }
147    }
148}
149
150impl From<Value> for Vec<Value> {
151    fn from(val: Value) -> Self {
152        if let Value::Array(array) = val {
153            array
154        } else {
155            Vec::from(val)
156        }
157    }
158}
159
160impl From<&Value> for Vec<Value> {
161    fn from(val: &Value) -> Self {
162        if let Value::Array(array) = val {
163            array.iter().map(Clone::clone).collect()
164        } else {
165            Vec::from(val)
166        }
167    }
168}
169
170impl TryFrom<Value> for i64 {
171    type Error = Error;
172
173    fn try_from(val: Value) -> Result<Self> {
174        val.as_integer()
175    }
176}
177
178impl TryFrom<&Value> for i64 {
179    type Error = Error;
180
181    fn try_from(val: &Value) -> Result<Self> {
182        val.as_integer()
183    }
184}
185
186impl TryFrom<Value> for f64 {
187    type Error = Error;
188
189    fn try_from(val: Value) -> Result<Self> {
190        val.as_float()
191    }
192}
193
194impl TryFrom<&Value> for f64 {
195    type Error = Error;
196
197    fn try_from(val: &Value) -> Result<Self> {
198        val.as_float()
199    }
200}
201
202impl TryFrom<Value> for Option<Vec<u8>> {
203    type Error = Error;
204
205    fn try_from(val: Value) -> Result<Self> {
206        if let Value::Null = val {
207            Ok(None)
208        } else {
209            val.to_bytes().map(Some)
210        }
211    }
212}
213
214impl TryFrom<&Value> for Option<Vec<u8>> {
215    type Error = Error;
216
217    fn try_from(val: &Value) -> Result<Self> {
218        if let Value::Null = val {
219            Ok(None)
220        } else {
221            val.as_slice().map(ToOwned::to_owned).map(Some)
222        }
223    }
224}
225
226impl From<i64> for Value {
227    fn from(i: i64) -> Self {
228        Value::Integer(i)
229    }
230}
231
232impl From<i32> for Value {
233    fn from(i: i32) -> Self {
234        Value::Integer(i64::from(i))
235    }
236}
237
238impl From<usize> for Value {
239    fn from(i: usize) -> Self {
240        Value::Integer(i as i64)
241    }
242}
243
244impl From<u32> for Value {
245    fn from(i: u32) -> Self {
246        Value::Integer(i64::from(i))
247    }
248}
249
250impl From<u64> for Value {
251    fn from(i: u64) -> Self {
252        Value::Integer(i as i64)
253    }
254}
255
256impl From<Vec<u8>> for Value {
257    fn from(b: Vec<u8>) -> Self {
258        Value::Bulk(b)
259    }
260}
261
262impl From<Option<Vec<u8>>> for Value {
263    fn from(b: Option<Vec<u8>>) -> Self {
264        match b {
265            Some(b) => Value::Bulk(b),
266            None => Value::Null,
267        }
268    }
269}
270
271impl<T> From<Vec<T>> for Value
272where
273    T: Into<Value>,
274{
275    fn from(a: Vec<T>) -> Self {
276        Value::Array(a.into_iter().map(Into::into).collect())
277    }
278}
279
280pub struct ValueEncoder;
281
282impl ValueEncoder {
283    #[inline]
284    fn ensure_capacity(buf: &mut BytesMut, size: usize) {
285        if buf.remaining_mut() < size {
286            buf.reserve(size)
287        }
288    }
289
290    #[inline]
291    fn write_crlf(buf: &mut BytesMut) {
292        buf.put_slice(b"\r\n");
293    }
294
295    fn write_header_format(buf: &mut [u8], ty: u8, number: i64) -> &[u8] {
296        let s = format!("{}{}\r\n", char::from(ty), number);
297        let s = s.as_bytes();
298        let buf = &mut buf[0..s.len()];
299        buf.copy_from_slice(s);
300        buf
301    }
302
303    fn write_header_fast(buf: &mut [u8], ty: u8, number: i64) -> &[u8] {
304        let len = buf.len();
305        buf[len - 1] = b'\n';
306        buf[len - 2] = b'\r';
307        let mut i = len - 3;
308        let mut number = number;
309        loop {
310            buf[i] = b'0' + (number % 10) as u8;
311            i -= 1;
312            number /= 10;
313            if number == 0 {
314                break;
315            }
316        }
317        buf[i] = ty;
318        &buf[i..]
319    }
320
321    fn write_header(buf: &mut BytesMut, ty: u8, number: i64) {
322        let mut hdr = [0u8; 32];
323        let hdr = if number < 0 {
324            Self::write_header_format(&mut hdr, ty, number)
325        } else {
326            Self::write_header_fast(&mut hdr, ty, number)
327        };
328        Self::ensure_capacity(buf, hdr.len());
329        buf.put(hdr)
330    }
331
332    fn write_bulk(buf: &mut BytesMut, ty: u8, bytes: &[u8]) {
333        Self::ensure_capacity(buf, bytes.len() + 32);
334        Self::write_header(buf, ty, bytes.len() as i64);
335        buf.put(bytes);
336        Self::write_crlf(buf);
337    }
338
339    fn write_string(buf: &mut BytesMut, ty: u8, str: &str) {
340        Self::ensure_capacity(buf, str.len() + 3);
341        buf.put_u8(ty);
342        buf.put(str.as_bytes());
343        Self::write_crlf(buf);
344    }
345
346    pub fn encode(buf: &mut BytesMut, value: &Value) {
347        match value {
348            Value::Null => Self::write_header(buf, b'$', -1),
349            Value::Array(a) => {
350                Self::write_header(buf, b'*', a.len() as i64);
351                for e in a {
352                    Self::encode(buf, e);
353                }
354            }
355            Value::Integer(i) => Self::write_header(buf, b':', *i),
356            Value::Bulk(b) => Self::write_bulk(buf, b'$', b),
357            Value::String(s) => Self::write_string(buf, b'+', s),
358            Value::Error(e) => Self::write_string(buf, b'-', e),
359            Value::StaticError(e) => Self::write_string(buf, b'-', e),
360            Value::StaticString(e) => Self::write_string(buf, b'+', e),
361        }
362    }
363}
364
/// Leading byte of a bulk string (`$`).
const RESP_TYPE_BULK_STRING: u8 = b'$';
/// Leading byte of an array (`*`).
const RESP_TYPE_ARRAY: u8 = b'*';
/// Leading byte of an integer (`:`).
const RESP_TYPE_INTEGER: u8 = b':';
/// Leading byte of a simple string (`+`).
const RESP_TYPE_SIMPLE_STRING: u8 = b'+';
/// Leading byte of an error (`-`).
const RESP_TYPE_ERROR: u8 = b'-';
370
371fn split_input(input: &mut BytesMut, at: usize) -> BytesMut {
372    let mut buf = input.split_to(at);
373    let len = buf.len();
374    buf.truncate(len - 2);
375    buf
376}
377
378#[derive(Debug, Default)]
379struct StringDecoder {
380    expect_lf: bool,
381    inspect: usize,
382}
383
384impl StringDecoder {
385    fn decode(&mut self, input: &mut BytesMut) -> Result<String> {
386        Ok(str::from_utf8(&split_input(input, self.inspect))
387            .map_err(to_error)?
388            .into())
389    }
390
391    fn try_decode(&mut self, input: &mut BytesMut) -> Result<Option<String>> {
392        let length = input.len();
393        loop {
394            if length <= self.inspect {
395                return Ok(None);
396            }
397            let inspect = self.inspect;
398            self.inspect += 1;
399            match (self.expect_lf, input[inspect]) {
400                (false, b'\r') => self.expect_lf = true,
401                (false, _) => (),
402                (true, b'\n') => return self.decode(input).map(Some),
403                (true, b) => {
404                    return invalid_data(format!("Invalid last tailing line feed: '{}'", b));
405                }
406            }
407        }
408    }
409}
410
411#[derive(Debug)]
412struct BulkDecoder {
413    length_decoder: Option<IntegerDecoder>,
414    length: i64,
415}
416
417impl BulkDecoder {
418    fn try_decode_length(&mut self, input: &mut BytesMut) -> Result<bool> {
419        if let Some(length) = self.length_decoder.as_mut().unwrap().try_decode(input)? {
420            self.length_decoder = None;
421            self.length = length;
422            Ok(true)
423        } else {
424            Ok(false)
425        }
426    }
427
428    fn try_decode_bulk(&mut self, input: &mut BytesMut) -> Result<Option<Value>> {
429        if self.length < 0 {
430            return if self.length == -1 {
431                Ok(Some(Value::Null))
432            } else {
433                invalid_data(format!("Invalid bulk length: '{}'", self.length))
434            };
435        }
436        let length = self.length as usize;
437        let len = input.len();
438        if len < length + 2 {
439            return Ok(None);
440        }
441        if input[length] != b'\r' || input[length + 1] != b'\n' {
442            return invalid_data(format!(
443                "Invalid bulk tailing bytes: '[{}, {}]'",
444                input[length],
445                input[length + 1]
446            ));
447        }
448        let mut bulk = input.split_to(length + 2).to_vec();
449        bulk.truncate(length);
450        Ok(Some(Value::Bulk(bulk)))
451    }
452
453    fn try_decode(&mut self, input: &mut BytesMut) -> Result<Option<Value>> {
454        if self.length_decoder.is_some() && !self.try_decode_length(input)? {
455            return Ok(None);
456        }
457        self.try_decode_bulk(input)
458    }
459}
460
461impl Default for BulkDecoder {
462    fn default() -> Self {
463        BulkDecoder {
464            length_decoder: Some(IntegerDecoder::default()),
465            length: 0,
466        }
467    }
468}
469
470#[derive(Debug)]
471struct ArrayDecoder {
472    size_decoder: Option<IntegerDecoder>,
473    value_decoder: Box<ValueDecoder>,
474    array: Option<Vec<Value>>,
475    size: usize,
476}
477
478impl ArrayDecoder {
479    fn try_decode_size(&mut self, input: &mut BytesMut) -> Result<bool> {
480        if let Some(size) = self.size_decoder.as_mut().unwrap().try_decode(input)? {
481            self.size_decoder = None;
482            if size < 0 {
483                return invalid_data(format!("Invalid array size '{}'", size));
484            }
485            self.size = size as usize;
486            self.array = Some(Vec::with_capacity(self.size));
487            Ok(true)
488        } else {
489            Ok(false)
490        }
491    }
492
493    fn try_decode_element(&mut self, input: &mut BytesMut) -> Result<Option<Value>> {
494        if self.size == 0 {
495            return Ok(Some(Value::Array(Vec::new())));
496        }
497        while !input.is_empty() {
498            if let Some(value) = self.value_decoder.try_decode(input)? {
499                self.array.as_mut().unwrap().push(value);
500                if self.array.as_ref().unwrap().len() == self.size {
501                    return Ok(self.array.take().map(Value::Array));
502                }
503            } else {
504                break;
505            }
506        }
507        Ok(None)
508    }
509
510    fn try_decode(&mut self, input: &mut BytesMut) -> Result<Option<Value>> {
511        if self.size_decoder.is_some() && !self.try_decode_size(input)? {
512            return Ok(None);
513        }
514        self.try_decode_element(input)
515    }
516}
517
518impl Default for ArrayDecoder {
519    fn default() -> Self {
520        ArrayDecoder {
521            size_decoder: Some(IntegerDecoder::default()),
522            array: None,
523            value_decoder: Default::default(),
524            size: 0,
525        }
526    }
527}
528
529#[derive(Debug, Default)]
530struct IntegerDecoder {
531    expect_lf: bool,
532    inspect: usize,
533}
534
535impl IntegerDecoder {
536    fn decode(&mut self, input: &mut BytesMut) -> Result<i64> {
537        str::from_utf8(&split_input(input, self.inspect))
538            .map_err(to_error)?
539            .parse()
540            .map_err(to_error)
541    }
542
543    fn try_decode(&mut self, input: &mut BytesMut) -> Result<Option<i64>> {
544        let length = input.len();
545        loop {
546            if length <= self.inspect {
547                return Ok(None);
548            }
549            let inspect = self.inspect;
550            self.inspect += 1;
551            match (self.expect_lf, input[inspect]) {
552                (false, b'0'..=b'9') => (),
553                (false, b'-') => (),
554                (false, b'\r') => self.expect_lf = true,
555                (true, b'\n') => return self.decode(input).map(Some),
556                (_, b) => {
557                    return invalid_data(format!(
558                        "Invalid byte '{}' when decoding integer {:?}",
559                        b, input
560                    ));
561                }
562            }
563        }
564    }
565}
566
567#[derive(Debug)]
568enum TypedDecoder {
569    String(StringDecoder),
570    Error(StringDecoder),
571    Integer(IntegerDecoder),
572    Bulk(BulkDecoder),
573    Array(ArrayDecoder),
574}
575
576impl TypedDecoder {
577    fn try_decode(&mut self, input: &mut BytesMut) -> Result<Option<Value>> {
578        match self {
579            TypedDecoder::String(decoder) => {
580                decoder.try_decode(input).map(|x| x.map(Value::String))
581            }
582            TypedDecoder::Error(decoder) => decoder.try_decode(input).map(|x| x.map(Value::Error)),
583            TypedDecoder::Integer(decoder) => {
584                decoder.try_decode(input).map(|x| x.map(Value::Integer))
585            }
586            TypedDecoder::Bulk(decoder) => decoder.try_decode(input),
587            TypedDecoder::Array(decoder) => decoder.try_decode(input),
588        }
589    }
590}
591
592#[derive(Debug, Default)]
593pub struct ValueDecoder {
594    decoder: Option<TypedDecoder>,
595}
596
597impl ValueDecoder {
598    pub fn try_decode(&mut self, input: &mut BytesMut) -> Result<Option<Value>> {
599        if input.is_empty() {
600            return Ok(None);
601        }
602        if self.decoder.is_none() {
603            let decoder = match input[0] {
604                RESP_TYPE_BULK_STRING => TypedDecoder::Bulk(BulkDecoder::default()),
605                RESP_TYPE_ARRAY => TypedDecoder::Array(ArrayDecoder::default()),
606                RESP_TYPE_INTEGER => TypedDecoder::Integer(IntegerDecoder::default()),
607                RESP_TYPE_SIMPLE_STRING => TypedDecoder::String(StringDecoder::default()),
608                RESP_TYPE_ERROR => TypedDecoder::Error(StringDecoder::default()),
609                t => return invalid_data(format!("Invalid value type '{}'", t)),
610            };
611            input.advance(1);
612            self.decoder = Some(decoder);
613        }
614        let result = self.decoder.as_mut().unwrap().try_decode(input)?;
615        if result.is_some() {
616            self.decoder = None;
617        }
618        Ok(result)
619    }
620}
621
#[cfg(test)]
mod tests {
    use super::*;

    /// Feeds every proper prefix of `input` to a fresh decoder and checks that
    /// each one is reported as incomplete rather than as a value or an error.
    fn test_decode_partially(input: &BytesMut) {
        for end in 1..input.len() {
            let mut partial = BytesMut::from(&input[..end]);
            match ValueDecoder::default().try_decode(&mut partial) {
                Ok(None) => {}
                other => panic!(
                    "prefix of {} byte(s) should be incomplete, got {:?}",
                    end, other
                ),
            }
        }
    }

    /// Checks that `input` decodes to `expect` and that the decoder consumes
    /// the whole frame.
    fn test_decode(mut input: BytesMut, expect: Value) {
        test_decode_partially(&input);
        let mut decoder = ValueDecoder::default();
        match decoder.try_decode(&mut input) {
            Ok(Some(value)) => assert_eq!(value, expect),
            // Report what the decoder actually produced instead of a bare
            // `assert!(false)`.
            other => panic!("expected {:?}, decoder returned {:?}", expect, other),
        }
        assert!(
            input.is_empty(),
            "decoder left {} unread byte(s)",
            input.len()
        );
    }

    /// Checks that `value` encodes to exactly `expect`.
    fn test_encode(expect: &BytesMut, value: &Value) {
        let mut buf = BytesMut::with_capacity(128);
        ValueEncoder::encode(&mut buf, value);
        assert_eq!(&buf, expect);
    }

    /// Round-trip check: `value` encodes to `serialized` and `serialized`
    /// decodes back to `value`.
    fn test_codec(serialized: BytesMut, value: Value) {
        test_encode(&serialized, &value);
        test_decode(serialized, value);
    }

    #[test]
    fn test_simple_string() {
        test_codec(b"+OK\r\n"[..].into(), Value::String("OK".into()));
    }

    #[test]
    fn test_error() {
        test_codec(
            b"-Error message\r\n"[..].into(),
            Value::Error("Error message".into()),
        );
    }

    #[test]
    fn test_integer() {
        test_codec(b":1000\r\n"[..].into(), Value::Integer(1000));
    }

    #[test]
    fn test_bulk_string() {
        test_codec(
            b"$6\r\nfoobar\r\n"[..].into(),
            Value::Bulk(b"foobar"[..].into()),
        );
        test_codec(b"$0\r\n\r\n"[..].into(), Value::Bulk(b""[..].into()));
        test_codec(b"$-1\r\n"[..].into(), Value::Null);
    }

    #[test]
    fn test_array() {
        test_codec(b"*0\r\n"[..].into(), Value::Array(vec![]));
        test_codec(
            b"*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n"[..].into(),
            Value::Array(vec![
                Value::Bulk(b"foo"[..].into()),
                Value::Bulk(b"bar"[..].into()),
            ]),
        );
        test_codec(
            b"*3\r\n:1\r\n:2\r\n:3\r\n"[..].into(),
            Value::Array(vec![
                Value::Integer(1),
                Value::Integer(2),
                Value::Integer(3),
            ]),
        );
        test_codec(
            b"*5\r\n:1\r\n:2\r\n:3\r\n:4\r\n$6\r\nfoobar\r\n"[..].into(),
            Value::Array(vec![
                Value::Integer(1),
                Value::Integer(2),
                Value::Integer(3),
                Value::Integer(4),
                Value::Bulk(b"foobar"[..].into()),
            ]),
        );
    }
}