1use sqlx::{PgPool, error::BoxDynError};
2
3use super::*;
4use result::JobResultInternal;
5
6pub struct Reaper {
12 pub heartbeat_interval: tokio::time::Interval,
13 pub pool: PgPool,
14}
15
16impl Reaper {
17 pub async fn run(&mut self) {
19 loop {
20 self.heartbeat_interval.tick().await;
21 self.run_reaper().await.ok();
22 }
23 }
24
25 fn stale_job_interval(&self) -> chrono::TimeDelta {
26 chrono::TimeDelta::milliseconds(
27 (self.heartbeat_interval.period().as_millis() as f32 * 2.5f32) as i64,
28 )
29 }
30 #[tracing::instrument(skip(self))]
31 async fn run_reaper(&self) -> Result<(), BoxDynError> {
32 let stale = chrono::Utc::now()
33 .checked_sub_signed(self.stale_job_interval())
34 .ok_or(anyhow::anyhow!("heartbeat interval is too large"))?;
35
36 let ids: Vec<uuid::Uuid> = sqlx::query_scalar!(
37 r#"
38 UPDATE job_queue
39 SET status = $1, updated_at = CURRENT_TIMESTAMP, attempt = attempt - 1
40 WHERE status = 'running'
41 AND updated_at < $2
42 AND attempt < max_attempts
43 RETURNING id
44 "#,
45 JobResultInternal::Pending.to_string(),
46 stale,
47 )
48 .fetch_all(&self.pool)
49 .await?;
50 if !ids.is_empty() {
51 tracing::debug!(name = "stalled job ids", ?ids);
52 }
53
54 let ids: Vec<uuid::Uuid> = sqlx::query_scalar!(
55 r#"
56 UPDATE job_queue
57 SET status = $1, updated_at = CURRENT_TIMESTAMP
58 WHERE status = $2
59 AND attempt >= max_attempts
60 RETURNING id
61 "#,
62 JobResultInternal::Failed.to_string(),
63 JobResultInternal::Pending.to_string(),
64 )
65 .fetch_all(&self.pool)
66 .await?;
67 if !ids.is_empty() {
68 tracing::debug!(name = "no more attempts job ids", ?ids);
69 }
70
71 Ok(())
72 }
73}