use std::sync::Mutex;
use std::task::Context;
use std::task::Poll;
use futures::future::BoxFuture;
use http::StatusCode;
use once_cell::sync::Lazy;
use opentelemetry::sdk::Resource;
use opentelemetry::sdk::metrics::MeterProvider;
use opentelemetry::sdk::metrics::View;
use prometheus::Encoder;
use prometheus::Registry;
use prometheus::TextEncoder;
use schemars::JsonSchema;
use serde::Deserialize;
use tower::BoxError;
use tower::ServiceExt;
use tower_service::Service;
use crate::ListenAddr;
use crate::plugins::telemetry::config::MetricView;
use crate::plugins::telemetry::config::MetricsCommon;
use crate::plugins::telemetry::metrics::CustomAggregationSelector;
use crate::plugins::telemetry::metrics::MetricsBuilder;
use crate::plugins::telemetry::metrics::MetricsConfigurator;
use crate::router_factory::Endpoint;
use crate::services::router;
use crate::services::router::Body;
// User-facing configuration for the Prometheus metrics exporter.
//
// `deny_unknown_fields` makes deserialization reject keys that are not listed
// below, and the container-level `default` fills any omitted field from
// `Config::default()`.
//
// NOTE(review): plain `//` comments are used on purpose; `///` doc comments
// would be picked up by the `JsonSchema` derive as schema descriptions and
// change the generated schema — confirm before converting them.
#[derive(Debug, Clone, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields, default)]
pub(crate) struct Config {
// Whether the exporter is active; returned as-is by `MetricsConfigurator::enabled`.
pub(crate) enabled: bool,
// Address under which the metrics endpoint is registered in the builder's custom endpoints.
pub(crate) listen: ListenAddr,
// Path handed to `Endpoint::from_router_service` for the metrics endpoint.
pub(crate) path: String,
}
impl Default for Config {
fn default() -> Self {
Self {
enabled: false,
listen: ListenAddr::SocketAddr("127.0.0.1:9090".parse().expect("valid listenAddr")),
path: "/metrics".to_string(),
}
}
}
/// The committed Prometheus registry together with the configuration it was
/// built from. Written by `commit_prometheus`; consulted (outside of test
/// builds) by `Config::apply` to decide whether the registry can be reused.
/// Starts out as `None`.
static EXISTING_PROMETHEUS: Lazy<Mutex<Option<(PrometheusConfig, Registry)>>> =
Lazy::new(Default::default);
/// A freshly built registry and its configuration that has not been committed
/// yet. Written by `Config::apply` whenever a new registry is created and
/// drained by `commit_prometheus`. Starts out as `None`.
static NEW_PROMETHEUS: Lazy<Mutex<Option<(PrometheusConfig, Registry)>>> =
Lazy::new(Default::default);
/// The inputs a Prometheus registry is built from.
///
/// Two values are compared with `==` in `Config::apply`; when they are equal
/// the previously committed registry is reused instead of building a new one.
#[derive(PartialEq, Clone)]
struct PrometheusConfig {
/// Resource copied from the metrics builder.
resource: Resource,
/// Bucket boundaries copied from the common metrics configuration.
buckets: Vec<f64>,
/// Metric views copied from the common metrics configuration.
views: Vec<MetricView>,
}
/// Promotes the pending Prometheus registry, if there is one, to be the
/// committed registry.
///
/// Takes the value out of `NEW_PROMETHEUS` (leaving `None` behind) and stores
/// it in `EXISTING_PROMETHEUS`, dropping whatever was committed before. Does
/// nothing when no registry is pending.
///
/// # Panics
///
/// Panics if either mutex is poisoned.
pub(crate) fn commit_prometheus() {
    // The pending-slot guard stays alive for the whole function, so both locks
    // are held while the value is handed over.
    let mut pending_slot = NEW_PROMETHEUS.lock().expect("lock poisoned");

    match pending_slot.take() {
        Some(pending) => {
            tracing::debug!("committing prometheus registry");
            let mut committed_slot = EXISTING_PROMETHEUS.lock().expect("lock poisoned");
            *committed_slot = Some(pending);
        }
        None => {}
    }
}
impl MetricsConfigurator for Config {
fn enabled(&self) -> bool {
self.enabled
}
fn apply(
&self,
mut builder: MetricsBuilder,
metrics_config: &MetricsCommon,
) -> Result<MetricsBuilder, BoxError> {
let prometheus_config = PrometheusConfig {
resource: builder.resource.clone(),
buckets: metrics_config.buckets.clone(),
views: metrics_config.views.clone(),
};
#[cfg(not(test))]
if let Some((last_config, last_registry)) =
EXISTING_PROMETHEUS.lock().expect("lock poisoned").clone()
{
if prometheus_config == last_config {
tracing::debug!("prometheus registry can be reused");
builder.custom_endpoints.insert(
self.listen.clone(),
Endpoint::from_router_service(
self.path.clone(),
PrometheusService {
registry: last_registry.clone(),
}
.boxed(),
),
);
tracing::info!(
"Prometheus endpoint exposed at {}{}",
self.listen,
self.path
);
return Ok(builder);
} else {
tracing::debug!("prometheus registry cannot be reused");
}
}
let registry = prometheus::Registry::new();
let exporter = opentelemetry_prometheus::exporter()
.with_aggregation_selector(
CustomAggregationSelector::builder()
.boundaries(metrics_config.buckets.clone())
.record_min_max(true)
.build(),
)
.with_registry(registry.clone())
.build()?;
let mut meter_provider_builder = MeterProvider::builder()
.with_reader(exporter)
.with_resource(builder.resource.clone());
for metric_view in metrics_config.views.clone() {
let view: Box<dyn View> = metric_view.try_into()?;
meter_provider_builder = meter_provider_builder.with_view(view);
}
let meter_provider = meter_provider_builder.build();
builder.custom_endpoints.insert(
self.listen.clone(),
Endpoint::from_router_service(
self.path.clone(),
PrometheusService {
registry: registry.clone(),
}
.boxed(),
),
);
builder.prometheus_meter_provider = Some(meter_provider.clone());
NEW_PROMETHEUS
.lock()
.expect("lock poisoned")
.replace((prometheus_config, registry));
tracing::info!(
"Prometheus endpoint exposed at {}{}",
self.listen,
self.path
);
Ok(builder)
}
}
/// Router service that answers a request with the current contents of a
/// Prometheus registry, encoded with the Prometheus text encoder.
#[derive(Clone)]
pub(crate) struct PrometheusService {
/// Registry whose metric families are gathered on every call.
registry: Registry,
}
impl Service<router::Request> for PrometheusService {
    type Response = router::Response;
    type Error = BoxError;
    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

    /// Always reports ready.
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Gathers the registry's metric families and returns them as a
    /// `200 OK` text response, carrying over the request's context.
    ///
    /// The families are gathered synchronously when `call` is invoked;
    /// encoding and response construction happen when the returned future is
    /// polled. The future resolves to an error if encoding fails or the HTTP
    /// response cannot be built.
    fn call(&mut self, req: router::Request) -> Self::Future {
        let families = self.registry.gather();

        Box::pin(async move {
            let mut encoded = Vec::new();
            TextEncoder::new().encode(&families, &mut encoded)?;

            // Collapse a doubled `_total` suffix in the encoded text into a single one.
            let payload = String::from_utf8_lossy(&encoded).replace("_total_total", "_total");

            let response = http::Response::builder()
                .status(StatusCode::OK)
                .header(http::header::CONTENT_TYPE, "text/plain; version=0.0.4")
                .body::<Body>(payload.into())
                .map_err(BoxError::from)?;

            Ok(router::Response {
                response,
                context: req.context,
            })
        })
    }
}