apalis_postgres/queries/
reenqueue_orphaned.rs1use std::time::Duration;
2
3use futures::{FutureExt, Stream, stream};
4use sqlx::{PgPool, postgres::types::PgInterval};
5
6use crate::Config;
7
8pub fn reenqueue_orphaned(
9 pool: PgPool,
10 config: Config,
11) -> impl Future<Output = Result<u64, sqlx::Error>> + Send {
12 let dead_for = config.reenqueue_orphaned_after().as_secs() as i64;
13 let queue = config.queue().to_string();
14 let dead_for = PgInterval {
15 months: 0,
16 days: 0,
17 microseconds: dead_for * 1_000_000,
18 };
19 async move {
20 match sqlx::query_file!("queries/backend/reenqueue_orphaned.sql", dead_for, queue,)
21 .execute(&pool)
22 .await
23 {
24 Ok(res) => {
25 if res.rows_affected() > 0 {
26 }
31 Ok(res.rows_affected())
32 }
33 Err(e) => {
34 Err(e)
36 }
37 }
38 }
39}
40
41pub fn reenqueue_orphaned_stream(
42 pool: PgPool,
43 config: Config,
44 interval: Duration,
45) -> impl Stream<Item = Result<u64, sqlx::Error>> + Send {
46 let config = config.clone();
47 stream::unfold((), move |_| {
48 let pool = pool.clone();
49 let config = config.clone();
50 let interval = apalis_core::timer::Delay::new(interval);
51 let fut = async move {
52 interval.await;
53 reenqueue_orphaned(pool, config).await
54 };
55 fut.map(|res| Some((res, ())))
56 })
57}