apalis_sqlite/queries/
reenqueue_orphaned.rs1use std::time::Duration;
2
3use futures::{FutureExt, Stream, stream};
4use sqlx::SqlitePool;
5
6use crate::Config;
7
8pub fn reenqueue_orphaned(
9 pool: SqlitePool,
10 config: Config,
11) -> impl Future<Output = Result<u64, sqlx::Error>> + Send {
12 let dead_for = config.reenqueue_orphaned_after().as_secs() as i64;
13 let queue = config.queue().to_string();
14 async move {
15 match sqlx::query_file!("queries/backend/reenqueue_orphaned.sql", dead_for, queue,)
16 .execute(&pool)
17 .await
18 {
19 Ok(res) => {
20 if res.rows_affected() > 0 {
21 log::info!(
22 "Re-enqueued {} orphaned tasks that were being processed by dead workers",
23 res.rows_affected()
24 );
25 }
26 Ok(res.rows_affected())
27 }
28 Err(e) => {
29 log::error!("Failed to re-enqueue orphaned tasks: {e}");
30 Err(e)
31 }
32 }
33 }
34}
35
36pub fn reenqueue_orphaned_stream(
37 pool: SqlitePool,
38 config: Config,
39 interval: Duration,
40) -> impl Stream<Item = Result<u64, sqlx::Error>> + Send {
41 let config = config.clone();
42 stream::unfold((), move |_| {
43 let pool = pool.clone();
44 let config = config.clone();
45 let interval = apalis_core::timer::Delay::new(interval);
46 let fut = async move {
47 interval.await;
48 reenqueue_orphaned(pool, config).await
49 };
50 fut.map(|res| Some((res, ())))
51 })
52}