actix-actor-pool 0.3.2

Simple implementation of Pool for Actix actors
Documentation
use std::sync::{
    Arc,
    atomic::{AtomicUsize, Ordering},
};

use actix::{Addr, Context, Handler, MailboxError, Message, Supervised};

pub struct Pool<A: actix::Actor> {
    pub(crate) workers: Vec<Addr<A>>,
    pub(crate) current: Arc<AtomicUsize>,
}

impl<A> Pool<A>
where
    A: actix::Actor<Context = Context<A>> + Supervised,
{
    pub fn new<F: 'static + Clone + Fn() -> A>(size: usize, init_fn: F) -> Self {
        Self {
            workers: (0..size)
                .map(|_| {
                    let init_fn = init_fn.clone();
                    actix::Supervisor::start(move |_| init_fn())
                })
                .collect(),
            current: Arc::new(AtomicUsize::new(0)),
        }
    }

    fn next_worker(&self) -> Addr<A> {
        let current = self.current.fetch_add(1, Ordering::SeqCst);
        let index = current % self.workers.len();
        self.workers[index].clone()
    }

    pub fn do_send<M>(&self, msg: M)
    where
        A: Handler<M>,
        M: Message + Send + 'static,
        M::Result: Send,
    {
        let actor = self.next_worker();
        actor.do_send(msg);
    }

    pub async fn send<M>(&self, msg: M) -> Result<M::Result, MailboxError>
    where
        A: Handler<M>,
        M: Message + Send + 'static,
        M::Result: Send,
    {
        let actor = self.next_worker();
        actor.send(msg).await
    }
}

#[cfg(test)]
mod tests {
    use actix::{Actor, ActorContext, Message, Supervised};

    use crate::Pool;

    struct TestActor {
        pub name: String,
    }

    impl Actor for TestActor {
        type Context = actix::Context<Self>;
    }
    impl Default for TestActor {
        fn default() -> Self {
            TestActor {
                name: uuid::Uuid::new_v4().to_string(),
            }
        }
    }
    #[derive(Debug, Message)]
    #[rtype(result = "String")]
    struct TestMessage(usize);

    #[derive(Debug, Message)]
    #[rtype(result = "()")]
    struct FailMessage;

    impl actix::Handler<TestMessage> for TestActor {
        type Result = String;
        fn handle(&mut self, msg: TestMessage, _ctx: &mut Self::Context) -> Self::Result {
            format!("Handle test message in test actor: {}-{}", self.name, msg.0)
        }
    }

    impl actix::Handler<FailMessage> for TestActor {
        type Result = ();
        fn handle(&mut self, _msg: FailMessage, ctx: &mut Self::Context) -> Self::Result {
            ctx.stop();
        }
    }

    impl Supervised for TestActor {
        fn restarting(&mut self, _ctx: &mut <Self as Actor>::Context) {
            println!("Restarting actor: {}", self.name);
        }
    }

    #[test]
    fn test_pool() {
        let sys = actix::System::new();

        sys.block_on(async {
            let pool = Pool::new(15, || TestActor::default());

            for i in 0..250 {
                if i % 2 != 0 {
                    let s = pool.send(TestMessage(i)).await;
                    println!("{s:?}");
                    assert!(s.is_ok())
                } else {
                    let _ = pool.send(FailMessage).await;
                }
            }

            actix::System::current().stop();
        });
    }
}