apalis_postgres/queries/
list_workers.rs1use apalis_core::backend::{BackendExt, ListWorkers, RunningWorker};
2use apalis_sql::{DateTime, DateTimeExt};
3use futures::TryFutureExt;
4use ulid::Ulid;
5
6#[derive(Debug)]
7pub struct WorkerRow {
8 pub id: String,
9 pub worker_type: String,
10 pub storage_name: String,
11 pub layers: Option<String>,
12 pub last_seen: DateTime,
13 pub started_at: Option<DateTime>,
14}
15
16use crate::{CompactType, PgContext, PostgresStorage};
17
18impl<Args: Sync, D, F> ListWorkers for PostgresStorage<Args, CompactType, D, F>
19where
20 PostgresStorage<Args, CompactType, D, F>:
21 BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
22{
23 fn list_workers(&self) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send {
24 let queue = self.config.queue().to_string();
25
26 let pool = self.pool.clone();
27 let limit = 100;
28 let offset = 0;
29 async move {
30 let workers = sqlx::query_file_as!(
31 WorkerRow,
32 "queries/backend/list_workers.sql",
33 queue,
34 limit,
35 offset
36 )
37 .fetch_all(&pool)
38 .map_ok(|w| {
39 w.into_iter()
40 .map(|w| RunningWorker {
41 id: w.id,
42 backend: w.storage_name,
43 started_at: w
44 .started_at
45 .map(|t| t.to_unix_timestamp())
46 .unwrap_or_default() as u64,
47 last_heartbeat: w.last_seen.to_unix_timestamp() as u64,
48 layers: w.layers.unwrap_or_default(),
49 queue: w.worker_type,
50 })
51 .collect()
52 })
53 .await?;
54 Ok(workers)
55 }
56 }
57
58 fn list_all_workers(
59 &self,
60 ) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send {
61 let pool = self.pool.clone();
62 let limit = 100;
63 let offset = 0;
64 async move {
65 let workers = sqlx::query_file_as!(
66 WorkerRow,
67 "queries/backend/list_all_workers.sql",
68 limit,
69 offset
70 )
71 .fetch_all(&pool)
72 .map_ok(|w| {
73 w.into_iter()
74 .map(|w| RunningWorker {
75 id: w.id,
76 backend: w.storage_name,
77 started_at: w
78 .started_at
79 .map(|t| t.to_unix_timestamp())
80 .unwrap_or_default() as u64,
81 last_heartbeat: w.last_seen.to_unix_timestamp() as u64,
82 layers: w.layers.unwrap_or_default(),
83 queue: w.worker_type,
84 })
85 .collect()
86 })
87 .await?;
88 Ok(workers)
89 }
90 }
91}