#![cfg(feature = "macros")]
mod common;
use std::{
sync::{
Arc, LazyLock, Mutex,
atomic::{AtomicU32, AtomicUsize, Ordering},
},
time::Duration,
};
use common::handler_signal;
use ruststream::memory::MemoryBroker;
use ruststream::runtime::{AppInfo, HandlerResult, RustStream};
use ruststream::{OutgoingMessage, Publisher, subscriber};
use serde::{Deserialize, Serialize};
use tokio::sync::{Notify, watch};
use tokio::time::Instant;
/// Message payload shared by every test in this file.
///
/// `Serialize` is used by `order_bytes` to build the published bytes;
/// `Deserialize` is presumably what lets the subscriber handlers receive an
/// `&Order` (the decoding itself happens outside this file).
#[derive(Debug, Deserialize, Serialize)]
struct Order {
    /// Identifier the handlers use to recognise a delivery they have already
    /// seen; id `0` is used as the warm-up order by the pool and batch tests.
    id: u32,
}
/// Builds the JSON encoding of an [`Order`] carrying `id`.
///
/// # Panics
///
/// Panics if `serde_json` fails to serialize the order.
fn order_bytes(id: u32) -> Vec<u8> {
    let order = Order { id };
    serde_json::to_vec(&order).unwrap()
}
/// Delay the `deferred` handler passes to `HandlerResult::retry_after` on its
/// first invocation; the test asserts the second invocation is at least this
/// much later.
const RETRY_DELAY: Duration = Duration::from_secs(5);
/// Timestamp of every `deferred` invocation, in call order.
static DELAY_ATTEMPTS: Mutex<Vec<Instant>> = Mutex::new(Vec::new());
/// Notified by `deferred` after each invocation has been recorded.
static DELAY_NOTIFY: LazyLock<Notify> = LazyLock::new(Notify::new);
// Handler for the "delayed" subscription.
//
// Every call records its timestamp in `DELAY_ATTEMPTS` and signals
// `DELAY_NOTIFY`. The very first call answers with
// `HandlerResult::retry_after(RETRY_DELAY)`; every later call acks.
#[subscriber("delayed")]
async fn deferred(_o: &Order) -> HandlerResult {
    // Keep the lock scoped to this block so it is released before notifying.
    let is_first_call = {
        let mut stamps = DELAY_ATTEMPTS.lock().unwrap();
        stamps.push(Instant::now());
        stamps.len() == 1
    };
    DELAY_NOTIFY.notify_one();

    if !is_first_call {
        return HandlerResult::Ack;
    }
    HandlerResult::retry_after(RETRY_DELAY)
}
/// Publishes one order to the "delayed" subscription and checks that the
/// second handler invocation happens no sooner than `RETRY_DELAY` after the
/// first one.
///
/// The runtime is started with tokio's clock paused (`start_paused = true`),
/// and the timestamps are `tokio::time::Instant`s, so the 5 s delay and the
/// 60 s timeout are measured on tokio's clock rather than wall time.
#[tokio::test(start_paused = true)]
async fn retry_after_delay_is_honored_by_the_dispatcher() {
    let broker = MemoryBroker::new();
    let publisher = broker.publisher();
    let app = RustStream::new(AppInfo::new("delayed", "0.1.0"))
        .with_broker(broker, |b| b.include(deferred));

    // Run the app in the background until `shutdown` is notified.
    let shutdown = Arc::new(Notify::new());
    let run = {
        let stop = Arc::clone(&shutdown);
        tokio::spawn(app.run_until(async move { stop.notified().await }))
    };

    let payload = order_bytes(1);
    let second_delivery = async {
        loop {
            // Publish only while the handler has not been called at all, so
            // at most one copy of the order is ever delivered.
            let undelivered = DELAY_ATTEMPTS.lock().unwrap().is_empty();
            if undelivered {
                let _ = publisher
                    .publish(OutgoingMessage::new("delayed", &payload))
                    .await;
            }
            // `handler_signal` lives in the `common` module; presumably a
            // bounded wait for the handler's notification (confirm there).
            handler_signal(&DELAY_NOTIFY).await;
            let calls = DELAY_ATTEMPTS.lock().unwrap().len();
            if calls >= 2 {
                break;
            }
        }
    };
    let result = tokio::time::timeout(Duration::from_secs(60), second_delivery).await;
    assert!(result.is_ok(), "the deferred message was never redelivered");

    // Copy both timestamps out so the lock is not held during the assertion.
    let (first_call, second_call) = {
        let attempts = DELAY_ATTEMPTS.lock().unwrap();
        (attempts[0], attempts[1])
    };
    let between = second_call.duration_since(first_call);
    assert!(
        between >= RETRY_DELAY,
        "redelivery arrived after {between:?}, before the requested {RETRY_DELAY:?}",
    );

    shutdown.notify_one();
    let joined = run.await.unwrap();
    joined.unwrap();
}
/// Number of `pool_retry` invocations that returned `HandlerResult::Ack`.
static POOL_ACKED: AtomicU32 = AtomicU32::new(0);
/// Number of `pool_retry` invocations that returned `HandlerResult::retry()`.
static POOL_RETRIED: AtomicU32 = AtomicU32::new(0);
/// Notified by `pool_retry` after it has bumped either counter above.
static POOL_NOTIFY: LazyLock<Notify> = LazyLock::new(Notify::new);
// Handler for the "pool-retry" subscription, declared with `workers(3)`.
//
// The first time an order id is seen the handler counts it in `POOL_RETRIED`
// and answers `HandlerResult::retry()`; any later delivery of the same id is
// counted in `POOL_ACKED` and acked. `POOL_NOTIFY` is signalled after the
// counter update on both paths.
#[subscriber("pool-retry", workers(3))]
async fn pool_retry(order: &Order) -> HandlerResult {
    // Ids that have been delivered at least once; shared by all invocations.
    static FIRST_SEEN: Mutex<Vec<u32>> = Mutex::new(Vec::new());

    let seen_before = {
        let mut ids = FIRST_SEEN.lock().unwrap();
        let known = ids.contains(&order.id);
        if !known {
            ids.push(order.id);
        }
        known
    };

    // Update the counter first, then signal, so a woken waiter always
    // observes the new value.
    if seen_before {
        POOL_ACKED.fetch_add(1, Ordering::SeqCst);
    } else {
        POOL_RETRIED.fetch_add(1, Ordering::SeqCst);
    }
    POOL_NOTIFY.notify_one();

    if seen_before {
        HandlerResult::Ack
    } else {
        HandlerResult::retry()
    }
}
/// Drives the `pool_retry` handler and checks that every order it answered
/// with a retry is eventually acked.
///
/// Order 0 is the warm-up; orders 1..=4 follow once the handler has been seen
/// to run. The final threshold of 5 acks corresponds to one ack per id 0..=4
/// (this assumes the warm-up order was delivered from a single publish).
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn retry_completes_inside_a_worker_pool() {
    let broker = MemoryBroker::new();
    let publisher = broker.publisher();
    let app = RustStream::new(AppInfo::new("pool-retry", "0.1.0"))
        .with_broker(broker, |b| b.include(pool_retry));

    // Run the app in the background until `shutdown` is notified.
    let shutdown = Arc::new(Notify::new());
    let run = {
        let stop = Arc::clone(&shutdown);
        tokio::spawn(app.run_until(async move { stop.notified().await }))
    };

    // Warm-up: keep publishing order 0 until the handler reports a retry.
    let warmup_payload = order_bytes(0);
    let subscription_up = async {
        loop {
            let _ = publisher
                .publish(OutgoingMessage::new("pool-retry", &warmup_payload))
                .await;
            // `handler_signal` lives in the `common` module; presumably a
            // bounded wait for the handler's notification (confirm there).
            handler_signal(&POOL_NOTIFY).await;
            let retried = POOL_RETRIED.load(Ordering::SeqCst);
            if retried >= 1 {
                break;
            }
        }
    };
    let warmup = tokio::time::timeout(Duration::from_secs(5), subscription_up).await;
    assert!(warmup.is_ok(), "subscription did not come up");

    for id in 1..=4u32 {
        let payload = order_bytes(id);
        publisher
            .publish(OutgoingMessage::new("pool-retry", &payload))
            .await
            .unwrap();
    }

    // The counter is re-read after every wake-up, so coalesced notifications
    // cannot cause the loop to miss the final ack.
    let all_acked = async {
        while POOL_ACKED.load(Ordering::SeqCst) < 5 {
            POOL_NOTIFY.notified().await;
        }
    };
    let result = tokio::time::timeout(Duration::from_secs(5), all_acked).await;
    assert!(
        result.is_ok(),
        "retried deliveries did not all complete in the pool: acked {}, retried {}",
        POOL_ACKED.load(Ordering::SeqCst),
        POOL_RETRIED.load(Ordering::SeqCst),
    );

    shutdown.notify_one();
    let joined = run.await.unwrap();
    joined.unwrap();
}
/// Number of `lane_retry` invocations that returned `HandlerResult::Ack`,
/// warm-up deliveries included. Only the warm-up phase of the test reads it.
static LANE_ACKED: AtomicU32 = AtomicU32::new(0);
/// Number of acked `lane_retry` invocations whose order id is not the warm-up
/// id `0`.
///
/// The warm-up loop may publish order 0 more than once, and every duplicate
/// is acked immediately because its id is already known. Counting the keyed
/// orders separately keeps those extra acks from satisfying the final wait.
static LANE_KEYED_ACKED: AtomicU32 = AtomicU32::new(0);
/// Notified by `lane_retry` once per invocation, after its state is updated.
static LANE_NOTIFY: LazyLock<Notify> = LazyLock::new(Notify::new);

// Handler for the "lane-retry" subscription, declared with
// `workers(2, by_key)`.
//
// The first delivery of an order id answers `HandlerResult::retry()`; any
// later delivery of the same id is acked and counted. `LANE_NOTIFY` is
// signalled exactly once per call, and only after the counters have been
// updated, so a woken waiter never reads a stale count.
#[subscriber("lane-retry", workers(2, by_key))]
async fn lane_retry(order: &Order) -> HandlerResult {
    // Ids that have been delivered at least once; shared by all invocations.
    static FIRST_SEEN: Mutex<Vec<u32>> = Mutex::new(Vec::new());

    let first = {
        let mut seen = FIRST_SEEN.lock().unwrap();
        if seen.contains(&order.id) {
            false
        } else {
            seen.push(order.id);
            true
        }
    };

    if first {
        LANE_NOTIFY.notify_one();
        return HandlerResult::retry();
    }

    LANE_ACKED.fetch_add(1, Ordering::SeqCst);
    // Id 0 is the warm-up order; only real keyed orders count here.
    if order.id != 0 {
        LANE_KEYED_ACKED.fetch_add(1, Ordering::SeqCst);
    }
    LANE_NOTIFY.notify_one();
    HandlerResult::Ack
}

/// Publishes one order under each of two partition keys and checks that both
/// are acked after the handler answered their first delivery with a retry.
///
/// The warm-up order (id 0, key "warmup") is republished until one of its
/// deliveries is acked. Because that can put several copies of order 0 in
/// flight, completion is judged on `LANE_KEYED_ACKED`: each keyed order is
/// published exactly once and acked exactly once, so a count of 2 means both
/// "alpha" and "beta" finished their retry.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn retry_completes_inside_keyed_lanes() {
    let broker = MemoryBroker::new();
    let publisher = broker.publisher();
    let app = RustStream::new(AppInfo::new("lane-retry", "0.1.0"))
        .with_broker(broker, |b| b.include(lane_retry));

    // Run the app in the background until `shutdown` is notified.
    let shutdown = Arc::new(Notify::new());
    let shutdown_signal = Arc::clone(&shutdown);
    let run = tokio::spawn(app.run_until(async move { shutdown_signal.notified().await }));

    // Publishes order `id` with `key` in the "partition-key" header.
    let keyed_publish = |key: &'static str, id: u32| {
        let publisher = publisher.clone();
        async move {
            let mut headers = ruststream::Headers::new();
            headers.insert("partition-key", key);
            publisher
                .publish(OutgoingMessage::new("lane-retry", &order_bytes(id)).with_headers(headers))
                .await
        }
    };

    let warmup = tokio::time::timeout(Duration::from_secs(5), async {
        loop {
            let _ = keyed_publish("warmup", 0).await;
            // `handler_signal` lives in the `common` module; presumably a
            // bounded wait for the handler's notification (confirm there).
            handler_signal(&LANE_NOTIFY).await;
            if LANE_ACKED.load(Ordering::SeqCst) >= 1 {
                break;
            }
        }
    })
    .await;
    assert!(warmup.is_ok(), "subscription did not come up");

    keyed_publish("alpha", 1).await.unwrap();
    keyed_publish("beta", 2).await.unwrap();

    // The counter is re-read after every wake-up, so coalesced notifications
    // cannot cause the loop to miss the final ack.
    let result = tokio::time::timeout(Duration::from_secs(5), async {
        while LANE_KEYED_ACKED.load(Ordering::SeqCst) < 2 {
            LANE_NOTIFY.notified().await;
        }
    })
    .await;
    assert!(
        result.is_ok(),
        "retried deliveries did not all complete in keyed lanes: keyed acked {}, total acked {}",
        LANE_KEYED_ACKED.load(Ordering::SeqCst),
        LANE_ACKED.load(Ordering::SeqCst),
    );

    shutdown.notify_one();
    run.await.unwrap().unwrap();
}
/// Number of non-warm-up `overlap` batches currently inside the handler:
/// incremented on entry, decremented once `RELEASE` has been observed `true`.
static BATCHES_IN_FLIGHT: AtomicUsize = AtomicUsize::new(0);
/// Number of `overlap` batches that contained the warm-up order (id 0).
static OVERLAP_WARMED: AtomicUsize = AtomicUsize::new(0);
/// Notified by `overlap` after it increments either counter above.
static OVERLAP_NOTIFY: LazyLock<Notify> = LazyLock::new(Notify::new);
/// Gate for parked batches: starts `false`; the test sends `true` to let every
/// waiting `overlap` invocation finish.
static RELEASE: LazyLock<watch::Sender<bool>> = LazyLock::new(|| watch::Sender::new(false));
// Batch handler for the "overlap" subscription, declared with `workers(2)`.
//
// A batch containing the warm-up order (id 0) is counted in `OVERLAP_WARMED`
// and acked straight away. Any other batch registers itself in
// `BATCHES_IN_FLIGHT`, signals the test, and then parks until `RELEASE`
// holds `true`, which lets the test observe two batches inside the handler
// at the same time.
#[subscriber(batch("overlap"), workers(2))]
async fn overlap(orders: &[Order]) -> HandlerResult {
    let is_warmup = orders.iter().any(|order| order.id == 0);

    if is_warmup {
        OVERLAP_WARMED.fetch_add(1, Ordering::SeqCst);
        OVERLAP_NOTIFY.notify_one();
    } else {
        BATCHES_IN_FLIGHT.fetch_add(1, Ordering::SeqCst);
        OVERLAP_NOTIFY.notify_one();

        // The wait's result is ignored on purpose: the handler proceeds
        // whether the gate opened or the wait returned an error.
        let mut gate = RELEASE.subscribe();
        let _ = gate.wait_for(|open| *open).await;

        BATCHES_IN_FLIGHT.fetch_sub(1, Ordering::SeqCst);
    }

    HandlerResult::Ack
}
/// Checks that the batch worker pool runs two batches concurrently.
///
/// After a warm-up with order 0, the test keeps publishing fresh orders (each
/// non-warm-up batch parks inside `overlap`) until `BATCHES_IN_FLIGHT` reaches
/// 2, then opens the `RELEASE` gate and shuts the app down.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn batch_pool_overlaps_batches() {
    let broker = MemoryBroker::new();
    let publisher = broker.publisher();
    let app = RustStream::new(AppInfo::new("overlap", "0.1.0"))
        .with_broker(broker, |b| b.include_batch(overlap));

    // Run the app in the background until `shutdown` is notified.
    let shutdown = Arc::new(Notify::new());
    let run = {
        let stop = Arc::clone(&shutdown);
        tokio::spawn(app.run_until(async move { stop.notified().await }))
    };

    // Warm-up: keep publishing order 0 until a batch containing it is seen.
    let warmup_payload = order_bytes(0);
    let subscription_up = async {
        loop {
            let _ = publisher
                .publish(OutgoingMessage::new("overlap", &warmup_payload))
                .await;
            // `handler_signal` lives in the `common` module; presumably a
            // bounded wait for the handler's notification (confirm there).
            handler_signal(&OVERLAP_NOTIFY).await;
            let warmed = OVERLAP_WARMED.load(Ordering::SeqCst);
            if warmed >= 1 {
                break;
            }
        }
    };
    let warmup = tokio::time::timeout(Duration::from_secs(5), subscription_up).await;
    assert!(warmup.is_ok(), "subscription did not come up");

    // Publish orders 1, 2, 3, ... one at a time until two batches are parked
    // in the handler simultaneously.
    let two_in_flight = async {
        for id in 1u32.. {
            let payload = order_bytes(id);
            let _ = publisher
                .publish(OutgoingMessage::new("overlap", &payload))
                .await;
            handler_signal(&OVERLAP_NOTIFY).await;
            let in_flight = BATCHES_IN_FLIGHT.load(Ordering::SeqCst);
            if in_flight >= 2 {
                break;
            }
        }
    };
    let result = tokio::time::timeout(Duration::from_secs(5), two_in_flight).await;
    assert!(
        result.is_ok(),
        "two batches were never in flight at once through the pool",
    );

    // Open the gate so the parked handlers can return before shutdown.
    RELEASE.send_replace(true);
    shutdown.notify_one();
    let joined = run.await.unwrap();
    joined.unwrap();
}