#![cfg(test)]
#![allow(dead_code)]
use std::sync::{Arc, Mutex};
#[derive(Debug, Clone)]
pub struct NegotiationCompressionConfig {
enabled: bool,
algorithm: String,
fallback_enabled: bool,
}
impl NegotiationCompressionConfig {
fn new(enabled: bool) -> Self {
Self {
enabled,
algorithm: "gzip".to_string(),
fallback_enabled: false,
}
}
fn with_fallback(mut self) -> Self {
self.fallback_enabled = true;
self
}
}
#[derive(Debug, Clone)]
pub struct NegotiationHttpResponse {
status: u16,
headers: Vec<(String, String)>,
body: Vec<u8>,
}
impl NegotiationHttpResponse {
fn new(status: u16) -> Self {
Self {
status,
headers: vec![],
body: vec![],
}
}
fn with_header(mut self, name: &str, value: &str) -> Self {
self.headers.push((name.to_string(), value.to_string()));
self
}
}
#[derive(Debug, Clone)]
pub struct NegotiationHttpRequest {
method: String,
url: String,
headers: Vec<(String, String)>,
body: Vec<u8>,
}
impl NegotiationHttpRequest {
fn new(method: &str, url: &str, headers: Vec<(String, String)>, body: Vec<u8>) -> Self {
Self {
method: method.to_string(),
url: url.to_string(),
headers,
body,
}
}
fn has_header(&self, name: &str) -> bool {
self.headers
.iter()
.any(|(key, _)| key.eq_ignore_ascii_case(name))
}
fn get_header(&self, name: &str) -> Option<&String> {
self.headers
.iter()
.find(|(key, _)| key.eq_ignore_ascii_case(name))
.map(|(_, value)| value)
}
fn is_compressed(&self) -> bool {
self.get_header("content-encoding")
.is_some_and(|encoding| encoding == "gzip")
}
}
#[derive(Debug)]
pub struct InMemoryNegotiatingOtlpHttpExporter {
config: NegotiationCompressionConfig,
requests: Arc<Mutex<Vec<NegotiationHttpRequest>>>,
responses: Arc<Mutex<Vec<NegotiationHttpResponse>>>,
attempt_count: Arc<Mutex<usize>>,
}
impl InMemoryNegotiatingOtlpHttpExporter {
fn new(config: NegotiationCompressionConfig) -> Self {
Self {
config,
requests: Arc::new(Mutex::new(vec![])),
responses: Arc::new(Mutex::new(vec![])),
attempt_count: Arc::new(Mutex::new(0)),
}
}
fn add_response(&self, response: NegotiationHttpResponse) {
self.responses.lock().unwrap().push(response);
}
fn export_spans(&self, spans: &[u8]) -> Result<(), String> {
let attempt_number = {
let mut attempt = self.attempt_count.lock().unwrap();
*attempt += 1;
*attempt
};
let use_compression = if attempt_number == 1 {
self.config.enabled
} else if attempt_number == 2 && self.config.fallback_enabled {
false
} else {
return Err("Too many retry attempts".to_string());
};
let (body, headers) = if use_compression {
let compressed_body = format!("GZIP[{}]", String::from_utf8_lossy(spans));
let headers = vec![
(
"Content-Type".to_string(),
"application/x-protobuf".to_string(),
),
("Content-Encoding".to_string(), "gzip".to_string()),
];
(compressed_body.into_bytes(), headers)
} else {
let headers = vec![(
"Content-Type".to_string(),
"application/x-protobuf".to_string(),
)];
(spans.to_vec(), headers)
};
let request = NegotiationHttpRequest::new("POST", "/v1/traces", headers, body);
self.requests.lock().unwrap().push(request.clone());
let response = {
let mut responses = self.responses.lock().unwrap();
if responses.is_empty() {
NegotiationHttpResponse::new(500) } else {
responses.remove(0)
}
};
match response.status {
200..=299 => Ok(()),
415 => {
if self.config.fallback_enabled && use_compression {
self.export_spans(spans)
} else {
Err(format!("Compression not supported: {}", response.status))
}
}
_ => Err(format!("Request failed: {}", response.status)),
}
}
fn get_request_count(&self) -> usize {
self.requests.lock().unwrap().len()
}
fn get_requests(&self) -> Vec<NegotiationHttpRequest> {
self.requests.lock().unwrap().clone()
}
}
#[test]
fn audit_otlp_compression_fallback_on_415() {
println!("🔍 AUDIT: OTLP compression negotiation with 415 Unsupported Media Type");
println!("📋 OTLP compression negotiation requirements:");
println!(" • Client sends gzip when configured for compression");
println!(" • Collector returns 415 if compression not supported");
println!(" • Client SHOULD downgrade to identity and retry once");
println!(" • NOT: fail-fast without attempting uncompressed");
println!(" • NOT: ignore 415 and keep sending gzip");
let config = NegotiationCompressionConfig::new(true).with_fallback();
let exporter = InMemoryNegotiatingOtlpHttpExporter::new(config);
exporter.add_response(NegotiationHttpResponse::new(415)); exporter.add_response(NegotiationHttpResponse::new(200));
let test_spans = b"test span data";
println!("📊 Testing compression negotiation sequence:");
let result = exporter.export_spans(test_spans);
match result {
Ok(()) => {
println!(" ✅ SUCCESS: Export completed with graceful degradation");
}
Err(e) => {
println!(" ❌ FAILURE: Export failed - {}", e);
panic!("Compression fallback should succeed when properly implemented");
}
}
let requests = exporter.get_requests();
println!(" Request count: {}", requests.len());
if requests.len() == 2 {
println!(" ✅ CORRECT: Two requests made (compressed + uncompressed)");
if requests[0].is_compressed() {
println!(" ✅ CORRECT: First request used gzip compression");
} else {
println!(" ❌ INCORRECT: First request should be compressed");
}
if !requests[1].is_compressed() {
println!(" ✅ CORRECT: Second request used identity encoding");
} else {
println!(" ❌ INCORRECT: Second request should be uncompressed");
}
} else {
println!(
" ❌ INCORRECT: Should make exactly 2 requests (got {})",
requests.len()
);
panic!("Compression fallback should make exactly 2 requests");
}
println!("✅ COMPRESSION FALLBACK AUDIT COMPLETE");
println!("🏆 FINDING: Graceful compression degradation working correctly");
}
#[test]
fn audit_current_otlp_compression_behavior() {
println!("🔍 AUDIT: Current OTLP compression behavior with 415 response");
println!("📊 Current implementation analysis:");
println!(" File: src/observability/otel.rs");
println!(" Lines 1001-1024: Compression logic");
println!(" Lines 1062-1067: 415 handling (400-499 range)");
println!(" Behavior: 415 treated as non-retryable client error");
let config = NegotiationCompressionConfig::new(true); let exporter = InMemoryNegotiatingOtlpHttpExporter::new(config);
exporter.add_response(NegotiationHttpResponse::new(415));
let test_spans = b"test span data";
println!("📊 Testing current implementation behavior:");
let result = exporter.export_spans(test_spans);
match result {
Ok(()) => {
println!(" ❌ UNEXPECTED: Export should fail with current implementation");
panic!("Current implementation should fail on 415 without fallback");
}
Err(e) => {
println!(" ✅ EXPECTED: Export failed - {}", e);
println!(" 📋 ANALYSIS: Current implementation fails fast on 415");
}
}
let requests = exporter.get_requests();
println!(" Request count: {}", requests.len());
if requests.len() == 1 {
println!(" ✅ EXPECTED: Only one request made (no retry)");
if requests[0].is_compressed() {
println!(" ✅ EXPECTED: Request used gzip compression");
}
} else {
println!(" ❌ UNEXPECTED: Should make exactly 1 request");
}
println!("🚨 CURRENT IMPLEMENTATION DEFECTS:");
println!(" • No compression fallback mechanism");
println!(" • 415 Unsupported Media Type treated as non-retryable");
println!(" • Fails immediately instead of degrading gracefully");
println!(" • Poor interoperability with compression-unaware collectors");
println!("📋 REQUIRED IMPROVEMENTS:");
println!(" 1. Add compression fallback capability to OtlpHttpExporter");
println!(" 2. Special handling for 415 status code");
println!(" 3. Retry mechanism with identity encoding after 415");
println!(" 4. Configuration option for compression fallback behavior");
println!("✅ CURRENT BEHAVIOR AUDIT COMPLETE");
println!("🚨 FINDING: Current implementation lacks compression negotiation");
}
#[test]
fn audit_compression_header_edge_cases() {
println!("🔍 AUDIT: Compression header edge cases and negotiation robustness");
let edge_case_scenarios = vec![
(
415,
"Unsupported Media Type",
"Standard compression rejection",
),
(406, "Not Acceptable", "Alternative compression rejection"),
(400, "Bad Request", "Malformed compressed content"),
(413, "Payload Too Large", "Compressed content too large"),
];
println!("📊 Testing compression-related error responses:");
for (status_code, status_text, description) in edge_case_scenarios {
println!(
" Testing: HTTP {} - {} ({})",
status_code, status_text, description
);
let config = NegotiationCompressionConfig::new(true).with_fallback();
let exporter = InMemoryNegotiatingOtlpHttpExporter::new(config);
exporter.add_response(NegotiationHttpResponse::new(status_code));
exporter.add_response(NegotiationHttpResponse::new(200));
let result = exporter.export_spans(b"test data");
match status_code {
415 => {
if result.is_ok() {
println!(" ✅ CORRECT: Graceful fallback on compression rejection");
} else {
println!(" ❌ INCORRECT: Should fallback on 415");
}
}
406 | 400 | 413 => {
println!(
" 📋 ANALYSIS: Status {} behavior depends on fallback policy",
status_code
);
}
_ => {}
}
}
println!("📋 Compression negotiation best practices:");
println!(" • 415 Unsupported Media Type: Always retry without compression");
println!(" • 406 Not Acceptable: Consider retry without compression");
println!(" • 400 Bad Request: May indicate compression corruption");
println!(" • 413 Payload Too Large: May benefit from no compression");
println!(" • Other 4xx: Generally not compression-related");
println!("✅ COMPRESSION EDGE CASES AUDIT COMPLETE");
println!("📊 FINDING: Robust compression negotiation requires 415 special handling");
}