redis_protocol/
codec.rs

1#[cfg(feature = "resp3")]
2mod resp3 {
3  use crate::{
4    error::{RedisProtocolError, RedisProtocolErrorKind},
5    resp3::{
6      decode::streaming::decode_bytes_mut as resp3_decode,
7      encode::complete::{extend_encode as resp3_encode, extend_encode_borrowed as resp3_encode_borrowed},
8      types::{BorrowedFrame, BytesFrame as Resp3Frame, StreamedFrame},
9    },
10  };
11  use bytes::BytesMut;
12  use tokio_util::codec::{Decoder, Encoder};
13  /// Encode a redis command string (`SET foo bar NX`, etc) into a RESP3 blob string array.
14  pub fn resp3_encode_command(cmd: &str) -> Resp3Frame {
15    Resp3Frame::Array {
16      data:       cmd
17        .split(' ')
18        .map(|s| Resp3Frame::BlobString {
19          data:       s.as_bytes().to_vec().into(),
20          attributes: None,
21        })
22        .collect(),
23      attributes: None,
24    }
25  }
26
27  /// A framed codec for complete and streaming/chunked RESP3 frames.
28  ///
29  /// ```rust
30  /// use futures::{SinkExt, StreamExt};
31  /// use redis_protocol::{
32  ///   codec::{resp3_encode_command, Resp3},
33  ///   resp3::types::{BytesFrame, Resp3Frame, RespVersion},
34  /// };
35  /// use tokio::net::TcpStream;
36  /// use tokio_util::codec::Framed;
37  ///
38  /// // send `HELLO 3 AUTH foo bar` then `GET foo`
39  /// async fn example() {
40  ///   let socket = TcpStream::connect("127.0.0.1:6379").await.unwrap();
41  ///   let mut framed = Framed::new(socket, Resp3::default());
42  ///
43  ///   let hello = BytesFrame::Hello {
44  ///     version: RespVersion::RESP3,
45  ///     auth:    Some(("foo".into(), "bar".into())),
46  ///     setname: None,
47  ///   };
48  ///   // or use the shorthand, but this likely only works for simple use cases
49  ///   let get_foo = resp3_encode_command("GET foo");
50  ///
51  ///   // `Framed` implements both `Sink` and `Stream`
52  ///   let _ = framed.send(hello).await.unwrap();
53  ///   let response = framed.next().await.unwrap();
54  ///   println!("HELLO response: {:?}", response);
55  ///
56  ///   let _ = framed.send(get_foo).await.unwrap();
57  ///   let response = framed.next().await.unwrap();
58  ///   println!("GET foo: {:?}", response);
59  /// }
60  /// ```
61  #[derive(Debug, Default)]
62  pub struct Resp3 {
63    streaming:         Option<StreamedFrame<Resp3Frame>>,
64    int_as_blobstring: bool,
65  }
66
67  impl Resp3 {
68    /// Create a new codec with the provided flag describing whether the encoder logic should send integers as blob
69    /// strings.
70    pub fn new(int_as_blobstring: bool) -> Resp3 {
71      Resp3 {
72        int_as_blobstring,
73        streaming: None,
74      }
75    }
76  }
77
78  impl Encoder<Resp3Frame> for Resp3 {
79    type Error = RedisProtocolError;
80
81    fn encode(&mut self, item: Resp3Frame, dst: &mut BytesMut) -> Result<(), Self::Error> {
82      resp3_encode(dst, &item, self.int_as_blobstring)
83        .map(|_| ())
84        .map_err(RedisProtocolError::from)
85    }
86  }
87
88  impl Encoder<BorrowedFrame<'_>> for Resp3 {
89    type Error = RedisProtocolError;
90
91    fn encode(&mut self, item: BorrowedFrame, dst: &mut BytesMut) -> Result<(), Self::Error> {
92      resp3_encode_borrowed(dst, &item, self.int_as_blobstring)
93        .map(|_| ())
94        .map_err(RedisProtocolError::from)
95    }
96  }
97
98  impl Decoder for Resp3 {
99    type Error = RedisProtocolError;
100    type Item = Resp3Frame;
101
102    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
103      if src.is_empty() {
104        return Ok(None);
105      }
106      let parsed = match resp3_decode(src)? {
107        Some((f, _, _)) => f,
108        None => return Ok(None),
109      };
110
111      if self.streaming.is_some() && parsed.is_streaming() {
112        return Err(RedisProtocolError::new(
113          RedisProtocolErrorKind::DecodeError,
114          "Cannot start a stream while already inside a stream.",
115        ));
116      }
117
118      let result = if let Some(ref mut state) = self.streaming {
119        // we started receiving streamed data earlier
120        state.add_frame(parsed.into_complete_frame()?);
121
122        if state.is_finished() {
123          Some(state.take()?)
124        } else {
125          None
126        }
127      } else {
128        // we're processing a complete frame or starting a new streamed frame
129        if parsed.is_streaming() {
130          self.streaming = Some(parsed.into_streaming_frame()?);
131          None
132        } else {
133          // we're not in the middle of a stream and we found a complete frame
134          Some(parsed.into_complete_frame()?)
135        }
136      };
137
138      if result.is_some() {
139        let _ = self.streaming.take();
140      }
141      Ok(result)
142    }
143  }
144}
145
146#[cfg(feature = "resp2")]
147mod resp2 {
148  use crate::{
149    error::RedisProtocolError,
150    resp2::{
151      decode::decode_bytes_mut as resp2_decode,
152      encode::{extend_encode as resp2_encode, extend_encode_borrowed as resp2_encode_borrowed},
153      types::{BorrowedFrame, BytesFrame as Resp2Frame},
154    },
155  };
156  use bytes::BytesMut;
157  use tokio_util::codec::{Decoder, Encoder};
158  /// Encode a redis command string (`SET foo bar NX`, etc) into a RESP2 bulk string array.
159  pub fn resp2_encode_command(cmd: &str) -> Resp2Frame {
160    Resp2Frame::Array(
161      cmd
162        .split(' ')
163        .map(|s| Resp2Frame::BulkString(s.as_bytes().to_vec().into()))
164        .collect(),
165    )
166  }
167
168  /// A framed RESP2 codec.
169  ///
170  /// ```rust
171  /// use futures::{SinkExt, StreamExt};
172  /// use redis_protocol::{
173  ///   codec::{resp2_encode_command, Resp2},
174  ///   resp2::types::{BytesFrame, Resp2Frame},
175  /// };
176  /// use tokio::net::TcpStream;
177  /// use tokio_util::codec::Framed;
178  ///
179  /// async fn example() {
180  ///   let socket = TcpStream::connect("127.0.0.1:6379").await.unwrap();
181  ///   let mut framed = Framed::new(socket, Resp2::default());
182  ///
183  ///   let auth = resp2_encode_command("AUTH foo bar");
184  ///   let get_foo = resp2_encode_command("GET foo");
185  ///
186  ///   let _ = framed.send(auth).await.unwrap();
187  ///   let response = framed.next().await.unwrap().unwrap();
188  ///   assert_eq!(response.as_str().unwrap(), "OK");
189  ///
190  ///   let _ = framed.send(get_foo).await.unwrap();
191  ///   let response = framed.next().await.unwrap().unwrap();
192  ///   assert_eq!(response, BytesFrame::Null);
193  /// }
194  /// ```
195  #[derive(Clone, Debug, Default)]
196  pub struct Resp2 {
197    int_as_bulkstring: bool,
198  }
199
200  impl Resp2 {
201    /// Create a new codec with the provided flag describing whether the encoder logic should send integers as blob
202    /// strings.
203    pub fn new(int_as_bulkstring: bool) -> Resp2 {
204      Resp2 { int_as_bulkstring }
205    }
206  }
207
208  impl Encoder<Resp2Frame> for Resp2 {
209    type Error = RedisProtocolError;
210
211    fn encode(&mut self, item: Resp2Frame, dst: &mut BytesMut) -> Result<(), Self::Error> {
212      resp2_encode(dst, &item, self.int_as_bulkstring)
213        .map(|_| ())
214        .map_err(RedisProtocolError::from)
215    }
216  }
217
218  impl Encoder<BorrowedFrame<'_>> for Resp2 {
219    type Error = RedisProtocolError;
220
221    fn encode(&mut self, item: BorrowedFrame, dst: &mut BytesMut) -> Result<(), Self::Error> {
222      resp2_encode_borrowed(dst, &item, self.int_as_bulkstring)
223        .map(|_| ())
224        .map_err(RedisProtocolError::from)
225    }
226  }
227
228  impl Decoder for Resp2 {
229    type Error = RedisProtocolError;
230    type Item = Resp2Frame;
231
232    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
233      if src.is_empty() {
234        return Ok(None);
235      }
236      let parsed = match resp2_decode(src)? {
237        Some((frame, _, _)) => frame,
238        None => return Ok(None),
239      };
240
241      Ok(Some(parsed))
242    }
243  }
244}
245
246#[cfg(feature = "resp3")]
247#[cfg_attr(docsrs, doc(cfg(feature = "resp3")))]
248pub use resp3::*;
249
250#[cfg(feature = "resp2")]
251#[cfg_attr(docsrs, doc(cfg(feature = "resp2")))]
252pub use resp2::*;