apalis_postgres/queries/
list_workers.rs

1use apalis_core::backend::{BackendExt, ListWorkers, RunningWorker};
2use apalis_sql::context::SqlContext;
3use chrono::{DateTime, Utc};
4use futures::TryFutureExt;
5use ulid::Ulid;
6
/// One raw row fetched by the `queries/backend/list_workers.sql` and
/// `queries/backend/list_all_workers.sql` queries via `sqlx::query_file_as!`.
///
/// This is an intermediate database shape; the `ListWorkers` impl below maps
/// it into the backend-agnostic `apalis_core::backend::RunningWorker`.
#[derive(Debug)]
pub struct WorkerRow {
    // Worker identifier; copied verbatim into `RunningWorker::id`.
    pub id: String,
    // Queue the worker consumes; becomes `RunningWorker::queue`.
    pub worker_type: String,
    // Name of the storage backend; becomes `RunningWorker::backend`.
    pub storage_name: String,
    // Serialized middleware/layer description; `None` is mapped to an
    // empty string (`unwrap_or_default`) when building `RunningWorker`.
    pub layers: Option<String>,
    // Last heartbeat time; converted to Unix seconds for `last_heartbeat`.
    pub last_seen: DateTime<Utc>,
    // Start time, if recorded; `None` is mapped to 0 seconds since epoch.
    pub started_at: Option<DateTime<Utc>>,
}
16
17use crate::{CompactType, PostgresStorage};
18
19impl<Args: Sync, D, F> ListWorkers for PostgresStorage<Args, CompactType, D, F>
20where
21    PostgresStorage<Args, CompactType, D, F>:
22        BackendExt<Context = SqlContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
23{
24    fn list_workers(
25        &self,
26        queue: &str,
27    ) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send {
28        let queue = queue.to_string();
29        let pool = self.pool.clone();
30        let limit = 100;
31        let offset = 0;
32        async move {
33            let workers = sqlx::query_file_as!(
34                WorkerRow,
35                "queries/backend/list_workers.sql",
36                queue,
37                limit,
38                offset
39            )
40            .fetch_all(&pool)
41            .map_ok(|w| {
42                w.into_iter()
43                    .map(|w| RunningWorker {
44                        id: w.id,
45                        backend: w.storage_name,
46                        started_at: w.started_at.map(|t| t.timestamp()).unwrap_or_default() as u64,
47                        last_heartbeat: w.last_seen.timestamp() as u64,
48                        layers: w.layers.unwrap_or_default(),
49                        queue: w.worker_type,
50                    })
51                    .collect()
52            })
53            .await?;
54            Ok(workers)
55        }
56    }
57
58    fn list_all_workers(
59        &self,
60    ) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send {
61        let pool = self.pool.clone();
62        let limit = 100;
63        let offset = 0;
64        async move {
65            let workers = sqlx::query_file_as!(
66                WorkerRow,
67                "queries/backend/list_all_workers.sql",
68                limit,
69                offset
70            )
71            .fetch_all(&pool)
72            .map_ok(|w| {
73                w.into_iter()
74                    .map(|w| RunningWorker {
75                        id: w.id,
76                        backend: w.storage_name,
77                        started_at: w.started_at.map(|t| t.timestamp()).unwrap_or_default() as u64,
78                        last_heartbeat: w.last_seen.timestamp() as u64,
79                        layers: w.layers.unwrap_or_default(),
80                        queue: w.worker_type,
81                    })
82                    .collect()
83            })
84            .await?;
85            Ok(workers)
86        }
87    }
88}