apalis_sqlite/queries/
keep_alive.rs1use apalis_core::worker::context::WorkerContext;
2use futures::{FutureExt, Stream, stream};
3use sqlx::SqlitePool;
4
5use crate::{
6 Config,
7 queries::{reenqueue_orphaned::reenqueue_orphaned, register_worker::register_worker},
8};
9
10pub async fn keep_alive(
11 pool: SqlitePool,
12 config: Config,
13 worker: WorkerContext,
14) -> Result<(), sqlx::Error> {
15 let worker = worker.name().to_owned();
16 let queue = config.queue().to_string();
17 let res = sqlx::query_file!("queries/backend/keep_alive.sql", worker, queue)
18 .execute(&pool)
19 .await?;
20 if res.rows_affected() == 0 {
21 return Err(sqlx::Error::Io(std::io::Error::new(
22 std::io::ErrorKind::NotFound,
23 "WORKER_DOES_NOT_EXIST",
24 )));
25 }
26 Ok(())
27}
28
29pub async fn initial_heartbeat(
30 pool: SqlitePool,
31 config: Config,
32 worker: WorkerContext,
33 storage_type: &str,
34) -> Result<(), sqlx::Error> {
35 reenqueue_orphaned(pool.clone(), config.clone()).await?;
36 register_worker(pool, config, worker, storage_type).await?;
37 Ok(())
38}
39
40pub fn keep_alive_stream(
41 pool: SqlitePool,
42 config: Config,
43 worker: WorkerContext,
44) -> impl Stream<Item = Result<(), sqlx::Error>> + Send {
45 stream::unfold((), move |_| {
46 let register = keep_alive(pool.clone(), config.clone(), worker.clone());
47 let interval = apalis_core::timer::Delay::new(*config.keep_alive());
48 interval.then(move |_| register.map(|res| Some((res, ()))))
49 })
50}