statsig_rust/event_logging/
flush_interval.rs1use std::sync::atomic::AtomicU64;
2
3use crate::{event_logging::event_logger_constants::EventLoggerConstants, log_d};
4use chrono::Utc;
5use std::sync::atomic::Ordering::Relaxed;
6
7const TAG: &str = stringify!(FlushInterval);
8
9#[derive(Default)]
10pub struct FlushInterval {
11 current_flush_interval_ms: AtomicU64,
12 last_scheduled_flush_attempt_time: AtomicU64,
13}
14
15impl FlushInterval {
16 pub fn new() -> Self {
17 Self {
18 current_flush_interval_ms: AtomicU64::new(EventLoggerConstants::min_flush_interval_ms()),
19 last_scheduled_flush_attempt_time: AtomicU64::new(get_now_timestamp()),
20 }
21 }
22
23 pub fn get_current_flush_interval_ms(&self) -> u64 {
24 self.current_flush_interval_ms.load(Relaxed)
25 }
26
27 pub fn mark_scheduled_flush_attempt(&self) {
28 let now = get_now_timestamp();
29 self.last_scheduled_flush_attempt_time.store(now, Relaxed);
30 }
31
32 pub fn adjust_for_success(&self) {
33 let current = self.load_current_interval();
34 let min_interval = EventLoggerConstants::min_flush_interval_ms();
35 let adjusted = (current / 2).max(min_interval);
36 if current == min_interval {
37 return;
38 }
39
40 self.current_flush_interval_ms.store(adjusted, Relaxed);
41
42 log_d!(
43 TAG,
44 "Flush interval adjusted for success: was {}ms, now {}ms",
45 current,
46 adjusted
47 );
48 }
49
50 pub fn adjust_for_failure(&self) {
51 let current = self.load_current_interval();
52 let adjusted = (current * 2).min(EventLoggerConstants::max_flush_interval_ms());
53 self.current_flush_interval_ms.store(adjusted, Relaxed);
54
55 log_d!(
56 TAG,
57 "Flush interval adjusted for failure: was {}ms, now {}ms",
58 current,
59 adjusted
60 );
61 }
62
63 pub fn has_cooled_from_most_recent_failure(&self) -> bool {
64 let last_flush_attempt_time = self.load_last_scheduled_flush_attempt_time();
65 let flush_interval_ms = self.load_current_interval();
66 let next_flush_time = last_flush_attempt_time + flush_interval_ms;
67 next_flush_time < get_now_timestamp()
68 }
69
70 pub fn has_waited_max_allowed_interval(&self) -> bool {
71 let last_flush_attempt_time = self.load_last_scheduled_flush_attempt_time();
72 let next_flush_time =
73 last_flush_attempt_time + EventLoggerConstants::max_flush_interval_ms();
74 next_flush_time < get_now_timestamp()
75 }
76
77 pub fn has_completely_recovered_from_backoff(&self) -> bool {
78 let current_interval = self.load_current_interval();
79 current_interval <= EventLoggerConstants::min_flush_interval_ms()
80 }
81
82 fn load_current_interval(&self) -> u64 {
83 self.current_flush_interval_ms.load(Relaxed)
84 }
85
86 fn load_last_scheduled_flush_attempt_time(&self) -> u64 {
87 self.last_scheduled_flush_attempt_time.load(Relaxed)
88 }
89}
90
91fn get_now_timestamp() -> u64 {
92 Utc::now().timestamp_millis() as u64
93}
94
95#[cfg(test)]
96mod tests {
97 use super::*;
98
99 #[test]
100 fn test_failure_doubles_backoff() {
101 let flush_interval = FlushInterval::new();
102
103 flush_interval.adjust_for_failure();
104 assert_eq!(flush_interval.get_current_flush_interval_ms(), 2000);
105
106 flush_interval.adjust_for_failure();
107 assert_eq!(flush_interval.get_current_flush_interval_ms(), 4000);
108 }
109
110 #[test]
111 fn test_failure_backoff_max() {
112 let flush_interval = FlushInterval::new();
113
114 for _ in 0..1000 {
115 flush_interval.adjust_for_failure();
116 }
117 assert_eq!(flush_interval.get_current_flush_interval_ms(), 60000);
118 }
119
120 #[test]
121 fn test_success_halves_backoff() {
122 let flush_interval = FlushInterval::new();
123
124 for _ in 0..1000 {
125 flush_interval.adjust_for_failure();
126 }
127
128 flush_interval.adjust_for_success();
129 assert_eq!(flush_interval.get_current_flush_interval_ms(), 30000);
130
131 flush_interval.adjust_for_success();
132 assert_eq!(flush_interval.get_current_flush_interval_ms(), 15000);
133 }
134
135 #[test]
136 fn test_success_backoff_min() {
137 let flush_interval = FlushInterval::new();
138
139 for _ in 0..1000 {
140 flush_interval.adjust_for_success();
141 }
142
143 assert_eq!(flush_interval.get_current_flush_interval_ms(), 1000);
144 }
145}