apalis_postgres/queries/
keep_alive.rs1use apalis_core::worker::context::WorkerContext;
2use chrono::Utc;
3use futures::{FutureExt, Stream, stream};
4use sqlx::PgPool;
5
6use crate::{
7 Config,
8 queries::{
9 reenqueue_orphaned::reenqueue_orphaned, register_worker::register as register_worker,
10 },
11};
12
13pub async fn keep_alive(
14 pool: PgPool,
15 config: Config,
16 worker: WorkerContext,
17) -> Result<(), sqlx::Error> {
18 let worker = worker.name().to_owned();
19 let queue = config.queue().to_string();
20 let res = sqlx::query_file!("queries/backend/keep_alive.sql", worker, queue)
21 .execute(&pool)
22 .await?;
23 if res.rows_affected() == 0 {
24 return Err(sqlx::Error::Io(std::io::Error::new(
25 std::io::ErrorKind::NotFound,
26 "WORKER_DOES_NOT_EXIST",
27 )));
28 }
29 Ok(())
30}
31
32pub async fn initial_heartbeat(
33 pool: PgPool,
34 config: Config,
35 worker: WorkerContext,
36 storage_type: &str,
37) -> Result<(), sqlx::Error> {
38 reenqueue_orphaned(pool.clone(), config.clone()).await?;
39 let last_seen = Utc::now();
40 register_worker(
41 pool,
42 config.queue().to_string(),
43 worker,
44 last_seen,
45 storage_type,
46 )
47 .await?;
48 Ok(())
49}
50
51pub fn keep_alive_stream(
52 pool: PgPool,
53 config: Config,
54 worker: WorkerContext,
55) -> impl Stream<Item = Result<(), sqlx::Error>> + Send {
56 stream::unfold((), move |_| {
57 let register = keep_alive(pool.clone(), config.clone(), worker.clone());
58 let interval = apalis_core::timer::Delay::new(*config.keep_alive());
59 interval.then(move |_| register.map(|res| Some((res, ()))))
60 })
61}