use std::fmt::Debug;
use std::sync::Weak;
use std::time::Duration;
use opentelemetry::Value;
use opentelemetry_sdk::error::OTelSdkResult;
use opentelemetry_sdk::metrics::InstrumentKind;
use opentelemetry_sdk::metrics::Pipeline;
use opentelemetry_sdk::metrics::Temporality;
use opentelemetry_sdk::metrics::data::AggregatedMetrics;
use opentelemetry_sdk::metrics::data::Metric;
use opentelemetry_sdk::metrics::data::MetricData;
use opentelemetry_sdk::metrics::data::ResourceMetrics;
use opentelemetry_sdk::metrics::exporter::PushMetricExporter;
use opentelemetry_sdk::metrics::reader::MetricReader;
/// Attribute key looked for on every data point by `has_overflow_attribute`; a data point counts
/// as an overflow point when this key is present with the boolean value `true`.
const OTEL_METRIC_OVERFLOW_KEY: &str = "otel.metric.overflow";
/// Name of the counter emitted by `report_cardinality_overflow`. Metrics carrying this name are
/// skipped during the scan, so the counter never reports on itself.
const CARDINALITY_OVERFLOW_METRIC: &str = "apollo.router.telemetry.metrics.cardinality_overflow";
/// Wrapper around a metrics exporter or reader `T` that inspects the metrics passing through it
/// for cardinality-overflow data points.
///
/// The wrapper holds no state besides the wrapped value. `Clone` and `Debug` are derived, so each
/// is available exactly when `T` implements it, and the `Debug` output has the form
/// `OverflowMetricExporter { inner: .. }`.
#[derive(Clone, Debug)]
pub(crate) struct OverflowMetricExporter<T> {
    /// The wrapped exporter or reader that every call is forwarded to.
    inner: T,
}

impl<T> OverflowMetricExporter<T> {
    /// Wraps `inner` for use on the push (exporter) path.
    ///
    /// This constructor and [`Self::new_pull`] build the same value; the two names only record
    /// the intended role at the call site.
    pub(crate) fn new_push(inner: T) -> Self {
        Self { inner }
    }

    /// Wraps `inner` for use on the pull (reader) path.
    ///
    /// This constructor and [`Self::new_push`] build the same value; the two names only record
    /// the intended role at the call site.
    pub(crate) fn new_pull(inner: T) -> Self {
        Self { inner }
    }
}
/// Push path: every `PushMetricExporter` call is forwarded to the wrapped exporter, and each
/// exported batch is scanned for cardinality overflow on the way through.
impl<T: PushMetricExporter> PushMetricExporter for OverflowMetricExporter<T> {
/// Scans `metrics` for overflow data points, then hands the same batch to the inner exporter
/// and returns the inner exporter's future.
fn export(
&self,
metrics: &ResourceMetrics,
) -> impl std::future::Future<Output = OTelSdkResult> + Send {
// This is a plain synchronous call in a non-async function: the scan runs when `export` is
// called, before the inner future is created, and regardless of whether that future later
// resolves to an error.
report_cardinality_overflow(metrics);
self.inner.export(metrics)
}
/// Forwards to the inner exporter's `force_flush`.
fn force_flush(&self) -> OTelSdkResult {
self.inner.force_flush()
}
/// Forwards to the inner exporter's `shutdown_with_timeout`, passing `timeout` through unchanged.
fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
self.inner.shutdown_with_timeout(timeout)
}
/// Reports the temporality of the inner exporter.
fn temporality(&self) -> Temporality {
self.inner.temporality()
}
}
/// Pull path: every `MetricReader` call is forwarded to the wrapped reader, and successfully
/// collected metrics are scanned for cardinality overflow before being returned.
impl<T: MetricReader> MetricReader for OverflowMetricExporter<T> {
/// Forwards pipeline registration to the inner reader.
fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
self.inner.register_pipeline(pipeline)
}
/// Lets the inner reader fill `rm`, scans the result for overflow data points when collection
/// succeeded, and returns the inner reader's result unchanged.
fn collect(&self, rm: &mut ResourceMetrics) -> OTelSdkResult {
let result = self.inner.collect(rm);
// Only scan after a successful collect; on error `rm` is left unexamined.
if result.is_ok() {
report_cardinality_overflow(rm);
}
result
}
/// Forwards to the inner reader's `force_flush`.
fn force_flush(&self) -> OTelSdkResult {
self.inner.force_flush()
}
/// Forwards to the inner reader's `shutdown`.
fn shutdown(&self) -> OTelSdkResult {
self.inner.shutdown()
}
/// Forwards to the inner reader's `shutdown_with_timeout`, passing `timeout` through unchanged.
fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
self.inner.shutdown_with_timeout(timeout)
}
/// Reports the temporality the inner reader uses for instruments of the given `kind`.
fn temporality(&self, kind: InstrumentKind) -> Temporality {
self.inner.temporality(kind)
}
}
/// Increments the cardinality-overflow counter once for every metric in `metrics` that carries
/// at least one overflow data point.
///
/// Each increment is tagged with a `metric.name` attribute holding the name of the offending
/// metric. The metric named [`CARDINALITY_OVERFLOW_METRIC`] is never examined, so the counter
/// does not report on itself.
fn report_cardinality_overflow(metrics: &ResourceMetrics) {
    // Lazily walk every metric of every scope, keeping only the ones to report.
    let overflowing = metrics
        .scope_metrics()
        .flat_map(|scope| scope.metrics())
        .filter(|metric| {
            // The name check comes first so the overflow counter itself is never scanned.
            metric.name() != CARDINALITY_OVERFLOW_METRIC && has_overflow_data_point(metric)
        });

    for metric in overflowing {
        // The counter name is passed as a literal and has the same value as
        // `CARDINALITY_OVERFLOW_METRIC`; keep the two in sync.
        u64_counter_with_unit!(
            "apollo.router.telemetry.metrics.cardinality_overflow",
            "Counts metrics that have exceeded their cardinality limit",
            "count",
            1,
            [opentelemetry::KeyValue::new(
                "metric.name",
                metric.name().to_string(),
            )]
        );
    }
}
/// Returns `true` when `metric` holds at least one data point flagged as a cardinality overflow.
///
/// The aggregated data is stored per numeric type; whichever variant is present is handed to
/// [`has_overflow_in_metric_data`].
fn has_overflow_data_point(metric: &Metric) -> bool {
    let aggregated = metric.data();
    match aggregated {
        AggregatedMetrics::F64(floats) => has_overflow_in_metric_data(floats),
        AggregatedMetrics::U64(unsigned) => has_overflow_in_metric_data(unsigned),
        AggregatedMetrics::I64(signed) => has_overflow_in_metric_data(signed),
    }
}
fn has_overflow_in_metric_data<T>(data: &MetricData<T>) -> bool {
match data {
MetricData::Gauge(gauge) => gauge
.data_points()
.any(|dp| has_overflow_attribute(dp.attributes())),
MetricData::Sum(sum) => sum
.data_points()
.any(|dp| has_overflow_attribute(dp.attributes())),
MetricData::Histogram(hist) => hist
.data_points()
.any(|dp| has_overflow_attribute(dp.attributes())),
MetricData::ExponentialHistogram(exp_hist) => exp_hist
.data_points()
.any(|dp| has_overflow_attribute(dp.attributes())),
}
}
/// Returns `true` when `attrs` contains the [`OTEL_METRIC_OVERFLOW_KEY`] attribute set to the
/// boolean value `true`.
///
/// The same key with `false`, or with a value of any other type, does not count. Iteration stops
/// at the first match.
fn has_overflow_attribute<'a>(attrs: impl Iterator<Item = &'a opentelemetry::KeyValue>) -> bool {
    for attribute in attrs {
        let is_overflow_key = attribute.key.as_str() == OTEL_METRIC_OVERFLOW_KEY;
        if is_overflow_key && attribute.value == Value::Bool(true) {
            return true;
        }
    }
    false
}
// Unit tests for the overflow-attribute predicate, plus two end-to-end tests that drive a real
// `SdkMeterProvider` through the push and pull wrappers.
#[cfg(test)]
mod tests {
use opentelemetry::KeyValue;
use opentelemetry::Value;
use opentelemetry::metrics::MeterProvider;
use opentelemetry_sdk::Resource;
use opentelemetry_sdk::metrics::InMemoryMetricExporter;
use opentelemetry_sdk::metrics::SdkMeterProvider;
use opentelemetry_sdk::metrics::Stream;
use opentelemetry_sdk::metrics::data::ResourceMetrics;
use opentelemetry_sdk::metrics::exporter::PushMetricExporter;
use opentelemetry_sdk::metrics::reader::MetricReader;
use super::*;
use crate::metrics::FutureMetricsExt;
use crate::metrics::test_utils::ClonableManualReader;
// The overflow key with boolean `true` is detected even among unrelated attributes.
#[test]
fn detects_overflow_attribute() {
let attrs = [
KeyValue::new("http.method", "GET"),
KeyValue::new(OTEL_METRIC_OVERFLOW_KEY, true),
];
assert!(has_overflow_attribute(attrs.iter()));
}
// Attributes without the overflow key never match.
#[test]
fn no_overflow_when_attribute_missing() {
let attrs = [
KeyValue::new("http.method", "GET"),
KeyValue::new("http.status_code", 200),
];
assert!(!has_overflow_attribute(attrs.iter()));
}
// The overflow key set to `false` does not match.
#[test]
fn no_overflow_when_attribute_is_false() {
let attrs = [KeyValue::new(OTEL_METRIC_OVERFLOW_KEY, false)];
assert!(!has_overflow_attribute(attrs.iter()));
}
// The overflow key with the string "true" (rather than a boolean) does not match.
#[test]
fn no_overflow_when_attribute_is_wrong_type() {
let attrs = [KeyValue::new(
OTEL_METRIC_OVERFLOW_KEY,
Value::String("true".into()),
)];
assert!(!has_overflow_attribute(attrs.iter()));
}
// An empty attribute list does not match.
#[test]
fn no_overflow_on_empty_attributes() {
let attrs: Vec<KeyValue> = vec![];
assert!(!has_overflow_attribute(attrs.iter()));
}
// Push path: metrics are collected with an unwrapped manual reader, then exported through
// `OverflowMetricExporter::new_push`; the export is expected to bump the overflow counter once
// for the test metric.
#[tokio::test]
async fn increments_counter_on_cardinality_overflow() {
async {
let reader = ClonableManualReader::default();
// The view limits only "test.overflow.metric" to a cardinality of 2.
let provider = SdkMeterProvider::builder()
.with_reader(reader.clone())
.with_resource(Resource::builder_empty().build())
.with_view(|instrument: &opentelemetry_sdk::metrics::Instrument| {
if instrument.name() == "test.overflow.metric" {
Some(
Stream::builder()
.with_cardinality_limit(2) .build()
.expect("valid stream"),
)
} else {
None
}
})
.build();
let meter = provider.meter("test");
let counter = meter.u64_counter("test.overflow.metric").build();
// Three distinct attribute sets are recorded against a limit of 2; the assertion below
// relies on this yielding an overflow data point.
counter.add(1, &[opentelemetry::KeyValue::new("key", "value1")]);
counter.add(1, &[opentelemetry::KeyValue::new("key", "value2")]);
counter.add(1, &[opentelemetry::KeyValue::new("key", "value3")]);
let mut resource_metrics = ResourceMetrics::default();
reader.collect(&mut resource_metrics).unwrap();
// Only the export step goes through the wrapper under test.
let inner_exporter = InMemoryMetricExporter::default();
let exporter = OverflowMetricExporter::new_push(inner_exporter);
exporter.export(&resource_metrics).await.unwrap();
assert_counter!(
"apollo.router.telemetry.metrics.cardinality_overflow",
1,
"metric.name" = "test.overflow.metric"
);
}
.with_metrics()
.await
}
// Pull path: the wrapper itself is registered as the provider's reader (a clone is kept for
// collecting), so the overflow counter is expected to be bumped by `collect`.
#[tokio::test]
async fn pull_reader_increments_counter_on_overflow() {
async {
let inner_reader = ClonableManualReader::default();
let reader = OverflowMetricExporter::new_pull(inner_reader);
// `with_reader` takes ownership of `reader`; this clone is used to trigger collection.
let reader_for_collect = reader.clone();
// The view limits only "test.pull.overflow.metric" to a cardinality of 2.
let provider = SdkMeterProvider::builder()
.with_reader(reader)
.with_resource(Resource::builder_empty().build())
.with_view(|instrument: &opentelemetry_sdk::metrics::Instrument| {
if instrument.name() == "test.pull.overflow.metric" {
Some(
Stream::builder()
.with_cardinality_limit(2)
.build()
.expect("valid stream"),
)
} else {
None
}
})
.build();
let meter = provider.meter("test");
let counter = meter.u64_counter("test.pull.overflow.metric").build();
// Three distinct attribute sets are recorded against a limit of 2; the assertion below
// relies on this yielding an overflow data point.
counter.add(1, &[opentelemetry::KeyValue::new("key", "value1")]);
counter.add(1, &[opentelemetry::KeyValue::new("key", "value2")]);
counter.add(1, &[opentelemetry::KeyValue::new("key", "value3")]);
let mut resource_metrics = ResourceMetrics::default();
reader_for_collect.collect(&mut resource_metrics).unwrap();
assert_counter!(
"apollo.router.telemetry.metrics.cardinality_overflow",
1,
"metric.name" = "test.pull.overflow.metric"
);
}
.with_metrics()
.await
}
}