use std::sync::Arc;
use std::time::Duration;
use anyhow::Context;
use daemon::Status;
use opentelemetry::global;
use opentelemetry::metrics::ObservableGauge;
use opentelemetry_sdk::Resource;
use opentelemetry_sdk::metrics::{Aggregation, PeriodicReader, SdkMeterProvider, Stream};
use prometheus::Registry;
use crate::telemetry::{OtlpProtocol, OtlpSignal, otlp_protocol};
/// Interval handed to the OTLP `PeriodicReader` built in `otlp_reader` (10 seconds).
const OTLP_PUSH_INTERVAL: Duration = Duration::from_secs(10);
/// Explicit histogram bucket boundaries that the view installed in `init`
/// applies to the `flusso.flush.duration` instrument.
///
/// Going by the constant's name the values are seconds, spanning 1 ms to 10 s.
const FLUSH_BUCKETS_SECONDS: &[f64] = &[
0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
];
/// Handle to the metrics pipeline created by `init`.
pub(crate) struct Metrics {
/// Meter provider built by `init`; `Metrics::shutdown` shuts it down.
provider: SdkMeterProvider,
/// Prometheus registry the exporter was built with, or `None` when `init`
/// was called with `prometheus == false`.
pub registry: Option<Registry>,
}
/// Compact debug output: reports only whether a Prometheus registry is
/// attached and marks the remaining fields as omitted.
impl std::fmt::Debug for Metrics {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let prometheus_enabled = self.registry.is_some();
        let mut repr = formatter.debug_struct("Metrics");
        repr.field("prometheus", &prometheus_enabled);
        repr.finish_non_exhaustive()
    }
}
impl Metrics {
    /// Shuts the meter provider down.
    ///
    /// This is best-effort: a failure is logged at warn level and is not
    /// returned to the caller.
    pub(crate) fn shutdown(&self) {
        let Err(error) = self.provider.shutdown() else {
            return;
        };
        tracing::warn!(%error, "failed to shut down meter provider");
    }
}
pub(crate) fn init(prometheus: bool) -> anyhow::Result<Metrics> {
let flush_view = |instrument: &opentelemetry_sdk::metrics::Instrument| {
(instrument.name() == "flusso.flush.duration")
.then(|| {
Stream::builder()
.with_aggregation(Aggregation::ExplicitBucketHistogram {
boundaries: FLUSH_BUCKETS_SECONDS.to_vec(),
record_min_max: true,
})
.build()
.ok()
})
.flatten()
};
let mut builder = SdkMeterProvider::builder()
.with_resource(resource())
.with_view(flush_view);
let registry = if prometheus {
let registry = Registry::new();
let exporter = opentelemetry_prometheus::exporter()
.with_registry(registry.clone())
.without_scope_info()
.build()
.context("building the Prometheus metrics exporter")?;
builder = builder.with_reader(exporter);
Some(registry)
} else {
None
};
if otlp_configured() {
match otlp_reader() {
Ok(reader) => {
builder = builder.with_reader(reader);
tracing::info!("OTLP metric export enabled");
}
Err(error) => {
tracing::warn!(error = format!("{error:#}"), "OTLP metric export disabled");
}
}
}
let provider = builder.build();
global::set_meter_provider(provider.clone());
Ok(Metrics { provider, registry })
}
/// Registers the `flusso.changes.in_flight` observable gauge on the global
/// `flusso` meter and returns it.
///
/// The gauge callback reports `status.in_flight()` with no attributes.
pub(crate) fn register_in_flight_gauge(status: Arc<Status>) -> ObservableGauge<u64> {
    let meter = global::meter("flusso");
    let builder = meter
        .u64_observable_gauge("flusso.changes.in_flight")
        .with_description("Captured but not yet committed changes (back-pressure)")
        .with_callback(move |observer| {
            let pending = status.in_flight();
            observer.observe(pending, &[]);
        });
    builder.build()
}
/// Reports whether an OTLP endpoint is configured through the environment.
///
/// Returns `true` when `OTEL_EXPORTER_OTLP_ENDPOINT` or
/// `OTEL_EXPORTER_OTLP_METRICS_ENDPOINT` is set to a non-empty value.
///
/// A variable that is present but empty counts as unset, in line with the
/// OpenTelemetry environment-variable specification. This keeps a blank
/// `VAR=` entry from triggering an exporter build against an empty endpoint.
pub(crate) fn otlp_configured() -> bool {
    [
        "OTEL_EXPORTER_OTLP_ENDPOINT",
        "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT",
    ]
    .iter()
    .any(|name| std::env::var_os(name).is_some_and(|value| !value.is_empty()))
}
/// Builds the periodic reader that pushes metrics over OTLP.
///
/// The transport (HTTP or tonic) is chosen from
/// `otlp_protocol(OtlpSignal::Metrics)`, and the reader exports every
/// `OTLP_PUSH_INTERVAL`.
///
/// # Errors
///
/// Returns an error when the OTLP metric exporter cannot be built.
fn otlp_reader() -> anyhow::Result<PeriodicReader<opentelemetry_otlp::MetricExporter>> {
    let exporter_builder = opentelemetry_otlp::MetricExporter::builder();
    let built = match otlp_protocol(OtlpSignal::Metrics) {
        OtlpProtocol::Grpc => exporter_builder.with_tonic().build(),
        OtlpProtocol::HttpProtobuf => exporter_builder.with_http().build(),
    };
    let exporter = built.context("building the OTLP metric exporter")?;

    let reader = PeriodicReader::builder(exporter)
        .with_interval(OTLP_PUSH_INTERVAL)
        .build();
    Ok(reader)
}
/// Builds the resource attached to the meter provider, with the service name
/// set to `flusso`.
fn resource() -> Resource {
    let builder = Resource::builder();
    builder.with_service_name("flusso").build()
}