Skip to main content

apalis_postgres/queries/
list_workers.rs

1use apalis_core::backend::{BackendExt, ListWorkers, RunningWorker};
2use apalis_sql::{DateTime, DateTimeExt};
3use futures::TryFutureExt;
4use ulid::Ulid;
5
/// Raw worker record as returned by the worker-listing SQL queries.
///
/// `sqlx::query_file_as!` fills this struct from the result columns of
/// `queries/backend/list_workers.sql` and `queries/backend/list_all_workers.sql`;
/// the `ListWorkers` implementation in this file then converts each row into a
/// `RunningWorker`.
#[derive(Debug)]
pub struct WorkerRow {
    /// Worker identifier; copied into `RunningWorker::id`.
    pub id: String,
    /// Reported to callers as `RunningWorker::queue`.
    pub worker_type: String,
    /// Reported to callers as `RunningWorker::backend`.
    pub storage_name: String,
    /// Optional layer description; an absent value is reported as an empty string.
    pub layers: Option<String>,
    /// Reported to callers as `RunningWorker::last_heartbeat` (as a Unix timestamp).
    pub last_seen: DateTime,
    /// Optional start time; an absent value is reported as `0` in
    /// `RunningWorker::started_at`.
    pub started_at: Option<DateTime>,
}
15
16use crate::{CompactType, PgContext, PostgresStorage};
17
18impl<Args: Sync, D, F> ListWorkers for PostgresStorage<Args, CompactType, D, F>
19where
20    PostgresStorage<Args, CompactType, D, F>:
21        BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
22{
23    fn list_workers(&self) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send {
24        let queue = self.config.queue().to_string();
25
26        let pool = self.pool.clone();
27        let limit = 100;
28        let offset = 0;
29        async move {
30            let workers = sqlx::query_file_as!(
31                WorkerRow,
32                "queries/backend/list_workers.sql",
33                queue,
34                limit,
35                offset
36            )
37            .fetch_all(&pool)
38            .map_ok(|w| {
39                w.into_iter()
40                    .map(|w| RunningWorker {
41                        id: w.id,
42                        backend: w.storage_name,
43                        started_at: w
44                            .started_at
45                            .map(|t| t.to_unix_timestamp())
46                            .unwrap_or_default() as u64,
47                        last_heartbeat: w.last_seen.to_unix_timestamp() as u64,
48                        layers: w.layers.unwrap_or_default(),
49                        queue: w.worker_type,
50                    })
51                    .collect()
52            })
53            .await?;
54            Ok(workers)
55        }
56    }
57
58    fn list_all_workers(
59        &self,
60    ) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send {
61        let pool = self.pool.clone();
62        let limit = 100;
63        let offset = 0;
64        async move {
65            let workers = sqlx::query_file_as!(
66                WorkerRow,
67                "queries/backend/list_all_workers.sql",
68                limit,
69                offset
70            )
71            .fetch_all(&pool)
72            .map_ok(|w| {
73                w.into_iter()
74                    .map(|w| RunningWorker {
75                        id: w.id,
76                        backend: w.storage_name,
77                        started_at: w
78                            .started_at
79                            .map(|t| t.to_unix_timestamp())
80                            .unwrap_or_default() as u64,
81                        last_heartbeat: w.last_seen.to_unix_timestamp() as u64,
82                        layers: w.layers.unwrap_or_default(),
83                        queue: w.worker_type,
84                    })
85                    .collect()
86            })
87            .await?;
88            Ok(workers)
89        }
90    }
91}