xtra/
envelope.rs

1use std::marker::PhantomData;
2use std::ops::ControlFlow;
3use std::sync::Arc;
4
5use catty::{Receiver, Sender};
6use futures_core::future::BoxFuture;
7use futures_util::FutureExt;
8
9use crate::chan::{HasPriority, MessageToAll, MessageToOne, Priority};
10use crate::context::Context;
11use crate::instrumentation::{Instrumentation, Span};
12use crate::{Actor, Handler, Mailbox};
13
/// A message envelope is a struct that encapsulates a message and its return channel sender (if applicable).
/// Firstly, this allows us to be generic over returning and non-returning messages (as all use the
/// same `handle` method and return the same pinned & boxed future), but almost more importantly it
/// allows us to erase the type of the message when this is in dyn Trait format, thereby being able to
/// use only one channel to send all the kinds of messages that the actor can receive. This does,
/// however, induce a bit of allocation (as envelopes have to be boxed).
pub trait MessageEnvelope: HasPriority + Send {
    /// The type of actor that this envelope carries a message for
    type Actor;

    /// Sets the priority of this envelope to `new_priority`.
    fn set_priority(&mut self, new_priority: u32);

    /// Starts the instrumentation of this message request. This will create the request span.
    fn start_span(&mut self);

    /// Handle the message inside of the box by calling the relevant [`Handler::handle`] method,
    /// returning its result over a return channel if applicable. This also takes `Box<Self>` as the
    /// `self` parameter because `Envelope`s always appear as `Box<dyn Envelope<Actor = ...>>`,
    /// and this allows us to consume the envelope.
    ///
    /// Returns the boxed future that performs the handling, paired with a [`Span`]. In the
    /// [`ReturningEnvelope`] implementation the future resolves to `ControlFlow::Continue(())` if
    /// the context is still marked as running once the handler has returned, and to
    /// `ControlFlow::Break(())` otherwise.
    fn handle(
        self: Box<Self>,
        act: &mut Self::Actor,
        mailbox: Mailbox<Self::Actor>,
    ) -> (BoxFuture<ControlFlow<(), ()>>, Span);
}
39
/// An envelope that returns a result from a message. Constructed by the `AddressExt::do_send` method.
// NOTE(review): the `AddressExt::do_send` call site is not visible from this file - confirm it
// is still the place where this envelope gets constructed.
pub struct ReturningEnvelope<A, M, R> {
    /// The message that is passed to the actor's handler.
    message: M,
    /// Sending half of the oneshot channel over which the handler's return value is delivered.
    result_sender: Sender<R>,
    /// Numeric priority; reported as `Priority::Valued` through `HasPriority`.
    priority: u32,
    /// Ties the envelope to the actor type `A` without storing a value of type `A`.
    phantom: PhantomData<for<'a> fn(&'a A)>,
    /// Instrumentation state: created empty by `new` and replaced by `start_span`.
    instrumentation: Instrumentation,
}
48
49impl<A, M, R: Send + 'static> ReturningEnvelope<A, M, R> {
50    pub fn new(message: M, priority: u32) -> (Self, Receiver<R>) {
51        let (tx, rx) = catty::oneshot();
52        let envelope = ReturningEnvelope {
53            message,
54            result_sender: tx,
55            priority,
56            phantom: PhantomData,
57            instrumentation: Instrumentation::empty(),
58        };
59
60        (envelope, rx)
61    }
62}
63
64impl<A, M, R> HasPriority for ReturningEnvelope<A, M, R> {
65    fn priority(&self) -> Priority {
66        Priority::Valued(self.priority)
67    }
68}
69
70impl<A> HasPriority for MessageToOne<A> {
71    fn priority(&self) -> Priority {
72        self.as_ref().priority()
73    }
74}
75
76impl<A, M, R> MessageEnvelope for ReturningEnvelope<A, M, R>
77where
78    A: Handler<M, Return = R>,
79    M: Send + 'static,
80    R: Send + 'static,
81{
82    type Actor = A;
83
84    fn set_priority(&mut self, new_priority: u32) {
85        self.priority = new_priority;
86    }
87
88    fn start_span(&mut self) {
89        assert!(self.instrumentation.is_parent_none());
90        self.instrumentation = Instrumentation::started::<A, M>();
91    }
92
93    fn handle(
94        self: Box<Self>,
95        act: &mut Self::Actor,
96        mailbox: Mailbox<Self::Actor>,
97    ) -> (BoxFuture<ControlFlow<(), ()>>, Span) {
98        let Self {
99            message,
100            result_sender,
101            instrumentation,
102            ..
103        } = *self;
104
105        let fut = async move {
106            let mut ctx = Context {
107                running: true,
108                mailbox,
109            };
110            let r = act.handle(message, &mut ctx).await;
111
112            if ctx.running {
113                (r, ControlFlow::Continue(()))
114            } else {
115                (r, ControlFlow::Break(()))
116            }
117        };
118
119        let (fut, span) = instrumentation.apply::<_>(fut);
120
121        let fut = Box::pin(fut.map(move |(r, flow)| {
122            // We don't actually care if the receiver is listening
123            let _ = result_sender.send(r);
124            flow
125        }));
126
127        (fut, span)
128    }
129}
130
/// Like MessageEnvelope, but with an Arc instead of Box
pub trait BroadcastEnvelope: HasPriority + Send + Sync {
    /// The type of actor that this envelope carries a message for
    type Actor;

    /// Sets the priority of this envelope to `new_priority`. Implementations with a fixed
    /// priority may ignore the value (the `Shutdown` implementation does).
    fn set_priority(&mut self, new_priority: u32);

    /// Starts the instrumentation of this message request, if this arc is unique. This will create
    /// the request span
    fn start_span(&mut self);

    /// Handles the message for `act`, consuming this handle to the shared envelope.
    ///
    /// Returns the boxed future that performs the handling, paired with a [`Span`]. The future
    /// resolves to a `ControlFlow` value.
    fn handle(
        self: Arc<Self>,
        act: &mut Self::Actor,
        mailbox: Mailbox<Self::Actor>,
    ) -> (BoxFuture<ControlFlow<()>>, Span);
}
147
148impl<A> HasPriority for MessageToAll<A> {
149    fn priority(&self) -> Priority {
150        self.as_ref().priority()
151    }
152}
153
/// Concrete [`BroadcastEnvelope`] carrying a message of type `M` for an actor of type `A`.
///
/// Its `handle` implementation clones the message out of the shared `Arc` before passing it to
/// the handler, which is why `M: Clone` is required there.
pub struct BroadcastEnvelopeConcrete<A, M> {
    /// The message; cloned each time the envelope is handled.
    message: M,
    /// Numeric priority; reported as `Priority::Valued` through `HasPriority`.
    priority: u32,
    /// Ties the envelope to the actor type `A` without storing a value of type `A`.
    phantom: PhantomData<for<'a> fn(&'a A)>,
    /// Instrumentation state: created empty by `new` and replaced by `start_span`.
    instrumentation: Instrumentation,
}
160
161impl<A: Actor, M> BroadcastEnvelopeConcrete<A, M> {
162    pub fn new(message: M, priority: u32) -> Self {
163        BroadcastEnvelopeConcrete {
164            message,
165            priority,
166            phantom: PhantomData,
167            instrumentation: Instrumentation::empty(),
168        }
169    }
170}
171
172impl<A, M> BroadcastEnvelope for BroadcastEnvelopeConcrete<A, M>
173where
174    A: Handler<M, Return = ()>,
175    M: Clone + Send + Sync + 'static,
176{
177    type Actor = A;
178
179    fn set_priority(&mut self, new_priority: u32) {
180        self.priority = new_priority;
181    }
182
183    fn start_span(&mut self) {
184        assert!(self.instrumentation.is_parent_none());
185        self.instrumentation = Instrumentation::started::<A, M>();
186    }
187
188    fn handle(
189        self: Arc<Self>,
190        act: &mut Self::Actor,
191        mailbox: Mailbox<Self::Actor>,
192    ) -> (BoxFuture<ControlFlow<(), ()>>, Span) {
193        let (msg, instrumentation) = (self.message.clone(), self.instrumentation.clone());
194        drop(self); // Drop ASAP to end the message waiting for actor span
195        let fut = async move {
196            let mut ctx = Context {
197                running: true,
198                mailbox,
199            };
200            act.handle(msg, &mut ctx).await;
201
202            if ctx.running {
203                ControlFlow::Continue(())
204            } else {
205                ControlFlow::Break(())
206            }
207        };
208        let (fut, span) = instrumentation.apply::<_>(fut);
209        (Box::pin(fut), span)
210    }
211}
212
213impl<A, M> HasPriority for BroadcastEnvelopeConcrete<A, M> {
214    fn priority(&self) -> Priority {
215        Priority::Valued(self.priority)
216    }
217}
218
/// Zero-sized envelope for an actor of type `A` that carries no message.
///
/// `Copy`, `Clone` and `Default` are implemented by hand instead of derived: the derives would
/// add `A: Copy` / `A: Clone` / `A: Default` bounds, although no value of type `A` is ever
/// stored. With the manual impls these traits are available for every `A`.
pub struct Shutdown<A>(PhantomData<for<'a> fn(&'a A)>);

impl<A> Clone for Shutdown<A> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<A> Copy for Shutdown<A> {}

impl<A> Default for Shutdown<A> {
    fn default() -> Self {
        Shutdown(PhantomData)
    }
}
221
222impl<A> Shutdown<A> {
223    pub fn new() -> Self {
224        Shutdown(PhantomData)
225    }
226
227    pub fn handle() -> (BoxFuture<'static, ControlFlow<()>>, Span) {
228        let fut = Box::pin(async { ControlFlow::Break(()) });
229
230        (fut, Span::none())
231    }
232}
233
impl<A> HasPriority for Shutdown<A> {
    /// Always reports the dedicated [`Priority::Shutdown`] priority.
    fn priority(&self) -> Priority {
        Priority::Shutdown
    }
}
239
240impl<A> BroadcastEnvelope for Shutdown<A>
241where
242    A: Actor,
243{
244    type Actor = A;
245
246    fn set_priority(&mut self, _: u32) {}
247
248    // This message is not instrumented
249    fn start_span(&mut self) {}
250
251    fn handle(
252        self: Arc<Self>,
253        _act: &mut Self::Actor,
254        _mailbox: Mailbox<Self::Actor>,
255    ) -> (BoxFuture<ControlFlow<()>>, Span) {
256        Self::handle()
257    }
258}