apalis_postgres/queries/
keep_alive.rs

1use apalis_core::worker::context::WorkerContext;
2use chrono::Utc;
3use futures::{FutureExt, Stream, stream};
4use sqlx::PgPool;
5
6use crate::{
7    Config,
8    queries::{
9        reenqueue_orphaned::reenqueue_orphaned, register_worker::register as register_worker,
10    },
11};
12
13pub async fn keep_alive(
14    pool: PgPool,
15    config: Config,
16    worker: WorkerContext,
17) -> Result<(), sqlx::Error> {
18    let worker = worker.name().to_owned();
19    let queue = config.queue().to_string();
20    let res = sqlx::query_file!("queries/backend/keep_alive.sql", worker, queue)
21        .execute(&pool)
22        .await?;
23    if res.rows_affected() == 0 {
24        return Err(sqlx::Error::Io(std::io::Error::new(
25            std::io::ErrorKind::NotFound,
26            "WORKER_DOES_NOT_EXIST",
27        )));
28    }
29    Ok(())
30}
31
32pub async fn initial_heartbeat(
33    pool: PgPool,
34    config: Config,
35    worker: WorkerContext,
36    storage_type: &str,
37) -> Result<(), sqlx::Error> {
38    reenqueue_orphaned(pool.clone(), config.clone()).await?;
39    let last_seen = Utc::now();
40    register_worker(
41        pool,
42        config.queue().to_string(),
43        worker,
44        last_seen,
45        storage_type,
46    )
47    .await?;
48    Ok(())
49}
50
51pub fn keep_alive_stream(
52    pool: PgPool,
53    config: Config,
54    worker: WorkerContext,
55) -> impl Stream<Item = Result<(), sqlx::Error>> + Send {
56    stream::unfold((), move |_| {
57        let register = keep_alive(pool.clone(), config.clone(), worker.clone());
58        let interval = apalis_core::timer::Delay::new(*config.keep_alive());
59        interval.then(move |_| register.map(|res| Some((res, ()))))
60    })
61}