apalis_postgres/queries/
list_workers.rs1use apalis_core::backend::{BackendExt, ListWorkers, RunningWorker};
2use apalis_sql::context::SqlContext;
3use chrono::{DateTime, Utc};
4use futures::TryFutureExt;
5use ulid::Ulid;
6
7#[derive(Debug)]
8pub struct WorkerRow {
9 pub id: String,
10 pub worker_type: String,
11 pub storage_name: String,
12 pub layers: Option<String>,
13 pub last_seen: DateTime<Utc>,
14 pub started_at: Option<DateTime<Utc>>,
15}
16
17use crate::{CompactType, PostgresStorage};
18
19impl<Args: Sync, D, F> ListWorkers for PostgresStorage<Args, CompactType, D, F>
20where
21 PostgresStorage<Args, CompactType, D, F>:
22 BackendExt<Context = SqlContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
23{
24 fn list_workers(
25 &self,
26 queue: &str,
27 ) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send {
28 let queue = queue.to_string();
29 let pool = self.pool.clone();
30 let limit = 100;
31 let offset = 0;
32 async move {
33 let workers = sqlx::query_file_as!(
34 WorkerRow,
35 "queries/backend/list_workers.sql",
36 queue,
37 limit,
38 offset
39 )
40 .fetch_all(&pool)
41 .map_ok(|w| {
42 w.into_iter()
43 .map(|w| RunningWorker {
44 id: w.id,
45 backend: w.storage_name,
46 started_at: w.started_at.map(|t| t.timestamp()).unwrap_or_default() as u64,
47 last_heartbeat: w.last_seen.timestamp() as u64,
48 layers: w.layers.unwrap_or_default(),
49 queue: w.worker_type,
50 })
51 .collect()
52 })
53 .await?;
54 Ok(workers)
55 }
56 }
57
58 fn list_all_workers(
59 &self,
60 ) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send {
61 let pool = self.pool.clone();
62 let limit = 100;
63 let offset = 0;
64 async move {
65 let workers = sqlx::query_file_as!(
66 WorkerRow,
67 "queries/backend/list_all_workers.sql",
68 limit,
69 offset
70 )
71 .fetch_all(&pool)
72 .map_ok(|w| {
73 w.into_iter()
74 .map(|w| RunningWorker {
75 id: w.id,
76 backend: w.storage_name,
77 started_at: w.started_at.map(|t| t.timestamp()).unwrap_or_default() as u64,
78 last_heartbeat: w.last_seen.timestamp() as u64,
79 layers: w.layers.unwrap_or_default(),
80 queue: w.worker_type,
81 })
82 .collect()
83 })
84 .await?;
85 Ok(workers)
86 }
87 }
88}