redis_driver/resp/
value_decoder.rs

1use crate::{
2    resp::{Value},
3    Error, RedisError, Result,
4};
5use bytes::{Buf, BytesMut};
6use log::trace;
7use std::str::FromStr;
8use tokio_util::codec::Decoder;
9
10pub(crate) struct ValueDecoder;
11
12impl Decoder for ValueDecoder {
13    type Item = Value;
14    type Error = Error;
15
16    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Value>> {
17        Ok(decode(src, 0)?.map(|(item, pos)| {
18            trace!(
19                "decode: {}",
20                String::from_utf8_lossy(&src.as_ref()[..pos]).replace("\r\n", "\\r\\n")
21            );
22            src.advance(pos);
23            item
24        }))
25    }
26}
27
28fn decode(buf: &mut BytesMut, idx: usize) -> Result<Option<(Value, usize)>> {
29    if buf.len() <= idx {
30        return Ok(None);
31    }
32
33    let first_byte = buf[idx];
34    let idx = idx + 1;
35
36    // cf. https://github.com/redis/redis-specifications/blob/master/protocol/RESP3.md
37    match first_byte {
38        b'$' => Ok(decode_bulk_string(buf, idx)?.map(|(bs, pos)| (Value::BulkString(bs), pos))),
39        b'*' => Ok(decode_array(buf, idx)?.map(|(v, pos)| (Value::Array(v), pos))),
40        b'%' => Ok(decode_map(buf, idx)?.map(|(v, pos)| (Value::Array(v), pos))),
41        b'~' => Ok(decode_array(buf, idx)?.map(|(v, pos)| (Value::Array(v), pos))),
42        b':' => Ok(decode_integer(buf, idx)?.map(|(i, pos)| (Value::Integer(i), pos))),
43        b',' => Ok(decode_double(buf, idx)?.map(|(d, pos)| (Value::Double(d), pos))),
44        b'+' => {
45            Ok(decode_string(buf, idx)?.map(|(s, pos)| (Value::SimpleString(s.to_owned()), pos)))
46        }
47        b'-' => decode_string(buf, idx)?
48            .map(|(s, pos)| RedisError::from_str(s).map(|e| (Value::Error(e), pos)))
49            .transpose(),
50        b'_' => Ok(decode_null(buf, idx)?.map(|pos| (Value::BulkString(None), pos))),
51        b'#' => Ok(decode_boolean(buf, idx)?.map(|(i, pos)| (Value::Integer(i), pos))),
52        b'=' => Ok(decode_bulk_string(buf, idx)?.map(|(bs, pos)| (Value::BulkString(bs), pos))),
53        b'>' => Ok(decode_array(buf, idx)?.map(|(v, pos)| (Value::Push(v), pos))),
54        _ => Err(Error::Client(format!(
55            "Unknown data type '{}' (0x{:02x})",
56            first_byte as char, first_byte
57        ))),
58    }
59}
60
61fn decode_bulk_string(buf: &mut BytesMut, idx: usize) -> Result<Option<(Option<Vec<u8>>, usize)>> {
62    match decode_integer(buf, idx)? {
63        None => Ok(None),
64        Some((-1, pos)) => Ok(Some((None, pos))),
65        Some((len, pos)) => {
66            let len = usize::try_from(len)
67                .map_err(|_| Error::Client("Malformed bulk string len".to_owned()))?;
68            if buf.len() - pos < len + 2 {
69                Ok(None) // EOF
70            } else if buf[pos + len] != b'\r' || buf[pos + len + 1] != b'\n' {
71                Err(Error::Client(format!(
72                    "Expected \\r\\n after bulk string. Got '{}''{}'",
73                    buf[pos + len] as char,
74                    buf[pos + len + 1] as char
75                )))
76            } else {
77                Ok(Some((
78                    Some(buf[pos..(pos + len)].to_vec()),
79                    pos + len + 2,
80                )))
81            }
82        }
83    }
84}
85
86fn decode_array(buf: &mut BytesMut, idx: usize) -> Result<Option<(Option<Vec<Value>>, usize)>> {
87    match decode_integer(buf, idx)? {
88        None => Ok(None),
89        Some((-1, pos)) => Ok(Some((None, pos))),
90        Some((len, pos)) => {
91            let mut values = Vec::with_capacity(
92                usize::try_from(len)
93                    .map_err(|_| Error::Client("Malformed array len".to_owned()))?,
94            );
95            let mut pos = pos;
96            for _ in 0..len {
97                match decode(buf, pos)? {
98                    None => return Ok(None),
99                    Some((value, new_pos)) => {
100                        values.push(value);
101                        pos = new_pos;
102                    }
103                }
104            }
105            Ok(Some((Some(values), pos)))
106        }
107    }
108}
109
110fn decode_map(buf: &mut BytesMut, idx: usize) -> Result<Option<(Option<Vec<Value>>, usize)>> {
111    match decode_integer(buf, idx)? {
112        None => Ok(None),
113        Some((-1, pos)) => Ok(Some((None, pos))),
114        Some((len, pos)) => {
115            let len = len * 2;
116            let mut values = Vec::with_capacity(
117                usize::try_from(len).map_err(|_| Error::Client("Malformed map len".to_owned()))?,
118            );
119            let mut pos = pos;
120            for _ in 0..len {
121                match decode(buf, pos)? {
122                    None => return Ok(None),
123                    Some((value, new_pos)) => {
124                        values.push(value);
125                        pos = new_pos;
126                    }
127                }
128            }
129            Ok(Some((Some(values), pos)))
130        }
131    }
132}
133
134fn decode_string(buf: &mut BytesMut, idx: usize) -> Result<Option<(&str, usize)>> {
135    let len = buf.len();
136    let mut pos = idx;
137    let mut cr = false;
138
139    while pos < len {
140        let byte = buf[pos];
141
142        match (cr, byte) {
143            (false, b'\r') => cr = true,
144            (true, b'\n') => return Ok(Some((std::str::from_utf8(&buf[idx..pos - 1])?, pos + 1))),
145            (false, _) => (),
146            _ => return Err(Error::Client(format!("Unexpected byte {}", byte))),
147        } 
148
149        pos += 1;
150    }
151
152    Ok(None)
153}
154
155fn decode_integer(buf: &mut BytesMut, idx: usize) -> Result<Option<(i64, usize)>> {
156    let len = buf.len();
157    let mut is_negative = false;
158    let mut i = 0i64;
159    let mut pos = idx;
160    let mut cr = false;
161
162    while pos < len {
163        let byte = buf[pos];
164
165        match (cr, is_negative, byte) {
166            (false, false, b'-') => is_negative = true,
167            (false, false, b'0'..=b'9') => i = i * 10 + i64::from(byte - b'0'),
168            (false, true, b'0'..=b'9') => i = i * 10 - i64::from(byte - b'0'),
169            (false, _, b'\r') => cr = true,
170            (true, _, b'\n') => return Ok(Some((i, pos + 1))),
171            _ => return Err(Error::Client(format!("Unexpected byte {}", byte))),
172        }
173
174        pos += 1;
175    }
176
177    Ok(None)
178}
179
180fn decode_double(buf: &mut BytesMut, idx: usize) -> Result<Option<(f64, usize)>> {
181    match buf[idx..].iter().position(|b| *b == b'\r') {
182        Some(pos) if buf[idx + pos + 1] == b'\n' => {
183            let slice = &buf[idx..idx + pos];
184            let str = std::str::from_utf8(slice)?;
185            let d = str.parse::<f64>()?;
186            Ok(Some((d, idx + pos + 2)))
187        }
188        _ => Err(Error::Client("malformed double".to_owned())),
189    }
190}
191
192fn decode_null(buf: &mut BytesMut, idx: usize) -> Result<Option<usize>> {
193    if buf[idx] != b'\r' || buf[idx + 1] != b'\n' {
194        Err(Error::Client(format!(
195            "Expected \\r\\n after null. Got '{}''{}'",
196            buf[idx] as char,
197            buf[idx + 1] as char
198        )))
199    } else {
200        Ok(Some(idx + 2))
201    }
202}
203
204fn decode_boolean(buf: &mut BytesMut, idx: usize) -> Result<Option<(i64, usize)>> {
205    if buf[idx + 1] != b'\r' || buf[idx + 2] != b'\n' {
206        Err(Error::Client(format!(
207            "Expected \\r\\n after bulk string. Got '{}''{}'",
208            buf[idx + 1] as char,
209            buf[idx + 2] as char
210        )))
211    } else {
212        match buf[idx] {
213            b't' => Ok(Some((1, idx + 2))),
214            b'f' => Ok(Some((0, idx + 2))),
215            _ => Err(Error::Client(format!(
216                "Unexpected boolean character '{}'",
217                buf[idx] as char,
218            ))),
219        }
220    }
221}