clnrm_core/telemetry/
adaptive_flush.rs

//! Adaptive Flush Timeout Management for OTLP Exports
//!
//! This module implements flush timeout calculation based on recent
//! export statistics, targeting a >99.9% delivery success rate to Weaver.
//!
//! # Problem
//!
//! A fixed 500ms flush timeout (current implementation) may be:
//! - Too short for slow networks → data loss
//! - Too long for fast networks → unnecessary wait
//!
//! # Solution
//!
//! Calculate the flush timeout dynamically based on:
//! - Recent export success rate
//! - P95 export latency
//! - Failure count
//!
//! # Target
//!
//! >99.9% export success rate (at most 1 failure per 1000 exports)
22
23use std::collections::VecDeque;
24use std::sync::{Arc, Mutex};
25use std::time::{Duration, Instant};
26
/// Export attempt result with timing
///
/// A plain value record: every field is `Copy` and comparable, so the record
/// itself is `Copy` and supports equality checks.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ExportAttempt {
    /// Monotonic instant at which the attempt was recorded
    pub timestamp: Instant,
    /// How long the export took, as reported by the caller
    pub duration: Duration,
    /// `true` if the export succeeded, `false` if it failed
    pub success: bool,
}
37
38/// Export statistics tracker
39///
40/// Tracks recent export attempts to calculate adaptive timeouts.
41/// Thread-safe for use across async exporters.
42#[derive(Debug, Clone)]
43pub struct ExportStatistics {
44    /// Recent export attempts (circular buffer, max 1000 entries)
45    attempts: Arc<Mutex<VecDeque<ExportAttempt>>>,
46    /// Maximum attempts to track
47    max_attempts: usize,
48}
49
50impl Default for ExportStatistics {
51    fn default() -> Self {
52        Self::new(1000)
53    }
54}
55
56impl ExportStatistics {
57    /// Create new export statistics tracker
58    ///
59    /// # Arguments
60    ///
61    /// * `max_attempts` - Maximum number of attempts to track (default: 1000)
62    pub fn new(max_attempts: usize) -> Self {
63        Self {
64            attempts: Arc::new(Mutex::new(VecDeque::with_capacity(max_attempts))),
65            max_attempts,
66        }
67    }
68
69    /// Record successful export
70    pub fn record_success(&self, duration: Duration) {
71        self.record_attempt(ExportAttempt {
72            timestamp: Instant::now(),
73            duration,
74            success: true,
75        });
76    }
77
78    /// Record failed export
79    pub fn record_failure(&self, duration: Duration) {
80        self.record_attempt(ExportAttempt {
81            timestamp: Instant::now(),
82            duration,
83            success: false,
84        });
85    }
86
87    /// Record export attempt
88    fn record_attempt(&self, attempt: ExportAttempt) {
89        if let Ok(mut attempts) = self.attempts.lock() {
90            // Add new attempt
91            attempts.push_back(attempt);
92
93            // Remove oldest if exceeding max
94            if attempts.len() > self.max_attempts {
95                attempts.pop_front();
96            }
97        }
98    }
99
100    /// Calculate export success rate (0.0 to 1.0)
101    pub fn success_rate(&self) -> f64 {
102        let attempts = self.attempts.lock().ok();
103        if attempts.is_none() {
104            return 1.0; // Assume healthy if can't lock
105        }
106
107        let attempts = attempts.unwrap();
108        if attempts.is_empty() {
109            return 1.0; // No data yet, assume healthy
110        }
111
112        let successful = attempts.iter().filter(|a| a.success).count();
113        successful as f64 / attempts.len() as f64
114    }
115
116    /// Calculate P95 export latency
117    ///
118    /// Returns the 95th percentile export duration.
119    /// This represents worst-case latency for 95% of exports.
120    pub fn p95_latency(&self) -> Duration {
121        let attempts = self.attempts.lock().ok();
122        if attempts.is_none() {
123            return Duration::from_millis(500); // Default
124        }
125
126        let attempts = attempts.unwrap();
127        if attempts.is_empty() {
128            return Duration::from_millis(500); // Default
129        }
130
131        // Collect durations and sort
132        let mut durations: Vec<Duration> = attempts.iter().map(|a| a.duration).collect();
133        durations.sort();
134
135        // Calculate P95 index
136        let p95_index = (durations.len() as f64 * 0.95).ceil() as usize;
137        let p95_index = p95_index.min(durations.len() - 1);
138
139        durations[p95_index]
140    }
141
142    /// Get count of failed exports
143    pub fn failed_exports(&self) -> usize {
144        let attempts = self.attempts.lock().ok();
145        if attempts.is_none() {
146            return 0;
147        }
148
149        let attempts = attempts.unwrap();
150        attempts.iter().filter(|a| !a.success).count()
151    }
152
153    /// Get total export count
154    pub fn total_exports(&self) -> usize {
155        let attempts = self.attempts.lock().ok();
156        if attempts.is_none() {
157            return 0;
158        }
159
160        attempts.unwrap().len()
161    }
162
163    /// Get age of last export attempt
164    pub fn last_export_age(&self) -> Option<Duration> {
165        let attempts = self.attempts.lock().ok()?;
166        attempts.back().map(|a| a.timestamp.elapsed())
167    }
168}
169
170/// Adaptive flush timeout calculator
171///
172/// Calculates optimal flush timeout based on export statistics.
173/// Ensures >99.9% delivery success rate while minimizing wait time.
174#[derive(Debug, Clone)]
175pub struct AdaptiveFlush {
176    /// Export statistics tracker
177    stats: ExportStatistics,
178    /// Base timeout (minimum, default: 500ms)
179    base_timeout: Duration,
180    /// Max timeout (cap, default: 10s)
181    max_timeout: Duration,
182}
183
184impl Default for AdaptiveFlush {
185    fn default() -> Self {
186        Self::new(Duration::from_millis(500), Duration::from_secs(10))
187    }
188}
189
190impl AdaptiveFlush {
191    /// Create new adaptive flush calculator
192    ///
193    /// # Arguments
194    ///
195    /// * `base_timeout` - Minimum timeout (e.g., 500ms)
196    /// * `max_timeout` - Maximum timeout (e.g., 10s)
197    pub fn new(base_timeout: Duration, max_timeout: Duration) -> Self {
198        Self {
199            stats: ExportStatistics::default(),
200            base_timeout,
201            max_timeout,
202        }
203    }
204
205    /// Get export statistics for monitoring
206    pub fn stats(&self) -> &ExportStatistics {
207        &self.stats
208    }
209
210    /// Record successful export
211    pub fn record_success(&self, duration: Duration) {
212        self.stats.record_success(duration);
213    }
214
215    /// Record failed export
216    pub fn record_failure(&self, duration: Duration) {
217        self.stats.record_failure(duration);
218    }
219
220    /// Calculate adaptive flush timeout
221    ///
222    /// # Algorithm
223    ///
224    /// 1. If success rate > 99.9%: Use P95 latency + 10% buffer
225    /// 2. If success rate > 99.0%: Use P95 latency + 25% buffer
226    /// 3. If success rate > 95.0%: Use P95 latency + 50% buffer
227    /// 4. If success rate < 95.0%: Use max timeout (network issues)
228    ///
229    /// Always clamp result between base_timeout and max_timeout.
230    ///
231    /// # Returns
232    ///
233    /// Optimal flush timeout based on recent export performance.
234    pub fn calculate_timeout(&self) -> Duration {
235        let success_rate = self.stats.success_rate();
236        let p95 = self.stats.p95_latency();
237
238        // Calculate buffer multiplier based on success rate
239        let buffer_multiplier = if success_rate >= 0.999 {
240            // >99.9% success - use P95 + 10% (tight tolerance)
241            1.10
242        } else if success_rate >= 0.99 {
243            // >99.0% success - use P95 + 25% (moderate tolerance)
244            1.25
245        } else if success_rate >= 0.95 {
246            // >95.0% success - use P95 + 50% (loose tolerance)
247            1.50
248        } else {
249            // <95.0% success - use max timeout (network issues)
250            tracing::warn!(
251                success_rate = %format!("{:.2}%", success_rate * 100.0),
252                "Low export success rate detected, using max timeout"
253            );
254            return self.max_timeout;
255        };
256
257        // Calculate timeout with buffer
258        let timeout = Duration::from_millis((p95.as_millis() as f64 * buffer_multiplier) as u64);
259
260        // Clamp to [base_timeout, max_timeout]
261        timeout.max(self.base_timeout).min(self.max_timeout)
262    }
263
264    /// Get recommended timeout with diagnostic info
265    ///
266    /// Returns tuple of (timeout, diagnostics string) for logging.
267    pub fn calculate_timeout_with_diagnostics(&self) -> (Duration, String) {
268        let timeout = self.calculate_timeout();
269        let success_rate = self.stats.success_rate();
270        let p95 = self.stats.p95_latency();
271        let failed = self.stats.failed_exports();
272        let total = self.stats.total_exports();
273
274        let diagnostics = format!(
275            "timeout={:?} (success_rate={:.2}%, p95={:?}, failures={}/{})",
276            timeout,
277            success_rate * 100.0,
278            p95,
279            failed,
280            total
281        );
282
283        (timeout, diagnostics)
284    }
285
286    /// Check if exports are healthy (>99.9% success rate)
287    pub fn is_healthy(&self) -> bool {
288        self.stats.success_rate() >= 0.999
289    }
290}
291
#[cfg(test)]
mod tests {
    use super::*;

    /// Latency used by tests that do not care about timing.
    const FAST: Duration = Duration::from_millis(100);

    /// A fresh tracker has no data and reports itself as healthy.
    #[test]
    fn test_export_statistics_empty() {
        let tracker = ExportStatistics::new(100);

        assert_eq!(tracker.success_rate(), 1.0);
        assert_eq!(tracker.failed_exports(), 0);
        assert_eq!(tracker.total_exports(), 0);
    }

    /// Ten successes give a perfect success rate and no failures.
    #[test]
    fn test_export_statistics_all_success() {
        let tracker = ExportStatistics::new(100);
        (0..10).for_each(|_| tracker.record_success(FAST));

        assert_eq!(tracker.success_rate(), 1.0);
        assert_eq!(tracker.failed_exports(), 0);
        assert_eq!(tracker.total_exports(), 10);
    }

    /// 99 successes followed by one failure yield exactly 99%.
    #[test]
    fn test_export_statistics_with_failures() {
        let tracker = ExportStatistics::new(100);
        (0..99).for_each(|_| tracker.record_success(FAST));
        tracker.record_failure(FAST);

        assert_eq!(tracker.success_rate(), 0.99);
        assert_eq!(tracker.failed_exports(), 1);
        assert_eq!(tracker.total_exports(), 100);
    }

    /// Latencies spread evenly over 0..=990ms put P95 in the 900-1000ms band.
    #[test]
    fn test_p95_latency_calculation() {
        let tracker = ExportStatistics::new(100);
        (0..100u64)
            .map(|step| Duration::from_millis(step * 10))
            .for_each(|latency| tracker.record_success(latency));

        let p95_ms = tracker.p95_latency().as_millis();
        assert!((900..=1000).contains(&p95_ms));
    }

    /// Fast, always-successful exports are clamped up to the base timeout.
    #[test]
    fn test_adaptive_flush_high_success() {
        let calculator = AdaptiveFlush::default();
        (0..1000).for_each(|_| calculator.record_success(Duration::from_millis(50)));

        // P95 + 10% is roughly 55ms, which is below the 500ms floor.
        let timeout = calculator.calculate_timeout();
        assert!(timeout >= Duration::from_millis(500));
        assert!(timeout <= Duration::from_millis(600));
        assert!(calculator.is_healthy());
    }

    /// A 90% success rate falls back to the maximum timeout.
    #[test]
    fn test_adaptive_flush_low_success() {
        let calculator = AdaptiveFlush::default();
        (0..90).for_each(|_| calculator.record_success(FAST));
        (0..10).for_each(|_| calculator.record_failure(FAST));

        assert_eq!(calculator.calculate_timeout(), Duration::from_secs(10));
        assert!(!calculator.is_healthy());
    }

    /// The diagnostics string names the figures the timeout is based on.
    #[test]
    fn test_adaptive_flush_diagnostics() {
        let calculator = AdaptiveFlush::default();
        (0..10).for_each(|_| calculator.record_success(FAST));

        let (timeout, report) = calculator.calculate_timeout_with_diagnostics();
        assert!(timeout >= Duration::from_millis(500));
        assert!(report.contains("success_rate"));
        assert!(report.contains("p95"));
    }
}