1use bytes::BytesMut;
2use tokio_util::codec::{Decoder, Encoder};
3
4use crate::encode::encode;
5use crate::error::RespError;
6use crate::parse;
7use crate::value::{Value, Version};
8
9pub const DEFAULT_MAX_FRAME_BYTES: usize = 512 * 1024 * 1024;
16
17#[derive(Debug, Clone)]
24pub struct RespCodec {
25 version: Version,
26 max_frame_bytes: usize,
27}
28
29impl Default for RespCodec {
30 fn default() -> Self {
31 Self::new(Version::Resp2)
32 }
33}
34
35impl RespCodec {
36 pub fn new(version: Version) -> Self {
37 Self {
38 version,
39 max_frame_bytes: DEFAULT_MAX_FRAME_BYTES,
40 }
41 }
42
43 pub fn resp2() -> Self {
44 Self::new(Version::Resp2)
45 }
46
47 pub fn resp3() -> Self {
48 Self::new(Version::Resp3)
49 }
50
51 pub fn with_max_frame_bytes(mut self, limit: usize) -> Self {
52 self.max_frame_bytes = limit;
53 self
54 }
55
56 pub fn set_version(&mut self, version: Version) {
58 self.version = version;
59 }
60
61 pub fn version(&self) -> Version {
62 self.version
63 }
64}
65
66impl Decoder for RespCodec {
67 type Item = Value;
68 type Error = RespError;
69
70 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Value>, RespError> {
71 if src.is_empty() {
72 return Ok(None);
73 }
74 match parse::frame_len(src) {
75 Ok(len) => {
76 if len > self.max_frame_bytes {
77 return Err(RespError::too_large(self.max_frame_bytes));
78 }
79 let frozen = src.split_to(len).freeze();
80 let mut pos = 0;
81 Ok(Some(parse::build_value(&frozen, &mut pos, 0)?))
82 }
83 Err(RespError::Incomplete) => {
84 src.reserve(64);
85 Ok(None)
86 }
87 Err(e) => Err(e),
88 }
89 }
90}
91
92impl Encoder<Value> for RespCodec {
93 type Error = RespError;
94
95 fn encode(&mut self, item: Value, dst: &mut BytesMut) -> Result<(), RespError> {
96 encode(&item, dst, self.version);
97 Ok(())
98 }
99}
100
101impl Encoder<&Value> for RespCodec {
102 type Error = RespError;
103
104 fn encode(&mut self, item: &Value, dst: &mut BytesMut) -> Result<(), RespError> {
105 encode(item, dst, self.version);
106 Ok(())
107 }
108}