use std::sync::Arc;
use futures::future::try_join_all;
use test_log::test;
use redust::{
resp::{array, Data},
Connection, Error, Result,
};
use tokio::{spawn, sync::Mutex};
use crate::common::redis_url;
mod common;
#[test(tokio::test)]
async fn ping() -> Result<()> {
let mut conn = Connection::new(redis_url()).await?;
let res = conn.cmd(["PING"]).await?;
assert_eq!(res, "PONG");
Ok(())
}
#[test(tokio::test)]
async fn multi_ping() -> Result<()> {
let mut conn = Connection::new(redis_url()).await?;
let res = conn.cmd(["PING"]).await?;
assert_eq!(res, "PONG");
let res = conn.cmd(["PING", "foobar"]).await?;
assert_eq!(res, b"foobar");
Ok(())
}
#[test(tokio::test)]
async fn stream() -> Result<()> {
let mut conn = Connection::new(redis_url()).await?;
let res_id = conn.cmd(["XADD", "foo1", "*", "foo", "bar"]).await?;
let res = conn.cmd(["XREAD", "STREAMS", "foo1", "0-0"]).await?;
conn.cmd(["DEL", "foo1"]).await?;
let expected = array![array![
b"foo1",
array![array![res_id, array![b"foo", b"bar"]]]
]];
assert_eq!(res, expected);
Ok(())
}
#[test(tokio::test)]
async fn ping_stream() -> Result<()> {
let mut conn = Connection::new(redis_url()).await?;
let cmds = [["ping", "foo"], ["ping", "bar"]];
let res = conn.pipeline(cmds.iter()).await?;
assert_eq!(
res,
vec![Data::bulk_string(b"foo"), Data::bulk_string(b"bar")]
);
Ok(())
}
#[test(tokio::test)]
async fn many_sequential() -> Result<()> {
let mut conn = Connection::new(redis_url()).await?;
for i in 0..1000 {
let i_str = i.to_string();
let res = conn.cmd(["PING", &i_str]).await?;
assert!(matches!(res, Data::BulkString(i_bytes) if i_bytes == i_str.as_bytes()));
}
Ok(())
}
#[test(tokio::test)]
async fn many_parallel() -> Result<()> {
let concurrency = 5;
let conn = Arc::new(Mutex::new(Connection::new(redis_url()).await?));
let mut futs = Vec::with_capacity(concurrency);
for i in 0..concurrency {
let conn2 = Arc::clone(&conn);
let handle = spawn(async move {
for j in (i * 1000)..(i * 1000 + 1000) {
let j_str = j.to_string();
let mut conn = conn2.lock().await;
let res = conn.cmd(["PING", &j_str]).await?;
assert!(matches!(res, Data::BulkString(j_bytes) if j_bytes == j_str.as_bytes()));
}
Ok::<_, Error>(())
});
futs.push(handle);
}
try_join_all(futs)
.await
.unwrap()
.into_iter()
.for_each(|r| r.unwrap());
Ok(())
}
#[cfg(feature = "command")]
#[test(tokio::test)]
async fn hello_no_auth() -> Result<()> {
let mut conn = Connection::new(redis_url()).await?;
conn.run(redust::command::connection::Hello {
username: None::<&str>,
password: None::<&str>,
})
.await?;
Ok(())
}
#[test(tokio::test)]
async fn blocking() -> Result<()> {
let mut conn = Connection::new(redis_url()).await?;
let data = conn.cmd(["BLPOP", "empty", "5"]).await?;
assert_eq!(data, ());
Ok(())
}