1use super::{JobId, JobStatus};
4use chrono::{DateTime, Utc};
5use serde::Serialize;
6use tracing::{debug, error, info, warn};
7
8#[derive(Debug, Clone, Serialize)]
10pub struct JobExecutionContext {
11 pub job_id: JobId,
13 pub job_type: String,
15 pub started_at: DateTime<Utc>,
17 pub priority: i32,
19 pub attempt: u32,
21 pub max_retries: u32,
23}
24
25impl JobExecutionContext {
26 #[must_use]
28 pub fn new(
29 job_id: JobId,
30 job_type: String,
31 priority: i32,
32 attempt: u32,
33 max_retries: u32,
34 ) -> Self {
35 Self {
36 job_id,
37 job_type,
38 started_at: Utc::now(),
39 priority,
40 attempt,
41 max_retries,
42 }
43 }
44
45 #[must_use]
47 pub fn execution_duration_ms(&self) -> u64 {
48 Utc::now()
49 .signed_duration_since(self.started_at)
50 .num_milliseconds()
51 .max(0)
52 .try_into()
53 .unwrap_or(0)
54 }
55
56 pub fn log_start(&self) {
58 info!(
59 job_id = %self.job_id,
60 job_type = %self.job_type,
61 priority = self.priority,
62 attempt = self.attempt,
63 max_retries = self.max_retries,
64 "Job execution started"
65 );
66 }
67
68 pub fn log_completion(&self) {
70 let duration_ms = self.execution_duration_ms();
71 info!(
72 job_id = %self.job_id,
73 job_type = %self.job_type,
74 duration_ms = duration_ms,
75 attempt = self.attempt,
76 "Job completed successfully"
77 );
78 }
79
80 pub fn log_failure(&self, error: &str) {
82 let duration_ms = self.execution_duration_ms();
83 error!(
84 job_id = %self.job_id,
85 job_type = %self.job_type,
86 duration_ms = duration_ms,
87 attempt = self.attempt,
88 max_retries = self.max_retries,
89 error = error,
90 "Job execution failed"
91 );
92 }
93
94 pub fn log_retry(&self, retry_after_ms: u64, error: &str) {
96 warn!(
97 job_id = %self.job_id,
98 job_type = %self.job_type,
99 attempt = self.attempt,
100 max_retries = self.max_retries,
101 retry_after_ms = retry_after_ms,
102 error = error,
103 "Job failed, will retry"
104 );
105 }
106
107 pub fn log_cancellation(&self) {
109 let duration_ms = self.execution_duration_ms();
110 warn!(
111 job_id = %self.job_id,
112 job_type = %self.job_type,
113 duration_ms = duration_ms,
114 "Job cancelled"
115 );
116 }
117}
118
119pub struct JobQueueObserver;
121
122impl JobQueueObserver {
123 pub fn log_enqueued(job_id: JobId, job_type: &str, priority: i32) {
125 debug!(
126 job_id = %job_id,
127 job_type = job_type,
128 priority = priority,
129 "Job enqueued"
130 );
131 }
132
133 pub fn log_dequeued(job_id: JobId, job_type: &str, queue_time_ms: u64) {
135 debug!(
136 job_id = %job_id,
137 job_type = job_type,
138 queue_time_ms = queue_time_ms,
139 "Job dequeued for processing"
140 );
141 }
142
143 pub fn log_queue_full(job_id: JobId, job_type: &str, max_size: usize) {
145 warn!(
146 job_id = %job_id,
147 job_type = job_type,
148 max_queue_size = max_size,
149 "Job rejected: queue is full"
150 );
151 }
152
153 pub fn log_moved_to_dlq(
155 job_id: JobId,
156 job_type: &str,
157 attempts: u32,
158 final_error: &str,
159 ) {
160 error!(
161 job_id = %job_id,
162 job_type = job_type,
163 attempts = attempts,
164 error = final_error,
165 "Job moved to dead letter queue after exhausting retries"
166 );
167 }
168
169 pub fn log_status_change(job_id: JobId, job_type: &str, old_status: &JobStatus, new_status: &JobStatus) {
171 debug!(
172 job_id = %job_id,
173 job_type = job_type,
174 old_status = old_status.name(),
175 new_status = new_status.name(),
176 "Job status changed"
177 );
178 }
179}
180
181pub struct JobPerformanceRecorder {
183 pub job_type: String,
185}
186
187impl JobPerformanceRecorder {
188 #[must_use]
190 pub const fn new(job_type: String) -> Self {
191 Self { job_type }
192 }
193
194 pub fn record_execution_time(&self, job_id: JobId, duration_ms: u64, success: bool) {
196 debug!(
197 job_id = %job_id,
198 job_type = %self.job_type,
199 duration_ms = duration_ms,
200 success = success,
201 "Job execution completed"
202 );
203
204 }
207
208 pub fn record_queue_wait_time(&self, job_id: JobId, wait_time_ms: u64) {
210 debug!(
211 job_id = %job_id,
212 job_type = %self.job_type,
213 queue_wait_ms = wait_time_ms,
214 "Job queue wait time"
215 );
216 }
217
218 pub fn record_retry(&self, job_id: JobId, attempt: u32) {
220 debug!(
221 job_id = %job_id,
222 job_type = %self.job_type,
223 attempt = attempt,
224 "Job retry attempt"
225 );
226 }
227}
228
229#[cfg(feature = "otel-metrics")]
230mod otel {
231 use super::*;
232 use opentelemetry::metrics::{Counter, Histogram, Meter};
233 use std::sync::Arc;
234
235 pub struct JobMetricsCollector {
237 jobs_executed: Counter<u64>,
239 jobs_failed: Counter<u64>,
241 execution_duration: Histogram<u64>,
243 queue_wait_time: Histogram<u64>,
245 }
246
247 impl JobMetricsCollector {
248 pub fn new(meter: &Meter) -> Result<Arc<Self>, opentelemetry::metrics::MetricsError> {
254 Ok(Arc::new(Self {
255 jobs_executed: meter
256 .u64_counter("acton_htmx.jobs.executed")
257 .with_description("Number of jobs executed")
258 .build()?,
259 jobs_failed: meter
260 .u64_counter("acton_htmx.jobs.failed")
261 .with_description("Number of jobs that failed")
262 .build()?,
263 execution_duration: meter
264 .u64_histogram("acton_htmx.jobs.execution_duration_ms")
265 .with_description("Job execution duration in milliseconds")
266 .build()?,
267 queue_wait_time: meter
268 .u64_histogram("acton_htmx.jobs.queue_wait_time_ms")
269 .with_description("Time jobs spend waiting in queue (milliseconds)")
270 .build()?,
271 }))
272 }
273
274 pub fn record_execution(&self, job_type: &str, duration_ms: u64, success: bool) {
276 let attributes = &[
277 opentelemetry::KeyValue::new("job_type", job_type.to_string()),
278 opentelemetry::KeyValue::new("success", success),
279 ];
280
281 if success {
282 self.jobs_executed.add(1, attributes);
283 } else {
284 self.jobs_failed.add(1, attributes);
285 }
286
287 self.execution_duration.record(duration_ms, attributes);
288 }
289
290 pub fn record_queue_wait(&self, job_type: &str, wait_time_ms: u64) {
292 let attributes = &[opentelemetry::KeyValue::new("job_type", job_type.to_string())];
293 self.queue_wait_time.record(wait_time_ms, attributes);
294 }
295 }
296}
297
298#[cfg(feature = "otel-metrics")]
299pub use otel::JobMetricsCollector;
300
301#[cfg(test)]
302mod tests {
303 use super::*;
304
305 #[test]
306 fn test_job_execution_context_creation() {
307 let ctx = JobExecutionContext::new(
308 JobId::new(),
309 "TestJob".to_string(),
310 5,
311 0,
312 3,
313 );
314
315 assert_eq!(ctx.job_type, "TestJob");
316 assert_eq!(ctx.priority, 5);
317 assert_eq!(ctx.attempt, 0);
318 assert_eq!(ctx.max_retries, 3);
319 }
320
321 #[test]
322 fn test_execution_duration() {
323 let ctx = JobExecutionContext::new(
324 JobId::new(),
325 "TestJob".to_string(),
326 0,
327 0,
328 3,
329 );
330
331 let duration = ctx.execution_duration_ms();
333 assert!(duration < 100);
334 }
335
336 #[test]
337 fn test_performance_recorder() {
338 let recorder = JobPerformanceRecorder::new("TestJob".to_string());
339 assert_eq!(recorder.job_type, "TestJob");
340
341 recorder.record_execution_time(JobId::new(), 100, true);
343 recorder.record_queue_wait_time(JobId::new(), 50);
344 recorder.record_retry(JobId::new(), 1);
345 }
346}