background_jobs_metrics/
lib.rs

1/*
2 * This file is part of Background Jobs.
3 *
4 * Copyright © 2023 Riley Trautman
5 *
6 * Background Jobs is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * Background Jobs is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with Background Jobs.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20//! Types for collecting stats from background-jobs
21
22mod recorder;
23
24pub use metrics::SetRecorderError;
25
26pub use recorder::{JobStat, Stats, StatsHandle, StatsRecorder};
27
28/// Install the stats recorder into the process
29///
30/// ```rust
31/// background_jobs_metrics::install().expect("Failed to install recorder");
32/// ```
33pub fn install() -> Result<StatsHandle, SetRecorderError<StatsRecorder>> {
34    StatsRecorder::install()
35}
36
37/// Build the stats recorder and fetch the handle.
38///
39/// This can be used in conjunction with `metrics_util::layers::FanoutBuilder` to add it in
40/// addition to another recorder
41///
42/// ```rust
43/// let (jobs_recorder, handle) = background_jobs_metrics::build();
44///
45/// let recorder = metrics_util::layers::FanoutBuilder::default()
46///     .add_recorder(jobs_recorder)
47///     .build();
48///
49/// metrics::set_global_recorder(recorder).expect("Failed to set recorder");
50///
51/// println!("{:?}", handle.get());
52/// ```
53pub fn build() -> (StatsRecorder, StatsHandle) {
54    StatsRecorder::build()
55}
56
57#[derive(Clone, Debug)]
58/// A wrapper for any Storage type adding metrics
59pub struct MetricsStorage<S>(S);
60
61impl<S> MetricsStorage<S> {
62    /// Add metrics to a provided Storage
63    pub const fn wrap(storage: S) -> MetricsStorage<S>
64    where
65        S: background_jobs_core::Storage,
66    {
67        Self(storage)
68    }
69}
70
71#[async_trait::async_trait]
72impl<S> background_jobs_core::Storage for MetricsStorage<S>
73where
74    S: background_jobs_core::Storage + Sync,
75{
76    type Error = S::Error;
77
78    async fn info(
79        &self,
80        job_id: uuid::Uuid,
81    ) -> Result<Option<background_jobs_core::JobInfo>, Self::Error> {
82        self.0.info(job_id).await
83    }
84
85    async fn push(&self, job: background_jobs_core::NewJobInfo) -> Result<uuid::Uuid, Self::Error> {
86        let queue = job.queue().to_string();
87        let name = job.name().to_string();
88
89        let uuid = self.0.push(job).await?;
90
91        metrics::counter!("background-jobs.job.created", "queue" => queue, "name" => name)
92            .increment(1);
93
94        Ok(uuid)
95    }
96
97    async fn pop(
98        &self,
99        queue: &str,
100        runner_id: uuid::Uuid,
101    ) -> Result<background_jobs_core::JobInfo, Self::Error> {
102        let job_info = self.0.pop(queue, runner_id).await?;
103
104        metrics::counter!("background-jobs.job.started", "queue" => job_info.queue.clone(), "name" => job_info.name.clone()).increment(1);
105
106        Ok(job_info)
107    }
108
109    async fn heartbeat(
110        &self,
111        job_id: uuid::Uuid,
112        runner_id: uuid::Uuid,
113    ) -> Result<(), Self::Error> {
114        self.0.heartbeat(job_id, runner_id).await
115    }
116
117    async fn complete(
118        &self,
119        return_job_info: background_jobs_core::ReturnJobInfo,
120    ) -> Result<bool, Self::Error> {
121        let info = if let Some(info) = self.0.info(return_job_info.id).await? {
122            Some(info)
123        } else {
124            tracing::warn!("Returned non-existant job");
125            metrics::counter!("background-jobs.job.missing").increment(1);
126            None
127        };
128
129        let result = return_job_info.result;
130
131        let completed = self.0.complete(return_job_info).await?;
132
133        if let Some(info) = info {
134            metrics::counter!("background-jobs.job.finished", "queue" => info.queue.clone(), "name" => info.name.clone()).increment(1);
135
136            match result {
137                background_jobs_core::JobResult::Success => {
138                    metrics::counter!("background-jobs.job.completed", "queue" => info.queue, "name" => info.name).increment(1);
139                }
140                background_jobs_core::JobResult::Failure if completed => {
141                    metrics::counter!("background-jobs.job.dead", "queue" => info.queue, "name" => info.name).increment(1);
142                }
143                background_jobs_core::JobResult::Failure => {
144                    metrics::counter!("background-jobs.job.failed", "queue" => info.queue, "name" => info.name).increment(1);
145                }
146                background_jobs_core::JobResult::Unexecuted
147                | background_jobs_core::JobResult::Unregistered => {
148                    metrics::counter!("background-jobs.job.returned", "queue" => info.queue, "name" => info.name).increment(1);
149                }
150            }
151        }
152
153        Ok(completed)
154    }
155}