1use crate::actor::context::{ActorContext, ActorHandlerContext};
2use crate::actor::message::{Handler, Message};
3use crate::actor::{Actor, ActorRef, ActorRefErr};
4use std::collections::VecDeque;
5
6pub type WorkerRef<W> = ActorRef<Worker<W>>;
7
8pub struct Worker<W: Actor>
9where
10 W: 'static + Sync + Send,
11{
12 state: W,
13 workers: VecDeque<ActorRef<W>>,
14}
15
16impl<W: Actor> Worker<W>
17where
18 W: 'static + Clone + Sync + Send,
19{
20 pub async fn new(
21 state: W,
22 count: usize,
23 context: &mut ActorContext,
24 ) -> Result<WorkerRef<W>, ActorRefErr> {
25 let mut workers = VecDeque::with_capacity(count);
26
27 for i in 0..count {
28 workers.push_back(context.new_anon_actor(state.clone()).await?);
29 }
30
31 Ok(context.new_anon_actor(Worker { state, workers }).await?)
32 }
33}
34
35impl<W: Actor> Actor for Worker<W> where W: 'static + Clone + Sync + Send {}
36
37#[async_trait]
38pub trait WorkerRefExt<W: Actor> {
39 async fn dispatch<M: Message>(&mut self, message: M) -> Result<M::Result, ActorRefErr>
40 where
41 M: 'static + Send + Sync,
42 M::Result: 'static + Send + Sync,
43 W: Handler<M>;
44}
45
46pub struct WorkerMessage<M: Message>
47where
48 M: 'static + Sync + Send,
49{
50 message: M,
51 res_tx: tokio::sync::oneshot::Sender<M::Result>,
52}
53
54impl<M: Message> WorkerMessage<M>
55where
56 M: 'static + Sync + Send,
57 M::Result: 'static + Sync + Send,
58{
59 pub fn new(message: M, res_tx: tokio::sync::oneshot::Sender<M::Result>) -> WorkerMessage<M> {
60 WorkerMessage { message, res_tx }
61 }
62}
63
64impl<M: Message> Message for WorkerMessage<M>
65where
66 M: 'static + Sync + Send,
67 M::Result: 'static + Sync + Send,
68{
69 type Result = ();
70}
71
72#[async_trait]
73impl<W: Actor> WorkerRefExt<W> for ActorRef<Worker<W>>
74where
75 W: 'static + Clone + Sync + Send,
76{
77 async fn dispatch<M: Message>(&mut self, message: M) -> Result<M::Result, ActorRefErr>
78 where
79 M: 'static + Send + Sync,
80 M::Result: 'static + Send + Sync,
81 W: Handler<M>,
82 {
83 let (res_tx, res) = tokio::sync::oneshot::channel();
84 let message = WorkerMessage::new(message, res_tx);
85
86 if let Err(e) = self.send(message).await {
87 Err(e)
88 } else {
89 match res.await {
90 Ok(res) => Ok(res),
91 Err(_e) => {
92 error!(target: "WorkerRef", "error receiving result");
93 Err(ActorRefErr::ActorUnavailable)
94 }
95 }
96 }
97 }
98}
99
100#[async_trait]
101impl<W: Actor, M: Message> Handler<WorkerMessage<M>> for Worker<W>
102where
103 W: 'static + Sync + Send,
104 M: 'static + Sync + Send,
105 W: Handler<M>,
106 M::Result: 'static + Sync + Send,
107{
108 async fn handle(&mut self, message: WorkerMessage<M>, ctx: &mut ActorHandlerContext) {
109 if let Some(worker) = self.workers.pop_front() {
110 let mut worker_ref = worker.clone();
111
112 self.workers.push_back(worker);
113
114 tokio::spawn(async move {
116 match worker_ref.send(message.message).await {
117 Ok(res) => {
118 if message.res_tx.send(res).is_ok() {
119 trace!(target: "Worker", "sent result successfully");
120 } else {
121 error!(target: "Worker", "failed to send result, receiver dropped?");
122 }
123 }
124 Err(e) => error!(target: "Worker", "error sending msg"),
125 }
126 });
127 }
128 }
129}