opentelemetry_lambda_extension/
flush.rs

1//! Adaptive flush management.
2//!
3//! This module provides intelligent flush timing based on invocation patterns.
4//! It tracks invocation frequency and adapts the flush strategy accordingly.
5
6use crate::config::{FlushConfig, FlushStrategy};
7use std::collections::VecDeque;
8use std::time::{Duration, Instant};
9
/// Maximum number of invocation start times kept in the history window.
///
/// The same value is also used as the number of consecutive flush timeouts
/// after which the effective strategy escalates to continuous flushing.
const HISTORY_SIZE: usize = 20;

/// Average gap between invocations above which the invocation pattern is
/// treated as infrequent (two minutes).
const FREQUENT_THRESHOLD: Duration = Duration::from_secs(2 * 60);

/// Remaining time before the invocation deadline below which the deadline is
/// considered to be approaching.
const DEADLINE_BUFFER: Duration = Duration::from_millis(500);
13
/// Why a flush was requested.
///
/// The enum is `#[non_exhaustive]`: code outside this crate must include a
/// wildcard arm when matching on it, which leaves room for new reasons.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum FlushReason {
    /// A shutdown is in progress.
    Shutdown,
    /// The invocation deadline is about to be reached.
    Deadline,
    /// The number of pending entries reached the configured batch limit.
    BufferFull,
    /// The periodic flush interval has elapsed.
    Periodic,
    /// The invocation finished and flushing is deferred to invocation end
    /// (used for infrequent invocation patterns).
    InvocationEnd,
    /// The continuous strategy decided a flush is due.
    Continuous,
}
31
32/// Manages adaptive flush timing based on invocation patterns.
33pub struct FlushManager {
34    config: FlushConfig,
35    invocation_timestamps: VecDeque<Instant>,
36    last_flush: Option<Instant>,
37    consecutive_timeout_count: usize,
38}
39
40impl FlushManager {
41    /// Creates a new flush manager with the given configuration.
42    pub fn new(config: FlushConfig) -> Self {
43        Self {
44            config,
45            invocation_timestamps: VecDeque::with_capacity(HISTORY_SIZE),
46            last_flush: None,
47            consecutive_timeout_count: 0,
48        }
49    }
50
51    /// Creates a new flush manager with default configuration.
52    pub fn with_defaults() -> Self {
53        Self::new(FlushConfig::default())
54    }
55
56    /// Records that an invocation has started.
57    pub fn record_invocation(&mut self) {
58        let now = Instant::now();
59
60        if self.invocation_timestamps.len() >= HISTORY_SIZE {
61            self.invocation_timestamps.pop_front();
62        }
63        self.invocation_timestamps.push_back(now);
64    }
65
66    /// Records that a flush completed successfully.
67    pub fn record_flush(&mut self) {
68        self.last_flush = Some(Instant::now());
69        self.consecutive_timeout_count = 0;
70    }
71
72    /// Records that a flush timed out or failed.
73    pub fn record_flush_timeout(&mut self) {
74        self.consecutive_timeout_count += 1;
75    }
76
77    /// Determines if a flush should be triggered now.
78    ///
79    /// # Arguments
80    ///
81    /// * `deadline_ms` - The invocation deadline in milliseconds since epoch
82    /// * `pending_count` - Number of signals waiting to be flushed
83    /// * `is_shutdown` - Whether a shutdown is in progress
84    pub fn should_flush(
85        &self,
86        deadline_ms: Option<i64>,
87        pending_count: usize,
88        is_shutdown: bool,
89    ) -> Option<FlushReason> {
90        if is_shutdown {
91            return Some(FlushReason::Shutdown);
92        }
93
94        if let Some(deadline) = deadline_ms
95            && self.is_deadline_approaching(deadline)
96        {
97            return Some(FlushReason::Deadline);
98        }
99
100        if pending_count > 0 && self.is_buffer_full(pending_count) {
101            return Some(FlushReason::BufferFull);
102        }
103
104        if pending_count == 0 {
105            return None;
106        }
107
108        match self.effective_strategy() {
109            FlushStrategy::Continuous => {
110                if self.should_flush_continuous() {
111                    return Some(FlushReason::Continuous);
112                }
113            }
114            FlushStrategy::Periodic => {
115                if self.should_flush_periodic() {
116                    return Some(FlushReason::Periodic);
117                }
118            }
119            FlushStrategy::End => {
120                return None;
121            }
122            FlushStrategy::Default => {
123                if self.is_infrequent_pattern() {
124                    return None;
125                } else if self.should_flush_continuous() {
126                    return Some(FlushReason::Continuous);
127                }
128            }
129        }
130
131        None
132    }
133
134    /// Should be called at the end of an invocation to check for end-of-invocation flush.
135    pub fn should_flush_on_invocation_end(&self, pending_count: usize) -> Option<FlushReason> {
136        if pending_count == 0 {
137            return None;
138        }
139
140        match self.effective_strategy() {
141            FlushStrategy::End => Some(FlushReason::InvocationEnd),
142            FlushStrategy::Default if self.is_infrequent_pattern() => {
143                Some(FlushReason::InvocationEnd)
144            }
145            _ => None,
146        }
147    }
148
149    /// Returns the effective flush strategy, considering escalation.
150    fn effective_strategy(&self) -> FlushStrategy {
151        if self.consecutive_timeout_count >= HISTORY_SIZE {
152            return FlushStrategy::Continuous;
153        }
154
155        self.config.strategy
156    }
157
158    /// Returns the average time between invocations, if available.
159    pub fn average_invocation_interval(&self) -> Option<Duration> {
160        if self.invocation_timestamps.len() < 2 {
161            return None;
162        }
163
164        let first = self.invocation_timestamps.front()?;
165        let last = self.invocation_timestamps.back()?;
166
167        let total_duration = last.duration_since(*first);
168        let intervals = self.invocation_timestamps.len() - 1;
169
170        Some(total_duration / intervals as u32)
171    }
172
173    /// Returns whether invocations are infrequent (>120s average interval).
174    pub fn is_infrequent_pattern(&self) -> bool {
175        match self.average_invocation_interval() {
176            Some(avg) => avg > FREQUENT_THRESHOLD,
177            None => true,
178        }
179    }
180
181    fn is_deadline_approaching(&self, deadline_ms: i64) -> bool {
182        let now_ms = std::time::SystemTime::now()
183            .duration_since(std::time::UNIX_EPOCH)
184            .unwrap_or_default()
185            .as_millis() as i64;
186
187        let remaining = deadline_ms - now_ms;
188        remaining < DEADLINE_BUFFER.as_millis() as i64
189    }
190
191    fn is_buffer_full(&self, pending_count: usize) -> bool {
192        pending_count >= self.config.max_batch_entries
193    }
194
195    fn should_flush_continuous(&self) -> bool {
196        match self.last_flush {
197            Some(last) => last.elapsed() >= self.config.interval,
198            None => true,
199        }
200    }
201
202    fn should_flush_periodic(&self) -> bool {
203        self.should_flush_continuous()
204    }
205
206    /// Returns the configured flush interval.
207    pub fn interval(&self) -> Duration {
208        self.config.interval
209    }
210
211    /// Returns the time until the next periodic flush should occur.
212    pub fn time_until_next_flush(&self) -> Duration {
213        match self.last_flush {
214            Some(last) => {
215                let elapsed = last.elapsed();
216                if elapsed >= self.config.interval {
217                    Duration::ZERO
218                } else {
219                    self.config.interval - elapsed
220                }
221            }
222            None => Duration::ZERO,
223        }
224    }
225
226    /// Returns the number of consecutive flush timeouts.
227    pub fn consecutive_timeout_count(&self) -> usize {
228        self.consecutive_timeout_count
229    }
230
231    /// Returns the number of invocations in the history.
232    pub fn invocation_history_len(&self) -> usize {
233        self.invocation_timestamps.len()
234    }
235}
236
#[cfg(test)]
mod tests {
    use super::*;

    /// Baseline configuration; individual tests override single fields.
    fn test_config() -> FlushConfig {
        FlushConfig {
            strategy: FlushStrategy::Default,
            interval: Duration::from_millis(100),
            max_batch_bytes: 1024,
            max_batch_entries: 10,
        }
    }

    #[test]
    fn test_shutdown_always_flushes() {
        let manager = FlushManager::new(test_config());

        // Shutdown wins whether or not anything is pending.
        assert_eq!(
            manager.should_flush(None, 1, true),
            Some(FlushReason::Shutdown)
        );
        assert_eq!(
            manager.should_flush(None, 0, true),
            Some(FlushReason::Shutdown)
        );
    }

    #[test]
    fn test_buffer_full_triggers_flush() {
        let manager = FlushManager::new(FlushConfig {
            max_batch_entries: 5,
            ..test_config()
        });

        let at_limit = manager.should_flush(None, 5, false);
        assert_eq!(at_limit, Some(FlushReason::BufferFull));

        let below_limit = manager.should_flush(None, 3, false);
        assert_ne!(below_limit, Some(FlushReason::BufferFull));
    }

    #[test]
    fn test_empty_buffer_no_flush() {
        let manager = FlushManager::new(test_config());

        assert!(manager.should_flush(None, 0, false).is_none());
    }

    #[test]
    fn test_continuous_strategy() {
        let mut manager = FlushManager::new(FlushConfig {
            strategy: FlushStrategy::Continuous,
            interval: Duration::from_millis(10),
            ..test_config()
        });

        // No flush recorded yet: a flush is due immediately.
        assert_eq!(
            manager.should_flush(None, 1, false),
            Some(FlushReason::Continuous)
        );

        manager.record_flush();

        // Right after a flush the interval has not elapsed.
        assert!(manager.should_flush(None, 1, false).is_none());

        std::thread::sleep(Duration::from_millis(15));

        // Once the interval has passed a flush is due again.
        assert_eq!(
            manager.should_flush(None, 1, false),
            Some(FlushReason::Continuous)
        );
    }

    #[test]
    fn test_end_strategy() {
        let manager = FlushManager::new(FlushConfig {
            strategy: FlushStrategy::End,
            ..test_config()
        });

        let mid_invocation = manager.should_flush(None, 1, false);
        assert!(mid_invocation.is_none());

        let at_end = manager.should_flush_on_invocation_end(1);
        assert_eq!(at_end, Some(FlushReason::InvocationEnd));
    }

    #[test]
    fn test_infrequent_pattern_detection() {
        let mut manager = FlushManager::new(test_config());

        // No history at all counts as infrequent.
        assert!(manager.is_infrequent_pattern());

        // A single sample still gives no interval to average.
        manager.record_invocation();
        assert!(manager.is_infrequent_pattern());
    }

    #[test]
    fn test_frequent_pattern_detection() {
        let mut manager = FlushManager::new(test_config());

        (0..5).for_each(|_| manager.record_invocation());

        assert!(manager.average_invocation_interval().is_some());
        assert!(!manager.is_infrequent_pattern());
    }

    #[test]
    fn test_timeout_escalation() {
        let mut manager = FlushManager::new(FlushConfig {
            strategy: FlushStrategy::End,
            ..test_config()
        });

        (0..HISTORY_SIZE).for_each(|_| manager.record_flush_timeout());

        // Enough consecutive timeouts override the configured End strategy.
        assert_eq!(
            manager.should_flush(None, 1, false),
            Some(FlushReason::Continuous)
        );
    }

    #[test]
    fn test_record_flush_resets_timeout_count() {
        let mut manager = FlushManager::new(test_config());

        manager.record_flush_timeout();
        manager.record_flush_timeout();
        assert_eq!(manager.consecutive_timeout_count(), 2);

        manager.record_flush();
        assert_eq!(manager.consecutive_timeout_count(), 0);
    }

    #[test]
    fn test_time_until_next_flush() {
        let mut manager = FlushManager::new(FlushConfig {
            interval: Duration::from_millis(100),
            ..test_config()
        });

        // Before any flush the next one is due immediately.
        assert_eq!(manager.time_until_next_flush(), Duration::ZERO);

        manager.record_flush();

        let wait = manager.time_until_next_flush();
        assert!(wait <= Duration::from_millis(100));
        assert!(wait > Duration::ZERO);
    }

    #[test]
    fn test_invocation_history_limit() {
        let mut manager = FlushManager::new(test_config());

        (0..30).for_each(|_| manager.record_invocation());

        assert_eq!(manager.invocation_history_len(), HISTORY_SIZE);
    }
}