apalis_postgres/queries/
metrics.rs1use apalis_core::backend::{BackendExt, Metrics, Statistic};
2use ulid::Ulid;
3
4use crate::{CompactType, PgContext, PostgresStorage};
5
/// One result row of the overview SQL queries, as decoded by `sqlx::query_file_as!`.
///
/// Every field is optional; the conversion into `Statistic` substitutes the
/// type's default value for any field that is `None`.
struct StatisticRow {
    /// Priority of the statistic; converted to `u64` when building a `Statistic`.
    priority: Option<i32>,
    /// Textual statistic type, parsed with `apalis_sql::stat_type_from_string`.
    r#type: Option<String>,
    /// Becomes the `title` of the resulting `Statistic`.
    statistic: Option<String>,
    /// Numeric value; rendered with `to_string()` when building a `Statistic`.
    value: Option<f32>,
}
12
13impl<Args, D, F> Metrics for PostgresStorage<Args, CompactType, D, F>
14where
15 PostgresStorage<Args, CompactType, D, F>:
16 BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
17{
18 fn global(&self) -> impl Future<Output = Result<Vec<Statistic>, Self::Error>> + Send {
19 let pool = self.pool.clone();
20
21 async move {
22 let rec = sqlx::query_file_as!(StatisticRow, "queries/backend/overview.sql")
23 .fetch_all(&pool)
24 .await?
25 .into_iter()
26 .map(|r| Statistic {
27 priority: Some(r.priority.unwrap_or_default() as u64),
28 stat_type: apalis_sql::stat_type_from_string(&r.r#type.unwrap_or_default()),
29 title: r.statistic.unwrap_or_default(),
30 value: r.value.unwrap_or_default().to_string(),
31 })
32 .collect();
33 Ok(rec)
34 }
35 }
36 fn fetch_by_queue(&self) -> impl Future<Output = Result<Vec<Statistic>, Self::Error>> + Send {
37 let pool = self.pool.clone();
38 let queue_id = self.config.queue().to_string();
39 async move {
40 let rec = sqlx::query_file_as!(
41 StatisticRow,
42 "queries/backend/overview_by_queue.sql",
43 queue_id
44 )
45 .fetch_all(&pool)
46 .await?
47 .into_iter()
48 .map(|r| Statistic {
49 priority: Some(r.priority.unwrap_or_default() as u64),
50 stat_type: apalis_sql::stat_type_from_string(&r.r#type.unwrap_or_default()),
51 title: r.statistic.unwrap_or_default(),
52 value: r.value.unwrap_or_default().to_string(),
53 })
54 .collect();
55 Ok(rec)
56 }
57 }
58}