Skip to main content

resp_async/
resp.rs

1use std::convert::{From, TryFrom};
2use std::str;
3
4use bytes::{Buf, BufMut, Bytes, BytesMut};
5
6use crate::response::RespError;
7
/// Upper bounds enforced while decoding RESP frames.
///
/// The decoders in this file compare incoming lengths, sizes and nesting
/// levels against these values and reject frames that exceed them.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct DecodeLimits {
    /// Largest accepted bulk string payload, in bytes.
    pub max_bulk_len: usize,
    /// Largest accepted element count for a single array.
    pub max_array_len: usize,
    /// Deepest accepted level of array nesting.
    pub max_depth: usize,
}

impl Default for DecodeLimits {
    /// Stock limits: 1 MiB bulk strings, 1024-element arrays, 16 nesting levels.
    fn default() -> Self {
        const ONE_MIB: usize = 1024 * 1024;
        DecodeLimits {
            max_depth: 16,
            max_array_len: 1024,
            max_bulk_len: ONE_MIB,
        }
    }
}
25
26/// RESP frame value.
27#[derive(Debug, Clone, Eq, PartialEq)]
28pub enum Value {
29    Simple(Bytes),
30    Error(Bytes),
31    Integer(i64),
32    Bulk(Bytes),
33    Null,
34    Array(Vec<Value>),
35}
36
37impl Value {
38    pub fn encode(&self, buf: &mut BytesMut) {
39        ValueEncoder::encode(buf, self);
40    }
41
42    pub fn as_integer(&self) -> Result<i64, RespError> {
43        match self {
44            Value::Integer(i) => Ok(*i),
45            Value::Simple(s) | Value::Bulk(s) => parse_int(s),
46            _ => inconvertible(self, "Integer"),
47        }
48    }
49
50    pub fn to_integer(self) -> Result<i64, RespError> {
51        self.as_integer()
52    }
53
54    pub fn as_float(&self) -> Result<f64, RespError> {
55        match self {
56            Value::Integer(i) => Ok(*i as f64),
57            Value::Simple(s) | Value::Bulk(s) => parse_float(s),
58            _ => inconvertible(self, "Float"),
59        }
60    }
61
62    pub fn to_float(self) -> Result<f64, RespError> {
63        self.as_float()
64    }
65
66    pub fn as_str(&self) -> Result<&str, RespError> {
67        match self {
68            Value::Bulk(v) | Value::Simple(v) => str::from_utf8(v)
69                .map_err(|err| RespError::invalid_data(format!("invalid utf-8: {}", err))),
70            _ => inconvertible(self, "&str"),
71        }
72    }
73
74    pub fn to_string(self) -> Result<String, RespError> {
75        match self {
76            Value::Bulk(b) | Value::Simple(b) => Ok(String::from_utf8(b.to_vec())
77                .map_err(|err| RespError::invalid_data(format!("invalid utf-8: {}", err)))?),
78            Value::Integer(i) => Ok(i.to_string()),
79            Value::Null => Ok(String::new()),
80            _ => inconvertible(&self, "String"),
81        }
82    }
83
84    pub fn as_slice(&self) -> Result<&[u8], RespError> {
85        match self {
86            Value::Bulk(v) | Value::Simple(v) => Ok(v.as_ref()),
87            _ => inconvertible(self, "&[u8]"),
88        }
89    }
90
91    pub fn to_bytes(self) -> Result<Vec<u8>, RespError> {
92        match self {
93            Value::Bulk(b) | Value::Simple(b) => Ok(b.to_vec()),
94            Value::Integer(i) => Ok(i.to_string().into_bytes()),
95            Value::Null => Ok(Vec::new()),
96            _ => inconvertible(&self, "Vec<u8>"),
97        }
98    }
99}
100
101fn parse_int(bytes: &Bytes) -> Result<i64, RespError> {
102    str::from_utf8(bytes)
103        .map_err(|err| RespError::invalid_data(format!("invalid utf-8: {}", err)))?
104        .parse()
105        .map_err(|err| RespError::invalid_data(format!("invalid integer: {}", err)))
106}
107
108fn parse_float(bytes: &Bytes) -> Result<f64, RespError> {
109    str::from_utf8(bytes)
110        .map_err(|err| RespError::invalid_data(format!("invalid utf-8: {}", err)))?
111        .parse()
112        .map_err(|err| RespError::invalid_data(format!("invalid float: {}", err)))
113}
114
115fn inconvertible<A>(from: &Value, target: &str) -> Result<A, RespError> {
116    Err(RespError::invalid_data(format!(
117        "'{:?}' is not convertible to '{}'",
118        from, target
119    )))
120}
121
122impl TryFrom<&Value> for String {
123    type Error = RespError;
124    fn try_from(val: &Value) -> Result<Self, Self::Error> {
125        val.as_str().map(ToOwned::to_owned)
126    }
127}
128
129impl TryFrom<Value> for String {
130    type Error = RespError;
131    fn try_from(val: Value) -> Result<Self, Self::Error> {
132        val.to_string()
133    }
134}
135
136impl TryFrom<&Value> for Vec<u8> {
137    type Error = RespError;
138
139    fn try_from(val: &Value) -> Result<Self, Self::Error> {
140        val.as_slice().map(ToOwned::to_owned)
141    }
142}
143
144impl TryFrom<Value> for Vec<u8> {
145    type Error = RespError;
146
147    fn try_from(val: Value) -> Result<Self, Self::Error> {
148        val.to_bytes()
149    }
150}
151
152impl TryFrom<&Value> for Vec<String> {
153    type Error = RespError;
154
155    fn try_from(val: &Value) -> Result<Self, Self::Error> {
156        if let Value::Array(array) = val {
157            array.iter().map(TryInto::try_into).collect()
158        } else {
159            inconvertible(val, "Vec<String>")
160        }
161    }
162}
163
164impl TryFrom<Value> for Vec<String> {
165    type Error = RespError;
166
167    fn try_from(val: Value) -> Result<Self, Self::Error> {
168        if let Value::Array(array) = val {
169            array.into_iter().map(Value::to_string).collect()
170        } else {
171            inconvertible(&val, "Vec<String>")
172        }
173    }
174}
175
176impl From<Value> for Vec<Value> {
177    fn from(val: Value) -> Self {
178        if let Value::Array(array) = val {
179            array
180        } else {
181            Vec::from(val)
182        }
183    }
184}
185
186impl From<&Value> for Vec<Value> {
187    fn from(val: &Value) -> Self {
188        if let Value::Array(array) = val {
189            array.to_vec()
190        } else {
191            Vec::from(val)
192        }
193    }
194}
195
196impl TryFrom<Value> for i64 {
197    type Error = RespError;
198
199    fn try_from(val: Value) -> Result<Self, Self::Error> {
200        val.as_integer()
201    }
202}
203
204impl TryFrom<&Value> for i64 {
205    type Error = RespError;
206
207    fn try_from(val: &Value) -> Result<Self, Self::Error> {
208        val.as_integer()
209    }
210}
211
212impl TryFrom<Value> for f64 {
213    type Error = RespError;
214
215    fn try_from(val: Value) -> Result<Self, Self::Error> {
216        val.as_float()
217    }
218}
219
220impl TryFrom<&Value> for f64 {
221    type Error = RespError;
222
223    fn try_from(val: &Value) -> Result<Self, Self::Error> {
224        val.as_float()
225    }
226}
227
228impl TryFrom<Value> for Option<Vec<u8>> {
229    type Error = RespError;
230
231    fn try_from(val: Value) -> Result<Self, Self::Error> {
232        if let Value::Null = val {
233            Ok(None)
234        } else {
235            val.to_bytes().map(Some)
236        }
237    }
238}
239
240impl TryFrom<&Value> for Option<Vec<u8>> {
241    type Error = RespError;
242
243    fn try_from(val: &Value) -> Result<Self, Self::Error> {
244        if let Value::Null = val {
245            Ok(None)
246        } else {
247            val.as_slice().map(ToOwned::to_owned).map(Some)
248        }
249    }
250}
251
252impl From<i64> for Value {
253    fn from(i: i64) -> Self {
254        Value::Integer(i)
255    }
256}
257
258impl From<i32> for Value {
259    fn from(i: i32) -> Self {
260        Value::Integer(i64::from(i))
261    }
262}
263
264impl From<usize> for Value {
265    fn from(i: usize) -> Self {
266        Value::Integer(i as i64)
267    }
268}
269
270impl From<u32> for Value {
271    fn from(i: u32) -> Self {
272        Value::Integer(i64::from(i))
273    }
274}
275
276impl From<u64> for Value {
277    fn from(i: u64) -> Self {
278        Value::Integer(i as i64)
279    }
280}
281
282impl From<Bytes> for Value {
283    fn from(b: Bytes) -> Self {
284        Value::Bulk(b)
285    }
286}
287
288impl From<&'static [u8]> for Value {
289    fn from(b: &'static [u8]) -> Self {
290        Value::Bulk(Bytes::from_static(b))
291    }
292}
293
294impl From<&'static str> for Value {
295    fn from(s: &'static str) -> Self {
296        Value::Bulk(Bytes::from_static(s.as_bytes()))
297    }
298}
299
300impl From<Vec<u8>> for Value {
301    fn from(b: Vec<u8>) -> Self {
302        Value::Bulk(Bytes::from(b))
303    }
304}
305
306impl From<Option<Vec<u8>>> for Value {
307    fn from(b: Option<Vec<u8>>) -> Self {
308        match b {
309            Some(b) => Value::Bulk(Bytes::from(b)),
310            None => Value::Null,
311        }
312    }
313}
314
315impl<T> From<Vec<T>> for Value
316where
317    T: Into<Value>,
318{
319    fn from(a: Vec<T>) -> Self {
320        Value::Array(a.into_iter().map(Into::into).collect())
321    }
322}
323
324/// Encoder for RESP values.
325pub struct ValueEncoder;
326
327impl ValueEncoder {
328    #[inline]
329    fn ensure_capacity(buf: &mut BytesMut, size: usize) {
330        if buf.remaining_mut() < size {
331            buf.reserve(size)
332        }
333    }
334
335    #[inline]
336    fn write_crlf(buf: &mut BytesMut) {
337        buf.put_slice(b"\r\n");
338    }
339
340    fn write_header(buf: &mut BytesMut, ty: u8, number: i64) {
341        let mut hdr = [0u8; 32];
342        let mut idx = hdr.len();
343        hdr[idx - 1] = b'\n';
344        hdr[idx - 2] = b'\r';
345        idx -= 2;
346
347        let negative = number < 0;
348        let mut n = if negative {
349            number.wrapping_neg() as u64
350        } else {
351            number as u64
352        };
353        loop {
354            idx -= 1;
355            hdr[idx] = b'0' + (n % 10) as u8;
356            n /= 10;
357            if n == 0 {
358                break;
359            }
360        }
361        if negative {
362            idx -= 1;
363            hdr[idx] = b'-';
364        }
365        idx -= 1;
366        hdr[idx] = ty;
367        let hdr = &hdr[idx..];
368        Self::ensure_capacity(buf, hdr.len());
369        buf.put_slice(hdr);
370    }
371
372    fn write_bulk(buf: &mut BytesMut, ty: u8, bytes: &Bytes) {
373        Self::ensure_capacity(buf, bytes.len() + 32);
374        Self::write_header(buf, ty, bytes.len() as i64);
375        buf.put(bytes.as_ref());
376        Self::write_crlf(buf);
377    }
378
379    fn write_simple(buf: &mut BytesMut, ty: u8, bytes: &Bytes) {
380        Self::ensure_capacity(buf, bytes.len() + 3);
381        buf.put_u8(ty);
382        buf.put(bytes.as_ref());
383        Self::write_crlf(buf);
384    }
385
386    pub fn encode(buf: &mut BytesMut, value: &Value) {
387        match value {
388            Value::Null => Self::write_header(buf, b'$', -1),
389            Value::Array(a) => {
390                Self::write_header(buf, b'*', a.len() as i64);
391                for e in a {
392                    Self::encode(buf, e);
393                }
394            }
395            Value::Integer(i) => Self::write_header(buf, b':', *i),
396            Value::Bulk(b) => Self::write_bulk(buf, b'$', b),
397            Value::Simple(s) => Self::write_simple(buf, b'+', s),
398            Value::Error(e) => Self::write_simple(buf, b'-', e),
399        }
400    }
401}
402
// Leading type byte of each RESP frame kind.
/// `+` introduces a simple string.
const RESP_TYPE_SIMPLE_STRING: u8 = b'+';
/// `-` introduces an error string.
const RESP_TYPE_ERROR: u8 = b'-';
/// `:` introduces an integer.
const RESP_TYPE_INTEGER: u8 = b':';
/// `$` introduces a bulk string.
const RESP_TYPE_BULK_STRING: u8 = b'$';
/// `*` introduces an array.
const RESP_TYPE_ARRAY: u8 = b'*';
408
409fn split_input(input: &mut BytesMut, at: usize) -> Bytes {
410    let mut buf = input.split_to(at);
411    let len = buf.len();
412    buf.truncate(len - 2);
413    buf.freeze()
414}
415
416#[derive(Debug, Default)]
417struct StringDecoder {
418    expect_lf: bool,
419    inspect: usize,
420}
421
422impl StringDecoder {
423    fn decode(&mut self, input: &mut BytesMut) -> Result<Bytes, RespError> {
424        Ok(split_input(input, self.inspect))
425    }
426
427    fn try_decode(&mut self, input: &mut BytesMut) -> Result<Option<Bytes>, RespError> {
428        let length = input.len();
429        loop {
430            if length <= self.inspect {
431                return Ok(None);
432            }
433            let inspect = self.inspect;
434            self.inspect += 1;
435            match (self.expect_lf, input[inspect]) {
436                (false, b'\r') => self.expect_lf = true,
437                (false, _) => (),
438                (true, b'\n') => return self.decode(input).map(Some),
439                (true, b) => {
440                    return Err(RespError::invalid_data(format!(
441                        "Invalid last tailing line feed: '{}'",
442                        b
443                    )));
444                }
445            }
446        }
447    }
448}
449
450#[derive(Debug)]
451struct BulkDecoder {
452    length_decoder: Option<IntegerDecoder>,
453    length: i64,
454    max_bulk_len: usize,
455}
456
457impl BulkDecoder {
458    fn new(limits: DecodeLimits) -> Self {
459        BulkDecoder {
460            length_decoder: Some(IntegerDecoder::default()),
461            length: 0,
462            max_bulk_len: limits.max_bulk_len,
463        }
464    }
465
466    fn try_decode_length(&mut self, input: &mut BytesMut) -> Result<bool, RespError> {
467        if let Some(length) = self.length_decoder.as_mut().unwrap().try_decode(input)? {
468            self.length_decoder = None;
469            self.length = length;
470            Ok(true)
471        } else {
472            Ok(false)
473        }
474    }
475
476    fn try_decode_bulk(&mut self, input: &mut BytesMut) -> Result<Option<Value>, RespError> {
477        if self.length < 0 {
478            return if self.length == -1 {
479                Ok(Some(Value::Null))
480            } else {
481                Err(RespError::invalid_data(format!(
482                    "Invalid bulk length: '{}'",
483                    self.length
484                )))
485            };
486        }
487        let length = self.length as usize;
488        if length > self.max_bulk_len {
489            return Err(RespError::invalid_data(format!(
490                "Invalid bulk length: '{}' (limit {})",
491                length, self.max_bulk_len
492            )));
493        }
494        let len = input.len();
495        if len < length + 2 {
496            return Ok(None);
497        }
498        if input[length] != b'\r' || input[length + 1] != b'\n' {
499            return Err(RespError::invalid_data(format!(
500                "Invalid bulk tailing bytes: '[{}, {}]'",
501                input[length],
502                input[length + 1]
503            )));
504        }
505        let mut bulk = input.split_to(length + 2);
506        bulk.truncate(length);
507        Ok(Some(Value::Bulk(bulk.freeze())))
508    }
509
510    fn try_decode(&mut self, input: &mut BytesMut) -> Result<Option<Value>, RespError> {
511        if self.length_decoder.is_some() && !self.try_decode_length(input)? {
512            return Ok(None);
513        }
514        self.try_decode_bulk(input)
515    }
516}
517
518#[derive(Debug)]
519struct ArrayDecoder {
520    size_decoder: Option<IntegerDecoder>,
521    value_decoder: Box<ValueDecoder>,
522    array: Option<Vec<Value>>,
523    size: usize,
524    max_array_len: usize,
525}
526
527impl ArrayDecoder {
528    fn new(limits: DecodeLimits, depth: usize) -> Self {
529        ArrayDecoder {
530            size_decoder: Some(IntegerDecoder::default()),
531            array: None,
532            value_decoder: Box::new(ValueDecoder::with_depth(limits, depth)),
533            size: 0,
534            max_array_len: limits.max_array_len,
535        }
536    }
537
538    fn try_decode_size(&mut self, input: &mut BytesMut) -> Result<bool, RespError> {
539        if let Some(size) = self.size_decoder.as_mut().unwrap().try_decode(input)? {
540            self.size_decoder = None;
541            if size < 0 {
542                return Err(RespError::invalid_data(format!(
543                    "Invalid array size '{}'",
544                    size
545                )));
546            }
547            self.size = size as usize;
548            if self.size > self.max_array_len {
549                return Err(RespError::invalid_data(format!(
550                    "Invalid array size '{}' (limit {})",
551                    self.size, self.max_array_len
552                )));
553            }
554            self.array = Some(Vec::with_capacity(self.size));
555            Ok(true)
556        } else {
557            Ok(false)
558        }
559    }
560
561    fn try_decode_element(&mut self, input: &mut BytesMut) -> Result<Option<Value>, RespError> {
562        if self.size == 0 {
563            return Ok(Some(Value::Array(Vec::new())));
564        }
565        while !input.is_empty() {
566            if let Some(value) = self.value_decoder.try_decode(input)? {
567                self.array.as_mut().unwrap().push(value);
568                if self.array.as_ref().unwrap().len() == self.size {
569                    return Ok(self.array.take().map(Value::Array));
570                }
571            } else {
572                break;
573            }
574        }
575        Ok(None)
576    }
577
578    fn try_decode(&mut self, input: &mut BytesMut) -> Result<Option<Value>, RespError> {
579        if self.size_decoder.is_some() && !self.try_decode_size(input)? {
580            return Ok(None);
581        }
582        self.try_decode_element(input)
583    }
584}
585
586#[derive(Debug, Default)]
587struct IntegerDecoder {
588    expect_lf: bool,
589    inspect: usize,
590}
591
592impl IntegerDecoder {
593    fn decode(&mut self, input: &mut BytesMut) -> Result<i64, RespError> {
594        let bytes = split_input(input, self.inspect);
595        str::from_utf8(&bytes)
596            .map_err(|err| RespError::invalid_data(format!("invalid utf-8: {}", err)))?
597            .parse()
598            .map_err(|err| RespError::invalid_data(format!("invalid integer: {}", err)))
599    }
600
601    fn try_decode(&mut self, input: &mut BytesMut) -> Result<Option<i64>, RespError> {
602        let length = input.len();
603        loop {
604            if length <= self.inspect {
605                return Ok(None);
606            }
607            let inspect = self.inspect;
608            self.inspect += 1;
609            match (self.expect_lf, input[inspect]) {
610                (false, b'0'..=b'9') => (),
611                (false, b'-') => (),
612                (false, b'\r') => self.expect_lf = true,
613                (true, b'\n') => return self.decode(input).map(Some),
614                (_, b) => {
615                    return Err(RespError::invalid_data(format!(
616                        "Invalid byte '{}' when decoding integer",
617                        b
618                    )));
619                }
620            }
621        }
622    }
623}
624
625#[derive(Debug)]
626enum TypedDecoder {
627    String(StringDecoder),
628    Error(StringDecoder),
629    Integer(IntegerDecoder),
630    Bulk(BulkDecoder),
631    Array(ArrayDecoder),
632}
633
634impl TypedDecoder {
635    fn try_decode(&mut self, input: &mut BytesMut) -> Result<Option<Value>, RespError> {
636        match self {
637            TypedDecoder::String(decoder) => {
638                decoder.try_decode(input).map(|x| x.map(Value::Simple))
639            }
640            TypedDecoder::Error(decoder) => decoder.try_decode(input).map(|x| x.map(Value::Error)),
641            TypedDecoder::Integer(decoder) => {
642                decoder.try_decode(input).map(|x| x.map(Value::Integer))
643            }
644            TypedDecoder::Bulk(decoder) => decoder.try_decode(input),
645            TypedDecoder::Array(decoder) => decoder.try_decode(input),
646        }
647    }
648}
649
650/// Streaming RESP decoder.
651#[derive(Debug)]
652pub struct ValueDecoder {
653    decoder: Option<TypedDecoder>,
654    limits: DecodeLimits,
655    depth: usize,
656}
657
658impl ValueDecoder {
659    pub fn new(limits: DecodeLimits) -> Self {
660        Self {
661            decoder: None,
662            limits,
663            depth: 0,
664        }
665    }
666
667    fn with_depth(limits: DecodeLimits, depth: usize) -> Self {
668        Self {
669            decoder: None,
670            limits,
671            depth,
672        }
673    }
674
675    pub fn try_decode(&mut self, input: &mut BytesMut) -> Result<Option<Value>, RespError> {
676        if input.is_empty() {
677            return Ok(None);
678        }
679        if self.decoder.is_none() {
680            let decoder = match input[0] {
681                RESP_TYPE_BULK_STRING => TypedDecoder::Bulk(BulkDecoder::new(self.limits)),
682                RESP_TYPE_ARRAY => {
683                    if self.depth + 1 > self.limits.max_depth {
684                        return Err(RespError::invalid_data(format!(
685                            "ERR max depth exceeded (limit {})",
686                            self.limits.max_depth
687                        )));
688                    }
689                    TypedDecoder::Array(ArrayDecoder::new(self.limits, self.depth + 1))
690                }
691                RESP_TYPE_INTEGER => TypedDecoder::Integer(IntegerDecoder::default()),
692                RESP_TYPE_SIMPLE_STRING => TypedDecoder::String(StringDecoder::default()),
693                RESP_TYPE_ERROR => TypedDecoder::Error(StringDecoder::default()),
694                t => {
695                    return Err(RespError::invalid_data(format!(
696                        "Invalid value type '{}'",
697                        t
698                    )));
699                }
700            };
701            input.advance(1);
702            self.decoder = Some(decoder);
703        }
704        let result = self.decoder.as_mut().unwrap().try_decode(input)?;
705        if result.is_some() {
706            self.decoder = None;
707        }
708        Ok(result)
709    }
710}
711
712impl Default for ValueDecoder {
713    fn default() -> Self {
714        ValueDecoder::new(DecodeLimits::default())
715    }
716}
717
#[cfg(test)]
mod tests {
    use super::*;

    /// Feeds every strict prefix of `input` to a fresh decoder and checks that
    /// each one is reported as incomplete rather than as a value or an error.
    fn test_decode_partially(input: &BytesMut) {
        for end in 1..input.len() {
            let mut partial = BytesMut::from(&input[..end]);
            let decoded = ValueDecoder::default().try_decode(&mut partial);
            assert!(
                matches!(decoded, Ok(None)),
                "prefix of {} byte(s) should be incomplete, got {:?}",
                end,
                decoded
            );
        }
    }

    /// Decodes `input` in one go and checks both the value and that every byte was consumed.
    fn test_decode(mut input: BytesMut, expect: Value) {
        test_decode_partially(&input);
        let mut decoder = ValueDecoder::default();
        match decoder.try_decode(&mut input) {
            Ok(Some(value)) => assert_eq!(value, expect),
            // Report what the decoder actually produced instead of a bare failure.
            other => panic!("expected {:?}, decoder returned {:?}", expect, other),
        }
        assert!(
            input.is_empty(),
            "decoder left {} unread byte(s)",
            input.len()
        );
    }

    /// Encodes `value` and checks the bytes against `expect`.
    fn test_encode(expect: &BytesMut, value: &Value) {
        let mut buf = BytesMut::with_capacity(128);
        ValueEncoder::encode(&mut buf, value);
        assert_eq!(&buf, expect);
    }

    /// Checks that `value` encodes to `serialized` and that `serialized` decodes back to `value`.
    fn test_codec(serialized: BytesMut, value: Value) {
        test_encode(&serialized, &value);
        test_decode(serialized, value);
    }

    #[test]
    fn test_simple_string() {
        test_codec(
            b"+OK\r\n"[..].into(),
            Value::Simple(Bytes::from_static(b"OK")),
        );
    }

    #[test]
    fn test_error() {
        test_codec(
            b"-Error message\r\n"[..].into(),
            Value::Error(Bytes::from_static(b"Error message")),
        );
    }

    #[test]
    fn test_integer() {
        test_codec(b":1000\r\n"[..].into(), Value::Integer(1000));
    }

    #[test]
    fn test_bulk_string() {
        test_codec(
            b"$6\r\nfoobar\r\n"[..].into(),
            Value::Bulk(Bytes::from_static(b"foobar")),
        );
        test_codec(
            b"$0\r\n\r\n"[..].into(),
            Value::Bulk(Bytes::from_static(b"")),
        );
        test_codec(b"$-1\r\n"[..].into(), Value::Null);
    }

    #[test]
    fn test_array() {
        test_codec(b"*0\r\n"[..].into(), Value::Array(vec![]));
        test_codec(
            b"*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n"[..].into(),
            Value::Array(vec![
                Value::Bulk(Bytes::from_static(b"foo")),
                Value::Bulk(Bytes::from_static(b"bar")),
            ]),
        );
        test_codec(
            b"*3\r\n:1\r\n:2\r\n:3\r\n"[..].into(),
            Value::Array(vec![
                Value::Integer(1),
                Value::Integer(2),
                Value::Integer(3),
            ]),
        );
        test_codec(
            b"*5\r\n:1\r\n:2\r\n:3\r\n:4\r\n$6\r\nfoobar\r\n"[..].into(),
            Value::Array(vec![
                Value::Integer(1),
                Value::Integer(2),
                Value::Integer(3),
                Value::Integer(4),
                Value::Bulk(Bytes::from_static(b"foobar")),
            ]),
        );
    }

    #[test]
    fn test_bulk_limit() {
        let limits = DecodeLimits {
            max_bulk_len: 3,
            max_array_len: 1024,
            max_depth: 16,
        };
        let mut decoder = ValueDecoder::new(limits);
        let mut input = BytesMut::from(&b"$4\r\ntest\r\n"[..]);
        let err = decoder.try_decode(&mut input).unwrap_err();
        assert!(matches!(err, RespError::InvalidData(_)));
    }

    #[test]
    fn test_array_limit() {
        let limits = DecodeLimits {
            max_bulk_len: 1024,
            max_array_len: 1,
            max_depth: 16,
        };
        let mut decoder = ValueDecoder::new(limits);
        let mut input = BytesMut::from(&b"*2\r\n:1\r\n:2\r\n"[..]);
        let err = decoder.try_decode(&mut input).unwrap_err();
        assert!(matches!(err, RespError::InvalidData(_)));
    }

    #[test]
    fn test_depth_limit() {
        let limits = DecodeLimits {
            max_bulk_len: 1024,
            max_array_len: 1024,
            max_depth: 1,
        };
        let mut decoder = ValueDecoder::new(limits);
        let mut input = BytesMut::from(&b"*1\r\n*1\r\n:1\r\n"[..]);
        let err = decoder.try_decode(&mut input).unwrap_err();
        assert!(matches!(err, RespError::InvalidData(_)));
    }
}