use std::collections::{HashMap, VecDeque};
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, OnceLock, RwLock};
use futures::FutureExt as _;
#[cfg(feature = "redis")]
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value;
use crate::{AppState, AutumnError, AutumnResult};
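/// A registered job handler: a plain `fn` pointer that receives the
/// application state and the job's JSON payload and returns a boxed future
/// resolving to the job outcome.
///
/// A minimal sketch of a conforming handler (the job logic shown is
/// illustrative, not part of this module):
///
/// ```ignore
/// fn send_welcome_email(
///     state: AppState,
///     payload: serde_json::Value,
/// ) -> std::pin::Pin<Box<dyn std::future::Future<Output = AutumnResult<()>> + Send + 'static>> {
///     Box::pin(async move {
///         // Read fields from `payload`, use services on `state`, report the result.
///         let _ = (state, payload);
///         Ok(())
///     })
/// }
/// ```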
pub type JobHandler =
fn(AppState, Value) -> Pin<Box<dyn Future<Output = AutumnResult<()>> + Send + 'static>>;
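// Defaults for the job-admin view: how many finished records the bounded
// in-memory history retains, and the page size used when none is requested.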
const DEFAULT_JOB_ADMIN_HISTORY_LIMIT: usize = 1_000;
const DEFAULT_JOB_ADMIN_PER_PAGE: u64 = 25;
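/// Static registration data for one job type: its unique name, retry policy,
/// and the handler run for each attempt. A zero `max_attempts` or
/// `initial_backoff_ms` means "use the runtime defaults".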
#[derive(Clone)]
pub struct JobInfo {
pub name: String,
pub max_attempts: u32,
pub initial_backoff_ms: u64,
pub handler: JobHandler,
}
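/// Handle used to enqueue jobs against whichever backend the runtime was
/// started with: the in-process mpsc queue, or the durable Redis queue when
/// the `redis` feature is enabled.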
#[derive(Clone)]
pub struct JobClient {
local_sender: Option<tokio::sync::mpsc::Sender<QueuedJob>>,
#[cfg(feature = "redis")]
redis: Option<RedisClient>,
registry: crate::actuator::JobRegistry,
job_admin: JobAdminMemoryBackend,
default_max_attempts: u32,
default_initial_backoff_ms: u64,
per_job_defaults: HashMap<String, (u32, u64)>,
}
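/// One unit of work flowing through the local in-process queue.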
#[derive(Debug)]
struct QueuedJob {
id: String,
name: String,
payload: Value,
attempt: u32,
max_attempts: u32,
initial_backoff_ms: u64,
}
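/// Result of driving a single handler invocation, with panics captured and
/// reported as strings instead of unwinding the worker.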
#[derive(Debug, PartialEq, Eq)]
enum JobExecutionOutcome {
Succeeded,
Failed(String),
Panicked(String),
}
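/// Outcome of atomically moving an admin record from `Enqueued` to `Running`.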
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum JobAdminStartDecision {
Started,
Canceled,
Missing,
AlreadyTransitioned,
}
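/// Boxed future returned by [`JobAdminBackend`] methods, keeping the trait
/// object-safe.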
pub type JobAdminFuture<'a, T> = Pin<Box<dyn Future<Output = AutumnResult<T>> + Send + 'a>>;
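/// Lifecycle states surfaced by the job-admin API; serialized in snake_case
/// (see [`JobAdminStatus::label`] for the string form).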
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum JobAdminStatus {
Enqueued,
Running,
Retrying,
Completed,
Failed,
Discarded,
Canceled,
Retried,
}
impl JobAdminStatus {
#[must_use]
pub const fn label(self) -> &'static str {
match self {
Self::Enqueued => "enqueued",
Self::Running => "running",
Self::Retrying => "retrying",
Self::Completed => "completed",
Self::Failed => "failed",
Self::Discarded => "discarded",
Self::Canceled => "canceled",
Self::Retried => "retried",
}
}
}
#[derive(Debug, Clone, Serialize)]
pub struct JobAdminRecord {
pub id: String,
pub name: String,
pub status: JobAdminStatus,
pub enqueued_at: Option<String>,
pub started_at: Option<String>,
pub finished_at: Option<String>,
pub attempt: u32,
pub max_attempts: u32,
pub last_error: Option<String>,
pub principal_id: Option<String>,
pub correlation_id: Option<String>,
}
#[derive(Debug, Clone, Serialize)]
pub struct JobAdminPage {
pub records: Vec<JobAdminRecord>,
pub total: u64,
pub page: u64,
pub per_page: u64,
}
impl JobAdminPage {
#[must_use]
pub const fn new(records: Vec<JobAdminRecord>, total: u64, page: u64, per_page: u64) -> Self {
Self {
records,
total,
page,
per_page,
}
}
#[must_use]
pub const fn total_pages(&self) -> u64 {
if self.per_page == 0 {
return 0;
}
self.total.div_ceil(self.per_page)
}
}
#[derive(Debug, Clone, Serialize)]
pub struct JobScheduleSummary {
pub name: String,
pub schedule: String,
pub next_run_at: Option<String>,
pub last_run_status: Option<String>,
}
#[derive(Debug, Clone, Serialize)]
pub struct JobAdminSnapshot {
pub enqueued: JobAdminPage,
pub running: JobAdminPage,
pub completed: JobAdminPage,
pub failed: JobAdminPage,
pub schedules: Vec<JobScheduleSummary>,
pub bounded_history_limit: usize,
}
impl JobAdminSnapshot {
#[must_use]
pub const fn empty() -> Self {
Self {
enqueued: JobAdminPage::new(Vec::new(), 0, 1, DEFAULT_JOB_ADMIN_PER_PAGE),
running: JobAdminPage::new(Vec::new(), 0, 1, DEFAULT_JOB_ADMIN_PER_PAGE),
completed: JobAdminPage::new(Vec::new(), 0, 1, DEFAULT_JOB_ADMIN_PER_PAGE),
failed: JobAdminPage::new(Vec::new(), 0, 1, DEFAULT_JOB_ADMIN_PER_PAGE),
schedules: Vec::new(),
bounded_history_limit: DEFAULT_JOB_ADMIN_HISTORY_LIMIT,
}
}
}
#[derive(Debug, Clone)]
pub struct JobAdminQuery {
pub enqueued_page: u64,
pub running_page: u64,
pub completed_page: u64,
pub failed_page: u64,
pub per_page: u64,
}
impl Default for JobAdminQuery {
fn default() -> Self {
Self {
enqueued_page: 1,
running_page: 1,
completed_page: 1,
failed_page: 1,
per_page: DEFAULT_JOB_ADMIN_PER_PAGE,
}
}
}
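/// Backend contract for the job-admin surface: paginated status snapshots
/// plus retry/discard/cancel operations on individual jobs.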
pub trait JobAdminBackend: Send + Sync + 'static {
fn snapshot(&self, query: JobAdminQuery) -> JobAdminFuture<'_, JobAdminSnapshot>;
fn retry(&self, id: &str) -> JobAdminFuture<'_, ()>;
fn discard(&self, id: &str) -> JobAdminFuture<'_, ()>;
fn cancel(&self, id: &str) -> JobAdminFuture<'_, ()>;
}
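/// Newtype wrapper used to store a shared [`JobAdminBackend`] in the
/// app-state extensions.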
#[derive(Clone)]
pub struct JobAdminBackendEntry(pub Arc<dyn JobAdminBackend>);
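/// Looks up the job-admin backend registered in the application state, if any.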
#[must_use]
pub fn job_admin_backend(state: &AppState) -> Option<Arc<dyn JobAdminBackend>> {
state
.extension::<JobAdminBackendEntry>()
.map(|entry| Arc::clone(&entry.0))
}
#[derive(Debug, Clone)]
struct JobAdminStoredRecord {
id: String,
name: String,
payload: Value,
status: JobAdminStatus,
enqueued_at: Option<chrono::DateTime<chrono::Utc>>,
started_at: Option<chrono::DateTime<chrono::Utc>>,
finished_at: Option<chrono::DateTime<chrono::Utc>>,
attempt: u32,
max_attempts: u32,
last_error: Option<String>,
principal_id: Option<String>,
correlation_id: Option<String>,
}
impl JobAdminStoredRecord {
fn sort_time(&self) -> chrono::DateTime<chrono::Utc> {
self.finished_at
.or(self.started_at)
.or(self.enqueued_at)
.unwrap_or_else(chrono::Utc::now)
}
fn to_public(&self) -> JobAdminRecord {
JobAdminRecord {
id: self.id.clone(),
name: self.name.clone(),
status: self.status,
enqueued_at: self.enqueued_at.map(format_job_admin_time),
started_at: self.started_at.map(format_job_admin_time),
finished_at: self.finished_at.map(format_job_admin_time),
attempt: self.attempt,
max_attempts: self.max_attempts,
last_error: self.last_error.clone(),
principal_id: self.principal_id.clone(),
correlation_id: self.correlation_id.clone(),
}
}
}
#[derive(Debug)]
struct JobAdminMemoryInner {
records: HashMap<String, JobAdminStoredRecord>,
order: VecDeque<String>,
history_limit: usize,
}
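/// Bounded in-memory implementation of [`JobAdminBackend`]: records are kept
/// in a map keyed by job id, with insertion order tracked in a `VecDeque` so
/// history can be pruned once `history_limit` is exceeded.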
#[derive(Clone)]
pub struct JobAdminMemoryBackend {
inner: Arc<RwLock<JobAdminMemoryInner>>,
}
impl JobAdminMemoryBackend {
#[must_use]
pub fn new() -> Self {
Self::with_history_limit(DEFAULT_JOB_ADMIN_HISTORY_LIMIT)
}
#[must_use]
pub fn with_history_limit(history_limit: usize) -> Self {
Self {
inner: Arc::new(RwLock::new(JobAdminMemoryInner {
records: HashMap::new(),
order: VecDeque::new(),
history_limit: history_limit.max(1),
})),
}
}
fn record_enqueue(
&self,
id: String,
name: &str,
payload: Value,
attempt: u32,
max_attempts: u32,
) {
let now = chrono::Utc::now();
let (principal_id, correlation_id) = job_payload_identity(&payload);
if let Ok(mut inner) = self.inner.write() {
inner.order.push_back(id.clone());
inner.records.insert(
id.clone(),
JobAdminStoredRecord {
id,
name: name.to_owned(),
payload,
status: JobAdminStatus::Enqueued,
enqueued_at: Some(now),
started_at: None,
finished_at: None,
attempt,
max_attempts,
last_error: None,
principal_id,
correlation_id,
},
);
prune_job_admin_history(&mut inner);
}
}
fn record_requeued(&self, id: &str, attempt: u32) {
if let Ok(mut inner) = self.inner.write()
&& let Some(record) = inner.records.get_mut(id)
{
record.status = JobAdminStatus::Enqueued;
record.enqueued_at = Some(chrono::Utc::now());
record.started_at = None;
record.finished_at = None;
record.attempt = attempt;
}
}
fn try_record_start(&self, id: &str, attempt: u32) -> JobAdminStartDecision {
let Ok(mut inner) = self.inner.write() else {
return JobAdminStartDecision::Missing;
};
let Some(record) = inner.records.get_mut(id) else {
return JobAdminStartDecision::Missing;
};
match record.status {
JobAdminStatus::Enqueued => {
record.status = JobAdminStatus::Running;
record.started_at = Some(chrono::Utc::now());
record.finished_at = None;
record.attempt = attempt;
JobAdminStartDecision::Started
}
JobAdminStatus::Canceled => JobAdminStartDecision::Canceled,
_ => JobAdminStartDecision::AlreadyTransitioned,
}
}
fn record_success(&self, id: &str) {
if let Ok(mut inner) = self.inner.write()
&& let Some(record) = inner.records.get_mut(id)
{
record.status = JobAdminStatus::Completed;
record.finished_at = Some(chrono::Utc::now());
record.last_error = None;
prune_job_admin_history(&mut inner);
}
}
fn record_retrying(&self, id: &str, error: &str) {
if let Ok(mut inner) = self.inner.write()
&& let Some(record) = inner.records.get_mut(id)
{
record.status = JobAdminStatus::Retrying;
record.finished_at = Some(chrono::Utc::now());
record.last_error = Some(error.to_owned());
}
}
fn record_failure(&self, id: &str, error: String) {
if let Ok(mut inner) = self.inner.write()
&& let Some(record) = inner.records.get_mut(id)
{
record.status = JobAdminStatus::Failed;
record.finished_at = Some(chrono::Utc::now());
record.last_error = Some(error);
prune_job_admin_history(&mut inner);
}
}
fn record_cancelled(&self, id: &str) {
if let Ok(mut inner) = self.inner.write()
&& let Some(record) = inner.records.get_mut(id)
{
record.status = JobAdminStatus::Canceled;
record.finished_at = Some(chrono::Utc::now());
}
}
fn retry_payload(&self, id: &str) -> AutumnResult<(String, Value)> {
let mut inner = self
.inner
.write()
.map_err(|_| AutumnError::internal_server_error_msg("job admin store lock poisoned"))?;
let record = inner
.records
.get_mut(id)
.ok_or_else(|| AutumnError::not_found_msg(format!("job '{id}' not found")))?;
if record.status != JobAdminStatus::Failed {
return Err(AutumnError::bad_request_msg(
"only failed jobs can be retried",
));
}
let retry = (record.name.clone(), record.payload.clone());
record.status = JobAdminStatus::Retried;
record.finished_at = Some(chrono::Utc::now());
drop(inner);
Ok(retry)
}
fn restore_failed_retry(&self, id: &str) {
if let Ok(mut inner) = self.inner.write()
&& let Some(record) = inner.records.get_mut(id)
&& record.status == JobAdminStatus::Retried
{
record.status = JobAdminStatus::Failed;
record.finished_at = Some(chrono::Utc::now());
}
}
fn ensure_retryable(&self, id: &str) -> AutumnResult<()> {
let inner = self
.inner
.read()
.map_err(|_| AutumnError::internal_server_error_msg("job admin store lock poisoned"))?;
let record = inner
.records
.get(id)
.ok_or_else(|| AutumnError::not_found_msg(format!("job '{id}' not found")))?;
let status = record.status;
drop(inner);
if status != JobAdminStatus::Failed {
return Err(AutumnError::bad_request_msg(
"only failed jobs can be retried",
));
}
Ok(())
}
fn discard_failed(&self, id: &str) -> AutumnResult<()> {
let mut inner = self
.inner
.write()
.map_err(|_| AutumnError::internal_server_error_msg("job admin store lock poisoned"))?;
let record = inner
.records
.get_mut(id)
.ok_or_else(|| AutumnError::not_found_msg(format!("job '{id}' not found")))?;
if record.status != JobAdminStatus::Failed {
return Err(AutumnError::bad_request_msg(
"only failed jobs can be discarded",
));
}
record.status = JobAdminStatus::Discarded;
record.finished_at = Some(chrono::Utc::now());
drop(inner);
Ok(())
}
fn cancel_enqueued(&self, id: &str) -> AutumnResult<()> {
let mut inner = self
.inner
.write()
.map_err(|_| AutumnError::internal_server_error_msg("job admin store lock poisoned"))?;
let record = inner
.records
.get_mut(id)
.ok_or_else(|| AutumnError::not_found_msg(format!("job '{id}' not found")))?;
if record.status != JobAdminStatus::Enqueued {
return Err(AutumnError::bad_request_msg(
"only enqueued jobs can be canceled",
));
}
record.status = JobAdminStatus::Canceled;
record.finished_at = Some(chrono::Utc::now());
drop(inner);
Ok(())
}
fn snapshot_sync(&self, query: &JobAdminQuery) -> JobAdminSnapshot {
let Ok(inner) = self.inner.read() else {
return JobAdminSnapshot::empty();
};
let now = chrono::Utc::now();
let per_page = query.per_page.clamp(1, 100);
JobAdminSnapshot {
enqueued: paginate_job_admin_records(
&inner,
JobAdminStatus::Enqueued,
None,
query.enqueued_page,
per_page,
),
running: paginate_job_admin_records(
&inner,
JobAdminStatus::Running,
None,
query.running_page,
per_page,
),
completed: paginate_job_admin_records(
&inner,
JobAdminStatus::Completed,
Some(now - chrono::TimeDelta::hours(24)),
query.completed_page,
per_page,
),
failed: paginate_job_admin_records(
&inner,
JobAdminStatus::Failed,
Some(now - chrono::TimeDelta::days(7)),
query.failed_page,
per_page,
),
schedules: Vec::new(),
bounded_history_limit: inner.history_limit,
}
}
#[cfg(test)]
fn new_for_test(history_limit: usize) -> Self {
Self::with_history_limit(history_limit)
}
#[cfg(test)]
fn record_enqueue_for_test(
&self,
name: &str,
payload: Value,
attempt: u32,
max_attempts: u32,
) -> String {
let id = uuid::Uuid::new_v4().to_string();
self.record_enqueue(id.clone(), name, payload, attempt, max_attempts);
id
}
#[cfg(test)]
fn record_start_for_test(&self, id: &str, attempt: u32) {
let _ = self.try_record_start(id, attempt);
}
#[cfg(test)]
fn record_success_for_test(&self, id: &str) {
self.record_success(id);
}
#[cfg(test)]
fn record_failure_for_test(&self, id: &str, error: &str) {
self.record_failure(id, error.to_owned());
}
}
impl Default for JobAdminMemoryBackend {
fn default() -> Self {
Self::new()
}
}
impl JobAdminBackend for JobAdminMemoryBackend {
fn snapshot(&self, query: JobAdminQuery) -> JobAdminFuture<'_, JobAdminSnapshot> {
Box::pin(async move { Ok(self.snapshot_sync(&query)) })
}
fn retry(&self, id: &str) -> JobAdminFuture<'_, ()> {
let id = id.to_owned();
Box::pin(async move {
self.ensure_retryable(&id)?;
let client = global_job_client().ok_or_else(|| {
AutumnError::service_unavailable_msg("job runtime is not initialized")
})?;
let (name, payload) = self.retry_payload(&id)?;
match client.enqueue(&name, payload).await {
Ok(()) => Ok(()),
Err(error) => {
self.restore_failed_retry(&id);
Err(error)
}
}
})
}
fn discard(&self, id: &str) -> JobAdminFuture<'_, ()> {
let id = id.to_owned();
Box::pin(async move { self.discard_failed(&id) })
}
fn cancel(&self, id: &str) -> JobAdminFuture<'_, ()> {
let id = id.to_owned();
Box::pin(async move { self.cancel_enqueued(&id) })
}
}
fn format_job_admin_time(time: chrono::DateTime<chrono::Utc>) -> String {
time.to_rfc3339_opts(chrono::SecondsFormat::Secs, true)
}
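// Evict the oldest finished records once the history limit is exceeded.
// Active records (enqueued/running/retrying) are rotated to the back instead
// of being dropped; `scanned` bounds the rotation so the loop terminates even
// when every remaining record is active.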
fn prune_job_admin_history(inner: &mut JobAdminMemoryInner) {
let mut scanned = 0;
while inner.order.len() > inner.history_limit && scanned < inner.order.len() {
let Some(id) = inner.order.pop_front() else {
break;
};
let is_active = inner.records.get(&id).is_some_and(|record| {
matches!(
record.status,
JobAdminStatus::Enqueued | JobAdminStatus::Running | JobAdminStatus::Retrying
)
});
if is_active {
inner.order.push_back(id);
scanned += 1;
} else {
inner.records.remove(&id);
}
}
}
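// Filter stored records by status (and an optional cutoff time), sort them
// newest-first, then slice out the requested page.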
fn paginate_job_admin_records(
inner: &JobAdminMemoryInner,
status: JobAdminStatus,
since: Option<chrono::DateTime<chrono::Utc>>,
page: u64,
per_page: u64,
) -> JobAdminPage {
let page = page.max(1);
let mut records: Vec<_> = inner
.records
.values()
.filter(|record| {
record.status == status
&& since.is_none_or(|cutoff| {
record
.finished_at
.or(record.started_at)
.or(record.enqueued_at)
.is_some_and(|time| time >= cutoff)
})
})
.cloned()
.collect();
records.sort_by_key(JobAdminStoredRecord::sort_time);
records.reverse();
let total = records.len() as u64;
let start =
usize::try_from(page.saturating_sub(1).saturating_mul(per_page)).unwrap_or(usize::MAX);
let take = usize::try_from(per_page).unwrap_or(usize::MAX);
let page_records = records
.into_iter()
.skip(start)
.take(take)
.map(|record| record.to_public())
.collect();
JobAdminPage::new(page_records, total, page, per_page)
}
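// Best-effort extraction of principal and correlation identifiers from the
// payload, probing a few conventional key names.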
fn job_payload_identity(payload: &Value) -> (Option<String>, Option<String>) {
let principal = first_payload_string(payload, &["principal_id", "principal", "user_id"]);
let correlation = first_payload_string(payload, &["correlation_id", "request_id"]);
(principal, correlation)
}
fn first_payload_string(payload: &Value, keys: &[&str]) -> Option<String> {
let object = payload.as_object()?;
for key in keys {
let Some(value) = object.get(*key) else {
continue;
};
if let Some(raw) = value.as_str() {
if !raw.is_empty() {
return Some(raw.to_owned());
}
} else if value.is_number() || value.is_boolean() {
return Some(value.to_string());
}
}
None
}
fn default_job_admin_backend_for_state(state: &AppState) -> JobAdminMemoryBackend {
let backend = JobAdminMemoryBackend::new();
if job_admin_backend(state).is_none() {
state.insert_extension(JobAdminBackendEntry(Arc::new(backend.clone())));
}
backend
}
#[cfg(feature = "redis")]
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RedisJobRecord {
id: String,
name: String,
payload: Value,
attempt: u32,
max_attempts: u32,
initial_backoff_ms: u64,
#[serde(default)]
enqueued_at_ms: Option<u64>,
#[serde(default)]
started_at_ms: Option<u64>,
#[serde(default)]
finished_at_ms: Option<u64>,
#[serde(default)]
claimed_by: Option<String>,
#[serde(default)]
claimed_at_ms: Option<u64>,
#[serde(default)]
last_error: Option<String>,
}
#[cfg(all(feature = "redis", test))]
#[derive(Debug, Clone)]
struct RedisClaimedRecord {
record: RedisJobRecord,
deadline_ms: u64,
}
#[cfg(feature = "redis")]
#[derive(Debug, Clone)]
struct RedisRetrySchedule {
record: RedisJobRecord,
due_at_ms: u64,
}
#[cfg(feature = "redis")]
#[derive(Debug, Clone)]
enum RedisFailureAction {
Retry(RedisRetrySchedule),
DeadLetter(RedisJobRecord),
}
#[cfg(feature = "redis")]
#[derive(Debug, Clone)]
enum RedisStaleRecovery {
Requeue(RedisJobRecord),
DeadLetter(RedisJobRecord),
}
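/// Minimal monotonic throttle: `take_due` returns `true` at most once per
/// `interval`, scheduling the next run relative to the supplied "now".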
#[cfg(feature = "redis")]
struct RedisMaintenanceThrottle {
next_run_at: std::time::Instant,
interval: std::time::Duration,
}
#[cfg(feature = "redis")]
impl RedisMaintenanceThrottle {
const fn new(now: std::time::Instant, interval: std::time::Duration) -> Self {
Self {
next_run_at: now,
interval,
}
}
fn take_due(&mut self, now: std::time::Instant) -> bool {
if now < self.next_run_at {
return false;
}
self.next_run_at = now + self.interval;
true
}
}
#[cfg(feature = "redis")]
const REDIS_STALE_MAINTENANCE_INTERVAL: std::time::Duration = std::time::Duration::from_secs(1);
#[cfg(feature = "redis")]
const REDIS_WORKER_IDLE_SLEEP_MAX: std::time::Duration = std::time::Duration::from_millis(200);
#[cfg(feature = "redis")]
fn redis_retry_promotion_interval_ms(default_backoff_ms: u64, jobs: &[JobInfo]) -> u64 {
let mut interval_ms = default_backoff_ms.max(1);
for job in jobs {
if job.initial_backoff_ms > 0 {
interval_ms = interval_ms.min(job.initial_backoff_ms);
}
}
interval_ms
}
#[cfg(feature = "redis")]
fn redis_worker_idle_sleep(retry_promotion_interval: std::time::Duration) -> std::time::Duration {
retry_promotion_interval.min(REDIS_WORKER_IDLE_SLEEP_MAX)
}
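/// Redis key layout and tuning knobs shared by every worker task.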
#[cfg(feature = "redis")]
#[derive(Clone)]
struct RedisWorkerConfig {
queue_key: String,
processing_key: String,
delayed_key: String,
dead_key: String,
completed_key: String,
record_prefix: String,
dead_record_prefix: String,
worker_id: String,
visibility_timeout_ms: u64,
default_attempts: u32,
default_backoff: u64,
retry_promotion_interval: std::time::Duration,
}
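// Process-global job client. An `RwLock<Option<_>>` (rather than a bare
// `OnceLock`) lets the runtime replace or clear the client, e.g. between
// test runs.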
static GLOBAL_JOB_CLIENT: OnceLock<RwLock<Option<Arc<JobClient>>>> = OnceLock::new();
#[cfg(test)]
pub(crate) fn global_job_runtime_test_lock() -> &'static tokio::sync::Mutex<()> {
static LOCK: OnceLock<tokio::sync::Mutex<()>> = OnceLock::new();
LOCK.get_or_init(|| tokio::sync::Mutex::new(()))
}
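// Run one handler invocation with panic isolation: panics are caught both
// while constructing the future (the synchronous part of the handler) and
// while polling it, so a misbehaving job cannot take down its worker.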
async fn run_job_handler(
handler: JobHandler,
state: AppState,
payload: Value,
) -> JobExecutionOutcome {
let future = match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
(handler)(state, payload)
})) {
Ok(future) => future,
Err(panic) => return JobExecutionOutcome::Panicked(format_job_panic(panic.as_ref())),
};
match std::panic::AssertUnwindSafe(future).catch_unwind().await {
Ok(Ok(())) => JobExecutionOutcome::Succeeded,
Ok(Err(error)) => JobExecutionOutcome::Failed(error.to_string()),
Err(panic) => JobExecutionOutcome::Panicked(format_job_panic(panic.as_ref())),
}
}
fn format_job_panic(panic: &(dyn std::any::Any + Send)) -> String {
let detail = panic
.downcast_ref::<String>()
.map(String::as_str)
.or_else(|| panic.downcast_ref::<&'static str>().copied())
.unwrap_or("non-string panic payload");
format!("job handler panicked: {detail}")
}
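/// Returns the process-global [`JobClient`], if the job runtime has been
/// initialized.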
#[must_use]
pub fn global_job_client() -> Option<Arc<JobClient>> {
GLOBAL_JOB_CLIENT
.get()
.and_then(|lock| lock.read().ok().and_then(|guard| guard.clone()))
}
pub(crate) fn init_global_job_client(client: JobClient) {
if let Some(lock) = GLOBAL_JOB_CLIENT.get() {
if let Ok(mut guard) = lock.write() {
*guard = Some(Arc::new(client));
}
return;
}
let _ = GLOBAL_JOB_CLIENT.set(RwLock::new(Some(Arc::new(client))));
}
pub(crate) fn clear_global_job_client() {
if let Some(lock) = GLOBAL_JOB_CLIENT.get() {
if let Ok(mut guard) = lock.write() {
*guard = None;
}
} else {
let _ = GLOBAL_JOB_CLIENT.set(RwLock::new(None));
}
}
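/// Enqueue a job by name through the process-global [`JobClient`].
///
/// A minimal usage sketch, assuming a job named `"send_welcome_email"` was
/// registered via `AppBuilder::jobs()` (the name and payload shape are
/// illustrative):
///
/// ```ignore
/// enqueue(
///     "send_welcome_email",
///     serde_json::json!({ "user_id": "42", "correlation_id": "req-1" }),
/// )
/// .await?;
/// ```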
pub async fn enqueue(name: &str, payload: Value) -> AutumnResult<()> {
let Some(client) = global_job_client() else {
return Err(AutumnError::internal_server_error(std::io::Error::other(
"job runtime is not initialized; register jobs with AppBuilder::jobs()",
)));
};
client.enqueue(name, payload).await
}
impl JobClient {
pub async fn enqueue(&self, name: &str, payload: Value) -> AutumnResult<()> {
let Some((job_max_attempts, job_backoff_ms)) = self.per_job_defaults.get(name).copied()
else {
return Err(AutumnError::internal_server_error(std::io::Error::other(
format!("job '{name}' is not registered; add it to AppBuilder::jobs()"),
)));
};
let job_max_attempts = if job_max_attempts != 0 {
job_max_attempts
} else {
self.default_max_attempts
};
let job_backoff_ms = if job_backoff_ms != 0 {
job_backoff_ms
} else {
self.default_initial_backoff_ms
};
let id = uuid::Uuid::new_v4().to_string();
self.registry.record_enqueue(name);
self.job_admin
.record_enqueue(id.clone(), name, payload.clone(), 1, job_max_attempts);
let result = if let Some(sender) = &self.local_sender {
sender
.send(QueuedJob {
id: id.clone(),
name: name.to_string(),
payload,
attempt: 1,
max_attempts: job_max_attempts,
initial_backoff_ms: job_backoff_ms,
})
.await
.map_err(|e| {
AutumnError::internal_server_error(std::io::Error::other(format!(
"failed to enqueue job: {e}"
)))
})
} else {
#[cfg(feature = "redis")]
{
if let Some(redis) = &self.redis {
redis
.enqueue(id.clone(), name, payload, job_max_attempts, job_backoff_ms)
.await
} else {
Err(AutumnError::internal_server_error(std::io::Error::other(
"job runtime backend is unavailable",
)))
}
}
#[cfg(not(feature = "redis"))]
{
let _ = payload;
Err(AutumnError::internal_server_error(std::io::Error::other(
"job runtime backend is unavailable",
)))
}
};
if result.is_err() {
self.registry.record_cancel(name);
self.job_admin.record_cancelled(&id);
}
result
}
}
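// Select and start the configured jobs backend. Unknown backend names warn
// and fall back to the local runtime; `redis` fails fast when the feature is
// compiled out.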
pub(crate) fn start_runtime(
jobs: Vec<JobInfo>,
state: &AppState,
shutdown: &tokio_util::sync::CancellationToken,
config: &crate::config::JobConfig,
) -> AutumnResult<()> {
validate_unique_job_names(&jobs).map_err(|error| {
AutumnError::internal_server_error(std::io::Error::other(format!(
"invalid jobs configuration: {error}"
)))
})?;
match config.backend.as_str() {
"local" => {
start_local_runtime(
jobs,
state,
shutdown,
config.workers,
config.max_attempts,
config.initial_backoff_ms,
);
Ok(())
}
"redis" => {
#[cfg(feature = "redis")]
{
start_redis_runtime(jobs, state, shutdown, config)
}
#[cfg(not(feature = "redis"))]
{
let _ = jobs;
let _ = state;
let _ = shutdown;
let _ = config;
Err(AutumnError::internal_server_error(std::io::Error::other(
"jobs.backend=redis requested but redis feature is disabled",
)))
}
}
other => {
tracing::warn!(backend = %other, "unknown jobs backend; falling back to local backend");
start_local_runtime(
jobs,
state,
shutdown,
config.workers,
config.max_attempts,
config.initial_backoff_ms,
);
Ok(())
}
}
}
pub(crate) fn start_local_runtime(
jobs: Vec<JobInfo>,
state: &AppState,
shutdown: &tokio_util::sync::CancellationToken,
workers: usize,
default_max_attempts: u32,
default_initial_backoff_ms: u64,
) {
let job_admin = default_job_admin_backend_for_state(state);
let per_job_defaults = build_per_job_defaults(&jobs);
let jobs_by_name: Arc<RwLock<HashMap<String, JobInfo>>> = Arc::new(RwLock::new(
jobs.into_iter().map(|j| (j.name.clone(), j)).collect(),
));
{
let guard = jobs_by_name.read().expect("job registry lock poisoned");
for name in guard.keys() {
state.job_registry.register(name);
}
}
let worker_count = workers.max(1);
let (tx, rx) = tokio::sync::mpsc::channel::<QueuedJob>(1024);
let shared_rx = Arc::new(tokio::sync::Mutex::new(rx));
let client = JobClient {
local_sender: Some(tx.clone()),
#[cfg(feature = "redis")]
redis: None,
registry: state.job_registry.clone(),
job_admin: job_admin.clone(),
default_max_attempts,
default_initial_backoff_ms,
per_job_defaults,
};
init_global_job_client(client);
for _ in 0..worker_count {
let state = state.clone();
let tx = tx.clone();
let job_admin = job_admin.clone();
let jobs_by_name = Arc::clone(&jobs_by_name);
let shared_rx = Arc::clone(&shared_rx);
let shutdown = shutdown.clone();
tokio::spawn(async move {
loop {
tokio::select! {
() = shutdown.cancelled() => break,
maybe = async {
let mut guard = shared_rx.lock().await;
guard.recv().await
} => {
let Some(job) = maybe else { break; };
execute_local_job(job, &jobs_by_name, &tx, &state, &job_admin).await;
}
}
}
});
}
}
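// Drive one locally queued job: honor cancellation before starting, resolve
// the handler and retry policy, run it, and then record success, schedule an
// exponentially backed-off retry, or record the final failure.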
async fn execute_local_job(
job: QueuedJob,
jobs_by_name: &Arc<RwLock<HashMap<String, JobInfo>>>,
tx: &tokio::sync::mpsc::Sender<QueuedJob>,
state: &AppState,
job_admin: &JobAdminMemoryBackend,
) {
if job_admin.try_record_start(&job.id, job.attempt) == JobAdminStartDecision::Canceled {
state.job_registry.record_cancel(&job.name);
job_admin.record_cancelled(&job.id);
return;
}
state.job_registry.record_start(&job.name);
let Some((handler, info_max_attempts, info_backoff_ms)) = jobs_by_name
.read()
.expect("job registry lock poisoned")
.get(&job.name)
.map(|info| (info.handler, info.max_attempts, info.initial_backoff_ms))
else {
state
.job_registry
.record_failure(&job.name, format!("unknown job '{}'", job.name), true);
job_admin.record_failure(&job.id, format!("unknown job '{}'", job.name));
return;
};
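// Resolve the retry policy: values carried on the queued job win, then the
// registered `JobInfo`, then the hard-coded defaults (5 attempts, 250ms).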
let max_attempts = if job.max_attempts != 0 {
job.max_attempts
} else if info_max_attempts != 0 {
info_max_attempts
} else {
5
};
let backoff_ms = if job.initial_backoff_ms != 0 {
job.initial_backoff_ms
} else if info_backoff_ms != 0 {
info_backoff_ms
} else {
250
};
match run_job_handler(handler, state.clone(), job.payload.clone()).await {
JobExecutionOutcome::Succeeded => {
state.job_registry.record_success(&job.name);
job_admin.record_success(&job.id);
}
JobExecutionOutcome::Failed(error) => {
if job.attempt < max_attempts {
state
.job_registry
.record_retry(&job.name, &error, job.attempt);
job_admin.record_retrying(&job.id, &error);
let sender = tx.clone();
let registry = state.job_registry.clone();
let job_admin = job_admin.clone();
let id = job.id.clone();
let name = job.name.clone();
let payload = job.payload;
let delay = backoff_ms.saturating_mul(2_u64.saturating_pow(job.attempt.saturating_sub(1)));
tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
registry.record_enqueue(&name);
job_admin.record_requeued(&id, job.attempt + 1);
let _ = sender
.send(QueuedJob {
id,
name,
payload,
attempt: job.attempt + 1,
max_attempts,
initial_backoff_ms: backoff_ms,
})
.await;
});
} else {
state
.job_registry
.record_failure(&job.name, error.clone(), true);
job_admin.record_failure(&job.id, error);
}
}
JobExecutionOutcome::Panicked(error) => {
tracing::error!(job = %job.name, error = %error, "local job handler panicked");
state
.job_registry
.record_failure(&job.name, error.clone(), true);
job_admin.record_failure(&job.id, error);
}
}
}
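/// Enqueue-only Redis handle used by [`JobClient`] for the durable backend:
/// it writes the job record and pushes the job id onto the queue in one
/// atomic pipeline.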
#[cfg(feature = "redis")]
#[derive(Clone)]
struct RedisClient {
connection: redis::aio::ConnectionManager,
queue_key: String,
record_prefix: String,
}
#[cfg(feature = "redis")]
fn now_unix_ms() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map_or(0, |duration| {
u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)
})
}
#[cfg(feature = "redis")]
fn redis_record_key(record_prefix: &str, id: &str) -> String {
format!("{record_prefix}{id}")
}
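// Exponential backoff: `initial_backoff_ms * 2^(attempt - 1)`, saturating.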
#[cfg(feature = "redis")]
const fn redis_retry_delay_ms(initial_backoff_ms: u64, attempt: u32) -> u64 {
initial_backoff_ms.saturating_mul(2_u64.saturating_pow(attempt.saturating_sub(1)))
}
#[cfg(feature = "redis")]
fn clear_redis_claim(record: &mut RedisJobRecord) {
record.claimed_by = None;
record.claimed_at_ms = None;
}
#[cfg(all(feature = "redis", test))]
fn claim_redis_record(
mut record: RedisJobRecord,
worker_id: &str,
now_ms: u64,
visibility_timeout_ms: u64,
) -> RedisClaimedRecord {
record.claimed_by = Some(worker_id.to_string());
record.claimed_at_ms = Some(now_ms);
RedisClaimedRecord {
record,
deadline_ms: now_ms.saturating_add(visibility_timeout_ms),
}
}
#[cfg(feature = "redis")]
fn prepare_redis_failure_action(
mut record: RedisJobRecord,
error: String,
now_ms: u64,
) -> RedisFailureAction {
clear_redis_claim(&mut record);
record.last_error = Some(error);
record.finished_at_ms = Some(now_ms);
if record.attempt < record.max_attempts {
let due_at_ms = now_ms.saturating_add(redis_retry_delay_ms(
record.initial_backoff_ms,
record.attempt,
));
record.attempt = record.attempt.saturating_add(1);
RedisFailureAction::Retry(RedisRetrySchedule { record, due_at_ms })
} else {
RedisFailureAction::DeadLetter(record)
}
}
#[cfg(feature = "redis")]
fn prepare_redis_panic_dead_letter(
mut record: RedisJobRecord,
error: String,
now_ms: u64,
) -> RedisJobRecord {
clear_redis_claim(&mut record);
record.last_error = Some(error);
record.finished_at_ms = Some(now_ms);
record
}
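// Decide what to do with a claimed record whose visibility timeout has
// lapsed: requeue with an incremented attempt while attempts remain,
// otherwise dead-letter it. Returns `None` while the claim is still fresh.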
#[cfg(feature = "redis")]
fn recover_stale_redis_record(
mut record: RedisJobRecord,
now_ms: u64,
visibility_timeout_ms: u64,
) -> Option<RedisStaleRecovery> {
let claimed_at_ms = record.claimed_at_ms?;
if claimed_at_ms.saturating_add(visibility_timeout_ms) > now_ms {
return None;
}
let claimed_by = record
.claimed_by
.clone()
.unwrap_or_else(|| "unknown worker".to_string());
record.last_error = Some(format!(
"visibility timeout expired for claim by {claimed_by} at {claimed_at_ms}"
));
record.finished_at_ms = Some(now_ms);
clear_redis_claim(&mut record);
if record.attempt < record.max_attempts {
record.attempt = record.attempt.saturating_add(1);
Some(RedisStaleRecovery::Requeue(record))
} else {
Some(RedisStaleRecovery::DeadLetter(record))
}
}
#[cfg(feature = "redis")]
fn encode_redis_record(record: &RedisJobRecord) -> AutumnResult<String> {
serde_json::to_string(record).map_err(|e| {
AutumnError::internal_server_error(std::io::Error::other(format!(
"serialize durable job failed: {e}"
)))
})
}
#[cfg(feature = "redis")]
impl RedisClient {
async fn enqueue(
&self,
id: String,
name: &str,
payload: Value,
default_max_attempts: u32,
default_initial_backoff_ms: u64,
) -> AutumnResult<()> {
let mut connection = self.connection.clone();
let msg = RedisJobRecord {
id: id.clone(),
name: name.to_string(),
payload,
attempt: 1,
max_attempts: default_max_attempts,
initial_backoff_ms: default_initial_backoff_ms,
enqueued_at_ms: Some(now_unix_ms()),
started_at_ms: None,
finished_at_ms: None,
claimed_by: None,
claimed_at_ms: None,
last_error: None,
};
let encoded = encode_redis_record(&msg)?;
let record_key = redis_record_key(&self.record_prefix, &id);
redis::pipe()
.atomic()
.cmd("SET")
.arg(record_key)
.arg(encoded)
.ignore()
.cmd("LPUSH")
.arg(&self.queue_key)
.arg(id)
.ignore()
.query_async::<()>(&mut connection)
.await
.map_err(|e| {
AutumnError::internal_server_error(std::io::Error::other(format!(
"enqueue durable job failed: {e}"
)))
})
}
}
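/// Redis-backed [`JobAdminBackend`]: snapshots read the queue, processing
/// set, and completed/dead lists directly, while retry/discard/cancel each
/// run as a single Lua script so every admin action is atomic.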
#[cfg(feature = "redis")]
#[derive(Clone)]
struct RedisJobAdminBackend {
connection: redis::aio::ConnectionManager,
queue_key: String,
processing_key: String,
dead_key: String,
completed_key: String,
record_prefix: String,
dead_record_prefix: String,
history_limit: usize,
}
#[cfg(feature = "redis")]
impl RedisJobAdminBackend {
#[allow(clippy::too_many_arguments)]
fn new(
connection: redis::aio::ConnectionManager,
queue_key: String,
processing_key: String,
dead_key: String,
completed_key: String,
record_prefix: String,
dead_record_prefix: String,
history_limit: usize,
) -> Self {
Self {
connection,
queue_key,
processing_key,
dead_key,
completed_key,
record_prefix,
dead_record_prefix,
history_limit: history_limit.max(1),
}
}
async fn snapshot_redis(&self, query: &JobAdminQuery) -> AutumnResult<JobAdminSnapshot> {
let mut connection = self.connection.clone();
let per_page = query.per_page.clamp(1, 100);
let now_ms = now_unix_ms();
let completed_since = now_ms.saturating_sub(86_400_000);
let failed_since = now_ms.saturating_sub(604_800_000);
let enqueued = redis_admin_active_list_page(
&mut connection,
&self.queue_key,
&self.record_prefix,
JobAdminStatus::Enqueued,
query.enqueued_page,
per_page,
)
.await?;
let running = redis_admin_running_page(
&mut connection,
&self.processing_key,
&self.record_prefix,
query.running_page,
per_page,
)
.await?;
let completed = redis_admin_encoded_list_page(
&mut connection,
&self.completed_key,
JobAdminStatus::Completed,
Some(completed_since),
query.completed_page,
per_page,
self.history_limit,
)
.await?;
let failed = redis_admin_encoded_list_page(
&mut connection,
&self.dead_key,
JobAdminStatus::Failed,
Some(failed_since),
query.failed_page,
per_page,
self.history_limit,
)
.await?;
Ok(JobAdminSnapshot {
enqueued,
running,
completed,
failed,
schedules: Vec::new(),
bounded_history_limit: self.history_limit,
})
}
async fn retry_failed_redis(&self, id: &str) -> AutumnResult<()> {
let mut connection = self.connection.clone();
let new_id = uuid::Uuid::new_v4().to_string();
let dead_record_key = format!("{}{id}", self.dead_record_prefix);
let result: i64 = redis::cmd("EVAL")
.arg(
r"
local failed = redis.call('GET', KEYS[1])
if not failed then
return 0
end
if redis.call('LREM', KEYS[2], 0, failed) == 0 then
return -1
end
local ok, record = pcall(cjson.decode, failed)
if not ok then
return -2
end
redis.call('DEL', KEYS[1])
record['id'] = ARGV[1]
record['attempt'] = 1
record['enqueued_at_ms'] = tonumber(ARGV[2])
record['started_at_ms'] = nil
record['finished_at_ms'] = nil
record['claimed_by'] = nil
record['claimed_at_ms'] = nil
record['last_error'] = nil
local active = cjson.encode(record)
redis.call('SET', KEYS[3] .. ARGV[1], active)
redis.call('LPUSH', KEYS[4], ARGV[1])
return 1
",
)
.arg(4)
.arg(dead_record_key)
.arg(&self.dead_key)
.arg(&self.record_prefix)
.arg(&self.queue_key)
.arg(new_id)
.arg(now_unix_ms())
.query_async(&mut connection)
.await
.map_err(|error| redis_admin_error("retry failed job", &error))?;
redis_admin_operation_result(result, id, "retry failed job")
}
async fn discard_failed_redis(&self, id: &str) -> AutumnResult<()> {
let mut connection = self.connection.clone();
let dead_record_key = format!("{}{id}", self.dead_record_prefix);
let result: i64 = redis::cmd("EVAL")
.arg(
r"
local failed = redis.call('GET', KEYS[1])
if not failed then
return 0
end
if redis.call('LREM', KEYS[2], 0, failed) == 0 then
return -1
end
redis.call('DEL', KEYS[1])
return 1
",
)
.arg(2)
.arg(dead_record_key)
.arg(&self.dead_key)
.query_async(&mut connection)
.await
.map_err(|error| redis_admin_error("discard failed job", &error))?;
redis_admin_operation_result(result, id, "discard failed job")
}
async fn cancel_enqueued_redis(&self, id: &str) -> AutumnResult<()> {
let mut connection = self.connection.clone();
let active_record_key = redis_record_key(&self.record_prefix, id);
let result: i64 = redis::cmd("EVAL")
.arg(
r"
if not redis.call('GET', KEYS[1]) then
return 0
end
if redis.call('LREM', KEYS[2], 0, ARGV[1]) == 0 then
return -1
end
redis.call('DEL', KEYS[1])
return 1
",
)
.arg(2)
.arg(active_record_key)
.arg(&self.queue_key)
.arg(id)
.query_async(&mut connection)
.await
.map_err(|error| redis_admin_error("cancel enqueued job", &error))?;
redis_admin_operation_result(result, id, "cancel enqueued job")
}
}
#[cfg(feature = "redis")]
impl JobAdminBackend for RedisJobAdminBackend {
fn snapshot(&self, query: JobAdminQuery) -> JobAdminFuture<'_, JobAdminSnapshot> {
Box::pin(async move { self.snapshot_redis(&query).await })
}
fn retry(&self, id: &str) -> JobAdminFuture<'_, ()> {
let id = id.to_owned();
Box::pin(async move { self.retry_failed_redis(&id).await })
}
fn discard(&self, id: &str) -> JobAdminFuture<'_, ()> {
let id = id.to_owned();
Box::pin(async move { self.discard_failed_redis(&id).await })
}
fn cancel(&self, id: &str) -> JobAdminFuture<'_, ()> {
let id = id.to_owned();
Box::pin(async move { self.cancel_enqueued_redis(&id).await })
}
}
#[cfg(feature = "redis")]
fn redis_admin_error(operation: &str, error: &redis::RedisError) -> AutumnError {
AutumnError::internal_server_error(std::io::Error::other(format!(
"redis job admin {operation} failed: {error}"
)))
}
#[cfg(feature = "redis")]
fn redis_admin_operation_result(result: i64, id: &str, operation: &str) -> AutumnResult<()> {
match result {
1 => Ok(()),
0 => Err(AutumnError::not_found_msg(format!("job '{id}' not found"))),
-1 => Err(AutumnError::bad_request_msg(format!(
"job '{id}' is not in the expected state for {operation}"
))),
-2 => Err(AutumnError::internal_server_error_msg(format!(
"job '{id}' has an invalid stored payload"
))),
_ => Err(AutumnError::internal_server_error_msg(format!(
"redis job admin {operation} returned unexpected code {result}"
))),
}
}
#[cfg(feature = "redis")]
fn redis_admin_time(ms: Option<u64>) -> Option<String> {
let ms = i64::try_from(ms?).ok()?;
chrono::DateTime::<chrono::Utc>::from_timestamp_millis(ms).map(format_job_admin_time)
}
#[cfg(feature = "redis")]
fn redis_record_sort_time(record: &RedisJobRecord) -> u64 {
record
.finished_at_ms
.or(record.started_at_ms)
.or(record.enqueued_at_ms)
.unwrap_or_default()
}
#[cfg(feature = "redis")]
fn redis_record_to_admin_record(record: &RedisJobRecord, status: JobAdminStatus) -> JobAdminRecord {
let (principal_id, correlation_id) = job_payload_identity(&record.payload);
JobAdminRecord {
id: record.id.clone(),
name: record.name.clone(),
status,
enqueued_at: redis_admin_time(record.enqueued_at_ms),
started_at: redis_admin_time(record.started_at_ms),
finished_at: redis_admin_time(record.finished_at_ms),
attempt: record.attempt,
max_attempts: record.max_attempts,
last_error: record.last_error.clone(),
principal_id,
correlation_id,
}
}
#[cfg(feature = "redis")]
async fn redis_records_for_ids(
connection: &mut redis::aio::ConnectionManager,
record_prefix: &str,
ids: &[String],
) -> Result<Vec<RedisJobRecord>, redis::RedisError> {
if ids.is_empty() {
return Ok(Vec::new());
}
let keys: Vec<String> = ids
.iter()
.map(|id| redis_record_key(record_prefix, id))
.collect();
let bodies: Vec<Option<String>> = redis::cmd("MGET").arg(keys).query_async(connection).await?;
Ok(bodies
.into_iter()
.flatten()
.filter_map(|body| serde_json::from_str::<RedisJobRecord>(&body).ok())
.collect())
}
#[cfg(feature = "redis")]
async fn redis_admin_active_list_page(
connection: &mut redis::aio::ConnectionManager,
queue_key: &str,
record_prefix: &str,
status: JobAdminStatus,
page: u64,
per_page: u64,
) -> AutumnResult<JobAdminPage> {
let page = page.max(1);
let start = page.saturating_sub(1).saturating_mul(per_page);
let stop = start.saturating_add(per_page).saturating_sub(1);
let (ids, total): (Vec<String>, u64) = redis::pipe()
.cmd("LRANGE")
.arg(queue_key)
.arg(start)
.arg(stop)
.cmd("LLEN")
.arg(queue_key)
.query_async(connection)
.await
.map_err(|error| redis_admin_error("read enqueued page", &error))?;
let records = redis_records_for_ids(connection, record_prefix, &ids)
.await
.map_err(|error| redis_admin_error("read enqueued records", &error))?
.into_iter()
.map(|record| redis_record_to_admin_record(&record, status))
.collect();
Ok(JobAdminPage::new(records, total, page, per_page))
}
#[cfg(feature = "redis")]
async fn redis_admin_running_page(
connection: &mut redis::aio::ConnectionManager,
processing_key: &str,
record_prefix: &str,
page: u64,
per_page: u64,
) -> AutumnResult<JobAdminPage> {
let page = page.max(1);
let start = page.saturating_sub(1).saturating_mul(per_page);
let stop = start.saturating_add(per_page).saturating_sub(1);
let (ids, total): (Vec<String>, u64) = redis::pipe()
.cmd("ZREVRANGE")
.arg(processing_key)
.arg(start)
.arg(stop)
.cmd("ZCARD")
.arg(processing_key)
.query_async(connection)
.await
.map_err(|error| redis_admin_error("read running page", &error))?;
let mut records: Vec<_> = redis_records_for_ids(connection, record_prefix, &ids)
.await
.map_err(|error| redis_admin_error("read running records", &error))?
.into_iter()
.map(|record| redis_record_to_admin_record(&record, JobAdminStatus::Running))
.collect();
records.sort_by(|a, b| b.started_at.cmp(&a.started_at));
Ok(JobAdminPage::new(records, total, page, per_page))
}
#[cfg(feature = "redis")]
async fn redis_admin_encoded_list_page(
connection: &mut redis::aio::ConnectionManager,
list_key: &str,
status: JobAdminStatus,
since_ms: Option<u64>,
page: u64,
per_page: u64,
history_limit: usize,
) -> AutumnResult<JobAdminPage> {
let page = page.max(1);
let stop = isize::try_from(history_limit.saturating_sub(1)).unwrap_or(isize::MAX);
let bodies: Vec<String> = redis::cmd("LRANGE")
.arg(list_key)
.arg(0)
.arg(stop)
.query_async(connection)
.await
.map_err(|error| redis_admin_error("read completed/failed list", &error))?;
let mut records: Vec<_> = bodies
.into_iter()
.filter_map(|body| serde_json::from_str::<RedisJobRecord>(&body).ok())
.filter(|record| since_ms.is_none_or(|since| redis_record_sort_time(record) >= since))
.collect();
records.sort_by_key(redis_record_sort_time);
records.reverse();
let total = records.len() as u64;
let start =
usize::try_from(page.saturating_sub(1).saturating_mul(per_page)).unwrap_or(usize::MAX);
let take = usize::try_from(per_page).unwrap_or(usize::MAX);
let page_records = records
.into_iter()
.skip(start)
.take(take)
.map(|record| redis_record_to_admin_record(&record, status))
.collect();
Ok(JobAdminPage::new(page_records, total, page, per_page))
}
#[cfg(feature = "redis")]
fn new_redis_connection_manager(
client: &redis::Client,
label: &str,
) -> Result<redis::aio::ConnectionManager, AutumnError> {
use redis::aio::ConnectionManagerConfig;
redis::aio::ConnectionManager::new_lazy_with_config(
client.clone(),
ConnectionManagerConfig::new(),
)
.map_err(|e| {
AutumnError::internal_server_error(std::io::Error::other(format!(
"failed to create {label}: {e}"
)))
})
}
#[cfg(feature = "redis")]
async fn push_json_list_item<T: ?Sized + Serialize + Sync>(
connection: &mut redis::aio::ConnectionManager,
key: &str,
value: &T,
) {
use redis::AsyncCommands as _;
if let Ok(encoded) = serde_json::to_string(value) {
let _ = connection.lpush::<_, _, ()>(key, encoded).await;
}
}
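// Atomically pop the next job id, stamp the worker's claim onto its record,
// and add it to the processing set inside one Lua script, so no other worker
// can observe a half-claimed job. Records that fail JSON decoding are
// dead-lettered instead of being retried forever.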
#[cfg(feature = "redis")]
async fn claim_next_redis_job(
connection: &mut redis::aio::ConnectionManager,
worker_config: &RedisWorkerConfig,
) -> Result<Option<RedisJobRecord>, redis::RedisError> {
const CLAIM_SCRIPT: &str = r"
local id = redis.call('RPOP', KEYS[1])
if not id then
return nil
end
local key = KEYS[3] .. id
local body = redis.call('GET', key)
if not body then
return nil
end
local ok, record = pcall(cjson.decode, body)
if not ok then
redis.call('ZADD', KEYS[2], ARGV[3], id)
return { id, body }
end
record['claimed_by'] = ARGV[1]
record['claimed_at_ms'] = tonumber(ARGV[2])
record['started_at_ms'] = tonumber(ARGV[2])
record['finished_at_ms'] = nil
local updated = cjson.encode(record)
redis.call('SET', key, updated)
redis.call('ZADD', KEYS[2], ARGV[3], id)
return { id, updated }
";
let now_ms = now_unix_ms();
let deadline_ms = now_ms.saturating_add(worker_config.visibility_timeout_ms);
let response: Option<(String, String)> = redis::cmd("EVAL")
.arg(CLAIM_SCRIPT)
.arg(3)
.arg(&worker_config.queue_key)
.arg(&worker_config.processing_key)
.arg(&worker_config.record_prefix)
.arg(&worker_config.worker_id)
.arg(now_ms)
.arg(deadline_ms)
.query_async(connection)
.await?;
let Some((id, body)) = response else {
return Ok(None);
};
match serde_json::from_str::<RedisJobRecord>(&body) {
Ok(record) => Ok(Some(record)),
Err(error) => {
tracing::warn!(job_id = %id, error = %error, "invalid durable job record");
let malformed_id = id.clone();
let malformed = serde_json::json!({
"id": id,
"error": error.to_string(),
"raw_payload": body,
});
push_json_list_item(connection, &worker_config.dead_key, &malformed).await;
let _ = redis::cmd("ZREM")
.arg(&worker_config.processing_key)
.arg(malformed_id)
.query_async::<usize>(connection)
.await;
Ok(None)
}
}
}
#[cfg(feature = "redis")]
async fn record_enqueues_for_redis_ids(
connection: &mut redis::aio::ConnectionManager,
worker_config: &RedisWorkerConfig,
state: &AppState,
job_admin: &JobAdminMemoryBackend,
ids: &[String],
) -> Result<(), redis::RedisError> {
if ids.is_empty() {
return Ok(());
}
let keys: Vec<String> = ids
.iter()
.map(|id| redis_record_key(&worker_config.record_prefix, id))
.collect();
let bodies: Vec<Option<String>> = redis::cmd("MGET")
.arg(&keys)
.query_async(connection)
.await?;
for body in bodies.into_iter().flatten() {
if let Ok(mut record) = serde_json::from_str::<RedisJobRecord>(&body) {
record.enqueued_at_ms = Some(now_unix_ms());
record.started_at_ms = None;
record.finished_at_ms = None;
clear_redis_claim(&mut record);
if let Ok(encoded) = encode_redis_record(&record) {
let key = redis_record_key(&worker_config.record_prefix, &record.id);
let _ = redis::cmd("SET")
.arg(key)
.arg(encoded)
.query_async::<()>(&mut *connection)
.await;
}
state.job_registry.record_enqueue(&record.name);
job_admin.record_requeued(&record.id, record.attempt);
}
}
Ok(())
}
#[cfg(feature = "redis")]
async fn promote_due_redis_retries(
connection: &mut redis::aio::ConnectionManager,
worker_config: &RedisWorkerConfig,
state: &AppState,
job_admin: &JobAdminMemoryBackend,
) -> Result<(), redis::RedisError> {
const PROMOTE_SCRIPT: &str = r"
local ids = redis.call('ZRANGEBYSCORE', KEYS[1], '-inf', ARGV[1], 'LIMIT', 0, ARGV[2])
local promoted = {}
for _, id in ipairs(ids) do
if redis.call('ZREM', KEYS[1], id) == 1 then
redis.call('LPUSH', KEYS[2], id)
table.insert(promoted, id)
end
end
return promoted
";
let promoted: Vec<String> = redis::cmd("EVAL")
.arg(PROMOTE_SCRIPT)
.arg(2)
.arg(&worker_config.delayed_key)
.arg(&worker_config.queue_key)
.arg(now_unix_ms())
.arg(64_usize)
.query_async(connection)
.await?;
record_enqueues_for_redis_ids(connection, worker_config, state, job_admin, &promoted).await?;
Ok(())
}
#[cfg(feature = "redis")]
fn expected_claim_args(record: &RedisJobRecord) -> Option<(&str, u64)> {
Some((record.claimed_by.as_deref()?, record.claimed_at_ms?))
}
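// Compare-and-swap transition for a claimed record: the script applies the
// success/retry/dead transition only if `claimed_by` and `claimed_at_ms`
// still match this worker's claim, so a worker whose claim was recovered as
// stale cannot clobber another worker's state.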
#[cfg(feature = "redis")]
const CLAIMED_REDIS_TRANSITION_SCRIPT: &str = r"
local function trim_dead_history(dead_key, dead_record_prefix, limit)
local trimmed_records = redis.call('LRANGE', dead_key, limit, -1)
for _, encoded in ipairs(trimmed_records) do
local trimmed_ok, trimmed = pcall(cjson.decode, encoded)
if trimmed_ok and trimmed['id'] then
redis.call('DEL', dead_record_prefix .. trimmed['id'])
end
end
redis.call('LTRIM', dead_key, 0, limit - 1)
end
local key = KEYS[2] .. ARGV[1]
local body = redis.call('GET', key)
if not body then
return 0
end
local ok, record = pcall(cjson.decode, body)
if not ok then
return 0
end
if record['claimed_by'] ~= ARGV[2] then
return 0
end
if record['claimed_at_ms'] ~= tonumber(ARGV[3]) then
return 0
end
redis.call('ZREM', KEYS[1], ARGV[1])
if ARGV[4] == 'success' then
redis.call('LPUSH', KEYS[5], ARGV[5])
redis.call('LTRIM', KEYS[5], 0, tonumber(ARGV[7]) - 1)
redis.call('DEL', key)
elseif ARGV[4] == 'retry' then
redis.call('SET', key, ARGV[5])
redis.call('ZADD', KEYS[3], ARGV[6], ARGV[1])
elseif ARGV[4] == 'dead' then
redis.call('LPUSH', KEYS[4], ARGV[5])
redis.call('SET', KEYS[6] .. ARGV[1], ARGV[5])
trim_dead_history(KEYS[4], KEYS[6], tonumber(ARGV[7]))
redis.call('DEL', key)
else
return 0
end
return 1
";
#[cfg(feature = "redis")]
async fn apply_claimed_redis_transition(
connection: &mut redis::aio::ConnectionManager,
worker_config: &RedisWorkerConfig,
expected: &RedisJobRecord,
mode: &str,
encoded_record: Option<String>,
due_at_ms: Option<u64>,
) -> Result<bool, redis::RedisError> {
let Some((claimed_by, claimed_at_ms)) = expected_claim_args(expected) else {
return Ok(false);
};
let applied: usize = redis::cmd("EVAL")
.arg(CLAIMED_REDIS_TRANSITION_SCRIPT)
.arg(6)
.arg(&worker_config.processing_key)
.arg(&worker_config.record_prefix)
.arg(&worker_config.delayed_key)
.arg(&worker_config.dead_key)
.arg(&worker_config.completed_key)
.arg(&worker_config.dead_record_prefix)
.arg(&expected.id)
.arg(claimed_by)
.arg(claimed_at_ms)
.arg(mode)
.arg(encoded_record.unwrap_or_default())
.arg(due_at_ms.unwrap_or_default())
.arg(DEFAULT_JOB_ADMIN_HISTORY_LIMIT)
.query_async(connection)
.await?;
Ok(applied == 1)
}
#[cfg(feature = "redis")]
async fn ack_redis_success(
connection: &mut redis::aio::ConnectionManager,
worker_config: &RedisWorkerConfig,
record: &RedisJobRecord,
) -> Result<bool, redis::RedisError> {
let mut completed = record.clone();
clear_redis_claim(&mut completed);
completed.finished_at_ms = Some(now_unix_ms());
completed.last_error = None;
let Ok(encoded) = encode_redis_record(&completed) else {
tracing::warn!(job_id = %record.id, "failed to serialize redis completed record");
return Ok(false);
};
apply_claimed_redis_transition(
connection,
worker_config,
record,
"success",
Some(encoded),
None,
)
.await
}
#[cfg(feature = "redis")]
async fn schedule_redis_retry(
connection: &mut redis::aio::ConnectionManager,
worker_config: &RedisWorkerConfig,
expected: &RedisJobRecord,
schedule: &RedisRetrySchedule,
) -> Result<bool, redis::RedisError> {
let Ok(encoded) = encode_redis_record(&schedule.record) else {
tracing::warn!(job_id = %schedule.record.id, "failed to serialize redis retry record");
return Ok(false);
};
apply_claimed_redis_transition(
connection,
worker_config,
expected,
"retry",
Some(encoded),
Some(schedule.due_at_ms),
)
.await
}
#[cfg(feature = "redis")]
async fn dead_letter_redis_job(
connection: &mut redis::aio::ConnectionManager,
worker_config: &RedisWorkerConfig,
expected: &RedisJobRecord,
record: &RedisJobRecord,
) -> Result<bool, redis::RedisError> {
let Ok(encoded) = encode_redis_record(record) else {
tracing::warn!(job_id = %record.id, "failed to serialize redis dead-letter record");
return Ok(false);
};
apply_claimed_redis_transition(
connection,
worker_config,
expected,
"dead",
Some(encoded),
None,
)
.await
}
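// Stale-claim twin of the transition script above: requeues or dead-letters
// a record only if the expired claim it read is still in place.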
#[cfg(feature = "redis")]
const STALE_REDIS_RECOVERY_SCRIPT: &str = r"
local function trim_dead_history(dead_key, dead_record_prefix, limit)
local trimmed_records = redis.call('LRANGE', dead_key, limit, -1)
for _, encoded in ipairs(trimmed_records) do
local trimmed_ok, trimmed = pcall(cjson.decode, encoded)
if trimmed_ok and trimmed['id'] then
redis.call('DEL', dead_record_prefix .. trimmed['id'])
end
end
redis.call('LTRIM', dead_key, 0, limit - 1)
end
local key = KEYS[2] .. ARGV[1]
local body = redis.call('GET', key)
if not body then
redis.call('ZREM', KEYS[1], ARGV[1])
return 0
end
local ok, record = pcall(cjson.decode, body)
if not ok then
redis.call('ZREM', KEYS[1], ARGV[1])
return 0
end
if record['claimed_by'] ~= ARGV[2] then
return 0
end
if record['claimed_at_ms'] ~= tonumber(ARGV[3]) then
return 0
end
redis.call('ZREM', KEYS[1], ARGV[1])
if ARGV[4] == 'requeue' then
redis.call('SET', key, ARGV[5])
redis.call('LPUSH', KEYS[3], ARGV[1])
elseif ARGV[4] == 'dead' then
redis.call('LPUSH', KEYS[4], ARGV[5])
redis.call('SET', KEYS[5] .. ARGV[1], ARGV[5])
trim_dead_history(KEYS[4], KEYS[5], tonumber(ARGV[6]))
redis.call('DEL', key)
else
return 0
end
return 1
";
#[cfg(feature = "redis")]
async fn apply_stale_redis_recovery(
connection: &mut redis::aio::ConnectionManager,
worker_config: &RedisWorkerConfig,
expected: &RedisJobRecord,
action: &RedisStaleRecovery,
) -> Result<bool, redis::RedisError> {
let Some((claimed_by, claimed_at_ms)) = expected_claim_args(expected) else {
return Ok(false);
};
let (mode, record) = match action {
RedisStaleRecovery::Requeue(record) => ("requeue", record),
RedisStaleRecovery::DeadLetter(record) => ("dead", record),
};
let Ok(encoded) = encode_redis_record(record) else {
tracing::warn!(job_id = %record.id, "failed to serialize stale redis record");
return Ok(false);
};
let applied: usize = redis::cmd("EVAL")
.arg(STALE_REDIS_RECOVERY_SCRIPT)
.arg(5)
.arg(&worker_config.processing_key)
.arg(&worker_config.record_prefix)
.arg(&worker_config.queue_key)
.arg(&worker_config.dead_key)
.arg(&worker_config.dead_record_prefix)
.arg(&expected.id)
.arg(claimed_by)
.arg(claimed_at_ms)
.arg(mode)
.arg(encoded)
.arg(DEFAULT_JOB_ADMIN_HISTORY_LIMIT)
.query_async(connection)
.await?;
Ok(applied == 1)
}
#[cfg(feature = "redis")]
async fn recover_stale_redis_jobs(
connection: &mut redis::aio::ConnectionManager,
worker_config: &RedisWorkerConfig,
state: &AppState,
job_admin: &JobAdminMemoryBackend,
) -> Result<(), redis::RedisError> {
let stale_ids: Vec<String> = redis::cmd("ZRANGEBYSCORE")
.arg(&worker_config.processing_key)
.arg("-inf")
.arg(now_unix_ms())
.arg("LIMIT")
.arg(0)
.arg(64)
.query_async(connection)
.await?;
if stale_ids.is_empty() {
return Ok(());
}
let keys: Vec<String> = stale_ids
.iter()
.map(|id| redis_record_key(&worker_config.record_prefix, id))
.collect();
let bodies: Vec<Option<String>> = redis::cmd("MGET")
.arg(&keys)
.query_async(connection)
.await?;
for (id, body) in stale_ids.into_iter().zip(bodies) {
let Some(body) = body else {
let _ = redis::cmd("ZREM")
.arg(&worker_config.processing_key)
.arg(&id)
.query_async::<usize>(connection)
.await?;
continue;
};
let Ok(record) = serde_json::from_str::<RedisJobRecord>(&body) else {
let _ = redis::cmd("ZREM")
.arg(&worker_config.processing_key)
.arg(&id)
.query_async::<usize>(connection)
.await?;
continue;
};
let Some(action) = recover_stale_redis_record(
record.clone(),
now_unix_ms(),
worker_config.visibility_timeout_ms,
) else {
continue;
};
if apply_stale_redis_recovery(connection, worker_config, &record, &action).await? {
match &action {
RedisStaleRecovery::Requeue(requeued) => {
if let Some(error) = requeued.last_error.as_deref() {
state
.job_registry
.record_retry(&requeued.name, error, record.attempt);
job_admin.record_retrying(&requeued.id, error);
}
state.job_registry.record_enqueue(&requeued.name);
job_admin.record_requeued(&requeued.id, requeued.attempt);
}
RedisStaleRecovery::DeadLetter(dead) => {
let error = dead
.last_error
.clone()
.unwrap_or_else(|| "visibility timeout expired".to_string());
state
.job_registry
.record_failure(&dead.name, error.clone(), true);
job_admin.record_failure(&dead.id, error);
}
}
}
}
Ok(())
}
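// Spawn one Redis worker task: each loop iteration runs throttled
// maintenance (retry promotion and stale-claim recovery), then attempts to
// claim and process a job, sleeping briefly when the queue is idle or the
// claim fails.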
#[cfg(feature = "redis")]
fn spawn_redis_worker(
client: &redis::Client,
jobs_by_name: Arc<RwLock<HashMap<String, JobInfo>>>,
state: AppState,
job_admin: JobAdminMemoryBackend,
shutdown: tokio_util::sync::CancellationToken,
worker_config: RedisWorkerConfig,
) -> Result<(), AutumnError> {
let mut connection =
new_redis_connection_manager(client, "jobs redis worker connection manager")?;
tokio::spawn(async move {
let mut retry_promotion_throttle = RedisMaintenanceThrottle::new(
std::time::Instant::now(),
worker_config.retry_promotion_interval,
);
let mut stale_recovery_throttle = RedisMaintenanceThrottle::new(
std::time::Instant::now(),
REDIS_STALE_MAINTENANCE_INTERVAL,
);
let idle_sleep = redis_worker_idle_sleep(worker_config.retry_promotion_interval);
loop {
if shutdown.is_cancelled() {
break;
}
if retry_promotion_throttle.take_due(std::time::Instant::now()) {
match promote_due_redis_retries(&mut connection, &worker_config, &state, &job_admin)
.await
{
Ok(()) => {}
Err(error) => {
tracing::warn!(error = %error, "redis job worker retry promotion failed");
}
}
}
if stale_recovery_throttle.take_due(std::time::Instant::now()) {
match recover_stale_redis_jobs(&mut connection, &worker_config, &state, &job_admin)
.await
{
Ok(()) => {}
Err(error) => {
tracing::warn!(error = %error, "redis job worker stale recovery failed");
}
}
}
let Some(record) = (match claim_next_redis_job(&mut connection, &worker_config).await {
Ok(record) => record,
Err(error) => {
tracing::warn!(error = %error, "redis job worker claim failed");
tokio::time::sleep(idle_sleep).await;
continue;
}
}) else {
tokio::time::sleep(idle_sleep).await;
continue;
};
process_redis_job_record(
&mut connection,
record,
&jobs_by_name,
&state,
&job_admin,
&worker_config,
)
.await;
}
});
Ok(())
}
#[cfg(feature = "redis")]
#[allow(clippy::cognitive_complexity)]
async fn settle_failed_redis_job(
connection: &mut redis::aio::ConnectionManager,
worker_config: &RedisWorkerConfig,
state: &AppState,
record: &RedisJobRecord,
error: String,
outcome: &str,
job_admin: &JobAdminMemoryBackend,
) {
let action = prepare_redis_failure_action(record.clone(), error.clone(), now_unix_ms());
match action {
RedisFailureAction::Retry(schedule) => {
match schedule_redis_retry(connection, worker_config, record, &schedule).await {
Ok(true) => {
state
.job_registry
.record_retry(&schedule.record.name, &error, record.attempt);
job_admin.record_retrying(&schedule.record.id, &error);
}
Ok(false) => tracing::warn!(
job = %record.name,
job_id = %record.id,
outcome = %outcome,
"redis job retry skipped because claim changed"
),
Err(error) => tracing::warn!(
job = %record.name,
job_id = %record.id,
outcome = %outcome,
error = %error,
"redis job retry scheduling failed"
),
}
}
RedisFailureAction::DeadLetter(dead) => {
match dead_letter_redis_job(connection, worker_config, record, &dead).await {
Ok(true) => {
state
.job_registry
.record_failure(&dead.name, error.clone(), true);
job_admin.record_failure(&dead.id, error);
}
Ok(false) => tracing::warn!(
job = %record.name,
job_id = %record.id,
outcome = %outcome,
"redis job dead-letter skipped because claim changed"
),
Err(error) => tracing::warn!(
job = %record.name,
job_id = %record.id,
outcome = %outcome,
error = %error,
"redis job dead-letter failed"
),
}
}
}
}
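/// Dead-letters a job whose handler panicked, regardless of remaining
/// attempts: a panic is treated as non-retriable.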
#[cfg(feature = "redis")]
async fn dead_letter_panicked_redis_job(
connection: &mut redis::aio::ConnectionManager,
worker_config: &RedisWorkerConfig,
state: &AppState,
record: &RedisJobRecord,
error: String,
job_admin: &JobAdminMemoryBackend,
) {
let dead = prepare_redis_panic_dead_letter(record.clone(), error.clone(), now_unix_ms());
match dead_letter_redis_job(connection, worker_config, record, &dead).await {
Ok(true) => {
state
.job_registry
.record_failure(&dead.name, error.clone(), true);
job_admin.record_failure(&dead.id, error);
}
Ok(false) => tracing::warn!(
job = %record.name,
job_id = %record.id,
"redis job panic dead-letter skipped because claim changed"
),
Err(error) => tracing::warn!(
job = %record.name,
job_id = %record.id,
error = %error,
"redis job panic dead-letter failed"
),
}
}
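/// Dead-letters a job that can never run (unknown name or invalid payload).
/// Metrics and admin state are recorded up front since no outcome can change
/// them; the Redis move itself is best-effort.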
#[cfg(feature = "redis")]
async fn dead_letter_invalid_redis_job(
connection: &mut redis::aio::ConnectionManager,
worker_config: &RedisWorkerConfig,
state: &AppState,
record: &RedisJobRecord,
error: &str,
job_admin: &JobAdminMemoryBackend,
) {
state
.job_registry
.record_failure(&record.name, error.to_owned(), true);
job_admin.record_failure(&record.id, error.to_owned());
let mut dead = record.clone();
clear_redis_claim(&mut dead);
dead.last_error = Some(error.to_owned());
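// The result is deliberately discarded: the failure was already recorded
// above, and a failed write leaves the record for stale-claim recovery to
// sweep later.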
let _ = dead_letter_redis_job(connection, worker_config, record, &dead).await;
}
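/// Runs one claimed job end to end: honors admin cancellation, looks up the
/// handler (dead-lettering unknown job types), resolves the retry policy,
/// executes the handler, then settles the outcome as a success ack, a
/// retry-or-dead-letter decision, or a panic dead-letter.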
#[cfg(feature = "redis")]
#[allow(clippy::cognitive_complexity)]
async fn process_redis_job_record(
connection: &mut redis::aio::ConnectionManager,
mut record: RedisJobRecord,
jobs_by_name: &Arc<RwLock<HashMap<String, JobInfo>>>,
state: &AppState,
job_admin: &JobAdminMemoryBackend,
worker_config: &RedisWorkerConfig,
) {
if job_admin.try_record_start(&record.id, record.attempt) == JobAdminStartDecision::Canceled {
state.job_registry.record_cancel(&record.name);
job_admin.record_cancelled(&record.id);
let _ = ack_redis_success(connection, worker_config, &record).await;
return;
}
state.job_registry.record_start(&record.name);
let maybe_info = {
let guard = jobs_by_name.read().expect("job registry lock poisoned");
guard
.get(&record.name)
.map(|info| (info.handler, info.max_attempts, info.initial_backoff_ms))
};
let Some((handler, info_max_attempts, info_backoff_ms)) = maybe_info else {
dead_letter_invalid_redis_job(
connection,
worker_config,
state,
&record,
"unknown job type",
job_admin,
)
.await;
return;
};
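// Retry policy precedence: values stamped on the record win, then the
// registered JobInfo, then the worker-level defaults; zero means "unset".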
let max_attempts = if record.max_attempts != 0 {
record.max_attempts
} else if info_max_attempts != 0 {
info_max_attempts
} else {
worker_config.default_attempts
};
let backoff_ms = if record.initial_backoff_ms != 0 {
record.initial_backoff_ms
} else if info_backoff_ms != 0 {
info_backoff_ms
} else {
worker_config.default_backoff
};
record.max_attempts = max_attempts;
record.initial_backoff_ms = backoff_ms;
if record.attempt == 0 {
dead_letter_invalid_redis_job(
connection,
worker_config,
state,
&record,
"invalid job payload: attempt must be >= 1",
job_admin,
)
.await;
return;
}
match run_job_handler(handler, state.clone(), record.payload.clone()).await {
JobExecutionOutcome::Succeeded => {
match ack_redis_success(connection, worker_config, &record).await {
Ok(true) => {
state.job_registry.record_success(&record.name);
job_admin.record_success(&record.id);
}
Ok(false) => tracing::warn!(
job = %record.name,
job_id = %record.id,
"redis job success ack skipped because claim changed"
),
Err(error) => tracing::warn!(
job = %record.name,
job_id = %record.id,
error = %error,
"redis job success ack failed"
),
}
}
JobExecutionOutcome::Failed(error) => {
settle_failed_redis_job(
connection,
worker_config,
state,
&record,
error,
"failed",
job_admin,
)
.await;
}
JobExecutionOutcome::Panicked(error) => {
tracing::error!(job = %record.name, error = %error, "redis job handler panicked");
dead_letter_panicked_redis_job(
connection,
worker_config,
state,
&record,
error,
job_admin,
)
.await;
}
}
}
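/// Wires up the Redis-backed jobs runtime: validates the configured URL,
/// derives the key layout from the key prefix, installs the admin backend and
/// the global JobClient, registers every job, and spawns the configured
/// number of worker tasks.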
#[cfg(feature = "redis")]
fn start_redis_runtime(
jobs: Vec<JobInfo>,
state: &AppState,
shutdown: &tokio_util::sync::CancellationToken,
config: &crate::config::JobConfig,
) -> Result<(), AutumnError> {
let job_admin = JobAdminMemoryBackend::new();
let url = config
.redis
.url
.clone()
.filter(|u| !u.trim().is_empty())
.ok_or_else(|| {
AutumnError::internal_server_error(std::io::Error::other(
"jobs.backend=redis requires jobs.redis.url",
))
})?;
let client = redis::Client::open(url).map_err(|e| {
AutumnError::internal_server_error(std::io::Error::other(format!(
"invalid jobs redis url: {e}"
)))
})?;
let producer_connection =
new_redis_connection_manager(&client, "jobs redis connection manager")?;
let admin_connection =
new_redis_connection_manager(&client, "jobs redis admin connection manager")?;
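// Key layout under the configured prefix: a ready-queue list, processing and
// delayed sorted sets, dead/completed history lists, plus per-id record blobs
// for live and dead jobs.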
let queue_key = format!("{}:queue", config.redis.key_prefix);
let processing_key = format!("{}:processing", config.redis.key_prefix);
let delayed_key = format!("{}:delayed", config.redis.key_prefix);
let dead_key = format!("{}:dead", config.redis.key_prefix);
let completed_key = format!("{}:completed", config.redis.key_prefix);
let record_prefix = format!("{}:record:", config.redis.key_prefix);
let dead_record_prefix = format!("{}:dead-record:", config.redis.key_prefix);
if job_admin_backend(state).is_none() {
state.insert_extension(JobAdminBackendEntry(Arc::new(RedisJobAdminBackend::new(
admin_connection,
queue_key.clone(),
processing_key.clone(),
dead_key.clone(),
completed_key.clone(),
record_prefix.clone(),
dead_record_prefix.clone(),
DEFAULT_JOB_ADMIN_HISTORY_LIMIT,
))));
}
let per_job_defaults = build_per_job_defaults(&jobs);
let retry_promotion_interval = std::time::Duration::from_millis(
redis_retry_promotion_interval_ms(config.initial_backoff_ms, &jobs),
);
let jobs_by_name: Arc<RwLock<HashMap<String, JobInfo>>> = Arc::new(RwLock::new(
jobs.into_iter().map(|j| (j.name.clone(), j)).collect(),
));
{
let guard = jobs_by_name.read().expect("job registry lock poisoned");
for name in guard.keys() {
state.job_registry.register(name);
}
}
init_global_job_client(JobClient {
local_sender: None,
redis: Some(RedisClient {
connection: producer_connection,
queue_key: queue_key.clone(),
record_prefix: record_prefix.clone(),
}),
registry: state.job_registry.clone(),
job_admin: job_admin.clone(),
default_max_attempts: config.max_attempts,
default_initial_backoff_ms: config.initial_backoff_ms,
per_job_defaults,
});
let worker_count = config.workers.max(1);
for _ in 0..worker_count {
spawn_redis_worker(
&client,
Arc::clone(&jobs_by_name),
state.clone(),
job_admin.clone(),
shutdown.clone(),
RedisWorkerConfig {
queue_key: queue_key.clone(),
processing_key: processing_key.clone(),
delayed_key: delayed_key.clone(),
dead_key: dead_key.clone(),
completed_key: completed_key.clone(),
record_prefix: record_prefix.clone(),
dead_record_prefix: dead_record_prefix.clone(),
worker_id: format!("{}:{}", std::process::id(), uuid::Uuid::new_v4()),
visibility_timeout_ms: config.redis.visibility_timeout_ms,
default_attempts: config.max_attempts,
default_backoff: config.initial_backoff_ms,
retry_promotion_interval,
},
)?;
}
Ok(())
}
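/// Captures each job's `(max_attempts, initial_backoff_ms)` pair so enqueues
/// can fall back to per-job policy when the caller does not override it.
///
/// ```ignore
/// // Illustrative only; the values depend on the registered jobs.
/// let defaults = build_per_job_defaults(&jobs);
/// assert_eq!(defaults.get("send_email"), Some(&(5, 250)));
/// ```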
fn build_per_job_defaults(jobs: &[JobInfo]) -> HashMap<String, (u32, u64)> {
jobs.iter()
.map(|job| (job.name.clone(), (job.max_attempts, job.initial_backoff_ms)))
.collect()
}
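/// Rejects duplicate job names before any runtime starts, naming the first
/// duplicate it finds.
///
/// ```ignore
/// // Illustrative: two jobs registered under the same name fail validation.
/// let err = validate_unique_job_names(&jobs).unwrap_err();
/// assert_eq!(err, "duplicate job name 'dupe'");
/// ```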
fn validate_unique_job_names(jobs: &[JobInfo]) -> Result<(), String> {
let mut names = std::collections::HashSet::new();
for job in jobs {
if !names.insert(job.name.clone()) {
return Err(format!("duplicate job name '{}'", job.name));
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[cfg(feature = "redis")]
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::mpsc;
use tokio::time::{Duration, timeout};
#[cfg(feature = "redis")]
static REDIS_HANDLER_CALLS: AtomicUsize = AtomicUsize::new(0);
fn always_fail_handler(
_state: AppState,
_payload: Value,
) -> Pin<Box<dyn Future<Output = AutumnResult<()>> + Send + 'static>> {
Box::pin(async move {
Err(AutumnError::internal_server_error(std::io::Error::other(
"forced failure",
)))
})
}
#[cfg(feature = "redis")]
fn redis_counting_success_handler(
_state: AppState,
_payload: Value,
) -> Pin<Box<dyn Future<Output = AutumnResult<()>> + Send + 'static>> {
Box::pin(async move {
REDIS_HANDLER_CALLS.fetch_add(1, Ordering::SeqCst);
Ok(())
})
}
#[cfg(feature = "redis")]
fn redis_counting_failure_handler(
_state: AppState,
_payload: Value,
) -> Pin<Box<dyn Future<Output = AutumnResult<()>> + Send + 'static>> {
Box::pin(async move {
REDIS_HANDLER_CALLS.fetch_add(1, Ordering::SeqCst);
Err(AutumnError::internal_server_error(std::io::Error::other(
"redis forced failure",
)))
})
}
fn panicking_handler(
_state: AppState,
_payload: Value,
) -> Pin<Box<dyn Future<Output = AutumnResult<()>> + Send + 'static>> {
Box::pin(async move {
panic!("forced panic");
})
}
fn instantly_panicking_handler(
_state: AppState,
_payload: Value,
) -> Pin<Box<dyn Future<Output = AutumnResult<()>> + Send + 'static>> {
panic!("panic before future")
}
#[tokio::test]
async fn job_admin_backend_lists_and_operates_failed_jobs() {
let backend = JobAdminMemoryBackend::new_for_test(32);
let enqueued_id = backend.record_enqueue_for_test(
"send_email",
serde_json::json!({
"user_id": 42,
"correlation_id": "req-123",
"subject": "Welcome"
}),
1,
5,
);
let running_id = backend.record_enqueue_for_test("reindex", serde_json::json!({}), 1, 3);
backend.record_start_for_test(&running_id, 1);
let completed_id = backend.record_enqueue_for_test("digest", serde_json::json!({}), 1, 3);
backend.record_start_for_test(&completed_id, 1);
backend.record_success_for_test(&completed_id);
let failed_id =
backend.record_enqueue_for_test("send_email", serde_json::json!({"user_id": 7}), 2, 5);
backend.record_start_for_test(&failed_id, 2);
backend.record_failure_for_test(&failed_id, "smtp refused recipient");
let snapshot = backend
.snapshot(JobAdminQuery {
enqueued_page: 1,
running_page: 1,
completed_page: 1,
failed_page: 1,
per_page: 10,
})
.await
.expect("snapshot should render");
assert_eq!(snapshot.enqueued.records[0].id, enqueued_id);
assert_eq!(
snapshot.enqueued.records[0].principal_id.as_deref(),
Some("42")
);
assert_eq!(
snapshot.enqueued.records[0].correlation_id.as_deref(),
Some("req-123")
);
assert_eq!(snapshot.running.records[0].id, running_id);
assert_eq!(snapshot.completed.records[0].id, completed_id);
assert_eq!(snapshot.failed.records[0].id, failed_id);
assert_eq!(
snapshot.failed.records[0].last_error.as_deref(),
Some("smtp refused recipient")
);
backend
.discard(&failed_id)
.await
.expect("failed job should be discardable");
backend
.cancel(&enqueued_id)
.await
.expect("enqueued job should be cancelable");
assert_eq!(
backend.try_record_start(&enqueued_id, 1),
JobAdminStartDecision::Canceled,
"canceled jobs must not race into running"
);
let snapshot = backend
.snapshot(JobAdminQuery::default())
.await
.expect("snapshot after operations");
assert!(snapshot.failed.records.is_empty());
assert!(snapshot.enqueued.records.is_empty());
}
#[tokio::test]
async fn job_admin_retry_reenqueues_failed_payload() {
let _guard = global_job_runtime_test_lock().lock().await;
clear_global_job_client();
let backend = JobAdminMemoryBackend::new_for_test(32);
let (tx, mut rx) = mpsc::channel(1);
init_global_job_client(JobClient {
local_sender: Some(tx),
#[cfg(feature = "redis")]
redis: None,
registry: crate::actuator::JobRegistry::new(),
job_admin: backend.clone(),
default_max_attempts: 5,
default_initial_backoff_ms: 250,
per_job_defaults: HashMap::from([("send_email".to_string(), (5, 250))]),
});
let failed_id = backend.record_enqueue_for_test(
"send_email",
serde_json::json!({
"user_id": 7,
"correlation_id": "req-retry"
}),
2,
5,
);
backend.record_start_for_test(&failed_id, 2);
backend.record_failure_for_test(&failed_id, "smtp refused recipient");
backend
.retry(&failed_id)
.await
.expect("failed job should be retried");
let queued = timeout(Duration::from_secs(1), rx.recv())
.await
.expect("retry should enqueue promptly")
.expect("retry should enqueue a job");
assert_eq!(queued.name, "send_email");
assert_eq!(queued.attempt, 1);
assert_eq!(queued.max_attempts, 5);
assert_eq!(queued.payload["user_id"], 7);
assert_eq!(queued.payload["correlation_id"], "req-retry");
let snapshot = backend
.snapshot(JobAdminQuery::default())
.await
.expect("snapshot after retry");
assert!(snapshot.failed.records.is_empty());
assert_eq!(snapshot.enqueued.total, 1);
clear_global_job_client();
}
#[tokio::test]
async fn job_admin_retry_restores_failed_record_when_enqueue_fails() {
let _guard = global_job_runtime_test_lock().lock().await;
clear_global_job_client();
let backend = JobAdminMemoryBackend::new_for_test(32);
let registry = crate::actuator::JobRegistry::new();
registry.register("send_email");
let (tx, rx) = mpsc::channel(1);
drop(rx);
init_global_job_client(JobClient {
local_sender: Some(tx),
#[cfg(feature = "redis")]
redis: None,
registry: registry.clone(),
job_admin: backend.clone(),
default_max_attempts: 5,
default_initial_backoff_ms: 250,
per_job_defaults: HashMap::from([("send_email".to_string(), (5, 250))]),
});
let failed_id =
backend.record_enqueue_for_test("send_email", serde_json::json!({"user_id": 7}), 2, 5);
backend.record_start_for_test(&failed_id, 2);
backend.record_failure_for_test(&failed_id, "smtp refused recipient");
let error = backend
.retry(&failed_id)
.await
.expect_err("closed worker channel should make retry enqueue fail");
assert!(
error.to_string().contains("failed to enqueue job"),
"unexpected retry error: {error}"
);
let snapshot = backend
.snapshot(JobAdminQuery::default())
.await
.expect("snapshot after failed retry enqueue");
assert_eq!(snapshot.failed.total, 1);
assert_eq!(snapshot.failed.records[0].id, failed_id);
assert_eq!(
snapshot.failed.records[0].last_error.as_deref(),
Some("smtp refused recipient")
);
assert_eq!(snapshot.enqueued.total, 0);
let status = registry.snapshot()["send_email"].clone();
assert_eq!(status.queued, 0);
clear_global_job_client();
}
#[tokio::test]
async fn job_admin_retry_claims_failed_record_before_enqueueing() {
let _guard = global_job_runtime_test_lock().lock().await;
clear_global_job_client();
let backend = JobAdminMemoryBackend::new_for_test(32);
let (tx, mut rx) = mpsc::channel(2);
init_global_job_client(JobClient {
local_sender: Some(tx),
#[cfg(feature = "redis")]
redis: None,
registry: crate::actuator::JobRegistry::new(),
job_admin: backend.clone(),
default_max_attempts: 5,
default_initial_backoff_ms: 250,
per_job_defaults: HashMap::from([("send_email".to_string(), (5, 250))]),
});
let failed_id =
backend.record_enqueue_for_test("send_email", serde_json::json!({"user_id": 7}), 2, 5);
backend.record_start_for_test(&failed_id, 2);
backend.record_failure_for_test(&failed_id, "smtp refused recipient");
let (first, second) = tokio::join!(backend.retry(&failed_id), backend.retry(&failed_id));
assert!(
first.is_ok() ^ second.is_ok(),
"exactly one concurrent retry should claim the failed job"
);
let queued = timeout(Duration::from_secs(1), rx.recv())
.await
.expect("one retry should enqueue promptly")
.expect("one retry should enqueue a job");
assert_eq!(queued.name, "send_email");
assert!(timeout(Duration::from_millis(25), rx.recv()).await.is_err());
clear_global_job_client();
}
#[tokio::test]
async fn job_admin_retry_payload_claim_is_single_use() {
let backend = JobAdminMemoryBackend::new_for_test(32);
let failed_id =
backend.record_enqueue_for_test("send_email", serde_json::json!({"user_id": 7}), 2, 5);
backend.record_start_for_test(&failed_id, 2);
backend.record_failure_for_test(&failed_id, "smtp refused recipient");
let first = backend
.retry_payload(&failed_id)
.expect("first retry claim should return the payload");
assert_eq!(first.0, "send_email");
let second = backend
.retry_payload(&failed_id)
.expect_err("second retry claim must be rejected before enqueue");
assert!(
second
.to_string()
.contains("only failed jobs can be retried"),
"unexpected second retry error: {second}"
);
}
#[tokio::test]
async fn run_job_handler_reports_immediate_panics() {
let state = AppState::for_test().with_profile("dev");
let outcome =
run_job_handler(instantly_panicking_handler, state, serde_json::json!({})).await;
assert_eq!(
outcome,
JobExecutionOutcome::Panicked("job handler panicked: panic before future".to_string())
);
}
#[tokio::test]
async fn local_enqueue_p99_is_under_5ms() {
let _guard = global_job_runtime_test_lock().lock().await;
clear_global_job_client();
let state = AppState::for_test().with_profile("dev");
let shutdown = tokio_util::sync::CancellationToken::new();
start_local_runtime(
vec![JobInfo {
name: "noop".to_string(),
max_attempts: 3,
initial_backoff_ms: 10,
handler: |_state, _payload| Box::pin(async move { Ok(()) }),
}],
&state,
&shutdown,
1,
5,
250,
);
let mut samples = Vec::new();
for _ in 0..300 {
let started = std::time::Instant::now();
enqueue("noop", serde_json::json!({})).await.unwrap();
samples.push(started.elapsed());
}
samples.sort_unstable();
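// Nearest-rank style pick: with the samples sorted ascending, index
// (n * 99) / 100 approximates the 99th-percentile latency.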
let p99 = samples[(samples.len() * 99) / 100];
assert!(
p99 < std::time::Duration::from_millis(5),
"expected p99 enqueue latency < 5ms, got {p99:?}",
);
shutdown.cancel();
clear_global_job_client();
}
#[tokio::test]
async fn local_panicking_handler_records_terminal_failure_without_requeue() {
let state = AppState::for_test().with_profile("dev");
state.job_registry().register("panic");
state.job_registry().record_enqueue("panic");
let mut jobs = HashMap::new();
jobs.insert(
"panic".to_string(),
JobInfo {
name: "panic".to_string(),
max_attempts: 3,
initial_backoff_ms: 1,
handler: panicking_handler,
},
);
let jobs_by_name = Arc::new(RwLock::new(jobs));
let (tx, mut rx) = mpsc::channel(1);
let job_admin = JobAdminMemoryBackend::new_for_test(32);
let job_id = job_admin.record_enqueue_for_test("panic", serde_json::json!({}), 1, 3);
execute_local_job(
QueuedJob {
id: job_id,
name: "panic".to_string(),
payload: serde_json::json!({}),
attempt: 1,
max_attempts: 3,
initial_backoff_ms: 1,
},
&jobs_by_name,
&tx,
&state,
&job_admin,
)
.await;
assert!(timeout(Duration::from_millis(25), rx.recv()).await.is_err());
let snapshot = state.job_registry().snapshot();
let status = snapshot.get("panic").expect("job should be registered");
assert_eq!(status.queued, 0);
assert_eq!(status.in_flight, 0);
assert_eq!(status.total_failures, 1);
assert_eq!(status.dead_letters, 1);
assert_eq!(
status.last_error.as_deref(),
Some("job handler panicked: forced panic")
);
}
#[tokio::test]
async fn local_retry_records_enqueue_before_requeue() {
let state = AppState::for_test().with_profile("dev");
state.job_registry().register("flaky");
state.job_registry().record_enqueue("flaky");
let mut jobs = HashMap::new();
jobs.insert(
"flaky".to_string(),
JobInfo {
name: "flaky".to_string(),
max_attempts: 2,
initial_backoff_ms: 1,
handler: always_fail_handler,
},
);
let jobs_by_name = Arc::new(RwLock::new(jobs));
let (tx, mut rx) = mpsc::channel(1);
let job_admin = JobAdminMemoryBackend::new_for_test(32);
let job_id = job_admin.record_enqueue_for_test("flaky", serde_json::json!({}), 1, 2);
execute_local_job(
QueuedJob {
id: job_id.clone(),
name: "flaky".to_string(),
payload: serde_json::json!({}),
attempt: 1,
max_attempts: 2,
initial_backoff_ms: 1,
},
&jobs_by_name,
&tx,
&state,
&job_admin,
)
.await;
let retried = timeout(Duration::from_millis(100), rx.recv())
.await
.expect("retry should be scheduled")
.expect("retry payload should be sent");
assert_eq!(retried.id, job_id);
assert_eq!(retried.name, "flaky");
assert_eq!(retried.attempt, 2);
let snapshot = state.job_registry().snapshot();
let status = snapshot.get("flaky").expect("job should be registered");
assert_eq!(status.queued, 1);
assert_eq!(status.in_flight, 0);
assert_eq!(status.total_failures, 0);
assert!(status.last_error.is_some());
}
#[tokio::test]
async fn local_terminal_failure_does_not_requeue() {
let state = AppState::for_test().with_profile("dev");
state.job_registry().register("flaky");
state.job_registry().record_enqueue("flaky");
let mut jobs = HashMap::new();
jobs.insert(
"flaky".to_string(),
JobInfo {
name: "flaky".to_string(),
max_attempts: 1,
initial_backoff_ms: 1,
handler: always_fail_handler,
},
);
let jobs_by_name = Arc::new(RwLock::new(jobs));
let (tx, mut rx) = mpsc::channel(1);
let job_admin = JobAdminMemoryBackend::new_for_test(32);
let job_id = job_admin.record_enqueue_for_test("flaky", serde_json::json!({}), 1, 1);
execute_local_job(
QueuedJob {
id: job_id,
name: "flaky".to_string(),
payload: serde_json::json!({}),
attempt: 1,
max_attempts: 1,
initial_backoff_ms: 1,
},
&jobs_by_name,
&tx,
&state,
&job_admin,
)
.await;
assert!(timeout(Duration::from_millis(25), rx.recv()).await.is_err());
let snapshot = state.job_registry().snapshot();
let status = snapshot.get("flaky").expect("job should be registered");
assert_eq!(status.queued, 0);
assert_eq!(status.in_flight, 0);
assert_eq!(status.total_failures, 1);
assert_eq!(status.dead_letters, 1);
assert!(status.last_error.is_some());
}
#[tokio::test]
async fn job_admin_pending_automatic_retry_is_not_operator_retryable() {
let state = AppState::for_test().with_profile("dev");
state.job_registry().register("flaky");
state.job_registry().record_enqueue("flaky");
let mut jobs = HashMap::new();
jobs.insert(
"flaky".to_string(),
JobInfo {
name: "flaky".to_string(),
max_attempts: 2,
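// A long backoff keeps the automatic retry asleep for the whole test, so
// the job must not surface as operator-retryable failed work meanwhile.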
initial_backoff_ms: 60_000,
handler: always_fail_handler,
},
);
let jobs_by_name = Arc::new(RwLock::new(jobs));
let (tx, mut rx) = mpsc::channel(1);
let job_admin = JobAdminMemoryBackend::new_for_test(32);
let job_id = job_admin.record_enqueue_for_test("flaky", serde_json::json!({}), 1, 2);
execute_local_job(
QueuedJob {
id: job_id.clone(),
name: "flaky".to_string(),
payload: serde_json::json!({}),
attempt: 1,
max_attempts: 2,
initial_backoff_ms: 60_000,
},
&jobs_by_name,
&tx,
&state,
&job_admin,
)
.await;
let snapshot = job_admin
.snapshot(JobAdminQuery::default())
.await
.expect("job admin snapshot");
assert!(
snapshot.failed.records.is_empty(),
"automatic retries must stay out of terminal failed work"
);
let retry_error = job_admin
.retry(&job_id)
.await
.expect_err("operator retry must reject sleeping automatic retries");
assert!(
retry_error
.to_string()
.contains("only failed jobs can be retried"),
"unexpected retry error: {retry_error}"
);
assert!(timeout(Duration::from_millis(25), rx.recv()).await.is_err());
}
#[cfg(feature = "redis")]
fn redis_test_record(attempt: u32, max_attempts: u32) -> RedisJobRecord {
RedisJobRecord {
id: "job-1".to_string(),
name: "send_email".to_string(),
payload: serde_json::json!({ "user_id": 42 }),
attempt,
max_attempts,
initial_backoff_ms: 250,
enqueued_at_ms: Some(1_000),
started_at_ms: None,
finished_at_ms: None,
claimed_by: None,
claimed_at_ms: None,
last_error: None,
}
}
#[cfg(feature = "redis")]
#[test]
fn redis_claim_metadata_records_worker_and_deadline() {
let claimed = claim_redis_record(redis_test_record(1, 3), "worker-a", 10_000, 30_000);
assert_eq!(claimed.deadline_ms, 40_000);
assert_eq!(claimed.record.claimed_by.as_deref(), Some("worker-a"));
assert_eq!(claimed.record.claimed_at_ms, Some(10_000));
assert_eq!(claimed.record.attempt, 1);
}
#[cfg(feature = "redis")]
#[test]
fn redis_maintenance_throttle_runs_immediately_then_waits_for_interval() {
let start = std::time::Instant::now();
let mut throttle = RedisMaintenanceThrottle::new(start, Duration::from_secs(1));
assert!(throttle.take_due(start));
assert!(!throttle.take_due(start + Duration::from_millis(999)));
assert!(throttle.take_due(start + Duration::from_secs(1)));
}
#[cfg(feature = "redis")]
#[test]
fn redis_retry_promotion_interval_uses_smallest_configured_backoff() {
let jobs = vec![
JobInfo {
name: "slow".to_string(),
max_attempts: 3,
initial_backoff_ms: 250,
handler: redis_counting_success_handler,
},
JobInfo {
name: "fast".to_string(),
max_attempts: 3,
initial_backoff_ms: 25,
handler: redis_counting_success_handler,
},
];
assert_eq!(redis_retry_promotion_interval_ms(250, &jobs), 25);
assert_eq!(redis_retry_promotion_interval_ms(0, &[]), 1);
}
#[cfg(feature = "redis")]
#[test]
fn redis_worker_idle_sleep_is_bounded_by_retry_promotion_interval() {
assert_eq!(
redis_worker_idle_sleep(Duration::from_millis(25)),
Duration::from_millis(25)
);
assert_eq!(
redis_worker_idle_sleep(Duration::from_millis(250)),
Duration::from_millis(200)
);
}
#[cfg(feature = "redis")]
#[test]
fn redis_failed_job_schedules_next_attempt_with_exponential_backoff() {
let mut record = redis_test_record(2, 4);
record.claimed_by = Some("worker-a".to_string());
record.claimed_at_ms = Some(20_000);
let action = prepare_redis_failure_action(record, "stripe timed out".to_string(), 50_000);
let RedisFailureAction::Retry(schedule) = action else {
panic!("second attempt below max should be scheduled for retry");
};
assert_eq!(schedule.due_at_ms, 50_500);
assert_eq!(schedule.record.attempt, 3);
assert_eq!(schedule.record.claimed_by, None);
assert_eq!(schedule.record.claimed_at_ms, None);
assert_eq!(
schedule.record.last_error.as_deref(),
Some("stripe timed out")
);
}
#[cfg(feature = "redis")]
#[test]
fn redis_failed_job_dead_letters_after_max_attempts() {
let mut record = redis_test_record(3, 3);
record.claimed_by = Some("worker-a".to_string());
record.claimed_at_ms = Some(20_000);
let action = prepare_redis_failure_action(record, "permanent failure".to_string(), 50_000);
let RedisFailureAction::DeadLetter(record) = action else {
panic!("max attempt failure should dead-letter");
};
assert_eq!(record.attempt, 3);
assert_eq!(record.claimed_by, None);
assert_eq!(record.claimed_at_ms, None);
assert_eq!(record.last_error.as_deref(), Some("permanent failure"));
}
#[cfg(feature = "redis")]
#[test]
fn redis_panicking_job_dead_letters_without_retry_even_when_attempts_remain() {
let mut record = redis_test_record(1, 3);
record.claimed_by = Some("worker-a".to_string());
record.claimed_at_ms = Some(20_000);
let dead =
prepare_redis_panic_dead_letter(record, "job handler panicked".to_string(), 50_000);
assert_eq!(dead.attempt, 1);
assert_eq!(dead.max_attempts, 3);
assert_eq!(dead.claimed_by, None);
assert_eq!(dead.claimed_at_ms, None);
assert_eq!(dead.finished_at_ms, Some(50_000));
assert_eq!(dead.last_error.as_deref(), Some("job handler panicked"));
}
#[cfg(feature = "redis")]
#[test]
fn redis_stale_claim_recovery_requeues_next_attempt() {
let mut record = redis_test_record(1, 3);
record.claimed_by = Some("worker-a".to_string());
record.claimed_at_ms = Some(10_000);
let action = recover_stale_redis_record(record, 45_000, 30_000)
.expect("expired claim should be recovered");
let RedisStaleRecovery::Requeue(record) = action else {
panic!("stale nonterminal claim should requeue");
};
assert_eq!(record.attempt, 2);
assert_eq!(record.claimed_by, None);
assert_eq!(record.claimed_at_ms, None);
assert!(
record
.last_error
.as_deref()
.is_some_and(|error| error.contains("visibility timeout expired")),
"stale recovery should record a useful last_error"
);
}
#[cfg(feature = "redis")]
#[test]
fn redis_stale_claim_recovery_dead_letters_exhausted_job() {
let mut record = redis_test_record(1, 1);
record.claimed_by = Some("worker-a".to_string());
record.claimed_at_ms = Some(10_000);
let action = recover_stale_redis_record(record, 45_000, 30_000)
.expect("expired claim should be recovered");
let RedisStaleRecovery::DeadLetter(record) = action else {
panic!("stale terminal claim should dead-letter");
};
assert_eq!(record.attempt, 1);
assert_eq!(record.claimed_by, None);
assert_eq!(record.claimed_at_ms, None);
assert!(
record
.last_error
.as_deref()
.is_some_and(|error| error.contains("visibility timeout expired")),
"dead-lettered stale claims should retain the recovery reason"
);
}
#[cfg(feature = "redis")]
#[test]
fn redis_dead_letter_scripts_delete_trimmed_dead_record_metadata() {
assert!(
CLAIMED_REDIS_TRANSITION_SCRIPT
.contains("trim_dead_history(KEYS[4], KEYS[6], tonumber(ARGV[7]))"),
"claimed-job dead-letter trim should delete metadata for records beyond the history limit"
);
assert!(
STALE_REDIS_RECOVERY_SCRIPT
.contains("trim_dead_history(KEYS[4], KEYS[5], tonumber(ARGV[6]))"),
"stale-recovery dead-letter trim should delete metadata for records beyond the history limit"
);
assert!(
CLAIMED_REDIS_TRANSITION_SCRIPT
.matches("redis.call('DEL', dead_record_prefix .. trimmed['id'])")
.count()
>= 1,
"claimed-job dead-letter script should remove trimmed per-id metadata"
);
assert!(
STALE_REDIS_RECOVERY_SCRIPT
.matches("redis.call('DEL', dead_record_prefix .. trimmed['id'])")
.count()
>= 1,
"stale-recovery dead-letter script should remove trimmed per-id metadata"
);
}
#[cfg(feature = "redis")]
fn redis_test_worker_config(
prefix: &str,
worker_id: &str,
visibility_timeout_ms: u64,
) -> RedisWorkerConfig {
RedisWorkerConfig {
queue_key: format!("{prefix}:queue"),
processing_key: format!("{prefix}:processing"),
delayed_key: format!("{prefix}:delayed"),
dead_key: format!("{prefix}:dead"),
completed_key: format!("{prefix}:completed"),
record_prefix: format!("{prefix}:record:"),
dead_record_prefix: format!("{prefix}:dead-record:"),
worker_id: worker_id.to_string(),
visibility_timeout_ms,
default_attempts: 3,
default_backoff: 1,
retry_promotion_interval: Duration::from_millis(1),
}
}
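/// Starts a throwaway Redis container via testcontainers (requires Docker)
/// and returns it alongside a client pointed at the mapped port.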
#[cfg(feature = "redis")]
async fn redis_test_client() -> (
testcontainers::ContainerAsync<testcontainers_modules::redis::Redis>,
redis::Client,
) {
use testcontainers::runners::AsyncRunner as _;
use testcontainers_modules::redis::Redis as RedisImage;
let container = RedisImage::default().start().await.unwrap();
let port = container.get_host_port_ipv4(6379).await.unwrap();
let url = format!("redis://127.0.0.1:{port}");
(container, redis::Client::open(url).unwrap())
}
#[cfg(feature = "redis")]
fn redis_jobs_by_name(
handler: JobHandler,
max_attempts: u32,
) -> Arc<RwLock<HashMap<String, JobInfo>>> {
Arc::new(RwLock::new(HashMap::from([(
"send_email".to_string(),
JobInfo {
name: "send_email".to_string(),
max_attempts,
initial_backoff_ms: 1,
handler,
},
)])))
}
#[cfg(feature = "redis")]
async fn redis_enqueue_test_job(
client: &redis::Client,
worker_config: &RedisWorkerConfig,
max_attempts: u32,
) {
let connection = new_redis_connection_manager(client, "test redis producer").unwrap();
let producer = RedisClient {
connection,
queue_key: worker_config.queue_key.clone(),
record_prefix: worker_config.record_prefix.clone(),
};
producer
.enqueue(
uuid::Uuid::new_v4().to_string(),
"send_email",
serde_json::json!({ "user_id": 42 }),
max_attempts,
1,
)
.await
.unwrap();
}
#[cfg(feature = "redis")]
struct RedisAdminSeedRecords {
enqueued: RedisJobRecord,
running: RedisJobRecord,
completed: RedisJobRecord,
failed_retry: RedisJobRecord,
failed_discard: RedisJobRecord,
}
#[cfg(feature = "redis")]
fn redis_admin_test_backend(
client: &redis::Client,
worker_config: &RedisWorkerConfig,
) -> RedisJobAdminBackend {
let admin_connection = new_redis_connection_manager(client, "test redis admin").unwrap();
RedisJobAdminBackend::new(
admin_connection,
worker_config.queue_key.clone(),
worker_config.processing_key.clone(),
worker_config.dead_key.clone(),
worker_config.completed_key.clone(),
worker_config.record_prefix.clone(),
worker_config.dead_record_prefix.clone(),
128,
)
}
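/// Builds one seed record per dashboard state: enqueued, running, completed,
/// and two failed variants (one to retry, one to discard), with timestamps
/// offset from `now`.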
#[cfg(feature = "redis")]
fn redis_admin_seed_records(now: u64) -> RedisAdminSeedRecords {
RedisAdminSeedRecords {
enqueued: RedisJobRecord {
id: "job-enqueued".to_string(),
name: "send_email".to_string(),
payload: serde_json::json!({"user_id": 42, "correlation_id": "req-redis"}),
attempt: 1,
max_attempts: 5,
initial_backoff_ms: 250,
enqueued_at_ms: Some(now.saturating_sub(4_000)),
started_at_ms: None,
finished_at_ms: None,
claimed_by: None,
claimed_at_ms: None,
last_error: None,
},
running: RedisJobRecord {
id: "job-running".to_string(),
name: "reindex".to_string(),
payload: serde_json::json!({}),
attempt: 1,
max_attempts: 3,
initial_backoff_ms: 250,
enqueued_at_ms: Some(now.saturating_sub(3_000)),
started_at_ms: Some(now.saturating_sub(2_000)),
finished_at_ms: None,
claimed_by: Some("worker-a".to_string()),
claimed_at_ms: Some(now.saturating_sub(2_000)),
last_error: None,
},
completed: RedisJobRecord {
id: "job-completed".to_string(),
name: "digest".to_string(),
payload: serde_json::json!({}),
attempt: 1,
max_attempts: 3,
initial_backoff_ms: 250,
enqueued_at_ms: Some(now.saturating_sub(3_000)),
started_at_ms: Some(now.saturating_sub(2_000)),
finished_at_ms: Some(now.saturating_sub(1_000)),
claimed_by: None,
claimed_at_ms: None,
last_error: None,
},
failed_retry: RedisJobRecord {
id: "job-failed-retry".to_string(),
name: "send_email".to_string(),
payload: serde_json::json!({ "user_id": 7 }),
attempt: 5,
max_attempts: 5,
initial_backoff_ms: 250,
enqueued_at_ms: Some(now.saturating_sub(4_000)),
started_at_ms: Some(now.saturating_sub(3_000)),
finished_at_ms: Some(now.saturating_sub(500)),
claimed_by: None,
claimed_at_ms: None,
last_error: Some("smtp refused recipient".to_string()),
},
failed_discard: RedisJobRecord {
id: "job-failed-discard".to_string(),
name: "webhook".to_string(),
payload: serde_json::json!({}),
attempt: 2,
max_attempts: 2,
initial_backoff_ms: 250,
enqueued_at_ms: Some(now.saturating_sub(4_000)),
started_at_ms: Some(now.saturating_sub(3_000)),
finished_at_ms: Some(now.saturating_sub(250)),
claimed_by: None,
claimed_at_ms: None,
last_error: Some("endpoint returned 410".to_string()),
},
}
}
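/// Stores an "active" record the way a worker would: the per-id blob plus
/// either a processing-zset entry (already started) or a ready-queue entry
/// (still enqueued).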
#[cfg(feature = "redis")]
async fn redis_store_active_admin_record(
connection: &mut redis::aio::ConnectionManager,
worker_config: &RedisWorkerConfig,
record: &RedisJobRecord,
now: u64,
) {
use redis::AsyncCommands as _;
connection
.set::<_, _, ()>(
redis_record_key(&worker_config.record_prefix, &record.id),
encode_redis_record(record).unwrap(),
)
.await
.unwrap();
match record.started_at_ms {
Some(_) => {
connection
.zadd::<_, _, _, ()>(
&worker_config.processing_key,
&record.id,
now.saturating_add(30_000),
)
.await
.unwrap();
}
None => {
connection
.lpush::<_, _, ()>(&worker_config.queue_key, &record.id)
.await
.unwrap();
}
}
}
#[cfg(feature = "redis")]
async fn redis_store_history_admin_record(
connection: &mut redis::aio::ConnectionManager,
worker_config: &RedisWorkerConfig,
record: &RedisJobRecord,
failed: bool,
) {
use redis::AsyncCommands as _;
let encoded = encode_redis_record(record).unwrap();
if failed {
connection
.lpush::<_, _, ()>(&worker_config.dead_key, &encoded)
.await
.unwrap();
connection
.set::<_, _, ()>(
format!("{}{}", worker_config.dead_record_prefix, record.id),
encoded,
)
.await
.unwrap();
} else {
connection
.lpush::<_, _, ()>(&worker_config.completed_key, encoded)
.await
.unwrap();
}
}
#[cfg(feature = "redis")]
async fn seed_redis_admin_storage(
connection: &mut redis::aio::ConnectionManager,
worker_config: &RedisWorkerConfig,
now: u64,
) -> RedisAdminSeedRecords {
let records = redis_admin_seed_records(now);
redis_store_active_admin_record(connection, worker_config, &records.enqueued, now).await;
redis_store_active_admin_record(connection, worker_config, &records.running, now).await;
redis_store_history_admin_record(connection, worker_config, &records.completed, false)
.await;
redis_store_history_admin_record(connection, worker_config, &records.failed_retry, true)
.await;
redis_store_history_admin_record(connection, worker_config, &records.failed_discard, true)
.await;
records
}
#[cfg(feature = "redis")]
#[tokio::test(flavor = "multi_thread")]
#[ignore = "requires Docker (testcontainers)"]
async fn redis_job_admin_dashboard_reads_cluster_storage_and_operates() {
use redis::AsyncCommands as _;
let (_container, client) = redis_test_client().await;
let worker_config = redis_test_worker_config("autumn:test:admin", "worker-a", 30_000);
let backend = redis_admin_test_backend(&client, &worker_config);
let mut connection = new_redis_connection_manager(&client, "test redis setup").unwrap();
let records =
seed_redis_admin_storage(&mut connection, &worker_config, now_unix_ms()).await;
let snapshot = backend
.snapshot(JobAdminQuery {
enqueued_page: 1,
running_page: 1,
completed_page: 1,
failed_page: 1,
per_page: 10,
})
.await
.expect("redis dashboard snapshot");
assert_eq!(snapshot.enqueued.records[0].id, records.enqueued.id);
assert_eq!(
snapshot.enqueued.records[0].correlation_id.as_deref(),
Some("req-redis")
);
assert_eq!(snapshot.running.records[0].id, records.running.id);
assert_eq!(snapshot.completed.records[0].id, records.completed.id);
assert_eq!(snapshot.failed.total, 2);
backend
.cancel(&records.enqueued.id)
.await
.expect("enqueued redis job should be cancelable");
backend
.retry(&records.failed_retry.id)
.await
.expect("failed redis job should be retryable");
backend
.discard(&records.failed_discard.id)
.await
.expect("failed redis job should be discardable");
let queue_len: usize = connection.llen(&worker_config.queue_key).await.unwrap();
let dead_len: usize = connection.llen(&worker_config.dead_key).await.unwrap();
assert_eq!(queue_len, 1, "retry should enqueue a replacement job");
assert_eq!(dead_len, 0, "retry and discard should clear failed jobs");
}
#[cfg(feature = "redis")]
#[tokio::test(flavor = "multi_thread")]
#[ignore = "requires Docker (testcontainers)"]
async fn redis_claim_ack_deletes_record_only_after_success() {
use redis::AsyncCommands as _;
REDIS_HANDLER_CALLS.store(0, Ordering::SeqCst);
let (_container, client) = redis_test_client().await;
let worker_config = redis_test_worker_config("autumn:test:ack", "worker-a", 30_000);
redis_enqueue_test_job(&client, &worker_config, 2).await;
let mut connection = new_redis_connection_manager(&client, "test redis worker").unwrap();
let record = claim_next_redis_job(&mut connection, &worker_config)
.await
.unwrap()
.expect("job should be claimed");
let record_key = redis_record_key(&worker_config.record_prefix, &record.id);
let processing_count: usize = connection
.zcard(&worker_config.processing_key)
.await
.unwrap();
assert_eq!(processing_count, 1);
let state = AppState::for_test().with_profile("dev");
let job_admin = JobAdminMemoryBackend::new_for_test(32);
state.job_registry().register("send_email");
state.job_registry().record_enqueue("send_email");
process_redis_job_record(
&mut connection,
record,
&redis_jobs_by_name(redis_counting_success_handler, 2),
&state,
&job_admin,
&worker_config,
)
.await;
let exists: bool = connection.exists(record_key).await.unwrap();
let processing_count: usize = connection
.zcard(&worker_config.processing_key)
.await
.unwrap();
let dead_count: usize = connection.llen(&worker_config.dead_key).await.unwrap();
assert!(!exists, "successful ack should delete the durable record");
assert_eq!(processing_count, 0);
assert_eq!(dead_count, 0);
assert_eq!(REDIS_HANDLER_CALLS.load(Ordering::SeqCst), 1);
let status = state.job_registry().snapshot()["send_email"].clone();
assert_eq!(status.queued, 0);
assert_eq!(status.in_flight, 0);
assert_eq!(status.total_successes, 1);
}
#[cfg(feature = "redis")]
#[tokio::test(flavor = "multi_thread")]
#[ignore = "requires Docker (testcontainers)"]
async fn redis_failure_retries_with_backoff_then_dead_letters() {
use redis::AsyncCommands as _;
REDIS_HANDLER_CALLS.store(0, Ordering::SeqCst);
let (_container, client) = redis_test_client().await;
let worker_config = redis_test_worker_config("autumn:test:retry", "worker-a", 30_000);
redis_enqueue_test_job(&client, &worker_config, 2).await;
let mut connection = new_redis_connection_manager(&client, "test redis worker").unwrap();
let state = AppState::for_test().with_profile("dev");
let job_admin = JobAdminMemoryBackend::new_for_test(32);
state.job_registry().register("send_email");
state.job_registry().record_enqueue("send_email");
let jobs = redis_jobs_by_name(redis_counting_failure_handler, 2);
let first = claim_next_redis_job(&mut connection, &worker_config)
.await
.unwrap()
.expect("first attempt should be claimed");
process_redis_job_record(
&mut connection,
first,
&jobs,
&state,
&job_admin,
&worker_config,
)
.await;
let delayed_count: usize = connection.zcard(&worker_config.delayed_key).await.unwrap();
let processing_count: usize = connection
.zcard(&worker_config.processing_key)
.await
.unwrap();
assert_eq!(delayed_count, 1);
assert_eq!(processing_count, 0);
tokio::time::sleep(Duration::from_millis(5)).await;
promote_due_redis_retries(&mut connection, &worker_config, &state, &job_admin)
.await
.unwrap();
let queued_count: usize = connection.llen(&worker_config.queue_key).await.unwrap();
assert_eq!(queued_count, 1);
let second = claim_next_redis_job(&mut connection, &worker_config)
.await
.unwrap()
.expect("retry attempt should be claimed");
assert_eq!(second.attempt, 2);
process_redis_job_record(
&mut connection,
second,
&jobs,
&state,
&job_admin,
&worker_config,
)
.await;
let dead_count: usize = connection.llen(&worker_config.dead_key).await.unwrap();
let delayed_count: usize = connection.zcard(&worker_config.delayed_key).await.unwrap();
assert_eq!(dead_count, 1);
assert_eq!(delayed_count, 0);
assert_eq!(REDIS_HANDLER_CALLS.load(Ordering::SeqCst), 2);
let status = state.job_registry().snapshot()["send_email"].clone();
assert_eq!(status.in_flight, 0);
assert_eq!(status.total_failures, 1);
assert_eq!(status.dead_letters, 1);
}
#[cfg(feature = "redis")]
#[tokio::test(flavor = "multi_thread")]
#[ignore = "requires Docker (testcontainers)"]
async fn redis_panicking_handler_dead_letters_without_retry() {
use redis::AsyncCommands as _;
let (_container, client) = redis_test_client().await;
let worker_config = redis_test_worker_config("autumn:test:panic", "worker-a", 30_000);
redis_enqueue_test_job(&client, &worker_config, 3).await;
let mut connection = new_redis_connection_manager(&client, "test redis worker").unwrap();
let state = AppState::for_test().with_profile("dev");
let job_admin = JobAdminMemoryBackend::new_for_test(32);
state.job_registry().register("send_email");
state.job_registry().record_enqueue("send_email");
let record = claim_next_redis_job(&mut connection, &worker_config)
.await
.unwrap()
.expect("panicking job should be claimed");
process_redis_job_record(
&mut connection,
record,
&redis_jobs_by_name(panicking_handler, 3),
&state,
&job_admin,
&worker_config,
)
.await;
let queued_count: usize = connection.llen(&worker_config.queue_key).await.unwrap();
let delayed_count: usize = connection.zcard(&worker_config.delayed_key).await.unwrap();
let processing_count: usize = connection
.zcard(&worker_config.processing_key)
.await
.unwrap();
let dead_count: usize = connection.llen(&worker_config.dead_key).await.unwrap();
assert_eq!(queued_count, 0);
assert_eq!(delayed_count, 0);
assert_eq!(processing_count, 0);
assert_eq!(dead_count, 1);
let status = state.job_registry().snapshot()["send_email"].clone();
assert_eq!(status.in_flight, 0);
assert_eq!(status.total_failures, 1);
assert_eq!(status.dead_letters, 1);
}
#[cfg(feature = "redis")]
#[tokio::test(flavor = "multi_thread")]
#[ignore = "requires Docker (testcontainers)"]
async fn redis_stale_claim_recovery_requeues_for_another_worker() {
use redis::AsyncCommands as _;
let (_container, client) = redis_test_client().await;
let worker_a = redis_test_worker_config("autumn:test:stale", "worker-a", 1);
let worker_b = redis_test_worker_config("autumn:test:stale", "worker-b", 30_000);
redis_enqueue_test_job(&client, &worker_a, 3).await;
let mut connection = new_redis_connection_manager(&client, "test redis worker").unwrap();
let claimed = claim_next_redis_job(&mut connection, &worker_a)
.await
.unwrap()
.expect("first worker should claim the job");
assert_eq!(claimed.claimed_by.as_deref(), Some("worker-a"));
assert_eq!(claimed.attempt, 1);
tokio::time::sleep(Duration::from_millis(5)).await;
let state = AppState::for_test().with_profile("dev");
let job_admin = JobAdminMemoryBackend::new_for_test(32);
state.job_registry().register("send_email");
recover_stale_redis_jobs(&mut connection, &worker_b, &state, &job_admin)
.await
.unwrap();
let queued_count: usize = connection.llen(&worker_b.queue_key).await.unwrap();
assert_eq!(queued_count, 1);
let reclaimed = claim_next_redis_job(&mut connection, &worker_b)
.await
.unwrap()
.expect("second worker should reclaim stale job");
assert_eq!(reclaimed.claimed_by.as_deref(), Some("worker-b"));
assert_eq!(reclaimed.attempt, 2);
assert!(
reclaimed
.last_error
.as_deref()
.is_some_and(|error| error.contains("visibility timeout expired"))
);
}
#[cfg(feature = "redis")]
#[tokio::test(flavor = "multi_thread")]
#[ignore = "requires Docker (testcontainers)"]
async fn redis_stale_terminal_failure_keeps_retry_discard_metadata() {
use redis::AsyncCommands as _;
let (_container, client) = redis_test_client().await;
let worker_a = redis_test_worker_config("autumn:test:stale-dead", "worker-a", 1);
let worker_b = redis_test_worker_config("autumn:test:stale-dead", "worker-b", 30_000);
redis_enqueue_test_job(&client, &worker_a, 1).await;
let mut connection = new_redis_connection_manager(&client, "test redis worker").unwrap();
let claimed = claim_next_redis_job(&mut connection, &worker_a)
.await
.unwrap()
.expect("first worker should claim the final attempt");
assert_eq!(claimed.claimed_by.as_deref(), Some("worker-a"));
assert_eq!(claimed.attempt, 1);
let failed_id = claimed.id.clone();
tokio::time::sleep(Duration::from_millis(5)).await;
let state = AppState::for_test().with_profile("dev");
let job_admin = JobAdminMemoryBackend::new_for_test(32);
state.job_registry().register("send_email");
recover_stale_redis_jobs(&mut connection, &worker_b, &state, &job_admin)
.await
.unwrap();
let dead_record_key = format!("{}{}", worker_b.dead_record_prefix, failed_id);
let dead_record: Option<String> = connection.get(&dead_record_key).await.unwrap();
assert!(
dead_record.is_some(),
"stale terminal failures need per-id metadata for admin actions"
);
let dead_count: usize = connection.llen(&worker_b.dead_key).await.unwrap();
assert_eq!(dead_count, 1);
let backend = redis_admin_test_backend(&client, &worker_b);
backend
.retry(&failed_id)
.await
.expect("stale terminal failure should be retryable from the dashboard");
let queued_count: usize = connection.llen(&worker_b.queue_key).await.unwrap();
let dead_count: usize = connection.llen(&worker_b.dead_key).await.unwrap();
let dead_record_exists: bool = connection.exists(&dead_record_key).await.unwrap();
assert_eq!(queued_count, 1);
assert_eq!(dead_count, 0);
assert!(!dead_record_exists);
}
#[tokio::test]
async fn enqueue_rejects_unregistered_job_name_before_queueing() {
let _guard = global_job_runtime_test_lock().lock().await;
clear_global_job_client();
let state = AppState::for_test().with_profile("dev");
let shutdown = tokio_util::sync::CancellationToken::new();
start_local_runtime(
vec![JobInfo {
name: "known".to_string(),
max_attempts: 3,
initial_backoff_ms: 10,
handler: |_state, _payload| Box::pin(async move { Ok(()) }),
}],
&state,
&shutdown,
1,
5,
250,
);
let error = enqueue("typoed-job", serde_json::json!({}))
.await
.expect_err("unknown job names should be rejected before queueing");
assert!(
error
.to_string()
.contains("job 'typoed-job' is not registered"),
"unexpected error: {error}"
);
let snapshot = state.job_registry().snapshot();
assert!(
!snapshot.contains_key("typoed-job"),
"unknown jobs must not be recorded as queued"
);
let known = snapshot
.get("known")
.expect("registered job should remain in the registry");
assert_eq!(known.queued, 0);
assert_eq!(known.in_flight, 0);
shutdown.cancel();
clear_global_job_client();
}
#[tokio::test]
async fn start_runtime_rejects_duplicate_job_names() {
let _guard = global_job_runtime_test_lock().lock().await;
clear_global_job_client();
let state = AppState::for_test().with_profile("dev");
let shutdown = tokio_util::sync::CancellationToken::new();
let error = start_runtime(
vec![
JobInfo {
name: "dupe".to_string(),
max_attempts: 1,
initial_backoff_ms: 1,
handler: |_state, _payload| Box::pin(async move { Ok(()) }),
},
JobInfo {
name: "dupe".to_string(),
max_attempts: 1,
initial_backoff_ms: 1,
handler: |_state, _payload| Box::pin(async move { Ok(()) }),
},
],
&state,
&shutdown,
&crate::config::JobConfig::default(),
)
.expect_err("duplicate job names should surface as init errors");
assert!(
error
.to_string()
.contains("invalid jobs configuration: duplicate job name 'dupe'"),
"unexpected error: {error}"
);
assert!(global_job_client().is_none());
}
#[tokio::test]
#[cfg(not(feature = "redis"))]
async fn start_runtime_rejects_redis_backend_when_feature_disabled() {
let _guard = global_job_runtime_test_lock().lock().await;
clear_global_job_client();
let state = AppState::for_test().with_profile("dev");
let shutdown = tokio_util::sync::CancellationToken::new();
let config = crate::config::JobConfig {
backend: "redis".to_string(),
..Default::default()
};
let error = start_runtime(
vec![JobInfo {
name: "known".to_string(),
max_attempts: 1,
initial_backoff_ms: 1,
handler: |_state, _payload| Box::pin(async move { Ok(()) }),
}],
&state,
&shutdown,
&config,
)
.expect_err("redis backend must fail without the redis feature");
assert!(
error
.to_string()
.contains("jobs.backend=redis requested but redis feature is disabled"),
"unexpected error: {error}"
);
assert!(global_job_client().is_none());
}
#[tokio::test]
#[cfg(feature = "redis")]
async fn start_runtime_rejects_redis_backend_without_url() {
let _guard = global_job_runtime_test_lock().lock().await;
clear_global_job_client();
let state = AppState::for_test().with_profile("dev");
let shutdown = tokio_util::sync::CancellationToken::new();
let config = crate::config::JobConfig {
backend: "redis".to_string(),
redis: crate::config::JobRedisConfig {
url: None,
..Default::default()
},
..Default::default()
};
let error = start_runtime(
vec![JobInfo {
name: "known".to_string(),
max_attempts: 1,
initial_backoff_ms: 1,
handler: |_state, _payload| Box::pin(async move { Ok(()) }),
}],
&state,
&shutdown,
&config,
)
.expect_err("redis backend must fail when its url is missing");
assert!(
error
.to_string()
.contains("jobs.backend=redis requires jobs.redis.url"),
"unexpected error: {error}"
);
assert!(global_job_client().is_none());
}
#[tokio::test]
async fn clear_global_job_client_resets_client() {
let _guard = global_job_runtime_test_lock().lock().await;
clear_global_job_client();
assert!(global_job_client().is_none());
init_global_job_client(JobClient {
local_sender: None,
#[cfg(feature = "redis")]
redis: None,
registry: crate::actuator::JobRegistry::new(),
job_admin: JobAdminMemoryBackend::new_for_test(32),
default_max_attempts: 3,
default_initial_backoff_ms: 250,
per_job_defaults: HashMap::new(),
});
assert!(global_job_client().is_some());
clear_global_job_client();
assert!(global_job_client().is_none());
}
}