Skip to main content

simple_queue/
reaper.rs

1use sqlx::{PgPool, error::BoxDynError};
2
3use super::*;
4use result::JobResultInternal;
5
/// Background task that repairs stale jobs in the `job_queue` table.
///
/// On every tick of `heartbeat_interval` one reaper pass runs two statements:
/// - Jobs still marked `running` whose `updated_at` is older than 2.5 heartbeat
///   periods, and which still have attempts left (`attempt < max_attempts`),
///   are moved back to the `JobResultInternal::Pending` status with `attempt`
///   decremented by one.
/// - Jobs in the `JobResultInternal::Pending` status whose `attempt` has
///   reached `max_attempts` are moved to the `JobResultInternal::Failed` status.
pub struct Reaper {
    /// Ticker that paces the reaper loop; its period also determines the
    /// staleness threshold (2.5 × the period).
    pub heartbeat_interval: tokio::time::Interval,
    /// Postgres connection pool the reaper statements are executed on.
    pub pool: PgPool,
}
15
16impl Reaper {
17    /// Runs the reaper loop.
18    pub async fn run(&mut self) {
19        loop {
20            self.heartbeat_interval.tick().await;
21            self.run_reaper().await.ok();
22        }
23    }
24
25    fn stale_job_interval(&self) -> chrono::TimeDelta {
26        chrono::TimeDelta::milliseconds(
27            (self.heartbeat_interval.period().as_millis() as f32 * 2.5f32) as i64,
28        )
29    }
30    #[tracing::instrument(skip(self))]
31    async fn run_reaper(&self) -> Result<(), BoxDynError> {
32        let stale = chrono::Utc::now()
33            .checked_sub_signed(self.stale_job_interval())
34            .ok_or(anyhow::anyhow!("heartbeat interval is too large"))?;
35
36        let ids: Vec<uuid::Uuid> = sqlx::query_scalar!(
37            r#"
38            UPDATE job_queue
39            SET status = $1, updated_at = CURRENT_TIMESTAMP, attempt = attempt - 1
40            WHERE status = 'running'
41            AND updated_at < $2
42            AND attempt < max_attempts
43            RETURNING id
44            "#,
45            JobResultInternal::Pending.to_string(),
46            stale,
47        )
48        .fetch_all(&self.pool)
49        .await?;
50        if !ids.is_empty() {
51            tracing::debug!(name = "stalled job ids", ?ids);
52        }
53
54        let ids: Vec<uuid::Uuid> = sqlx::query_scalar!(
55            r#"
56            UPDATE job_queue
57            SET status = $1, updated_at = CURRENT_TIMESTAMP
58            WHERE status = $2
59            AND attempt >= max_attempts
60            RETURNING id
61            "#,
62            JobResultInternal::Failed.to_string(),
63            JobResultInternal::Pending.to_string(),
64        )
65        .fetch_all(&self.pool)
66        .await?;
67        if !ids.is_empty() {
68            tracing::debug!(name = "no more attempts job ids", ?ids);
69        }
70
71        Ok(())
72    }
73}