use std::sync::Arc;
use std::sync::atomic::{AtomicI64, Ordering};
use std::time::Instant;
use axum::Router;
use axum::extract::{MatchedPath, Request, State};
use axum::http::{Method, StatusCode};
use axum::middleware::Next;
use axum::response::{IntoResponse, Response};
use axum::routing::get;
use prometheus::{
Encoder, HistogramVec, IntCounterVec, IntGauge, Registry, TextEncoder,
histogram_opts, opts, register_gauge_with_registry,
register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
register_int_gauge_with_registry,
};
/// Upper bounds, in seconds, of the buckets used by the
/// `ferrooci_http_request_duration_seconds` histogram.
///
/// The listed bounds run from 1 ms up to 10 s.
const DURATION_BUCKETS: &[f64] = &[
0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
];
/// Bundle of the Prometheus registry and the collectors this module updates.
///
/// Built by [`Metrics::new`]; `Debug` is implemented by hand further down and
/// prints only the type name.
// NOTE(review): presumably clones share the same underlying collectors
// (prometheus handles are expected to be reference-counted) — confirm.
#[derive(Clone)]
pub struct Metrics {
// Registry that `new` registers every collector into and `encode` gathers from.
registry: Registry,
// `ferrooci_http_requests_total`, labelled by method, handler and status.
requests_total: IntCounterVec,
// `ferrooci_http_request_duration_seconds`, labelled by method and handler.
request_duration: HistogramVec,
// Exported as `ferrooci_uploads_in_flight`; raised and lowered by
// `track_metrics` around every request it wraps.
in_flight: IntGauge,
// `ferrooci_storage_blobs`; overwritten by `refresh_storage`.
storage_blobs: IntGauge,
}
/// Opaque `Debug` output: only the type name is printed, none of the
/// collector fields.
impl std::fmt::Debug for Metrics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("Metrics");
        builder.finish_non_exhaustive()
    }
}
impl Metrics {
    /// Creates a fresh [`Registry`] and registers every collector exported by
    /// this module into it.
    ///
    /// Six collectors are registered: the request counter, the latency
    /// histogram, the in-flight gauge, the blob-count gauge, a
    /// `ferrooci_storage_bytes` gauge and a `ferrooci_build_info` gauge.
    /// The last two are not kept as fields: nothing in this file updates the
    /// bytes gauge, and build-info is set to `1` once, carrying the crate
    /// version as a constant `version` label.
    ///
    /// # Panics
    ///
    /// Panics if any registration is rejected by the registry.
    #[must_use]
    pub fn new() -> Self {
        let registry = Registry::new();

        let http_requests = register_int_counter_vec_with_registry!(
            opts!(
                "ferrooci_http_requests_total",
                "Total HTTP requests, labelled by method, matched handler, and status code."
            ),
            &["method", "handler", "status"],
            registry
        )
        .expect("register ferrooci_http_requests_total");

        let http_latency = register_histogram_vec_with_registry!(
            histogram_opts!(
                "ferrooci_http_request_duration_seconds",
                "HTTP request latency in seconds, by method and matched handler.",
                DURATION_BUCKETS.to_vec()
            ),
            &["method", "handler"],
            registry
        )
        .expect("register ferrooci_http_request_duration_seconds");

        // NOTE(review): the metric name says "uploads" while the help text
        // (and the middleware that drives this gauge) covers every HTTP
        // request — confirm which one is intended before renaming anything.
        let active_requests = register_int_gauge_with_registry!(
            opts!(
                "ferrooci_uploads_in_flight",
                "HTTP requests currently being served (in-flight)."
            ),
            registry
        )
        .expect("register ferrooci_uploads_in_flight");

        let blob_gauge = register_int_gauge_with_registry!(
            opts!(
                "ferrooci_storage_blobs",
                "Distinct blobs currently held in the blob store."
            ),
            registry
        )
        .expect("register ferrooci_storage_blobs");

        // Registered so the series is exported; the handle itself is not kept.
        let _bytes_gauge = register_gauge_with_registry!(
            opts!(
                "ferrooci_storage_bytes",
                "Best-effort blob-store size in bytes; 0 when the backend cannot report sizes cheaply."
            ),
            registry
        )
        .expect("register ferrooci_storage_bytes");

        let version_gauge = register_gauge_with_registry!(
            opts!(
                "ferrooci_build_info",
                "Build information; constant 1 carrying the version label."
            )
            .const_label("version", env!("CARGO_PKG_VERSION")),
            registry
        )
        .expect("register ferrooci_build_info");
        version_gauge.set(1.0);

        Self {
            registry,
            requests_total: http_requests,
            request_duration: http_latency,
            in_flight: active_requests,
            storage_blobs: blob_gauge,
        }
    }

    /// Maps a request onto one of a fixed set of `handler` label values.
    ///
    /// `matched` is the router's matched route pattern, if any; `path` is the
    /// concrete request path. Exact route patterns are recognised first.
    /// Anything else (including `None`) is classified from the path alone,
    /// falling back to `"other"`.
    fn handler_for(matched: Option<&str>, path: &str) -> &'static str {
        // Substring markers, tested in order: `/blobs/uploads` has to come
        // before `/blobs/`, which would otherwise match upload paths too.
        const PATH_MARKERS: [(&str, &str); 4] = [
            ("/referrers/", "referrers"),
            ("/manifests/", "manifests"),
            ("/blobs/uploads", "uploads"),
            ("/blobs/", "blobs"),
        ];

        let fixed_route = match matched {
            Some("/v2/" | "/v2") => Some("version"),
            Some("/v2/_catalog") => Some("catalog"),
            Some("/live") => Some("live"),
            Some("/ready") => Some("ready"),
            Some("/healthz") => Some("healthz"),
            Some("/metrics") => Some("metrics"),
            _ => None,
        };
        if let Some(label) = fixed_route {
            return label;
        }

        if path.ends_with("/tags/list") {
            return "tags";
        }
        PATH_MARKERS
            .iter()
            .find(|(needle, _)| path.contains(*needle))
            .map_or("other", |&(_, label)| label)
    }

    /// Overwrites the `ferrooci_storage_blobs` gauge with `blob_count`.
    pub fn refresh_storage(&self, blob_count: i64) {
        let gauge = &self.storage_blobs;
        gauge.set(blob_count);
    }

    /// Renders everything in the registry with [`TextEncoder`].
    ///
    /// Returns an empty string if encoding fails or the encoder's output is
    /// not valid UTF-8.
    fn encode(&self) -> String {
        let families = self.registry.gather();
        let mut out = Vec::new();
        match TextEncoder::new().encode(&families, &mut out) {
            Ok(()) => String::from_utf8(out).unwrap_or_default(),
            Err(_) => String::new(),
        }
    }
}
impl Default for Metrics {
fn default() -> Self {
Self::new()
}
}
/// State handed to both the request-tracking middleware and the `/metrics`
/// route.
#[derive(Clone)]
pub struct MetricsState {
/// Collectors updated per request and rendered on scrape.
pub metrics: Metrics,
/// Blob count that the `/metrics` handler loads on every scrape and copies
/// into the storage-blobs gauge. Whatever writes it lives outside this file.
pub blob_count: Arc<AtomicI64>,
}
pub async fn track_metrics(
State(state): State<MetricsState>,
request: Request,
next: Next,
) -> Response {
let method = request.method().clone();
let matched = request
.extensions()
.get::<MatchedPath>()
.map(|m| m.as_str().to_owned());
let path = request.uri().path().to_owned();
let handler = Metrics::handler_for(matched.as_deref(), &path);
let m = &state.metrics;
m.in_flight.inc();
let started = Instant::now();
let response = next.run(request).await;
let elapsed = started.elapsed().as_secs_f64();
m.in_flight.dec();
let method_str = method_label(&method);
let status = response.status().as_u16().to_string();
m.requests_total
.with_label_values(&[method_str, handler, &status])
.inc();
m.request_duration
.with_label_values(&[method_str, handler])
.observe(elapsed);
response
}
/// Maps an HTTP method onto a fixed label value for the `method` label.
///
/// The seven methods listed below map to their own names; every other method
/// collapses to `"OTHER"`, so the set of label values stays fixed regardless
/// of what clients send.
const fn method_label(method: &Method) -> &'static str {
match *method {
Method::GET => "GET",
Method::HEAD => "HEAD",
Method::POST => "POST",
Method::PUT => "PUT",
Method::PATCH => "PATCH",
Method::DELETE => "DELETE",
Method::OPTIONS => "OPTIONS",
// Anything not listed above (e.g. TRACE) shares one bucket.
_ => "OTHER",
}
}
async fn metrics_handler(State(state): State<MetricsState>) -> Response {
state
.metrics
.refresh_storage(state.blob_count.load(Ordering::Relaxed));
let body = state.metrics.encode();
(
StatusCode::OK,
[(
axum::http::header::CONTENT_TYPE,
"text/plain; version=0.0.4; charset=utf-8",
)],
body,
)
.into_response()
}
/// Builds a router exposing `GET /metrics`, bound to `state`.
pub fn metrics_routes(state: MetricsState) -> Router {
    let scrape: Router<MetricsState> = Router::new().route("/metrics", get(metrics_handler));
    scrape.with_state(state)
}
/// Adds metrics to `app`: merges in the `/metrics` route and wraps the
/// combined router in the [`track_metrics`] middleware.
///
/// Because the layer is applied after the merge, the `/metrics` route is
/// wrapped by the middleware as well.
pub fn instrument(app: Router, metrics: Metrics, blob_count: Arc<AtomicI64>) -> Router {
    let state = MetricsState {
        metrics,
        blob_count,
    };
    let scrape_routes = metrics_routes(state.clone());
    let tracking = axum::middleware::from_fn_with_state(state, track_metrics);
    app.merge(scrape_routes).layer(tracking)
}
// Unit tests for label classification, the opaque `Debug` impl and the text
// rendering of the registry.
#[cfg(test)]
mod tests {
use super::*;
// Exact route patterns map straight to their fixed handler labels.
#[test]
fn handler_for_classifies_known_routes() {
assert_eq!(Metrics::handler_for(Some("/v2/"), "/v2/"), "version");
assert_eq!(
Metrics::handler_for(Some("/v2/_catalog"), "/v2/_catalog"),
"catalog"
);
assert_eq!(Metrics::handler_for(Some("/healthz"), "/healthz"), "healthz");
}
// Under a wildcard route pattern, the label is derived from the concrete path.
#[test]
fn handler_for_classifies_wildcard_tails() {
let m = Some("/v2/{*rest}");
assert_eq!(Metrics::handler_for(m, "/v2/alpine/tags/list"), "tags");
assert_eq!(
Metrics::handler_for(m, "/v2/alpine/manifests/latest"),
"manifests"
);
// Upload paths also contain `/blobs/`; they must still be labelled "uploads".
assert_eq!(
Metrics::handler_for(m, "/v2/alpine/blobs/uploads/abc"),
"uploads"
);
assert_eq!(
Metrics::handler_for(m, "/v2/alpine/blobs/sha256:deadbeef"),
"blobs"
);
assert_eq!(
Metrics::handler_for(m, "/v2/alpine/referrers/sha256:abcd"),
"referrers"
);
}
// A request with no matched route and an unrecognised path lands in "other".
#[test]
fn handler_for_unmatched_is_bounded_to_other() {
assert_eq!(Metrics::handler_for(None, "/random/garbage/path"), "other");
}
// The `/live` and `/ready` routes get their own labels.
#[test]
fn handler_for_probe_routes_are_labelled() {
assert_eq!(Metrics::handler_for(Some("/live"), "/live"), "live");
assert_eq!(Metrics::handler_for(Some("/ready"), "/ready"), "ready");
}
// Each explicitly listed method keeps its name; TRACE stands in for "anything else".
#[test]
fn method_label_maps_every_known_method() {
use axum::http::Method;
assert_eq!(method_label(&Method::GET), "GET");
assert_eq!(method_label(&Method::HEAD), "HEAD");
assert_eq!(method_label(&Method::POST), "POST");
assert_eq!(method_label(&Method::PUT), "PUT");
assert_eq!(method_label(&Method::PATCH), "PATCH");
assert_eq!(method_label(&Method::DELETE), "DELETE");
assert_eq!(method_label(&Method::OPTIONS), "OPTIONS");
assert_eq!(method_label(&Method::TRACE), "OTHER");
}
// The hand-written `Debug` impl must at least print the type name.
#[test]
fn metrics_debug_impl_names_the_struct() {
let m = Metrics::new();
let rendered = format!("{m:?}");
assert!(
rendered.contains("Metrics"),
"Debug output must name the struct, got: {rendered:?}"
);
}
// After touching a counter and a histogram series, the rendered text must
// contain the HELP/TYPE headers, the build-info metric and the label pair.
#[test]
fn encode_emits_prometheus_text_format() {
let m = Metrics::new();
m.requests_total
.with_label_values(&["GET", "version", "200"])
.inc();
m.request_duration
.with_label_values(&["GET", "version"])
.observe(0.01);
let text = m.encode();
assert!(text.contains("# HELP ferrooci_http_requests_total"));
assert!(text.contains("# TYPE ferrooci_http_requests_total counter"));
assert!(text.contains("ferrooci_build_info"));
assert!(text.contains("ferrooci_http_request_duration_seconds"));
assert!(text.contains(r#"handler="version""#));
}
}