use anyhow::Result; use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use prost::Message;
use std::env;
use std::fmt;
use tracing::{self, instrument};
use crate::telemetry::TelemetryData;
fn decode_otlp_payload(payload: &[u8]) -> Result<ExportTraceServiceRequest> {
ExportTraceServiceRequest::decode(payload)
.map_err(|_| anyhow::anyhow!("Failed to decode protobuf payload")) }
fn encode_otlp_payload(request: &ExportTraceServiceRequest) -> Vec<u8> {
request.encode_to_vec()
}
/// How a compacted telemetry payload should be compressed before it is returned.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CompressionPreference {
    /// Compress the payload with gzip.
    Gzip,
    /// Leave the payload uncompressed.
    None,
}
impl CompressionPreference {
    /// Returns the lowercase textual name of this preference (`"gzip"` or `"none"`).
    const fn as_str(&self) -> &'static str {
        match *self {
            Self::None => "none",
            Self::Gzip => "gzip",
        }
    }
}
impl fmt::Display for CompressionPreference {
    /// Writes the same text as [`CompressionPreference::as_str`], without applying
    /// any width or padding options from the formatter.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = self.as_str();
        formatter.write_str(label)
    }
}
/// Settings that control how compacted span payloads are compressed.
#[derive(Debug, Clone)]
pub struct SpanCompactionConfig {
    /// Whether the compacted payload is gzip-compressed or left as-is.
    pub compression: CompressionPreference,
    /// Level handed to the payload's `compress` call when `compression` is
    /// [`CompressionPreference::Gzip`]. The `Default` impl only accepts `0..=9`
    /// from the environment; values set directly on the struct are not checked here.
    pub gzip_compression_level: u32,
}
impl Default for SpanCompactionConfig {
    /// Builds the configuration from the OTLP exporter environment variables.
    ///
    /// * **Compression** is read from `OTEL_EXPORTER_OTLP_TRACES_COMPRESSION`, falling
    ///   back to `OTEL_EXPORTER_OTLP_COMPRESSION`. `gzip` and `none` are accepted
    ///   (case-insensitive, surrounding whitespace ignored). Any other value logs a
    ///   warning and results in [`CompressionPreference::None`], which is also the
    ///   result when neither variable is set.
    /// * **Gzip level** is read from `OTEL_EXPORTER_OTLP_COMPRESSION_LEVEL` and must be
    ///   an integer in `0..=9`. Anything else logs a warning and falls back to `9`,
    ///   which is also used when the variable is not set.
    ///
    /// A variable that is set to an empty (or whitespace-only) string is treated
    /// exactly like an unset variable. The OpenTelemetry environment-variable
    /// specification requires this for empty values; in particular an empty
    /// signal-specific variable must not mask the generic one.
    fn default() -> Self {
        // Reads `name`, reporting unset, non-Unicode and blank values alike as absent.
        // The returned value is already trimmed; the name is carried along for logging.
        fn non_blank_env(name: &'static str) -> Option<(&'static str, String)> {
            let raw = env::var(name).ok()?;
            let trimmed = raw.trim();
            if trimmed.is_empty() {
                None
            } else {
                Some((name, trimmed.to_owned()))
            }
        }

        // The traces-specific variable wins over the generic one when both are usable.
        let compression = non_blank_env("OTEL_EXPORTER_OTLP_TRACES_COMPRESSION")
            .or_else(|| non_blank_env("OTEL_EXPORTER_OTLP_COMPRESSION"))
            .map_or(CompressionPreference::None, |(env_var, value)| {
                match value.to_lowercase().as_str() {
                    "gzip" => CompressionPreference::Gzip,
                    "none" => CompressionPreference::None,
                    _ => {
                        tracing::warn!(
                            env_var,
                            "Invalid OTLP compression setting; defaulting to no compression"
                        );
                        CompressionPreference::None
                    }
                }
            });

        let default_compression_level = 9;
        let gzip_compression_level = non_blank_env("OTEL_EXPORTER_OTLP_COMPRESSION_LEVEL")
            .and_then(|(env_var, value)| match value.parse::<u32>() {
                Ok(level) if (0..=9).contains(&level) => Some(level),
                Ok(_) => {
                    tracing::warn!(
                        env_var,
                        default_compression_level,
                        "Invalid OTLP compression level; defaulting to configured fallback"
                    );
                    None
                }
                Err(_) => {
                    tracing::warn!(
                        env_var,
                        default_compression_level,
                        "Failed to parse OTLP compression level; defaulting to configured fallback"
                    );
                    None
                }
            })
            .unwrap_or(default_compression_level);

        Self {
            compression,
            gzip_compression_level,
        }
    }
}
#[instrument(
name = "span_compactor/compact_telemetry_payloads",
skip_all,
fields(
compact_telemetry_payloads.records.count = batch.len() as i64,
requested_compression = %config.compression
)
)]
pub fn compact_telemetry_payloads(
batch: Vec<TelemetryData>,
config: &SpanCompactionConfig,
) -> Result<TelemetryData> {
if batch.is_empty() {
return Err(anyhow::anyhow!(
"Cannot compact an empty batch of telemetry data."
));
}
if batch.len() == 1 {
let mut telemetry_to_return = batch.into_iter().next().unwrap();
match config.compression {
CompressionPreference::Gzip => {
telemetry_to_return
.compress(config.gzip_compression_level)
.map_err(|_| anyhow::anyhow!("Failed to compress single payload"))?;
}
CompressionPreference::None => {
telemetry_to_return.content_encoding = None;
}
}
return Ok(telemetry_to_return);
}
let original_count = batch.len();
let mut decoded_requests = Vec::new();
let first_item_source = batch[0].source.clone();
let first_item_endpoint = batch[0].endpoint.clone();
for telemetry_item in batch {
match decode_otlp_payload(&telemetry_item.payload) {
Ok(request) => decoded_requests.push(request),
Err(_) => {
tracing::warn!("Failed to decode telemetry payload for compaction; skipping item");
}
}
}
if decoded_requests.is_empty() {
return Err(anyhow::anyhow!(
"All payloads in batch failed to decode for compaction"
));
}
let mut merged_resource_spans = Vec::new();
for request in decoded_requests {
merged_resource_spans.extend(request.resource_spans);
}
let merged_request = ExportTraceServiceRequest {
resource_spans: merged_resource_spans,
};
let merged_payload = encode_otlp_payload(&merged_request);
let mut result_telemetry_data = TelemetryData {
source: first_item_source, endpoint: first_item_endpoint, payload: merged_payload,
content_type: "application/x-protobuf".to_string(),
content_encoding: None, };
match config.compression {
CompressionPreference::Gzip => {
result_telemetry_data
.compress(config.gzip_compression_level)
.map_err(|_| anyhow::anyhow!("Failed to compress merged payload"))?;
}
CompressionPreference::None => {
result_telemetry_data.content_encoding = None;
}
}
tracing::info!(
compact_telemetry_payloads.records.count = original_count as i64,
compression = result_telemetry_data
.content_encoding
.as_deref()
.unwrap_or("none"),
"Compacted telemetry items"
);
Ok(result_telemetry_data)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::telemetry::TelemetryData;
    use crate::tracing_capture::EventCaptureLayer;
    use flate2::read::GzDecoder;
    use opentelemetry_proto::tonic::trace::v1::{ResourceSpans, ScopeSpans, Span};
    use serial_test::serial;
    use std::io::Read;
    use tracing_subscriber::{prelude::*, registry::Registry};

    // The environment variables consulted by `SpanCompactionConfig::default()`.
    const TRACES_COMPRESSION_VAR: &str = "OTEL_EXPORTER_OTLP_TRACES_COMPRESSION";
    const COMPRESSION_VAR: &str = "OTEL_EXPORTER_OTLP_COMPRESSION";
    const COMPRESSION_LEVEL_VAR: &str = "OTEL_EXPORTER_OTLP_COMPRESSION_LEVEL";

    /// Guard that gives a test a known OTLP environment.
    ///
    /// All three variables are cleared before the requested ones are set, so values
    /// inherited from the shell/CI (or leaked by another test) cannot influence the
    /// result. They are cleared again on drop, which also runs when an assertion
    /// panics. Tests using this guard must be `#[serial]`, because the process
    /// environment is shared between test threads.
    struct ScopedOtelEnv;

    impl ScopedOtelEnv {
        fn with_vars(vars: &[(&str, &str)]) -> Self {
            Self::clear();
            for (name, value) in vars {
                std::env::set_var(name, value);
            }
            Self
        }

        fn clear() {
            for name in &[
                TRACES_COMPRESSION_VAR,
                COMPRESSION_VAR,
                COMPRESSION_LEVEL_VAR,
            ] {
                std::env::remove_var(name);
            }
        }
    }

    impl Drop for ScopedOtelEnv {
        fn drop(&mut self) {
            Self::clear();
        }
    }

    /// Evaluates `SpanCompactionConfig::default()` with exactly `vars` set.
    fn config_from_env(vars: &[(&str, &str)]) -> SpanCompactionConfig {
        let _env = ScopedOtelEnv::with_vars(vars);
        SpanCompactionConfig::default()
    }

    /// Explicit configuration that does not depend on the process environment.
    fn config_with(compression: CompressionPreference) -> SpanCompactionConfig {
        SpanCompactionConfig {
            compression,
            gzip_compression_level: 9,
        }
    }

    /// Builds a request with one resource/scope holding `span_count` named spans.
    fn create_test_request(span_count: usize) -> ExportTraceServiceRequest {
        let spans = (0..span_count)
            .map(|i| Span {
                name: format!("test-span-{i}"),
                ..Default::default()
            })
            .collect();
        ExportTraceServiceRequest {
            resource_spans: vec![ResourceSpans {
                scope_spans: vec![ScopeSpans {
                    spans,
                    ..Default::default()
                }],
                ..Default::default()
            }],
        }
    }

    /// Wraps an encoded test request in an uncompressed `TelemetryData`.
    fn create_test_telemetry_uncompressed(span_count: usize, source: &str) -> TelemetryData {
        let request = create_test_request(span_count);
        let payload = encode_otlp_payload(&request);
        TelemetryData {
            source: source.to_string(),
            endpoint: "http://example.com/v1/traces".to_string(),
            payload,
            content_type: "application/x-protobuf".to_string(),
            content_encoding: None,
        }
    }

    /// Total number of spans across all resource and scope groups of `request`.
    fn total_span_count(request: &ExportTraceServiceRequest) -> usize {
        request
            .resource_spans
            .iter()
            .flat_map(|resource| resource.scope_spans.iter())
            .map(|scope| scope.spans.len())
            .sum()
    }

    #[test]
    fn test_decode_encode_roundtrip() {
        let request = create_test_request(1);
        let encoded = encode_otlp_payload(&request);
        let decoded = decode_otlp_payload(&encoded).unwrap();
        assert_eq!(request.resource_spans.len(), decoded.resource_spans.len());
    }

    #[test]
    #[serial]
    fn test_span_compaction_config_default_is_none_if_no_env_var() {
        let config = config_from_env(&[]);
        assert_eq!(config.compression, CompressionPreference::None);
        assert_eq!(config.gzip_compression_level, 9);
    }

    #[test]
    #[serial]
    fn test_span_compaction_config_env_none_explicitly() {
        let config = config_from_env(&[(COMPRESSION_VAR, "none")]);
        assert_eq!(config.compression, CompressionPreference::None);
        assert_eq!(config.gzip_compression_level, 9);
    }

    #[test]
    #[serial]
    fn test_span_compaction_config_env_gzip_explicitly() {
        let config = config_from_env(&[(COMPRESSION_VAR, "gzip")]);
        assert_eq!(config.compression, CompressionPreference::Gzip);
        assert_eq!(config.gzip_compression_level, 9);
    }

    #[test]
    #[serial]
    fn test_span_compaction_config_env_traces_precedence_gzip() {
        let config = config_from_env(&[
            (TRACES_COMPRESSION_VAR, "gzip"),
            (COMPRESSION_VAR, "none"),
        ]);
        assert_eq!(config.compression, CompressionPreference::Gzip);
        assert_eq!(config.gzip_compression_level, 9);
    }

    #[test]
    #[serial]
    fn test_span_compaction_config_env_traces_precedence_none() {
        let config = config_from_env(&[
            (TRACES_COMPRESSION_VAR, "none"),
            (COMPRESSION_VAR, "gzip"),
        ]);
        assert_eq!(config.compression, CompressionPreference::None);
        assert_eq!(config.gzip_compression_level, 9);
    }

    #[test]
    #[serial]
    fn test_span_compaction_config_invalid_env_defaults_to_none() {
        let config = config_from_env(&[(COMPRESSION_VAR, "invalid_value")]);
        assert_eq!(config.compression, CompressionPreference::None);
        assert_eq!(config.gzip_compression_level, 9);
    }

    #[test]
    #[serial]
    fn test_compression_level_env_var_valid() {
        let config = config_from_env(&[(COMPRESSION_LEVEL_VAR, "5")]);
        assert_eq!(config.gzip_compression_level, 5);
    }

    #[test]
    #[serial]
    fn test_compression_level_env_var_invalid_string() {
        let config = config_from_env(&[(COMPRESSION_LEVEL_VAR, "not_a_number")]);
        assert_eq!(config.gzip_compression_level, 9);
    }

    #[test]
    #[serial]
    fn test_compression_level_env_var_invalid_too_high() {
        let config = config_from_env(&[(COMPRESSION_LEVEL_VAR, "10")]);
        assert_eq!(config.gzip_compression_level, 9);
    }

    #[test]
    #[serial]
    fn test_compression_level_env_var_invalid_negative() {
        let config = config_from_env(&[(COMPRESSION_LEVEL_VAR, "-1")]);
        assert_eq!(config.gzip_compression_level, 9);
    }

    #[test]
    #[serial]
    fn test_compression_level_env_var_empty_string() {
        let config = config_from_env(&[(COMPRESSION_LEVEL_VAR, "")]);
        assert_eq!(config.gzip_compression_level, 9);
    }

    #[test]
    #[serial]
    fn test_compression_level_zero_is_valid() {
        let config = config_from_env(&[(COMPRESSION_LEVEL_VAR, "0")]);
        assert_eq!(config.gzip_compression_level, 0);
    }

    #[test]
    fn test_compact_single_payload_with_gzip_preference() {
        let telemetry = create_test_telemetry_uncompressed(1, "s1");
        let config = config_with(CompressionPreference::Gzip);
        let result = compact_telemetry_payloads(vec![telemetry], &config).unwrap();
        assert_eq!(result.content_encoding, Some("gzip".to_string()));
    }

    #[test]
    fn test_compact_single_payload_with_none_preference() {
        let telemetry = create_test_telemetry_uncompressed(1, "s1");
        let config = config_with(CompressionPreference::None);
        let result = compact_telemetry_payloads(vec![telemetry.clone()], &config).unwrap();
        assert_eq!(result.content_encoding, None);
        // A single item must pass through without being re-encoded.
        assert_eq!(result.payload, telemetry.payload);
    }

    #[test]
    fn test_compact_multiple_payloads_with_gzip_preference() {
        let telemetry1 = create_test_telemetry_uncompressed(2, "s1");
        let telemetry2 = create_test_telemetry_uncompressed(3, "s2");
        let config = config_with(CompressionPreference::Gzip);
        let result = compact_telemetry_payloads(vec![telemetry1, telemetry2], &config).unwrap();
        assert_eq!(result.content_encoding, Some("gzip".to_string()));

        let mut decoder = GzDecoder::new(&result.payload[..]);
        let mut decompressed = Vec::new();
        decoder.read_to_end(&mut decompressed).unwrap();
        let decoded_request = ExportTraceServiceRequest::decode(decompressed.as_slice()).unwrap();
        assert_eq!(decoded_request.resource_spans.len(), 2);
        assert_eq!(total_span_count(&decoded_request), 5);
    }

    #[test]
    fn test_compact_multiple_payloads_with_none_preference() {
        let telemetry1 = create_test_telemetry_uncompressed(2, "s1");
        let telemetry2 = create_test_telemetry_uncompressed(3, "s2");
        let config = config_with(CompressionPreference::None);
        let result = compact_telemetry_payloads(vec![telemetry1, telemetry2], &config).unwrap();
        assert_eq!(result.content_encoding, None);

        let decoded_request = ExportTraceServiceRequest::decode(result.payload.as_slice()).unwrap();
        assert_eq!(decoded_request.resource_spans.len(), 2);
        assert_eq!(total_span_count(&decoded_request), 5);
    }

    #[test]
    #[serial]
    fn test_compaction_emits_low_cardinality_event() {
        let telemetry1 = create_test_telemetry_uncompressed(2, "s1");
        let telemetry2 = create_test_telemetry_uncompressed(3, "s2");
        let config = config_with(CompressionPreference::Gzip);

        let capture_layer = EventCaptureLayer::new();
        let captured_events = capture_layer.events();
        let subscriber = Registry::default().with(capture_layer);
        let result = tracing::subscriber::with_default(subscriber, || {
            compact_telemetry_payloads(vec![telemetry1, telemetry2], &config)
        })
        .unwrap();
        assert_eq!(result.content_encoding.as_deref(), Some("gzip"));

        let events = captured_events.lock().unwrap();
        let compaction_event = events
            .iter()
            .find(|event| {
                event
                    .fields
                    .get("message")
                    .is_some_and(|message| message.contains("Compacted telemetry items"))
            })
            .unwrap_or_else(|| panic!("expected compaction event to be emitted, got {events:#?}"));
        assert_eq!(
            compaction_event
                .fields
                .get("compact_telemetry_payloads.records.count")
                .map(String::as_str),
            Some("2")
        );
        assert_eq!(
            compaction_event
                .fields
                .get("compression")
                .map(String::as_str),
            Some("gzip")
        );
        // Field values must be plain strings, not `Debug`-formatted `Option`s.
        assert!(compaction_event
            .fields
            .values()
            .all(|value| !value.contains("Some(")));
    }

    #[test]
    fn test_compact_empty_batch_returns_error() {
        // Explicit config: this test is not `#[serial]`, so it must not read the
        // process environment that the env-var tests mutate.
        let config = config_with(CompressionPreference::None);
        let result = compact_telemetry_payloads(Vec::new(), &config);
        assert!(result.is_err());
    }

    #[test]
    fn test_compact_with_one_decode_failure() {
        let telemetry_good = create_test_telemetry_uncompressed(1, "s1");
        let telemetry_bad_payload = TelemetryData {
            payload: vec![0, 1, 2],
            ..create_test_telemetry_uncompressed(0, "s2")
        };
        let config = config_with(CompressionPreference::None);
        let result =
            compact_telemetry_payloads(vec![telemetry_good, telemetry_bad_payload], &config)
                .unwrap();
        let decoded_request = ExportTraceServiceRequest::decode(result.payload.as_slice()).unwrap();
        assert_eq!(
            decoded_request.resource_spans[0].scope_spans[0].spans.len(),
            1
        );
    }

    #[test]
    fn test_compact_all_decode_failures() {
        let telemetry_bad1 = TelemetryData {
            payload: vec![1],
            ..create_test_telemetry_uncompressed(0, "s1")
        };
        let telemetry_bad2 = TelemetryData {
            payload: vec![2],
            ..create_test_telemetry_uncompressed(0, "s2")
        };
        // Explicit config for the same reason as in the empty-batch test.
        let config = config_with(CompressionPreference::None);
        let result = compact_telemetry_payloads(vec![telemetry_bad1, telemetry_bad2], &config);
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("All payloads in batch failed to decode"));
    }
}