apalis_postgres/queries/
metrics.rs1use apalis_core::backend::{BackendExt, Metrics, Statistic};
2use apalis_sql::context::SqlContext;
3use ulid::Ulid;
4
5use crate::{CompactType, PostgresStorage};
6
7struct StatisticRow {
8 priority: Option<i32>,
9 r#type: Option<String>,
10 statistic: Option<String>,
11 value: Option<f32>,
12}
13
14impl<Args, D, F> Metrics for PostgresStorage<Args, CompactType, D, F>
15where
16 PostgresStorage<Args, CompactType, D, F>:
17 BackendExt<Context = SqlContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
18{
19 fn global(&self) -> impl Future<Output = Result<Vec<Statistic>, Self::Error>> + Send {
20 let pool = self.pool.clone();
21
22 async move {
23 let rec = sqlx::query_file_as!(StatisticRow, "queries/backend/overview.sql")
24 .fetch_all(&pool)
25 .await?
26 .into_iter()
27 .map(|r| Statistic {
28 priority: Some(r.priority.unwrap_or_default() as u64),
29 stat_type: apalis_sql::stat_type_from_string(&r.r#type.unwrap_or_default()),
30 title: r.statistic.unwrap_or_default(),
31 value: r.value.unwrap_or_default().to_string(),
32 })
33 .collect();
34 Ok(rec)
35 }
36 }
37 fn fetch_by_queue(
38 &self,
39 queue_id: &str,
40 ) -> impl Future<Output = Result<Vec<Statistic>, Self::Error>> + Send {
41 let pool = self.pool.clone();
42 let queue_id = queue_id.to_string();
43 async move {
44 let rec = sqlx::query_file_as!(
45 StatisticRow,
46 "queries/backend/overview_by_queue.sql",
47 queue_id
48 )
49 .fetch_all(&pool)
50 .await?
51 .into_iter()
52 .map(|r| Statistic {
53 priority: Some(r.priority.unwrap_or_default() as u64),
54 stat_type: apalis_sql::stat_type_from_string(&r.r#type.unwrap_or_default()),
55 title: r.statistic.unwrap_or_default(),
56 value: r.value.unwrap_or_default().to_string(),
57 })
58 .collect();
59 Ok(rec)
60 }
61 }
62}