use crate::error::{OtlpError, OtlpExportError};
use crate::otlp::metrics_data::*;
use opentelemetry::KeyValue;
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
use opentelemetry_proto::tonic::common::v1::KeyValue as ProtoKeyValue;
use opentelemetry_proto::tonic::metrics::v1::{
HistogramDataPoint as ProtoHistogramDataPoint, Metric, NumberDataPoint as ProtoNumberDataPoint,
};
use opentelemetry_sdk::metrics::data::ResourceMetrics;
use tracing::warn;
/// Converts SDK `ResourceMetrics` into the internal representation.
///
/// When `sdk_extraction_enabled` is `false`, or when the round-trip through
/// the OTLP exporter fails, an empty placeholder is returned (not an error)
/// so callers can continue with degraded data.
///
/// # Errors
/// Returns `OtlpError::Export` only when the exporter round-trip succeeded
/// but the captured protobuf could not be converted.
pub async fn extract_metrics_from_resource_metrics(
    metrics: &ResourceMetrics,
    sdk_extraction_enabled: bool,
) -> Result<InternalResourceMetrics, OtlpError> {
    // Shared fallback used both when extraction is disabled and when the
    // exporter round-trip fails (this struct literal was duplicated inline).
    fn empty_result() -> InternalResourceMetrics {
        InternalResourceMetrics {
            resource: InternalResource {
                attributes: vec![],
                dropped_attributes_count: 0,
            },
            scope_metrics: vec![],
            schema_url: String::new(),
        }
    }
    if !sdk_extraction_enabled {
        return Ok(empty_result());
    }
    match convert_via_otlp_exporter(metrics).await {
        Ok(protobuf) => extract_from_protobuf(&protobuf).map_err(|e| {
            OtlpError::Export(OtlpExportError::FormatConversionError(format!(
                "Failed to extract from protobuf: {}",
                e
            )))
        }),
        Err(e) => {
            // Degrade gracefully: log and return the empty placeholder
            // rather than propagating the conversion failure.
            warn!(
                error = %e,
                "Failed to extract metrics via opentelemetry-otlp. \
                Consider using gRPC ingestion path which preserves protobuf format."
            );
            Ok(empty_result())
        }
    }
}
/// Round-trips SDK metrics through an `opentelemetry-otlp` exporter and a
/// temporary in-process OTLP server to recover the protobuf request form.
///
/// # Errors
/// Fails if the temporary server cannot be started, the exporter cannot be
/// built, the export fails, or no request is captured within 5 seconds.
async fn convert_via_otlp_exporter(
    metrics: &ResourceMetrics,
) -> Result<ExportMetricsServiceRequest, anyhow::Error> {
    use crate::otlp::server::create_temporary_metrics_server;
    use opentelemetry_otlp::WithExportConfig;
    use opentelemetry_sdk::metrics::exporter::PushMetricExporter;
    use tokio::sync::oneshot;
    use tokio::time::{Duration, timeout};
    type TonicMetricExporterBuilder =
        opentelemetry_otlp::MetricExporterBuilder<opentelemetry_otlp::TonicExporterBuilderSet>;
    let (tx, rx) = oneshot::channel();
    let (server_handle, addr_str) = create_temporary_metrics_server(tx)
        .await
        .map_err(|e| anyhow::anyhow!("Failed to create temporary server: {}", e))?;
    // Run the fallible work in an inner block so the temporary server task
    // is aborted on *every* exit path (the original leaked it on error).
    let result = async {
        // Give the freshly spawned server a moment to start accepting.
        // NOTE(review): a fixed sleep is racy; a readiness signal would be
        // more robust — confirm whether the server exposes one.
        tokio::time::sleep(Duration::from_millis(100)).await;
        let builder: TonicMetricExporterBuilder =
            opentelemetry_otlp::MetricExporterBuilder::default();
        let exporter = builder
            .with_endpoint(&addr_str)
            .build()
            .map_err(|e| anyhow::anyhow!("Failed to build metrics exporter: {}", e))?;
        exporter
            .export(metrics)
            .await
            .map_err(|e| anyhow::anyhow!("Failed to export metrics: {}", e))?;
        // The server forwards the captured request over the oneshot channel.
        timeout(Duration::from_secs(5), rx)
            .await
            .map_err(|_| anyhow::anyhow!("Timeout waiting for captured request"))?
            .map_err(|_| anyhow::anyhow!("Failed to receive captured request"))
    }
    .await;
    server_handle.abort();
    result
}
pub fn extract_from_protobuf(
request: &ExportMetricsServiceRequest,
) -> Result<InternalResourceMetrics, anyhow::Error> {
if request.resource_metrics.is_empty() {
return Err(anyhow::anyhow!("Empty resource metrics"));
}
let proto_rm = &request.resource_metrics[0];
let resource = if let Some(ref proto_resource) = proto_rm.resource {
InternalResource {
attributes: proto_resource
.attributes
.iter()
.filter_map(proto_key_value_to_sdk)
.collect(),
dropped_attributes_count: proto_resource.dropped_attributes_count,
}
} else {
InternalResource {
attributes: vec![],
dropped_attributes_count: 0,
}
};
let scope_metrics = proto_rm
.scope_metrics
.iter()
.map(|proto_sm| {
let scope = if let Some(ref proto_scope) = proto_sm.scope {
InternalInstrumentationScope {
name: proto_scope.name.clone(),
version: if proto_scope.version.is_empty() {
None
} else {
Some(proto_scope.version.clone())
},
attributes: proto_scope
.attributes
.iter()
.filter_map(proto_key_value_to_sdk)
.collect(),
dropped_attributes_count: proto_scope.dropped_attributes_count,
}
} else {
InternalInstrumentationScope {
name: "unknown".to_string(),
version: None,
attributes: vec![],
dropped_attributes_count: 0,
}
};
let metrics = proto_sm
.metrics
.iter()
.filter_map(proto_metric_to_internal)
.collect();
InternalScopeMetrics {
scope,
metrics,
schema_url: proto_sm.schema_url.clone(),
}
})
.collect();
Ok(InternalResourceMetrics {
resource,
scope_metrics,
schema_url: proto_rm.schema_url.clone(),
})
}
/// Converts a protobuf `KeyValue` into an SDK `KeyValue`.
///
/// Returns `None` for absent values and for composite/binary values
/// (arrays, kv-lists, bytes) that have no scalar SDK equivalent.
fn proto_key_value_to_sdk(kv: &ProtoKeyValue) -> Option<KeyValue> {
    use opentelemetry_proto::tonic::common::v1::any_value::Value;
    // Both the AnyValue wrapper and its inner value may be missing.
    let raw = kv.value.as_ref()?.value.as_ref()?;
    let otel_value = match raw {
        Value::StringValue(s) => opentelemetry::Value::String(s.clone().into()),
        Value::IntValue(i) => opentelemetry::Value::I64(*i),
        Value::DoubleValue(d) => opentelemetry::Value::F64(*d),
        Value::BoolValue(b) => opentelemetry::Value::Bool(*b),
        // Unsupported composite/binary kinds: drop the attribute.
        Value::ArrayValue(_) | Value::KvlistValue(_) | Value::BytesValue(_) => return None,
    };
    Some(KeyValue::new(kv.key.clone(), otel_value))
}
/// Converts a protobuf `Metric` into the internal representation.
///
/// Returns `None` for metrics without data or with unsupported data kinds
/// (anything other than gauge, sum, or histogram).
fn proto_metric_to_internal(proto_metric: &Metric) -> Option<InternalMetric> {
    use opentelemetry_proto::tonic::metrics::v1::metric::Data;
    // Proto encodes "no description"/"no unit" as the empty string.
    let non_empty = |s: &str| {
        if s.is_empty() {
            None
        } else {
            Some(s.to_string())
        }
    };
    let data = match proto_metric.data.as_ref()? {
        Data::Gauge(gauge) => InternalMetricData::Gauge(InternalGauge {
            data_points: gauge
                .data_points
                .iter()
                .filter_map(proto_number_data_point_to_internal)
                .collect(),
        }),
        Data::Sum(sum) => InternalMetricData::Sum(InternalSum {
            data_points: sum
                .data_points
                .iter()
                .filter_map(proto_number_data_point_to_internal)
                .collect(),
            aggregation_temporality: sum.aggregation_temporality,
            is_monotonic: sum.is_monotonic,
        }),
        Data::Histogram(hist) => InternalMetricData::Histogram(InternalHistogram {
            data_points: hist
                .data_points
                .iter()
                .filter_map(proto_histogram_data_point_to_internal)
                .collect(),
            aggregation_temporality: hist.aggregation_temporality,
        }),
        // Other data kinds (e.g. exponential histogram, summary) are skipped.
        _ => return None,
    };
    Some(InternalMetric {
        name: proto_metric.name.clone(),
        description: non_empty(&proto_metric.description),
        unit: non_empty(&proto_metric.unit),
        data,
    })
}
/// Converts a protobuf number data point into the internal representation.
///
/// Returns `None` when the point carries no value.
fn proto_number_data_point_to_internal(
    dp: &ProtoNumberDataPoint,
) -> Option<InternalNumberDataPoint> {
    use opentelemetry_proto::tonic::metrics::v1::number_data_point::Value;
    let value = match dp.value.as_ref()? {
        Value::AsInt(i) => InternalNumberValue::AsInt(*i),
        Value::AsDouble(d) => InternalNumberValue::AsDouble(*d),
    };
    // Unconvertible attributes are silently dropped by the filter.
    let attributes = dp
        .attributes
        .iter()
        .filter_map(proto_key_value_to_sdk)
        .collect();
    // Proto encodes "no start time" as zero.
    let start_time_unix_nano = (dp.start_time_unix_nano != 0).then_some(dp.start_time_unix_nano);
    Some(InternalNumberDataPoint {
        attributes,
        start_time_unix_nano,
        time_unix_nano: dp.time_unix_nano,
        value,
    })
}
/// Converts a protobuf histogram data point into the internal representation.
fn proto_histogram_data_point_to_internal(
    dp: &ProtoHistogramDataPoint,
) -> Option<InternalHistogramDataPoint> {
    // Unconvertible attributes are silently dropped by the filter.
    let attributes: Vec<_> = dp
        .attributes
        .iter()
        .filter_map(proto_key_value_to_sdk)
        .collect();
    // Proto encodes "no start time" as zero.
    let start_time_unix_nano = (dp.start_time_unix_nano != 0).then_some(dp.start_time_unix_nano);
    Some(InternalHistogramDataPoint {
        attributes,
        start_time_unix_nano,
        time_unix_nano: dp.time_unix_nano,
        count: dp.count,
        sum: dp.sum,
        bucket_counts: dp.bucket_counts.clone(),
        explicit_bounds: dp.explicit_bounds.clone(),
        min: dp.min,
        max: dp.max,
    })
}