#![cfg(test)]
use crate::observability::otlp_trace_exporter::{
ExportError, LoadSheddingTraceExporter, OtlpSpan, SpanBatch, TraceExporter,
};
use std::sync::{
Arc,
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
};
use std::time::{Duration, Instant};
/// Test double standing in for a remote collector, backed only by in-process state.
///
/// All counters and flags live behind `Arc`s, so every clone of this value reads and
/// writes the same underlying state. The tests rely on that: they hand one clone to
/// the exporter under test and keep another for inspection.
#[derive(Debug, Clone)]
struct InMemoryCollectorExporter {
    /// Time `export` sleeps to imitate collector latency.
    export_delay: Duration,
    /// Time `flush` sleeps after no exports remain in flight.
    flush_delay: Duration,
    /// Number of exports that ran to completion.
    export_count: Arc<AtomicU64>,
    /// Number of times `flush` was invoked.
    flush_count: Arc<AtomicU64>,
    /// While set, `export` fails immediately with a transport error.
    should_fail_exports: Arc<AtomicBool>,
    /// Number of exports that have started but not yet finished.
    in_flight_exports: Arc<AtomicUsize>,
    /// Set once an export completes; cleared again through `reset_ack`.
    collector_ack_received: Arc<AtomicBool>,
}
impl InMemoryCollectorExporter {
    /// Creates an exporter with the given artificial latencies.
    ///
    /// Every counter starts at zero and every flag starts cleared.
    fn new(export_delay: Duration, flush_delay: Duration) -> Self {
        Self {
            export_delay,
            flush_delay,
            // The atomics' `Default` values are 0 / false.
            export_count: Arc::default(),
            flush_count: Arc::default(),
            should_fail_exports: Arc::default(),
            in_flight_exports: Arc::default(),
            collector_ack_received: Arc::default(),
        }
    }

    /// Turns the simulated export failure on or off.
    fn set_export_failure(&self, should_fail: bool) {
        let flag: &AtomicBool = &self.should_fail_exports;
        flag.store(should_fail, Ordering::Relaxed);
    }

    /// Returns how many exports have completed so far.
    fn export_count(&self) -> u64 {
        let counter: &AtomicU64 = &self.export_count;
        counter.load(Ordering::Relaxed)
    }

    /// Returns how many times `flush` has been called so far.
    fn flush_count(&self) -> u64 {
        let counter: &AtomicU64 = &self.flush_count;
        counter.load(Ordering::Relaxed)
    }

    /// Returns how many exports are currently in progress.
    fn in_flight_count(&self) -> usize {
        let counter: &AtomicUsize = &self.in_flight_exports;
        counter.load(Ordering::Relaxed)
    }

    /// Reports whether an export has completed since the flag was last cleared.
    fn ack_received(&self) -> bool {
        let flag: &AtomicBool = &self.collector_ack_received;
        flag.load(Ordering::Relaxed)
    }

    /// Clears the acknowledgement flag so a later export can be detected.
    fn reset_ack(&self) {
        let flag: &AtomicBool = &self.collector_ack_received;
        flag.store(false, Ordering::Relaxed);
    }
}
impl TraceExporter for InMemoryCollectorExporter {
    /// Simulates delivering one batch to the collector.
    ///
    /// Fails immediately with `ExportError::Transport` while the failure flag is set
    /// (the batch is then not counted as in flight). Otherwise the call marks itself
    /// in flight, sleeps for `export_delay`, records the acknowledgement and the
    /// completed export, and finally leaves the in-flight set.
    fn export(&self, _batch: &SpanBatch) -> Result<(), ExportError> {
        if self.should_fail_exports.load(Ordering::Relaxed) {
            return Err(ExportError::Transport("configured export failure".into()));
        }

        self.in_flight_exports.fetch_add(1, Ordering::Relaxed);
        std::thread::sleep(self.export_delay);
        self.collector_ack_received.store(true, Ordering::Relaxed);
        self.export_count.fetch_add(1, Ordering::Relaxed);
        // Release: publishes the ack flag and export counter written above to any
        // thread that later observes this decrement with an Acquire load (see `flush`).
        self.in_flight_exports.fetch_sub(1, Ordering::Release);
        Ok(())
    }

    /// Blocks until no export is in flight, then sleeps for `flush_delay`.
    ///
    /// # Errors
    ///
    /// Returns `ExportError::Transport` if exports are still in flight after
    /// five seconds of polling.
    fn flush(&self) -> Result<(), ExportError> {
        self.flush_count.fetch_add(1, Ordering::Relaxed);

        let start = Instant::now();
        let timeout = Duration::from_secs(5);
        // Acquire pairs with the Release decrement in `export`: once zero is observed
        // here, the caller is guaranteed to also see the counters and ack flag that
        // the finished exports wrote, even when they ran on another thread.
        while self.in_flight_exports.load(Ordering::Acquire) > 0 {
            if start.elapsed() > timeout {
                return Err(ExportError::Transport(
                    "Flush timeout waiting for exports".into(),
                ));
            }
            // Poll at a coarse interval instead of spinning.
            std::thread::sleep(Duration::from_millis(10));
        }

        std::thread::sleep(self.flush_delay);
        Ok(())
    }
}
/// Checks that `flush()` on the load-shedding exporter only returns once the wrapped
/// collector has finished every submitted batch.
///
/// Three single-span batches are submitted, then `flush()` is timed. The test requires
/// that the flush succeeds, takes at least one export delay, and leaves the collector
/// with all batches exported, nothing in flight, an acknowledgement recorded, and an
/// empty export queue.
#[test]
fn audit_force_flush_waits_for_collector_ack() {
    println!("🔍 AUDIT: force_flush() waits for collector ACK (data preservation)");
    println!("📋 OTLP SDK force_flush requirements:");
    let requirements = [
        " • Wait for all pending span exports to complete",
        " • Wait for collector ACK/response",
        " • Do NOT return success with in-flight exports",
        " • Prevent data loss during shutdown",
    ];
    for requirement in requirements.iter() {
        println!("{}", requirement);
    }

    let export_delay = Duration::from_millis(200);
    let flush_delay = Duration::from_millis(50);
    let collector = InMemoryCollectorExporter::new(export_delay, flush_delay);
    // The wrapper receives a clone; `collector` stays behind to inspect shared state.
    // NOTE(review): 100 and 1s are presumably a queue bound and a timeout — confirm
    // against `LoadSheddingTraceExporter::new`.
    let exporter = LoadSheddingTraceExporter::new(
        Box::new(collector.clone()),
        100,
        Duration::from_secs(1),
    );

    println!("📊 Test scenario setup:");
    println!(
        " Export delay: {:?} (deterministic collector latency)",
        export_delay
    );
    println!(
        " Flush delay: {:?} (deterministic flush processing)",
        flush_delay
    );

    println!("📊 Phase 1: Export spans");
    let span_count = 3;
    for i in 1..=span_count {
        let attributes = vec![
            (String::from("test_case"), String::from("force_flush")),
            (String::from("span_index"), i.to_string()),
        ];
        let span = OtlpSpan::new(
            format!("flush-test-span-{}", i),
            String::from("flush_test_operation"),
            1_000_000_000 + i * 1_000,
            1_000_001_000 + i * 1_000,
            attributes,
        );
        let batch = SpanBatch {
            batch_id: i,
            spans: vec![span],
            created_at: Instant::now(),
        };
        exporter.export(&batch).expect("Export should succeed");
        println!(" Exported span batch {}", i);
    }
    let stats_before = exporter.load_shedding_stats();
    println!(" Queued batches: {}", stats_before.queue_depth);

    println!("📊 Phase 2: Call force_flush() and verify waiting behavior");
    // Clear the flag so that an ack seen afterwards must stem from work done during flush.
    collector.reset_ack();
    let flush_started = Instant::now();
    let flush_result = exporter.flush();
    let flush_duration = flush_started.elapsed();
    println!(" flush() result: {:?}", flush_result);
    println!(" flush() duration: {:?}", flush_duration);

    assert!(
        flush_result.is_ok(),
        "force_flush() should succeed when collector is responsive"
    );
    // A flush that really waits cannot be faster than a single export.
    let expected_min_duration = export_delay;
    assert!(
        flush_duration >= expected_min_duration,
        "force_flush() should wait for exports to complete. Expected >= {:?}, got {:?}",
        expected_min_duration,
        flush_duration
    );

    let exported = collector.export_count();
    let flushes = collector.flush_count();
    let in_flight = collector.in_flight_count();
    let acked = collector.ack_received();
    println!("📊 Force flush completion verification:");
    println!(" Exported batches: {}", exported);
    println!(" Flush calls: {}", flushes);
    println!(" In-flight exports: {}", in_flight);
    println!(" Collector ACK received: {}", acked);

    assert_eq!(
        exported, span_count,
        "All {} span batches should be exported after force_flush()",
        span_count
    );
    assert_eq!(
        in_flight, 0,
        "No in-flight exports should remain after force_flush()"
    );
    assert!(
        acked,
        "Collector ACK should be received after force_flush()"
    );
    let stats_after = exporter.load_shedding_stats();
    assert_eq!(
        stats_after.queue_depth, 0,
        "Export queue should be empty after force_flush()"
    );

    println!("✅ FORCE_FLUSH DATA PRESERVATION: SOUND");
    let outcomes = [
        " • Waited for all exports to complete",
        " • Received collector ACK",
        " • No data loss during flush",
        " • Queue fully drained",
    ];
    for outcome in outcomes.iter() {
        println!("{}", outcome);
    }
}
/// Exercises `flush()` after the collector has been switched into failure mode.
///
/// One batch is submitted while the collector is healthy, failures are then enabled,
/// and `flush()` is called. Either outcome is accepted and merely reported; the test
/// only establishes that the call returns without panicking.
#[test]
fn audit_force_flush_collector_failure_handling() {
    println!("🔍 AUDIT: force_flush() behavior with collector failures");

    let collector =
        InMemoryCollectorExporter::new(Duration::from_millis(50), Duration::from_millis(10));
    // The wrapper receives a clone; `collector` stays behind to toggle the failure flag.
    let exporter = LoadSheddingTraceExporter::new(
        Box::new(collector.clone()),
        100,
        Duration::from_secs(1),
    );

    let attributes = vec![(
        String::from("test_case"),
        String::from("collector_failure"),
    )];
    let span = OtlpSpan::new(
        String::from("failure-test-span"),
        String::from("failure_test_operation"),
        1_000_000_000,
        1_000_001_000,
        attributes,
    );
    let batch = SpanBatch {
        batch_id: 1,
        spans: vec![span],
        created_at: Instant::now(),
    };
    exporter.export(&batch).expect("Export should succeed");

    // From here on every export attempt reaching the collector is rejected.
    collector.set_export_failure(true);

    println!("📊 Testing flush with collector failure");
    let flush_result = exporter.flush();
    println!(" flush() with failure: {:?}", flush_result);
    if let Err(e) = flush_result {
        println!(" ⚠️ flush() failed with error: {}", e);
        println!(" 📋 Application can handle error gracefully");
    } else {
        println!(" ✅ flush() succeeded (may have retry logic)");
    }

    println!("✅ COLLECTOR FAILURE HANDLING: Behavior is well-defined");
}
/// Runs `flush()` on one thread while another thread is still submitting batches.
///
/// A background thread submits five single-span batches, pausing 50 ms after each.
/// The main thread waits 150 ms, times a `flush()`, joins the background thread, and
/// then requires that the flush succeeded and that the collector reports no export
/// in flight.
#[test]
fn audit_concurrent_flush_and_export() {
    println!("🔍 AUDIT: Concurrent flush() and export() operations");

    let collector =
        InMemoryCollectorExporter::new(Duration::from_millis(100), Duration::from_millis(20));
    // The wrapper receives a clone; `collector` stays behind to inspect shared state.
    let exporter = LoadSheddingTraceExporter::new(
        Box::new(collector.clone()),
        100,
        Duration::from_secs(1),
    );

    println!("📊 Testing concurrent operations:");
    // Shared ownership so both the producer thread and this thread can call into it.
    let shared_exporter = Arc::new(exporter);

    let producer_exporter = Arc::clone(&shared_exporter);
    let export_handle = std::thread::spawn(move || {
        for i in 1..=5 {
            let attributes = vec![(String::from("concurrent"), String::from("true"))];
            let span = OtlpSpan::new(
                format!("concurrent-span-{}", i),
                String::from("concurrent_operation"),
                1_000_000_000 + i * 1_000,
                1_000_001_000 + i * 1_000,
                attributes,
            );
            let batch = SpanBatch {
                batch_id: i,
                spans: vec![span],
                created_at: Instant::now(),
            };
            producer_exporter
                .export(&batch)
                .expect("Export should succeed");
            println!(" Background exported batch {}", i);
            std::thread::sleep(Duration::from_millis(50));
        }
    });

    // Give the producer a head start so the flush lands in the middle of its run.
    std::thread::sleep(Duration::from_millis(150));
    println!(" Calling flush() during ongoing exports");
    let flush_started = Instant::now();
    let flush_result = shared_exporter.flush();
    let flush_duration = flush_started.elapsed();

    export_handle
        .join()
        .expect("Background thread should complete");

    println!("📊 Concurrent operation results:");
    println!(" flush() result: {:?}", flush_result);
    println!(" flush() duration: {:?}", flush_duration);
    println!(" Final exports: {}", collector.export_count());
    println!(" Final in-flight: {}", collector.in_flight_count());

    assert!(
        flush_result.is_ok(),
        "flush() should handle concurrent exports"
    );
    assert_eq!(
        collector.in_flight_count(),
        0,
        "No exports should be in-flight after flush completion"
    );

    println!("✅ CONCURRENT OPERATIONS: Proper coordination verified");
}
/// Demonstrates the "send-and-forget" flush anti-pattern with a deliberately
/// defective exporter.
///
/// The local `SendAndForgetExporter` hands each export to a detached thread that
/// completes after 500 ms, while its `flush()` returns straight away. The test
/// submits one batch, times `flush()` through the load-shedding wrapper, and asserts
/// that the flush came back in under 100 ms — i.e. without waiting for the export.
/// Pending counts before and after are printed for illustration only.
#[test]
fn audit_send_and_forget_antipattern() {
    println!("🚨 AUDIT: Send-and-forget flush anti-pattern (data loss risk)");
    println!("📋 DEFECTIVE anti-pattern:");
    println!(" • force_flush() returns immediately");
    println!(" • Does not wait for exports to complete");
    println!(" • Application shuts down with pending exports");
    println!(" • Result: data loss");

    /// Exporter whose `flush()` ignores exports that are still running.
    #[derive(Debug, Clone)]
    struct SendAndForgetExporter {
        /// Exports completed by the detached worker threads.
        export_count: Arc<AtomicU64>,
        /// Exports started but not yet completed.
        pending_exports: Arc<AtomicUsize>,
    }

    impl SendAndForgetExporter {
        /// Creates an exporter with both counters at zero.
        fn new() -> Self {
            Self {
                export_count: Arc::new(AtomicU64::new(0)),
                pending_exports: Arc::new(AtomicUsize::new(0)),
            }
        }

        /// Returns how many exports are still running in the background.
        fn pending_count(&self) -> usize {
            self.pending_exports.load(Ordering::Relaxed)
        }
    }

    impl TraceExporter for SendAndForgetExporter {
        /// Marks the export pending and finishes it on a detached thread 500 ms later.
        fn export(&self, _batch: &SpanBatch) -> Result<(), ExportError> {
            self.pending_exports.fetch_add(1, Ordering::Relaxed);
            // The join handle is dropped on purpose: nobody ever waits for this work.
            std::thread::spawn({
                let count = Arc::clone(&self.export_count);
                let pending = Arc::clone(&self.pending_exports);
                move || {
                    std::thread::sleep(Duration::from_millis(500));
                    count.fetch_add(1, Ordering::Relaxed);
                    pending.fetch_sub(1, Ordering::Relaxed);
                }
            });
            Ok(())
        }

        /// Reports success without checking `pending_exports` — the defect on display.
        fn flush(&self) -> Result<(), ExportError> {
            println!(" 🚨 DEFECTIVE: flush() returns immediately without waiting");
            Ok(())
        }
    }

    let defective_exporter = SendAndForgetExporter::new();
    // The wrapper receives a clone; `defective_exporter` stays behind for inspection.
    let exporter = LoadSheddingTraceExporter::new(
        Box::new(defective_exporter.clone()),
        100,
        Duration::from_secs(1),
    );

    let span = OtlpSpan::new(
        "defective-test-span".to_string(),
        "defective_operation".to_string(),
        1000000000,
        1000001000,
        vec![("antipattern".to_string(), "send_and_forget".to_string())],
    );
    let batch = SpanBatch {
        batch_id: 1,
        spans: vec![span],
        created_at: Instant::now(),
    };
    exporter.export(&batch).expect("Export should succeed");

    println!("📊 Exercising defective behavior:");
    let pending_before = defective_exporter.pending_count();
    println!(" Pending exports before flush: {}", pending_before);

    let flush_start = Instant::now();
    let flush_result = exporter.flush();
    let flush_duration = flush_start.elapsed();
    let pending_after = defective_exporter.pending_count();

    println!(" flush() duration: {:?} (too fast!)", flush_duration);
    println!(" flush() result: {:?}", flush_result);
    println!(" Pending exports after flush: {}", pending_after);

    // This assertion fails when the flush was NOT fast, so the message states the
    // expectation that was violated and the measured duration.
    assert!(
        flush_duration < Duration::from_millis(100),
        "Expected the send-and-forget flush() to return almost immediately, but it took {:?}",
        flush_duration
    );

    if pending_after > 0 {
        println!("🚨 DATA LOSS RISK DEMONSTRATED:");
        println!(" • flush() returned success");
        println!(" • {} exports still pending", pending_after);
        println!(" • Application shutdown would lose data");
    }

    // Outlast the 500 ms background export so the detached thread can finish.
    std::thread::sleep(Duration::from_millis(600));
    let final_pending = defective_exporter.pending_count();
    println!(" Pending exports after delay: {}", final_pending);

    println!("🚨 ANTIPATTERN DEMONSTRATED: Send-and-forget causes data loss");
    println!("💡 SOLUTION: force_flush() must wait for collector ACK");
}