apalis_postgres/queries/
metrics.rs1use apalis_core::backend::{BackendExt, Metrics, Statistic};
2use ulid::Ulid;
3
4use crate::{CompactType, PgContext, PostgresStorage};
5
/// Row shape that `sqlx::query_file_as!` decodes the overview queries into.
///
/// Every column is modelled as an `Option`; the `Metrics` implementation in
/// this file substitutes the type's default value whenever a column is absent.
#[derive(Debug)]
struct StatisticRow {
    /// Ordering hint; forwarded as `Statistic::priority`.
    priority: Option<i32>,
    /// Textual stat kind; parsed with `apalis_sql::stat_type_from_string`.
    r#type: Option<String>,
    /// Label of the statistic; becomes `Statistic::title`.
    statistic: Option<String>,
    /// Numeric value; rendered to a string for `Statistic::value`.
    value: Option<f32>,
}
12
13impl<Args, D, F> Metrics for PostgresStorage<Args, CompactType, D, F>
14where
15 PostgresStorage<Args, CompactType, D, F>:
16 BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
17{
18 fn global(&self) -> impl Future<Output = Result<Vec<Statistic>, Self::Error>> + Send {
19 let pool = self.pool.clone();
20
21 async move {
22 let rec = sqlx::query_file_as!(StatisticRow, "queries/backend/overview.sql")
23 .fetch_all(&pool)
24 .await?
25 .into_iter()
26 .map(|r| Statistic {
27 priority: Some(r.priority.unwrap_or_default() as u64),
28 stat_type: apalis_sql::stat_type_from_string(&r.r#type.unwrap_or_default()),
29 title: r.statistic.unwrap_or_default(),
30 value: r.value.unwrap_or_default().to_string(),
31 })
32 .collect();
33 Ok(rec)
34 }
35 }
36 fn fetch_by_queue(
37 &self,
38 queue_id: &str,
39 ) -> impl Future<Output = Result<Vec<Statistic>, Self::Error>> + Send {
40 let pool = self.pool.clone();
41 let queue_id = queue_id.to_string();
42 async move {
43 let rec = sqlx::query_file_as!(
44 StatisticRow,
45 "queries/backend/overview_by_queue.sql",
46 queue_id
47 )
48 .fetch_all(&pool)
49 .await?
50 .into_iter()
51 .map(|r| Statistic {
52 priority: Some(r.priority.unwrap_or_default() as u64),
53 stat_type: apalis_sql::stat_type_from_string(&r.r#type.unwrap_or_default()),
54 title: r.statistic.unwrap_or_default(),
55 value: r.value.unwrap_or_default().to_string(),
56 })
57 .collect();
58 Ok(rec)
59 }
60 }
61}