Skip to main content

coerce_rt/
worker.rs

1use crate::actor::context::{ActorContext, ActorHandlerContext};
2use crate::actor::message::{Handler, Message};
3use crate::actor::{Actor, ActorRef, ActorRefErr};
4use std::collections::VecDeque;
5
6pub type WorkerRef<W> = ActorRef<Worker<W>>;
7
8pub struct Worker<W: Actor>
9where
10    W: 'static + Sync + Send,
11{
12    state: W,
13    workers: VecDeque<ActorRef<W>>,
14}
15
16impl<W: Actor> Worker<W>
17where
18    W: 'static + Clone + Sync + Send,
19{
20    pub async fn new(
21        state: W,
22        count: usize,
23        context: &mut ActorContext,
24    ) -> Result<WorkerRef<W>, ActorRefErr> {
25        let mut workers = VecDeque::with_capacity(count);
26
27        for i in 0..count {
28            workers.push_back(context.new_anon_actor(state.clone()).await?);
29        }
30
31        Ok(context.new_anon_actor(Worker { state, workers }).await?)
32    }
33}
34
35impl<W: Actor> Actor for Worker<W> where W: 'static + Clone + Sync + Send {}
36
37#[async_trait]
38pub trait WorkerRefExt<W: Actor> {
39    async fn dispatch<M: Message>(&mut self, message: M) -> Result<M::Result, ActorRefErr>
40    where
41        M: 'static + Send + Sync,
42        M::Result: 'static + Send + Sync,
43        W: Handler<M>;
44}
45
46pub struct WorkerMessage<M: Message>
47where
48    M: 'static + Sync + Send,
49{
50    message: M,
51    res_tx: tokio::sync::oneshot::Sender<M::Result>,
52}
53
54impl<M: Message> WorkerMessage<M>
55where
56    M: 'static + Sync + Send,
57    M::Result: 'static + Sync + Send,
58{
59    pub fn new(message: M, res_tx: tokio::sync::oneshot::Sender<M::Result>) -> WorkerMessage<M> {
60        WorkerMessage { message, res_tx }
61    }
62}
63
64impl<M: Message> Message for WorkerMessage<M>
65where
66    M: 'static + Sync + Send,
67    M::Result: 'static + Sync + Send,
68{
69    type Result = ();
70}
71
72#[async_trait]
73impl<W: Actor> WorkerRefExt<W> for ActorRef<Worker<W>>
74where
75    W: 'static + Clone + Sync + Send,
76{
77    async fn dispatch<M: Message>(&mut self, message: M) -> Result<M::Result, ActorRefErr>
78    where
79        M: 'static + Send + Sync,
80        M::Result: 'static + Send + Sync,
81        W: Handler<M>,
82    {
83        let (res_tx, res) = tokio::sync::oneshot::channel();
84        let message = WorkerMessage::new(message, res_tx);
85
86        if let Err(e) = self.send(message).await {
87            Err(e)
88        } else {
89            match res.await {
90                Ok(res) => Ok(res),
91                Err(_e) => {
92                    error!(target: "WorkerRef", "error receiving result");
93                    Err(ActorRefErr::ActorUnavailable)
94                }
95            }
96        }
97    }
98}
99
100#[async_trait]
101impl<W: Actor, M: Message> Handler<WorkerMessage<M>> for Worker<W>
102where
103    W: 'static + Sync + Send,
104    M: 'static + Sync + Send,
105    W: Handler<M>,
106    M::Result: 'static + Sync + Send,
107{
108    async fn handle(&mut self, message: WorkerMessage<M>, ctx: &mut ActorHandlerContext) {
109        if let Some(worker) = self.workers.pop_front() {
110            let mut worker_ref = worker.clone();
111
112            self.workers.push_back(worker);
113
114            // main worker acts as a scheduler, don't block it by handling the task, dispatch it off
115            tokio::spawn(async move {
116                match worker_ref.send(message.message).await {
117                    Ok(res) => {
118                        if message.res_tx.send(res).is_ok() {
119                            trace!(target: "Worker", "sent result successfully");
120                        } else {
121                            error!(target: "Worker", "failed to send result, receiver dropped?");
122                        }
123                    }
124                    Err(e) => error!(target: "Worker", "error sending msg"),
125                }
126            });
127        }
128    }
129}