apalis_sqlite/queries/
keep_alive.rs

1use apalis_core::worker::context::WorkerContext;
2use futures::{FutureExt, Stream, stream};
3use sqlx::SqlitePool;
4
5use crate::{
6    Config,
7    queries::{reenqueue_orphaned::reenqueue_orphaned, register_worker::register_worker},
8};
9
10/// Send a keep-alive signal to the database to indicate that the worker is still active
11pub async fn keep_alive(
12    pool: SqlitePool,
13    config: Config,
14    worker: WorkerContext,
15) -> Result<(), sqlx::Error> {
16    let worker = worker.name().to_owned();
17    let queue = config.queue().to_string();
18    let res = sqlx::query_file!("queries/backend/keep_alive.sql", worker, queue)
19        .execute(&pool)
20        .await?;
21    if res.rows_affected() == 0 {
22        return Err(sqlx::Error::Io(std::io::Error::new(
23            std::io::ErrorKind::NotFound,
24            "WORKER_DOES_NOT_EXIST",
25        )));
26    }
27    Ok(())
28}
29
30/// Perform the initial heartbeat and registration of the worker
31pub async fn initial_heartbeat(
32    pool: SqlitePool,
33    config: Config,
34    worker: WorkerContext,
35    storage_type: &str,
36) -> Result<(), sqlx::Error> {
37    reenqueue_orphaned(pool.clone(), &config).await?;
38    register_worker(pool, config, worker, storage_type).await?;
39    Ok(())
40}
41
42/// Create a stream that sends keep-alive signals at regular intervals
43pub fn keep_alive_stream(
44    pool: SqlitePool,
45    config: Config,
46    worker: WorkerContext,
47) -> impl Stream<Item = Result<(), sqlx::Error>> + Send {
48    stream::unfold((), move |_| {
49        let register = keep_alive(pool.clone(), config.clone(), worker.clone());
50        let interval = apalis_core::timer::Delay::new(*config.keep_alive());
51        interval.then(move |_| register.map(|res| Some((res, ()))))
52    })
53}