use super::{JobId, JobStatus};
use chrono::{DateTime, Utc};
use serde::Serialize;
use tracing::{debug, error, info, warn};
#[derive(Debug, Clone, Serialize)]
pub struct JobExecutionContext {
pub job_id: JobId,
pub job_type: String,
pub started_at: DateTime<Utc>,
pub priority: i32,
pub attempt: u32,
pub max_retries: u32,
}
impl JobExecutionContext {
#[must_use]
pub fn new(
job_id: JobId,
job_type: String,
priority: i32,
attempt: u32,
max_retries: u32,
) -> Self {
Self {
job_id,
job_type,
started_at: Utc::now(),
priority,
attempt,
max_retries,
}
}
#[must_use]
pub fn execution_duration_ms(&self) -> u64 {
Utc::now()
.signed_duration_since(self.started_at)
.num_milliseconds()
.max(0)
.try_into()
.unwrap_or(0)
}
pub fn log_start(&self) {
info!(
job_id = %self.job_id,
job_type = %self.job_type,
priority = self.priority,
attempt = self.attempt,
max_retries = self.max_retries,
"Job execution started"
);
}
pub fn log_completion(&self) {
let duration_ms = self.execution_duration_ms();
info!(
job_id = %self.job_id,
job_type = %self.job_type,
duration_ms = duration_ms,
attempt = self.attempt,
"Job completed successfully"
);
}
pub fn log_failure(&self, error: &str) {
let duration_ms = self.execution_duration_ms();
error!(
job_id = %self.job_id,
job_type = %self.job_type,
duration_ms = duration_ms,
attempt = self.attempt,
max_retries = self.max_retries,
error = error,
"Job execution failed"
);
}
pub fn log_retry(&self, retry_after_ms: u64, error: &str) {
warn!(
job_id = %self.job_id,
job_type = %self.job_type,
attempt = self.attempt,
max_retries = self.max_retries,
retry_after_ms = retry_after_ms,
error = error,
"Job failed, will retry"
);
}
pub fn log_cancellation(&self) {
let duration_ms = self.execution_duration_ms();
warn!(
job_id = %self.job_id,
job_type = %self.job_type,
duration_ms = duration_ms,
"Job cancelled"
);
}
}
pub struct JobQueueObserver;
impl JobQueueObserver {
pub fn log_enqueued(job_id: JobId, job_type: &str, priority: i32) {
debug!(
job_id = %job_id,
job_type = job_type,
priority = priority,
"Job enqueued"
);
}
pub fn log_dequeued(job_id: JobId, job_type: &str, queue_time_ms: u64) {
debug!(
job_id = %job_id,
job_type = job_type,
queue_time_ms = queue_time_ms,
"Job dequeued for processing"
);
}
pub fn log_queue_full(job_id: JobId, job_type: &str, max_size: usize) {
warn!(
job_id = %job_id,
job_type = job_type,
max_queue_size = max_size,
"Job rejected: queue is full"
);
}
pub fn log_moved_to_dlq(
job_id: JobId,
job_type: &str,
attempts: u32,
final_error: &str,
) {
error!(
job_id = %job_id,
job_type = job_type,
attempts = attempts,
error = final_error,
"Job moved to dead letter queue after exhausting retries"
);
}
pub fn log_status_change(job_id: JobId, job_type: &str, old_status: &JobStatus, new_status: &JobStatus) {
debug!(
job_id = %job_id,
job_type = job_type,
old_status = old_status.name(),
new_status = new_status.name(),
"Job status changed"
);
}
}
pub struct JobPerformanceRecorder {
pub job_type: String,
}
impl JobPerformanceRecorder {
#[must_use]
pub const fn new(job_type: String) -> Self {
Self { job_type }
}
pub fn record_execution_time(&self, job_id: JobId, duration_ms: u64, success: bool) {
debug!(
job_id = %job_id,
job_type = %self.job_type,
duration_ms = duration_ms,
success = success,
"Job execution completed"
);
}
pub fn record_queue_wait_time(&self, job_id: JobId, wait_time_ms: u64) {
debug!(
job_id = %job_id,
job_type = %self.job_type,
queue_wait_ms = wait_time_ms,
"Job queue wait time"
);
}
pub fn record_retry(&self, job_id: JobId, attempt: u32) {
debug!(
job_id = %job_id,
job_type = %self.job_type,
attempt = attempt,
"Job retry attempt"
);
}
}
#[cfg(feature = "otel-metrics")]
mod otel {
use super::*;
use opentelemetry::metrics::{Counter, Histogram, Meter};
use std::sync::Arc;
pub struct JobMetricsCollector {
jobs_executed: Counter<u64>,
jobs_failed: Counter<u64>,
execution_duration: Histogram<u64>,
queue_wait_time: Histogram<u64>,
}
impl JobMetricsCollector {
pub fn new(meter: &Meter) -> Result<Arc<Self>, opentelemetry::metrics::MetricsError> {
Ok(Arc::new(Self {
jobs_executed: meter
.u64_counter("acton_htmx.jobs.executed")
.with_description("Number of jobs executed")
.build()?,
jobs_failed: meter
.u64_counter("acton_htmx.jobs.failed")
.with_description("Number of jobs that failed")
.build()?,
execution_duration: meter
.u64_histogram("acton_htmx.jobs.execution_duration_ms")
.with_description("Job execution duration in milliseconds")
.build()?,
queue_wait_time: meter
.u64_histogram("acton_htmx.jobs.queue_wait_time_ms")
.with_description("Time jobs spend waiting in queue (milliseconds)")
.build()?,
}))
}
pub fn record_execution(&self, job_type: &str, duration_ms: u64, success: bool) {
let attributes = &[
opentelemetry::KeyValue::new("job_type", job_type.to_string()),
opentelemetry::KeyValue::new("success", success),
];
if success {
self.jobs_executed.add(1, attributes);
} else {
self.jobs_failed.add(1, attributes);
}
self.execution_duration.record(duration_ms, attributes);
}
pub fn record_queue_wait(&self, job_type: &str, wait_time_ms: u64) {
let attributes = &[opentelemetry::KeyValue::new("job_type", job_type.to_string())];
self.queue_wait_time.record(wait_time_ms, attributes);
}
}
}
#[cfg(feature = "otel-metrics")]
pub use otel::JobMetricsCollector;
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_job_execution_context_creation() {
let ctx = JobExecutionContext::new(
JobId::new(),
"TestJob".to_string(),
5,
0,
3,
);
assert_eq!(ctx.job_type, "TestJob");
assert_eq!(ctx.priority, 5);
assert_eq!(ctx.attempt, 0);
assert_eq!(ctx.max_retries, 3);
}
#[test]
fn test_execution_duration() {
let ctx = JobExecutionContext::new(
JobId::new(),
"TestJob".to_string(),
0,
0,
3,
);
let duration = ctx.execution_duration_ms();
assert!(duration < 100);
}
#[test]
fn test_performance_recorder() {
let recorder = JobPerformanceRecorder::new("TestJob".to_string());
assert_eq!(recorder.job_type, "TestJob");
recorder.record_execution_time(JobId::new(), 100, true);
recorder.record_queue_wait_time(JobId::new(), 50);
recorder.record_retry(JobId::new(), 1);
}
}