1use std::marker::PhantomData;
2use std::ops::ControlFlow;
3use std::sync::Arc;
4
5use catty::{Receiver, Sender};
6use futures_core::future::BoxFuture;
7use futures_util::FutureExt;
8
9use crate::chan::{HasPriority, MessageToAll, MessageToOne, Priority};
10use crate::context::Context;
11use crate::instrumentation::{Instrumentation, Span};
12use crate::{Actor, Handler, Mailbox};
13
14pub trait MessageEnvelope: HasPriority + Send {
21 type Actor;
23
24 fn set_priority(&mut self, new_priority: u32);
25
26 fn start_span(&mut self);
28
29 fn handle(
34 self: Box<Self>,
35 act: &mut Self::Actor,
36 mailbox: Mailbox<Self::Actor>,
37 ) -> (BoxFuture<ControlFlow<(), ()>>, Span);
38}
39
/// Envelope for a message of type `M` addressed to an actor of type `A`, whose handler
/// result of type `R` is passed back through a one-shot channel.
pub struct ReturningEnvelope<A, M, R> {
    /// The message that will be given to the actor's handler.
    message: M,
    /// Sending half of the one-shot channel created in `new`; receives the handler's result.
    result_sender: Sender<R>,
    /// Numeric priority, reported through `HasPriority` as `Priority::Valued`.
    priority: u32,
    /// Marker naming the actor type `A`; no value of type `A` is stored.
    phantom: PhantomData<for<'a> fn(&'a A)>,
    /// Starts out as `Instrumentation::empty()` and is replaced by `start_span`.
    instrumentation: Instrumentation,
}
48
49impl<A, M, R: Send + 'static> ReturningEnvelope<A, M, R> {
50 pub fn new(message: M, priority: u32) -> (Self, Receiver<R>) {
51 let (tx, rx) = catty::oneshot();
52 let envelope = ReturningEnvelope {
53 message,
54 result_sender: tx,
55 priority,
56 phantom: PhantomData,
57 instrumentation: Instrumentation::empty(),
58 };
59
60 (envelope, rx)
61 }
62}
63
64impl<A, M, R> HasPriority for ReturningEnvelope<A, M, R> {
65 fn priority(&self) -> Priority {
66 Priority::Valued(self.priority)
67 }
68}
69
70impl<A> HasPriority for MessageToOne<A> {
71 fn priority(&self) -> Priority {
72 self.as_ref().priority()
73 }
74}
75
76impl<A, M, R> MessageEnvelope for ReturningEnvelope<A, M, R>
77where
78 A: Handler<M, Return = R>,
79 M: Send + 'static,
80 R: Send + 'static,
81{
82 type Actor = A;
83
84 fn set_priority(&mut self, new_priority: u32) {
85 self.priority = new_priority;
86 }
87
88 fn start_span(&mut self) {
89 assert!(self.instrumentation.is_parent_none());
90 self.instrumentation = Instrumentation::started::<A, M>();
91 }
92
93 fn handle(
94 self: Box<Self>,
95 act: &mut Self::Actor,
96 mailbox: Mailbox<Self::Actor>,
97 ) -> (BoxFuture<ControlFlow<(), ()>>, Span) {
98 let Self {
99 message,
100 result_sender,
101 instrumentation,
102 ..
103 } = *self;
104
105 let fut = async move {
106 let mut ctx = Context {
107 running: true,
108 mailbox,
109 };
110 let r = act.handle(message, &mut ctx).await;
111
112 if ctx.running {
113 (r, ControlFlow::Continue(()))
114 } else {
115 (r, ControlFlow::Break(()))
116 }
117 };
118
119 let (fut, span) = instrumentation.apply::<_>(fut);
120
121 let fut = Box::pin(fut.map(move |(r, flow)| {
122 let _ = result_sender.send(r);
124 flow
125 }));
126
127 (fut, span)
128 }
129}
130
131pub trait BroadcastEnvelope: HasPriority + Send + Sync {
133 type Actor;
134
135 fn set_priority(&mut self, new_priority: u32);
136
137 fn start_span(&mut self);
140
141 fn handle(
142 self: Arc<Self>,
143 act: &mut Self::Actor,
144 mailbox: Mailbox<Self::Actor>,
145 ) -> (BoxFuture<ControlFlow<()>>, Span);
146}
147
148impl<A> HasPriority for MessageToAll<A> {
149 fn priority(&self) -> Priority {
150 self.as_ref().priority()
151 }
152}
153
/// Envelope for a message of type `M` that is handled by actors of type `A` through
/// the `BroadcastEnvelope` trait; the handler's return type is `()`.
pub struct BroadcastEnvelopeConcrete<A, M> {
    /// The message; it is cloned each time the envelope is handled.
    message: M,
    /// Numeric priority, reported through `HasPriority` as `Priority::Valued`.
    priority: u32,
    /// Marker naming the actor type `A`; no value of type `A` is stored.
    phantom: PhantomData<for<'a> fn(&'a A)>,
    /// Starts out as `Instrumentation::empty()` and is replaced by `start_span`.
    instrumentation: Instrumentation,
}
160
161impl<A: Actor, M> BroadcastEnvelopeConcrete<A, M> {
162 pub fn new(message: M, priority: u32) -> Self {
163 BroadcastEnvelopeConcrete {
164 message,
165 priority,
166 phantom: PhantomData,
167 instrumentation: Instrumentation::empty(),
168 }
169 }
170}
171
172impl<A, M> BroadcastEnvelope for BroadcastEnvelopeConcrete<A, M>
173where
174 A: Handler<M, Return = ()>,
175 M: Clone + Send + Sync + 'static,
176{
177 type Actor = A;
178
179 fn set_priority(&mut self, new_priority: u32) {
180 self.priority = new_priority;
181 }
182
183 fn start_span(&mut self) {
184 assert!(self.instrumentation.is_parent_none());
185 self.instrumentation = Instrumentation::started::<A, M>();
186 }
187
188 fn handle(
189 self: Arc<Self>,
190 act: &mut Self::Actor,
191 mailbox: Mailbox<Self::Actor>,
192 ) -> (BoxFuture<ControlFlow<(), ()>>, Span) {
193 let (msg, instrumentation) = (self.message.clone(), self.instrumentation.clone());
194 drop(self); let fut = async move {
196 let mut ctx = Context {
197 running: true,
198 mailbox,
199 };
200 act.handle(msg, &mut ctx).await;
201
202 if ctx.running {
203 ControlFlow::Continue(())
204 } else {
205 ControlFlow::Break(())
206 }
207 };
208 let (fut, span) = instrumentation.apply::<_>(fut);
209 (Box::pin(fut), span)
210 }
211}
212
213impl<A, M> HasPriority for BroadcastEnvelopeConcrete<A, M> {
214 fn priority(&self) -> Priority {
215 Priority::Valued(self.priority)
216 }
217}
218
/// Zero-sized envelope marker parameterised by the actor type `A`.
///
/// Only a `PhantomData` marker is stored; no value of type `A` is ever held.
///
/// `Clone`, `Copy` and `Default` are implemented by hand rather than derived. The
/// derive macros would add `A: Clone`, `A: Copy` and `A: Default` bounds to the
/// generated impls, making these traits unavailable for actor types that do not
/// implement them, even though nothing of type `A` needs to be copied or constructed.
pub struct Shutdown<A>(PhantomData<for<'a> fn(&'a A)>);

impl<A> Clone for Shutdown<A> {
    /// Returns a copy of the marker; `A` itself does not need to be `Clone`.
    fn clone(&self) -> Self {
        *self
    }
}

impl<A> Copy for Shutdown<A> {}

impl<A> Default for Shutdown<A> {
    /// Creates the marker; `A` itself does not need to be `Default`.
    fn default() -> Self {
        Shutdown(PhantomData)
    }
}
221
222impl<A> Shutdown<A> {
223 pub fn new() -> Self {
224 Shutdown(PhantomData)
225 }
226
227 pub fn handle() -> (BoxFuture<'static, ControlFlow<()>>, Span) {
228 let fut = Box::pin(async { ControlFlow::Break(()) });
229
230 (fut, Span::none())
231 }
232}
233
impl<A> HasPriority for Shutdown<A> {
    /// Always reports `Priority::Shutdown`; no numeric priority is stored.
    fn priority(&self) -> Priority {
        Priority::Shutdown
    }
}
239
240impl<A> BroadcastEnvelope for Shutdown<A>
241where
242 A: Actor,
243{
244 type Actor = A;
245
246 fn set_priority(&mut self, _: u32) {}
247
248 fn start_span(&mut self) {}
250
251 fn handle(
252 self: Arc<Self>,
253 _act: &mut Self::Actor,
254 _mailbox: Mailbox<Self::Actor>,
255 ) -> (BoxFuture<ControlFlow<()>>, Span) {
256 Self::handle()
257 }
258}