apalis_postgres/queries/
reenqueue_orphaned.rs

1use std::time::Duration;
2
3use futures::{FutureExt, Stream, stream};
4use sqlx::{PgPool, postgres::types::PgInterval};
5
6use crate::Config;
7
8pub fn reenqueue_orphaned(
9    pool: PgPool,
10    config: Config,
11) -> impl Future<Output = Result<u64, sqlx::Error>> + Send {
12    let dead_for = config.reenqueue_orphaned_after().as_secs() as i64;
13    let queue = config.queue().to_string();
14    let dead_for = PgInterval {
15        months: 0,
16        days: 0,
17        microseconds: dead_for * 1_000_000,
18    };
19    async move {
20        match sqlx::query_file!("queries/backend/reenqueue_orphaned.sql", dead_for, queue,)
21            .execute(&pool)
22            .await
23        {
24            Ok(res) => {
25                if res.rows_affected() > 0 {
26                    // log::info!(
27                    //     "Re-enqueued {} orphaned tasks that were being processed by dead workers",
28                    //     res.rows_affected()
29                    // );
30                }
31                Ok(res.rows_affected())
32            }
33            Err(e) => {
34                // log::error!("Failed to re-enqueue orphaned tasks: {e}");
35                Err(e)
36            }
37        }
38    }
39}
40
41pub fn reenqueue_orphaned_stream(
42    pool: PgPool,
43    config: Config,
44    interval: Duration,
45) -> impl Stream<Item = Result<u64, sqlx::Error>> + Send {
46    let config = config.clone();
47    stream::unfold((), move |_| {
48        let pool = pool.clone();
49        let config = config.clone();
50        let interval = apalis_core::timer::Delay::new(interval);
51        let fut = async move {
52            interval.await;
53            reenqueue_orphaned(pool, config).await
54        };
55        fut.map(|res| Some((res, ())))
56    })
57}