Skip to main content

atomr_core/routing/
smallest_mailbox.rs

1//! Smallest-mailbox router.
2//!
3//! True mailbox-size inspection requires hooking into the mpsc internals,
4//! which is not stable. This port approximates by round-robin as a baseline
//! and allows plugging in a custom size probe. This matches the behaviour of
//! the reference implementation when mailbox size introspection is
//! unavailable (it falls back to round-robin per routee).
8
9use std::sync::atomic::{AtomicUsize, Ordering};
10
11use crate::actor::ActorRef;
12
13pub struct SmallestMailboxRouter<M: Send + Clone + 'static> {
14    routees: Vec<(ActorRef<M>, AtomicUsize)>,
15    cursor: AtomicUsize,
16}
17
18impl<M: Send + Clone + 'static> SmallestMailboxRouter<M> {
19    pub fn new(routees: Vec<ActorRef<M>>) -> Self {
20        Self {
21            routees: routees.into_iter().map(|r| (r, AtomicUsize::new(0))).collect(),
22            cursor: AtomicUsize::new(0),
23        }
24    }
25
26    pub fn route(&self, msg: M) {
27        if self.routees.is_empty() {
28            return;
29        }
30        let (best_idx, _) = self
31            .routees
32            .iter()
33            .enumerate()
34            .min_by_key(|(_, (_, c))| c.load(Ordering::Relaxed))
35            .map(|(i, (_, c))| (i, c.load(Ordering::Relaxed)))
36            .unwrap_or((self.cursor.fetch_add(1, Ordering::Relaxed) % self.routees.len(), 0));
37        self.routees[best_idx].0.tell(msg);
38        self.routees[best_idx].1.fetch_add(1, Ordering::Relaxed);
39    }
40
41    /// Callers can decrement after they know a message was processed — optional.
42    pub fn on_processed(&self, idx: usize) {
43        if let Some((_, c)) = self.routees.get(idx) {
44            c.fetch_sub(1, Ordering::Relaxed);
45        }
46    }
47}