use crate::ack::JobOk;
use crate::config::RetryConfig;
use crate::consumer::dlq::{self, DlqReason, DlqRelocate};
use crate::consumer::retry::{self, RetryRelocate};
use crate::error::HandlerError;
use crate::events::EventsWriter;
use crate::job::{Job, now_ms};
use crate::metrics::{self, JobOutcome, JobOutcomeKind, MetricsSink};
use crate::redis::delayed_member::encode_delayed_member;
use crate::redis::parse::StreamEntryId;
use bytes::Bytes;
use futures_util::FutureExt;
use serde::Serialize;
use std::future::Future;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::mpsc;
use tokio::task::JoinSet;
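
/// A job paired with the Redis stream entry id it was dispatched under.
/// Workers need the entry id to ack, retry, or dead-letter the underlying
/// stream entry once the handler finishes.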
pub(crate) struct DispatchedJob<T> {
pub entry_id: StreamEntryId,
pub job: Job<T>,
}
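
/// Owns the `JoinSet` of spawned worker tasks; handed to `drain_workers`
/// on shutdown.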
pub(crate) struct WorkerPool {
set: JoinSet<()>,
}
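
/// Everything a worker task needs besides the handler itself: outbound
/// channels for ack/retry/DLQ, the retry policy, and the metrics/events
/// sinks.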
pub(crate) struct WorkerWiring {
pub ack_tx: mpsc::Sender<StreamEntryId>,
pub retry_tx: mpsc::Sender<RetryRelocate>,
pub dlq_tx: mpsc::Sender<DlqRelocate>,
pub max_attempts: u32,
pub retry_cfg: RetryConfig,
pub metrics: Arc<dyn MetricsSink>,
pub events: EventsWriter,
/// When `true` and the handler returned a non-empty `Bytes`, route the
/// entry through `ok_result_tx` (per-entry `JOB_OK_SCRIPT` — XACKDEL +
/// SET result_key), otherwise stay on the batched `ack_tx` fast path.
pub store_results: bool,
pub result_ttl_secs: u64,
    /// `Some` when the consumer was configured with `store_results=true` and
    /// `run_ok_result_writer` was spawned. `None` is the zero-overhead path:
    /// successful jobs unconditionally take the existing batched ack branch.
pub ok_result_tx: Option<mpsc::Sender<JobOk>>,
}
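
/// Spawns `concurrency` worker tasks that pull jobs from `job_rx`, run the
/// handler, and route each outcome to the ack, retry, or DLQ channel.
///
/// A minimal usage sketch (illustrative only; `MyPayload`, the channel
/// setup, and the wiring values are assumptions, not part of this module):
///
/// ```ignore
/// let handler = |job: Job<MyPayload>| async move {
///     // ... do the actual work ...
///     Ok(Bytes::new()) // empty result: stays on the batched ack fast path
/// };
/// let pool = spawn_workers(8, handler, job_rx, wiring);
/// // later, during shutdown:
/// drain_workers(pool, std::time::Duration::from_secs(10)).await;
/// ```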
pub(crate) fn spawn_workers<T, H, Fut>(
concurrency: usize,
handler: H,
job_rx: async_channel::Receiver<DispatchedJob<T>>,
wiring: WorkerWiring,
) -> WorkerPool
where
T: Serialize + Clone + Send + 'static,
H: Fn(Job<T>) -> Fut + Send + Sync + 'static,
Fut: Future<Output = std::result::Result<Bytes, HandlerError>> + Send + 'static,
{
let handler = Arc::new(handler);
let wiring = Arc::new(wiring);
let mut set: JoinSet<()> = JoinSet::new();
for _ in 0..concurrency {
let handler = handler.clone();
let rx = job_rx.clone();
let wiring = wiring.clone();
set.spawn(async move {
while let Ok(dispatched) = rx.recv().await {
let entry_id = dispatched.entry_id.clone();
let job_id = dispatched.job.id.clone();
let job_name = dispatched.job.name.clone();
                // 1-indexed attempt number that's *about* to run. `Job::attempt`
                // is 0-indexed ("no runs have happened yet"), so the run we're
                // about to execute is number `Job::attempt + 1`. Used in both
                // the JobOutcome event (where it's "the run that just executed"
                // by the time the event fires) and human-readable log lines.
let attempt_index = dispatched.job.attempt.saturating_add(1);
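                // e.g. a brand-new job has `attempt == 0`, so its first run is
                // reported as attempt 1.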
let job_for_retry = dispatched.job.clone();
// Emit `active` *before* the handler runs so subscribers can
// build a "currently running" view that's correct even for
// handlers that take a long time. Best-effort by design — a
// failing XADD must not delay the handler.
wiring
.events
.emit_active(&job_id, &job_name, attempt_index)
.await;
let started = Instant::now();
let outcome = std::panic::AssertUnwindSafe(handler(dispatched.job))
.catch_unwind()
.await;
                // Microseconds, not millis: most handlers complete well under
                // 1ms — recording in ms would make every fast handler look like
                // 0 and render the histogram useless. u128 → u64 saturates at
                // ~584 millennia, well past any sane handler duration.
let duration_us = u64::try_from(started.elapsed().as_micros()).unwrap_or(u64::MAX);
let kind = match &outcome {
Ok(Ok(_)) => JobOutcomeKind::Ok,
Ok(Err(_)) => JobOutcomeKind::Err,
Err(_) => JobOutcomeKind::Panic,
};
let event = JobOutcome {
kind,
attempt: attempt_index,
handler_duration_us: duration_us,
name: job_name.clone(),
};
let sink = &*wiring.metrics;
metrics::dispatch("job_outcome", move || sink.job_outcome(event));
match outcome {
Ok(Ok(result_bytes)) => {
wiring
.events
.emit_completed(&job_id, &job_name, attempt_index, duration_us)
.await;
// Route through the result-writer only when the
// worker opted in *and* the handler returned a
// non-empty payload. The `JOB_OK_SCRIPT` itself
// also gates the SET on a non-empty payload, so
// this Rust-side check just keeps the per-entry
// EVALSHA off the fast path for empty results.
if wiring.store_results
&& !result_bytes.is_empty()
&& let Some(tx) = wiring.ok_result_tx.as_ref()
{
let item = JobOk {
entry_id,
job_id: job_id.clone(),
result_bytes,
ttl_secs: wiring.result_ttl_secs,
};
if tx.send(item).await.is_err() {
break;
}
} else if wiring.ack_tx.send(entry_id).await.is_err() {
break;
}
}
Ok(Err(e)) => {
// Use the inner source error for the on-the-wire `reason`
// — `format!("{e}")` would prepend the engine's
// `"handler: "` Display prefix, which is implementation
// detail that consumers of `failed` / `dlq` events
// shouldn't have to strip. The full HandlerError
// (including the prefix) is still surfaced via the
// `tracing::warn!` below for operator logs.
let reason = format!("{}", e.source_err());
let unrecoverable = e.is_unrecoverable();
                        tracing::warn!(
                            job_id = %job_id,
                            error = %e,
                            attempt = attempt_index,
                            unrecoverable,
                            "handler returned Err"
                        );
wiring
.events
.emit_failed(
&job_id,
&job_name,
attempt_index,
&reason,
Some(duration_us),
)
.await;
on_handler_failure(&wiring, entry_id, job_for_retry, unrecoverable).await;
}
Err(_panic) => {
tracing::warn!(job_id = %job_id, attempt = attempt_index, "handler panicked");
wiring
.events
.emit_failed(
&job_id,
&job_name,
attempt_index,
"handler panicked",
Some(duration_us),
)
.await;
// Panics are treated as recoverable: the handler may have
// panicked on transient state (e.g. a poisoned RwLock that
// recovers on retry). Unrecoverable is a deliberate signal
// from the handler; a panic isn't deliberate.
on_handler_failure(&wiring, entry_id, job_for_retry, false).await;
}
}
}
});
}
WorkerPool { set }
}
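
/// Routes a failed run: unrecoverable errors go straight to the DLQ,
/// exhausted retry budgets go to the DLQ with `RetriesExhausted`, and
/// everything else is re-encoded and scheduled on the delayed-retry ZSET.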
async fn on_handler_failure<T: Serialize + Send + 'static>(
wiring: &WorkerWiring,
entry_id: StreamEntryId,
mut job: Job<T>,
unrecoverable: bool,
) {
let next_attempt = job.attempt.saturating_add(1);
let encoded = match rmp_serde::to_vec(&job) {
Ok(b) => Bytes::from(b),
Err(e) => {
            tracing::error!(job_id = %job.id, error = %e, "retry: re-encode failed; entry will be reclaimed via CLAIM");
return;
}
};
    // 1-indexed run count for metric events. `Job::attempt` is 0-indexed
    // ("no runs have happened yet"), so the run that just executed is
    // `job.attempt + 1`, and the next run will be `job.attempt + 2`.
// `next_attempt` above happens to equal "current 1-indexed run count"
// numerically, but that's a coincidence — derive metric values from
// run-counts explicitly so the meaning is unambiguous at the call site.
let just_ran = job.attempt.saturating_add(1);
let will_run_next = just_ran.saturating_add(1);
// Unrecoverable short-circuits the retry budget entirely: the handler
// signalled "this failure is terminal — do not retry." Route straight
// to the DLQ with `Unrecoverable` so the reason label is preserved
// through the metrics + events pipeline. Carry the attempt that just
// gave up exactly like the `RetriesExhausted` path does.
if unrecoverable {
dlq::enqueue(
&wiring.dlq_tx,
job.id.clone(),
entry_id,
encoded,
DlqReason::Unrecoverable,
just_ran,
job.name.clone(),
)
.await;
return;
}
// Per-job override: `Job::retry.max_attempts` wins over the queue-wide
// `WorkerWiring::max_attempts` when set. None → fall back.
let max_attempts = job
.retry
.as_ref()
.and_then(|r| r.max_attempts)
.unwrap_or(wiring.max_attempts);
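    // Worked example: with `max_attempts == 3`, a job whose 3rd run just
    // failed arrives here with `job.attempt == 2`, so `next_attempt == 3`
    // and `3 >= 3` sends it to the DLQ.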
if next_attempt >= max_attempts {
dlq::enqueue(
&wiring.dlq_tx,
job.id.clone(),
entry_id,
encoded,
DlqReason::RetriesExhausted,
just_ran,
job.name.clone(),
)
.await;
return;
}
// Per-job override: `Job::retry.backoff` wins over the queue-wide
// `WorkerWiring::retry_cfg` when set. None → fall back to the
// existing exponential math driven by `RetryConfig`.
let backoff = match job.retry.as_ref().and_then(|r| r.backoff.as_ref()) {
Some(spec) => retry::backoff_ms_from_spec(next_attempt, spec, &wiring.retry_cfg),
None => retry::backoff_ms(next_attempt, &wiring.retry_cfg),
};
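    // Illustrative only: whichever branch ran, `backoff` is a delay in
    // milliseconds that grows with `next_attempt` under the default
    // exponential curve (the exact shape lives in `retry::backoff_ms`).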
job.attempt = next_attempt;
let payload_bytes = match rmp_serde::to_vec(&job) {
Ok(b) => b,
Err(e) => {
            tracing::error!(job_id = %job.id, error = %e, "retry: re-encode failed; entry will be reclaimed via CLAIM");
return;
}
};
// Wrap with the slice-3 length-prefixed delayed-ZSET member so the
// promoter re-emits `n` on the stream entry when the retry fires.
// `Job::name` is `#[serde(skip)]` so it's not in `payload_bytes` — it
// travels via the prefix only.
let encoded_with_bumped = encode_delayed_member(&job.name, &payload_bytes);
let run_at = now_ms().saturating_add(backoff);
let run_at_i64 = i64::try_from(run_at).unwrap_or(i64::MAX);
retry::enqueue(
&wiring.retry_tx,
job.id.clone(),
entry_id,
encoded_with_bumped,
run_at_i64,
will_run_next,
backoff,
job.name.clone(),
)
.await;
}
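
/// Waits up to `deadline` for all workers to finish naturally, then aborts
/// whatever is still in flight so shutdown can't hang on a stuck handler.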
pub(crate) async fn drain_workers(mut pool: WorkerPool, deadline: std::time::Duration) {
let drain = async { while pool.set.join_next().await.is_some() {} };
if tokio::time::timeout(deadline, drain).await.is_err() {
tracing::warn!(
?deadline,
"worker drain hit deadline; aborting in-flight tasks"
);
pool.set.abort_all();
while pool.set.join_next().await.is_some() {}
}
}