use std::collections::HashMap;
use opentelemetry_proto::tonic::collector::trace::v1::{
ExportTraceServiceRequest, ExportTraceServiceResponse,
};
use opentelemetry_proto::tonic::common::v1::{KeyValue, any_value};
use opentelemetry_proto::tonic::trace::v1::Span;
use crate::event::{EventSource, EventType, SpanEvent};
/// Encodes `bytes` as a lowercase hexadecimal string, two digits per byte.
///
/// An empty slice yields an empty string.
fn bytes_to_hex(bytes: &[u8]) -> String {
    const DIGITS: [char; 16] = [
        '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f',
    ];
    let mut encoded = String::with_capacity(bytes.len() * 2);
    // Each byte contributes its high nibble first, then its low nibble.
    encoded.extend(bytes.iter().flat_map(|&byte| {
        [
            DIGITS[usize::from(byte >> 4)],
            DIGITS[usize::from(byte & 0x0f)],
        ]
    }));
    encoded
}
use crate::time::nanos_to_iso8601;
/// Returns the string value of the first attribute whose key equals `key`.
///
/// Yields `None` when no attribute carries that key, or when the first
/// matching attribute has no value or a value that is not a string.
fn get_str_attribute<'a>(attrs: &'a [KeyValue], key: &str) -> Option<&'a str> {
    let entry = attrs.iter().find(|candidate| candidate.key == key)?;
    match entry.value.as_ref()?.value.as_ref()? {
        any_value::Value::StringValue(text) => Some(text.as_str()),
        _ => None,
    }
}
/// Returns the integer value of the first attribute whose key equals `key`.
///
/// Yields `None` when no attribute carries that key, or when the first
/// matching attribute has no value or a value that is not an integer.
fn get_int_attribute(attrs: &[KeyValue], key: &str) -> Option<i64> {
    let entry = attrs.iter().find(|candidate| candidate.key == key)?;
    match entry.value.as_ref()?.value.as_ref()? {
        any_value::Value::IntValue(number) => Some(*number),
        _ => None,
    }
}
/// Builds a lookup table from raw `span_id` bytes to the span itself for
/// every span of one `ResourceSpans`, across all of its scopes.
///
/// When several spans share an id, the one visited last wins. Indexing
/// stops once 100 000 spans have been visited; a warning is logged at that
/// point and any later spans are left out of the index.
fn build_span_index(
    resource_spans: &opentelemetry_proto::tonic::trace::v1::ResourceSpans,
) -> HashMap<&[u8], &Span> {
    let mut by_id: HashMap<&[u8], &Span> = HashMap::new();
    let every_span = resource_spans
        .scope_spans
        .iter()
        .flat_map(|scope| scope.spans.iter());
    for (position, span) in every_span.enumerate() {
        by_id.insert(span.span_id.as_slice(), span);
        // `position` is zero-based, so `position + 1` is the number of spans visited so far.
        if position + 1 >= 100_000 {
            tracing::warn!(
                "OTLP span index capped at 100k entries, parent lookup may be degraded for remaining spans"
            );
            break;
        }
    }
    by_id
}
/// Converts an OTLP trace export request into a flat list of span events.
///
/// For each `ResourceSpans` entry the resource attributes supply the service
/// name (`"unknown"` when absent) and an optional `cloud.region`, kept only
/// if `is_valid_region_id` accepts it. Spans that `convert_span` declines to
/// convert are omitted from the result.
#[must_use]
pub fn convert_otlp_request(request: &ExportTraceServiceRequest) -> Vec<SpanEvent> {
    let mut converted = Vec::new();
    for batch in &request.resource_spans {
        let resource_attrs = batch
            .resource
            .as_ref()
            .map(|resource| resource.attributes.as_slice());
        let service = resource_attrs
            .and_then(|attrs| get_str_attribute(attrs, "service.name"))
            .unwrap_or("unknown")
            .to_string();
        let region = resource_attrs
            .and_then(|attrs| get_str_attribute(attrs, "cloud.region"))
            .filter(|candidate| crate::score::carbon::is_valid_region_id(candidate))
            .map(str::to_string);
        // Parent lookups are scoped to the spans of this resource only.
        let parents = build_span_index(batch);
        converted.extend(
            batch
                .scope_spans
                .iter()
                .flat_map(|scope| scope.spans.iter())
                .filter_map(|span| convert_span(span, &service, region.as_deref(), &parents)),
        );
    }
    converted
}
/// Converts one OTLP span into a [`SpanEvent`], or returns `None` when the
/// span is neither a SQL nor an outbound HTTP span.
///
/// Classification, in priority order:
/// * `db.statement` / `db.query.text` present: SQL event, with the statement
///   as target and `db.system` (default `"sql"`) as operation.
/// * `http.url` / `url.full` present: HTTP-out event, with the URL as target
///   and `http.method` / `http.request.method` (default `"GET"`) as operation.
///
/// `span_index` is used to find the parent span, which provides the source
/// endpoint and method. `resource_cloud_region` takes precedence over a
/// span-level `cloud.region` attribute.
fn convert_span(
    span: &Span,
    service_name: &str,
    resource_cloud_region: Option<&str>,
    span_index: &HashMap<&[u8], &Span>,
) -> Option<SpanEvent> {
    let attrs = &span.attributes;

    let sql_text = get_str_attribute(attrs, "db.statement")
        .or_else(|| get_str_attribute(attrs, "db.query.text"));
    let (event_type, target, operation) = match sql_text {
        Some(statement) => {
            let system = get_str_attribute(attrs, "db.system").unwrap_or("sql");
            (EventType::Sql, statement.to_string(), system.to_string())
        }
        None => {
            // No SQL text and no URL: not an I/O span, nothing to report.
            let url = get_str_attribute(attrs, "http.url")
                .or_else(|| get_str_attribute(attrs, "url.full"))?;
            let verb = get_str_attribute(attrs, "http.method")
                .or_else(|| get_str_attribute(attrs, "http.request.method"))
                .unwrap_or("GET");
            (EventType::HttpOut, url.to_string(), verb.to_string())
        }
    };

    let started = span.start_time_unix_nano;
    let ended = span.end_time_unix_nano;
    let timestamp = nanos_to_iso8601(started);
    if ended < started {
        tracing::trace!("Span has end_time < start_time (clock skew?), duration forced to 0");
    }
    // Saturating subtraction turns an inverted interval into a zero duration.
    let duration_us = ended.saturating_sub(started) / 1000;

    // Status code and response size are only read for HTTP events; values
    // that do not fit the target integer type are dropped.
    let (status_code, response_size_bytes) = if event_type == EventType::HttpOut {
        let status = get_int_attribute(attrs, "http.status_code")
            .or_else(|| get_int_attribute(attrs, "http.response.status_code"))
            .and_then(|code| u16::try_from(code).ok());
        let size = get_int_attribute(attrs, "http.response.body.size")
            .or_else(|| get_int_attribute(attrs, "http.response_content_length"))
            .and_then(|bytes| u64::try_from(bytes).ok());
        (status, size)
    } else {
        (None, None)
    };

    // A root span (empty parent id) and a span whose parent is not in the
    // index both fall back to the same defaults.
    let parent = if span.parent_span_id.is_empty() {
        None
    } else {
        span_index.get(span.parent_span_id.as_slice())
    };
    let source = match parent {
        Some(parent) => EventSource {
            endpoint: get_str_attribute(&parent.attributes, "http.route")
                .or_else(|| get_str_attribute(&parent.attributes, "http.url"))
                .or_else(|| get_str_attribute(&parent.attributes, "url.full"))
                .unwrap_or("unknown")
                .to_string(),
            method: get_str_attribute(&parent.attributes, "code.function")
                .map_or_else(|| parent.name.clone(), ToString::to_string),
        },
        None => EventSource {
            endpoint: "unknown".to_string(),
            method: span.name.clone(),
        },
    };

    let cloud_region = match resource_cloud_region {
        Some(region) => Some(region.to_string()),
        None => get_str_attribute(attrs, "cloud.region")
            .filter(|candidate| crate::score::carbon::is_valid_region_id(candidate))
            .map(str::to_string),
    };

    let mut event = SpanEvent {
        timestamp,
        trace_id: bytes_to_hex(&span.trace_id),
        span_id: bytes_to_hex(&span.span_id),
        parent_span_id: Some(&span.parent_span_id)
            .filter(|id| !id.is_empty())
            .map(|id| bytes_to_hex(id)),
        service: service_name.to_string(),
        cloud_region,
        event_type,
        operation,
        target,
        duration_us,
        source,
        status_code,
        response_size_bytes,
        code_function: get_str_attribute(attrs, "code.function").map(str::to_string),
        code_filepath: get_str_attribute(attrs, "code.filepath").map(str::to_string),
        code_lineno: get_int_attribute(attrs, "code.lineno")
            .and_then(|line| u32::try_from(line).ok()),
        code_namespace: get_str_attribute(attrs, "code.namespace").map(str::to_string),
    };
    crate::event::sanitize_span_event(&mut event);
    Some(event)
}
/// OTLP trace service that hands converted span events to a tokio channel.
pub struct OtlpGrpcService {
/// Sending half of the channel; each message is one batch of span events.
sender: tokio::sync::mpsc::Sender<Vec<SpanEvent>>,
}
impl OtlpGrpcService {
/// Creates a service that stores `sender` for forwarding span event batches.
#[must_use]
pub const fn new(sender: tokio::sync::mpsc::Sender<Vec<SpanEvent>>) -> Self {
Self { sender }
}
}
#[tonic::async_trait]
impl opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceService
    for OtlpGrpcService
{
    /// Handles an OTLP export call: converts the request into span events
    /// and forwards any non-empty batch over the channel.
    ///
    /// # Errors
    ///
    /// Returns an `internal` status when sending on the channel fails.
    async fn export(
        &self,
        request: tonic::Request<ExportTraceServiceRequest>,
    ) -> Result<tonic::Response<ExportTraceServiceResponse>, tonic::Status> {
        let batch = convert_otlp_request(request.get_ref());
        // Nothing is sent when the conversion produced no events.
        if !batch.is_empty() {
            if let Err(_send_error) = self.sender.send(batch).await {
                return Err(tonic::Status::internal("event channel closed"));
            }
        }
        let acknowledgement = ExportTraceServiceResponse {
            partial_success: None,
        };
        Ok(tonic::Response::new(acknowledgement))
    }
}
/// Builds the axum router exposing the OTLP/HTTP trace endpoint.
///
/// The router serves `POST /v1/traces`, decoding the body as a protobuf
/// `ExportTraceServiceRequest`. Request bodies are limited to
/// `max_payload_size` bytes, and converted span events are forwarded over
/// `sender`.
///
/// Responses:
/// * `400 Bad Request` when the body cannot be decoded,
/// * `503 Service Unavailable` when the event channel is closed,
/// * `200 OK` otherwise, including when the request yields no events.
pub fn otlp_http_router(
    sender: tokio::sync::mpsc::Sender<Vec<SpanEvent>>,
    max_payload_size: usize,
) -> axum::Router {
    use axum::{Router, extract::State, http::StatusCode, routing::post};

    /// Decodes one export request, converts it and forwards the events.
    async fn handle_traces(
        State(sender): State<tokio::sync::mpsc::Sender<Vec<SpanEvent>>>,
        body: axum::body::Bytes,
    ) -> StatusCode {
        let request: ExportTraceServiceRequest = match prost::Message::decode(body.as_ref()) {
            Ok(req) => req,
            Err(_) => return StatusCode::BAD_REQUEST,
        };
        let events = convert_otlp_request(&request);
        // `send` waits for capacity when the channel is full, so an error
        // here can only mean that the receiving side has been closed.
        if !events.is_empty() && sender.send(events).await.is_err() {
            tracing::warn!("OTLP HTTP: event channel closed, dropping events");
            return StatusCode::SERVICE_UNAVAILABLE;
        }
        StatusCode::OK
    }

    Router::new()
        .route("/v1/traces", post(handle_traces))
        .with_state(sender)
        .layer(axum::extract::DefaultBodyLimit::max(max_payload_size))
}
#[cfg(test)]
mod tests {
use super::*;
use opentelemetry_proto::tonic::common::v1::AnyValue;
use opentelemetry_proto::tonic::resource::v1::Resource;
use opentelemetry_proto::tonic::trace::v1::{ResourceSpans, ScopeSpans};
/// Test helper: builds a `KeyValue` attribute holding a string value.
fn make_kv(key: &str, value: &str) -> KeyValue {
    let string_value = any_value::Value::StringValue(value.to_owned());
    KeyValue {
        key: key.to_owned(),
        value: Some(AnyValue {
            value: Some(string_value),
        }),
    }
}
/// Test helper: builds a `KeyValue` attribute holding an integer value.
fn make_int_kv(key: &str, value: i64) -> KeyValue {
    let int_value = any_value::Value::IntValue(value);
    KeyValue {
        key: key.to_owned(),
        value: Some(AnyValue {
            value: Some(int_value),
        }),
    }
}
/// Test helper: builds a span named `db.query` carrying `db.statement`
/// (set to `statement`) and `db.system = "postgresql"`.
fn make_sql_span(
    trace_id: &[u8],
    span_id: &[u8],
    parent_span_id: &[u8],
    statement: &str,
    start_ns: u64,
    end_ns: u64,
) -> Span {
    let attributes = vec![
        make_kv("db.statement", statement),
        make_kv("db.system", "postgresql"),
    ];
    Span {
        trace_id: trace_id.to_owned(),
        span_id: span_id.to_owned(),
        parent_span_id: parent_span_id.to_owned(),
        name: String::from("db.query"),
        start_time_unix_nano: start_ns,
        end_time_unix_nano: end_ns,
        attributes,
        ..Default::default()
    }
}
/// Test helper: builds a span named `http.request` carrying `http.url`,
/// `http.method` and an integer `http.status_code`.
// The helper mirrors the span fields one-to-one, hence the long parameter list.
#[allow(clippy::too_many_arguments)]
fn make_http_span(
    trace_id: &[u8],
    span_id: &[u8],
    parent_span_id: &[u8],
    url: &str,
    method: &str,
    status: i64,
    start_ns: u64,
    end_ns: u64,
) -> Span {
    let attributes = vec![
        make_kv("http.url", url),
        make_kv("http.method", method),
        make_int_kv("http.status_code", status),
    ];
    Span {
        trace_id: trace_id.to_owned(),
        span_id: span_id.to_owned(),
        parent_span_id: parent_span_id.to_owned(),
        name: String::from("http.request"),
        start_time_unix_nano: start_ns,
        end_time_unix_nano: end_ns,
        attributes,
        ..Default::default()
    }
}
/// Test helper: builds a root span named `HandleRequest` with the given
/// `http.route` and a fixed `code.function` attribute.
fn make_parent_span(span_id: &[u8], route: &str) -> Span {
    let attributes = vec![
        make_kv("http.route", route),
        make_kv("code.function", "OrderService::create_order"),
    ];
    Span {
        trace_id: vec![1; 16],
        span_id: span_id.to_owned(),
        parent_span_id: Vec::new(),
        name: String::from("HandleRequest"),
        start_time_unix_nano: 0,
        end_time_unix_nano: 1_000_000_000,
        attributes,
        ..Default::default()
    }
}
fn make_request(service: &str, spans: Vec<Span>) -> ExportTraceServiceRequest {
ExportTraceServiceRequest {
resource_spans: vec![ResourceSpans {
resource: Some(Resource {
attributes: vec![make_kv("service.name", service)],
..Default::default()
}),
scope_spans: vec![ScopeSpans {
spans,
..Default::default()
}],
..Default::default()
}],
}
}
/// A request without any resource spans converts to no events.
#[test]
fn empty_request_returns_empty() {
    let request = ExportTraceServiceRequest {
        resource_spans: Vec::new(),
    };
    let events = convert_otlp_request(&request);
    assert!(events.is_empty());
}
/// A SQL span maps to an `EventType::Sql` event with the expected fields.
#[test]
fn sql_span_maps_correctly() {
    let statement = "SELECT * FROM order_item WHERE order_id = 42";
    let start_ns = 1_720_621_921_000_000_000;
    // 1.2 ms after the start.
    let end_ns = 1_720_621_921_001_200_000;
    let request = make_request(
        "order-svc",
        vec![make_sql_span(&[1; 16], &[2; 8], &[], statement, start_ns, end_ns)],
    );
    let events = convert_otlp_request(&request);
    assert_eq!(events.len(), 1);
    let event = &events[0];
    assert_eq!(event.event_type, EventType::Sql);
    assert_eq!(event.operation, "postgresql");
    assert_eq!(event.target, "SELECT * FROM order_item WHERE order_id = 42");
    assert_eq!(event.service, "order-svc");
    assert_eq!(event.duration_us, 1200);
    assert!(event.status_code.is_none());
}
/// An HTTP span maps to an `EventType::HttpOut` event with the expected fields.
#[test]
fn http_span_maps_correctly() {
    let url = "http://user-svc:5000/api/users/123";
    let start_ns = 1_720_621_921_000_000_000;
    // 15 ms after the start.
    let end_ns = 1_720_621_921_015_000_000;
    let request = make_request(
        "order-svc",
        vec![make_http_span(&[1; 16], &[3; 8], &[], url, "GET", 200, start_ns, end_ns)],
    );
    let events = convert_otlp_request(&request);
    assert_eq!(events.len(), 1);
    let event = &events[0];
    assert_eq!(event.event_type, EventType::HttpOut);
    assert_eq!(event.operation, "GET");
    assert_eq!(event.target, "http://user-svc:5000/api/users/123");
    assert_eq!(event.status_code, Some(200));
    assert_eq!(event.duration_us, 15000);
}
/// A span with neither database nor HTTP attributes produces no event.
#[test]
fn non_io_span_skipped() {
    // NOTE(review): the end time below precedes the start time; this has no
    // effect here because the span is rejected before any duration is computed.
    let internal_span = Span {
        trace_id: vec![1; 16],
        span_id: vec![4; 8],
        name: String::from("internal.processing"),
        start_time_unix_nano: 1_720_621_921_000_000_000,
        end_time_unix_nano: 1_720_619_521_000_500_000,
        attributes: vec![make_kv("custom.attr", "value")],
        ..Default::default()
    };
    let request = make_request("order-svc", vec![internal_span]);
    let events = convert_otlp_request(&request);
    assert!(events.is_empty());
}
/// The parent span's `http.route` and `code.function` become the event source.
#[test]
fn parent_span_provides_source_endpoint() {
    let parent_id = [10; 8];
    let handler = make_parent_span(&parent_id, "POST /api/orders/{id}/submit");
    let query = make_sql_span(
        &[1; 16],
        &[20; 8],
        &parent_id,
        "SELECT * FROM order_item WHERE order_id = 42",
        1_720_621_921_000_000_000,
        1_720_621_921_001_200_000,
    );
    let request = make_request("order-svc", vec![handler, query]);
    let events = convert_otlp_request(&request);
    // Only the SQL child is an I/O span; the parent itself yields no event.
    assert_eq!(events.len(), 1);
    let source = &events[0].source;
    assert_eq!(source.endpoint, "POST /api/orders/{id}/submit");
    assert_eq!(source.method, "OrderService::create_order");
}
/// A parent id that is absent from the request falls back to the
/// `"unknown"` endpoint and the span's own name.
#[test]
fn missing_parent_falls_back() {
    let absent_parent = [99; 8];
    let orphan = make_sql_span(
        &[1; 16],
        &[20; 8],
        &absent_parent,
        "SELECT * FROM order_item WHERE order_id = 42",
        1_720_621_921_000_000_000,
        1_720_621_921_001_200_000,
    );
    let request = make_request("order-svc", vec![orphan]);
    let events = convert_otlp_request(&request);
    assert_eq!(events.len(), 1);
    let source = &events[0].source;
    assert_eq!(source.endpoint, "unknown");
    assert_eq!(source.method, "db.query");
}
/// Sixteen sequential bytes encode to 32 lowercase hex digits.
#[test]
fn trace_id_hex_encoding() {
    let sequential: Vec<u8> = (0u8..16).collect();
    let encoded = bytes_to_hex(&sequential);
    assert_eq!(encoded, "000102030405060708090a0b0c0d0e0f");
}
/// A nanosecond timestamp is rendered as ISO 8601 with millisecond precision.
#[test]
fn timestamp_nanos_to_iso8601() {
    let rendered = nanos_to_iso8601(1_720_621_921_123_000_000_u64);
    assert_eq!(rendered, "2024-07-10T14:32:01.123Z");
}
/// Zero nanoseconds renders as the Unix epoch.
#[test]
fn timestamp_epoch_zero() {
    let rendered = nanos_to_iso8601(0);
    assert_eq!(rendered, "1970-01-01T00:00:00.000Z");
}
/// A 2.5 ms interval is reported as 2500 microseconds.
#[test]
fn duration_calculation() {
    let start_ns = 1_000_000_000;
    let end_ns = 1_002_500_000;
    let query = make_sql_span(&[1; 16], &[2; 8], &[], "SELECT 1", start_ns, end_ns);
    let events = convert_otlp_request(&make_request("test", vec![query]));
    assert_eq!(events[0].duration_us, 2500);
}
/// The integer `http.status_code` attribute is surfaced on the event.
#[test]
fn status_code_extraction() {
    let call = make_http_span(
        &[1; 16],
        &[3; 8],
        &[],
        "http://svc/api/health",
        "GET",
        404,
        1_000_000_000,
        1_001_000_000,
    );
    let events = convert_otlp_request(&make_request("test", vec![call]));
    assert_eq!(events[0].status_code, Some(404));
}
/// The resource's `service.name` attribute becomes the event's service.
#[test]
fn service_name_from_resource() {
    let query = make_sql_span(&[1; 16], &[2; 8], &[], "SELECT 1", 0, 1000);
    let events = convert_otlp_request(&make_request("my-service", vec![query]));
    assert_eq!(events[0].service, "my-service");
}
/// When a span carries both database and HTTP attributes, it is classified as SQL.
#[test]
fn span_with_both_db_and_http_prefers_sql() {
    let mut hybrid = make_sql_span(
        &[1; 16],
        &[2; 8],
        &[],
        "SELECT 1",
        1_000_000_000,
        1_001_000_000,
    );
    hybrid.attributes.push(make_kv("http.url", "http://svc/api"));
    let events = convert_otlp_request(&make_request("test", vec![hybrid]));
    assert_eq!(events[0].event_type, EventType::Sql);
}
/// An end time earlier than the start time yields a zero duration.
#[test]
fn clock_skew_duration_is_zero() {
    let start_ns = 2_000_000_000;
    let end_ns = 1_000_000_000;
    let skewed = make_sql_span(&[1; 16], &[2; 8], &[], "SELECT 1", start_ns, end_ns);
    let events = convert_otlp_request(&make_request("test", vec![skewed]));
    assert_eq!(events[0].duration_us, 0);
}
/// Encoding no bytes yields an empty string.
#[test]
fn bytes_to_hex_empty() {
    let encoded = bytes_to_hex(&[]);
    assert_eq!(encoded, "");
}
/// The lowest byte, the highest byte and a mixed byte encode in lowercase.
#[test]
fn bytes_to_hex_all_values() {
    let sample = [0x00, 0xff, 0xab];
    let encoded = bytes_to_hex(&sample);
    assert_eq!(encoded, "00ffab");
}
/// A timestamp falling on 29 February 2024 renders as that leap day.
#[test]
fn nanos_to_iso8601_leap_year() {
    let rendered = nanos_to_iso8601(1_709_164_800_000_000_000_u64);
    assert_eq!(rendered, "2024-02-29T00:00:00.000Z");
}
/// An empty trace id encodes to an empty hex string.
#[test]
fn empty_trace_id_produces_empty_hex() {
    let empty_trace_id: [u8; 0] = [];
    assert_eq!(bytes_to_hex(&empty_trace_id), "");
}
/// A one-byte id encodes to exactly two hex digits.
#[test]
fn short_span_id_produces_short_hex() {
    let short_id = [0xab];
    assert_eq!(bytes_to_hex(&short_id), "ab");
}
/// A resource without a `service.name` attribute yields the `"unknown"` service.
#[test]
fn missing_service_name_defaults_to_unknown() {
    let query = make_sql_span(&[1; 16], &[2; 8], &[], "SELECT 1", 0, 1000);
    // The resource is present but has no attributes at all.
    let request = make_request_with_resource_attrs(Vec::new(), vec![query]);
    let events = convert_otlp_request(&request);
    assert_eq!(events[0].service, "unknown");
}
/// A `ResourceSpans` entry without any resource yields the `"unknown"` service.
#[test]
fn no_resource_defaults_to_unknown_service() {
    let query = make_sql_span(&[1; 16], &[2; 8], &[], "SELECT 1", 0, 1000);
    let scope = ScopeSpans {
        spans: vec![query],
        ..Default::default()
    };
    let resource_less = ResourceSpans {
        resource: None,
        scope_spans: vec![scope],
        ..Default::default()
    };
    let request = ExportTraceServiceRequest {
        resource_spans: vec![resource_less],
    };
    let events = convert_otlp_request(&request);
    assert_eq!(events[0].service, "unknown");
}
/// Test helper: wraps `spans` in a single-resource, single-scope request
/// whose resource carries exactly the attributes in `attrs`.
fn make_request_with_resource_attrs(
    attrs: Vec<KeyValue>,
    spans: Vec<Span>,
) -> ExportTraceServiceRequest {
    let resource = Resource {
        attributes: attrs,
        ..Default::default()
    };
    let scope = ScopeSpans {
        spans,
        ..Default::default()
    };
    let resource_spans = ResourceSpans {
        resource: Some(resource),
        scope_spans: vec![scope],
        ..Default::default()
    };
    ExportTraceServiceRequest {
        resource_spans: vec![resource_spans],
    }
}
/// A `cloud.region` resource attribute is carried over to the event.
#[test]
fn cloud_region_extracted_from_resource_attributes() {
    let resource_attrs = vec![
        make_kv("service.name", "order-svc"),
        make_kv("cloud.region", "eu-west-3"),
    ];
    let query = make_sql_span(&[1; 16], &[2; 8], &[], "SELECT 1", 0, 1000);
    let request = make_request_with_resource_attrs(resource_attrs, vec![query]);
    let events = convert_otlp_request(&request);
    assert_eq!(events.len(), 1);
    assert_eq!(events[0].cloud_region.as_deref(), Some("eu-west-3"));
}
/// Without a resource-level region, the span's own `cloud.region` is used.
#[test]
fn cloud_region_falls_back_to_span_attribute() {
    let mut query = make_sql_span(&[1; 16], &[2; 8], &[], "SELECT 1", 0, 1000);
    query.attributes.push(make_kv("cloud.region", "us-east-1"));
    let resource_attrs = vec![make_kv("service.name", "order-svc")];
    let request = make_request_with_resource_attrs(resource_attrs, vec![query]);
    let events = convert_otlp_request(&request);
    assert_eq!(events.len(), 1);
    assert_eq!(events[0].cloud_region.as_deref(), Some("us-east-1"));
}
/// When both levels define a region, the resource-level value is kept.
#[test]
fn cloud_region_resource_wins_over_span() {
    let mut query = make_sql_span(&[1; 16], &[2; 8], &[], "SELECT 1", 0, 1000);
    query.attributes.push(make_kv("cloud.region", "us-east-1"));
    let resource_attrs = vec![
        make_kv("service.name", "order-svc"),
        make_kv("cloud.region", "eu-west-3"),
    ];
    let request = make_request_with_resource_attrs(resource_attrs, vec![query]);
    let events = convert_otlp_request(&request);
    assert_eq!(events[0].cloud_region.as_deref(), Some("eu-west-3"));
}
/// With no `cloud.region` attribute anywhere, the event has no region.
#[test]
fn no_cloud_region_yields_none() {
    let query = make_sql_span(&[1; 16], &[2; 8], &[], "SELECT 1", 0, 1000);
    let events = convert_otlp_request(&make_request("order-svc", vec![query]));
    assert!(events[0].cloud_region.is_none());
}
/// A resource-level region containing spaces is dropped.
#[test]
fn cloud_region_with_space_is_sanitized_to_none() {
    let resource_attrs = vec![
        make_kv("service.name", "order-svc"),
        make_kv("cloud.region", "eu west 3"),
    ];
    let query = make_sql_span(&[1; 16], &[2; 8], &[], "SELECT 1", 0, 1000);
    let request = make_request_with_resource_attrs(resource_attrs, vec![query]);
    let events = convert_otlp_request(&request);
    assert_eq!(events.len(), 1);
    let region = &events[0].cloud_region;
    assert!(
        region.is_none(),
        "region with space must be sanitized to None"
    );
}
/// A 65-character resource-level region is dropped.
#[test]
fn oversized_cloud_region_is_sanitized_to_none() {
    // Presumably one character over the validator's length limit; confirm
    // against `is_valid_region_id`.
    let oversized = "a".repeat(65);
    let resource_attrs = vec![
        make_kv("service.name", "order-svc"),
        make_kv("cloud.region", &oversized),
    ];
    let query = make_sql_span(&[1; 16], &[2; 8], &[], "SELECT 1", 0, 1000);
    let request = make_request_with_resource_attrs(resource_attrs, vec![query]);
    let events = convert_otlp_request(&request);
    assert!(events[0].cloud_region.is_none());
}
/// A region containing a newline followed by log-like text is dropped.
#[test]
fn cloud_region_with_control_char_is_sanitized_to_none() {
    let resource_attrs = vec![
        make_kv("service.name", "order-svc"),
        make_kv("cloud.region", "eu-west-3\n2026-04-07 ERROR fake"),
    ];
    let query = make_sql_span(&[1; 16], &[2; 8], &[], "SELECT 1", 0, 1000);
    let request = make_request_with_resource_attrs(resource_attrs, vec![query]);
    let events = convert_otlp_request(&request);
    assert!(events[0].cloud_region.is_none());
}
/// An invalid span-level region is dropped just like a resource-level one.
#[test]
fn cloud_region_span_level_fallback_also_sanitized() {
    let mut query = make_sql_span(&[1; 16], &[2; 8], &[], "SELECT 1", 0, 1000);
    query.attributes.push(make_kv("cloud.region", "bad region!"));
    let events = convert_otlp_request(&make_request("order-svc", vec![query]));
    assert!(events[0].cloud_region.is_none());
}
/// An empty resource-level region string is dropped.
#[test]
fn cloud_region_empty_string_is_sanitized_to_none() {
    let resource_attrs = vec![
        make_kv("service.name", "order-svc"),
        make_kv("cloud.region", ""),
    ];
    let query = make_sql_span(&[1; 16], &[2; 8], &[], "SELECT 1", 0, 1000);
    let request = make_request_with_resource_attrs(resource_attrs, vec![query]);
    let events = convert_otlp_request(&request);
    assert!(events[0].cloud_region.is_none());
}
}