use crate::{
actors::{ActorPath, DynActorRef},
messaging::NetMessage,
utils::IterExtras,
KompactLogger,
};
#[allow(unused_imports)]
use slog::{crit, debug, error, info, trace, warn};
use std::{
fmt,
hash::{BuildHasher, BuildHasherDefault, Hash},
ops::Deref,
sync::atomic::{AtomicUsize, Ordering},
};
pub static DEFAULT_BROADCAST_POLICY: BroadcastRouting = BroadcastRouting;
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct DefaultHasherBuilder;
impl BuildHasher for DefaultHasherBuilder {
type Hasher = std::collections::hash_map::DefaultHasher;
fn build_hasher(&self) -> Self::Hasher {
std::collections::hash_map::DefaultHasher::default()
}
}
pub static DEFAULT_SELECT_POLICY: SenderHashBucketRouting<DefaultHasherBuilder> =
FieldHashBucketRouting {
hasher_builder: DefaultHasherBuilder,
field_extractor: NetMessage::sender,
};
#[derive(Debug)]
pub struct StorePolicy(Box<dyn RoutingPolicy<DynActorRef, NetMessage> + Send + Sync>);
impl StorePolicy {
pub fn new(policy: Box<dyn RoutingPolicy<DynActorRef, NetMessage> + Send + Sync>) -> Self {
StorePolicy(policy)
}
}
impl Clone for StorePolicy {
fn clone(&self) -> Self {
StorePolicy(self.0.boxed_clone())
}
}
impl Deref for StorePolicy {
type Target = dyn RoutingPolicy<DynActorRef, NetMessage>;
fn deref(&self) -> &Self::Target {
self.0.deref()
}
}
impl<R> From<R> for StorePolicy
where
R: RoutingPolicy<DynActorRef, NetMessage> + Send + Sync + 'static,
{
fn from(policy: R) -> Self {
let boxed = Box::new(policy);
Self::new(boxed)
}
}
#[derive(Debug)]
pub struct RoutingGroup<'a> {
members: Vec<&'a DynActorRef>,
policy: &'a dyn RoutingPolicy<DynActorRef, NetMessage>,
}
impl<'a> RoutingGroup<'a> {
pub fn new(
members: Vec<&'a DynActorRef>,
policy: &'a dyn RoutingPolicy<DynActorRef, NetMessage>,
) -> Self {
RoutingGroup { members, policy }
}
pub fn route(&self, msg: NetMessage, logger: &KompactLogger) {
let members: &[&DynActorRef] = &self.members;
self.policy.route(members, msg, logger);
}
}
pub trait RoutingPolicy<Ref, M>: fmt::Debug {
fn route(&self, members: &[&Ref], msg: M, logger: &KompactLogger);
fn boxed_clone(&self) -> Box<dyn RoutingPolicy<Ref, M> + Send + Sync>;
fn broadcast(&self) -> Option<&(dyn RoutingPolicy<Ref, M> + Send + Sync)>;
fn select(&self) -> Option<&(dyn RoutingPolicy<Ref, M> + Send + Sync)>;
}
#[derive(Debug)]
pub struct RoundRobinRouting {
offset: AtomicUsize,
}
impl RoundRobinRouting {
pub fn new() -> Self {
RoundRobinRouting {
offset: AtomicUsize::new(0),
}
}
pub fn get_and_increment_index(&self, length: usize) -> usize {
self.offset.fetch_add(1, Ordering::Relaxed) % length
}
}
impl Default for RoundRobinRouting {
fn default() -> Self {
Self::new()
}
}
impl RoutingPolicy<DynActorRef, NetMessage> for RoundRobinRouting {
fn route(&self, members: &[&DynActorRef], msg: NetMessage, logger: &KompactLogger) {
let index = self.get_and_increment_index(members.len());
trace!(logger, "Routing msg to member at index={}", index);
members[index].tell(msg);
}
fn boxed_clone(&self) -> Box<dyn RoutingPolicy<DynActorRef, NetMessage> + Send + Sync> {
let offset = self.offset.load(Ordering::Relaxed);
let routing = RoundRobinRouting {
offset: AtomicUsize::new(offset),
};
Box::new(routing)
}
fn broadcast(&self) -> Option<&(dyn RoutingPolicy<DynActorRef, NetMessage> + Send + Sync)> {
None
}
fn select(&self) -> Option<&(dyn RoutingPolicy<DynActorRef, NetMessage> + Send + Sync)> {
Some(self)
}
}
pub struct FieldHashBucketRouting<M, T: Hash, H: BuildHasher + Clone> {
hasher_builder: H,
field_extractor: fn(&M) -> &T,
}
impl<M, T: Hash, H: BuildHasher + Clone> Clone for FieldHashBucketRouting<M, T, H> {
fn clone(&self) -> Self {
FieldHashBucketRouting {
hasher_builder: self.hasher_builder.clone(),
field_extractor: self.field_extractor,
}
}
}
impl<M, T: Hash, H: BuildHasher + Clone + Copy> Copy for FieldHashBucketRouting<M, T, H> {}
impl<M, T: Hash, H: BuildHasher + Clone> fmt::Debug for FieldHashBucketRouting<M, T, H> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"FieldHashBucketRouting {{ hasher_builder: {}, field_extractor: {} }}",
std::any::type_name::<H>(),
std::any::type_name::<fn(&M) -> &T>()
)
}
}
impl<M, T: Hash, H: BuildHasher + Clone> FieldHashBucketRouting<M, T, H> {
pub fn new(hasher_builder: H, field_extractor: fn(&M) -> &T) -> Self {
FieldHashBucketRouting {
hasher_builder,
field_extractor,
}
}
pub fn hash_field(&self, field: &T) -> u64 {
self.hasher_builder.hash_one(field)
}
pub fn hash_message(&self, msg: &M) -> u64 {
let field_ref = (self.field_extractor)(msg);
self.hash_field(field_ref)
}
pub fn get_bucket(&self, msg: &M, length: usize) -> usize {
let hash = self.hash_message(msg);
let index = hash % (length as u64);
index as usize
}
}
pub type SenderHashBucketRouting<H> = FieldHashBucketRouting<NetMessage, ActorPath, H>;
pub type SenderDefaultHashBucketRouting =
SenderHashBucketRouting<BuildHasherDefault<std::collections::hash_map::DefaultHasher>>;
impl<H: BuildHasher + Default + Clone> Default for SenderHashBucketRouting<H> {
fn default() -> Self {
let hasher_builder = H::default();
Self::new(hasher_builder, NetMessage::sender)
}
}
impl<H: BuildHasher + Clone + Send + Sync + 'static> RoutingPolicy<DynActorRef, NetMessage>
for SenderHashBucketRouting<H>
{
fn route(&self, members: &[&DynActorRef], msg: NetMessage, logger: &KompactLogger) {
let index = self.get_bucket(&msg, members.len());
trace!(
logger,
"Routing msg with sender={} to member at index={}",
msg.sender,
index
);
members[index].tell(msg);
}
fn boxed_clone(&self) -> Box<dyn RoutingPolicy<DynActorRef, NetMessage> + Send + Sync> {
let cloned: SenderHashBucketRouting<H> = self.clone();
Box::new(cloned)
}
fn broadcast(&self) -> Option<&(dyn RoutingPolicy<DynActorRef, NetMessage> + Send + Sync)> {
None
}
fn select(&self) -> Option<&(dyn RoutingPolicy<DynActorRef, NetMessage> + Send + Sync)> {
Some(self)
}
}
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub struct BroadcastRouting;
impl Default for BroadcastRouting {
fn default() -> Self {
BroadcastRouting
}
}
impl RoutingPolicy<DynActorRef, NetMessage> for BroadcastRouting {
fn route(&self, members: &[&DynActorRef], msg: NetMessage, logger: &KompactLogger) {
trace!(logger, "Trying to broadcast message: {:?}", msg);
let res = members
.iter()
.for_each_try_with(msg, |member, my_msg| member.tell(my_msg));
match res {
Ok(_) => trace!(logger, "The message was broadcast."),
Err(e) => error!(logger, "Could not broadcast a message! Error was: {}", e),
}
}
fn boxed_clone(&self) -> Box<dyn RoutingPolicy<DynActorRef, NetMessage> + Send + Sync> {
Box::new(*self)
}
fn broadcast(&self) -> Option<&(dyn RoutingPolicy<DynActorRef, NetMessage> + Send + Sync)> {
Some(self)
}
fn select(&self) -> Option<&(dyn RoutingPolicy<DynActorRef, NetMessage> + Send + Sync)> {
None
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{prelude::*, routing::test_helpers::*};
const GROUP_SIZE: usize = 3;
const NUM_MESSAGES: usize = 30;
const SLEEP_TIME: Duration = Duration::from_millis(1000);
#[test]
fn router_debug() {
{
let router = RoundRobinRouting::default();
println!("Router: {:?}", router);
let group = RoutingGroup::new(Vec::new(), &router);
println!("Group: {:?}", group);
}
{
let router = SenderDefaultHashBucketRouting::default();
println!("Router: {:?}", router);
let group = RoutingGroup::new(Vec::new(), &router);
println!("Group: {:?}", group);
}
{
let router = BroadcastRouting;
println!("Router: {:?}", router);
let group = RoutingGroup::new(Vec::new(), &router);
println!("Group: {:?}", group);
}
}
#[test]
fn round_robin_routing() {
let system = KompactConfig::default().build().expect("system");
let receivers: Vec<Arc<Component<ReceiverComponent>>> = (0..GROUP_SIZE)
.map(|_i| system.create(ReceiverComponent::default))
.collect();
let receiver_refs: Vec<DynActorRef> =
receivers.iter().map(|c| c.actor_ref().dyn_ref()).collect();
receivers.iter().for_each(|c| system.start(c));
let router = RoundRobinRouting::default();
let group = RoutingGroup::new(receiver_refs.iter().collect(), &router);
let group_ref: ActorPath = NamedPath::with_system(
system.system_path(),
vec!["routing_group".to_string(), "?".to_string()],
)
.into();
let source_ref = system.deadletter_path();
let msg = NetMessage::with_box(
CountMe::SER_ID,
source_ref.clone(),
group_ref.clone(),
Box::new(CountMe),
);
group.route(msg, system.logger());
std::thread::sleep(SLEEP_TIME);
assert_eq!(1, total_count(&receivers));
let counts = individual_count(&receivers);
assert_eq!(1, counts[0]);
assert_eq!(0, counts[1]);
assert_eq!(0, counts[2]);
let msg2 = NetMessage::with_box(
CountMe::SER_ID,
source_ref.clone(),
group_ref.clone(),
Box::new(CountMe),
);
group.route(msg2, system.logger());
let msg3 = NetMessage::with_box(
CountMe::SER_ID,
source_ref.clone(),
group_ref.clone(),
Box::new(CountMe),
);
group.route(msg3, system.logger());
std::thread::sleep(SLEEP_TIME);
assert_eq!(3, total_count(&receivers));
let counts = individual_count(&receivers);
assert_eq!(1, counts[0]);
assert_eq!(1, counts[1]);
assert_eq!(1, counts[2]);
let msg4 = NetMessage::with_box(
CountMe::SER_ID,
source_ref.clone(),
group_ref.clone(),
Box::new(CountMe),
);
group.route(msg4, system.logger());
std::thread::sleep(SLEEP_TIME);
assert_eq!(4, total_count(&receivers));
let counts = individual_count(&receivers);
assert_eq!(2, counts[0]);
assert_eq!(1, counts[1]);
assert_eq!(1, counts[2]);
for _i in 0..NUM_MESSAGES {
let msg = NetMessage::with_box(
CountMe::SER_ID,
source_ref.clone(),
group_ref.clone(),
Box::new(CountMe),
);
group.route(msg, system.logger());
}
std::thread::sleep(SLEEP_TIME);
assert_eq!(NUM_MESSAGES + 4, total_count(&receivers));
system.shutdown().expect("shutdown");
}
#[test]
fn sender_hash_routing() {
let system = KompactConfig::default().build().expect("system");
let receivers: Vec<Arc<Component<ReceiverComponent>>> = (0..GROUP_SIZE)
.map(|_i| system.create(ReceiverComponent::default))
.collect();
let receiver_refs: Vec<DynActorRef> =
receivers.iter().map(|c| c.actor_ref().dyn_ref()).collect();
receivers.iter().for_each(|c| system.start(c));
let router = SenderDefaultHashBucketRouting::default();
let group = RoutingGroup::new(receiver_refs.iter().collect(), &router);
let group_ref: ActorPath = NamedPath::with_system(
system.system_path(),
vec!["routing_group".to_string(), "?".to_string()],
)
.into();
let source_ref = system.deadletter_path();
let msg = NetMessage::with_box(
CountMe::SER_ID,
source_ref.clone(),
group_ref.clone(),
Box::new(CountMe),
);
group.route(msg, system.logger());
std::thread::sleep(SLEEP_TIME);
assert_eq!(1, total_count(&receivers));
let msg2 = NetMessage::with_box(
CountMe::SER_ID,
source_ref.clone(),
group_ref.clone(),
Box::new(CountMe),
);
group.route(msg2, system.logger());
let msg3 = NetMessage::with_box(
CountMe::SER_ID,
source_ref,
group_ref.clone(),
Box::new(CountMe),
);
group.route(msg3, system.logger());
std::thread::sleep(SLEEP_TIME);
assert_eq!(3, total_count(&receivers));
let counts = individual_count(&receivers);
assert!(counts.iter().any(|&v| v == 3));
let other_source_ref: ActorPath = system
.system_path()
.into_named_with_string("othersource")
.expect("actor path")
.into();
let msg4 = NetMessage::with_box(
CountMe::SER_ID,
other_source_ref.clone(),
group_ref.clone(),
Box::new(CountMe),
);
group.route(msg4, system.logger());
std::thread::sleep(SLEEP_TIME);
assert_eq!(4, total_count(&receivers));
let counts = individual_count(&receivers);
assert!(counts.iter().any(|&v| v == 3));
assert!(counts.iter().any(|&v| v == 1));
for _i in 0..NUM_MESSAGES {
let msg = NetMessage::with_box(
CountMe::SER_ID,
other_source_ref.clone(),
group_ref.clone(),
Box::new(CountMe),
);
group.route(msg, system.logger());
}
std::thread::sleep(SLEEP_TIME);
assert_eq!(NUM_MESSAGES + 4, total_count(&receivers));
let counts = individual_count(&receivers);
assert!(counts.iter().any(|&v| v == 3));
assert!(counts.iter().any(|&v| v == NUM_MESSAGES + 1));
system.shutdown().expect("shutdown");
}
#[test]
fn broadcast_routing() {
let system = KompactConfig::default().build().expect("system");
let receivers: Vec<Arc<Component<ReceiverComponent>>> = (0..GROUP_SIZE)
.map(|_i| system.create(ReceiverComponent::default))
.collect();
let receiver_refs: Vec<DynActorRef> =
receivers.iter().map(|c| c.actor_ref().dyn_ref()).collect();
receivers.iter().for_each(|c| system.start(c));
let router = BroadcastRouting;
let group = RoutingGroup::new(receiver_refs.iter().collect(), &router);
let group_ref: ActorPath = NamedPath::with_system(
system.system_path(),
vec!["routing_group".to_string(), "*".to_string()],
)
.into();
let source_ref = system.deadletter_path();
let msg = NetMessage::with_box(
CountMe::SER_ID,
source_ref.clone(),
group_ref.clone(),
Box::new(CountMe),
);
group.route(msg, system.logger());
std::thread::sleep(SLEEP_TIME);
assert_eq!(3, total_count(&receivers));
let counts = individual_count(&receivers);
assert_eq!(1, counts[0]);
assert_eq!(1, counts[1]);
assert_eq!(1, counts[2]);
let msg2 = NetMessage::with_box(
CountMe::SER_ID,
source_ref.clone(),
group_ref.clone(),
Box::new(CountMe),
);
group.route(msg2, system.logger());
let msg3 = NetMessage::with_box(
CountMe::SER_ID,
source_ref.clone(),
group_ref.clone(),
Box::new(CountMe),
);
group.route(msg3, system.logger());
std::thread::sleep(SLEEP_TIME);
assert_eq!(9, total_count(&receivers));
let counts = individual_count(&receivers);
assert_eq!(3, counts[0]);
assert_eq!(3, counts[1]);
assert_eq!(3, counts[2]);
let msg4 = NetMessage::with_box(
CountMe::SER_ID,
source_ref.clone(),
group_ref.clone(),
Box::new(CountMe),
);
group.route(msg4, system.logger());
std::thread::sleep(SLEEP_TIME);
assert_eq!(12, total_count(&receivers));
let counts = individual_count(&receivers);
assert_eq!(4, counts[0]);
assert_eq!(4, counts[1]);
assert_eq!(4, counts[2]);
for _i in 0..NUM_MESSAGES {
let msg = NetMessage::with_box(
CountMe::SER_ID,
source_ref.clone(),
group_ref.clone(),
Box::new(CountMe),
);
group.route(msg, system.logger());
}
std::thread::sleep(SLEEP_TIME);
assert_eq!((NUM_MESSAGES + 4) * GROUP_SIZE, total_count(&receivers));
let counts = individual_count(&receivers);
assert_eq!(NUM_MESSAGES + 4, counts[0]);
assert_eq!(NUM_MESSAGES + 4, counts[1]);
assert_eq!(NUM_MESSAGES + 4, counts[2]);
system.shutdown().expect("shutdown");
}
}