#![cfg(feature = "macros")]
mod common;
use std::{
sync::{
Arc, LazyLock,
atomic::{AtomicUsize, Ordering},
},
time::Duration,
};
use common::handler_signal;
use ruststream::memory::MemoryBroker;
use ruststream::runtime::{AppInfo, HandlerResult, RustStream, TypedPublisher};
use ruststream::testing::TestClient;
use ruststream::{Buffered, Name, OutgoingMessage, Publisher, subscriber};
use serde::{Deserialize, Serialize};
use tokio::sync::Notify;
#[derive(Debug, Serialize, Deserialize)]
struct Order {
id: u32,
}
#[derive(Debug, Serialize, Deserialize)]
struct Receipt {
id: u32,
}
fn order_bytes(id: u32) -> Vec<u8> {
serde_json::to_vec(&Order { id }).unwrap()
}
static TX_HANDLED: AtomicUsize = AtomicUsize::new(0);
static TX_NOTIFY: LazyLock<Notify> = LazyLock::new(Notify::new);
#[subscriber(batch("tx-in"), publish("tx-out"), workers(2))]
async fn tx_confirm(orders: &[Order]) -> Vec<Receipt> {
TX_HANDLED.fetch_add(orders.len(), Ordering::SeqCst);
TX_NOTIFY.notify_one();
orders.iter().map(|o| Receipt { id: o.id }).collect()
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn transactional_replies_compose_with_a_batch_pool() {
let broker = MemoryBroker::new();
let publisher = broker.publisher();
let observer = broker.clone();
let replies = TypedPublisher::new(broker.publisher()).transactional();
let app = RustStream::new(AppInfo::new("tx", "0.1.0"))
.with_broker(broker, |b| b.include_batch_publishing(tx_confirm, replies));
let shutdown = Arc::new(Notify::new());
let shutdown_signal = Arc::clone(&shutdown);
let run = tokio::spawn(app.run_until(async move { shutdown_signal.notified().await }));
let result = tokio::time::timeout(Duration::from_secs(5), async {
let mut id = 1u32;
loop {
let _ = publisher
.publish(OutgoingMessage::new("tx-in", &order_bytes(id)))
.await;
id += 1;
handler_signal(&TX_NOTIFY).await;
if TX_HANDLED.load(Ordering::SeqCst) >= 4 {
break;
}
}
})
.await;
assert!(result.is_ok(), "batches did not flow through the pool");
let handled = TX_HANDLED.load(Ordering::SeqCst);
let receipts = observer
.expect_published("tx-out", handled, Duration::from_secs(5))
.await
.unwrap();
assert!(
receipts.len() >= handled,
"{} orders handled but only {} receipts committed",
handled,
receipts.len(),
);
shutdown.notify_one();
run.await.unwrap().unwrap();
}
static BUF_SEEN: AtomicUsize = AtomicUsize::new(0);
static BUF_BATCHES: AtomicUsize = AtomicUsize::new(0);
static BUF_NOTIFY: LazyLock<Notify> = LazyLock::new(Notify::new);
#[subscriber(batch(Buffered::<Name>::new(Name::new("buf-in"))
.max_size(2)
.max_wait(Duration::from_millis(10))), workers(2))]
async fn buffered_drain(orders: &[Order]) -> HandlerResult {
BUF_SEEN.fetch_add(orders.len(), Ordering::SeqCst);
BUF_BATCHES.fetch_add(1, Ordering::SeqCst);
BUF_NOTIFY.notify_one();
HandlerResult::Ack
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn buffered_sources_compose_with_a_batch_pool() {
let broker = MemoryBroker::new();
let publisher = broker.publisher();
let app = RustStream::new(AppInfo::new("buf", "0.1.0"))
.with_broker(broker, |b| b.include_batch(buffered_drain));
let shutdown = Arc::new(Notify::new());
let shutdown_signal = Arc::clone(&shutdown);
let run = tokio::spawn(app.run_until(async move { shutdown_signal.notified().await }));
let result = tokio::time::timeout(Duration::from_secs(5), async {
let mut id = 1u32;
loop {
let _ = publisher
.publish(OutgoingMessage::new("buf-in", &order_bytes(id)))
.await;
id += 1;
handler_signal(&BUF_NOTIFY).await;
if BUF_SEEN.load(Ordering::SeqCst) >= 6 {
break;
}
}
})
.await;
assert!(
result.is_ok(),
"buffered batches did not drain through the pool"
);
assert!(
BUF_BATCHES.load(Ordering::SeqCst) >= 2,
"everything arrived as a single batch; size/deadline closing did not engage",
);
shutdown.notify_one();
run.await.unwrap().unwrap();
}
static PUB_REPLIED: AtomicUsize = AtomicUsize::new(0);
static PUB_NOTIFY: LazyLock<Notify> = LazyLock::new(Notify::new);
#[subscriber("pub-in", publish("pub-out"), workers(3))]
async fn pooled_relay(o: &Order) -> Receipt {
Receipt { id: o.id }
}
#[subscriber("pub-out")]
async fn pooled_check(_r: &Receipt) -> HandlerResult {
PUB_REPLIED.fetch_add(1, Ordering::SeqCst);
PUB_NOTIFY.notify_one();
HandlerResult::Ack
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn publishing_replies_compose_with_a_worker_pool() {
let broker = MemoryBroker::new();
let publisher = broker.publisher();
let app = RustStream::new(AppInfo::new("pub", "0.1.0")).with_broker(broker, |b| {
b.include_publishing(pooled_relay, TypedPublisher::new(b.broker().publisher()));
b.include(pooled_check);
});
let shutdown = Arc::new(Notify::new());
let shutdown_signal = Arc::clone(&shutdown);
let run = tokio::spawn(app.run_until(async move { shutdown_signal.notified().await }));
let result = tokio::time::timeout(Duration::from_secs(5), async {
let mut id = 1u32;
loop {
let _ = publisher
.publish(OutgoingMessage::new("pub-in", &order_bytes(id)))
.await;
id += 1;
handler_signal(&PUB_NOTIFY).await;
if PUB_REPLIED.load(Ordering::SeqCst) >= 4 {
break;
}
}
})
.await;
assert!(
result.is_ok(),
"pooled publishing handler did not reply to every delivery",
);
shutdown.notify_one();
run.await.unwrap().unwrap();
}