1use bytes::BytesMut;
2use tokio_util::codec::{Decoder, Encoder};
3
4use crate::encode::encode;
5use crate::error::RespError;
6use crate::parse;
7use crate::value::{Value, Version};
8
9pub const DEFAULT_MAX_FRAME_BYTES: usize = 512 * 1024 * 1024;
16
17#[derive(Debug, Clone)]
24pub struct RespCodec {
25 version: Version,
26 max_frame_bytes: usize,
27}
28
29impl Default for RespCodec {
30 fn default() -> Self {
31 Self::new(Version::Resp2)
32 }
33}
34
35impl RespCodec {
36 pub fn new(version: Version) -> Self {
37 Self {
38 version,
39 max_frame_bytes: DEFAULT_MAX_FRAME_BYTES,
40 }
41 }
42
43 pub fn resp2() -> Self {
44 Self::new(Version::Resp2)
45 }
46
47 pub fn resp3() -> Self {
48 Self::new(Version::Resp3)
49 }
50
51 pub fn with_max_frame_bytes(mut self, limit: usize) -> Self {
52 self.max_frame_bytes = limit;
53 self
54 }
55
56 pub fn set_version(&mut self, version: Version) {
58 self.version = version;
59 }
60
61 pub fn version(&self) -> Version {
62 self.version
63 }
64}
65
66impl Decoder for RespCodec {
67 type Item = Value;
68 type Error = RespError;
69
70 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Value>, RespError> {
71 if src.is_empty() {
72 return Ok(None);
73 }
74 if src.len() > self.max_frame_bytes {
75 return Err(RespError::too_large(self.max_frame_bytes));
76 }
77 match parse::frame_len(src) {
78 Ok(len) => {
79 if len > self.max_frame_bytes {
80 return Err(RespError::too_large(self.max_frame_bytes));
81 }
82 let frozen = src.split_to(len).freeze();
83 let mut pos = 0;
84 Ok(Some(parse::build_value(&frozen, &mut pos, 0)?))
85 }
86 Err(RespError::Incomplete) => {
87 src.reserve(64);
88 Ok(None)
89 }
90 Err(e) => Err(e),
91 }
92 }
93}
94
95impl Encoder<Value> for RespCodec {
96 type Error = RespError;
97
98 fn encode(&mut self, item: Value, dst: &mut BytesMut) -> Result<(), RespError> {
99 encode(&item, dst, self.version);
100 Ok(())
101 }
102}
103
104impl Encoder<&Value> for RespCodec {
105 type Error = RespError;
106
107 fn encode(&mut self, item: &Value, dst: &mut BytesMut) -> Result<(), RespError> {
108 encode(item, dst, self.version);
109 Ok(())
110 }
111}
112
113#[cfg(feature = "monoio")]
114impl monoio_codec::Decoder for RespCodec {
115 type Item = Value;
116 type Error = RespError;
117
118 fn decode(&mut self, src: &mut BytesMut) -> Result<monoio_codec::Decoded<Value>, RespError> {
119 if src.is_empty() {
120 return Ok(monoio_codec::Decoded::Insufficient);
121 }
122 if src.len() > self.max_frame_bytes {
123 return Err(RespError::too_large(self.max_frame_bytes));
124 }
125 match parse::frame_len(src) {
126 Ok(len) => {
127 if len > self.max_frame_bytes {
128 return Err(RespError::too_large(self.max_frame_bytes));
129 }
130 let frozen = src.split_to(len).freeze();
131 let mut pos = 0;
132 Ok(monoio_codec::Decoded::Some(parse::build_value(
133 &frozen, &mut pos, 0,
134 )?))
135 }
136 Err(RespError::Incomplete) => {
137 src.reserve(64);
138 Ok(monoio_codec::Decoded::Insufficient)
139 }
140 Err(e) => Err(e),
141 }
142 }
143}