apalis_sqlite/queries/
keep_alive.rs

1use apalis_core::worker::context::WorkerContext;
2use futures::{FutureExt, Stream, stream};
3use sqlx::SqlitePool;
4
5use crate::{
6    Config,
7    queries::{reenqueue_orphaned::reenqueue_orphaned, register_worker::register_worker},
8};
9
10pub async fn keep_alive(
11    pool: SqlitePool,
12    config: Config,
13    worker: WorkerContext,
14) -> Result<(), sqlx::Error> {
15    let worker = worker.name().to_owned();
16    let queue = config.queue().to_string();
17    let res = sqlx::query_file!("queries/backend/keep_alive.sql", worker, queue)
18        .execute(&pool)
19        .await?;
20    if res.rows_affected() == 0 {
21        return Err(sqlx::Error::Io(std::io::Error::new(
22            std::io::ErrorKind::NotFound,
23            "WORKER_DOES_NOT_EXIST",
24        )));
25    }
26    Ok(())
27}
28
29pub async fn initial_heartbeat(
30    pool: SqlitePool,
31    config: Config,
32    worker: WorkerContext,
33    storage_type: &str,
34) -> Result<(), sqlx::Error> {
35    reenqueue_orphaned(pool.clone(), config.clone()).await?;
36    register_worker(pool, config, worker, storage_type).await?;
37    Ok(())
38}
39
40pub fn keep_alive_stream(
41    pool: SqlitePool,
42    config: Config,
43    worker: WorkerContext,
44) -> impl Stream<Item = Result<(), sqlx::Error>> + Send {
45    stream::unfold((), move |_| {
46        let register = keep_alive(pool.clone(), config.clone(), worker.clone());
47        let interval = apalis_core::timer::Delay::new(*config.keep_alive());
48        interval.then(move |_| register.map(|res| Some((res, ()))))
49    })
50}