use async_trait::async_trait;
use prometheus::{
Encoder, HistogramOpts, HistogramVec, IntCounterVec, IntGaugeVec, Opts, Registry, TextEncoder,
};
use std::sync::Arc;
use std::time::Instant;
use super::middleware::{JobMiddleware, Next};
use super::{WorkerContext, WorkerResult};
use crate::core::Job;
use crate::error::{QmlError, Result};
/// Default upper bounds, in seconds, for the `qml_job_duration_seconds`
/// histogram buckets.
///
/// The bounds span 5 ms to 60 s in roughly 1-2.5-5 steps. Use
/// `PrometheusMetrics::with_duration_buckets` to supply a different set.
pub const DEFAULT_JOB_DURATION_BUCKETS: &[f64] = &[
    // sub-100 ms
    0.005, 0.01, 0.025, 0.05,
    // 100 ms .. 1 s
    0.1, 0.25, 0.5, 1.0,
    // 2.5 s .. 1 min
    2.5, 5.0, 10.0, 30.0, 60.0,
];
/// Prometheus collectors describing job-queue activity.
///
/// All collectors live in this value's own [`Registry`] (created per
/// instance, not the process-wide default one). Build an instance with
/// [`PrometheusMetrics::new`] or [`PrometheusMetrics::with_duration_buckets`];
/// both hand it back in an [`Arc`] so it can be shared with
/// [`PrometheusMiddleware`].
pub struct PrometheusMetrics {
    /// Registry that owns every collector below; gathered by `encode_text`.
    registry: Registry,
    /// `qml_jobs_enqueued_total{queue}`.
    jobs_enqueued: IntCounterVec,
    /// `qml_jobs_processed_total{queue, state}`.
    jobs_processed: IntCounterVec,
    /// `qml_job_duration_seconds{method}`.
    job_duration: HistogramVec,
    /// `qml_workers_active{server}`.
    workers_active: IntGaugeVec,
}
impl std::fmt::Debug for PrometheusMetrics {
    /// Renders as `PrometheusMetrics { families: N }`, where `N` is the number
    /// of metric families the registry gathers at the moment of formatting.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let family_count = self.registry.gather().len();
        let mut builder = f.debug_struct("PrometheusMetrics");
        builder.field("families", &family_count);
        builder.finish()
    }
}
impl PrometheusMetrics {
pub fn new() -> Result<Arc<Self>> {
Self::with_duration_buckets(DEFAULT_JOB_DURATION_BUCKETS.to_vec())
}
pub fn with_duration_buckets(buckets: Vec<f64>) -> Result<Arc<Self>> {
let registry = Registry::new();
let jobs_enqueued = IntCounterVec::new(
Opts::new(
"qml_jobs_enqueued_total",
"Total number of jobs enqueued, labeled by queue name.",
),
&["queue"],
)
.map_err(map_prom_error)?;
let jobs_processed = IntCounterVec::new(
Opts::new(
"qml_jobs_processed_total",
"Total number of jobs that reached a terminal execution \
outcome, labeled by queue and result state \
(succeeded / retry / failed / error).",
),
&["queue", "state"],
)
.map_err(map_prom_error)?;
let job_duration = HistogramVec::new(
HistogramOpts::new(
"qml_job_duration_seconds",
"Wall-clock duration of job execution through the middleware \
stack, labeled by method name.",
)
.buckets(buckets),
&["method"],
)
.map_err(map_prom_error)?;
let workers_active = IntGaugeVec::new(
Opts::new(
"qml_workers_active",
"Number of worker threads currently executing a job, \
labeled by server name.",
),
&["server"],
)
.map_err(map_prom_error)?;
registry
.register(Box::new(jobs_enqueued.clone()))
.map_err(map_prom_error)?;
registry
.register(Box::new(jobs_processed.clone()))
.map_err(map_prom_error)?;
registry
.register(Box::new(job_duration.clone()))
.map_err(map_prom_error)?;
registry
.register(Box::new(workers_active.clone()))
.map_err(map_prom_error)?;
Ok(Arc::new(Self {
registry,
jobs_enqueued,
jobs_processed,
job_duration,
workers_active,
}))
}
pub fn encode_text(&self) -> Result<String> {
let metric_families = self.registry.gather();
let encoder = TextEncoder::new();
let mut buf = Vec::new();
encoder
.encode(&metric_families, &mut buf)
.map_err(map_prom_error)?;
String::from_utf8(buf).map_err(|e| QmlError::SerializationError {
message: format!("prometheus text encoder produced invalid UTF-8: {}", e),
})
}
pub fn record_enqueued(&self, queue: &str) {
self.jobs_enqueued.with_label_values(&[queue]).inc();
}
pub fn registry(&self) -> &Registry {
&self.registry
}
}
fn map_prom_error(e: prometheus::Error) -> QmlError {
QmlError::ConfigurationError {
message: format!("prometheus: {}", e),
}
}
/// [`JobMiddleware`] that records job-execution metrics into a shared
/// [`PrometheusMetrics`].
///
/// Each call updates three metrics:
/// - the active-worker gauge, labelled with `server`;
/// - the per-method duration histogram;
/// - the per-queue/per-state processed counter.
pub struct PrometheusMiddleware {
    /// Shared collectors that this middleware updates.
    metrics: Arc<PrometheusMetrics>,
    /// Value used for the `server` label of `qml_workers_active`.
    server: String,
}
impl PrometheusMiddleware {
    /// Builds a middleware that reports into `metrics`.
    ///
    /// `server` becomes the `server` label value of the active-workers gauge.
    pub fn new(metrics: Arc<PrometheusMetrics>, server: impl Into<String>) -> Self {
        let server: String = server.into();
        Self { server, metrics }
    }
}
#[async_trait]
impl JobMiddleware for PrometheusMiddleware {
async fn call<'a>(
&'a self,
job: &'a Job,
ctx: &'a WorkerContext,
next: Next<'a>,
) -> Result<WorkerResult> {
let active = self
.metrics
.workers_active
.with_label_values(&[&self.server]);
active.inc();
let start = Instant::now();
let result = next.run(job, ctx).await;
let elapsed = start.elapsed().as_secs_f64();
self.metrics
.job_duration
.with_label_values(&[&job.method])
.observe(elapsed);
let state = match &result {
Ok(WorkerResult::Success { .. }) => "succeeded",
Ok(WorkerResult::Retry { .. }) => "retry",
Ok(WorkerResult::Failure { .. }) => "failed",
Err(_) => "error",
};
self.metrics
.jobs_processed
.with_label_values(&[job.queue.as_str(), state])
.inc();
active.dec();
result
}
}
#[cfg(test)]
mod tests {
    // Unit tests for `PrometheusMetrics` and `PrometheusMiddleware`.
    // Every test calls `PrometheusMetrics::new()`, which creates a fresh
    // registry, so metric values never leak between tests.
    use super::*;
    use crate::processing::middleware::run_stack;
    use crate::processing::{Worker, WorkerConfig};
    use async_trait::async_trait;

    /// Worker that always returns a success result (`state="succeeded"`).
    struct OkWorker;
    #[async_trait]
    impl Worker for OkWorker {
        async fn execute(&self, _job: &Job, _ctx: &WorkerContext) -> Result<WorkerResult> {
            Ok(WorkerResult::success(None, 0))
        }
        fn method_name(&self) -> &str {
            "ok"
        }
    }

    /// Worker that always asks for a retry (`state="retry"`).
    struct RetryWorker;
    #[async_trait]
    impl Worker for RetryWorker {
        async fn execute(&self, _job: &Job, _ctx: &WorkerContext) -> Result<WorkerResult> {
            Ok(WorkerResult::retry("nope".to_string(), None))
        }
        fn method_name(&self) -> &str {
            "retry"
        }
    }

    /// Worker that always returns a failure result (`state="failed"`).
    struct BadWorker;
    #[async_trait]
    impl Worker for BadWorker {
        async fn execute(&self, _job: &Job, _ctx: &WorkerContext) -> Result<WorkerResult> {
            Ok(WorkerResult::failure("boom".to_string()))
        }
        fn method_name(&self) -> &str {
            "bad"
        }
    }

    /// Worker whose `execute` returns `Err` (`state="error"`).
    struct ErroringWorker;
    #[async_trait]
    impl Worker for ErroringWorker {
        async fn execute(&self, _job: &Job, _ctx: &WorkerContext) -> Result<WorkerResult> {
            Err(QmlError::WorkerError {
                message: "exploded".to_string(),
            })
        }
        fn method_name(&self) -> &str {
            "err"
        }
    }

    /// Builds a job for `method` with a null payload, routed to `queue`.
    fn job_for(method: &str, queue: &str) -> Job {
        let mut job = Job::new(method, serde_json::Value::Null);
        job.queue = queue.to_string();
        job
    }

    /// Worker context used by the middleware tests.
    fn ctx() -> WorkerContext {
        WorkerContext::new(WorkerConfig::new("test-worker"))
    }

    // Each vec metric gets one labelled sample before encoding. Presumably
    // this is because families with no label values are omitted from the
    // gathered output.
    // NOTE(review): confirm against prometheus `Registry::gather` if upgrading.
    #[tokio::test]
    async fn new_registers_all_four_metric_families() {
        let metrics = PrometheusMetrics::new().unwrap();
        metrics.record_enqueued("probe");
        metrics
            .jobs_processed
            .with_label_values(&["probe", "succeeded"])
            .inc();
        metrics
            .job_duration
            .with_label_values(&["probe"])
            .observe(0.1);
        metrics.workers_active.with_label_values(&["probe"]).set(0);
        let text = metrics.encode_text().unwrap();
        assert!(text.contains("qml_jobs_enqueued_total"));
        assert!(text.contains("qml_jobs_processed_total"));
        assert!(text.contains("qml_job_duration_seconds"));
        assert!(text.contains("qml_workers_active"));
    }

    // The enqueue counter is tracked independently per `queue` label value.
    #[tokio::test]
    async fn record_enqueued_increments_the_counter() {
        let metrics = PrometheusMetrics::new().unwrap();
        metrics.record_enqueued("default");
        metrics.record_enqueued("default");
        metrics.record_enqueued("critical");
        let default_count = metrics.jobs_enqueued.with_label_values(&["default"]).get();
        let critical_count = metrics.jobs_enqueued.with_label_values(&["critical"]).get();
        assert_eq!(default_count, 2);
        assert_eq!(critical_count, 1);
    }

    // Drives one job through each possible outcome on queue "q". Each
    // `state` label must be counted exactly once, and the middleware must
    // propagate the worker's `Err` unchanged.
    #[tokio::test]
    async fn middleware_labels_processed_by_result_state() {
        let metrics = PrometheusMetrics::new().unwrap();
        let mw: Arc<dyn JobMiddleware> =
            Arc::new(PrometheusMiddleware::new(metrics.clone(), "srv"));
        let stack = vec![mw];
        let ctx = ctx();
        run_stack(&stack, &OkWorker, &job_for("ok", "q"), &ctx)
            .await
            .unwrap();
        run_stack(&stack, &RetryWorker, &job_for("retry", "q"), &ctx)
            .await
            .unwrap();
        run_stack(&stack, &BadWorker, &job_for("bad", "q"), &ctx)
            .await
            .unwrap();
        let err_result = run_stack(&stack, &ErroringWorker, &job_for("err", "q"), &ctx).await;
        assert!(err_result.is_err());
        let processed = &metrics.jobs_processed;
        assert_eq!(processed.with_label_values(&["q", "succeeded"]).get(), 1);
        assert_eq!(processed.with_label_values(&["q", "retry"]).get(), 1);
        assert_eq!(processed.with_label_values(&["q", "failed"]).get(), 1);
        assert_eq!(processed.with_label_values(&["q", "error"]).get(), 1);
    }

    // Two runs of the same method must land in a single histogram child
    // (label method="ok") with a sample count of 2.
    #[tokio::test]
    async fn middleware_records_duration_histogram_per_method() {
        let metrics = PrometheusMetrics::new().unwrap();
        let mw: Arc<dyn JobMiddleware> =
            Arc::new(PrometheusMiddleware::new(metrics.clone(), "srv"));
        let stack = vec![mw];
        let ctx = ctx();
        run_stack(&stack, &OkWorker, &job_for("ok", "q"), &ctx)
            .await
            .unwrap();
        run_stack(&stack, &OkWorker, &job_for("ok", "q"), &ctx)
            .await
            .unwrap();
        let hist = metrics.job_duration.with_label_values(&["ok"]);
        assert_eq!(hist.get_sample_count(), 2);
        assert!(hist.get_sample_sum() >= 0.0);
    }

    // After a completed run, the increment and decrement of the
    // active-workers gauge for server "srv" must balance back to 0.
    #[tokio::test]
    async fn workers_active_gauge_returns_to_zero_after_execution() {
        let metrics = PrometheusMetrics::new().unwrap();
        let mw: Arc<dyn JobMiddleware> =
            Arc::new(PrometheusMiddleware::new(metrics.clone(), "srv"));
        let stack = vec![mw];
        let ctx = ctx();
        run_stack(&stack, &OkWorker, &job_for("ok", "q"), &ctx)
            .await
            .unwrap();
        assert_eq!(metrics.workers_active.with_label_values(&["srv"]).get(), 0);
    }

    // End-to-end check of the text exposition output. It pins the exact
    // sample lines, including label rendering and ordering.
    #[tokio::test]
    async fn encode_text_contains_data_points_after_collection() {
        let metrics = PrometheusMetrics::new().unwrap();
        metrics.record_enqueued("default");
        let mw: Arc<dyn JobMiddleware> =
            Arc::new(PrometheusMiddleware::new(metrics.clone(), "srv"));
        let stack = vec![mw];
        run_stack(&stack, &OkWorker, &job_for("ok", "default"), &ctx())
            .await
            .unwrap();
        let text = metrics.encode_text().unwrap();
        assert!(text.contains("qml_jobs_enqueued_total{queue=\"default\"} 1"));
        assert!(text.contains("qml_jobs_processed_total{queue=\"default\",state=\"succeeded\"} 1"));
        assert!(text.contains("qml_job_duration_seconds_count{method=\"ok\"} 1"));
    }
}