forge_jobs/runtime/handler.rs
1//! `JobHandler` trait + `JobOutcome` + dispatch registry + `JobCtx`.
2//!
3//! Consumer crates implement [`JobHandler`] for each kind of work
4//! they want to run on the queue. Handlers are stateless from the
5//! runtime's perspective; per-job state lives in the `payload`
6//! argument or in whatever store the consumer reaches through `ctx`.
7//!
8//! `JobCtx` is the only API surface a handler sees. It carries a
9//! reference to the backend-agnostic `Storage` bundle (so handlers
10//! can enqueue follow-up jobs) and the cancellation token (so they
11//! can shut down cleanly when the supervisor signals).
12
13use std::borrow::Cow;
14use std::collections::HashMap;
15use std::sync::Arc;
16use std::time::Duration;
17
18use async_trait::async_trait;
19use tokio_util::sync::CancellationToken;
20
21use super::routing::Router;
22use crate::storage::Storage;
23use crate::storage::error::Result;
24use crate::storage::types::{EnqueueOutcome, EnqueueRequest, JobId};
25
26/// Implementations are registered in a [`HandlerRegistry`] keyed by
27/// their [`JobHandler::kind`].
28#[async_trait]
29pub trait JobHandler: Send + Sync + 'static {
30 /// Stable identifier matched against `sync_queue.kind`.
31 fn kind(&self) -> &'static str;
32
33 /// Perform the unit of work. Returning [`JobOutcome::Done`] marks
34 /// the row terminal-success; the other variants are described in
35 /// the enum doc-comments.
36 async fn run(&self, ctx: JobCtx<'_>, payload: serde_json::Value) -> JobOutcome;
37}
38
/// Result of running a handler. Before calling `JobQueue::finalize`
/// the runtime translates this into a `FinalizeOutcome` (with backoff
/// applied).
///
/// `Clone`, `PartialEq` and `Eq` are derived so callers — handler unit
/// tests in particular — can compare and duplicate outcomes directly
/// instead of pattern-matching every variant by hand.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum JobOutcome {
    /// Success. Row → `done`.
    Done,
    /// Rate-limited (e.g. HTTP 429). The runtime pushes
    /// `scheduled_at` forward by `retry_after` and returns the row to
    /// `pending` without burning an attempt.
    Throttled {
        /// How long to wait before the row becomes runnable again.
        retry_after: Duration,
    },
    /// Application failure. The runtime applies exponential backoff
    /// against `max_attempts` and lands the row in `failed`/`dead`.
    /// The string carries the failure description.
    Failed(String),
    /// Permanent application failure — retrying won't help (deleted
    /// upstream resource, 404 / `thread_not_found` / `channel_not_found`,
    /// payload references an entity that no longer exists). The runtime
    /// lands the row in `dead` directly, skipping the retry budget.
    /// Use `Failed` for transient or maybe-transient errors and `Dead`
    /// only when the handler can prove a retry would also fail.
    Dead(String),
}
62
63/// Per-invocation context passed to a handler's `run`.
64///
65/// Handlers should use `ctx.enqueue(req)` to chain follow-up work —
66/// it applies the runtime's router so `req.queue_name = None` is
67/// resolved automatically.
68pub struct JobCtx<'a> {
69 /// The backend-agnostic storage bundle. Handlers usually only
70 /// need `storage.jobs` for follow-up enqueues; the other Arcs
71 /// are exposed for read-only inspection.
72 pub storage: &'a Storage,
73 /// The router used to fill in `queue_name` on enqueues.
74 pub router: &'a (dyn Router + Send + Sync),
75 /// Cluster-wide rate-limit budget. Handlers that talk to an
76 /// external API (`acquire("slack")`, `acquire("gh")`) gate
77 /// every upstream call through this so a sibling pod doesn't
78 /// independently spend the same budget.
79 pub rate_limit: &'a super::RateLimiter,
80 /// Id of the row being processed.
81 pub job_id: JobId,
82 /// Worker name (`"{queue}-{slot}-{host_id}"`).
83 pub process_id: &'a str,
84 /// Process-boot ULID so handlers can correlate logs with the
85 /// originating process across restarts.
86 pub host_id: &'a str,
87 /// Honor this if the handler can cooperatively shut down — the
88 /// supervisor signals it when the queue is paused or the process
89 /// is exiting.
90 pub cancel: CancellationToken,
91}
92
93impl std::fmt::Debug for JobCtx<'_> {
94 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95 f.debug_struct("JobCtx")
96 .field("job_id", &self.job_id)
97 .field("process_id", &self.process_id)
98 .field("host_id", &self.host_id)
99 .finish_non_exhaustive()
100 }
101}
102
/// Back-off schedule for handler-side writes that fail with a
/// transient `is_transient_conflict()` error (typically "database is
/// locked" during peak backfill). One entry per retry, slept in order.
/// It mirrors the schedule used by `finalize`, so both follow the same
/// shape. Once every delay has been used the error is surfaced to the
/// caller unchanged.
const ENQUEUE_RETRY_DELAYS: &[Duration] = &[
    Duration::from_millis(100),
    Duration::from_millis(300),
    Duration::from_millis(1_000),
];
113
114impl JobCtx<'_> {
115 /// Enqueue a follow-up job from inside a handler. The router
116 /// fills in `queue_name` when the request doesn't pin one.
117 /// Retries up to 3× on transient writer-lock conflicts so a
118 /// handler doesn't abort a long fan-out loop because of a
119 /// momentary lock race against another worker.
120 pub async fn enqueue(&self, req: EnqueueRequest) -> Result<EnqueueOutcome> {
121 let mut req = req;
122 if req.queue_name.is_none() {
123 req.queue_name = Some(Cow::Borrowed(self.router.route(req.kind.as_ref())));
124 }
125 let mut attempt = 0usize;
126 loop {
127 match self.storage.jobs.enqueue(req.clone()).await {
128 Ok(v) => return Ok(v),
129 Err(e) if e.is_transient_conflict() && attempt < ENQUEUE_RETRY_DELAYS.len() => {
130 tracing::warn!(
131 kind = %req.kind,
132 attempt,
133 delay_ms = ENQUEUE_RETRY_DELAYS[attempt].as_millis(),
134 err = %e,
135 "ctx.enqueue: transient conflict; retrying"
136 );
137 tokio::time::sleep(ENQUEUE_RETRY_DELAYS[attempt]).await;
138 attempt += 1;
139 }
140 Err(e) => return Err(e),
141 }
142 }
143 }
144
145 /// Enqueue many follow-up jobs in a single transaction. Use this
146 /// from bootstrap-style handlers that produce hundreds-to-thousands
147 /// of sub-jobs in one go — otherwise we'd take and release the
148 /// underlying writer lock N times and starve heartbeats. Same
149 /// 3× transient-conflict retry as [`Self::enqueue`].
150 pub async fn enqueue_bulk(&self, reqs: Vec<EnqueueRequest>) -> Result<Vec<EnqueueOutcome>> {
151 let routed: Vec<EnqueueRequest> = reqs
152 .into_iter()
153 .map(|mut req| {
154 if req.queue_name.is_none() {
155 req.queue_name = Some(Cow::Borrowed(self.router.route(req.kind.as_ref())));
156 }
157 req
158 })
159 .collect();
160 let mut attempt = 0usize;
161 loop {
162 match self.storage.jobs.enqueue_bulk(routed.clone()).await {
163 Ok(v) => return Ok(v),
164 Err(e) if e.is_transient_conflict() && attempt < ENQUEUE_RETRY_DELAYS.len() => {
165 tracing::warn!(
166 batch_size = routed.len(),
167 attempt,
168 delay_ms = ENQUEUE_RETRY_DELAYS[attempt].as_millis(),
169 err = %e,
170 "ctx.enqueue_bulk: transient conflict; retrying"
171 );
172 tokio::time::sleep(ENQUEUE_RETRY_DELAYS[attempt]).await;
173 attempt += 1;
174 }
175 Err(e) => return Err(e),
176 }
177 }
178 }
179}
180
181/// Maps `kind` → handler. `Arc<dyn JobHandler>` so each worker can
182/// hold its own reference without copying the trait object.
183#[derive(Default)]
184pub struct HandlerRegistry {
185 handlers: HashMap<&'static str, Arc<dyn JobHandler>>,
186}
187
188impl HandlerRegistry {
189 #[must_use]
190 pub fn new() -> Self {
191 Self::default()
192 }
193
194 /// Register a handler. If a handler for the same kind is already
195 /// present the new one wins — useful for tests overriding
196 /// production handlers.
197 pub fn register<H: JobHandler>(&mut self, handler: H) {
198 self.handlers.insert(handler.kind(), Arc::new(handler));
199 }
200
201 #[must_use]
202 pub fn get(&self, kind: &str) -> Option<Arc<dyn JobHandler>> {
203 self.handlers.get(kind).cloned()
204 }
205
206 /// Snapshot the registered kinds. Used by the runtime to log
207 /// "what can we run?" at boot.
208 #[must_use]
209 pub fn kinds(&self) -> Vec<&'static str> {
210 self.handlers.keys().copied().collect()
211 }
212}
213
214impl std::fmt::Debug for HandlerRegistry {
215 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
216 f.debug_struct("HandlerRegistry")
217 .field("kinds", &self.kinds())
218 .finish()
219 }
220}