#![cfg(feature = "macros")]
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use ruststream::memory::MemoryBroker;
use ruststream::runtime::{
AppInfo, HandlerResult, Router, RustStream, RustStreamError, TypedPublisher,
};
use ruststream::{OutgoingMessage, Publisher, subscriber};
use serde::{Deserialize, Serialize};
use tokio::sync::Notify;
/// Message payload the handlers in this file decode; serialized as JSON by `order_bytes`.
#[derive(Debug, Serialize, Deserialize)]
struct Order {
/// Order identifier. Several handlers below key their deliberate panics off this value.
id: u32,
}
/// Builds the JSON-encoded bytes of an [`Order`] carrying the given `id`.
///
/// # Panics
///
/// Panics if `serde_json` fails to serialize the order.
fn order_bytes(id: u32) -> Vec<u8> {
    let order = Order { id };
    serde_json::to_vec(&order).unwrap()
}
// Per-handler progress counters. Each one is incremented by a single handler below and
// polled by the test that drives that handler, so the tests can observe that a message
// actually reached the handler body.

/// Incremented by `dropping` for every order that does not trigger its panic.
static DROP_DONE: AtomicUsize = AtomicUsize::new(0);
/// Incremented by `skipping` on every invocation.
static SKIP_DONE: AtomicUsize = AtomicUsize::new(0);
/// Incremented by `rpcd` on every invocation.
static RPC_DONE: AtomicUsize = AtomicUsize::new(0);
/// Increased by `bd` by the length of each batch it receives.
static BATCH_DONE: AtomicUsize = AtomicUsize::new(0);
/// Increased by `bpd` by the length of each batch it receives.
static BATCH_REPLY_DONE: AtomicUsize = AtomicUsize::new(0);
// Subscriber on "boom" that declares no `on_failure` policy.
// It panics for every order whose id is not `u32::MAX`; the test in this file publishes
// id 1 and expects the run to end with `RustStreamError::Dispatch`.
#[subscriber("boom")]
async fn boom(order: &Order) -> HandlerResult {
// Deliberate failure trigger: any id other than `u32::MAX` panics here.
assert_eq!(order.id, u32::MAX, "handler exploded");
HandlerResult::Ack
}
// Subscriber on "dropping" declared with `on_failure(panic = drop)`.
// An order with id 0 panics; any other order bumps `DROP_DONE` and is acked. The test in
// this file expects the run to survive the panic and keep handling later orders.
#[subscriber("dropping", on_failure(panic = drop))]
async fn dropping(order: &Order) -> HandlerResult {
// Id 0 is the poison value used to trigger the panic path.
assert!(order.id != 0, "poison order must panic");
DROP_DONE.fetch_add(1, Ordering::SeqCst);
HandlerResult::Ack
}
// Subscriber on "decodeff" declared with `on_failure(decode = fail_fast)`.
// The body only acks; the test in this file publishes bytes that are not valid JSON and
// expects the run to end with `RustStreamError::Dispatch`.
#[subscriber("decodeff", on_failure(decode = fail_fast))]
async fn decode_ff(_order: &Order) -> HandlerResult {
HandlerResult::Ack
}
// Subscriber on "skipping" declared with `on_failure(decode = skip)`.
// Every invocation bumps `SKIP_DONE` and acks, so the counter only moves for messages
// that reached the handler body.
#[subscriber("skipping", on_failure(decode = skip))]
async fn skipping(_order: &Order) -> HandlerResult {
SKIP_DONE.fetch_add(1, Ordering::SeqCst);
HandlerResult::Ack
}
// Batch subscriber on "batchboom" declared with `on_failure(panic = fail_fast)`.
// It panics whenever it is handed a non-empty batch; the test in this file expects the
// run to end with `RustStreamError::Dispatch`.
#[subscriber(batch("batchboom"), on_failure(panic = fail_fast))]
async fn batch_boom(orders: &[Order]) -> HandlerResult {
// Deliberate failure trigger: any batch containing at least one order panics here.
assert!(orders.is_empty(), "batch handler exploded");
HandlerResult::Ack
}
// Subscriber on "rpcd" declared with `publish("rpcd.out")` and no `on_failure` policy.
// Bumps `RPC_DONE` and returns the order id.
// NOTE(review): presumably the returned id is what gets published to "rpcd.out" —
// confirm against the `subscriber` macro documentation.
#[subscriber("rpcd", publish("rpcd.out"))]
async fn rpcd(order: &Order) -> u32 {
RPC_DONE.fetch_add(1, Ordering::SeqCst);
order.id
}
// Batch subscriber on "bd" with no `on_failure` policy.
// Adds the size of each received batch to `BATCH_DONE`, then acks.
#[subscriber(batch("bd"))]
async fn bd(orders: &[Order]) -> HandlerResult {
BATCH_DONE.fetch_add(orders.len(), Ordering::SeqCst);
HandlerResult::Ack
}
// Batch subscriber on "bpd" declared with `publish("bpd.out")` and no `on_failure` policy.
// Adds the size of each received batch to `BATCH_REPLY_DONE` and returns the ids of the
// orders in batch order.
// NOTE(review): presumably each returned id is published to "bpd.out" — confirm against
// the `subscriber` macro documentation.
#[subscriber(batch("bpd"), publish("bpd.out"))]
async fn bpd(orders: &[Order]) -> Vec<u32> {
BATCH_REPLY_DONE.fetch_add(orders.len(), Ordering::SeqCst);
orders.iter().map(|o| o.id).collect()
}
/// Runs `app` with a shutdown signal that never fires and keeps publishing `payload` to
/// `topic` until the spawned run task finishes on its own, then returns what the run
/// resolved to.
///
/// Publish errors are ignored on purpose: the loop simply retries until the run task
/// reports completion.
///
/// # Panics
///
/// Panics if the run task has not finished within five seconds, or if the run task itself
/// panicked.
async fn run_until_torn_down(
    app: RustStream,
    publisher: impl Publisher,
    topic: &str,
    payload: Vec<u8>,
) -> Result<(), RustStreamError> {
    // `pending()` never resolves, so only an internal failure can end the run.
    let service = tokio::spawn(app.run_until(std::future::pending::<()>()));

    let publish_until_exit = async {
        loop {
            let _ = publisher
                .publish(OutgoingMessage::new(topic, &payload))
                .await;
            if service.is_finished() {
                // The task is done; joining it yields the run's result (or a join error).
                break service.await;
            }
            tokio::task::yield_now().await;
        }
    };

    tokio::time::timeout(Duration::from_secs(5), publish_until_exit)
        .await
        .expect("service did not tear down within the deadline")
        .expect("run task panicked")
}
/// Publishes `payload` to `topic` over and over until `counter` moves away from the value
/// it had when polling started, i.e. until a handler has observably processed a message.
///
/// Publish errors are ignored on purpose; the loop just tries again.
///
/// NOTE(review): more than one probe may be published before the counter moves, so a
/// caller that samples the counter right after this returns might still see late probes
/// land — confirm against `MemoryBroker` delivery semantics.
///
/// # Panics
///
/// Panics if the counter has not changed within five seconds.
async fn drive_until_seen(
    publisher: &impl Publisher,
    topic: &str,
    payload: &[u8],
    counter: &AtomicUsize,
) {
    let probe = async {
        let baseline = counter.load(Ordering::SeqCst);
        loop {
            if counter.load(Ordering::SeqCst) != baseline {
                break;
            }
            let _ = publisher
                .publish(OutgoingMessage::new(topic, payload))
                .await;
            tokio::task::yield_now().await;
        }
    };

    tokio::time::timeout(Duration::from_secs(5), probe)
        .await
        .expect("subscription never went live");
}
/// Polls `cond` until it returns `true`, yielding to the runtime between checks.
///
/// # Panics
///
/// Panics if `cond` has not returned `true` before `timeout` elapses.
async fn wait_for(mut cond: impl FnMut() -> bool, timeout: Duration) {
    let poll = async {
        loop {
            if cond() {
                break;
            }
            tokio::task::yield_now().await;
        }
    };

    let met = tokio::time::timeout(timeout, poll).await.is_ok();
    assert!(met, "condition not met within {timeout:?}");
}
// `boom` declares no `on_failure` policy; this test pins that a panic inside it ends the
// run with `RustStreamError::Dispatch`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn handler_panic_fails_fast_and_run_returns_err() {
    let broker = MemoryBroker::new();
    let publisher = broker.publisher();
    let info = AppInfo::new("boom", "0.1.0");
    let app = RustStream::new(info).with_broker(broker, |b| {
        b.include(boom);
    });

    // Id 1 is not `u32::MAX`, so `boom` panics as soon as it handles this order.
    let poison = order_bytes(1);
    let result = run_until_torn_down(app, publisher, "boom", poison).await;

    let got_dispatch_err = matches!(result, Err(RustStreamError::Dispatch(_)));
    assert!(
        got_dispatch_err,
        "a fail-fast panic must make run() return a dispatch error, got {result:?}",
    );
}
// With `on_failure(panic = drop)`, a panicking order must not stop the subscriber: an
// order published after the poison one is still counted, and the run ends with `Ok`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn panic_drop_keeps_the_subscriber_consuming() {
    let broker = MemoryBroker::new();
    let publisher = broker.publisher();
    let info = AppInfo::new("dropping", "0.1.0");
    let app = RustStream::new(info).with_broker(broker, |b| {
        b.include(dropping);
    });

    let shutdown = Arc::new(Notify::new());
    let run = tokio::spawn({
        let signal = Arc::clone(&shutdown);
        app.run_until(async move { signal.notified().await })
    });

    // Wait until the handler has demonstrably processed at least one healthy order.
    let healthy = order_bytes(7);
    drive_until_seen(&publisher, "dropping", &healthy, &DROP_DONE).await;
    let before = DROP_DONE.load(Ordering::SeqCst);

    // Id 0 makes `dropping` panic; id 9 is the follow-up that must still get through.
    let poison = order_bytes(0);
    publisher
        .publish(OutgoingMessage::new("dropping", &poison))
        .await
        .unwrap();
    let follow_up = order_bytes(9);
    publisher
        .publish(OutgoingMessage::new("dropping", &follow_up))
        .await
        .unwrap();

    let counter_advanced = || DROP_DONE.load(Ordering::SeqCst) > before;
    wait_for(counter_advanced, Duration::from_secs(5)).await;

    shutdown.notify_one();
    let joined = tokio::time::timeout(Duration::from_secs(5), run)
        .await
        .expect("run did not stop");
    let result = joined.expect("run task panicked");
    assert!(
        result.is_ok(),
        "a dropped panic must not error the run: {result:?}"
    );
}
// With `on_failure(decode = fail_fast)`, a payload that cannot be decoded must end the
// run with `RustStreamError::Dispatch`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn decode_fail_fast_returns_err() {
    let broker = MemoryBroker::new();
    let publisher = broker.publisher();
    let info = AppInfo::new("decodeff", "0.1.0");
    let app = RustStream::new(info).with_broker(broker, |b| {
        b.include(decode_ff);
    });

    // Not valid JSON, so it cannot decode into an `Order`.
    let garbage = b"not json".to_vec();
    let result = run_until_torn_down(app, publisher, "decodeff", garbage).await;

    let got_dispatch_err = matches!(result, Err(RustStreamError::Dispatch(_)));
    assert!(
        got_dispatch_err,
        "a fail-fast decode failure must make run() return a dispatch error, got {result:?}",
    );
}
// With `on_failure(decode = skip)`, an undecodable payload must not stop the subscriber:
// a valid order published afterwards is still counted, and the run ends with `Ok`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn decode_skip_acks_past_bad_input_and_continues() {
    let broker = MemoryBroker::new();
    let publisher = broker.publisher();
    let info = AppInfo::new("skipping", "0.1.0");
    let app = RustStream::new(info).with_broker(broker, |b| {
        b.include(skipping);
    });

    let shutdown = Arc::new(Notify::new());
    let run = tokio::spawn({
        let signal = Arc::clone(&shutdown);
        app.run_until(async move { signal.notified().await })
    });

    // Wait until the handler has demonstrably processed at least one valid order.
    let warm_up = order_bytes(1);
    drive_until_seen(&publisher, "skipping", &warm_up, &SKIP_DONE).await;
    let before = SKIP_DONE.load(Ordering::SeqCst);

    // Undecodable bytes first, then a valid order that must still reach the handler.
    publisher
        .publish(OutgoingMessage::new("skipping", b"not json"))
        .await
        .unwrap();
    let follow_up = order_bytes(2);
    publisher
        .publish(OutgoingMessage::new("skipping", &follow_up))
        .await
        .unwrap();

    let counter_advanced = || SKIP_DONE.load(Ordering::SeqCst) > before;
    wait_for(counter_advanced, Duration::from_secs(5)).await;

    shutdown.notify_one();
    let joined = tokio::time::timeout(Duration::from_secs(5), run)
        .await
        .expect("run did not stop");
    let result = joined.expect("run task panicked");
    assert!(
        result.is_ok(),
        "a skipped decode failure must not error the run: {result:?}"
    );
}
// `rpcd` is a publishing subscriber with no `on_failure` policy; this test pins that an
// undecodable payload does not error the run and that a later valid order is handled.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn publishing_decode_failure_is_dropped_and_continues() {
    let broker = MemoryBroker::new();
    let publisher = broker.publisher();
    let router = Router::<MemoryBroker>::new()
        .include_publishing(rpcd, TypedPublisher::new(broker.publisher()));
    let info = AppInfo::new("rpcd", "0.1.0");
    let app = RustStream::new(info).with_broker(broker, |b| b.include_router(router));

    let shutdown = Arc::new(Notify::new());
    let run = tokio::spawn({
        let signal = Arc::clone(&shutdown);
        app.run_until(async move { signal.notified().await })
    });

    // Wait until the handler has demonstrably processed at least one valid order.
    let warm_up = order_bytes(1);
    drive_until_seen(&publisher, "rpcd", &warm_up, &RPC_DONE).await;
    let before = RPC_DONE.load(Ordering::SeqCst);

    // Undecodable bytes first, then a valid order that must still reach the handler.
    publisher
        .publish(OutgoingMessage::new("rpcd", b"not json"))
        .await
        .unwrap();
    let follow_up = order_bytes(2);
    publisher
        .publish(OutgoingMessage::new("rpcd", &follow_up))
        .await
        .unwrap();

    let counter_advanced = || RPC_DONE.load(Ordering::SeqCst) > before;
    wait_for(counter_advanced, Duration::from_secs(5)).await;

    shutdown.notify_one();
    let joined = tokio::time::timeout(Duration::from_secs(5), run)
        .await
        .expect("run did not stop");
    let result = joined.expect("run task panicked");
    assert!(
        result.is_ok(),
        "a dropped decode failure must not error the run: {result:?}"
    );
}
// `bd` is a batch subscriber with no `on_failure` policy; this test pins that an
// undecodable payload does not error the run and that a later valid order is counted.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn batch_decode_failure_drops_the_bad_element() {
    let broker = MemoryBroker::new();
    let publisher = broker.publisher();
    let info = AppInfo::new("bd", "0.1.0");
    let app = RustStream::new(info).with_broker(broker, |b| {
        b.include_batch(bd);
    });

    let shutdown = Arc::new(Notify::new());
    let run = tokio::spawn({
        let signal = Arc::clone(&shutdown);
        app.run_until(async move { signal.notified().await })
    });

    // Wait until the handler has demonstrably processed at least one valid order.
    let warm_up = order_bytes(1);
    drive_until_seen(&publisher, "bd", &warm_up, &BATCH_DONE).await;
    let before = BATCH_DONE.load(Ordering::SeqCst);

    // Undecodable bytes first, then a valid order that must still reach the handler.
    publisher
        .publish(OutgoingMessage::new("bd", b"not json"))
        .await
        .unwrap();
    let follow_up = order_bytes(2);
    publisher
        .publish(OutgoingMessage::new("bd", &follow_up))
        .await
        .unwrap();

    let counter_advanced = || BATCH_DONE.load(Ordering::SeqCst) > before;
    wait_for(counter_advanced, Duration::from_secs(5)).await;

    shutdown.notify_one();
    let joined = tokio::time::timeout(Duration::from_secs(5), run)
        .await
        .expect("run did not stop");
    let result = joined.expect("run task panicked");
    assert!(
        result.is_ok(),
        "a dropped batch element must not error the run: {result:?}"
    );
}
// `bpd` is a publishing batch subscriber with no `on_failure` policy; this test pins that
// an undecodable payload does not error the run and that a later valid order is counted.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn batch_publishing_decode_failure_is_dropped() {
    let broker = MemoryBroker::new();
    let publisher = broker.publisher();
    let router = Router::<MemoryBroker>::new()
        .include_batch_publishing(bpd, TypedPublisher::new(broker.publisher()));
    let info = AppInfo::new("bpd", "0.1.0");
    let app = RustStream::new(info).with_broker(broker, |b| b.include_router(router));

    let shutdown = Arc::new(Notify::new());
    let run = tokio::spawn({
        let signal = Arc::clone(&shutdown);
        app.run_until(async move { signal.notified().await })
    });

    // Wait until the handler has demonstrably processed at least one valid order.
    let warm_up = order_bytes(1);
    drive_until_seen(&publisher, "bpd", &warm_up, &BATCH_REPLY_DONE).await;
    let before = BATCH_REPLY_DONE.load(Ordering::SeqCst);

    // Undecodable bytes first, then a valid order that must still reach the handler.
    publisher
        .publish(OutgoingMessage::new("bpd", b"not json"))
        .await
        .unwrap();
    let follow_up = order_bytes(2);
    publisher
        .publish(OutgoingMessage::new("bpd", &follow_up))
        .await
        .unwrap();

    let counter_advanced = || BATCH_REPLY_DONE.load(Ordering::SeqCst) > before;
    wait_for(counter_advanced, Duration::from_secs(5)).await;

    shutdown.notify_one();
    let joined = tokio::time::timeout(Duration::from_secs(5), run)
        .await
        .expect("run did not stop");
    let result = joined.expect("run task panicked");
    assert!(
        result.is_ok(),
        "a dropped batch reply element must not error the run: {result:?}"
    );
}
// With `on_failure(panic = fail_fast)` on a batch subscriber, a panic inside the handler
// must end the run with `RustStreamError::Dispatch`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn batch_handler_panic_fails_fast() {
    let broker = MemoryBroker::new();
    let publisher = broker.publisher();
    let info = AppInfo::new("batchboom", "0.1.0");
    let app = RustStream::new(info).with_broker(broker, |b| {
        b.include_batch(batch_boom);
    });

    // `batch_boom` panics on any non-empty batch, so one valid order is enough.
    let trigger = order_bytes(1);
    let result = run_until_torn_down(app, publisher, "batchboom", trigger).await;

    let got_dispatch_err = matches!(result, Err(RustStreamError::Dispatch(_)));
    assert!(
        got_dispatch_err,
        "a fail-fast batch panic must make run() return a dispatch error, got {result:?}",
    );
}