1use actor_system::*;
2use async_trait::async_trait;
3use flume::*;
4
5mod actor_system;
6
7pub mod prelude {
8 pub use crate::actor_system::ActorSystem;
9 pub use crate::{
10 Actor, ActorBehavior, ActorBehaviorAsync, ActorExt, ActorHooks, ActorHooksAsync, Addr,
11 Address, Chan, Channel, Context, HasLength, MappedChannel,
12 };
13
14 #[cfg(feature = "macros")]
15 pub use tractor_macros::*;
16}
17
18pub trait Actor: Sized + Send + 'static {
19 type Msg: Send;
20}
21
22pub trait ActorBehavior: Actor {
23 fn handle(&mut self, msg: Self::Msg, ctx: &Context<Self>);
24}
25
26pub trait ActorHooks: Actor {
27 fn started(&mut self, _ctx: &Context<Self>) {}
28
29 fn stopped(&mut self) {}
30}
31
32#[async_trait]
33pub trait ActorHooksAsync: Actor {
34 async fn started(&mut self, _ctx: &Context<Self>) {}
35
36 async fn stopped(&mut self) {}
37}
38
39#[async_trait]
40pub trait ActorBehaviorAsync: Actor {
41 async fn handle(&mut self, msg: Self::Msg, ctx: &Context<Self>);
42}
43
44#[async_trait]
46impl<T: ActorBehavior> ActorBehaviorAsync for T {
47 async fn handle(&mut self, msg: <Self as Actor>::Msg, ctx: &Context<Self>) {
48 <Self as ActorBehavior>::handle(self, msg, ctx);
49 }
50}
51
52#[async_trait]
54impl<T: ActorHooks> ActorHooksAsync for T {
55 async fn started(&mut self, ctx: &Context<Self>) {
56 <Self as ActorHooks>::started(self, ctx);
57 }
58
59 async fn stopped(&mut self) {
60 <Self as ActorHooks>::stopped(self);
61 }
62}
63
64pub trait ActorExt: Actor + ActorBehaviorAsync + ActorHooksAsync {
65 fn start(self) -> Addr<Self> {
67 let (sender, receiver) = unbounded::<ActorMessage<Self::Msg>>();
68 let mailbox = Mailbox {
69 receiver,
70 consumed: false,
71 };
72 let addr = Addr(Chan(sender));
73 let context = Context {
74 myself: addr.clone(),
75 };
76
77 ActorSystem::spawn_actor(actor_loop(self, context, mailbox));
78
79 addr
80 }
81}
82
83impl<T: Actor + ActorBehaviorAsync + ActorHooksAsync> ActorExt for T {}
84
85pub trait HasLength {
86 fn len(&self) -> usize;
87}
88
89pub trait Channel<T: Sized>: HasLength + Send {
90 fn send(&self, msg: T);
91}
92
93pub struct MappedChannel<FROM, TO, CHAN, MAP>
94where
95 FROM: Send,
96 TO: Send,
97 CHAN: Channel<FROM>,
98 MAP: Fn(TO) -> FROM + Send,
99{
100 channel: CHAN,
101 map: MAP,
102 _from: std::marker::PhantomData<FROM>,
103 _to: std::marker::PhantomData<TO>,
104}
105
106impl<FROM, TO, CHAN, MAP> MappedChannel<FROM, TO, CHAN, MAP>
107where
108 FROM: Send,
109 TO: Send,
110 CHAN: Channel<FROM>,
111 MAP: Fn(TO) -> FROM + Send,
112{
113 pub fn from(channel: CHAN, map: MAP) -> Self {
114 Self {
115 channel,
116 map,
117 _from: std::marker::PhantomData,
118 _to: std::marker::PhantomData,
119 }
120 }
121}
122
123impl<FROM, TO, CHAN, MAP> HasLength for MappedChannel<FROM, TO, CHAN, MAP>
124where
125 FROM: Send,
126 TO: Send,
127 CHAN: Channel<FROM>,
128 MAP: Fn(TO) -> FROM + Send,
129{
130 fn len(&self) -> usize {
131 self.channel.len()
132 }
133}
134
135impl<FROM, TO, CHAN, MAP> Channel<TO> for MappedChannel<FROM, TO, CHAN, MAP>
136where
137 FROM: Send,
138 TO: Send,
139 CHAN: Channel<FROM>,
140 MAP: Fn(TO) -> FROM + Send,
141{
142 fn send(&self, msg: TO) {
143 self.channel.send((self.map)(msg));
144 }
145}
146
147pub trait Address<T: Actor>: HasLength {
148 fn send(&self, msg: T::Msg);
149}
150
151enum ActorMessage<T: Sized + Send> {
153 Ref,
154 UnRef,
155 Msg(T),
156}
157
158pub struct Chan<T: Send>(Sender<ActorMessage<T>>);
159
160impl<T: Send> Clone for Chan<T> {
161 fn clone(&self) -> Self {
162 let () = self.0.send(ActorMessage::Ref).unwrap();
163 Self(self.0.clone())
164 }
165}
166
167impl<T: Send> HasLength for Chan<T> {
168 fn len(&self) -> usize {
169 self.0.len()
170 }
171}
172
173impl<T: Send> Channel<T> for Chan<T> {
174 fn send(&self, msg: T) {
175 let () = self.0.send(ActorMessage::Msg(msg)).unwrap();
176 }
177}
178
179impl<T: Send> Drop for Chan<T> {
180 fn drop(&mut self) {
181 let () = self.0.send(ActorMessage::UnRef).unwrap();
182 }
183}
184
185pub struct Addr<T: Actor>(Chan<T::Msg>);
187
188impl<T: Actor> Clone for Addr<T> {
189 fn clone(&self) -> Self {
190 Self(self.0.clone())
191 }
192}
193
194impl<T: Actor> HasLength for Addr<T> {
195 fn len(&self) -> usize {
196 self.0.len()
197 }
198}
199
200impl<T: Actor> Address<T> for Addr<T> {
201 fn send(&self, msg: T::Msg) {
202 self.0.send(msg);
203 }
204}
205
206impl<T: Actor> Addr<T> {
207 pub fn as_chan(&self) -> &Chan<T::Msg> {
208 &self.0
209 }
210
211 pub fn chan(&self) -> Chan<T::Msg> {
212 self.0.clone()
213 }
214
215 pub fn into_chan(self) -> Chan<T::Msg> {
216 self.0
217 }
218}
219
220pub struct Context<T: Actor> {
221 myself: Addr<T>,
222}
223
224impl<T: Actor> Context<T> {
225 pub fn myself(&self) -> &Addr<T> {
226 &self.myself
227 }
228}
229
230pub(crate) struct Mailbox<T: Send + Sized> {
231 pub(crate) receiver: Receiver<ActorMessage<T>>,
232 pub(crate) consumed: bool,
233}
234
235async fn process_messages<T>(actor: &mut T, mailbox: &mut Mailbox<T::Msg>, context: Context<T>)
236where
237 T: Actor + ActorBehaviorAsync,
238{
239 let mut ref_cnt: usize = 0;
240
241 let mut next_msg = mailbox.receiver.recv_async().await.ok();
242
243 loop {
244 match next_msg.take() {
245 Some(ActorMessage::Msg(msg)) => actor.handle(msg, &context).await,
246 Some(ActorMessage::Ref) => {
247 ref_cnt += 1;
248 }
249 Some(ActorMessage::UnRef) => {
250 ref_cnt -= 1;
251 }
252 None => {
253 if ref_cnt == 0 {
254 mailbox.consumed = true;
255 break;
256 } else {
257 panic!("Mailbox was closed prematurely");
258 }
259 }
260 }
261
262 if ActorSystem::is_terminating() {
263 break;
264 }
265
266 next_msg = if ref_cnt > 0 {
267 mailbox.receiver.recv_async().await.ok()
268 } else {
269 mailbox.receiver.try_recv().ok()
271 };
272 }
273
274 drop(context); }
276
277async fn actor_loop<T: Actor + ActorBehaviorAsync + ActorHooksAsync>(
278 mut actor: T,
279 context: Context<T>,
280 mut mailbox: Mailbox<T::Msg>,
281) -> (T, Mailbox<T::Msg>) {
282 let () = actor.started(&context).await;
283 let () = process_messages(&mut actor, &mut mailbox, context).await;
284 let () = actor.stopped().await;
285 (actor, mailbox)
286}