use std::{
borrow::Cow,
future::Future,
ops::Deref,
pin::Pin,
sync::Arc,
task::{ready, Context, Poll},
time::{Duration, Instant},
};
use axum::{
body::HttpBody,
extract::{MatchedPath, State},
http::{header, StatusCode},
response::{IntoResponse, Response},
routing::{self, Router},
};
use hyper::{Method, Request};
use opentelemetry::{
metrics::{Counter, Gauge, Histogram, Meter, UpDownCounter},
KeyValue,
};
use opentelemetry_otlp::{
ExporterBuildError, MetricExporter as OtlpMetricExporter, WithExportConfig, WithTonicConfig,
};
use opentelemetry_prometheus_text_exporter::PrometheusExporter;
use opentelemetry_sdk::{
metrics::{SdkMeterProvider, Temporality},
Resource,
};
use opentelemetry_stdout::MetricExporter as StdoutMetricExporter;
use pin_project::pin_project;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio_util::sync::CancellationToken;
use tower::{Layer, Service};
use tracing::{debug_span, trace, Instrument};
use url::Url;
use crate::{
crypto::TonicTlsConfig,
errors::{self, IoError},
layers::ext::HandlerName,
telemetry::OtlpProtocol,
};
/// Errors raised while configuring the metrics subsystem or serving metrics.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum MetricsError {
/// Building an OTLP metric exporter failed.
#[error("OTel metrics setup error: {0}")]
OpenTelemetry(#[from] ExporterBuildError),
/// Preparing the TLS configuration for the OTLP exporter failed with an I/O error.
#[error("Error loading files in configuration: {0}")]
ConfigRead(IoError),
/// Rendering the Prometheus text exposition failed with an I/O error.
#[error("Error collecting Prometheus metrics: {0}")]
Prometheus(IoError),
/// The Prometheus endpoint was requested, but the state holds no Prometheus exporter.
#[error("Metrics subsystem is misconfigured")]
RuntimeConfig,
}
impl IntoResponse for MetricsError {
    /// Renders the error as an HTTP 500 problem-details response.
    ///
    /// The problem type is [`errors::TAG_UXUM_METRICS`] and the title is the
    /// error's `Display` text.
    fn into_response(self) -> Response {
        let title = self.to_string();
        let problem = problemdetails::new(StatusCode::INTERNAL_SERVER_ERROR)
            .with_type(errors::TAG_UXUM_METRICS)
            .with_title(title);
        problem.into_response()
    }
}
/// Deserializable configuration and builder for the metrics subsystem.
///
/// Produces the meter provider, the instrument state and the metrics router.
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[non_exhaustive]
pub struct MetricsBuilder {
/// Exporters to attach to the meter provider.
///
/// Defaults to a single Prometheus exporter; omitted from serialized output when empty.
#[serde(
default = "MetricsBuilder::default_exporters",
skip_serializing_if = "Vec::is_empty"
)]
exporters: Vec<MetricsExporterConfig>,
/// Histogram bucket boundaries for request duration instruments (unit: seconds).
#[serde(default = "MetricsBuilder::default_duration_buckets")]
duration_buckets: Vec<f64>,
/// Histogram bucket boundaries for request/response body size instruments (unit: bytes).
#[serde(default = "MetricsBuilder::default_size_buckets")]
size_buckets: Vec<f64>,
/// Interval for runtime metrics, parsed in `humantime` format; defaults to 15 seconds.
// NOTE(review): presumably passed as `period` to `gather_runtime_metrics` — confirm at the call site.
#[serde(
default = "MetricsBuilder::default_runtime_metrics_interval",
with = "humantime_serde"
)]
pub runtime_metrics_interval: Duration,
}
impl Default for MetricsBuilder {
    /// Builds a configuration from the same defaults serde uses for missing fields.
    fn default() -> Self {
        let exporters = Self::default_exporters();
        let duration_buckets = Self::default_duration_buckets();
        let size_buckets = Self::default_size_buckets();
        let runtime_metrics_interval = Self::default_runtime_metrics_interval();
        Self {
            exporters,
            duration_buckets,
            size_buckets,
            runtime_metrics_interval,
        }
    }
}
impl From<MetricsExporterConfig> for MetricsBuilder {
    /// Creates a default builder whose only exporter is `value`.
    fn from(value: MetricsExporterConfig) -> Self {
        let mut builder = Self::default();
        builder.exporters = vec![value];
        builder
    }
}
impl MetricsBuilder {
/// Default exporter list: a single default [`MetricsExporterConfig`].
#[must_use]
#[inline]
fn default_exporters() -> Vec<MetricsExporterConfig> {
    Vec::from([MetricsExporterConfig::default()])
}
/// Default histogram boundaries for request durations, in seconds.
///
/// Ranges from 0 up to 30 seconds, with finer resolution below one second.
#[must_use]
#[inline]
fn default_duration_buckets() -> Vec<f64> {
    vec![
        0.0_f64, 0.001, 0.0025, 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5,
        5.0, 7.5, 10.0, 30.0,
    ]
}
/// Default histogram boundaries for body sizes, in bytes.
///
/// A zero bucket followed by every power of two from 64 B up to 8 MiB.
#[must_use]
#[inline]
fn default_size_buckets() -> Vec<f64> {
    // 2^6 = 64 B is the smallest non-zero boundary, 2^23 = 8 MiB the largest.
    const FIRST_SHIFT: u32 = 6;
    const LAST_SHIFT: u32 = 23;
    // Integer shifts converted via `f64::from` keep every boundary exact.
    std::iter::once(0.0_f64)
        .chain((FIRST_SHIFT..=LAST_SHIFT).map(|shift| f64::from(1_u32 << shift)))
        .collect()
}
/// Default value for `runtime_metrics_interval`: 15 seconds.
#[must_use]
#[inline]
fn default_runtime_metrics_interval() -> Duration {
    const DEFAULT_INTERVAL_SECS: u64 = 15;
    Duration::from_secs(DEFAULT_INTERVAL_SECS)
}
#[must_use]
pub fn with_duration_buckets<B, I>(mut self, buckets: B) -> Self
where
B: IntoIterator<Item = I>,
I: Into<f64>,
{
self.duration_buckets = buckets.into_iter().map(Into::into).collect();
self
}
#[must_use]
pub fn with_size_buckets<B, I>(mut self, buckets: B) -> Self
where
B: IntoIterator<Item = I>,
I: Into<f64>,
{
self.size_buckets = buckets.into_iter().map(Into::into).collect();
self
}
#[must_use]
pub fn with_runtime_metrics_interval(mut self, interval: Duration) -> Self {
self.runtime_metrics_interval = interval;
self
}
/// Builds the OpenTelemetry meter provider from the configured exporters.
///
/// Each Prometheus exporter is registered as a reader; OTLP and stdout
/// exporters are registered as periodic exporters.
///
/// Returns the provider together with a handle to the Prometheus exporter,
/// if one was configured. When several Prometheus exporters are configured,
/// all are registered but only the last one is returned.
///
/// # Errors
///
/// Returns [`MetricsError`] if building an OTLP exporter fails.
pub async fn build_provider(
    &self,
    resource: Resource,
) -> Result<(SdkMeterProvider, Option<PrometheusExporter>), MetricsError> {
    // The span is named after what is actually built here: the metrics provider.
    let span = debug_span!("build_metrics_provider");
    async {
        let mut prom = None;
        let mut provider = SdkMeterProvider::builder().with_resource(resource);
        for exp_cfg in &self.exporters {
            provider = match exp_cfg {
                MetricsExporterConfig::Prometheus(cfg) => {
                    let exp = cfg.build_exporter();
                    // Keep a clone so the HTTP handler can render the text exposition.
                    prom = Some(exp.clone());
                    provider.with_reader(exp)
                }
                MetricsExporterConfig::Otlp(cfg) => {
                    provider.with_periodic_exporter(cfg.build_exporter().await?)
                }
                MetricsExporterConfig::Stdout(cfg) => {
                    provider.with_periodic_exporter(cfg.build_exporter())
                }
            };
        }
        Ok((provider.build(), prom))
    }
    .instrument(span)
    .await
}
/// Creates every metric instrument on `meter` and bundles them into a [`MetricsState`].
///
/// Three groups are created: HTTP server instruments, HTTP client instruments
/// and runtime gauges. Duration histograms use `duration_buckets` and body-size
/// histograms use `size_buckets` as boundaries.
///
/// `prom` is stored in the returned state unchanged; it is what
/// [`MetricsState::export_text`] renders from.
pub fn build_state(&self, meter: &Meter, prom: Option<PrometheusExporter>) -> MetricsState {
    // HTTP server instruments.
    let request_duration = meter
        .f64_histogram("http.server.request.duration")
        .with_unit("s")
        .with_boundaries(self.duration_buckets.clone())
        .with_description("The HTTP request latencies in seconds.")
        .build();
    let requests_total = meter
        .u64_counter("http.server.requests")
        .with_description(
            "How many HTTP requests processed, partitioned by status code and HTTP method.",
        )
        .build();
    let requests_active = meter
        .i64_up_down_counter("http.server.active_requests")
        .with_description("The number of active HTTP requests.")
        .build();
    let request_body_size = meter
        .u64_histogram("http.server.request.body.size")
        .with_unit("By")
        .with_boundaries(self.size_buckets.clone())
        .with_description("The HTTP request body sizes in bytes.")
        .build();
    let response_body_size = meter
        .u64_histogram("http.server.response.body.size")
        .with_unit("By")
        .with_boundaries(self.size_buckets.clone())
        .with_description("The HTTP response body sizes in bytes.")
        .build();
    let http_server = HttpServerMetrics {
        request_duration,
        requests_total,
        requests_active,
        request_body_size,
        response_body_size,
    };

    // HTTP client instruments; shared behind an `Arc` via `HttpClientMetrics`.
    let request_duration = meter
        .f64_histogram("http.client.request.duration")
        .with_unit("s")
        .with_boundaries(self.duration_buckets.clone())
        .with_description("The HTTP request latencies in seconds.")
        .build();
    let requests_total = meter
        .u64_counter("http.client.requests")
        .with_description(
            "How many HTTP requests processed, partitioned by status code and HTTP method.",
        )
        .build();
    let requests_rejected = meter
        .u64_counter("http.client.rejected_requests")
        .with_description("Rejected requests due to open circuit breaker.")
        .build();
    let requests_errored = meter
        .u64_counter("http.client.errored_requests")
        .with_description("Other errors when trying to send a request.")
        .build();
    let requests_active = meter
        .i64_up_down_counter("http.client.active_requests")
        .with_description("The number of active HTTP requests.")
        .build();
    let request_body_size = meter
        .u64_histogram("http.client.request.body.size")
        .with_unit("By")
        .with_boundaries(self.size_buckets.clone())
        .with_description("The HTTP request body sizes in bytes.")
        .build();
    let response_body_size = meter
        .u64_histogram("http.client.response.body.size")
        .with_unit("By")
        .with_boundaries(self.size_buckets.clone())
        .with_description("The HTTP response body sizes in bytes.")
        .build();
    let http_client = HttpClientMetrics(Arc::new(HttpClientMetricsInner {
        request_duration,
        requests_total,
        requests_rejected,
        requests_errored,
        requests_active,
        request_body_size,
        response_body_size,
    }));

    // Runtime gauges, populated by `gather_runtime_metrics`.
    let num_workers = meter
        .u64_gauge("runtime.workers")
        .with_description("Number of worker threads used by the runtime.")
        .build();
    let num_alive_tasks = meter
        .u64_gauge("runtime.alive_tasks")
        .with_description("Current number of alive tasks in the runtime.")
        .build();
    let global_queue_depth = meter
        .u64_gauge("runtime.global_queue_depth")
        .with_description("Number of tasks currently scheduled in the runtime’s global queue.")
        .build();
    let runtime = RuntimeMetrics {
        num_workers,
        num_alive_tasks,
        global_queue_depth,
    };

    MetricsState {
        http_server,
        http_client,
        runtime,
        prom,
    }
}
/// Builds the router that serves metrics over HTTP.
///
/// A `GET` route backed by `get_prom_metrics` is mounted at the `path` of
/// every configured Prometheus exporter; other exporter kinds add no routes.
/// A clone of `metrics_state` is used as the router state.
pub fn build_router(&self, metrics_state: &MetricsState) -> Router {
    let _span = debug_span!("build_metrics_router").entered();
    self.exporters
        .iter()
        .filter_map(|exp_cfg| match exp_cfg {
            MetricsExporterConfig::Prometheus(cfg) => Some(cfg),
            MetricsExporterConfig::Otlp(_) | MetricsExporterConfig::Stdout(_) => None,
        })
        .fold(Router::new(), |rtr, cfg| {
            rtr.route(&cfg.path, routing::get(get_prom_metrics))
        })
        .with_state(metrics_state.clone())
}
}
/// Configuration of a single metrics exporter.
///
/// Serialized as an internally tagged enum: the `type` field selects the
/// variant using its snake_case name.
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[non_exhaustive]
#[serde(tag = "type", rename_all = "snake_case")]
enum MetricsExporterConfig {
/// Prometheus text exporter served over HTTP; `prom` is accepted as an alias.
#[serde(alias = "prom")]
Prometheus(PrometheusMetricsExporterConfig),
/// OTLP exporter, registered as a periodic exporter.
Otlp(OtlpMetricsExporterConfig),
/// Standard output exporter, registered as a periodic exporter.
Stdout(StdoutMetricsExporterConfig),
}
impl Default for MetricsExporterConfig {
fn default() -> Self {
Self::Prometheus(Default::default())
}
}
/// Configuration of the Prometheus text exporter.
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[non_exhaustive]
struct PrometheusMetricsExporterConfig {
/// URL path at which the metrics route is mounted; defaults to `/metrics`.
#[serde(default = "PrometheusMetricsExporterConfig::default_path")]
path: String,
/// When `false`, the exporter is built with `without_units()`. Defaults to `true`.
#[serde(default = "crate::util::default_true")]
with_units: bool,
/// When `false`, the exporter is built with `without_counter_suffixes()`. Defaults to `true`.
#[serde(default = "crate::util::default_true")]
with_counter_suffixes: bool,
/// When `false`, the exporter is built with `without_target_info()`. Defaults to `true`.
#[serde(default = "crate::util::default_true")]
with_target_info: bool,
/// When `false`, the exporter is built with `without_scope_info()`. Defaults to `true`.
#[serde(default = "crate::util::default_true")]
with_scope_info: bool,
}
impl Default for PrometheusMetricsExporterConfig {
    /// Default path with every optional exporter feature enabled.
    fn default() -> Self {
        let path = Self::default_path();
        Self {
            path,
            with_units: true,
            with_counter_suffixes: true,
            with_target_info: true,
            with_scope_info: true,
        }
    }
}
impl PrometheusMetricsExporterConfig {
    /// Default URL path of the metrics endpoint.
    #[must_use]
    #[inline]
    fn default_path() -> String {
        "/metrics".to_owned()
    }

    /// Builds a Prometheus exporter, disabling each feature whose flag is `false`.
    #[must_use]
    fn build_exporter(&self) -> PrometheusExporter {
        let builder = PrometheusExporter::builder();
        let builder = if self.with_units {
            builder
        } else {
            builder.without_units()
        };
        let builder = if self.with_counter_suffixes {
            builder
        } else {
            builder.without_counter_suffixes()
        };
        let builder = if self.with_target_info {
            builder
        } else {
            builder.without_target_info()
        };
        let builder = if self.with_scope_info {
            builder
        } else {
            builder.without_scope_info()
        };
        builder.build()
    }
}
/// Configuration of the OTLP metrics exporter.
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[non_exhaustive]
struct OtlpMetricsExporterConfig {
/// Endpoint URL the exporter sends metrics to.
#[serde(default = "OtlpMetricsExporterConfig::default_endpoint")]
endpoint: Url,
/// OTLP protocol passed to the exporter builder; `format` is accepted as an alias.
#[serde(default, alias = "format")]
protocol: OtlpProtocol,
/// Export timeout, parsed in `humantime` format.
#[serde(
default = "OtlpMetricsExporterConfig::default_timeout",
with = "humantime_serde"
)]
timeout: Duration,
/// Temporality passed to the exporter builder.
#[serde(default)]
temporality: MetricsTemporality,
/// TLS settings, converted to a tonic TLS configuration when the exporter is built.
#[serde(default)]
tls: TonicTlsConfig,
}
impl Default for OtlpMetricsExporterConfig {
fn default() -> Self {
Self {
endpoint: Self::default_endpoint(),
protocol: OtlpProtocol::default(),
timeout: Self::default_timeout(),
temporality: MetricsTemporality::default(),
tls: TonicTlsConfig::default(),
}
}
}
impl OtlpMetricsExporterConfig {
    /// Default OTLP endpoint URL.
    #[must_use]
    #[inline]
    // The literal below is a constant, well-formed URL, so parsing cannot fail.
    #[allow(clippy::unwrap_used)]
    fn default_endpoint() -> Url {
        const DEFAULT_ENDPOINT: &str = "http://localhost:9090/api/v1/otlp/v1/metrics";
        Url::parse(DEFAULT_ENDPOINT).unwrap()
    }

    /// Default export timeout, taken from the `opentelemetry_otlp` crate default.
    #[must_use]
    #[inline]
    fn default_timeout() -> Duration {
        opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT
    }

    /// Builds a tonic-based OTLP metric exporter from this configuration.
    ///
    /// # Errors
    ///
    /// Returns [`MetricsError::ConfigRead`] if the TLS configuration cannot be
    /// prepared, or [`MetricsError::OpenTelemetry`] if the exporter fails to build.
    async fn build_exporter(&self) -> Result<OtlpMetricExporter, MetricsError> {
        let tls_config = self
            .tls
            .to_tonic_config()
            .await
            .map_err(|err| MetricsError::ConfigRead(err.into()))?;
        let exporter = OtlpMetricExporter::builder()
            .with_tonic()
            .with_endpoint(self.endpoint.to_string())
            .with_protocol(self.protocol.into())
            .with_timeout(self.timeout)
            .with_tls_config(tls_config)
            .with_temporality(self.temporality.into())
            .build()?;
        Ok(exporter)
    }
}
/// Configuration of the standard output metrics exporter.
#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
#[non_exhaustive]
struct StdoutMetricsExporterConfig {
/// Temporality passed to the exporter builder.
#[serde(default)]
temporality: MetricsTemporality,
}
impl StdoutMetricsExporterConfig {
fn build_exporter(&self) -> StdoutMetricExporter {
StdoutMetricExporter::builder()
.with_temporality(self.temporality.into())
.build()
}
}
/// Serializable mirror of the SDK [`Temporality`] setting.
///
/// Variants are (de)serialized by their snake_case names.
#[derive(Clone, Copy, Debug, Default, Deserialize, PartialEq, Eq, Hash, Serialize)]
#[non_exhaustive]
#[serde(rename_all = "snake_case")]
pub enum MetricsTemporality {
/// Maps to [`Temporality::Cumulative`]; this is the default.
#[default]
Cumulative,
/// Maps to [`Temporality::Delta`].
Delta,
/// Maps to [`Temporality::LowMemory`].
LowMemory,
}
impl From<MetricsTemporality> for Temporality {
fn from(value: MetricsTemporality) -> Self {
match value {
MetricsTemporality::Cumulative => Self::Cumulative,
MetricsTemporality::Delta => Self::Delta,
MetricsTemporality::LowMemory => Self::LowMemory,
}
}
}
/// Container for all metric instruments plus the optional Prometheus exporter.
///
/// Also acts as a tower `Layer` producing [`HttpMetrics`] services, and is
/// used as the state of the metrics router.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct MetricsState {
/// Instruments recorded by the HTTP server middleware.
http_server: HttpServerMetrics,
/// Instruments handed out to HTTP clients via `client_metrics`.
http_client: HttpClientMetrics,
/// Gauges populated by `gather_runtime_metrics`.
runtime: RuntimeMetrics,
/// Prometheus exporter used by `export_text`; `None` when none was supplied.
prom: Option<PrometheusExporter>,
}
/// Instruments describing requests handled by the HTTP server.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub(crate) struct HttpServerMetrics {
/// Request latency histogram (`http.server.request.duration`, seconds).
request_duration: Histogram<f64>,
/// Counter of completed requests (`http.server.requests`).
requests_total: Counter<u64>,
/// Number of requests currently in flight (`http.server.active_requests`).
requests_active: UpDownCounter<i64>,
/// Request body size histogram (`http.server.request.body.size`, bytes).
request_body_size: Histogram<u64>,
/// Response body size histogram (`http.server.response.body.size`, bytes).
response_body_size: Histogram<u64>,
}
/// Shared handle to the HTTP client instruments.
///
/// Wraps [`HttpClientMetricsInner`] in an [`Arc`] and dereferences to it.
#[derive(Clone, Debug)]
#[repr(transparent)]
pub struct HttpClientMetrics(Arc<HttpClientMetricsInner>);
impl Deref for HttpClientMetrics {
type Target = HttpClientMetricsInner;
fn deref(&self) -> &Self::Target {
&self.0
}
}
/// Instruments describing requests issued by HTTP clients.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct HttpClientMetricsInner {
/// Request latency histogram (`http.client.request.duration`, seconds).
pub request_duration: Histogram<f64>,
/// Counter of processed requests (`http.client.requests`).
pub requests_total: Counter<u64>,
/// Counter of requests rejected due to an open circuit breaker (`http.client.rejected_requests`).
pub requests_rejected: Counter<u64>,
/// Counter of other errors when trying to send a request (`http.client.errored_requests`).
pub requests_errored: Counter<u64>,
/// Number of requests currently in flight (`http.client.active_requests`).
pub requests_active: UpDownCounter<i64>,
/// Request body size histogram (`http.client.request.body.size`, bytes).
pub request_body_size: Histogram<u64>,
/// Response body size histogram (`http.client.response.body.size`, bytes).
pub response_body_size: Histogram<u64>,
}
/// Gauges describing the async runtime, recorded by `gather_runtime_metrics`.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub(crate) struct RuntimeMetrics {
/// Number of worker threads used by the runtime (`runtime.workers`).
num_workers: Gauge<u64>,
/// Current number of alive tasks in the runtime (`runtime.alive_tasks`).
num_alive_tasks: Gauge<u64>,
/// Number of tasks in the runtime's global queue (`runtime.global_queue_depth`).
global_queue_depth: Gauge<u64>,
}
impl<S> Layer<S> for MetricsState {
    type Service = HttpMetrics<S>;

    /// Wraps `inner` in an [`HttpMetrics`] service holding a clone of this state.
    fn layer(&self, inner: S) -> Self::Service {
        let state = self.clone();
        HttpMetrics { state, inner }
    }
}
impl MetricsState {
    /// Creates a named handle to the shared HTTP client instruments.
    #[must_use]
    pub fn client_metrics(&self, name: impl AsRef<str>) -> ClientMetricsState {
        let name = String::from(name.as_ref());
        let metrics = self.http_client.clone();
        ClientMetricsState { name, metrics }
    }

    /// Renders the current metrics using the Prometheus exporter.
    ///
    /// Returns `Ok(None)` when the state holds no Prometheus exporter.
    ///
    /// # Errors
    ///
    /// Propagates the I/O error reported by the exporter.
    pub fn export_text(&self) -> Result<Option<Vec<u8>>, std::io::Error> {
        let Some(exporter) = &self.prom else {
            return Ok(None);
        };
        let mut buf = Vec::with_capacity(256);
        exporter.export(&mut buf)?;
        Ok(Some(buf))
    }
}
/// Named handle to the HTTP client instruments, created by `MetricsState::client_metrics`.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct ClientMetricsState {
/// Name given when the handle was created.
name: String,
/// Shared HTTP client instruments.
metrics: HttpClientMetrics,
}
impl ClientMetricsState {
    /// Name this handle was created with.
    #[must_use]
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Shared HTTP client instruments.
    #[must_use]
    pub fn metrics(&self) -> &HttpClientMetrics {
        let Self { metrics, .. } = self;
        metrics
    }
}
/// Tower service that records HTTP server metrics around an inner service.
#[derive(Clone)]
#[non_exhaustive]
pub struct HttpMetrics<S> {
/// Instruments to record into.
state: MetricsState,
/// Wrapped service that handles the request.
inner: S,
}
impl<S, T, U> Service<Request<T>> for HttpMetrics<S>
where
    S: Service<Request<T>, Response = Response<U>>,
    T: HttpBody,
    U: HttpBody,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = HttpMetricsFuture<S::Future>;

    /// Readiness is delegated to the wrapped service.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    /// Captures request attributes, bumps the active-request counter and
    /// returns a future that records the remaining metrics on completion.
    fn call(&mut self, req: Request<T>) -> Self::Future {
        let start = Instant::now();
        let method = req.method().clone();
        // An absent URI scheme is recorded as an empty string.
        let scheme = req
            .uri()
            .scheme()
            .map(ToString::to_string)
            .unwrap_or_default();
        let path = req.extensions().get::<MatchedPath>().cloned();
        // Upper bound of the body size hint; 0 when the bound is unknown.
        let request_size = req.size_hint().upper().unwrap_or_default();

        let active_labels = [
            KeyValue::new("http.request.method", method.to_string()),
            KeyValue::new("url.scheme", scheme.clone()),
        ];
        self.state
            .http_server
            .requests_active
            .add(1, &active_labels);

        let inner = self.inner.call(req);
        let state = self.state.clone();
        HttpMetricsFuture {
            inner,
            state,
            start,
            method,
            scheme,
            path,
            request_size,
        }
    }
}
/// Response future returned by [`HttpMetrics`]; records metrics when the inner future completes.
#[pin_project]
#[non_exhaustive]
pub struct HttpMetricsFuture<F> {
/// Future of the wrapped service.
#[pin]
inner: F,
/// Instruments to record into.
state: MetricsState,
/// Instant at which the request entered the service.
start: Instant,
/// HTTP method of the request.
method: Method,
/// URI scheme of the request; empty when the URI had none.
scheme: String,
/// Matched route of the request, when the `MatchedPath` extension was present.
path: Option<MatchedPath>,
/// Upper bound of the request body size hint; 0 when unknown.
request_size: u64,
}
impl<F, U, E> Future for HttpMetricsFuture<F>
where
    F: Future<Output = Result<Response<U>, E>>,
    U: HttpBody,
{
    type Output = F::Output;

    /// Polls the inner future and records server metrics once it completes.
    ///
    /// The active-request counter is decremented for both successful and
    /// failed outcomes; the remaining instruments are only recorded when the
    /// inner future yields a response.
    // NOTE(review): the decrement only happens on completion, and no `Drop` impl is
    // visible in this file — a future dropped while pending appears to leave
    // `requests_active` incremented; confirm.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let me = self.project();
        let outcome = ready!(me.inner.poll(cx));

        let method_label = KeyValue::new("http.request.method", me.method.to_string());
        let scheme_label = KeyValue::new("url.scheme", me.scheme.clone());
        let server = &me.state.http_server;
        server
            .requests_active
            .add(-1, &[method_label.clone(), scheme_label.clone()]);

        // Errors are passed through untouched, without recording further metrics.
        let resp = match outcome {
            Ok(resp) => resp,
            Err(err) => return Poll::Ready(Err(err)),
        };

        let handler_name = resp
            .extensions()
            .get::<HandlerName>()
            .map_or("", |hdl| hdl.as_str());
        let duration = me.start.elapsed().as_secs_f64();
        let status = resp.status().as_str().to_owned();
        // Upper bound of the body size hint; 0 when the bound is unknown.
        let response_size = resp.size_hint().upper().unwrap_or(0);
        let route: Cow<'static, str> = match me.path.as_ref() {
            Some(path) => Cow::Owned(path.as_str().to_owned()),
            None => Cow::Borrowed(""),
        };

        let labels = [
            method_label,
            scheme_label,
            KeyValue::new("http.response.status_code", status),
            KeyValue::new("http.route", route),
            KeyValue::new("uxum.handler", handler_name),
        ];
        server.requests_total.add(1, &labels);
        server.request_duration.record(duration, &labels);
        server.request_body_size.record(*me.request_size, &labels);
        server.response_body_size.record(response_size, &labels);
        trace!("metrics recorded");
        Poll::Ready(Ok(resp))
    }
}
/// Periodically samples Tokio runtime metrics into the runtime gauges.
///
/// Runs until `cancel` is cancelled. Samples are taken every `period`;
/// missed ticks are delayed rather than bursted.
pub(crate) async fn gather_runtime_metrics(
    metrics: MetricsState,
    period: Duration,
    cancel: CancellationToken,
) {
    let mut ticker = tokio::time::interval(period);
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    loop {
        tokio::select! {
            _ = cancel.cancelled() => break,
            _ = ticker.tick() => {
                let snapshot = tokio::runtime::Handle::current().metrics();
                let gauges = &metrics.runtime;
                let samples = [
                    (&gauges.num_workers, snapshot.num_workers()),
                    (&gauges.num_alive_tasks, snapshot.num_alive_tasks()),
                    (&gauges.global_queue_depth, snapshot.global_queue_depth()),
                ];
                for (gauge, value) in samples {
                    gauge.record(value as u64, &[]);
                }
            }
        }
    }
}
/// HTTP handler returning metrics in the Prometheus text format.
///
/// # Errors
///
/// Returns [`MetricsError::Prometheus`] when rendering fails, and
/// [`MetricsError::RuntimeConfig`] when the state holds no Prometheus exporter.
async fn get_prom_metrics(metrics: State<MetricsState>) -> Result<impl IntoResponse, MetricsError> {
    let body = metrics
        .export_text()
        .map_err(|err| MetricsError::Prometheus(err.into()))?
        .ok_or(MetricsError::RuntimeConfig)?;
    let headers = [(header::CONTENT_TYPE, "text/plain; version=0.0.4")];
    Ok((headers, body))
}