use redis_queue::utils::queue_trim_consumed;
use redis_queue::{Queue, QueueConfig};
use redis_queue::types::{EntryValue, FetchParams, FetchType, PendingParams, Range, RangeIdx, TimestampId};
use core::time;
#[tokio::test]
async fn verify_redis_behavior() {
const STREAM: &str = "verify_redis_behavior";
const GROUP: &str = "test1";
const READER: &str = "reader1";
let config = QueueConfig { stream: STREAM.into() };
let client = redis::Client::open("redis://127.0.0.1/").expect("to create redis client");
let conn = client.get_connection_manager().await.expect("to get connection");
let queue = Queue::new(config, conn);
let result = queue.time().await.expect("get time");
println!("redis current time={}", result.as_millis());
queue.purge().await.expect("to delete redis key");
let len = queue.len().await.expect("get len");
assert_eq!(len, 0);
{
let mut conn = queue.connection();
let _: () = redis::Cmd::new()
.arg("ZADD")
.arg(STREAM)
.arg("1")
.arg("data")
.query_async(&mut conn)
.await
.expect("Success");
}
let error = queue.create_group(GROUP).await.unwrap_err();
assert_eq!(error.kind(), redis::ErrorKind::ClientError);
queue.purge().await.expect("to delete redis key");
queue.create_group(&GROUP).await.expect("create group");
queue.create_group(&GROUP).await.expect("create group");
let groups = queue.groups_info().await.expect("to get group info");
assert_eq!(groups.len(), 1);
assert_eq!(groups[0].pending, 0);
let value1 = EntryValue {
id: uuid::Uuid::new_v4(),
payload: "test",
};
let id1 = queue.append(&value1).await.expect("append entry");
let groups = queue.groups_info().await.expect("to get group info");
assert!(groups[0].last_delivered_id < id1);
assert_eq!(groups[0].pending, 0);
let value2 = EntryValue {
id: uuid::Uuid::new_v4(),
payload: "test",
};
let id2 = queue.append(&value2).await.expect("append entry");
let groups = queue.groups_info().await.expect("to get group info");
assert!(groups[0].last_delivered_id < id1);
assert_eq!(groups[0].pending, 0);
queue_trim_consumed(queue.clone(), 10).await;
let len = queue.len().await.expect("get len");
assert_eq!(len, 2);
let mut params = FetchParams {
group: GROUP,
consumer: READER,
count: 0,
typ: FetchType::New,
timeout: None,
};
let stream = queue.fetch::<String>(¶ms).await.expect("Fetch new messages");
assert_eq!(stream.stream, STREAM);
queue_trim_consumed(queue.clone(), 10).await;
let len = queue.len().await.expect("get len");
assert_eq!(len, 2);
let items = &stream.entries;
assert_eq!(items.len(), 2);
assert_eq!(items[0].id, id1);
assert_eq!(items[0].value.id, value1.id);
assert_eq!(items[0].value.payload, value1.payload);
assert_eq!(items[1].id, id2);
assert_eq!(items[1].value.id, value2.id);
assert_eq!(items[1].value.payload, value2.payload);
let groups = queue.groups_info().await.expect("to get group info");
assert_eq!(groups[0].last_delivered_id, id2);
assert_eq!(groups[0].pending, 2);
let stream = queue.fetch::<String>(¶ms).await.expect("Fetch new messages");
assert_eq!(stream.stream, STREAM);
let items = &stream.entries;
assert_eq!(items.len(), 0);
params.typ = FetchType::Pending;
let stream = queue.fetch::<String>(¶ms).await.expect("Fetch pending messages");
assert_eq!(stream.stream, STREAM);
let items = &stream.entries;
assert_eq!(items.len(), 2);
assert_eq!(items[0].id, id1);
assert_eq!(items[0].value.id, value1.id);
assert_eq!(items[0].value.payload, value1.payload);
assert_eq!(items[1].id, id2);
assert_eq!(items[1].value.id, value2.id);
assert_eq!(items[1].value.payload, value2.payload);
params.typ = FetchType::After(id1);
let stream = queue.fetch::<String>(¶ms).await.expect("Fetch pending messages");
assert_eq!(stream.stream, STREAM);
let items = &stream.entries;
assert_eq!(items.len(), 1);
assert_eq!(items[0].id, id2);
assert_eq!(items[0].value.id, value2.id);
assert_eq!(items[0].value.payload, value2.payload);
queue.consume(&GROUP, &[id2]).await.expect("To consume second message");
let stream = queue.fetch::<String>(¶ms).await.expect("Fetch pending messages");
assert_eq!(stream.stream, STREAM);
let items = &stream.entries;
assert_eq!(items.len(), 0);
queue_trim_consumed(queue.clone(), 10).await;
let len = queue.len().await.expect("get len");
assert_eq!(len, 2);
let groups = queue.groups_info().await.expect("to get group info");
assert_eq!(groups[0].last_delivered_id, id2);
assert_eq!(groups[0].pending, 1);
queue.consume(&GROUP, &[id1]).await.expect("To consume second message");
params.typ = FetchType::Pending;
let groups = queue.groups_info().await.expect("to get group info");
assert_eq!(groups[0].last_delivered_id, id2);
assert_eq!(groups[0].pending, 0);
let stream = queue.fetch::<String>(¶ms).await.expect("Fetch pending messages");
assert_eq!(stream.stream, STREAM);
let items = &stream.entries;
assert_eq!(items.len(), 0);
let size = queue.delete(&[id1, id2]).await.expect("to delete entries");
assert_eq!(size, 2);
let len = queue.len().await.expect("get length");
assert_eq!(len, 0);
let groups = queue.groups_info().await.expect("to get group info");
assert_eq!(groups.len(), 1);
assert_eq!(groups[0].pending, 0);
let id3_delay = time::Duration::from_secs(10);
let id1 = queue.append(&value1).await.expect("append entry");
let id2 = queue.append(&value2).await.expect("append entry");
let id3 = queue.append_delayed(&value2, id3_delay).await.expect("append entry");
let time = queue.time().await.expect("get current time");
let mut pending_params = PendingParams {
group: GROUP,
consumer: Some(READER),
idle: None,
range: Range {
start: RangeIdx::Any,
end: RangeIdx::Any,
},
count: 10,
};
let groups = queue.groups_info().await.expect("to get group info");
assert_eq!(groups.len(), 1);
assert!(groups[0].last_delivered_id < id1);
assert_eq!(groups[0].pending, 0);
let pending = queue.pending(&pending_params).await.expect("get pending");
assert_eq!(pending.len(), 0);
let pending_stats = queue
.pending_stats(&pending_params.group)
.await
.expect("get pending stats");
assert_eq!(pending_stats.len, 0);
params.typ = FetchType::New;
let stream = queue.fetch::<String>(¶ms).await.expect("Fetch new messages");
assert_eq!(stream.stream, STREAM);
assert_eq!(stream.entries.len(), 3);
queue_trim_consumed(queue.clone(), 10).await;
let len = queue.len().await.expect("get len");
assert_eq!(len, 3);
let groups = queue.groups_info().await.expect("to get group info");
assert_eq!(groups.len(), 1);
assert_eq!(groups[0].last_delivered_id, id3);
assert_eq!(groups[0].pending, 3);
let pending = queue.pending(&pending_params).await.expect("get pending");
assert_eq!(pending.len(), 3);
assert_eq!(pending[0].id, id1);
assert_eq!(pending[1].id, id2);
assert_eq!(pending[2].id, id3);
assert_eq!(pending[0].consumer, READER);
assert_eq!(pending[1].consumer, READER);
assert_eq!(pending[2].consumer, READER);
assert_eq!(pending[0].count, 1);
assert_eq!(pending[1].count, 1);
assert_eq!(pending[2].count, 1);
let pending_stats = queue
.pending_stats(&pending_params.group)
.await
.expect("get pending stats");
assert_eq!(pending_stats.len, 3);
assert_eq!(pending_stats.lowest_id, id1);
assert_eq!(pending_stats.highest_id, id3);
assert_eq!(pending_stats.consumers.len(), 1);
assert_eq!(pending_stats.consumers[0].no_ack_num, 3);
pending_params.range.end = RangeIdx::Timestamp(TimestampId::new(time + id3_delay));
let pending = queue.pending(&pending_params).await.expect("get pending");
assert_eq!(pending.len(), 3);
assert_eq!(pending[0].id, id1);
assert_eq!(pending[1].id, id2);
assert_eq!(pending[2].id, id3);
pending_params.range.end =
RangeIdx::Timestamp(TimestampId::new(time + id3_delay - time::Duration::from_secs(1)));
let pending = queue.pending(&pending_params).await.expect("get pending");
assert_eq!(pending.len(), 2);
assert_eq!(pending[0].id, id1);
assert_eq!(pending[1].id, id2);
pending_params.range.start = RangeIdx::Id(id1);
let pending = queue.pending(&pending_params).await.expect("get pending");
assert_eq!(pending.len(), 2);
assert_eq!(pending[0].id, id1);
assert_eq!(pending[1].id, id2);
pending_params.range.start = RangeIdx::ExcludeId(id1);
let pending = queue.pending(&pending_params).await.expect("get pending");
assert_eq!(pending.len(), 1);
assert_eq!(pending[0].id, id2);
pending_params.range.end = RangeIdx::Id(id2);
let pending = queue.pending(&pending_params).await.expect("get pending");
assert_eq!(pending.len(), 1);
assert_eq!(pending[0].id, id2);
pending_params.range.end = RangeIdx::ExcludeId(id2);
let pending = queue.pending(&pending_params).await.expect("get pending");
assert_eq!(pending.len(), 0);
pending_params.range.start = RangeIdx::Id(id1);
let pending = queue.pending(&pending_params).await.expect("get pending");
assert_eq!(pending.len(), 1);
assert_eq!(pending[0].id, id1);
queue_trim_consumed(queue.clone(), 10).await;
let len = queue.len().await.expect("get len");
assert_eq!(len, 3);
queue.consume(&GROUP, &[id3]).await.expect("consume id3");
queue_trim_consumed(queue.clone(), 10).await;
let len = queue.len().await.expect("get len");
assert_eq!(len, 3);
queue.consume(&GROUP, &[id1]).await.expect("To consume second message");
queue_trim_consumed(queue.clone(), 10).await;
let len = queue.len().await.expect("get len");
assert_eq!(len, 2);
queue.consume(&GROUP, &[id2]).await.expect("To consume second message");
queue_trim_consumed(queue.clone(), 10).await;
let len = queue.len().await.expect("get len");
assert_eq!(len, 0);
}