Skip to main content

kube_runtime/utils/
backoff_reset_timer.rs

1use std::time::{Duration, Instant};
2
3/// A mostly internal trait to allow resetting backoff based on a reset duration
4pub trait Backoff: Iterator<Item = Duration> + Send + Sync + Unpin {
5    /// Resets the internal state to the initial value.
6    fn reset(&mut self);
7}
8
9impl<B: Backoff + ?Sized> Backoff for Box<B> {
10    fn reset(&mut self) {
11        let this: &mut B = self;
12        this.reset()
13    }
14}
15
16/// A [`Backoff`] wrapper that resets after a fixed duration has elapsed.
17pub struct ResetTimerBackoff<B: Backoff> {
18    backoff: B,
19    last_backoff: Option<Instant>,
20    reset_duration: Duration,
21}
22
23impl<B: Backoff> ResetTimerBackoff<B> {
24    /// Create a reset backoff wrapper for a given `Backoff` implementing object and a reset duration.
25    pub fn new(backoff: B, reset_duration: Duration) -> Self {
26        Self {
27            backoff,
28            last_backoff: None,
29            reset_duration,
30        }
31    }
32}
33
34impl<B: Backoff> Iterator for ResetTimerBackoff<B> {
35    type Item = Duration;
36
37    fn next(&mut self) -> Option<Duration> {
38        if let Some(last_backoff) = self.last_backoff
39            && tokio::time::Instant::now().into_std() > last_backoff + self.reset_duration
40        {
41            tracing::debug!(
42                ?last_backoff,
43                reset_duration = ?self.reset_duration,
44                "Resetting backoff, since reset duration has expired"
45            );
46            self.backoff.reset();
47        }
48        self.last_backoff = Some(tokio::time::Instant::now().into_std());
49        self.backoff.next()
50    }
51}
52
53impl<B: Backoff> Backoff for ResetTimerBackoff<B> {
54    fn reset(&mut self) {
55        self.backoff.reset();
56    }
57}
58
59#[cfg(test)]
60mod tests {
61    use tokio::time::advance;
62
63    use super::ResetTimerBackoff;
64    use crate::utils::stream_backoff::tests::LinearBackoff;
65    use std::time::Duration;
66
67    #[tokio::test]
68    async fn should_reset_when_timer_expires() {
69        tokio::time::pause();
70        let mut backoff = ResetTimerBackoff::new(
71            LinearBackoff::new(Duration::from_secs(2)),
72            Duration::from_secs(60),
73        );
74        assert_eq!(backoff.next(), Some(Duration::from_secs(2)));
75        advance(Duration::from_secs(40)).await;
76        assert_eq!(backoff.next(), Some(Duration::from_secs(4)));
77        advance(Duration::from_secs(40)).await;
78        assert_eq!(backoff.next(), Some(Duration::from_secs(6)));
79        advance(Duration::from_secs(80)).await;
80        assert_eq!(backoff.next(), Some(Duration::from_secs(2)));
81        advance(Duration::from_secs(80)).await;
82        assert_eq!(backoff.next(), Some(Duration::from_secs(2)));
83    }
84}