1use std::sync::Arc;
9
10use atomr_core::actor::{ActorPath, Address};
11
12use crate::endpoint_manager::EndpointManager;
13
14#[derive(Debug, Clone, Copy, PartialEq, Eq)]
16#[non_exhaustive]
17pub enum RemoteRouterStrategy {
18 RoundRobin,
20 ConsistentHash,
22 Adaptive,
25}
26
27pub type AdaptivePicker = Arc<dyn Fn(&[String]) -> Option<String> + Send + Sync + 'static>;
30
31#[derive(Clone)]
32pub struct RemoteRouterConfig {
33 pub nodes: Arc<Vec<Address>>,
34 pub strategy: RemoteRouterStrategy,
35 pub endpoint_manager: EndpointManager,
36 counter: Arc<std::sync::atomic::AtomicUsize>,
37 adaptive: Option<AdaptivePicker>,
38}
39
40impl RemoteRouterConfig {
41 pub fn new(
42 nodes: Vec<Address>,
43 strategy: RemoteRouterStrategy,
44 endpoint_manager: EndpointManager,
45 ) -> Self {
46 Self {
47 nodes: Arc::new(nodes),
48 strategy,
49 endpoint_manager,
50 counter: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
51 adaptive: None,
52 }
53 }
54
55 pub fn with_adaptive_picker(mut self, picker: AdaptivePicker) -> Self {
65 self.adaptive = Some(picker);
66 self
67 }
68
69 pub fn pick(&self, key: Option<&str>) -> Option<&Address> {
72 if self.nodes.is_empty() {
73 return None;
74 }
75 let i = match (self.strategy, key) {
76 (RemoteRouterStrategy::RoundRobin, _) => {
77 self.counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed) % self.nodes.len()
78 }
79 (RemoteRouterStrategy::ConsistentHash, Some(k)) => fxhash(k) as usize % self.nodes.len(),
80 (RemoteRouterStrategy::ConsistentHash, None) => 0,
81 (RemoteRouterStrategy::Adaptive, _) => {
82 if let Some(p) = &self.adaptive {
85 let cands: Vec<String> = self.nodes.iter().map(|a| a.to_string()).collect();
86 if let Some(chosen) = p(&cands) {
87 if let Some(idx) = self.nodes.iter().position(|a| a.to_string() == chosen) {
88 return Some(&self.nodes[idx]);
89 }
90 }
91 }
92 self.counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed) % self.nodes.len()
93 }
94 };
95 Some(&self.nodes[i])
96 }
97
98 pub fn routee_path(&self, base: &str, key: Option<&str>) -> Option<ActorPath> {
100 let addr = self.pick(key)?.clone();
101 let mut path = ActorPath::root(addr).child("user");
102 for seg in base.split('/').filter(|s| !s.is_empty()) {
103 path = path.child(seg);
104 }
105 Some(path)
106 }
107}
108
109fn fxhash(s: &str) -> u64 {
111 let mut h: u64 = 0xcbf29ce484222325;
112 for b in s.bytes() {
113 h ^= b as u64;
114 h = h.wrapping_mul(0x100000001b3);
115 }
116 h
117}