apalis_postgres/queries/
list_queues.rs1use apalis_core::backend::{BackendExt, ListQueues, QueueInfo};
2use serde_json::Value;
3use ulid::Ulid;
4
5use crate::{CompactType, PgContext, PostgresStorage};
6
7impl<Args, D, F> ListQueues for PostgresStorage<Args, CompactType, D, F>
8where
9 PostgresStorage<Args, CompactType, D, F>:
10 BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
11{
12 fn list_queues(&self) -> impl Future<Output = Result<Vec<QueueInfo>, Self::Error>> + Send {
13 let pool = self.pool.clone();
14 struct QueueInfoRow {
15 pub name: Option<String>,
16 pub stats: Option<Value>,
17 pub workers: Option<Value>,
18 pub activity: Option<Value>,
19 }
20
21 async move {
22 let queues = sqlx::query_file_as!(QueueInfoRow, "queries/backend/list_queues.sql")
23 .fetch_all(&pool)
24 .await?
25 .into_iter()
26 .map(|row| QueueInfo {
27 name: row.name.unwrap_or_default(),
28 stats: serde_json::from_value(row.stats.unwrap()).unwrap_or_default(),
29 workers: serde_json::from_value(row.workers.unwrap()).unwrap_or_default(),
30 activity: serde_json::from_value(row.activity.unwrap()).unwrap_or_default(),
31 })
32 .collect();
33 Ok(queues)
34 }
35 }
36}