lance_core/utils/
backoff.rs

1use rand::{Rng, SeedableRng};
2use std::time::Duration;
3
4// SPDX-License-Identifier: Apache-2.0
5// SPDX-FileCopyrightText: Copyright The Lance Authors
6
/// Exponential backoff with random jitter.
///
/// Each call to `next_backoff` computes a delay in milliseconds as
///
/// ```text
/// backoff = clamp(base^attempt * unit + rand(-jitter..=jitter), min, max)
/// ```
///
/// where `attempt` is the number of delays already handed out. The defaults are
/// base=2, unit=50ms, jitter=50ms, min=0ms, max=5s. This gives a backoff of
/// 50ms, 100ms, 200ms, 400ms, 800ms, 1.6s, 3.2s, 5s, ... (not including jitter).
///
/// You can have non-exponential backoff by setting base=1.
#[derive(Debug, Clone)]
pub struct Backoff {
    /// Growth factor applied once per attempt.
    base: u32,
    /// Delay for attempt 0 (before jitter), in milliseconds.
    unit: u32,
    /// Jitter bound in milliseconds; a uniform offset in `[-jitter, jitter]` is added.
    jitter: i32,
    /// Lower bound of the returned delay, in milliseconds.
    min: u32,
    /// Upper bound of the returned delay, in milliseconds.
    max: u32,
    /// Number of delays returned since construction or the last reset.
    attempt: u32,
}
25
26impl Default for Backoff {
27    fn default() -> Self {
28        Self {
29            base: 2,
30            unit: 50,
31            jitter: 50,
32            min: 0,
33            max: 5000,
34            attempt: 0,
35        }
36    }
37}
38
39impl Backoff {
40    pub fn with_base(self, base: u32) -> Self {
41        Self { base, ..self }
42    }
43
44    pub fn with_unit(self, unit: u32) -> Self {
45        Self { unit, ..self }
46    }
47
48    pub fn with_jitter(self, jitter: i32) -> Self {
49        Self { jitter, ..self }
50    }
51
52    pub fn with_min(self, min: u32) -> Self {
53        Self { min, ..self }
54    }
55
56    pub fn with_max(self, max: u32) -> Self {
57        Self { max, ..self }
58    }
59
60    pub fn next_backoff(&mut self) -> Duration {
61        let backoff = self
62            .base
63            .saturating_pow(self.attempt)
64            .saturating_mul(self.unit);
65        let jitter = rand::rng().random_range(-self.jitter..=self.jitter);
66        let backoff = (backoff.saturating_add_signed(jitter)).clamp(self.min, self.max);
67        self.attempt += 1;
68        Duration::from_millis(backoff as u64)
69    }
70
71    pub fn attempt(&self) -> u32 {
72        self.attempt
73    }
74
75    pub fn reset(&mut self) {
76        self.attempt = 0;
77    }
78}
79
80/// SlotBackoff is a backoff strategy that randomly chooses a time slot to retry.
81///
82/// This is useful when you have multiple tasks that can't overlap, and each
83/// task takes roughly the same amount of time.
84///
85/// The `unit` represents the time it takes to complete one attempt. Future attempts
86/// are divided into time slots, and a random slot is chosen for the retry. The number
87/// of slots increases exponentially with each attempt. Initially, there are 4 slots,
88/// then 8, then 16, and so on.
89///
90/// Example:
91/// Suppose you have 10 tasks that can't overlap, each taking 1 second. The tasks
92/// don't know about each other and can't coordinate. Each task randomly picks a
93/// time slot to retry. Here's how it might look:
94///
95/// First round (4 slots):
96/// ```text
97/// task id   | 1, 2, 3 | 4, 5, 6 | 7, 8, 9 | 10 |
98/// status    | x, x, ✓ | x, x, ✓ | x, x, ✓ | ✓  |
99/// timeline  | 0s      | 1s      | 2s      | 3s |
100/// ```
101/// Each slot can have one success. Here, tasks 3, 6, 9, and 10 succeed.
102/// In the next round, the number of slots doubles (8):
103///
104/// Second round (8 slots):
105/// ```text
106/// task id   |  1 |  2 |    | 4, 5 |  7 |  8 |    |    |
107/// status    |  ✓ |  ✓ |    | x, ✓ |  ✓ |  ✓ |    |    |
108/// timeline  | 0s | 1s | 2s | 3s   | 4s | 5s | 6s | 7s |
109/// ```
110/// Most tasks are done now, except for task 4. It will succeed in the next round.
111pub struct SlotBackoff {
112    base: u32,
113    unit: u32,
114    starting_i: u32,
115    attempt: u32,
116    rng: rand::rngs::SmallRng,
117}
118
119impl Default for SlotBackoff {
120    fn default() -> Self {
121        Self {
122            base: 2,
123            unit: 50,
124            starting_i: 2, // start with 4 slots
125            attempt: 0,
126            rng: rand::rngs::SmallRng::from_os_rng(),
127        }
128    }
129}
130
131impl SlotBackoff {
132    pub fn with_unit(self, unit: u32) -> Self {
133        Self { unit, ..self }
134    }
135
136    pub fn attempt(&self) -> u32 {
137        self.attempt
138    }
139
140    pub fn next_backoff(&mut self) -> Duration {
141        let num_slots = self.base.saturating_pow(self.attempt + self.starting_i);
142        let slot_i = self.rng.random_range(0..num_slots);
143        self.attempt += 1;
144        Duration::from_millis((slot_i * self.unit) as u64)
145    }
146}
147
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_backoff() {
        // With jitter disabled the delay is exactly unit * base^attempt.
        let mut backoff = Backoff::default().with_jitter(0);
        for (expected_attempt, expected_ms) in [(1u32, 50u128), (2, 100), (3, 200), (4, 400)] {
            assert_eq!(backoff.next_backoff().as_millis(), expected_ms);
            assert_eq!(backoff.attempt(), expected_attempt);
        }
    }

    #[test]
    fn test_slot_backoff() {
        /// Asserts that `value` is one of the slot start times in `expected`.
        fn assert_in(value: u128, expected: &[u128]) {
            assert!(
                expected.contains(&value),
                "value {} not in {:?}",
                value,
                expected
            );
        }

        // Repeat to exercise several random draws per round.
        for _ in 0..10 {
            let mut backoff = SlotBackoff::default().with_unit(100);
            for (expected_attempt, num_slots) in [(1u32, 4u128), (2, 8), (3, 16)] {
                let slot_starts: Vec<u128> = (0..num_slots).map(|slot| slot * 100).collect();
                assert_in(backoff.next_backoff().as_millis(), &slot_starts);
                assert_eq!(backoff.attempt(), expected_attempt);
            }
        }
    }
}