use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue, any_value::Value};
use opentelemetry_proto::tonic::metrics::v1::MetricsData;
use tokio::sync::mpsc;
/// Wraps `md` in an [`ExportMetricsServiceRequest`], stamping every resource
/// with a `client_instance_id` string attribute.
///
/// For each entry in `md.resource_metrics`:
/// - a default `Resource` is created if the entry has none;
/// - any attribute already stored under `client_instance_id` is removed, so
///   the resource ends up with exactly one such attribute and the value passed
///   in here always wins over whatever the payload carried;
/// - the new attribute is appended after the remaining ones, whose relative
///   order is preserved.
///
/// The returned request owns the (modified) `resource_metrics` of `md`.
pub(crate) fn build_export_request(
    mut md: MetricsData,
    client_instance_id: &str,
) -> ExportMetricsServiceRequest {
    const INSTANCE_ID_KEY: &str = "client_instance_id";

    for rm in &mut md.resource_metrics {
        let resource = rm.resource.get_or_insert_with(Default::default);

        // OTLP requires attribute keys to be unique within a resource. Drop a
        // pre-existing entry instead of emitting a duplicate key that the
        // receiver could resolve in favour of the payload-supplied value.
        resource.attributes.retain(|kv| kv.key != INSTANCE_ID_KEY);

        resource.attributes.push(KeyValue {
            key: INSTANCE_ID_KEY.to_string(),
            value: Some(AnyValue {
                value: Some(Value::StringValue(client_instance_id.to_string())),
            }),
            ..Default::default()
        });
    }

    ExportMetricsServiceRequest {
        resource_metrics: md.resource_metrics,
    }
}
/// Handle used to hand client metrics to a background OTLP forwarding task.
///
/// Built either with `disabled()` (no task, every `forward` call is a no-op)
/// or with `spawn(..)` (a Tokio task that POSTs each queued payload).
pub(crate) struct OtlpForwarder {
/// Sending half of the bounded queue feeding the background task; each item
/// is a metrics payload paired with the client instance id to stamp on it.
/// `None` when forwarding is disabled.
tx: Option<mpsc::Sender<(MetricsData, String)>>,
}
impl OtlpForwarder {
pub(crate) fn disabled() -> Self {
Self { tx: None }
}
pub(crate) fn spawn(endpoint: String, capacity: usize) -> Self {
let (tx, mut rx) = mpsc::channel::<(MetricsData, String)>(capacity);
tokio::spawn(async move {
let client = reqwest::Client::new();
let url = format!("{}/v1/metrics", endpoint.trim_end_matches('/'));
while let Some((md, instance)) = rx.recv().await {
let req = build_export_request(md, &instance);
let body = {
use prost::Message;
req.encode_to_vec()
};
if let Err(e) = client
.post(&url)
.header("content-type", "application/x-protobuf")
.body(body)
.send()
.await
{
tracing::debug!(error = %e, "client-metrics OTLP forward failed");
}
}
});
Self { tx: Some(tx) }
}
#[allow(dead_code)] pub(crate) fn is_enabled(&self) -> bool {
self.tx.is_some()
}
pub(crate) fn forward(&self, md: MetricsData, client_instance_id: &str) {
if let Some(tx) = &self.tx
&& let Err(e) = tx.try_send((md, client_instance_id.to_string()))
{
tracing::debug!(error = %e, "client-metrics OTLP forward queue full; dropping");
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use opentelemetry_proto::tonic::common::v1::any_value::Value;
    use opentelemetry_proto::tonic::metrics::v1::{MetricsData, ResourceMetrics};

    /// A resource-less entry gets a resource created for it, and that resource
    /// carries the instance id passed in (key *and* value are checked).
    #[test]
    fn wraps_and_injects_instance_id() {
        let md = MetricsData {
            resource_metrics: vec![ResourceMetrics::default()],
        };

        let req = build_export_request(md, "abc-123");

        assert_eq!(req.resource_metrics.len(), 1);
        let res = req.resource_metrics[0].resource.as_ref().expect("resource");
        let injected = res
            .attributes
            .iter()
            .find(|kv| kv.key == "client_instance_id")
            .expect("client_instance_id attribute");
        let value = injected.value.as_ref().and_then(|v| v.value.as_ref());
        assert!(matches!(value, Some(Value::StringValue(s)) if s.as_str() == "abc-123"));
    }

    /// With no resource metrics there is nothing to stamp; the request is empty.
    #[test]
    fn empty_metrics_data_yields_empty_request() {
        let req = build_export_request(MetricsData::default(), "abc-123");
        assert!(req.resource_metrics.is_empty());
    }

    /// A disabled forwarder reports itself as such and accepts `forward` calls
    /// without needing a runtime.
    #[test]
    fn disabled_forwarder_is_noop() {
        let f = OtlpForwarder::disabled();
        assert!(!f.is_enabled());
        f.forward(MetricsData::default(), "x");
    }
}