use std::time::Duration;
use serde::{Deserialize, Serialize};
use taskmill::{
BackoffStrategy, Domain, DomainTaskContext, RetryPolicy, Scheduler, SchedulerEvent, TaskError,
TaskStore, TaskTypeConfig, TypedExecutor, TypedTask,
};
use tokio_util::sync::CancellationToken;
use super::common::{define_task, TestDomain};
/// Task type carrying its own retry policy: five retries with no delay,
/// overriding whatever global default the scheduler is built with.
#[derive(Serialize, Deserialize)]
struct TypeATask;

impl TypedTask for TypeATask {
    type Domain = TestDomain;
    const TASK_TYPE: &'static str = "type-a";

    fn config() -> TaskTypeConfig {
        // Constant zero-delay backoff keeps the dead-letter test fast.
        let policy = RetryPolicy {
            strategy: BackoffStrategy::Constant {
                delay: Duration::ZERO,
            },
            max_retries: 5,
        };
        TaskTypeConfig::new().retry(policy)
    }
}
// Task type with no per-type retry override; unlike `TypeATask` it does not
// implement `config()`, so the trait's default applies and the scheduler's
// global `max_retries` governs its retries.
#[derive(Serialize, Deserialize)]
struct TypeBTask;
impl TypedTask for TypeBTask {
type Domain = TestDomain;
const TASK_TYPE: &'static str = "type-b";
}
/// Task type with exponential backoff (200ms initial, x2 multiplier, 10s cap,
/// three retries) used to verify that redispatch is actually delayed.
#[derive(Serialize, Deserialize)]
struct BackoffTestTask;

impl TypedTask for BackoffTestTask {
    type Domain = TestDomain;
    const TASK_TYPE: &'static str = "backoff-test";

    fn config() -> TaskTypeConfig {
        // Expected gaps between attempts: ~200ms, ~400ms, ~800ms.
        let backoff = BackoffStrategy::Exponential {
            initial: Duration::from_millis(200),
            max: Duration::from_secs(10),
            multiplier: 2.0,
        };
        TaskTypeConfig::new().retry(RetryPolicy {
            strategy: backoff,
            max_retries: 3,
        })
    }
}
/// Task type with a constant 5-second backoff, used to check that `Failed`
/// events surface the computed retry delay.
#[derive(Serialize, Deserialize)]
struct RetryEventTask;

impl TypedTask for RetryEventTask {
    type Domain = TestDomain;
    const TASK_TYPE: &'static str = "retry-event";

    fn config() -> TaskTypeConfig {
        let policy = RetryPolicy {
            strategy: BackoffStrategy::Constant {
                delay: Duration::from_secs(5),
            },
            max_retries: 2,
        };
        TaskTypeConfig::new().retry(policy)
    }
}
// Macro-defined task types from the shared test helpers; these carry no
// per-type `TaskTypeConfig`, so the scheduler's global retry settings apply.
define_task!(LegacyTask, TestDomain, "legacy");
define_task!(RetryOverrideTask, TestDomain, "retry-override");
/// Executor usable with any `TypedTask` that fails every attempt with a
/// retryable error, driving tasks through their full retry budget.
struct AlwaysRetryableTypedExec;

impl<T> TypedExecutor<T> for AlwaysRetryableTypedExec
where
    T: TypedTask,
{
    async fn execute<'a>(
        &'a self,
        _payload: T,
        _ctx: DomainTaskContext<'a, T::Domain>,
    ) -> Result<(), TaskError> {
        // Always report a transient failure so the scheduler retries.
        Err(TaskError::retryable("transient"))
    }
}
/// `LegacyTask`-specific executor that never succeeds; every attempt ends in
/// a retryable error.
struct AlwaysRetryableExecutor;

impl TypedExecutor<LegacyTask> for AlwaysRetryableExecutor {
    async fn execute<'a>(
        &'a self,
        _payload: LegacyTask,
        _ctx: DomainTaskContext<'a, TestDomain>,
    ) -> Result<(), TaskError> {
        let failure = TaskError::retryable("transient");
        Err(failure)
    }
}
/// Executor that always fails and attaches an explicit `retry_after` override
/// (the wrapped `Duration`) to the retryable error.
struct RetryAfterExecutor(Duration);

impl TypedExecutor<RetryOverrideTask> for RetryAfterExecutor {
    async fn execute<'a>(
        &'a self,
        _payload: RetryOverrideTask,
        _ctx: DomainTaskContext<'a, TestDomain>,
    ) -> Result<(), TaskError> {
        let err = TaskError::retryable("rate limited");
        // The executor-supplied delay should win over the configured backoff.
        Err(err.retry_after(self.0))
    }
}
/// A per-type retry policy (`TypeATask`: 5 retries) must override the
/// scheduler's global default (3 retries), while a type without an override
/// (`TypeBTask`) falls back to the global setting. Both executors always
/// fail, so each task dead-letters; the reported retry counts show which
/// policy was applied.
#[tokio::test]
async fn per_type_retry_policy_overrides_global_default() {
    let scheduler = Scheduler::builder()
        .store(TaskStore::open_memory().await.unwrap())
        .domain(
            Domain::<TestDomain>::new()
                .task::<TypeATask>(AlwaysRetryableTypedExec)
                .task::<TypeBTask>(AlwaysRetryableTypedExec),
        )
        .max_retries(3)
        .max_concurrency(2)
        .poll_interval(Duration::from_millis(50))
        .build()
        .await
        .unwrap();
    let mut events = scheduler.subscribe();
    let cancel = CancellationToken::new();
    let runner = {
        let sched = scheduler.clone();
        let token = cancel.clone();
        tokio::spawn(async move { sched.run(token).await })
    };
    let domain = scheduler.domain::<TestDomain>();
    domain.submit(TypeATask).await.unwrap();
    domain.submit(TypeBTask).await.unwrap();

    // Collect the dead-letter event for each task type, giving up after 10s.
    let deadline = tokio::time::Instant::now() + Duration::from_secs(10);
    let mut type_a_count = None;
    let mut type_b_count = None;
    while (type_a_count.is_none() || type_b_count.is_none())
        && tokio::time::Instant::now() < deadline
    {
        let received = tokio::time::timeout(Duration::from_millis(100), events.recv()).await;
        if let Ok(Ok(SchedulerEvent::DeadLettered {
            header,
            retry_count,
            ..
        })) = received
        {
            if header.task_type == "test::type-a" {
                type_a_count = Some(retry_count);
            } else if header.task_type == "test::type-b" {
                type_b_count = Some(retry_count);
            }
        }
    }
    cancel.cancel();
    let _ = runner.await;

    let a_retries = type_a_count.expect("type-a should be dead-lettered");
    let b_retries = type_b_count.expect("type-b should be dead-lettered");
    assert_eq!(
        a_retries, 6,
        "type-a: 5 retries + final attempt = retry_count 6"
    );
    assert_eq!(
        b_retries, 4,
        "type-b: 3 retries + final attempt = retry_count 4"
    );
}
/// With exponential backoff (200ms initial, x2), successive redispatches of
/// a failing task must be separated by growing gaps. Timestamps are taken on
/// each `Dispatched` event and checked against slightly loosened lower bounds
/// (150ms / 300ms) to tolerate timer jitter.
#[tokio::test]
async fn exponential_backoff_delays_redispatch() {
    let scheduler = Scheduler::builder()
        .store(TaskStore::open_memory().await.unwrap())
        .domain(Domain::<TestDomain>::new().task::<BackoffTestTask>(AlwaysRetryableTypedExec))
        .max_concurrency(1)
        .poll_interval(Duration::from_millis(50))
        .build()
        .await
        .unwrap();
    let mut events = scheduler.subscribe();
    let cancel = CancellationToken::new();
    let runner = {
        let sched = scheduler.clone();
        let token = cancel.clone();
        tokio::spawn(async move { sched.run(token).await })
    };
    let domain = scheduler.domain::<TestDomain>();
    domain.submit(BackoffTestTask).await.unwrap();

    // Record when each dispatch happens until the task dead-letters.
    let deadline = tokio::time::Instant::now() + Duration::from_secs(10);
    let mut dispatches: Vec<tokio::time::Instant> = Vec::new();
    let mut dead_lettered = false;
    while !dead_lettered && tokio::time::Instant::now() < deadline {
        match tokio::time::timeout(Duration::from_millis(50), events.recv()).await {
            Ok(Ok(SchedulerEvent::Dispatched(_))) => {
                dispatches.push(tokio::time::Instant::now());
            }
            Ok(Ok(SchedulerEvent::DeadLettered { .. })) => {
                dead_lettered = true;
            }
            _ => {}
        }
    }
    cancel.cancel();
    let _ = runner.await;

    assert!(dead_lettered, "task should eventually dead-letter");
    assert!(
        dispatches.len() >= 3,
        "expected at least 3 dispatches, got {}",
        dispatches.len()
    );
    if dispatches.len() >= 2 {
        let first_gap = dispatches[1] - dispatches[0];
        assert!(
            first_gap >= Duration::from_millis(150),
            "first retry gap should be >=150ms (backoff 200ms), got {:?}",
            first_gap
        );
    }
    if dispatches.len() >= 3 {
        let second_gap = dispatches[2] - dispatches[1];
        assert!(
            second_gap >= Duration::from_millis(300),
            "second retry gap should be >=300ms (backoff 400ms), got {:?}",
            second_gap
        );
    }
}
/// A `Failed` event with `will_retry: true` should carry the backoff delay
/// computed from the task's retry policy — here `RetryEventTask`'s constant
/// 5-second backoff.
#[tokio::test]
async fn failed_event_includes_retry_after_duration() {
    let scheduler = Scheduler::builder()
        .store(TaskStore::open_memory().await.unwrap())
        .domain(Domain::<TestDomain>::new().task::<RetryEventTask>(AlwaysRetryableTypedExec))
        .max_concurrency(1)
        .poll_interval(Duration::from_millis(50))
        .build()
        .await
        .unwrap();
    let mut events = scheduler.subscribe();
    let cancel = CancellationToken::new();
    let runner = {
        let sched = scheduler.clone();
        let token = cancel.clone();
        tokio::spawn(async move { sched.run(token).await })
    };
    let domain = scheduler.domain::<TestDomain>();
    domain.submit(RetryEventTask).await.unwrap();

    // Wait (up to 5s) for the first retrying failure event.
    let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
    let mut observed = None;
    while observed.is_none() && tokio::time::Instant::now() < deadline {
        let received = tokio::time::timeout(Duration::from_millis(100), events.recv()).await;
        if let Ok(Ok(SchedulerEvent::Failed {
            will_retry: true,
            retry_after,
            ..
        })) = received
        {
            observed = Some(retry_after);
        }
    }
    cancel.cancel();
    let _ = runner.await;

    let retry_after = observed.expect("should receive a Failed event with will_retry=true");
    let delay = retry_after.expect("retry_after should be Some for constant 5s backoff");
    assert_eq!(delay, Duration::from_secs(5));
}
/// When the executor attaches an explicit `retry_after` to its error, the
/// `Failed` event must report that override (42s here) rather than the delay
/// from the configured backoff policy.
#[tokio::test]
async fn failed_event_includes_executor_retry_after_override() {
    let scheduler = Scheduler::builder()
        .store(TaskStore::open_memory().await.unwrap())
        .domain(
            Domain::<TestDomain>::new()
                .task::<RetryOverrideTask>(RetryAfterExecutor(Duration::from_secs(42))),
        )
        .max_retries(3)
        .max_concurrency(1)
        .poll_interval(Duration::from_millis(50))
        .build()
        .await
        .unwrap();
    let mut events = scheduler.subscribe();
    let cancel = CancellationToken::new();
    let runner = {
        let sched = scheduler.clone();
        let token = cancel.clone();
        tokio::spawn(async move { sched.run(token).await })
    };
    let domain = scheduler.domain::<TestDomain>();
    domain
        .submit_with(RetryOverrideTask)
        .key("ro1")
        .await
        .unwrap();

    // Wait (up to 5s) for the first retrying failure event.
    let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
    let mut observed = None;
    while observed.is_none() && tokio::time::Instant::now() < deadline {
        let received = tokio::time::timeout(Duration::from_millis(100), events.recv()).await;
        if let Ok(Ok(SchedulerEvent::Failed {
            will_retry: true,
            retry_after,
            ..
        })) = received
        {
            observed = Some(retry_after);
        }
    }
    cancel.cancel();
    let _ = runner.await;

    let retry_after = observed.expect("should receive a Failed event with will_retry=true");
    let delay = retry_after.expect("retry_after should be Some with executor override");
    assert_eq!(delay, Duration::from_secs(42));
}
/// A task type with no per-type retry configuration (`LegacyTask`) must fall
/// back to the scheduler-wide `max_retries(2)`, so dead-lettering reports
/// retry_count = 3 (two retries plus the final attempt).
#[tokio::test]
async fn null_max_retries_uses_global_default() {
    let scheduler = Scheduler::builder()
        .store(TaskStore::open_memory().await.unwrap())
        .domain(Domain::<TestDomain>::new().task::<LegacyTask>(AlwaysRetryableExecutor))
        .max_retries(2)
        .max_concurrency(1)
        .poll_interval(Duration::from_millis(50))
        .build()
        .await
        .unwrap();
    let mut events = scheduler.subscribe();
    let cancel = CancellationToken::new();
    let runner = {
        let sched = scheduler.clone();
        let token = cancel.clone();
        tokio::spawn(async move { sched.run(token).await })
    };
    let domain = scheduler.domain::<TestDomain>();
    domain
        .submit_with(LegacyTask)
        .key("leg1")
        .await
        .unwrap();

    // Wait (up to 5s) for the dead-letter event and capture its retry count.
    let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
    let mut observed = None;
    while observed.is_none() && tokio::time::Instant::now() < deadline {
        let received = tokio::time::timeout(Duration::from_millis(100), events.recv()).await;
        if let Ok(Ok(SchedulerEvent::DeadLettered { retry_count, .. })) = received {
            observed = Some(retry_count);
        }
    }
    cancel.cancel();
    let _ = runner.await;

    let count = observed.expect("task should be dead-lettered");
    assert_eq!(
        count, 3,
        "dead-letter should report retry_count=3 (2 retries + final attempt)"
    );
}