use std::sync::Arc;
use std::time::Duration;
use axum::http::header::CONTENT_TYPE;
use axum::http::{HeaderValue, StatusCode};
use axum::response::{IntoResponse, Response};
use prometheus::{
Encoder, HistogramOpts, HistogramVec, IntCounterVec, IntGaugeVec, Opts, Registry, TextEncoder,
};
use thiserror::Error;
/// `Content-Type` header value that `metrics_handler` sets on successful responses.
const TEXT_FORMAT: &str = "text/plain; version=0.0.4; charset=utf-8";
/// Errors returned by `Metrics::new` and `Metrics::encode`.
#[derive(Debug, Error)]
pub enum MetricsError {
/// A `prometheus::Error` raised while constructing or registering collectors.
/// `#[from]` lets `?` convert the underlying error into this variant.
#[error("failed to register prometheus metric: {0}")]
Register(#[from] prometheus::Error),
/// Encoding the gathered metrics failed. Carries the string form of the
/// encoder's error, as produced in `Metrics::encode`.
#[error("failed to encode prometheus metrics: {0}")]
Encode(String),
}
/// Cloneable handle to the metric set.
///
/// The state lives behind an `Arc`, so every clone refers to the same
/// `MetricsInner` (registry plus collectors).
#[derive(Clone, Debug)]
pub struct Metrics {
// Shared registry and collectors.
inner: Arc<MetricsInner>,
}
/// Registry and the collectors that `Metrics` updates.
///
/// Metric names and label names quoted below are the ones passed to the
/// collector constructors in `build_metrics_inner` / `build_deploy_metrics`.
#[derive(Debug)]
struct MetricsInner {
// Registry the collectors are registered with and that `Metrics::encode` gathers from.
registry: Registry,
// `aion_workflows_started_total`, labels: namespace, workflow_type.
workflows_started: IntCounterVec,
// `aion_workflows_completed_total`, labels: namespace, status.
workflows_completed: IntCounterVec,
// `aion_activities_dispatched_total`, labels: namespace, activity_type.
activities_dispatched: IntCounterVec,
// `aion_activities_completed_total`, labels: namespace, outcome.
activities_completed: IntCounterVec,
// `aion_activity_duration_seconds`, labels: namespace, activity_type.
activity_duration: HistogramVec,
// `aion_store_operation_duration_seconds`, label: operation.
store_operation_duration: HistogramVec,
// `aion_connected_workers`, label: namespace.
connected_workers: IntGaugeVec,
// `aion_inflight_activities`, label: namespace.
inflight_activities: IntGaugeVec,
// `aion_signals_delivered_total`, labels: namespace, residency.
signals_delivered: IntCounterVec,
// `aion_schedules_fired_total`, label: namespace.
schedules_fired: IntCounterVec,
// `aion_deploy_operations_total`, labels: operation, outcome.
deploy_operations: IntCounterVec,
// `aion_deploy_denied_total`, label: transport.
deploy_denied: IntCounterVec,
// `aion_loaded_workflow_versions`, label: workflow_type.
loaded_workflow_versions: IntGaugeVec,
}
impl MetricsInner {
    /// Registers a clone of every collector held by this struct with
    /// `self.registry`.
    ///
    /// Collectors are handed to the registry in field-declaration order.
    ///
    /// # Errors
    ///
    /// Returns the first `prometheus::Error` reported by
    /// `Registry::register`; collectors listed after the failing one are not
    /// registered.
    fn register_collectors(&self) -> Result<(), prometheus::Error> {
        // One ordered list keeps the registration sequence explicit and makes
        // adding a collector a one-line change.
        let collectors: Vec<Box<dyn prometheus::core::Collector>> = vec![
            Box::new(self.workflows_started.clone()),
            Box::new(self.workflows_completed.clone()),
            Box::new(self.activities_dispatched.clone()),
            Box::new(self.activities_completed.clone()),
            Box::new(self.activity_duration.clone()),
            Box::new(self.store_operation_duration.clone()),
            Box::new(self.connected_workers.clone()),
            Box::new(self.inflight_activities.clone()),
            Box::new(self.signals_delivered.clone()),
            Box::new(self.schedules_fired.clone()),
            Box::new(self.deploy_operations.clone()),
            Box::new(self.deploy_denied.clone()),
            Box::new(self.loaded_workflow_versions.clone()),
        ];

        // `try_for_each` stops at the first failure, matching a chain of `?`.
        collectors
            .into_iter()
            .try_for_each(|collector| self.registry.register(collector))
    }
}
impl Metrics {
pub fn new() -> Result<Self, MetricsError> {
let inner = build_metrics_inner()?;
inner.register_collectors()?;
initialize_default_label_sets(&inner);
Ok(Self {
inner: Arc::new(inner),
})
}
pub fn encode(&self) -> Result<Vec<u8>, MetricsError> {
let encoder = TextEncoder::new();
let families = self.inner.registry.gather();
let mut buffer = Vec::new();
encoder
.encode(&families, &mut buffer)
.map_err(|error| MetricsError::Encode(error.to_string()))?;
Ok(buffer)
}
pub fn workflow_started(&self, namespace: &str, workflow_type: &str) {
self.inner
.workflows_started
.with_label_values(&[namespace, workflow_type])
.inc();
}
pub fn workflow_completed(&self, namespace: &str, status: &str) {
self.inner
.workflows_completed
.with_label_values(&[namespace, status])
.inc();
}
pub fn activity_dispatched(&self, namespace: &str, activity_type: &str) {
self.inner
.activities_dispatched
.with_label_values(&[namespace, activity_type])
.inc();
self.inner
.inflight_activities
.with_label_values(&[namespace])
.inc();
}
pub fn activity_completed(
&self,
namespace: &str,
activity_type: &str,
outcome: &str,
duration: Duration,
) {
self.inner
.activities_completed
.with_label_values(&[namespace, outcome])
.inc();
self.inner
.activity_duration
.with_label_values(&[namespace, activity_type])
.observe(duration.as_secs_f64());
self.inner
.inflight_activities
.with_label_values(&[namespace])
.dec();
}
pub fn activity_abandoned(&self, namespace: &str) {
self.inner
.inflight_activities
.with_label_values(&[namespace])
.dec();
}
pub fn store_operation(&self, operation: &str, duration: Duration) {
self.inner
.store_operation_duration
.with_label_values(&[operation])
.observe(duration.as_secs_f64());
}
pub fn worker_connected(&self, namespace: &str) {
self.inner
.connected_workers
.with_label_values(&[namespace])
.inc();
}
pub fn worker_disconnected(&self, namespace: &str) {
self.inner
.connected_workers
.with_label_values(&[namespace])
.dec();
}
pub fn signal_delivered(&self, namespace: &str, residency: &str) {
self.inner
.signals_delivered
.with_label_values(&[namespace, residency])
.inc();
}
pub fn schedule_fired(&self, namespace: &str) {
self.inner
.schedules_fired
.with_label_values(&[namespace])
.inc();
}
pub fn deploy_operation(&self, operation: &str, outcome: &str) {
self.inner
.deploy_operations
.with_label_values(&[operation, outcome])
.inc();
}
pub fn deploy_denied(&self, transport: &str) {
self.inner
.deploy_denied
.with_label_values(&[transport])
.inc();
}
pub fn set_loaded_workflow_versions(&self, workflow_type: &str, count: i64) {
self.inner
.loaded_workflow_versions
.with_label_values(&[workflow_type])
.set(count);
}
}
/// Constructs a new registry and every collector, and bundles them into a
/// `MetricsInner`.
///
/// The collectors are only constructed here; this function does not register
/// them with the registry.
///
/// # Errors
///
/// Returns `MetricsError::Register` (via `From<prometheus::Error>`) if any
/// collector constructor rejects its options.
fn build_metrics_inner() -> Result<MetricsInner, MetricsError> {
    // Local constructors so each metric below reads as name / help / labels.
    let counter_vec =
        |name: &str, help: &str, labels: &[&str]| IntCounterVec::new(Opts::new(name, help), labels);
    let gauge_vec =
        |name: &str, help: &str, labels: &[&str]| IntGaugeVec::new(Opts::new(name, help), labels);
    let histogram_vec = |name: &str, help: &str, buckets: Vec<f64>, labels: &[&str]| {
        HistogramVec::new(HistogramOpts::new(name, help).buckets(buckets), labels)
    };

    let registry = Registry::new();

    let workflows_started = counter_vec(
        "aion_workflows_started_total",
        "Total workflow executions started by namespace and workflow type.",
        &["namespace", "workflow_type"],
    )?;
    let workflows_completed = counter_vec(
        "aion_workflows_completed_total",
        "Total workflow executions that reached a terminal status by namespace and status.",
        &["namespace", "status"],
    )?;
    let activities_dispatched = counter_vec(
        "aion_activities_dispatched_total",
        "Total activities dispatched to workers by namespace and activity type.",
        &["namespace", "activity_type"],
    )?;
    let activities_completed = counter_vec(
        "aion_activities_completed_total",
        "Total activity results received by namespace and outcome.",
        &["namespace", "outcome"],
    )?;

    let activity_duration = histogram_vec(
        "aion_activity_duration_seconds",
        "Wall-clock activity execution latency from dispatch to result by namespace and activity type.",
        vec![
            0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0,
        ],
        &["namespace", "activity_type"],
    )?;
    let store_operation_duration = histogram_vec(
        "aion_store_operation_duration_seconds",
        "Store operation latency by operation.",
        vec![
            0.001, 0.0025, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0,
        ],
        &["operation"],
    )?;

    let connected_workers = gauge_vec(
        "aion_connected_workers",
        "Current connected worker streams by namespace.",
        &["namespace"],
    )?;
    let inflight_activities = gauge_vec(
        "aion_inflight_activities",
        "Current dispatched activities awaiting worker completion by namespace.",
        &["namespace"],
    )?;

    let signals_delivered = counter_vec(
        "aion_signals_delivered_total",
        "Total signals delivered by namespace and residency classification.",
        &["namespace", "residency"],
    )?;
    let schedules_fired = counter_vec(
        "aion_schedules_fired_total",
        "Total schedule timer evaluations that started a workflow by namespace.",
        &["namespace"],
    )?;

    // Deploy-related collectors are built by their own helper, last.
    let (deploy_operations, deploy_denied, loaded_workflow_versions) = build_deploy_metrics()?;

    Ok(MetricsInner {
        registry,
        workflows_started,
        workflows_completed,
        activities_dispatched,
        activities_completed,
        activity_duration,
        store_operation_duration,
        connected_workers,
        inflight_activities,
        signals_delivered,
        schedules_fired,
        deploy_operations,
        deploy_denied,
        loaded_workflow_versions,
    })
}
/// Constructs the three deploy-related collectors.
///
/// Returns, in order: the deploy operations counter, the deploy denials
/// counter, and the loaded workflow versions gauge.
///
/// # Errors
///
/// Returns `MetricsError::Register` (via `From<prometheus::Error>`) if any
/// collector constructor rejects its options.
fn build_deploy_metrics() -> Result<(IntCounterVec, IntCounterVec, IntGaugeVec), MetricsError> {
    let operations_opts = Opts::new(
        "aion_deploy_operations_total",
        "Total deploy API mutations by operation and outcome class.",
    );
    let operations = IntCounterVec::new(operations_opts, &["operation", "outcome"])?;

    let denied_opts = Opts::new(
        "aion_deploy_denied_total",
        "Total deploy API authorization denials by transport.",
    );
    let denied = IntCounterVec::new(denied_opts, &["transport"])?;

    let loaded_opts = Opts::new(
        "aion_loaded_workflow_versions",
        "Currently loaded package versions per workflow type.",
    );
    let loaded_versions = IntGaugeVec::new(loaded_opts, &["workflow_type"])?;

    Ok((operations, denied, loaded_versions))
}
/// Looks up a fixed set of label combinations on the two histograms and
/// discards the returned children.
///
/// Per the `prometheus` crate documentation, looking up a label combination
/// for the first time creates its child metric, so these series exist before
/// any observation is recorded.
fn initialize_default_label_sets(inner: &MetricsInner) {
    const STORE_OPERATIONS: [&str; 4] =
        ["append", "read_history", "list_active", "list_workflow_ids"];

    let store_latency = &inner.store_operation_duration;
    STORE_OPERATIONS.iter().for_each(|operation| {
        // Only the lookup matters; the child handle itself is not needed.
        let _ = store_latency.with_label_values(&[*operation]);
    });

    let _ = inner
        .activity_duration
        .with_label_values(&["default", "default"]);
}
/// Axum handler that serves the encoded metrics from the `Metrics` state.
///
/// On success the encoded bytes become the response body and the
/// `Content-Type` header is set to `TEXT_FORMAT`. If encoding fails, the
/// response is `500 Internal Server Error` with the error message as body.
pub async fn metrics_handler(
    axum::extract::State(metrics): axum::extract::State<Metrics>,
) -> Response {
    let body = match metrics.encode() {
        Ok(body) => body,
        Err(error) => {
            let message = error.to_string();
            return (StatusCode::INTERNAL_SERVER_ERROR, message).into_response();
        }
    };

    let content_type = HeaderValue::from_static(TEXT_FORMAT);
    let mut response = body.into_response();
    // `insert` replaces whatever content type the byte-body conversion set.
    response.headers_mut().insert(CONTENT_TYPE, content_type);
    response
}