use metrics::Unit;
use opentelemetry::{global, metrics as otel_metrics};
use opentelemetry_otlp::{MetricExporter, Protocol, WithExportConfig};
use opentelemetry_sdk::metrics::{
Aggregation, Instrument, InstrumentKind, PeriodicReader, SdkMeterProvider, Stream, Temporality,
};
use std::time::Duration;
use crate::metrics::defs::{MetricStability, to_ucum};
/// Picks the metric temporality from the
/// `EXPERIMENTAL_MOUNTPOINT_OTLP_METRICS_TEMPORALITY_PREFERENCE` environment variable.
///
/// Cumulative temporality is selected only when the variable is readable and its lowercased
/// value equals `cumulative`. An unset or unreadable variable, or any other value, selects
/// delta temporality.
fn get_temporality_from_env() -> Temporality {
    match std::env::var("EXPERIMENTAL_MOUNTPOINT_OTLP_METRICS_TEMPORALITY_PREFERENCE") {
        Ok(preference) if preference.to_lowercase() == "cumulative" => Temporality::Cumulative,
        _ => Temporality::Delta,
    }
}
/// Settings used to construct an OTLP metrics exporter.
#[derive(Debug, Clone)]
pub struct OtlpConfig {
    /// Endpoint the exporter is pointed at.
    pub endpoint: String,
    /// Export interval, in seconds.
    pub interval_secs: u64,
}

impl OtlpConfig {
    /// Creates a configuration for `endpoint` with a 60-second export interval.
    pub fn new(endpoint: &str) -> Self {
        let endpoint = String::from(endpoint);
        Self {
            endpoint,
            interval_secs: 60,
        }
    }

    /// Consumes the configuration and returns it with the export interval set to `secs`.
    pub fn with_interval_secs(self, secs: u64) -> Self {
        Self {
            interval_secs: secs,
            ..self
        }
    }
}
/// Hands out OpenTelemetry instruments built from a single meter.
#[derive(Debug)]
pub struct OtlpMetricsExporter {
    /// Meter every instrument created by this exporter is built from.
    meter: otel_metrics::Meter,
}

impl OtlpMetricsExporter {
    /// Wraps an existing meter without setting up any export pipeline (test builds only).
    #[cfg(test)]
    pub fn new_for_test(meter: otel_metrics::Meter) -> Self {
        Self { meter }
    }

    /// Builds an OTLP/HTTP metric pipeline from `config` and installs it as the global meter
    /// provider.
    ///
    /// The exporter uses the binary HTTP protocol and the temporality returned by
    /// `get_temporality_from_env`. The provider is given an empty resource and a periodic
    /// reader configured with an interval of `config.interval_secs` seconds. A view maps
    /// histogram instruments to base-2 exponential histogram aggregation.
    ///
    /// The returned exporter holds the global meter named `mountpoint-s3`.
    ///
    /// # Errors
    ///
    /// Returns the error produced by the OTLP exporter builder when the exporter cannot be
    /// built.
    pub fn new(config: &OtlpConfig) -> Result<Self, Box<dyn std::error::Error>> {
        let endpoint_url = Self::metrics_endpoint_url(&config.endpoint);

        let exporter = MetricExporter::builder()
            .with_http()
            .with_protocol(Protocol::HttpBinary)
            .with_endpoint(&endpoint_url)
            .with_temporality(get_temporality_from_env())
            .build()?;

        let resource = opentelemetry_sdk::resource::Resource::builder_empty().build();

        let meter_provider = SdkMeterProvider::builder()
            .with_reader(
                PeriodicReader::builder(exporter)
                    .with_interval(Duration::from_secs(config.interval_secs))
                    .build(),
            )
            .with_resource(resource)
            .with_view(|instrument: &Instrument| {
                // Only histograms get a custom stream; other kinds keep their defaults.
                if !matches!(instrument.kind(), InstrumentKind::Histogram) {
                    return None;
                }
                // The aggregation parameters are constants, so a build failure here would be
                // a programming error rather than a runtime condition.
                let stream = Stream::builder()
                    .with_aggregation(Aggregation::Base2ExponentialHistogram {
                        max_size: 160,
                        max_scale: 20,
                        record_min_max: true,
                    })
                    .build()
                    .expect("constant exponential histogram stream configuration must be valid");
                Some(stream)
            })
            .build();

        global::set_meter_provider(meter_provider);
        let meter = global::meter("mountpoint-s3");
        Ok(Self { meter })
    }

    /// Returns `endpoint` ending in the `/v1/metrics` path, appending the path when absent.
    ///
    /// One trailing `/` is dropped before the check, so `host`, `host/`, `host/v1/metrics`
    /// and `host/v1/metrics/` all yield `host/v1/metrics` (the last form would otherwise end
    /// up with the path appended twice).
    fn metrics_endpoint_url(endpoint: &str) -> String {
        const METRICS_PATH: &str = "/v1/metrics";
        let base = endpoint.strip_suffix('/').unwrap_or(endpoint);
        if base.ends_with(METRICS_PATH) {
            base.to_string()
        } else {
            format!("{base}{METRICS_PATH}")
        }
    }

    /// Returns the exported metric name: `experimental.`-prefixed for experimental metrics,
    /// unchanged for every other stability level.
    fn otlp_metric_name(&self, name: &str, stability: MetricStability) -> String {
        match stability {
            MetricStability::Experimental => format!("experimental.{name}"),
            _ => name.to_string(),
        }
    }

    /// Creates a `u64` counter whose name is derived from `name` and `stability` and whose
    /// unit is `unit` passed through `to_ucum`.
    pub fn create_counter_instrument(
        &self,
        name: &str,
        unit: Unit,
        stability: MetricStability,
    ) -> otel_metrics::Counter<u64> {
        let metric_name = self.otlp_metric_name(name, stability);
        self.meter
            .u64_counter(metric_name)
            .with_unit(to_ucum(unit))
            .build()
    }

    /// Creates an `f64` gauge whose name is derived from `name` and `stability` and whose
    /// unit is `unit` passed through `to_ucum`.
    pub fn create_gauge_instrument(
        &self,
        name: &str,
        unit: Unit,
        stability: MetricStability,
    ) -> otel_metrics::Gauge<f64> {
        let metric_name = self.otlp_metric_name(name, stability);
        self.meter
            .f64_gauge(metric_name)
            .with_unit(to_ucum(unit))
            .build()
    }

    /// Creates an `f64` histogram whose name is derived from `name` and `stability` and whose
    /// unit is `unit` passed through `to_ucum`.
    pub fn create_histogram_instrument(
        &self,
        name: &str,
        unit: Unit,
        stability: MetricStability,
    ) -> otel_metrics::Histogram<f64> {
        let metric_name = self.otlp_metric_name(name, stability);
        self.meter
            .f64_histogram(metric_name)
            .with_unit(to_ucum(unit))
            .build()
    }
}
impl TryFrom<&OtlpConfig> for OtlpMetricsExporter {
type Error = Box<dyn std::error::Error>;
fn try_from(config: &OtlpConfig) -> Result<Self, Self::Error> {
OtlpMetricsExporter::new(config)
}
}