apalis_sqlite/queries/
list_workers.rs1use apalis_core::backend::{BackendExt, ListWorkers, RunningWorker};
2use futures::TryFutureExt;
3use ulid::Ulid;
4
5use crate::{CompactType, SqlContext, SqliteStorage};
6
/// One row as returned by the worker listing queries
/// (`queries/backend/list_workers.sql` and `queries/backend/list_all_workers.sql`).
///
/// `sqlx::query_file_as!` fills this struct by matching column names to field
/// names, so the field names and types must stay in sync with those SQL files.
struct Worker {
    /// Worker identifier; surfaced unchanged as `RunningWorker::id`.
    id: String,
    /// Surfaced as `RunningWorker::queue`.
    worker_type: String,
    /// Surfaced as `RunningWorker::backend`.
    storage_name: String,
    /// Nullable column; a missing value is surfaced as an empty string.
    layers: Option<String>,
    /// Surfaced as `RunningWorker::last_heartbeat`.
    /// NOTE(review): presumably a Unix timestamp — confirm the unit against the schema.
    last_seen: i64,
    /// Nullable column; a missing value is surfaced as `0`.
    /// NOTE(review): presumably a Unix timestamp — confirm the unit against the schema.
    started_at: Option<i64>,
}
15
16impl<Args: Sync, D, F> ListWorkers for SqliteStorage<Args, D, F>
17where
18 Self:
19 BackendExt<Context = SqlContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
20{
21 fn list_workers(
22 &self,
23 queue: &str,
24 ) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send {
25 let queue = queue.to_owned();
26 let pool = self.pool.clone();
27 let limit = 100;
28 let offset = 0;
29 async move {
30 let workers = sqlx::query_file_as!(
31 Worker,
32 "queries/backend/list_workers.sql",
33 queue,
34 limit,
35 offset
36 )
37 .fetch_all(&pool)
38 .map_ok(|w| {
39 w.into_iter()
40 .map(|w| RunningWorker {
41 id: w.id,
42 backend: w.storage_name,
43 started_at: w.started_at.unwrap_or_default() as u64,
44 last_heartbeat: w.last_seen as u64,
45 layers: w.layers.unwrap_or_default(),
46 queue: w.worker_type,
47 })
48 .collect()
49 })
50 .await?;
51 Ok(workers)
52 }
53 }
54
55 fn list_all_workers(
56 &self,
57 ) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send {
58 let pool = self.pool.clone();
59 let limit = 100;
60 let offset = 0;
61 async move {
62 let workers = sqlx::query_file_as!(
63 Worker,
64 "queries/backend/list_all_workers.sql",
65 limit,
66 offset
67 )
68 .fetch_all(&pool)
69 .map_ok(|w| {
70 w.into_iter()
71 .map(|w| RunningWorker {
72 id: w.id,
73 backend: w.storage_name,
74 started_at: w.started_at.unwrap_or_default() as u64,
75 last_heartbeat: w.last_seen as u64,
76 layers: w.layers.unwrap_or_default(),
77 queue: w.worker_type,
78 })
79 .collect()
80 })
81 .await?;
82 Ok(workers)
83 }
84 }
85}