background_jobs_metrics/
lib.rs1mod recorder;
23
24pub use metrics::SetRecorderError;
25
26pub use recorder::{JobStat, Stats, StatsHandle, StatsRecorder};
27
28pub fn install() -> Result<StatsHandle, SetRecorderError<StatsRecorder>> {
34 StatsRecorder::install()
35}
36
37pub fn build() -> (StatsRecorder, StatsHandle) {
54 StatsRecorder::build()
55}
56
/// Newtype around a storage value `S`.
///
/// When `S` implements `background_jobs_core::Storage`, this wrapper implements it too:
/// calls are forwarded to the inner value and `background-jobs.job.*` counters are
/// incremented around `push`, `pop`, and `complete`.
#[derive(Debug, Clone)]
pub struct MetricsStorage<S>(S);
60
61impl<S> MetricsStorage<S> {
62 pub const fn wrap(storage: S) -> MetricsStorage<S>
64 where
65 S: background_jobs_core::Storage,
66 {
67 Self(storage)
68 }
69}
70
71#[async_trait::async_trait]
72impl<S> background_jobs_core::Storage for MetricsStorage<S>
73where
74 S: background_jobs_core::Storage + Sync,
75{
76 type Error = S::Error;
77
78 async fn info(
79 &self,
80 job_id: uuid::Uuid,
81 ) -> Result<Option<background_jobs_core::JobInfo>, Self::Error> {
82 self.0.info(job_id).await
83 }
84
85 async fn push(&self, job: background_jobs_core::NewJobInfo) -> Result<uuid::Uuid, Self::Error> {
86 let queue = job.queue().to_string();
87 let name = job.name().to_string();
88
89 let uuid = self.0.push(job).await?;
90
91 metrics::counter!("background-jobs.job.created", "queue" => queue, "name" => name)
92 .increment(1);
93
94 Ok(uuid)
95 }
96
97 async fn pop(
98 &self,
99 queue: &str,
100 runner_id: uuid::Uuid,
101 ) -> Result<background_jobs_core::JobInfo, Self::Error> {
102 let job_info = self.0.pop(queue, runner_id).await?;
103
104 metrics::counter!("background-jobs.job.started", "queue" => job_info.queue.clone(), "name" => job_info.name.clone()).increment(1);
105
106 Ok(job_info)
107 }
108
109 async fn heartbeat(
110 &self,
111 job_id: uuid::Uuid,
112 runner_id: uuid::Uuid,
113 ) -> Result<(), Self::Error> {
114 self.0.heartbeat(job_id, runner_id).await
115 }
116
117 async fn complete(
118 &self,
119 return_job_info: background_jobs_core::ReturnJobInfo,
120 ) -> Result<bool, Self::Error> {
121 let info = if let Some(info) = self.0.info(return_job_info.id).await? {
122 Some(info)
123 } else {
124 tracing::warn!("Returned non-existant job");
125 metrics::counter!("background-jobs.job.missing").increment(1);
126 None
127 };
128
129 let result = return_job_info.result;
130
131 let completed = self.0.complete(return_job_info).await?;
132
133 if let Some(info) = info {
134 metrics::counter!("background-jobs.job.finished", "queue" => info.queue.clone(), "name" => info.name.clone()).increment(1);
135
136 match result {
137 background_jobs_core::JobResult::Success => {
138 metrics::counter!("background-jobs.job.completed", "queue" => info.queue, "name" => info.name).increment(1);
139 }
140 background_jobs_core::JobResult::Failure if completed => {
141 metrics::counter!("background-jobs.job.dead", "queue" => info.queue, "name" => info.name).increment(1);
142 }
143 background_jobs_core::JobResult::Failure => {
144 metrics::counter!("background-jobs.job.failed", "queue" => info.queue, "name" => info.name).increment(1);
145 }
146 background_jobs_core::JobResult::Unexecuted
147 | background_jobs_core::JobResult::Unregistered => {
148 metrics::counter!("background-jobs.job.returned", "queue" => info.queue, "name" => info.name).increment(1);
149 }
150 }
151 }
152
153 Ok(completed)
154 }
155}