tiny_actor/config/
mod.rs

1//! Module containing the configuration for newly spawned actors. See [Config] for more details.
2
3use std::time::Duration;
4
5/// The config used for spawning new processes, made up of a [Link] and a [Capacity]. This
6/// decides whether the actor will be attached/detached and unbounded/bounded.
7///
8/// # Example
9/// ```no_run
10/// use tiny_actor::*;
11/// Config {
12///     link: Link::default(),
13///     capacity: Capacity::default(),
14/// };
15/// ```
16///
17/// # Default
18/// Attached with an abort-timer of `1 sec`.
19///
20/// The capacity is `unbounded`, with `exponential` backoff starting `5 messages` in the inbox
21/// at `25 ns`, with a growth-factor of `1.3`.
22#[derive(Debug, Clone, PartialEq, Default)]
23pub struct Config {
24    pub link: Link,
25    pub capacity: Capacity,
26}
27
28impl Config {
29    pub fn new(link: Link, capacity: Capacity) -> Self {
30        Self { link, capacity }
31    }
32
33    /// A config for a default bounded [Channel]:
34    /// ```no_run
35    /// # use tiny_actor::*;
36    /// # let capacity = 1;
37    /// Config {
38    ///     link: Link::default(),
39    ///     capacity: Capacity::Bounded(capacity),
40    /// };
41    /// ```
42    pub fn bounded(capacity: usize) -> Self {
43        Self {
44            link: Link::default(),
45            capacity: Capacity::Bounded(capacity),
46        }
47    }
48
49    /// A default bounded inbox:
50    /// ```no_run
51    /// # use tiny_actor::*;
52    /// # let capacity = 1;
53    /// Config {
54    ///     link: Link::Detached,
55    ///     capacity: Capacity::default(),
56    /// };
57    /// ```
58    pub fn detached() -> Self {
59        Self {
60            link: Link::Detached,
61            capacity: Capacity::default(),
62        }
63    }
64
65    pub fn attached(timeout: Duration) -> Self {
66        Self {
67            link: Link::Attached(timeout),
68            capacity: Capacity::default(),
69        }
70    }
71}
72
73/// This decides whether the actor is attached or detached. If it is attached, then the
74/// abort-timer is specified here as well.
75///
76/// # Default
77/// Attached with an abort-timer of 1 second.
78#[derive(Clone, Debug, PartialEq, Eq, Hash)]
79pub enum Link {
80    Detached,
81    Attached(Duration),
82}
83
84impl Default for Link {
85    fn default() -> Self {
86        Link::Attached(Duration::from_secs(1))
87    }
88}
89
90impl Link {
91    pub(crate) fn attach(&mut self, mut duration: Duration) -> Option<Duration> {
92        match self {
93            Link::Detached => {
94                *self = Link::Attached(duration);
95                None
96            }
97            Link::Attached(old_duration) => {
98                std::mem::swap(old_duration, &mut duration);
99                Some(duration)
100            }
101        }
102    }
103
104    pub(crate) fn detach(&mut self) -> Option<Duration> {
105        match self {
106            Link::Detached => {
107                *self = Link::Detached;
108                None
109            }
110            Link::Attached(_) => {
111                let mut link = Link::Detached;
112                std::mem::swap(self, &mut link);
113                match link {
114                    Link::Attached(duration) => Some(duration),
115                    Link::Detached => unreachable!(),
116                }
117            }
118        }
119    }
120
121    /// Whether the link is attached.
122    pub fn is_attached(&self) -> bool {
123        matches!(self, Link::Attached(_))
124    }
125}
126
127/// This decides whether the actor is bounded or unbounded. If it is unbounded,
128/// then a [BackPressure] must be given.
129///
130/// # Default
131/// `unbounded`, with `exponential` backoff starting `5 messages` in the inbox
132/// at `25 ns`, with a growth-factor of `1.3`
133#[derive(Debug, Clone, PartialEq)]
134pub enum Capacity {
135    Bounded(usize),
136    Unbounded(BackPressure),
137}
138
139impl Capacity {
140    /// Whether the capacity is bounded.
141    pub fn is_bounded(&self) -> bool {
142        matches!(self, Self::Bounded(_))
143    }
144}
145
146impl Default for Capacity {
147    fn default() -> Self {
148        Capacity::Unbounded(BackPressure::default())
149    }
150}
151
152/// The backpressure mechanism for unbounded inboxes.
153///
154/// # Default
155/// `exponential` backoff starting `5 messages` in the inbox at `25 ns`, with a
156/// growth-factor of `1.3`
157#[derive(Debug, Clone, PartialEq)]
158pub struct BackPressure {
159    starts_at: usize,
160    base_ns: u64,
161    exp_growth: Option<f32>,
162}
163
164impl BackPressure {
165    /// Creates a new linear backpressure.
166    ///
167    /// The timeout is calculated as follows:
168    /// `timeout = timeout * (msg_count - start_at)`
169    ///
170    /// # Panics
171    /// Panics if the `timeout` is bigger than `213_503 days`.
172    pub fn linear(starts_at: usize, timeout: Duration) -> Self {
173        let base_ns = timeout
174            .as_nanos()
175            .try_into()
176            .expect("Base duration > 213_503 days");
177
178        Self {
179            starts_at,
180            base_ns,
181            exp_growth: None,
182        }
183    }
184
185    /// Creates a new linear backpressure.
186    ///
187    /// The timeout is calculated as follows:
188    /// `timeout = timeout * (factor ^ (msg_count - start_at))`
189    ///
190    /// # Panics
191    /// Panics if the `factor` is negative, or if the `timeout` is bigger than `213_503 days.`
192    pub fn exponential(starts_at: usize, timeout: Duration, factor: f32) -> Self {
193        if factor < 0.0 {
194            panic!("Negative factors not allowed!")
195        }
196
197        let base_ns = timeout
198            .as_nanos()
199            .try_into()
200            .expect("Base duration > 213_503 days");
201
202        Self {
203            starts_at,
204            base_ns,
205            exp_growth: Some(factor),
206        }
207    }
208
209    /// Create a new backpressure that is disabled.
210    pub fn disabled() -> Self {
211        Self {
212            starts_at: usize::MAX,
213            base_ns: 0,
214            exp_growth: None,
215        }
216    }
217
218    pub(crate) fn get_timeout(&self, msg_count: usize) -> Option<Duration> {
219        if msg_count < self.starts_at {
220            return None;
221        }
222
223        match self.exp_growth {
224            Some(factor) => {
225                let diff = (msg_count - self.starts_at).try_into().unwrap_or(i32::MAX);
226                let mult = (factor as f64).powi(diff);
227                let ns = self.base_ns as f64 * mult;
228                Some(Duration::from_nanos(ns as u64))
229            }
230            None => {
231                let diff = (msg_count - self.starts_at + 1) as u64;
232                let ns = self.base_ns.saturating_mul(diff);
233                Some(Duration::from_nanos(ns))
234            }
235        }
236    }
237}
238
239impl Default for BackPressure {
240    fn default() -> Self {
241        Self::exponential(5, Duration::from_nanos(25), 1.3)
242    }
243}
244
245#[cfg(test)]
246mod test {
247    use super::*;
248    use std::time::Duration;
249
250    #[test]
251    fn backpressure_linear_start_at() {
252        let bp = BackPressure::linear(10, Duration::from_secs(1));
253
254        assert_eq!(bp.get_timeout(9), None);
255        assert_eq!(bp.get_timeout(10), Some(Duration::from_secs(1)));
256    }
257
258    #[test]
259    fn backpressure_exponential_start_at() {
260        let bp = BackPressure::exponential(10, Duration::from_secs(1), 1.1);
261
262        assert_eq!(bp.get_timeout(9), None);
263        assert_eq!(
264            bp.get_timeout(10),
265            Some(Duration::from_nanos(1_000_000_000))
266        );
267    }
268
269    #[test]
270    fn backpressure_linear() {
271        let bp = BackPressure::linear(0, Duration::from_secs(1));
272
273        assert_eq!(bp.get_timeout(0), Some(Duration::from_secs(1)));
274        assert_eq!(bp.get_timeout(1), Some(Duration::from_secs(2)));
275        assert_eq!(bp.get_timeout(10), Some(Duration::from_secs(11)));
276    }
277
278    #[test]
279    fn backpressure_exponential() {
280        let bp = BackPressure::exponential(0, Duration::from_secs(1), 1.1);
281
282        assert_eq!(bp.get_timeout(0), Some(Duration::from_nanos(1_000_000_000)));
283        assert_eq!(bp.get_timeout(1), Some(Duration::from_nanos(1_100_000_023)));
284        assert_eq!(bp.get_timeout(2), Some(Duration::from_nanos(1_210_000_052)));
285        assert_eq!(bp.get_timeout(3), Some(Duration::from_nanos(1_331_000_086)));
286    }
287
288    #[test]
289    fn disabled() {
290        let bp = BackPressure::disabled();
291
292        assert_eq!(bp.get_timeout(0), None);
293        assert_eq!(bp.get_timeout(9), None);
294        assert_eq!(bp.get_timeout(usize::MAX - 1), None);
295        assert_eq!(bp.get_timeout(usize::MAX), Some(Duration::from_nanos(0)));
296    }
297}