coerce_rt/actor/
message.rs1use crate::actor::context::ActorHandlerContext;
2use crate::actor::Actor;
3
4use std::marker::PhantomData;
5
/// Marker trait for anything that can be sent to an actor.
///
/// The associated [`Message::Result`] type names the value a handler
/// produces in response to the message.
pub trait Message {
    /// Value produced by handling this message.
    type Result;
}
9
10pub(crate) type MessageHandler<A> = Box<dyn ActorMessageHandler<A> + Sync + Send>;
11
12#[async_trait]
13pub trait Handler<Msg: Message + Send + Sync>
14where
15 Msg::Result: Send + Sync,
16{
17 async fn handle(&mut self, message: Msg, ctx: &mut ActorHandlerContext) -> Msg::Result;
18}
19
/// Two-state outcome: a value of type `T`, or an error carrying no payload.
#[derive(Debug)]
pub enum MessageResult<T> {
    /// A value was produced.
    Ok(T),
    /// No value is available; no further detail is carried.
    Error,
}
25
26pub struct ActorMessage<A: Actor, M: Message>
27where
28 A: Handler<M> + Send + Sync,
29 M: Send + Sync,
30 M::Result: 'static + Send + Sync,
31{
32 msg: Option<M>,
33 sender: Option<tokio::sync::oneshot::Sender<M::Result>>,
34 _a: PhantomData<A>,
35}
36
37#[async_trait]
38pub trait ActorMessageHandler<A>: Sync + Send
39where
40 A: Actor + Sync + Send,
41{
42 async fn handle(&mut self, actor: &mut A, ctx: &mut ActorHandlerContext) -> ();
43}
44
45#[async_trait]
46impl<A: 'static + Actor, M: 'static + Message> ActorMessageHandler<A> for ActorMessage<A, M>
47where
48 A: Handler<M> + Send + Sync,
49 M: Send + Sync,
50 M::Result: Send + Sync,
51{
52 async fn handle(&mut self, actor: &mut A, ctx: &mut ActorHandlerContext) -> () {
53 self.handle_msg(actor, ctx).await;
54 }
55}
56
57impl<A: 'static + Actor, M: 'static + Message> ActorMessage<A, M>
58where
59 A: Handler<M> + Send + Sync,
60 M: Send + Sync,
61 M::Result: Send + Sync,
62{
63 pub fn new(
64 msg: M,
65 sender: Option<tokio::sync::oneshot::Sender<M::Result>>,
66 ) -> ActorMessage<A, M> {
67 ActorMessage {
68 msg: Some(msg),
69 sender,
70 _a: PhantomData,
71 }
72 }
73
74 pub async fn handle_msg(&mut self, actor: &mut A, ctx: &mut ActorHandlerContext) {
75 let msg = self.msg.take();
76 let result = actor.handle(msg.unwrap(), ctx).await;
77
78 if let &None = &self.sender {
79 trace!(target: "ActorMessage", "no result consumer, message handling complete");
80 return;
81 }
82
83 let sender = self.sender.take();
84 match sender.unwrap().send(result) {
85 Ok(_) => trace!(target: "ActorMessage", "sent result successfully"),
86 Err(_e) => warn!(target: "ActorMessage", "failed to send result"),
87 }
88 }
89}
90
/// A message that carries a closure taking `&mut A` and returning `R`.
///
/// `A` is recorded only through `PhantomData`, and `R` appears only in the
/// bound on `F`, so that bound has to stay on the struct definition.
pub struct Exec<F, A, R>
where
    F: FnMut(&mut A) -> R,
{
    /// The closure to invoke.
    func: F,
    /// Type-level marker for the closure's argument type.
    _a: PhantomData<A>,
}
98
99impl<F, A, R> Exec<F, A, R>
100where
101 F: (FnMut(&mut A) -> R),
102{
103 pub fn new(f: F) -> Exec<F, A, R> {
104 Exec {
105 func: f,
106 _a: PhantomData,
107 }
108 }
109}
110
111impl<F, A, R> Message for Exec<F, A, R>
112where
113 for<'r> F: (FnMut(&mut A) -> R) + 'static + Send + Sync,
114 R: 'static + Send + Sync,
115{
116 type Result = R;
117}
118
119#[async_trait]
120impl<F, A, R> Handler<Exec<F, A, R>> for A
121where
122 A: 'static + Actor + Sync + Send,
123 F: (FnMut(&mut A) -> R) + 'static + Send + Sync,
124 R: 'static + Send + Sync,
125{
126 async fn handle(&mut self, message: Exec<F, A, R>, _ctx: &mut ActorHandlerContext) -> R {
127 let message = message;
128 let mut func = message.func;
129
130 func(self)
131 }
132}