use bytes::Bytes;
use chrono::Duration;
use queue_runtime::{
client::QueueProvider,
message::{Message, QueueName, SessionId},
providers::RabbitMqProvider,
RabbitMqConfig,
};
use testcontainers::runners::AsyncRunner;
use testcontainers_modules::rabbitmq::RabbitMq;
use tokio::sync::OnceCell;
static RABBITMQ_PORT: OnceCell<u16> = OnceCell::const_new();
async fn rabbitmq_port() -> u16 {
*RABBITMQ_PORT
.get_or_init(|| async {
let container = RabbitMq::default()
.start()
.await
.expect("start RabbitMQ container");
let port = container.get_host_port_ipv4(5672).await.unwrap();
Box::leak(Box::new(container));
port
})
.await
}
fn queue(name: &str) -> QueueName {
QueueName::new(name.to_string()).expect("valid queue name")
}
fn msg(body: &str) -> Message {
Message::new(Bytes::from(body.to_string()))
}
fn msg_with_attrs(body: &str, key: &str, value: &str) -> Message {
let mut m = msg(body);
m.attributes.insert(key.to_string(), value.to_string());
m
}
fn msg_with_session(body: &str, session: &str) -> Message {
let sid = SessionId::new(session.to_string()).expect("valid session id");
Message::new(Bytes::from(body.to_string())).with_session_id(sid)
}
async fn rabbitmq_provider(port: u16) -> RabbitMqProvider {
let config = RabbitMqConfig {
url: format!("amqp://guest:guest@localhost:{}", port),
virtual_host: "/".to_string(),
prefetch_count: 10,
session_lock_duration: Duration::minutes(5),
message_ttl: None,
enable_dead_letter: false,
dead_letter_exchange: None,
};
RabbitMqProvider::new(config)
.await
.expect("connect to RabbitMQ test container")
}
#[tokio::test]
async fn rabbitmq_send_and_receive_single_message() {
let p = rabbitmq_provider(rabbitmq_port().await).await;
let q = queue("rmq-send-receive");
let body = "rabbitmq integration payload";
p.send_message(&q, &msg(body))
.await
.expect("send must succeed");
let received = p
.receive_message(&q, Duration::seconds(10))
.await
.expect("receive must not error")
.expect("must receive sent message");
assert_eq!(received.body, Bytes::from(body));
}
#[tokio::test]
async fn rabbitmq_attributes_survive_round_trip() {
let p = rabbitmq_provider(rabbitmq_port().await).await;
let q = queue("rmq-attr-rt");
p.send_message(&q, &msg_with_attrs("body", "event-type", "pull-request"))
.await
.expect("send");
let received = p
.receive_message(&q, Duration::seconds(10))
.await
.unwrap()
.unwrap();
assert_eq!(
received.attributes.get("event-type"),
Some(&"pull-request".to_string())
);
}
#[tokio::test]
async fn rabbitmq_correlation_id_survives_round_trip() {
let p = rabbitmq_provider(rabbitmq_port().await).await;
let q = queue("rmq-corr-id");
let mut message = msg("body");
message.correlation_id = Some("trace-abc-456".to_string());
p.send_message(&q, &message).await.expect("send");
let received = p
.receive_message(&q, Duration::seconds(10))
.await
.unwrap()
.unwrap();
assert_eq!(
received.correlation_id,
Some("trace-abc-456".to_string()),
"correlation ID must survive round trip"
);
}
#[tokio::test]
async fn rabbitmq_receive_from_empty_queue_returns_none() {
let p = rabbitmq_provider(rabbitmq_port().await).await;
let q = queue("rmq-empty");
let result = p
.receive_message(&q, Duration::seconds(2))
.await
.expect("must not error on empty queue");
assert!(result.is_none(), "empty queue must return None");
}
#[tokio::test]
async fn rabbitmq_complete_removes_message() {
let p = rabbitmq_provider(rabbitmq_port().await).await;
let q = queue("rmq-complete");
p.send_message(&q, &msg("ack-me")).await.unwrap();
let received = p
.receive_message(&q, Duration::seconds(10))
.await
.unwrap()
.expect("must receive message");
p.complete_message(&received.receipt_handle)
.await
.expect("complete must succeed");
let recheck = p.receive_message(&q, Duration::seconds(2)).await.unwrap();
assert!(recheck.is_none(), "acked message must not reappear");
}
#[tokio::test]
async fn rabbitmq_abandon_requeues_message() {
let p = rabbitmq_provider(rabbitmq_port().await).await;
let q = queue("rmq-abandon");
p.send_message(&q, &msg("nack-me")).await.unwrap();
let first = p
.receive_message(&q, Duration::seconds(10))
.await
.unwrap()
.expect("first delivery");
p.abandon_message(&first.receipt_handle)
.await
.expect("abandon must succeed");
let second = p
.receive_message(&q, Duration::seconds(10))
.await
.unwrap()
.expect("message must be redelivered after abandon");
assert_eq!(second.body, first.body, "redelivered body must match");
assert_eq!(
second.delivery_count, 2,
"delivery_count must increment to 2 on redeliver"
);
}
#[tokio::test]
async fn rabbitmq_session_delivers_in_fifo_order() {
let p = rabbitmq_provider(rabbitmq_port().await).await;
let q = queue("rmq-session-fifo");
let sid = "order-session";
p.send_message(&q, &msg_with_session("A", sid))
.await
.unwrap();
p.send_message(&q, &msg_with_session("B", sid))
.await
.unwrap();
p.send_message(&q, &msg_with_session("C", sid))
.await
.unwrap();
let session = p
.create_session_client(&q, Some(SessionId::new(sid.to_string()).unwrap()))
.await
.expect("create session client");
let r1 = session
.receive_message(Duration::seconds(10))
.await
.unwrap()
.expect("A");
let r2 = session
.receive_message(Duration::seconds(10))
.await
.unwrap()
.expect("B");
let r3 = session
.receive_message(Duration::seconds(10))
.await
.unwrap()
.expect("C");
assert_eq!(r1.body, Bytes::from("A"));
assert_eq!(r2.body, Bytes::from("B"));
assert_eq!(r3.body, Bytes::from("C"));
}
#[tokio::test]
async fn rabbitmq_batch_send_returns_one_id_per_message() {
let p = rabbitmq_provider(rabbitmq_port().await).await;
let q = queue("rmq-batch");
let messages = vec![msg("x"), msg("y"), msg("z")];
let ids = p
.send_messages(&q, &messages)
.await
.expect("batch send must succeed");
assert_eq!(ids.len(), 3);
let mut strs: Vec<_> = ids.iter().map(|id| id.as_str()).collect();
strs.sort();
strs.dedup();
assert_eq!(strs.len(), 3, "all batch IDs must be distinct");
}
#[tokio::test]
async fn rabbitmq_receive_messages_respects_max_count() {
let p = rabbitmq_provider(rabbitmq_port().await).await;
let q = queue("rmq-batch-receive");
for i in 0..10 {
p.send_message(&q, &msg(&format!("msg-{}", i)))
.await
.unwrap();
}
let received = p
.receive_messages(&q, 4, Duration::seconds(5))
.await
.expect("batch receive must succeed");
assert_eq!(
received.len(),
4,
"must return exactly the requested number of messages when sufficient messages are queued"
);
}