1use std::time::{Duration, SystemTime, UNIX_EPOCH};
16
17#[allow(dead_code)]
18#[derive(Debug, Default)]
19struct TimedAction {
20 count: u64,
21 acc_time: u64,
22 min_time: Option<u64>,
23 max_time: Option<u64>,
24 bytes: u64,
25}
26
27#[allow(dead_code)]
28impl TimedAction {
29 pub fn avg(&self) -> Option<std::time::Duration> {
31 if self.count == 0 {
32 return None;
33 }
34 Some(std::time::Duration::from_nanos(self.acc_time / self.count))
35 }
36
37 pub fn avg_bytes(&self) -> u64 {
39 if self.count == 0 {
40 return 0;
41 }
42 self.bytes / self.count
43 }
44
45 pub fn merge(&mut self, other: TimedAction) {
47 self.count += other.count;
48 self.acc_time += other.acc_time;
49 self.bytes += other.bytes;
50
51 if self.count == 0 {
52 self.min_time = other.min_time;
53 }
54 if let Some(other_min) = other.min_time {
55 self.min_time = self.min_time.map_or(Some(other_min), |min| Some(min.min(other_min)));
56 }
57
58 self.max_time = self
59 .max_time
60 .map_or(other.max_time, |max| Some(max.max(other.max_time.unwrap_or(0))));
61 }
62}
63
64#[allow(dead_code)]
65#[derive(Debug)]
66enum SizeCategory {
67 SizeLessThan1KiB = 0,
68 SizeLessThan1MiB,
69 SizeLessThan10MiB,
70 SizeLessThan100MiB,
71 SizeLessThan1GiB,
72 SizeGreaterThan1GiB,
73 SizeLastElemMarker,
75}
76
77impl std::fmt::Display for SizeCategory {
78 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79 let s = match *self {
80 SizeCategory::SizeLessThan1KiB => "SizeLessThan1KiB",
81 SizeCategory::SizeLessThan1MiB => "SizeLessThan1MiB",
82 SizeCategory::SizeLessThan10MiB => "SizeLessThan10MiB",
83 SizeCategory::SizeLessThan100MiB => "SizeLessThan100MiB",
84 SizeCategory::SizeLessThan1GiB => "SizeLessThan1GiB",
85 SizeCategory::SizeGreaterThan1GiB => "SizeGreaterThan1GiB",
86 SizeCategory::SizeLastElemMarker => "SizeLastElemMarker",
87 };
88 write!(f, "{s}")
89 }
90}
91
92#[derive(Clone, Debug, Default, Copy)]
93pub struct AccElem {
94 pub total: u64,
95 pub size: u64,
96 pub n: u64,
97}
98
99impl AccElem {
100 pub fn add(&mut self, dur: &Duration) {
101 let dur = dur.as_secs();
102 self.total = self.total.wrapping_add(dur);
103 self.n = self.n.wrapping_add(1);
104 }
105
106 pub fn merge(&mut self, b: &AccElem) {
107 self.n = self.n.wrapping_add(b.n);
108 self.total = self.total.wrapping_add(b.total);
109 self.size = self.size.wrapping_add(b.size);
110 }
111
112 pub fn avg(&self) -> Duration {
113 if self.n >= 1 && self.total > 0 {
114 return Duration::from_secs(self.total / self.n);
115 }
116 Duration::from_secs(0)
117 }
118}
119
120#[derive(Clone, Debug)]
121pub struct LastMinuteLatency {
122 pub totals: Vec<AccElem>,
123 pub last_sec: u64,
124}
125
126impl Default for LastMinuteLatency {
127 fn default() -> Self {
128 Self {
129 totals: vec![AccElem::default(); 60],
130 last_sec: Default::default(),
131 }
132 }
133}
134
135impl LastMinuteLatency {
136 pub fn merge(&mut self, o: &LastMinuteLatency) -> LastMinuteLatency {
137 let mut merged = LastMinuteLatency::default();
138 let mut x = o.clone();
139 if self.last_sec > o.last_sec {
140 x.forward_to(self.last_sec);
141 merged.last_sec = self.last_sec;
142 } else {
143 self.forward_to(o.last_sec);
144 merged.last_sec = o.last_sec;
145 }
146
147 for i in 0..merged.totals.len() {
148 merged.totals[i] = AccElem {
149 total: self.totals[i].total + o.totals[i].total,
150 n: self.totals[i].n + o.totals[i].n,
151 size: self.totals[i].size + o.totals[i].size,
152 }
153 }
154 merged
155 }
156
157 pub fn add(&mut self, t: &Duration) {
158 let sec = SystemTime::now()
159 .duration_since(UNIX_EPOCH)
160 .expect("Time went backwards")
161 .as_secs();
162 self.forward_to(sec);
163 let win_idx = sec % 60;
164 self.totals[win_idx as usize].add(t);
165 self.last_sec = sec;
166 }
167
168 pub fn add_all(&mut self, sec: u64, a: &AccElem) {
169 self.forward_to(sec);
170 let win_idx = sec % 60;
171 self.totals[win_idx as usize].merge(a);
172 self.last_sec = sec;
173 }
174
175 pub fn get_total(&mut self) -> AccElem {
176 let mut res = AccElem::default();
177 let sec = SystemTime::now()
178 .duration_since(UNIX_EPOCH)
179 .expect("Time went backwards")
180 .as_secs();
181 self.forward_to(sec);
182 for elem in self.totals.iter() {
183 res.merge(elem);
184 }
185 res
186 }
187
188 pub fn forward_to(&mut self, t: u64) {
189 if self.last_sec >= t {
190 return;
191 }
192 if t - self.last_sec >= 60 {
193 self.totals = vec![AccElem::default(); 60];
194 self.last_sec = t;
195 return;
196 }
197 while self.last_sec != t {
198 let idx = (self.last_sec + 1) % 60;
199 self.totals[idx as usize] = AccElem::default();
200 self.last_sec += 1;
201 }
202 }
203}
204#[cfg(test)]
205mod tests {
206 use super::*;
207 use std::time::Duration;
208
209 #[test]
210 fn test_acc_elem_default() {
211 let elem = AccElem::default();
212 assert_eq!(elem.total, 0);
213 assert_eq!(elem.size, 0);
214 assert_eq!(elem.n, 0);
215 }
216
217 #[test]
218 fn test_acc_elem_add_single_duration() {
219 let mut elem = AccElem::default();
220 let duration = Duration::from_secs(5);
221
222 elem.add(&duration);
223
224 assert_eq!(elem.total, 5);
225 assert_eq!(elem.n, 1);
226 assert_eq!(elem.size, 0); }
228
229 #[test]
230 fn test_acc_elem_add_multiple_durations() {
231 let mut elem = AccElem::default();
232
233 elem.add(&Duration::from_secs(3));
234 elem.add(&Duration::from_secs(7));
235 elem.add(&Duration::from_secs(2));
236
237 assert_eq!(elem.total, 12);
238 assert_eq!(elem.n, 3);
239 assert_eq!(elem.size, 0);
240 }
241
242 #[test]
243 fn test_acc_elem_add_zero_duration() {
244 let mut elem = AccElem::default();
245 let duration = Duration::from_secs(0);
246
247 elem.add(&duration);
248
249 assert_eq!(elem.total, 0);
250 assert_eq!(elem.n, 1);
251 }
252
253 #[test]
254 fn test_acc_elem_add_subsecond_duration() {
255 let mut elem = AccElem::default();
256 let duration = Duration::from_millis(500);
258
259 elem.add(&duration);
260
261 assert_eq!(elem.total, 0); assert_eq!(elem.n, 1);
263 }
264
265 #[test]
266 fn test_acc_elem_merge_empty_elements() {
267 let mut elem1 = AccElem::default();
268 let elem2 = AccElem::default();
269
270 elem1.merge(&elem2);
271
272 assert_eq!(elem1.total, 0);
273 assert_eq!(elem1.size, 0);
274 assert_eq!(elem1.n, 0);
275 }
276
277 #[test]
278 fn test_acc_elem_merge_with_data() {
279 let mut elem1 = AccElem {
280 total: 10,
281 size: 100,
282 n: 2,
283 };
284 let elem2 = AccElem {
285 total: 15,
286 size: 200,
287 n: 3,
288 };
289
290 elem1.merge(&elem2);
291
292 assert_eq!(elem1.total, 25);
293 assert_eq!(elem1.size, 300);
294 assert_eq!(elem1.n, 5);
295 }
296
297 #[test]
298 fn test_acc_elem_merge_one_empty() {
299 let mut elem1 = AccElem {
300 total: 10,
301 size: 100,
302 n: 2,
303 };
304 let elem2 = AccElem::default();
305
306 elem1.merge(&elem2);
307
308 assert_eq!(elem1.total, 10);
309 assert_eq!(elem1.size, 100);
310 assert_eq!(elem1.n, 2);
311 }
312
313 #[test]
314 fn test_acc_elem_avg_with_data() {
315 let elem = AccElem {
316 total: 15,
317 size: 0,
318 n: 3,
319 };
320
321 let avg = elem.avg();
322 assert_eq!(avg, Duration::from_secs(5)); }
324
325 #[test]
326 fn test_acc_elem_avg_zero_count() {
327 let elem = AccElem {
328 total: 10,
329 size: 0,
330 n: 0,
331 };
332
333 let avg = elem.avg();
334 assert_eq!(avg, Duration::from_secs(0));
335 }
336
337 #[test]
338 fn test_acc_elem_avg_zero_total() {
339 let elem = AccElem { total: 0, size: 0, n: 5 };
340
341 let avg = elem.avg();
342 assert_eq!(avg, Duration::from_secs(0));
343 }
344
345 #[test]
346 fn test_acc_elem_avg_rounding() {
347 let elem = AccElem {
348 total: 10,
349 size: 0,
350 n: 3,
351 };
352
353 let avg = elem.avg();
354 assert_eq!(avg, Duration::from_secs(3)); }
356
357 #[test]
358 fn test_last_minute_latency_default() {
359 let latency = LastMinuteLatency::default();
360
361 assert_eq!(latency.totals.len(), 60);
362 assert_eq!(latency.last_sec, 0);
363
364 for elem in &latency.totals {
366 assert_eq!(elem.total, 0);
367 assert_eq!(elem.size, 0);
368 assert_eq!(elem.n, 0);
369 }
370 }
371
372 #[test]
373 fn test_last_minute_latency_forward_to_same_time() {
374 let mut latency = LastMinuteLatency {
375 last_sec: 100,
376 ..Default::default()
377 };
378
379 latency.totals[0].total = 10;
381 latency.totals[0].n = 1;
382
383 latency.forward_to(100); assert_eq!(latency.last_sec, 100);
386 assert_eq!(latency.totals[0].total, 10); assert_eq!(latency.totals[0].n, 1);
388 }
389
390 #[test]
391 fn test_last_minute_latency_forward_to_past_time() {
392 let mut latency = LastMinuteLatency {
393 last_sec: 100,
394 ..Default::default()
395 };
396
397 latency.totals[0].total = 10;
399 latency.totals[0].n = 1;
400
401 latency.forward_to(50); assert_eq!(latency.last_sec, 100); assert_eq!(latency.totals[0].total, 10); assert_eq!(latency.totals[0].n, 1);
406 }
407
408 #[test]
409 fn test_last_minute_latency_forward_to_large_gap() {
410 let mut latency = LastMinuteLatency {
411 last_sec: 100,
412 ..Default::default()
413 };
414
415 latency.totals[0].total = 10;
417 latency.totals[0].n = 1;
418
419 latency.forward_to(200); assert_eq!(latency.last_sec, 200); for elem in &latency.totals {
425 assert_eq!(elem.total, 0);
426 assert_eq!(elem.size, 0);
427 assert_eq!(elem.n, 0);
428 }
429 }
430
431 #[test]
432 fn test_last_minute_latency_forward_to_small_gap() {
433 let mut latency = LastMinuteLatency {
434 last_sec: 100,
435 ..Default::default()
436 };
437
438 latency.totals[41].total = 10; latency.totals[42].total = 20; latency.forward_to(102); assert_eq!(latency.last_sec, 102);
445
446 assert_eq!(latency.totals[41].total, 0); assert_eq!(latency.totals[42].total, 0); }
450
451 #[test]
452 fn test_last_minute_latency_add_all() {
453 let mut latency = LastMinuteLatency::default();
454 let acc_elem = AccElem {
455 total: 15,
456 size: 100,
457 n: 3,
458 };
459
460 latency.add_all(1000, &acc_elem);
461
462 assert_eq!(latency.last_sec, 1000);
463 let idx = 1000 % 60; assert_eq!(latency.totals[idx as usize].total, 15);
465 assert_eq!(latency.totals[idx as usize].size, 100);
466 assert_eq!(latency.totals[idx as usize].n, 3);
467 }
468
469 #[test]
470 fn test_last_minute_latency_add_all_multiple() {
471 let mut latency = LastMinuteLatency::default();
472
473 let acc_elem1 = AccElem {
474 total: 10,
475 size: 50,
476 n: 2,
477 };
478 let acc_elem2 = AccElem {
479 total: 20,
480 size: 100,
481 n: 4,
482 };
483
484 latency.add_all(1000, &acc_elem1);
485 latency.add_all(1000, &acc_elem2); let idx = 1000 % 60;
488 assert_eq!(latency.totals[idx as usize].total, 30); assert_eq!(latency.totals[idx as usize].size, 150); assert_eq!(latency.totals[idx as usize].n, 6); }
492
493 #[test]
494 fn test_last_minute_latency_merge_same_time() {
495 let mut latency1 = LastMinuteLatency::default();
496 let mut latency2 = LastMinuteLatency::default();
497
498 latency1.last_sec = 1000;
499 latency2.last_sec = 1000;
500
501 latency1.totals[0].total = 10;
503 latency1.totals[0].n = 2;
504 latency2.totals[0].total = 20;
505 latency2.totals[0].n = 3;
506
507 let merged = latency1.merge(&latency2);
508
509 assert_eq!(merged.last_sec, 1000);
510 assert_eq!(merged.totals[0].total, 30); assert_eq!(merged.totals[0].n, 5); }
513
514 #[test]
515 fn test_last_minute_latency_merge_different_times() {
516 let mut latency1 = LastMinuteLatency::default();
517 let mut latency2 = LastMinuteLatency::default();
518
519 latency1.last_sec = 1000;
520 latency2.last_sec = 1010; latency1.totals[0].total = 10;
524 latency2.totals[0].total = 20;
525
526 let merged = latency1.merge(&latency2);
527
528 assert_eq!(merged.last_sec, 1010); assert_eq!(merged.totals[0].total, 30);
530 }
531
532 #[test]
533 fn test_last_minute_latency_merge_empty() {
534 let mut latency1 = LastMinuteLatency::default();
535 let latency2 = LastMinuteLatency::default();
536
537 let merged = latency1.merge(&latency2);
538
539 assert_eq!(merged.last_sec, 0);
540 for elem in &merged.totals {
541 assert_eq!(elem.total, 0);
542 assert_eq!(elem.size, 0);
543 assert_eq!(elem.n, 0);
544 }
545 }
546
547 #[test]
548 fn test_last_minute_latency_window_wraparound() {
549 let mut latency = LastMinuteLatency::default();
550
551 for sec in 0..120 {
553 let acc_elem = AccElem {
555 total: sec,
556 size: 0,
557 n: 1,
558 };
559 latency.add_all(sec, &acc_elem);
560
561 let expected_idx = sec % 60;
562 assert_eq!(latency.totals[expected_idx as usize].total, sec);
563 }
564 }
565
566 #[test]
567 fn test_last_minute_latency_time_progression() {
568 let mut latency = LastMinuteLatency::default();
569
570 latency.add_all(
572 1000,
573 &AccElem {
574 total: 10,
575 size: 0,
576 n: 1,
577 },
578 );
579
580 latency.forward_to(1030);
582
583 let idx_1000 = 1000 % 60;
585 assert_eq!(latency.totals[idx_1000 as usize].total, 10);
586
587 latency.forward_to(1070);
589
590 for elem in &latency.totals {
592 assert_eq!(elem.total, 0);
593 assert_eq!(elem.n, 0);
594 }
595 }
596
597 #[test]
598 fn test_last_minute_latency_realistic_scenario() {
599 let mut latency = LastMinuteLatency::default();
600 let base_time = 1000u64;
601
602 for i in 0..60 {
604 let current_time = base_time + i;
605 let duration_secs = i % 10 + 1; let acc_elem = AccElem {
607 total: duration_secs,
608 size: 1024 * (i % 5 + 1), n: 1,
610 };
611
612 latency.add_all(current_time, &acc_elem);
613 }
614
615 let mut non_empty_count = 0;
617 let mut total_n = 0;
618 let mut total_sum = 0;
619
620 for elem in &latency.totals {
621 if elem.n > 0 {
622 non_empty_count += 1;
623 total_n += elem.n;
624 total_sum += elem.total;
625 }
626 }
627
628 assert_eq!(non_empty_count, 60);
630 assert_eq!(total_n, 60); assert!(total_sum > 0);
632
633 let mut manual_total = AccElem::default();
635 for elem in &latency.totals {
636 manual_total.merge(elem);
637 }
638 assert_eq!(manual_total.n, 60);
639 assert_eq!(manual_total.total, total_sum);
640 }
641
642 #[test]
643 fn test_acc_elem_clone_and_debug() {
644 let elem = AccElem {
645 total: 100,
646 size: 200,
647 n: 5,
648 };
649
650 let cloned = elem;
651 assert_eq!(elem.total, cloned.total);
652 assert_eq!(elem.size, cloned.size);
653 assert_eq!(elem.n, cloned.n);
654
655 let debug_str = format!("{elem:?}");
657 assert!(debug_str.contains("100"));
658 assert!(debug_str.contains("200"));
659 assert!(debug_str.contains("5"));
660 }
661
662 #[test]
663 fn test_last_minute_latency_clone() {
664 let mut latency = LastMinuteLatency {
665 last_sec: 1000,
666 ..Default::default()
667 };
668 latency.totals[0].total = 100;
669 latency.totals[0].n = 5;
670
671 let cloned = latency.clone();
672 assert_eq!(latency.last_sec, cloned.last_sec);
673 assert_eq!(latency.totals[0].total, cloned.totals[0].total);
674 assert_eq!(latency.totals[0].n, cloned.totals[0].n);
675 }
676
677 #[test]
678 fn test_edge_case_max_values() {
679 let mut elem = AccElem {
680 total: u64::MAX - 50,
681 size: u64::MAX - 50,
682 n: u64::MAX - 50,
683 };
684
685 let other = AccElem {
686 total: 100,
687 size: 100,
688 n: 100,
689 };
690
691 elem.merge(&other);
693
694 assert_eq!(elem.total, 49); assert_eq!(elem.size, 49);
697 assert_eq!(elem.n, 49);
698 }
699
700 #[test]
701 fn test_forward_to_boundary_conditions() {
702 let mut latency = LastMinuteLatency {
703 last_sec: 59,
704 ..Default::default()
705 };
706
707 latency.totals[59].total = 100;
709 latency.totals[59].n = 1;
710
711 latency.forward_to(119);
713
714 for elem in &latency.totals {
716 assert_eq!(elem.total, 0);
717 assert_eq!(elem.n, 0);
718 }
719 }
720
721 #[test]
722 fn test_get_total_with_data() {
723 let mut latency = LastMinuteLatency::default();
724
725 let current_time = SystemTime::now()
727 .duration_since(UNIX_EPOCH)
728 .expect("Time went backwards")
729 .as_secs();
730 latency.last_sec = current_time;
731
732 latency.totals[0] = AccElem {
734 total: 10,
735 size: 100,
736 n: 1,
737 };
738 latency.totals[1] = AccElem {
739 total: 20,
740 size: 200,
741 n: 2,
742 };
743 latency.totals[59] = AccElem {
744 total: 30,
745 size: 300,
746 n: 3,
747 };
748
749 let total = latency.get_total();
750
751 assert_eq!(total.total, 60);
752 assert_eq!(total.size, 600);
753 assert_eq!(total.n, 6);
754 }
755
756 #[test]
757 fn test_window_index_calculation() {
758 let _latency = LastMinuteLatency::default();
760
761 let acc_elem = AccElem { total: 1, size: 1, n: 1 };
762
763 let test_cases = [(0, 0), (1, 1), (59, 59), (60, 0), (61, 1), (119, 59), (120, 0)];
765
766 for (timestamp, expected_idx) in test_cases {
767 let mut test_latency = LastMinuteLatency::default();
768 test_latency.add_all(timestamp, &acc_elem);
769
770 assert_eq!(
771 test_latency.totals[expected_idx].n, 1,
772 "Failed for timestamp {timestamp} (expected index {expected_idx})"
773 );
774 }
775 }
776
777 #[test]
778 fn test_concurrent_safety_simulation() {
779 let mut latency = LastMinuteLatency::default();
781
782 let current_time = SystemTime::now()
784 .duration_since(UNIX_EPOCH)
785 .expect("Time went backwards")
786 .as_secs();
787
788 for i in 0..1000 {
790 let acc_elem = AccElem {
791 total: (i % 10) + 1, size: (i % 100) + 1,
793 n: 1,
794 };
795 latency.add_all(current_time - (i % 60), &acc_elem);
797 }
798
799 let total = latency.get_total();
800 assert!(total.n > 0, "Total count should be greater than 0");
801 assert!(total.total > 0, "Total time should be greater than 0");
802 }
803
804 #[test]
805 fn test_acc_elem_debug_format() {
806 let elem = AccElem {
807 total: 123,
808 size: 456,
809 n: 789,
810 };
811
812 let debug_str = format!("{elem:?}");
813 assert!(debug_str.contains("123"));
814 assert!(debug_str.contains("456"));
815 assert!(debug_str.contains("789"));
816 }
817
818 #[test]
819 fn test_large_values() {
820 let mut elem = AccElem::default();
821
822 let large_duration = Duration::from_secs(u64::MAX / 2);
824 elem.add(&large_duration);
825
826 assert_eq!(elem.total, u64::MAX / 2);
827 assert_eq!(elem.n, 1);
828
829 let avg = elem.avg();
831 assert_eq!(avg, Duration::from_secs(u64::MAX / 2));
832 }
833
834 #[test]
835 fn test_zero_duration_handling() {
836 let mut elem = AccElem::default();
837
838 let zero_duration = Duration::from_secs(0);
839 elem.add(&zero_duration);
840
841 assert_eq!(elem.total, 0);
842 assert_eq!(elem.n, 1);
843 assert_eq!(elem.avg(), Duration::from_secs(0));
844 }
845}
846
847const SIZE_LAST_ELEM_MARKER: usize = 10; #[allow(dead_code)]
850#[derive(Debug, Default)]
851pub struct LastMinuteHistogram {
852 histogram: Vec<LastMinuteLatency>,
853 size: u32,
854}
855
856impl LastMinuteHistogram {
857 pub fn merge(&mut self, other: &LastMinuteHistogram) {
858 for i in 0..self.histogram.len() {
859 self.histogram[i].merge(&other.histogram[i]);
860 }
861 }
862
863 pub fn add(&mut self, size: i64, t: std::time::Duration) {
864 let index = size_to_tag(size);
865 self.histogram[index].add(&t);
866 }
867
868 pub fn get_avg_data(&mut self) -> [AccElem; SIZE_LAST_ELEM_MARKER] {
869 let mut res = [AccElem::default(); SIZE_LAST_ELEM_MARKER];
870 for (i, elem) in self.histogram.iter_mut().enumerate() {
871 res[i] = elem.get_total();
872 }
873 res
874 }
875}
876
877fn size_to_tag(size: i64) -> usize {
878 match size {
879 _ if size < 1024 => 0, _ if size < 1024 * 1024 => 1, _ if size < 10 * 1024 * 1024 => 2, _ if size < 100 * 1024 * 1024 => 3, _ if size < 1024 * 1024 * 1024 => 4, _ => 5, }
886}