apalis_postgres/queries/
list_queues.rs

1use apalis_core::backend::{BackendExt, ListQueues, QueueInfo};
2use serde_json::Value;
3use ulid::Ulid;
4
5use crate::{CompactType, PgContext, PostgresStorage};
6
7impl<Args, D, F> ListQueues for PostgresStorage<Args, CompactType, D, F>
8where
9    PostgresStorage<Args, CompactType, D, F>:
10        BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
11{
12    fn list_queues(&self) -> impl Future<Output = Result<Vec<QueueInfo>, Self::Error>> + Send {
13        let pool = self.pool.clone();
14        struct QueueInfoRow {
15            pub name: Option<String>,
16            pub stats: Option<Value>,
17            pub workers: Option<Value>,
18            pub activity: Option<Value>,
19        }
20
21        async move {
22            let queues = sqlx::query_file_as!(QueueInfoRow, "queries/backend/list_queues.sql")
23                .fetch_all(&pool)
24                .await?
25                .into_iter()
26                .map(|row| QueueInfo {
27                    name: row.name.unwrap_or_default(),
28                    stats: serde_json::from_value(row.stats.unwrap()).unwrap_or_default(),
29                    workers: serde_json::from_value(row.workers.unwrap()).unwrap_or_default(),
30                    activity: serde_json::from_value(row.activity.unwrap()).unwrap_or_default(),
31                })
32                .collect();
33            Ok(queues)
34        }
35    }
36}