apalis_postgres/queries/
metrics.rs

1use apalis_core::backend::{BackendExt, Metrics, Statistic};
2use ulid::Ulid;
3
4use crate::{CompactType, PgContext, PostgresStorage};
5
6struct StatisticRow {
7    priority: Option<i32>,
8    r#type: Option<String>,
9    statistic: Option<String>,
10    value: Option<f32>,
11}
12
13impl<Args, D, F> Metrics for PostgresStorage<Args, CompactType, D, F>
14where
15    PostgresStorage<Args, CompactType, D, F>:
16        BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
17{
18    fn global(&self) -> impl Future<Output = Result<Vec<Statistic>, Self::Error>> + Send {
19        let pool = self.pool.clone();
20
21        async move {
22            let rec = sqlx::query_file_as!(StatisticRow, "queries/backend/overview.sql")
23                .fetch_all(&pool)
24                .await?
25                .into_iter()
26                .map(|r| Statistic {
27                    priority: Some(r.priority.unwrap_or_default() as u64),
28                    stat_type: apalis_sql::stat_type_from_string(&r.r#type.unwrap_or_default()),
29                    title: r.statistic.unwrap_or_default(),
30                    value: r.value.unwrap_or_default().to_string(),
31                })
32                .collect();
33            Ok(rec)
34        }
35    }
36    fn fetch_by_queue(
37        &self,
38        queue_id: &str,
39    ) -> impl Future<Output = Result<Vec<Statistic>, Self::Error>> + Send {
40        let pool = self.pool.clone();
41        let queue_id = queue_id.to_string();
42        async move {
43            let rec = sqlx::query_file_as!(
44                StatisticRow,
45                "queries/backend/overview_by_queue.sql",
46                queue_id
47            )
48            .fetch_all(&pool)
49            .await?
50            .into_iter()
51            .map(|r| Statistic {
52                priority: Some(r.priority.unwrap_or_default() as u64),
53                stat_type: apalis_sql::stat_type_from_string(&r.r#type.unwrap_or_default()),
54                title: r.statistic.unwrap_or_default(),
55                value: r.value.unwrap_or_default().to_string(),
56            })
57            .collect();
58            Ok(rec)
59        }
60    }
61}