// apalis_postgres/queries/list_workers.rs

1use apalis_core::backend::{Backend, ListWorkers, RunningWorker};
2use apalis_sql::{ context::SqlContext};
3use chrono::{DateTime, Utc};
4use futures::TryFutureExt;
5use ulid::Ulid;
6
7#[derive(Debug)]
8pub struct WorkerRow {
9    pub id: String,
10    pub worker_type: String,
11    pub storage_name: String,
12    pub layers: Option<String>,
13    pub last_seen: DateTime<Utc>,
14    pub started_at: Option<DateTime<Utc>>,
15}
16
17
18use crate::{CompactType, PostgresStorage};
19
20impl<Args: Sync, D, F> ListWorkers for PostgresStorage<Args, CompactType, D, F>
21where
22    PostgresStorage<Args, CompactType, D, F>:
23        Backend<Context = SqlContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
24{
25    fn list_workers(
26        &self,
27        queue: &str,
28    ) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send {
29        let queue = queue.to_string();
30        let pool = self.pool.clone();
31        let limit = 100;
32        let offset = 0;
33        async move {
34            let workers = sqlx::query_file_as!(
35                WorkerRow,
36                "queries/backend/list_workers.sql",
37                queue,
38                limit,
39                offset
40            )
41            .fetch_all(&pool)
42            .map_ok(|w| {
43                w.into_iter()
44                    .map(|w| RunningWorker {
45                        id: w.id,
46                        backend: w.storage_name,
47                        started_at: w.started_at.map(|t| t.timestamp()).unwrap_or_default() as u64,
48                        last_heartbeat: w.last_seen.timestamp() as u64,
49                        layers: w.layers.unwrap_or_default(),
50                        queue: w.worker_type,
51                    })
52                    .collect()
53            })
54            .await?;
55            Ok(workers)
56        }
57    }
58
59    fn list_all_workers(
60        &self,
61    ) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send {
62        let pool = self.pool.clone();
63        let limit = 100;
64        let offset = 0;
65        async move {
66            let workers = sqlx::query_file_as!(
67                WorkerRow,
68                "queries/backend/list_all_workers.sql",
69                limit,
70                offset
71            )
72            .fetch_all(&pool)
73            .map_ok(|w| {
74                w.into_iter()
75                    .map(|w| RunningWorker {
76                        id: w.id,
77                        backend: w.storage_name,
78                        started_at: w.started_at.map(|t| t.timestamp()).unwrap_or_default() as u64,
79                        last_heartbeat: w.last_seen.timestamp() as u64,
80                        layers: w.layers.unwrap_or_default(),
81                        queue: w.worker_type,
82                    })
83                    .collect()
84            })
85            .await?;
86            Ok(workers)
87        }
88    }
89}