1#[cfg(feature = "resp3")]
2mod resp3 {
3 use crate::{
4 error::{RedisProtocolError, RedisProtocolErrorKind},
5 resp3::{
6 decode::streaming::decode_bytes_mut as resp3_decode,
7 encode::complete::{extend_encode as resp3_encode, extend_encode_borrowed as resp3_encode_borrowed},
8 types::{BorrowedFrame, BytesFrame as Resp3Frame, StreamedFrame},
9 },
10 };
11 use bytes::BytesMut;
12 use tokio_util::codec::{Decoder, Encoder};
13 pub fn resp3_encode_command(cmd: &str) -> Resp3Frame {
15 Resp3Frame::Array {
16 data: cmd
17 .split(' ')
18 .map(|s| Resp3Frame::BlobString {
19 data: s.as_bytes().to_vec().into(),
20 attributes: None,
21 })
22 .collect(),
23 attributes: None,
24 }
25 }
26
27 #[derive(Debug, Default)]
62 pub struct Resp3 {
63 streaming: Option<StreamedFrame<Resp3Frame>>,
64 int_as_blobstring: bool,
65 }
66
67 impl Resp3 {
68 pub fn new(int_as_blobstring: bool) -> Resp3 {
71 Resp3 {
72 int_as_blobstring,
73 streaming: None,
74 }
75 }
76 }
77
78 impl Encoder<Resp3Frame> for Resp3 {
79 type Error = RedisProtocolError;
80
81 fn encode(&mut self, item: Resp3Frame, dst: &mut BytesMut) -> Result<(), Self::Error> {
82 resp3_encode(dst, &item, self.int_as_blobstring)
83 .map(|_| ())
84 .map_err(RedisProtocolError::from)
85 }
86 }
87
88 impl Encoder<BorrowedFrame<'_>> for Resp3 {
89 type Error = RedisProtocolError;
90
91 fn encode(&mut self, item: BorrowedFrame, dst: &mut BytesMut) -> Result<(), Self::Error> {
92 resp3_encode_borrowed(dst, &item, self.int_as_blobstring)
93 .map(|_| ())
94 .map_err(RedisProtocolError::from)
95 }
96 }
97
98 impl Decoder for Resp3 {
99 type Error = RedisProtocolError;
100 type Item = Resp3Frame;
101
102 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
103 if src.is_empty() {
104 return Ok(None);
105 }
106 let parsed = match resp3_decode(src)? {
107 Some((f, _, _)) => f,
108 None => return Ok(None),
109 };
110
111 if self.streaming.is_some() && parsed.is_streaming() {
112 return Err(RedisProtocolError::new(
113 RedisProtocolErrorKind::DecodeError,
114 "Cannot start a stream while already inside a stream.",
115 ));
116 }
117
118 let result = if let Some(ref mut state) = self.streaming {
119 state.add_frame(parsed.into_complete_frame()?);
121
122 if state.is_finished() {
123 Some(state.take()?)
124 } else {
125 None
126 }
127 } else {
128 if parsed.is_streaming() {
130 self.streaming = Some(parsed.into_streaming_frame()?);
131 None
132 } else {
133 Some(parsed.into_complete_frame()?)
135 }
136 };
137
138 if result.is_some() {
139 let _ = self.streaming.take();
140 }
141 Ok(result)
142 }
143 }
144}
145
146#[cfg(feature = "resp2")]
147mod resp2 {
148 use crate::{
149 error::RedisProtocolError,
150 resp2::{
151 decode::decode_bytes_mut as resp2_decode,
152 encode::{extend_encode as resp2_encode, extend_encode_borrowed as resp2_encode_borrowed},
153 types::{BorrowedFrame, BytesFrame as Resp2Frame},
154 },
155 };
156 use bytes::BytesMut;
157 use tokio_util::codec::{Decoder, Encoder};
158 pub fn resp2_encode_command(cmd: &str) -> Resp2Frame {
160 Resp2Frame::Array(
161 cmd
162 .split(' ')
163 .map(|s| Resp2Frame::BulkString(s.as_bytes().to_vec().into()))
164 .collect(),
165 )
166 }
167
168 #[derive(Clone, Debug, Default)]
196 pub struct Resp2 {
197 int_as_bulkstring: bool,
198 }
199
200 impl Resp2 {
201 pub fn new(int_as_bulkstring: bool) -> Resp2 {
204 Resp2 { int_as_bulkstring }
205 }
206 }
207
208 impl Encoder<Resp2Frame> for Resp2 {
209 type Error = RedisProtocolError;
210
211 fn encode(&mut self, item: Resp2Frame, dst: &mut BytesMut) -> Result<(), Self::Error> {
212 resp2_encode(dst, &item, self.int_as_bulkstring)
213 .map(|_| ())
214 .map_err(RedisProtocolError::from)
215 }
216 }
217
218 impl Encoder<BorrowedFrame<'_>> for Resp2 {
219 type Error = RedisProtocolError;
220
221 fn encode(&mut self, item: BorrowedFrame, dst: &mut BytesMut) -> Result<(), Self::Error> {
222 resp2_encode_borrowed(dst, &item, self.int_as_bulkstring)
223 .map(|_| ())
224 .map_err(RedisProtocolError::from)
225 }
226 }
227
228 impl Decoder for Resp2 {
229 type Error = RedisProtocolError;
230 type Item = Resp2Frame;
231
232 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
233 if src.is_empty() {
234 return Ok(None);
235 }
236 let parsed = match resp2_decode(src)? {
237 Some((frame, _, _)) => frame,
238 None => return Ok(None),
239 };
240
241 Ok(Some(parsed))
242 }
243 }
244}
245
246#[cfg(feature = "resp3")]
247#[cfg_attr(docsrs, doc(cfg(feature = "resp3")))]
248pub use resp3::*;
249
250#[cfg(feature = "resp2")]
251#[cfg_attr(docsrs, doc(cfg(feature = "resp2")))]
252pub use resp2::*;