Skip to main content

beyond_resp/
codec.rs

1use bytes::BytesMut;
2use tokio_util::codec::{Decoder, Encoder};
3
4use crate::encode::encode;
5use crate::error::RespError;
6use crate::parse;
7use crate::value::{Value, Version};
8
/// Default upper bound on a single wire frame accepted by the decoder:
/// 512 MiB, the same value Redis uses by default.
///
/// Keep in mind that a decoded `Value` tree occupies more memory than the
/// bytes it was parsed from. A compact type such as an integer takes roughly
/// 4 bytes on the wire but roughly 56 bytes as a `Value` variant, so
/// worst-case peak heap usage can reach 10–15× the frame size. Choose the
/// limit with that amplification in mind.
pub const DEFAULT_MAX_FRAME_BYTES: usize = 512 << 20; // 512 × 2^20 bytes
16
17/// Tokio-util codec for RESP2/RESP3 framing.
18///
19/// A single instance manages one connection's protocol state. Call
20/// [`set_version`] after a successful `HELLO 3` handshake to switch to RESP3.
21///
22/// [`set_version`]: RespCodec::set_version
23#[derive(Debug, Clone)]
24pub struct RespCodec {
25    version: Version,
26    max_frame_bytes: usize,
27}
28
29impl Default for RespCodec {
30    fn default() -> Self {
31        Self::new(Version::Resp2)
32    }
33}
34
35impl RespCodec {
36    pub fn new(version: Version) -> Self {
37        Self {
38            version,
39            max_frame_bytes: DEFAULT_MAX_FRAME_BYTES,
40        }
41    }
42
43    pub fn resp2() -> Self {
44        Self::new(Version::Resp2)
45    }
46
47    pub fn resp3() -> Self {
48        Self::new(Version::Resp3)
49    }
50
51    pub fn with_max_frame_bytes(mut self, limit: usize) -> Self {
52        self.max_frame_bytes = limit;
53        self
54    }
55
56    /// Switch protocol version mid-stream (e.g. after HELLO 3 succeeds).
57    pub fn set_version(&mut self, version: Version) {
58        self.version = version;
59    }
60
61    pub fn version(&self) -> Version {
62        self.version
63    }
64}
65
66impl Decoder for RespCodec {
67    type Item = Value;
68    type Error = RespError;
69
70    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Value>, RespError> {
71        if src.is_empty() {
72            return Ok(None);
73        }
74        match parse::frame_len(src) {
75            Ok(len) => {
76                if len > self.max_frame_bytes {
77                    return Err(RespError::too_large(self.max_frame_bytes));
78                }
79                let frozen = src.split_to(len).freeze();
80                let mut pos = 0;
81                Ok(Some(parse::build_value(&frozen, &mut pos, 0)?))
82            }
83            Err(RespError::Incomplete) => {
84                src.reserve(64);
85                Ok(None)
86            }
87            Err(e) => Err(e),
88        }
89    }
90}
91
92impl Encoder<Value> for RespCodec {
93    type Error = RespError;
94
95    fn encode(&mut self, item: Value, dst: &mut BytesMut) -> Result<(), RespError> {
96        encode(&item, dst, self.version);
97        Ok(())
98    }
99}
100
101impl Encoder<&Value> for RespCodec {
102    type Error = RespError;
103
104    fn encode(&mut self, item: &Value, dst: &mut BytesMut) -> Result<(), RespError> {
105        encode(item, dst, self.version);
106        Ok(())
107    }
108}