#![cfg(test)]
use crate::observability::otlp_trace_exporter::{
InMemoryOtlpHttpExporter, LoadSheddingTraceExporter, OtlpSpan, SpanBatch, TraceExporter,
};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Barrier};
use std::thread;
use std::time::{Duration, Instant};
struct HighFrequencySpanBenchmark {
thread_count: usize,
spans_per_thread: usize,
batch_size: usize,
start_barrier: Arc<Barrier>,
completion_counter: Arc<AtomicU64>,
}
impl HighFrequencySpanBenchmark {
fn new(thread_count: usize, spans_per_thread: usize, batch_size: usize) -> Self {
Self {
thread_count,
spans_per_thread,
batch_size,
start_barrier: Arc::new(Barrier::new(thread_count)),
completion_counter: Arc::new(AtomicU64::new(0)),
}
}
fn run_span_export_benchmark(&self, exporter: Arc<LoadSheddingTraceExporter>) -> Duration {
let mut handles = Vec::with_capacity(self.thread_count);
for thread_id in 0..self.thread_count {
let barrier = Arc::clone(&self.start_barrier);
let counter = Arc::clone(&self.completion_counter);
let exporter = Arc::clone(&exporter);
let spans_per_thread = self.spans_per_thread;
let batch_size = self.batch_size;
handles.push(thread::spawn(move || {
barrier.wait();
let start = Instant::now();
let batch_count = spans_per_thread / batch_size;
for batch_id in 0..batch_count {
let spans: Vec<OtlpSpan> = (0..batch_size)
.map(|i| OtlpSpan {
span_id: format!("span-{}-{}-{}", thread_id, batch_id, i),
name: "high_frequency_operation".to_string(),
start_time_unix_nano: 1000000000,
end_time_unix_nano: 1000001000,
attributes: vec![
("thread_id".to_string(), thread_id.to_string()),
("batch_id".to_string(), batch_id.to_string()),
],
trace_flags: Some(0x01), })
.collect();
let batch = SpanBatch {
batch_id: batch_id as u64,
spans,
created_at: Instant::now(),
};
if let Err(e) = exporter.export(&batch) {
eprintln!("Export failed: {}", e);
}
}
let duration = start.elapsed();
counter.fetch_add(1, Ordering::Relaxed);
(thread_id, duration, spans_per_thread)
}));
}
let overall_start = Instant::now();
let results: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect();
let overall_duration = overall_start.elapsed();
for (thread_id, thread_duration, spans_exported) in results {
let spans_per_sec = spans_exported as f64 / thread_duration.as_secs_f64();
println!(
" Thread {}: {} spans in {:?} ({:.0} spans/sec)",
thread_id, spans_exported, thread_duration, spans_per_sec
);
}
overall_duration
}
}
fn create_test_span(span_id: &str, name: &str) -> OtlpSpan {
OtlpSpan {
span_id: span_id.to_string(),
name: name.to_string(),
start_time_unix_nano: 1000000000,
end_time_unix_nano: 1000001000,
attributes: vec![("service".to_string(), "test".to_string())],
trace_flags: Some(0x01), }
}
#[test]
fn audit_high_frequency_span_processing_lock_analysis() {
println!("🔍 AUDIT: OTLP span processor work-queue under 100K+ spans/sec load");
println!("📋 High-frequency performance requirements:");
println!(" • Lock-free work-queue (preferred)");
println!(" • No mutex contention on span export");
println!(" • Linear scaling with thread count");
println!(" • Sustained 100K+ spans/sec throughput");
let thread_count = 8;
let spans_per_thread = 12_500; let batch_size = 100; let total_spans = thread_count * spans_per_thread;
println!("📊 Benchmark configuration:");
println!(" Threads: {}", thread_count);
println!(" Spans per thread: {}", spans_per_thread);
println!(" Batch size: {}", batch_size);
println!(" Total spans: {}", total_spans);
let memory_exporter = InMemoryOtlpHttpExporter::new(Duration::from_millis(1));
let exporter = Arc::new(LoadSheddingTraceExporter::new(
Box::new(memory_exporter),
1000, Duration::from_secs(1),
));
println!("📊 High-frequency span export benchmark:");
let benchmark = HighFrequencySpanBenchmark::new(thread_count, spans_per_thread, batch_size);
let benchmark_duration = benchmark.run_span_export_benchmark(Arc::clone(&exporter));
let total_throughput = total_spans as f64 / benchmark_duration.as_secs_f64();
println!(
" Overall: {} spans in {:?}",
total_spans, benchmark_duration
);
println!(" Throughput: {:.0} spans/sec", total_throughput);
println!("📊 Work-queue implementation analysis:");
println!(" Queue type: BoundedExportQueue<SpanBatch>");
println!(" Implementation: Mutex<VecDeque<T>> - MUTEX-PROTECTED ⚠️");
println!(" Location: src/observability/otlp_trace_exporter.rs:83");
println!(" Contention points:");
println!(" - enqueue(): self.queue.lock() (line 101)");
println!(" - dequeue(): self.queue.lock().pop_front() (line 115)");
println!(" - len(): self.queue.lock().len() (line 120)");
let target_throughput = 100_000.0;
if total_throughput >= target_throughput {
println!(
"✅ THROUGHPUT TARGET: Achieved {:.0} spans/sec (≥ 100K)",
total_throughput
);
} else {
println!(
"⚠️ THROUGHPUT BELOW TARGET: {:.0} spans/sec (< 100K)",
total_throughput
);
println!("💡 POTENTIAL CAUSE: Mutex contention in work-queue");
}
let expected_linear_scaling = total_throughput / thread_count as f64;
println!("📊 Concurrency scaling analysis:");
println!(
" Expected per-thread (linear): {:.0} spans/sec",
expected_linear_scaling
);
if benchmark_duration > Duration::from_secs(2) {
println!("⚠️ SLOW EXPORT DETECTED: Duration > 2s suggests contention");
println!("💡 LIKELY CAUSE: Mutex serialization in BoundedExportQueue");
} else {
println!("⏱️ ACCEPTABLE DURATION: Export completed in reasonable time");
}
println!("📊 Mutex contention assessment:");
println!("❌ MUTEX-PROTECTED QUEUE DETECTED:");
println!(" • Every span export acquires exclusive lock");
println!(" • Serializes concurrent span creation threads");
println!(" • Potential thread parking/unparking overhead");
println!(" • Lock contention increases with thread count");
println!("🔧 PERFORMANCE RECOMMENDATION:");
println!(" • Replace Mutex<VecDeque<T>> with lock-free queue");
println!(" • Consider crossbeam-queue for MPSC scenario");
println!(" • Use atomic operations for queue management");
println!(" • Target: 100K+ spans/sec with linear thread scaling");
let stats = exporter.load_shedding_stats();
println!("📊 Load shedding impact:");
println!(
" Queue depth: {}/{}",
stats.queue_depth, stats.queue_capacity
);
println!(" Dropped batches: {}", stats.dropped_batches);
assert!(total_spans > 0, "Benchmark should export spans");
println!("✅ HIGH-FREQUENCY SPAN PROCESSING AUDIT COMPLETE");
println!("🚨 PERFORMANCE ISSUE: Mutex-protected work-queue identified");
}
#[test]
fn audit_mutex_contention_thread_scaling() {
println!("🔍 AUDIT: Mutex contention scaling analysis");
println!("📋 Contention scaling test:");
println!(" • Test thread counts: 1, 2, 4, 8");
println!(" • Fixed spans per thread for fair comparison");
println!(" • Measure throughput degradation with thread count");
let spans_per_thread = 5000;
let batch_size = 50;
let thread_counts = vec![1, 2, 4, 8];
let mut throughputs = Vec::new();
for thread_count in thread_counts {
println!("📊 Testing {} threads:", thread_count);
let memory_exporter = InMemoryOtlpHttpExporter::new(Duration::from_millis(1));
let exporter = Arc::new(LoadSheddingTraceExporter::new(
Box::new(memory_exporter),
500,
Duration::from_secs(1),
));
let benchmark = HighFrequencySpanBenchmark::new(thread_count, spans_per_thread, batch_size);
let duration = benchmark.run_span_export_benchmark(Arc::clone(&exporter));
let total_spans = thread_count * spans_per_thread;
let throughput = total_spans as f64 / duration.as_secs_f64();
throughputs.push(throughput);
println!(" Result: {:.0} spans/sec", throughput);
}
println!("📊 Scaling analysis:");
let baseline_throughput = throughputs[0];
for (i, &throughput) in throughputs.iter().enumerate() {
let thread_count = 1 << i; let expected_linear = baseline_throughput * thread_count as f64;
let scaling_efficiency = throughput / expected_linear;
println!(
" {} threads: {:.0} spans/sec (efficiency: {:.1}%)",
thread_count,
throughput,
scaling_efficiency * 100.0
);
}
let final_efficiency = throughputs.last().unwrap() / (baseline_throughput * 8.0);
if final_efficiency < 0.7 {
println!(
"🚨 SEVERE CONTENTION: 8-thread efficiency {:.1}% (< 70%)",
final_efficiency * 100.0
);
println!("💡 EVIDENCE: Mutex serialization prevents linear scaling");
} else if final_efficiency < 0.9 {
println!(
"⚠️ MODERATE CONTENTION: 8-thread efficiency {:.1}% (< 90%)",
final_efficiency * 100.0
);
println!("💡 LIKELY CAUSE: Occasional mutex contention");
} else {
println!(
"✅ GOOD SCALING: 8-thread efficiency {:.1}% (≥ 90%)",
final_efficiency * 100.0
);
}
println!("✅ MUTEX CONTENTION SCALING ANALYSIS COMPLETE");
println!("🔧 RECOMMENDATION: Implement lock-free queue for linear scaling");
}
#[test]
fn audit_queue_operation_latency_profile() {
println!("🔍 AUDIT: Queue operation latency under contention");
println!("📋 Queue operation profiling:");
println!(" • Measure enqueue latency under load");
println!(" • Profile mutex acquisition time");
println!(" • Identify serialization bottlenecks");
let memory_exporter = InMemoryOtlpHttpExporter::new(Duration::from_nanos(1));
let exporter = Arc::new(LoadSheddingTraceExporter::new(
Box::new(memory_exporter),
100,
Duration::from_secs(1),
));
let batch_count: usize = 1000;
let batches: Vec<SpanBatch> = (0..batch_count)
.map(|i| SpanBatch {
batch_id: i as u64,
spans: vec![create_test_span(&format!("span-{}", i), "latency_test")],
created_at: Instant::now(),
})
.collect();
println!("📊 Sequential operation baseline:");
let sequential_start = Instant::now();
for batch in &batches[..100] {
let _ = exporter.export(batch);
}
let sequential_duration = sequential_start.elapsed();
let sequential_avg = sequential_duration / 100;
println!(
" Average enqueue latency: {:?} (sequential)",
sequential_avg
);
println!("📊 Concurrent operation contention:");
let thread_count = 4;
let batches_per_thread = batch_count / thread_count;
let concurrent_start = Instant::now();
let handles: Vec<_> = (0..thread_count)
.map(|thread_id| {
let exporter = Arc::clone(&exporter);
let start_idx = thread_id * batches_per_thread;
let end_idx = start_idx + batches_per_thread;
let thread_batches = batches[start_idx..end_idx].to_vec();
thread::spawn(move || {
for batch in thread_batches {
let op_start = Instant::now();
let _ = exporter.export(&batch);
let _op_duration = op_start.elapsed();
}
})
})
.collect();
for handle in handles {
if let Ok(_latencies) = handle.join() {
}
}
let concurrent_duration = concurrent_start.elapsed();
let concurrent_avg = concurrent_duration / batch_count as u32;
println!(
" Average enqueue latency: {:?} (concurrent)",
concurrent_avg
);
println!("📊 Mutex overhead assessment:");
if concurrent_avg > sequential_avg * 2 {
println!(
"🚨 HIGH MUTEX OVERHEAD: Concurrent latency {}x sequential",
concurrent_avg.as_nanos() / sequential_avg.as_nanos().max(1)
);
println!("💡 EVIDENCE: Mutex contention increases operation latency");
} else {
println!(
"⚠️ MODERATE OVERHEAD: Concurrent latency {}x sequential",
concurrent_avg.as_nanos() / sequential_avg.as_nanos().max(1)
);
}
println!("🔍 Implementation details:");
println!(" • Queue lock acquisition: parking_lot::Mutex (line 83)");
println!(" • Critical section: VecDeque operations inside lock");
println!(" • Contention impact: Thread parking under high frequency");
println!("✅ QUEUE OPERATION LATENCY PROFILE COMPLETE");
println!("📊 FINDING: Mutex-protected queue confirmed via latency analysis");
}