apalis_postgres/queries/list_workers.rs

use apalis_core::backend::{BackendExt, ListWorkers, RunningWorker};
use apalis_sql::{DateTime, DateTimeExt};
use futures::TryFutureExt;
use std::future::Future;
use ulid::Ulid;

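/// Row shape shared by the worker-listing queries; its fields mirror the columns
/// selected by `queries/backend/list_workers.sql` and
/// `queries/backend/list_all_workers.sql`.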
#[derive(Debug)]
pub struct WorkerRow {
    pub id: String,
    pub worker_type: String,
    pub storage_name: String,
    pub layers: Option<String>,
    pub last_seen: DateTime,
    pub started_at: Option<DateTime>,
}

use crate::{CompactType, PgContext, PostgresStorage};

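// Both listing queries map rows into workers identically, so the conversion is
// shared here rather than duplicated in each query's closure.
impl From<WorkerRow> for RunningWorker {
    fn from(w: WorkerRow) -> Self {
        RunningWorker {
            id: w.id,
            backend: w.storage_name,
            // Workers that never reported a start time fall back to a timestamp of 0.
            started_at: w
                .started_at
                .map(|t| t.to_unix_timestamp())
                .unwrap_or_default() as u64,
            last_heartbeat: w.last_seen.to_unix_timestamp() as u64,
            layers: w.layers.unwrap_or_default(),
            queue: w.worker_type,
        }
    }
}
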
impl<Args: Sync, D, F> ListWorkers for PostgresStorage<Args, CompactType, D, F>
where
    PostgresStorage<Args, CompactType, D, F>:
        BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
{
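    /// Lists workers recorded for `queue`, returning at most 100 entries.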
    fn list_workers(
        &self,
        queue: &str,
    ) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send {
        let queue = queue.to_string();
        let pool = self.pool.clone();
        // Only a single page is requested (limit 100, offset 0).
        let limit = 100;
        let offset = 0;
        async move {
            let workers = sqlx::query_file_as!(
                WorkerRow,
                "queries/backend/list_workers.sql",
                queue,
                limit,
                offset
            )
            .fetch_all(&pool)
            .map_ok(|rows| rows.into_iter().map(RunningWorker::from).collect())
            .await?;
            Ok(workers)
        }
    }

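    /// Lists workers recorded across all queues, returning at most 100 entries.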
    fn list_all_workers(
        &self,
    ) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send {
        let pool = self.pool.clone();
        // Only a single page is requested (limit 100, offset 0).
        let limit = 100;
        let offset = 0;
        async move {
            let workers = sqlx::query_file_as!(
                WorkerRow,
                "queries/backend/list_all_workers.sql",
                limit,
                offset
            )
            .fetch_all(&pool)
            .map_ok(|rows| rows.into_iter().map(RunningWorker::from).collect())
            .await?;
            Ok(workers)
        }
    }
}