use std::{sync::Arc, time::Duration};
use opentelemetry_proto::tonic::{
collector::{
logs::v1::{ExportLogsServiceRequest, logs_service_client::LogsServiceClient},
metrics::v1::{ExportMetricsServiceRequest, metrics_service_client::MetricsServiceClient},
trace::v1::{ExportTraceServiceRequest, trace_service_client::TraceServiceClient},
},
common::v1::{AnyValue, KeyValue, any_value::Value as AnyValueKind},
logs::v1::{LogRecord as OtlpLogRecord, ResourceLogs, ScopeLogs, SeverityNumber},
metrics::v1::{
AggregationTemporality, Gauge, Histogram, HistogramDataPoint, Metric, NumberDataPoint,
ResourceMetrics, ScopeMetrics, Sum, metric::Data as MetricData,
number_data_point::Value as NumberValue,
},
resource::v1::Resource,
trace::v1::{
ResourceSpans, ScopeSpans, Span as OtlpSpan, Status, span::SpanKind, status::StatusCode,
},
};
use parking_lot::RwLock;
use tonic::{
Request,
metadata::{AsciiMetadataValue, MetadataKey},
transport::Channel,
};
use crate::{
OtlpError, env_config::OtlpEndpoint, logs::OtlpLogPayload, mapping::SpanRecord,
metrics::OtlpMetricPayload, sink::OtlpExporter, traces::OtlpTracePayload,
};
/// OTLP exporter that sends logs, metrics and traces over gRPC.
///
/// The exporter owns its own Tokio runtime and blocks the calling thread on it
/// for every export, so the public API is synchronous.
pub struct GrpcOtlpExporter {
/// Runtime used to drive the async gRPC clients from synchronous callers.
runtime: Arc<tokio::runtime::Runtime>,
/// Per-signal gRPC clients, shared behind an `Arc`.
inner: Arc<GrpcInner>,
/// Deadline applied to every export request; the same value is used as the
/// channel and connect timeout when the connection is established.
timeout: Duration,
}
/// The three OTLP collector clients, one per signal.
///
/// Each client is built from a clone of the same channel and is cloned out of
/// its lock for every export call.
struct GrpcInner {
/// Client for the OTLP logs collector service.
logs: RwLock<LogsServiceClient<Channel>>,
/// Client for the OTLP metrics collector service.
metrics: RwLock<MetricsServiceClient<Channel>>,
/// Client for the OTLP trace collector service.
traces: RwLock<TraceServiceClient<Channel>>,
}
impl std::fmt::Debug for GrpcOtlpExporter {
    /// Prints the exporter with its timeout in milliseconds as the only field;
    /// the runtime and the gRPC clients are left out of the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let timeout_ms = self.timeout.as_millis();
        let mut out = f.debug_struct("GrpcOtlpExporter");
        out.field("timeout_ms", &timeout_ms);
        out.finish()
    }
}
impl GrpcOtlpExporter {
    /// Builds a dedicated Tokio runtime (one worker thread, named
    /// `obs-otlp-grpc`) and connects a single gRPC channel to `endpoint.url`.
    /// The channel is cloned into the logs, metrics and traces clients.
    ///
    /// `endpoint.timeout_ms` is floored at 1 ms and used as the channel
    /// timeout, the connect timeout and the per-request deadline.
    ///
    /// # Errors
    ///
    /// Returns [`OtlpError::Transport`] when the runtime cannot be built, the
    /// URL is rejected by tonic, or the connection attempt fails.
    pub fn connect(endpoint: &OtlpEndpoint) -> Result<Self, OtlpError> {
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(1)
            .enable_all()
            .thread_name("obs-otlp-grpc")
            .build()
            .map_err(|e| OtlpError::Transport(format!("runtime build: {e}")))?;
        let runtime = Arc::new(runtime);
        let url = endpoint.url.clone();
        let timeout = Duration::from_millis(endpoint.timeout_ms.max(1));
        // The connection is established inside the runtime so tonic has a
        // reactor and timer available.
        let channel = runtime.block_on(async move {
            tonic::transport::Endpoint::from_shared(url)
                .map_err(|e| OtlpError::Transport(format!("endpoint: {e}")))?
                .timeout(timeout)
                .connect_timeout(timeout)
                .keep_alive_while_idle(true)
                .connect()
                .await
                .map_err(|e| OtlpError::Transport(format!("connect: {e}")))
        })?;
        let inner = Arc::new(GrpcInner {
            logs: RwLock::new(LogsServiceClient::new(channel.clone())),
            metrics: RwLock::new(MetricsServiceClient::new(channel.clone())),
            traces: RwLock::new(TraceServiceClient::new(channel)),
        });
        Ok(Self {
            runtime,
            inner,
            timeout,
        })
    }

    /// Runs `fut` to completion on the exporter's runtime, blocking the
    /// calling thread until it finishes.
    ///
    /// `Runtime::block_on` enters the runtime context on its own, so no
    /// separate `enter()` guard is taken here.
    fn block_on<T, F>(&self, fut: F) -> T
    where
        F: std::future::Future<Output = T>,
    {
        self.runtime.block_on(fut)
    }
}
impl OtlpExporter for GrpcOtlpExporter {
    /// Converts `payload` into an OTLP logs request and sends it, blocking
    /// until the collector answers or the request deadline expires.
    ///
    /// # Errors
    ///
    /// Returns [`OtlpError::Transport`] (prefixed with `logs:`) when the RPC
    /// fails.
    fn export_logs(&self, payload: &OtlpLogPayload) -> Result<(), OtlpError> {
        let request = build_logs_request(payload);
        let inner = Arc::clone(&self.inner);
        let timeout = self.timeout;
        self.block_on(async move {
            let mut req = Request::new(request);
            req.set_timeout(timeout);
            attach_traceparent(&mut req);
            // Cloning only needs shared access; the guard is a temporary that
            // is released at the end of this statement, before the await.
            let mut client = inner.logs.read().clone();
            client
                .export(req)
                .await
                .map(|_| ())
                .map_err(|e| OtlpError::Transport(format!("logs: {e}")))
        })
    }

    /// Converts `payload` into an OTLP metrics request and sends it, blocking
    /// until the collector answers or the request deadline expires.
    ///
    /// # Errors
    ///
    /// Returns [`OtlpError::Transport`] (prefixed with `metrics:`) when the
    /// RPC fails.
    fn export_metrics(&self, payload: &OtlpMetricPayload) -> Result<(), OtlpError> {
        let request = build_metrics_request(payload);
        let inner = Arc::clone(&self.inner);
        let timeout = self.timeout;
        self.block_on(async move {
            let mut req = Request::new(request);
            req.set_timeout(timeout);
            attach_traceparent(&mut req);
            // Shared lock: see `export_logs`; the guard does not outlive this line.
            let mut client = inner.metrics.read().clone();
            client
                .export(req)
                .await
                .map(|_| ())
                .map_err(|e| OtlpError::Transport(format!("metrics: {e}")))
        })
    }

    /// Converts `payload` into an OTLP trace request and sends it, blocking
    /// until the collector answers or the request deadline expires.
    ///
    /// # Errors
    ///
    /// Returns [`OtlpError::Transport`] (prefixed with `traces:`) when the RPC
    /// fails.
    fn export_traces(&self, payload: &OtlpTracePayload) -> Result<(), OtlpError> {
        let request = build_traces_request(payload);
        let inner = Arc::clone(&self.inner);
        let timeout = self.timeout;
        self.block_on(async move {
            let mut req = Request::new(request);
            req.set_timeout(timeout);
            attach_traceparent(&mut req);
            // Shared lock: see `export_logs`; the guard does not outlive this line.
            let mut client = inner.traces.read().clone();
            client
                .export(req)
                .await
                .map(|_| ())
                .map_err(|e| OtlpError::Transport(format!("traces: {e}")))
        })
    }
}
/// Adds a `traceparent` metadata entry of the form
/// `00-<trace_id>-<span_id>-<flags>` to `req` when a correlation is active.
///
/// The flags are `01` when the active scope reports itself as sampled (or
/// reports nothing) and `00` otherwise. The request is left untouched when
/// there is no active correlation or the value is not valid ASCII metadata.
fn attach_traceparent<T>(req: &mut Request<T>) {
    let Some((trace_id, span_id)) = obs_core::scope::active_correlation() else {
        return;
    };
    let sampled = obs_core::scope::active_sampled().unwrap_or(true);
    let flags = if sampled { "01" } else { "00" };
    let header = format!("00-{trace_id}-{span_id}-{flags}");
    let key = MetadataKey::from_bytes(b"traceparent");
    let val = AsciiMetadataValue::try_from(header.as_str());
    // Best effort: silently skip the header if either part fails validation.
    if let (Ok(key), Ok(val)) = (key, val) {
        req.metadata_mut().insert(key, val);
    }
}
/// Builds the OTLP `Resource` for a request: every resource attribute becomes
/// a string-valued key/value pair, with no dropped attributes and no entity
/// references.
fn build_resource(attrs: &crate::logs::ResourceMessage) -> Resource {
    Resource {
        attributes: attrs
            .attributes
            .iter()
            .map(|(name, text)| kv_str(name, text))
            .collect(),
        dropped_attributes_count: 0,
        entity_refs: Vec::new(),
    }
}
/// Creates an OTLP `KeyValue` whose value is the string `value`.
fn kv_str(key: &str, value: &str) -> KeyValue {
    let text = AnyValueKind::StringValue(String::from(value));
    let wrapped = AnyValue { value: Some(text) };
    KeyValue {
        key: String::from(key),
        value: Some(wrapped),
    }
}
/// Decodes a hex trace id into exactly 16 bytes (see `decode_hex` for how
/// short, long or invalid input is handled).
fn hex_to_trace_bytes(hex: &str) -> Vec<u8> {
    const TRACE_ID_BYTES: usize = 16;
    decode_hex(hex, TRACE_ID_BYTES)
}
/// Decodes a hex span id into exactly 8 bytes (see `decode_hex` for how
/// short, long or invalid input is handled).
fn hex_to_span_bytes(hex: &str) -> Vec<u8> {
    const SPAN_ID_BYTES: usize = 8;
    decode_hex(hex, SPAN_ID_BYTES)
}
/// Decodes `hex` into a buffer of exactly `expected_len` bytes.
///
/// * Input is read two characters at a time; a trailing unpaired character is
///   ignored.
/// * Decoding stops once `expected_len` bytes were produced, so longer input
///   is truncated and anything past that point is never inspected.
/// * Shorter (or empty) input is padded with zero bytes on the right.
/// * Any non-hex character among the inspected pairs yields an all-zero buffer.
fn decode_hex(hex: &str, expected_len: usize) -> Vec<u8> {
    let mut decoded = Vec::with_capacity(expected_len);
    for pair in hex.as_bytes().chunks_exact(2).take(expected_len) {
        // `chunks_exact(2)` only yields two-byte slices; the pattern keeps the
        // access free of indexing.
        let &[hi_b, lo_b] = pair else {
            return vec![0u8; expected_len];
        };
        let (Some(hi), Some(lo)) = (nibble(hi_b), nibble(lo_b)) else {
            return vec![0u8; expected_len];
        };
        decoded.push((hi << 4) | lo);
    }
    decoded.resize(expected_len, 0);
    decoded
}
/// Returns the value (0–15) of an ASCII hex digit, accepting both upper and
/// lower case, or `None` for any other byte.
fn nibble(b: u8) -> Option<u8> {
    let digit = char::from(b).to_digit(16)?;
    // A base-16 digit is always below 16, so the conversion cannot fail.
    u8::try_from(digit).ok()
}
/// Converts a log payload into an OTLP `ExportLogsServiceRequest` holding one
/// resource with one (unnamed) scope.
///
/// Per record: the body is sent as a bytes value and omitted when empty,
/// attributes are sent as string key/values, trace and span ids are decoded
/// from hex, and `event_name` is taken from the `event.name` attribute (empty
/// when absent). The resource schema URL is set on both the resource and the
/// scope entry.
fn build_logs_request(payload: &OtlpLogPayload) -> ExportLogsServiceRequest {
    let resource = build_resource(&payload.resource);
    let schema_url = payload.resource.schema_url.clone();
    let log_records: Vec<OtlpLogRecord> = payload
        .records
        .iter()
        .map(|rec| {
            let body = (!rec.body_bytes.is_empty()).then(|| AnyValue {
                value: Some(AnyValueKind::BytesValue(rec.body_bytes.clone())),
            });
            let event_name = rec
                .attributes
                .get("event.name")
                .cloned()
                .unwrap_or_default();
            OtlpLogRecord {
                time_unix_nano: rec.time_unix_nano,
                observed_time_unix_nano: rec.observed_time_unix_nano,
                severity_number: severity_to_proto(rec.severity_number),
                severity_text: rec.severity_text.clone(),
                body,
                attributes: rec
                    .attributes
                    .iter()
                    .map(|(name, text)| kv_str(name, text))
                    .collect(),
                dropped_attributes_count: 0,
                flags: 0,
                trace_id: hex_to_trace_bytes(&rec.trace_id),
                span_id: hex_to_span_bytes(&rec.span_id),
                event_name,
            }
        })
        .collect();
    let scope = ScopeLogs {
        scope: None,
        log_records,
        schema_url: schema_url.clone(),
    };
    ExportLogsServiceRequest {
        resource_logs: vec![ResourceLogs {
            resource: Some(resource),
            scope_logs: vec![scope],
            schema_url,
        }],
    }
}
/// Converts a metric payload into an OTLP `ExportMetricsServiceRequest`
/// holding one resource with one (unnamed) scope; every point becomes its own
/// `Metric`. The resource schema URL is set on both the resource and the
/// scope entry.
fn build_metrics_request(payload: &OtlpMetricPayload) -> ExportMetricsServiceRequest {
    let resource = build_resource(&payload.resource);
    let schema_url = payload.resource.schema_url.clone();
    let scope = ScopeMetrics {
        scope: None,
        metrics: payload.points.iter().map(metric_point_to_otlp).collect(),
        schema_url: schema_url.clone(),
    };
    let resource_entry = ResourceMetrics {
        resource: Some(resource),
        scope_metrics: vec![scope],
        schema_url,
    };
    ExportMetricsServiceRequest {
        resource_metrics: vec![resource_entry],
    }
}
/// Wraps a single metric point into an OTLP `Metric` with one data point.
///
/// `kind` selects the shape: `"gauge"` becomes a gauge, `"histogram"` a
/// delta histogram, and every other value a monotonic delta sum. The metric
/// carries the instrument name and unit, with an empty description.
fn metric_point_to_otlp(p: &crate::mapping::MetricPoint) -> Metric {
    let delta = AggregationTemporality::Delta as i32;
    let data = if p.kind == "gauge" {
        MetricData::Gauge(Gauge {
            data_points: vec![number_point(p)],
        })
    } else if p.kind == "histogram" {
        MetricData::Histogram(Histogram {
            data_points: vec![histogram_point(p)],
            aggregation_temporality: delta,
        })
    } else {
        // Anything that is not explicitly a gauge or histogram is exported as
        // a counter-style sum.
        MetricData::Sum(Sum {
            data_points: vec![number_point(p)],
            aggregation_temporality: delta,
            is_monotonic: true,
        })
    };
    Metric {
        name: p.instrument.clone(),
        description: String::new(),
        unit: p.unit.clone(),
        metadata: Vec::new(),
        data: Some(data),
    }
}
/// Builds the `NumberDataPoint` used for gauges and sums: string attributes
/// plus the point's numeric value.
///
/// Both timestamps are written as 0 and no exemplars or flags are set.
// NOTE(review): no timestamp is taken from the point here — confirm that the
// receiving collector accepts data points with `time_unix_nano == 0`.
fn number_point(p: &crate::mapping::MetricPoint) -> NumberDataPoint {
    let attributes = p
        .attributes
        .iter()
        .map(|(name, text)| kv_str(name, text))
        .collect();
    let value = metric_number_value(p);
    NumberDataPoint {
        attributes,
        start_time_unix_nano: 0,
        time_unix_nano: 0,
        exemplars: Vec::new(),
        flags: 0,
        value,
    }
}
/// Picks the numeric value of a point; the result is always `Some`.
///
/// A floating-point value wins when present. Otherwise the unsigned value
/// (0 when missing) is sent as a signed integer, saturating at `i64::MAX`
/// when it does not fit.
fn metric_number_value(p: &crate::mapping::MetricPoint) -> Option<NumberValue> {
    let value = match p.value_f64 {
        Some(double) => NumberValue::AsDouble(double),
        None => {
            let raw = p.value_u64.unwrap_or(0);
            NumberValue::AsInt(i64::try_from(raw).unwrap_or(i64::MAX))
        }
    };
    Some(value)
}
/// Builds a histogram data point that represents exactly one observation.
///
/// The observation is the floating-point value when present, otherwise the
/// unsigned value converted to `f64` (0 when both are missing). It is counted
/// in the first bucket whose upper bound is greater than or equal to it, or
/// in the trailing overflow bucket when no bound qualifies. `count` is 1 and
/// `sum`, `min` and `max` all equal the observation; timestamps are 0.
fn histogram_point(p: &crate::mapping::MetricPoint) -> HistogramDataPoint {
    let value = match (p.value_f64, p.value_u64) {
        (Some(double), _) => double,
        (None, raw) => raw.unwrap_or(0) as f64,
    };
    // There is one more bucket than bounds; the extra one catches everything
    // above the last bound.
    let overflow_idx = p.bounds.len();
    let bucket_idx = p
        .bounds
        .iter()
        .position(|upper| value <= *upper)
        .unwrap_or(overflow_idx);
    HistogramDataPoint {
        attributes: p
            .attributes
            .iter()
            .map(|(name, text)| kv_str(name, text))
            .collect(),
        start_time_unix_nano: 0,
        time_unix_nano: 0,
        count: 1,
        sum: Some(value),
        bucket_counts: (0..=overflow_idx)
            .map(|slot| u64::from(slot == bucket_idx))
            .collect(),
        explicit_bounds: p.bounds.clone(),
        exemplars: Vec::new(),
        flags: 0,
        min: Some(value),
        max: Some(value),
    }
}
/// Converts a trace payload into an OTLP `ExportTraceServiceRequest` holding
/// one resource with one (unnamed) scope that contains every span. The
/// resource schema URL is set on both the resource and the scope entry.
fn build_traces_request(payload: &OtlpTracePayload) -> ExportTraceServiceRequest {
    let resource = build_resource(&payload.resource);
    let schema_url = payload.resource.schema_url.clone();
    let scope = ScopeSpans {
        scope: None,
        spans: payload.spans.iter().map(span_record_to_otlp).collect(),
        schema_url: schema_url.clone(),
    };
    let resource_entry = ResourceSpans {
        resource: Some(resource),
        scope_spans: vec![scope],
        schema_url,
    };
    ExportTraceServiceRequest {
        resource_spans: vec![resource_entry],
    }
}
/// Converts a recorded span into its OTLP representation.
///
/// Ids are decoded from hex, kind and status code are mapped from their
/// string forms, and attributes (on the span and on each event) are sent as
/// string key/values. Trace state, links, flags, the status message and all
/// dropped-counts are left empty or zero.
fn span_record_to_otlp(s: &SpanRecord) -> OtlpSpan {
    use opentelemetry_proto::tonic::trace::v1::span::Event;

    let events = s
        .events
        .iter()
        .map(|ev| Event {
            time_unix_nano: ev.time_unix_nano,
            name: ev.name.clone(),
            attributes: ev
                .attributes
                .iter()
                .map(|(name, text)| kv_str(name, text))
                .collect(),
            dropped_attributes_count: 0,
        })
        .collect();
    let status = Status {
        message: String::new(),
        code: status_str_to_proto(&s.status_code) as i32,
    };
    OtlpSpan {
        trace_id: hex_to_trace_bytes(&s.trace_id),
        span_id: hex_to_span_bytes(&s.span_id),
        trace_state: String::new(),
        parent_span_id: hex_to_span_bytes(&s.parent_span_id),
        flags: 0,
        name: s.name.clone(),
        kind: kind_str_to_proto(&s.kind) as i32,
        start_time_unix_nano: s.start_time_unix_nano,
        end_time_unix_nano: s.end_time_unix_nano,
        attributes: s
            .attributes
            .iter()
            .map(|(name, text)| kv_str(name, text))
            .collect(),
        dropped_attributes_count: 0,
        events,
        dropped_events_count: 0,
        links: Vec::new(),
        dropped_links_count: 0,
        status: Some(status),
    }
}
/// Returns the severity number unchanged; no range validation is performed.
fn severity_to_proto(otlp_number: i32) -> i32 {
    // The enum value is only touched, never used — presumably to keep the
    // `SeverityNumber` import referenced.
    let _unused_marker = SeverityNumber::Trace;
    otlp_number
}
/// Maps a span-kind name to the OTLP `SpanKind`.
///
/// `server`, `client`, `producer` and `consumer` are recognised regardless of
/// ASCII case (so `"Server"` works as well as `"SERVER"` and `"server"`);
/// every other value, including the empty string, maps to
/// `SpanKind::Internal`.
fn kind_str_to_proto(s: &str) -> SpanKind {
    if s.eq_ignore_ascii_case("server") {
        SpanKind::Server
    } else if s.eq_ignore_ascii_case("client") {
        SpanKind::Client
    } else if s.eq_ignore_ascii_case("producer") {
        SpanKind::Producer
    } else if s.eq_ignore_ascii_case("consumer") {
        SpanKind::Consumer
    } else {
        SpanKind::Internal
    }
}
fn status_str_to_proto(s: &str) -> StatusCode {
match s {
"ERROR" | "error" => StatusCode::Error,
"OK" | "ok" => StatusCode::Ok,
_ => StatusCode::Unset,
}
}
/// Unit tests for the pure conversion helpers (hex decoding and metric
/// mapping); nothing here opens a network connection.
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use opentelemetry_proto::tonic::metrics::v1::metric::Data as MetricData;

    use super::*;
    use crate::mapping::MetricPoint;

    #[test]
    fn test_decode_hex_should_zero_pad_short_input() {
        let out = decode_hex("ab", 4);
        assert_eq!(out, vec![0xab, 0, 0, 0]);
    }

    #[test]
    fn test_decode_hex_should_truncate_long_input() {
        let out = decode_hex("0123456789abcdef0123", 4);
        assert_eq!(out, vec![0x01, 0x23, 0x45, 0x67]);
    }

    #[test]
    fn test_decode_hex_should_return_zeros_on_invalid() {
        let out = decode_hex("nothex!!", 4);
        assert_eq!(out, vec![0, 0, 0, 0]);
    }

    #[test]
    fn test_decode_hex_should_handle_uppercase() {
        let out = decode_hex("DEADBEEF", 4);
        assert_eq!(out, vec![0xDE, 0xAD, 0xBE, 0xEF]);
    }

    #[test]
    fn test_decode_hex_should_return_zeros_on_empty_input() {
        let out = decode_hex("", 4);
        assert_eq!(out, vec![0, 0, 0, 0]);
    }

    #[test]
    fn test_metric_point_to_otlp_should_emit_gauge_double() -> Result<(), &'static str> {
        let metric = metric_point_to_otlp(&MetricPoint {
            instrument: "test.temperature".to_string(),
            unit: "Cel".to_string(),
            kind: "gauge".to_string(),
            attributes: BTreeMap::new(),
            value_u64: None,
            value_f64: Some(12.5),
            bounds: Vec::new(),
        });
        let Some(MetricData::Gauge(gauge)) = metric.data else {
            return Err("expected gauge data");
        };
        let Some(point) = gauge.data_points.first() else {
            return Err("expected gauge data point");
        };
        assert!(matches!(
            &point.value,
            Some(NumberValue::AsDouble(v)) if *v == 12.5
        ));
        Ok(())
    }

    #[test]
    fn test_metric_point_to_otlp_should_default_to_monotonic_sum() -> Result<(), &'static str> {
        // Any kind other than "gauge" / "histogram" is exported as a sum.
        let metric = metric_point_to_otlp(&MetricPoint {
            instrument: "test.requests".to_string(),
            unit: "1".to_string(),
            kind: "counter".to_string(),
            attributes: BTreeMap::new(),
            value_u64: Some(3),
            value_f64: None,
            bounds: Vec::new(),
        });
        let Some(MetricData::Sum(sum)) = metric.data else {
            return Err("expected sum data");
        };
        assert!(sum.is_monotonic);
        let Some(point) = sum.data_points.first() else {
            return Err("expected sum data point");
        };
        assert!(matches!(&point.value, Some(NumberValue::AsInt(3))));
        Ok(())
    }

    #[test]
    fn test_metric_point_to_otlp_should_emit_histogram_with_bounds() -> Result<(), &'static str> {
        let metric = metric_point_to_otlp(&MetricPoint {
            instrument: "test.latency".to_string(),
            unit: "ms".to_string(),
            kind: "histogram".to_string(),
            attributes: BTreeMap::new(),
            value_u64: None,
            value_f64: Some(15.0),
            bounds: vec![10.0, 20.0],
        });
        let Some(MetricData::Histogram(histogram)) = metric.data else {
            return Err("expected histogram data");
        };
        let Some(point) = histogram.data_points.first() else {
            return Err("expected histogram data point");
        };
        assert_eq!(point.explicit_bounds, vec![10.0, 20.0]);
        assert_eq!(point.bucket_counts, vec![0, 1, 0]);
        assert_eq!(point.sum, Some(15.0));
        Ok(())
    }

    #[test]
    fn test_metric_point_to_otlp_should_use_overflow_bucket_above_last_bound()
    -> Result<(), &'static str> {
        let metric = metric_point_to_otlp(&MetricPoint {
            instrument: "test.latency".to_string(),
            unit: "ms".to_string(),
            kind: "histogram".to_string(),
            attributes: BTreeMap::new(),
            value_u64: None,
            value_f64: Some(25.0),
            bounds: vec![10.0, 20.0],
        });
        let Some(MetricData::Histogram(histogram)) = metric.data else {
            return Err("expected histogram data");
        };
        let Some(point) = histogram.data_points.first() else {
            return Err("expected histogram data point");
        };
        assert_eq!(point.bucket_counts, vec![0, 0, 1]);
        assert_eq!(point.count, 1);
        Ok(())
    }
}