mod cmd_exec;
mod cron;
mod demo;
mod handler;
mod metrics;
mod rate_limit;
mod rebalance;
mod retry;
mod routing;
mod worker_pool;
pub use cmd_exec::{CMD_EXEC_KIND, CmdExecHandler, CmdExecPayload};
pub use cron::{CRON_TICK, CronTickReport, cron_tick_once, ensure_schedules};
pub use demo::{NOOP_ECHO_KIND, NoopEcho};
pub use handler::{HandlerRegistry, JobCtx, JobHandler, JobOutcome};
pub use metrics::{METRICS_BUCKET_SECS, METRICS_TICK, metrics_roll_once};
pub use rate_limit::{
AcquireOutcome, DEFAULT_RATE_LIMIT_SCOPES, RateLimiter, ensure_default_rate_limits,
};
pub use rebalance::{REBALANCE_TICK, rebalance_once};
pub(crate) use retry::{THROTTLE_DECAY_GRACE_SECS, failed_delay};
pub use routing::{DefaultRouter, KindPrefixRouter, Router};
pub use worker_pool::{WorkerPoolConfig, WorkerPoolHandler};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use chrono::Duration as ChronoDuration;
use chrono::Utc;
use tokio::task::{JoinHandle, JoinSet};
use tokio_util::sync::CancellationToken;
use ulid::Ulid;
use crate::storage::HeartbeatStatus;
use crate::storage::Storage;
use crate::storage::error::Result;
use crate::storage::types::{
EnqueueOutcome, EnqueueRequest, FinalizeOutcome, JobId, JobRecord, JobStatus,
};
/// Built-in `(queue name, worker count)` defaults. `QueueRuntime::start` passes the matching
/// count to `ensure_queue` for a declared queue and uses 1 for names not listed here.
pub const DEFAULT_QUEUE_WORKERS: &[(&str, i32)] = &[("default", 1), ("gh", 3), ("slack", 2)];
/// Environment variable holding the comma-separated queue list read by [`queues_from_env`].
pub const QUEUES_ENV: &str = "FORGE_QUEUES";
/// Environment variable read by [`worker_prefix_from_env`].
pub const WORKER_PREFIX_ENV: &str = "FORGE_WORKER_PREFIX";
/// Environment variable read by [`worker_version_from_env`].
pub const WORKER_VERSION_ENV: &str = "FORGE_WORKER_VERSION";
/// Environment variable read by [`worker_id_from_env`].
pub const WORKER_ID_ENV: &str = "FORGE_WORKER_ID";
/// Reads the queue list from the [`QUEUES_ENV`] environment variable.
///
/// The value is split on commas; each entry is trimmed and empty entries are dropped.
/// An unset or unreadable variable yields an empty list.
#[must_use]
pub fn queues_from_env() -> Vec<String> {
    let raw = std::env::var(QUEUES_ENV).unwrap_or_default();
    let mut queues = Vec::new();
    for entry in raw.split(',') {
        let name = entry.trim();
        if !name.is_empty() {
            queues.push(name.to_owned());
        }
    }
    queues
}
/// Reads the worker-name prefix from the [`WORKER_PREFIX_ENV`] environment variable.
///
/// Returns `None` when the variable is unset, unreadable, or blank after trimming.
#[must_use]
pub fn worker_prefix_from_env() -> Option<String> {
    let raw = std::env::var(WORKER_PREFIX_ENV).ok()?;
    let trimmed = raw.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed.to_owned())
    }
}
/// Reads the worker version tag from the [`WORKER_VERSION_ENV`] environment variable.
///
/// Returns `None` when the variable is unset, unreadable, or blank after trimming.
#[must_use]
pub fn worker_version_from_env() -> Option<String> {
    let raw = std::env::var(WORKER_VERSION_ENV).ok()?;
    let trimmed = raw.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed.to_owned())
    }
}
/// Reads the explicit worker id from the [`WORKER_ID_ENV`] environment variable.
///
/// Returns `None` when the variable is unset, unreadable, or blank after trimming.
#[must_use]
pub fn worker_id_from_env() -> Option<String> {
    let raw = std::env::var(WORKER_ID_ENV).ok()?;
    let trimmed = raw.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed.to_owned())
    }
}
/// Derives a four-character lowercase hex tag from `host_id`.
///
/// The id is hashed with the standard library's `DefaultHasher` and only the low 16 bits
/// of the result are rendered, zero-padded to width 4.
#[must_use]
#[allow(
    clippy::cast_possible_truncation,
    reason = "deliberately keeping the low 16 bits of the hash as a 4-hex display tag"
)]
fn autogen_short_id(host_id: &str) -> String {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    let mut hasher = DefaultHasher::new();
    host_id.hash(&mut hasher);
    // Intentional truncation: the tag only needs to be short, not unique.
    let tag = hasher.finish() as u16;
    format!("{tag:04x}")
}
/// Builds the display name for this worker process.
///
/// Without a prefix there is no name at all (`None`). With a prefix the result is
/// `{prefix}-worker-{version}-{id}`, or `{prefix}-worker-{id}` when no version is given.
#[must_use]
fn compose_worker_name(prefix: Option<&str>, version: Option<&str>, id: &str) -> Option<String> {
    match (prefix, version) {
        (None, _) => None,
        (Some(prefix), Some(v)) => Some(format!("{prefix}-worker-{v}-{id}")),
        (Some(prefix), None) => Some(format!("{prefix}-worker-{id}")),
    }
}
/// Period of each queue supervisor's reap/resolve/scale pass.
const SUPERVISOR_TICK: Duration = Duration::from_secs(1);
/// Timeout handed to `wait_for_work` when an idle worker waits for new jobs.
const IDLE_POLL: Duration = Duration::from_millis(500);
/// Upper bound applied to the worker target computed by `resolve_target`.
const WORKER_CAP: usize = 64;
/// Period of the per-job heartbeat task that runs while a handler executes.
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(10);
/// Period of the reaper loop (stale-job revival and stale-process sweep).
pub const REAPER_TICK: Duration = Duration::from_secs(15);
/// Age subtracted from "now" to obtain the cutoff passed to the stale-job / stale-process /
/// live-pod queries.
const STALE_THRESHOLD: ChronoDuration = ChronoDuration::seconds(60);
/// Period of the cleanup loop that purges aged rows.
pub const CLEANUP_TICK: Duration = Duration::from_mins(5);
/// Period of the background event-buffer flush.
pub const EVENT_FLUSH_TICK: Duration = Duration::from_secs(2);
/// Not referenced in this part of the file; presumably the timeout callers are expected to pass
/// to `QueueHandle::shutdown_graceful` — TODO confirm.
pub const DEFAULT_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
/// Shared in-process registry mapping each running job id to the token that cancels it.
type RunningJobs = Arc<Mutex<HashMap<JobId, CancellationToken>>>;
/// Not-yet-started queue runtime: holds storage, handlers, routing and the builder-style
/// settings consumed by [`QueueRuntime::start`].
#[derive(Clone)]
pub struct QueueRuntime {
/// Storage handle used for jobs, queue config and process bookkeeping.
storage: Storage,
/// Registered job handlers, looked up by job kind in the worker loop.
handlers: Arc<HandlerRegistry>,
/// Chooses a queue for an enqueue request that does not name one.
router: Arc<dyn Router>,
/// Identifier of this runtime instance; a fresh ULID generated in `new`.
host_id: String,
/// Cancel tokens of jobs currently executing in this process.
running_jobs: RunningJobs,
/// Rate limiter built in `new` and shared with every worker.
rate_limit: Arc<RateLimiter>,
/// Queues this runtime will supervise; `start` refuses to run when empty.
queues: Vec<String>,
/// Optional prefix for the composed worker name; without it no name is composed.
worker_prefix: Option<String>,
/// Optional version segment for the composed worker name.
worker_version: Option<String>,
/// Optional explicit worker id; `start` derives a short tag from `host_id` when unset.
worker_id: Option<String>,
}
/// Debug output shows only `host_id` and `handlers`; the remaining fields are elided
/// (rendered as non-exhaustive).
impl std::fmt::Debug for QueueRuntime {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("QueueRuntime");
        out.field("host_id", &self.host_id);
        out.field("handlers", &self.handlers);
        out.finish_non_exhaustive()
    }
}
impl QueueRuntime {
#[must_use]
pub fn new(storage: Storage, handlers: HandlerRegistry, router: Arc<dyn Router>) -> Self {
let rate_limit = Arc::new(RateLimiter::new(
storage.clone(),
rate_limit::DEFAULT_RATE_LIMIT_SCOPES,
));
Self {
storage,
handlers: Arc::new(handlers),
router,
host_id: Ulid::new().to_string(),
running_jobs: Arc::new(Mutex::new(HashMap::new())),
rate_limit,
queues: Vec::new(),
worker_prefix: None,
worker_version: None,
worker_id: None,
}
}
/// Replaces the declared queue list.
///
/// Empty names are skipped and duplicates are dropped, keeping the first occurrence and the
/// caller's order.
#[must_use]
pub fn with_queues(mut self, queues: impl IntoIterator<Item = String>) -> Self {
    let mut seen = std::collections::HashSet::new();
    let mut unique = Vec::new();
    for name in queues {
        if name.is_empty() {
            continue;
        }
        if seen.insert(name.clone()) {
            unique.push(name);
        }
    }
    self.queues = unique;
    self
}
/// Sets the worker-name prefix; an empty string clears it.
#[must_use]
pub fn with_worker_prefix(mut self, prefix: impl Into<String>) -> Self {
    let prefix: String = prefix.into();
    self.worker_prefix = if prefix.is_empty() {
        None
    } else {
        Some(prefix)
    };
    self
}
/// Sets the worker-name version segment; an empty string clears it.
#[must_use]
pub fn with_worker_version(mut self, version: impl Into<String>) -> Self {
    let version: String = version.into();
    self.worker_version = if version.is_empty() {
        None
    } else {
        Some(version)
    };
    self
}
/// Sets the explicit worker id; an empty string clears it, so `start` falls back to the
/// auto-generated tag.
#[must_use]
pub fn with_worker_id(mut self, id: impl Into<String>) -> Self {
    let id: String = id.into();
    self.worker_id = if id.is_empty() { None } else { Some(id) };
    self
}
/// Forwards to the storage config layer's `ensure_queue` for `name` with the given default
/// worker count.
///
/// # Errors
/// Returns whatever error the storage call reports.
pub async fn ensure_queue(&self, name: &str, default_max_workers: i32) -> Result<()> {
    let config = &self.storage.config;
    config.ensure_queue(name, default_max_workers).await
}
/// Enqueues a job, asking the router for a queue when the request does not name one.
///
/// # Errors
/// Returns the storage error from the underlying enqueue call.
pub async fn enqueue(&self, mut req: EnqueueRequest) -> Result<EnqueueOutcome> {
    if req.queue_name.is_none() {
        let routed = self.router.route(req.kind.as_ref());
        req.queue_name = Some(std::borrow::Cow::Borrowed(routed));
    }
    self.storage.jobs.enqueue(req).await
}
/// Starts the runtime: one supervisor per declared queue plus the background loops
/// (reaper, cleanup, cron, pod heartbeat, rebalance, metrics, event flush).
///
/// All tasks share one shutdown token and live in the returned handle's `JoinSet`.
///
/// # Errors
/// - `StorageError::Config` when no queues were declared.
/// - The validation error for the first invalid queue name. Every name is validated
///   before any queue row is written or any task is spawned, so a bad name cannot leave
///   earlier queues half-initialised.
/// - Any storage error from `ensure_queue`.
pub async fn start(self) -> Result<QueueHandle> {
    if self.queues.is_empty() {
        return Err(crate::storage::error::StorageError::Config(
            "no queues declared: set FORGE_QUEUES or call QueueRuntime::with_queues — \
             running all queues implicitly is no longer supported"
                .to_owned(),
        ));
    }
    // Fail fast: reject a bad name before touching storage or spawning supervisors for the
    // names that precede it.
    for name in &self.queues {
        crate::storage::types::validate_queue_name(name)?;
    }
    let shutdown = CancellationToken::new();
    let mut join_set = JoinSet::new();
    for name in &self.queues {
        // Known queues get their built-in default worker count; everything else starts at 1.
        let default_workers = DEFAULT_QUEUE_WORKERS
            .iter()
            .find_map(|(q, n)| (*q == name).then_some(*n))
            .unwrap_or(1);
        self.storage
            .config
            .ensure_queue(name, default_workers)
            .await?;
        join_set.spawn(supervisor_loop(
            self.storage.clone(),
            self.handlers.clone(),
            self.router.clone(),
            name.clone(),
            self.host_id.clone(),
            self.running_jobs.clone(),
            self.rate_limit.clone(),
            shutdown.clone(),
        ));
    }
    join_set.spawn(reaper_loop(self.storage.clone(), shutdown.clone()));
    join_set.spawn(cleanup_loop(
        self.storage.clone(),
        self.host_id.clone(),
        shutdown.clone(),
    ));
    join_set.spawn(cron::cron_loop(
        self.storage.clone(),
        self.router.clone(),
        self.host_id.clone(),
        shutdown.clone(),
    ));
    // An explicit worker id wins; otherwise derive a short tag from the host id.
    let worker_id = self
        .worker_id
        .clone()
        .unwrap_or_else(|| autogen_short_id(&self.host_id));
    let worker_name = compose_worker_name(
        self.worker_prefix.as_deref(),
        self.worker_version.as_deref(),
        &worker_id,
    );
    join_set.spawn(rebalance::pod_heartbeat_loop(
        self.storage.clone(),
        self.host_id.clone(),
        worker_name,
        self.queues.clone(),
        shutdown.clone(),
    ));
    join_set.spawn(rebalance::rebalance_loop(
        self.storage.clone(),
        self.host_id.clone(),
        shutdown.clone(),
    ));
    join_set.spawn(metrics::metrics_loop(
        self.storage.clone(),
        self.host_id.clone(),
        shutdown.clone(),
    ));
    join_set.spawn(event_flush_loop(self.storage.clone(), shutdown.clone()));
    Ok(QueueHandle {
        shutdown,
        join_set,
        host_id: self.host_id,
        storage: self.storage,
        handlers: self.handlers,
        router: self.router,
        running_jobs: self.running_jobs,
        rate_limit: self.rate_limit,
    })
}
}
/// Handle to a started runtime, returned by `QueueRuntime::start`.
pub struct QueueHandle {
/// Cancelled by `shutdown_graceful` to stop every background task.
shutdown: CancellationToken,
/// All supervisor and background-loop tasks spawned by `start`.
join_set: JoinSet<()>,
/// Identifier of this runtime instance.
host_id: String,
/// Storage handle shared with the spawned tasks.
storage: Storage,
/// Registered job handlers (shown in the `Debug` output).
handlers: Arc<HandlerRegistry>,
/// Chooses a queue for an enqueue request that does not name one.
router: Arc<dyn Router>,
/// Cancel tokens of jobs currently executing in this process; used by `request_cancel`.
running_jobs: RunningJobs,
/// Rate limiter shared with the workers.
rate_limit: Arc<RateLimiter>,
}
/// Debug output shows only `host_id` and `handlers`; the remaining fields are elided
/// (rendered as non-exhaustive).
impl std::fmt::Debug for QueueHandle {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("QueueHandle");
        out.field("host_id", &self.host_id);
        out.field("handlers", &self.handlers);
        out.finish_non_exhaustive()
    }
}
impl QueueHandle {
/// Returns the identifier of this runtime instance.
#[must_use]
pub fn host_id(&self) -> &str {
&self.host_id
}
/// Returns the storage handle this runtime operates on.
#[must_use]
pub const fn storage(&self) -> &Storage {
&self.storage
}
/// Returns the rate limiter shared with the workers.
#[must_use]
pub fn rate_limit(&self) -> &RateLimiter {
&self.rate_limit
}
/// Signals cancellation to a job running in this process.
///
/// Returns `true` when the job was found in the running-job registry (and its token was
/// cancelled), `false` otherwise. A poisoned registry lock is recovered rather than
/// propagated.
#[must_use]
pub fn request_cancel(&self, job_id: &JobId) -> bool {
    let running = self
        .running_jobs
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    let Some(token) = running.get(job_id) else {
        return false;
    };
    token.cancel();
    true
}
/// Enqueues a job, asking the router for a queue when the request does not name one.
///
/// # Errors
/// Returns the storage error from the underlying enqueue call.
pub async fn enqueue(&self, mut req: EnqueueRequest) -> Result<EnqueueOutcome> {
    if req.queue_name.is_none() {
        let routed = self.router.route(req.kind.as_ref());
        req.queue_name = Some(std::borrow::Cow::Borrowed(routed));
    }
    self.storage.jobs.enqueue(req).await
}
/// Stops the runtime: signals shutdown, waits up to `timeout` for every task to finish,
/// aborts whatever is still running after that, then flushes the event buffer and deletes
/// this host's process rows.
///
/// Failures of the final flush and delete are logged, not returned.
pub async fn shutdown_graceful(mut self, timeout: Duration) {
    self.shutdown.cancel();
    let drained = tokio::time::timeout(timeout, async {
        while self.join_set.join_next().await.is_some() {}
    })
    .await;
    if drained.is_err() {
        tracing::warn!(
            timeout_secs = timeout.as_secs(),
            host_id = %self.host_id,
            "shutdown_graceful: timeout exceeded, aborting remaining tasks",
        );
        self.join_set.abort_all();
        // Reap the aborted tasks so nothing is left running behind the handle.
        while self.join_set.join_next().await.is_some() {}
    }
    let flushed = self.storage.jobs.flush_event_buffer().await;
    if let Err(e) = flushed {
        tracing::warn!(?e, host_id = %self.host_id, "event flush on shutdown failed");
    }
    let deleted = self.storage.procs.delete_for_host(&self.host_id).await;
    if let Err(e) = deleted {
        tracing::warn!(?e, host_id = %self.host_id, "delete_for_host on shutdown failed");
    }
}
}
/// One spawned worker task as tracked by its queue supervisor.
struct WorkerSlot {
/// Join handle of the spawned `worker_loop` task.
handle: JoinHandle<()>,
/// Token the supervisor cancels to ask this worker to stop.
cancel: CancellationToken,
}
/// Supervises the worker pool of one queue until `shutdown` is cancelled.
///
/// On every `SUPERVISOR_TICK` it reaps finished workers, resolves the desired worker count
/// and scales the pool down or up to match. When the target cannot be resolved the pass is
/// skipped and the pool is left unchanged. On shutdown it drains all workers and returns.
#[allow(
clippy::too_many_arguments,
reason = "per-supervisor scratch state; `running_jobs` is the in-process cancel registry shared with workers, `rate_limit` is the boot-constructed limiter passed by ref to JobCtx"
)]
async fn supervisor_loop(
storage: Storage,
handlers: Arc<HandlerRegistry>,
router: Arc<dyn Router>,
queue_name: String,
host_id: String,
running_jobs: RunningJobs,
rate_limit: Arc<RateLimiter>,
shutdown: CancellationToken,
) {
tracing::info!(queue = %queue_name, host_id = %host_id, "supervisor: start");
// Slot index -> live worker; indices are reused once a slot is freed.
let mut workers: HashMap<usize, WorkerSlot> = HashMap::new();
let mut tick = tokio::time::interval(SUPERVISOR_TICK);
// Consume the interval's first tick up front (tokio intervals yield it immediately).
tick.tick().await;
loop {
tokio::select! {
// `biased` polls branches in order, so a pending shutdown wins over a ready tick.
biased;
() = shutdown.cancelled() => {
tracing::info!(queue = %queue_name, "supervisor: shutdown signal");
drain_all(&mut workers).await;
tracing::info!(queue = %queue_name, "supervisor: stopped");
return;
}
_ = tick.tick() => {
reap_finished(&mut workers).await;
// No target (queue row missing or lookup failed): leave the pool as it is.
let Some(target) = resolve_target(&storage, &queue_name, &host_id).await else {
continue;
};
scale_down(&mut workers, target);
scale_up(
&mut workers,
target,
&storage,
&handlers,
&router,
&queue_name,
&host_id,
&running_jobs,
&rate_limit,
);
}
}
}
}
/// Removes every worker whose task has already finished and logs those that ended with a
/// join error.
async fn reap_finished(workers: &mut HashMap<usize, WorkerSlot>) {
    let mut done_ids: Vec<usize> = Vec::new();
    for (id, slot) in workers.iter() {
        if slot.handle.is_finished() {
            done_ids.push(*id);
        }
    }
    for id in done_ids {
        let Some(slot) = workers.remove(&id) else {
            continue;
        };
        // The task is already finished, so this await only collects its result.
        if let Err(e) = slot.handle.await {
            tracing::error!(?e, slot = id, "supervisor: worker task panicked");
        }
    }
}
/// Computes how many workers this host should run for `queue_name`.
///
/// Returns `None` when the queue row is missing or cannot be read, `Some(0)` when the queue
/// is paused, and otherwise the slot count stored for this host — or a fair-share estimate
/// when no count is stored or the lookup fails — capped at `WORKER_CAP`.
async fn resolve_target(storage: &Storage, queue_name: &str, host_id: &str) -> Option<usize> {
    let cfg = match storage.config.get_queue(queue_name).await {
        Ok(Some(cfg)) => cfg,
        Ok(None) => {
            tracing::warn!(queue = %queue_name, "supervisor: queue row vanished");
            return None;
        }
        Err(e) => {
            tracing::warn!(queue = %queue_name, ?e, "supervisor: queue lookup failed");
            return None;
        }
    };
    if cfg.paused {
        return Some(0);
    }
    let assigned = storage.procs.get_slots(queue_name, host_id).await;
    let uncapped = match assigned {
        // A negative stored count is treated as zero.
        Ok(Some(slots)) => usize::try_from(slots).unwrap_or(0),
        other => {
            if let Err(e) = other {
                tracing::warn!(queue = %queue_name, ?e, "supervisor: slot lookup failed; estimating fair share");
            }
            fair_fallback(storage, queue_name, cfg.max_workers).await
        }
    };
    Some(uncapped.min(WORKER_CAP))
}
/// Estimates this host's share of `max_workers` when no explicit slot count is available.
///
/// The share is `max_workers` divided (rounding up) by the number of live pods that handle
/// `queue_name`. A failed pod listing, or a listing with no matching pod, counts as one pod.
/// Non-positive `max_workers` yields 0.
async fn fair_fallback(storage: &Storage, queue_name: &str, max_workers: i32) -> usize {
    let Ok(total) = usize::try_from(max_workers) else {
        return 0;
    };
    if total == 0 {
        return 0;
    }
    let stale_before = Utc::now() - STALE_THRESHOLD;
    let eligible = match storage.procs.list_live_pods(stale_before).await {
        Ok(pods) => pods.iter().filter(|p| p.handles(queue_name)).count(),
        Err(_) => 1,
    };
    total.div_ceil(eligible.max(1))
}
/// Shrinks the pool to at most `target` workers, retiring the highest slot indices first.
///
/// Each retired worker has its token cancelled and its join handle dropped, so the task is
/// detached rather than awaited here.
// NOTE(review): a detached worker may still be winding down when `scale_up` reuses its slot
// index (and therefore the same process id) — confirm the storage layer tolerates that overlap.
fn scale_down(workers: &mut HashMap<usize, WorkerSlot>, target: usize) {
    let excess = workers.len().saturating_sub(target);
    if excess == 0 {
        return;
    }
    let mut slot_ids: Vec<usize> = workers.keys().copied().collect();
    slot_ids.sort_unstable_by(|a, b| b.cmp(a));
    for slot_id in slot_ids.into_iter().take(excess) {
        if let Some(slot) = workers.remove(&slot_id) {
            slot.cancel.cancel();
            drop(slot.handle);
        }
    }
}
/// Spawns workers until the pool holds `target` of them.
///
/// Each new worker takes the lowest free slot index, gets the process id
/// `{queue_name}-{slot_idx}-{host_id}` and its own fresh cancel token.
#[allow(
    clippy::too_many_arguments,
    reason = "supervisor scratch state passed through to each spawned worker. Threshold to revisit: if scale_up ever needs one more parameter, OR if any of the loops here (supervisor_loop, worker_loop) grows another ~30-line block, extract a `WorkerCtx` struct."
)]
fn scale_up(
    workers: &mut HashMap<usize, WorkerSlot>,
    target: usize,
    storage: &Storage,
    handlers: &Arc<HandlerRegistry>,
    router: &Arc<dyn Router>,
    queue_name: &str,
    host_id: &str,
    running_jobs: &RunningJobs,
    rate_limit: &Arc<RateLimiter>,
) {
    while workers.len() < target {
        // Among the indices 0..=len at least one is unused, so the search always succeeds.
        let Some(slot_idx) = (0..=workers.len()).find(|idx| !workers.contains_key(idx)) else {
            break;
        };
        let process_id = format!("{queue_name}-{slot_idx}-{host_id}");
        tracing::info!(
            queue = %queue_name,
            slot = slot_idx,
            worker_id = %process_id,
            "worker spawned"
        );
        let cancel = CancellationToken::new();
        let worker = worker_loop(
            storage.clone(),
            handlers.clone(),
            router.clone(),
            queue_name.to_owned(),
            process_id,
            host_id.to_owned(),
            running_jobs.clone(),
            rate_limit.clone(),
            cancel.clone(),
        );
        let handle = tokio::spawn(worker);
        workers.insert(slot_idx, WorkerSlot { handle, cancel });
    }
}
/// Stops every worker in the pool and waits for all of them to exit.
///
/// All cancel tokens are fired before the first join, so the workers wind down concurrently.
/// Cancelling one worker only after the previous one had been joined would serialise the
/// wind-down and could eat the whole shutdown budget on the first slow handler.
/// Join errors are logged and otherwise ignored. The map is empty afterwards.
async fn drain_all(workers: &mut HashMap<usize, WorkerSlot>) {
    // Phase 1: signal everyone.
    for slot in workers.values() {
        slot.cancel.cancel();
    }
    // Phase 2: collect the tasks.
    for (_, slot) in workers.drain() {
        if let Err(e) = slot.handle.await {
            tracing::warn!(?e, "supervisor: worker join error on drain");
        }
    }
}
fn log_handler_outcome(job: &JobRecord, job_id: &JobId, outcome: &FinalizeOutcome) {
match outcome {
FinalizeOutcome::Failed {
message,
retry_after,
} => tracing::warn!(
kind = %job.kind,
queue = %job.queue_name,
job_id = %job_id.as_str(),
attempts = job.attempts,
max_attempts = job.max_attempts,
retry_in_secs = retry_after.as_secs(),
error = %message,
"worker: handler failed; will retry",
),
FinalizeOutcome::Dead { message } => tracing::error!(
kind = %job.kind,
queue = %job.queue_name,
job_id = %job_id.as_str(),
attempts = job.attempts,
error = %message,
"worker: handler failed terminally (dead)",
),
FinalizeOutcome::Done | FinalizeOutcome::Throttled { .. } => {}
}
}
#[allow(
clippy::too_many_arguments,
clippy::too_many_lines,
reason = "worker scratch state; B2 cancel registry + Dead-on-user-cancel branch pushed past the 100-line cap. The loop is one cohesive claim→run→finalize cycle; splitting it splits the lifetime of the per-job cancel token. Revisit when extracting a `WorkerCtx`."
)]
async fn worker_loop(
storage: Storage,
handlers: Arc<HandlerRegistry>,
router: Arc<dyn Router>,
queue_name: String,
process_id: String,
host_id: String,
running_jobs: RunningJobs,
rate_limit: Arc<RateLimiter>,
cancel: CancellationToken,
) {
tracing::debug!(%process_id, "worker: start");
if let Err(e) = storage
.procs
.register(&process_id, &queue_name, &host_id)
.await
{
tracing::error!(?e, %process_id, "worker: register failed; exiting");
return;
}
loop {
if cancel.is_cancelled() {
break;
}
let job = match storage.jobs.claim_next(&queue_name, &process_id).await {
Ok(Some(j)) => j,
Ok(None) => {
if idle_wait(&storage, &queue_name, &process_id, &cancel).await {
break;
}
continue;
}
Err(e) => {
tracing::warn!(?e, %process_id, "worker: claim failed, backing off 1s");
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
};
let job_id = job.id.clone();
if let Err(e) = storage
.procs
.heartbeat(&process_id, Some(job_id.clone()))
.await
{
tracing::warn!(?e, %process_id, "worker: heartbeat-with-job failed");
}
let job_cancel = cancel.child_token();
register_running_job(&running_jobs, &job_id, job_cancel.clone());
let heartbeat_cancel = CancellationToken::new();
let heartbeat_task = tokio::spawn(heartbeat_loop(
storage.clone(),
process_id.clone(),
job_id.clone(),
heartbeat_cancel.clone(),
job_cancel.clone(),
));
let outcome = match handlers.get(&job.kind) {
Some(handler) => {
let ctx = JobCtx {
storage: &storage,
router: router.as_ref(),
rate_limit: rate_limit.as_ref(),
job_id: job_id.clone(),
process_id: &process_id,
host_id: &host_id,
cancel: job_cancel.clone(),
};
handler.run(ctx, job.payload.clone()).await
}
None => JobOutcome::Failed(format!("no handler registered for kind: {}", job.kind)),
};
heartbeat_cancel.cancel();
if let Err(e) = heartbeat_task.await {
tracing::warn!(?e, %process_id, "worker: heartbeat task join failed");
}
deregister_running_job(&running_jobs, &job_id);
let user_cancelled = !cancel.is_cancelled() && job_cancel.is_cancelled();
let backoff_cfg = fetch_backoff_cfg(&storage, &queue_name).await;
let finalize_outcome = if user_cancelled && !matches!(outcome, JobOutcome::Done) {
FinalizeOutcome::Dead {
message: "cancelled by user".to_owned(),
}
} else {
map_outcome(&job, outcome, backoff_cfg.as_ref())
};
log_handler_outcome(&job, &job_id, &finalize_outcome);
let throttle_pause = match &finalize_outcome {
FinalizeOutcome::Throttled {
retry_after,
cool_down_queue: true,
} => Some(*retry_after),
_ => None,
};
if let Err(e) = storage
.jobs
.finalize(&job_id, Some(&process_id), finalize_outcome)
.await
{
tracing::error!(?e, ?job_id, %process_id, "worker: finalize failed");
}
if let Err(e) = storage.procs.heartbeat(&process_id, None).await {
tracing::warn!(?e, %process_id, "worker: heartbeat-clear failed");
}
if let Some(pause) = throttle_pause {
tracing::info!(
queue = %queue_name,
secs = pause.as_secs(),
"worker: queue throttled; pausing before next claim"
);
tokio::select! {
biased;
() = cancel.cancelled() => break,
() = tokio::time::sleep(pause) => {}
}
}
}
if let Err(e) = storage.procs.deregister(&process_id).await {
tracing::warn!(?e, %process_id, "worker: deregister failed on exit");
}
tracing::debug!(%process_id, "worker: stop");
}
/// Waits while the queue has nothing to claim.
///
/// Sends an idle process heartbeat, then either sleeps until the queue's `throttled_until`
/// (when that lies in the future) or waits for work for up to `IDLE_POLL`.
///
/// Returns `true` when `cancel` fired during the wait, `false` when the wait completed.
async fn idle_wait(
    storage: &Storage,
    queue_name: &str,
    process_id: &str,
    cancel: &CancellationToken,
) -> bool {
    if let Err(e) = storage.procs.heartbeat(process_id, None).await {
        tracing::warn!(?e, %process_id, "worker: idle heartbeat failed");
    }
    let cool_down = match fetch_backoff_cfg(storage, queue_name).await {
        // A deadline already in the past fails `to_std` and counts as "no cool-down".
        Some(cfg) => cfg
            .throttled_until
            .and_then(|until| (until - Utc::now()).to_std().ok()),
        None => None,
    };
    let wait = async {
        if let Some(dur) = cool_down {
            tokio::time::sleep(dur).await;
        } else {
            let _ = storage.jobs.wait_for_work(queue_name, IDLE_POLL).await;
        }
    };
    tokio::select! {
        biased;
        () = cancel.cancelled() => true,
        () = wait => false,
    }
}
/// Loads the config row of `queue_name`, returning `None` (with a warning) when the row is
/// missing or the read fails.
async fn fetch_backoff_cfg(
    storage: &Storage,
    queue_name: &str,
) -> Option<crate::storage::types::QueueConfigRow> {
    let lookup = storage.config.get_queue(queue_name).await;
    match lookup {
        Ok(found) => {
            if found.is_none() {
                tracing::warn!(
                    queue = %queue_name,
                    "worker: queue config vanished; using legacy throttle fallback"
                );
            }
            found
        }
        Err(e) => {
            tracing::warn!(
                ?e,
                queue = %queue_name,
                "worker: queue config read failed; using legacy throttle fallback"
            );
            None
        }
    }
}
/// Translates a handler outcome into the outcome stored by `finalize`.
///
/// - `Done` and `Dead` map straight through.
/// - `Throttled` gets a delay computed from the queue's backoff settings and throttle
///   attempt count (the handler's own hint is ignored) and always asks the queue to cool down.
/// - `Failed` becomes `Dead` once `attempts >= max_attempts`; otherwise it is retried after a
///   delay computed from the job's attempt count and the backoff settings.
///
/// Without a config row the backoff settings fall back to disabled / base 60 / max 1800,
/// with a throttle attempt count of 0.
fn map_outcome(
    job: &JobRecord,
    outcome: JobOutcome,
    backoff_cfg: Option<&crate::storage::types::QueueConfigRow>,
) -> FinalizeOutcome {
    let (enabled, base, max) = match backoff_cfg {
        Some(c) => (
            c.backoff_enabled,
            c.backoff_base_seconds,
            c.backoff_max_seconds,
        ),
        None => (false, 60, 1800),
    };
    match outcome {
        JobOutcome::Done => FinalizeOutcome::Done,
        JobOutcome::Dead(msg) => FinalizeOutcome::Dead { message: msg },
        JobOutcome::Throttled { retry_after: _hint } => {
            let attempts = backoff_cfg.map_or(0, |c| c.throttle_attempts);
            FinalizeOutcome::Throttled {
                retry_after: retry::throttle_delay(attempts, enabled, base, max),
                cool_down_queue: true,
            }
        }
        JobOutcome::Failed(msg) if job.attempts >= job.max_attempts => {
            FinalizeOutcome::Dead { message: msg }
        }
        JobOutcome::Failed(msg) => FinalizeOutcome::Failed {
            retry_after: retry::failed_delay(job.attempts, enabled, base, max),
            message: msg,
        },
    }
}
/// Keeps a running job's heartbeat fresh until `stop` is cancelled.
///
/// Every `HEARTBEAT_INTERVAL` it updates the job heartbeat and the process heartbeat.
/// When the job heartbeat reports `CancelRequested` or `Lost`, `job_cancel` is cancelled
/// (once) so the handler can stop; heartbeats keep being sent afterwards.
/// Storage errors are logged and do not end the loop.
async fn heartbeat_loop(
storage: Storage,
process_id: String,
job_id: JobId,
stop: CancellationToken,
job_cancel: CancellationToken,
) {
let mut tick = tokio::time::interval(HEARTBEAT_INTERVAL);
// Consume the interval's first tick up front (tokio intervals yield it immediately).
tick.tick().await;
// Ensures the handler is signalled, and the event logged, only once.
let mut cancel_signalled = false;
loop {
tokio::select! {
// `biased` polls branches in order, so a pending stop wins over a ready tick.
biased;
() = stop.cancelled() => return,
_ = tick.tick() => {
match storage.jobs.heartbeat_job(&job_id, &process_id).await {
Ok(HeartbeatStatus::CancelRequested) if !cancel_signalled => {
tracing::info!(
job_id = %job_id.as_str(),
%process_id,
"heartbeat: cancel requested; signalling handler"
);
job_cancel.cancel();
cancel_signalled = true;
}
Ok(HeartbeatStatus::Lost) if !cancel_signalled => {
tracing::warn!(
job_id = %job_id.as_str(),
%process_id,
"heartbeat: lost row ownership (reaped + re-claimed); stopping handler"
);
job_cancel.cancel();
cancel_signalled = true;
}
// Any other status, or a repeat after the handler was already signalled.
Ok(_) => {}
Err(e) => tracing::warn!(?e, %process_id, "heartbeat: job update failed"),
}
if let Err(e) = storage.procs.heartbeat(&process_id, Some(job_id.clone())).await {
tracing::warn!(?e, %process_id, "heartbeat: process update failed");
}
}
}
}
}
/// Records `token` as the cancel handle of `job_id`, replacing any previous entry.
///
/// A poisoned registry lock is recovered rather than propagated.
fn register_running_job(map: &RunningJobs, job_id: &JobId, token: CancellationToken) {
    let mut registry = map
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    registry.insert(job_id.clone(), token);
}
/// Removes the cancel handle of `job_id` from the registry, if present.
///
/// A poisoned registry lock is recovered rather than propagated.
fn deregister_running_job(map: &RunningJobs, job_id: &JobId) {
    let mut registry = map
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    registry.remove(job_id);
}
/// Background loop that, every `REAPER_TICK`, revives jobs considered stale and sweeps stale
/// process rows, using "now minus `STALE_THRESHOLD`" as the cutoff for both.
///
/// Errors are logged and the loop keeps running; it returns when `shutdown` is cancelled.
async fn reaper_loop(storage: Storage, shutdown: CancellationToken) {
tracing::debug!("reaper: start");
let mut tick = tokio::time::interval(REAPER_TICK);
// Consume the interval's first tick up front (tokio intervals yield it immediately).
tick.tick().await;
loop {
tokio::select! {
// `biased` polls branches in order, so a pending shutdown wins over a ready tick.
biased;
() = shutdown.cancelled() => {
tracing::debug!("reaper: shutdown");
return;
}
_ = tick.tick() => {
let stale_before = Utc::now() - STALE_THRESHOLD;
match storage.jobs.revive_stale(stale_before).await {
// Only log when something was actually revived.
Ok(n) if n > 0 => {
tracing::info!(revived = n, "reaper: revived stuck jobs");
}
Ok(_) => {}
Err(e) => tracing::warn!(?e, "reaper: revive_stale failed"),
}
if let Err(e) = storage.procs.reap_stale(stale_before).await {
tracing::warn!(?e, "reaper: process sweep failed");
}
}
}
}
}
/// Runs a single stale-job revival pass with the cutoff "now minus `STALE_THRESHOLD`" and
/// returns the count reported by storage.
///
/// # Errors
/// Returns the storage error from `revive_stale`.
pub async fn reap_stale_jobs(storage: &Storage) -> Result<u64> {
    let cutoff = Utc::now() - STALE_THRESHOLD;
    let revived = storage.jobs.revive_stale(cutoff).await;
    revived
}
/// Background loop that flushes the job event buffer every `EVENT_FLUSH_TICK`.
///
/// On shutdown it performs one final flush before returning. Flush errors are logged only.
async fn event_flush_loop(storage: Storage, shutdown: CancellationToken) {
tracing::debug!("event flush: start");
let mut tick = tokio::time::interval(EVENT_FLUSH_TICK);
// Consume the interval's first tick up front (tokio intervals yield it immediately).
tick.tick().await;
loop {
tokio::select! {
// `biased` polls branches in order, so a pending shutdown wins over a ready tick.
biased;
() = shutdown.cancelled() => {
// Final flush so events buffered since the last tick are not left behind.
if let Err(e) = storage.jobs.flush_event_buffer().await {
tracing::warn!(?e, "event flush: final flush failed");
}
tracing::debug!("event flush: shutdown");
return;
}
_ = tick.tick() => {
if let Err(e) = storage.jobs.flush_event_buffer().await {
tracing::warn!(?e, "event flush: flush failed");
}
}
}
}
}
/// Row counts removed by one cleanup pass.
#[derive(Debug, Default, Clone, Copy)]
pub struct CleanupReport {
    /// Rows deleted with status `Done`.
    pub done_deleted: u64,
    /// Rows deleted with status `Dead`.
    pub dead_deleted: u64,
}

impl CleanupReport {
    /// Total number of rows deleted across both statuses.
    #[must_use]
    pub const fn total(&self) -> u64 {
        let Self {
            done_deleted,
            dead_deleted,
        } = *self;
        done_deleted + dead_deleted
    }
}
pub async fn cleanup_once(storage: &Storage) -> Result<CleanupReport> {
let queues = storage.config.list_queues().await?;
let mut report = CleanupReport::default();
let now = Utc::now();
for q in queues {
let done_threshold = now - ChronoDuration::days(i64::from(q.retain_done_for_days));
let dead_threshold = now - ChronoDuration::days(i64::from(q.retain_dead_for_days));
report.done_deleted += storage
.jobs
.cleanup_aged(&q.name, JobStatus::Done, done_threshold)
.await?;
report.dead_deleted += storage
.jobs
.cleanup_aged(&q.name, JobStatus::Dead, dead_threshold)
.await?;
}
let metric_threshold = now - ChronoDuration::days(metrics::METRIC_RETENTION_DAYS);
storage
.jobs
.delete_metric_buckets_before(metric_threshold)
.await?;
Ok(report)
}
/// Background loop that runs `cleanup_once` every `CLEANUP_TICK`, but only on ticks where
/// this host obtains the cron lease.
///
/// Lease and cleanup errors are logged; the loop returns when `shutdown` is cancelled.
async fn cleanup_loop(storage: Storage, host_id: String, shutdown: CancellationToken) {
tracing::debug!("cleanup: start");
let mut tick = tokio::time::interval(CLEANUP_TICK);
// Consume the interval's first tick up front (tokio intervals yield it immediately).
tick.tick().await;
loop {
tokio::select! {
// `biased` polls branches in order, so a pending shutdown wins over a ready tick.
biased;
() = shutdown.cancelled() => {
tracing::debug!("cleanup: shutdown");
return;
}
_ = tick.tick() => {
// Skip this tick unless the lease call reports success for this host.
match storage.cron.try_cron_lease(&host_id, cron::CRON_LEASE_TTL).await {
Ok(true) => {}
Ok(false) => continue,
Err(e) => {
tracing::warn!(?e, %host_id, "cleanup: lease check failed");
continue;
}
}
match cleanup_once(&storage).await {
// Only log when something was actually purged.
Ok(report) if report.total() > 0 => {
tracing::info!(
done = report.done_deleted,
dead = report.dead_deleted,
"cleanup: purged aged rows",
);
}
Ok(_) => {}
Err(e) => tracing::warn!(?e, "cleanup tick failed"),
}
}
}
}
}
// Unit tests for the pure worker-naming helpers.
#[cfg(test)]
mod tests {
use super::{autogen_short_id, compose_worker_name};
// Sample host id used as hashing input.
const HOST: &str = "01J9ZK3QP7V8M2T4R6W0E9K7QF";
// Without a prefix no name is composed, whether or not a version is given.
#[test]
fn no_prefix_yields_none_so_view_falls_back_to_host_id() {
assert_eq!(compose_worker_name(None, Some("abc123"), "9f3a"), None);
assert_eq!(compose_worker_name(None, None, "9f3a"), None);
}
// Full form: prefix, literal "worker", version, id.
#[test]
fn composes_prefix_worker_version_id() {
assert_eq!(
compose_worker_name(Some("rates"), Some("a1b2c3d"), "9f3a").as_deref(),
Some("rates-worker-a1b2c3d-9f3a"),
);
}
// The version segment (and its separator) disappears when no version is set.
#[test]
fn version_segment_omitted_when_unset() {
assert_eq!(
compose_worker_name(Some("rates"), None, "9f3a").as_deref(),
Some("rates-worker-9f3a"),
);
}
// The tag is 4 hex characters and repeatable for the same input.
#[test]
fn autogen_id_is_four_hex_and_stable_per_host() {
let id = autogen_short_id(HOST);
assert_eq!(id.len(), 4, "4 hex chars");
assert!(id.chars().all(|c| c.is_ascii_hexdigit()));
assert_eq!(id, autogen_short_id(HOST), "deterministic for a host_id");
// Only 16 bits are kept, so collisions are possible in general; this pair is expected to differ.
assert_ne!(
autogen_short_id(HOST),
autogen_short_id("01J9ZK3QP7V8M2T4R6W0E9K7QG"),
"distinct host_ids almost always render distinct tags",
);
}
}