1use std::time::Duration;
4
5#[derive(Debug, Clone, PartialEq, Default)]
23pub struct Config {
24 pub link: Link,
25 pub capacity: Capacity,
26}
27
28impl Config {
29 pub fn new(link: Link, capacity: Capacity) -> Self {
30 Self { link, capacity }
31 }
32
33 pub fn bounded(capacity: usize) -> Self {
43 Self {
44 link: Link::default(),
45 capacity: Capacity::Bounded(capacity),
46 }
47 }
48
49 pub fn detached() -> Self {
59 Self {
60 link: Link::Detached,
61 capacity: Capacity::default(),
62 }
63 }
64
65 pub fn attached(timeout: Duration) -> Self {
66 Self {
67 link: Link::Attached(timeout),
68 capacity: Capacity::default(),
69 }
70 }
71}
72
/// Link state: either detached, or attached together with a [`Duration`].
///
/// NOTE(review): how the attached duration is consumed is decided by code
/// outside this file — confirm its exact meaning at the call sites.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Link {
    /// No link is established.
    Detached,
    /// A link is established and carries the given duration.
    Attached(Duration),
}

impl Default for Link {
    /// The default link is attached with a duration of one second.
    fn default() -> Self {
        Self::Attached(Duration::from_secs(1))
    }
}

impl Link {
    /// Switches the link to `Attached(duration)`.
    ///
    /// Returns the previously stored duration if the link was already
    /// attached, or `None` if it was detached.
    pub(crate) fn attach(&mut self, duration: Duration) -> Option<Duration> {
        // Install the new state first, then inspect what it replaced.
        match std::mem::replace(self, Self::Attached(duration)) {
            Self::Attached(previous) => Some(previous),
            Self::Detached => None,
        }
    }

    /// Switches the link to `Detached`.
    ///
    /// Returns the duration that was stored if the link was attached, or
    /// `None` if it was already detached.
    pub(crate) fn detach(&mut self) -> Option<Duration> {
        match std::mem::replace(self, Self::Detached) {
            Self::Attached(previous) => Some(previous),
            Self::Detached => None,
        }
    }

    /// Returns `true` when the link is in the `Attached` state.
    pub fn is_attached(&self) -> bool {
        match self {
            Self::Attached(_) => true,
            Self::Detached => false,
        }
    }
}
126
127#[derive(Debug, Clone, PartialEq)]
134pub enum Capacity {
135 Bounded(usize),
136 Unbounded(BackPressure),
137}
138
139impl Capacity {
140 pub fn is_bounded(&self) -> bool {
142 matches!(self, Self::Bounded(_))
143 }
144}
145
146impl Default for Capacity {
147 fn default() -> Self {
148 Capacity::Unbounded(BackPressure::default())
149 }
150}
151
/// Describes how a timeout grows with the message count once a threshold is
/// reached.
///
/// Below `starts_at` messages no timeout is produced. From `starts_at`
/// onwards the timeout grows either linearly or exponentially from a base
/// duration (see [`BackPressure::linear`] and [`BackPressure::exponential`]).
#[derive(Debug, Clone, PartialEq)]
pub struct BackPressure {
    /// Message count at which timeouts start being produced.
    starts_at: usize,
    /// Base timeout in nanoseconds.
    base_ns: u64,
    /// Growth factor per message past `starts_at`; `None` selects linear
    /// growth.
    exp_growth: Option<f32>,
}

impl BackPressure {
    /// Creates a linearly growing back-pressure.
    ///
    /// The timeout is `timeout * (msg_count - starts_at + 1)`, so it equals
    /// `timeout` exactly when `msg_count == starts_at`.
    ///
    /// # Panics
    ///
    /// Panics if `timeout` expressed in nanoseconds does not fit in a `u64`.
    pub fn linear(starts_at: usize, timeout: Duration) -> Self {
        Self {
            starts_at,
            base_ns: Self::base_nanos(timeout),
            exp_growth: None,
        }
    }

    /// Creates an exponentially growing back-pressure.
    ///
    /// The timeout is `timeout * factor^(msg_count - starts_at)`, so it equals
    /// `timeout` exactly when `msg_count == starts_at`.
    ///
    /// # Panics
    ///
    /// Panics if `factor` is negative, or if `timeout` expressed in
    /// nanoseconds does not fit in a `u64`. A NaN `factor` is not caught by
    /// the negativity check.
    pub fn exponential(starts_at: usize, timeout: Duration, factor: f32) -> Self {
        // Validate the factor first so the panic order matches the
        // documented contract.
        if factor < 0.0 {
            panic!("Negative factors not allowed!")
        }

        Self {
            starts_at,
            base_ns: Self::base_nanos(timeout),
            exp_growth: Some(factor),
        }
    }

    /// Creates a back-pressure whose threshold is `usize::MAX` with a zero
    /// base duration.
    ///
    /// Only a message count of exactly `usize::MAX` reaches the threshold,
    /// and it then yields a zero-length timeout.
    pub fn disabled() -> Self {
        Self {
            starts_at: usize::MAX,
            base_ns: 0,
            exp_growth: None,
        }
    }

    /// Returns the timeout for `msg_count` messages, or `None` while
    /// `msg_count` is below the configured threshold.
    ///
    /// Both growth modes saturate instead of overflowing: the linear result
    /// is clamped to `u64::MAX` nanoseconds, and the exponential result is
    /// clamped by the float-to-integer conversion.
    pub(crate) fn get_timeout(&self, msg_count: usize) -> Option<Duration> {
        // `checked_sub` doubles as the threshold test: it yields `None`
        // exactly when `msg_count < starts_at`.
        let excess = msg_count.checked_sub(self.starts_at)?;

        let ns = match self.exp_growth {
            Some(factor) => {
                // `powi` takes an `i32`; clamp very large differences.
                let exponent: i32 = excess.try_into().unwrap_or(i32::MAX);
                let mult = f64::from(factor).powi(exponent);
                // Float-to-int `as` casts saturate, so huge products clamp
                // to `u64::MAX`.
                (self.base_ns as f64 * mult) as u64
            }
            None => {
                // Widen before adding one: `excess + 1` in `usize` overflows
                // when `excess == usize::MAX`.
                let excess: u64 = excess.try_into().unwrap_or(u64::MAX);
                self.base_ns.saturating_mul(excess.saturating_add(1))
            }
        };

        Some(Duration::from_nanos(ns))
    }

    /// Converts the base timeout to whole nanoseconds stored in a `u64`.
    fn base_nanos(timeout: Duration) -> u64 {
        timeout
            .as_nanos()
            .try_into()
            .expect("Base duration > 213_503 days")
    }
}
238
239impl Default for BackPressure {
240 fn default() -> Self {
241 Self::exponential(5, Duration::from_nanos(25), 1.3)
242 }
243}
244
#[cfg(test)]
mod test {
    use super::*;
    use std::time::Duration;

    /// Linear back-pressure yields nothing below the threshold and the base
    /// timeout exactly at the threshold.
    #[test]
    fn backpressure_linear_start_at() {
        let one_sec = Duration::from_secs(1);
        let pressure = BackPressure::linear(10, one_sec);

        assert_eq!(pressure.get_timeout(9), None);
        assert_eq!(pressure.get_timeout(10), Some(one_sec));
    }

    /// Exponential back-pressure yields nothing below the threshold and the
    /// base timeout exactly at the threshold.
    #[test]
    fn backpressure_exponential_start_at() {
        let pressure = BackPressure::exponential(10, Duration::from_secs(1), 1.1);

        assert_eq!(pressure.get_timeout(9), None);
        assert_eq!(
            pressure.get_timeout(10),
            Some(Duration::from_nanos(1_000_000_000))
        );
    }

    /// Linear back-pressure adds one base timeout per message past the
    /// threshold.
    #[test]
    fn backpressure_linear() {
        let pressure = BackPressure::linear(0, Duration::from_secs(1));
        let cases: &[(usize, u64)] = &[(0, 1), (1, 2), (10, 11)];

        for &(msg_count, expected_secs) in cases {
            assert_eq!(
                pressure.get_timeout(msg_count),
                Some(Duration::from_secs(expected_secs))
            );
        }
    }

    /// Exponential back-pressure multiplies the base timeout by the factor
    /// for every message past the threshold. The expected values reflect
    /// `1.1_f32` widened to `f64`.
    #[test]
    fn backpressure_exponential() {
        let pressure = BackPressure::exponential(0, Duration::from_secs(1), 1.1);
        let cases: &[(usize, u64)] = &[
            (0, 1_000_000_000),
            (1, 1_100_000_023),
            (2, 1_210_000_052),
            (3, 1_331_000_086),
        ];

        for &(msg_count, expected_ns) in cases {
            assert_eq!(
                pressure.get_timeout(msg_count),
                Some(Duration::from_nanos(expected_ns))
            );
        }
    }

    /// Disabled back-pressure only produces a (zero-length) timeout at
    /// `usize::MAX` messages.
    #[test]
    fn disabled() {
        let pressure = BackPressure::disabled();

        for &msg_count in &[0, 9, usize::MAX - 1] {
            assert_eq!(pressure.get_timeout(msg_count), None);
        }
        assert_eq!(
            pressure.get_timeout(usize::MAX),
            Some(Duration::from_nanos(0))
        );
    }
}