atomr_core/routing/
tail_chopping.rs1use std::sync::atomic::{AtomicUsize, Ordering};
16use std::time::Duration;
17
18use crate::actor::ActorRef;
19
20pub struct TailChoppingRouter<M: Send + Clone + 'static> {
21 routees: Vec<ActorRef<M>>,
22 cursor: AtomicUsize,
23 pub interval: Duration,
27 pub within: Duration,
29}
30
31impl<M: Send + Clone + 'static> TailChoppingRouter<M> {
32 pub fn new(routees: Vec<ActorRef<M>>, interval: Duration, within: Duration) -> Self {
33 Self { routees, cursor: AtomicUsize::new(0), interval, within }
34 }
35
36 pub fn routee_count(&self) -> usize {
38 self.routees.len()
39 }
40
41 pub fn next_attempt(&self) -> Option<&ActorRef<M>> {
44 if self.routees.is_empty() {
45 return None;
46 }
47 let idx = self.cursor.fetch_add(1, Ordering::Relaxed) % self.routees.len();
48 Some(&self.routees[idx])
49 }
50
51 pub fn max_attempts(&self) -> usize {
53 if self.interval.is_zero() {
54 self.routees.len()
55 } else {
56 let nanos = self.within.as_nanos();
58 let step = self.interval.as_nanos().max(1);
59 nanos.div_ceil(step) as usize
60 }
61 .min(self.routees.len().max(1))
62 }
63}
64
#[cfg(test)]
mod tests {
    use super::*;
    use crate::actor::Inbox;

    /// Four consecutive picks over three routees: neighbours differ and the
    /// fourth pick wraps back to the first routee.
    #[test]
    fn next_attempt_round_robins() {
        let first = Inbox::<u32>::new("a").actor_ref().clone();
        let second = Inbox::<u32>::new("b").actor_ref().clone();
        let third = Inbox::<u32>::new("c").actor_ref().clone();
        let interval = Duration::from_millis(10);
        let within = Duration::from_millis(50);
        let router = TailChoppingRouter::new(vec![first, second, third], interval, within);

        let picked: Vec<_> =
            (0..4).map(|_| router.next_attempt().unwrap().path().clone()).collect();

        assert_ne!(picked[0], picked[1]);
        assert_ne!(picked[1], picked[2]);
        assert_eq!(picked[0], picked[3]);
    }

    /// A router without routees never yields a target and reports a count of zero.
    #[test]
    fn empty_router_has_no_next_attempt() {
        let interval = Duration::from_millis(10);
        let within = Duration::from_millis(50);
        let router = TailChoppingRouter::<u32>::new(Vec::new(), interval, within);

        assert!(router.next_attempt().is_none());
        assert_eq!(router.routee_count(), 0);
    }

    /// 100 ms split into 20 ms steps gives five attempts, below the ten routees available.
    #[test]
    fn max_attempts_respects_interval_and_within() {
        let target = Inbox::<u32>::new("x").actor_ref().clone();
        let router = TailChoppingRouter::new(
            vec![target; 10],
            Duration::from_millis(20),
            Duration::from_millis(100),
        );

        assert_eq!(router.max_attempts(), 5);
    }

    /// With a zero interval the budget equals the number of routees.
    #[test]
    fn zero_interval_is_scatter_gather() {
        let target = Inbox::<u32>::new("x").actor_ref().clone();
        let router =
            TailChoppingRouter::new(vec![target; 4], Duration::ZERO, Duration::from_millis(50));

        assert_eq!(router.max_attempts(), 4);
    }
}