apalis_postgres/queries/
list_workers.rs1use apalis_core::backend::{Backend, ListWorkers, RunningWorker};
2use apalis_sql::{ context::SqlContext};
3use chrono::{DateTime, Utc};
4use futures::TryFutureExt;
5use ulid::Ulid;
6
7#[derive(Debug)]
8pub struct WorkerRow {
9 pub id: String,
10 pub worker_type: String,
11 pub storage_name: String,
12 pub layers: Option<String>,
13 pub last_seen: DateTime<Utc>,
14 pub started_at: Option<DateTime<Utc>>,
15}
16
17
18use crate::{CompactType, PostgresStorage};
19
20impl<Args: Sync, D, F> ListWorkers for PostgresStorage<Args, CompactType, D, F>
21where
22 PostgresStorage<Args, CompactType, D, F>:
23 Backend<Context = SqlContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
24{
25 fn list_workers(
26 &self,
27 queue: &str,
28 ) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send {
29 let queue = queue.to_string();
30 let pool = self.pool.clone();
31 let limit = 100;
32 let offset = 0;
33 async move {
34 let workers = sqlx::query_file_as!(
35 WorkerRow,
36 "queries/backend/list_workers.sql",
37 queue,
38 limit,
39 offset
40 )
41 .fetch_all(&pool)
42 .map_ok(|w| {
43 w.into_iter()
44 .map(|w| RunningWorker {
45 id: w.id,
46 backend: w.storage_name,
47 started_at: w.started_at.map(|t| t.timestamp()).unwrap_or_default() as u64,
48 last_heartbeat: w.last_seen.timestamp() as u64,
49 layers: w.layers.unwrap_or_default(),
50 queue: w.worker_type,
51 })
52 .collect()
53 })
54 .await?;
55 Ok(workers)
56 }
57 }
58
59 fn list_all_workers(
60 &self,
61 ) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send {
62 let pool = self.pool.clone();
63 let limit = 100;
64 let offset = 0;
65 async move {
66 let workers = sqlx::query_file_as!(
67 WorkerRow,
68 "queries/backend/list_all_workers.sql",
69 limit,
70 offset
71 )
72 .fetch_all(&pool)
73 .map_ok(|w| {
74 w.into_iter()
75 .map(|w| RunningWorker {
76 id: w.id,
77 backend: w.storage_name,
78 started_at: w.started_at.map(|t| t.timestamp()).unwrap_or_default() as u64,
79 last_heartbeat: w.last_seen.timestamp() as u64,
80 layers: w.layers.unwrap_or_default(),
81 queue: w.worker_type,
82 })
83 .collect()
84 })
85 .await?;
86 Ok(workers)
87 }
88 }
89}