// oxigdal_streaming/metrics/tracker.rs

use crate::error::Result;
4use chrono::{DateTime, Duration, Utc};
5use std::collections::VecDeque;
6use std::sync::Arc;
7use std::time::Instant;
8use tokio::sync::RwLock;
9
/// One throughput snapshot per entry: (timestamp, cumulative element count,
/// cumulative byte count), oldest first.
type IntervalSamples = VecDeque<(DateTime<Utc>, u64, u64)>;
12
/// Records named time checkpoints measured against a fixed start instant.
///
/// Internals are behind `Arc<RwLock<..>>` so the tracker can be shared across
/// async tasks; recording can be toggled at runtime with `enable`/`disable`.
pub struct PerformanceTracker {
    // Instant the tracker was constructed; all checkpoint offsets and
    // `elapsed()` are measured from here.
    start_time: Instant,
    // (name, when) pairs in insertion order.
    checkpoints: Arc<RwLock<Vec<(String, Instant)>>>,
    // When false, `checkpoint` is a no-op.
    enabled: Arc<RwLock<bool>>,
}
19
20impl PerformanceTracker {
21 pub fn new() -> Self {
23 Self {
24 start_time: Instant::now(),
25 checkpoints: Arc::new(RwLock::new(Vec::new())),
26 enabled: Arc::new(RwLock::new(true)),
27 }
28 }
29
30 pub async fn enable(&self) {
32 *self.enabled.write().await = true;
33 }
34
35 pub async fn disable(&self) {
37 *self.enabled.write().await = false;
38 }
39
40 pub async fn checkpoint(&self, name: String) -> Result<()> {
42 if !*self.enabled.read().await {
43 return Ok(());
44 }
45
46 let mut checkpoints = self.checkpoints.write().await;
47 checkpoints.push((name, Instant::now()));
48
49 Ok(())
50 }
51
52 pub fn elapsed(&self) -> std::time::Duration {
54 self.start_time.elapsed()
55 }
56
57 pub async fn get_checkpoints(&self) -> Vec<(String, std::time::Duration)> {
59 let checkpoints = self.checkpoints.read().await;
60 let start = self.start_time;
61
62 checkpoints
63 .iter()
64 .map(|(name, instant)| {
65 let duration = instant.duration_since(start);
66 (name.clone(), duration)
67 })
68 .collect()
69 }
70
71 pub async fn clear(&self) {
73 self.checkpoints.write().await.clear();
74 }
75
76 pub async fn reset(&self) {
78 self.clear().await;
79 }
80}
81
82impl Default for PerformanceTracker {
83 fn default() -> Self {
84 Self::new()
85 }
86}
87
/// Sliding-window latency statistics plus a fixed cumulative histogram.
pub struct LatencyTracker {
    // Most recent samples, oldest first; capped at `max_samples`.
    samples: Arc<RwLock<VecDeque<std::time::Duration>>>,
    // Maximum number of samples retained in `samples`.
    max_samples: usize,
    // Ascending histogram bucket upper bounds (1 ms .. 5 s).
    buckets: Vec<std::time::Duration>,
    // One count per bucket, cumulative-style: `record` increments every
    // bucket whose bound is >= the latency. Counts are never decremented
    // when samples are evicted, so they span the tracker's whole lifetime
    // (until `clear`), unlike the windowed `samples`.
    histogram: Arc<RwLock<Vec<u64>>>,
}
95
96impl LatencyTracker {
97 pub fn new(max_samples: usize) -> Self {
99 let buckets = vec![
100 std::time::Duration::from_millis(1),
101 std::time::Duration::from_millis(5),
102 std::time::Duration::from_millis(10),
103 std::time::Duration::from_millis(50),
104 std::time::Duration::from_millis(100),
105 std::time::Duration::from_millis(500),
106 std::time::Duration::from_secs(1),
107 std::time::Duration::from_secs(5),
108 ];
109
110 let histogram = vec![0; buckets.len()];
111
112 Self {
113 samples: Arc::new(RwLock::new(VecDeque::with_capacity(max_samples))),
114 max_samples,
115 buckets,
116 histogram: Arc::new(RwLock::new(histogram)),
117 }
118 }
119
120 pub async fn record(&self, latency: std::time::Duration) {
122 let mut samples = self.samples.write().await;
123
124 if samples.len() >= self.max_samples {
125 samples.pop_front();
126 }
127
128 samples.push_back(latency);
129
130 let mut histogram = self.histogram.write().await;
131 for (i, &bucket) in self.buckets.iter().enumerate() {
132 if latency <= bucket {
133 histogram[i] += 1;
134 }
135 }
136 }
137
138 pub async fn average(&self) -> Option<std::time::Duration> {
140 let samples = self.samples.read().await;
141
142 if samples.is_empty() {
143 return None;
144 }
145
146 let sum: std::time::Duration = samples.iter().sum();
147 Some(sum / samples.len() as u32)
148 }
149
150 pub async fn min(&self) -> Option<std::time::Duration> {
152 let samples = self.samples.read().await;
153 samples.iter().min().copied()
154 }
155
156 pub async fn max(&self) -> Option<std::time::Duration> {
158 let samples = self.samples.read().await;
159 samples.iter().max().copied()
160 }
161
162 pub async fn median(&self) -> Option<std::time::Duration> {
164 let samples = self.samples.read().await;
165
166 if samples.is_empty() {
167 return None;
168 }
169
170 let mut sorted: Vec<_> = samples.iter().copied().collect();
171 sorted.sort();
172
173 Some(sorted[sorted.len() / 2])
174 }
175
176 pub async fn p95(&self) -> Option<std::time::Duration> {
178 self.percentile(0.95).await
179 }
180
181 pub async fn p99(&self) -> Option<std::time::Duration> {
183 self.percentile(0.99).await
184 }
185
186 pub async fn percentile(&self, p: f64) -> Option<std::time::Duration> {
188 let samples = self.samples.read().await;
189
190 if samples.is_empty() {
191 return None;
192 }
193
194 let mut sorted: Vec<_> = samples.iter().copied().collect();
195 sorted.sort();
196
197 let index = ((sorted.len() as f64 * p) as usize).min(sorted.len() - 1);
198 Some(sorted[index])
199 }
200
201 pub async fn histogram(&self) -> Vec<(std::time::Duration, u64)> {
203 let histogram = self.histogram.read().await;
204
205 self.buckets
206 .iter()
207 .zip(histogram.iter())
208 .map(|(&bucket, &count)| (bucket, count))
209 .collect()
210 }
211
212 pub async fn clear(&self) {
214 self.samples.write().await.clear();
215 *self.histogram.write().await = vec![0; self.buckets.len()];
216 }
217}
218
/// Tracks cumulative element/byte counts and periodic snapshots for
/// computing overall, interval-average, and peak throughput rates.
pub struct ThroughputTracker {
    // Wall-clock construction time; basis for the overall per-second rates.
    start_time: DateTime<Utc>,
    // Total elements recorded since construction (or the last `clear`).
    element_count: Arc<RwLock<u64>>,
    // Total bytes recorded since construction (or the last `clear`).
    byte_count: Arc<RwLock<u64>>,
    // (timestamp, cumulative elements, cumulative bytes) snapshots, oldest
    // first; capped at `max_intervals`.
    interval_samples: Arc<RwLock<IntervalSamples>>,
    // Intended spacing between snapshots, exposed via `interval_duration()`;
    // not enforced here — the caller drives `snapshot` itself.
    interval_duration: Duration,
    // Maximum number of snapshots retained.
    max_intervals: usize,
}
228
229impl ThroughputTracker {
230 pub fn new(interval_duration: Duration, max_intervals: usize) -> Self {
232 Self {
233 start_time: Utc::now(),
234 element_count: Arc::new(RwLock::new(0)),
235 byte_count: Arc::new(RwLock::new(0)),
236 interval_samples: Arc::new(RwLock::new(VecDeque::with_capacity(max_intervals))),
237 interval_duration,
238 max_intervals,
239 }
240 }
241
242 pub async fn record_elements(&self, count: u64) {
244 *self.element_count.write().await += count;
245 }
246
247 pub async fn record_bytes(&self, bytes: u64) {
249 *self.byte_count.write().await += bytes;
250 }
251
252 pub async fn record(&self, elements: u64, bytes: u64) {
254 self.record_elements(elements).await;
255 self.record_bytes(bytes).await;
256 }
257
258 pub async fn snapshot(&self) {
260 let now = Utc::now();
261 let elements = *self.element_count.read().await;
262 let bytes = *self.byte_count.read().await;
263
264 let mut samples = self.interval_samples.write().await;
265
266 if samples.len() >= self.max_intervals {
267 samples.pop_front();
268 }
269
270 samples.push_back((now, elements, bytes));
271 }
272
273 pub async fn elements_per_second(&self) -> f64 {
275 let elapsed = (Utc::now() - self.start_time).num_milliseconds() as f64 / 1000.0;
276 let count = *self.element_count.read().await as f64;
277
278 if elapsed > 0.0 { count / elapsed } else { 0.0 }
279 }
280
281 pub async fn bytes_per_second(&self) -> f64 {
283 let elapsed = (Utc::now() - self.start_time).num_milliseconds() as f64 / 1000.0;
284 let bytes = *self.byte_count.read().await as f64;
285
286 if elapsed > 0.0 { bytes / elapsed } else { 0.0 }
287 }
288
289 pub async fn average_elements_per_second(&self) -> f64 {
291 let samples = self.interval_samples.read().await;
292
293 if samples.len() < 2 {
294 return 0.0;
295 }
296
297 let first = &samples[0];
298 let last = &samples[samples.len() - 1];
299
300 let elapsed = (last.0 - first.0).num_milliseconds() as f64 / 1000.0;
301 let elements = (last.1 - first.1) as f64;
302
303 if elapsed > 0.0 {
304 elements / elapsed
305 } else {
306 0.0
307 }
308 }
309
310 pub fn interval_duration(&self) -> Duration {
312 self.interval_duration
313 }
314
315 pub async fn peak_elements_per_second(&self) -> f64 {
317 let samples = self.interval_samples.read().await;
318
319 if samples.len() < 2 {
320 return 0.0;
321 }
322
323 let mut max_rate: f64 = 0.0;
324
325 let samples_vec: Vec<_> = samples.iter().copied().collect();
327
328 for window in samples_vec.windows(2) {
329 let (t1, e1, _) = &window[0];
330 let (t2, e2, _) = &window[1];
331
332 let elapsed = (*t2 - *t1).num_milliseconds() as f64 / 1000.0;
333 let elements = (e2 - e1) as f64;
334
335 if elapsed > 0.0 {
336 let rate = elements / elapsed;
337 max_rate = max_rate.max(rate);
338 }
339 }
340
341 max_rate
342 }
343
344 pub async fn clear(&self) {
346 *self.element_count.write().await = 0;
347 *self.byte_count.write().await = 0;
348 self.interval_samples.write().await.clear();
349 }
350
351 pub async fn reset(&self) {
353 self.clear().await;
354 }
355}
356
#[cfg(test)]
mod tests {
    use super::*;

    /// Three checkpoints separated by short sleeps should all be retained.
    #[tokio::test]
    async fn test_performance_tracker() {
        let perf = PerformanceTracker::new();
        let pause = tokio::time::Duration::from_millis(10);

        perf.checkpoint(String::from("start"))
            .await
            .expect("Failed to record start checkpoint in test");
        tokio::time::sleep(pause).await;
        perf.checkpoint(String::from("middle"))
            .await
            .expect("Failed to record middle checkpoint in test");
        tokio::time::sleep(pause).await;
        perf.checkpoint(String::from("end"))
            .await
            .expect("Failed to record end checkpoint in test");

        assert_eq!(perf.get_checkpoints().await.len(), 3);
    }

    /// Average/min/max over three known samples.
    #[tokio::test]
    async fn test_latency_tracker() {
        let lat = LatencyTracker::new(100);

        for ms in [10u64, 20, 30] {
            lat.record(std::time::Duration::from_millis(ms)).await;
        }

        let mean = lat
            .average()
            .await
            .expect("Failed to get average latency in test");
        assert!(std::time::Duration::from_millis(19) <= mean);
        assert!(mean <= std::time::Duration::from_millis(21));

        let lo = lat
            .min()
            .await
            .expect("Failed to get minimum latency in test");
        assert_eq!(lo, std::time::Duration::from_millis(10));

        let hi = lat
            .max()
            .await
            .expect("Failed to get maximum latency in test");
        assert_eq!(hi, std::time::Duration::from_millis(30));
    }

    /// Overall rates become positive once events are recorded and time passes.
    #[tokio::test]
    async fn test_throughput_tracker() {
        let tput = ThroughputTracker::new(Duration::seconds(1), 10);

        tput.record(100, 1000).await;

        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

        assert!(tput.elements_per_second().await > 0.0);
        assert!(tput.bytes_per_second().await > 0.0);
    }

    /// Percentiles over the uniform sample set 1..=100 ms.
    #[tokio::test]
    async fn test_latency_percentiles() {
        let lat = LatencyTracker::new(100);

        for ms in 1u64..=100 {
            lat.record(std::time::Duration::from_millis(ms)).await;
        }

        let p95 = lat
            .p95()
            .await
            .expect("Failed to get 95th percentile latency in test");
        assert!(p95 >= std::time::Duration::from_millis(94));
        assert!(p95 <= std::time::Duration::from_millis(96));

        let p99 = lat
            .p99()
            .await
            .expect("Failed to get 99th percentile latency in test");
        assert!(p99 >= std::time::Duration::from_millis(98));
    }
}