apalis_postgres/queries/
list_queues.rs

1use apalis_core::backend::{BackendExt, ListQueues, QueueInfo};
2use apalis_sql::context::SqlContext;
3use serde_json::Value;
4use ulid::Ulid;
5
6use crate::{CompactType, PostgresStorage};
7
8impl<Args, D, F> ListQueues for PostgresStorage<Args, CompactType, D, F>
9where
10    PostgresStorage<Args, CompactType, D, F>:
11        BackendExt<Context = SqlContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
12{
13    fn list_queues(&self) -> impl Future<Output = Result<Vec<QueueInfo>, Self::Error>> + Send {
14        let pool = self.pool.clone();
15        struct QueueInfoRow {
16            pub name: Option<String>,
17            pub stats: Option<Value>,
18            pub workers: Option<Value>,
19            pub activity: Option<Value>,
20        }
21
22        async move {
23            let queues = sqlx::query_file_as!(QueueInfoRow, "queries/backend/list_queues.sql")
24                .fetch_all(&pool)
25                .await?
26                .into_iter()
27                .map(|row| QueueInfo {
28                    name: row.name.unwrap_or_default(),
29                    stats: serde_json::from_value(row.stats.unwrap()).unwrap_or_default(),
30                    workers: serde_json::from_value(row.workers.unwrap()).unwrap_or_default(),
31                    activity: serde_json::from_value(row.activity.unwrap()).unwrap_or_default(),
32                })
33                .collect();
34            Ok(queues)
35        }
36    }
37}