1use std::pin::Pin;
23
24use async_trait::async_trait;
25
26use crate::{
27 cfg_runtime,
28 prelude::{Actor, Context, Handler, Notifiable},
29};
30
31#[async_trait]
32pub(crate) trait EnvelopeProxy<A: Actor + Unpin>: Send + 'static {
33 async fn handle(&mut self, actor: Pin<&mut A>, context: Pin<&Context<A>>);
34}
35
36pub(crate) struct MessageEnvelope<A: Handler<IN>, IN> {
37 data: Option<(IN, async_oneshot::Sender<A::Result>)>,
38}
39
40impl<A, IN> MessageEnvelope<A, IN>
41where
42 A: Handler<IN>,
43{
44 pub(crate) fn new(message: IN, response: async_oneshot::Sender<A::Result>) -> Self {
45 Self {
46 data: Some((message, response)),
47 }
48 }
49}
50
51#[async_trait]
52impl<A, IN> EnvelopeProxy<A> for MessageEnvelope<A, IN>
53where
54 A: Handler<IN> + Actor + Send + Unpin,
55 IN: Send + 'static,
56 A::Result: Send + Sync + 'static,
57{
58 async fn handle(&mut self, actor: Pin<&mut A>, context: Pin<&Context<A>>) {
59 let (message, mut response) = self.data.take().expect("`Envelope::handle` called twice");
60
61 let result = actor
62 .get_mut()
63 .handle(message, Pin::into_inner(context))
64 .await;
65 let _ = response.send(result);
66 }
67}
68
69pub(crate) struct NotificationEnvelope<A: Notifiable<IN>, IN> {
70 message: Option<IN>,
71 _marker: std::marker::PhantomData<A>,
72}
73
74impl<A, IN> NotificationEnvelope<A, IN>
75where
76 A: Notifiable<IN>,
77{
78 pub(crate) fn new(message: IN) -> Self {
79 Self {
80 message: Some(message),
81 _marker: std::marker::PhantomData,
82 }
83 }
84}
85
86#[async_trait]
87impl<A, IN> EnvelopeProxy<A> for NotificationEnvelope<A, IN>
88where
89 A: Notifiable<IN> + Actor + Send + Unpin,
90 IN: Send + 'static,
91{
92 async fn handle(&mut self, actor: Pin<&mut A>, context: Pin<&Context<A>>) {
93 let message = self
94 .message
95 .take()
96 .expect("`Envelope::handle` called twice");
97
98 actor
99 .get_mut()
100 .notify(message, Pin::into_inner(context))
101 .await;
102 }
103}
104
105cfg_runtime! {
106 use crate::handler::Coroutine;
107
108 pub(crate) struct CoroutineEnvelope<A: Coroutine<IN>, IN> {
109 data: Option<(IN, async_oneshot::Sender<A::Result>)>,
110 }
111
112 impl<A, IN> CoroutineEnvelope<A, IN>
113 where
114 A: Coroutine<IN>,
115 {
116 pub(crate) fn new(message: IN, response: async_oneshot::Sender<A::Result>) -> Self {
117 Self {
118 data: Some((message, response)),
119 }
120 }
121 }
122
123 #[async_trait]
124 impl<A, IN> EnvelopeProxy<A> for CoroutineEnvelope<A, IN>
125 where
126 A: Coroutine<IN> + Actor + Send + Unpin,
127 IN: Send + 'static,
128 A::Result: Send + Sync + 'static,
129 {
130 async fn handle(&mut self, actor: Pin<&mut A>, _context: Pin<&Context<A>>) {
131 let actor = Pin::into_inner(actor).clone();
132 let (message, mut response) = self
133 .data
134 .take()
135 .expect("`Envelope::handle` called twice");
136
137 crate::runtime::spawn(async move {
138 let result = actor.calculate(message).await;
139 let _ = response.send(result);
140 });
141 }
142 }
143}