#![cfg(feature = "streams")]
use redis::streams::*;
use redis::{Commands, Connection, RedisResult, ToRedisArgs};
mod support;
use crate::support::*;
use std::collections::BTreeMap;
use std::str;
use std::thread::sleep;
use std::time::Duration;
/// Asserts that `$value.to_redis_args()` yields exactly the given sequence of
/// string arguments.
///
/// Each produced argument is decoded as UTF-8 and the resulting list is
/// compared with the expected strings via `assert_eq!`. A trailing comma after
/// the last expected argument is accepted. The expansion is a single block, so
/// its temporaries do not leak and it can be used in expression position.
///
/// Panics if an argument is not valid UTF-8 or if the lists differ.
macro_rules! assert_args {
    ($value:expr, $($args:expr),+ $(,)?) => {{
        let args = $value.to_redis_args();
        let strings: Vec<_> = args
            .iter()
            .map(|a| str::from_utf8(a.as_ref()).expect("redis argument is not valid UTF-8"))
            .collect();
        assert_eq!(strings, vec![$($args),+]);
    }};
}
fn xadd(con: &mut Connection) {
let _: RedisResult<String> =
con.xadd("k1", "1000-0", &[("hello", "world"), ("redis", "streams")]);
let _: RedisResult<String> = con.xadd("k1", "1000-1", &[("hello", "world2")]);
let _: RedisResult<String> = con.xadd("k2", "2000-0", &[("hello", "world")]);
let _: RedisResult<String> = con.xadd("k2", "2000-1", &[("hello", "world2")]);
}
fn xadd_keyrange(con: &mut Connection, key: &str, start: i32, end: i32) {
for _i in start..end {
let _: RedisResult<String> = con.xadd(key, "*", &[("h", "w")]);
}
}
/// Checks the argument lists produced by the stream option builders and by
/// `StreamMaxlen`.
#[test]
fn test_cmd_options() {
    // Default-constructed option sets serialize to no arguments at all.
    let default_claim_args = StreamClaimOptions::default().to_redis_args();
    assert_eq!(default_claim_args.len(), 0);
    let default_read_args = StreamReadOptions::default().to_redis_args();
    assert_eq!(default_read_args.len(), 0);

    // Every claim option set at once.
    let claim_opts = StreamClaimOptions::default()
        .idle(50)
        .time(500)
        .retry(3)
        .with_force()
        .with_justid();
    assert_args!(
        &claim_opts,
        "IDLE",
        "50",
        "TIME",
        "500",
        "RETRYCOUNT",
        "3",
        "FORCE",
        "JUSTID"
    );

    // Both trimming strategies.
    assert_args!(StreamMaxlen::Approx(10), "MAXLEN", "~", "10");
    assert_args!(StreamMaxlen::Equals(10), "MAXLEN", "=", "10");

    // Read options with a consumer group: NOACK and GROUP are both emitted.
    let grouped_read_opts = StreamReadOptions::default()
        .noack()
        .block(100)
        .count(200)
        .group("group-name", "consumer-name");
    assert_args!(
        &grouped_read_opts,
        "BLOCK",
        "100",
        "COUNT",
        "200",
        "NOACK",
        "GROUP",
        "group-name",
        "consumer-name"
    );

    // Same options without a group: the expected list contains no NOACK.
    let plain_read_opts = StreamReadOptions::default().noack().block(100).count(200);
    assert_args!(&plain_read_opts, "BLOCK", "100", "COUNT", "200");
}
/// Exercises `xadd`, `xread`, `xadd_map`, `xrange_all`, `xlen` and
/// `xadd_maxlen` against a fresh test server.
///
/// Every write is unwrapped so that a failing setup command is reported where
/// it happens instead of being masked by a later assertion.
#[test]
fn test_assorted_1() {
    let ctx = TestContext::new();
    let mut con = ctx.connection();
    xadd(&mut con);

    // Adding with an explicit id returns that id.
    let id: String = con.xadd("k0", "1000-0", &[("x", "y")]).unwrap();
    assert_eq!(id, "1000-0");

    // `k3` has not been written yet, so only `k1` and `k2` are expected back.
    let reply: StreamReadReply = con.xread(&["k1", "k2", "k3"], &["0", "0", "0"]).unwrap();
    assert_eq!(reply.keys.len(), 2);

    let k1 = &reply.keys[0];
    assert_eq!(k1.key, "k1");
    assert_eq!(k1.ids.len(), 2);
    assert_eq!(k1.ids[0].id, "1000-0");
    let hello: Option<String> = k1.ids[0].get("hello");
    assert_eq!(hello, Some("world".to_string()));

    let k2 = &reply.keys[1];
    assert_eq!(k2.key, "k2");
    assert_eq!(k2.ids.len(), 2);
    assert_eq!(k2.ids[0].id, "2000-0");

    // Fields supplied through a map must all be present on the stored entry.
    let mut map: BTreeMap<&str, &str> = BTreeMap::new();
    map.insert("ab", "cd");
    map.insert("ef", "gh");
    map.insert("ij", "kl");
    let _: String = con
        .xadd_map("k3", "3000-0", map)
        .expect("failed to add map entry to k3");
    let reply: StreamRangeReply = con.xrange_all("k3").unwrap();
    for field in &["ab", "ef", "ij"] {
        assert!(reply.ids[0].contains_key(field));
    }

    // 100 entries, then one more add capped at exactly 10 entries.
    xadd_keyrange(&mut con, "k4", 0, 100);
    let len: usize = con.xlen("k4").unwrap();
    assert_eq!(len, 100);
    let _: String = con
        .xadd_maxlen("k4", StreamMaxlen::Equals(10), "*", &[("h", "w")])
        .expect("failed to add capped entry to k4");
    let len: usize = con.xlen("k4").unwrap();
    assert_eq!(len, 10);
}
/// Checks `xinfo_stream` on a missing and an existing stream, then creates a
/// consumer group and verifies it is listed by `xinfo_groups`.
#[test]
fn test_xgroup_create() {
    let ctx = TestContext::new();
    let mut con = ctx.connection();
    xadd(&mut con);

    // `k10` was never written, so asking for its info must fail.
    let missing: RedisResult<StreamInfoStreamReply> = con.xinfo_stream("k10");
    assert!(missing.is_err());

    // NOTE(review): a new connection is obtained after the failed command;
    // presumably to avoid reusing a connection that returned an error - confirm.
    con = ctx.connection();

    let info: StreamInfoStreamReply = con.xinfo_stream("k1").unwrap();
    assert_eq!(info.first_entry.id, "1000-0");
    assert_eq!(info.last_entry.id, "1000-1");
    assert_eq!(info.last_generated_id, "1000-1");

    let created: RedisResult<String> = con.xgroup_create("k1", "g1", "$");
    assert!(created.is_ok());

    let listing: RedisResult<StreamInfoGroupsReply> = con.xinfo_groups("k1");
    assert!(listing.is_ok());
    let listing = listing.unwrap();
    assert_eq!(listing.groups.len(), 1);
    assert_eq!(listing.groups[0].name, "g1");
}
/// Walks a consumer group through its life cycle on stream `k99`: creation
/// with `xgroup_create_mkstream`, reading as consumer `c99`, acknowledging one
/// entry, and inspecting the pending list through `xpending`,
/// `xpending_count` and `xpending_consumer_count`.
///
/// All writes are unwrapped so that a failing setup command is reported at the
/// point of failure.
#[test]
fn test_assorted_2() {
    let ctx = TestContext::new();
    let mut con = ctx.connection();
    xadd(&mut con);

    // Options for reading as consumer `c99` of group `g99`; built per call
    // because `xread_options` is handed the options by value here.
    let as_c99 = || StreamReadOptions::default().group("g99", "c99");

    let _: String = con
        .xgroup_create_mkstream("k99", "g99", "0")
        .expect("failed to create group g99 on k99");

    let info: StreamInfoGroupsReply = con.xinfo_groups("k99").unwrap();
    assert_eq!(info.groups.len(), 1);
    assert_eq!(info.groups[0].name, "g99");
    assert_eq!(info.groups[0].last_delivered_id, "0-0");

    let _: String = con
        .xadd("k99", "1000-0", &[("a", "b"), ("c", "d")])
        .expect("failed to add entry 1000-0 to k99");
    let _: String = con
        .xadd("k99", "1000-1", &[("e", "f"), ("g", "h")])
        .expect("failed to add entry 1000-1 to k99");

    // Nothing has been delivered to the group yet, so nothing is pending.
    let empty_reply: StreamPendingReply = con.xpending("k99", "g99").unwrap();
    assert_eq!(empty_reply.count(), 0);
    match empty_reply {
        StreamPendingReply::Empty => {}
        _ => panic!("Expected StreamPendingReply::Empty but got Data"),
    }

    // Reading with `>` delivers both entries to `c99`.
    let reply: StreamReadReply = con.xread_options(&["k99"], &[">"], as_c99()).unwrap();
    assert_eq!(reply.keys[0].ids.len(), 2);
    let reply: StreamInfoConsumersReply = con.xinfo_consumers("k99", "g99").unwrap();
    assert_eq!(reply.consumers[0].pending, 2);

    // Acknowledging one entry leaves a single pending entry.
    let acked: i32 = con.xack("k99", "g99", &["1000-0"]).unwrap();
    assert_eq!(acked, 1);

    // Reading with id `0` as the same consumer still returns the key.
    let reply: StreamReadReply = con.xread_options(&["k99"], &["0"], as_c99()).unwrap();
    assert_eq!(reply.keys.len(), 1);
    let reply: StreamInfoConsumersReply = con.xinfo_consumers("k99", "g99").unwrap();
    assert_eq!(reply.consumers[0].pending, 1);

    // Two more entries, delivered to the same consumer: three pending in total.
    let _: String = con
        .xadd("k99", "1001-0", &[("i", "j"), ("k", "l")])
        .expect("failed to add entry 1001-0 to k99");
    let _: String = con
        .xadd("k99", "1001-1", &[("m", "n"), ("o", "p")])
        .expect("failed to add entry 1001-1 to k99");
    let _: StreamReadReply = con.xread_options(&["k99"], &[">"], as_c99()).unwrap();

    let data_reply: StreamPendingReply = con.xpending("k99", "g99").unwrap();
    assert_eq!(data_reply.count(), 3);
    match data_reply {
        StreamPendingReply::Data(data) => assert_stream_pending_data(data),
        _ => panic!("Expected StreamPendingReply::Data but got Empty"),
    }

    let reply: StreamPendingCountReply = con.xpending_count("k99", "g99", "-", "+", 10).unwrap();
    assert_eq!(reply.ids.len(), 3);

    let reply: StreamPendingCountReply = con
        .xpending_consumer_count("k99", "g99", "-", "+", 10, "c99")
        .unwrap();
    assert_eq!(reply.ids.len(), 3);

    // Every pending entry names its id and consumer and was delivered at least once.
    for pending in &reply.ids {
        assert!(!pending.id.is_empty());
        assert!(!pending.consumer.is_empty());
        assert!(pending.times_delivered > 0);
    }
}
/// Asserts that `data` spans the ids `1000-1` through `1001-1` and lists
/// exactly one consumer, named `c99`.
fn assert_stream_pending_data(data: StreamPendingData) {
    // Id range covered by the pending entries.
    assert_eq!(data.start_id, "1000-1");
    assert_eq!(data.end_id, "1001-1");

    // Exactly one consumer is expected to hold them.
    let consumers = &data.consumers;
    assert_eq!(consumers.len(), 1);
    assert_eq!(consumers[0].name, "c99");
}
/// Adds ten map-based entries to `maxlen_map` while capping the stream at
/// exactly three entries, then checks that the entries with `idx` 7, 8 and 9
/// are the ones left.
///
/// Each add is unwrapped so a failing write is reported immediately rather
/// than showing up as a wrong length afterwards.
#[test]
fn test_xadd_maxlen_map() {
    let ctx = TestContext::new();
    let mut con = ctx.connection();

    for i in 0..10 {
        let idx = i.to_string();
        let mut map: BTreeMap<&str, &str> = BTreeMap::new();
        map.insert("idx", &idx);
        let _: String = con
            .xadd_maxlen_map("maxlen_map", StreamMaxlen::Equals(3), "*", map)
            .expect("failed to add capped map entry to maxlen_map");
    }

    let len: usize = con.xlen("maxlen_map").unwrap();
    assert_eq!(len, 3);

    let reply: StreamRangeReply = con.xrange_all("maxlen_map").unwrap();
    // Guard the zip below: without this a short reply would skip assertions.
    assert_eq!(reply.ids.len(), 3);
    for (entry, expected) in reply.ids.iter().zip(&["7", "8", "9"]) {
        let idx: Option<String> = entry.get("idx");
        assert_eq!(idx, Some(expected.to_string()));
    }
}
/// Delivers ten entries of `k1` to consumer `c1` and then moves ownership
/// around with `xclaim` / `xclaim_options`, checking the pending summary after
/// each step.
///
/// The pending summaries are matched strictly: an `Empty` reply fails the test
/// instead of silently skipping the assertions.
#[test]
fn test_xclaim() {
    let ctx = TestContext::new();
    let mut con = ctx.connection();

    let _: String = con
        .xgroup_create_mkstream("k1", "g1", "$")
        .expect("failed to create group g1 on k1");
    xadd_keyrange(&mut con, "k1", 0, 10);

    // Deliver all ten entries to `c1`.
    let delivered: StreamReadReply = con
        .xread_options(
            &["k1"],
            &[">"],
            StreamReadOptions::default().group("g1", "c1"),
        )
        .unwrap();
    assert_eq!(delivered.keys[0].ids.len(), 10);

    let claim = &delivered.keys[0].ids[0];
    let claim_justids: Vec<&String> = delivered.keys[0].ids.iter().map(|msg| &msg.id).collect();

    // Wait longer than the min-idle-time of 4 passed to the claim calls
    // (milliseconds, per the Redis XCLAIM documentation).
    sleep(Duration::from_millis(5));

    // `c2` takes over the first entry.
    let reply: StreamClaimReply = con.xclaim("k1", "g1", "c2", 4, &[&claim.id]).unwrap();
    assert_eq!(reply.ids.len(), 1);
    assert_eq!(reply.ids[0].id, claim.id);

    let reply: StreamPendingReply = con.xpending("k1", "g1").unwrap();
    match reply {
        StreamPendingReply::Data(data) => {
            assert_eq!(data.consumers[0].name, "c1");
            assert_eq!(data.consumers[0].pending, 9);
            assert_eq!(data.consumers[1].name, "c2");
            assert_eq!(data.consumers[1].pending, 1);
        }
        _ => panic!("Expected StreamPendingReply::Data but got Empty"),
    }

    sleep(Duration::from_millis(5));

    // `c3` force-claims the same entry.
    let _: StreamClaimReply = con
        .xclaim_options(
            "k1",
            "g1",
            "c3",
            4,
            &[&claim.id],
            StreamClaimOptions::default().with_force(),
        )
        .unwrap();

    let reply: StreamPendingReply = con.xpending("k1", "g1").unwrap();
    match reply {
        StreamPendingReply::Data(data) => {
            assert_eq!(data.consumers[1].name, "c3");
            assert_eq!(data.consumers[1].pending, 1);
        }
        _ => panic!("Expected StreamPendingReply::Data but got Empty"),
    }

    sleep(Duration::from_millis(5));

    // `c5` force-claims everything, asking only for the ids.
    let claimed: Vec<String> = con
        .xclaim_options(
            "k1",
            "g1",
            "c5",
            4,
            &claim_justids,
            StreamClaimOptions::default().with_force().with_justid(),
        )
        .unwrap();
    assert_eq!(claimed.len(), 10);
}
/// Checks the count returned by `xdel` for one existing id and for a mix of
/// existing and non-existing ids.
#[test]
fn test_xdel() {
    let ctx = TestContext::new();
    let mut con = ctx.connection();
    xadd(&mut con);

    // A single existing id.
    let removed_from_k1: i32 = con.xdel("k1", &["1000-0"]).unwrap();
    assert_eq!(removed_from_k1, 1);

    // `2000-2` was never added by the fixture, so only two ids are counted.
    let removed_from_k2: i32 = con.xdel("k2", &["2000-0", "2000-1", "2000-2"]).unwrap();
    assert_eq!(removed_from_k2, 2);
}
/// Trims a 100-entry stream to 50 and then to 10 entries and checks the count
/// returned by each `xtrim` call.
#[test]
fn test_xtrim() {
    let ctx = TestContext::new();
    let mut con = ctx.connection();
    xadd_keyrange(&mut con, "k1", 0, 100);

    // 100 -> 50 entries.
    let first_trim: i32 = con.xtrim("k1", StreamMaxlen::Equals(50)).unwrap();
    assert_eq!(first_trim, 50);

    // 50 -> 10 entries.
    let second_trim: i32 = con.xtrim("k1", StreamMaxlen::Equals(10)).unwrap();
    assert_eq!(second_trim, 40);
}
/// Creates and destroys a consumer group, recreates it on a populated stream,
/// reads as consumer `c1`, then removes the consumer and the group.
#[test]
fn test_xgroup() {
    let ctx = TestContext::new();
    let mut con = ctx.connection();

    // Create the group together with the stream, then destroy it again.
    let created: RedisResult<String> = con.xgroup_create_mkstream("k1", "g1", "0");
    assert!(created.is_ok());
    let destroyed: i32 = con.xgroup_destroy("k1", "g1").unwrap();
    assert_eq!(destroyed, 1);

    // Populate the streams and recreate the group.
    xadd(&mut con);
    let recreated: RedisResult<String> = con.xgroup_create("k1", "g1", "0");
    assert!(recreated.is_ok());

    // Both fixture entries of `k1` are delivered to `c1`.
    let read_as_c1 = StreamReadOptions::default().group("g1", "c1");
    let delivered: StreamReadReply = con.xread_options(&["k1"], &[">"], read_as_c1).unwrap();
    assert_eq!(delivered.keys[0].ids.len(), 2);

    // Asserted to return 2 - presumably the number of entries that were
    // pending for the removed consumer; confirm against the XGROUP docs.
    let delconsumer_reply: i32 = con.xgroup_delconsumer("k1", "g1", "c1").unwrap();
    assert_eq!(delconsumer_reply, 2);

    let destroyed_again: i32 = con.xgroup_destroy("k1", "g1").unwrap();
    assert_eq!(destroyed_again, 1);
}
/// Checks the number of entries returned by `xrange_all`, `xrange` and
/// `xrange_count` on the two-entry fixture stream `k1`.
#[test]
fn test_xrange() {
    let ctx = TestContext::new();
    let mut con = ctx.connection();
    xadd(&mut con);

    // The whole stream.
    let everything: StreamRangeReply = con.xrange_all("k1").unwrap();
    assert_eq!(everything.ids.len(), 2);

    // From the second entry to the end.
    let from_second: StreamRangeReply = con.xrange("k1", "1000-1", "+").unwrap();
    assert_eq!(from_second.ids.len(), 1);

    // From the beginning up to the first entry.
    let up_to_first: StreamRangeReply = con.xrange("k1", "-", "1000-0").unwrap();
    assert_eq!(up_to_first.ids.len(), 1);

    // Full range, limited to one entry.
    let limited: StreamRangeReply = con.xrange_count("k1", "-", "+", 1).unwrap();
    assert_eq!(limited.ids.len(), 1);
}
/// Checks the number of entries returned by `xrevrange_all`, `xrevrange` and
/// `xrevrange_count` on the two-entry fixture stream `k1`.
#[test]
fn test_xrevrange() {
    let ctx = TestContext::new();
    let mut con = ctx.connection();
    xadd(&mut con);

    // The whole stream.
    let everything: StreamRangeReply = con.xrevrange_all("k1").unwrap();
    assert_eq!(everything.ids.len(), 2);

    // From `1000-1` down to the beginning: both entries.
    let down_from_last: StreamRangeReply = con.xrevrange("k1", "1000-1", "-").unwrap();
    assert_eq!(down_from_last.ids.len(), 2);

    // From the end down to `1000-1`: only that entry.
    let down_to_last: StreamRangeReply = con.xrevrange("k1", "+", "1000-1").unwrap();
    assert_eq!(down_to_last.ids.len(), 1);

    // Full range, limited to one entry.
    let limited: StreamRangeReply = con.xrevrange_count("k1", "+", "-", 1).unwrap();
    assert_eq!(limited.ids.len(), 1);
}