Skip to main content

atomr_remote/
router.rs

1//! Remote router config.
2//!
3//! Wraps a local routing strategy so that the routees can be deployed
4//! across a list of remote `Address`es. The local pool decides *which*
5//! routee gets the next message; the `RemoteRouterConfig` decides *where*
6//! that routee lives.
7
8use std::sync::Arc;
9
10use atomr_core::actor::{ActorPath, Address};
11
12use crate::endpoint_manager::EndpointManager;
13
14/// Strategy for distributing routees across the configured `nodes`.
15#[derive(Debug, Clone, Copy, PartialEq, Eq)]
16#[non_exhaustive]
17pub enum RemoteRouterStrategy {
18    /// Round-robin across nodes in declaration order.
19    RoundRobin,
20    /// Hash an arbitrary key onto a node.
21    ConsistentHash,
22    /// Delegate to the configured adaptive picker (e.g. lowest-CPU
23    /// from cluster-metrics' `AdaptiveLoadBalancer`).
24    Adaptive,
25}
26
27/// Pluggable picker for [`RemoteRouterStrategy::Adaptive`]. Receives
28/// the candidate addresses (as strings) and returns the chosen one.
29pub type AdaptivePicker = Arc<dyn Fn(&[String]) -> Option<String> + Send + Sync + 'static>;
30
31#[derive(Clone)]
32pub struct RemoteRouterConfig {
33    pub nodes: Arc<Vec<Address>>,
34    pub strategy: RemoteRouterStrategy,
35    pub endpoint_manager: EndpointManager,
36    counter: Arc<std::sync::atomic::AtomicUsize>,
37    adaptive: Option<AdaptivePicker>,
38}
39
40impl RemoteRouterConfig {
41    pub fn new(
42        nodes: Vec<Address>,
43        strategy: RemoteRouterStrategy,
44        endpoint_manager: EndpointManager,
45    ) -> Self {
46        Self {
47            nodes: Arc::new(nodes),
48            strategy,
49            endpoint_manager,
50            counter: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
51            adaptive: None,
52        }
53    }
54
55    /// Install an adaptive picker for [`RemoteRouterStrategy::Adaptive`].
56    /// Cluster-metrics callers wire `AdaptiveLoadBalancer` here:
57    ///
58    /// ```ignore
59    /// router.with_adaptive_picker(Arc::new(move |cands| {
60    ///     let refs: Vec<&str> = cands.iter().map(String::as_str).collect();
61    ///     lb.pick(&refs).map(str::to_string)
62    /// }));
63    /// ```
64    pub fn with_adaptive_picker(mut self, picker: AdaptivePicker) -> Self {
65        self.adaptive = Some(picker);
66        self
67    }
68
69    /// Pick the next remote node for the message identified by `key`
70    /// (use `None` for round-robin / counter-driven selection).
71    pub fn pick(&self, key: Option<&str>) -> Option<&Address> {
72        if self.nodes.is_empty() {
73            return None;
74        }
75        let i = match (self.strategy, key) {
76            (RemoteRouterStrategy::RoundRobin, _) => {
77                self.counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed) % self.nodes.len()
78            }
79            (RemoteRouterStrategy::ConsistentHash, Some(k)) => fxhash(k) as usize % self.nodes.len(),
80            (RemoteRouterStrategy::ConsistentHash, None) => 0,
81            (RemoteRouterStrategy::Adaptive, _) => {
82                // Delegate to the picker; fall back to round-robin if
83                // no picker is installed or the picker returns None.
84                if let Some(p) = &self.adaptive {
85                    let cands: Vec<String> = self.nodes.iter().map(|a| a.to_string()).collect();
86                    if let Some(chosen) = p(&cands) {
87                        if let Some(idx) = self.nodes.iter().position(|a| a.to_string() == chosen) {
88                            return Some(&self.nodes[idx]);
89                        }
90                    }
91                }
92                self.counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed) % self.nodes.len()
93            }
94        };
95        Some(&self.nodes[i])
96    }
97
98    /// Build a fully-qualified routee path on the picked remote node.
99    pub fn routee_path(&self, base: &str, key: Option<&str>) -> Option<ActorPath> {
100        let addr = self.pick(key)?.clone();
101        let mut path = ActorPath::root(addr).child("user");
102        for seg in base.split('/').filter(|s| !s.is_empty()) {
103            path = path.child(seg);
104        }
105        Some(path)
106    }
107}
108
109/// Cheap non-cryptographic hash used for `ConsistentHash`.
110fn fxhash(s: &str) -> u64 {
111    let mut h: u64 = 0xcbf29ce484222325;
112    for b in s.bytes() {
113        h ^= b as u64;
114        h = h.wrapping_mul(0x100000001b3);
115    }
116    h
117}