1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
#[cfg(feature = "resp3")]
mod resp3 {
  use crate::{
    error::{RedisProtocolError, RedisProtocolErrorKind},
    resp3::{
      decode::streaming::decode_bytes_mut as resp3_decode,
      encode::complete::extend_encode as resp3_encode,
      types::{BytesFrame as Resp3Frame, StreamedFrame},
    },
  };
  use bytes::BytesMut;
  use tokio_util::codec::{Decoder, Encoder};
  /// Build a RESP3 array of blob strings from a space-delimited command string (`SET foo bar NX`, etc).
  ///
  /// The input is split on every single space character (`' '`) and each piece becomes one blob string with no
  /// attributes. Nothing is trimmed, quoted, or escaped, so leading, trailing, or repeated spaces produce empty
  /// blob strings in the resulting array.
  pub fn resp3_encode_command(cmd: &str) -> Resp3Frame {
    let to_blob = |token: &str| Resp3Frame::BlobString {
      data:       Vec::from(token.as_bytes()).into(),
      attributes: None,
    };
    let data = cmd.split(' ').map(to_blob).collect();

    Resp3Frame::Array {
      data,
      attributes: None,
    }
  }

  /// A framed codec for complete and streaming/chunked RESP3 frames.
  ///
  /// Chunks of a streamed frame are collected inside the codec and the decoder only yields the frame once the
  /// stream reports that it is finished.
  ///
  /// ```rust
  /// use futures::{SinkExt, StreamExt};
  /// use redis_protocol::{
  ///   codec::{resp3_encode_command, Resp3},
  ///   resp3::types::{BytesFrame, Resp3Frame, RespVersion},
  /// };
  /// use tokio::net::TcpStream;
  /// use tokio_util::codec::Framed;
  ///
  /// // send `HELLO 3 AUTH foo bar` then `GET foo`
  /// async fn example() {
  ///   let socket = TcpStream::connect("127.0.0.1:6379").await.unwrap();
  ///   let mut framed = Framed::new(socket, Resp3::default());
  ///
  ///   let hello = Resp3Frame::Hello {
  ///     version:  RespVersion::RESP3,
  ///     username: Some("foo".into()),
  ///     password: Some("bar".into()),
  ///   };
  ///   // or use the shorthand, but this likely only works for simple use cases
  ///   let get_foo = resp3_encode_command("GET foo");
  ///
  ///   // `Framed` implements both `Sink` and `Stream`
  ///   let _ = framed.send(hello).await.unwrap();
  ///   let response = framed.next().await.unwrap();
  ///   println!("HELLO response: {:?}", response);
  ///
  ///   let _ = framed.send(get_foo).await.unwrap();
  ///   let response = framed.next().await.unwrap();
  ///   println!("GET foo: {:?}", response);
  /// }
  /// ```
  #[derive(Debug, Default)]
  pub struct Resp3 {
    /// The streamed frame currently being assembled, or `None` when the decoder is not inside a stream.
    streaming: Option<StreamedFrame<Resp3Frame>>,
  }

  impl Encoder<Resp3Frame> for Resp3 {
    type Error = RedisProtocolError;

    /// Append the RESP3 encoding of `item` to `dst`.
    ///
    /// The value returned by the encoder on success is discarded; failures are converted into a
    /// `RedisProtocolError`.
    fn encode(&mut self, item: Resp3Frame, dst: &mut BytesMut) -> Result<(), Self::Error> {
      match resp3_encode(dst, &item) {
        Ok(_) => Ok(()),
        Err(err) => Err(RedisProtocolError::from(err)),
      }
    }
  }

  impl Decoder for Resp3 {
    type Error = RedisProtocolError;
    type Item = Resp3Frame;

    /// Decode the next complete RESP3 frame from `src`.
    ///
    /// Non-streamed frames are returned as soon as they are parsed. When a streamed frame starts, the partial
    /// frame is kept in `self.streaming`, each following chunk is added to it, and the assembled frame is
    /// returned once the stream reports that it is finished.
    ///
    /// `Ok(None)` is returned only when `src` is empty or does not contain another complete frame, i.e. when
    /// more bytes have to be read before any progress can be made.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying parser fails, if converting the parsed frame fails, or if a new
    /// stream starts while another stream is still being assembled.
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
      // `Framed` interprets `Ok(None)` as "read more bytes from the transport before calling `decode` again".
      // Stream-start and chunk frames do not produce an item on their own, so keep parsing whatever is already
      // buffered instead of returning `None` while complete frames are still waiting in `src`. Otherwise a
      // streamed reply that arrived in a single read would stall until unrelated bytes show up.
      //
      // NOTE(review): this relies on `resp3_decode` removing the bytes of each parsed frame from `src` (the
      // returned length and buffer are discarded here, as before) - confirm against the decoder docs.
      loop {
        if src.is_empty() {
          return Ok(None);
        }
        let parsed = match resp3_decode(src)? {
          Some((frame, _, _)) => frame,
          // the buffer holds only part of a frame
          None => return Ok(None),
        };

        if self.streaming.is_some() && parsed.is_streaming() {
          return Err(RedisProtocolError::new(
            RedisProtocolErrorKind::DecodeError,
            "Cannot start a stream while already inside a stream.",
          ));
        }

        let result = if let Some(ref mut state) = self.streaming {
          // we started receiving streamed data earlier
          state.add_frame(parsed.into_complete_frame()?);

          if state.is_finished() {
            Some(state.take()?)
          } else {
            None
          }
        } else if parsed.is_streaming() {
          // a new streamed frame starts here; nothing can be emitted until its chunks arrive
          self.streaming = Some(parsed.into_streaming_frame()?);
          None
        } else {
          // we're not in the middle of a stream and we found a complete frame
          Some(parsed.into_complete_frame()?)
        };

        if let Some(frame) = result {
          // any stream that was in progress has just finished, so drop its state
          self.streaming = None;
          return Ok(Some(frame));
        }
      }
    }
  }
}

#[cfg(feature = "resp2")]
mod resp2 {
  use crate::{
    error::RedisProtocolError,
    resp2::{
      decode::decode_bytes_mut as resp2_decode,
      encode::extend_encode as resp2_encode,
      types::BytesFrame as Resp2Frame,
    },
  };
  use bytes::BytesMut;
  use tokio_util::codec::{Decoder, Encoder};
  /// Build a RESP2 array of bulk strings from a space-delimited command string (`SET foo bar NX`, etc).
  ///
  /// The input is split on every single space character (`' '`) and each piece becomes one bulk string. Nothing
  /// is trimmed, quoted, or escaped, so leading, trailing, or repeated spaces produce empty bulk strings in the
  /// resulting array.
  pub fn resp2_encode_command(cmd: &str) -> Resp2Frame {
    let to_bulk = |token: &str| Resp2Frame::BulkString(Vec::from(token.as_bytes()).into());
    let args = cmd.split(' ').map(to_bulk).collect();

    Resp2Frame::Array(args)
  }

  /// A framed RESP2 codec.
  ///
  /// The codec is a unit struct and carries no state between calls.
  ///
  /// ```rust
  /// use futures::{SinkExt, StreamExt};
  /// use redis_protocol::{
  ///   codec::{resp2_encode_command, Resp2},
  ///   resp2::types::{BytesFrame, Resp2Frame},
  /// };
  /// use tokio::net::TcpStream;
  /// use tokio_util::codec::Framed;
  ///
  /// async fn example() {
  ///   let socket = TcpStream::connect("127.0.0.1:6379").await.unwrap();
  ///   let mut framed = Framed::new(socket, Resp2::default());
  ///
  ///   let auth = resp2_encode_command("AUTH foo bar");
  ///   let get_foo = resp2_encode_command("GET foo");
  ///
  ///   let _ = framed.send(auth).await.unwrap();
  ///   let response = framed.next().await.unwrap().unwrap();
  ///   assert_eq!(response.as_str().unwrap(), "OK");
  ///
  ///   let _ = framed.send(get_foo).await.unwrap();
  ///   let response = framed.next().await.unwrap().unwrap();
  ///   assert_eq!(response, BytesFrame::Null);
  /// }
  /// ```
  #[derive(Clone, Debug, Default)]
  pub struct Resp2;

  impl Encoder<Resp2Frame> for Resp2 {
    type Error = RedisProtocolError;

    /// Append the RESP2 encoding of `item` to `dst`.
    ///
    /// The value returned by the encoder on success is discarded; failures are converted into a
    /// `RedisProtocolError`.
    fn encode(&mut self, item: Resp2Frame, dst: &mut BytesMut) -> Result<(), Self::Error> {
      match resp2_encode(dst, &item) {
        Ok(_) => Ok(()),
        Err(err) => Err(RedisProtocolError::from(err)),
      }
    }
  }

  impl Decoder for Resp2 {
    type Error = RedisProtocolError;
    type Item = Resp2Frame;

    /// Try to decode one RESP2 frame from `src`.
    ///
    /// Returns `Ok(None)` when `src` is empty or the parser reports that no complete frame is available yet.
    /// Parser errors are propagated to the caller.
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
      if src.is_empty() {
        return Ok(None);
      }

      // only the frame is needed; the other two values returned by the parser are dropped
      let frame = resp2_decode(src)?.map(|(frame, _, _)| frame);
      Ok(frame)
    }
  }
}

#[cfg(feature = "resp3")]
#[cfg_attr(docsrs, doc(cfg(feature = "resp3")))]
pub use resp3::*;

#[cfg(feature = "resp2")]
#[cfg_attr(docsrs, doc(cfg(feature = "resp2")))]
pub use resp2::*;