1use sqlx::{PgPool, error::BoxDynError};
2
3use super::*;
4use result::JobResultInternal;
5
/// Background worker that periodically sweeps the `job_queue` table,
/// putting stalled `running` jobs back into the pending state and marking
/// pending jobs that have used up their attempts as failed.
pub struct Reaper {
    /// Ticker that paces the reaper loop. Its period is also the basis for
    /// the staleness threshold: a running job counts as stalled once its
    /// `updated_at` is more than 2.5 periods in the past.
    pub heartbeat_interval: tokio::time::Interval,
    /// Postgres connection pool the reaper's `UPDATE` statements run on.
    pub pool: PgPool,
}
15
16impl Reaper {
17 pub async fn run(&mut self) {
19 loop {
20 self.heartbeat_interval.tick().await;
21 self.run_reaper().await.ok();
22 }
23 }
24
25 fn stale_job_interval(&self) -> chrono::TimeDelta {
26 chrono::TimeDelta::milliseconds(
27 (self.heartbeat_interval.period().as_millis() as f32 * 2.5f32) as i64,
28 )
29 }
30 #[tracing::instrument(skip(self))]
31 async fn run_reaper(&self) -> Result<(), BoxDynError> {
32 let stale = chrono::Utc::now()
33 .checked_sub_signed(self.stale_job_interval())
34 .ok_or(anyhow::anyhow!("heartbeat interval is too large"))?;
35
36 let ids: Vec<uuid::Uuid> = sqlx::query_scalar!(
37 r#"
38 UPDATE job_queue
39 SET status = $1, updated_at = CURRENT_TIMESTAMP, attempt = attempt - 1
40 WHERE status = 'running'
41 AND updated_at < $2
42 AND attempt < max_attempts
43 RETURNING id
44 "#,
45 JobResultInternal::Pending.to_string(),
46 stale,
47 )
48 .fetch_all(&self.pool)
49 .await?;
50 if !ids.is_empty() {
51 tracing::debug!(name = "stalled job ids", ?ids);
52 }
53
54 let ids: Vec<uuid::Uuid> = sqlx::query_scalar!(
55 r#"
56 UPDATE job_queue
57 SET
58 status = $1,
59 updated_at = CURRENT_TIMESTAMP,
60 completed_at = CURRENT_TIMESTAMP
61 WHERE status = $2
62 AND attempt >= max_attempts
63 RETURNING id
64 "#,
65 JobResultInternal::Failed.to_string(),
66 JobResultInternal::Pending.to_string(),
67 )
68 .fetch_all(&self.pool)
69 .await?;
70 if !ids.is_empty() {
71 tracing::debug!(name = "no more attempts job ids", ?ids);
72 }
73
74 Ok(())
75 }
76}