asupersync 0.3.4

Spec-first, cancel-correct, capability-secure async runtime for Rust.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
//! OTLP exporter graceful shutdown audit test.
//!
//! **AUDIT SCOPE**: Verifies that OTLP trace exporter implements graceful shutdown
//! with bounded timeout to prevent data loss when runtime is dropped.
//!
//! **OTLP SPECIFICATION REQUIREMENT**:
//! - Exporter MUST attempt to flush pending spans during shutdown
//! - Flush MUST complete within bounded timeout (prevent deadlock)
//! - Data loss is acceptable only after timeout expires
//! - NOT: abandon pending spans immediately on drop (data loss)
//! - NOT: block forever waiting for export (shutdown deadlock)
//!
//! **CRITICAL**: Missing graceful shutdown causes span data loss when services
//! restart, deploy, or crash, reducing observability during incidents.

#![cfg(test)]
#![allow(dead_code)]

use crate::observability::otlp_trace_exporter::{
    ExportError, InMemoryOtlpHttpExporter, LoadSheddingTraceExporter, OtlpSpan, SpanBatch,
    TraceExporter,
};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

/// In-memory exporter that tracks export calls during shutdown.
#[derive(Clone)]
struct ShutdownTrackingExporter {
    exported_batches: Arc<Mutex<Vec<SpanBatch>>>,
    export_delay: Duration,
    export_call_count: Arc<AtomicU64>,
    shutdown_started: Arc<AtomicBool>,
}

impl ShutdownTrackingExporter {
    fn new(export_delay: Duration) -> Self {
        Self {
            exported_batches: Arc::new(Mutex::new(Vec::new())),
            export_delay,
            export_call_count: Arc::new(AtomicU64::new(0)),
            shutdown_started: Arc::new(AtomicBool::new(false)),
        }
    }

    fn start_shutdown(&self) {
        self.shutdown_started.store(true, Ordering::Relaxed);
    }

    fn export_call_count(&self) -> u64 {
        self.export_call_count.load(Ordering::Relaxed)
    }

    fn exported_batches(&self) -> Vec<SpanBatch> {
        self.exported_batches.lock().unwrap().clone()
    }

    fn exported_span_count(&self) -> usize {
        self.exported_batches
            .lock()
            .unwrap()
            .iter()
            .map(|batch| batch.spans.len())
            .sum()
    }
}

impl TraceExporter for ShutdownTrackingExporter {
    fn export(&self, batch: &SpanBatch) -> Result<(), ExportError> {
        self.export_call_count.fetch_add(1, Ordering::Relaxed);

        // Deterministic collector delay is longer once shutdown has started.
        if self.shutdown_started.load(Ordering::Relaxed) {
            thread::sleep(self.export_delay * 2); // Slower during shutdown
        } else {
            thread::sleep(self.export_delay);
        }

        self.exported_batches.lock().unwrap().push(batch.clone());
        Ok(())
    }

    fn flush(&self) -> Result<(), ExportError> {
        // The in-memory flush path records the bounded final flush attempt.
        if self.shutdown_started.load(Ordering::Relaxed) {
            thread::sleep(Duration::from_millis(50)); // Flush delay during shutdown
        }
        Ok(())
    }
}

impl std::fmt::Debug for ShutdownTrackingExporter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ShutdownTrackingExporter")
            .field("export_delay", &self.export_delay)
            .field("export_call_count", &self.export_call_count())
            .finish()
    }
}

fn create_test_span(span_id: &str, name: &str) -> OtlpSpan {
    OtlpSpan {
        span_id: span_id.to_string(),
        name: name.to_string(),
        start_time_unix_nano: 1000000000,
        end_time_unix_nano: 1000001000,
        attributes: vec![("service".to_string(), "test".to_string())],
        trace_flags: Some(0x01), // Sampled
    }
}

fn create_test_batch(batch_id: u64, span_count: usize) -> SpanBatch {
    let spans = (0..span_count)
        .map(|i| create_test_span(&format!("span-{}-{}", batch_id, i), "test_operation"))
        .collect();

    SpanBatch {
        batch_id,
        spans,
        created_at: Instant::now(),
    }
}

/// **AUDIT TEST**: Verify LoadSheddingTraceExporter implements graceful shutdown.
///
/// **SCENARIO**: Exporter with pending spans is dropped during active operation.
/// **REQUIREMENT**: Must flush pending spans within bounded timeout before dropping.
/// **ASSESSMENT**: Current implementation behavior vs OTLP specification requirements.
#[test]
fn audit_graceful_shutdown_flushes_pending_spans() {
    println!("🔍 AUDIT: OTLP exporter graceful shutdown behavior");

    println!("📋 OTLP specification requirements:");
    println!("   • Flush pending spans during shutdown");
    println!("   • Complete within bounded timeout");
    println!("   • Prevent data loss on service restart");
    println!("   • NOT: abandon spans immediately (data loss)");
    println!("   • NOT: block forever (shutdown deadlock)");

    let export_delay = Duration::from_millis(50);
    let tracking_exporter = ShutdownTrackingExporter::new(export_delay);
    let tracking_handle = tracking_exporter.clone();
    let exporter = LoadSheddingTraceExporter::new(
        Box::new(tracking_exporter),
        10,                     // Queue capacity
        Duration::from_secs(1), // Batch timeout
    );

    // Enqueue multiple span batches
    let batch_count: u64 = 5;
    let spans_per_batch: usize = 100;
    println!("📊 Test setup:");
    println!("   Batches queued: {}", batch_count);
    println!("   Spans per batch: {}", spans_per_batch);
    println!(
        "   Total pending spans: {}",
        batch_count as usize * spans_per_batch
    );

    for i in 0..batch_count {
        let batch = create_test_batch(i, spans_per_batch);
        exporter.export(&batch).expect("export should succeed");
    }

    let queue_stats_before = exporter.load_shedding_stats();
    println!(
        "   Queue depth before drop: {}",
        queue_stats_before.queue_depth
    );

    // Mark shutdown start for tracking
    tracking_handle.start_shutdown();

    // **CRITICAL TEST**: Drop the exporter while spans are still pending.
    let drop_start = Instant::now();
    drop(exporter);
    let drop_duration = drop_start.elapsed();

    println!("📊 Drop behavior analysis:");
    println!("   Drop duration: {:?}", drop_duration);

    // **ASSESSMENT**: Check if graceful shutdown happened
    let exported_span_count = tracking_handle.exported_span_count();
    let total_expected = batch_count as usize * spans_per_batch;

    println!("   Spans exported during drop: {}", exported_span_count);
    println!("   Total expected spans: {}", total_expected);
    println!(
        "   Export call count: {}",
        tracking_handle.export_call_count()
    );

    // **OTLP COMPLIANCE ANALYSIS**
    if exported_span_count == 0 {
        println!("❌ DEFECT DETECTED: No spans flushed during drop");
        println!("💡 ISSUE: LoadSheddingTraceExporter lacks Drop implementation");
        println!("📋 CONSEQUENCE: Data loss on service restart/shutdown");
        println!("🔧 REQUIRED: Implement Drop trait with bounded flush timeout");
    } else if exported_span_count == total_expected {
        println!("✅ GRACEFUL SHUTDOWN: All pending spans flushed");
        println!("⏱️  Bounded timeout: Completed in {:?}", drop_duration);
    } else {
        println!(
            "⚠️  PARTIAL FLUSH: {} of {} spans exported",
            exported_span_count, total_expected
        );
        println!("📋 Analysis: May indicate timeout or partial success");
    }

    // **TIMEOUT ANALYSIS**
    let reasonable_timeout = Duration::from_secs(5); // Max acceptable shutdown time
    if drop_duration > reasonable_timeout {
        println!(
            "❌ SHUTDOWN DEADLOCK: Drop took {:?} (> {:?})",
            drop_duration, reasonable_timeout
        );
        println!("🔧 REQUIRED: Implement bounded timeout in Drop");
    } else if drop_duration < Duration::from_millis(10) {
        println!(
            "❌ IMMEDIATE DROP: Drop too fast ({:?}), likely no flush",
            drop_duration
        );
        println!("🔧 REQUIRED: Implement graceful flush in Drop");
    } else {
        println!("✅ BOUNDED TIMEOUT: Drop completed in reasonable time");
    }

    // Current implementation expectation: Drop attempts a bounded graceful flush.
    assert!(
        drop_duration <= reasonable_timeout,
        "graceful shutdown must remain bounded"
    );
    assert_eq!(
        exported_span_count, total_expected,
        "Drop should flush pending spans before returning when collector latency fits timeout"
    );

    println!("📊 AUDIT RESULT: PASS - Bounded graceful shutdown flushes pending spans");
}

/// **AUDIT TEST**: Verify bounded timeout prevents shutdown deadlock.
///
/// **SCENARIO**: Slow collector causes export delays during shutdown.
/// **REQUIREMENT**: Shutdown must complete within timeout even with slow collector.
/// **ASSESSMENT**: Timeout mechanism prevents indefinite blocking.
#[test]
fn audit_bounded_timeout_prevents_shutdown_deadlock() {
    println!("🔍 AUDIT: Bounded timeout prevents shutdown deadlock");

    println!("📋 Deadlock prevention requirements:");
    println!("   • Shutdown timeout ≤ 5 seconds");
    println!("   • Partial flush acceptable on timeout");
    println!("   • Must not block forever on slow collector");

    // Use a slow in-memory exporter to exercise timeout behavior.
    let slow_export_delay = Duration::from_secs(2);
    let exporter = LoadSheddingTraceExporter::new(
        Box::new(InMemoryOtlpHttpExporter::new(slow_export_delay)),
        5, // Small queue capacity
        Duration::from_secs(1),
    );

    // Fill queue with spans
    for i in 0..5 {
        let batch = create_test_batch(i, 50);
        exporter.export(&batch).expect("export should succeed");
    }

    // **CRITICAL**: Time the drop operation
    let drop_start = Instant::now();
    drop(exporter);
    let drop_duration = drop_start.elapsed();

    println!("📊 Timeout behavior analysis:");
    println!("   Slow export delay: {:?}", slow_export_delay);
    println!("   Actual drop duration: {:?}", drop_duration);

    let max_acceptable_timeout = Duration::from_secs(5);
    if drop_duration <= max_acceptable_timeout {
        println!("✅ BOUNDED TIMEOUT: Shutdown completed within acceptable time");
    } else {
        println!(
            "❌ TIMEOUT VIOLATION: Drop took {:?} (> {:?})",
            drop_duration, max_acceptable_timeout
        );
        panic!("Shutdown timeout exceeded - potential deadlock detected!");
    }

    // **CURRENT EXPECTATION**: Drop may spend collector time flushing, but must
    // still return within the bounded shutdown budget.
    assert!(drop_duration <= max_acceptable_timeout);
    println!("📊 CURRENT STATE: Drop flushes with bounded shutdown timeout");
}

/// **AUDIT TEST**: Verify concurrent operations during shutdown are handled safely.
///
/// **SCENARIO**: New spans arrive while exporter is being dropped.
/// **REQUIREMENT**: Concurrent exports during shutdown must not cause panic or data race.
/// **ASSESSMENT**: Thread safety during shutdown transition.
#[test]
fn audit_concurrent_operations_during_shutdown() {
    println!("🔍 AUDIT: Concurrent operations during graceful shutdown");

    let exporter = Arc::new(LoadSheddingTraceExporter::new(
        Box::new(InMemoryOtlpHttpExporter::new(Duration::from_millis(10))),
        20,
        Duration::from_secs(1),
    ));

    // Queue initial batches
    for i in 0..5 {
        let batch = create_test_batch(i, 10);
        exporter.export(&batch).expect("export should succeed");
    }

    let exporter_clone = Arc::clone(&exporter);

    // Spawn background task that continues to export during shutdown
    let export_handle = thread::spawn(move || {
        for i in 100..110 {
            let batch = create_test_batch(i, 5);
            // These may succeed or fail depending on shutdown timing
            let _result = exporter_clone.export(&batch);
            thread::sleep(Duration::from_millis(5));
        }
    });

    // Brief delay to let background exports start
    thread::sleep(Duration::from_millis(50));

    // **CRITICAL**: Drop while concurrent operations are running
    let drop_start = Instant::now();
    drop(exporter); // This drops the original Arc, but clone in thread still holds reference
    let drop_duration = drop_start.elapsed();

    // Wait for background task to complete
    export_handle
        .join()
        .expect("background task should complete");

    println!("📊 Concurrent operation analysis:");
    println!("   Drop duration with concurrent ops: {:?}", drop_duration);
    println!("   Background task completed without panic: ✅");

    // **THREAD SAFETY VALIDATION**
    // If we reach here without panic, basic thread safety is maintained
    println!("✅ THREAD SAFETY: No panics during concurrent shutdown");

    // Current behavior: immediate drop due to missing Drop impl
    assert!(drop_duration < Duration::from_millis(200));
    println!("📊 CURRENT STATE: Immediate drop due to missing graceful shutdown");
}

/// **AUDIT TEST**: Verify the antipattern of immediate span abandonment.
///
/// **SCENARIO**: Service deployment or restart causes span loss.
/// **CHECK**: Shows data loss pattern that graceful shutdown should prevent.
/// **ASSESSMENT**: Quantifies observability gaps during service lifecycle events.
#[test]
fn audit_immediate_abandonment_antipattern() {
    println!("🔍 AUDIT: Immediate span abandonment antipattern check");

    println!("📋 Data loss scenarios:");
    println!("   • Service restart during high traffic");
    println!("   • Deployment rollout with pending spans");
    println!("   • Container termination with active traces");
    println!("   • Process crash recovery");

    let memory_exporter = Arc::new(InMemoryOtlpHttpExporter::new(Duration::from_millis(10)));
    let exporter = LoadSheddingTraceExporter::new(
        Box::new(InMemoryOtlpHttpExporter::new(Duration::from_millis(10))),
        50,
        Duration::from_secs(1),
    );

    // Exercise a high-traffic scenario with many pending spans.
    let batch_count = 25;
    let spans_per_batch = 40;
    println!("📊 High-traffic exercise:");
    println!("   Batches queued: {}", batch_count);
    println!("   Spans per batch: {}", spans_per_batch);

    for i in 0..batch_count {
        let batch = create_test_batch(i, spans_per_batch);
        exporter.export(&batch).expect("export should succeed");
    }

    let queue_stats = exporter.load_shedding_stats();
    let pending_spans = queue_stats.queue_depth * spans_per_batch;

    println!("   Pending spans before shutdown: {}", pending_spans);
    println!(
        "   Queue utilization: {}/{}",
        queue_stats.queue_depth, queue_stats.queue_capacity
    );

    // **ANTIPATTERN DEMONSTRATION**: Immediate drop without flush
    let pre_drop_exported = memory_exporter.exported_span_count();

    let drop_start = Instant::now();
    drop(exporter); // Current implementation: immediate drop
    let drop_duration = drop_start.elapsed();

    let post_drop_exported = memory_exporter.exported_span_count();
    let spans_lost = pending_spans - (post_drop_exported - pre_drop_exported);

    println!("📊 Data loss analysis:");
    println!("   Drop duration: {:?}", drop_duration);
    println!(
        "   Spans exported during drop: {}",
        post_drop_exported - pre_drop_exported
    );
    println!("   Spans lost: {}", spans_lost);
    println!(
        "   Data loss percentage: {:.1}%",
        (spans_lost as f64 / pending_spans as f64) * 100.0
    );

    // **ANTIPATTERN EVIDENCE**
    if spans_lost > 0 {
        println!("❌ ANTIPATTERN CONFIRMED: Immediate abandonment causes data loss");
        println!("💡 BUSINESS IMPACT: Lost observability during critical events");
        println!("🔧 SOLUTION REQUIRED: Implement Drop trait with graceful flush");
    }

    // **OBSERVABILITY IMPACT ASSESSMENT**
    if spans_lost > 100 {
        println!("🚨 HIGH IMPACT: {} spans lost (>100)", spans_lost);
        println!("   • Trace gaps during incident investigation");
        println!("   • Missing performance metrics during deploy");
        println!("   • Incomplete error tracking during restart");
    } else if spans_lost > 10 {
        println!("⚠️  MEDIUM IMPACT: {} spans lost (>10)", spans_lost);
    }

    // Current expectation: all spans lost due to missing Drop impl
    assert_eq!(post_drop_exported - pre_drop_exported, 0);
    assert!(spans_lost > 0);
    println!("✅ ANTIPATTERN DEMONSTRATED: Data loss confirmed without graceful shutdown");
}