apalis_sqlite/queries/
reenqueue_orphaned.rs

1use std::time::Duration;
2
3use futures::{FutureExt, Stream, stream};
4use sqlx::SqlitePool;
5
6use crate::Config;
7
8/// Re-enqueue tasks that were being processed by workers that have not sent a keep-alive signal within the specified duration
9pub fn reenqueue_orphaned(
10    pool: SqlitePool,
11    config: &Config,
12) -> impl Future<Output = Result<u64, sqlx::Error>> + Send {
13    let dead_for = config.reenqueue_orphaned_after().as_secs() as i64;
14    let queue = config.queue().to_string();
15    async move {
16        match sqlx::query_file!("queries/backend/reenqueue_orphaned.sql", dead_for, queue,)
17            .execute(&pool)
18            .await
19        {
20            Ok(res) => {
21                if res.rows_affected() > 0 {
22                    log::info!(
23                        "Re-enqueued {} orphaned tasks that were being processed by dead workers",
24                        res.rows_affected()
25                    );
26                }
27                Ok(res.rows_affected())
28            }
29            Err(e) => {
30                log::error!("Failed to re-enqueue orphaned tasks: {e}");
31                Err(e)
32            }
33        }
34    }
35}
36
37/// Create a stream that periodically re-enqueues orphaned tasks
38pub fn reenqueue_orphaned_stream(
39    pool: SqlitePool,
40    config: Config,
41    interval: Duration,
42) -> impl Stream<Item = Result<u64, sqlx::Error>> + Send {
43    let config = config;
44    stream::unfold((), move |_| {
45        let pool = pool.clone();
46        let config = config.clone();
47        let interval = apalis_core::timer::Delay::new(interval);
48        let fut = async move {
49            interval.await;
50            reenqueue_orphaned(pool, &config).await
51        };
52        fut.map(|res| Some((res, ())))
53    })
54}