use std::task::Context;
use std::task::Poll;
use futures::future::BoxFuture;
use http::StatusCode;
use opentelemetry_prometheus::ResourceSelector;
use prometheus::Encoder;
use prometheus::Registry;
use prometheus::TextEncoder;
use schemars::JsonSchema;
use serde::Deserialize;
use tower::BoxError;
use tower_service::Service;
use crate::ListenAddr;
use crate::metrics::aggregation::MeterProviderType;
use crate::plugins::telemetry::config::Conf;
use crate::plugins::telemetry::metrics::OverflowMetricExporter;
use crate::plugins::telemetry::reload::metrics::MetricsBuilder;
use crate::plugins::telemetry::reload::metrics::MetricsConfigurator;
use crate::services::router;
#[derive(Debug, Clone, Deserialize, JsonSchema, PartialEq)]
#[serde(deny_unknown_fields, default)]
#[schemars(rename = "PrometheusMetricsConfig")]
pub(crate) struct Config {
pub(crate) enabled: bool,
pub(crate) resource_selector: ResourceSelectorConfig,
pub(crate) listen: ListenAddr,
pub(crate) path: String,
}
#[derive(Debug, Clone, Copy, Deserialize, JsonSchema, Default, PartialEq)]
#[serde(rename_all = "snake_case")]
pub(crate) enum ResourceSelectorConfig {
All,
#[default]
None,
}
impl From<ResourceSelectorConfig> for ResourceSelector {
fn from(value: ResourceSelectorConfig) -> Self {
match value {
ResourceSelectorConfig::All => ResourceSelector::All,
ResourceSelectorConfig::None => ResourceSelector::None,
}
}
}
impl Default for Config {
fn default() -> Self {
Self {
enabled: false,
resource_selector: ResourceSelectorConfig::default(),
listen: ListenAddr::SocketAddr("127.0.0.1:9090".parse().expect("valid listenAddr")),
path: "/metrics".to_string(),
}
}
}
impl MetricsConfigurator for Config {
fn config(conf: &Conf) -> &Self {
&conf.exporters.metrics.prometheus
}
fn is_enabled(&self) -> bool {
self.enabled
}
fn configure(&self, builder: &mut MetricsBuilder) -> Result<(), BoxError> {
let registry = Registry::new();
let exporter = opentelemetry_prometheus::exporter()
.with_resource_selector(self.resource_selector)
.with_registry(registry.clone())
.build()?;
let reader = OverflowMetricExporter::new_pull(exporter);
builder.with_reader(MeterProviderType::Public, reader);
builder.with_prometheus_registry(registry);
Ok(())
}
}
pub(crate) struct PrometheusService {
pub(crate) registry: Registry,
}
impl Service<router::Request> for PrometheusService {
type Response = router::Response;
type Error = BoxError;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Ok(()).into()
}
fn call(&mut self, req: router::Request) -> Self::Future {
let metric_families = self.registry.gather();
Box::pin(async move {
let encoder = TextEncoder::new();
let mut result = Vec::new();
encoder.encode(&metric_families, &mut result)?;
let stats = String::from_utf8_lossy(&result);
let modified_stats = stats.replace("_total_total", "_total");
router::Response::http_response_builder()
.response(
http::Response::builder()
.status(StatusCode::OK)
.header(http::header::CONTENT_TYPE, "text/plain; version=0.0.4")
.body(router::body::from_bytes(modified_stats))
.map_err(BoxError::from)?,
)
.context(req.context)
.build()
})
}
}