apalis_postgres/queries/
list_workers.rs1use apalis_core::backend::{BackendExt, ListWorkers, RunningWorker};
2use chrono::{DateTime, Utc};
3use futures::TryFutureExt;
4use ulid::Ulid;
5
/// One worker record as fetched by the worker-listing queries in this module.
///
/// Rows of this type are produced by `sqlx::query_file_as!` from
/// `queries/backend/list_workers.sql` and `queries/backend/list_all_workers.sql`,
/// and are then converted into `RunningWorker` values.
#[derive(Debug)]
pub struct WorkerRow {
    /// Worker identifier; becomes `RunningWorker::id`.
    pub id: String,
    /// Reported to callers as `RunningWorker::queue`.
    pub worker_type: String,
    /// Reported to callers as `RunningWorker::backend`.
    pub storage_name: String,
    /// Optional layer description; an absent value is reported as an empty string.
    pub layers: Option<String>,
    /// Source of `RunningWorker::last_heartbeat` (converted to Unix seconds).
    pub last_seen: DateTime<Utc>,
    /// Source of `RunningWorker::started_at` (Unix seconds); an absent value is
    /// reported as `0`.
    pub started_at: Option<DateTime<Utc>>,
}
15
16use crate::{CompactType, PgContext, PostgresStorage};
17
18impl<Args: Sync, D, F> ListWorkers for PostgresStorage<Args, CompactType, D, F>
19where
20 PostgresStorage<Args, CompactType, D, F>:
21 BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
22{
23 fn list_workers(
24 &self,
25 queue: &str,
26 ) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send {
27 let queue = queue.to_string();
28 let pool = self.pool.clone();
29 let limit = 100;
30 let offset = 0;
31 async move {
32 let workers = sqlx::query_file_as!(
33 WorkerRow,
34 "queries/backend/list_workers.sql",
35 queue,
36 limit,
37 offset
38 )
39 .fetch_all(&pool)
40 .map_ok(|w| {
41 w.into_iter()
42 .map(|w| RunningWorker {
43 id: w.id,
44 backend: w.storage_name,
45 started_at: w.started_at.map(|t| t.timestamp()).unwrap_or_default() as u64,
46 last_heartbeat: w.last_seen.timestamp() as u64,
47 layers: w.layers.unwrap_or_default(),
48 queue: w.worker_type,
49 })
50 .collect()
51 })
52 .await?;
53 Ok(workers)
54 }
55 }
56
57 fn list_all_workers(
58 &self,
59 ) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send {
60 let pool = self.pool.clone();
61 let limit = 100;
62 let offset = 0;
63 async move {
64 let workers = sqlx::query_file_as!(
65 WorkerRow,
66 "queries/backend/list_all_workers.sql",
67 limit,
68 offset
69 )
70 .fetch_all(&pool)
71 .map_ok(|w| {
72 w.into_iter()
73 .map(|w| RunningWorker {
74 id: w.id,
75 backend: w.storage_name,
76 started_at: w.started_at.map(|t| t.timestamp()).unwrap_or_default() as u64,
77 last_heartbeat: w.last_seen.timestamp() as u64,
78 layers: w.layers.unwrap_or_default(),
79 queue: w.worker_type,
80 })
81 .collect()
82 })
83 .await?;
84 Ok(workers)
85 }
86 }
87}