use crate::config::RetryConfig;
use crate::error::{Error, Result};
use crate::events::EventsWriter;
use crate::job::{BackoffKind, BackoffSpec};
use crate::metrics::{self, MetricsSink, RetryScheduled};
use crate::redis::commands::{
RETRY_RESCHEDULE_SCRIPT, eval_retry_args, evalsha_retry_args, script_load_args,
};
use crate::redis::parse::StreamEntryId;
use bytes::Bytes;
use fred::clients::Client;
use fred::interfaces::ClientLike;
use fred::types::{ClusterHash, CustomCommand, Value};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use tokio::sync::mpsc;
/// Latched to `true` the first time `backoff_ms_from_spec` sees
/// `BackoffKind::Unknown`, so that warning is logged at most once per process.
static WARNED_UNKNOWN_KIND: AtomicBool = AtomicBool::new(false);
/// How many times `reschedule_with_retry` runs the reschedule script for one
/// relocate before giving up and returning the last error.
const RETRY_REDIS_ATTEMPTS: usize = 3;
/// Base delay in milliseconds between reschedule attempts; attempt `n`
/// (0-based) waits `RETRY_REDIS_BASE_MS << n`, i.e. 50, 100, 200, ...
const RETRY_REDIS_BASE_MS: u64 = 50;
/// One retry request handed to the relocator task through the channel that
/// `enqueue` writes to. Its fields are passed to the retry-reschedule Lua
/// script and reported in metrics and events.
#[derive(Debug)]
pub(crate) struct RetryRelocate {
/// Job identifier; reported in the retry-scheduled event.
pub job_id: String,
/// Stream entry id of the failed delivery; passed to the reschedule script
/// and used as the `entry_id` log field.
pub entry_id: StreamEntryId,
/// Job payload bytes handed verbatim to the reschedule script.
pub job_bytes: Bytes,
/// When the retry should become runnable, in milliseconds (presumably Unix
/// epoch ms — TODO confirm against the script).
pub run_at_ms: i64,
/// Attempt number reported in the metric and event.
pub attempt: u32,
/// Backoff delay in milliseconds reported in the metric and event.
pub backoff_ms: u64,
/// Job name reported in the metric and event.
pub name: String,
}
/// Fixed configuration for `run_retry_relocator`.
pub(crate) struct RetryRelocatorConfig {
/// Stream key passed to the reschedule script.
pub stream_key: String,
/// Key for delayed retries, passed to the reschedule script.
pub delayed_key: String,
/// Stream consumer group name passed to the reschedule script.
pub group: String,
/// Sink that receives the `retry_scheduled` metric after a successful reschedule.
pub metrics: Arc<dyn MetricsSink>,
/// Writer for the retry-scheduled event; only used when `is_enabled()` is true.
pub events: EventsWriter,
}
/// Packs the retry details into a [`RetryRelocate`] and sends it to the
/// relocator task.
///
/// `send` waits for capacity on the bounded channel. If the receiving task has
/// already exited (channel closed), the request is dropped and an error is
/// logged; this function never fails.
// Each argument maps 1:1 onto a `RetryRelocate` field, so the flat signature
// is kept rather than introducing a builder.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn enqueue(
    tx: &mpsc::Sender<RetryRelocate>,
    job_id: String,
    entry_id: StreamEntryId,
    job_bytes: Bytes,
    run_at_ms: i64,
    attempt: u32,
    backoff_ms: u64,
    name: String,
) {
    let request = RetryRelocate {
        job_id,
        entry_id,
        job_bytes,
        run_at_ms,
        attempt,
        backoff_ms,
        name,
    };
    let delivered = tx.send(request).await;
    if delivered.is_err() {
        tracing::error!("retry relocator channel closed; retry dropped");
    }
}
pub(crate) async fn run_retry_relocator(
client: Client,
cfg: RetryRelocatorConfig,
mut rx: mpsc::Receiver<RetryRelocate>,
) {
let mut sha = match load_script(&client).await {
Ok(s) => s,
Err(e) => {
tracing::error!(error = %e, "retry relocator: SCRIPT LOAD failed; entries will reclaim via CLAIM");
return;
}
};
while let Some(relocate) = rx.recv().await {
match reschedule_with_retry(&client, &cfg, &relocate, &mut sha).await {
Ok(true) => {
let event = RetryScheduled {
attempt: relocate.attempt,
backoff_ms: relocate.backoff_ms,
name: relocate.name.clone(),
};
let sink = &*cfg.metrics;
metrics::dispatch("retry_scheduled", move || sink.retry_scheduled(event));
if cfg.events.is_enabled() {
cfg.events
.emit_retry_scheduled(
&relocate.job_id,
&relocate.name,
relocate.attempt,
relocate.backoff_ms,
)
.await;
}
}
Ok(false) => {
tracing::trace!(entry_id = %relocate.entry_id, "retry reschedule gated: entry already removed");
}
Err(e) => {
tracing::error!(entry_id = %relocate.entry_id, error = %e, "retry reschedule failed permanently; entry remains pending and will be retried on next CLAIM tick");
}
}
}
}
async fn reschedule_with_retry(
client: &Client,
cfg: &RetryRelocatorConfig,
relocate: &RetryRelocate,
sha: &mut String,
) -> Result<bool> {
let mut last_err: Option<Error> = None;
for attempt in 0..RETRY_REDIS_ATTEMPTS {
match reschedule_once(client, cfg, relocate, sha).await {
Ok(rescheduled) => return Ok(rescheduled),
Err(e) => {
let backoff = RETRY_REDIS_BASE_MS << attempt;
tracing::warn!(entry_id = %relocate.entry_id, attempt = attempt + 1, error = %e, backoff_ms = backoff, "retry reschedule failed; retrying");
last_err = Some(e);
tokio::time::sleep(Duration::from_millis(backoff)).await;
}
}
}
Err(last_err.unwrap_or_else(|| Error::Config("retry reschedule exhausted retries".into())))
}
async fn reschedule_once(
client: &Client,
cfg: &RetryRelocatorConfig,
relocate: &RetryRelocate,
sha: &mut String,
) -> Result<bool> {
let cmd = CustomCommand::new_static("EVALSHA", ClusterHash::FirstKey, false);
let args = evalsha_retry_args(
sha,
&cfg.stream_key,
&cfg.delayed_key,
&cfg.group,
relocate.entry_id.as_ref(),
relocate.run_at_ms,
relocate.job_bytes.clone(),
);
let res: std::result::Result<Value, fred::error::Error> = client.custom(cmd, args).await;
match res {
Ok(v) => Ok(script_returned_one(&v)),
Err(e) if format!("{e}").contains("NOSCRIPT") => {
*sha = load_script(client).await?;
let cmd = CustomCommand::new_static("EVAL", ClusterHash::FirstKey, false);
let args = eval_retry_args(
RETRY_RESCHEDULE_SCRIPT,
&cfg.stream_key,
&cfg.delayed_key,
&cfg.group,
relocate.entry_id.as_ref(),
relocate.run_at_ms,
relocate.job_bytes.clone(),
);
let v: Value = client.custom(cmd, args).await.map_err(Error::Redis)?;
Ok(script_returned_one(&v))
}
Err(e) => Err(Error::Redis(e)),
}
}
/// Interprets the reschedule script's reply as a boolean.
///
/// The reply counts as "rescheduled" when it is the integer `1`, or a string or
/// bytes value that is exactly `"1"`. Every other value or shape, including
/// `Null`, is `false`.
fn script_returned_one(v: &Value) -> bool {
    let raw: &[u8] = match v {
        Value::Integer(n) => return *n == 1,
        Value::String(s) => s.as_bytes(),
        Value::Bytes(b) => &b[..],
        _ => return false,
    };
    raw == b"1"
}
async fn load_script(client: &Client) -> Result<String> {
let cmd = CustomCommand::new_static("SCRIPT", ClusterHash::FirstKey, false);
let res: Value = client
.custom(cmd, script_load_args(RETRY_RESCHEDULE_SCRIPT))
.await
.map_err(Error::Redis)?;
match res {
Value::String(s) => Ok(s.to_string()),
Value::Bytes(b) => std::str::from_utf8(&b)
.map(|s| s.to_string())
.map_err(|_| Error::Config("SCRIPT LOAD returned non-utf8 sha".into())),
other => Err(Error::Config(format!(
"SCRIPT LOAD returned unexpected: {other:?}"
))),
}
}
/// Computes the retry delay in milliseconds for `attempt` from the global
/// [`RetryConfig`].
///
/// The config is expressed as an exponential [`BackoffSpec`] with every
/// optional field populated. It then goes through the same code path as
/// per-job specs via [`backoff_ms_from_spec`], so the two can never drift.
pub(crate) fn backoff_ms(attempt: u32, cfg: &RetryConfig) -> u64 {
    backoff_ms_from_spec(
        attempt,
        &BackoffSpec {
            kind: BackoffKind::Exponential,
            delay_ms: cfg.initial_backoff_ms,
            max_delay_ms: Some(cfg.max_backoff_ms),
            multiplier: Some(cfg.multiplier),
            jitter_ms: Some(cfg.jitter_ms),
        },
        cfg,
    )
}
/// Computes the delay in milliseconds before retry `attempt`, using a per-job
/// [`BackoffSpec`]. Any optional spec field left as `None` falls back to
/// `fallback`.
///
/// * `Fixed`: `delay_ms` for every attempt.
/// * `Exponential`: `delay_ms * multiplier^(attempt - 1)`; attempts 0 and 1
///   both yield `delay_ms`.
/// * `Unknown`: treated as `Exponential`, with a warning logged once per process.
///
/// The base is clamped to `[0, max_delay_ms]`. A jitter in `[0, jitter_ms)` is
/// then added, so the result can exceed `max_delay_ms` by up to
/// `jitter_ms - 1`.
pub(crate) fn backoff_ms_from_spec(
    attempt: u32,
    spec: &BackoffSpec,
    fallback: &RetryConfig,
) -> u64 {
    if matches!(spec.kind, BackoffKind::Unknown)
        && !WARNED_UNKNOWN_KIND.swap(true, Ordering::Relaxed)
    {
        tracing::warn!(
            "consumer decoded BackoffKind::Unknown — likely a future-SDK variant; \
             retry math degraded to exponential. Upgrade consumer to silence this warning."
        );
    }
    let max_delay_ms = spec.max_delay_ms.unwrap_or(fallback.max_backoff_ms);
    let jitter_ms = spec.jitter_ms.unwrap_or(fallback.jitter_ms);
    let base = match spec.kind {
        BackoffKind::Fixed => spec.delay_ms as f64,
        BackoffKind::Exponential | BackoffKind::Unknown => {
            let multiplier = spec.multiplier.unwrap_or(fallback.multiplier);
            // A plain `as i32` wraps attempts above i32::MAX + 1 into negative
            // exponents, which shrinks the delay. Saturate instead so the curve
            // stays monotone and settles at the cap.
            let exp = attempt.saturating_sub(1).min(i32::MAX as u32) as i32;
            (spec.delay_ms as f64) * multiplier.powi(exp)
        }
    };
    // f64::min ignores a NaN operand and `as u64` saturates, so NaN or +inf
    // bases resolve to `max_delay_ms` and negative bases to 0.
    let capped = base.min(max_delay_ms as f64).max(0.0) as u64;
    let jitter = if jitter_ms == 0 {
        0
    } else {
        fastrand_jitter(jitter_ms)
    };
    capped.saturating_add(jitter)
}
/// Returns a pseudo-random value in `[0, max)`, or `0` when `max == 0`.
///
/// Uses a per-thread xorshift64 generator. It is seeded lazily from wall-clock
/// nanoseconds, XOR-ed with a process-wide counter that advances by a fixed
/// odd constant per seeding, so two threads seeded in the same nanosecond still
/// get different states. The generator is not cryptographically secure, and
/// `% max` carries a slight modulo bias.
fn fastrand_jitter(max: u64) -> u64 {
    use std::cell::Cell;
    use std::sync::atomic::{AtomicU64, Ordering};
    static SEED: AtomicU64 = AtomicU64::new(0);
    thread_local! {
        static STATE: Cell<u64> = const { Cell::new(0) };
    }
    // `x % 0` would panic; an empty range means no jitter.
    if max == 0 {
        return 0;
    }
    STATE.with(|s| {
        let mut x = s.get();
        if x == 0 {
            let salt = SEED.fetch_add(0x9E3779B97F4A7C15, Ordering::Relaxed);
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .map(|d| d.as_nanos() as u64)
                .unwrap_or(1);
            x = now ^ salt ^ 0xDEAD_BEEF_CAFE_F00D;
            // xorshift has a fixed point at 0; never seed it there.
            if x == 0 {
                x = 1;
            }
        }
        x ^= x << 13;
        x ^= x >> 7;
        x ^= x << 17;
        s.set(x);
        x % max
    })
}
/// Unit tests for the backoff math, script-reply parsing, and `enqueue` plumbing.
/// The Redis-facing paths (`run_retry_relocator`, `reschedule_*`,
/// `load_script`) need a live server and are not covered here.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn backoff_monotone_until_cap() {
        let cfg = RetryConfig {
            initial_backoff_ms: 100,
            max_backoff_ms: 1_000,
            multiplier: 2.0,
            jitter_ms: 0,
        };
        assert_eq!(backoff_ms(1, &cfg), 100);
        assert_eq!(backoff_ms(2, &cfg), 200);
        assert_eq!(backoff_ms(3, &cfg), 400);
        assert_eq!(backoff_ms(4, &cfg), 800);
        assert_eq!(backoff_ms(5, &cfg), 1_000);
        assert_eq!(backoff_ms(10, &cfg), 1_000);
    }

    #[test]
    fn backoff_jitter_within_bounds() {
        let cfg = RetryConfig {
            initial_backoff_ms: 100,
            max_backoff_ms: 1_000,
            multiplier: 2.0,
            jitter_ms: 50,
        };
        for _ in 0..200 {
            let v = backoff_ms(2, &cfg);
            assert!((200..200 + 50).contains(&v), "out of range: {v}");
        }
    }

    #[test]
    fn backoff_zero_attempt_floors_to_initial() {
        let cfg = RetryConfig {
            initial_backoff_ms: 100,
            max_backoff_ms: 1_000,
            multiplier: 2.0,
            jitter_ms: 0,
        };
        assert_eq!(backoff_ms(0, &cfg), 100);
    }

    #[test]
    fn backoff_from_spec_exponential_uses_spec_fields() {
        // Fallback values are deliberately extreme so any leak into the result
        // is obvious.
        let fallback = RetryConfig {
            initial_backoff_ms: 999,
            max_backoff_ms: 100_000,
            multiplier: 99.0,
            jitter_ms: 0,
        };
        let spec = BackoffSpec {
            kind: BackoffKind::Exponential,
            delay_ms: 100,
            max_delay_ms: Some(5_000),
            multiplier: Some(3.0),
            jitter_ms: Some(0),
        };
        assert_eq!(backoff_ms_from_spec(1, &spec, &fallback), 100);
        assert_eq!(backoff_ms_from_spec(2, &spec, &fallback), 300);
        assert_eq!(backoff_ms_from_spec(3, &spec, &fallback), 900);
        assert_eq!(backoff_ms_from_spec(4, &spec, &fallback), 2_700);
        assert_eq!(backoff_ms_from_spec(5, &spec, &fallback), 5_000);
        assert_eq!(backoff_ms_from_spec(10, &spec, &fallback), 5_000);
    }

    #[test]
    fn backoff_from_spec_fixed_ignores_multiplier() {
        let fallback = RetryConfig {
            initial_backoff_ms: 0,
            max_backoff_ms: 100_000,
            multiplier: 99.0,
            jitter_ms: 0,
        };
        let spec = BackoffSpec {
            kind: BackoffKind::Fixed,
            delay_ms: 50,
            max_delay_ms: None,
            multiplier: Some(99.0),
            jitter_ms: Some(0),
        };
        for attempt in 1..=10 {
            assert_eq!(backoff_ms_from_spec(attempt, &spec, &fallback), 50);
        }
    }

    #[test]
    fn backoff_from_spec_fixed_is_capped_by_max_delay() {
        let fallback = RetryConfig {
            initial_backoff_ms: 0,
            max_backoff_ms: 100_000,
            multiplier: 2.0,
            jitter_ms: 0,
        };
        let spec = BackoffSpec {
            kind: BackoffKind::Fixed,
            delay_ms: 500,
            max_delay_ms: Some(100),
            multiplier: None,
            jitter_ms: Some(0),
        };
        assert_eq!(backoff_ms_from_spec(1, &spec, &fallback), 100);
    }

    #[test]
    fn backoff_from_spec_unknown_kind_degrades_to_exponential() {
        let fallback = RetryConfig {
            initial_backoff_ms: 999,
            max_backoff_ms: 100_000,
            multiplier: 2.0,
            jitter_ms: 0,
        };
        // `multiplier: None` makes the fallback's 2.0 drive the curve.
        let spec = BackoffSpec {
            kind: BackoffKind::Unknown,
            delay_ms: 100,
            max_delay_ms: None,
            multiplier: None,
            jitter_ms: Some(0),
        };
        assert_eq!(backoff_ms_from_spec(1, &spec, &fallback), 100);
        assert_eq!(backoff_ms_from_spec(2, &spec, &fallback), 200);
        assert_eq!(backoff_ms_from_spec(3, &spec, &fallback), 400);
    }

    #[test]
    fn backoff_from_spec_unknown_kind_is_idempotent_across_calls() {
        // The one-shot warning latch must not change the computed value on
        // later calls.
        let fallback = RetryConfig {
            initial_backoff_ms: 0,
            max_backoff_ms: 100_000,
            multiplier: 2.0,
            jitter_ms: 0,
        };
        let spec = BackoffSpec {
            kind: BackoffKind::Unknown,
            delay_ms: 100,
            max_delay_ms: None,
            multiplier: None,
            jitter_ms: Some(0),
        };
        let first = backoff_ms_from_spec(2, &spec, &fallback);
        let second = backoff_ms_from_spec(2, &spec, &fallback);
        let third = backoff_ms_from_spec(2, &spec, &fallback);
        assert_eq!(first, 200);
        assert_eq!(second, 200);
        assert_eq!(third, 200);
    }

    #[test]
    fn backoff_from_spec_falls_back_to_retry_config_fields() {
        let fallback = RetryConfig {
            initial_backoff_ms: 0,
            max_backoff_ms: 250,
            multiplier: 2.0,
            jitter_ms: 0,
        };
        // All optional fields unset: cap, multiplier, and jitter come from
        // `fallback`.
        let spec = BackoffSpec {
            kind: BackoffKind::Exponential,
            delay_ms: 100,
            max_delay_ms: None,
            multiplier: None,
            jitter_ms: None,
        };
        assert_eq!(backoff_ms_from_spec(1, &spec, &fallback), 100);
        assert_eq!(backoff_ms_from_spec(2, &spec, &fallback), 200);
        assert_eq!(backoff_ms_from_spec(3, &spec, &fallback), 250);
        assert_eq!(backoff_ms_from_spec(4, &spec, &fallback), 250);
    }

    #[test]
    fn backoff_from_spec_explicit_zero_jitter_overrides_fallback() {
        // `Some(0)` in the spec must win over a non-zero fallback jitter.
        let fallback = RetryConfig {
            initial_backoff_ms: 0,
            max_backoff_ms: 100_000,
            multiplier: 2.0,
            jitter_ms: 50,
        };
        let spec = BackoffSpec {
            kind: BackoffKind::Fixed,
            delay_ms: 100,
            max_delay_ms: None,
            multiplier: None,
            jitter_ms: Some(0),
        };
        for _ in 0..50 {
            assert_eq!(backoff_ms_from_spec(1, &spec, &fallback), 100);
        }
    }

    #[test]
    fn backoff_from_spec_jitter_within_bounds() {
        let fallback = RetryConfig {
            initial_backoff_ms: 0,
            max_backoff_ms: 100_000,
            multiplier: 2.0,
            jitter_ms: 0,
        };
        let spec = BackoffSpec {
            kind: BackoffKind::Fixed,
            delay_ms: 100,
            max_delay_ms: None,
            multiplier: None,
            jitter_ms: Some(25),
        };
        for _ in 0..200 {
            let v = backoff_ms_from_spec(1, &spec, &fallback);
            assert!((100..100 + 25).contains(&v), "out of range: {v}");
        }
    }

    #[test]
    fn backoff_ms_delegates_to_spec() {
        let cfg = RetryConfig {
            initial_backoff_ms: 100,
            max_backoff_ms: 1_000,
            multiplier: 2.0,
            jitter_ms: 0,
        };
        let synthesized = BackoffSpec {
            kind: BackoffKind::Exponential,
            delay_ms: cfg.initial_backoff_ms,
            max_delay_ms: Some(cfg.max_backoff_ms),
            multiplier: Some(cfg.multiplier),
            jitter_ms: Some(cfg.jitter_ms),
        };
        for attempt in 0u32..=10 {
            assert_eq!(
                backoff_ms(attempt, &cfg),
                backoff_ms_from_spec(attempt, &synthesized, &cfg),
                "drift between backoff_ms and backoff_ms_from_spec at attempt={attempt}"
            );
        }
    }

    #[test]
    fn jitter_of_one_is_always_zero() {
        for _ in 0..50 {
            assert_eq!(fastrand_jitter(1), 0);
        }
    }

    #[test]
    fn script_returned_one_handles_value_shapes() {
        assert!(script_returned_one(&Value::Integer(1)));
        assert!(!script_returned_one(&Value::Integer(0)));
        assert!(!script_returned_one(&Value::Integer(2)));
        assert!(script_returned_one(&Value::String("1".into())));
        assert!(!script_returned_one(&Value::String("0".into())));
        assert!(script_returned_one(&Value::Bytes(
            bytes::Bytes::from_static(b"1")
        )));
        assert!(!script_returned_one(&Value::Bytes(
            bytes::Bytes::from_static(b"0")
        )));
        assert!(!script_returned_one(&Value::Null));
    }

    #[tokio::test]
    async fn enqueue_plumbs_job_id_onto_relocate() {
        let (tx, mut rx) = mpsc::channel::<RetryRelocate>(1);
        let entry_id: StreamEntryId = std::sync::Arc::from("1700000000000-0");
        enqueue(
            &tx,
            "job-abc-123".to_string(),
            entry_id.clone(),
            Bytes::from_static(b"opaque"),
            1_700_000_000_500,
            3,
            500,
            "send-email".to_string(),
        )
        .await;
        let received = rx.recv().await.expect("relocate sent");
        assert_eq!(received.job_id, "job-abc-123");
        assert_eq!(received.entry_id, entry_id);
        assert_eq!(received.run_at_ms, 1_700_000_000_500);
        assert_eq!(received.attempt, 3);
        assert_eq!(received.backoff_ms, 500);
        assert_eq!(received.name, "send-email");
    }

    #[tokio::test]
    async fn enqueue_on_closed_channel_does_not_panic() {
        // The relocator exiting (receiver dropped) must degrade to a log line,
        // not a panic in the caller.
        let (tx, rx) = mpsc::channel::<RetryRelocate>(1);
        drop(rx);
        let entry_id: StreamEntryId = std::sync::Arc::from("1700000000000-0");
        enqueue(
            &tx,
            "job-closed".to_string(),
            entry_id,
            Bytes::from_static(b"opaque"),
            0,
            1,
            0,
            "noop".to_string(),
        )
        .await;
    }
}