apalis_sqlite/queries/
reenqueue_orphaned.rs

1use std::time::Duration;
2
3use futures::{FutureExt, Stream, stream};
4use sqlx::SqlitePool;
5
6use crate::Config;
7
8pub fn reenqueue_orphaned(
9    pool: SqlitePool,
10    config: Config,
11) -> impl Future<Output = Result<u64, sqlx::Error>> + Send {
12    let dead_for = config.reenqueue_orphaned_after().as_secs() as i64;
13    let queue = config.queue().to_string();
14    async move {
15        match sqlx::query_file!("queries/backend/reenqueue_orphaned.sql", dead_for, queue,)
16            .execute(&pool)
17            .await
18        {
19            Ok(res) => {
20                if res.rows_affected() > 0 {
21                    log::info!(
22                        "Re-enqueued {} orphaned tasks that were being processed by dead workers",
23                        res.rows_affected()
24                    );
25                }
26                Ok(res.rows_affected())
27            }
28            Err(e) => {
29                log::error!("Failed to re-enqueue orphaned tasks: {e}");
30                Err(e)
31            }
32        }
33    }
34}
35
36pub fn reenqueue_orphaned_stream(
37    pool: SqlitePool,
38    config: Config,
39    interval: Duration,
40) -> impl Stream<Item = Result<u64, sqlx::Error>> + Send {
41    let config = config.clone();
42    stream::unfold((), move |_| {
43        let pool = pool.clone();
44        let config = config.clone();
45        let interval = apalis_core::timer::Delay::new(interval);
46        let fut = async move {
47            interval.await;
48            reenqueue_orphaned(pool, config).await
49        };
50        fut.map(|res| Some((res, ())))
51    })
52}