1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
#[cfg(feature = "resp3")]
mod resp3 {
use crate::{
error::{RedisProtocolError, RedisProtocolErrorKind},
resp3::{
decode::streaming::decode_bytes_mut as resp3_decode,
encode::complete::{extend_encode as resp3_encode, extend_encode_borrowed as resp3_encode_borrowed},
types::{BorrowedFrame, BytesFrame as Resp3Frame, StreamedFrame},
},
};
use bytes::BytesMut;
use tokio_util::codec::{Decoder, Encoder};
/// Encode a redis command string (`SET foo bar NX`, etc) into a RESP3 blob string array.
pub fn resp3_encode_command(cmd: &str) -> Resp3Frame {
Resp3Frame::Array {
data: cmd
.split(' ')
.map(|s| Resp3Frame::BlobString {
data: s.as_bytes().to_vec().into(),
attributes: None,
})
.collect(),
attributes: None,
}
}
/// A framed codec for complete and streaming/chunked RESP3 frames.
///
/// ```rust
/// use futures::{SinkExt, StreamExt};
/// use redis_protocol::{
/// codec::{resp3_encode_command, Resp3},
/// resp3::types::{BytesFrame, Resp3Frame, RespVersion},
/// };
/// use tokio::net::TcpStream;
/// use tokio_util::codec::Framed;
///
/// // send `HELLO 3 AUTH foo bar` then `GET foo`
/// async fn example() {
/// let socket = TcpStream::connect("127.0.0.1:6379").await.unwrap();
/// let mut framed = Framed::new(socket, Resp3::default());
///
/// let hello = BytesFrame::Hello {
/// version: RespVersion::RESP3,
/// auth: Some(("foo".into(), "bar".into())),
/// setname: None,
/// };
/// // or use the shorthand, but this likely only works for simple use cases
/// let get_foo = resp3_encode_command("GET foo");
///
/// // `Framed` implements both `Sink` and `Stream`
/// let _ = framed.send(hello).await.unwrap();
/// let response = framed.next().await.unwrap();
/// println!("HELLO response: {:?}", response);
///
/// let _ = framed.send(get_foo).await.unwrap();
/// let response = framed.next().await.unwrap();
/// println!("GET foo: {:?}", response);
/// }
/// ```
#[derive(Debug, Default)]
pub struct Resp3 {
streaming: Option<StreamedFrame<Resp3Frame>>,
int_as_blobstring: bool,
}
impl Resp3 {
/// Create a new codec with the provided flag describing whether the encoder logic should send integers as blob
/// strings.
pub fn new(int_as_blobstring: bool) -> Resp3 {
Resp3 {
int_as_blobstring,
streaming: None,
}
}
}
impl Encoder<Resp3Frame> for Resp3 {
type Error = RedisProtocolError;
fn encode(&mut self, item: Resp3Frame, dst: &mut BytesMut) -> Result<(), Self::Error> {
resp3_encode(dst, &item, self.int_as_blobstring)
.map(|_| ())
.map_err(RedisProtocolError::from)
}
}
impl Encoder<BorrowedFrame<'_>> for Resp3 {
type Error = RedisProtocolError;
fn encode(&mut self, item: BorrowedFrame, dst: &mut BytesMut) -> Result<(), Self::Error> {
resp3_encode_borrowed(dst, &item, self.int_as_blobstring)
.map(|_| ())
.map_err(RedisProtocolError::from)
}
}
impl Decoder for Resp3 {
type Error = RedisProtocolError;
type Item = Resp3Frame;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if src.is_empty() {
return Ok(None);
}
let parsed = match resp3_decode(src)? {
Some((f, _, _)) => f,
None => return Ok(None),
};
if self.streaming.is_some() && parsed.is_streaming() {
return Err(RedisProtocolError::new(
RedisProtocolErrorKind::DecodeError,
"Cannot start a stream while already inside a stream.",
));
}
let result = if let Some(ref mut state) = self.streaming {
// we started receiving streamed data earlier
state.add_frame(parsed.into_complete_frame()?);
if state.is_finished() {
Some(state.take()?)
} else {
None
}
} else {
// we're processing a complete frame or starting a new streamed frame
if parsed.is_streaming() {
self.streaming = Some(parsed.into_streaming_frame()?);
None
} else {
// we're not in the middle of a stream and we found a complete frame
Some(parsed.into_complete_frame()?)
}
};
if result.is_some() {
let _ = self.streaming.take();
}
Ok(result)
}
}
}
#[cfg(feature = "resp2")]
mod resp2 {
use crate::{
error::RedisProtocolError,
resp2::{
decode::decode_bytes_mut as resp2_decode,
encode::{extend_encode as resp2_encode, extend_encode_borrowed as resp2_encode_borrowed},
types::{BorrowedFrame, BytesFrame as Resp2Frame},
},
};
use bytes::BytesMut;
use tokio_util::codec::{Decoder, Encoder};
/// Encode a redis command string (`SET foo bar NX`, etc) into a RESP2 bulk string array.
pub fn resp2_encode_command(cmd: &str) -> Resp2Frame {
Resp2Frame::Array(
cmd
.split(' ')
.map(|s| Resp2Frame::BulkString(s.as_bytes().to_vec().into()))
.collect(),
)
}
/// A framed RESP2 codec.
///
/// ```rust
/// use futures::{SinkExt, StreamExt};
/// use redis_protocol::{
/// codec::{resp2_encode_command, Resp2},
/// resp2::types::{BytesFrame, Resp2Frame},
/// };
/// use tokio::net::TcpStream;
/// use tokio_util::codec::Framed;
///
/// async fn example() {
/// let socket = TcpStream::connect("127.0.0.1:6379").await.unwrap();
/// let mut framed = Framed::new(socket, Resp2::default());
///
/// let auth = resp2_encode_command("AUTH foo bar");
/// let get_foo = resp2_encode_command("GET foo");
///
/// let _ = framed.send(auth).await.unwrap();
/// let response = framed.next().await.unwrap().unwrap();
/// assert_eq!(response.as_str().unwrap(), "OK");
///
/// let _ = framed.send(get_foo).await.unwrap();
/// let response = framed.next().await.unwrap().unwrap();
/// assert_eq!(response, BytesFrame::Null);
/// }
/// ```
#[derive(Clone, Debug, Default)]
pub struct Resp2 {
int_as_bulkstring: bool,
}
impl Resp2 {
/// Create a new codec with the provided flag describing whether the encoder logic should send integers as blob
/// strings.
pub fn new(int_as_bulkstring: bool) -> Resp2 {
Resp2 { int_as_bulkstring }
}
}
impl Encoder<Resp2Frame> for Resp2 {
type Error = RedisProtocolError;
fn encode(&mut self, item: Resp2Frame, dst: &mut BytesMut) -> Result<(), Self::Error> {
resp2_encode(dst, &item, self.int_as_bulkstring)
.map(|_| ())
.map_err(RedisProtocolError::from)
}
}
impl Encoder<BorrowedFrame<'_>> for Resp2 {
type Error = RedisProtocolError;
fn encode(&mut self, item: BorrowedFrame, dst: &mut BytesMut) -> Result<(), Self::Error> {
resp2_encode_borrowed(dst, &item, self.int_as_bulkstring)
.map(|_| ())
.map_err(RedisProtocolError::from)
}
}
impl Decoder for Resp2 {
type Error = RedisProtocolError;
type Item = Resp2Frame;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if src.is_empty() {
return Ok(None);
}
let parsed = match resp2_decode(src)? {
Some((frame, _, _)) => frame,
None => return Ok(None),
};
Ok(Some(parsed))
}
}
}
#[cfg(feature = "resp3")]
#[cfg_attr(docsrs, doc(cfg(feature = "resp3")))]
pub use resp3::*;
#[cfg(feature = "resp2")]
#[cfg_attr(docsrs, doc(cfg(feature = "resp2")))]
pub use resp2::*;