use std::time::Duration;
use futures::StreamExt;
use super::harness::{expect_next, expect_no_more};
use crate::{
AckError, BatchSubscriber, Broker, Headers, IncomingMessage, OutgoingMessage, Publisher,
RequestReply, Subscriber, SubscriptionSource, TransactionalPublisher,
};
/// Timeout used where the operation is expected to complete: the answered request in
/// `request_reply` and each poll of the batch stream in `batches`.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(2);
/// Short timeout passed to the request in `request_reply` that is expected to fail
/// because nothing replies to it.
const MISS_TIMEOUT: Duration = Duration::from_millis(100);
/// Conformance check for request/reply messaging.
///
/// Steps, in order:
///
/// 1. Build a broker with `make_broker` and connect it.
/// 2. Open a responder subscription on `conformance.request_reply`, then build a
///    publisher and a requester from the same broker.
/// 3. Concurrently send a `ping` request (with `DEFAULT_TIMEOUT`) and run the
///    responder, which checks the payload, publishes `pong` to the request's reply-to
///    subject (copying the correlation id into the reply headers when one is present)
///    and acks the request.
/// 4. Assert the requester received `pong`.
/// 5. Send a second request to `conformance.request_reply.void` with `MISS_TIMEOUT`
///    and assert that it returns an error.
/// 6. Shut the broker down.
///
/// # Panics
///
/// Panics when any step fails or any expectation above is not met. An ack that fails
/// with `AckError::Unsupported` is tolerated; any other ack error panics.
pub async fn request_reply<B, MkBroker, Src, MkSrc, Req, MkReq, Pub, MkPub>(
    make_broker: MkBroker,
    make_source: MkSrc,
    make_requester: MkReq,
    make_publisher: MkPub,
) where
    B: Broker,
    MkBroker: Fn() -> B,
    Src: SubscriptionSource<B> + Send,
    Src::Subscriber: Send,
    MkSrc: Fn(&str) -> Src,
    Req: RequestReply,
    MkReq: Fn(&B) -> Req,
    Pub: Publisher,
    MkPub: Fn(&B) -> Pub,
{
    const SUBJECT: &str = "conformance.request_reply";

    let broker = make_broker();
    Broker::connect(&broker).await.expect("broker must connect");

    let mut responder = make_source(SUBJECT)
        .subscribe(&broker)
        .await
        .expect("responder subscription must open after connect");
    let publisher = make_publisher(&broker);
    let requester = make_requester(&broker);

    // Responder half of the exchange. It is only a future here; it makes progress once
    // it is polled together with the request by `join!` below.
    let serve_one = async {
        let mut inbound_stream = std::pin::pin!(responder.stream());
        let inbound = expect_next(&mut inbound_stream, "request_reply responder").await;
        assert_eq!(
            inbound.payload(),
            b"ping",
            "responder must receive the request payload"
        );

        let reply_subject = inbound
            .headers()
            .reply_to()
            .expect("a request must carry a usable reply-to header")
            .to_owned();

        // Echo the correlation id back only when the request carried one.
        let mut reply_headers = Headers::new();
        if let Some(id) = inbound.headers().correlation_id() {
            reply_headers.insert("correlation-id", id.to_owned());
        }

        let pong =
            OutgoingMessage::new(&reply_subject, b"pong".as_slice()).with_headers(reply_headers);
        publisher.publish(pong).await.expect("reply publish failed");

        // The ack may legitimately be unsupported; anything else is a failure.
        if let Err(err) = inbound.ack().await {
            assert!(
                matches!(err, AckError::Unsupported),
                "ack must succeed or be unsupported, got: {err:?}"
            );
        }
    };

    let round_trip = requester.request(
        OutgoingMessage::new(SUBJECT, b"ping".as_slice()),
        DEFAULT_TIMEOUT,
    );
    let (outcome, ()) = futures::join!(round_trip, serve_one);
    let answer = outcome.expect("request must resolve once the responder replies");
    assert_eq!(
        answer.payload(),
        b"pong",
        "the correlated reply must carry the responder payload"
    );

    // The responder future has completed by now, so nothing replies to this request.
    let void_request = OutgoingMessage::new("conformance.request_reply.void", b"ping".as_slice());
    let unanswered = requester.request(void_request, MISS_TIMEOUT).await;
    assert!(
        unanswered.is_err(),
        "a request nobody answers must fail once its timeout elapses",
    );

    Broker::shutdown(&broker)
        .await
        .expect("broker must shut down cleanly");
}
/// Conformance check for batched delivery.
///
/// Connects a broker built by `make_broker`, subscribes to `conformance.batches`, and
/// publishes ten messages whose payloads are the big-endian bytes of `0..10` (as `u32`).
/// It then polls the subscriber's batch stream, waiting at most `DEFAULT_TIMEOUT` per
/// poll, until at least ten payloads have been collected. Every message is acked as it
/// is recorded. Finally the collected payloads must equal the published ones in publish
/// order, and the broker is shut down.
///
/// # Panics
///
/// Panics when connecting, subscribing, publishing or shutting down fails; when a poll
/// times out, the stream ends, or the stream yields an error; when a yielded batch is
/// empty; when an ack fails with anything other than `AckError::Unsupported`; or when
/// the collected payloads differ from the expected sequence.
pub async fn batches<B, MkBroker, Src, MkSrc, Pub, MkPub>(
    make_broker: MkBroker,
    make_source: MkSrc,
    make_publisher: MkPub,
) where
    B: Broker,
    MkBroker: Fn() -> B,
    Src: SubscriptionSource<B> + Send,
    Src::Subscriber: BatchSubscriber + Send,
    MkSrc: Fn(&str) -> Src,
    Pub: Publisher,
    MkPub: Fn(&B) -> Pub,
{
    const SUBJECT: &str = "conformance.batches";
    const COUNT: u32 = 10;

    let broker = make_broker();
    Broker::connect(&broker).await.expect("broker must connect");

    let mut subscriber = make_source(SUBJECT)
        .subscribe(&broker)
        .await
        .expect("subscription must open after connect");
    let publisher = make_publisher(&broker);

    // The payloads the subscriber must observe, in publish order.
    let expected: Vec<Vec<u8>> = (0..COUNT).map(|seq| seq.to_be_bytes().to_vec()).collect();

    for seq in 0..COUNT {
        let body = seq.to_be_bytes();
        publisher
            .publish(OutgoingMessage::new(SUBJECT, body.as_slice()))
            .await
            .expect("publish failed");
    }

    let wanted = COUNT as usize;
    let mut received = Vec::new();
    let mut batch_stream = std::pin::pin!(subscriber.batches());
    while received.len() < wanted {
        // Three distinct failure modes, each with its own message: the poll timing out,
        // the stream finishing, and the stream yielding an error item.
        let polled = tokio::time::timeout(DEFAULT_TIMEOUT, batch_stream.next())
            .await
            .expect("batches: stream timed out");
        let batch = match polled.expect("batches: stream ended unexpectedly") {
            Ok(batch) => batch,
            Err(err) => panic!("batches: stream yielded error: {err:?}"),
        };

        let messages: Vec<_> = batch.into_iter().collect();
        assert!(!messages.is_empty(), "a yielded batch must not be empty");

        for message in messages {
            received.push(message.payload().to_vec());
            // The ack may legitimately be unsupported; anything else is a failure.
            if let Err(err) = message.ack().await {
                assert!(
                    matches!(err, AckError::Unsupported),
                    "ack must succeed or be unsupported, got: {err:?}"
                );
            }
        }
    }

    assert_eq!(
        received, expected,
        "batched deliveries must preserve publish order across batches",
    );

    Broker::shutdown(&broker)
        .await
        .expect("broker must shut down cleanly");
}
/// Conformance check for transactional publishing.
///
/// Steps, in order:
///
/// 1. Build a broker with `make_broker`, connect it, and subscribe to
///    `conformance.transactions`.
/// 2. Begin a transaction and publish `first` then `second` inside it.
/// 3. Check with `expect_no_more` that the subscriber sees nothing before the commit.
/// 4. Commit, then expect `first` followed by `second` on the stream, acking each.
/// 5. Begin a second transaction, publish `discarded`, and abort it.
/// 6. Check with `expect_no_more` that nothing further is delivered after the abort.
/// 7. Shut the broker down.
///
/// # Panics
///
/// Panics when any broker, subscription or publisher call fails, when a delivered
/// payload is not the expected one, or when an ack fails with anything other than
/// `AckError::Unsupported`. The harness helpers `expect_next` and `expect_no_more` are
/// presumably where missing or unexpected deliveries are reported.
pub async fn transactions<B, MkBroker, Src, MkSrc, Pub, MkPub>(
    make_broker: MkBroker,
    make_source: MkSrc,
    make_publisher: MkPub,
) where
    B: Broker,
    MkBroker: Fn() -> B,
    Src: SubscriptionSource<B> + Send,
    Src::Subscriber: Send,
    MkSrc: Fn(&str) -> Src,
    Pub: TransactionalPublisher,
    MkPub: Fn(&B) -> Pub,
{
    const SUBJECT: &str = "conformance.transactions";

    let broker = make_broker();
    Broker::connect(&broker).await.expect("broker must connect");

    let mut subscriber = make_source(SUBJECT)
        .subscribe(&broker)
        .await
        .expect("subscription must open after connect");
    let publisher = make_publisher(&broker);
    let mut stream = std::pin::pin!(subscriber.stream());

    // Committed transaction: nothing may be delivered until the commit.
    publisher
        .begin_transaction()
        .await
        .expect("begin_transaction failed");
    publisher
        .publish(OutgoingMessage::new(SUBJECT, b"first".as_slice()))
        .await
        .expect("publish inside transaction failed");
    publisher
        .publish(OutgoingMessage::new(SUBJECT, b"second".as_slice()))
        .await
        .expect("publish inside transaction failed");
    expect_no_more(&mut stream, "transactions: before commit").await;

    publisher.commit().await.expect("commit failed");

    let first = expect_next(&mut stream, "transactions: first after commit").await;
    assert_eq!(
        first.payload(),
        b"first",
        "commit must make buffered messages visible in publish order",
    );
    // The ack may legitimately be unsupported; anything else is a failure.
    if let Err(err) = first.ack().await {
        assert!(
            matches!(err, AckError::Unsupported),
            "ack must succeed or be unsupported, got: {err:?}"
        );
    }

    let second = expect_next(&mut stream, "transactions: second after commit").await;
    assert_eq!(
        second.payload(),
        b"second",
        "commit must deliver the second buffered message after the first",
    );
    if let Err(err) = second.ack().await {
        assert!(
            matches!(err, AckError::Unsupported),
            "ack must succeed or be unsupported, got: {err:?}"
        );
    }

    // Aborted transaction: its message must never reach the subscriber.
    publisher
        .begin_transaction()
        .await
        .expect("begin_transaction failed");
    publisher
        .publish(OutgoingMessage::new(SUBJECT, b"discarded".as_slice()))
        .await
        .expect("publish inside transaction failed");
    publisher.abort().await.expect("abort failed");
    expect_no_more(&mut stream, "transactions: after abort").await;

    Broker::shutdown(&broker)
        .await
        .expect("broker must shut down cleanly");
}