#![cfg(feature = "streams")]
use redis::streams::*;
use redis::{Connection, ToRedisArgs, TypedCommands};
#[macro_use]
mod support;
use crate::support::*;
use assert_matches::assert_matches;
use std::collections::BTreeMap;
use std::slice;
use std::str;
use std::thread::sleep;
use std::time::Duration;
fn xadd(con: &mut Connection) {
con.xadd("k1", "1000-0", &[("hello", "world"), ("redis", "streams")])
.unwrap();
con.xadd("k1", "1000-1", &[("hello", "world2")]).unwrap();
con.xadd("k2", "2000-0", &[("hello", "world")]).unwrap();
con.xadd("k2", "2000-1", &[("hello", "world2")]).unwrap();
}
fn xadd_keyrange(con: &mut Connection, key: &str, start: i32, end: i32) {
for _i in start..end {
con.xadd(key, "*", &[("h", "w")]).unwrap();
}
}
#[test]
fn test_cmd_options() {
let empty = StreamClaimOptions::default();
assert_eq!(ToRedisArgs::to_redis_args(&empty).len(), 0);
let empty = StreamReadOptions::default();
assert_eq!(ToRedisArgs::to_redis_args(&empty).len(), 0);
let opts = StreamClaimOptions::default()
.idle(50)
.time(500)
.retry(3)
.with_force()
.with_justid();
assert_args!(
&opts,
"IDLE",
"50",
"TIME",
"500",
"RETRYCOUNT",
"3",
"FORCE",
"JUSTID"
);
assert_args!(StreamMaxlen::Approx(10), "MAXLEN", "~", "10");
assert_args!(StreamMaxlen::Equals(10), "MAXLEN", "=", "10");
let opts = StreamReadOptions::default()
.noack()
.block(100)
.count(200)
.group("group-name", "consumer-name");
assert_args!(
&opts,
"GROUP",
"group-name",
"consumer-name",
"BLOCK",
"100",
"COUNT",
"200",
"NOACK"
);
let opts = StreamReadOptions::default().noack().block(100).count(200);
assert_args!(&opts, "BLOCK", "100", "COUNT", "200");
}
#[test]
fn test_assorted_1() {
let ctx = TestContext::new();
let mut con = ctx.connection();
xadd(&mut con);
let result = con.xadd("k0", "1000-0", &[("x", "y")]).unwrap();
assert_eq!(result.unwrap(), "1000-0");
let reply = con
.xread(&["k1", "k2", "k3"], &["0", "0", "0"])
.unwrap()
.unwrap();
assert_eq!(&reply.keys.len(), &2usize);
assert_eq!(&reply.keys[0].key, "k1");
assert_eq!(&reply.keys[0].ids.len(), &2usize);
assert_eq!(&reply.keys[0].ids[0].id, "1000-0");
let hello = reply.keys[0].ids[0].get("hello");
assert_eq!(hello, Some("world".to_string()));
assert_eq!(&reply.keys[1].key, "k2");
assert_eq!(&reply.keys[1].ids.len(), &2usize);
assert_eq!(&reply.keys[1].ids[0].id, "2000-0");
let mut map = BTreeMap::new();
map.insert("ab", "cd");
map.insert("ef", "gh");
map.insert("ij", "kl");
con.xadd_map("k3", "3000-0", map).unwrap();
let reply = con.xrange_all("k3").unwrap();
assert!(reply.ids[0].contains_key("ab"));
assert!(reply.ids[0].contains_key("ef"));
assert!(reply.ids[0].contains_key("ij"));
xadd_keyrange(&mut con, "k4", 0, 100);
let result = con.xlen("k4");
assert_eq!(result, Ok(100));
con.xadd_maxlen("k4", StreamMaxlen::Equals(10), "*", &[("h", "w")])
.unwrap();
let result = con.xlen("k4");
assert_eq!(result, Ok(10));
}
#[test]
fn test_xgroup_create() {
let ctx = TestContext::new();
let mut con = ctx.connection();
xadd(&mut con);
let reply = con.xinfo_stream("k10");
assert!(
matches!(&reply, Err(e) if e.kind() == redis::ServerErrorKind::ResponseError.into()
&& e.code() == Some("ERR")
&& e.detail() == Some("no such key"))
);
con = ctx.connection();
let reply = con.xinfo_stream("k1").unwrap();
assert_eq!(&reply.first_entry.id, "1000-0");
assert_eq!(&reply.last_entry.id, "1000-1");
assert_eq!(&reply.last_generated_id, "1000-1");
let result = con.xgroup_create("k1", "g1", "$");
assert_matches!(result, Ok(_));
let result = con.xinfo_groups("k1");
assert_matches!(result, Ok(_));
let reply = result.unwrap();
assert_eq!(&reply.groups.len(), &1);
assert_eq!(&reply.groups[0].name, &"g1");
}
#[test]
fn test_xgroup_createconsumer() {
let ctx = TestContext::new();
let mut con = ctx.connection();
xadd(&mut con);
let reply = con.xinfo_stream("k1").unwrap();
assert_eq!(&reply.first_entry.id, "1000-0");
assert_eq!(&reply.last_entry.id, "1000-1");
assert_eq!(&reply.last_generated_id, "1000-1");
let result = con.xgroup_create("k1", "g1", "$");
assert_matches!(result, Ok(_));
let result = con.xinfo_groups("k1");
assert_matches!(result, Ok(_));
let reply = result.unwrap();
assert_eq!(&reply.groups.len(), &1);
assert_eq!(&reply.groups[0].name, &"g1");
let result = con.xinfo_consumers("k1", "g1");
assert_matches!(result, Ok(_));
let reply = result.unwrap();
assert_eq!(&reply.consumers.len(), &0);
let result = con.xgroup_createconsumer("k1", "g1", "c1");
assert_matches!(result, Ok(true));
let result = con.xinfo_consumers("k1", "g1");
assert_matches!(result, Ok(_));
let reply = result.unwrap();
assert_eq!(&reply.consumers.len(), &1);
assert_eq!(&reply.consumers[0].name, &"c1");
let result = con.xgroup_createconsumer("k1", "g1", "c1");
assert_matches!(result, Ok(false));
let result = con.xinfo_consumers("k1", "g1");
assert_matches!(result, Ok(_));
let reply = result.unwrap();
assert_eq!(&reply.consumers.len(), &1);
assert_eq!(&reply.consumers[0].name, &"c1");
}
#[test]
fn test_assorted_2() {
let ctx = TestContext::new();
let mut con = ctx.connection();
xadd(&mut con);
let result = con.xgroup_create_mkstream("k99", "g99", "0");
assert_matches!(result, Ok(_));
let result = con.xinfo_groups("k99");
assert_matches!(result, Ok(_));
let reply = result.unwrap();
assert_eq!(&reply.groups.len(), &1);
assert_eq!(&reply.groups[0].name, &"g99");
assert_eq!(&reply.groups[0].last_delivered_id, &"0-0");
if let Some(lag) = reply.groups[0].lag {
assert_eq!(lag, 0);
}
let _ = con.xadd("k99", "1000-0", &[("a", "b"), ("c", "d")]);
let _ = con.xadd("k99", "1000-1", &[("e", "f"), ("g", "h")]);
let result = con.xinfo_groups("k99");
assert_matches!(result, Ok(_));
let reply = result.unwrap();
assert_eq!(&reply.groups.len(), &1);
assert_eq!(&reply.groups[0].name, &"g99");
if let Some(lag) = reply.groups[0].lag {
assert_eq!(lag, 2);
}
let empty_reply = con.xpending("k99", "g99").unwrap();
assert_eq!(empty_reply.count(), 0);
if let StreamPendingReply::Empty = empty_reply {
} else {
panic!("Expected StreamPendingReply::Empty but got Data");
}
let reply = con
.xread_options(
&["k99"],
&[">"],
&StreamReadOptions::default().group("g99", "c99"),
)
.unwrap()
.unwrap();
assert_eq!(reply.keys[0].ids.len(), 2);
let reply = con.xinfo_consumers("k99", "g99").unwrap();
assert_eq!(reply.consumers[0].pending, 2);
let result = con.xack("k99", "g99", &["1000-0"]);
assert_eq!(result, Ok(1));
let reply = con
.xread_options(
&["k99"],
&["0"],
&StreamReadOptions::default().group("g99", "c99"),
)
.unwrap()
.unwrap();
assert_eq!(reply.keys.len(), 1);
let reply = con.xinfo_consumers("k99", "g99").unwrap();
assert_eq!(reply.consumers[0].pending, 1);
let _ = con.xadd("k99", "1001-0", &[("i", "j"), ("k", "l")]);
let _ = con.xadd("k99", "1001-1", &[("m", "n"), ("o", "p")]);
let _ = con
.xread_options(
&["k99"],
&[">"],
&StreamReadOptions::default().group("g99", "c99"),
)
.unwrap();
let data_reply = con.xpending("k99", "g99").unwrap();
assert_eq!(data_reply.count(), 3);
if let StreamPendingReply::Data(data) = data_reply {
assert_stream_pending_data(data)
} else {
panic!("Expected StreamPendingReply::Data but got Empty");
}
let reply = con.xpending_count("k99", "g99", "-", "+", 10).unwrap();
assert_eq!(reply.ids.len(), 3);
let reply = con
.xpending_consumer_count("k99", "g99", "-", "+", 10, "c99")
.unwrap();
assert_eq!(reply.ids.len(), 3);
for StreamPendingId {
id,
consumer,
times_delivered,
last_delivered_ms: _,
} in reply.ids
{
assert!(!id.is_empty());
assert!(!consumer.is_empty());
assert!(times_delivered > 0);
}
}
fn assert_stream_pending_data(data: StreamPendingData) {
assert_eq!(data.start_id, "1000-1");
assert_eq!(data.end_id, "1001-1");
assert_eq!(data.consumers.len(), 1);
assert_eq!(data.consumers[0].name, "c99");
}
#[test]
fn test_xadd_maxlen_map() {
let ctx = TestContext::new();
let mut con = ctx.connection();
for i in 0..10 {
let mut map = BTreeMap::new();
let idx = i.to_string();
map.insert("idx", &idx);
let _ = con.xadd_maxlen_map("maxlen_map", StreamMaxlen::Equals(3), "*", map);
}
let result = con.xlen("maxlen_map");
assert_eq!(result, Ok(3));
let reply = con.xrange_all("maxlen_map").unwrap();
assert_eq!(reply.ids[0].get("idx"), Some("7".to_string()));
assert_eq!(reply.ids[1].get("idx"), Some("8".to_string()));
assert_eq!(reply.ids[2].get("idx"), Some("9".to_string()));
}
#[test]
fn test_xadd_options() {
let ctx = TestContext::new();
let mut con = ctx.connection();
let result = con.xadd_options(
"k1",
"*",
&[("h", "w")],
&StreamAddOptions::default().nomkstream(),
);
assert_eq!(result, Ok(None));
let result = con.xinfo_stream("k1");
assert!(
matches!(&result, Err(e) if e.kind() == redis::ServerErrorKind::ResponseError.into()
&& e.code() == Some("ERR")
&& e.detail() == Some("no such key"))
);
fn setup_stream(con: &mut Connection) {
let _ = con.del("k1");
for i in 0..10 {
let _ = con.xadd_options(
"k1",
format!("1-{i}"),
&[("h", "w")],
&StreamAddOptions::default(),
);
}
}
setup_stream(&mut con);
let _ = con.xadd_options(
"k1",
"2-1",
&[("h", "w")],
&StreamAddOptions::default().trim(StreamTrimStrategy::maxlen(StreamTrimmingMode::Exact, 4)),
);
let info = con.xinfo_stream("k1").unwrap();
assert_eq!(info.length, 4);
assert_eq!(info.first_entry.id, "1-7");
setup_stream(&mut con);
let _ = con.xadd_options(
"k1",
"2-1",
&[("h", "w")],
&StreamAddOptions::default()
.trim(StreamTrimStrategy::minid(StreamTrimmingMode::Exact, "1-5")),
);
let info = con.xinfo_stream("k1").unwrap();
assert_eq!(info.length, 6);
assert_eq!(info.first_entry.id, "1-5");
let mut map = BTreeMap::new();
map.insert("ab", "cd");
map.insert("ef", "gh");
map.insert("ij", "kl");
let _ = con.xadd_options("k1", "3-1", map, &StreamAddOptions::default());
let info = con.xinfo_stream("k1").unwrap();
assert_eq!(info.length, 7);
assert_eq!(info.first_entry.id, "1-5");
assert_eq!(info.last_entry.id, "3-1");
}
#[test]
fn test_xread_options_deleted_pel_entry() {
let ctx = TestContext::new();
let mut con = ctx.connection();
let result = con.xgroup_create_mkstream("k1", "g1", "$");
assert_matches!(result, Ok(_));
let _ = con.xadd_maxlen("k1", StreamMaxlen::Equals(1), "*", &[("h1", "w1")]);
let result = con
.xread_options(
&["k1"],
&[">"],
&StreamReadOptions::default().group("g1", "c1"),
)
.unwrap()
.unwrap();
let _ = con.xadd_maxlen("k1", StreamMaxlen::Equals(1), "*", &[("h2", "w2")]);
let result_deleted_entry = con
.xread_options(
&["k1"],
&["0"],
&StreamReadOptions::default().group("g1", "c1"),
)
.unwrap()
.unwrap();
assert_eq!(
result.keys[0].ids.len(),
result_deleted_entry.keys[0].ids.len()
);
assert_eq!(
result.keys[0].ids[0].id,
result_deleted_entry.keys[0].ids[0].id
);
}
fn create_group_add_and_read(con: &mut Connection) -> StreamReadReply {
con.flushall().unwrap();
let result = con.xgroup_create_mkstream("k1", "g1", "$");
assert_matches!(result, Ok(_));
xadd_keyrange(con, "k1", 0, 10);
let reply = con
.xread_options(
&["k1"],
&[">"],
&StreamReadOptions::default().group("g1", "c1"),
)
.unwrap()
.unwrap();
assert_eq!(reply.keys[0].ids.len(), 10);
reply
}
#[test]
fn test_xautoclaim() {
let ctx = TestContext::new();
let mut con = ctx.connection();
let reply = create_group_add_and_read(&mut con);
let claim = &reply.keys[0].ids[0];
let claim_1 = &reply.keys[0].ids[1];
sleep(Duration::from_millis(10));
let reply = con
.xautoclaim_options(
"k1",
"g1",
"c2",
4,
claim.id.clone(),
StreamAutoClaimOptions::default().count(2),
)
.unwrap();
assert_eq!(reply.claimed.len(), 2);
assert_eq!(reply.claimed[0].id, claim.id);
assert!(!reply.claimed[0].map.is_empty());
assert_eq!(reply.claimed[1].id, claim_1.id);
assert!(!reply.claimed[1].map.is_empty());
sleep(Duration::from_millis(5));
let reply = con
.xautoclaim_options(
"k1",
"g1",
"c3",
4,
claim.id.clone(),
StreamAutoClaimOptions::default().count(5).with_justid(),
)
.unwrap();
assert_eq!(reply.claimed.len(), 5);
assert_eq!(reply.claimed[0].id, claim.id);
assert!(reply.claimed[0].map.is_empty());
assert_eq!(reply.claimed[1].id, claim_1.id);
assert!(reply.claimed[1].map.is_empty());
}
#[test]
fn test_xclaim() {
let ctx = TestContext::new();
let mut con = ctx.connection();
let reply = create_group_add_and_read(&mut con);
let claim = &reply.keys[0].ids[0];
let claim_justids = &reply.keys[0]
.ids
.iter()
.map(|msg| &msg.id)
.collect::<Vec<&String>>();
sleep(Duration::from_millis(5));
let reply = con
.xclaim("k1", "g1", "c2", 4, slice::from_ref(&claim.id))
.unwrap();
assert_eq!(reply.ids.len(), 1);
assert_eq!(reply.ids[0].id, claim.id);
let reply = con.xpending("k1", "g1").unwrap();
if let StreamPendingReply::Data(data) = reply {
assert_eq!(data.consumers[0].name, "c1");
assert_eq!(data.consumers[0].pending, 9);
assert_eq!(data.consumers[1].name, "c2");
assert_eq!(data.consumers[1].pending, 1);
}
sleep(Duration::from_millis(5));
let _: StreamClaimReply = con
.xclaim_options(
"k1",
"g1",
"c3",
4,
slice::from_ref(&claim.id),
StreamClaimOptions::default().with_force(),
)
.unwrap();
let reply = con.xpending("k1", "g1").unwrap();
if let StreamPendingReply::Data(data) = reply {
assert_eq!(data.consumers[1].name, "c3");
assert_eq!(data.consumers[1].pending, 1);
}
sleep(Duration::from_millis(5));
let claimed: Vec<String> = con
.xclaim_options(
"k1",
"g1",
"c5",
4,
claim_justids,
StreamClaimOptions::default().with_force().with_justid(),
)
.unwrap();
assert_eq!(claimed.len(), 10);
}
#[test]
fn test_xclaim_last_id() {
let ctx = TestContext::new();
let mut con = ctx.connection();
let result = con.xgroup_create_mkstream("k1", "g1", "$");
assert_matches!(result, Ok(_));
xadd_keyrange(&mut con, "k1", 0, 10);
let reply = con
.xread_options(&["k1"], &["0"], &StreamReadOptions::default())
.unwrap()
.unwrap();
assert_eq!(reply.keys[0].ids.len(), 10);
let claim_early_id = &reply.keys[0].ids[3];
let claim_middle_id = &reply.keys[0].ids[5];
let claim_late_id = &reply.keys[0].ids[8];
let _ = con
.xread_options(
&["k1"],
&[">"],
&StreamReadOptions::default().count(6).group("g1", "c1"),
)
.unwrap();
let info = con.xinfo_groups("k1").unwrap();
assert_eq!(info.groups[0].last_delivered_id, claim_middle_id.id.clone());
sleep(Duration::from_millis(5));
let _: Vec<String> = con
.xclaim_options(
"k1",
"g1",
"c2",
4,
slice::from_ref(&claim_middle_id.id),
StreamClaimOptions::default()
.with_justid()
.with_lastid(claim_early_id.id.as_str()),
)
.unwrap();
let info = con.xinfo_groups("k1").unwrap();
assert_eq!(info.groups[0].last_delivered_id, claim_middle_id.id.clone());
sleep(Duration::from_millis(5));
let _: Vec<String> = con
.xclaim_options(
"k1",
"g1",
"c1",
4,
slice::from_ref(&claim_middle_id.id),
StreamClaimOptions::default()
.with_justid()
.with_lastid(claim_late_id.id.as_str()),
)
.unwrap();
let info = con.xinfo_groups("k1").unwrap();
assert_eq!(info.groups[0].last_delivered_id, claim_late_id.id.clone());
}
#[test]
fn test_xreadgroup_with_claim_option() {
let ctx = run_test_if_version_supported!(&REDIS_VERSION_CE_8_4);
let mut con = ctx.connection();
let stream_name = "test_stream";
let group_name = "test_group";
let consumer1 = "consumer1";
let consumer2 = "consumer2";
assert!(
con.xgroup_create_mkstream(stream_name, group_name, "$")
.is_ok()
);
xadd_keyrange(&mut con, stream_name, 0, 10);
let reply = con
.xread_options(
&[stream_name],
&[">"],
&StreamReadOptions::default()
.group(group_name, consumer1)
.count(10),
)
.unwrap()
.unwrap();
assert_eq!(reply.keys[0].ids.len(), 10);
let message_identifiers: Vec<String> =
reply.keys[0].ids.iter().map(|msg| msg.id.clone()).collect();
let pending = con.xpending(stream_name, group_name).unwrap();
assert_eq!(pending.count(), 10);
if let StreamPendingReply::Data(data) = pending {
assert_eq!(data.consumers.len(), 1);
assert_eq!(data.consumers[0].name, consumer1);
assert_eq!(data.consumers[0].pending, 10);
}
sleep(Duration::from_millis(10));
let claim_reply: Option<StreamReadReply> = con
.xread_options(
&[stream_name],
&[">"],
&StreamReadOptions::default()
.group(group_name, consumer2)
.claim(5) .count(5),
)
.unwrap();
let claim_reply = claim_reply.unwrap();
assert_eq!(claim_reply.keys.len(), 1);
assert_eq!(claim_reply.keys[0].key, stream_name);
assert!(!claim_reply.keys[0].ids.is_empty());
assert!(claim_reply.keys[0].ids.len() <= 5);
for stream_id in &claim_reply.keys[0].ids {
assert!(stream_id.milliseconds_elapsed_from_delivery.unwrap() > 0);
assert!(stream_id.delivered_count.unwrap() > 0);
}
let claimed_ids: Vec<String> = claim_reply.keys[0]
.ids
.iter()
.map(|id| id.id.clone())
.collect();
for claimed_id in &claimed_ids {
assert!(message_identifiers.contains(claimed_id));
}
let pending_info = con
.xpending_count(stream_name, group_name, "-", "+", 10)
.unwrap();
let mut consumer1_count = 0;
let mut consumer2_count = 0;
for pending_id in &pending_info.ids {
if pending_id.consumer == consumer1 {
consumer1_count += 1;
} else {
consumer2_count += 1;
}
}
assert!(consumer1_count < 10);
assert!(consumer2_count > 0);
assert_eq!(consumer1_count + consumer2_count, 10);
}
#[test]
fn test_xreadgroup_claim_with_idle_and_incoming_messages() {
let ctx = run_test_if_version_supported!(&REDIS_VERSION_CE_8_4);
let mut con = ctx.connection();
let stream_name = "test_stream_claim_with_idle_and_incoming_messages";
let group_name = "test_group";
let consumer1 = "consumer1";
let consumer2 = "consumer2";
assert!(
con.xgroup_create_mkstream(stream_name, group_name, "$")
.is_ok()
);
xadd_keyrange(&mut con, stream_name, 0, 2);
let initial_reply = con
.xread_options(
&[stream_name],
&[">"],
&StreamReadOptions::default()
.group(group_name, consumer1)
.count(2),
)
.unwrap()
.unwrap();
assert_eq!(initial_reply.keys[0].ids.len(), 2);
let idle_pending_ids: Vec<String> = initial_reply.keys[0]
.ids
.iter()
.map(|id| id.id.clone())
.collect();
sleep(Duration::from_millis(20));
xadd_keyrange(&mut con, stream_name, 2, 22);
let claim_reply: Option<StreamReadReply> = con
.xread_options(
&[stream_name],
&[">"],
&StreamReadOptions::default()
.group(group_name, consumer2)
.claim(5) .count(10),
)
.unwrap();
let claim_reply = claim_reply.unwrap();
assert_eq!(claim_reply.keys.len(), 1);
assert_eq!(claim_reply.keys[0].key, stream_name);
assert_eq!(claim_reply.keys[0].ids.len(), 10);
assert_eq!(idle_pending_ids[0], claim_reply.keys[0].ids[0].id);
assert_eq!(idle_pending_ids[1], claim_reply.keys[0].ids[1].id);
for i in 0..2 {
let stream_id = &claim_reply.keys[0].ids[i];
assert!(stream_id.milliseconds_elapsed_from_delivery.unwrap() > 0);
assert!(stream_id.delivered_count.unwrap() > 0);
}
for i in 2..10 {
let stream_id = &claim_reply.keys[0].ids[i];
assert_eq!(stream_id.milliseconds_elapsed_from_delivery.unwrap(), 0);
assert_eq!(stream_id.delivered_count.unwrap(), 0);
assert!(!idle_pending_ids.contains(&stream_id.id));
}
let pending_info = con
.xpending_count(stream_name, group_name, "-", "+", 30)
.unwrap();
let mut consumer1_count = 0;
let mut consumer2_count = 0;
for pending_id in &pending_info.ids {
if pending_id.consumer == consumer1 {
consumer1_count += 1;
} else {
consumer2_count += 1;
}
}
assert_eq!(consumer1_count, 0);
assert_eq!(consumer2_count, 10);
sleep(Duration::from_millis(10));
let claim_all_reply: Option<StreamReadReply> = con
.xread_options(
&[stream_name],
&[">"],
&StreamReadOptions::default()
.group(group_name, consumer1)
.claim(5), )
.unwrap();
let claim_all_reply = claim_all_reply.unwrap();
assert_eq!(claim_all_reply.keys.len(), 1);
assert_eq!(claim_all_reply.keys[0].ids.len(), 22);
for i in 0..10 {
let stream_id = &claim_all_reply.keys[0].ids[i];
assert!(stream_id.milliseconds_elapsed_from_delivery.unwrap() > 0);
assert!(stream_id.delivered_count.unwrap() > 0);
}
for i in 10..22 {
let stream_id = &claim_all_reply.keys[0].ids[i];
assert_eq!(stream_id.milliseconds_elapsed_from_delivery.unwrap(), 0);
assert_eq!(stream_id.delivered_count.unwrap(), 0);
}
}
#[test]
fn test_xreadgroup_claim_multiple_streams() {
let ctx = run_test_if_version_supported!(&REDIS_VERSION_CE_8_4);
let mut con = ctx.connection();
let stream1 = "test_stream_claim_multi_1";
let stream2 = "test_stream_claim_multi_2";
let stream3 = "test_stream_claim_multi_3";
let group_name = "test_group";
let consumer1 = "consumer1";
let consumer2 = "consumer2";
for stream_name in [stream1, stream2, stream3] {
assert!(
con.xgroup_create_mkstream(stream_name, group_name, "$")
.is_ok()
);
xadd_keyrange(&mut con, stream_name, 0, 5);
}
let initial_reply: Option<StreamReadReply> = con
.xread_options(
&[stream1, stream2, stream3],
&[">", ">", ">"],
&StreamReadOptions::default()
.group(group_name, consumer1)
.count(15),
)
.unwrap();
let initial_reply = initial_reply.unwrap();
assert_eq!(initial_reply.keys.len(), 3);
sleep(Duration::from_millis(20));
let claim_reply: Option<StreamReadReply> = con
.xread_options(
&[stream1, stream2, stream3],
&[">", ">", ">"],
&StreamReadOptions::default()
.group(group_name, consumer2)
.claim(5), )
.unwrap();
let claim_reply = claim_reply.unwrap();
assert_eq!(claim_reply.keys.len(), 3);
for stream_key in &claim_reply.keys {
assert_eq!(stream_key.ids.len(), 5);
for stream_id in &stream_key.ids {
assert!(stream_id.milliseconds_elapsed_from_delivery.unwrap() > 0);
assert!(stream_id.delivered_count.unwrap() > 0);
}
}
let consumer1_pending: StreamPendingCountReply = con
.xpending_consumer_count(stream1, group_name, "-", "+", 10, consumer1)
.unwrap();
assert_eq!(consumer1_pending.ids.len(), 0);
let consumer2_pending: StreamPendingCountReply = con
.xpending_consumer_count(stream1, group_name, "-", "+", 10, consumer2)
.unwrap();
assert_eq!(consumer2_pending.ids.len(), 5);
}
#[test]
fn test_xdel() {
let ctx = TestContext::new();
let mut con = ctx.connection();
xadd(&mut con);
let result = con.xdel("k1", &["1000-0"]);
assert_eq!(result, Ok(1));
let result = con.xdel("k2", &["2000-0", "2000-1", "2000-2"]);
assert_eq!(result, Ok(2));
}
#[test]
fn test_xadd_options_deletion_policy_keepref() {
let ctx = run_test_if_version_supported!(&REDIS_VERSION_CE_8_2);
let mut con = ctx.connection();
let _: () = con.flushdb().unwrap();
let stream_name = "test_stream_xadd_keepref";
let group_name = "test_group";
let consumer_name = "consumer";
let initial_stream_entries = [
("field1", "value1"),
("field2", "value2"),
("field3", "value3"),
];
let stream_entries: [(&str, &str); 3] = initial_stream_entries[..3].try_into().unwrap();
let [id1, id2, id3]: [String; 3] =
stream_entries.map(|entry| con.xadd(stream_name, "*", &[entry]).unwrap().unwrap());
let _: () = con
.xgroup_create_mkstream(stream_name, group_name, "0")
.unwrap();
let _: Option<StreamReadReply> = con
.xread_options(
&[stream_name],
&[">"],
&StreamReadOptions::default()
.group(group_name, consumer_name)
.count(initial_stream_entries.len() + 1),
)
.unwrap();
let pending = con.xpending(stream_name, group_name).unwrap();
assert_eq!(pending.count(), initial_stream_entries.len());
let id4 = con
.xadd_options(
stream_name,
"*",
&[("field4", "value4")],
&StreamAddOptions::default()
.trim(StreamTrimStrategy::maxlen(
StreamTrimmingMode::Exact,
initial_stream_entries.len(),
))
.set_deletion_policy(StreamDeletionPolicy::KeepRef),
)
.unwrap()
.unwrap();
assert_eq!(con.xlen(stream_name).unwrap(), initial_stream_entries.len());
let info = con.xinfo_stream(stream_name).unwrap();
assert_eq!(info.first_entry.id, id2);
assert_eq!(info.last_generated_id, id4);
let pending = con.xpending(stream_name, group_name).unwrap();
assert_eq!(pending.count(), initial_stream_entries.len());
let reply = con
.xpending_consumer_count(
stream_name,
group_name,
"-",
"+",
initial_stream_entries.len(),
consumer_name,
)
.unwrap();
assert_eq!(
vec![id1, id2, id3],
reply
.ids
.iter()
.map(|id| id.id.clone())
.collect::<Vec<String>>()
);
}
#[test]
fn test_xadd_options_deletion_policy_delref() {
let ctx = run_test_if_version_supported!(&REDIS_VERSION_CE_8_2);
let mut con = ctx.connection();
let _: () = con.flushdb().unwrap();
let stream_name = "test_stream_xadd_delref";
let group_name = "test_group";
let consumer_name = "consumer";
let initial_stream_entries = [
("field1", "value1"),
("field2", "value2"),
("field3", "value3"),
];
let stream_entries: [(&str, &str); 3] = initial_stream_entries[..3].try_into().unwrap();
let [_, id2, id3]: [String; 3] =
stream_entries.map(|entry| con.xadd(stream_name, "*", &[entry]).unwrap().unwrap());
let _: () = con
.xgroup_create_mkstream(stream_name, group_name, "0")
.unwrap();
let _: Option<StreamReadReply> = con
.xread_options(
&[stream_name],
&[">"],
&StreamReadOptions::default()
.group(group_name, consumer_name)
.count(initial_stream_entries.len() + 1),
)
.unwrap();
let pending = con.xpending(stream_name, group_name).unwrap();
assert_eq!(pending.count(), initial_stream_entries.len());
let id4 = con
.xadd_options(
stream_name,
"*",
&[("field4", "value4")],
&StreamAddOptions::default()
.trim(StreamTrimStrategy::maxlen(
StreamTrimmingMode::Exact,
initial_stream_entries.len(),
))
.set_deletion_policy(StreamDeletionPolicy::DelRef),
)
.unwrap()
.unwrap();
assert_eq!(con.xlen(stream_name).unwrap(), initial_stream_entries.len());
let info = con.xinfo_stream(stream_name).unwrap();
assert_eq!(info.first_entry.id, id2);
assert_eq!(info.last_generated_id, id4);
let pending = con.xpending(stream_name, group_name).unwrap();
assert_eq!(pending.count(), initial_stream_entries.len() - 1);
let reply = con
.xpending_consumer_count(
stream_name,
group_name,
"-",
"+",
initial_stream_entries.len(),
consumer_name,
)
.unwrap();
assert_eq!(
vec![id2, id3],
reply
.ids
.iter()
.map(|id| id.id.clone())
.collect::<Vec<String>>()
);
}
#[test]
fn test_xadd_options_deletion_policy_acked() {
let ctx = run_test_if_version_supported!(&REDIS_VERSION_CE_8_2);
let mut con = ctx.connection();
let _: () = con.flushdb().unwrap();
let stream_name = "test_stream_xadd_acked";
let group_name = "test_group";
let consumer_name = "consumer";
let initial_stream_entries = [
("field1", "value1"),
("field2", "value2"),
("field3", "value3"),
];
let stream_entries: [(&str, &str); 3] = initial_stream_entries[..3].try_into().unwrap();
let [id1, id2, id3]: [String; 3] =
stream_entries.map(|entry| con.xadd(stream_name, "*", &[entry]).unwrap().unwrap());
let _: () = con
.xgroup_create_mkstream(stream_name, group_name, "0")
.unwrap();
let _: Option<StreamReadReply> = con
.xread_options(
&[stream_name],
&[">"],
&StreamReadOptions::default()
.group(group_name, consumer_name)
.count(initial_stream_entries.len() + 1),
)
.unwrap();
let pending = con.xpending(stream_name, group_name).unwrap();
assert_eq!(pending.count(), initial_stream_entries.len());
let id4 = con
.xadd_options(
stream_name,
"*",
&[("field4", "value4")],
&StreamAddOptions::default()
.trim(StreamTrimStrategy::maxlen(
StreamTrimmingMode::Exact,
initial_stream_entries.len(),
))
.set_deletion_policy(StreamDeletionPolicy::Acked),
)
.unwrap()
.unwrap();
assert_eq!(
con.xlen(stream_name).unwrap(),
initial_stream_entries.len() + 1
);
let info = con.xinfo_stream(stream_name).unwrap();
assert_eq!(info.first_entry.id, id1);
assert_eq!(info.last_generated_id, id4);
let pending = con.xpending(stream_name, group_name).unwrap();
assert_eq!(pending.count(), initial_stream_entries.len());
let reply = con
.xpending_consumer_count(
stream_name,
group_name,
"-",
"+",
initial_stream_entries.len(),
consumer_name,
)
.unwrap();
assert_eq!(
vec![id1, id2, id3],
reply
.ids
.iter()
.map(|id| id.id.clone())
.collect::<Vec<String>>()
);
}
#[test]
fn test_xtrim_options_deletion_policy_keepref() {
let ctx = run_test_if_version_supported!(&REDIS_VERSION_CE_8_2);
let mut con = ctx.connection();
let _: () = con.flushdb().unwrap();
let stream_name = "test_stream_xtrim_keepref";
let group_name = "test_group";
let consumer_name = "consumer";
let initial_stream_entries = [
("field1", "value1"),
("field2", "value2"),
("field3", "value3"),
];
let stream_entries: [(&str, &str); 3] = initial_stream_entries[..3].try_into().unwrap();
let [id1, id2, id3]: [String; 3] =
stream_entries.map(|entry| con.xadd(stream_name, "*", &[entry]).unwrap().unwrap());
let _: () = con
.xgroup_create_mkstream(stream_name, group_name, "0")
.unwrap();
let _: Option<StreamReadReply> = con
.xread_options(
&[stream_name],
&[">"],
&StreamReadOptions::default()
.group(group_name, consumer_name)
.count(initial_stream_entries.len() + 1),
)
.unwrap();
let pending = con.xpending(stream_name, group_name).unwrap();
assert_eq!(pending.count(), initial_stream_entries.len());
let _: usize = con
.xtrim_options(
stream_name,
&StreamTrimOptions::minid(StreamTrimmingMode::Exact, id2.clone())
.set_deletion_policy(StreamDeletionPolicy::KeepRef),
)
.unwrap();
let info = con.xinfo_stream(stream_name).unwrap();
assert_eq!(info.length, initial_stream_entries.len() - 1);
assert_eq!(info.first_entry.id, id2);
let reply = con
.xpending_consumer_count(
stream_name,
group_name,
"-",
"+",
initial_stream_entries.len(),
consumer_name,
)
.unwrap();
assert_eq!(
vec![id1, id2, id3],
reply
.ids
.iter()
.map(|id| id.id.clone())
.collect::<Vec<String>>()
);
}
#[test]
fn test_xtrim_options_deletion_policy_delref() {
let ctx = run_test_if_version_supported!(&REDIS_VERSION_CE_8_2);
let mut con = ctx.connection();
let _: () = con.flushdb().unwrap();
let stream_name = "test_stream_xtrim_delref";
let group_name = "test_group";
let consumer_name = "consumer";
let initial_stream_entries = [
("field1", "value1"),
("field2", "value2"),
("field3", "value3"),
];
let stream_entries: [(&str, &str); 3] = initial_stream_entries[..3].try_into().unwrap();
let [_, id2, id3]: [String; 3] =
stream_entries.map(|entry| con.xadd(stream_name, "*", &[entry]).unwrap().unwrap());
let _: () = con
.xgroup_create_mkstream(stream_name, group_name, "0")
.unwrap();
let _: Option<StreamReadReply> = con
.xread_options(
&[stream_name],
&[">"],
&StreamReadOptions::default()
.group(group_name, consumer_name)
.count(initial_stream_entries.len() + 1),
)
.unwrap();
let pending = con.xpending(stream_name, group_name).unwrap();
assert_eq!(pending.count(), initial_stream_entries.len());
let _: usize = con
.xtrim_options(
stream_name,
&StreamTrimOptions::minid(StreamTrimmingMode::Exact, id2.clone())
.set_deletion_policy(StreamDeletionPolicy::DelRef),
)
.unwrap();
let info = con.xinfo_stream(stream_name).unwrap();
assert_eq!(info.length, initial_stream_entries.len() - 1);
assert_eq!(info.first_entry.id, id2);
let reply = con
.xpending_consumer_count(
stream_name,
group_name,
"-",
"+",
initial_stream_entries.len(),
consumer_name,
)
.unwrap();
assert_eq!(
vec![id2, id3],
reply
.ids
.iter()
.map(|id| id.id.clone())
.collect::<Vec<String>>()
);
}
#[test]
fn test_xtrim_options_deletion_policy_acked() {
let ctx = run_test_if_version_supported!(&REDIS_VERSION_CE_8_2);
let mut con = ctx.connection();
let _: () = con.flushdb().unwrap();
let stream_name = "test_stream_xtrim_acked";
let group_name = "test_group";
let consumer_name = "consumer";
let initial_stream_entries = [
("field1", "value1"),
("field2", "value2"),
("field3", "value3"),
];
let stream_entries: [(&str, &str); 3] = initial_stream_entries[..3].try_into().unwrap();
let [id1, id2, id3]: [String; 3] =
stream_entries.map(|entry| con.xadd(stream_name, "*", &[entry]).unwrap().unwrap());
let _: () = con
.xgroup_create_mkstream(stream_name, group_name, "0")
.unwrap();
let _: Option<StreamReadReply> = con
.xread_options(
&[stream_name],
&[">"],
&StreamReadOptions::default()
.group(group_name, consumer_name)
.count(initial_stream_entries.len() + 1),
)
.unwrap();
let pending = con.xpending(stream_name, group_name).unwrap();
assert_eq!(pending.count(), initial_stream_entries.len());
let _: usize = con
.xtrim_options(
stream_name,
&StreamTrimOptions::minid(StreamTrimmingMode::Exact, id2.clone())
.set_deletion_policy(StreamDeletionPolicy::Acked),
)
.unwrap();
let info = con.xinfo_stream(stream_name).unwrap();
assert_eq!(info.length, initial_stream_entries.len());
assert_eq!(info.first_entry.id, id1);
let reply = con
.xpending_consumer_count(
stream_name,
group_name,
"-",
"+",
initial_stream_entries.len(),
consumer_name,
)
.unwrap();
assert_eq!(
vec![id1, id2, id3],
reply
.ids
.iter()
.map(|id| id.id.clone())
.collect::<Vec<String>>()
);
}
#[test]
fn test_xdel_ex() {
let ctx = run_test_if_version_supported!(&REDIS_VERSION_CE_8_2);
let mut con = ctx.connection();
let _: () = con.flushdb().unwrap();
let stream_name = "test_stream_xdel_ex";
let group_name = "test_group";
let stream_entries = [
("field1", "value1"),
("field2", "value2"),
("field3", "value3"),
("field4", "value4"),
("field5", "value5"),
("field6", "value6"),
];
let non_existent_id = "9999999999-0";
let first_three_entries: [(&str, &str); 3] = stream_entries[..3].try_into().unwrap();
let [id1, id2, id3]: [String; 3] =
first_three_entries.map(|entry| con.xadd(stream_name, "*", &[entry]).unwrap().unwrap());
let _: () = con
.xgroup_create_mkstream(stream_name, group_name, "0")
.unwrap();
for i in 1..3 {
let _: Option<StreamReadReply> = con
.xread_options(
&[stream_name],
&[">"],
&StreamReadOptions::default()
.group(group_name, format!("consumer{i}"))
.count(1),
)
.unwrap();
let pending = con.xpending(stream_name, group_name).unwrap();
assert_eq!(pending.count(), i);
if let StreamPendingReply::Data(data) = pending {
assert_eq!(data.consumers.len(), i);
for j in 0..i {
assert_eq!(data.consumers[j].name, format!("consumer{}", j + 1));
assert_eq!(data.consumers[j].pending, 1);
}
} else {
panic!("Expected StreamPendingReply::Data");
}
}
let info = con.xinfo_stream(stream_name).unwrap();
assert_eq!(info.length, first_three_entries.len());
let result = con.xdel_ex(stream_name, &[&id1], StreamDeletionPolicy::Acked);
assert_eq!(
result,
Ok(vec![
XDelExStatusCode::NotDeletedUnacknowledgedOrStillReferenced
])
);
let pending = con.xpending(stream_name, group_name).unwrap();
assert_eq!(pending.count(), 2);
let info = con.xinfo_stream(stream_name).unwrap();
assert_eq!(info.length, 3);
let _: usize = con.xack(stream_name, group_name, &[&id1]).unwrap();
let pending = con.xpending(stream_name, group_name).unwrap();
assert_eq!(pending.count(), 1);
let info = con.xinfo_stream(stream_name).unwrap();
assert_eq!(info.length, 3);
let result = con.xdel_ex(stream_name, &[&id1], StreamDeletionPolicy::Acked);
assert_eq!(result, Ok(vec![XDelExStatusCode::Deleted]));
let pending = con.xpending(stream_name, group_name).unwrap();
assert_eq!(pending.count(), 1);
let info = con.xinfo_stream(stream_name).unwrap();
assert_eq!(info.length, 2);
let result = con.xdel_ex(stream_name, &[&id2], StreamDeletionPolicy::KeepRef);
assert_eq!(result, Ok(vec![XDelExStatusCode::Deleted]));
let pending = con.xpending(stream_name, group_name).unwrap();
assert_eq!(pending.count(), 1);
let info = con.xinfo_stream(stream_name).unwrap();
assert_eq!(info.length, 1);
let result = con.xdel_ex(stream_name, &[&id2], StreamDeletionPolicy::Acked);
assert_eq!(
result,
Ok(vec![
XDelExStatusCode::NotDeletedUnacknowledgedOrStillReferenced
])
);
let pending = con.xpending(stream_name, group_name).unwrap();
assert_eq!(pending.count(), 1);
let info = con.xinfo_stream(stream_name).unwrap();
assert_eq!(info.length, 1);
assert_eq!(info.first_entry.id, id3);
let fourth_entry: [(&str, &str); 1] = stream_entries[3..4].try_into().unwrap();
let id4: String = con.xadd(stream_name, "*", &fourth_entry).unwrap().unwrap();
let pending = con.xpending(stream_name, group_name).unwrap();
assert_eq!(pending.count(), 1);
let info = con.xinfo_stream(stream_name).unwrap();
assert_eq!(info.length, 2);
let result = con.xdel_ex(stream_name, &[&id4], StreamDeletionPolicy::Acked);
assert_eq!(
result,
Ok(vec![
XDelExStatusCode::NotDeletedUnacknowledgedOrStillReferenced
])
);
let pending = con.xpending(stream_name, group_name).unwrap();
assert_eq!(pending.count(), 1);
let info = con.xinfo_stream(stream_name).unwrap();
assert_eq!(info.length, 2);
let result = con.xdel_ex(stream_name, &[&id4], StreamDeletionPolicy::DelRef);
assert_eq!(result, Ok(vec![XDelExStatusCode::Deleted]));
let pending = con.xpending(stream_name, group_name).unwrap();
assert_eq!(pending.count(), 1);
let info = con.xinfo_stream(stream_name).unwrap();
assert_eq!(info.length, 1);
let _: Option<StreamReadReply> = con
.xread_options(
&[stream_name],
&[">"],
&StreamReadOptions::default()
.group(group_name, "consumer3")
.count(1),
)
.unwrap();
let pending = con.xpending(stream_name, group_name).unwrap();
assert_eq!(pending.count(), 2);
let info = con.xinfo_stream(stream_name).unwrap();
assert_eq!(info.length, 1);
let result = con.xdel_ex(stream_name, &[&id2], StreamDeletionPolicy::DelRef);
assert_eq!(result, Ok(vec![XDelExStatusCode::IdNotFound]));
let pending = con.xpending(stream_name, group_name).unwrap();
assert_eq!(pending.count(), 1);
let info = con.xinfo_stream(stream_name).unwrap();
assert_eq!(info.length, 1);
let result = con.xdel_ex(stream_name, &[&id3], StreamDeletionPolicy::DelRef);
assert_eq!(result, Ok(vec![XDelExStatusCode::Deleted]));
let pending = con.xpending(stream_name, group_name).unwrap();
assert_eq!(pending.count(), 0);
let info = con.xinfo_stream(stream_name).unwrap();
assert_eq!(info.length, 0);
let last_two_entries: [(&str, &str); 2] = stream_entries[4..6].try_into().unwrap();
let [id5, id6]: [String; 2] =
last_two_entries.map(|entry| con.xadd(stream_name, "*", &[entry]).unwrap().unwrap());
let result = con.xdel_ex(
stream_name,
&[id5.as_str(), id6.as_str(), non_existent_id],
StreamDeletionPolicy::DelRef,
);
assert_eq!(
result,
Ok(vec![
XDelExStatusCode::Deleted,
XDelExStatusCode::Deleted,
XDelExStatusCode::IdNotFound,
])
);
let result = con.xdel_ex(
stream_name,
&[non_existent_id],
StreamDeletionPolicy::DelRef,
);
assert_eq!(result, Ok(vec![XDelExStatusCode::IdNotFound]));
let result = con.xdel_ex(stream_name, &["invalid-0"], StreamDeletionPolicy::DelRef);
assert_matches!(result, Err(e) if e.to_string().contains("Invalid stream ID"));
}
#[test]
fn test_xack_del() {
let ctx = run_test_if_version_supported!(&REDIS_VERSION_CE_8_2);
let mut con = ctx.connection();
let _: () = con.flushdb().unwrap();
let stream_name = "test_stream_xack_del";
let first_group_name = "test_group1";
let second_group_name = "test_group2";
let stream_entries = [
("field1", "value1"),
("field2", "value2"),
("field3", "value3"),
("field4", "value4"),
("field5", "value5"),
("field6", "value6"),
];
let non_existent_id = "9999999999-0";
let first_three_entries: [(&str, &str); 3] = stream_entries[..3].try_into().unwrap();
let [id1, id2, id3]: [String; 3] =
first_three_entries.map(|entry| con.xadd(stream_name, "*", &[entry]).unwrap().unwrap());
let _: () = con
.xgroup_create_mkstream(stream_name, first_group_name, "0")
.unwrap();
for i in 1..4 {
let _: Option<StreamReadReply> = con
.xread_options(
&[stream_name],
&[">"],
&StreamReadOptions::default()
.group(first_group_name, format!("consumer{i}"))
.count(1),
)
.unwrap();
let pending = con.xpending(stream_name, first_group_name).unwrap();
assert_eq!(pending.count(), i);
if let StreamPendingReply::Data(data) = pending {
assert_eq!(data.consumers.len(), i);
for j in 0..i {
assert_eq!(data.consumers[j].name, format!("consumer{}", j + 1));
assert_eq!(data.consumers[j].pending, 1);
}
} else {
panic!("Expected StreamPendingReply::Data");
}
}
let info = con.xinfo_stream(stream_name).unwrap();
assert_eq!(info.length, first_three_entries.len());
let pending = con.xpending(stream_name, first_group_name).unwrap();
assert_eq!(pending.count(), first_three_entries.len());
let mut remaining_entries = first_three_entries.len();
let ids = [&id1, &id2, &id3];
let policies = [
StreamDeletionPolicy::KeepRef,
StreamDeletionPolicy::DelRef,
StreamDeletionPolicy::Acked,
];
for (&id, policy) in ids.iter().zip(policies.iter()) {
let result = con.xack_del(stream_name, first_group_name, &[id], policy.clone());
assert_eq!(result, Ok(vec![XAckDelStatusCode::AcknowledgedAndDeleted]));
remaining_entries -= 1;
let pending = con.xpending(stream_name, first_group_name).unwrap();
assert_eq!(pending.count(), remaining_entries);
let info = con.xinfo_stream(stream_name).unwrap();
assert_eq!(info.length, remaining_entries);
}
let fourth_entry: [(&str, &str); 1] = stream_entries[3..4].try_into().unwrap();
let id4: String = con.xadd(stream_name, "*", &fourth_entry).unwrap().unwrap();
let _: () = con
.xgroup_create_mkstream(stream_name, second_group_name, "0")
.unwrap();
let result = con.xack_del(
stream_name,
first_group_name,
&[&id4],
StreamDeletionPolicy::Acked,
);
assert_eq!(result, Ok(vec![XAckDelStatusCode::IdNotFound]));
for &group_name in &[first_group_name, second_group_name] {
let _: Option<StreamReadReply> = con
.xread_options(
&[stream_name],
&[">"],
&StreamReadOptions::default()
.group(group_name, "consumer1")
.count(1),
)
.unwrap();
}
let first_group_pending_messages = con.xpending(stream_name, first_group_name).unwrap();
let second_group_pending_messages = con.xpending(stream_name, second_group_name).unwrap();
assert_eq!(first_group_pending_messages.count(), 1);
assert_eq!(
first_group_pending_messages.count(),
second_group_pending_messages.count()
);
let info = con.xinfo_stream(stream_name).unwrap();
assert_eq!(info.length, 1);
let result = con.xack_del(
stream_name,
first_group_name,
&[&id4],
StreamDeletionPolicy::KeepRef,
);
assert_eq!(result, Ok(vec![XAckDelStatusCode::AcknowledgedAndDeleted]));
let first_group_pending_messages = con.xpending(stream_name, first_group_name).unwrap();
assert_eq!(first_group_pending_messages.count(), 0);
let second_group_pending_messages = con.xpending(stream_name, second_group_name).unwrap();
assert_eq!(second_group_pending_messages.count(), 1);
let info = con.xinfo_stream(stream_name).unwrap();
assert_eq!(info.length, 0);
let result = con.xack_del(
stream_name,
second_group_name,
&[&id4],
StreamDeletionPolicy::DelRef,
);
assert_eq!(result, Ok(vec![XAckDelStatusCode::AcknowledgedAndDeleted]));
let second_group_pending_messages = con.xpending(stream_name, second_group_name).unwrap();
assert_eq!(second_group_pending_messages.count(), 0);
let info = con.xinfo_stream(stream_name).unwrap();
assert_eq!(info.length, 0);
let last_two_entries: [(&str, &str); 2] = stream_entries[4..6].try_into().unwrap();
let [id5, id6]: [String; 2] =
last_two_entries.map(|entry| con.xadd(stream_name, "*", &[entry]).unwrap().unwrap());
for &group_name in &[first_group_name, second_group_name] {
let _: Option<StreamReadReply> = con
.xread_options(
&[stream_name],
&[">"],
&StreamReadOptions::default()
.group(group_name, "consumer1")
.count(2),
)
.unwrap();
}
let first_group_pending_messages = con.xpending(stream_name, first_group_name).unwrap();
assert_eq!(first_group_pending_messages.count(), 2);
let second_group_pending_messages = con.xpending(stream_name, second_group_name).unwrap();
assert_eq!(second_group_pending_messages.count(), 2);
let info = con.xinfo_stream(stream_name).unwrap();
assert_eq!(info.length, 2);
let result = con.xack_del(
stream_name,
first_group_name,
&[&id5, &id6],
StreamDeletionPolicy::DelRef,
);
assert_eq!(
result,
Ok(vec![
XAckDelStatusCode::AcknowledgedAndDeleted,
XAckDelStatusCode::AcknowledgedAndDeleted,
])
);
let first_group_pending_messages = con.xpending(stream_name, first_group_name).unwrap();
assert_eq!(first_group_pending_messages.count(), 0);
let second_group_pending_messages = con.xpending(stream_name, second_group_name).unwrap();
assert_eq!(second_group_pending_messages.count(), 0);
let info = con.xinfo_stream(stream_name).unwrap();
assert_eq!(info.length, 0);
let result = con.xack_del(
stream_name,
second_group_name,
&[&id5, &id6],
StreamDeletionPolicy::DelRef,
);
assert_eq!(
result,
Ok(vec![
XAckDelStatusCode::IdNotFound,
XAckDelStatusCode::IdNotFound,
])
);
let last_two_entries: [(&str, &str); 2] = stream_entries[4..6].try_into().unwrap();
let [id5, id6]: [String; 2] =
last_two_entries.map(|entry| con.xadd(stream_name, "*", &[entry]).unwrap().unwrap());
for &group_name in &[first_group_name, second_group_name] {
let _: Option<StreamReadReply> = con
.xread_options(
&[stream_name],
&[">"],
&StreamReadOptions::default()
.group(group_name, "consumer1")
.count(2),
)
.unwrap();
}
let first_group_pending_messages = con.xpending(stream_name, first_group_name).unwrap();
assert_eq!(first_group_pending_messages.count(), 2);
let second_group_pending_messages = con.xpending(stream_name, second_group_name).unwrap();
assert_eq!(second_group_pending_messages.count(), 2);
let info = con.xinfo_stream(stream_name).unwrap();
assert_eq!(info.length, 2);
let result = con.xack_del(
stream_name,
first_group_name,
&[&id5, &id6],
StreamDeletionPolicy::Acked,
);
assert_eq!(
result,
Ok(vec![
XAckDelStatusCode::AcknowledgedNotDeletedStillReferenced,
XAckDelStatusCode::AcknowledgedNotDeletedStillReferenced,
])
);
let first_group_pending_messages = con.xpending(stream_name, first_group_name).unwrap();
assert_eq!(first_group_pending_messages.count(), 0);
let second_group_pending_messages = con.xpending(stream_name, second_group_name).unwrap();
assert_eq!(second_group_pending_messages.count(), 2);
let info = con.xinfo_stream(stream_name).unwrap();
assert_eq!(info.length, 2);
let result = con.xack_del(
stream_name,
second_group_name,
&[&id5, &id6],
StreamDeletionPolicy::Acked,
);
assert_eq!(
result,
Ok(vec![
XAckDelStatusCode::AcknowledgedAndDeleted,
XAckDelStatusCode::AcknowledgedAndDeleted,
])
);
let first_group_pending_messages = con.xpending(stream_name, first_group_name).unwrap();
assert_eq!(first_group_pending_messages.count(), 0);
let second_group_pending_messages = con.xpending(stream_name, second_group_name).unwrap();
assert_eq!(second_group_pending_messages.count(), 0);
let info = con.xinfo_stream(stream_name).unwrap();
assert_eq!(info.length, 0);
let result = con.xack_del(
stream_name,
first_group_name,
&[non_existent_id],
StreamDeletionPolicy::DelRef,
);
assert_eq!(result, Ok(vec![XAckDelStatusCode::IdNotFound]));
let result = con.xack_del(
stream_name,
first_group_name,
&["invalid-0"],
StreamDeletionPolicy::DelRef,
);
assert_matches!(result, Err(e) if e.to_string().contains("Invalid stream ID"));
}
#[test]
fn test_xtrim() {
let ctx = TestContext::new();
let mut con = ctx.connection();
xadd_keyrange(&mut con, "k1", 0, 100);
let result = con.xtrim("k1", StreamMaxlen::Equals(50));
assert_eq!(result, Ok(50));
let result = con.xtrim("k1", StreamMaxlen::Equals(10));
assert_eq!(result, Ok(40));
}
#[test]
fn test_xtrim_options() {
let ctx = TestContext::new();
let mut con = ctx.connection();
xadd_keyrange(&mut con, "k1", 0, 100);
let result = con.xtrim_options(
"k1",
&StreamTrimOptions::maxlen(StreamTrimmingMode::Exact, 50),
);
assert_eq!(result, Ok(50));
let result = con.xtrim_options(
"k1",
&StreamTrimOptions::maxlen(StreamTrimmingMode::Exact, 10),
);
assert_eq!(result, Ok(40));
let _ = con.del("k1");
for i in 1..100 {
let _ = con.xadd("k1", format!("1-{i}"), &[("h", "w")]);
}
let result = con.xtrim_options(
"k1",
&StreamTrimOptions::minid(StreamTrimmingMode::Exact, "1-26"),
);
assert_eq!(result, Ok(25));
let result = con.xtrim_options(
"k1",
&StreamTrimOptions::minid(StreamTrimmingMode::Exact, "1-76"),
);
assert_eq!(result, Ok(50));
}
#[test]
fn test_xgroup() {
let ctx = TestContext::new();
let mut con = ctx.connection();
let result = con.xgroup_create_mkstream("k1", "g1", "0");
assert_matches!(result, Ok(_));
let result = con.xgroup_destroy("k1", "g1");
assert_eq!(result, Ok(true));
xadd(&mut con);
let result = con.xgroup_create("k1", "g1", "0");
assert_matches!(result, Ok(_));
let reply = con
.xread_options(
&["k1"],
&[">"],
&StreamReadOptions::default().group("g1", "c1"),
)
.unwrap()
.unwrap();
assert_eq!(reply.keys[0].ids.len(), 2);
let result = con.xgroup_delconsumer("k1", "g1", "c1");
assert_eq!(result, Ok(2));
let result = con.xgroup_destroy("k1", "g1");
assert_eq!(result, Ok(true));
}
#[test]
fn test_xrange() {
let ctx = TestContext::new();
let mut con = ctx.connection();
xadd(&mut con);
let reply = con.xrange_all("k1").unwrap();
assert_eq!(reply.ids.len(), 2);
let reply = con.xrange("k1", "1000-1", "+").unwrap();
assert_eq!(reply.ids.len(), 1);
let reply = con.xrange("k1", "-", "1000-0").unwrap();
assert_eq!(reply.ids.len(), 1);
let reply = con.xrange_count("k1", "-", "+", 1).unwrap();
assert_eq!(reply.ids.len(), 1);
}
#[test]
fn test_xrevrange() {
let ctx = TestContext::new();
let mut con = ctx.connection();
xadd(&mut con);
let reply = con.xrevrange_all("k1").unwrap();
assert_eq!(reply.ids.len(), 2);
let reply = con.xrevrange("k1", "1000-1", "-").unwrap();
assert_eq!(reply.ids.len(), 2);
let reply = con.xrevrange("k1", "+", "1000-1").unwrap();
assert_eq!(reply.ids.len(), 1);
let reply = con.xrevrange_count("k1", "+", "-", 1).unwrap();
assert_eq!(reply.ids.len(), 1);
}
#[test]
fn test_xautoclaim_invalid_pel_entries_claiming_full_entries() {
let ctx = TestContext::new();
let mut con = ctx.connection();
let reply = create_group_add_and_read(&mut con);
let claim = &reply.keys[0].ids[0];
let claim_1 = &reply.keys[0].ids[1];
let claimed_entries = vec![reply.keys[0].ids[2].clone(), reply.keys[0].ids[3].clone()];
let _ = con.xdel("k1", &[claim.id.clone(), claim_1.id.clone()]);
sleep(Duration::from_millis(5));
let reply = con
.xautoclaim_options(
"k1",
"g1",
"c2",
1,
claim.id.clone(),
StreamAutoClaimOptions::default().count(4),
)
.unwrap();
assert_eq!(reply.claimed, claimed_entries);
if ctx.get_version().0 > 6 {
assert_eq!(
reply.deleted_ids,
vec![claim.id.clone(), claim_1.id.clone()]
);
} else {
assert_eq!(reply.deleted_ids.len(), 0);
assert!(reply.invalid_entries);
}
}
#[test]
fn test_xautoclaim_invalid_pel_entries_claiming_just_ids() {
let ctx = TestContext::new();
let mut con = ctx.connection();
let reply = create_group_add_and_read(&mut con);
let claim = &reply.keys[0].ids[0];
let claim_1 = &reply.keys[0].ids[1];
let mut claimed_entries = vec![reply.keys[0].ids[2].clone(), reply.keys[0].ids[3].clone()];
let _ = con.xdel("k1", &[claim.id.clone(), claim_1.id.clone()]);
sleep(Duration::from_millis(5));
let reply = con
.xautoclaim_options(
"k1",
"g1",
"c2",
1,
claim.id.clone(),
StreamAutoClaimOptions::default().count(4).with_justid(),
)
.unwrap();
std::mem::take(&mut claimed_entries[0].map);
std::mem::take(&mut claimed_entries[1].map);
if ctx.get_version().0 > 6 {
assert_eq!(reply.claimed, claimed_entries);
assert_eq!(
reply.deleted_ids,
vec![claim.id.clone(), claim_1.id.clone()]
);
} else {
claimed_entries.insert(
0,
StreamId {
id: claim.id.clone(),
map: Default::default(),
milliseconds_elapsed_from_delivery: None,
delivered_count: None,
},
);
claimed_entries.insert(
1,
StreamId {
id: claim_1.id.clone(),
map: Default::default(),
milliseconds_elapsed_from_delivery: None,
delivered_count: None,
},
);
assert_eq!(reply.claimed, claimed_entries);
assert_eq!(reply.deleted_ids.len(), 0);
}
}