clnrm_core/telemetry/
adaptive_flush.rs1use std::collections::VecDeque;
24use std::sync::{Arc, Mutex};
25use std::time::{Duration, Instant};
26
/// One recorded export attempt: when it happened, how long it took, and
/// whether it succeeded.
#[derive(Debug, Clone)]
pub struct ExportAttempt {
    /// Moment the attempt was recorded (`Instant::now()` at record time when
    /// recorded through [`ExportStatistics`]).
    pub timestamp: Instant,
    /// Duration of the export, as reported by the caller.
    pub duration: Duration,
    /// `true` if the export succeeded.
    pub success: bool,
}

/// Bounded sliding window over the most recent export attempts.
///
/// At most `max_attempts` entries are retained; recording into a full window
/// evicts the oldest attempt. The window lives behind an `Arc<Mutex<_>>`, so
/// every method takes `&self`, and clones are cheap and share the same data.
#[derive(Debug, Clone)]
pub struct ExportStatistics {
    /// Recorded attempts, oldest at the front.
    attempts: Arc<Mutex<VecDeque<ExportAttempt>>>,
    /// Maximum number of attempts retained in the window.
    max_attempts: usize,
}

impl Default for ExportStatistics {
    /// Creates statistics with a 1000-attempt window.
    fn default() -> Self {
        Self::new(1000)
    }
}

impl ExportStatistics {
    /// Success rate reported while the window is empty.
    const EMPTY_SUCCESS_RATE: f64 = 1.0;
    /// p95 latency reported while the window is empty.
    const EMPTY_P95_LATENCY: Duration = Duration::from_millis(500);

    /// Creates an empty window retaining at most `max_attempts` attempts.
    ///
    /// With `max_attempts == 0` nothing is ever retained.
    pub fn new(max_attempts: usize) -> Self {
        Self {
            attempts: Arc::new(Mutex::new(VecDeque::with_capacity(max_attempts))),
            max_attempts,
        }
    }

    /// Records a successful export that took `duration`.
    pub fn record_success(&self, duration: Duration) {
        self.record_attempt(ExportAttempt {
            timestamp: Instant::now(),
            duration,
            success: true,
        });
    }

    /// Records a failed export that took `duration`.
    pub fn record_failure(&self, duration: Duration) {
        self.record_attempt(ExportAttempt {
            timestamp: Instant::now(),
            duration,
            success: false,
        });
    }

    /// Locks the window, recovering the guard if the mutex is poisoned.
    ///
    /// Every critical section in this type either only reads the deque or
    /// performs a single `push_back`/`pop_front`, so a panic while the lock was
    /// held cannot leave the deque inconsistent. Recovering keeps recording and
    /// reporting real data instead of silently falling back to the empty-window
    /// defaults (which would report a 100% success rate and hide failures).
    fn lock(&self) -> std::sync::MutexGuard<'_, VecDeque<ExportAttempt>> {
        self.attempts
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Appends `attempt`, evicting the oldest entry when the window is full.
    fn record_attempt(&self, attempt: ExportAttempt) {
        if self.max_attempts == 0 {
            return;
        }
        let mut attempts = self.lock();
        // Evict before inserting so the deque never grows past the capacity
        // reserved in `new` (avoids a reallocation once the window is full).
        if attempts.len() >= self.max_attempts {
            attempts.pop_front();
        }
        attempts.push_back(attempt);
    }

    /// Fraction of attempts in the window that succeeded, in `[0.0, 1.0]`.
    ///
    /// Returns `1.0` when the window is empty.
    pub fn success_rate(&self) -> f64 {
        let attempts = self.lock();
        if attempts.is_empty() {
            return Self::EMPTY_SUCCESS_RATE;
        }
        let successful = attempts.iter().filter(|a| a.success).count();
        successful as f64 / attempts.len() as f64
    }

    /// 95th-percentile export duration over the window (nearest-rank method).
    ///
    /// Returns 500 ms when the window is empty.
    pub fn p95_latency(&self) -> Duration {
        // Copy the durations out so the lock is released before selecting.
        let mut durations: Vec<Duration> = self.lock().iter().map(|a| a.duration).collect();
        if durations.is_empty() {
            return Self::EMPTY_P95_LATENCY;
        }

        // Nearest-rank: the p95 is the element at 1-based rank ceil(0.95 * n),
        // which equals n - floor(n / 20) in exact integer arithmetic. Using
        // ceil(0.95 * n) directly as a 0-based index would pick the element one
        // position too high (e.g. the 96th of 100 values).
        let n = durations.len();
        let index = n - n / 20 - 1;
        *durations.select_nth_unstable(index).1
    }

    /// Number of failed attempts currently in the window.
    pub fn failed_exports(&self) -> usize {
        self.lock().iter().filter(|a| !a.success).count()
    }

    /// Number of attempts currently in the window (at most `max_attempts`).
    pub fn total_exports(&self) -> usize {
        self.lock().len()
    }

    /// Time elapsed since the most recent attempt was recorded, or `None` if
    /// the window is empty.
    pub fn last_export_age(&self) -> Option<Duration> {
        self.lock().back().map(|a| a.timestamp.elapsed())
    }
}

170#[derive(Debug, Clone)]
175pub struct AdaptiveFlush {
176 stats: ExportStatistics,
178 base_timeout: Duration,
180 max_timeout: Duration,
182}
183
184impl Default for AdaptiveFlush {
185 fn default() -> Self {
186 Self::new(Duration::from_millis(500), Duration::from_secs(10))
187 }
188}
189
190impl AdaptiveFlush {
191 pub fn new(base_timeout: Duration, max_timeout: Duration) -> Self {
198 Self {
199 stats: ExportStatistics::default(),
200 base_timeout,
201 max_timeout,
202 }
203 }
204
205 pub fn stats(&self) -> &ExportStatistics {
207 &self.stats
208 }
209
210 pub fn record_success(&self, duration: Duration) {
212 self.stats.record_success(duration);
213 }
214
215 pub fn record_failure(&self, duration: Duration) {
217 self.stats.record_failure(duration);
218 }
219
220 pub fn calculate_timeout(&self) -> Duration {
235 let success_rate = self.stats.success_rate();
236 let p95 = self.stats.p95_latency();
237
238 let buffer_multiplier = if success_rate >= 0.999 {
240 1.10
242 } else if success_rate >= 0.99 {
243 1.25
245 } else if success_rate >= 0.95 {
246 1.50
248 } else {
249 tracing::warn!(
251 success_rate = %format!("{:.2}%", success_rate * 100.0),
252 "Low export success rate detected, using max timeout"
253 );
254 return self.max_timeout;
255 };
256
257 let timeout = Duration::from_millis((p95.as_millis() as f64 * buffer_multiplier) as u64);
259
260 timeout.max(self.base_timeout).min(self.max_timeout)
262 }
263
264 pub fn calculate_timeout_with_diagnostics(&self) -> (Duration, String) {
268 let timeout = self.calculate_timeout();
269 let success_rate = self.stats.success_rate();
270 let p95 = self.stats.p95_latency();
271 let failed = self.stats.failed_exports();
272 let total = self.stats.total_exports();
273
274 let diagnostics = format!(
275 "timeout={:?} (success_rate={:.2}%, p95={:?}, failures={}/{})",
276 timeout,
277 success_rate * 100.0,
278 p95,
279 failed,
280 total
281 );
282
283 (timeout, diagnostics)
284 }
285
286 pub fn is_healthy(&self) -> bool {
288 self.stats.success_rate() >= 0.999
289 }
290}
291
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_export_statistics_empty() {
        let stats = ExportStatistics::new(100);
        // An empty window is reported as fully successful.
        assert_eq!(stats.success_rate(), 1.0);
        assert_eq!(stats.failed_exports(), 0);
        assert_eq!(stats.total_exports(), 0);
        assert_eq!(stats.p95_latency(), Duration::from_millis(500));
        assert!(stats.last_export_age().is_none());
    }

    #[test]
    fn test_export_statistics_all_success() {
        let stats = ExportStatistics::new(100);

        for _ in 0..10 {
            stats.record_success(Duration::from_millis(100));
        }

        assert_eq!(stats.success_rate(), 1.0);
        assert_eq!(stats.failed_exports(), 0);
        assert_eq!(stats.total_exports(), 10);
        assert!(stats.last_export_age().is_some());
    }

    #[test]
    fn test_export_statistics_with_failures() {
        let stats = ExportStatistics::new(100);

        for _ in 0..99 {
            stats.record_success(Duration::from_millis(100));
        }
        stats.record_failure(Duration::from_millis(100));

        assert_eq!(stats.success_rate(), 0.99);
        assert_eq!(stats.failed_exports(), 1);
        assert_eq!(stats.total_exports(), 100);
    }

    #[test]
    fn test_export_statistics_window_evicts_oldest() {
        let stats = ExportStatistics::new(3);

        stats.record_failure(Duration::from_millis(1));
        stats.record_failure(Duration::from_millis(1));
        for _ in 0..3 {
            stats.record_success(Duration::from_millis(1));
        }

        // Only the three most recent (successful) attempts remain.
        assert_eq!(stats.total_exports(), 3);
        assert_eq!(stats.failed_exports(), 0);
        assert_eq!(stats.success_rate(), 1.0);

        let disabled = ExportStatistics::new(0);
        disabled.record_success(Duration::from_millis(1));
        assert_eq!(disabled.total_exports(), 0);
    }

    #[test]
    fn test_export_statistics_clones_share_window() {
        let original = ExportStatistics::new(10);
        let clone = original.clone();

        clone.record_success(Duration::from_millis(1));
        clone.record_failure(Duration::from_millis(1));

        assert_eq!(original.total_exports(), 2);
        assert_eq!(original.success_rate(), 0.5);
    }

    #[test]
    fn test_p95_latency_calculation() {
        let stats = ExportStatistics::new(100);

        for i in 0..100 {
            stats.record_success(Duration::from_millis(i * 10));
        }

        let p95 = stats.p95_latency();
        assert!(p95.as_millis() >= 900 && p95.as_millis() <= 1000);
    }

    #[test]
    fn test_adaptive_flush_high_success() {
        let flush = AdaptiveFlush::default();

        for _ in 0..1000 {
            flush.record_success(Duration::from_millis(50));
        }

        // 50ms * 1.10 is below the 500ms base, so the base timeout applies.
        let timeout = flush.calculate_timeout();
        assert!(timeout >= Duration::from_millis(500));
        assert!(timeout <= Duration::from_millis(600));
        assert!(flush.is_healthy());
    }

    #[test]
    fn test_adaptive_flush_medium_success() {
        let flush = AdaptiveFlush::default();

        for _ in 0..199 {
            flush.record_success(Duration::from_millis(1000));
        }
        flush.record_failure(Duration::from_millis(1000));

        // 99.5% success selects the 1.25x buffer: 1000ms * 1.25.
        assert_eq!(flush.calculate_timeout(), Duration::from_millis(1250));
        assert!(!flush.is_healthy());
    }

    #[test]
    fn test_adaptive_flush_clamps_to_max_timeout() {
        let flush = AdaptiveFlush::default();

        for _ in 0..10 {
            flush.record_success(Duration::from_secs(20));
        }

        assert_eq!(flush.calculate_timeout(), Duration::from_secs(10));
        assert!(flush.is_healthy());
    }

    #[test]
    fn test_adaptive_flush_low_success() {
        let flush = AdaptiveFlush::default();

        for _ in 0..90 {
            flush.record_success(Duration::from_millis(100));
        }
        for _ in 0..10 {
            flush.record_failure(Duration::from_millis(100));
        }

        // 90% success is below the 95% floor, so the max timeout is used.
        let timeout = flush.calculate_timeout();
        assert_eq!(timeout, Duration::from_secs(10));
        assert!(!flush.is_healthy());
    }

    #[test]
    fn test_adaptive_flush_diagnostics() {
        let flush = AdaptiveFlush::default();

        for _ in 0..10 {
            flush.record_success(Duration::from_millis(100));
        }

        let (timeout, diagnostics) = flush.calculate_timeout_with_diagnostics();
        assert!(timeout >= Duration::from_millis(500));
        assert!(diagnostics.contains("success_rate"));
        assert!(diagnostics.contains("p95"));
        assert!(diagnostics.contains("failures=0/10"));
    }
}