1pub mod cluster;
27pub mod sentinel;
28pub mod server;
29pub mod utils;
30
31use std::collections::VecDeque;
32use std::sync::{Arc, Mutex};
33
34use redis::{Cmd, ConnectionLike, ErrorKind, Pipeline, RedisError, RedisResult, Value};
35
36#[cfg(feature = "aio")]
37use futures::{FutureExt, future};
38
39#[cfg(feature = "aio")]
40use redis::{RedisFuture, aio::ConnectionLike as AioConnectionLike};
41
42pub trait IntoRedisValue {
46 fn into_redis_value(self) -> Value;
48}
49
50impl IntoRedisValue for String {
51 fn into_redis_value(self) -> Value {
52 Value::BulkString(self.as_bytes().to_vec())
53 }
54}
55
56impl IntoRedisValue for &str {
57 fn into_redis_value(self) -> Value {
58 Value::BulkString(self.as_bytes().to_vec())
59 }
60}
61
/// A `bytes::Bytes` buffer becomes a bulk string holding a copy of its contents.
///
/// Only compiled when the `bytes` feature is enabled.
#[cfg(feature = "bytes")]
impl IntoRedisValue for bytes::Bytes {
    fn into_redis_value(self) -> Value {
        let copied = Vec::from(&self[..]);
        Value::BulkString(copied)
    }
}
68
69impl IntoRedisValue for Vec<u8> {
70 fn into_redis_value(self) -> Value {
71 Value::BulkString(self)
72 }
73}
74
75impl IntoRedisValue for Value {
76 fn into_redis_value(self) -> Value {
77 self
78 }
79}
80
81impl IntoRedisValue for i64 {
82 fn into_redis_value(self) -> Value {
83 Value::Int(self)
84 }
85}
86
/// Conversion of a command or pipeline into its packed byte representation.
///
/// The resulting bytes are what [`MockRedisConnection`] compares incoming
/// requests against.
pub trait IntoRedisCmdBytes {
    /// Consumes `self` and produces the packed request bytes.
    fn into_redis_cmd_bytes(self) -> Vec<u8>;
}
93
94impl IntoRedisCmdBytes for Cmd {
95 fn into_redis_cmd_bytes(self) -> Vec<u8> {
96 self.get_packed_command()
97 }
98}
99
100impl IntoRedisCmdBytes for &Cmd {
101 fn into_redis_cmd_bytes(self) -> Vec<u8> {
102 self.get_packed_command()
103 }
104}
105
106impl IntoRedisCmdBytes for &mut Cmd {
107 fn into_redis_cmd_bytes(self) -> Vec<u8> {
108 self.get_packed_command()
109 }
110}
111
112impl IntoRedisCmdBytes for Pipeline {
113 fn into_redis_cmd_bytes(self) -> Vec<u8> {
114 self.get_packed_pipeline()
115 }
116}
117
118impl IntoRedisCmdBytes for &Pipeline {
119 fn into_redis_cmd_bytes(self) -> Vec<u8> {
120 self.get_packed_pipeline()
121 }
122}
123
124impl IntoRedisCmdBytes for &mut Pipeline {
125 fn into_redis_cmd_bytes(self) -> Vec<u8> {
126 self.get_packed_pipeline()
127 }
128}
129
130pub struct MockCmd {
132 cmd_bytes: Vec<u8>,
133 responses: Result<Vec<Value>, RedisError>,
134}
135
136impl MockCmd {
137 pub fn new<C, V>(cmd: C, response: Result<V, RedisError>) -> Self
140 where
141 C: IntoRedisCmdBytes,
142 V: IntoRedisValue,
143 {
144 MockCmd {
145 cmd_bytes: cmd.into_redis_cmd_bytes(),
146 responses: response.map(|r| vec![r.into_redis_value()]),
147 }
148 }
149
150 pub fn with_values<C, V>(cmd: C, responses: Result<Vec<V>, RedisError>) -> Self
153 where
154 C: IntoRedisCmdBytes,
155 V: IntoRedisValue,
156 {
157 MockCmd {
158 cmd_bytes: cmd.into_redis_cmd_bytes(),
159 responses: responses.map(|xs| xs.into_iter().map(|x| x.into_redis_value()).collect()),
160 }
161 }
162}
163
164#[derive(Clone)]
167pub struct MockRedisConnection {
168 commands: Arc<Mutex<VecDeque<MockCmd>>>,
169 assert_is_empty_on_drop: bool,
170}
171
172impl MockRedisConnection {
173 pub fn new<I>(commands: I) -> Self
175 where
176 I: IntoIterator<Item = MockCmd>,
177 {
178 MockRedisConnection {
179 commands: Arc::new(Mutex::new(VecDeque::from_iter(commands))),
180 assert_is_empty_on_drop: false,
181 }
182 }
183
184 pub fn assert_all_commands_consumed(mut self) -> Self {
186 self.assert_is_empty_on_drop = true;
187 self
188 }
189}
190
191impl Drop for MockRedisConnection {
192 fn drop(&mut self) {
193 if self.assert_is_empty_on_drop {
194 let commands = self.commands.lock().unwrap();
195 if Arc::strong_count(&self.commands) == 1 {
196 assert!(commands.back().is_none());
197 }
198 }
199 }
200}
201
202impl MockRedisConnection {
203 pub fn is_empty(&self) -> bool {
204 self.commands.lock().unwrap().is_empty()
205 }
206}
207
208impl ConnectionLike for MockRedisConnection {
209 fn req_packed_command(&mut self, cmd: &[u8]) -> RedisResult<Value> {
210 let mut commands = self.commands.lock().unwrap();
211 let next_cmd = commands.pop_front().ok_or_else(|| {
212 self.assert_is_empty_on_drop = false;
213 RedisError::from((ErrorKind::Client, "TEST", "unexpected command".to_owned()))
214 })?;
215
216 if cmd != next_cmd.cmd_bytes {
217 self.assert_is_empty_on_drop = false;
218 return Err(RedisError::from((
219 ErrorKind::Client,
220 "TEST",
221 format!(
222 "unexpected command: expected={}, actual={}",
223 String::from_utf8(next_cmd.cmd_bytes)
224 .unwrap_or_else(|_| "decode error".to_owned()),
225 String::from_utf8(Vec::from(cmd)).unwrap_or_else(|_| "decode error".to_owned()),
226 ),
227 )));
228 }
229
230 next_cmd
231 .responses
232 .and_then(|values| match values.as_slice() {
233 [value] => Ok(value.clone()),
234 [] => {
235 self.assert_is_empty_on_drop = false;
236 Err(RedisError::from((
237 ErrorKind::Client,
238 "no value configured as response",
239 )))},
240 _ => {
241 self.assert_is_empty_on_drop = false;
242 Err(RedisError::from((
243 ErrorKind::Client,
244 "multiple values configured as response for command expecting a single value",
245 )))},
246 })
247 }
248
249 fn req_packed_commands(
250 &mut self,
251 cmd: &[u8],
252 _offset: usize,
253 _count: usize,
254 ) -> RedisResult<Vec<Value>> {
255 let mut commands = self.commands.lock().unwrap();
256 let next_cmd = commands.pop_front().ok_or_else(|| {
257 RedisError::from((ErrorKind::Client, "TEST", "unexpected command".to_owned()))
258 })?;
259
260 if cmd != next_cmd.cmd_bytes {
261 return Err(RedisError::from((
262 ErrorKind::Client,
263 "TEST",
264 format!(
265 "unexpected command: expected={}, actual={}",
266 String::from_utf8(next_cmd.cmd_bytes)
267 .unwrap_or_else(|_| "decode error".to_owned()),
268 String::from_utf8(Vec::from(cmd)).unwrap_or_else(|_| "decode error".to_owned()),
269 ),
270 )));
271 }
272
273 next_cmd.responses
274 }
275
276 fn get_db(&self) -> i64 {
277 0
278 }
279
280 fn check_connection(&mut self) -> bool {
281 true
282 }
283
284 fn is_open(&self) -> bool {
285 true
286 }
287}
288
/// Async flavour of the mock: every request is resolved synchronously through the
/// blocking [`ConnectionLike`] implementation and wrapped in a ready future.
///
/// Only compiled when the `aio` feature is enabled.
#[cfg(feature = "aio")]
impl AioConnectionLike for MockRedisConnection {
    /// Packs `cmd` and answers it via the blocking `req_packed_command`.
    fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
        let packed = cmd.get_packed_command();
        let reply = ConnectionLike::req_packed_command(self, &packed);
        future::ready(reply).boxed()
    }

    /// Packs the pipeline and answers it via the blocking `req_packed_commands`.
    fn req_packed_commands<'a>(
        &'a mut self,
        cmd: &'a Pipeline,
        offset: usize,
        count: usize,
    ) -> RedisFuture<'a, Vec<Value>> {
        let packed = cmd.get_packed_pipeline();
        let reply = ConnectionLike::req_packed_commands(self, &packed, offset, count);
        future::ready(reply).boxed()
    }

    /// The mock always reports database index 0.
    fn get_db(&self) -> i64 {
        0
    }
}
320
#[cfg(test)]
mod tests {
    use super::{MockCmd, MockRedisConnection};
    use redis::{ErrorKind, Value, cmd, pipe};

    /// Scripted SET/GET pairs are answered in order over the blocking API.
    #[test]
    fn sync_basic_test() {
        let mut mock = MockRedisConnection::new([
            MockCmd::new(cmd("SET").arg("foo").arg(42), Ok("")),
            MockCmd::new(cmd("GET").arg("foo"), Ok(42)),
            MockCmd::new(cmd("SET").arg("bar").arg("foo"), Ok("")),
            MockCmd::new(cmd("GET").arg("bar"), Ok("foo")),
        ])
        .assert_all_commands_consumed();

        cmd("SET").arg("foo").arg(42).exec(&mut mock).unwrap();
        assert_eq!(cmd("GET").arg("foo").query(&mut mock), Ok(42));

        cmd("SET").arg("bar").arg("foo").exec(&mut mock).unwrap();
        assert_eq!(
            cmd("GET").arg("bar").query(&mut mock),
            Ok(Value::BulkString(b"foo".as_ref().into()))
        );
    }

    /// Scripted SET/GET pairs are answered in order over the async API.
    #[cfg(feature = "aio")]
    #[tokio::test]
    async fn async_basic_test() {
        let mut mock = MockRedisConnection::new([
            MockCmd::new(cmd("SET").arg("foo").arg(42), Ok("")),
            MockCmd::new(cmd("GET").arg("foo"), Ok(42)),
            MockCmd::new(cmd("SET").arg("bar").arg("foo"), Ok("")),
            MockCmd::new(cmd("GET").arg("bar"), Ok("foo")),
        ])
        .assert_all_commands_consumed();

        cmd("SET")
            .arg("foo")
            .arg("42")
            .exec_async(&mut mock)
            .await
            .unwrap();
        let number: Result<usize, _> = cmd("GET").arg("foo").query_async(&mut mock).await;
        assert_eq!(number, Ok(42));

        cmd("SET")
            .arg("bar")
            .arg("foo")
            .exec_async(&mut mock)
            .await
            .unwrap();
        let raw: Result<Vec<u8>, _> = cmd("GET").arg("bar").query_async(&mut mock).await;
        assert_eq!(raw.as_deref(), Ok(&b"foo"[..]));
    }

    /// A request issued after the script is exhausted fails with a client error.
    #[test]
    fn errors_for_unexpected_commands() {
        let mut mock = MockRedisConnection::new([
            MockCmd::new(cmd("SET").arg("foo").arg(42), Ok("")),
            MockCmd::new(cmd("GET").arg("foo"), Ok(42)),
        ])
        .assert_all_commands_consumed();

        cmd("SET").arg("foo").arg(42).exec(&mut mock).unwrap();
        assert_eq!(cmd("GET").arg("foo").query(&mut mock), Ok(42));

        let failure = cmd("SET")
            .arg("bar")
            .arg("foo")
            .exec(&mut mock)
            .unwrap_err();
        assert_eq!(failure.kind(), ErrorKind::Client);
        assert_eq!(failure.detail(), Some("unexpected command"));
    }

    /// A request that differs from the next scripted command fails with a client error.
    #[test]
    fn errors_for_mismatched_commands() {
        let mut mock = MockRedisConnection::new([
            MockCmd::new(cmd("SET").arg("foo").arg(42), Ok("")),
            MockCmd::new(cmd("GET").arg("foo"), Ok(42)),
            MockCmd::new(cmd("SET").arg("bar").arg("foo"), Ok("")),
        ])
        .assert_all_commands_consumed();

        cmd("SET").arg("foo").arg(42).exec(&mut mock).unwrap();

        // The script expects GET next, so this SET must be rejected.
        let failure = cmd("SET")
            .arg("bar")
            .arg("foo")
            .exec(&mut mock)
            .unwrap_err();
        assert_eq!(failure.kind(), ErrorKind::Client);
        assert!(failure.detail().unwrap().contains("unexpected command"));
    }

    /// A non-atomic pipeline receives one scripted value per command.
    #[test]
    fn pipeline_basic_test() {
        let expectation = MockCmd::with_values(
            pipe().cmd("GET").arg("foo").cmd("GET").arg("bar"),
            Ok(vec!["hello", "world"]),
        );
        let mut mock = MockRedisConnection::new([expectation]).assert_all_commands_consumed();

        let replies: Vec<String> = pipe()
            .cmd("GET")
            .arg("foo")
            .cmd("GET")
            .arg("bar")
            .query(&mut mock)
            .expect("success");
        assert_eq!(replies, vec!["hello", "world"]);
    }

    /// An atomic pipeline receives its results as a single scripted array value.
    #[test]
    fn pipeline_atomic_test() {
        let exec_reply = Value::Array(vec![
            Value::BulkString(b"hello".to_vec()),
            Value::BulkString(b"world".to_vec()),
        ]);
        let expectation = MockCmd::with_values(
            pipe().atomic().cmd("GET").arg("foo").cmd("GET").arg("bar"),
            Ok(vec![exec_reply]),
        );
        let mut mock = MockRedisConnection::new([expectation]).assert_all_commands_consumed();

        let replies: Vec<String> = pipe()
            .atomic()
            .cmd("GET")
            .arg("foo")
            .cmd("GET")
            .arg("bar")
            .query(&mut mock)
            .expect("success");
        assert_eq!(replies, vec!["hello", "world"]);
    }
}