Skip to main content

atomr_core/routing/
tail_chopping.rs

1//! Tail-chopping router.
2//!
//! Semantics: a request is sent to one routee (this module's
//! `next_attempt` hands them out round-robin); if no reply arrives
//! within `interval`, a second routee is tried; and so on until either
//! a reply arrives or `within` is exceeded. Useful for
//! latency-sensitive workloads where a stuck or slow node would
//! otherwise dominate the tail latency of the whole call.
9//!
//! The actual reply path is the caller's responsibility (the caller
//! relies on the `Ask` machinery to pick a winner); this router
//! exposes the per-attempt fan-out + interval policy as a typed
//! schedule.
14
15use std::sync::atomic::{AtomicUsize, Ordering};
16use std::time::Duration;
17
18use crate::actor::ActorRef;
19
20pub struct TailChoppingRouter<M: Send + Clone + 'static> {
21    routees: Vec<ActorRef<M>>,
22    cursor: AtomicUsize,
23    /// How long to wait for the previous attempt's reply before
24    /// firing the next. A `Duration::ZERO` interval makes this
25    /// equivalent to scatter-gather.
26    pub interval: Duration,
27    /// Hard ceiling on how long the caller will wait overall.
28    pub within: Duration,
29}
30
31impl<M: Send + Clone + 'static> TailChoppingRouter<M> {
32    pub fn new(routees: Vec<ActorRef<M>>, interval: Duration, within: Duration) -> Self {
33        Self { routees, cursor: AtomicUsize::new(0), interval, within }
34    }
35
36    /// Number of routees currently registered.
37    pub fn routee_count(&self) -> usize {
38        self.routees.len()
39    }
40
41    /// Pick the next attempt's recipient; returns `None` when the
42    /// router is empty.
43    pub fn next_attempt(&self) -> Option<&ActorRef<M>> {
44        if self.routees.is_empty() {
45            return None;
46        }
47        let idx = self.cursor.fetch_add(1, Ordering::Relaxed) % self.routees.len();
48        Some(&self.routees[idx])
49    }
50
51    /// Maximum number of distinct attempts within `within`.
52    pub fn max_attempts(&self) -> usize {
53        if self.interval.is_zero() {
54            self.routees.len()
55        } else {
56            // ceil(within / interval) capped at routee count.
57            let nanos = self.within.as_nanos();
58            let step = self.interval.as_nanos().max(1);
59            nanos.div_ceil(step) as usize
60        }
61        .min(self.routees.len().max(1))
62    }
63}
64
#[cfg(test)]
mod tests {
    use super::*;
    use crate::actor::Inbox;

    /// Four consecutive picks over three routees: neighbours differ and
    /// the fourth pick wraps back to the first.
    #[test]
    fn next_attempt_round_robins() {
        let routee = |name: &'static str| Inbox::<u32>::new(name).actor_ref().clone();
        let router = TailChoppingRouter::new(
            vec![routee("a"), routee("b"), routee("c")],
            Duration::from_millis(10),
            Duration::from_millis(50),
        );

        let picks: Vec<_> = (0..4)
            .map(|_| router.next_attempt().unwrap().path().clone())
            .collect();

        assert_ne!(picks[0], picks[1]);
        assert_ne!(picks[1], picks[2]);
        // The cursor wraps after the last routee.
        assert_eq!(picks[0], picks[3]);
    }

    /// A router with no routees reports a count of zero and never
    /// yields a recipient.
    #[test]
    fn empty_router_has_no_next_attempt() {
        let interval = Duration::from_millis(10);
        let within = Duration::from_millis(50);
        let router = TailChoppingRouter::<u32>::new(Vec::new(), interval, within);

        assert!(router.next_attempt().is_none());
        assert_eq!(router.routee_count(), 0);
    }

    /// 100ms / 20ms gives 5 attempts, below the routee count of 10.
    #[test]
    fn max_attempts_respects_interval_and_within() {
        let routee = |name: &'static str| Inbox::<u32>::new(name).actor_ref().clone();
        let router = TailChoppingRouter::new(
            vec![routee("x"); 10],
            Duration::from_millis(20),
            Duration::from_millis(100),
        );

        assert_eq!(router.max_attempts(), 5);
    }

    /// With a zero interval every routee counts as an attempt.
    #[test]
    fn zero_interval_is_scatter_gather() {
        let routee = |name: &'static str| Inbox::<u32>::new(name).actor_ref().clone();
        let router =
            TailChoppingRouter::new(vec![routee("x"); 4], Duration::ZERO, Duration::from_millis(50));

        assert_eq!(router.max_attempts(), 4);
    }
}