use std::collections::HashMap;
use std::sync::Arc;
use opentelemetry_proto::tonic::collector::trace::v1::{
ExportTraceServiceRequest, ExportTraceServiceResponse,
};
use opentelemetry_proto::tonic::common::v1::{KeyValue, any_value};
use opentelemetry_proto::tonic::trace::v1::Span;
use tonic::{Request, Response, Status, async_trait};
use crate::event::{EventSource, EventType, SpanEvent};
use crate::report::metrics::{OtlpRejectReason, OtlpSpanFilterReason};
/// Sink for OTLP ingestion metrics.
///
/// Implementors must be `Send + Sync`; in this file the sink is shared as
/// `Arc<dyn MetricsSink>` by the gRPC service and the HTTP router state.
pub trait MetricsSink: Send + Sync {
/// Records that an OTLP request (or its converted events) was rejected for `reason`.
fn record_otlp_reject(&self, reason: OtlpRejectReason);
/// Records the span counters produced while converting one OTLP request.
fn record_otlp_spans(&self, stats: SpanConversionStats);
}
/// Counters describing one conversion pass: how many spans were seen and how
/// many were filtered out, broken down by filter reason.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct SpanConversionStats {
/// Every span visited, whether it was converted or filtered.
pub received: u64,
/// Spans filtered with `OtlpSpanFilterReason::NotIo`.
pub filtered_not_io: u64,
/// Spans filtered with `OtlpSpanFilterReason::MissingDbStatement`.
pub filtered_missing_db_statement: u64,
/// Spans filtered with `OtlpSpanFilterReason::MissingHttpUrl`.
pub filtered_missing_http_url: u64,
}
impl SpanConversionStats {
    /// Increments the counter that corresponds to `reason`.
    fn count_filtered(&mut self, reason: OtlpSpanFilterReason) {
        let counter = match reason {
            OtlpSpanFilterReason::NotIo => &mut self.filtered_not_io,
            OtlpSpanFilterReason::MissingDbStatement => &mut self.filtered_missing_db_statement,
            OtlpSpanFilterReason::MissingHttpUrl => &mut self.filtered_missing_http_url,
        };
        *counter += 1;
    }

    /// Returns each filter reason paired with its current count, in the fixed
    /// order `NotIo`, `MissingDbStatement`, `MissingHttpUrl`.
    #[must_use]
    pub fn filtered_counts(&self) -> [(OtlpSpanFilterReason, u64); 3] {
        let Self {
            filtered_not_io: not_io,
            filtered_missing_db_statement: missing_statement,
            filtered_missing_http_url: missing_url,
            ..
        } = *self;
        [
            (OtlpSpanFilterReason::NotIo, not_io),
            (OtlpSpanFilterReason::MissingDbStatement, missing_statement),
            (OtlpSpanFilterReason::MissingHttpUrl, missing_url),
        ]
    }
}
/// Encodes `bytes` as a lowercase hexadecimal string, two characters per byte.
///
/// An empty slice yields an empty string.
fn bytes_to_hex(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut hex = String::with_capacity(bytes.len() * 2);
    hex.extend(
        bytes
            .iter()
            // High nibble first, then low nibble.
            .flat_map(|&byte| [byte >> 4, byte & 0x0f])
            .map(|nibble| char::from(DIGITS[usize::from(nibble)])),
    );
    hex
}
use crate::time::nanos_to_iso8601;
/// Returns the borrowed string when `value` is a `StringValue`, otherwise `None`.
#[inline]
fn any_value_as_str(value: Option<&any_value::Value>) -> Option<&str> {
    if let Some(any_value::Value::StringValue(text)) = value {
        Some(text.as_str())
    } else {
        None
    }
}
/// Returns the integer when `value` is an `IntValue`, otherwise `None`.
#[inline]
fn any_value_as_int(value: Option<&any_value::Value>) -> Option<i64> {
    if let Some(any_value::Value::IntValue(number)) = value {
        Some(*number)
    } else {
        None
    }
}
/// Looks up `key` in `attrs` and returns its value when that value is a string.
///
/// Only the first entry whose key matches is inspected; if that entry holds a
/// non-string (or missing) value the result is `None`.
fn get_str_attribute<'a>(attrs: &'a [KeyValue], key: &str) -> Option<&'a str> {
    let entry = attrs.iter().find(|candidate| candidate.key == key)?;
    let inner = entry
        .value
        .as_ref()
        .and_then(|wrapper| wrapper.value.as_ref());
    any_value_as_str(inner)
}
/// Bound on how far the parent chain is followed when inheriting `code.*`
/// attributes or collecting instrumentation scopes.
const CODE_ATTRS_MAX_DEPTH: usize = 8;
/// Cap on the number of spans indexed per `ResourceSpans` entry.
const MAX_SPANS_PER_RESOURCE: usize = 100_000;
/// Source-code location attributes resolved for a span, borrowed from the
/// OTLP attribute list they were read from.
#[derive(Default, Clone, Copy)]
struct CodeAttrs<'a> {
// From `code.function.name`, falling back to `code.function`.
function_name: Option<&'a str>,
// From `code.file.path`, falling back to `code.filepath`.
filepath: Option<&'a str>,
// From `code.line.number`, falling back to `code.lineno`.
lineno: Option<i64>,
// From `code.namespace`, or derived from the prefix of `code.function.name`.
namespace: Option<&'a str>,
}
impl CodeAttrs<'_> {
    /// Returns `true` when at least one of the four code attributes is present.
    #[inline]
    fn has_any(&self) -> bool {
        let all_absent = self.function_name.is_none()
            && self.filepath.is_none()
            && self.lineno.is_none()
            && self.namespace.is_none();
        !all_absent
    }
}
/// Span attributes relevant to conversion, extracted in a single pass by
/// `classify_span_attrs`. Each field mirrors one attribute key; both the
/// older and the newer spelling of a key are kept so callers can pick a
/// precedence.
#[derive(Default)]
struct ClassifiedAttrs<'a> {
// Database attributes: `db.statement`, `db.query.text`, `db.system`.
db_statement: Option<&'a str>,
db_query_text: Option<&'a str>,
db_system: Option<&'a str>,
// Outbound URL: `http.url`, `url.full`.
http_url: Option<&'a str>,
url_full: Option<&'a str>,
// HTTP method: `http.method`, `http.request.method`.
http_method: Option<&'a str>,
http_request_method: Option<&'a str>,
// HTTP status: `http.status_code`, `http.response.status_code`.
http_status_code: Option<i64>,
http_response_status_code: Option<i64>,
// Response size: `http.response.body.size`, `http.response_content_length`.
http_response_body_size: Option<i64>,
http_response_content_length: Option<i64>,
// `cloud.region` as found on the span itself.
cloud_region: Option<&'a str>,
// Code location: `code.function.name` / `code.function`,
// `code.file.path` / `code.filepath`, `code.line.number` / `code.lineno`,
// and `code.namespace`.
code_function_name: Option<&'a str>,
code_function: Option<&'a str>,
code_file_path: Option<&'a str>,
code_filepath: Option<&'a str>,
code_line_number: Option<i64>,
code_lineno: Option<i64>,
code_namespace: Option<&'a str>,
}
impl<'a> ClassifiedAttrs<'a> {
    /// Collapses the paired `code.*` fields into a single [`CodeAttrs`].
    ///
    /// `code.function.name`, `code.file.path` and `code.line.number` take
    /// precedence over `code.function`, `code.filepath` and `code.lineno`.
    /// When `code.namespace` is absent, the namespace is everything before the
    /// last `.` of `code.function.name`, if there is one.
    fn code_attrs(&self) -> CodeAttrs<'a> {
        let namespace_from_function = || {
            let (prefix, _) = self.code_function_name?.rsplit_once('.')?;
            Some(prefix)
        };
        CodeAttrs {
            function_name: self.code_function_name.or(self.code_function),
            filepath: self.code_file_path.or(self.code_filepath),
            lineno: self.code_line_number.or(self.code_lineno),
            namespace: self.code_namespace.or_else(namespace_from_function),
        }
    }
}
/// Scans `attrs` once and records every attribute the converter cares about.
///
/// String-typed keys only accept string values and integer-typed keys only
/// accept integer values; a recognised key with a value of the wrong type
/// stores `None`. When a key appears more than once, the last occurrence wins.
fn classify_span_attrs(attrs: &[KeyValue]) -> ClassifiedAttrs<'_> {
    let mut classified = ClassifiedAttrs::default();
    for attr in attrs {
        let raw = attr
            .value
            .as_ref()
            .and_then(|wrapper| wrapper.value.as_ref());
        // Both views are cheap to compute; each arm picks the one it expects.
        let text = any_value_as_str(raw);
        let int = any_value_as_int(raw);
        match attr.key.as_str() {
            "db.statement" => classified.db_statement = text,
            "db.query.text" => classified.db_query_text = text,
            "db.system" => classified.db_system = text,
            "http.url" => classified.http_url = text,
            "url.full" => classified.url_full = text,
            "http.method" => classified.http_method = text,
            "http.request.method" => classified.http_request_method = text,
            "http.status_code" => classified.http_status_code = int,
            "http.response.status_code" => classified.http_response_status_code = int,
            "http.response.body.size" => classified.http_response_body_size = int,
            "http.response_content_length" => classified.http_response_content_length = int,
            "cloud.region" => classified.cloud_region = text,
            "code.function.name" => classified.code_function_name = text,
            "code.function" => classified.code_function = text,
            "code.file.path" => classified.code_file_path = text,
            "code.filepath" => classified.code_filepath = text,
            "code.line.number" => classified.code_line_number = int,
            "code.lineno" => classified.code_lineno = int,
            "code.namespace" => classified.code_namespace = text,
            _ => {}
        }
    }
    classified
}
/// Extracts only the `code.*` attributes from `attrs`.
///
/// Uses the same precedence as `ClassifiedAttrs::code_attrs`: the
/// `code.function.name` / `code.file.path` / `code.line.number` keys win over
/// `code.function` / `code.filepath` / `code.lineno`, and a missing
/// `code.namespace` is derived from the prefix of `code.function.name`.
fn read_code_attrs(attrs: &[KeyValue]) -> CodeAttrs<'_> {
    // Each pair is (preferred key, fallback key).
    let mut function: (Option<&str>, Option<&str>) = (None, None);
    let mut file: (Option<&str>, Option<&str>) = (None, None);
    let mut line: (Option<i64>, Option<i64>) = (None, None);
    let mut explicit_namespace: Option<&str> = None;

    for attr in attrs {
        let raw = attr
            .value
            .as_ref()
            .and_then(|wrapper| wrapper.value.as_ref());
        match attr.key.as_str() {
            "code.function.name" => function.0 = any_value_as_str(raw),
            "code.function" => function.1 = any_value_as_str(raw),
            "code.file.path" => file.0 = any_value_as_str(raw),
            "code.filepath" => file.1 = any_value_as_str(raw),
            "code.line.number" => line.0 = any_value_as_int(raw),
            "code.lineno" => line.1 = any_value_as_int(raw),
            "code.namespace" => explicit_namespace = any_value_as_str(raw),
            _ => {}
        }
    }

    let (function_preferred, function_fallback) = function;
    let derived_namespace = || {
        let (prefix, _) = function_preferred?.rsplit_once('.')?;
        Some(prefix)
    };
    CodeAttrs {
        function_name: function_preferred.or(function_fallback),
        filepath: file.0.or(file.1),
        lineno: line.0.or(line.1),
        namespace: explicit_namespace.or_else(derived_namespace),
    }
}
/// Returns `leaf` when it already carries code attributes (or the span has no
/// parent); otherwise climbs the parent chain through `span_index` and returns
/// the code attributes of the first ancestor that has any.
///
/// The climb stops with empty attributes when an ancestor is missing from the
/// index, when the chain ends, or after `CODE_ATTRS_MAX_DEPTH + 1` ancestors
/// have been inspected.
fn walk_parents_for_code_attrs<'a>(
    leaf: CodeAttrs<'a>,
    parent_span_id: &[u8],
    span_index: &HashMap<&[u8], &'a Span>,
) -> CodeAttrs<'a> {
    if leaf.has_any() || parent_span_id.is_empty() {
        return leaf;
    }

    let mut ancestor_id = parent_span_id;
    for _ in 0..=CODE_ATTRS_MAX_DEPTH {
        let Some(ancestor) = span_index.get(ancestor_id) else {
            break;
        };
        let inherited = read_code_attrs(&ancestor.attributes);
        if inherited.has_any() {
            return inherited;
        }
        if ancestor.parent_span_id.is_empty() {
            break;
        }
        ancestor_id = ancestor.parent_span_id.as_slice();
    }
    CodeAttrs::default()
}
/// Indexes the spans of one `ResourceSpans` entry by span id.
///
/// At most `MAX_SPANS_PER_RESOURCE` spans are inserted; a warning is logged as
/// soon as that many have been processed and indexing stops there.
fn build_span_index(
    resource_spans: &opentelemetry_proto::tonic::trace::v1::ResourceSpans,
) -> HashMap<&[u8], &Span> {
    let mut by_id: HashMap<&[u8], &Span> = HashMap::new();
    let every_span = resource_spans
        .scope_spans
        .iter()
        .flat_map(|scope_spans| scope_spans.spans.iter());
    for (position, span) in every_span.enumerate() {
        by_id.insert(span.span_id.as_slice(), span);
        if position + 1 >= MAX_SPANS_PER_RESOURCE {
            tracing::warn!(
                "OTLP span index capped at {} entries, parent lookup may be degraded for remaining spans",
                MAX_SPANS_PER_RESOURCE
            );
            break;
        }
    }
    by_id
}
/// Maps span id to instrumentation scope name for one `ResourceSpans` entry.
///
/// Spans whose scope is missing or has an empty name are not indexed, and at
/// most `MAX_SPANS_PER_RESOURCE` spans are inserted.
fn build_scope_index(
    resource_spans: &opentelemetry_proto::tonic::trace::v1::ResourceSpans,
) -> HashMap<&[u8], &str> {
    resource_spans
        .scope_spans
        .iter()
        .filter_map(|scope_spans| {
            let scope_name = scope_spans.scope.as_ref()?.name.as_str();
            (!scope_name.is_empty()).then_some((scope_name, scope_spans))
        })
        .flat_map(|(scope_name, scope_spans)| {
            scope_spans
                .spans
                .iter()
                .map(move |span| (span.span_id.as_slice(), scope_name))
        })
        // Only spans in named scopes count towards the cap.
        .take(MAX_SPANS_PER_RESOURCE)
        .collect()
}
/// Collects the distinct instrumentation scope names of `span` and of its
/// ancestors, nearest first.
///
/// The walk covers the span itself plus at most `CODE_ATTRS_MAX_DEPTH`
/// ancestors and ends early when the chain runs out or a parent is missing
/// from `span_index`.
fn collect_instrumentation_scopes(
    span: &Span,
    span_index: &HashMap<&[u8], &Span>,
    scope_index: &HashMap<&[u8], &str>,
) -> Vec<Arc<str>> {
    let mut scopes: Vec<Arc<str>> = Vec::new();
    let mut node = span;
    for hop in 0..=CODE_ATTRS_MAX_DEPTH {
        if let Some(&scope_name) = scope_index.get(node.span_id.as_slice()) {
            let already_listed = scopes.iter().any(|seen| &**seen == scope_name);
            if !already_listed {
                scopes.push(Arc::from(scope_name));
            }
        }
        if node.parent_span_id.is_empty() || hop == CODE_ATTRS_MAX_DEPTH {
            break;
        }
        match span_index.get(node.parent_span_id.as_slice()) {
            Some(&parent) => node = parent,
            None => break,
        }
    }
    scopes
}
/// Converts an OTLP export request into span events, discarding the
/// conversion statistics.
#[must_use]
pub fn convert_otlp_request(request: &ExportTraceServiceRequest) -> Vec<SpanEvent> {
    let (events, _stats) = convert_otlp_request_counted(request);
    events
}
#[must_use]
pub fn convert_otlp_request_counted(
request: &ExportTraceServiceRequest,
) -> (Vec<SpanEvent>, SpanConversionStats) {
let mut events = Vec::new();
let mut stats = SpanConversionStats::default();
for resource_spans in &request.resource_spans {
let service_arc: Arc<str> = Arc::from(
resource_spans
.resource
.as_ref()
.and_then(|r| get_str_attribute(&r.attributes, "service.name"))
.unwrap_or("unknown"),
);
let resource_cloud_region: Option<Arc<str>> = resource_spans
.resource
.as_ref()
.and_then(|r| get_str_attribute(&r.attributes, "cloud.region"))
.filter(|s| crate::score::carbon::is_valid_region_id(s))
.map(Arc::from);
let span_index = build_span_index(resource_spans);
let scope_index = build_scope_index(resource_spans);
for scope_spans in &resource_spans.scope_spans {
for span in &scope_spans.spans {
stats.received += 1;
match convert_span(
span,
&service_arc,
resource_cloud_region.as_ref(),
&span_index,
&scope_index,
) {
Ok(event) => events.push(event),
Err(reason) => stats.count_filtered(reason),
}
}
}
}
(events, stats)
}
/// Explains why a span that is neither SQL nor HTTP-out was filtered.
///
/// * `db.system` present: the statement is what was missing.
/// * an HTTP method present on a span whose kind is not `Server`: the URL was missing.
/// * anything else, including server spans: not an I/O span.
fn span_filter_reason(classified: &ClassifiedAttrs<'_>, kind: i32) -> OtlpSpanFilterReason {
    use opentelemetry_proto::tonic::trace::v1::span::SpanKind;

    if classified.db_system.is_some() {
        return OtlpSpanFilterReason::MissingDbStatement;
    }

    let is_server = kind == SpanKind::Server as i32;
    let has_http_method =
        classified.http_method.is_some() || classified.http_request_method.is_some();
    if has_http_method && !is_server {
        OtlpSpanFilterReason::MissingHttpUrl
    } else {
        OtlpSpanFilterReason::NotIo
    }
}
/// Converts a single OTLP span into a [`SpanEvent`].
///
/// A span is kept when it carries a SQL statement (`db.statement` or
/// `db.query.text`) or an outbound URL (`http.url` or `url.full`); SQL wins
/// when both are present. Every other span is rejected with the reason
/// computed by [`span_filter_reason`].
///
/// * `service_arc`: service name shared by all spans of the resource.
/// * `resource_cloud_region`: region taken from the resource; it takes
///   precedence over the span's own `cloud.region` attribute.
/// * `span_index` / `scope_index`: per-resource lookups used to resolve the
///   parent span, inherited `code.*` attributes and instrumentation scopes.
///
/// The event is passed through `crate::event::sanitize_span_event` before it
/// is returned.
///
/// # Errors
///
/// Returns the [`OtlpSpanFilterReason`] when the span is neither a SQL nor an
/// HTTP-out span.
fn convert_span(
    span: &Span,
    service_arc: &Arc<str>,
    resource_cloud_region: Option<&Arc<str>>,
    span_index: &HashMap<&[u8], &Span>,
    scope_index: &HashMap<&[u8], &str>,
) -> Result<SpanEvent, OtlpSpanFilterReason> {
    let classified = classify_span_attrs(&span.attributes);

    let (event_type, target, operation) =
        if let Some(statement) = classified.db_statement.or(classified.db_query_text) {
            let op = classified.db_system.unwrap_or("sql").to_string();
            (EventType::Sql, statement.to_string(), op)
        } else if let Some(url) = classified.http_url.or(classified.url_full) {
            let method = classified
                .http_method
                .or(classified.http_request_method)
                .unwrap_or("GET")
                .to_string();
            (EventType::HttpOut, url.to_string(), method)
        } else {
            return Err(span_filter_reason(&classified, span.kind));
        };

    let start_nanos = span.start_time_unix_nano;
    let end_nanos = span.end_time_unix_nano;
    let timestamp = nanos_to_iso8601(start_nanos);
    if end_nanos < start_nanos {
        tracing::trace!("Span has end_time < start_time (clock skew?), duration forced to 0");
    }
    // Saturating: an end time before the start time yields 0 rather than wrapping.
    let duration_us = end_nanos.saturating_sub(start_nanos) / 1000;

    let trace_id = bytes_to_hex(&span.trace_id);
    let span_id = bytes_to_hex(&span.span_id);

    // Status code and response size are only reported for HTTP events;
    // values that do not fit the target integer type are dropped.
    let is_http = event_type == EventType::HttpOut;
    let status_code = if is_http {
        classified
            .http_status_code
            .or(classified.http_response_status_code)
            .and_then(|c| u16::try_from(c).ok())
    } else {
        None
    };
    let response_size_bytes = if is_http {
        classified
            .http_response_body_size
            .or(classified.http_response_content_length)
            .and_then(|v| u64::try_from(v).ok())
    } else {
        None
    };

    let (source_endpoint, source_method) = if span.parent_span_id.is_empty() {
        ("unknown".to_string(), span.name.clone())
    } else if let Some(parent) = span_index.get(span.parent_span_id.as_slice()) {
        let endpoint = get_str_attribute(&parent.attributes, "http.route")
            .or_else(|| get_str_attribute(&parent.attributes, "http.url"))
            .or_else(|| get_str_attribute(&parent.attributes, "url.full"))
            .unwrap_or("unknown")
            .to_string();
        // Prefer `code.function.name` and fall back to `code.function`, the
        // same precedence `read_code_attrs` uses, so a parent that only sets
        // the newer key still contributes its function name.
        let method = get_str_attribute(&parent.attributes, "code.function.name")
            .or_else(|| get_str_attribute(&parent.attributes, "code.function"))
            .map_or_else(|| parent.name.clone(), ToString::to_string);
        (endpoint, method)
    } else {
        // Parent id is set but the parent is not part of this resource's index.
        ("unknown".to_string(), span.name.clone())
    };

    let parent_span_id = if span.parent_span_id.is_empty() {
        None
    } else {
        Some(bytes_to_hex(&span.parent_span_id))
    };

    let cloud_region: Option<Arc<str>> = resource_cloud_region.cloned().or_else(|| {
        classified
            .cloud_region
            .filter(|s| crate::score::carbon::is_valid_region_id(s))
            .map(Arc::from)
    });

    let code =
        walk_parents_for_code_attrs(classified.code_attrs(), &span.parent_span_id, span_index);
    let code_function: Option<Arc<str>> = code.function_name.map(Arc::from);
    let code_filepath: Option<Arc<str>> = code.filepath.map(Arc::from);
    let code_lineno = code.lineno.and_then(|v| u32::try_from(v).ok());
    let code_namespace: Option<Arc<str>> = code.namespace.map(Arc::from);

    let instrumentation_scopes = collect_instrumentation_scopes(span, span_index, scope_index);

    let mut event = SpanEvent {
        timestamp,
        trace_id,
        span_id,
        parent_span_id,
        service: Arc::clone(service_arc),
        cloud_region,
        event_type,
        operation,
        target,
        duration_us,
        source: EventSource {
            endpoint: source_endpoint,
            method: source_method,
        },
        status_code,
        response_size_bytes,
        code_function,
        code_filepath,
        code_lineno,
        code_namespace,
        instrumentation_scopes,
    };
    crate::event::sanitize_span_event(&mut event);
    Ok(event)
}
/// How long the gRPC and HTTP ingest handlers wait for the event channel to
/// accept a batch before reporting a failure.
const INGEST_ENQUEUE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(2);
/// gRPC OTLP trace receiver that forwards converted span events over a channel.
pub struct OtlpGrpcService {
// Channel that receives one batch of events per export request.
sender: tokio::sync::mpsc::Sender<Vec<SpanEvent>>,
// Optional metrics sink; nothing is recorded when `None`.
metrics: Option<Arc<dyn MetricsSink>>,
}
impl OtlpGrpcService {
/// Creates a service that sends converted event batches to `sender` and,
/// when `metrics` is `Some`, records ingestion metrics on it.
#[must_use]
pub fn new(
sender: tokio::sync::mpsc::Sender<Vec<SpanEvent>>,
metrics: Option<Arc<dyn MetricsSink>>,
) -> Self {
Self { sender, metrics }
}
}
#[async_trait]
impl opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceService
for OtlpGrpcService
{
async fn export(
&self,
request: Request<ExportTraceServiceRequest>,
) -> Result<Response<ExportTraceServiceResponse>, Status> {
let (events, stats) = convert_otlp_request_counted(request.get_ref());
if let Some(m) = self.metrics.as_ref() {
m.record_otlp_spans(stats);
}
if !events.is_empty()
&& let Err(e) = self
.sender
.send_timeout(events, INGEST_ENQUEUE_TIMEOUT)
.await
{
if let Some(m) = self.metrics.as_ref() {
m.record_otlp_reject(OtlpRejectReason::ChannelFull);
}
return Err(match e {
tokio::sync::mpsc::error::SendTimeoutError::Timeout(_) => {
Status::unavailable("ingest queue full, retry")
}
tokio::sync::mpsc::error::SendTimeoutError::Closed(_) => {
Status::internal("event channel closed")
}
});
}
Ok(Response::new(ExportTraceServiceResponse {
partial_success: None,
}))
}
}
/// State handed to the OTLP/HTTP handler via axum's `State` extractor.
#[derive(Clone)]
struct OtlpHttpState {
// Channel that receives one batch of events per accepted request.
sender: tokio::sync::mpsc::Sender<Vec<SpanEvent>>,
// Optional metrics sink; nothing is recorded when `None`.
metrics: Option<Arc<dyn MetricsSink>>,
}
/// Builds the axum router serving OTLP/HTTP trace ingestion on `POST /v1/traces`.
///
/// * `sender`: channel that receives the converted event batches.
/// * `max_payload_size`: body size limit passed to the body-limit layers.
/// * `metrics`: optional sink for reject and span counters.
///
/// The handler answers `415` for anything other than `application/x-protobuf`,
/// `400` for an undecodable body, `503` when the batch cannot be enqueued
/// within `INGEST_ENQUEUE_TIMEOUT`, and `200` otherwise.
pub fn otlp_http_router(
    sender: tokio::sync::mpsc::Sender<Vec<SpanEvent>>,
    max_payload_size: usize,
    metrics: Option<Arc<dyn MetricsSink>>,
) -> axum::Router {
    use axum::{
        Router,
        extract::State,
        http::{HeaderMap, StatusCode, header},
        routing::post,
    };

    /// Limit on concurrently executing `/v1/traces` requests.
    const MAX_CONCURRENT_OTLP_HTTP: usize = 32;

    async fn handle_traces(
        State(state): State<OtlpHttpState>,
        headers: HeaderMap,
        body: axum::body::Bytes,
    ) -> StatusCode {
        // Compare only the media type, ignoring parameters such as `; charset=...`.
        let media_type = headers
            .get(header::CONTENT_TYPE)
            .and_then(|raw| raw.to_str().ok())
            .map(|raw| raw.split(';').next().unwrap_or("").trim());
        let is_protobuf =
            media_type.is_some_and(|base| base.eq_ignore_ascii_case("application/x-protobuf"));
        if !is_protobuf {
            if let Some(sink) = &state.metrics {
                sink.record_otlp_reject(OtlpRejectReason::UnsupportedMediaType);
            }
            return StatusCode::UNSUPPORTED_MEDIA_TYPE;
        }

        let request = match <ExportTraceServiceRequest as prost::Message>::decode(body.as_ref()) {
            Ok(decoded) => decoded,
            Err(_) => {
                if let Some(sink) = &state.metrics {
                    sink.record_otlp_reject(OtlpRejectReason::ParseError);
                }
                return StatusCode::BAD_REQUEST;
            }
        };

        let (events, stats) = convert_otlp_request_counted(&request);
        if let Some(sink) = &state.metrics {
            sink.record_otlp_spans(stats);
        }

        // Nothing to enqueue: acknowledge without touching the channel.
        if events.is_empty() {
            return StatusCode::OK;
        }

        let enqueue = state
            .sender
            .send_timeout(events, INGEST_ENQUEUE_TIMEOUT)
            .await;
        if enqueue.is_err() {
            tracing::warn!("OTLP HTTP: event channel full or closed, dropping events");
            if let Some(sink) = &state.metrics {
                sink.record_otlp_reject(OtlpRejectReason::ChannelFull);
            }
            return StatusCode::SERVICE_UNAVAILABLE;
        }
        StatusCode::OK
    }

    let state = OtlpHttpState { sender, metrics };
    let router = Router::new()
        .route("/v1/traces", post(handle_traces))
        .route_layer(tower::limit::GlobalConcurrencyLimitLayer::new(
            MAX_CONCURRENT_OTLP_HTTP,
        ))
        .with_state(state)
        .layer(axum::extract::DefaultBodyLimit::max(max_payload_size));

    // Layer order is significant: layers added later wrap the earlier ones.
    #[cfg(feature = "daemon")]
    let router = router
        .layer(tower_http::decompression::RequestDecompressionLayer::new())
        .layer(tower_http::limit::RequestBodyLimitLayer::new(
            max_payload_size,
        ));

    router
}
#[cfg(test)]
mod tests {
use super::*;
#[cfg(feature = "daemon")]
use crate::report::metrics::MetricsState;
use opentelemetry_proto::tonic::common::v1::AnyValue;
use opentelemetry_proto::tonic::resource::v1::Resource;
use opentelemetry_proto::tonic::trace::v1::{ResourceSpans, ScopeSpans};
/// Returns a new `MetricsState` twice: once as the concrete type (for reading
/// counters) and once as a `MetricsSink` trait object (for recording).
#[cfg(feature = "daemon")]
fn fresh_metrics_sink() -> (Arc<MetricsState>, Arc<dyn MetricsSink>) {
    let concrete = Arc::new(MetricsState::new());
    let as_sink: Arc<dyn MetricsSink> = concrete.clone();
    (concrete, as_sink)
}
/// Builds a string-valued OTLP attribute.
fn make_kv(key: &str, value: &str) -> KeyValue {
    let string_value = any_value::Value::StringValue(value.to_owned());
    KeyValue {
        key: key.to_owned(),
        value: Some(AnyValue {
            value: Some(string_value),
        }),
        ..Default::default()
    }
}
/// Builds an integer-valued OTLP attribute.
fn make_int_kv(key: &str, value: i64) -> KeyValue {
    let int_value = any_value::Value::IntValue(value);
    KeyValue {
        key: key.to_owned(),
        value: Some(AnyValue {
            value: Some(int_value),
        }),
        ..Default::default()
    }
}
/// Builds a span named `db.query` carrying `db.statement` and a PostgreSQL
/// `db.system` attribute.
fn make_sql_span(
    trace_id: &[u8],
    span_id: &[u8],
    parent_span_id: &[u8],
    statement: &str,
    start_ns: u64,
    end_ns: u64,
) -> Span {
    let attributes = vec![
        make_kv("db.statement", statement),
        make_kv("db.system", "postgresql"),
    ];
    Span {
        trace_id: trace_id.to_vec(),
        span_id: span_id.to_vec(),
        parent_span_id: parent_span_id.to_vec(),
        name: String::from("db.query"),
        start_time_unix_nano: start_ns,
        end_time_unix_nano: end_ns,
        attributes,
        ..Default::default()
    }
}
/// Builds a span named `http.request` carrying `http.url`, `http.method` and
/// `http.status_code`.
// Test fixture: a flat argument list keeps call sites readable.
#[allow(clippy::too_many_arguments)]
fn make_http_span(
    trace_id: &[u8],
    span_id: &[u8],
    parent_span_id: &[u8],
    url: &str,
    method: &str,
    status: i64,
    start_ns: u64,
    end_ns: u64,
) -> Span {
    let attributes = vec![
        make_kv("http.url", url),
        make_kv("http.method", method),
        make_int_kv("http.status_code", status),
    ];
    Span {
        trace_id: trace_id.to_vec(),
        span_id: span_id.to_vec(),
        parent_span_id: parent_span_id.to_vec(),
        name: String::from("http.request"),
        start_time_unix_nano: start_ns,
        end_time_unix_nano: end_ns,
        attributes,
        ..Default::default()
    }
}
/// Builds a root span named `HandleRequest` with an `http.route` and a fixed
/// `code.function` attribute.
fn make_parent_span(span_id: &[u8], route: &str) -> Span {
    let attributes = vec![
        make_kv("http.route", route),
        make_kv("code.function", "OrderService::create_order"),
    ];
    Span {
        trace_id: vec![1; 16],
        span_id: span_id.to_vec(),
        parent_span_id: Vec::new(),
        name: String::from("HandleRequest"),
        start_time_unix_nano: 0,
        end_time_unix_nano: 1_000_000_000,
        attributes,
        ..Default::default()
    }
}
/// Wraps `spans` in a single-resource, single-scope export request whose
/// resource only carries `service.name`.
fn make_request(service: &str, spans: Vec<Span>) -> ExportTraceServiceRequest {
    let resource = Resource {
        attributes: vec![make_kv("service.name", service)],
        ..Default::default()
    };
    let scope = ScopeSpans {
        spans,
        ..Default::default()
    };
    let resource_spans = ResourceSpans {
        resource: Some(resource),
        scope_spans: vec![scope],
        ..Default::default()
    };
    ExportTraceServiceRequest {
        resource_spans: vec![resource_spans],
    }
}
/// A request without any resource spans converts to no events.
#[test]
fn empty_request_returns_empty() {
    let request = ExportTraceServiceRequest {
        resource_spans: Vec::new(),
    };
    let converted = convert_otlp_request(&request);
    assert!(converted.is_empty());
}
/// A SQL span maps to a `Sql` event with the statement as target, the
/// database system as operation and no HTTP status.
#[test]
fn sql_span_maps_correctly() {
    let statement = "SELECT * FROM order_item WHERE order_id = 42";
    let span = make_sql_span(
        &[1; 16],
        &[2; 8],
        &[],
        statement,
        1_720_621_921_000_000_000,
        1_720_621_921_001_200_000,
    );
    let request = make_request("order-svc", vec![span]);
    let converted = convert_otlp_request(&request);
    assert_eq!(converted.len(), 1);
    let event = &converted[0];
    assert_eq!(event.event_type, EventType::Sql);
    assert_eq!(event.operation, "postgresql");
    assert_eq!(event.target, "SELECT * FROM order_item WHERE order_id = 42");
    assert_eq!(&*event.service, "order-svc");
    assert_eq!(event.duration_us, 1200);
    assert!(event.status_code.is_none());
}
/// An HTTP span maps to an `HttpOut` event with URL, method, status and duration.
#[test]
fn http_span_maps_correctly() {
    let span = make_http_span(
        &[1; 16],
        &[3; 8],
        &[],
        "http://user-svc:5000/api/users/123",
        "GET",
        200,
        1_720_621_921_000_000_000,
        1_720_621_921_015_000_000,
    );
    let request = make_request("order-svc", vec![span]);
    let converted = convert_otlp_request(&request);
    assert_eq!(converted.len(), 1);
    let event = &converted[0];
    assert_eq!(event.event_type, EventType::HttpOut);
    assert_eq!(event.operation, "GET");
    assert_eq!(event.target, "http://user-svc:5000/api/users/123");
    assert_eq!(event.status_code, Some(200));
    assert_eq!(event.duration_us, 15000);
}
/// A span without any database or HTTP attributes produces no event.
#[test]
fn non_io_span_skipped() {
    let span = Span {
        trace_id: vec![1; 16],
        span_id: vec![4; 8],
        name: "internal.processing".to_string(),
        start_time_unix_nano: 1_720_621_921_000_000_000,
        // End 500 µs after the start. The previous value lay 2400 s before the
        // start, which turned this fixture into an unintended clock-skew case.
        end_time_unix_nano: 1_720_621_921_000_500_000,
        attributes: vec![make_kv("custom.attr", "value")],
        ..Default::default()
    };
    let req = make_request("order-svc", vec![span]);
    assert!(convert_otlp_request(&req).is_empty());
}
/// Builds a parentless span named `fixture` with exactly the given attributes
/// and a 500 µs duration.
fn make_bare_span(span_id: &[u8], attributes: Vec<KeyValue>) -> Span {
    let start: u64 = 1_720_621_921_000_000_000;
    let end: u64 = 1_720_621_921_000_500_000;
    Span {
        trace_id: vec![1; 16],
        span_id: span_id.to_vec(),
        name: String::from("fixture"),
        start_time_unix_nano: start,
        end_time_unix_nano: end,
        attributes,
        ..Default::default()
    }
}
/// Each filtered span is attributed to its own reason while the single valid
/// SQL span is converted.
#[test]
fn counted_conversion_classifies_filtered_spans() {
    let spans = vec![
        // No I/O attributes at all.
        make_bare_span(&[4; 8], vec![make_kv("custom.attr", "value")]),
        // Database system without a statement.
        make_bare_span(&[5; 8], vec![make_kv("db.system", "postgresql")]),
        // HTTP method without a URL.
        make_bare_span(&[6; 8], vec![make_kv("http.method", "GET")]),
        make_sql_span(&[1; 16], &[7; 8], &[], "SELECT 1", 0, 1000),
    ];
    let request = make_request("order-svc", spans);
    let (converted, stats) = convert_otlp_request_counted(&request);
    let expected = SpanConversionStats {
        received: 4,
        filtered_not_io: 1,
        filtered_missing_db_statement: 1,
        filtered_missing_http_url: 1,
    };
    assert_eq!(converted.len(), 1);
    assert_eq!(stats, expected);
}
/// An HTTP method without a URL counts as "missing URL" on a client span but
/// as "not I/O" on a server span.
#[test]
fn server_span_without_url_counts_not_io_not_missing_url() {
    use opentelemetry_proto::tonic::trace::v1::span::SpanKind;

    let with_kind = |span_id: &[u8], kind: SpanKind| {
        let mut span = make_bare_span(span_id, vec![make_kv("http.request.method", "GET")]);
        span.kind = kind as i32;
        span
    };
    let server = with_kind(&[5; 8], SpanKind::Server);
    let client = with_kind(&[6; 8], SpanKind::Client);

    let request = make_request("order-svc", vec![server, client]);
    let (converted, stats) = convert_otlp_request_counted(&request);
    let expected = SpanConversionStats {
        received: 2,
        filtered_not_io: 1,
        filtered_missing_db_statement: 0,
        filtered_missing_http_url: 1,
    };
    assert!(converted.is_empty());
    assert_eq!(stats, expected);
}
/// When every span is filtered, no events are produced but the span is still counted.
#[test]
fn counted_conversion_all_filtered_yields_zero_events() {
    let only_span = make_bare_span(&[4; 8], vec![make_kv("custom.attr", "value")]);
    let request = make_request("order-svc", vec![only_span]);
    let (converted, stats) = convert_otlp_request_counted(&request);
    assert!(converted.is_empty());
    assert_eq!(stats.received, 1);
    assert_eq!(stats.filtered_not_io, 1);
}
/// Recording stats through the sink moves the received counter and each
/// per-reason filtered counter on the underlying `MetricsState`.
#[cfg(feature = "daemon")]
#[test]
fn record_otlp_spans_moves_received_and_filtered_counters() {
    let (state, sink) = fresh_metrics_sink();
    let recorded = SpanConversionStats {
        received: 5,
        filtered_not_io: 2,
        filtered_missing_db_statement: 1,
        filtered_missing_http_url: 0,
    };
    sink.record_otlp_spans(recorded);

    assert_eq!(state.otlp_spans_received_total.get(), 5);
    let expectations = [
        ("not_io", 2),
        ("missing_db_statement", 1),
        ("missing_http_url", 0),
    ];
    for (label, expected) in expectations {
        let observed = state
            .otlp_spans_filtered_total
            .with_label_values(&[label])
            .get();
        assert_eq!(observed, expected);
    }
}
/// A child span takes its source endpoint and method from the parent's
/// `http.route` and `code.function` attributes.
#[test]
fn parent_span_provides_source_endpoint() {
    let parent_id = [10u8; 8];
    let parent = make_parent_span(&parent_id, "POST /api/orders/{id}/submit");
    let child = make_sql_span(
        &[1; 16],
        &[20; 8],
        &parent_id,
        "SELECT * FROM order_item WHERE order_id = 42",
        1_720_621_921_000_000_000,
        1_720_621_921_001_200_000,
    );
    let request = make_request("order-svc", vec![parent, child]);
    let converted = convert_otlp_request(&request);
    assert_eq!(converted.len(), 1);
    let source = &converted[0].source;
    assert_eq!(source.endpoint, "POST /api/orders/{id}/submit");
    assert_eq!(source.method, "OrderService::create_order");
}
/// When the parent has both `http.route` and `http.url`, the route is used
/// as the source endpoint.
#[test]
fn parent_span_http_route_takes_precedence_over_http_url() {
    let parent_attrs = vec![
        make_kv("http.route", "POST /api/orders/{id}/submit"),
        make_kv("http.url", "http://order-svc/api/orders/42/submit"),
        make_kv("code.function", "OrderService::create_order"),
    ];
    let parent = Span {
        trace_id: vec![1; 16],
        span_id: vec![10; 8],
        parent_span_id: Vec::new(),
        name: String::from("HandleRequest"),
        start_time_unix_nano: 0,
        end_time_unix_nano: 1_000_000_000,
        attributes: parent_attrs,
        ..Default::default()
    };
    let child = make_sql_span(
        &[1; 16],
        &[20; 8],
        &[10; 8],
        "SELECT * FROM order_item WHERE order_id = 42",
        1_720_621_921_000_000_000,
        1_720_621_921_001_200_000,
    );
    let request = make_request("order-svc", vec![parent, child]);
    let converted = convert_otlp_request(&request);
    let sql_event = converted
        .iter()
        .find(|event| event.event_type == EventType::Sql)
        .expect("sql child event present");
    assert_eq!(sql_event.source.endpoint, "POST /api/orders/{id}/submit");
}
/// Without `http.route`, the parent's `http.url` becomes the source endpoint.
#[test]
fn parent_span_http_url_used_only_when_route_absent() {
    let parent_url = make_kv("http.url", "http://order-svc/api/orders/42/submit");
    let parent = Span {
        trace_id: vec![1; 16],
        span_id: vec![10; 8],
        parent_span_id: Vec::new(),
        name: String::from("HandleRequest"),
        start_time_unix_nano: 0,
        end_time_unix_nano: 1_000_000_000,
        attributes: vec![parent_url],
        ..Default::default()
    };
    let child = make_sql_span(&[1; 16], &[20; 8], &[10; 8], "SELECT 1", 0, 1_000_000);
    let request = make_request("order-svc", vec![parent, child]);
    let converted = convert_otlp_request(&request);
    let sql_event = converted
        .iter()
        .find(|event| event.event_type == EventType::Sql)
        .expect("sql child event present");
    assert_eq!(
        sql_event.source.endpoint,
        "http://order-svc/api/orders/42/submit"
    );
}
/// With neither `http.route` nor `http.url`, the parent's `url.full` becomes
/// the source endpoint.
#[test]
fn parent_span_url_full_used_when_neither_route_nor_url_present() {
    let parent_url = make_kv("url.full", "http://order-svc/api/orders/42");
    let parent = Span {
        trace_id: vec![1; 16],
        span_id: vec![10; 8],
        parent_span_id: Vec::new(),
        name: String::from("HandleRequest"),
        start_time_unix_nano: 0,
        end_time_unix_nano: 1_000_000_000,
        attributes: vec![parent_url],
        ..Default::default()
    };
    let child = make_sql_span(&[1; 16], &[20; 8], &[10; 8], "SELECT 1", 0, 1_000_000);
    let request = make_request("order-svc", vec![parent, child]);
    let converted = convert_otlp_request(&request);
    let sql_event = converted
        .iter()
        .find(|event| event.event_type == EventType::Sql)
        .expect("sql child event present");
    assert_eq!(sql_event.source.endpoint, "http://order-svc/api/orders/42");
}
/// A parent id that is not in the request falls back to an unknown endpoint
/// and the span's own name as method.
#[test]
fn missing_parent_falls_back() {
    let absent_parent_id = [99u8; 8];
    let orphan = make_sql_span(
        &[1; 16],
        &[20; 8],
        &absent_parent_id,
        "SELECT * FROM order_item WHERE order_id = 42",
        1_720_621_921_000_000_000,
        1_720_621_921_001_200_000,
    );
    let request = make_request("order-svc", vec![orphan]);
    let converted = convert_otlp_request(&request);
    assert_eq!(converted.len(), 1);
    let source = &converted[0].source;
    assert_eq!(source.endpoint, "unknown");
    assert_eq!(source.method, "db.query");
}
/// A 16-byte trace id encodes to 32 lowercase hex characters.
#[test]
fn trace_id_hex_encoding() {
    let trace_id: Vec<u8> = (0..16).collect();
    let encoded = bytes_to_hex(&trace_id);
    assert_eq!(encoded, "000102030405060708090a0b0c0d0e0f");
}
/// Nanosecond timestamps are rendered in UTC with millisecond precision.
#[test]
fn timestamp_nanos_to_iso8601() {
    let rendered = nanos_to_iso8601(1_720_621_921_123_000_000_u64);
    assert_eq!(rendered, "2024-07-10T14:32:01.123Z");
}
/// Zero nanoseconds renders as the Unix epoch.
#[test]
fn timestamp_epoch_zero() {
    let rendered = nanos_to_iso8601(0);
    assert_eq!(rendered, "1970-01-01T00:00:00.000Z");
}
/// Duration is the end-minus-start difference expressed in microseconds.
#[test]
fn duration_calculation() {
    let start_ns = 1_000_000_000;
    let end_ns = 1_002_500_000;
    let span = make_sql_span(&[1; 16], &[2; 8], &[], "SELECT 1", start_ns, end_ns);
    let request = make_request("test", vec![span]);
    let converted = convert_otlp_request(&request);
    assert_eq!(converted[0].duration_us, 2500);
}
/// The HTTP status attribute is surfaced on the event.
#[test]
fn status_code_extraction() {
    let not_found = make_http_span(
        &[1; 16],
        &[3; 8],
        &[],
        "http://svc/api/health",
        "GET",
        404,
        1_000_000_000,
        1_001_000_000,
    );
    let request = make_request("test", vec![not_found]);
    let converted = convert_otlp_request(&request);
    assert_eq!(converted[0].status_code, Some(404));
}
/// The event's service is the resource's `service.name`.
#[test]
fn service_name_from_resource() {
    let request = make_request(
        "my-service",
        vec![make_sql_span(&[1; 16], &[2; 8], &[], "SELECT 1", 0, 1000)],
    );
    let converted = convert_otlp_request(&request);
    assert_eq!(&*converted[0].service, "my-service");
}
/// A span carrying both a SQL statement and an HTTP URL is classified as SQL.
#[test]
fn span_with_both_db_and_http_prefers_sql() {
    let mut mixed = make_sql_span(
        &[1; 16],
        &[2; 8],
        &[],
        "SELECT 1",
        1_000_000_000,
        1_001_000_000,
    );
    mixed.attributes.push(make_kv("http.url", "http://svc/api"));
    let request = make_request("test", vec![mixed]);
    let converted = convert_otlp_request(&request);
    assert_eq!(converted[0].event_type, EventType::Sql);
}
/// An end time before the start time yields a zero duration.
#[test]
fn clock_skew_duration_is_zero() {
    let start_ns = 2_000_000_000;
    let end_ns = 1_000_000_000;
    let skewed = make_sql_span(&[1; 16], &[2; 8], &[], "SELECT 1", start_ns, end_ns);
    let request = make_request("test", vec![skewed]);
    let converted = convert_otlp_request(&request);
    assert_eq!(converted[0].duration_us, 0);
}
/// Encoding no bytes yields an empty string.
#[test]
fn bytes_to_hex_empty() {
    let encoded = bytes_to_hex(&[]);
    assert_eq!(encoded, "");
}
/// The lowest byte, the highest byte and a letter-only byte encode in lowercase.
#[test]
fn bytes_to_hex_all_values() {
    let encoded = bytes_to_hex(&[0x00, 0xff, 0xab]);
    assert_eq!(encoded, "00ffab");
}
/// February 29th of a leap year is rendered correctly.
#[test]
fn nanos_to_iso8601_leap_year() {
    let rendered = nanos_to_iso8601(1_709_164_800_000_000_000_u64);
    assert_eq!(rendered, "2024-02-29T00:00:00.000Z");
}
/// An empty id encodes to an empty hex string.
#[test]
fn empty_trace_id_produces_empty_hex() {
    let empty_id: &[u8] = &[];
    assert_eq!(bytes_to_hex(empty_id), "");
}
/// A one-byte id encodes to two hex characters; no padding is added.
#[test]
fn short_span_id_produces_short_hex() {
    let encoded = bytes_to_hex(&[0xab]);
    assert_eq!(encoded, "ab");
}
/// A resource without `service.name` yields the service `"unknown"`.
#[test]
fn missing_service_name_defaults_to_unknown() {
    let span = make_sql_span(&[1; 16], &[2; 8], &[], "SELECT 1", 0, 1000);
    let attribute_less_resource = Resource {
        attributes: Vec::new(),
        ..Default::default()
    };
    let scope = ScopeSpans {
        spans: vec![span],
        ..Default::default()
    };
    let request = ExportTraceServiceRequest {
        resource_spans: vec![ResourceSpans {
            resource: Some(attribute_less_resource),
            scope_spans: vec![scope],
            ..Default::default()
        }],
    };
    let converted = convert_otlp_request(&request);
    assert_eq!(&*converted[0].service, "unknown");
}
/// A `ResourceSpans` entry without a resource yields the service `"unknown"`.
#[test]
fn no_resource_defaults_to_unknown_service() {
    let span = make_sql_span(&[1; 16], &[2; 8], &[], "SELECT 1", 0, 1000);
    let scope = ScopeSpans {
        spans: vec![span],
        ..Default::default()
    };
    let request = ExportTraceServiceRequest {
        resource_spans: vec![ResourceSpans {
            resource: None,
            scope_spans: vec![scope],
            ..Default::default()
        }],
    };
    let converted = convert_otlp_request(&request);
    assert_eq!(&*converted[0].service, "unknown");
}
/// Wraps `spans` in a single-resource, single-scope export request whose
/// resource carries exactly `attrs`.
fn make_request_with_resource_attrs(
    attrs: Vec<KeyValue>,
    spans: Vec<Span>,
) -> ExportTraceServiceRequest {
    let resource = Resource {
        attributes: attrs,
        ..Default::default()
    };
    let scope = ScopeSpans {
        spans,
        ..Default::default()
    };
    let resource_spans = ResourceSpans {
        resource: Some(resource),
        scope_spans: vec![scope],
        ..Default::default()
    };
    ExportTraceServiceRequest {
        resource_spans: vec![resource_spans],
    }
}
/// The resource's `cloud.region` is copied onto the event.
#[test]
fn cloud_region_extracted_from_resource_attributes() {
    let resource_attrs = vec![
        make_kv("service.name", "order-svc"),
        make_kv("cloud.region", "eu-west-3"),
    ];
    let span = make_sql_span(&[1; 16], &[2; 8], &[], "SELECT 1", 0, 1000);
    let request = make_request_with_resource_attrs(resource_attrs, vec![span]);
    let converted = convert_otlp_request(&request);
    assert_eq!(converted.len(), 1);
    assert_eq!(converted[0].cloud_region.as_deref(), Some("eu-west-3"));
}
/// Without a resource-level region, the span's own `cloud.region` is used.
#[test]
fn cloud_region_falls_back_to_span_attribute() {
    let mut span = make_sql_span(&[1; 16], &[2; 8], &[], "SELECT 1", 0, 1000);
    span.attributes.push(make_kv("cloud.region", "us-east-1"));
    let resource_attrs = vec![make_kv("service.name", "order-svc")];
    let request = make_request_with_resource_attrs(resource_attrs, vec![span]);
    let converted = convert_otlp_request(&request);
    assert_eq!(converted.len(), 1);
    assert_eq!(converted[0].cloud_region.as_deref(), Some("us-east-1"));
}
/// When both levels set a region, the resource-level one wins.
#[test]
fn cloud_region_resource_wins_over_span() {
    let mut span = make_sql_span(&[1; 16], &[2; 8], &[], "SELECT 1", 0, 1000);
    span.attributes.push(make_kv("cloud.region", "us-east-1"));
    let resource_attrs = vec![
        make_kv("service.name", "order-svc"),
        make_kv("cloud.region", "eu-west-3"),
    ];
    let request = make_request_with_resource_attrs(resource_attrs, vec![span]);
    let converted = convert_otlp_request(&request);
    assert_eq!(converted[0].cloud_region.as_deref(), Some("eu-west-3"));
}
/// With no `cloud.region` attribute on the span, and a request built by
/// `make_request` from only a service name, the event's `cloud_region` is
/// `None`.
#[test]
fn no_cloud_region_yields_none() {
    let span = make_sql_span(&[1; 16], &[2; 8], &[], "SELECT 1", 0, 1000);
    let req = make_request("order-svc", vec![span]);
    let events = convert_otlp_request(&req);
    // Check the count first so a dropped span fails with a readable assertion
    // rather than an index-out-of-bounds panic on the next line.
    assert_eq!(events.len(), 1);
    assert!(events[0].cloud_region.is_none());
}
/// A resource-level `cloud.region` containing spaces is rejected: the event
/// is still produced, but its `cloud_region` is `None`.
#[test]
fn cloud_region_with_space_is_sanitized_to_none() {
    let resource_attrs = vec![
        make_kv("service.name", "order-svc"),
        make_kv("cloud.region", "eu west 3"),
    ];
    let sql_span = make_sql_span(&[1; 16], &[2; 8], &[], "SELECT 1", 0, 1000);
    let request = make_request_with_resource_attrs(resource_attrs, vec![sql_span]);

    let converted = convert_otlp_request(&request);

    assert_eq!(converted.len(), 1);
    let region = &converted[0].cloud_region;
    assert!(
        region.is_none(),
        "region with space must be sanitized to None"
    );
}
/// A 65-character `cloud.region` is rejected and the event's `cloud_region`
/// is `None`.
///
/// NOTE(review): 65 is presumably one past a 64-character limit in the
/// sanitizer — confirm against the implementation.
#[test]
fn oversized_cloud_region_is_sanitized_to_none() {
    let long_region = "a".repeat(65);
    let span = make_sql_span(&[1; 16], &[2; 8], &[], "SELECT 1", 0, 1000);
    let req = make_request_with_resource_attrs(
        vec![
            make_kv("service.name", "order-svc"),
            make_kv("cloud.region", &long_region),
        ],
        vec![span],
    );
    let events = convert_otlp_request(&req);
    // Check the count first so a dropped span fails with a readable assertion
    // rather than an index-out-of-bounds panic on the next line.
    assert_eq!(events.len(), 1);
    assert!(events[0].cloud_region.is_none());
}
/// A `cloud.region` value containing a newline followed by text shaped like
/// a log line is rejected, so the event's `cloud_region` is `None`.
#[test]
fn cloud_region_with_control_char_is_sanitized_to_none() {
    let span = make_sql_span(&[1; 16], &[2; 8], &[], "SELECT 1", 0, 1000);
    let req = make_request_with_resource_attrs(
        vec![
            make_kv("service.name", "order-svc"),
            make_kv("cloud.region", "eu-west-3\n2026-04-07 ERROR fake"),
        ],
        vec![span],
    );
    let events = convert_otlp_request(&req);
    // Check the count first so a dropped span fails with a readable assertion
    // rather than an index-out-of-bounds panic on the next line.
    assert_eq!(events.len(), 1);
    assert!(events[0].cloud_region.is_none());
}
/// The span-level `cloud.region` fallback is validated too: an invalid
/// value (`"bad region!"`) on the span yields `None`.
#[test]
fn cloud_region_span_level_fallback_also_sanitized() {
    let mut span = make_sql_span(&[1; 16], &[2; 8], &[], "SELECT 1", 0, 1000);
    span.attributes.push(make_kv("cloud.region", "bad region!"));
    let req = make_request("order-svc", vec![span]);
    let events = convert_otlp_request(&req);
    // Check the count first so a dropped span fails with a readable assertion
    // rather than an index-out-of-bounds panic on the next line.
    assert_eq!(events.len(), 1);
    assert!(events[0].cloud_region.is_none());
}
/// Test helper: builds a request for `service` in which every
/// `(scope name, spans)` pair in `scoped` becomes its own `ScopeSpans`,
/// tagged with an `InstrumentationScope` of that name. Input order is kept.
fn scoped_request(service: &str, scoped: Vec<(&str, Vec<Span>)>) -> ExportTraceServiceRequest {
    use opentelemetry_proto::tonic::common::v1::InstrumentationScope;

    let mut scope_spans = Vec::with_capacity(scoped.len());
    for (scope_name, spans) in scoped {
        let scope = InstrumentationScope {
            name: scope_name.to_string(),
            ..Default::default()
        };
        scope_spans.push(ScopeSpans {
            scope: Some(scope),
            spans,
            ..Default::default()
        });
    }

    let resource = Resource {
        attributes: vec![make_kv("service.name", service)],
        ..Default::default()
    };
    ExportTraceServiceRequest {
        resource_spans: vec![ResourceSpans {
            resource: Some(resource),
            scope_spans,
            ..Default::default()
        }],
    }
}
/// A lone SQL span with no parent reports exactly the instrumentation scope
/// it was emitted under.
#[test]
fn instrumentation_scope_captured_from_leaf_only() {
    let leaf = make_sql_span(&[1; 16], &[2; 8], &[], "SELECT 1", 0, 1_000_000);
    let request = scoped_request("svc", vec![("io.opentelemetry.jdbc", vec![leaf])]);

    let converted = convert_otlp_request(&request);

    assert_eq!(converted.len(), 1);
    let scopes = converted[0]
        .instrumentation_scopes
        .iter()
        .map(AsRef::as_ref)
        .collect::<Vec<&str>>();
    assert_eq!(scopes, vec!["io.opentelemetry.jdbc"]);
}
/// The scopes reported for a JDBC leaf span include those of its ancestors,
/// listed leaf-first and ending at the root.
///
/// NOTE(review): the four scopes in this fixture are all distinct, so the
/// ordering is checked but deduplication itself is not exercised here.
#[test]
fn instrumentation_scopes_walk_parent_chain_deduped() {
    // Parent chain, root to leaf: http -> spring-data -> hibernate -> jdbc.
    let http_span = make_span_with_code_attrs(
        &[10; 8],
        &[],
        "GET /api/orders",
        vec![make_kv("http.route", "GET /api/orders")],
    );
    let repository_span =
        make_span_with_code_attrs(&[11; 8], &[10; 8], "OrderRepository.findById", vec![]);
    let orm_span = make_span_with_code_attrs(&[12; 8], &[11; 8], "Session.find", vec![]);
    let jdbc_span = make_sql_span(&[1; 16], &[13; 8], &[12; 8], "SELECT 1", 0, 1_000_000);

    let scoped = vec![
        ("io.opentelemetry.spring-webmvc-6.0", vec![http_span]),
        ("io.opentelemetry.spring-data-3.0", vec![repository_span]),
        ("io.opentelemetry.hibernate-6.0", vec![orm_span]),
        ("io.opentelemetry.jdbc", vec![jdbc_span]),
    ];
    let request = scoped_request("svc", scoped);

    let converted = convert_otlp_request(&request);

    assert_eq!(converted.len(), 1, "only the JDBC span yields a SpanEvent");
    let scopes = converted[0]
        .instrumentation_scopes
        .iter()
        .map(AsRef::as_ref)
        .collect::<Vec<&str>>();
    let expected = vec![
        "io.opentelemetry.jdbc",
        "io.opentelemetry.hibernate-6.0",
        "io.opentelemetry.spring-data-3.0",
        "io.opentelemetry.spring-webmvc-6.0",
    ];
    assert_eq!(scopes, expected, "leaf-to-root order, deduplicated");
}
/// A request built via `make_request` produces an event whose
/// `instrumentation_scopes` list is empty.
///
/// NOTE(review): this relies on `make_request` not setting a scope on its
/// `ScopeSpans` — confirm against that helper.
#[test]
fn instrumentation_scopes_empty_when_scope_absent() {
    let sql_span = make_sql_span(&[1; 16], &[2; 8], &[], "SELECT 1", 0, 1_000_000);
    let request = make_request("svc", vec![sql_span]);

    let converted = convert_otlp_request(&request);

    assert_eq!(converted.len(), 1);
    let scopes = &converted[0].instrumentation_scopes;
    assert!(scopes.is_empty());
}
/// An empty-string `cloud.region` on the resource is treated as absent:
/// the event's `cloud_region` is `None`.
#[test]
fn cloud_region_empty_string_is_sanitized_to_none() {
    let span = make_sql_span(&[1; 16], &[2; 8], &[], "SELECT 1", 0, 1000);
    let req = make_request_with_resource_attrs(
        vec![
            make_kv("service.name", "order-svc"),
            make_kv("cloud.region", ""),
        ],
        vec![span],
    );
    let events = convert_otlp_request(&req);
    // Check the count first so a dropped span fails with a readable assertion
    // rather than an index-out-of-bounds panic on the next line.
    assert_eq!(events.len(), 1);
    assert!(events[0].cloud_region.is_none());
}
/// Test helper: builds a span in trace `[1; 16]` with the given ids and
/// name, running from 0 to 1_000_000 ns, whose attributes are exactly
/// `code_attrs`. Pass an empty `parent_span_id` for a root span.
fn make_span_with_code_attrs(
    span_id: &[u8],
    parent_span_id: &[u8],
    name: &str,
    code_attrs: Vec<KeyValue>,
) -> Span {
    let trace_id = vec![1; 16];
    let span_id = span_id.to_vec();
    let parent_span_id = parent_span_id.to_vec();
    Span {
        trace_id,
        span_id,
        parent_span_id,
        name: name.to_owned(),
        start_time_unix_nano: 0,
        end_time_unix_nano: 1_000_000,
        attributes: code_attrs,
        ..Default::default()
    }
}
/// A SQL span without code attributes of its own picks up
/// `code.namespace` / `code.function` from its direct parent.
#[test]
fn code_attrs_inherited_from_immediate_parent() {
    let controller_attrs = vec![
        make_kv("http.route", "GET /api/orders"),
        make_kv("code.namespace", "com.foo.OrderController"),
        make_kv("code.function", "list"),
    ];
    let controller = make_span_with_code_attrs(&[10; 8], &[], "GET /api/orders", controller_attrs);
    // The SQL span's parent id matches the controller's span id.
    let query = make_sql_span(
        &[1; 16],
        &[20; 8],
        &[10; 8],
        "SELECT * FROM orders",
        0,
        1_000_000,
    );
    let request = make_request("order-svc", vec![controller, query]);

    let converted = convert_otlp_request(&request);

    assert_eq!(converted.len(), 1);
    let event = &converted[0];
    assert_eq!(
        event.code_namespace.as_deref(),
        Some("com.foo.OrderController")
    );
    assert_eq!(event.code_function.as_deref(), Some("list"));
}
/// Code attributes are found further up the chain when the direct parent
/// has none: the JDBC span's parent (hibernate) is bare, so the namespace
/// comes from the service span above it.
#[test]
fn code_attrs_inherited_from_grandparent() {
    // Chain, root to leaf: http -> service -> hibernate -> jdbc.
    let http_span = make_span_with_code_attrs(
        &[10; 8],
        &[],
        "GET /api/orders",
        vec![make_kv("http.route", "GET /api/orders")],
    );
    let service_attrs = vec![
        make_kv("code.namespace", "com.foo.OrderService"),
        make_kv("code.function", "list"),
    ];
    let service_span =
        make_span_with_code_attrs(&[11; 8], &[10; 8], "OrderService.list", service_attrs);
    let orm_span = make_span_with_code_attrs(&[12; 8], &[11; 8], "Hibernate.query", vec![]);
    let jdbc_span = make_sql_span(&[1; 16], &[13; 8], &[12; 8], "SELECT 1", 0, 1_000_000);
    let request = make_request(
        "order-svc",
        vec![http_span, service_span, orm_span, jdbc_span],
    );

    let converted = convert_otlp_request(&request);

    assert_eq!(converted.len(), 1);
    let namespace = converted[0].code_namespace.as_deref();
    assert_eq!(namespace, Some("com.foo.OrderService"));
}
/// A parentless SQL span with no code attributes yields `None` for both
/// `code_namespace` and `code_function`.
#[test]
fn code_attrs_orphan_span_returns_none() {
    let orphan = make_sql_span(&[1; 16], &[2; 8], &[], "SELECT 1", 0, 1_000_000);
    let request = make_request("svc", vec![orphan]);

    let converted = convert_otlp_request(&request);

    assert_eq!(converted.len(), 1);
    let event = &converted[0];
    assert!(event.code_namespace.is_none());
    assert!(event.code_function.is_none());
}
/// Conversion copes with a parent chain more than twice as long as
/// `CODE_ATTRS_MAX_DEPTH`: one event is produced and, since no ancestor
/// carries code attributes, `code_namespace` stays `None`.
///
/// NOTE(review): the leaf uses span id `[100; 8]` while chain ids run from
/// 1 to `depth`, so the ids would collide if `depth` ever reached 100.
#[test]
fn code_attrs_max_depth_safety() {
    let depth = u8::try_from(CODE_ATTRS_MAX_DEPTH * 2 + 4).unwrap();

    // Level `i` has id `i + 1` and points at level `i - 1`; level 0 is the root.
    let mut spans: Vec<Span> = (0..depth)
        .map(|level| {
            let span_id = [level + 1; 8];
            let parent_id = if level == 0 {
                vec![]
            } else {
                vec![level; 8]
            };
            make_span_with_code_attrs(&span_id, &parent_id, &format!("level.{level}"), vec![])
        })
        .collect();
    // The SQL leaf hangs off the deepest chain span (id `depth`).
    spans.push(make_sql_span(
        &[1; 16],
        &[100; 8],
        &[depth; 8],
        "SELECT 1",
        0,
        1_000_000,
    ));

    let request = make_request("svc", spans);
    let converted = convert_otlp_request(&request);

    assert_eq!(converted.len(), 1);
    assert!(converted[0].code_namespace.is_none());
}
/// A `code.namespace` set on the SQL span itself wins over the one its
/// parent carries.
#[test]
fn code_attrs_self_takes_precedence() {
    let parent_span = make_span_with_code_attrs(
        &[10; 8],
        &[],
        "GET /api/x",
        vec![make_kv("code.namespace", "com.parent")],
    );
    let mut sql_span = make_sql_span(&[1; 16], &[20; 8], &[10; 8], "SELECT 1", 0, 1_000_000);
    sql_span
        .attributes
        .push(make_kv("code.namespace", "com.child"));
    let request = make_request("svc", vec![parent_span, sql_span]);

    let converted = convert_otlp_request(&request);

    assert_eq!(converted.len(), 1);
    let namespace = converted[0].code_namespace.as_deref();
    assert_eq!(namespace, Some("com.child"));
}
/// The stable attribute names (`code.function.name`, `code.file.path`,
/// `code.line.number`) populate the event's code fields. The namespace is
/// derived from the fully-qualified function name; here it is the part
/// before the final `.`.
#[test]
fn code_attrs_stable_conventions() {
    let mut sql_span = make_sql_span(&[1; 16], &[2; 8], &[], "SELECT 1", 0, 1_000_000);
    sql_span.attributes.extend([
        make_kv("code.function.name", "com.foo.OrderService.findItems"),
        make_kv("code.file.path", "src/main/java/com/foo/OrderService.java"),
        make_int_kv("code.line.number", 42),
    ]);
    let request = make_request("svc", vec![sql_span]);

    let converted = convert_otlp_request(&request);

    assert_eq!(converted.len(), 1);
    let event = &converted[0];
    assert_eq!(
        event.code_function.as_deref(),
        Some("com.foo.OrderService.findItems")
    );
    assert_eq!(
        event.code_filepath.as_deref(),
        Some("src/main/java/com/foo/OrderService.java")
    );
    assert_eq!(event.code_lineno, Some(42));
    assert_eq!(
        event.code_namespace.as_deref(),
        Some("com.foo.OrderService")
    );
}
/// The legacy attribute names (`code.function`, `code.namespace`,
/// `code.filepath`, `code.lineno`) are still honoured.
#[test]
fn code_attrs_legacy_conventions_still_work() {
    let mut sql_span = make_sql_span(&[1; 16], &[2; 8], &[], "SELECT 1", 0, 1_000_000);
    sql_span.attributes.extend([
        make_kv("code.function", "findItems"),
        make_kv("code.namespace", "com.foo.OrderService"),
        make_kv("code.filepath", "src/OrderService.java"),
        make_int_kv("code.lineno", 99),
    ]);
    let request = make_request("svc", vec![sql_span]);

    let converted = convert_otlp_request(&request);

    assert_eq!(converted.len(), 1);
    let event = &converted[0];
    assert_eq!(event.code_function.as_deref(), Some("findItems"));
    assert_eq!(
        event.code_namespace.as_deref(),
        Some("com.foo.OrderService")
    );
    assert_eq!(event.code_lineno, Some(99));
}
/// An explicit legacy `code.namespace` is preferred over the namespace that
/// could be derived from `code.function.name`.
#[test]
fn code_attrs_legacy_namespace_wins_over_derivation() {
    let mut sql_span = make_sql_span(&[1; 16], &[2; 8], &[], "SELECT 1", 0, 1_000_000);
    sql_span.attributes.extend([
        make_kv("code.function.name", "com.foo.X.y"),
        make_kv("code.namespace", "com.bar.X"),
    ]);
    let request = make_request("svc", vec![sql_span]);

    let converted = convert_otlp_request(&request);

    assert_eq!(converted.len(), 1);
    let namespace = converted[0].code_namespace.as_deref();
    assert_eq!(namespace, Some("com.bar.X"));
}
/// A dotted value in the legacy `code.function` attribute is passed through
/// as-is and is not split to derive a namespace.
#[test]
fn code_attrs_legacy_function_does_not_derive_namespace() {
    let mut sql_span = make_sql_span(&[1; 16], &[2; 8], &[], "SELECT 1", 0, 1_000_000);
    let legacy_function = make_kv("code.function", "OrderService.findItems");
    sql_span.attributes.push(legacy_function);
    let request = make_request("svc", vec![sql_span]);

    let converted = convert_otlp_request(&request);

    assert_eq!(converted.len(), 1);
    let event = &converted[0];
    assert_eq!(
        event.code_function.as_deref(),
        Some("OrderService.findItems")
    );
    assert!(event.code_namespace.is_none());
}
/// A `code.function.name` without any `.` is kept as the function and no
/// namespace is derived from it.
#[test]
fn code_attrs_no_dot_in_fq_name() {
    let mut sql_span = make_sql_span(&[1; 16], &[2; 8], &[], "SELECT 1", 0, 1_000_000);
    let bare_function = make_kv("code.function.name", "main");
    sql_span.attributes.push(bare_function);
    let request = make_request("svc", vec![sql_span]);

    let converted = convert_otlp_request(&request);

    assert_eq!(converted.len(), 1);
    let event = &converted[0];
    assert_eq!(event.code_function.as_deref(), Some("main"));
    assert!(event.code_namespace.is_none());
}
/// The namespace derived from a fully-qualified Spring Data JPA function
/// name still contains the `org.springframework.data.jpa` package prefix, so
/// substring matching on the namespace keeps working.
#[test]
fn java_rules_match_via_derived_namespace() {
    let mut sql_span = make_sql_span(&[1; 16], &[2; 8], &[], "SELECT 1", 0, 1_000_000);
    let fq_function = make_kv(
        "code.function.name",
        "org.springframework.data.jpa.repository.support.SimpleJpaRepository.findAll",
    );
    sql_span.attributes.push(fq_function);
    let request = make_request("svc", vec![sql_span]);

    let converted = convert_otlp_request(&request);

    assert_eq!(converted.len(), 1);
    let derived = converted[0]
        .code_namespace
        .as_deref()
        .expect("namespace derived from FQ name");
    assert_eq!(
        derived,
        "org.springframework.data.jpa.repository.support.SimpleJpaRepository"
    );
    assert!(derived.contains("org.springframework.data.jpa"));
}
#[cfg(feature = "daemon")]
mod http_handler {
use super::*;
use axum::body::Body;
use axum::http::{Request, StatusCode, header};
use flate2::Compression;
use flate2::write::GzEncoder;
use prost::Message;
use std::io::Write;
use tokio::sync::mpsc;
use tower::ServiceExt;
/// Test helper: protobuf-encodes a request for service `"svc"` holding a
/// single `SELECT 1` SQL span.
fn build_minimal_request_bytes() -> Vec<u8> {
    let sql_span = make_sql_span(&[1; 16], &[2; 8], &[], "SELECT 1", 0, 1_000_000);
    make_request("svc", vec![sql_span]).encode_to_vec()
}
/// Test helper: gzip-compresses `body` at the default compression level.
/// Panics if encoding fails.
fn gzip(body: &[u8]) -> Vec<u8> {
    let level = Compression::default();
    let mut gz = GzEncoder::new(Vec::new(), level);
    gz.write_all(body).expect("gzip encode");
    let compressed = gz.finish().expect("gzip finish");
    compressed
}
/// Test helper: an empty-bodied `POST /v1/traces` declaring
/// `Content-Type: application/json`. The tests using it expect the router to
/// answer `415 Unsupported Media Type`.
fn unsupported_media_type_request() -> Request<Body> {
    let empty_body = Body::from(Vec::<u8>::new());
    Request::builder()
        .method("POST")
        .uri("/v1/traces")
        .header(header::CONTENT_TYPE, "application/json")
        .body(empty_body)
        .expect("build request")
}
/// A gzip-encoded protobuf body is accepted with `200 OK`, and its single
/// span arrives on the channel as one event targeting `SELECT 1`.
#[tokio::test]
async fn otlp_http_accepts_gzip_request() {
    let (tx, mut rx) = mpsc::channel(8);
    let app = otlp_http_router(tx, 1_048_576, None);
    let compressed = gzip(&build_minimal_request_bytes());
    let request = Request::builder()
        .method("POST")
        .uri("/v1/traces")
        .header(header::CONTENT_TYPE, "application/x-protobuf")
        .header(header::CONTENT_ENCODING, "gzip")
        .body(Body::from(compressed))
        .expect("build request");

    let response = app.oneshot(request).await.expect("router runs");

    assert_eq!(response.status(), StatusCode::OK);
    let batch = rx.recv().await.expect("event batch sent");
    assert_eq!(batch.len(), 1);
    assert_eq!(batch[0].target, "SELECT 1");
}
/// A plain protobuf body with no `Content-Encoding` is accepted with
/// `200 OK` and forwards one event on the channel.
#[tokio::test]
async fn otlp_http_accepts_uncompressed_request() {
    let (tx, mut rx) = mpsc::channel(8);
    let app = otlp_http_router(tx, 1_048_576, None);
    let raw = build_minimal_request_bytes();
    let request = Request::builder()
        .method("POST")
        .uri("/v1/traces")
        .header(header::CONTENT_TYPE, "application/x-protobuf")
        .body(Body::from(raw))
        .expect("build request");

    let response = app.oneshot(request).await.expect("router runs");

    assert_eq!(response.status(), StatusCode::OK);
    let batch = rx.recv().await.expect("event batch sent");
    assert_eq!(batch.len(), 1);
}
/// A request declaring `Content-Encoding: br` is refused with
/// `415 Unsupported Media Type`.
#[tokio::test]
async fn otlp_http_rejects_unsupported_encoding() {
    let (tx, _rx) = mpsc::channel::<Vec<SpanEvent>>(8);
    let app = otlp_http_router(tx, 1_048_576, None);
    // The body is not actually brotli-compressed; only the header says `br`.
    let raw = build_minimal_request_bytes();
    let request = Request::builder()
        .method("POST")
        .uri("/v1/traces")
        .header(header::CONTENT_TYPE, "application/x-protobuf")
        .header(header::CONTENT_ENCODING, "br")
        .body(Body::from(raw))
        .expect("build request");

    let response = app.oneshot(request).await.expect("router runs");

    let status = response.status();
    assert_eq!(status, StatusCode::UNSUPPORTED_MEDIA_TYPE);
}
/// A 4096-byte body sent to a router configured with a 256-byte cap is
/// refused with `413 Payload Too Large`.
///
/// NOTE(review): despite the test name, the body carries no
/// `Content-Encoding`; "compressed" presumably means the on-the-wire size
/// before any decompression — confirm.
#[tokio::test]
async fn otlp_http_rejects_oversize_compressed_body() {
    let (tx, _rx) = mpsc::channel::<Vec<SpanEvent>>(8);
    let body_limit: usize = 256;
    let app = otlp_http_router(tx, body_limit, None);
    let oversized: Vec<u8> = vec![0u8; 4096];
    let request = Request::builder()
        .method("POST")
        .uri("/v1/traces")
        .header(header::CONTENT_TYPE, "application/x-protobuf")
        .header(header::CONTENT_LENGTH, oversized.len())
        .body(Body::from(oversized))
        .expect("build request");

    let response = app.oneshot(request).await.expect("router runs");

    let status = response.status();
    assert_eq!(status, StatusCode::PAYLOAD_TOO_LARGE);
}
/// The content-type check still applies to gzip-encoded requests: a gzipped
/// body declared as `application/json` is refused with
/// `415 Unsupported Media Type`.
#[tokio::test]
async fn otlp_http_content_type_check_with_gzip() {
    let (tx, _rx) = mpsc::channel::<Vec<SpanEvent>>(8);
    let app = otlp_http_router(tx, 1_048_576, None);
    let compressed = gzip(&build_minimal_request_bytes());
    let request = Request::builder()
        .method("POST")
        .uri("/v1/traces")
        .header(header::CONTENT_TYPE, "application/json")
        .header(header::CONTENT_ENCODING, "gzip")
        .body(Body::from(compressed))
        .expect("build request");

    let response = app.oneshot(request).await.expect("router runs");

    let status = response.status();
    assert_eq!(status, StatusCode::UNSUPPORTED_MEDIA_TYPE);
}
/// A request rejected with 415 increments `otlp_rejected_total` under the
/// `unsupported_media_type` label exactly once.
#[tokio::test]
async fn http_handler_records_unsupported_media_type() {
    let (tx, _rx) = mpsc::channel::<Vec<SpanEvent>>(8);
    let (metrics, sink) = fresh_metrics_sink();
    let app = otlp_http_router(tx, 1_048_576, Some(sink));
    let request = unsupported_media_type_request();

    let response = app.oneshot(request).await.expect("router runs");

    assert_eq!(response.status(), StatusCode::UNSUPPORTED_MEDIA_TYPE);
    let rejected = metrics
        .otlp_rejected_total
        .with_label_values(&["unsupported_media_type"])
        .get();
    assert_eq!(rejected, 1);
}
/// A body of six `0xff` bytes is rejected with `400 Bad Request` and counted
/// once under the `parse_error` label.
#[tokio::test]
async fn http_handler_records_parse_error() {
    let (tx, _rx) = mpsc::channel::<Vec<SpanEvent>>(8);
    let (metrics, sink) = fresh_metrics_sink();
    let app = otlp_http_router(tx, 1_048_576, Some(sink));
    let garbage = vec![0xff_u8, 0xff, 0xff, 0xff, 0xff, 0xff];
    let request = Request::builder()
        .method("POST")
        .uri("/v1/traces")
        .header(header::CONTENT_TYPE, "application/x-protobuf")
        .body(Body::from(garbage))
        .expect("build request");

    let response = app.oneshot(request).await.expect("router runs");

    assert_eq!(response.status(), StatusCode::BAD_REQUEST);
    let rejected = metrics
        .otlp_rejected_total
        .with_label_values(&["parse_error"])
        .get();
    assert_eq!(rejected, 1);
}
/// With the receiver already dropped (channel closed), a valid request is
/// answered with `503 Service Unavailable` and counted once under the
/// `channel_full` label.
#[tokio::test]
async fn http_handler_records_channel_full() {
    let (tx, rx) = mpsc::channel::<Vec<SpanEvent>>(1);
    // Dropping the receiver closes the channel before the request arrives.
    drop(rx);
    let (metrics, sink) = fresh_metrics_sink();
    let app = otlp_http_router(tx, 1_048_576, Some(sink));
    let raw = build_minimal_request_bytes();
    let request = Request::builder()
        .method("POST")
        .uri("/v1/traces")
        .header(header::CONTENT_TYPE, "application/x-protobuf")
        .body(Body::from(raw))
        .expect("build request");

    let response = app.oneshot(request).await.expect("router runs");

    assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
    let rejected = metrics
        .otlp_rejected_total
        .with_label_values(&["channel_full"])
        .get();
    assert_eq!(rejected, 1);
}
/// With the channel's only slot occupied and the receiver still alive, a
/// valid request is answered with `503 Service Unavailable` and counted once
/// under the `channel_full` label.
///
/// NOTE(review): `start_paused = true` presumably lets a send timeout in the
/// handler elapse without real waiting — confirm against the handler.
#[tokio::test(start_paused = true)]
async fn http_handler_rejects_when_channel_full_but_open() {
    // `_rx` stays bound for the whole test, so the channel remains open.
    let (tx, _rx) = mpsc::channel::<Vec<SpanEvent>>(1);
    tx.try_send(vec![]).expect("fill the only slot");
    let (metrics, sink) = fresh_metrics_sink();
    let app = otlp_http_router(tx, 1_048_576, Some(sink));
    let raw = build_minimal_request_bytes();
    let request = Request::builder()
        .method("POST")
        .uri("/v1/traces")
        .header(header::CONTENT_TYPE, "application/x-protobuf")
        .body(Body::from(raw))
        .expect("build request");

    let response = app.oneshot(request).await.expect("router runs");

    assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
    let rejected = metrics
        .otlp_rejected_total
        .with_label_values(&["channel_full"])
        .get();
    assert_eq!(rejected, 1);
}
/// The rejection path works when the router was built without a metrics
/// sink (`None`): the request is still answered with 415.
#[tokio::test]
async fn http_handler_no_metrics_state_does_not_panic() {
    let (tx, _rx) = mpsc::channel::<Vec<SpanEvent>>(8);
    let app = otlp_http_router(tx, 1_048_576, None);
    let request = unsupported_media_type_request();

    let response = app.oneshot(request).await.expect("router runs");

    let status = response.status();
    assert_eq!(status, StatusCode::UNSUPPORTED_MEDIA_TYPE);
}
/// gRPC export against a closed channel (receiver dropped) fails with
/// `Code::Internal` and is counted once under the `channel_full` label.
#[tokio::test]
async fn grpc_handler_records_channel_full() {
    use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceService;

    let (tx, rx) = mpsc::channel::<Vec<SpanEvent>>(1);
    // Dropping the receiver closes the channel before the export call.
    drop(rx);
    let metrics = Arc::new(MetricsState::new());
    let service = OtlpGrpcService::new(tx, Some(metrics.clone()));
    let sql_span = make_sql_span(&[1; 16], &[2; 8], &[], "SELECT 1", 0, 1_000_000);
    let export_request = tonic::Request::new(make_request("svc", vec![sql_span]));

    let outcome = service.export(export_request).await;

    let status = outcome.expect_err("closed channel rejects");
    assert_eq!(
        status.code(),
        tonic::Code::Internal,
        "shutdown (closed channel) is a genuine internal state"
    );
    let rejected = metrics
        .otlp_rejected_total
        .with_label_values(&["channel_full"])
        .get();
    assert_eq!(rejected, 1);
}
/// gRPC export against a full but still open channel fails with
/// `Code::Unavailable` and is counted once under the `channel_full` label.
///
/// NOTE(review): `start_paused = true` presumably lets a send timeout in the
/// service elapse without real waiting — confirm against the service.
#[tokio::test(start_paused = true)]
async fn grpc_handler_returns_unavailable_when_channel_full_but_open() {
    use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceService;

    // `_rx` stays bound for the whole test, so the channel remains open.
    let (tx, _rx) = mpsc::channel::<Vec<SpanEvent>>(1);
    tx.try_send(vec![]).expect("fill the only slot");
    let metrics = Arc::new(MetricsState::new());
    let service = OtlpGrpcService::new(tx, Some(metrics.clone()));
    let sql_span = make_sql_span(&[1; 16], &[2; 8], &[], "SELECT 1", 0, 1_000_000);
    let export_request = tonic::Request::new(make_request("svc", vec![sql_span]));

    let outcome = service.export(export_request).await;

    let status = outcome.expect_err("full channel rejects");
    assert_eq!(status.code(), tonic::Code::Unavailable);
    let rejected = metrics
        .otlp_rejected_total
        .with_label_values(&["channel_full"])
        .get();
    assert_eq!(rejected, 1);
}
}
}