atomr_core/routing/
smallest_mailbox.rs1use std::sync::atomic::{AtomicUsize, Ordering};
10
11use crate::actor::ActorRef;
12
13pub struct SmallestMailboxRouter<M: Send + Clone + 'static> {
14 routees: Vec<(ActorRef<M>, AtomicUsize)>,
15 cursor: AtomicUsize,
16}
17
18impl<M: Send + Clone + 'static> SmallestMailboxRouter<M> {
19 pub fn new(routees: Vec<ActorRef<M>>) -> Self {
20 Self {
21 routees: routees.into_iter().map(|r| (r, AtomicUsize::new(0))).collect(),
22 cursor: AtomicUsize::new(0),
23 }
24 }
25
26 pub fn route(&self, msg: M) {
27 if self.routees.is_empty() {
28 return;
29 }
30 let (best_idx, _) = self
31 .routees
32 .iter()
33 .enumerate()
34 .min_by_key(|(_, (_, c))| c.load(Ordering::Relaxed))
35 .map(|(i, (_, c))| (i, c.load(Ordering::Relaxed)))
36 .unwrap_or((self.cursor.fetch_add(1, Ordering::Relaxed) % self.routees.len(), 0));
37 self.routees[best_idx].0.tell(msg);
38 self.routees[best_idx].1.fetch_add(1, Ordering::Relaxed);
39 }
40
41 pub fn on_processed(&self, idx: usize) {
43 if let Some((_, c)) = self.routees.get(idx) {
44 c.fetch_sub(1, Ordering::Relaxed);
45 }
46 }
47}