apalis_sqlite/queries/
list_workers.rs

1use apalis_core::backend::{BackendExt, ListWorkers, RunningWorker};
2use futures::TryFutureExt;
3use ulid::Ulid;
4
5use crate::{CompactType, SqlContext, SqliteStorage};
6
/// One row produced by the worker-listing SQL files, as decoded by
/// `sqlx::query_file_as!`.
///
/// Rows are turned into `RunningWorker` values by the `ListWorkers`
/// implementation in this file.
struct Worker {
    // --- identity -------------------------------------------------------
    /// Worker identifier; becomes `RunningWorker::id`.
    id: String,
    /// Surfaced to callers as `RunningWorker::queue`.
    worker_type: String,
    /// Surfaced to callers as `RunningWorker::backend`.
    storage_name: String,
    /// Nullable column; a missing value is reported as an empty string.
    layers: Option<String>,

    // --- liveness -------------------------------------------------------
    /// Nullable column; a missing value is reported as `0`.
    /// NOTE(review): unit is not visible here (presumably a Unix timestamp) — confirm.
    started_at: Option<i64>,
    /// Surfaced to callers as `RunningWorker::last_heartbeat`.
    last_seen: i64,
}
15
16impl<Args: Sync, D, F> ListWorkers for SqliteStorage<Args, D, F>
17where
18    Self:
19        BackendExt<Context = SqlContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
20{
21    fn list_workers(
22        &self,
23        queue: &str,
24    ) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send {
25        let queue = queue.to_owned();
26        let pool = self.pool.clone();
27        let limit = 100;
28        let offset = 0;
29        async move {
30            let workers = sqlx::query_file_as!(
31                Worker,
32                "queries/backend/list_workers.sql",
33                queue,
34                limit,
35                offset
36            )
37            .fetch_all(&pool)
38            .map_ok(|w| {
39                w.into_iter()
40                    .map(|w| RunningWorker {
41                        id: w.id,
42                        backend: w.storage_name,
43                        started_at: w.started_at.unwrap_or_default() as u64,
44                        last_heartbeat: w.last_seen as u64,
45                        layers: w.layers.unwrap_or_default(),
46                        queue: w.worker_type,
47                    })
48                    .collect()
49            })
50            .await?;
51            Ok(workers)
52        }
53    }
54
55    fn list_all_workers(
56        &self,
57    ) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send {
58        let pool = self.pool.clone();
59        let limit = 100;
60        let offset = 0;
61        async move {
62            let workers = sqlx::query_file_as!(
63                Worker,
64                "queries/backend/list_all_workers.sql",
65                limit,
66                offset
67            )
68            .fetch_all(&pool)
69            .map_ok(|w| {
70                w.into_iter()
71                    .map(|w| RunningWorker {
72                        id: w.id,
73                        backend: w.storage_name,
74                        started_at: w.started_at.unwrap_or_default() as u64,
75                        last_heartbeat: w.last_seen as u64,
76                        layers: w.layers.unwrap_or_default(),
77                        queue: w.worker_type,
78                    })
79                    .collect()
80            })
81            .await?;
82            Ok(workers)
83        }
84    }
85}