redis_rawl/
serialize.rs

1use crate::types::Value;
2use std::future::Future;
3use std::pin::Pin;
4use tokio::io::{
5    AsyncBufReadExt,
6    AsyncReadExt,
7    BufReader,
8};
9use tokio::net::TcpStream;
10
/// Limit that declared bulk-string and array lengths are checked against:
/// 512 MB (`512 * 1024 * 1024`).
const RESP_MAX_SIZE: i64 = 512 << 20;
/// Body of the `+OK` simple-string reply, i.e. the ASCII bytes `O` (79) and `K` (75).
const OK_RESPONSE: &[u8] = b"OK";
14
15pub fn decode(
16    reader: &mut BufReader<TcpStream>
17) -> Pin<Box<dyn '_ + Future<Output = std::result::Result<Value, String>>>> {
18    Box::pin(async move {
19        let mut res: Vec<u8> = Vec::new();
20        reader
21            .read_until(b'\n', &mut res)
22            .await
23            .map_err(|e| e.to_string())?;
24
25        let len = res.len();
26        if len < 3 {
27            return Err(format!("too short: {}", len));
28        }
29        if !is_crlf(res[len - 2], res[len - 1]) {
30            return Err(format!(
31                "invalid CRLF: {:?}",
32                res
33            ));
34        }
35
36        let bytes = res[1..len - 2].as_ref();
37        match res[0] {
38            // Value::String
39            b'+' => match bytes {
40                OK_RESPONSE => Ok(Value::Okay),
41                bytes => String::from_utf8(bytes.to_vec())
42                    .map_err(|e| e.to_string())
43                    .map(Value::Status),
44            },
45            // Value::Error
46            b'-' => match String::from_utf8(bytes.to_vec()) {
47                Ok(value) => Err(value),
48                Err(e) => Err(e.to_string()),
49            },
50            // Value::Integer
51            b':' => parse_integer(bytes).map(Value::Int),
52            // Value::Bulk
53            b'$' => {
54                let int: i64 = parse_integer(bytes)?;
55                if int == -1 {
56                    // Nil bulk
57                    return Ok(Value::Nil);
58                }
59                if !(-1..RESP_MAX_SIZE).contains(&int) {
60                    return Err(format!(
61                        "invalid bulk length: {}",
62                        int
63                    ));
64                }
65
66                let int = int as usize;
67                let mut buf: Vec<u8> = vec![0; int + 2];
68                reader
69                    .read_exact(buf.as_mut_slice())
70                    .await
71                    .map_err(|e| e.to_string())?;
72                if !is_crlf(buf[int], buf[int + 1]) {
73                    return Err(format!(
74                        "invalid CRLF: {:?}",
75                        buf
76                    ));
77                }
78                buf.truncate(int);
79                Ok(Value::Bulk(buf))
80            }
81            // Value::Array
82            b'*' => {
83                let int = parse_integer(bytes)?;
84                if int == -1 {
85                    // Null array
86                    return Ok(Value::Nil);
87                }
88                if !(-1..RESP_MAX_SIZE).contains(&int) {
89                    return Err(format!(
90                        "invalid array length: {}",
91                        int
92                    ));
93                }
94
95                let mut array: Vec<Value> = Vec::with_capacity(int as usize);
96                for _ in 0..int {
97                    let val = decode(reader).await?;
98                    array.push(val);
99                }
100                Ok(Value::Array(array))
101            }
102            prefix => Err(format!(
103                "invalid RESP type: {:?}",
104                prefix
105            )),
106        }
107    })
108}
109
/// Returns `true` when the byte pair `(a, b)` is exactly carriage return followed by
/// line feed (`\r\n`), in that order.
#[inline]
fn is_crlf(
    a: u8,
    b: u8,
) -> bool {
    matches!((a, b), (b'\r', b'\n'))
}
117
/// Parses `bytes` as a base-10 signed 64-bit integer.
///
/// # Errors
///
/// Returns the stringified error when `bytes` is not valid UTF-8, or when the text is
/// not a valid `i64` (empty, non-numeric, or out of range). The UTF-8 failure is
/// reported as such rather than being masked as an empty-string parse error.
#[inline]
fn parse_integer(bytes: &[u8]) -> std::result::Result<i64, String> {
    let text = std::str::from_utf8(bytes).map_err(|e| e.to_string())?;
    text.parse::<i64>().map_err(|e| e.to_string())
}