use std::{future::Future, pin::Pin};
use crate::{Actor, ActorContext, ActorRef, AskError, AsyncAsk, DeadActorResult, Handler, Message};
pub enum RouterStrategyBuilder {
RoundRobin,
}
pub struct RouterBuilder {
max_retry: usize,
max_actors: usize,
strategy: RouterStrategyBuilder,
}
impl RouterBuilder {
pub fn new(max_actors: usize) -> Self {
Self {
max_retry: 0,
max_actors,
strategy: RouterStrategyBuilder::RoundRobin,
}
}
pub fn max_retry(mut self, max: usize) -> Self {
self.max_retry = max;
self
}
pub fn strategy(mut self, strategy: RouterStrategyBuilder) -> Self {
self.strategy = strategy;
self
}
}
enum RouterStrategy {
RoundRobin { index: usize },
}
struct ProxyFail {
actor_index: usize,
}
pub struct Router<A: Actor + Default> {
max_retry: usize,
max_actors: usize,
actors: Vec<ActorRef<A>>,
strategy: RouterStrategy,
}
impl<A: Actor + Default> Router<A> {
pub fn new(builder: RouterBuilder) -> Self {
Self {
max_retry: builder.max_retry,
max_actors: builder.max_actors,
actors: vec![],
strategy: RouterStrategy::RoundRobin { index: 0 },
}
}
}
impl<A: Actor + Default> Actor for Router<A> {
fn on_start(&mut self, ctx: &mut crate::Ctx<Self>)
where
Self: Actor,
{
for _ in 0..self.max_actors {
let actor = A::default();
self.actors.push(ctx.spawn(actor));
}
}
}
impl<A: Actor + Default> Handler<DeadActorResult<A>> for Router<A> {
fn handle(&mut self, message: DeadActorResult<A>, _context: &mut crate::Ctx<Self>) {
match message {
Ok(_actor) => {}
Err(_error) => todo!(),
}
}
}
impl<M, A> AsyncAsk<M> for Router<A>
where
M: Message,
A: Actor + Default + AsyncAsk<M>,
{
type Output = A::Output;
type Future<'a> = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'a>>;
fn handle<'a>(&'a mut self, message: M, _: &mut crate::Ctx<Self>) -> Self::Future<'a> {
match &mut self.strategy {
RouterStrategy::RoundRobin { index } => {
let max_retry = self.max_retry;
let actor_index = *index;
let address = self.actors[actor_index].clone();
*index = (*index + 1) % self.max_actors;
Box::pin(async move {
match address.async_ask(message).await {
Ok(result) => result,
Err(AskError::Closed(msg)) if max_retry > 0 => {
let mut retry_message = msg;
for _ in 0..max_retry {
match address.async_ask(retry_message).await {
Ok(result) => return result,
Err(AskError::Closed(msg)) => retry_message = msg,
Err(AskError::Dropped) => {
break;
}
}
}
todo!()
}
Err(AskError::Dropped) | Err(AskError::Closed(_)) => {
todo!("Implement unexpected exit")
}
}
})
}
}
}
}
impl<A> Handler<ProxyFail> for Router<A>
where
A: Actor + Default,
{
fn handle(&mut self, message: ProxyFail, context: &mut crate::Ctx<Self>) {
tracing::trace!(
actor = A::name(),
index = message.actor_index,
"router actor failed"
);
context.stop();
}
}
#[cfg(test)]
mod tests {
use std::{future::Future, pin::Pin};
use crate::{Actor, AsyncAsk};
use super::{Router, RouterBuilder};
static mut VAL: usize = 0;
#[derive(Debug)]
struct Id(());
struct ChoosenActor {
number: usize,
}
impl Actor for ChoosenActor {}
impl Default for ChoosenActor {
fn default() -> Self {
unsafe { VAL += 1 };
Self {
number: unsafe { VAL },
}
}
}
impl AsyncAsk<Id> for ChoosenActor {
type Output = ChoosenActor;
type Future<'a> = Pin<Box<dyn Future<Output = Self::Output> + Send + Sync + 'a>>;
fn handle<'a>(&'a mut self, _: Id, _: &mut crate::Ctx<Self>) -> Self::Future<'a> {
let number = self.number;
Box::pin(async move { ChoosenActor { number } })
}
}
#[tokio::test]
async fn round_robin_router() {
let builder = RouterBuilder::new(5);
let router = Router::<ChoosenActor>::new(builder);
let address = router.start();
for _ in 0..5 {
for i in 0..5 {
let actor = address.async_ask(Id(())).await.unwrap();
assert_eq!(actor.number, i + 1);
}
}
let _ = address.await;
}
}