Skip to main content

atomr_core/routing/
resizer.rs

1//! Pool resizer config.,
2//! `DefaultResizer.cs`, `ResizerSpec`.
3//!
4//! A resizer monitors pressure on a pool of routees and decides whether
5//! to grow or shrink it. atomr's pool routers (RoundRobin, Random,
6//! SmallestMailbox, …) accept a [`ResizerConfig`] and implement
7//! [`ResizerConfig::compute_delta`] to advise the parent on how many
8//! routees to add or remove.
9//!
10//! The semantics mirror:
11//!   * `lower_bound` ≤ pool size ≤ `upper_bound`
12//!   * pressure is measured as the count of busy routees
13//!   * if pressure ≥ `pressure_threshold * pool_size` for
14//!     `messages_per_resize` messages, grow by `rampup_rate * pool_size`
15//!     (rounded up, clamped to upper bound)
16//!   * if pressure ≤ `backoff_threshold * pool_size` after a delay,
17//!     shrink by `backoff_rate * pool_size`
18
19use std::time::Duration;
20
21#[derive(Debug, Clone, PartialEq)]
22pub struct ResizerConfig {
23    pub lower_bound: usize,
24    pub upper_bound: usize,
25    /// Fraction of busy routees that triggers growth (0.0..=1.0).
26    pub pressure_threshold: f64,
27    /// Fraction of routees needed to be idle to trigger backoff.
28    pub backoff_threshold: f64,
29    /// Multiplicative ramp-up factor applied to the current size.
30    pub rampup_rate: f64,
31    /// Multiplicative ramp-down factor applied to the current size.
32    pub backoff_rate: f64,
33    /// How many messages must be processed before checking pressure
34    /// again.
35    pub messages_per_resize: u64,
36    /// Idle interval before considering a backoff resize.
37    pub backoff_delay: Duration,
38}
39
40impl Default for ResizerConfig {
41    fn default() -> Self {
42        Self {
43            lower_bound: 1,
44            upper_bound: 10,
45            pressure_threshold: 1.0,
46            backoff_threshold: 0.3,
47            rampup_rate: 0.2,
48            backoff_rate: 0.1,
49            messages_per_resize: 10,
50            backoff_delay: Duration::from_secs(10),
51        }
52    }
53}
54
55#[derive(Debug, Clone, Copy, PartialEq, Eq)]
56pub struct ResizeAdvice {
57    /// Net change to apply: positive grows the pool, negative shrinks
58    /// it, zero leaves it alone.
59    pub delta: i32,
60}
61
62impl ResizerConfig {
63    /// Decide how many routees to add or remove given the current pool
64    /// size and the count of currently-busy routees. Returns the net
65    /// delta clamped to `[lower_bound, upper_bound]`.
66    pub fn compute_delta(&self, current_size: usize, busy: usize) -> ResizeAdvice {
67        if current_size == 0 {
68            // Always grow to at least the lower bound.
69            return ResizeAdvice { delta: self.lower_bound as i32 };
70        }
71        let load = busy as f64 / current_size as f64;
72        let target = if load >= self.pressure_threshold {
73            let grown = current_size as f64 * (1.0 + self.rampup_rate);
74            grown.ceil() as usize
75        } else if load <= self.backoff_threshold {
76            let shrunk = current_size as f64 * (1.0 - self.backoff_rate);
77            shrunk.floor() as usize
78        } else {
79            current_size
80        };
81        let clamped = target.clamp(self.lower_bound, self.upper_bound);
82        ResizeAdvice { delta: clamped as i32 - current_size as i32 }
83    }
84}
85
86#[cfg(test)]
87mod tests {
88    use super::*;
89
90    #[test]
91    fn default_grows_under_pressure() {
92        let r = ResizerConfig::default();
93        let advice = r.compute_delta(2, 2);
94        assert!(advice.delta > 0, "expected growth, got {:?}", advice);
95    }
96
97    #[test]
98    fn default_shrinks_when_idle() {
99        let r = ResizerConfig { lower_bound: 1, upper_bound: 10, backoff_rate: 0.5, ..Default::default() };
100        let advice = r.compute_delta(8, 0);
101        assert!(advice.delta < 0, "expected shrink, got {:?}", advice);
102    }
103
104    #[test]
105    fn clamps_to_upper_bound() {
106        let r = ResizerConfig {
107            lower_bound: 1,
108            upper_bound: 4,
109            rampup_rate: 5.0,
110            pressure_threshold: 0.5,
111            ..Default::default()
112        };
113        let advice = r.compute_delta(3, 3);
114        // Cannot exceed upper_bound (=4) regardless of rampup.
115        assert_eq!(advice.delta, 1);
116    }
117
118    #[test]
119    fn clamps_to_lower_bound() {
120        let r = ResizerConfig {
121            lower_bound: 2,
122            upper_bound: 10,
123            backoff_rate: 0.9,
124            backoff_threshold: 0.5,
125            ..Default::default()
126        };
127        let advice = r.compute_delta(3, 0);
128        assert_eq!(advice.delta, -1);
129    }
130
131    #[test]
132    fn zero_size_grows_to_lower_bound() {
133        let r = ResizerConfig { lower_bound: 3, ..Default::default() };
134        let advice = r.compute_delta(0, 0);
135        assert_eq!(advice.delta, 3);
136    }
137
138    #[test]
139    fn no_change_when_load_in_band() {
140        let r = ResizerConfig { pressure_threshold: 0.9, backoff_threshold: 0.1, ..Default::default() };
141        let advice = r.compute_delta(5, 3);
142        assert_eq!(advice.delta, 0);
143    }
144}