apalis_postgres/queries/
metrics.rs

1use apalis_core::backend::{Backend, Metrics, Statistic};
2use apalis_sql::context::SqlContext;
3use ulid::Ulid;
4
5use crate::{CompactType, PostgresStorage};
6
7struct StatisticRow {
8    priority: Option<i32>,
9    r#type: Option<String>,
10    statistic: Option<String>,
11    value: Option<f32>,
12}
13
14impl<Args, D, F> Metrics for PostgresStorage<Args, CompactType, D, F>
15where
16    PostgresStorage<Args, CompactType, D, F>:
17        Backend<Context = SqlContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
18{
19    fn global(&self) -> impl Future<Output = Result<Vec<Statistic>, Self::Error>> + Send {
20        let pool = self.pool.clone();
21
22        async move {
23            let rec = sqlx::query_file_as!(StatisticRow, "queries/backend/overview.sql")
24                .fetch_all(&pool)
25                .await?
26                .into_iter()
27                .map(|r| Statistic {
28                    priority: Some(r.priority.unwrap_or_default() as u64),
29                    stat_type: apalis_sql::stat_type_from_string(&r.r#type.unwrap_or_default()),
30                    title: r.statistic.unwrap_or_default(),
31                    value: r.value.unwrap_or_default().to_string(),
32                })
33                .collect();
34            Ok(rec)
35        }
36    }
37    fn fetch_by_queue(
38        &self,
39        queue_id: &str,
40    ) -> impl Future<Output = Result<Vec<Statistic>, Self::Error>> + Send {
41        let pool = self.pool.clone();
42        let queue_id = queue_id.to_string();
43        async move {
44            let rec = sqlx::query_file_as!(
45                StatisticRow,
46                "queries/backend/overview_by_queue.sql",
47                queue_id
48            )
49            .fetch_all(&pool)
50            .await?
51            .into_iter()
52            .map(|r| Statistic {
53                priority: Some(r.priority.unwrap_or_default() as u64),
54                stat_type: apalis_sql::stat_type_from_string(&r.r#type.unwrap_or_default()),
55                title: r.statistic.unwrap_or_default(),
56                value: r.value.unwrap_or_default().to_string(),
57            })
58            .collect();
59            Ok(rec)
60        }
61    }
62}