Skip to main content

forge_jobs/runtime/
handler.rs

//! `JobHandler` trait + `JobOutcome` + dispatch registry + `JobCtx`.
//!
//! Consumer crates implement [`JobHandler`] for each kind of work
//! they want to run on the queue. Handlers are stateless from the
//! runtime's perspective; per-job state lives in the `payload`
//! argument or in whatever store the consumer reaches through `ctx`.
//!
//! `JobCtx` is the only API surface a handler sees. It carries a
//! reference to the backend-agnostic `Storage` bundle (so handlers
//! can enqueue follow-up jobs) and the cancellation token (so they
//! can shut down cleanly when the supervisor signals).
12
13use std::borrow::Cow;
14use std::collections::HashMap;
15use std::sync::Arc;
16use std::time::Duration;
17
18use async_trait::async_trait;
19use tokio_util::sync::CancellationToken;
20
21use super::routing::Router;
22use crate::storage::Storage;
23use crate::storage::error::Result;
24use crate::storage::types::{EnqueueOutcome, EnqueueRequest, JobId};
25
26/// Implementations are registered in a [`HandlerRegistry`] keyed by
27/// their [`JobHandler::kind`].
28#[async_trait]
29pub trait JobHandler: Send + Sync + 'static {
30    /// Stable identifier matched against `sync_queue.kind`.
31    fn kind(&self) -> &'static str;
32
33    /// Perform the unit of work. Returning [`JobOutcome::Done`] marks
34    /// the row terminal-success; the other variants are described in
35    /// the enum doc-comments.
36    async fn run(&self, ctx: JobCtx<'_>, payload: serde_json::Value) -> JobOutcome;
37}
38
/// Result reported by a handler after it ran a job. The runtime maps
/// this to a `FinalizeOutcome` (with backoff applied) before calling
/// `JobQueue::finalize`.
///
/// The enum is `Clone + PartialEq + Eq` so callers — handler unit
/// tests in particular — can compare a returned outcome directly
/// (`assert_eq!(outcome, JobOutcome::Done)`) or keep a copy of it.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum JobOutcome {
    /// Success. Row → `done`.
    Done,
    /// Rate-limited (e.g. HTTP 429). The runtime pushes
    /// `scheduled_at` forward by `retry_after` and returns the row to
    /// `pending` without burning an attempt.
    Throttled {
        /// How long to wait before the job becomes eligible again.
        retry_after: Duration,
    },
    /// Application failure; the string is the failure message. The
    /// runtime applies exponential backoff against `max_attempts` and
    /// lands the row in `failed`/`dead`.
    Failed(String),
    /// Permanent application failure — retrying won't help (deleted
    /// upstream resource, 404 / `thread_not_found` / `channel_not_found`,
    /// payload references an entity that no longer exists). The runtime
    /// lands the row in `dead` directly, skipping the retry budget.
    /// Use `Failed` for transient or maybe-transient errors and `Dead`
    /// only when the handler can prove a retry would also fail.
    Dead(String),
}
62
63/// Per-invocation context passed to a handler's `run`.
64///
65/// Handlers should use `ctx.enqueue(req)` to chain follow-up work —
66/// it applies the runtime's router so `req.queue_name = None` is
67/// resolved automatically.
68pub struct JobCtx<'a> {
69    /// The backend-agnostic storage bundle. Handlers usually only
70    /// need `storage.jobs` for follow-up enqueues; the other Arcs
71    /// are exposed for read-only inspection.
72    pub storage: &'a Storage,
73    /// The router used to fill in `queue_name` on enqueues.
74    pub router: &'a (dyn Router + Send + Sync),
75    /// Cluster-wide rate-limit budget. Handlers that talk to an
76    /// external API (`acquire("slack")`, `acquire("gh")`) gate
77    /// every upstream call through this so a sibling pod doesn't
78    /// independently spend the same budget.
79    pub rate_limit: &'a super::RateLimiter,
80    /// Id of the row being processed.
81    pub job_id: JobId,
82    /// Worker name (`"{queue}-{slot}-{host_id}"`).
83    pub process_id: &'a str,
84    /// Process-boot ULID so handlers can correlate logs with the
85    /// originating process across restarts.
86    pub host_id: &'a str,
87    /// Honor this if the handler can cooperatively shut down — the
88    /// supervisor signals it when the queue is paused or the process
89    /// is exiting.
90    pub cancel: CancellationToken,
91}
92
93impl std::fmt::Debug for JobCtx<'_> {
94    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95        f.debug_struct("JobCtx")
96            .field("job_id", &self.job_id)
97            .field("process_id", &self.process_id)
98            .field("host_id", &self.host_id)
99            .finish_non_exhaustive()
100    }
101}
102
/// Back-off schedule for handler-side writes that fail with a
/// transient `is_transient_conflict()` error (typically "database is
/// locked" during peak backfill). Entry `i` is the pause taken before
/// retry `i + 1`. The schedule mirrors the one in `finalize` so the
/// shape is one place to look. Once every delay has been used, the
/// error is surfaced to the caller unchanged.
const ENQUEUE_RETRY_DELAYS: &[Duration] = &[
    Duration::from_millis(100),
    Duration::from_millis(300),
    Duration::from_millis(1_000),
];
113
114impl JobCtx<'_> {
115    /// Enqueue a follow-up job from inside a handler. The router
116    /// fills in `queue_name` when the request doesn't pin one.
117    /// Retries up to 3× on transient writer-lock conflicts so a
118    /// handler doesn't abort a long fan-out loop because of a
119    /// momentary lock race against another worker.
120    pub async fn enqueue(&self, req: EnqueueRequest) -> Result<EnqueueOutcome> {
121        let mut req = req;
122        if req.queue_name.is_none() {
123            req.queue_name = Some(Cow::Borrowed(self.router.route(req.kind.as_ref())));
124        }
125        let mut attempt = 0usize;
126        loop {
127            match self.storage.jobs.enqueue(req.clone()).await {
128                Ok(v) => return Ok(v),
129                Err(e) if e.is_transient_conflict() && attempt < ENQUEUE_RETRY_DELAYS.len() => {
130                    tracing::warn!(
131                        kind = %req.kind,
132                        attempt,
133                        delay_ms = ENQUEUE_RETRY_DELAYS[attempt].as_millis(),
134                        err = %e,
135                        "ctx.enqueue: transient conflict; retrying"
136                    );
137                    tokio::time::sleep(ENQUEUE_RETRY_DELAYS[attempt]).await;
138                    attempt += 1;
139                }
140                Err(e) => return Err(e),
141            }
142        }
143    }
144
145    /// Enqueue many follow-up jobs in a single transaction. Use this
146    /// from bootstrap-style handlers that produce hundreds-to-thousands
147    /// of sub-jobs in one go — otherwise we'd take and release the
148    /// underlying writer lock N times and starve heartbeats. Same
149    /// 3× transient-conflict retry as [`Self::enqueue`].
150    pub async fn enqueue_bulk(&self, reqs: Vec<EnqueueRequest>) -> Result<Vec<EnqueueOutcome>> {
151        let routed: Vec<EnqueueRequest> = reqs
152            .into_iter()
153            .map(|mut req| {
154                if req.queue_name.is_none() {
155                    req.queue_name = Some(Cow::Borrowed(self.router.route(req.kind.as_ref())));
156                }
157                req
158            })
159            .collect();
160        let mut attempt = 0usize;
161        loop {
162            match self.storage.jobs.enqueue_bulk(routed.clone()).await {
163                Ok(v) => return Ok(v),
164                Err(e) if e.is_transient_conflict() && attempt < ENQUEUE_RETRY_DELAYS.len() => {
165                    tracing::warn!(
166                        batch_size = routed.len(),
167                        attempt,
168                        delay_ms = ENQUEUE_RETRY_DELAYS[attempt].as_millis(),
169                        err = %e,
170                        "ctx.enqueue_bulk: transient conflict; retrying"
171                    );
172                    tokio::time::sleep(ENQUEUE_RETRY_DELAYS[attempt]).await;
173                    attempt += 1;
174                }
175                Err(e) => return Err(e),
176            }
177        }
178    }
179}
180
181/// Maps `kind` → handler. `Arc<dyn JobHandler>` so each worker can
182/// hold its own reference without copying the trait object.
183#[derive(Default)]
184pub struct HandlerRegistry {
185    handlers: HashMap<&'static str, Arc<dyn JobHandler>>,
186}
187
188impl HandlerRegistry {
189    #[must_use]
190    pub fn new() -> Self {
191        Self::default()
192    }
193
194    /// Register a handler. If a handler for the same kind is already
195    /// present the new one wins — useful for tests overriding
196    /// production handlers.
197    pub fn register<H: JobHandler>(&mut self, handler: H) {
198        self.handlers.insert(handler.kind(), Arc::new(handler));
199    }
200
201    #[must_use]
202    pub fn get(&self, kind: &str) -> Option<Arc<dyn JobHandler>> {
203        self.handlers.get(kind).cloned()
204    }
205
206    /// Snapshot the registered kinds. Used by the runtime to log
207    /// "what can we run?" at boot.
208    #[must_use]
209    pub fn kinds(&self) -> Vec<&'static str> {
210        self.handlers.keys().copied().collect()
211    }
212}
213
214impl std::fmt::Debug for HandlerRegistry {
215    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
216        f.debug_struct("HandlerRegistry")
217            .field("kinds", &self.kinds())
218            .finish()
219    }
220}