forge_jobs/runtime.rs
1//! `QueueRuntime` — supervisors + workers + reaper + cleanup + cron.
2//!
3//! Everything in this module talks to the storage layer through the
4//! four trait Arcs bundled in [`crate::storage::Storage`]. Swapping
5//! from `SQLite` to Redis (or anything else) only requires a new impl
6//! of the four traits — nothing in this file changes.
7//!
//! On `start()` the runtime spawns:
//! - one supervisor per declared queue (scales workers to match this
//!   pod's share of `max_workers`, or to zero when `paused`)
//! - one reaper task (revives stuck jobs, sweeps stale workers)
//! - one cleanup task (purges aged `done` / `dead` rows per
//!   retention)
//! - one cron task (fires recurring schedules)
//! - a pod-heartbeat task and a rebalance task (cluster slot assignment)
//! - one metrics-rollup task and one timeline-event flush task
15//!
//! Worker loop: claim → run → finalize → repeat; when nothing is
//! claimable the worker blocks in `wait_for_work` before claiming again.
//! The `wait_for_work` primitive is the storage layer's job — `SQLite`
//! uses an in-process `Notify`; Redis would use BLPOP; Postgres
//! would use LISTEN/NOTIFY.
20
21mod cmd_exec;
22mod cron;
23mod demo;
24mod handler;
25mod metrics;
26mod rate_limit;
27mod rebalance;
28mod retry;
29mod routing;
30mod worker_pool;
31
32pub use cmd_exec::{CMD_EXEC_KIND, CmdExecHandler, CmdExecPayload};
33pub use cron::{CRON_TICK, CronTickReport, cron_tick_once, ensure_schedules};
34pub use demo::{NOOP_ECHO_KIND, NoopEcho};
35pub use handler::{HandlerRegistry, JobCtx, JobHandler, JobOutcome};
36pub use metrics::{METRICS_BUCKET_SECS, METRICS_TICK, metrics_roll_once};
37pub use rate_limit::{
38 AcquireOutcome, DEFAULT_RATE_LIMIT_SCOPES, RateLimiter, ensure_default_rate_limits,
39};
40pub use rebalance::{REBALANCE_TICK, rebalance_once};
41pub(crate) use retry::{THROTTLE_DECAY_GRACE_SECS, failed_delay};
42pub use routing::{DefaultRouter, KindPrefixRouter, Router};
43pub use worker_pool::{WorkerPoolConfig, WorkerPoolHandler};
44
45use std::collections::HashMap;
46use std::sync::{Arc, Mutex};
47use std::time::Duration;
48
49use chrono::Duration as ChronoDuration;
50use chrono::Utc;
51use tokio::task::{JoinHandle, JoinSet};
52use tokio_util::sync::CancellationToken;
53use ulid::Ulid;
54
55use crate::storage::HeartbeatStatus;
56use crate::storage::Storage;
57use crate::storage::error::Result;
58use crate::storage::types::{
59 EnqueueOutcome, EnqueueRequest, FinalizeOutcome, JobId, JobRecord, JobStatus,
60};
61
/// Default worker counts per named queue.
///
/// [`QueueRuntime::start`] consults this table when it seeds the config
/// row for each declared queue; queues not listed here default to 1
/// worker. Hosts may also iterate it at boot and call
/// [`QueueRuntime::ensure_queue`] themselves. `ensure_queue` won't
/// overwrite an existing row's `max_workers`, so these values only apply
/// to a freshly created queue config row; values tuned from the Mission
/// Control panel persist.
///
/// The `gh` and `slack` queues get > 1 worker so an initial backfill
/// drains in parallel out of the box; `default` stays at 1 because
/// most catch-all kinds are I/O-bound rather than CPU-bound.
pub const DEFAULT_QUEUE_WORKERS: &[(&str, i32)] = &[("default", 1), ("gh", 3), ("slack", 2)];

/// Env var naming the comma-separated queues a worker consumes.
pub const QUEUES_ENV: &str = "FORGE_QUEUES";
/// Env var giving a worker its monitoring-label *prefix*. The full label
/// is composed as `{prefix}-worker-{version}-{id}` (see
/// [`QueueRuntime::with_worker_prefix`]).
pub const WORKER_PREFIX_ENV: &str = "FORGE_WORKER_PREFIX";
/// Env var giving the build version used in the composed worker label
/// (see [`QueueRuntime::with_worker_version`]). Typically the build's git
/// commit, injected by CI/deploy.
pub const WORKER_VERSION_ENV: &str = "FORGE_WORKER_VERSION";
/// Env var overriding the autogenerated id segment of the composed worker
/// label (see [`QueueRuntime::with_worker_id`]).
///
/// A k8s deployment can set it to the pod id so the label tracks pod
/// identity; unset → forge autogenerates a short id from the per-boot
/// `host_id`.
pub const WORKER_ID_ENV: &str = "FORGE_WORKER_ID";
92
93/// Parse [`QUEUES_ENV`] (`FORGE_QUEUES=gh,slack`) into a queue list.
94///
95/// For [`QueueRuntime::with_queues`]. Whitespace is trimmed and blanks
96/// dropped. Returns an empty vec when unset — `start()` then rejects it,
97/// surfacing the misconfiguration rather than silently running nothing.
98#[must_use]
99pub fn queues_from_env() -> Vec<String> {
100 std::env::var(QUEUES_ENV)
101 .unwrap_or_default()
102 .split(',')
103 .map(str::trim)
104 .filter(|s| !s.is_empty())
105 .map(str::to_owned)
106 .collect()
107}
108
109/// Read [`WORKER_PREFIX_ENV`] for [`QueueRuntime::with_worker_prefix`].
110/// `None` when unset/blank.
111#[must_use]
112pub fn worker_prefix_from_env() -> Option<String> {
113 std::env::var(WORKER_PREFIX_ENV)
114 .ok()
115 .map(|s| s.trim().to_owned())
116 .filter(|s| !s.is_empty())
117}
118
119/// Read [`WORKER_VERSION_ENV`] for [`QueueRuntime::with_worker_version`].
120/// `None` when unset/blank.
121#[must_use]
122pub fn worker_version_from_env() -> Option<String> {
123 std::env::var(WORKER_VERSION_ENV)
124 .ok()
125 .map(|s| s.trim().to_owned())
126 .filter(|s| !s.is_empty())
127}
128
129/// Read [`WORKER_ID_ENV`] for [`QueueRuntime::with_worker_id`]. `None`
130/// when unset/blank — forge then autogenerates the id.
131#[must_use]
132pub fn worker_id_from_env() -> Option<String> {
133 std::env::var(WORKER_ID_ENV)
134 .ok()
135 .map(|s| s.trim().to_owned())
136 .filter(|s| !s.is_empty())
137}
138
/// Forge-autogenerated short id for the worker label: 4 lowercase hex
/// digits derived from the per-boot `host_id` (a freshly minted ULID).
///
/// Stable for the worker's lifetime and tied to the always-unique
/// `host_id`, so it's a readable short tag — not a uniqueness guarantee on
/// its own (4 hex digits is only 65,536 values). Use the `host_id` (shown
/// on the Workers tab) or override via [`QueueRuntime::with_worker_id`]
/// when you need guaranteed identity.
#[must_use]
fn autogen_short_id(host_id: &str) -> String {
    use std::hash::{Hash, Hasher};
    // `DefaultHasher::new()` uses fixed keys, so a given host_id renders the
    // same tag for the lifetime of this binary. Its algorithm is NOT
    // guaranteed stable across Rust releases — harmless here, because
    // host_id is minted fresh on every boot anyway.
    let mut hasher = std::collections::hash_map::DefaultHasher::new();
    host_id.hash(&mut hasher);
    // Keep the low 16 bits with an explicit mask instead of a truncating
    // `as u16` cast; `{:04x}` then always renders exactly four digits.
    format!("{:04x}", hasher.finish() & 0xFFFF)
}
160
/// Build a worker's monitoring label from the operator-set `prefix`, an
/// optional build `version`, and the resolved `id` segment.
///
/// Returns `None` when no prefix is set, in which case the monitoring view
/// falls back to the raw `host_id`. Otherwise the label is
/// `{prefix}-worker-{version}-{id}`, with the version segment left out when
/// `version` is `None`. For example, prefix `rates` with id `9f3a` gives
/// `rates-worker-{ver}-9f3a`, or `rates-worker-9f3a` without a version.
/// Two workers sharing a prefix stay distinguishable through the id.
#[must_use]
fn compose_worker_name(prefix: Option<&str>, version: Option<&str>, id: &str) -> Option<String> {
    match (prefix, version) {
        (None, _) => None,
        (Some(p), None) => Some(format!("{p}-worker-{id}")),
        (Some(p), Some(v)) => Some(format!("{p}-worker-{v}-{id}")),
    }
}
177
178const SUPERVISOR_TICK: Duration = Duration::from_secs(1);
179/// Worker idle-poll fallback when the storage's `wait_for_work`
180/// returns without a notify (timeout). 500 ms keeps a missed wake
181/// from stalling the queue more than half a second.
182const IDLE_POLL: Duration = Duration::from_millis(500);
183const WORKER_CAP: usize = 64;
184/// In-flight job heartbeat cadence.
185const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(10);
186/// Reaper tick cadence.
187pub const REAPER_TICK: Duration = Duration::from_secs(15);
188/// Jobs / processes with heartbeat older than this are reaped.
189///
190/// L5 — clock domain: the staleness horizon is computed as
191/// `Utc::now() - STALE_THRESHOLD` from the **runtime (app) clock** and
192/// passed as an absolute instant to `revive_stale` / `reap_stale` /
193/// `list_live_pods`. The cron/coordinator lease, by contrast, compares
194/// and writes with the **DB clock** (`now()`) on both sides, so it's
195/// internally consistent. On a single-process `SQLite` deploy these are
196/// the same clock. On a multi-replica Postgres deploy they differ by the
197/// app↔DB skew: a replica whose clock runs behind could briefly look
198/// stale to the leader (and be trimmed from the live-pod set) and
199/// self-heal on the next heartbeat. The 60s threshold is >> realistic
200/// NTP-synced skew, so this is a documented assumption, not a live bug;
201/// moving the horizons into SQL (`now() - interval`) would unify the
202/// domain if a deploy ever runs with looser clocks.
203const STALE_THRESHOLD: ChronoDuration = ChronoDuration::seconds(60);
204/// Retention cleanup cadence.
205pub const CLEANUP_TICK: Duration = Duration::from_mins(5);
206/// Timeline-event flush cadence.
207///
208/// Workers buffer `queue_event` rows in-process (off the hot enqueue /
209/// claim / finalize transactions); this loop drains and batch-inserts
210/// them. Well under `METRICS_TICK` (60s) so a minute's events are
211/// persisted before the metrics roller aggregates them, and short enough
212/// that a crash loses only a small tail of chart data. Runs on every
213/// replica — each flushes its own buffer.
214pub const EVENT_FLUSH_TICK: Duration = Duration::from_secs(2);
215/// Default `shutdown_graceful` timeout.
216pub const DEFAULT_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
217
218// ────────────────────────────────────────────────────────────────────
219// QueueRuntime
220// ────────────────────────────────────────────────────────────────────
221
222/// In-process map from currently-running `JobId` → per-job cancel
223/// token. Lets `QueueHandle::request_cancel` short-circuit the
224/// heartbeat-tick round-trip when the job runs on the same pod;
225/// cross-pod cancels still flow through the DB flag.
226type RunningJobs = Arc<Mutex<HashMap<JobId, CancellationToken>>>;
227
228/// Top-level handle for the queue subsystem.
229#[derive(Clone)]
230pub struct QueueRuntime {
231 storage: Storage,
232 handlers: Arc<HandlerRegistry>,
233 router: Arc<dyn Router>,
234 host_id: String,
235 running_jobs: RunningJobs,
236 /// Constructed once at `new()` so the per-scope refill-rate
237 /// cache lives across every worker's `JobCtx` borrow.
238 rate_limit: Arc<RateLimiter>,
239 /// Queues this worker is responsible for. **Required** — `start()`
240 /// errors if empty. Set via [`QueueRuntime::with_queues`] /
241 /// [`queues_from_env`]. The supervisor spawns one loop per queue
242 /// here; the rebalancer only hands this pod slots for these queues.
243 queues: Vec<String>,
244 /// Operator-set label prefix (`FORGE_WORKER_PREFIX`). Composed with the
245 /// build version + a short `host_id` into the monitoring-view label; see
246 /// [`compose_worker_name`]. `None` → display falls back to `host_id`.
247 worker_prefix: Option<String>,
248 /// Build version (`FORGE_WORKER_VERSION`, usually a git commit) for the
249 /// composed worker label. `None` → the version segment is omitted.
250 worker_version: Option<String>,
251 /// Explicit id segment override (`FORGE_WORKER_ID`, e.g. a k8s pod id).
252 /// `None` → forge autogenerates a short id from `host_id` at `start()`.
253 worker_id: Option<String>,
254}
255
256impl std::fmt::Debug for QueueRuntime {
257 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
258 f.debug_struct("QueueRuntime")
259 .field("host_id", &self.host_id)
260 .field("handlers", &self.handlers)
261 .finish_non_exhaustive()
262 }
263}
264
265impl QueueRuntime {
266 /// Build a new runtime. `host_id` is freshly minted from a ULID
267 /// so each process boot has its own identity in the process
268 /// registry.
269 #[must_use]
270 pub fn new(storage: Storage, handlers: HandlerRegistry, router: Arc<dyn Router>) -> Self {
271 let rate_limit = Arc::new(RateLimiter::new(
272 storage.clone(),
273 rate_limit::DEFAULT_RATE_LIMIT_SCOPES,
274 ));
275 Self {
276 storage,
277 handlers: Arc::new(handlers),
278 router,
279 host_id: Ulid::new().to_string(),
280 running_jobs: Arc::new(Mutex::new(HashMap::new())),
281 rate_limit,
282 queues: Vec::new(),
283 worker_prefix: None,
284 worker_version: None,
285 worker_id: None,
286 }
287 }
288
289 /// Declare the queues this worker is responsible for. **Required** —
290 /// a worker that declares none fails at [`start`](Self::start).
291 /// Names are de-duplicated, preserving first-seen order. Only
292 /// supervisors for these queues are spawned, and the rebalancer only
293 /// hands this pod slots for them.
294 #[must_use]
295 pub fn with_queues(mut self, queues: impl IntoIterator<Item = String>) -> Self {
296 let mut seen = std::collections::HashSet::new();
297 self.queues = queues
298 .into_iter()
299 .filter(|q| !q.is_empty() && seen.insert(q.clone()))
300 .collect();
301 self
302 }
303
304 /// Set this worker's label *prefix*, shown in the monitoring view.
305 /// The full label is composed as `{prefix}-worker-{version}-{id}`
306 /// (k8s-style), so two workers sharing a prefix stay distinct via the
307 /// trailing `host_id`. Accepts a `&'static str` const, so a service can
308 /// bake its identity in at compile time:
309 /// `.with_worker_prefix("rates")` → `rates-worker-{ver}-{id}`.
310 /// Unset → the view falls back to `host_id`. See also
311 /// [`with_worker_version`](Self::with_worker_version) /
312 /// [`worker_prefix_from_env`].
313 #[must_use]
314 pub fn with_worker_prefix(mut self, prefix: impl Into<String>) -> Self {
315 let prefix = prefix.into();
316 self.worker_prefix = (!prefix.is_empty()).then_some(prefix);
317 self
318 }
319
320 /// Set the build version segment of the composed worker label —
321 /// typically the git commit the binary was built at. Omitted from the
322 /// label when unset. Has no effect unless a prefix is also set (the
323 /// label only exists with a prefix). See
324 /// [`with_worker_prefix`](Self::with_worker_prefix) /
325 /// [`worker_version_from_env`].
326 #[must_use]
327 pub fn with_worker_version(mut self, version: impl Into<String>) -> Self {
328 let version = version.into();
329 self.worker_version = (!version.is_empty()).then_some(version);
330 self
331 }
332
333 /// Override the autogenerated id segment of the composed worker label
334 /// — e.g. a k8s pod id from `FORGE_WORKER_ID`, so the label tracks pod
335 /// identity. Unset → forge autogenerates a short id (4 hex) from the
336 /// per-boot `host_id`. Has no effect unless a prefix is also set. See
337 /// [`worker_id_from_env`].
338 #[must_use]
339 pub fn with_worker_id(mut self, id: impl Into<String>) -> Self {
340 let id = id.into();
341 self.worker_id = (!id.is_empty()).then_some(id);
342 self
343 }
344
345 /// Ensure a queue config row exists. Safe to call before or after
346 /// `start`; the supervisor for a newly-added queue is *not*
347 /// spawned until the next `start` invocation.
348 pub async fn ensure_queue(&self, name: &str, default_max_workers: i32) -> Result<()> {
349 self.storage
350 .config
351 .ensure_queue(name, default_max_workers)
352 .await
353 }
354
355 /// Insert a job and wake the destination queue.
356 pub async fn enqueue(&self, req: EnqueueRequest) -> Result<EnqueueOutcome> {
357 let mut req = req;
358 if req.queue_name.is_none() {
359 req.queue_name = Some(std::borrow::Cow::Borrowed(
360 self.router.route(req.kind.as_ref()),
361 ));
362 }
363 self.storage.jobs.enqueue(req).await
364 }
365
366 /// Spawn one supervisor per registered queue + reaper + cleanup
367 /// + cron. Returns a [`QueueHandle`] for orchestration.
368 pub async fn start(self) -> Result<QueueHandle> {
369 // Declaring queues is mandatory — running every queue implicitly
370 // is no longer supported (a worker now owns an explicit subset so
371 // the cluster can split responsibilities). Fail fast and loud.
372 if self.queues.is_empty() {
373 return Err(crate::storage::error::StorageError::Config(
374 "no queues declared: set FORGE_QUEUES or call QueueRuntime::with_queues — \
375 running all queues implicitly is no longer supported"
376 .to_owned(),
377 ));
378 }
379
380 let shutdown = CancellationToken::new();
381 let mut join_set = JoinSet::new();
382
383 for name in &self.queues {
384 // A queue name is round-tripped through the `pod.queues` CSV
385 // column, so it must not contain the ',' delimiter (else it
386 // would decode into phantom queues). Reject at the declaration
387 // gate rather than corrupting silently downstream.
388 crate::storage::types::validate_queue_name(name)?;
389 // Make sure the config row exists so the supervisor has a
390 // max_workers/paused row to read even on a fresh DB. Seed
391 // from the well-known defaults when we recognize the queue,
392 // else 1.
393 let default_workers = DEFAULT_QUEUE_WORKERS
394 .iter()
395 .find_map(|(q, n)| (*q == name).then_some(*n))
396 .unwrap_or(1);
397 self.storage
398 .config
399 .ensure_queue(name, default_workers)
400 .await?;
401 join_set.spawn(supervisor_loop(
402 self.storage.clone(),
403 self.handlers.clone(),
404 self.router.clone(),
405 name.clone(),
406 self.host_id.clone(),
407 self.running_jobs.clone(),
408 self.rate_limit.clone(),
409 shutdown.clone(),
410 ));
411 }
412
413 join_set.spawn(reaper_loop(self.storage.clone(), shutdown.clone()));
414 join_set.spawn(cleanup_loop(
415 self.storage.clone(),
416 self.host_id.clone(),
417 shutdown.clone(),
418 ));
419 join_set.spawn(cron::cron_loop(
420 self.storage.clone(),
421 self.router.clone(),
422 self.host_id.clone(),
423 shutdown.clone(),
424 ));
425 // Cluster rebalancing: every pod stamps its liveness; the
426 // coordinator (cron-lease holder) splits each queue's
427 // max_workers across live pods into pod_slot_assignment.
428 // Compose the monitoring label once, now that host_id + prefix +
429 // version + id are all known. The id is the explicit override
430 // (e.g. a k8s pod id) when set, else a short tag forge derives
431 // from the per-boot host_id. `None` (no prefix) → the view falls
432 // back to host_id. This is what lands in `pod.worker_name` and
433 // what the resource/DB charts join host_id against for labels.
434 let worker_id = self
435 .worker_id
436 .clone()
437 .unwrap_or_else(|| autogen_short_id(&self.host_id));
438 let worker_name = compose_worker_name(
439 self.worker_prefix.as_deref(),
440 self.worker_version.as_deref(),
441 &worker_id,
442 );
443 join_set.spawn(rebalance::pod_heartbeat_loop(
444 self.storage.clone(),
445 self.host_id.clone(),
446 worker_name,
447 self.queues.clone(),
448 shutdown.clone(),
449 ));
450 join_set.spawn(rebalance::rebalance_loop(
451 self.storage.clone(),
452 self.host_id.clone(),
453 shutdown.clone(),
454 ));
455 // Metrics rollup: the cron-lease holder pre-aggregates per-queue
456 // counts + latency percentiles into metric_bucket every minute.
457 join_set.spawn(metrics::metrics_loop(
458 self.storage.clone(),
459 self.host_id.clone(),
460 shutdown.clone(),
461 ));
462 // Timeline-event flush: drain this replica's in-process event
463 // buffer and batch-insert into queue_event. Not lease-gated —
464 // every replica flushes the events its own workers produced.
465 join_set.spawn(event_flush_loop(self.storage.clone(), shutdown.clone()));
466
467 Ok(QueueHandle {
468 shutdown,
469 join_set,
470 host_id: self.host_id,
471 storage: self.storage,
472 handlers: self.handlers,
473 router: self.router,
474 running_jobs: self.running_jobs,
475 rate_limit: self.rate_limit,
476 })
477 }
478}
479
480// ────────────────────────────────────────────────────────────────────
481// QueueHandle
482// ────────────────────────────────────────────────────────────────────
483
484pub struct QueueHandle {
485 shutdown: CancellationToken,
486 join_set: JoinSet<()>,
487 host_id: String,
488 storage: Storage,
489 handlers: Arc<HandlerRegistry>,
490 router: Arc<dyn Router>,
491 running_jobs: RunningJobs,
492 rate_limit: Arc<RateLimiter>,
493}
494
495impl std::fmt::Debug for QueueHandle {
496 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
497 f.debug_struct("QueueHandle")
498 .field("host_id", &self.host_id)
499 .field("handlers", &self.handlers)
500 .finish_non_exhaustive()
501 }
502}
503
504impl QueueHandle {
505 /// Per-boot identifier. Surfaced for tests + diagnostics.
506 #[must_use]
507 pub fn host_id(&self) -> &str {
508 &self.host_id
509 }
510
511 /// Read-only access to the storage for IPC commands.
512 #[must_use]
513 pub const fn storage(&self) -> &Storage {
514 &self.storage
515 }
516
517 /// Cluster-wide rate limiter — same instance every worker's
518 /// `JobCtx::rate_limit` points at. Surfaced so out-of-band
519 /// callers (e.g. cron tasks spawned outside the worker loop)
520 /// can `acquire` against the same budget as in-flight jobs.
521 #[must_use]
522 pub fn rate_limit(&self) -> &RateLimiter {
523 &self.rate_limit
524 }
525
526 /// Signal a cancel for an in-flight job by id.
527 ///
528 /// If the job is running on this pod, cancels its in-process
529 /// token immediately and returns `true`. If the job isn't on
530 /// this pod (or isn't running at all) returns `false`; cross-pod
531 /// cancels go through `JobQueue::delete` on an `in_progress`
532 /// row, which sets a DB flag the owning pod's heartbeat tick
533 /// observes within `HEARTBEAT_INTERVAL`.
534 #[must_use]
535 pub fn request_cancel(&self, job_id: &JobId) -> bool {
536 // poison-safe: a panic-poisoned lock means *some* worker hit
537 // a panic mid-insert/remove; better to take the lock and try
538 // than to silently fail every cancel.
539 let map = match self.running_jobs.lock() {
540 Ok(g) => g,
541 Err(poisoned) => poisoned.into_inner(),
542 };
543 map.get(job_id).is_some_and(|token| {
544 token.cancel();
545 true
546 })
547 }
548
549 /// Enqueue from a different task after start. Same semantics as
550 /// `QueueRuntime::enqueue`.
551 pub async fn enqueue(&self, req: EnqueueRequest) -> Result<EnqueueOutcome> {
552 let mut req = req;
553 if req.queue_name.is_none() {
554 req.queue_name = Some(std::borrow::Cow::Borrowed(
555 self.router.route(req.kind.as_ref()),
556 ));
557 }
558 self.storage.jobs.enqueue(req).await
559 }
560
561 /// Signal shutdown, wait up to `timeout` for in-flight jobs to
562 /// drain, then delete process registry rows for this host.
563 pub async fn shutdown_graceful(mut self, timeout: Duration) {
564 self.shutdown.cancel();
565 let drain = async { while self.join_set.join_next().await.is_some() {} };
566 if tokio::time::timeout(timeout, drain).await.is_err() {
567 tracing::warn!(
568 timeout_secs = timeout.as_secs(),
569 host_id = %self.host_id,
570 "shutdown_graceful: timeout exceeded, aborting remaining tasks",
571 );
572 self.join_set.abort_all();
573 while self.join_set.join_next().await.is_some() {}
574 }
575 // Workers have now drained (or been aborted); flush once more so
576 // any timeline events buffered during the drain window — after
577 // event_flush_loop did its own final flush and exited — still
578 // land. Best-effort: chart data, never job state.
579 if let Err(e) = self.storage.jobs.flush_event_buffer().await {
580 tracing::warn!(?e, host_id = %self.host_id, "event flush on shutdown failed");
581 }
582 if let Err(e) = self.storage.procs.delete_for_host(&self.host_id).await {
583 tracing::warn!(?e, host_id = %self.host_id, "delete_for_host on shutdown failed");
584 }
585 }
586}
587
588// ────────────────────────────────────────────────────────────────────
589// Supervisor — one per queue.
590// ────────────────────────────────────────────────────────────────────
591
592struct WorkerSlot {
593 handle: JoinHandle<()>,
594 cancel: CancellationToken,
595}
596
597#[allow(
598 clippy::too_many_arguments,
599 reason = "per-supervisor scratch state; `running_jobs` is the in-process cancel registry shared with workers, `rate_limit` is the boot-constructed limiter passed by ref to JobCtx"
600)]
601async fn supervisor_loop(
602 storage: Storage,
603 handlers: Arc<HandlerRegistry>,
604 router: Arc<dyn Router>,
605 queue_name: String,
606 host_id: String,
607 running_jobs: RunningJobs,
608 rate_limit: Arc<RateLimiter>,
609 shutdown: CancellationToken,
610) {
611 tracing::info!(queue = %queue_name, host_id = %host_id, "supervisor: start");
612 let mut workers: HashMap<usize, WorkerSlot> = HashMap::new();
613 let mut tick = tokio::time::interval(SUPERVISOR_TICK);
614 tick.tick().await;
615
616 loop {
617 tokio::select! {
618 biased;
619 () = shutdown.cancelled() => {
620 tracing::info!(queue = %queue_name, "supervisor: shutdown signal");
621 drain_all(&mut workers).await;
622 tracing::info!(queue = %queue_name, "supervisor: stopped");
623 return;
624 }
625 _ = tick.tick() => {
626 reap_finished(&mut workers).await;
627 let Some(target) = resolve_target(&storage, &queue_name, &host_id).await else {
628 continue;
629 };
630 scale_down(&mut workers, target);
631 scale_up(
632 &mut workers,
633 target,
634 &storage,
635 &handlers,
636 &router,
637 &queue_name,
638 &host_id,
639 &running_jobs,
640 &rate_limit,
641 );
642 }
643 }
644 }
645}
646
647/// Drop every worker slot whose join handle has resolved.
648///
649/// We collect the finished slot ids first, then await each handle to
650/// surface its `JoinError`. A `panic!()` inside a handler shows up
651/// here as `Err(JoinError::panic)` — without the await, the panic
652/// payload disappears silently. Cancellation (the shutdown path) is
653/// also a `JoinError` but `drain_all` does its own await + log.
654async fn reap_finished(workers: &mut HashMap<usize, WorkerSlot>) {
655 let finished: Vec<usize> = workers
656 .iter()
657 .filter_map(|(&id, slot)| slot.handle.is_finished().then_some(id))
658 .collect();
659 for id in finished {
660 if let Some(slot) = workers.remove(&id)
661 && let Err(e) = slot.handle.await
662 {
663 tracing::error!(?e, slot = id, "supervisor: worker task panicked");
664 }
665 }
666}
667
668async fn resolve_target(storage: &Storage, queue_name: &str, host_id: &str) -> Option<usize> {
669 let q = match storage.config.get_queue(queue_name).await {
670 Ok(Some(q)) => q,
671 Ok(None) => {
672 tracing::warn!(queue = %queue_name, "supervisor: queue row vanished");
673 return None;
674 }
675 Err(e) => {
676 tracing::warn!(queue = %queue_name, ?e, "supervisor: queue lookup failed");
677 return None;
678 }
679 };
680 if q.paused {
681 return Some(0);
682 }
683 // This pod's share of the cluster-wide max_workers, as set by the
684 // rebalancer. When no assignment exists yet (fresh pod,
685 // pre-first-rebalance, or rebalancer down) fall back to a fair-share
686 // *estimate* — NOT the full total. M6: running the whole total here
687 // means a rolling deploy's N replacement pods each spin up the full
688 // count, an N× over-parallelism storm aimed at the very upstreams the
689 // cluster rate budget exists to protect. The next rebalance refines
690 // the estimate. On SQLite the lone live pod estimates the whole total.
691 let raw = match storage.procs.get_slots(queue_name, host_id).await {
692 Ok(Some(slots)) => usize::try_from(slots).unwrap_or(0),
693 Ok(None) => fair_fallback(storage, queue_name, q.max_workers).await,
694 Err(e) => {
695 tracing::warn!(queue = %queue_name, ?e, "supervisor: slot lookup failed; estimating fair share");
696 fair_fallback(storage, queue_name, q.max_workers).await
697 }
698 };
699 Some(raw.min(WORKER_CAP))
700}
701
702/// Fair-share worker estimate for a pod that has no slot assignment yet.
703/// `max_workers` is the cluster total; divide by the count of live pods
704/// *eligible for this queue* (those that declared it) — rounded up so the
705/// fleet never *under*-serves the total, clamped to ≥1 pod. Counting only
706/// eligible pods mirrors the rebalancer: if just one pod runs `gh`, it
707/// estimates the whole `gh` total rather than a fraction. Used only on the
708/// rare unassigned / rebalancer-down path, so the extra `list_live_pods`
709/// read isn't on the steady per-tick path.
710async fn fair_fallback(storage: &Storage, queue_name: &str, max_workers: i32) -> usize {
711 let total = usize::try_from(max_workers).unwrap_or(0);
712 if total == 0 {
713 return 0;
714 }
715 let stale_before = Utc::now() - STALE_THRESHOLD;
716 let eligible = storage
717 .procs
718 .list_live_pods(stale_before)
719 .await
720 .map_or(1, |pods| {
721 pods.iter().filter(|p| p.handles(queue_name)).count()
722 });
723 total.div_ceil(eligible.max(1))
724}
725
726fn scale_down(workers: &mut HashMap<usize, WorkerSlot>, target: usize) {
727 while workers.len() > target {
728 let Some(&max_slot) = workers.keys().max() else {
729 break;
730 };
731 if let Some(slot) = workers.remove(&max_slot) {
732 slot.cancel.cancel();
733 drop(slot.handle);
734 }
735 }
736}
737
738#[allow(
739 clippy::too_many_arguments,
740 reason = "supervisor scratch state passed through to each spawned worker. Threshold to revisit: if scale_up ever needs one more parameter, OR if any of the loops here (supervisor_loop, worker_loop) grows another ~30-line block, extract a `WorkerCtx` struct."
741)]
742fn scale_up(
743 workers: &mut HashMap<usize, WorkerSlot>,
744 target: usize,
745 storage: &Storage,
746 handlers: &Arc<HandlerRegistry>,
747 router: &Arc<dyn Router>,
748 queue_name: &str,
749 host_id: &str,
750 running_jobs: &RunningJobs,
751 rate_limit: &Arc<RateLimiter>,
752) {
753 while workers.len() < target {
754 let mut slot_idx: usize = 0;
755 while workers.contains_key(&slot_idx) {
756 slot_idx = slot_idx.saturating_add(1);
757 }
758 let process_id = format!("{queue_name}-{slot_idx}-{host_id}");
759 tracing::info!(
760 queue = %queue_name,
761 slot = slot_idx,
762 worker_id = %process_id,
763 "worker spawned"
764 );
765 let cancel = CancellationToken::new();
766 let handle = tokio::spawn(worker_loop(
767 storage.clone(),
768 handlers.clone(),
769 router.clone(),
770 queue_name.to_owned(),
771 process_id,
772 host_id.to_owned(),
773 running_jobs.clone(),
774 rate_limit.clone(),
775 cancel.clone(),
776 ));
777 workers.insert(slot_idx, WorkerSlot { handle, cancel });
778 }
779}
780
781async fn drain_all(workers: &mut HashMap<usize, WorkerSlot>) {
782 for (_, slot) in workers.drain() {
783 slot.cancel.cancel();
784 if let Err(e) = slot.handle.await {
785 tracing::warn!(?e, "supervisor: worker join error on drain");
786 }
787 }
788}
789
790// ────────────────────────────────────────────────────────────────────
791// Worker — claim/run/finalize loop.
792// ────────────────────────────────────────────────────────────────────
793
794/// Surface handler `Failed` / `Dead` outcomes via `tracing` so they don't
795/// bury silently in `last_error`. The `error` field carries the full source
796/// chain assembled by [`crate::format_error_chain`] upstream.
797fn log_handler_outcome(job: &JobRecord, job_id: &JobId, outcome: &FinalizeOutcome) {
798 match outcome {
799 FinalizeOutcome::Failed {
800 message,
801 retry_after,
802 } => tracing::warn!(
803 kind = %job.kind,
804 queue = %job.queue_name,
805 job_id = %job_id.as_str(),
806 attempts = job.attempts,
807 max_attempts = job.max_attempts,
808 retry_in_secs = retry_after.as_secs(),
809 error = %message,
810 "worker: handler failed; will retry",
811 ),
812 FinalizeOutcome::Dead { message } => tracing::error!(
813 kind = %job.kind,
814 queue = %job.queue_name,
815 job_id = %job_id.as_str(),
816 attempts = job.attempts,
817 error = %message,
818 "worker: handler failed terminally (dead)",
819 ),
820 FinalizeOutcome::Done | FinalizeOutcome::Throttled { .. } => {}
821 }
822}
823
824#[allow(
825 clippy::too_many_arguments,
826 clippy::too_many_lines,
827 reason = "worker scratch state; B2 cancel registry + Dead-on-user-cancel branch pushed past the 100-line cap. The loop is one cohesive claim→run→finalize cycle; splitting it splits the lifetime of the per-job cancel token. Revisit when extracting a `WorkerCtx`."
828)]
829async fn worker_loop(
830 storage: Storage,
831 handlers: Arc<HandlerRegistry>,
832 router: Arc<dyn Router>,
833 queue_name: String,
834 process_id: String,
835 host_id: String,
836 running_jobs: RunningJobs,
837 rate_limit: Arc<RateLimiter>,
838 cancel: CancellationToken,
839) {
840 tracing::debug!(%process_id, "worker: start");
841 if let Err(e) = storage
842 .procs
843 .register(&process_id, &queue_name, &host_id)
844 .await
845 {
846 tracing::error!(?e, %process_id, "worker: register failed; exiting");
847 return;
848 }
849
850 loop {
851 if cancel.is_cancelled() {
852 break;
853 }
854
855 let job = match storage.jobs.claim_next(&queue_name, &process_id).await {
856 Ok(Some(j)) => j,
857 Ok(None) => {
858 if idle_wait(&storage, &queue_name, &process_id, &cancel).await {
859 break;
860 }
861 continue;
862 }
863 Err(e) => {
864 tracing::warn!(?e, %process_id, "worker: claim failed, backing off 1s");
865 tokio::time::sleep(Duration::from_secs(1)).await;
866 continue;
867 }
868 };
869
870 let job_id = job.id.clone();
871
872 if let Err(e) = storage
873 .procs
874 .heartbeat(&process_id, Some(job_id.clone()))
875 .await
876 {
877 tracing::warn!(?e, %process_id, "worker: heartbeat-with-job failed");
878 }
879
880 // Per-job child token: parented to the worker cancel so
881 // shutdown still propagates to the handler, but cancellable
882 // independently for `QueueHandle::request_cancel` and the
883 // heartbeat-tick observation of a DB cancel flag.
884 let job_cancel = cancel.child_token();
885 register_running_job(&running_jobs, &job_id, job_cancel.clone());
886
887 // Side-task heartbeats both the job row and the process row
888 // every HEARTBEAT_INTERVAL so a long handler doesn't trip
889 // the reaper. It also reads the row's `cancel_requested_at`
890 // each tick and triggers `job_cancel` when set.
891 let heartbeat_cancel = CancellationToken::new();
892 let heartbeat_task = tokio::spawn(heartbeat_loop(
893 storage.clone(),
894 process_id.clone(),
895 job_id.clone(),
896 heartbeat_cancel.clone(),
897 job_cancel.clone(),
898 ));
899
900 let outcome = match handlers.get(&job.kind) {
901 Some(handler) => {
902 let ctx = JobCtx {
903 storage: &storage,
904 router: router.as_ref(),
905 rate_limit: rate_limit.as_ref(),
906 job_id: job_id.clone(),
907 process_id: &process_id,
908 host_id: &host_id,
909 cancel: job_cancel.clone(),
910 };
911 handler.run(ctx, job.payload.clone()).await
912 }
913 None => JobOutcome::Failed(format!("no handler registered for kind: {}", job.kind)),
914 };
915
916 heartbeat_cancel.cancel();
917 if let Err(e) = heartbeat_task.await {
918 tracing::warn!(?e, %process_id, "worker: heartbeat task join failed");
919 }
920 deregister_running_job(&running_jobs, &job_id);
921
922 // User-initiated cancel (in-process via `request_cancel` or
923 // cross-pod via the DB cancel flag) trips `job_cancel`
924 // without tripping the worker-slot `cancel`. In that case
925 // skip the retry curve and finalize as Dead — otherwise a
926 // backoff-off queue would immediately re-claim and re-run
927 // the same row, defeating the user's cancel intent. A
928 // worker-slot cancel (shutdown / scale-down) cascades to
929 // `job_cancel` too, so this branch ignores that path.
930 let user_cancelled = !cancel.is_cancelled() && job_cancel.is_cancelled();
931 // Fetch the queue's backoff config so map_outcome can size
932 // the throttle delay. One extra read per finalize is fine —
933 // this path is rare (only on failure / throttle) and matches
934 // the supervisor's per-tick config read.
935 let backoff_cfg = fetch_backoff_cfg(&storage, &queue_name).await;
936 // M2: a cancel that lands while (or just after) the handler
937 // already returned `Done` must NOT rewrite the row as Dead — the
938 // work happened (e-mail sent, API call committed), and recording
939 // it "cancelled by user" invites an operator retry that genuinely
940 // double-executes it. The cancel simply arrived too late. The
941 // Dead-on-cancel override only applies to non-`Done` outcomes,
942 // where it prevents a backoff-off queue immediately re-claiming
943 // and re-running the row the user asked to stop.
944 let finalize_outcome = if user_cancelled && !matches!(outcome, JobOutcome::Done) {
945 FinalizeOutcome::Dead {
946 message: "cancelled by user".to_owned(),
947 }
948 } else {
949 map_outcome(&job, outcome, backoff_cfg.as_ref())
950 };
951 log_handler_outcome(&job, &job_id, &finalize_outcome);
952 // Capture the cool-down so this worker idles locally instead of
953 // spinning empty claims while the queue gate (set by finalize)
954 // holds. Other workers hit the gate in `claim_next` and idle via
955 // their normal wait path.
956 let throttle_pause = match &finalize_outcome {
957 FinalizeOutcome::Throttled {
958 retry_after,
959 cool_down_queue: true,
960 } => Some(*retry_after),
961 _ => None,
962 };
963 // Pass our `process_id` as the ownership guard (H1): if this
964 // worker stalled past the stale threshold and the reaper revived
965 // + another worker re-claimed the row, this finalize no-ops
966 // instead of clobbering the new claimant's in-flight job.
967 if let Err(e) = storage
968 .jobs
969 .finalize(&job_id, Some(&process_id), finalize_outcome)
970 .await
971 {
972 tracing::error!(?e, ?job_id, %process_id, "worker: finalize failed");
973 }
974
975 if let Err(e) = storage.procs.heartbeat(&process_id, None).await {
976 tracing::warn!(?e, %process_id, "worker: heartbeat-clear failed");
977 }
978
979 if let Some(pause) = throttle_pause {
980 tracing::info!(
981 queue = %queue_name,
982 secs = pause.as_secs(),
983 "worker: queue throttled; pausing before next claim"
984 );
985 tokio::select! {
986 biased;
987 () = cancel.cancelled() => break,
988 () = tokio::time::sleep(pause) => {}
989 }
990 }
991 }
992
993 if let Err(e) = storage.procs.deregister(&process_id).await {
994 tracing::warn!(?e, %process_id, "worker: deregister failed on exit");
995 }
996 tracing::debug!(%process_id, "worker: stop");
997}
998
999/// Idle handling when `claim_next` found nothing: heartbeat the process
1000/// row, then either sleep out an active queue cool-down (so idle workers
1001/// don't busy-poll the gate for the whole window) or block on
1002/// `wait_for_work` until the next enqueue / idle-poll timeout. Returns
1003/// `true` if the worker was cancelled (the caller should break).
1004async fn idle_wait(
1005 storage: &Storage,
1006 queue_name: &str,
1007 process_id: &str,
1008 cancel: &CancellationToken,
1009) -> bool {
1010 if let Err(e) = storage.procs.heartbeat(process_id, None).await {
1011 tracing::warn!(?e, %process_id, "worker: idle heartbeat failed");
1012 }
1013 let cool_down = fetch_backoff_cfg(storage, queue_name)
1014 .await
1015 .and_then(|c| c.throttled_until)
1016 .and_then(|until| (until - Utc::now()).to_std().ok());
1017 let wait = async {
1018 match cool_down {
1019 Some(dur) => tokio::time::sleep(dur).await,
1020 None => {
1021 let _ = storage.jobs.wait_for_work(queue_name, IDLE_POLL).await;
1022 }
1023 }
1024 };
1025 tokio::select! {
1026 biased;
1027 () = cancel.cancelled() => true,
1028 () = wait => false,
1029 }
1030}
1031
1032/// Read the queue's config for `map_outcome`. A missing row or read
1033/// error degrades to `None` (legacy flat-60s throttle, no queue
1034/// cool-down) rather than failing the finalize.
1035async fn fetch_backoff_cfg(
1036 storage: &Storage,
1037 queue_name: &str,
1038) -> Option<crate::storage::types::QueueConfigRow> {
1039 match storage.config.get_queue(queue_name).await {
1040 Ok(Some(cfg)) => Some(cfg),
1041 Ok(None) => {
1042 tracing::warn!(
1043 queue = %queue_name,
1044 "worker: queue config vanished; using legacy throttle fallback"
1045 );
1046 None
1047 }
1048 Err(e) => {
1049 tracing::warn!(
1050 ?e,
1051 queue = %queue_name,
1052 "worker: queue config read failed; using legacy throttle fallback"
1053 );
1054 None
1055 }
1056 }
1057}
1058
1059/// Map a handler's `JobOutcome` into the storage layer's
1060/// `FinalizeOutcome`, applying retry budget + backoff.
1061///
1062/// `backoff_cfg` is the queue's current row (or `None` if the read
1063/// failed / the row vanished). The `backoff_enabled` toggle governs
1064/// both arms:
1065/// - `Throttled` — off → flat 60s; on → exponential on the
1066/// queue-wide throttle counter (`min(base * 2^throttle_attempts, max)`).
1067/// - `Failed` — off → no delay (immediately re-claimable, bounded
1068/// only by `max_attempts`); on → exponential on the job's own
1069/// attempt counter (`min(base * 2^attempts, max)`).
1070///
1071/// When `backoff_cfg` is `None`, both arms degrade to the legacy
1072/// shape (flat 60s for Throttled, no delay for Failed).
1073fn map_outcome(
1074 job: &JobRecord,
1075 outcome: JobOutcome,
1076 backoff_cfg: Option<&crate::storage::types::QueueConfigRow>,
1077) -> FinalizeOutcome {
1078 match outcome {
1079 JobOutcome::Done => FinalizeOutcome::Done,
1080 JobOutcome::Throttled { retry_after: _hint } => {
1081 // The handler's hint is intentionally ignored — the
1082 // runtime owns the throttle curve via per-queue config so
1083 // the cadence stays consistent across every handler that
1084 // returns `Throttled`.
1085 //
1086 // A throttle ALWAYS cools down the whole queue: rate limits
1087 // are per-token, so letting other workers keep claiming just
1088 // hammers the limiter. `backoff_enabled` only chooses the
1089 // delay *shape* — flat 60s when off, the exponential curve
1090 // (compounding on the queue-wide throttle count, since the
1091 // limit is shared) when on.
1092 let (enabled, base, max, attempts) = backoff_cfg.map_or((false, 60, 1800, 0), |c| {
1093 (
1094 c.backoff_enabled,
1095 c.backoff_base_seconds,
1096 c.backoff_max_seconds,
1097 c.throttle_attempts,
1098 )
1099 });
1100 let retry_after = retry::throttle_delay(attempts, enabled, base, max);
1101 FinalizeOutcome::Throttled {
1102 retry_after,
1103 cool_down_queue: true,
1104 }
1105 }
1106 JobOutcome::Dead(msg) => FinalizeOutcome::Dead { message: msg },
1107 JobOutcome::Failed(msg) => {
1108 if job.attempts >= job.max_attempts {
1109 FinalizeOutcome::Dead { message: msg }
1110 } else {
1111 // Mirror the Throttled arm: backoff_enabled toggles the
1112 // curve shape — off → no delay (`max_attempts` still
1113 // bounds the retry budget), on → the same exponential
1114 // curve the queue config configures.
1115 let (enabled, base, max) = backoff_cfg.map_or((false, 60, 1800), |c| {
1116 (
1117 c.backoff_enabled,
1118 c.backoff_base_seconds,
1119 c.backoff_max_seconds,
1120 )
1121 });
1122 let retry_after = retry::failed_delay(job.attempts, enabled, base, max);
1123 FinalizeOutcome::Failed {
1124 retry_after,
1125 message: msg,
1126 }
1127 }
1128 }
1129 }
1130}
1131
1132// ────────────────────────────────────────────────────────────────────
1133// Heartbeat side-task.
1134// ────────────────────────────────────────────────────────────────────
1135
1136async fn heartbeat_loop(
1137 storage: Storage,
1138 process_id: String,
1139 job_id: JobId,
1140 stop: CancellationToken,
1141 job_cancel: CancellationToken,
1142) {
1143 let mut tick = tokio::time::interval(HEARTBEAT_INTERVAL);
1144 tick.tick().await;
1145 // Latches `true` the first time we observe the DB cancel flag
1146 // so we don't re-fire `job_cancel.cancel()` and re-log on every
1147 // subsequent tick. Cancellation is idempotent, but a handler
1148 // that takes 30s to unwind would otherwise log "cancel
1149 // requested" three times.
1150 let mut cancel_signalled = false;
1151 loop {
1152 tokio::select! {
1153 biased;
1154 () = stop.cancelled() => return,
1155 _ = tick.tick() => {
1156 match storage.jobs.heartbeat_job(&job_id, &process_id).await {
1157 Ok(HeartbeatStatus::CancelRequested) if !cancel_signalled => {
1158 // First observation of the DB cancel flag.
1159 // Signal the handler; keep ticking so the
1160 // row's heartbeat_at stays fresh while the
1161 // handler unwinds.
1162 tracing::info!(
1163 job_id = %job_id.as_str(),
1164 %process_id,
1165 "heartbeat: cancel requested; signalling handler"
1166 );
1167 job_cancel.cancel();
1168 cancel_signalled = true;
1169 }
1170 Ok(HeartbeatStatus::Lost) if !cancel_signalled => {
1171 // M1: we no longer own this row — it was reaped
1172 // past the stale threshold and another worker
1173 // re-claimed it. Stop running so the same job
1174 // isn't executing on two workers at once; the
1175 // new owner holds it now. Our eventual finalize
1176 // is a no-op under the H1 ownership guard.
1177 tracing::warn!(
1178 job_id = %job_id.as_str(),
1179 %process_id,
1180 "heartbeat: lost row ownership (reaped + re-claimed); stopping handler"
1181 );
1182 job_cancel.cancel();
1183 cancel_signalled = true;
1184 }
1185 Ok(_) => {}
1186 Err(e) => tracing::warn!(?e, %process_id, "heartbeat: job update failed"),
1187 }
1188 if let Err(e) = storage.procs.heartbeat(&process_id, Some(job_id.clone())).await {
1189 tracing::warn!(?e, %process_id, "heartbeat: process update failed");
1190 }
1191 }
1192 }
1193 }
1194}
1195
1196fn register_running_job(map: &RunningJobs, job_id: &JobId, token: CancellationToken) {
1197 // poison-safe: prior poisoning means a worker panicked mid-op;
1198 // recover the guard and proceed so future cancels still work.
1199 let mut g = match map.lock() {
1200 Ok(g) => g,
1201 Err(poisoned) => poisoned.into_inner(),
1202 };
1203 g.insert(job_id.clone(), token);
1204}
1205
1206fn deregister_running_job(map: &RunningJobs, job_id: &JobId) {
1207 let mut g = match map.lock() {
1208 Ok(g) => g,
1209 Err(poisoned) => poisoned.into_inner(),
1210 };
1211 g.remove(job_id);
1212}
1213
1214// ────────────────────────────────────────────────────────────────────
1215// Reaper — one task per runtime.
1216// ────────────────────────────────────────────────────────────────────
1217
1218async fn reaper_loop(storage: Storage, shutdown: CancellationToken) {
1219 tracing::debug!("reaper: start");
1220 let mut tick = tokio::time::interval(REAPER_TICK);
1221 tick.tick().await;
1222 loop {
1223 tokio::select! {
1224 biased;
1225 () = shutdown.cancelled() => {
1226 tracing::debug!("reaper: shutdown");
1227 return;
1228 }
1229 _ = tick.tick() => {
1230 let stale_before = Utc::now() - STALE_THRESHOLD;
1231 match storage.jobs.revive_stale(stale_before).await {
1232 Ok(n) if n > 0 => {
1233 tracing::info!(revived = n, "reaper: revived stuck jobs");
1234 }
1235 Ok(_) => {}
1236 Err(e) => tracing::warn!(?e, "reaper: revive_stale failed"),
1237 }
1238 if let Err(e) = storage.procs.reap_stale(stale_before).await {
1239 tracing::warn!(?e, "reaper: process sweep failed");
1240 }
1241 }
1242 }
1243 }
1244}
1245
1246/// Public one-off sweep — tests + ops can trigger it without waiting
1247/// for the 15 s background tick.
1248pub async fn reap_stale_jobs(storage: &Storage) -> Result<u64> {
1249 let stale_before = Utc::now() - STALE_THRESHOLD;
1250 storage.jobs.revive_stale(stale_before).await
1251}
1252
1253// ────────────────────────────────────────────────────────────────────
1254// Timeline-event flush — one task per runtime, on every replica.
1255// ────────────────────────────────────────────────────────────────────
1256
1257/// Drain this replica's in-process timeline-event buffer and batch-insert
1258/// the rows into `queue_event` on every tick, plus one final flush on
1259/// shutdown so a graceful exit doesn't drop the tail. The buffer is the
1260/// adapter's; mock backends default to a no-op `flush_event_buffer`.
1261async fn event_flush_loop(storage: Storage, shutdown: CancellationToken) {
1262 tracing::debug!("event flush: start");
1263 let mut tick = tokio::time::interval(EVENT_FLUSH_TICK);
1264 tick.tick().await;
1265 loop {
1266 tokio::select! {
1267 biased;
1268 () = shutdown.cancelled() => {
1269 // Final drain — persist whatever workers buffered before
1270 // the cancel so a clean shutdown loses nothing.
1271 if let Err(e) = storage.jobs.flush_event_buffer().await {
1272 tracing::warn!(?e, "event flush: final flush failed");
1273 }
1274 tracing::debug!("event flush: shutdown");
1275 return;
1276 }
1277 _ = tick.tick() => {
1278 if let Err(e) = storage.jobs.flush_event_buffer().await {
1279 tracing::warn!(?e, "event flush: flush failed");
1280 }
1281 }
1282 }
1283 }
1284}
1285
1286// ────────────────────────────────────────────────────────────────────
1287// Cleanup — per-queue retention.
1288// ────────────────────────────────────────────────────────────────────
1289
/// Per-status tally of the rows a single cleanup pass removed.
#[derive(Debug, Default, Clone, Copy)]
pub struct CleanupReport {
    /// Aged rows in `done` status that were deleted.
    pub done_deleted: u64,
    /// Aged rows in `dead` status that were deleted.
    pub dead_deleted: u64,
}

impl CleanupReport {
    /// Total number of rows deleted, across both statuses.
    #[must_use]
    pub const fn total(&self) -> u64 {
        let Self {
            done_deleted,
            dead_deleted,
        } = *self;
        done_deleted + dead_deleted
    }
}
1303
1304/// One pass over per-queue retention. Public so tests / ops can
1305/// trigger an immediate sweep.
1306pub async fn cleanup_once(storage: &Storage) -> Result<CleanupReport> {
1307 let queues = storage.config.list_queues().await?;
1308 let mut report = CleanupReport::default();
1309 let now = Utc::now();
1310
1311 for q in queues {
1312 let done_threshold = now - ChronoDuration::days(i64::from(q.retain_done_for_days));
1313 let dead_threshold = now - ChronoDuration::days(i64::from(q.retain_dead_for_days));
1314 report.done_deleted += storage
1315 .jobs
1316 .cleanup_aged(&q.name, JobStatus::Done, done_threshold)
1317 .await?;
1318 report.dead_deleted += storage
1319 .jobs
1320 .cleanup_aged(&q.name, JobStatus::Dead, dead_threshold)
1321 .await?;
1322 }
1323
1324 // Metrics rollup retention (ADR 0009) — global, not per-queue.
1325 let metric_threshold = now - ChronoDuration::days(metrics::METRIC_RETENTION_DAYS);
1326 storage
1327 .jobs
1328 .delete_metric_buckets_before(metric_threshold)
1329 .await?;
1330
1331 Ok(report)
1332}
1333
1334async fn cleanup_loop(storage: Storage, host_id: String, shutdown: CancellationToken) {
1335 tracing::debug!("cleanup: start");
1336 let mut tick = tokio::time::interval(CLEANUP_TICK);
1337 tick.tick().await;
1338 loop {
1339 tokio::select! {
1340 biased;
1341 () = shutdown.cancelled() => {
1342 tracing::debug!("cleanup: shutdown");
1343 return;
1344 }
1345 _ = tick.tick() => {
1346 // Only the cron-lease holder runs the purge. The
1347 // DELETEs are idempotent across pods (a non-leader
1348 // re-run finds nothing the leader didn't already
1349 // sweep), but they take the writer lock — multiplying
1350 // contention by every pod every CLEANUP_TICK. On
1351 // SQLite the lease always grants (single-process).
1352 match storage.cron.try_cron_lease(&host_id, cron::CRON_LEASE_TTL).await {
1353 Ok(true) => {}
1354 Ok(false) => continue,
1355 Err(e) => {
1356 tracing::warn!(?e, %host_id, "cleanup: lease check failed");
1357 continue;
1358 }
1359 }
1360 match cleanup_once(&storage).await {
1361 Ok(report) if report.total() > 0 => {
1362 tracing::info!(
1363 done = report.done_deleted,
1364 dead = report.dead_deleted,
1365 "cleanup: purged aged rows",
1366 );
1367 }
1368 Ok(_) => {}
1369 Err(e) => tracing::warn!(?e, "cleanup tick failed"),
1370 }
1371 }
1372 }
1373 }
1374}
1375
#[cfg(test)]
mod tests {
    use super::{autogen_short_id, compose_worker_name};

    /// Representative ULID-style host id.
    const HOST: &str = "01J9ZK3QP7V8M2T4R6W0E9K7QF";
    /// Differs from `HOST` only in the final character.
    const OTHER_HOST: &str = "01J9ZK3QP7V8M2T4R6W0E9K7QG";

    #[test]
    fn no_prefix_yields_none_so_view_falls_back_to_host_id() {
        let with_version = compose_worker_name(None, Some("abc123"), "9f3a");
        let without_version = compose_worker_name(None, None, "9f3a");
        assert_eq!(with_version, None);
        assert_eq!(without_version, None);
    }

    #[test]
    fn composes_prefix_worker_version_id() {
        let name = compose_worker_name(Some("rates"), Some("a1b2c3d"), "9f3a");
        assert_eq!(name.as_deref(), Some("rates-worker-a1b2c3d-9f3a"));
    }

    #[test]
    fn version_segment_omitted_when_unset() {
        let name = compose_worker_name(Some("rates"), None, "9f3a");
        assert_eq!(name.as_deref(), Some("rates-worker-9f3a"));
    }

    #[test]
    fn autogen_id_is_four_hex_and_stable_per_host() {
        let tag = autogen_short_id(HOST);
        assert_eq!(tag.len(), 4, "4 hex chars");
        assert!(tag.chars().all(|c| c.is_ascii_hexdigit()));
        assert_eq!(tag, autogen_short_id(HOST), "deterministic for a host_id");
        assert_ne!(
            autogen_short_id(HOST),
            autogen_short_id(OTHER_HOST),
            "distinct host_ids almost always render distinct tags",
        );
    }
}