// oxigdal_streaming/metrics/tracker.rs
1//! Performance tracking for streaming operations.
2
3use crate::error::Result;
4use chrono::{DateTime, Duration, Utc};
5use std::collections::VecDeque;
6use std::sync::Arc;
7use std::time::Instant;
8use tokio::sync::RwLock;
9
/// Type alias for throughput interval samples: each entry is
/// (snapshot timestamp, cumulative element count, cumulative byte count).
type IntervalSamples = VecDeque<(DateTime<Utc>, u64, u64)>;
12
/// Performance tracker for streaming operations.
///
/// Records named checkpoints relative to a fixed start instant; recording
/// can be toggled at runtime with `enable`/`disable`.
pub struct PerformanceTracker {
    // Instant the tracker was constructed; checkpoint offsets are
    // reported relative to this.
    start_time: Instant,
    // (name, instant-recorded) pairs, in insertion order.
    checkpoints: Arc<RwLock<Vec<(String, Instant)>>>,
    // When false, `checkpoint` is a no-op.
    enabled: Arc<RwLock<bool>>,
}
19
20impl PerformanceTracker {
21    /// Create a new performance tracker.
22    pub fn new() -> Self {
23        Self {
24            start_time: Instant::now(),
25            checkpoints: Arc::new(RwLock::new(Vec::new())),
26            enabled: Arc::new(RwLock::new(true)),
27        }
28    }
29
30    /// Enable tracking.
31    pub async fn enable(&self) {
32        *self.enabled.write().await = true;
33    }
34
35    /// Disable tracking.
36    pub async fn disable(&self) {
37        *self.enabled.write().await = false;
38    }
39
40    /// Record a checkpoint.
41    pub async fn checkpoint(&self, name: String) -> Result<()> {
42        if !*self.enabled.read().await {
43            return Ok(());
44        }
45
46        let mut checkpoints = self.checkpoints.write().await;
47        checkpoints.push((name, Instant::now()));
48
49        Ok(())
50    }
51
52    /// Get elapsed time since start.
53    pub fn elapsed(&self) -> std::time::Duration {
54        self.start_time.elapsed()
55    }
56
57    /// Get all checkpoints.
58    pub async fn get_checkpoints(&self) -> Vec<(String, std::time::Duration)> {
59        let checkpoints = self.checkpoints.read().await;
60        let start = self.start_time;
61
62        checkpoints
63            .iter()
64            .map(|(name, instant)| {
65                let duration = instant.duration_since(start);
66                (name.clone(), duration)
67            })
68            .collect()
69    }
70
71    /// Clear all checkpoints.
72    pub async fn clear(&self) {
73        self.checkpoints.write().await.clear();
74    }
75
76    /// Reset the tracker.
77    pub async fn reset(&self) {
78        self.clear().await;
79    }
80}
81
82impl Default for PerformanceTracker {
83    fn default() -> Self {
84        Self::new()
85    }
86}
87
/// Latency tracker with histogram support.
///
/// Keeps a bounded sliding window of the most recent samples for
/// statistics, plus a histogram accumulated over all samples ever
/// recorded (eviction from the window does not decrement bucket counts).
pub struct LatencyTracker {
    // Sliding window of recent samples; oldest evicted first.
    samples: Arc<RwLock<VecDeque<std::time::Duration>>>,
    // Capacity of the sliding window.
    max_samples: usize,
    // Histogram bucket upper bounds, ascending.
    buckets: Vec<std::time::Duration>,
    // Per-bucket counts, parallel to `buckets`. Counts are cumulative
    // across buckets: a sample increments every bucket whose bound
    // covers it (see `record`).
    histogram: Arc<RwLock<Vec<u64>>>,
}
95
96impl LatencyTracker {
97    /// Create a new latency tracker.
98    pub fn new(max_samples: usize) -> Self {
99        let buckets = vec![
100            std::time::Duration::from_millis(1),
101            std::time::Duration::from_millis(5),
102            std::time::Duration::from_millis(10),
103            std::time::Duration::from_millis(50),
104            std::time::Duration::from_millis(100),
105            std::time::Duration::from_millis(500),
106            std::time::Duration::from_secs(1),
107            std::time::Duration::from_secs(5),
108        ];
109
110        let histogram = vec![0; buckets.len()];
111
112        Self {
113            samples: Arc::new(RwLock::new(VecDeque::with_capacity(max_samples))),
114            max_samples,
115            buckets,
116            histogram: Arc::new(RwLock::new(histogram)),
117        }
118    }
119
120    /// Record a latency sample.
121    pub async fn record(&self, latency: std::time::Duration) {
122        let mut samples = self.samples.write().await;
123
124        if samples.len() >= self.max_samples {
125            samples.pop_front();
126        }
127
128        samples.push_back(latency);
129
130        let mut histogram = self.histogram.write().await;
131        for (i, &bucket) in self.buckets.iter().enumerate() {
132            if latency <= bucket {
133                histogram[i] += 1;
134            }
135        }
136    }
137
138    /// Get the average latency.
139    pub async fn average(&self) -> Option<std::time::Duration> {
140        let samples = self.samples.read().await;
141
142        if samples.is_empty() {
143            return None;
144        }
145
146        let sum: std::time::Duration = samples.iter().sum();
147        Some(sum / samples.len() as u32)
148    }
149
150    /// Get the minimum latency.
151    pub async fn min(&self) -> Option<std::time::Duration> {
152        let samples = self.samples.read().await;
153        samples.iter().min().copied()
154    }
155
156    /// Get the maximum latency.
157    pub async fn max(&self) -> Option<std::time::Duration> {
158        let samples = self.samples.read().await;
159        samples.iter().max().copied()
160    }
161
162    /// Get the median latency.
163    pub async fn median(&self) -> Option<std::time::Duration> {
164        let samples = self.samples.read().await;
165
166        if samples.is_empty() {
167            return None;
168        }
169
170        let mut sorted: Vec<_> = samples.iter().copied().collect();
171        sorted.sort();
172
173        Some(sorted[sorted.len() / 2])
174    }
175
176    /// Get the 95th percentile latency.
177    pub async fn p95(&self) -> Option<std::time::Duration> {
178        self.percentile(0.95).await
179    }
180
181    /// Get the 99th percentile latency.
182    pub async fn p99(&self) -> Option<std::time::Duration> {
183        self.percentile(0.99).await
184    }
185
186    /// Get a specific percentile.
187    pub async fn percentile(&self, p: f64) -> Option<std::time::Duration> {
188        let samples = self.samples.read().await;
189
190        if samples.is_empty() {
191            return None;
192        }
193
194        let mut sorted: Vec<_> = samples.iter().copied().collect();
195        sorted.sort();
196
197        let index = ((sorted.len() as f64 * p) as usize).min(sorted.len() - 1);
198        Some(sorted[index])
199    }
200
201    /// Get the histogram.
202    pub async fn histogram(&self) -> Vec<(std::time::Duration, u64)> {
203        let histogram = self.histogram.read().await;
204
205        self.buckets
206            .iter()
207            .zip(histogram.iter())
208            .map(|(&bucket, &count)| (bucket, count))
209            .collect()
210    }
211
212    /// Clear all samples.
213    pub async fn clear(&self) {
214        self.samples.write().await.clear();
215        *self.histogram.write().await = vec![0; self.buckets.len()];
216    }
217}
218
/// Throughput tracker.
///
/// Maintains cumulative element/byte counters plus a bounded deque of
/// periodic snapshots of those counters, from which interval-based
/// rates are derived.
pub struct ThroughputTracker {
    // Wall-clock creation time; overall rates are measured from here.
    start_time: DateTime<Utc>,
    // Cumulative number of elements recorded.
    element_count: Arc<RwLock<u64>>,
    // Cumulative number of bytes recorded.
    byte_count: Arc<RwLock<u64>>,
    // Periodic (timestamp, elements, bytes) snapshots; oldest evicted first.
    interval_samples: Arc<RwLock<IntervalSamples>>,
    // Nominal spacing between snapshots (stored for callers; not
    // enforced by this type).
    interval_duration: Duration,
    // Maximum number of retained snapshots.
    max_intervals: usize,
}
228
229impl ThroughputTracker {
230    /// Create a new throughput tracker.
231    pub fn new(interval_duration: Duration, max_intervals: usize) -> Self {
232        Self {
233            start_time: Utc::now(),
234            element_count: Arc::new(RwLock::new(0)),
235            byte_count: Arc::new(RwLock::new(0)),
236            interval_samples: Arc::new(RwLock::new(VecDeque::with_capacity(max_intervals))),
237            interval_duration,
238            max_intervals,
239        }
240    }
241
242    /// Record elements processed.
243    pub async fn record_elements(&self, count: u64) {
244        *self.element_count.write().await += count;
245    }
246
247    /// Record bytes processed.
248    pub async fn record_bytes(&self, bytes: u64) {
249        *self.byte_count.write().await += bytes;
250    }
251
252    /// Record both elements and bytes.
253    pub async fn record(&self, elements: u64, bytes: u64) {
254        self.record_elements(elements).await;
255        self.record_bytes(bytes).await;
256    }
257
258    /// Take an interval snapshot.
259    pub async fn snapshot(&self) {
260        let now = Utc::now();
261        let elements = *self.element_count.read().await;
262        let bytes = *self.byte_count.read().await;
263
264        let mut samples = self.interval_samples.write().await;
265
266        if samples.len() >= self.max_intervals {
267            samples.pop_front();
268        }
269
270        samples.push_back((now, elements, bytes));
271    }
272
273    /// Get the overall throughput (elements per second).
274    pub async fn elements_per_second(&self) -> f64 {
275        let elapsed = (Utc::now() - self.start_time).num_milliseconds() as f64 / 1000.0;
276        let count = *self.element_count.read().await as f64;
277
278        if elapsed > 0.0 { count / elapsed } else { 0.0 }
279    }
280
281    /// Get the overall throughput (bytes per second).
282    pub async fn bytes_per_second(&self) -> f64 {
283        let elapsed = (Utc::now() - self.start_time).num_milliseconds() as f64 / 1000.0;
284        let bytes = *self.byte_count.read().await as f64;
285
286        if elapsed > 0.0 { bytes / elapsed } else { 0.0 }
287    }
288
289    /// Get the average throughput over recent intervals.
290    pub async fn average_elements_per_second(&self) -> f64 {
291        let samples = self.interval_samples.read().await;
292
293        if samples.len() < 2 {
294            return 0.0;
295        }
296
297        let first = &samples[0];
298        let last = &samples[samples.len() - 1];
299
300        let elapsed = (last.0 - first.0).num_milliseconds() as f64 / 1000.0;
301        let elements = (last.1 - first.1) as f64;
302
303        if elapsed > 0.0 {
304            elements / elapsed
305        } else {
306            0.0
307        }
308    }
309
310    /// Get the configured interval duration.
311    pub fn interval_duration(&self) -> Duration {
312        self.interval_duration
313    }
314
315    /// Get the peak throughput.
316    pub async fn peak_elements_per_second(&self) -> f64 {
317        let samples = self.interval_samples.read().await;
318
319        if samples.len() < 2 {
320            return 0.0;
321        }
322
323        let mut max_rate: f64 = 0.0;
324
325        // Convert VecDeque to Vec to use windows()
326        let samples_vec: Vec<_> = samples.iter().copied().collect();
327
328        for window in samples_vec.windows(2) {
329            let (t1, e1, _) = &window[0];
330            let (t2, e2, _) = &window[1];
331
332            let elapsed = (*t2 - *t1).num_milliseconds() as f64 / 1000.0;
333            let elements = (e2 - e1) as f64;
334
335            if elapsed > 0.0 {
336                let rate = elements / elapsed;
337                max_rate = max_rate.max(rate);
338            }
339        }
340
341        max_rate
342    }
343
344    /// Clear all counters.
345    pub async fn clear(&self) {
346        *self.element_count.write().await = 0;
347        *self.byte_count.write().await = 0;
348        self.interval_samples.write().await.clear();
349    }
350
351    /// Reset the tracker.
352    pub async fn reset(&self) {
353        self.clear().await;
354    }
355}
356
#[cfg(test)]
mod tests {
    use super::*;

    // Three checkpoints separated by short sleeps should all be retained.
    #[tokio::test]
    async fn test_performance_tracker() {
        let tracker = PerformanceTracker::new();

        tracker
            .checkpoint("start".to_string())
            .await
            .expect("Failed to record start checkpoint in test");
        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
        tracker
            .checkpoint("middle".to_string())
            .await
            .expect("Failed to record middle checkpoint in test");
        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
        tracker
            .checkpoint("end".to_string())
            .await
            .expect("Failed to record end checkpoint in test");

        let checkpoints = tracker.get_checkpoints().await;
        assert_eq!(checkpoints.len(), 3);
    }

    // Average/min/max over three exact samples (10/20/30 ms): the
    // average tolerance window absorbs integer nanosecond division.
    #[tokio::test]
    async fn test_latency_tracker() {
        let tracker = LatencyTracker::new(100);

        tracker.record(std::time::Duration::from_millis(10)).await;
        tracker.record(std::time::Duration::from_millis(20)).await;
        tracker.record(std::time::Duration::from_millis(30)).await;

        let avg = tracker
            .average()
            .await
            .expect("Failed to get average latency in test");
        assert!(avg >= std::time::Duration::from_millis(19));
        assert!(avg <= std::time::Duration::from_millis(21));

        let min = tracker
            .min()
            .await
            .expect("Failed to get minimum latency in test");
        assert_eq!(min, std::time::Duration::from_millis(10));

        let max = tracker
            .max()
            .await
            .expect("Failed to get maximum latency in test");
        assert_eq!(max, std::time::Duration::from_millis(30));
    }

    // Overall rates should be positive once elements/bytes are recorded
    // and some wall-clock time has elapsed.
    #[tokio::test]
    async fn test_throughput_tracker() {
        let tracker = ThroughputTracker::new(Duration::seconds(1), 10);

        tracker.record(100, 1000).await;

        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

        let eps = tracker.elements_per_second().await;
        assert!(eps > 0.0);

        let bps = tracker.bytes_per_second().await;
        assert!(bps > 0.0);
    }

    // With samples 1..=100 ms, nearest-rank p95/p99 land near 95/99 ms;
    // ranges allow for the floor-index selection.
    #[tokio::test]
    async fn test_latency_percentiles() {
        let tracker = LatencyTracker::new(100);

        for i in 1..=100 {
            tracker.record(std::time::Duration::from_millis(i)).await;
        }

        let p95 = tracker
            .p95()
            .await
            .expect("Failed to get 95th percentile latency in test");
        assert!(p95 >= std::time::Duration::from_millis(94));
        assert!(p95 <= std::time::Duration::from_millis(96));

        let p99 = tracker
            .p99()
            .await
            .expect("Failed to get 99th percentile latency in test");
        assert!(p99 >= std::time::Duration::from_millis(98));
    }
}