#![cfg(feature = "testing")]
use std::time::Duration;
use futures::{Stream, StreamExt};
use ruststream::{
BatchSubscriber, Broker, DescribeServer, Headers, IncomingMessage, OutgoingMessage,
Partitioned, Publisher, Subscriber, TransactionalPublisher, testing::TestClient,
};
use ruststream_fred::{
PARTITION_KEY_HEADER, RedisError,
testing::{RedisTestBroker, RedisTestClient, RedisTestMessage},
};
const WAIT: Duration = Duration::from_secs(1);
async fn next_payload<S>(stream: &mut S) -> Vec<u8>
where
S: Stream<Item = Result<RedisTestMessage, RedisError>> + Unpin,
{
let msg = tokio::time::timeout(WAIT, stream.next())
.await
.expect("delivery within timeout")
.expect("stream has next")
.expect("delivery ok");
let payload = msg.payload().to_vec();
msg.ack().await.expect("ack");
payload
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn pub_sub_round_trip_through_broker_traits() {
let broker = RedisTestBroker::new();
broker.connect().await.expect("connect");
let mut subscriber = broker.subscribe("orders").await.expect("subscribe");
let publisher = broker.publisher();
publisher
.publish(OutgoingMessage::new("orders", b"o1"))
.await
.expect("publish");
let mut stream = Box::pin(subscriber.stream());
let got = next_payload(&mut stream).await;
assert_eq!(got, b"o1");
drop(stream);
broker.shutdown().await.expect("shutdown");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn publisher_rejects_empty_key() {
let broker = RedisTestBroker::new();
let publisher = broker.publisher();
let err = publisher
.publish(OutgoingMessage::new("", b"x"))
.await
.expect_err("empty key must be rejected");
assert!(format!("{err}").contains("publish"), "got {err}");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn distinct_keys_are_isolated() {
let broker = RedisTestBroker::new();
let mut orders = broker.subscribe("orders").await.expect("subscribe orders");
let mut events = broker.subscribe("events").await.expect("subscribe events");
let publisher = broker.publisher();
publisher
.publish(OutgoingMessage::new("orders", b"o"))
.await
.expect("publish o");
publisher
.publish(OutgoingMessage::new("events", b"e"))
.await
.expect("publish e");
let mut orders_stream = Box::pin(orders.stream());
assert_eq!(next_payload(&mut orders_stream).await, b"o");
let mut events_stream = Box::pin(events.stream());
assert_eq!(next_payload(&mut events_stream).await, b"e");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn nack_requeue_redelivers_to_same_subscriber() {
let broker = RedisTestBroker::new();
let mut subscriber = broker.subscribe("orders").await.expect("subscribe");
let publisher = broker.publisher();
publisher
.publish(OutgoingMessage::new("orders", b"once"))
.await
.expect("publish");
let mut stream = Box::pin(subscriber.stream());
let first = tokio::time::timeout(WAIT, stream.next())
.await
.expect("first delivery")
.expect("stream has next")
.expect("ok");
first.nack(true).await.expect("nack requeue");
let second = tokio::time::timeout(WAIT, stream.next())
.await
.expect("redelivery")
.expect("stream has next")
.expect("ok");
assert_eq!(second.payload(), b"once");
second.ack().await.expect("ack");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn headers_are_propagated_to_subscribers() {
let broker = RedisTestBroker::new();
let mut subscriber = broker.subscribe("orders").await.expect("subscribe");
let publisher = broker.publisher();
let mut headers = Headers::new();
headers.insert("content-type", "application/json");
headers.insert("correlation-id", "abc-1");
let outgoing = OutgoingMessage::new("orders", b"{}").with_headers(headers);
publisher.publish(outgoing).await.expect("publish");
let mut stream = Box::pin(subscriber.stream());
let msg = tokio::time::timeout(WAIT, stream.next())
.await
.expect("delivery")
.expect("stream has next")
.expect("ok");
assert_eq!(msg.headers().content_type(), Some("application/json"));
assert_eq!(msg.headers().correlation_id(), Some("abc-1"));
msg.ack().await.expect("ack");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_client_drives_expect_published() {
let client = RedisTestClient::start().await.expect("start");
TestClient::publish(&client, "events", b"first")
.await
.expect("publish first");
TestClient::publish(&client, "events", b"second")
.await
.expect("publish second");
let observed = client
.expect_published("events", 2, Duration::from_secs(1))
.await
.expect("expect_published");
assert_eq!(observed.len(), 2);
assert_eq!(observed[0].payload(), b"first");
assert_eq!(observed[1].payload(), b"second");
client.shutdown().await.expect("shutdown");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn stream_can_be_reentered() {
let broker = RedisTestBroker::new();
let mut subscriber = broker.subscribe("orders").await.expect("subscribe");
let publisher = broker.publisher();
publisher
.publish(OutgoingMessage::new("orders", b"one"))
.await
.expect("publish one");
{
let mut stream = Box::pin(subscriber.stream());
assert_eq!(next_payload(&mut stream).await, b"one");
}
publisher
.publish(OutgoingMessage::new("orders", b"two"))
.await
.expect("publish two");
let mut stream = Box::pin(subscriber.stream());
assert_eq!(next_payload(&mut stream).await, b"two");
}
#[tokio::test]
async fn describe_server_returns_redis_protocol() {
let broker = RedisTestBroker::new();
let spec = broker.describe_server();
assert_eq!(spec.protocol, "redis");
}
#[tokio::test]
async fn partition_key_header_is_surfaced() {
let client = RedisTestClient::start().await.expect("start");
let mut sub = client.subscribe("events").await.expect("subscribe");
let mut headers = Headers::new();
headers.insert(PARTITION_KEY_HEADER, "tenant-a");
client
.publisher()
.await
.expect("publisher")
.publish(OutgoingMessage::new("events", b"payload").with_headers(headers))
.await
.expect("publish");
let mut stream = Box::pin(sub.stream());
let msg = tokio::time::timeout(WAIT, stream.next())
.await
.expect("delivery")
.expect("item")
.expect("ok");
assert_eq!(
Partitioned::partition_key(&msg),
Some(b"tenant-a".as_slice())
);
msg.ack().await.ok();
client.shutdown().await.expect("shutdown");
}
#[tokio::test]
async fn partition_key_absent_yields_none() {
let client = RedisTestClient::start().await.expect("start");
let mut sub = client.subscribe("events.bare").await.expect("subscribe");
client
.publisher()
.await
.expect("publisher")
.publish(OutgoingMessage::new("events.bare", b"payload"))
.await
.expect("publish");
let mut stream = Box::pin(sub.stream());
let msg = tokio::time::timeout(WAIT, stream.next())
.await
.expect("delivery")
.expect("item")
.expect("ok");
assert_eq!(Partitioned::partition_key(&msg), None);
msg.ack().await.ok();
client.shutdown().await.expect("shutdown");
}
#[tokio::test]
async fn batch_drains_in_publish_order() {
let client = RedisTestClient::start().await.expect("start");
let publisher = client.publisher().await.expect("publisher");
let mut sub = client.subscribe("batch.order").await.expect("subscribe");
let count = 5u8;
for i in 0..count {
publisher
.publish(OutgoingMessage::new("batch.order", &[i]))
.await
.expect("publish");
}
let mut batches = Box::pin(sub.batches());
let batch = tokio::time::timeout(WAIT, batches.next())
.await
.expect("batch within timeout")
.expect("stream has next")
.expect("ok batch");
assert!(!batch.is_empty(), "batch must contain at least one message");
assert!(batch.len() <= usize::from(count));
for (i, msg) in batch.into_iter().enumerate() {
assert_eq!(msg.payload(), &[u8::try_from(i).expect("count fits u8")]);
msg.ack().await.ok();
}
client.shutdown().await.expect("shutdown");
}
#[tokio::test]
async fn batches_can_be_reentered() {
let client = RedisTestClient::start().await.expect("start");
let publisher = client.publisher().await.expect("publisher");
let mut sub = client.subscribe("batch.reenter").await.expect("subscribe");
publisher
.publish(OutgoingMessage::new("batch.reenter", b"one"))
.await
.expect("publish");
{
let mut batches = Box::pin(sub.batches());
let batch = tokio::time::timeout(WAIT, batches.next())
.await
.expect("batch within timeout")
.expect("stream has next")
.expect("ok batch");
assert_eq!(
batch.first().map(|m| m.payload().to_vec()),
Some(b"one".to_vec())
);
for msg in batch {
msg.ack().await.ok();
}
}
publisher
.publish(OutgoingMessage::new("batch.reenter", b"two"))
.await
.expect("publish");
let mut batches = Box::pin(sub.batches());
let batch = tokio::time::timeout(WAIT, batches.next())
.await
.expect("batch within timeout")
.expect("stream has next")
.expect("ok batch");
assert_eq!(
batch.first().map(|m| m.payload().to_vec()),
Some(b"two".to_vec())
);
for msg in batch {
msg.ack().await.ok();
}
client.shutdown().await.expect("shutdown");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn transaction_buffers_until_commit() {
let broker = RedisTestBroker::new();
let mut sub = broker.subscribe("tx").await.expect("subscribe");
let publisher = broker.publisher();
publisher.begin_transaction().await.expect("begin");
publisher
.publish(OutgoingMessage::new("tx", b"first"))
.await
.expect("publish first");
publisher
.publish(OutgoingMessage::new("tx", b"second"))
.await
.expect("publish second");
let observed = broker
.expect_published("tx", 1, Duration::from_millis(50))
.await;
assert!(observed.is_empty(), "buffered messages must not be visible");
publisher.commit().await.expect("commit");
let mut stream = Box::pin(sub.stream());
assert_eq!(next_payload(&mut stream).await, b"first");
assert_eq!(next_payload(&mut stream).await, b"second");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn transaction_abort_discards_buffer() {
let broker = RedisTestBroker::new();
let publisher = broker.publisher();
publisher.begin_transaction().await.expect("begin");
publisher
.publish(OutgoingMessage::new("tx", b"discarded"))
.await
.expect("publish");
publisher.abort().await.expect("abort");
let observed = broker
.expect_published("tx", 1, Duration::from_millis(50))
.await;
assert!(observed.is_empty(), "aborted messages must be discarded");
}