acton_htmx/jobs/
observability.rs

//! Job observability with OpenTelemetry and structured logging.
2
3use super::{JobId, JobStatus};
4use chrono::{DateTime, Utc};
5use serde::Serialize;
6use tracing::{debug, error, info, warn};
7
8/// Job execution context for logging and tracing.
9#[derive(Debug, Clone, Serialize)]
10pub struct JobExecutionContext {
11    /// Job ID.
12    pub job_id: JobId,
13    /// Job type name.
14    pub job_type: String,
15    /// When the job started executing.
16    pub started_at: DateTime<Utc>,
17    /// Job priority.
18    pub priority: i32,
19    /// Current attempt number (0-based).
20    pub attempt: u32,
21    /// Maximum retries allowed.
22    pub max_retries: u32,
23}
24
25impl JobExecutionContext {
26    /// Create a new job execution context.
27    #[must_use]
28    pub fn new(
29        job_id: JobId,
30        job_type: String,
31        priority: i32,
32        attempt: u32,
33        max_retries: u32,
34    ) -> Self {
35        Self {
36            job_id,
37            job_type,
38            started_at: Utc::now(),
39            priority,
40            attempt,
41            max_retries,
42        }
43    }
44
45    /// Calculate execution duration from start time.
46    #[must_use]
47    pub fn execution_duration_ms(&self) -> u64 {
48        Utc::now()
49            .signed_duration_since(self.started_at)
50            .num_milliseconds()
51            .max(0)
52            .try_into()
53            .unwrap_or(0)
54    }
55
56    /// Log job start.
57    pub fn log_start(&self) {
58        info!(
59            job_id = %self.job_id,
60            job_type = %self.job_type,
61            priority = self.priority,
62            attempt = self.attempt,
63            max_retries = self.max_retries,
64            "Job execution started"
65        );
66    }
67
68    /// Log job completion.
69    pub fn log_completion(&self) {
70        let duration_ms = self.execution_duration_ms();
71        info!(
72            job_id = %self.job_id,
73            job_type = %self.job_type,
74            duration_ms = duration_ms,
75            attempt = self.attempt,
76            "Job completed successfully"
77        );
78    }
79
80    /// Log job failure.
81    pub fn log_failure(&self, error: &str) {
82        let duration_ms = self.execution_duration_ms();
83        error!(
84            job_id = %self.job_id,
85            job_type = %self.job_type,
86            duration_ms = duration_ms,
87            attempt = self.attempt,
88            max_retries = self.max_retries,
89            error = error,
90            "Job execution failed"
91        );
92    }
93
94    /// Log job retry.
95    pub fn log_retry(&self, retry_after_ms: u64, error: &str) {
96        warn!(
97            job_id = %self.job_id,
98            job_type = %self.job_type,
99            attempt = self.attempt,
100            max_retries = self.max_retries,
101            retry_after_ms = retry_after_ms,
102            error = error,
103            "Job failed, will retry"
104        );
105    }
106
107    /// Log job cancellation.
108    pub fn log_cancellation(&self) {
109        let duration_ms = self.execution_duration_ms();
110        warn!(
111            job_id = %self.job_id,
112            job_type = %self.job_type,
113            duration_ms = duration_ms,
114            "Job cancelled"
115        );
116    }
117}
118
119/// Job queue observability metrics logger.
120pub struct JobQueueObserver;
121
122impl JobQueueObserver {
123    /// Log job enqueued.
124    pub fn log_enqueued(job_id: JobId, job_type: &str, priority: i32) {
125        debug!(
126            job_id = %job_id,
127            job_type = job_type,
128            priority = priority,
129            "Job enqueued"
130        );
131    }
132
133    /// Log job dequeued for processing.
134    pub fn log_dequeued(job_id: JobId, job_type: &str, queue_time_ms: u64) {
135        debug!(
136            job_id = %job_id,
137            job_type = job_type,
138            queue_time_ms = queue_time_ms,
139            "Job dequeued for processing"
140        );
141    }
142
143    /// Log queue full rejection.
144    pub fn log_queue_full(job_id: JobId, job_type: &str, max_size: usize) {
145        warn!(
146            job_id = %job_id,
147            job_type = job_type,
148            max_queue_size = max_size,
149            "Job rejected: queue is full"
150        );
151    }
152
153    /// Log dead letter queue movement.
154    pub fn log_moved_to_dlq(
155        job_id: JobId,
156        job_type: &str,
157        attempts: u32,
158        final_error: &str,
159    ) {
160        error!(
161            job_id = %job_id,
162            job_type = job_type,
163            attempts = attempts,
164            error = final_error,
165            "Job moved to dead letter queue after exhausting retries"
166        );
167    }
168
169    /// Log job status change.
170    pub fn log_status_change(job_id: JobId, job_type: &str, old_status: &JobStatus, new_status: &JobStatus) {
171        debug!(
172            job_id = %job_id,
173            job_type = job_type,
174            old_status = old_status.name(),
175            new_status = new_status.name(),
176            "Job status changed"
177        );
178    }
179}
180
/// Records per-job-type performance measurements via structured logs.
pub struct JobPerformanceRecorder {
    /// Job type label under which measurements are aggregated.
    pub job_type: String,
}
186
187impl JobPerformanceRecorder {
188    /// Create a new performance recorder for a job type.
189    #[must_use]
190    pub const fn new(job_type: String) -> Self {
191        Self { job_type }
192    }
193
194    /// Record execution time for a job.
195    pub fn record_execution_time(&self, job_id: JobId, duration_ms: u64, success: bool) {
196        debug!(
197            job_id = %job_id,
198            job_type = %self.job_type,
199            duration_ms = duration_ms,
200            success = success,
201            "Job execution completed"
202        );
203
204        // In production, this would emit metrics to OpenTelemetry/Prometheus
205        // For now, we just log it
206    }
207
208    /// Record queue wait time (time from enqueue to start).
209    pub fn record_queue_wait_time(&self, job_id: JobId, wait_time_ms: u64) {
210        debug!(
211            job_id = %job_id,
212            job_type = %self.job_type,
213            queue_wait_ms = wait_time_ms,
214            "Job queue wait time"
215        );
216    }
217
218    /// Record retry attempt.
219    pub fn record_retry(&self, job_id: JobId, attempt: u32) {
220        debug!(
221            job_id = %job_id,
222            job_type = %self.job_type,
223            attempt = attempt,
224            "Job retry attempt"
225        );
226    }
227}
228
#[cfg(feature = "otel-metrics")]
mod otel {
    use super::*;
    use opentelemetry::metrics::{Counter, Histogram, Meter};
    use std::sync::Arc;

    /// OpenTelemetry metrics for job processing.
    ///
    /// Instruments are created once from a `Meter` in [`Self::new`] and the
    /// collector is shared via `Arc`; recording methods take `&self`.
    pub struct JobMetricsCollector {
        /// Job execution counter (incremented only on success — see `record_execution`).
        jobs_executed: Counter<u64>,
        /// Job failure counter.
        jobs_failed: Counter<u64>,
        /// Job execution duration histogram.
        execution_duration: Histogram<u64>,
        /// Queue wait time histogram.
        queue_wait_time: Histogram<u64>,
    }

    impl JobMetricsCollector {
        /// Create a new metrics collector.
        ///
        /// Instrument names are prefixed `acton_htmx.jobs.` so they group
        /// together in metrics backends.
        ///
        /// # Errors
        ///
        /// Returns error if metric creation fails.
        pub fn new(meter: &Meter) -> Result<Arc<Self>, opentelemetry::metrics::MetricsError> {
            Ok(Arc::new(Self {
                jobs_executed: meter
                    .u64_counter("acton_htmx.jobs.executed")
                    .with_description("Number of jobs executed")
                    .build()?,
                jobs_failed: meter
                    .u64_counter("acton_htmx.jobs.failed")
                    .with_description("Number of jobs that failed")
                    .build()?,
                execution_duration: meter
                    .u64_histogram("acton_htmx.jobs.execution_duration_ms")
                    .with_description("Job execution duration in milliseconds")
                    .build()?,
                queue_wait_time: meter
                    .u64_histogram("acton_htmx.jobs.queue_wait_time_ms")
                    .with_description("Time jobs spend waiting in queue (milliseconds)")
                    .build()?,
            }))
        }

        /// Record a job execution.
        ///
        /// Exactly one of the two counters is incremented per call
        /// (`jobs_executed` on success, `jobs_failed` otherwise); the
        /// duration histogram is recorded for both outcomes. Both the
        /// `job_type` and `success` attributes are attached to every
        /// instrument.
        pub fn record_execution(&self, job_type: &str, duration_ms: u64, success: bool) {
            let attributes = &[
                opentelemetry::KeyValue::new("job_type", job_type.to_string()),
                opentelemetry::KeyValue::new("success", success),
            ];

            if success {
                self.jobs_executed.add(1, attributes);
            } else {
                self.jobs_failed.add(1, attributes);
            }

            self.execution_duration.record(duration_ms, attributes);
        }

        /// Record queue wait time, attributed by `job_type` only.
        pub fn record_queue_wait(&self, job_type: &str, wait_time_ms: u64) {
            let attributes = &[opentelemetry::KeyValue::new("job_type", job_type.to_string())];
            self.queue_wait_time.record(wait_time_ms, attributes);
        }
    }
}
297
298#[cfg(feature = "otel-metrics")]
299pub use otel::JobMetricsCollector;
300
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_job_execution_context_creation() {
        // Values passed to `new` must land unchanged on the context.
        let ctx = JobExecutionContext::new(JobId::new(), "TestJob".to_string(), 5, 0, 3);

        assert_eq!(ctx.job_type, "TestJob");
        assert_eq!(ctx.priority, 5);
        assert_eq!(ctx.attempt, 0);
        assert_eq!(ctx.max_retries, 3);
    }

    #[test]
    fn test_execution_duration() {
        let ctx = JobExecutionContext::new(JobId::new(), "TestJob".to_string(), 0, 0, 3);

        // A freshly created context should report a near-zero duration (< 100ms).
        assert!(ctx.execution_duration_ms() < 100);
    }

    #[test]
    fn test_performance_recorder() {
        let recorder = JobPerformanceRecorder::new("TestJob".to_string());
        assert_eq!(recorder.job_type, "TestJob");

        // The recording APIs only log; none of them should panic.
        recorder.record_execution_time(JobId::new(), 100, true);
        recorder.record_queue_wait_time(JobId::new(), 50);
        recorder.record_retry(JobId::new(), 1);
    }
}
346}