use std::collections::HashMap;
use std::collections::VecDeque;
use std::marker::PhantomData;
use crate::factory::worker::WorkerProperties;
use crate::factory::Job;
use crate::factory::JobKey;
use crate::factory::WorkerId;
use crate::ActorProcessingErr;
use crate::Message;
use crate::State;
/// User-supplied strategy that maps a job key onto a `usize` for a pool of
/// `worker_count` workers.
///
/// Implementors must be `Send + Sync`; the key type must be
/// `Send + Sync + 'static`.
pub trait CustomHashFunction<TKey: Send + Sync + 'static>: Send + Sync {
    /// Compute the hash for `key`, given the number of workers in the pool.
    ///
    /// [`CustomRouting`] uses the returned value directly as the target
    /// worker's id.
    fn hash(&self, key: &TKey, worker_count: usize) -> usize;
}
/// Outcome of a call to [`Router::route_message`].
///
/// The non-`Handled` variants give the job back to the caller so that it is
/// not lost.
#[derive(Debug)]
pub enum RouteResult<TKey: JobKey, TMsg: Message> {
    /// The router took care of the job; nothing is handed back.
    Handled,
    /// The job was not dispatched and is returned so it can be backlogged.
    Backlog(Job<TKey, TMsg>),
    /// The job is returned to the caller, tagged as rate limited.
    RateLimited(Job<TKey, TMsg>),
}
/// Strategy that decides which worker in a pool receives a given job.
pub trait Router<TKey: JobKey, TMsg: Message>: State {
    /// Route `job` into `worker_pool`.
    ///
    /// * `pool_size` - number of workers the caller considers part of the pool.
    /// * `worker_hint` - optional worker id the caller would prefer.
    /// * `worker_pool` - the workers, keyed by id.
    ///
    /// Returns a [`RouteResult`] describing what happened to the job, or an
    /// error if one occurred while routing.
    fn route_message(
        &mut self,
        job: Job<TKey, TMsg>,
        pool_size: usize,
        worker_hint: Option<WorkerId>,
        worker_pool: &mut HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
    ) -> Result<RouteResult<TKey, TMsg>, ActorProcessingErr>;

    /// Pick the worker that should receive `job` without enqueueing it.
    ///
    /// Takes `&mut self` because implementations may update their own
    /// bookkeeping while choosing. Returns `None` when no worker is selected.
    fn choose_target_worker(
        &mut self,
        job: &Job<TKey, TMsg>,
        pool_size: usize,
        worker_hint: Option<WorkerId>,
        worker_pool: &HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
    ) -> Option<WorkerId>;

    /// Whether this routing mode is a factory-queueing mode.
    fn is_factory_queueing(&self) -> bool;

    /// Notification that worker `_wid` changed availability.
    ///
    /// The default implementation ignores the notification.
    fn on_worker_availability_change(&mut self, _wid: WorkerId, _available: bool) {}
}
// Generates a stateless routing-mode type named `$name`, documented with
// `$summary`, together with its `Default` implementation. The generated
// struct only carries `PhantomData` markers for the key and message types.
macro_rules! impl_routing_mode {
    ($name:ident, $summary:expr) => {
        #[doc = $summary]
        #[derive(Debug)]
        pub struct $name<TKey: JobKey, TMsg: Message> {
            _key: PhantomData<fn() -> TKey>,
            _msg: PhantomData<fn() -> TMsg>,
        }

        impl<TKey: JobKey, TMsg: Message> Default for $name<TKey, TMsg> {
            fn default() -> Self {
                $name {
                    _key: PhantomData,
                    _msg: PhantomData,
                }
            }
        }
    };
}
// Declares `KeyPersistentRouting`; the string below becomes its rustdoc.
impl_routing_mode! {KeyPersistentRouting, "Factory will select worker by hashing the job's key.
Workers will have jobs placed into their incoming message queues."}
/// Key-hash routing: the target is the hinted worker when a hint is given,
/// otherwise the worker id derived from hashing the job's key.
impl<TKey: JobKey, TMsg: Message> Router<TKey, TMsg> for KeyPersistentRouting<TKey, TMsg> {
    /// Enqueue `job` on the chosen worker.
    ///
    /// Always reports [`RouteResult::Handled`] unless enqueueing fails. Note
    /// that if the chosen id is not present in `worker_pool` the job is
    /// dropped and `Handled` is still returned.
    fn route_message(
        &mut self,
        job: Job<TKey, TMsg>,
        pool_size: usize,
        worker_hint: Option<WorkerId>,
        worker_pool: &mut HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
    ) -> Result<RouteResult<TKey, TMsg>, ActorProcessingErr> {
        let target = self.choose_target_worker(&job, pool_size, worker_hint, worker_pool);
        if let Some(wid) = target {
            if let Some(props) = worker_pool.get_mut(&wid) {
                props.enqueue_job(job)?;
            }
        }
        Ok(RouteResult::Handled)
    }

    /// Return the hint verbatim when present (it is not validated against the
    /// pool); otherwise hash the job key with `pool_size` as the maximum.
    fn choose_target_worker(
        &mut self,
        job: &Job<TKey, TMsg>,
        pool_size: usize,
        worker_hint: Option<WorkerId>,
        _worker_pool: &HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
    ) -> Option<WorkerId> {
        let wid = match worker_hint {
            Some(hinted) => hinted,
            None => crate::factory::hash::hash_with_max(&job.key, pool_size),
        };
        Some(wid)
    }

    /// This mode is not a factory-queueing mode.
    fn is_factory_queueing(&self) -> bool {
        false
    }
}
/// Routing mode that reports itself as factory-queueing and hands jobs to
/// available workers, tracking candidates in a FIFO queue.
#[derive(Debug)]
pub struct QueuerRouting<TKey: JobKey, TMsg: Message> {
    _key: PhantomData<fn() -> TKey>,
    _msg: PhantomData<fn() -> TMsg>,
    /// FIFO of worker ids that were reported as available.
    available_workers: VecDeque<WorkerId>,
    /// Indexed by worker id: marks workers currently tracked in
    /// `available_workers`, used to avoid pushing the same id twice.
    worker_in_queue: Vec<bool>,
}
impl<TKey, TMsg> Default for QueuerRouting<TKey, TMsg>
where
TKey: JobKey,
TMsg: Message,
{
fn default() -> Self {
Self {
_key: PhantomData,
_msg: PhantomData,
available_workers: VecDeque::new(),
worker_in_queue: Vec::new(),
}
}
}
/// Queue-based routing: jobs go to the hinted worker when it is available,
/// otherwise to the first available worker popped from the internal FIFO.
impl<TKey, TMsg> Router<TKey, TMsg> for QueuerRouting<TKey, TMsg>
where
    TKey: JobKey,
    TMsg: Message,
{
    /// Enqueue `job` on the chosen worker and report [`RouteResult::Handled`].
    ///
    /// When no worker is chosen (or the chosen id is missing from
    /// `worker_pool`), the job is handed back as [`RouteResult::Backlog`].
    fn route_message(
        &mut self,
        job: Job<TKey, TMsg>,
        pool_size: usize,
        worker_hint: Option<WorkerId>,
        worker_pool: &mut HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
    ) -> Result<RouteResult<TKey, TMsg>, ActorProcessingErr> {
        if let Some(worker) = self
            .choose_target_worker(&job, pool_size, worker_hint, worker_pool)
            .and_then(|wid| worker_pool.get_mut(&wid))
        {
            worker.enqueue_job(job)?;
            Ok(RouteResult::Handled)
        } else {
            Ok(RouteResult::Backlog(job))
        }
    }

    /// Choose a worker for the job.
    ///
    /// 1. The hinted worker, if it exists in the pool and is available.
    /// 2. Otherwise entries are popped from the FIFO until one refers to a
    ///    worker that exists and is available; stale entries are discarded.
    ///
    /// Returns `None` once the FIFO is exhausted.
    fn choose_target_worker(
        &mut self,
        _job: &Job<TKey, TMsg>,
        _pool_size: usize,
        worker_hint: Option<WorkerId>,
        worker_pool: &HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
    ) -> Option<WorkerId> {
        if let Some(worker) = worker_hint.and_then(|worker| worker_pool.get(&worker)) {
            if worker.is_available() {
                // The hinted worker's FIFO entry (if any) is left in place; it
                // is validated again whenever it is eventually popped.
                return worker_hint;
            }
        }
        while let Some(wid) = self.available_workers.pop_front() {
            // The id just left the FIFO, so it may be re-queued later.
            if wid < self.worker_in_queue.len() {
                self.worker_in_queue[wid] = false;
            }
            if let Some(worker) = worker_pool.get(&wid) {
                if worker.is_available() {
                    return Some(wid);
                }
            }
        }
        None
    }

    /// This mode is a factory-queueing mode.
    fn is_factory_queueing(&self) -> bool {
        true
    }

    /// Track a worker that became available.
    ///
    /// Availability is queued at most once per worker. An unavailability
    /// notification deliberately changes nothing: the worker's FIFO entry
    /// cannot be removed cheaply, so it stays queued and is discarded lazily
    /// by `choose_target_worker` if the worker is still unavailable when
    /// popped. Clearing the flag here while the entry is still queued would
    /// let a later availability notification push a duplicate entry, and the
    /// FIFO could then grow without bound.
    fn on_worker_availability_change(&mut self, wid: WorkerId, available: bool) {
        if !available {
            return;
        }
        if wid >= self.worker_in_queue.len() {
            self.worker_in_queue.resize(wid + 1, false);
        }
        if !self.worker_in_queue[wid] {
            self.worker_in_queue[wid] = true;
            self.available_workers.push_back(wid);
        }
    }
}
/// Routing mode that reports itself as factory-queueing and prefers a worker
/// already processing the job's key, falling back to a FIFO of available
/// workers.
#[derive(Debug)]
pub struct StickyQueuerRouting<TKey: JobKey, TMsg: Message> {
    _key: PhantomData<fn() -> TKey>,
    _msg: PhantomData<fn() -> TMsg>,
    /// FIFO of worker ids that were reported as available.
    available_workers: VecDeque<WorkerId>,
    /// Indexed by worker id: marks workers currently tracked in
    /// `available_workers`, used to avoid pushing the same id twice.
    worker_in_queue: Vec<bool>,
}
impl<TKey, TMsg> Default for StickyQueuerRouting<TKey, TMsg>
where
TKey: JobKey,
TMsg: Message,
{
fn default() -> Self {
Self {
_key: PhantomData,
_msg: PhantomData,
available_workers: VecDeque::new(),
worker_in_queue: Vec::new(),
}
}
}
/// Sticky queue-based routing: a job follows the worker that is already
/// processing its key; otherwise it goes to the hinted worker if available,
/// or to the first available worker popped from the internal FIFO.
impl<TKey, TMsg> Router<TKey, TMsg> for StickyQueuerRouting<TKey, TMsg>
where
    TKey: JobKey,
    TMsg: Message,
{
    /// Enqueue `job` on the chosen worker and report [`RouteResult::Handled`].
    ///
    /// When no worker is chosen (or the chosen id is missing from
    /// `worker_pool`), the job is handed back as [`RouteResult::Backlog`].
    fn route_message(
        &mut self,
        job: Job<TKey, TMsg>,
        pool_size: usize,
        worker_hint: Option<WorkerId>,
        worker_pool: &mut HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
    ) -> Result<RouteResult<TKey, TMsg>, ActorProcessingErr> {
        if let Some(worker) = self
            .choose_target_worker(&job, pool_size, worker_hint, worker_pool)
            .and_then(|wid| worker_pool.get_mut(&wid))
        {
            worker.enqueue_job(job)?;
            Ok(RouteResult::Handled)
        } else {
            Ok(RouteResult::Backlog(job))
        }
    }

    /// Choose a worker for the job, in priority order:
    ///
    /// 1. The hinted worker, if it is already processing the job's key.
    /// 2. Any worker in the pool that is already processing the job's key.
    /// 3. The hinted worker, if it is available.
    /// 4. The first FIFO entry that refers to an existing, available worker;
    ///    stale entries are discarded along the way.
    ///
    /// Returns `None` once the FIFO is exhausted.
    fn choose_target_worker(
        &mut self,
        job: &Job<TKey, TMsg>,
        _pool_size: usize,
        worker_hint: Option<WorkerId>,
        worker_pool: &HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
    ) -> Option<WorkerId> {
        // Checked first so a matching hint avoids scanning the whole pool.
        if let Some(worker) = worker_hint.and_then(|worker| worker_pool.get(&worker)) {
            if worker.is_processing_key(&job.key) {
                return worker_hint;
            }
        }
        let sticky_worker = worker_pool
            .iter()
            .find(|(_, worker)| worker.is_processing_key(&job.key))
            .map(|(wid, _)| *wid);
        if sticky_worker.is_some() {
            return sticky_worker;
        }
        if let Some(worker) = worker_hint.and_then(|worker| worker_pool.get(&worker)) {
            if worker.is_available() {
                return worker_hint;
            }
        }
        while let Some(wid) = self.available_workers.pop_front() {
            // The id just left the FIFO, so it may be re-queued later.
            if wid < self.worker_in_queue.len() {
                self.worker_in_queue[wid] = false;
            }
            if let Some(worker) = worker_pool.get(&wid) {
                if worker.is_available() {
                    return Some(wid);
                }
            }
        }
        None
    }

    /// This mode is a factory-queueing mode.
    fn is_factory_queueing(&self) -> bool {
        true
    }

    /// Track a worker that became available.
    ///
    /// Availability is queued at most once per worker. An unavailability
    /// notification deliberately changes nothing: the worker's FIFO entry
    /// cannot be removed cheaply, so it stays queued and is discarded lazily
    /// by `choose_target_worker` if the worker is still unavailable when
    /// popped. Clearing the flag here while the entry is still queued would
    /// let a later availability notification push a duplicate entry, and the
    /// FIFO could then grow without bound (the sticky and hint paths never
    /// pop from it).
    fn on_worker_availability_change(&mut self, wid: WorkerId, available: bool) {
        if !available {
            return;
        }
        if wid >= self.worker_in_queue.len() {
            self.worker_in_queue.resize(wid + 1, false);
        }
        if !self.worker_in_queue[wid] {
            self.worker_in_queue[wid] = true;
            self.available_workers.push_back(wid);
        }
    }
}
/// Routing mode that cycles through worker ids in increasing order, wrapping
/// back to zero at the pool size.
#[derive(Debug)]
pub struct RoundRobinRouting<TKey: JobKey, TMsg: Message> {
    _key: PhantomData<fn() -> TKey>,
    _msg: PhantomData<fn() -> TMsg>,
    /// Worker id produced by the most recent round-robin step.
    last_worker: WorkerId,
}
impl<TKey, TMsg> Default for RoundRobinRouting<TKey, TMsg>
where
TKey: JobKey,
TMsg: Message,
{
fn default() -> Self {
Self {
_key: PhantomData,
_msg: PhantomData,
last_worker: 0,
}
}
}
/// Round-robin routing: an available hinted worker wins; otherwise the next
/// worker id in the cycle is used.
impl<TKey: JobKey, TMsg: Message> Router<TKey, TMsg> for RoundRobinRouting<TKey, TMsg> {
    /// Enqueue `job` on the chosen worker.
    ///
    /// Always reports [`RouteResult::Handled`] unless enqueueing fails. Note
    /// that if the chosen id is not present in `worker_pool` the job is
    /// dropped and `Handled` is still returned.
    fn route_message(
        &mut self,
        job: Job<TKey, TMsg>,
        pool_size: usize,
        worker_hint: Option<WorkerId>,
        worker_pool: &mut HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
    ) -> Result<RouteResult<TKey, TMsg>, ActorProcessingErr> {
        let target = self.choose_target_worker(&job, pool_size, worker_hint, worker_pool);
        if let Some(wid) = target {
            if let Some(props) = worker_pool.get_mut(&wid) {
                props.enqueue_job(job)?;
            }
        }
        Ok(RouteResult::Handled)
    }

    /// Return the hinted worker when it exists and is available (the cycle
    /// position is left untouched in that case); otherwise advance the cycle
    /// by one, wrapping to `0` once the next id reaches `pool_size`.
    fn choose_target_worker(
        &mut self,
        _job: &Job<TKey, TMsg>,
        pool_size: usize,
        worker_hint: Option<WorkerId>,
        worker_pool: &HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
    ) -> Option<WorkerId> {
        match worker_hint.and_then(|hinted| worker_pool.get(&hinted)) {
            Some(props) if props.is_available() => return worker_hint,
            _ => {}
        }

        let candidate = self.last_worker + 1;
        let next = if candidate < pool_size { candidate } else { 0 };
        self.last_worker = next;
        Some(next)
    }

    /// This mode is not a factory-queueing mode.
    fn is_factory_queueing(&self) -> bool {
        false
    }
}
/// Routing mode that delegates worker selection to a user-provided
/// [`CustomHashFunction`].
#[derive(Debug)]
pub struct CustomRouting<TKey: JobKey, TMsg: Message, THasher: CustomHashFunction<TKey>> {
    _key: PhantomData<fn() -> TKey>,
    _msg: PhantomData<fn() -> TMsg>,
    /// Hash function used to compute the target worker id.
    hasher: THasher,
}
impl<TKey, TMsg, THasher> CustomRouting<TKey, TMsg, THasher>
where
TKey: JobKey,
TMsg: Message,
THasher: CustomHashFunction<TKey>,
{
pub fn new(hasher: THasher) -> Self {
Self {
_key: PhantomData,
_msg: PhantomData,
hasher,
}
}
}
/// Custom routing: the target worker id is whatever the configured hasher
/// returns for the job's key.
impl<TKey: JobKey, TMsg: Message, THasher: CustomHashFunction<TKey> + 'static> Router<TKey, TMsg>
    for CustomRouting<TKey, TMsg, THasher>
{
    /// Enqueue `job` on the chosen worker.
    ///
    /// Always reports [`RouteResult::Handled`] unless enqueueing fails. Note
    /// that if the hasher yields an id that is not present in `worker_pool`
    /// the job is dropped and `Handled` is still returned.
    fn route_message(
        &mut self,
        job: Job<TKey, TMsg>,
        pool_size: usize,
        worker_hint: Option<WorkerId>,
        worker_pool: &mut HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
    ) -> Result<RouteResult<TKey, TMsg>, ActorProcessingErr> {
        let target = self.choose_target_worker(&job, pool_size, worker_hint, worker_pool);
        if let Some(wid) = target {
            if let Some(props) = worker_pool.get_mut(&wid) {
                props.enqueue_job(job)?;
            }
        }
        Ok(RouteResult::Handled)
    }

    /// Ask the hasher for the worker id; the hint and the pool contents are
    /// ignored.
    fn choose_target_worker(
        &mut self,
        job: &Job<TKey, TMsg>,
        pool_size: usize,
        _worker_hint: Option<WorkerId>,
        _worker_pool: &HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
    ) -> Option<WorkerId> {
        Some(self.hasher.hash(&job.key, pool_size))
    }

    /// This mode is not a factory-queueing mode.
    fn is_factory_queueing(&self) -> bool {
        false
    }
}