actor12/
handler.rs

1use std::future::Future;
2use std::ops::Deref;
3use std::ops::DerefMut;
4use std::sync::Arc;
5
6use downcast_rs::DowncastSync;
7use take_once::TakeOnce;
8use tokio::sync::oneshot::error::RecvError;
9
10use crate::actor::Actor;
11use crate::actor::ActorContext;
12use crate::actor::SyncTrait;
13use crate::error::ActorError;
14use crate::error::ActorSendError;
15use crate::error::FromError;
16use crate::link::ActorLike;
17
18pub trait ActorReply: DowncastSync {
19    fn is_async(&self) -> bool;
20}
21
22downcast_rs::impl_downcast!(sync ActorReply);
23
24impl<T> ActorReply for anyhow::Result<T>
25where
26    T: Send + Sync + 'static,
27{
28    fn is_async(&self) -> bool {
29        match self {
30            Ok(_) => false,
31            Err(e) => match e.downcast_ref::<ActorError>() {
32                Some(err) => matches!(err, ActorError::AsyncReply),
33                None => false,
34            },
35        }
36    }
37}
38
39pub trait Handler<M>: ActorLike
40where
41    M: SyncTrait,
42{
43    type Reply: Send
44        + Sync
45        + 'static
46        + FromError<ActorSendError<Self>>
47        + FromError<RecvError>
48        + FromError<ActorError>
49        + ActorReply;
50
51    fn handle<'a>(
52        &'a mut self,
53        ctx: Call<'a, Self, Self::Reply>,
54        ev: M,
55    ) -> impl Future<Output = Self::Reply> + use<'a, M, Self> + Send;
56}
57
58pub struct Exec<'a, A: ActorLike> {
59    pub(crate) ctx: &'a mut ActorContext<A>,
60}
61
62pub struct Call<'a, A: ActorLike, R> {
63    pub(crate) reply: Arc<TakeOnce<tokio::sync::oneshot::Sender<R>>>,
64    pub ctx: Exec<'a, A>,
65}
66
67impl<'a, A, R> Deref for Call<'a, A, R>
68where
69    A: Actor,
70{
71    type Target = Exec<'a, A>;
72
73    fn deref(&self) -> &Self::Target {
74        &self.ctx
75    }
76}
77
78impl<'a, A, R> DerefMut for Call<'a, A, R>
79where
80    A: Actor,
81{
82    fn deref_mut(&mut self) -> &mut Self::Target {
83        &mut self.ctx
84    }
85}
86
87impl<A, R> Call<'_, A, R>
88where
89    A: Actor,
90    R: Send + Sync + 'static,
91    R: FromError<ActorError> + ActorReply,
92{
93    pub fn take_reply(&mut self) -> tokio::sync::oneshot::Sender<R> {
94        self.reply.take().unwrap()
95    }
96
97    pub fn reply_async<F>(&mut self, future: F) -> R
98    where
99        F: Future<Output = R> + Send + 'static,
100    {
101        let reply = self.reply.take();
102        let Some(reply) = reply else {
103            return R::from_err(ActorError::ReplyTaken);
104        };
105
106        self.ctx.spawn(async move {
107            let result = future.await;
108            let _ = reply.send(result);
109        });
110
111        return R::from_err(ActorError::AsyncReply);
112    }
113}
114
115impl<'a, A> Exec<'a, A>
116where
117    A: Actor,
118{
119    pub fn new(ctx: &'a mut ActorContext<A>) -> Self {
120        Self { ctx }
121    }
122}
123
124impl<'a, A> Deref for Exec<'a, A>
125where
126    A: Actor,
127{
128    type Target = ActorContext<A>;
129
130    fn deref(&self) -> &Self::Target {
131        self.ctx
132    }
133}
134
135impl<'a, A> DerefMut for Exec<'a, A>
136where
137    A: Actor,
138{
139    fn deref_mut(&mut self) -> &mut Self::Target {
140        self.ctx
141    }
142}