pg_taskq/
helper.rs

1use chrono::prelude::*;
2use sqlx::Pool;
3use sqlx::Postgres;
4
5use crate::error::Result;
6use crate::TaskTableProvider;
7
8/// Sets the `in_progress` flag to `false` for any tasks that have been in progress for longer than
9/// `max_allowed_last_update`.
10pub async fn fixup_stale_tasks(
11    pool: &Pool<Postgres>,
12    tables: &impl TaskTableProvider,
13    max_allowed_last_update: std::time::Duration,
14) -> Result<()> {
15    let max_age = Utc::now()
16        - chrono::Duration::from_std(max_allowed_last_update)
17            .expect("max_allowed_last_update is too large");
18    let table = tables.tasks_table_full_name();
19    let sql = format!(
20        "
21UPDATE {table}
22SET updated_at = NOW(),
23    in_progress = false
24WHERE updated_at < $1::timestamp AND in_progress = true
25RETURNING *;
26"
27    );
28
29    let result: Vec<crate::Task> = sqlx::query_as(&sql).bind(max_age).fetch_all(pool).await?;
30
31    if !result.is_empty() {
32        tracing::info!("fixup_stale_tasks found {} stale tasks", result.len());
33        for task in result {
34            tracing::info!("  id={} created_at={}", task.id, task.created_at);
35        }
36    }
37
38    Ok(())
39}