extern crate redis;
extern crate futures;
extern crate tokio;
use futures::{future, Future};
use support::*;
use redis::async::SharedConnection;
use redis::RedisError;
use tokio::executor::current_thread::block_on_all;
use tokio::runtime::current_thread::Runtime;
mod support;
#[test]
fn test_args() {
let ctx = TestContext::new();
let connect = ctx.async_connection();
block_on_all(connect.and_then(|con| {
redis::cmd("SET")
.arg("key1")
.arg(b"foo")
.query_async(con)
.and_then(|(con, ())| redis::cmd("SET").arg(&["key2", "bar"]).query_async(con))
.and_then(|(con, ())| {
redis::cmd("MGET")
.arg(&["key1", "key2"])
.query_async(con)
.map(|t| t.1)
})
.then(|result| {
assert_eq!(result, Ok(("foo".to_string(), b"bar".to_vec())));
result
})
})).unwrap();
}
#[test]
fn dont_panic_on_closed_shared_connection() {
let ctx = TestContext::new();
let connect = ctx.shared_async_connection();
drop(ctx);
Runtime::new()
.unwrap()
.block_on(future::lazy(|| {
connect
.and_then(|con| {
let cmd = move || {
redis::cmd("SET")
.arg("key1")
.arg(b"foo")
.query_async(con.clone())
.map(|(_, ())| ())
};
cmd().then(move |result| {
assert_eq!(
result.as_ref().unwrap_err().kind(),
redis::ErrorKind::IoError,
"{}",
result.as_ref().unwrap_err()
);
cmd()
})
})
.then(|result| -> Result<(), ()> {
assert_eq!(
result.as_ref().unwrap_err().kind(),
redis::ErrorKind::IoError,
"{}",
result.as_ref().unwrap_err()
);
Ok(())
})
}))
.unwrap();
}
#[test]
fn test_pipeline_transaction() {
let ctx = TestContext::new();
block_on_all(ctx.async_connection().and_then(|con| {
let mut pipe = redis::pipe();
pipe.atomic()
.cmd("SET")
.arg("key_1")
.arg(42)
.ignore()
.cmd("SET")
.arg("key_2")
.arg(43)
.ignore()
.cmd("MGET")
.arg(&["key_1", "key_2"]);
pipe.query_async(con)
.and_then(|(_con, ((k1, k2),)): (_, ((i32, i32),))| {
assert_eq!(k1, 42);
assert_eq!(k2, 43);
Ok(())
})
})).unwrap();
}
fn test_cmd(con: &SharedConnection, i: i32) -> Box<Future<Item = (), Error = RedisError> + Send> {
let key = format!("key{}", i);
let key_2 = key.clone();
let key2 = format!("key{}_2", i);
let key2_2 = key2.clone();
let foo = format!("foo{}", i);
let con1 = con.clone();
let con2 = con.clone();
Box::new(
redis::cmd("SET")
.arg(&key[..])
.arg(foo.as_bytes())
.query_async(con.clone())
.and_then(move |(_, ())| redis::cmd("SET").arg(&[&key2, "bar"]).query_async(con1))
.and_then(move |(_, ())| {
redis::cmd("MGET")
.arg(&[&key_2, &key2_2])
.query_async(con2)
.map(|t| t.1)
.then(|result| {
assert_eq!(Ok((foo, b"bar".to_vec())), result);
Ok(())
})
}),
)
}
fn test_error(con: &SharedConnection) -> Box<Future<Item = (), Error = RedisError> + Send> {
Box::new(
redis::cmd("SET")
.query_async(con.clone())
.then(|result| match result {
Ok((_, ())) => panic!("Expected redis to return an error"),
Err(_) => Ok(()),
}),
)
}
#[test]
fn test_args_shared_connection() {
let ctx = TestContext::new();
Runtime::new()
.unwrap()
.block_on(future::lazy(|| {
ctx.shared_async_connection()
.and_then(|con| {
let cmds = (0..100).map(move |i| test_cmd(&con, i));
future::join_all(cmds).map(|results| {
assert_eq!(results.len(), 100);
})
})
.map_err(|err| panic!("{}", err))
}))
.unwrap();
}
#[test]
fn test_args_with_errors_shared_connection() {
let ctx = TestContext::new();
Runtime::new()
.unwrap()
.block_on(future::lazy(|| {
ctx.shared_async_connection()
.and_then(|con| {
let cmds = (0..100).map(move |i| {
if i % 2 == 0 {
test_cmd(&con, i)
} else {
test_error(&con)
}
});
future::join_all(cmds).map(|results| {
assert_eq!(results.len(), 100);
})
})
.map_err(|err| panic!("{}", err))
}))
.unwrap();
}
#[test]
fn test_transaction_shared_connection() {
let ctx = TestContext::new();
Runtime::new()
.unwrap()
.block_on(future::lazy(|| {
ctx.shared_async_connection()
.and_then(|con| {
let cmds = (0..100).map(move |i| {
let foo = i;
let bar = format!("bar{}", i);
let mut pipe = redis::pipe();
pipe.atomic()
.cmd("SET")
.arg("key")
.arg(foo)
.ignore()
.cmd("SET")
.arg(&["key2", &bar[..]])
.ignore()
.cmd("MGET")
.arg(&["key", "key2"]);
pipe.query_async(con.clone())
.map(|t| t.1)
.then(move |result| {
assert_eq!(Ok(((foo, bar.clone().into_bytes()),)), result);
result
})
});
future::join_all(cmds)
})
.and_then(|results| {
assert_eq!(results.len(), 100);
Ok(())
})
.map_err(|err| panic!("{}", err))
}))
.unwrap();
}