1use crate::{
2 resp::{Value},
3 Error, RedisError, Result,
4};
5use bytes::{Buf, BytesMut};
6use log::trace;
7use std::str::FromStr;
8use tokio_util::codec::Decoder;
9
10pub(crate) struct ValueDecoder;
11
12impl Decoder for ValueDecoder {
13 type Item = Value;
14 type Error = Error;
15
16 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Value>> {
17 Ok(decode(src, 0)?.map(|(item, pos)| {
18 trace!(
19 "decode: {}",
20 String::from_utf8_lossy(&src.as_ref()[..pos]).replace("\r\n", "\\r\\n")
21 );
22 src.advance(pos);
23 item
24 }))
25 }
26}
27
28fn decode(buf: &mut BytesMut, idx: usize) -> Result<Option<(Value, usize)>> {
29 if buf.len() <= idx {
30 return Ok(None);
31 }
32
33 let first_byte = buf[idx];
34 let idx = idx + 1;
35
36 match first_byte {
38 b'$' => Ok(decode_bulk_string(buf, idx)?.map(|(bs, pos)| (Value::BulkString(bs), pos))),
39 b'*' => Ok(decode_array(buf, idx)?.map(|(v, pos)| (Value::Array(v), pos))),
40 b'%' => Ok(decode_map(buf, idx)?.map(|(v, pos)| (Value::Array(v), pos))),
41 b'~' => Ok(decode_array(buf, idx)?.map(|(v, pos)| (Value::Array(v), pos))),
42 b':' => Ok(decode_integer(buf, idx)?.map(|(i, pos)| (Value::Integer(i), pos))),
43 b',' => Ok(decode_double(buf, idx)?.map(|(d, pos)| (Value::Double(d), pos))),
44 b'+' => {
45 Ok(decode_string(buf, idx)?.map(|(s, pos)| (Value::SimpleString(s.to_owned()), pos)))
46 }
47 b'-' => decode_string(buf, idx)?
48 .map(|(s, pos)| RedisError::from_str(s).map(|e| (Value::Error(e), pos)))
49 .transpose(),
50 b'_' => Ok(decode_null(buf, idx)?.map(|pos| (Value::BulkString(None), pos))),
51 b'#' => Ok(decode_boolean(buf, idx)?.map(|(i, pos)| (Value::Integer(i), pos))),
52 b'=' => Ok(decode_bulk_string(buf, idx)?.map(|(bs, pos)| (Value::BulkString(bs), pos))),
53 b'>' => Ok(decode_array(buf, idx)?.map(|(v, pos)| (Value::Push(v), pos))),
54 _ => Err(Error::Client(format!(
55 "Unknown data type '{}' (0x{:02x})",
56 first_byte as char, first_byte
57 ))),
58 }
59}
60
61fn decode_bulk_string(buf: &mut BytesMut, idx: usize) -> Result<Option<(Option<Vec<u8>>, usize)>> {
62 match decode_integer(buf, idx)? {
63 None => Ok(None),
64 Some((-1, pos)) => Ok(Some((None, pos))),
65 Some((len, pos)) => {
66 let len = usize::try_from(len)
67 .map_err(|_| Error::Client("Malformed bulk string len".to_owned()))?;
68 if buf.len() - pos < len + 2 {
69 Ok(None) } else if buf[pos + len] != b'\r' || buf[pos + len + 1] != b'\n' {
71 Err(Error::Client(format!(
72 "Expected \\r\\n after bulk string. Got '{}''{}'",
73 buf[pos + len] as char,
74 buf[pos + len + 1] as char
75 )))
76 } else {
77 Ok(Some((
78 Some(buf[pos..(pos + len)].to_vec()),
79 pos + len + 2,
80 )))
81 }
82 }
83 }
84}
85
86fn decode_array(buf: &mut BytesMut, idx: usize) -> Result<Option<(Option<Vec<Value>>, usize)>> {
87 match decode_integer(buf, idx)? {
88 None => Ok(None),
89 Some((-1, pos)) => Ok(Some((None, pos))),
90 Some((len, pos)) => {
91 let mut values = Vec::with_capacity(
92 usize::try_from(len)
93 .map_err(|_| Error::Client("Malformed array len".to_owned()))?,
94 );
95 let mut pos = pos;
96 for _ in 0..len {
97 match decode(buf, pos)? {
98 None => return Ok(None),
99 Some((value, new_pos)) => {
100 values.push(value);
101 pos = new_pos;
102 }
103 }
104 }
105 Ok(Some((Some(values), pos)))
106 }
107 }
108}
109
110fn decode_map(buf: &mut BytesMut, idx: usize) -> Result<Option<(Option<Vec<Value>>, usize)>> {
111 match decode_integer(buf, idx)? {
112 None => Ok(None),
113 Some((-1, pos)) => Ok(Some((None, pos))),
114 Some((len, pos)) => {
115 let len = len * 2;
116 let mut values = Vec::with_capacity(
117 usize::try_from(len).map_err(|_| Error::Client("Malformed map len".to_owned()))?,
118 );
119 let mut pos = pos;
120 for _ in 0..len {
121 match decode(buf, pos)? {
122 None => return Ok(None),
123 Some((value, new_pos)) => {
124 values.push(value);
125 pos = new_pos;
126 }
127 }
128 }
129 Ok(Some((Some(values), pos)))
130 }
131 }
132}
133
134fn decode_string(buf: &mut BytesMut, idx: usize) -> Result<Option<(&str, usize)>> {
135 let len = buf.len();
136 let mut pos = idx;
137 let mut cr = false;
138
139 while pos < len {
140 let byte = buf[pos];
141
142 match (cr, byte) {
143 (false, b'\r') => cr = true,
144 (true, b'\n') => return Ok(Some((std::str::from_utf8(&buf[idx..pos - 1])?, pos + 1))),
145 (false, _) => (),
146 _ => return Err(Error::Client(format!("Unexpected byte {}", byte))),
147 }
148
149 pos += 1;
150 }
151
152 Ok(None)
153}
154
155fn decode_integer(buf: &mut BytesMut, idx: usize) -> Result<Option<(i64, usize)>> {
156 let len = buf.len();
157 let mut is_negative = false;
158 let mut i = 0i64;
159 let mut pos = idx;
160 let mut cr = false;
161
162 while pos < len {
163 let byte = buf[pos];
164
165 match (cr, is_negative, byte) {
166 (false, false, b'-') => is_negative = true,
167 (false, false, b'0'..=b'9') => i = i * 10 + i64::from(byte - b'0'),
168 (false, true, b'0'..=b'9') => i = i * 10 - i64::from(byte - b'0'),
169 (false, _, b'\r') => cr = true,
170 (true, _, b'\n') => return Ok(Some((i, pos + 1))),
171 _ => return Err(Error::Client(format!("Unexpected byte {}", byte))),
172 }
173
174 pos += 1;
175 }
176
177 Ok(None)
178}
179
180fn decode_double(buf: &mut BytesMut, idx: usize) -> Result<Option<(f64, usize)>> {
181 match buf[idx..].iter().position(|b| *b == b'\r') {
182 Some(pos) if buf[idx + pos + 1] == b'\n' => {
183 let slice = &buf[idx..idx + pos];
184 let str = std::str::from_utf8(slice)?;
185 let d = str.parse::<f64>()?;
186 Ok(Some((d, idx + pos + 2)))
187 }
188 _ => Err(Error::Client("malformed double".to_owned())),
189 }
190}
191
192fn decode_null(buf: &mut BytesMut, idx: usize) -> Result<Option<usize>> {
193 if buf[idx] != b'\r' || buf[idx + 1] != b'\n' {
194 Err(Error::Client(format!(
195 "Expected \\r\\n after null. Got '{}''{}'",
196 buf[idx] as char,
197 buf[idx + 1] as char
198 )))
199 } else {
200 Ok(Some(idx + 2))
201 }
202}
203
204fn decode_boolean(buf: &mut BytesMut, idx: usize) -> Result<Option<(i64, usize)>> {
205 if buf[idx + 1] != b'\r' || buf[idx + 2] != b'\n' {
206 Err(Error::Client(format!(
207 "Expected \\r\\n after bulk string. Got '{}''{}'",
208 buf[idx + 1] as char,
209 buf[idx + 2] as char
210 )))
211 } else {
212 match buf[idx] {
213 b't' => Ok(Some((1, idx + 2))),
214 b'f' => Ok(Some((0, idx + 2))),
215 _ => Err(Error::Client(format!(
216 "Unexpected boolean character '{}'",
217 buf[idx] as char,
218 ))),
219 }
220 }
221}