Skip to main content

beyond_resp/
codec.rs

1use bytes::BytesMut;
2use tokio_util::codec::{Decoder, Encoder};
3
4use crate::encode::encode;
5use crate::error::RespError;
6use crate::parse;
7use crate::value::{Value, Version};
8
9/// Maximum frame size the decoder will accept (matches Redis default: 512 MiB).
10///
11/// Note: the parsed `Value` tree is larger than the wire frame. Compact types
12/// like integers use ~4 wire bytes but ~56 bytes as a `Value` variant, so
13/// peak heap usage can be 10–15× the frame size in the worst case. Size this
14/// limit with that amplification in mind.
15pub const DEFAULT_MAX_FRAME_BYTES: usize = 512 * 1024 * 1024;
16
17/// Tokio-util codec for RESP2/RESP3 framing.
18///
19/// A single instance manages one connection's protocol state. Call
20/// [`set_version`] after a successful `HELLO 3` handshake to switch to RESP3.
21///
22/// [`set_version`]: RespCodec::set_version
23#[derive(Debug, Clone)]
24pub struct RespCodec {
25    version: Version,
26    max_frame_bytes: usize,
27}
28
29impl Default for RespCodec {
30    fn default() -> Self {
31        Self::new(Version::Resp2)
32    }
33}
34
35impl RespCodec {
36    pub fn new(version: Version) -> Self {
37        Self {
38            version,
39            max_frame_bytes: DEFAULT_MAX_FRAME_BYTES,
40        }
41    }
42
43    pub fn resp2() -> Self {
44        Self::new(Version::Resp2)
45    }
46
47    pub fn resp3() -> Self {
48        Self::new(Version::Resp3)
49    }
50
51    pub fn with_max_frame_bytes(mut self, limit: usize) -> Self {
52        self.max_frame_bytes = limit;
53        self
54    }
55
56    /// Switch protocol version mid-stream (e.g. after HELLO 3 succeeds).
57    pub fn set_version(&mut self, version: Version) {
58        self.version = version;
59    }
60
61    pub fn version(&self) -> Version {
62        self.version
63    }
64}
65
66impl Decoder for RespCodec {
67    type Item = Value;
68    type Error = RespError;
69
70    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Value>, RespError> {
71        if src.is_empty() {
72            return Ok(None);
73        }
74        if src.len() > self.max_frame_bytes {
75            return Err(RespError::too_large(self.max_frame_bytes));
76        }
77        match parse::frame_len(src) {
78            Ok(len) => {
79                if len > self.max_frame_bytes {
80                    return Err(RespError::too_large(self.max_frame_bytes));
81                }
82                let frozen = src.split_to(len).freeze();
83                let mut pos = 0;
84                Ok(Some(parse::build_value(&frozen, &mut pos, 0)?))
85            }
86            Err(RespError::Incomplete) => {
87                src.reserve(64);
88                Ok(None)
89            }
90            Err(e) => Err(e),
91        }
92    }
93}
94
95impl Encoder<Value> for RespCodec {
96    type Error = RespError;
97
98    fn encode(&mut self, item: Value, dst: &mut BytesMut) -> Result<(), RespError> {
99        encode(&item, dst, self.version);
100        Ok(())
101    }
102}
103
104impl Encoder<&Value> for RespCodec {
105    type Error = RespError;
106
107    fn encode(&mut self, item: &Value, dst: &mut BytesMut) -> Result<(), RespError> {
108        encode(item, dst, self.version);
109        Ok(())
110    }
111}
112
113#[cfg(feature = "monoio")]
114impl monoio_codec::Decoder for RespCodec {
115    type Item = Value;
116    type Error = RespError;
117
118    fn decode(&mut self, src: &mut BytesMut) -> Result<monoio_codec::Decoded<Value>, RespError> {
119        if src.is_empty() {
120            return Ok(monoio_codec::Decoded::Insufficient);
121        }
122        if src.len() > self.max_frame_bytes {
123            return Err(RespError::too_large(self.max_frame_bytes));
124        }
125        match parse::frame_len(src) {
126            Ok(len) => {
127                if len > self.max_frame_bytes {
128                    return Err(RespError::too_large(self.max_frame_bytes));
129                }
130                let frozen = src.split_to(len).freeze();
131                let mut pos = 0;
132                Ok(monoio_codec::Decoded::Some(parse::build_value(
133                    &frozen, &mut pos, 0,
134                )?))
135            }
136            Err(RespError::Incomplete) => {
137                src.reserve(64);
138                Ok(monoio_codec::Decoded::Insufficient)
139            }
140            Err(e) => Err(e),
141        }
142    }
143}