mod support;
use rsmq_async::{RedisBytes, Rsmq, RsmqConnection as _, RsmqError};
use std::{convert::TryFrom, time::Duration};
use support::*;
fn rt() -> tokio::runtime::Runtime {
tokio::runtime::Runtime::new().unwrap()
}
async fn new_rsmq(ctx: &TestContext) -> Rsmq {
Rsmq::new_with_connection(ctx.async_connection().await.unwrap(), false, Some(&ctx.ns))
.await
.unwrap()
}
#[test]
fn send_receiving_deleting_message() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async move {
let ctx = TestContext::new();
let connection = ctx.async_connection().await.unwrap();
let mut rsmq = Rsmq::new_with_connection(connection, false, Some(&ctx.ns))
.await
.unwrap();
rsmq.create_queue("queue1", None, None, None).await.unwrap();
rsmq.send_message("queue1", "testmessage", None)
.await
.unwrap();
let message = rsmq
.receive_message::<String>("queue1", None)
.await
.unwrap();
assert!(message.is_some());
let message = message.unwrap();
rsmq.delete_message("queue1", &message.id).await.unwrap();
assert_eq!(message.message, "testmessage".to_string());
let message = rsmq
.receive_message::<String>("queue1", None)
.await
.unwrap();
assert!(message.is_none());
rsmq.delete_queue("queue1").await.unwrap();
})
}
#[test]
fn send_receiving_delayed_message() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async move {
let ctx = TestContext::new();
let connection = ctx.async_connection().await.unwrap();
let mut rsmq = Rsmq::new_with_connection(connection, false, Some(&ctx.ns))
.await
.unwrap();
rsmq.create_queue("queue1", None, None, None).await.unwrap();
rsmq.send_message("queue1", "testmessage", Some(Duration::from_secs(2)))
.await
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
let message = rsmq
.receive_message::<String>("queue1", None)
.await
.unwrap();
assert!(message.is_none());
let message = rsmq
.receive_message::<String>("queue1", None)
.await
.unwrap();
assert!(message.is_none());
let message = rsmq
.receive_message::<String>("queue1", None)
.await
.unwrap();
assert!(message.is_none());
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
let message = rsmq
.receive_message::<String>("queue1", None)
.await
.unwrap();
assert!(message.is_some());
let message = message.unwrap();
rsmq.delete_message("queue1", &message.id).await.unwrap();
assert_eq!(message.message, "testmessage".to_string());
let message = rsmq
.receive_message::<String>("queue1", None)
.await
.unwrap();
assert!(message.is_none());
rsmq.delete_queue("queue1").await.unwrap();
})
}
#[test]
fn send_receiving_deleting_message_vec_u8() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async move {
let ctx = TestContext::new();
let connection = ctx.async_connection().await.unwrap();
let mut rsmq = Rsmq::new_with_connection(connection, false, Some(&ctx.ns))
.await
.unwrap();
rsmq.create_queue("queue1", None, None, None).await.unwrap();
rsmq.send_message("queue1", "testmessage", None)
.await
.unwrap();
let message = rsmq
.receive_message::<Vec<u8>>("queue1", None)
.await
.unwrap();
assert!(message.is_some());
let message = message.unwrap();
rsmq.delete_message("queue1", &message.id).await.unwrap();
assert_eq!(message.message, b"testmessage");
let message = rsmq
.receive_message::<Vec<u8>>("queue1", None)
.await
.unwrap();
assert!(message.is_none());
rsmq.delete_queue("queue1").await.unwrap();
})
}
#[test]
fn send_receiving_deleting_message_custom_type() {
let rt = tokio::runtime::Runtime::new().unwrap();
#[derive(Debug, PartialEq)]
struct MyValue(Vec<u8>);
impl TryFrom<RedisBytes> for MyValue {
type Error = Vec<u8>;
fn try_from(t: RedisBytes) -> Result<Self, Self::Error> {
Ok(MyValue(t.into_bytes()))
}
}
rt.block_on(async move {
let ctx = TestContext::new();
let connection = ctx.async_connection().await.unwrap();
let mut rsmq = Rsmq::new_with_connection(connection, false, Some(&ctx.ns))
.await
.unwrap();
rsmq.create_queue("queue1", None, None, None).await.unwrap();
rsmq.send_message("queue1", b"testmessage".to_owned().to_vec(), None)
.await
.unwrap();
let message = rsmq
.receive_message::<MyValue>("queue1", None)
.await
.unwrap();
assert!(message.is_some());
let message = message.unwrap();
rsmq.delete_message("queue1", &message.id).await.unwrap();
assert_eq!(message.message, MyValue(b"testmessage".to_owned().to_vec()));
let message = rsmq
.receive_message::<MyValue>("queue1", None)
.await
.unwrap();
assert!(message.is_none());
rsmq.delete_queue("queue1").await.unwrap();
})
}
#[test]
fn pop_message() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async move {
let ctx = TestContext::new();
let connection = ctx.async_connection().await.unwrap();
let mut rsmq = Rsmq::new_with_connection(connection, false, Some(&ctx.ns))
.await
.unwrap();
rsmq.create_queue("queue2", None, None, None).await.unwrap();
rsmq.send_message("queue2", "testmessage", None)
.await
.unwrap();
let message = rsmq.pop_message::<String>("queue2").await.unwrap();
assert!(message.is_some());
let message = message.unwrap();
assert_eq!(message.message, "testmessage");
let message = rsmq.pop_message::<String>("queue2").await.unwrap();
assert!(message.is_none());
rsmq.delete_queue("queue2").await.unwrap();
})
}
#[test]
fn pop_message_vec_u8() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async move {
let ctx = TestContext::new();
let connection = ctx.async_connection().await.unwrap();
let mut rsmq = Rsmq::new_with_connection(connection, false, Some(&ctx.ns))
.await
.unwrap();
rsmq.create_queue("queue2", None, None, None).await.unwrap();
rsmq.send_message("queue2", "testmessage", None)
.await
.unwrap();
let message = rsmq.pop_message::<Vec<u8>>("queue2").await.unwrap();
assert!(message.is_some());
let message = message.unwrap();
assert_eq!(message.message, "testmessage".as_bytes());
let message = rsmq.pop_message::<String>("queue2").await.unwrap();
assert!(message.is_none());
rsmq.delete_queue("queue2").await.unwrap();
})
}
#[test]
fn creating_queue() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async move {
let ctx = TestContext::new();
let connection = ctx.async_connection().await.unwrap();
let mut rsmq = Rsmq::new_with_connection(connection, false, Some(&ctx.ns))
.await
.unwrap();
rsmq.create_queue("queue3", None, None, None).await.unwrap();
let queues = rsmq.list_queues().await.unwrap();
assert_eq!(queues, vec!("queue3"));
let result = rsmq.create_queue("queue3", None, None, None).await;
assert!(result.is_err());
if let Err(RsmqError::QueueExists) = result {
rsmq.delete_queue("queue3").await.unwrap();
} else {
panic!()
}
})
}
#[test]
fn updating_queue() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async move {
let ctx = TestContext::new();
let connection = ctx.async_connection().await.unwrap();
let mut rsmq = Rsmq::new_with_connection(connection, false, Some(&ctx.ns))
.await
.unwrap();
rsmq.create_queue("queue4", None, None, None).await.unwrap();
let attributes = rsmq.get_queue_attributes("queue4").await.unwrap();
assert_eq!(attributes.vt, Duration::from_secs(30));
assert_eq!(attributes.delay, Duration::ZERO);
assert_eq!(attributes.maxsize, 65536);
assert_eq!(attributes.totalrecv, 0);
assert_eq!(attributes.totalsent, 0);
assert_eq!(attributes.msgs, 0);
assert_eq!(attributes.hiddenmsgs, 0);
assert!(attributes.created > 0);
assert!(attributes.modified > 0);
rsmq.set_queue_attributes(
"queue4",
Some(Duration::from_secs(45)),
Some(Duration::from_secs(5)),
Some(2048),
)
.await
.unwrap();
let attributes = rsmq.get_queue_attributes("queue4").await.unwrap();
assert_eq!(attributes.vt, Duration::from_secs(45));
assert_eq!(attributes.delay, Duration::from_secs(5));
assert_eq!(attributes.maxsize, 2048);
assert_eq!(attributes.totalrecv, 0);
assert_eq!(attributes.totalsent, 0);
assert_eq!(attributes.msgs, 0);
assert_eq!(attributes.hiddenmsgs, 0);
assert!(attributes.created > 0);
assert!(attributes.modified > 0);
rsmq.delete_queue("queue4").await.unwrap();
})
}
#[test]
fn deleting_queue() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async move {
let ctx = TestContext::new();
let connection = ctx.async_connection().await.unwrap();
let mut rsmq = Rsmq::new_with_connection(connection, false, Some(&ctx.ns))
.await
.unwrap();
rsmq.create_queue("queue5", None, None, None).await.unwrap();
let queues = rsmq.list_queues().await.unwrap();
assert_eq!(queues, vec!("queue5"));
rsmq.delete_queue("queue5").await.unwrap();
let queues = rsmq.list_queues().await.unwrap();
assert_eq!(queues, Vec::<String>::new());
let result = rsmq.delete_queue("queue5").await;
assert!(result.is_err());
if let Err(RsmqError::QueueNotFound) = result {
} else {
panic!("{:?}", result)
}
let result = rsmq.get_queue_attributes("queue5").await;
assert!(result.is_err());
if let Err(RsmqError::QueueNotFound) = result {
} else {
panic!("{:?}", result)
}
let result = rsmq
.set_queue_attributes(
"queue5",
Some(Duration::from_secs(45)),
Some(Duration::from_secs(5)),
Some(2048),
)
.await;
assert!(result.is_err());
if let Err(RsmqError::QueueNotFound) = result {
} else {
panic!("{:?}", result)
}
})
}
#[test]
fn change_message_visibility() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async move {
let ctx = TestContext::new();
let connection = ctx.async_connection().await.unwrap();
let mut rsmq = Rsmq::new_with_connection(connection, false, Some(&ctx.ns))
.await
.unwrap();
rsmq.create_queue("queue6", None, None, None).await.unwrap();
rsmq.send_message("queue6", "testmessage", None)
.await
.unwrap();
let message = rsmq
.receive_message::<String>("queue6", None)
.await
.unwrap();
assert!(message.is_some());
let message_id = message.unwrap().id;
let message = rsmq
.receive_message::<String>("queue6", None)
.await
.unwrap();
assert!(message.is_none());
rsmq.change_message_visibility("queue6", &message_id, Duration::ZERO)
.await
.unwrap();
let ten_millis = std::time::Duration::from_millis(10);
std::thread::sleep(ten_millis);
let message = rsmq
.receive_message::<String>("queue6", None)
.await
.unwrap();
assert!(message.is_some());
assert_eq!(message_id, message.unwrap().id);
rsmq.delete_queue("queue6").await.unwrap();
})
}
#[test]
fn change_queue_size() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async move {
let ctx = TestContext::new();
let connection = ctx.async_connection().await.unwrap();
let mut rsmq = Rsmq::new_with_connection(connection, false, Some(&ctx.ns))
.await
.unwrap();
rsmq.create_queue("queue6", None, None, None).await.unwrap();
rsmq.set_queue_attributes("queue6", None, None, Some(-1))
.await
.unwrap();
let attributes = rsmq.get_queue_attributes("queue6").await.unwrap();
assert_eq!(attributes.maxsize, -1);
})
}
#[cfg(feature = "break-js-comp")]
#[test]
fn sent_messages_must_keep_order() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async move {
let ctx = TestContext::new();
let connection = ctx.async_connection().await.unwrap();
let mut rsmq = Rsmq::new_with_connection(connection, false, Some(&ctx.ns))
.await
.unwrap();
rsmq.create_queue("queue1", None, None, None).await.unwrap();
for i in 0..10000 {
rsmq.send_message("queue1", format!("testmessage{}", i), None)
.await
.unwrap();
}
for i in 0..10000 {
let message = rsmq
.receive_message::<String>("queue1", None)
.await
.unwrap()
.unwrap();
assert_eq!(message.message, format!("testmessage{}", i));
rsmq.delete_message("queue1", &message.id).await.unwrap();
}
let message = rsmq
.receive_message::<String>("queue1", None)
.await
.unwrap();
assert!(message.is_none());
rsmq.delete_queue("queue1").await.unwrap();
})
}
#[test]
fn queue_name_empty_is_rejected() {
rt().block_on(async {
let ctx = TestContext::new();
let mut rsmq = new_rsmq(&ctx).await;
assert!(matches!(
rsmq.create_queue("", None, None, None).await,
Err(RsmqError::InvalidFormat(_))
));
});
}
#[test]
fn queue_name_too_long_is_rejected() {
rt().block_on(async {
let ctx = TestContext::new();
let mut rsmq = new_rsmq(&ctx).await;
let long = "a".repeat(161);
assert!(matches!(
rsmq.create_queue(&long, None, None, None).await,
Err(RsmqError::InvalidFormat(_))
));
});
}
#[test]
fn queue_name_invalid_chars_are_rejected() {
rt().block_on(async {
let ctx = TestContext::new();
let mut rsmq = new_rsmq(&ctx).await;
for bad in &["has space", "has.dot", "has@at", "has/slash", "has:colon"] {
assert!(
matches!(
rsmq.create_queue(bad, None, None, None).await,
Err(RsmqError::InvalidFormat(_))
),
"expected InvalidFormat for {:?}",
bad
);
}
});
}
#[test]
fn queue_name_valid_boundaries_accepted() {
rt().block_on(async {
let ctx = TestContext::new();
let mut rsmq = new_rsmq(&ctx).await;
rsmq.create_queue("valid-name_123", None, None, None)
.await
.unwrap();
let max_name = "a".repeat(160);
rsmq.create_queue(&max_name, None, None, None)
.await
.unwrap();
});
}
#[test]
fn create_queue_maxsize_boundaries() {
rt().block_on(async {
let ctx = TestContext::new();
let mut rsmq = new_rsmq(&ctx).await;
rsmq.create_queue("q-min", None, None, Some(1024))
.await
.unwrap();
rsmq.create_queue("q-max", None, None, Some(65536))
.await
.unwrap();
rsmq.create_queue("q-unlimited", None, None, Some(-1))
.await
.unwrap();
assert!(matches!(
rsmq.create_queue("q-bad-low", None, None, Some(1023)).await,
Err(RsmqError::InvalidValue(_, _, _))
));
assert!(matches!(
rsmq.create_queue("q-bad-high", None, None, Some(65537))
.await,
Err(RsmqError::InvalidValue(_, _, _))
));
});
}
#[test]
fn message_exactly_at_maxsize_is_accepted() {
rt().block_on(async {
let ctx = TestContext::new();
let mut rsmq = new_rsmq(&ctx).await;
rsmq.create_queue("q1", None, None, None).await.unwrap();
rsmq.send_message("q1", "x".repeat(65536), None)
.await
.unwrap();
});
}
#[test]
fn message_exceeding_maxsize_is_rejected() {
rt().block_on(async {
let ctx = TestContext::new();
let mut rsmq = new_rsmq(&ctx).await;
rsmq.create_queue("q1", None, None, None).await.unwrap();
assert!(matches!(
rsmq.send_message("q1", "x".repeat(65537), None).await,
Err(RsmqError::MessageTooLong)
));
});
}
#[test]
fn unlimited_maxsize_accepts_large_message() {
rt().block_on(async {
let ctx = TestContext::new();
let mut rsmq = new_rsmq(&ctx).await;
rsmq.create_queue("q1", None, None, Some(-1)).await.unwrap();
rsmq.send_message("q1", "x".repeat(100_000), None)
.await
.unwrap();
});
}
#[test]
fn queue_attributes_track_sent_and_received_counts() {
rt().block_on(async {
let ctx = TestContext::new();
let mut rsmq = new_rsmq(&ctx).await;
rsmq.create_queue("q1", None, None, None).await.unwrap();
let attrs = rsmq.get_queue_attributes("q1").await.unwrap();
assert_eq!(attrs.msgs, 0);
assert_eq!(attrs.totalsent, 0);
assert_eq!(attrs.totalrecv, 0);
assert!(attrs.created > 0);
assert!(attrs.modified > 0);
for i in 0..3u32 {
rsmq.send_message("q1", format!("msg{i}"), None)
.await
.unwrap();
}
let attrs = rsmq.get_queue_attributes("q1").await.unwrap();
assert_eq!(attrs.msgs, 3);
assert_eq!(attrs.totalsent, 3);
assert_eq!(attrs.totalrecv, 0);
rsmq.receive_message::<String>("q1", None).await.unwrap();
rsmq.receive_message::<String>("q1", None).await.unwrap();
let attrs = rsmq.get_queue_attributes("q1").await.unwrap();
assert_eq!(attrs.msgs, 3);
assert_eq!(attrs.totalsent, 3);
assert_eq!(attrs.totalrecv, 2);
});
}
#[test]
fn get_queue_attributes_on_nonexistent_queue() {
rt().block_on(async {
let ctx = TestContext::new();
let mut rsmq = new_rsmq(&ctx).await;
assert!(matches!(
rsmq.get_queue_attributes("noqueue").await,
Err(RsmqError::QueueNotFound)
));
});
}
#[test]
fn set_queue_attributes_only_updates_specified_fields() {
rt().block_on(async {
let ctx = TestContext::new();
let mut rsmq = new_rsmq(&ctx).await;
rsmq.create_queue("q1", None, None, None).await.unwrap();
rsmq.set_queue_attributes("q1", Some(Duration::from_secs(60)), None, None)
.await
.unwrap();
let a = rsmq.get_queue_attributes("q1").await.unwrap();
assert_eq!(a.vt, Duration::from_secs(60));
assert_eq!(a.delay, Duration::ZERO);
assert_eq!(a.maxsize, 65536);
rsmq.set_queue_attributes("q1", None, Some(Duration::from_secs(10)), None)
.await
.unwrap();
let a = rsmq.get_queue_attributes("q1").await.unwrap();
assert_eq!(a.vt, Duration::from_secs(60));
assert_eq!(a.delay, Duration::from_secs(10));
assert_eq!(a.maxsize, 65536);
rsmq.set_queue_attributes("q1", None, None, Some(2048))
.await
.unwrap();
let a = rsmq.get_queue_attributes("q1").await.unwrap();
assert_eq!(a.vt, Duration::from_secs(60));
assert_eq!(a.delay, Duration::from_secs(10));
assert_eq!(a.maxsize, 2048);
});
}
#[test]
fn receive_count_increments_and_first_received_is_preserved() {
rt().block_on(async {
let ctx = TestContext::new();
let mut rsmq = new_rsmq(&ctx).await;
rsmq.create_queue("q1", None, None, None).await.unwrap();
rsmq.send_message("q1", "hello", None).await.unwrap();
let msg = rsmq
.receive_message::<String>("q1", None)
.await
.unwrap()
.unwrap();
assert_eq!(msg.rc, 1);
assert!(msg.fr > 0);
let first_received = msg.fr;
let id = msg.id.clone();
rsmq.change_message_visibility("q1", &id, Duration::ZERO)
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(10)).await;
let msg2 = rsmq
.receive_message::<String>("q1", None)
.await
.unwrap()
.unwrap();
assert_eq!(msg2.id, id);
assert_eq!(msg2.rc, 2);
assert_eq!(msg2.fr, first_received); });
}
#[test]
fn delete_message_returns_true_then_false() {
rt().block_on(async {
let ctx = TestContext::new();
let mut rsmq = new_rsmq(&ctx).await;
rsmq.create_queue("q1", None, None, None).await.unwrap();
rsmq.send_message("q1", "hello", None).await.unwrap();
let msg = rsmq
.receive_message::<String>("q1", None)
.await
.unwrap()
.unwrap();
assert!(rsmq.delete_message("q1", &msg.id).await.unwrap());
assert!(!rsmq.delete_message("q1", &msg.id).await.unwrap());
});
}
#[test]
fn message_sent_field_is_nonzero() {
rt().block_on(async {
let ctx = TestContext::new();
let mut rsmq = new_rsmq(&ctx).await;
rsmq.create_queue("q1", None, None, None).await.unwrap();
rsmq.send_message("q1", "hello", None).await.unwrap();
let msg = rsmq
.receive_message::<String>("q1", None)
.await
.unwrap()
.unwrap();
assert!(msg.sent > 0, "sent should be a non-zero timestamp");
});
}
#[test]
fn operations_on_nonexistent_queue_return_queue_not_found() {
rt().block_on(async {
let ctx = TestContext::new();
let mut rsmq = new_rsmq(&ctx).await;
assert!(matches!(
rsmq.send_message("ghost", "msg", None).await,
Err(RsmqError::QueueNotFound)
));
assert!(matches!(
rsmq.receive_message::<String>("ghost", None).await,
Err(RsmqError::QueueNotFound)
));
assert!(matches!(
rsmq.pop_message::<String>("ghost").await,
Err(RsmqError::QueueNotFound)
));
assert!(matches!(
rsmq.get_queue_attributes("ghost").await,
Err(RsmqError::QueueNotFound)
));
assert!(matches!(
rsmq.set_queue_attributes("ghost", None, None, None).await,
Err(RsmqError::QueueNotFound)
));
assert!(matches!(
rsmq.change_message_visibility("ghost", "fakeid", Duration::from_secs(5))
.await,
Err(RsmqError::QueueNotFound)
));
});
}
#[test]
fn receive_skips_phantom_message_without_body() {
rt().block_on(async {
let ctx = TestContext::new();
let mut rsmq = new_rsmq(&ctx).await;
let mut raw = ctx.async_connection().await.unwrap();
rsmq.create_queue("q1", None, None, None).await.unwrap();
redis::cmd("ZADD")
.arg(format!("{}:q1", ctx.ns))
.arg(0u64)
.arg("phantom-id")
.query_async::<()>(&mut raw)
.await
.unwrap();
let msg = rsmq.receive_message::<String>("q1", None).await.unwrap();
assert!(msg.is_none(), "phantom message should not be returned");
let attrs = rsmq.get_queue_attributes("q1").await.unwrap();
assert_eq!(
attrs.totalrecv, 0,
"totalrecv must not increment for nil body"
);
assert_eq!(attrs.msgs, 1, "phantom ID is still in the sorted set");
});
}