Skip to main content

forge_jobs/
runtime.rs

1//! `QueueRuntime` — supervisors + workers + reaper + cleanup + cron.
2//!
3//! Everything in this module talks to the storage layer through the
4//! four trait Arcs bundled in [`crate::storage::Storage`]. Swapping
5//! from `SQLite` to Redis (or anything else) only requires a new impl
6//! of the four traits — nothing in this file changes.
7//!
8//! On `start()` the runtime spawns:
9//!  - one supervisor per registered queue (scales workers to match
10//!    `max_workers` / `paused`)
11//!  - one reaper task (revives stuck jobs, sweeps stale workers)
12//!  - one cleanup task (purges aged `done` / `dead` rows per
13//!    retention)
14//!  - one cron task (fires recurring schedules)
15//!
16//! Worker loop: claim → run → finalize → `wait_for_work` → repeat.
17//! The `wait_for_work` primitive is the storage layer's job — `SQLite`
18//! uses an in-process `Notify`; Redis would use BLPOP; Postgres
19//! would use LISTEN/NOTIFY.
20
21mod cmd_exec;
22mod cron;
23mod demo;
24mod handler;
25mod metrics;
26mod rate_limit;
27mod rebalance;
28mod retry;
29mod routing;
30mod worker_pool;
31
32pub use cmd_exec::{CMD_EXEC_KIND, CmdExecHandler, CmdExecPayload};
33pub use cron::{CRON_TICK, CronTickReport, cron_tick_once, ensure_schedules};
34pub use demo::{NOOP_ECHO_KIND, NoopEcho};
35pub use handler::{HandlerRegistry, JobCtx, JobHandler, JobOutcome};
36pub use metrics::{METRICS_BUCKET_SECS, METRICS_TICK, metrics_roll_once};
37pub use rate_limit::{
38    AcquireOutcome, DEFAULT_RATE_LIMIT_SCOPES, RateLimiter, ensure_default_rate_limits,
39};
40pub use rebalance::{REBALANCE_TICK, rebalance_once};
41pub(crate) use retry::{THROTTLE_DECAY_GRACE_SECS, failed_delay};
42pub use routing::{DefaultRouter, KindPrefixRouter, Router};
43pub use worker_pool::{WorkerPoolConfig, WorkerPoolHandler};
44
45use std::collections::HashMap;
46use std::sync::{Arc, Mutex};
47use std::time::Duration;
48
49use chrono::Duration as ChronoDuration;
50use chrono::Utc;
51use tokio::task::{JoinHandle, JoinSet};
52use tokio_util::sync::CancellationToken;
53use ulid::Ulid;
54
55use crate::storage::HeartbeatStatus;
56use crate::storage::Storage;
57use crate::storage::error::Result;
58use crate::storage::types::{
59    EnqueueOutcome, EnqueueRequest, FinalizeOutcome, JobId, JobRecord, JobStatus,
60};
61
62/// Default worker counts per named queue.
63///
64/// Hosts iterate this at boot after `QueueRuntime::new` and call
65/// `ensure_queue(name, n)` for each. `ensure_queue` won't overwrite an
66/// existing row's `max_workers`, so these are only used for a freshly-
67/// created queue config row; user-tuned values from the Mission Control
68/// panel persist.
69///
70/// The `gh` and `slack` queues get > 1 worker so an initial backfill
71/// drains in parallel out of the box; `default` stays at 1 because
72/// most catch-all kinds are I/O-bound rather than CPU-bound.
73pub const DEFAULT_QUEUE_WORKERS: &[(&str, i32)] = &[("default", 1), ("gh", 3), ("slack", 2)];
74
75/// Env var naming the comma-separated queues a worker consumes.
76pub const QUEUES_ENV: &str = "FORGE_QUEUES";
77/// Env var giving a worker its monitoring-label *prefix*. The full label
78/// is composed as `{prefix}-worker-{version}-{id}` (see
79/// [`QueueRuntime::with_worker_prefix`]).
80pub const WORKER_PREFIX_ENV: &str = "FORGE_WORKER_PREFIX";
81/// Env var giving the build version used in the composed worker label
82/// (see [`QueueRuntime::with_worker_version`]). Typically the build's git
83/// commit, injected by CI/deploy.
84pub const WORKER_VERSION_ENV: &str = "FORGE_WORKER_VERSION";
85/// Env var overriding the autogenerated id segment of the composed worker
86/// label (see [`QueueRuntime::with_worker_id`]).
87///
88/// A k8s deployment can set it to the pod id so the label tracks pod
89/// identity; unset → forge autogenerates a short id from the per-boot
90/// `host_id`.
91pub const WORKER_ID_ENV: &str = "FORGE_WORKER_ID";
92
93/// Parse [`QUEUES_ENV`] (`FORGE_QUEUES=gh,slack`) into a queue list.
94///
95/// For [`QueueRuntime::with_queues`]. Whitespace is trimmed and blanks
96/// dropped. Returns an empty vec when unset — `start()` then rejects it,
97/// surfacing the misconfiguration rather than silently running nothing.
98#[must_use]
99pub fn queues_from_env() -> Vec<String> {
100    std::env::var(QUEUES_ENV)
101        .unwrap_or_default()
102        .split(',')
103        .map(str::trim)
104        .filter(|s| !s.is_empty())
105        .map(str::to_owned)
106        .collect()
107}
108
109/// Read [`WORKER_PREFIX_ENV`] for [`QueueRuntime::with_worker_prefix`].
110/// `None` when unset/blank.
111#[must_use]
112pub fn worker_prefix_from_env() -> Option<String> {
113    std::env::var(WORKER_PREFIX_ENV)
114        .ok()
115        .map(|s| s.trim().to_owned())
116        .filter(|s| !s.is_empty())
117}
118
119/// Read [`WORKER_VERSION_ENV`] for [`QueueRuntime::with_worker_version`].
120/// `None` when unset/blank.
121#[must_use]
122pub fn worker_version_from_env() -> Option<String> {
123    std::env::var(WORKER_VERSION_ENV)
124        .ok()
125        .map(|s| s.trim().to_owned())
126        .filter(|s| !s.is_empty())
127}
128
129/// Read [`WORKER_ID_ENV`] for [`QueueRuntime::with_worker_id`]. `None`
130/// when unset/blank — forge then autogenerates the id.
131#[must_use]
132pub fn worker_id_from_env() -> Option<String> {
133    std::env::var(WORKER_ID_ENV)
134        .ok()
135        .map(|s| s.trim().to_owned())
136        .filter(|s| !s.is_empty())
137}
138
/// Derive the autogenerated id segment of the worker label: four
/// lowercase hex digits taken from a hash of the per-boot `host_id`.
///
/// The tag is a readable short handle, not an identity: 16 bits give only
/// ~65k distinct values, so two hosts can collide. Use the full `host_id`
/// (or an explicit [`QueueRuntime::with_worker_id`] override) when a
/// guaranteed-unique identifier is needed.
#[must_use]
#[allow(
    clippy::cast_possible_truncation,
    reason = "deliberately keeping the low 16 bits of the hash as a 4-hex display tag"
)]
fn autogen_short_id(host_id: &str) -> String {
    use std::hash::{Hash, Hasher};
    // `DefaultHasher::new()` is not randomly seeded, so the same `host_id`
    // always maps to the same tag within a given build. The algorithm is
    // unspecified across Rust releases, so the tag must not be treated as
    // a persistent identifier.
    let mut hasher = std::collections::hash_map::DefaultHasher::new();
    host_id.hash(&mut hasher);
    let digest = hasher.finish();
    let low16 = digest as u16;
    format!("{low16:04x}")
}
160
/// Build the monitoring label for a worker from its operator-set
/// `prefix`, optional build `version`, and already-resolved `id`.
///
/// Returns `None` when there is no prefix; the label only exists with a
/// prefix, and the monitoring view then falls back to the raw `host_id`.
///
/// Shape:
///  - with a version: `{prefix}-worker-{version}-{id}`
///  - without one:    `{prefix}-worker-{id}`
///
/// For example prefix `rates` and id `9f3a` give `rates-worker-9f3a`. The
/// trailing id keeps workers that share a prefix distinguishable.
#[must_use]
fn compose_worker_name(prefix: Option<&str>, version: Option<&str>, id: &str) -> Option<String> {
    prefix.map(|prefix| match version {
        Some(ver) => format!("{prefix}-worker-{ver}-{id}"),
        None => format!("{prefix}-worker-{id}"),
    })
}
177
178const SUPERVISOR_TICK: Duration = Duration::from_secs(1);
179/// Worker idle-poll fallback when the storage's `wait_for_work`
180/// returns without a notify (timeout). 500 ms keeps a missed wake
181/// from stalling the queue more than half a second.
182const IDLE_POLL: Duration = Duration::from_millis(500);
183const WORKER_CAP: usize = 64;
184/// In-flight job heartbeat cadence.
185const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(10);
186/// Reaper tick cadence.
187pub const REAPER_TICK: Duration = Duration::from_secs(15);
188/// Jobs / processes with heartbeat older than this are reaped.
189///
190/// L5 — clock domain: the staleness horizon is computed as
191/// `Utc::now() - STALE_THRESHOLD` from the **runtime (app) clock** and
192/// passed as an absolute instant to `revive_stale` / `reap_stale` /
193/// `list_live_pods`. The cron/coordinator lease, by contrast, compares
194/// and writes with the **DB clock** (`now()`) on both sides, so it's
195/// internally consistent. On a single-process `SQLite` deploy these are
196/// the same clock. On a multi-replica Postgres deploy they differ by the
197/// app↔DB skew: a replica whose clock runs behind could briefly look
198/// stale to the leader (and be trimmed from the live-pod set) and
199/// self-heal on the next heartbeat. The 60s threshold is >> realistic
200/// NTP-synced skew, so this is a documented assumption, not a live bug;
201/// moving the horizons into SQL (`now() - interval`) would unify the
202/// domain if a deploy ever runs with looser clocks.
203const STALE_THRESHOLD: ChronoDuration = ChronoDuration::seconds(60);
204/// Retention cleanup cadence.
205pub const CLEANUP_TICK: Duration = Duration::from_mins(5);
206/// Timeline-event flush cadence.
207///
208/// Workers buffer `queue_event` rows in-process (off the hot enqueue /
209/// claim / finalize transactions); this loop drains and batch-inserts
210/// them. Well under `METRICS_TICK` (60s) so a minute's events are
211/// persisted before the metrics roller aggregates them, and short enough
212/// that a crash loses only a small tail of chart data. Runs on every
213/// replica — each flushes its own buffer.
214pub const EVENT_FLUSH_TICK: Duration = Duration::from_secs(2);
215/// Default `shutdown_graceful` timeout.
216pub const DEFAULT_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
217
218// ────────────────────────────────────────────────────────────────────
219// QueueRuntime
220// ────────────────────────────────────────────────────────────────────
221
222/// In-process map from currently-running `JobId` → per-job cancel
223/// token. Lets `QueueHandle::request_cancel` short-circuit the
224/// heartbeat-tick round-trip when the job runs on the same pod;
225/// cross-pod cancels still flow through the DB flag.
226type RunningJobs = Arc<Mutex<HashMap<JobId, CancellationToken>>>;
227
228/// Top-level handle for the queue subsystem.
229#[derive(Clone)]
230pub struct QueueRuntime {
231    storage: Storage,
232    handlers: Arc<HandlerRegistry>,
233    router: Arc<dyn Router>,
234    host_id: String,
235    running_jobs: RunningJobs,
236    /// Constructed once at `new()` so the per-scope refill-rate
237    /// cache lives across every worker's `JobCtx` borrow.
238    rate_limit: Arc<RateLimiter>,
239    /// Queues this worker is responsible for. **Required** — `start()`
240    /// errors if empty. Set via [`QueueRuntime::with_queues`] /
241    /// [`queues_from_env`]. The supervisor spawns one loop per queue
242    /// here; the rebalancer only hands this pod slots for these queues.
243    queues: Vec<String>,
244    /// Operator-set label prefix (`FORGE_WORKER_PREFIX`). Composed with the
245    /// build version + a short `host_id` into the monitoring-view label; see
246    /// [`compose_worker_name`]. `None` → display falls back to `host_id`.
247    worker_prefix: Option<String>,
248    /// Build version (`FORGE_WORKER_VERSION`, usually a git commit) for the
249    /// composed worker label. `None` → the version segment is omitted.
250    worker_version: Option<String>,
251    /// Explicit id segment override (`FORGE_WORKER_ID`, e.g. a k8s pod id).
252    /// `None` → forge autogenerates a short id from `host_id` at `start()`.
253    worker_id: Option<String>,
254}
255
256impl std::fmt::Debug for QueueRuntime {
257    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
258        f.debug_struct("QueueRuntime")
259            .field("host_id", &self.host_id)
260            .field("handlers", &self.handlers)
261            .finish_non_exhaustive()
262    }
263}
264
265impl QueueRuntime {
266    /// Build a new runtime. `host_id` is freshly minted from a ULID
267    /// so each process boot has its own identity in the process
268    /// registry.
269    #[must_use]
270    pub fn new(storage: Storage, handlers: HandlerRegistry, router: Arc<dyn Router>) -> Self {
271        let rate_limit = Arc::new(RateLimiter::new(
272            storage.clone(),
273            rate_limit::DEFAULT_RATE_LIMIT_SCOPES,
274        ));
275        Self {
276            storage,
277            handlers: Arc::new(handlers),
278            router,
279            host_id: Ulid::new().to_string(),
280            running_jobs: Arc::new(Mutex::new(HashMap::new())),
281            rate_limit,
282            queues: Vec::new(),
283            worker_prefix: None,
284            worker_version: None,
285            worker_id: None,
286        }
287    }
288
289    /// Declare the queues this worker is responsible for. **Required** —
290    /// a worker that declares none fails at [`start`](Self::start).
291    /// Names are de-duplicated, preserving first-seen order. Only
292    /// supervisors for these queues are spawned, and the rebalancer only
293    /// hands this pod slots for them.
294    #[must_use]
295    pub fn with_queues(mut self, queues: impl IntoIterator<Item = String>) -> Self {
296        let mut seen = std::collections::HashSet::new();
297        self.queues = queues
298            .into_iter()
299            .filter(|q| !q.is_empty() && seen.insert(q.clone()))
300            .collect();
301        self
302    }
303
304    /// Set this worker's label *prefix*, shown in the monitoring view.
305    /// The full label is composed as `{prefix}-worker-{version}-{id}`
306    /// (k8s-style), so two workers sharing a prefix stay distinct via the
307    /// trailing `host_id`. Accepts a `&'static str` const, so a service can
308    /// bake its identity in at compile time:
309    /// `.with_worker_prefix("rates")` → `rates-worker-{ver}-{id}`.
310    /// Unset → the view falls back to `host_id`. See also
311    /// [`with_worker_version`](Self::with_worker_version) /
312    /// [`worker_prefix_from_env`].
313    #[must_use]
314    pub fn with_worker_prefix(mut self, prefix: impl Into<String>) -> Self {
315        let prefix = prefix.into();
316        self.worker_prefix = (!prefix.is_empty()).then_some(prefix);
317        self
318    }
319
320    /// Set the build version segment of the composed worker label —
321    /// typically the git commit the binary was built at. Omitted from the
322    /// label when unset. Has no effect unless a prefix is also set (the
323    /// label only exists with a prefix). See
324    /// [`with_worker_prefix`](Self::with_worker_prefix) /
325    /// [`worker_version_from_env`].
326    #[must_use]
327    pub fn with_worker_version(mut self, version: impl Into<String>) -> Self {
328        let version = version.into();
329        self.worker_version = (!version.is_empty()).then_some(version);
330        self
331    }
332
333    /// Override the autogenerated id segment of the composed worker label
334    /// — e.g. a k8s pod id from `FORGE_WORKER_ID`, so the label tracks pod
335    /// identity. Unset → forge autogenerates a short id (4 hex) from the
336    /// per-boot `host_id`. Has no effect unless a prefix is also set. See
337    /// [`worker_id_from_env`].
338    #[must_use]
339    pub fn with_worker_id(mut self, id: impl Into<String>) -> Self {
340        let id = id.into();
341        self.worker_id = (!id.is_empty()).then_some(id);
342        self
343    }
344
345    /// Ensure a queue config row exists. Safe to call before or after
346    /// `start`; the supervisor for a newly-added queue is *not*
347    /// spawned until the next `start` invocation.
348    pub async fn ensure_queue(&self, name: &str, default_max_workers: i32) -> Result<()> {
349        self.storage
350            .config
351            .ensure_queue(name, default_max_workers)
352            .await
353    }
354
355    /// Insert a job and wake the destination queue.
356    pub async fn enqueue(&self, req: EnqueueRequest) -> Result<EnqueueOutcome> {
357        let mut req = req;
358        if req.queue_name.is_none() {
359            req.queue_name = Some(std::borrow::Cow::Borrowed(
360                self.router.route(req.kind.as_ref()),
361            ));
362        }
363        self.storage.jobs.enqueue(req).await
364    }
365
366    /// Spawn one supervisor per registered queue + reaper + cleanup
367    /// + cron. Returns a [`QueueHandle`] for orchestration.
368    pub async fn start(self) -> Result<QueueHandle> {
369        // Declaring queues is mandatory — running every queue implicitly
370        // is no longer supported (a worker now owns an explicit subset so
371        // the cluster can split responsibilities). Fail fast and loud.
372        if self.queues.is_empty() {
373            return Err(crate::storage::error::StorageError::Config(
374                "no queues declared: set FORGE_QUEUES or call QueueRuntime::with_queues — \
375                 running all queues implicitly is no longer supported"
376                    .to_owned(),
377            ));
378        }
379
380        let shutdown = CancellationToken::new();
381        let mut join_set = JoinSet::new();
382
383        for name in &self.queues {
384            // A queue name is round-tripped through the `pod.queues` CSV
385            // column, so it must not contain the ',' delimiter (else it
386            // would decode into phantom queues). Reject at the declaration
387            // gate rather than corrupting silently downstream.
388            crate::storage::types::validate_queue_name(name)?;
389            // Make sure the config row exists so the supervisor has a
390            // max_workers/paused row to read even on a fresh DB. Seed
391            // from the well-known defaults when we recognize the queue,
392            // else 1.
393            let default_workers = DEFAULT_QUEUE_WORKERS
394                .iter()
395                .find_map(|(q, n)| (*q == name).then_some(*n))
396                .unwrap_or(1);
397            self.storage
398                .config
399                .ensure_queue(name, default_workers)
400                .await?;
401            join_set.spawn(supervisor_loop(
402                self.storage.clone(),
403                self.handlers.clone(),
404                self.router.clone(),
405                name.clone(),
406                self.host_id.clone(),
407                self.running_jobs.clone(),
408                self.rate_limit.clone(),
409                shutdown.clone(),
410            ));
411        }
412
413        join_set.spawn(reaper_loop(self.storage.clone(), shutdown.clone()));
414        join_set.spawn(cleanup_loop(
415            self.storage.clone(),
416            self.host_id.clone(),
417            shutdown.clone(),
418        ));
419        join_set.spawn(cron::cron_loop(
420            self.storage.clone(),
421            self.router.clone(),
422            self.host_id.clone(),
423            shutdown.clone(),
424        ));
425        // Cluster rebalancing: every pod stamps its liveness; the
426        // coordinator (cron-lease holder) splits each queue's
427        // max_workers across live pods into pod_slot_assignment.
428        // Compose the monitoring label once, now that host_id + prefix +
429        // version + id are all known. The id is the explicit override
430        // (e.g. a k8s pod id) when set, else a short tag forge derives
431        // from the per-boot host_id. `None` (no prefix) → the view falls
432        // back to host_id. This is what lands in `pod.worker_name` and
433        // what the resource/DB charts join host_id against for labels.
434        let worker_id = self
435            .worker_id
436            .clone()
437            .unwrap_or_else(|| autogen_short_id(&self.host_id));
438        let worker_name = compose_worker_name(
439            self.worker_prefix.as_deref(),
440            self.worker_version.as_deref(),
441            &worker_id,
442        );
443        join_set.spawn(rebalance::pod_heartbeat_loop(
444            self.storage.clone(),
445            self.host_id.clone(),
446            worker_name,
447            self.queues.clone(),
448            shutdown.clone(),
449        ));
450        join_set.spawn(rebalance::rebalance_loop(
451            self.storage.clone(),
452            self.host_id.clone(),
453            shutdown.clone(),
454        ));
455        // Metrics rollup: the cron-lease holder pre-aggregates per-queue
456        // counts + latency percentiles into metric_bucket every minute.
457        join_set.spawn(metrics::metrics_loop(
458            self.storage.clone(),
459            self.host_id.clone(),
460            shutdown.clone(),
461        ));
462        // Timeline-event flush: drain this replica's in-process event
463        // buffer and batch-insert into queue_event. Not lease-gated —
464        // every replica flushes the events its own workers produced.
465        join_set.spawn(event_flush_loop(self.storage.clone(), shutdown.clone()));
466
467        Ok(QueueHandle {
468            shutdown,
469            join_set,
470            host_id: self.host_id,
471            storage: self.storage,
472            handlers: self.handlers,
473            router: self.router,
474            running_jobs: self.running_jobs,
475            rate_limit: self.rate_limit,
476        })
477    }
478}
479
480// ────────────────────────────────────────────────────────────────────
481// QueueHandle
482// ────────────────────────────────────────────────────────────────────
483
484pub struct QueueHandle {
485    shutdown: CancellationToken,
486    join_set: JoinSet<()>,
487    host_id: String,
488    storage: Storage,
489    handlers: Arc<HandlerRegistry>,
490    router: Arc<dyn Router>,
491    running_jobs: RunningJobs,
492    rate_limit: Arc<RateLimiter>,
493}
494
495impl std::fmt::Debug for QueueHandle {
496    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
497        f.debug_struct("QueueHandle")
498            .field("host_id", &self.host_id)
499            .field("handlers", &self.handlers)
500            .finish_non_exhaustive()
501    }
502}
503
504impl QueueHandle {
505    /// Per-boot identifier. Surfaced for tests + diagnostics.
506    #[must_use]
507    pub fn host_id(&self) -> &str {
508        &self.host_id
509    }
510
511    /// Read-only access to the storage for IPC commands.
512    #[must_use]
513    pub const fn storage(&self) -> &Storage {
514        &self.storage
515    }
516
517    /// Cluster-wide rate limiter — same instance every worker's
518    /// `JobCtx::rate_limit` points at. Surfaced so out-of-band
519    /// callers (e.g. cron tasks spawned outside the worker loop)
520    /// can `acquire` against the same budget as in-flight jobs.
521    #[must_use]
522    pub fn rate_limit(&self) -> &RateLimiter {
523        &self.rate_limit
524    }
525
526    /// Signal a cancel for an in-flight job by id.
527    ///
528    /// If the job is running on this pod, cancels its in-process
529    /// token immediately and returns `true`. If the job isn't on
530    /// this pod (or isn't running at all) returns `false`; cross-pod
531    /// cancels go through `JobQueue::delete` on an `in_progress`
532    /// row, which sets a DB flag the owning pod's heartbeat tick
533    /// observes within `HEARTBEAT_INTERVAL`.
534    #[must_use]
535    pub fn request_cancel(&self, job_id: &JobId) -> bool {
536        // poison-safe: a panic-poisoned lock means *some* worker hit
537        // a panic mid-insert/remove; better to take the lock and try
538        // than to silently fail every cancel.
539        let map = match self.running_jobs.lock() {
540            Ok(g) => g,
541            Err(poisoned) => poisoned.into_inner(),
542        };
543        map.get(job_id).is_some_and(|token| {
544            token.cancel();
545            true
546        })
547    }
548
549    /// Enqueue from a different task after start. Same semantics as
550    /// `QueueRuntime::enqueue`.
551    pub async fn enqueue(&self, req: EnqueueRequest) -> Result<EnqueueOutcome> {
552        let mut req = req;
553        if req.queue_name.is_none() {
554            req.queue_name = Some(std::borrow::Cow::Borrowed(
555                self.router.route(req.kind.as_ref()),
556            ));
557        }
558        self.storage.jobs.enqueue(req).await
559    }
560
561    /// Signal shutdown, wait up to `timeout` for in-flight jobs to
562    /// drain, then delete process registry rows for this host.
563    pub async fn shutdown_graceful(mut self, timeout: Duration) {
564        self.shutdown.cancel();
565        let drain = async { while self.join_set.join_next().await.is_some() {} };
566        if tokio::time::timeout(timeout, drain).await.is_err() {
567            tracing::warn!(
568                timeout_secs = timeout.as_secs(),
569                host_id = %self.host_id,
570                "shutdown_graceful: timeout exceeded, aborting remaining tasks",
571            );
572            self.join_set.abort_all();
573            while self.join_set.join_next().await.is_some() {}
574        }
575        // Workers have now drained (or been aborted); flush once more so
576        // any timeline events buffered during the drain window — after
577        // event_flush_loop did its own final flush and exited — still
578        // land. Best-effort: chart data, never job state.
579        if let Err(e) = self.storage.jobs.flush_event_buffer().await {
580            tracing::warn!(?e, host_id = %self.host_id, "event flush on shutdown failed");
581        }
582        if let Err(e) = self.storage.procs.delete_for_host(&self.host_id).await {
583            tracing::warn!(?e, host_id = %self.host_id, "delete_for_host on shutdown failed");
584        }
585    }
586}
587
588// ────────────────────────────────────────────────────────────────────
589// Supervisor — one per queue.
590// ────────────────────────────────────────────────────────────────────
591
592struct WorkerSlot {
593    handle: JoinHandle<()>,
594    cancel: CancellationToken,
595}
596
597#[allow(
598    clippy::too_many_arguments,
599    reason = "per-supervisor scratch state; `running_jobs` is the in-process cancel registry shared with workers, `rate_limit` is the boot-constructed limiter passed by ref to JobCtx"
600)]
601async fn supervisor_loop(
602    storage: Storage,
603    handlers: Arc<HandlerRegistry>,
604    router: Arc<dyn Router>,
605    queue_name: String,
606    host_id: String,
607    running_jobs: RunningJobs,
608    rate_limit: Arc<RateLimiter>,
609    shutdown: CancellationToken,
610) {
611    tracing::info!(queue = %queue_name, host_id = %host_id, "supervisor: start");
612    let mut workers: HashMap<usize, WorkerSlot> = HashMap::new();
613    let mut tick = tokio::time::interval(SUPERVISOR_TICK);
614    tick.tick().await;
615
616    loop {
617        tokio::select! {
618            biased;
619            () = shutdown.cancelled() => {
620                tracing::info!(queue = %queue_name, "supervisor: shutdown signal");
621                drain_all(&mut workers).await;
622                tracing::info!(queue = %queue_name, "supervisor: stopped");
623                return;
624            }
625            _ = tick.tick() => {
626                reap_finished(&mut workers).await;
627                let Some(target) = resolve_target(&storage, &queue_name, &host_id).await else {
628                    continue;
629                };
630                scale_down(&mut workers, target);
631                scale_up(
632                    &mut workers,
633                    target,
634                    &storage,
635                    &handlers,
636                    &router,
637                    &queue_name,
638                    &host_id,
639                    &running_jobs,
640                    &rate_limit,
641                );
642            }
643        }
644    }
645}
646
647/// Drop every worker slot whose join handle has resolved.
648///
649/// We collect the finished slot ids first, then await each handle to
650/// surface its `JoinError`. A `panic!()` inside a handler shows up
651/// here as `Err(JoinError::panic)` — without the await, the panic
652/// payload disappears silently. Cancellation (the shutdown path) is
653/// also a `JoinError` but `drain_all` does its own await + log.
654async fn reap_finished(workers: &mut HashMap<usize, WorkerSlot>) {
655    let finished: Vec<usize> = workers
656        .iter()
657        .filter_map(|(&id, slot)| slot.handle.is_finished().then_some(id))
658        .collect();
659    for id in finished {
660        if let Some(slot) = workers.remove(&id)
661            && let Err(e) = slot.handle.await
662        {
663            tracing::error!(?e, slot = id, "supervisor: worker task panicked");
664        }
665    }
666}
667
668async fn resolve_target(storage: &Storage, queue_name: &str, host_id: &str) -> Option<usize> {
669    let q = match storage.config.get_queue(queue_name).await {
670        Ok(Some(q)) => q,
671        Ok(None) => {
672            tracing::warn!(queue = %queue_name, "supervisor: queue row vanished");
673            return None;
674        }
675        Err(e) => {
676            tracing::warn!(queue = %queue_name, ?e, "supervisor: queue lookup failed");
677            return None;
678        }
679    };
680    if q.paused {
681        return Some(0);
682    }
683    // This pod's share of the cluster-wide max_workers, as set by the
684    // rebalancer. When no assignment exists yet (fresh pod,
685    // pre-first-rebalance, or rebalancer down) fall back to a fair-share
686    // *estimate* — NOT the full total. M6: running the whole total here
687    // means a rolling deploy's N replacement pods each spin up the full
688    // count, an N× over-parallelism storm aimed at the very upstreams the
689    // cluster rate budget exists to protect. The next rebalance refines
690    // the estimate. On SQLite the lone live pod estimates the whole total.
691    let raw = match storage.procs.get_slots(queue_name, host_id).await {
692        Ok(Some(slots)) => usize::try_from(slots).unwrap_or(0),
693        Ok(None) => fair_fallback(storage, queue_name, q.max_workers).await,
694        Err(e) => {
695            tracing::warn!(queue = %queue_name, ?e, "supervisor: slot lookup failed; estimating fair share");
696            fair_fallback(storage, queue_name, q.max_workers).await
697        }
698    };
699    Some(raw.min(WORKER_CAP))
700}
701
702/// Fair-share worker estimate for a pod that has no slot assignment yet.
703/// `max_workers` is the cluster total; divide by the count of live pods
704/// *eligible for this queue* (those that declared it) — rounded up so the
705/// fleet never *under*-serves the total, clamped to ≥1 pod. Counting only
706/// eligible pods mirrors the rebalancer: if just one pod runs `gh`, it
707/// estimates the whole `gh` total rather than a fraction. Used only on the
708/// rare unassigned / rebalancer-down path, so the extra `list_live_pods`
709/// read isn't on the steady per-tick path.
710async fn fair_fallback(storage: &Storage, queue_name: &str, max_workers: i32) -> usize {
711    let total = usize::try_from(max_workers).unwrap_or(0);
712    if total == 0 {
713        return 0;
714    }
715    let stale_before = Utc::now() - STALE_THRESHOLD;
716    let eligible = storage
717        .procs
718        .list_live_pods(stale_before)
719        .await
720        .map_or(1, |pods| {
721            pods.iter().filter(|p| p.handles(queue_name)).count()
722        });
723    total.div_ceil(eligible.max(1))
724}
725
726fn scale_down(workers: &mut HashMap<usize, WorkerSlot>, target: usize) {
727    while workers.len() > target {
728        let Some(&max_slot) = workers.keys().max() else {
729            break;
730        };
731        if let Some(slot) = workers.remove(&max_slot) {
732            slot.cancel.cancel();
733            drop(slot.handle);
734        }
735    }
736}
737
738#[allow(
739    clippy::too_many_arguments,
740    reason = "supervisor scratch state passed through to each spawned worker. Threshold to revisit: if scale_up ever needs one more parameter, OR if any of the loops here (supervisor_loop, worker_loop) grows another ~30-line block, extract a `WorkerCtx` struct."
741)]
742fn scale_up(
743    workers: &mut HashMap<usize, WorkerSlot>,
744    target: usize,
745    storage: &Storage,
746    handlers: &Arc<HandlerRegistry>,
747    router: &Arc<dyn Router>,
748    queue_name: &str,
749    host_id: &str,
750    running_jobs: &RunningJobs,
751    rate_limit: &Arc<RateLimiter>,
752) {
753    while workers.len() < target {
754        let mut slot_idx: usize = 0;
755        while workers.contains_key(&slot_idx) {
756            slot_idx = slot_idx.saturating_add(1);
757        }
758        let process_id = format!("{queue_name}-{slot_idx}-{host_id}");
759        tracing::info!(
760            queue = %queue_name,
761            slot = slot_idx,
762            worker_id = %process_id,
763            "worker spawned"
764        );
765        let cancel = CancellationToken::new();
766        let handle = tokio::spawn(worker_loop(
767            storage.clone(),
768            handlers.clone(),
769            router.clone(),
770            queue_name.to_owned(),
771            process_id,
772            host_id.to_owned(),
773            running_jobs.clone(),
774            rate_limit.clone(),
775            cancel.clone(),
776        ));
777        workers.insert(slot_idx, WorkerSlot { handle, cancel });
778    }
779}
780
781async fn drain_all(workers: &mut HashMap<usize, WorkerSlot>) {
782    for (_, slot) in workers.drain() {
783        slot.cancel.cancel();
784        if let Err(e) = slot.handle.await {
785            tracing::warn!(?e, "supervisor: worker join error on drain");
786        }
787    }
788}
789
790// ────────────────────────────────────────────────────────────────────
791// Worker — claim/run/finalize loop.
792// ────────────────────────────────────────────────────────────────────
793
794/// Surface handler `Failed` / `Dead` outcomes via `tracing` so they don't
795/// bury silently in `last_error`. The `error` field carries the full source
796/// chain assembled by [`crate::format_error_chain`] upstream.
797fn log_handler_outcome(job: &JobRecord, job_id: &JobId, outcome: &FinalizeOutcome) {
798    match outcome {
799        FinalizeOutcome::Failed {
800            message,
801            retry_after,
802        } => tracing::warn!(
803            kind = %job.kind,
804            queue = %job.queue_name,
805            job_id = %job_id.as_str(),
806            attempts = job.attempts,
807            max_attempts = job.max_attempts,
808            retry_in_secs = retry_after.as_secs(),
809            error = %message,
810            "worker: handler failed; will retry",
811        ),
812        FinalizeOutcome::Dead { message } => tracing::error!(
813            kind = %job.kind,
814            queue = %job.queue_name,
815            job_id = %job_id.as_str(),
816            attempts = job.attempts,
817            error = %message,
818            "worker: handler failed terminally (dead)",
819        ),
820        FinalizeOutcome::Done | FinalizeOutcome::Throttled { .. } => {}
821    }
822}
823
824#[allow(
825    clippy::too_many_arguments,
826    clippy::too_many_lines,
827    reason = "worker scratch state; B2 cancel registry + Dead-on-user-cancel branch pushed past the 100-line cap. The loop is one cohesive claim→run→finalize cycle; splitting it splits the lifetime of the per-job cancel token. Revisit when extracting a `WorkerCtx`."
828)]
829async fn worker_loop(
830    storage: Storage,
831    handlers: Arc<HandlerRegistry>,
832    router: Arc<dyn Router>,
833    queue_name: String,
834    process_id: String,
835    host_id: String,
836    running_jobs: RunningJobs,
837    rate_limit: Arc<RateLimiter>,
838    cancel: CancellationToken,
839) {
840    tracing::debug!(%process_id, "worker: start");
841    if let Err(e) = storage
842        .procs
843        .register(&process_id, &queue_name, &host_id)
844        .await
845    {
846        tracing::error!(?e, %process_id, "worker: register failed; exiting");
847        return;
848    }
849
850    loop {
851        if cancel.is_cancelled() {
852            break;
853        }
854
855        let job = match storage.jobs.claim_next(&queue_name, &process_id).await {
856            Ok(Some(j)) => j,
857            Ok(None) => {
858                if idle_wait(&storage, &queue_name, &process_id, &cancel).await {
859                    break;
860                }
861                continue;
862            }
863            Err(e) => {
864                tracing::warn!(?e, %process_id, "worker: claim failed, backing off 1s");
865                tokio::time::sleep(Duration::from_secs(1)).await;
866                continue;
867            }
868        };
869
870        let job_id = job.id.clone();
871
872        if let Err(e) = storage
873            .procs
874            .heartbeat(&process_id, Some(job_id.clone()))
875            .await
876        {
877            tracing::warn!(?e, %process_id, "worker: heartbeat-with-job failed");
878        }
879
880        // Per-job child token: parented to the worker cancel so
881        // shutdown still propagates to the handler, but cancellable
882        // independently for `QueueHandle::request_cancel` and the
883        // heartbeat-tick observation of a DB cancel flag.
884        let job_cancel = cancel.child_token();
885        register_running_job(&running_jobs, &job_id, job_cancel.clone());
886
887        // Side-task heartbeats both the job row and the process row
888        // every HEARTBEAT_INTERVAL so a long handler doesn't trip
889        // the reaper. It also reads the row's `cancel_requested_at`
890        // each tick and triggers `job_cancel` when set.
891        let heartbeat_cancel = CancellationToken::new();
892        let heartbeat_task = tokio::spawn(heartbeat_loop(
893            storage.clone(),
894            process_id.clone(),
895            job_id.clone(),
896            heartbeat_cancel.clone(),
897            job_cancel.clone(),
898        ));
899
900        let outcome = match handlers.get(&job.kind) {
901            Some(handler) => {
902                let ctx = JobCtx {
903                    storage: &storage,
904                    router: router.as_ref(),
905                    rate_limit: rate_limit.as_ref(),
906                    job_id: job_id.clone(),
907                    process_id: &process_id,
908                    host_id: &host_id,
909                    cancel: job_cancel.clone(),
910                };
911                handler.run(ctx, job.payload.clone()).await
912            }
913            None => JobOutcome::Failed(format!("no handler registered for kind: {}", job.kind)),
914        };
915
916        heartbeat_cancel.cancel();
917        if let Err(e) = heartbeat_task.await {
918            tracing::warn!(?e, %process_id, "worker: heartbeat task join failed");
919        }
920        deregister_running_job(&running_jobs, &job_id);
921
922        // User-initiated cancel (in-process via `request_cancel` or
923        // cross-pod via the DB cancel flag) trips `job_cancel`
924        // without tripping the worker-slot `cancel`. In that case
925        // skip the retry curve and finalize as Dead — otherwise a
926        // backoff-off queue would immediately re-claim and re-run
927        // the same row, defeating the user's cancel intent. A
928        // worker-slot cancel (shutdown / scale-down) cascades to
929        // `job_cancel` too, so this branch ignores that path.
930        let user_cancelled = !cancel.is_cancelled() && job_cancel.is_cancelled();
931        // Fetch the queue's backoff config so map_outcome can size
932        // the throttle delay. One extra read per finalize is fine —
933        // this path is rare (only on failure / throttle) and matches
934        // the supervisor's per-tick config read.
935        let backoff_cfg = fetch_backoff_cfg(&storage, &queue_name).await;
936        // M2: a cancel that lands while (or just after) the handler
937        // already returned `Done` must NOT rewrite the row as Dead — the
938        // work happened (e-mail sent, API call committed), and recording
939        // it "cancelled by user" invites an operator retry that genuinely
940        // double-executes it. The cancel simply arrived too late. The
941        // Dead-on-cancel override only applies to non-`Done` outcomes,
942        // where it prevents a backoff-off queue immediately re-claiming
943        // and re-running the row the user asked to stop.
944        let finalize_outcome = if user_cancelled && !matches!(outcome, JobOutcome::Done) {
945            FinalizeOutcome::Dead {
946                message: "cancelled by user".to_owned(),
947            }
948        } else {
949            map_outcome(&job, outcome, backoff_cfg.as_ref())
950        };
951        log_handler_outcome(&job, &job_id, &finalize_outcome);
952        // Capture the cool-down so this worker idles locally instead of
953        // spinning empty claims while the queue gate (set by finalize)
954        // holds. Other workers hit the gate in `claim_next` and idle via
955        // their normal wait path.
956        let throttle_pause = match &finalize_outcome {
957            FinalizeOutcome::Throttled {
958                retry_after,
959                cool_down_queue: true,
960            } => Some(*retry_after),
961            _ => None,
962        };
963        // Pass our `process_id` as the ownership guard (H1): if this
964        // worker stalled past the stale threshold and the reaper revived
965        // + another worker re-claimed the row, this finalize no-ops
966        // instead of clobbering the new claimant's in-flight job.
967        if let Err(e) = storage
968            .jobs
969            .finalize(&job_id, Some(&process_id), finalize_outcome)
970            .await
971        {
972            tracing::error!(?e, ?job_id, %process_id, "worker: finalize failed");
973        }
974
975        if let Err(e) = storage.procs.heartbeat(&process_id, None).await {
976            tracing::warn!(?e, %process_id, "worker: heartbeat-clear failed");
977        }
978
979        if let Some(pause) = throttle_pause {
980            tracing::info!(
981                queue = %queue_name,
982                secs = pause.as_secs(),
983                "worker: queue throttled; pausing before next claim"
984            );
985            tokio::select! {
986                biased;
987                () = cancel.cancelled() => break,
988                () = tokio::time::sleep(pause) => {}
989            }
990        }
991    }
992
993    if let Err(e) = storage.procs.deregister(&process_id).await {
994        tracing::warn!(?e, %process_id, "worker: deregister failed on exit");
995    }
996    tracing::debug!(%process_id, "worker: stop");
997}
998
999/// Idle handling when `claim_next` found nothing: heartbeat the process
1000/// row, then either sleep out an active queue cool-down (so idle workers
1001/// don't busy-poll the gate for the whole window) or block on
1002/// `wait_for_work` until the next enqueue / idle-poll timeout. Returns
1003/// `true` if the worker was cancelled (the caller should break).
1004async fn idle_wait(
1005    storage: &Storage,
1006    queue_name: &str,
1007    process_id: &str,
1008    cancel: &CancellationToken,
1009) -> bool {
1010    if let Err(e) = storage.procs.heartbeat(process_id, None).await {
1011        tracing::warn!(?e, %process_id, "worker: idle heartbeat failed");
1012    }
1013    let cool_down = fetch_backoff_cfg(storage, queue_name)
1014        .await
1015        .and_then(|c| c.throttled_until)
1016        .and_then(|until| (until - Utc::now()).to_std().ok());
1017    let wait = async {
1018        match cool_down {
1019            Some(dur) => tokio::time::sleep(dur).await,
1020            None => {
1021                let _ = storage.jobs.wait_for_work(queue_name, IDLE_POLL).await;
1022            }
1023        }
1024    };
1025    tokio::select! {
1026        biased;
1027        () = cancel.cancelled() => true,
1028        () = wait => false,
1029    }
1030}
1031
1032/// Read the queue's config for `map_outcome`. A missing row or read
1033/// error degrades to `None` (legacy flat-60s throttle, no queue
1034/// cool-down) rather than failing the finalize.
1035async fn fetch_backoff_cfg(
1036    storage: &Storage,
1037    queue_name: &str,
1038) -> Option<crate::storage::types::QueueConfigRow> {
1039    match storage.config.get_queue(queue_name).await {
1040        Ok(Some(cfg)) => Some(cfg),
1041        Ok(None) => {
1042            tracing::warn!(
1043                queue = %queue_name,
1044                "worker: queue config vanished; using legacy throttle fallback"
1045            );
1046            None
1047        }
1048        Err(e) => {
1049            tracing::warn!(
1050                ?e,
1051                queue = %queue_name,
1052                "worker: queue config read failed; using legacy throttle fallback"
1053            );
1054            None
1055        }
1056    }
1057}
1058
1059/// Map a handler's `JobOutcome` into the storage layer's
1060/// `FinalizeOutcome`, applying retry budget + backoff.
1061///
1062/// `backoff_cfg` is the queue's current row (or `None` if the read
1063/// failed / the row vanished). The `backoff_enabled` toggle governs
1064/// both arms:
1065///  - `Throttled` — off → flat 60s; on → exponential on the
1066///    queue-wide throttle counter (`min(base * 2^throttle_attempts, max)`).
1067///  - `Failed` — off → no delay (immediately re-claimable, bounded
1068///    only by `max_attempts`); on → exponential on the job's own
1069///    attempt counter (`min(base * 2^attempts, max)`).
1070///
1071/// When `backoff_cfg` is `None`, both arms degrade to the legacy
1072/// shape (flat 60s for Throttled, no delay for Failed).
1073fn map_outcome(
1074    job: &JobRecord,
1075    outcome: JobOutcome,
1076    backoff_cfg: Option<&crate::storage::types::QueueConfigRow>,
1077) -> FinalizeOutcome {
1078    match outcome {
1079        JobOutcome::Done => FinalizeOutcome::Done,
1080        JobOutcome::Throttled { retry_after: _hint } => {
1081            // The handler's hint is intentionally ignored — the
1082            // runtime owns the throttle curve via per-queue config so
1083            // the cadence stays consistent across every handler that
1084            // returns `Throttled`.
1085            //
1086            // A throttle ALWAYS cools down the whole queue: rate limits
1087            // are per-token, so letting other workers keep claiming just
1088            // hammers the limiter. `backoff_enabled` only chooses the
1089            // delay *shape* — flat 60s when off, the exponential curve
1090            // (compounding on the queue-wide throttle count, since the
1091            // limit is shared) when on.
1092            let (enabled, base, max, attempts) = backoff_cfg.map_or((false, 60, 1800, 0), |c| {
1093                (
1094                    c.backoff_enabled,
1095                    c.backoff_base_seconds,
1096                    c.backoff_max_seconds,
1097                    c.throttle_attempts,
1098                )
1099            });
1100            let retry_after = retry::throttle_delay(attempts, enabled, base, max);
1101            FinalizeOutcome::Throttled {
1102                retry_after,
1103                cool_down_queue: true,
1104            }
1105        }
1106        JobOutcome::Dead(msg) => FinalizeOutcome::Dead { message: msg },
1107        JobOutcome::Failed(msg) => {
1108            if job.attempts >= job.max_attempts {
1109                FinalizeOutcome::Dead { message: msg }
1110            } else {
1111                // Mirror the Throttled arm: backoff_enabled toggles the
1112                // curve shape — off → no delay (`max_attempts` still
1113                // bounds the retry budget), on → the same exponential
1114                // curve the queue config configures.
1115                let (enabled, base, max) = backoff_cfg.map_or((false, 60, 1800), |c| {
1116                    (
1117                        c.backoff_enabled,
1118                        c.backoff_base_seconds,
1119                        c.backoff_max_seconds,
1120                    )
1121                });
1122                let retry_after = retry::failed_delay(job.attempts, enabled, base, max);
1123                FinalizeOutcome::Failed {
1124                    retry_after,
1125                    message: msg,
1126                }
1127            }
1128        }
1129    }
1130}
1131
1132// ────────────────────────────────────────────────────────────────────
1133// Heartbeat side-task.
1134// ────────────────────────────────────────────────────────────────────
1135
1136async fn heartbeat_loop(
1137    storage: Storage,
1138    process_id: String,
1139    job_id: JobId,
1140    stop: CancellationToken,
1141    job_cancel: CancellationToken,
1142) {
1143    let mut tick = tokio::time::interval(HEARTBEAT_INTERVAL);
1144    tick.tick().await;
1145    // Latches `true` the first time we observe the DB cancel flag
1146    // so we don't re-fire `job_cancel.cancel()` and re-log on every
1147    // subsequent tick. Cancellation is idempotent, but a handler
1148    // that takes 30s to unwind would otherwise log "cancel
1149    // requested" three times.
1150    let mut cancel_signalled = false;
1151    loop {
1152        tokio::select! {
1153            biased;
1154            () = stop.cancelled() => return,
1155            _ = tick.tick() => {
1156                match storage.jobs.heartbeat_job(&job_id, &process_id).await {
1157                    Ok(HeartbeatStatus::CancelRequested) if !cancel_signalled => {
1158                        // First observation of the DB cancel flag.
1159                        // Signal the handler; keep ticking so the
1160                        // row's heartbeat_at stays fresh while the
1161                        // handler unwinds.
1162                        tracing::info!(
1163                            job_id = %job_id.as_str(),
1164                            %process_id,
1165                            "heartbeat: cancel requested; signalling handler"
1166                        );
1167                        job_cancel.cancel();
1168                        cancel_signalled = true;
1169                    }
1170                    Ok(HeartbeatStatus::Lost) if !cancel_signalled => {
1171                        // M1: we no longer own this row — it was reaped
1172                        // past the stale threshold and another worker
1173                        // re-claimed it. Stop running so the same job
1174                        // isn't executing on two workers at once; the
1175                        // new owner holds it now. Our eventual finalize
1176                        // is a no-op under the H1 ownership guard.
1177                        tracing::warn!(
1178                            job_id = %job_id.as_str(),
1179                            %process_id,
1180                            "heartbeat: lost row ownership (reaped + re-claimed); stopping handler"
1181                        );
1182                        job_cancel.cancel();
1183                        cancel_signalled = true;
1184                    }
1185                    Ok(_) => {}
1186                    Err(e) => tracing::warn!(?e, %process_id, "heartbeat: job update failed"),
1187                }
1188                if let Err(e) = storage.procs.heartbeat(&process_id, Some(job_id.clone())).await {
1189                    tracing::warn!(?e, %process_id, "heartbeat: process update failed");
1190                }
1191            }
1192        }
1193    }
1194}
1195
1196fn register_running_job(map: &RunningJobs, job_id: &JobId, token: CancellationToken) {
1197    // poison-safe: prior poisoning means a worker panicked mid-op;
1198    // recover the guard and proceed so future cancels still work.
1199    let mut g = match map.lock() {
1200        Ok(g) => g,
1201        Err(poisoned) => poisoned.into_inner(),
1202    };
1203    g.insert(job_id.clone(), token);
1204}
1205
1206fn deregister_running_job(map: &RunningJobs, job_id: &JobId) {
1207    let mut g = match map.lock() {
1208        Ok(g) => g,
1209        Err(poisoned) => poisoned.into_inner(),
1210    };
1211    g.remove(job_id);
1212}
1213
1214// ────────────────────────────────────────────────────────────────────
1215// Reaper — one task per runtime.
1216// ────────────────────────────────────────────────────────────────────
1217
1218async fn reaper_loop(storage: Storage, shutdown: CancellationToken) {
1219    tracing::debug!("reaper: start");
1220    let mut tick = tokio::time::interval(REAPER_TICK);
1221    tick.tick().await;
1222    loop {
1223        tokio::select! {
1224            biased;
1225            () = shutdown.cancelled() => {
1226                tracing::debug!("reaper: shutdown");
1227                return;
1228            }
1229            _ = tick.tick() => {
1230                let stale_before = Utc::now() - STALE_THRESHOLD;
1231                match storage.jobs.revive_stale(stale_before).await {
1232                    Ok(n) if n > 0 => {
1233                        tracing::info!(revived = n, "reaper: revived stuck jobs");
1234                    }
1235                    Ok(_) => {}
1236                    Err(e) => tracing::warn!(?e, "reaper: revive_stale failed"),
1237                }
1238                if let Err(e) = storage.procs.reap_stale(stale_before).await {
1239                    tracing::warn!(?e, "reaper: process sweep failed");
1240                }
1241            }
1242        }
1243    }
1244}
1245
1246/// Public one-off sweep — tests + ops can trigger it without waiting
1247/// for the 15 s background tick.
1248pub async fn reap_stale_jobs(storage: &Storage) -> Result<u64> {
1249    let stale_before = Utc::now() - STALE_THRESHOLD;
1250    storage.jobs.revive_stale(stale_before).await
1251}
1252
1253// ────────────────────────────────────────────────────────────────────
1254// Timeline-event flush — one task per runtime, on every replica.
1255// ────────────────────────────────────────────────────────────────────
1256
1257/// Drain this replica's in-process timeline-event buffer and batch-insert
1258/// the rows into `queue_event` on every tick, plus one final flush on
1259/// shutdown so a graceful exit doesn't drop the tail. The buffer is the
1260/// adapter's; mock backends default to a no-op `flush_event_buffer`.
1261async fn event_flush_loop(storage: Storage, shutdown: CancellationToken) {
1262    tracing::debug!("event flush: start");
1263    let mut tick = tokio::time::interval(EVENT_FLUSH_TICK);
1264    tick.tick().await;
1265    loop {
1266        tokio::select! {
1267            biased;
1268            () = shutdown.cancelled() => {
1269                // Final drain — persist whatever workers buffered before
1270                // the cancel so a clean shutdown loses nothing.
1271                if let Err(e) = storage.jobs.flush_event_buffer().await {
1272                    tracing::warn!(?e, "event flush: final flush failed");
1273                }
1274                tracing::debug!("event flush: shutdown");
1275                return;
1276            }
1277            _ = tick.tick() => {
1278                if let Err(e) = storage.jobs.flush_event_buffer().await {
1279                    tracing::warn!(?e, "event flush: flush failed");
1280                }
1281            }
1282        }
1283    }
1284}
1285
1286// ────────────────────────────────────────────────────────────────────
1287// Cleanup — per-queue retention.
1288// ────────────────────────────────────────────────────────────────────
1289
/// Counts of rows the cleanup pass deleted, broken out by status.
#[derive(Debug, Default, Clone, Copy)]
pub struct CleanupReport {
    /// Number of `done` rows deleted.
    pub done_deleted: u64,
    /// Number of `dead` rows deleted.
    pub dead_deleted: u64,
}

impl CleanupReport {
    /// Total rows deleted across both statuses.
    ///
    /// Saturates at `u64::MAX` instead of overflowing, so this reporting
    /// helper can never panic (debug builds) or wrap (release builds).
    #[must_use]
    pub const fn total(&self) -> u64 {
        self.done_deleted.saturating_add(self.dead_deleted)
    }
}
1303
1304/// One pass over per-queue retention. Public so tests / ops can
1305/// trigger an immediate sweep.
1306pub async fn cleanup_once(storage: &Storage) -> Result<CleanupReport> {
1307    let queues = storage.config.list_queues().await?;
1308    let mut report = CleanupReport::default();
1309    let now = Utc::now();
1310
1311    for q in queues {
1312        let done_threshold = now - ChronoDuration::days(i64::from(q.retain_done_for_days));
1313        let dead_threshold = now - ChronoDuration::days(i64::from(q.retain_dead_for_days));
1314        report.done_deleted += storage
1315            .jobs
1316            .cleanup_aged(&q.name, JobStatus::Done, done_threshold)
1317            .await?;
1318        report.dead_deleted += storage
1319            .jobs
1320            .cleanup_aged(&q.name, JobStatus::Dead, dead_threshold)
1321            .await?;
1322    }
1323
1324    // Metrics rollup retention (ADR 0009) — global, not per-queue.
1325    let metric_threshold = now - ChronoDuration::days(metrics::METRIC_RETENTION_DAYS);
1326    storage
1327        .jobs
1328        .delete_metric_buckets_before(metric_threshold)
1329        .await?;
1330
1331    Ok(report)
1332}
1333
1334async fn cleanup_loop(storage: Storage, host_id: String, shutdown: CancellationToken) {
1335    tracing::debug!("cleanup: start");
1336    let mut tick = tokio::time::interval(CLEANUP_TICK);
1337    tick.tick().await;
1338    loop {
1339        tokio::select! {
1340            biased;
1341            () = shutdown.cancelled() => {
1342                tracing::debug!("cleanup: shutdown");
1343                return;
1344            }
1345            _ = tick.tick() => {
1346                // Only the cron-lease holder runs the purge. The
1347                // DELETEs are idempotent across pods (a non-leader
1348                // re-run finds nothing the leader didn't already
1349                // sweep), but they take the writer lock — multiplying
1350                // contention by every pod every CLEANUP_TICK. On
1351                // SQLite the lease always grants (single-process).
1352                match storage.cron.try_cron_lease(&host_id, cron::CRON_LEASE_TTL).await {
1353                    Ok(true) => {}
1354                    Ok(false) => continue,
1355                    Err(e) => {
1356                        tracing::warn!(?e, %host_id, "cleanup: lease check failed");
1357                        continue;
1358                    }
1359                }
1360                match cleanup_once(&storage).await {
1361                    Ok(report) if report.total() > 0 => {
1362                        tracing::info!(
1363                            done = report.done_deleted,
1364                            dead = report.dead_deleted,
1365                            "cleanup: purged aged rows",
1366                        );
1367                    }
1368                    Ok(_) => {}
1369                    Err(e) => tracing::warn!(?e, "cleanup tick failed"),
1370                }
1371            }
1372        }
1373    }
1374}
1375
#[cfg(test)]
mod tests {
    use super::{autogen_short_id, compose_worker_name};

    /// Sample host id shared by the tests below.
    const HOST: &str = "01J9ZK3QP7V8M2T4R6W0E9K7QF";

    /// Without a prefix no name is composed, whether or not a version is
    /// supplied.
    #[test]
    fn no_prefix_yields_none_so_view_falls_back_to_host_id() {
        for version in [Some("abc123"), None] {
            assert_eq!(compose_worker_name(None, version, "9f3a"), None);
        }
    }

    /// Prefix, version and id are joined as `<prefix>-worker-<version>-<id>`.
    #[test]
    fn composes_prefix_worker_version_id() {
        let name = compose_worker_name(Some("rates"), Some("a1b2c3d"), "9f3a");
        assert_eq!(name.as_deref(), Some("rates-worker-a1b2c3d-9f3a"));
    }

    /// With no version the segment is dropped instead of left empty.
    #[test]
    fn version_segment_omitted_when_unset() {
        let name = compose_worker_name(Some("rates"), None, "9f3a");
        assert_eq!(name.as_deref(), Some("rates-worker-9f3a"));
    }

    /// The generated id is four hex characters, repeatable for one host id
    /// and different for the neighbouring host id used here.
    #[test]
    fn autogen_id_is_four_hex_and_stable_per_host() {
        let first = autogen_short_id(HOST);
        let second = autogen_short_id(HOST);
        let neighbour = autogen_short_id("01J9ZK3QP7V8M2T4R6W0E9K7QG");

        assert_eq!(first.len(), 4, "4 hex chars");
        assert!(first.chars().all(|c| c.is_ascii_hexdigit()));
        assert_eq!(first, second, "deterministic for a host_id");
        assert_ne!(
            first, neighbour,
            "distinct host_ids almost always render distinct tags",
        );
    }
}