#![cfg(test)]
use std::sync::{Arc, Mutex};
use std::time::Duration;
/// A fixture describing one batch of spans handed to an exporter.
///
/// `size_bytes` is derived in [`SpanBatchFixture::new`] as
/// `span_count * 1024`.
#[derive(Debug, Clone)]
pub struct SpanBatchFixture {
    /// Number of spans contained in the batch.
    pub span_count: usize,
    /// Identifier used to label the batch in recorded export results.
    pub batch_id: String,
    /// Batch size in bytes, as computed by the constructor.
    pub size_bytes: usize,
}

impl SpanBatchFixture {
    /// Builds a batch with the given id and span count.
    ///
    /// Every span is accounted as 1024 bytes when filling `size_bytes`.
    fn new(batch_id: &str, span_count: usize) -> Self {
        const BYTES_PER_SPAN: usize = 1024;

        let size_bytes = span_count * BYTES_PER_SPAN;
        let batch_id = String::from(batch_id);
        Self {
            span_count,
            batch_id,
            size_bytes,
        }
    }
}
/// Counters describing the outcome of span export attempts.
///
/// `Clone` is derived because callers take an owned snapshot of these
/// counters out of a `Mutex` guard via `.clone()`; without it that call does
/// not compile (`MutexGuard` itself is not `Clone`). `PartialEq`/`Eq` allow
/// whole-snapshot comparisons in assertions.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct ExportMetricsFixture {
    /// Spans that were exported successfully.
    pub spans_exported_success: usize,
    /// Spans dropped because the export was cancelled.
    pub spans_dropped_cancellation: usize,
    /// Spans dropped because the export failed with an error.
    pub spans_dropped_error: usize,
    /// Number of batches whose export was cancelled.
    pub batches_cancelled: usize,
    /// Total number of recorded export attempts, regardless of outcome.
    pub export_attempts: usize,
}

impl ExportMetricsFixture {
    /// Records a successful export of `span_count` spans as one attempt.
    fn record_export_success(&mut self, span_count: usize) {
        self.spans_exported_success += span_count;
        self.export_attempts += 1;
    }

    /// Records a cancelled export: `span_count` spans dropped, one batch
    /// cancelled, one attempt.
    fn record_export_cancellation(&mut self, span_count: usize) {
        self.spans_dropped_cancellation += span_count;
        self.batches_cancelled += 1;
        self.export_attempts += 1;
    }

    /// Records a failed export: `span_count` spans dropped, one attempt.
    fn record_export_error(&mut self, span_count: usize) {
        self.spans_dropped_error += span_count;
        self.export_attempts += 1;
    }

    /// Returns the number of spans dropped for any reason
    /// (cancellation plus error).
    fn total_spans_lost(&self) -> usize {
        self.spans_dropped_cancellation + self.spans_dropped_error
    }

    /// Returns the fraction of spans lost, in `0.0..=1.0`.
    ///
    /// The denominator is exported plus lost spans; when no spans have been
    /// recorded at all the rate is `0.0` rather than a division by zero.
    fn data_loss_rate(&self) -> f64 {
        let lost = self.total_spans_lost();
        let total = self.spans_exported_success + lost;
        if total == 0 {
            0.0
        } else {
            lost as f64 / total as f64
        }
    }
}
#[derive(Debug)]
pub struct CancellationAwareExporterFixture {
pub metrics: Arc<Mutex<ExportMetricsFixture>>,
pub export_results: Vec<(String, String)>, pub should_cancel_after_ms: Option<u64>,
pub should_error: bool,
}
impl CancellationAwareExporterFixture {
fn new() -> Self {
Self {
metrics: Arc::new(Mutex::new(ExportMetricsFixture::default())),
export_results: Vec::new(),
should_cancel_after_ms: None,
should_error: false,
}
}
fn with_cancellation_after(mut self, ms: u64) -> Self {
self.should_cancel_after_ms = Some(ms);
self
}
fn with_error(mut self) -> Self {
self.should_error = true;
self
}
async fn export_batch_defective(&mut self, batch: SpanBatchFixture) -> Result<(), String> {
if let Some(cancel_ms) = self.should_cancel_after_ms {
std::thread::sleep(Duration::from_millis(cancel_ms / 2));
self.export_results.push((batch.batch_id.clone(), "cancelled_silent".to_string()));
return Err("Export cancelled".to_string());
}
if self.should_error {
self.export_results.push((batch.batch_id.clone(), "error_silent".to_string()));
return Err("Export failed".to_string());
}
self.export_results.push((batch.batch_id.clone(), "success".to_string()));
Ok(())
}
async fn export_batch_correct(&mut self, batch: SpanBatchFixture) -> Result<(), String> {
if let Some(cancel_ms) = self.should_cancel_after_ms {
std::thread::sleep(Duration::from_millis(cancel_ms / 2));
{
let mut metrics = self.metrics.lock().unwrap();
metrics.record_export_cancellation(batch.span_count);
}
self.export_results.push((batch.batch_id.clone(), "cancelled_with_metrics".to_string()));
return Err(format!("Export cancelled: {} spans dropped", batch.span_count));
}
if self.should_error {
{
let mut metrics = self.metrics.lock().unwrap();
metrics.record_export_error(batch.span_count);
}
self.export_results.push((batch.batch_id.clone(), "error_with_metrics".to_string()));
return Err(format!("Export failed: {} spans dropped", batch.span_count));
}
{
let mut metrics = self.metrics.lock().unwrap();
metrics.record_export_success(batch.span_count);
}
self.export_results.push((batch.batch_id.clone(), "success_with_metrics".to_string()));
Ok(())
}
fn get_metrics(&self) -> ExportMetricsFixture {
self.metrics.lock().unwrap().clone()
}
}
/// Contrasts the two export paths when an export is cancelled: the defective
/// path leaves the metrics untouched, the correct path records the dropped
/// spans and the cancelled batch.
#[test]
fn audit_export_cancellation_span_loss() {
    println!("🔍 AUDIT: OTLP export cancellation span loss handling");
    println!("📋 OTLP cancellation requirements:");
    println!(" • Export task may be cancelled during async operations");
    println!(" • Pending spans should not be silently lost");
    println!(" • Operators need visibility into data loss events");
    println!(" • Bounded loss preferred over unbounded buffering");

    let batch = SpanBatchFixture::new("batch-001", 100);
    println!("📊 Test scenario:");
    println!(" Batch: {} spans ({} bytes)", batch.span_count, batch.size_bytes);
    println!(" Cancellation: Mid-export (simulated)");
    println!(" Expected: Span loss recorded in metrics");

    // Defective path: the export is cancelled and nothing reaches the metrics.
    println!("📊 Testing defective implementation (silent loss):");
    let mut silent_exporter = CancellationAwareExporterFixture::new().with_cancellation_after(100);
    let silent_outcome =
        futures::executor::block_on(silent_exporter.export_batch_defective(batch.clone()));
    println!(" Result: {:?}", silent_outcome);
    println!(" Export results: {:?}", silent_exporter.export_results);
    assert!(silent_outcome.is_err());
    assert_eq!(silent_exporter.export_results.len(), 1);
    assert_eq!(silent_exporter.export_results[0].1, "cancelled_silent");
    let silent_metrics = silent_exporter.get_metrics();
    assert_eq!(silent_metrics.spans_dropped_cancellation, 0);
    assert_eq!(silent_metrics.batches_cancelled, 0);
    println!("⚠️ DEFECTIVE: 100 spans silently lost with no metrics");

    // Correct path: the same cancellation is reflected in the metrics.
    println!("📊 Testing correct implementation (metrics-aware):");
    let mut tracked_exporter = CancellationAwareExporterFixture::new().with_cancellation_after(100);
    // Last use of the batch, so it is moved rather than cloned.
    let tracked_outcome = futures::executor::block_on(tracked_exporter.export_batch_correct(batch));
    println!(" Result: {:?}", tracked_outcome);
    println!(" Export results: {:?}", tracked_exporter.export_results);
    assert!(tracked_outcome.is_err());
    assert_eq!(
        tracked_exporter.export_results[0].1,
        "cancelled_with_metrics"
    );
    let tracked_metrics = tracked_exporter.get_metrics();
    assert_eq!(tracked_metrics.spans_dropped_cancellation, 100);
    assert_eq!(tracked_metrics.batches_cancelled, 1);
    assert_eq!(tracked_metrics.export_attempts, 1);
    println!("✅ CORRECT: 100 spans dropped with visibility metrics");

    println!("🚨 AUDIT FINDING: DEFECTIVE");
    println!(" Current: Cancellation causes silent span loss");
    println!(" Required: Record span loss metrics for visibility");
}
/// Checks that a failed (non-cancelled) export is counted under the
/// error-specific drop counter and not under the cancellation counter.
#[test]
fn audit_export_error_span_loss_metrics() {
    println!("🔍 AUDIT: OTLP export error span loss metrics");
    println!("📋 Error handling requirements:");
    println!(" • Network errors should record span loss separately from cancellation");
    println!(" • Different error types may require different retry strategies");
    println!(" • Metrics should distinguish error causes for debugging");

    let failing_batch = SpanBatchFixture::new("batch-error", 50);
    println!("📊 Error scenario:");
    println!(" Batch: {} spans", failing_batch.span_count);
    println!(" Failure: Network error (non-cancellation)");
    println!(" Expected: Error-specific span loss metrics");

    let mut exporter = CancellationAwareExporterFixture::new().with_error();
    let outcome = futures::executor::block_on(exporter.export_batch_correct(failing_batch));
    println!(" Result: {:?}", outcome);
    assert!(outcome.is_err());
    // The error message must carry the dropped-span context.
    let message = outcome.unwrap_err();
    assert!(message.contains("spans dropped"));

    let snapshot = exporter.get_metrics();
    println!("📊 Error metrics:");
    println!(" Spans dropped (error): {}", snapshot.spans_dropped_error);
    println!(
        " Spans dropped (cancellation): {}",
        snapshot.spans_dropped_cancellation
    );
    println!(" Total export attempts: {}", snapshot.export_attempts);
    assert_eq!(snapshot.spans_dropped_error, 50);
    assert_eq!(snapshot.spans_dropped_cancellation, 0);
    assert_eq!(snapshot.export_attempts, 1);
    println!("✅ CORRECT: Error spans tracked separately from cancellation");
}
/// Runs a mix of successful, cancelled, and failed exports through one
/// exporter and checks the aggregated counters and the derived loss rate.
#[test]
fn audit_data_loss_rate_monitoring() {
    println!("🔍 AUDIT: OTLP data loss rate monitoring");
    println!("📋 Monitoring requirements:");
    println!(" • Calculate percentage of spans lost vs successfully exported");
    println!(" • Support alerting on high data loss rates");
    println!(" • Distinguish temporary vs persistent loss patterns");

    let mut exporter = CancellationAwareExporterFixture::new();
    // Each entry: (batch, cancel this export?, fail this export?).
    let plan = vec![
        (SpanBatchFixture::new("success-1", 100), false, false),
        (SpanBatchFixture::new("success-2", 150), false, false),
        (SpanBatchFixture::new("cancelled-1", 75), true, false),
        (SpanBatchFixture::new("error-1", 50), false, true),
        (SpanBatchFixture::new("success-3", 200), false, false),
    ];

    println!("📊 Mixed export scenario:");
    for (index, (batch, cancel, fail)) in plan.into_iter().enumerate() {
        println!(
            " Batch {}: {} spans, cancel={}, error={}",
            index + 1,
            batch.span_count,
            cancel,
            fail
        );
        // Reconfigure the shared exporter for this batch's intended outcome.
        exporter.should_cancel_after_ms = if cancel { Some(50) } else { None };
        exporter.should_error = fail;
        // Only the metrics matter here; the per-batch result is ignored.
        let _ = futures::executor::block_on(exporter.export_batch_correct(batch));
    }

    let totals = exporter.get_metrics();
    println!("📊 Final metrics:");
    println!(" Spans exported: {}", totals.spans_exported_success);
    println!(
        " Spans lost (cancellation): {}",
        totals.spans_dropped_cancellation
    );
    println!(" Spans lost (error): {}", totals.spans_dropped_error);
    println!(" Total spans lost: {}", totals.total_spans_lost());
    println!(" Data loss rate: {:.2}%", totals.data_loss_rate() * 100.0);

    // 100 + 150 + 200 exported; 75 cancelled; 50 failed.
    assert_eq!(totals.spans_exported_success, 450);
    assert_eq!(totals.spans_dropped_cancellation, 75);
    assert_eq!(totals.spans_dropped_error, 50);
    assert_eq!(totals.total_spans_lost(), 125);
    assert_eq!(totals.export_attempts, 5);

    // 125 lost out of 575 spans overall.
    let expected_loss_rate = 125.0 / 575.0;
    let deviation = (totals.data_loss_rate() - expected_loss_rate).abs();
    assert!(deviation < 0.001);

    println!(
        "✅ DATA LOSS MONITORING: {:.1}% spans lost (alertable metric)",
        totals.data_loss_rate() * 100.0
    );
    if totals.data_loss_rate() > 0.1 {
        println!("⚠️ HIGH DATA LOSS: Loss rate exceeds 10% threshold");
    }
}
/// Prints the best-practice compliance assessment and validates the
/// "full compliance" example metrics.
///
/// The example counters are asserted so that this test fails if the metric
/// bookkeeping regresses, rather than only printing a report.
#[test]
fn audit_otlp_best_practice_compliance() {
    println!("🔍 AUDIT: OTLP span loss best practice compliance");
    println!("📋 OTLP/OpenTelemetry best practices:");
    println!(" 1. Bounded loss: Prefer dropping data over OOM");
    println!(" 2. Visibility: Always record when telemetry data is lost");
    println!(" 3. Categorization: Track loss by cause (network, cancellation, etc.)");
    println!(" 4. Alerting: Provide metrics for data loss rate monitoring");
    println!(" 5. Graceful degradation: Degrade service, not observability");

    println!("📊 Current implementation assessment:");
    println!(" ✅ Bounded loss: LoadSheddingExporter drops oldest batches");
    println!(" Location: lines 800-812 in otel.rs");
    println!(" Behavior: Correct queue management");
    println!(" ❌ Visibility: No cancellation span loss metrics");
    println!(" Gap: send_otlp_protobuf() cancellation invisible");
    println!(" Impact: Operators unaware of telemetry data loss");
    println!(" ❌ Categorization: No error-type-specific span loss tracking");
    println!(" Gap: Cannot distinguish cancellation vs network vs server errors");
    println!(" Impact: Poor debugging and alerting granularity");
    println!(" ❌ Alerting: No data loss rate metrics");
    println!(" Gap: No percentage-based loss monitoring");
    println!(" Impact: Cannot alert on high data loss rates");
    println!(" ⚠️ Graceful degradation: Partial compliance");
    println!(" Current: LoadSheddingExporter handles queue pressure");
    println!(" Gap: Export task cancellation not graceful");
    println!("📊 Compliance score: 1/5 practices fully implemented");

    println!("📌 Required implementations:");
    println!(" 1. Add spans_dropped_cancellation metric");
    println!(" 2. Add spans_dropped_export_error metric");
    println!(" 3. Add data_loss_rate gauge metric");
    println!(" 4. Modify send_otlp_protobuf() to record cancellation");
    println!(" 5. Add structured logging for span loss events");

    println!("📊 Full compliance example:");
    let mut compliant_metrics = ExportMetricsFixture::default();
    compliant_metrics.record_export_success(1000);
    compliant_metrics.record_export_cancellation(50);
    compliant_metrics.record_export_error(25);
    println!(" otel.spans.exported.success: {}", compliant_metrics.spans_exported_success);
    println!(" otel.spans.dropped.cancellation: {}", compliant_metrics.spans_dropped_cancellation);
    println!(" otel.spans.dropped.error: {}", compliant_metrics.spans_dropped_error);
    println!(" otel.export.data_loss_rate: {:.3}", compliant_metrics.data_loss_rate());
    println!(" otel.export.batches_cancelled: {}", compliant_metrics.batches_cancelled);

    // Pin the example values: one attempt per recorded outcome, losses split
    // by cause, and a loss rate of 75 lost out of 1075 total spans.
    assert_eq!(compliant_metrics.spans_exported_success, 1000);
    assert_eq!(compliant_metrics.spans_dropped_cancellation, 50);
    assert_eq!(compliant_metrics.spans_dropped_error, 25);
    assert_eq!(compliant_metrics.batches_cancelled, 1);
    assert_eq!(compliant_metrics.export_attempts, 3);
    assert_eq!(compliant_metrics.total_spans_lost(), 75);
    let expected_loss_rate = 75.0 / 1075.0;
    assert!((compliant_metrics.data_loss_rate() - expected_loss_rate).abs() < 0.001);

    println!("🚨 COMPLIANCE GAP: Missing 4/5 OTLP observability best practices");
    println!(" Priority: HIGH - Invisible telemetry loss impacts production debugging");
}
/// Validates a proposed tracker design that remembers the spans currently in
/// flight so that a cancellation can be turned into recorded metrics.
#[test]
fn audit_proposed_cancellation_aware_design() {
    /// Terminal outcome of one export attempt.
    ///
    /// A closed enum instead of a free-form string: a misspelled outcome is a
    /// compile error rather than a silently skipped metrics update.
    #[derive(Debug, Clone, Copy)]
    enum ExportOutcome {
        Success,
        Cancelled,
        Error,
    }

    /// Tracks the span count of the export in progress plus the export metrics.
    #[derive(Debug)]
    struct CancellationAwareExportTracker {
        spans_in_flight: Arc<Mutex<usize>>,
        export_metrics: Arc<Mutex<ExportMetricsFixture>>,
    }

    impl CancellationAwareExportTracker {
        /// Creates a tracker with nothing in flight and zeroed metrics.
        fn new() -> Self {
            Self {
                spans_in_flight: Arc::new(Mutex::new(0)),
                export_metrics: Arc::new(Mutex::new(ExportMetricsFixture::default())),
            }
        }

        /// Marks `span_count` spans as in flight, replacing any previous value.
        fn start_export(&self, span_count: usize) {
            *self.spans_in_flight.lock().unwrap() = span_count;
        }

        /// Clears the in-flight count and records `span_count` spans under the
        /// counter matching `outcome`.
        fn complete_export(&self, span_count: usize, outcome: ExportOutcome) {
            *self.spans_in_flight.lock().unwrap() = 0;
            let mut metrics = self.export_metrics.lock().unwrap();
            match outcome {
                ExportOutcome::Success => metrics.record_export_success(span_count),
                ExportOutcome::Cancelled => metrics.record_export_cancellation(span_count),
                ExportOutcome::Error => metrics.record_export_error(span_count),
            }
        }

        /// Records the spans currently in flight (if any) as dropped by
        /// cancellation and returns a message describing what happened.
        fn handle_cancellation(&self) -> String {
            // The guard is a statement-scoped temporary, so the lock is
            // released before `complete_export` takes it again.
            let spans_lost = *self.spans_in_flight.lock().unwrap();
            if spans_lost > 0 {
                self.complete_export(spans_lost, ExportOutcome::Cancelled);
                format!("Export cancelled: {} spans dropped with metrics recorded", spans_lost)
            } else {
                "Export cancelled: no spans in flight".to_string()
            }
        }

        /// Returns an owned snapshot of the metrics
        /// (requires `ExportMetricsFixture: Clone`).
        fn get_metrics(&self) -> ExportMetricsFixture {
            self.export_metrics.lock().unwrap().clone()
        }
    }

    println!("🔍 AUDIT: Proposed cancellation-aware export design");
    println!("📋 Design requirements:");
    println!(" • Track spans in flight during export");
    println!(" • Record metrics on cancellation or error");
    println!(" • Provide structured error context");
    println!(" • Maintain async cancellation semantics");

    let tracker = CancellationAwareExportTracker::new();
    println!("📊 Design validation:");

    // One successful export, one cancelled mid-flight, one failed.
    tracker.start_export(100);
    tracker.complete_export(100, ExportOutcome::Success);
    tracker.start_export(75);
    let cancellation_message = tracker.handle_cancellation();
    println!(" Cancellation: {}", cancellation_message);
    tracker.start_export(50);
    tracker.complete_export(50, ExportOutcome::Error);

    let final_metrics = tracker.get_metrics();
    println!(" Final spans exported: {}", final_metrics.spans_exported_success);
    println!(" Final spans dropped (cancelled): {}", final_metrics.spans_dropped_cancellation);
    println!(" Final spans dropped (error): {}", final_metrics.spans_dropped_error);
    assert_eq!(final_metrics.spans_exported_success, 100);
    assert_eq!(final_metrics.spans_dropped_cancellation, 75);
    assert_eq!(final_metrics.spans_dropped_error, 50);
    println!("✅ DESIGN VALIDATED: Cancellation-aware tracking with metrics");

    println!("📌 Integration points for send_otlp_protobuf():");
    println!(" 1. Call start_export() before async operations");
    println!(" 2. Wrap .await points with cancellation detection");
    println!(" 3. Call handle_cancellation() in Drop impl or catch unwind");
    println!(" 4. Call complete_export() on success or error");
    println!(" 5. Expose metrics via MetricsProvider integration");
    println!("✅ IMPLEMENTATION FEASIBLE: Clear integration path for existing code");
}