apalis_sqlite/queries/
keep_alive.rs1use apalis_core::worker::context::WorkerContext;
2use futures::{FutureExt, Stream, stream};
3use sqlx::SqlitePool;
4
5use crate::{
6 Config,
7 queries::{reenqueue_orphaned::reenqueue_orphaned, register_worker::register_worker},
8};
9
10pub async fn keep_alive(
12 pool: SqlitePool,
13 config: Config,
14 worker: WorkerContext,
15) -> Result<(), sqlx::Error> {
16 let worker = worker.name().to_owned();
17 let queue = config.queue().to_string();
18 let res = sqlx::query_file!("queries/backend/keep_alive.sql", worker, queue)
19 .execute(&pool)
20 .await?;
21 if res.rows_affected() == 0 {
22 return Err(sqlx::Error::Io(std::io::Error::new(
23 std::io::ErrorKind::NotFound,
24 "WORKER_DOES_NOT_EXIST",
25 )));
26 }
27 Ok(())
28}
29
30pub async fn initial_heartbeat(
32 pool: SqlitePool,
33 config: Config,
34 worker: WorkerContext,
35 storage_type: &str,
36) -> Result<(), sqlx::Error> {
37 reenqueue_orphaned(pool.clone(), &config).await?;
38 register_worker(pool, config, worker, storage_type).await?;
39 Ok(())
40}
41
42pub fn keep_alive_stream(
44 pool: SqlitePool,
45 config: Config,
46 worker: WorkerContext,
47) -> impl Stream<Item = Result<(), sqlx::Error>> + Send {
48 stream::unfold((), move |_| {
49 let register = keep_alive(pool.clone(), config.clone(), worker.clone());
50 let interval = apalis_core::timer::Delay::new(*config.keep_alive());
51 interval.then(move |_| register.map(|res| Some((res, ()))))
52 })
53}