statsig_rust/event_logging/
flush_interval.rs

1use std::sync::atomic::AtomicU64;
2
3use crate::{event_logging::event_logger_constants::EventLoggerConstants, log_d};
4use chrono::Utc;
5use std::sync::atomic::Ordering::Relaxed;
6
/// Tag passed to the logging macros for messages emitted from this file.
const TAG: &str = "FlushInterval";
8
/// Tracks the current event-flush interval together with the time of the most recent
/// scheduled flush attempt.
///
/// Both values live in atomics, so every method on this type takes `&self`.
///
/// NOTE(review): the derived `Default` zero-initialises both fields, whereas `new()` starts
/// the interval at `EventLoggerConstants::min_flush_interval_ms()` and stamps the current
/// time. Confirm whether any caller relies on `Default`.
#[derive(Default)]
pub struct FlushInterval {
    /// Current flush interval, in milliseconds.
    current_flush_interval_ms: AtomicU64,
    /// Time of the last scheduled flush attempt, as written by `get_now_timestamp()`.
    last_scheduled_flush_attempt_time: AtomicU64,
}
14
15impl FlushInterval {
16    pub fn new() -> Self {
17        Self {
18            current_flush_interval_ms: AtomicU64::new(EventLoggerConstants::min_flush_interval_ms()),
19            last_scheduled_flush_attempt_time: AtomicU64::new(get_now_timestamp()),
20        }
21    }
22
23    pub fn get_current_flush_interval_ms(&self) -> u64 {
24        self.current_flush_interval_ms.load(Relaxed)
25    }
26
27    pub fn mark_scheduled_flush_attempt(&self) {
28        let now = get_now_timestamp();
29        self.last_scheduled_flush_attempt_time.store(now, Relaxed);
30    }
31
32    pub fn adjust_for_success(&self) {
33        let current = self.load_current_interval();
34        let min_interval = EventLoggerConstants::min_flush_interval_ms();
35        let adjusted = (current / 2).max(min_interval);
36        if current == min_interval {
37            return;
38        }
39
40        self.current_flush_interval_ms.store(adjusted, Relaxed);
41
42        log_d!(
43            TAG,
44            "Flush interval adjusted for success: was {}ms, now {}ms",
45            current,
46            adjusted
47        );
48    }
49
50    pub fn adjust_for_failure(&self) {
51        let current = self.load_current_interval();
52        let adjusted = (current * 2).min(EventLoggerConstants::max_flush_interval_ms());
53        self.current_flush_interval_ms.store(adjusted, Relaxed);
54
55        log_d!(
56            TAG,
57            "Flush interval adjusted for failure: was {}ms, now {}ms",
58            current,
59            adjusted
60        );
61    }
62
63    pub fn has_cooled_from_most_recent_failure(&self) -> bool {
64        let last_flush_attempt_time = self.load_last_scheduled_flush_attempt_time();
65        let flush_interval_ms = self.load_current_interval();
66        let next_flush_time = last_flush_attempt_time + flush_interval_ms;
67        next_flush_time < get_now_timestamp()
68    }
69
70    pub fn has_waited_max_allowed_interval(&self) -> bool {
71        let last_flush_attempt_time = self.load_last_scheduled_flush_attempt_time();
72        let next_flush_time =
73            last_flush_attempt_time + EventLoggerConstants::max_flush_interval_ms();
74        next_flush_time < get_now_timestamp()
75    }
76
77    pub fn has_completely_recovered_from_backoff(&self) -> bool {
78        let current_interval = self.load_current_interval();
79        current_interval <= EventLoggerConstants::min_flush_interval_ms()
80    }
81
82    fn load_current_interval(&self) -> u64 {
83        self.current_flush_interval_ms.load(Relaxed)
84    }
85
86    fn load_last_scheduled_flush_attempt_time(&self) -> u64 {
87        self.last_scheduled_flush_attempt_time.load(Relaxed)
88    }
89}
90
91fn get_now_timestamp() -> u64 {
92    Utc::now().timestamp_millis() as u64
93}
94
#[cfg(test)]
mod tests {
    use super::*;

    // The expected values below assume a minimum interval of 1000ms and a maximum of 60000ms.

    /// Number of repetitions used to drive the interval all the way to one of its bounds.
    const MANY_ATTEMPTS: usize = 1000;

    /// Applies `count` consecutive failures to `interval`.
    fn fail_repeatedly(interval: &FlushInterval, count: usize) {
        (0..count).for_each(|_| interval.adjust_for_failure());
    }

    /// Applies `count` consecutive successes to `interval`.
    fn succeed_repeatedly(interval: &FlushInterval, count: usize) {
        (0..count).for_each(|_| interval.adjust_for_success());
    }

    #[test]
    fn test_failure_doubles_backoff() {
        let interval = FlushInterval::new();

        // Each failure should double the interval of the previous step.
        for expected_ms in [2000, 4000] {
            interval.adjust_for_failure();
            assert_eq!(interval.get_current_flush_interval_ms(), expected_ms);
        }
    }

    #[test]
    fn test_failure_backoff_max() {
        let interval = FlushInterval::new();

        fail_repeatedly(&interval, MANY_ATTEMPTS);

        assert_eq!(interval.get_current_flush_interval_ms(), 60000);
    }

    #[test]
    fn test_success_halves_backoff() {
        let interval = FlushInterval::new();
        fail_repeatedly(&interval, MANY_ATTEMPTS);

        // Starting from the capped interval, each success should halve it.
        for expected_ms in [30000, 15000] {
            interval.adjust_for_success();
            assert_eq!(interval.get_current_flush_interval_ms(), expected_ms);
        }
    }

    #[test]
    fn test_success_backoff_min() {
        let interval = FlushInterval::new();

        succeed_repeatedly(&interval, MANY_ATTEMPTS);

        assert_eq!(interval.get_current_flush_interval_ms(), 1000);
    }
}
145}