1use crate::types::Value;
2use std::future::Future;
3use std::pin::Pin;
4use tokio::io::{
5 AsyncBufReadExt,
6 AsyncReadExt,
7 BufReader,
8};
9use tokio::net::TcpStream;
10
11const RESP_MAX_SIZE: i64 = 512 * 1024 * 1024;
13const OK_RESPONSE: &[u8] = &[79, 75];
14
15pub fn decode(
16 reader: &mut BufReader<TcpStream>
17) -> Pin<Box<dyn '_ + Future<Output = std::result::Result<Value, String>>>> {
18 Box::pin(async move {
19 let mut res: Vec<u8> = Vec::new();
20 reader
21 .read_until(b'\n', &mut res)
22 .await
23 .map_err(|e| e.to_string())?;
24
25 let len = res.len();
26 if len < 3 {
27 return Err(format!("too short: {}", len));
28 }
29 if !is_crlf(res[len - 2], res[len - 1]) {
30 return Err(format!(
31 "invalid CRLF: {:?}",
32 res
33 ));
34 }
35
36 let bytes = res[1..len - 2].as_ref();
37 match res[0] {
38 b'+' => match bytes {
40 OK_RESPONSE => Ok(Value::Okay),
41 bytes => String::from_utf8(bytes.to_vec())
42 .map_err(|e| e.to_string())
43 .map(Value::Status),
44 },
45 b'-' => match String::from_utf8(bytes.to_vec()) {
47 Ok(value) => Err(value),
48 Err(e) => Err(e.to_string()),
49 },
50 b':' => parse_integer(bytes).map(Value::Int),
52 b'$' => {
54 let int: i64 = parse_integer(bytes)?;
55 if int == -1 {
56 return Ok(Value::Nil);
58 }
59 if !(-1..RESP_MAX_SIZE).contains(&int) {
60 return Err(format!(
61 "invalid bulk length: {}",
62 int
63 ));
64 }
65
66 let int = int as usize;
67 let mut buf: Vec<u8> = vec![0; int + 2];
68 reader
69 .read_exact(buf.as_mut_slice())
70 .await
71 .map_err(|e| e.to_string())?;
72 if !is_crlf(buf[int], buf[int + 1]) {
73 return Err(format!(
74 "invalid CRLF: {:?}",
75 buf
76 ));
77 }
78 buf.truncate(int);
79 Ok(Value::Bulk(buf))
80 }
81 b'*' => {
83 let int = parse_integer(bytes)?;
84 if int == -1 {
85 return Ok(Value::Nil);
87 }
88 if !(-1..RESP_MAX_SIZE).contains(&int) {
89 return Err(format!(
90 "invalid array length: {}",
91 int
92 ));
93 }
94
95 let mut array: Vec<Value> = Vec::with_capacity(int as usize);
96 for _ in 0..int {
97 let val = decode(reader).await?;
98 array.push(val);
99 }
100 Ok(Value::Array(array))
101 }
102 prefix => Err(format!(
103 "invalid RESP type: {:?}",
104 prefix
105 )),
106 }
107 })
108}
109
110#[inline]
111fn is_crlf(
112 a: u8,
113 b: u8,
114) -> bool {
115 a == b'\r' && b == b'\n'
116}
117
118#[inline]
119fn parse_integer(bytes: &[u8]) -> std::result::Result<i64, String> {
120 std::str::from_utf8(bytes)
121 .map_or("", |f| f)
122 .parse::<i64>()
123 .map_err(|e| e.to_string())
124}