Skip to main content

coerce_rt/actor/
message.rs

1use crate::actor::context::ActorHandlerContext;
2use crate::actor::Actor;
3
4use std::marker::PhantomData;
5
6pub trait Message {
7    type Result;
8}
9
10pub(crate) type MessageHandler<A> = Box<dyn ActorMessageHandler<A> + Sync + Send>;
11
12#[async_trait]
13pub trait Handler<Msg: Message + Send + Sync>
14where
15    Msg::Result: Send + Sync,
16{
17    async fn handle(&mut self, message: Msg, ctx: &mut ActorHandlerContext) -> Msg::Result;
18}
19
20#[derive(Debug)]
21pub enum MessageResult<T> {
22    Ok(T),
23    Error,
24}
25
26pub struct ActorMessage<A: Actor, M: Message>
27where
28    A: Handler<M> + Send + Sync,
29    M: Send + Sync,
30    M::Result: 'static + Send + Sync,
31{
32    msg: Option<M>,
33    sender: Option<tokio::sync::oneshot::Sender<M::Result>>,
34    _a: PhantomData<A>,
35}
36
37#[async_trait]
38pub trait ActorMessageHandler<A>: Sync + Send
39where
40    A: Actor + Sync + Send,
41{
42    async fn handle(&mut self, actor: &mut A, ctx: &mut ActorHandlerContext) -> ();
43}
44
45#[async_trait]
46impl<A: 'static + Actor, M: 'static + Message> ActorMessageHandler<A> for ActorMessage<A, M>
47where
48    A: Handler<M> + Send + Sync,
49    M: Send + Sync,
50    M::Result: Send + Sync,
51{
52    async fn handle(&mut self, actor: &mut A, ctx: &mut ActorHandlerContext) -> () {
53        self.handle_msg(actor, ctx).await;
54    }
55}
56
57impl<A: 'static + Actor, M: 'static + Message> ActorMessage<A, M>
58where
59    A: Handler<M> + Send + Sync,
60    M: Send + Sync,
61    M::Result: Send + Sync,
62{
63    pub fn new(
64        msg: M,
65        sender: Option<tokio::sync::oneshot::Sender<M::Result>>,
66    ) -> ActorMessage<A, M> {
67        ActorMessage {
68            msg: Some(msg),
69            sender,
70            _a: PhantomData,
71        }
72    }
73
74    pub async fn handle_msg(&mut self, actor: &mut A, ctx: &mut ActorHandlerContext) {
75        let msg = self.msg.take();
76        let result = actor.handle(msg.unwrap(), ctx).await;
77
78        if let &None = &self.sender {
79            trace!(target: "ActorMessage", "no result consumer, message handling complete");
80            return;
81        }
82
83        let sender = self.sender.take();
84        match sender.unwrap().send(result) {
85            Ok(_) => trace!(target: "ActorMessage", "sent result successfully"),
86            Err(_e) => warn!(target: "ActorMessage", "failed to send result"),
87        }
88    }
89}
90
91pub struct Exec<F, A, R>
92where
93    F: (FnMut(&mut A) -> R),
94{
95    func: F,
96    _a: PhantomData<A>,
97}
98
99impl<F, A, R> Exec<F, A, R>
100where
101    F: (FnMut(&mut A) -> R),
102{
103    pub fn new(f: F) -> Exec<F, A, R> {
104        Exec {
105            func: f,
106            _a: PhantomData,
107        }
108    }
109}
110
111impl<F, A, R> Message for Exec<F, A, R>
112where
113    for<'r> F: (FnMut(&mut A) -> R) + 'static + Send + Sync,
114    R: 'static + Send + Sync,
115{
116    type Result = R;
117}
118
119#[async_trait]
120impl<F, A, R> Handler<Exec<F, A, R>> for A
121where
122    A: 'static + Actor + Sync + Send,
123    F: (FnMut(&mut A) -> R) + 'static + Send + Sync,
124    R: 'static + Send + Sync,
125{
126    async fn handle(&mut self, message: Exec<F, A, R>, _ctx: &mut ActorHandlerContext) -> R {
127        let message = message;
128        let mut func = message.func;
129
130        func(self)
131    }
132}