1use futures::channel::oneshot;
2use std::borrow::Cow;
3use std::future::Future;
4
5pub use puppet_derive::puppet_actor;
6
7#[doc(hidden)]
9pub mod __private {
10 pub use flume;
11 pub use futures;
12
13 #[cfg(feature = "helper-methods")]
14 pub use tokio;
15}
16
/// A type that can be addressed through an [`ActorMailbox`].
pub trait Actor {
    /// The value type carried over the mailbox channel to this actor.
    type Messages;
}
22
/// A value that can be sent to an actor and yields a typed response.
pub trait Message {
    /// The type handed back to the sender once the message has been handled.
    type Output;
}
28
/// Abstraction over something that can accept futures for execution.
pub trait Executor {
    /// Hands `fut` over to the executor.
    ///
    /// The future must be `Send + 'static` and resolve to `()`.
    /// NOTE(review): implementors are presumably expected to drive `fut` to
    /// completion in the background — the trait itself does not enforce this.
    fn spawn(&self, fut: impl Future<Output = ()> + Send + 'static);
}
33
34#[doc(hidden)]
36pub trait MessageHandler<T: Message> {
37 fn create(msg: T) -> (Self, oneshot::Receiver<T::Output>)
38 where
39 Self: Sized;
40}
41
42pub struct ActorMailbox<A: Actor> {
46 tx: flume::Sender<A::Messages>,
47 name: Cow<'static, str>,
48}
49
50impl<A: Actor> Clone for ActorMailbox<A> {
51 fn clone(&self) -> Self {
52 Self {
53 tx: self.tx.clone(),
54 name: self.name.clone(),
55 }
56 }
57}
58
59impl<A: Actor> ActorMailbox<A> {
60 #[doc(hidden)]
62 pub fn new(tx: flume::Sender<A::Messages>, name: Cow<'static, str>) -> Self {
66 Self { tx, name }
67 }
68
69 #[inline]
70 pub fn name(&self) -> &str {
74 self.name.as_ref()
75 }
76
77 pub async fn send<T>(&self, msg: T) -> T::Output
79 where
80 T: Message,
81 A::Messages: MessageHandler<T>,
82 {
83 let (msg, rx) = A::Messages::create(msg);
84 self.tx.send_async(msg).await.expect("Contact actor");
85 rx.await.expect("Actor response")
86 }
87
88 pub async fn deferred_send<T>(&self, msg: T) -> DeferredResponse<T::Output>
92 where
93 T: Message,
94 A::Messages: MessageHandler<T>,
95 {
96 let (msg, rx) = A::Messages::create(msg);
97 self.tx.send_async(msg).await.expect("Contact actor");
98 DeferredResponse { rx }
99 }
100
101 pub fn send_sync<T>(&self, msg: T) -> T::Output
105 where
106 T: Message,
107 A::Messages: MessageHandler<T>,
108 {
109 let (msg, rx) = A::Messages::create(msg);
110 self.tx.send(msg).expect("Contact actor");
111 futures::executor::block_on(rx).expect("Actor response")
112 }
113}
114
115pub struct DeferredResponse<T> {
119 rx: oneshot::Receiver<T>,
120}
121
122impl<T> DeferredResponse<T> {
123 pub fn try_recv(&mut self) -> Option<T> {
125 self.rx.try_recv().expect("Get actor response")
126 }
127
128 pub async fn recv(self) -> T {
130 self.rx.await.expect("Get actor response")
131 }
132}
133
134pub struct Reply<T: Default> {
140 tx: Option<oneshot::Sender<T>>,
141}
142
143impl<T: Default> From<oneshot::Sender<T>> for Reply<T> {
144 fn from(tx: oneshot::Sender<T>) -> Self {
145 Self { tx: Some(tx) }
146 }
147}
148
149impl<T: Default> Reply<T> {
150 pub fn reply(mut self, msg: T) {
152 if let Some(tx) = self.tx.take() {
153 let _ = tx.send(msg);
154 }
155 }
156}
157
158impl<T: Default> Drop for Reply<T> {
159 fn drop(&mut self) {
160 if let Some(tx) = self.tx.take() {
161 let _ = tx.send(T::default());
162 }
163 }
164}
165
/// Implements [`Message`](crate::Message) for `$msg` with `$output` as its
/// `Output` type.
///
/// `$msg` may be any type expression (a plain identifier, a path such as
/// `module::Ping`, or a generic instantiation such as `Wrapper<u8>`), and a
/// trailing comma is accepted.
///
/// # Examples
///
/// ```ignore
/// struct Ping;
/// derive_message!(Ping, u32);
/// ```
#[macro_export]
macro_rules! derive_message {
    // `ty` rather than `ident`, so non-identifier types are accepted; every
    // identifier is also a valid type, so existing invocations still match.
    ($msg:ty, $output:ty $(,)?) => {
        impl $crate::Message for $msg {
            type Output = $output;
        }
    };
}