use std::sync::Arc;
use axum::{
Router,
extract::State,
http::{StatusCode, header},
response::IntoResponse,
routing::get,
};
use prometheus::{Encoder, Registry, TextEncoder};
use solti_model::{
AdmissionPolicy, BackoffPolicy, JitterPolicy, RestartPolicy, TaskKind, TaskSpec,
};
use taskvisor::{TaskError, TaskFn, TaskRef};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info};
/// Slot identifier shared by the metrics server task (`TaskFn::arc`) and its `TaskSpec`.
pub const METRICS_SERVER_SLOT: &str = "solti-metrics-server";

/// Timeout (in milliseconds) passed to the metrics server's `TaskSpec`.
///
/// Set to `u64::MAX`, presumably so the long-running server is never timed out.
/// NOTE(review): confirm how taskvisor interprets this value.
const METRICS_SERVER_TIMEOUT_MS: u64 = u64::MAX;

/// `Content-Type` header sent with a successful `/metrics` response
/// (Prometheus text format, version 0.0.4, UTF-8).
const METRICS_CONTENT_TYPE: &str = "text/plain; version=0.0.4; charset=utf-8";

/// Initial restart backoff for the metrics server, in milliseconds (1 s).
const BACKOFF_FIRST_MS: u64 = 1000;

/// Upper bound on the restart backoff, in milliseconds (30 s).
const BACKOFF_MAX_MS: u64 = 30 * 1000;

/// Multiplier applied to the backoff between restarts.
const BACKOFF_FACTOR: f64 = 2.0;
/// Builds the supervised task that serves Prometheus metrics over HTTP, together with its spec.
///
/// The returned task, each time it is started:
/// - returns `TaskError::Canceled` straight away if its token is already cancelled;
/// - binds a TCP listener on `addr`, reporting `TaskError::Fail` if the bind fails;
/// - serves `GET /metrics` (rendered by `metrics_handler` from `registry`) until the
///   token is cancelled, then shuts down gracefully and returns `TaskError::Canceled`;
/// - returns `TaskError::Fail` if the server stops for any other reason. It never
///   completes successfully.
///
/// The spec registers the task as `TaskKind::Embedded` under [`METRICS_SERVER_SLOT`].
/// It uses `RestartPolicy::always()`, an equal-jitter exponential backoff and
/// `AdmissionPolicy::Replace`.
///
/// # Panics
///
/// Panics if the `TaskSpec` builder rejects the spec.
pub fn server(registry: Arc<Registry>, addr: impl Into<String>) -> (TaskRef, TaskSpec) {
    let bind_addr: String = addr.into();

    let task: TaskRef = TaskFn::arc(METRICS_SERVER_SLOT, move |token: CancellationToken| {
        // The closure may be invoked once per (re)start, so every run gets its own copies.
        let addr = bind_addr.clone();
        let registry = Arc::clone(&registry);

        async move {
            if token.is_cancelled() {
                return Err(TaskError::Canceled);
            }

            let listener = match tokio::net::TcpListener::bind(&addr).await {
                Ok(listener) => listener,
                Err(e) => {
                    return Err(TaskError::Fail {
                        reason: format!("metrics listener bind failed on {addr}: {e}"),
                        exit_code: None,
                    });
                }
            };

            let router = Router::new()
                .route("/metrics", get(metrics_handler))
                .with_state(registry);

            debug!(addr = %addr, "metrics server started");
            info!("metrics http://{addr}/metrics");

            // Graceful shutdown is driven by a clone of the same token; `token` itself is
            // still needed afterwards to tell cancellation apart from an unexpected exit.
            let shutdown = token.clone();
            let outcome = axum::serve(listener, router)
                .with_graceful_shutdown(async move { shutdown.cancelled().await })
                .await;

            if token.is_cancelled() {
                debug!("metrics server canceled");
                return Err(TaskError::Canceled);
            }

            // Reaching this point without cancellation is always a failure, even on `Ok`.
            let reason = match outcome {
                Err(e) => format!("metrics server error: {e}"),
                Ok(()) => "metrics server exited unexpectedly".to_string(),
            };
            Err(TaskError::Fail {
                reason,
                exit_code: None,
            })
        }
    });

    let spec = TaskSpec::builder(METRICS_SERVER_SLOT, TaskKind::Embedded, METRICS_SERVER_TIMEOUT_MS)
        .restart(RestartPolicy::always())
        .backoff(BackoffPolicy {
            jitter: JitterPolicy::Equal,
            first_ms: BACKOFF_FIRST_MS,
            max_ms: BACKOFF_MAX_MS,
            factor: BACKOFF_FACTOR,
        })
        .admission(AdmissionPolicy::Replace)
        .build()
        .expect("metrics server spec must be valid");

    (task, spec)
}
/// Axum handler for `GET /metrics`: encodes every metric family gathered from
/// `registry` with Prometheus' `TextEncoder`.
///
/// On success it responds with the encoded body and a `Content-Type` of
/// [`METRICS_CONTENT_TYPE`]. If encoding fails, the error is logged and an empty
/// `500 Internal Server Error` response is returned.
async fn metrics_handler(State(registry): State<Arc<Registry>>) -> axum::response::Response {
    let encoder = TextEncoder::new();
    let metric_families = registry.gather();
    // Initial capacity is only a hint; the buffer grows as needed.
    let mut buf = Vec::with_capacity(4096);
    match encoder.encode(&metric_families, &mut buf) {
        Ok(()) => ([(header::CONTENT_TYPE, METRICS_CONTENT_TYPE)], buf).into_response(),
        Err(e) => {
            error!("metrics encode error: {e}");
            StatusCode::INTERNAL_SERVER_ERROR.into_response()
        }
    }
}