apalis_sqlite/queries/
reenqueue_orphaned.rs1use std::time::Duration;
2
3use futures::{FutureExt, Stream, stream};
4use sqlx::SqlitePool;
5
6use crate::Config;
7
8pub fn reenqueue_orphaned(
10 pool: SqlitePool,
11 config: &Config,
12) -> impl Future<Output = Result<u64, sqlx::Error>> + Send {
13 let dead_for = config.reenqueue_orphaned_after().as_secs() as i64;
14 let queue = config.queue().to_string();
15 async move {
16 match sqlx::query_file!("queries/backend/reenqueue_orphaned.sql", dead_for, queue,)
17 .execute(&pool)
18 .await
19 {
20 Ok(res) => {
21 if res.rows_affected() > 0 {
22 log::info!(
23 "Re-enqueued {} orphaned tasks that were being processed by dead workers",
24 res.rows_affected()
25 );
26 }
27 Ok(res.rows_affected())
28 }
29 Err(e) => {
30 log::error!("Failed to re-enqueue orphaned tasks: {e}");
31 Err(e)
32 }
33 }
34 }
35}
36
37pub fn reenqueue_orphaned_stream(
39 pool: SqlitePool,
40 config: Config,
41 interval: Duration,
42) -> impl Stream<Item = Result<u64, sqlx::Error>> + Send {
43 let config = config;
44 stream::unfold((), move |_| {
45 let pool = pool.clone();
46 let config = config.clone();
47 let interval = apalis_core::timer::Delay::new(interval);
48 let fut = async move {
49 interval.await;
50 reenqueue_orphaned(pool, &config).await
51 };
52 fut.map(|res| Some((res, ())))
53 })
54}