use flume::Receiver;
use speare::{
mini::{root, OnErr, SpeareErr},
Backoff, Limit,
};
use std::{
fmt::Debug,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
use tokio::time::{sleep, timeout, Duration};
/// Upper bound on how long `recv_within` waits for an expected message before panicking.
const RECV_TIMEOUT: Duration = Duration::from_millis(500);
/// Quiet window: how long `assert_no_message` listens before concluding that nothing
/// was delivered. Also used as a settle delay in `oneshot_stops_after_first_error`.
const QUIET_TIMEOUT: Duration = Duration::from_millis(150);
/// Waits for the next message on `rx` and returns it.
///
/// # Panics
///
/// Panics if nothing arrives within `RECV_TIMEOUT`, or if the channel reports an
/// error (e.g. it was closed) before a message is delivered.
async fn recv_within<T>(rx: &Receiver<T>) -> T
where
    T: Send + 'static,
{
    // Outer layer: did the receive finish before the deadline?
    let finished_in_time = timeout(RECV_TIMEOUT, rx.recv_async())
        .await
        .expect("timed out waiting for message");

    // Inner layer: did the receive itself succeed?
    finished_in_time.expect("channel closed before delivering message")
}
/// Asserts that `rx` yields no message during the `QUIET_TIMEOUT` window.
///
/// Both a timeout and a receive error count as "no message"; only an actually
/// delivered value fails the assertion.
///
/// # Panics
///
/// Panics with the debug representation of the message if one is received.
async fn assert_no_message<T>(rx: &Receiver<T>)
where
    T: Debug + Send + 'static,
{
    if let Ok(Ok(msg)) = timeout(QUIET_TIMEOUT, rx.recv_async()).await {
        panic!("unexpected message: {msg:?}");
    }
}
/// A value published on a topic is received by a subscriber of that topic.
#[tokio::test]
async fn publish_delivers_to_subscriber() {
    let handle = root();
    let numbers_rx = handle.subscribe::<u32>("numbers").unwrap();

    handle.publish("numbers", 42_u32).unwrap();

    let delivered = recv_within(&numbers_rx).await;
    assert_eq!(delivered, 42);
}
/// A single publish is received by every subscriber of the topic.
#[tokio::test]
async fn publish_delivers_to_multiple_subscribers() {
    let handle = root();
    let first_rx = handle.subscribe::<u32>("numbers").unwrap();
    let second_rx = handle.subscribe::<u32>("numbers").unwrap();

    handle.publish("numbers", 42_u32).unwrap();

    // Check the receivers in subscription order.
    for rx in [&first_rx, &second_rx] {
        assert_eq!(recv_within(rx).await, 42);
    }
}
/// Publishing to a topic that has no subscribers returns `Ok(())`.
#[tokio::test]
async fn publish_to_empty_topic_is_ok() {
    let handle = root();

    assert_eq!(handle.publish("nobody-listens", 42_u32), Ok(()));
}
/// Subscribing to an existing topic with a different message type yields
/// `SpeareErr::TypeMismatch` naming that topic.
#[tokio::test]
async fn subscribe_type_mismatch_returns_error() {
    let handle = root();
    // Keep the first subscription alive so the topic stays registered as `u32`.
    let _numbers_rx = handle.subscribe::<u32>("numbers").unwrap();

    let actual = handle.subscribe::<String>("numbers").unwrap_err();

    let expected = SpeareErr::TypeMismatch {
        topic: String::from("numbers"),
    };
    assert_eq!(actual, expected);
}
/// Publishing a value whose type differs from the topic's subscribed type yields
/// `SpeareErr::TypeMismatch` naming that topic.
#[tokio::test]
async fn publish_type_mismatch_returns_error() {
    let handle = root();
    // Keep the subscription alive so the topic stays registered as `u32`.
    let _numbers_rx = handle.subscribe::<u32>("numbers").unwrap();

    let actual = handle
        .publish("numbers", String::from("wrong type"))
        .unwrap_err();

    let expected = SpeareErr::TypeMismatch {
        topic: String::from("numbers"),
    };
    assert_eq!(actual, expected);
}
/// A task spawned from an async closure publishes exactly one message:
/// the first is received, and nothing follows during the quiet window.
#[tokio::test]
async fn task_runs_once_with_async_closure() {
    let handle = root();
    let done_rx = handle.subscribe::<u32>("done").unwrap();

    handle
        .task(async |ctx| {
            ctx.publish("done", 1_u32)?;
            Ok::<(), SpeareErr>(())
        })
        .unwrap();

    let first = recv_within(&done_rx).await;
    assert_eq!(first, 1);
    assert_no_message(&done_rx).await;
}
/// A oneshot built from an `async move` closure can take ownership of a captured
/// value, publishes it once, and produces nothing further.
#[tokio::test]
async fn oneshot_runs_once_with_async_closure() {
    let handle = root();
    let done_rx = handle.subscribe::<String>("done").unwrap();
    let owned = "owned value".to_string();

    handle
        .oneshot(async move |ctx| {
            // `owned` is moved into the publish call.
            ctx.publish("done", owned)?;
            Ok::<(), SpeareErr>(())
        })
        .unwrap();

    let delivered = recv_within(&done_rx).await;
    assert_eq!(delivered, "owned value");
    assert_no_message(&done_rx).await;
}
/// A oneshot whose closure returns an error is expected to run only once:
/// the run counter must still read 1 after a settle delay.
#[tokio::test]
async fn oneshot_stops_after_first_error() {
    let handle = root();
    let runs = Arc::new(AtomicUsize::new(0));
    let (started_tx, started_rx) = flume::unbounded::<()>();

    // Clones that are moved into the closure; the originals stay in this scope.
    let runs_in_task = Arc::clone(&runs);
    let started_tx_in_task = started_tx.clone();

    handle
        .oneshot(async move |_ctx| {
            runs_in_task.fetch_add(1, Ordering::SeqCst);
            started_tx_in_task.send(()).unwrap();
            Err::<(), &'static str>("boom")
        })
        .unwrap();

    // Wait for the first run to start, then leave time for any (unwanted) rerun.
    recv_within(&started_rx).await;
    sleep(QUIET_TIMEOUT).await;

    let total_runs = runs.load(Ordering::SeqCst);
    assert_eq!(total_runs, 1);
}
/// The value given to `args` is readable by dereferencing the task's `ctx`:
/// the task publishes `*ctx` and the test expects to receive the original argument.
#[tokio::test]
async fn task_with_passes_args_into_child_ctx() {
    let handle = root();
    let args_rx = handle.subscribe::<u32>("args").unwrap();

    handle
        .task_with()
        .args(7_u32)
        .spawn(async |ctx| {
            ctx.publish("args", *ctx)?;
            Ok::<(), SpeareErr>(())
        })
        .unwrap();

    let echoed = recv_within(&args_rx).await;
    assert_eq!(echoed, 7);
}
/// With `OnErr::Stop`, the watch receiver gets the task's error once and then
/// stays silent for the quiet window.
#[tokio::test]
async fn spawnwatch_reports_error_once_for_stop_policy() {
    let handle = root();
    let errors_rx = handle
        .task_with()
        .on_err(OnErr::Stop)
        .spawnwatch(async |_ctx| Err::<(), &'static str>("boom"))
        .unwrap();

    let (_id, reported) = recv_within(&errors_rx).await;
    assert_eq!(reported, "boom");

    assert_no_message(&errors_rx).await;
}
/// With an unlimited restart policy and no backoff, a task that fails on its first
/// two attempts is rerun until it succeeds on the third. The test expects two
/// error reports carrying the same task id, one success signal, and silence after.
#[tokio::test]
async fn restart_retries_until_success() {
    let handle = root();
    let attempts = Arc::new(AtomicUsize::new(0));
    let (done_tx, done_rx) = flume::unbounded();

    let errors_rx = handle
        .task_with()
        .args((Arc::clone(&attempts), done_tx.clone()))
        .on_err(OnErr::Restart {
            max: Limit::None,
            backoff: Backoff::None,
        })
        .spawnwatch(async |ctx| {
            // `fetch_add` returns the previous value, so add one for a 1-based attempt number.
            let attempt = ctx.0.fetch_add(1, Ordering::SeqCst) + 1;
            if attempt >= 3 {
                ctx.1.send(attempt).unwrap();
                Ok(())
            } else {
                Err::<(), &'static str>("retry")
            }
        })
        .unwrap();

    // Two failures are reported before the successful attempt.
    let first_report = recv_within(&errors_rx).await;
    let second_report = recv_within(&errors_rx).await;
    assert_eq!(first_report.1, "retry");
    assert_eq!(second_report.1, "retry");
    // Both reports must carry the same task id.
    assert_eq!(first_report.0, second_report.0);

    let succeeded_on = recv_within(&done_rx).await;
    assert_eq!(succeeded_on, 3);
    assert_eq!(attempts.load(Ordering::SeqCst), 3);
    assert_no_message(&errors_rx).await;
}
/// With a restart limit of 2, an always-failing task is expected to run three
/// times in total, report three errors, and then stop.
#[tokio::test]
async fn restart_stops_after_max_restarts() {
    let handle = root();
    let attempts = Arc::new(AtomicUsize::new(0));

    let errors_rx = handle
        .task_with()
        .args(Arc::clone(&attempts))
        .on_err(OnErr::Restart {
            max: 2.into(),
            backoff: Backoff::None,
        })
        .spawnwatch(async |ctx| {
            ctx.fetch_add(1, Ordering::SeqCst);
            Err("retry")
        })
        .unwrap();

    // Gather all three reports first, then check each one.
    let mut reported = Vec::with_capacity(3);
    for _ in 0..3 {
        let (_, err) = recv_within(&errors_rx).await;
        reported.push(err);
    }
    for err in reported {
        assert_eq!(err, "retry");
    }

    let total_runs = attempts.load(Ordering::SeqCst);
    assert_eq!(total_runs, 3);
    assert_no_message(&errors_rx).await;
}
/// After `abort_children`, none of the still-sleeping child tasks gets to send
/// its id on the completion channel.
#[tokio::test]
async fn abort_children_stops_all_running_tasks() {
    let handle = root();
    let (done_tx, done_rx) = flume::unbounded::<u8>();

    for id in 1_u8..=2 {
        handle
            .task_with()
            .args((done_tx.clone(), id))
            .spawn(async |ctx| {
                // Sleep shorter than the quiet window, so an un-aborted task would be seen.
                sleep(Duration::from_millis(80)).await;
                ctx.0.send(ctx.1).unwrap();
                Ok::<(), &'static str>(())
            })
            .unwrap();
    }

    handle.abort_children().unwrap();

    assert_no_message(&done_rx).await;
}
/// Once the root value is dropped, a still-sleeping child task must not get to
/// send on the completion channel.
#[tokio::test]
async fn dropping_root_aborts_running_children() {
    let (done_tx, done_rx) = flume::unbounded::<u8>();

    let handle = root();
    handle
        .task_with()
        .args(done_tx.clone())
        .spawn(async |ctx| {
            // Sleep shorter than the quiet window, so a surviving task would be seen.
            sleep(Duration::from_millis(80)).await;
            ctx.send(1).unwrap();
            Ok::<(), &'static str>(())
        })
        .unwrap();
    // Drop the root while the child is still sleeping.
    drop(handle);

    assert_no_message(&done_rx).await;
}