1use std::future::Future;
2use std::ops::Deref;
3use std::ops::DerefMut;
4use std::sync::Arc;
5
6use downcast_rs::DowncastSync;
7use take_once::TakeOnce;
8use tokio::sync::oneshot::error::RecvError;
9
10use crate::actor::Actor;
11use crate::actor::ActorContext;
12use crate::actor::SyncTrait;
13use crate::error::ActorError;
14use crate::error::ActorSendError;
15use crate::error::FromError;
16use crate::link::ActorLike;
17
18pub trait ActorReply: DowncastSync {
19 fn is_async(&self) -> bool;
20}
21
22downcast_rs::impl_downcast!(sync ActorReply);
23
24impl<T> ActorReply for anyhow::Result<T>
25where
26 T: Send + Sync + 'static,
27{
28 fn is_async(&self) -> bool {
29 match self {
30 Ok(_) => false,
31 Err(e) => match e.downcast_ref::<ActorError>() {
32 Some(err) => matches!(err, ActorError::AsyncReply),
33 None => false,
34 },
35 }
36 }
37}
38
39pub trait Handler<M>: ActorLike
40where
41 M: SyncTrait,
42{
43 type Reply: Send
44 + Sync
45 + 'static
46 + FromError<ActorSendError<Self>>
47 + FromError<RecvError>
48 + FromError<ActorError>
49 + ActorReply;
50
51 fn handle<'a>(
52 &'a mut self,
53 ctx: Call<'a, Self, Self::Reply>,
54 ev: M,
55 ) -> impl Future<Output = Self::Reply> + use<'a, M, Self> + Send;
56}
57
58pub struct Exec<'a, A: ActorLike> {
59 pub(crate) ctx: &'a mut ActorContext<A>,
60}
61
62pub struct Call<'a, A: ActorLike, R> {
63 pub(crate) reply: Arc<TakeOnce<tokio::sync::oneshot::Sender<R>>>,
64 pub ctx: Exec<'a, A>,
65}
66
67impl<'a, A, R> Deref for Call<'a, A, R>
68where
69 A: Actor,
70{
71 type Target = Exec<'a, A>;
72
73 fn deref(&self) -> &Self::Target {
74 &self.ctx
75 }
76}
77
78impl<'a, A, R> DerefMut for Call<'a, A, R>
79where
80 A: Actor,
81{
82 fn deref_mut(&mut self) -> &mut Self::Target {
83 &mut self.ctx
84 }
85}
86
87impl<A, R> Call<'_, A, R>
88where
89 A: Actor,
90 R: Send + Sync + 'static,
91 R: FromError<ActorError> + ActorReply,
92{
93 pub fn take_reply(&mut self) -> tokio::sync::oneshot::Sender<R> {
94 self.reply.take().unwrap()
95 }
96
97 pub fn reply_async<F>(&mut self, future: F) -> R
98 where
99 F: Future<Output = R> + Send + 'static,
100 {
101 let reply = self.reply.take();
102 let Some(reply) = reply else {
103 return R::from_err(ActorError::ReplyTaken);
104 };
105
106 self.ctx.spawn(async move {
107 let result = future.await;
108 let _ = reply.send(result);
109 });
110
111 return R::from_err(ActorError::AsyncReply);
112 }
113}
114
115impl<'a, A> Exec<'a, A>
116where
117 A: Actor,
118{
119 pub fn new(ctx: &'a mut ActorContext<A>) -> Self {
120 Self { ctx }
121 }
122}
123
124impl<'a, A> Deref for Exec<'a, A>
125where
126 A: Actor,
127{
128 type Target = ActorContext<A>;
129
130 fn deref(&self) -> &Self::Target {
131 self.ctx
132 }
133}
134
135impl<'a, A> DerefMut for Exec<'a, A>
136where
137 A: Actor,
138{
139 fn deref_mut(&mut self) -> &mut Self::Target {
140 self.ctx
141 }
142}