apalis_postgres/queries/
list_workers.rs1use apalis_core::backend::{BackendExt, ListWorkers, RunningWorker};
2use apalis_sql::{DateTime, DateTimeExt};
3use futures::TryFutureExt;
4use ulid::Ulid;
5
6#[derive(Debug)]
7pub struct WorkerRow {
8 pub id: String,
9 pub worker_type: String,
10 pub storage_name: String,
11 pub layers: Option<String>,
12 pub last_seen: DateTime,
13 pub started_at: Option<DateTime>,
14}
15
16use crate::{CompactType, PgContext, PostgresStorage};
17
18impl<Args: Sync, D, F> ListWorkers for PostgresStorage<Args, CompactType, D, F>
19where
20 PostgresStorage<Args, CompactType, D, F>:
21 BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
22{
23 fn list_workers(
24 &self,
25 queue: &str,
26 ) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send {
27 let queue = queue.to_string();
28 let pool = self.pool.clone();
29 let limit = 100;
30 let offset = 0;
31 async move {
32 let workers = sqlx::query_file_as!(
33 WorkerRow,
34 "queries/backend/list_workers.sql",
35 queue,
36 limit,
37 offset
38 )
39 .fetch_all(&pool)
40 .map_ok(|w| {
41 w.into_iter()
42 .map(|w| RunningWorker {
43 id: w.id,
44 backend: w.storage_name,
45 started_at: w
46 .started_at
47 .map(|t| t.to_unix_timestamp())
48 .unwrap_or_default() as u64,
49 last_heartbeat: w.last_seen.to_unix_timestamp() as u64,
50 layers: w.layers.unwrap_or_default(),
51 queue: w.worker_type,
52 })
53 .collect()
54 })
55 .await?;
56 Ok(workers)
57 }
58 }
59
60 fn list_all_workers(
61 &self,
62 ) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send {
63 let pool = self.pool.clone();
64 let limit = 100;
65 let offset = 0;
66 async move {
67 let workers = sqlx::query_file_as!(
68 WorkerRow,
69 "queries/backend/list_all_workers.sql",
70 limit,
71 offset
72 )
73 .fetch_all(&pool)
74 .map_ok(|w| {
75 w.into_iter()
76 .map(|w| RunningWorker {
77 id: w.id,
78 backend: w.storage_name,
79 started_at: w
80 .started_at
81 .map(|t| t.to_unix_timestamp())
82 .unwrap_or_default() as u64,
83 last_heartbeat: w.last_seen.to_unix_timestamp() as u64,
84 layers: w.layers.unwrap_or_default(),
85 queue: w.worker_type,
86 })
87 .collect()
88 })
89 .await?;
90 Ok(workers)
91 }
92 }
93}