apalis_postgres/queries/
list_queues.rs1use apalis_core::backend::{BackendExt, ListQueues, QueueInfo};
2use apalis_sql::context::SqlContext;
3use serde_json::Value;
4use ulid::Ulid;
5
6use crate::{CompactType, PostgresStorage};
7
8impl<Args, D, F> ListQueues for PostgresStorage<Args, CompactType, D, F>
9where
10 PostgresStorage<Args, CompactType, D, F>:
11 BackendExt<Context = SqlContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
12{
13 fn list_queues(&self) -> impl Future<Output = Result<Vec<QueueInfo>, Self::Error>> + Send {
14 let pool = self.pool.clone();
15 struct QueueInfoRow {
16 pub name: Option<String>,
17 pub stats: Option<Value>,
18 pub workers: Option<Value>,
19 pub activity: Option<Value>,
20 }
21
22 async move {
23 let queues = sqlx::query_file_as!(QueueInfoRow, "queries/backend/list_queues.sql")
24 .fetch_all(&pool)
25 .await?
26 .into_iter()
27 .map(|row| QueueInfo {
28 name: row.name.unwrap_or_default(),
29 stats: serde_json::from_value(row.stats.unwrap()).unwrap_or_default(),
30 workers: serde_json::from_value(row.workers.unwrap()).unwrap_or_default(),
31 activity: serde_json::from_value(row.activity.unwrap()).unwrap_or_default(),
32 })
33 .collect();
34 Ok(queues)
35 }
36 }
37}