opentelemetry_lambda_extension/
flush.rs1use crate::config::{FlushConfig, FlushStrategy};
7use std::collections::VecDeque;
8use std::time::{Duration, Instant};
9
10const HISTORY_SIZE: usize = 20;
11const FREQUENT_THRESHOLD: Duration = Duration::from_secs(120);
12const DEADLINE_BUFFER: Duration = Duration::from_millis(500);
13
14#[non_exhaustive]
16#[derive(Debug, Clone, Copy, PartialEq, Eq)]
17pub enum FlushReason {
18 Shutdown,
20 Deadline,
22 BufferFull,
24 Periodic,
26 InvocationEnd,
28 Continuous,
30}
31
32pub struct FlushManager {
34 config: FlushConfig,
35 invocation_timestamps: VecDeque<Instant>,
36 last_flush: Option<Instant>,
37 consecutive_timeout_count: usize,
38}
39
40impl FlushManager {
41 pub fn new(config: FlushConfig) -> Self {
43 Self {
44 config,
45 invocation_timestamps: VecDeque::with_capacity(HISTORY_SIZE),
46 last_flush: None,
47 consecutive_timeout_count: 0,
48 }
49 }
50
51 pub fn with_defaults() -> Self {
53 Self::new(FlushConfig::default())
54 }
55
56 pub fn record_invocation(&mut self) {
58 let now = Instant::now();
59
60 if self.invocation_timestamps.len() >= HISTORY_SIZE {
61 self.invocation_timestamps.pop_front();
62 }
63 self.invocation_timestamps.push_back(now);
64 }
65
66 pub fn record_flush(&mut self) {
68 self.last_flush = Some(Instant::now());
69 self.consecutive_timeout_count = 0;
70 }
71
72 pub fn record_flush_timeout(&mut self) {
74 self.consecutive_timeout_count += 1;
75 }
76
77 pub fn should_flush(
85 &self,
86 deadline_ms: Option<i64>,
87 pending_count: usize,
88 is_shutdown: bool,
89 ) -> Option<FlushReason> {
90 if is_shutdown {
91 return Some(FlushReason::Shutdown);
92 }
93
94 if let Some(deadline) = deadline_ms
95 && self.is_deadline_approaching(deadline)
96 {
97 return Some(FlushReason::Deadline);
98 }
99
100 if pending_count > 0 && self.is_buffer_full(pending_count) {
101 return Some(FlushReason::BufferFull);
102 }
103
104 if pending_count == 0 {
105 return None;
106 }
107
108 match self.effective_strategy() {
109 FlushStrategy::Continuous => {
110 if self.should_flush_continuous() {
111 return Some(FlushReason::Continuous);
112 }
113 }
114 FlushStrategy::Periodic => {
115 if self.should_flush_periodic() {
116 return Some(FlushReason::Periodic);
117 }
118 }
119 FlushStrategy::End => {
120 return None;
121 }
122 FlushStrategy::Default => {
123 if self.is_infrequent_pattern() {
124 return None;
125 } else if self.should_flush_continuous() {
126 return Some(FlushReason::Continuous);
127 }
128 }
129 }
130
131 None
132 }
133
134 pub fn should_flush_on_invocation_end(&self, pending_count: usize) -> Option<FlushReason> {
136 if pending_count == 0 {
137 return None;
138 }
139
140 match self.effective_strategy() {
141 FlushStrategy::End => Some(FlushReason::InvocationEnd),
142 FlushStrategy::Default if self.is_infrequent_pattern() => {
143 Some(FlushReason::InvocationEnd)
144 }
145 _ => None,
146 }
147 }
148
149 fn effective_strategy(&self) -> FlushStrategy {
151 if self.consecutive_timeout_count >= HISTORY_SIZE {
152 return FlushStrategy::Continuous;
153 }
154
155 self.config.strategy
156 }
157
158 pub fn average_invocation_interval(&self) -> Option<Duration> {
160 if self.invocation_timestamps.len() < 2 {
161 return None;
162 }
163
164 let first = self.invocation_timestamps.front()?;
165 let last = self.invocation_timestamps.back()?;
166
167 let total_duration = last.duration_since(*first);
168 let intervals = self.invocation_timestamps.len() - 1;
169
170 Some(total_duration / intervals as u32)
171 }
172
173 pub fn is_infrequent_pattern(&self) -> bool {
175 match self.average_invocation_interval() {
176 Some(avg) => avg > FREQUENT_THRESHOLD,
177 None => true,
178 }
179 }
180
181 fn is_deadline_approaching(&self, deadline_ms: i64) -> bool {
182 let now_ms = std::time::SystemTime::now()
183 .duration_since(std::time::UNIX_EPOCH)
184 .unwrap_or_default()
185 .as_millis() as i64;
186
187 let remaining = deadline_ms - now_ms;
188 remaining < DEADLINE_BUFFER.as_millis() as i64
189 }
190
191 fn is_buffer_full(&self, pending_count: usize) -> bool {
192 pending_count >= self.config.max_batch_entries
193 }
194
195 fn should_flush_continuous(&self) -> bool {
196 match self.last_flush {
197 Some(last) => last.elapsed() >= self.config.interval,
198 None => true,
199 }
200 }
201
202 fn should_flush_periodic(&self) -> bool {
203 self.should_flush_continuous()
204 }
205
206 pub fn interval(&self) -> Duration {
208 self.config.interval
209 }
210
211 pub fn time_until_next_flush(&self) -> Duration {
213 match self.last_flush {
214 Some(last) => {
215 let elapsed = last.elapsed();
216 if elapsed >= self.config.interval {
217 Duration::ZERO
218 } else {
219 self.config.interval - elapsed
220 }
221 }
222 None => Duration::ZERO,
223 }
224 }
225
226 pub fn consecutive_timeout_count(&self) -> usize {
228 self.consecutive_timeout_count
229 }
230
231 pub fn invocation_history_len(&self) -> usize {
233 self.invocation_timestamps.len()
234 }
235}
236
237#[cfg(test)]
238mod tests {
239 use super::*;
240
241 fn test_config() -> FlushConfig {
242 FlushConfig {
243 strategy: FlushStrategy::Default,
244 interval: Duration::from_millis(100),
245 max_batch_bytes: 1024,
246 max_batch_entries: 10,
247 }
248 }
249
250 #[test]
251 fn test_shutdown_always_flushes() {
252 let manager = FlushManager::new(test_config());
253
254 let reason = manager.should_flush(None, 1, true);
255 assert_eq!(reason, Some(FlushReason::Shutdown));
256
257 let reason = manager.should_flush(None, 0, true);
258 assert_eq!(reason, Some(FlushReason::Shutdown));
259 }
260
261 #[test]
262 fn test_buffer_full_triggers_flush() {
263 let mut config = test_config();
264 config.max_batch_entries = 5;
265
266 let manager = FlushManager::new(config);
267
268 let reason = manager.should_flush(None, 5, false);
269 assert_eq!(reason, Some(FlushReason::BufferFull));
270
271 let reason = manager.should_flush(None, 3, false);
272 assert_ne!(reason, Some(FlushReason::BufferFull));
273 }
274
275 #[test]
276 fn test_empty_buffer_no_flush() {
277 let manager = FlushManager::new(test_config());
278
279 let reason = manager.should_flush(None, 0, false);
280 assert!(reason.is_none());
281 }
282
283 #[test]
284 fn test_continuous_strategy() {
285 let mut config = test_config();
286 config.strategy = FlushStrategy::Continuous;
287 config.interval = Duration::from_millis(10);
288
289 let mut manager = FlushManager::new(config);
290
291 let reason = manager.should_flush(None, 1, false);
292 assert_eq!(reason, Some(FlushReason::Continuous));
293
294 manager.record_flush();
295
296 let reason = manager.should_flush(None, 1, false);
297 assert!(reason.is_none());
298
299 std::thread::sleep(Duration::from_millis(15));
300
301 let reason = manager.should_flush(None, 1, false);
302 assert_eq!(reason, Some(FlushReason::Continuous));
303 }
304
305 #[test]
306 fn test_end_strategy() {
307 let mut config = test_config();
308 config.strategy = FlushStrategy::End;
309
310 let manager = FlushManager::new(config);
311
312 let reason = manager.should_flush(None, 1, false);
313 assert!(reason.is_none());
314
315 let reason = manager.should_flush_on_invocation_end(1);
316 assert_eq!(reason, Some(FlushReason::InvocationEnd));
317 }
318
319 #[test]
320 fn test_infrequent_pattern_detection() {
321 let mut manager = FlushManager::new(test_config());
322
323 assert!(manager.is_infrequent_pattern());
324
325 manager.record_invocation();
326 assert!(manager.is_infrequent_pattern());
327 }
328
329 #[test]
330 fn test_frequent_pattern_detection() {
331 let mut manager = FlushManager::new(test_config());
332
333 for _ in 0..5 {
334 manager.record_invocation();
335 }
336
337 let avg = manager.average_invocation_interval();
338 assert!(avg.is_some());
339 assert!(!manager.is_infrequent_pattern());
340 }
341
342 #[test]
343 fn test_timeout_escalation() {
344 let mut config = test_config();
345 config.strategy = FlushStrategy::End;
346
347 let mut manager = FlushManager::new(config);
348
349 for _ in 0..HISTORY_SIZE {
350 manager.record_flush_timeout();
351 }
352
353 let reason = manager.should_flush(None, 1, false);
354 assert_eq!(reason, Some(FlushReason::Continuous));
355 }
356
357 #[test]
358 fn test_record_flush_resets_timeout_count() {
359 let mut manager = FlushManager::new(test_config());
360
361 manager.record_flush_timeout();
362 manager.record_flush_timeout();
363 assert_eq!(manager.consecutive_timeout_count(), 2);
364
365 manager.record_flush();
366 assert_eq!(manager.consecutive_timeout_count(), 0);
367 }
368
369 #[test]
370 fn test_time_until_next_flush() {
371 let mut config = test_config();
372 config.interval = Duration::from_millis(100);
373
374 let mut manager = FlushManager::new(config);
375
376 assert_eq!(manager.time_until_next_flush(), Duration::ZERO);
377
378 manager.record_flush();
379
380 let remaining = manager.time_until_next_flush();
381 assert!(remaining <= Duration::from_millis(100));
382 assert!(remaining > Duration::ZERO);
383 }
384
385 #[test]
386 fn test_invocation_history_limit() {
387 let mut manager = FlushManager::new(test_config());
388
389 for _ in 0..30 {
390 manager.record_invocation();
391 }
392
393 assert_eq!(manager.invocation_history_len(), HISTORY_SIZE);
394 }
395}