tractor/
lib.rs

1use actor_system::*;
2use async_trait::async_trait;
3use flume::*;
4
5mod actor_system;
6
7pub mod prelude {
8    pub use crate::actor_system::ActorSystem;
9    pub use crate::{
10        Actor, ActorBehavior, ActorBehaviorAsync, ActorExt, ActorHooks, ActorHooksAsync, Addr,
11        Address, Chan, Channel, Context, HasLength, MappedChannel,
12    };
13
14    #[cfg(feature = "macros")]
15    pub use tractor_macros::*;
16}
17
/// Base trait implemented by every actor.
///
/// An actor must be an owned, sendable value without borrowed data
/// (`Sized + Send + 'static`) and declares the type of message it accepts.
pub trait Actor
where
    Self: Sized + Send + 'static,
{
    /// Message type delivered to this actor; it has to be `Send`.
    type Msg: Send;
}
21
22pub trait ActorBehavior: Actor {
23    fn handle(&mut self, msg: Self::Msg, ctx: &Context<Self>);
24}
25
26pub trait ActorHooks: Actor {
27    fn started(&mut self, _ctx: &Context<Self>) {}
28
29    fn stopped(&mut self) {}
30}
31
32#[async_trait]
33pub trait ActorHooksAsync: Actor {
34    async fn started(&mut self, _ctx: &Context<Self>) {}
35
36    async fn stopped(&mut self) {}
37}
38
39#[async_trait]
40pub trait ActorBehaviorAsync: Actor {
41    async fn handle(&mut self, msg: Self::Msg, ctx: &Context<Self>);
42}
43
44/// Default implementation for `ActorBehaviorAsync` given `ActorBehavior`
45#[async_trait]
46impl<T: ActorBehavior> ActorBehaviorAsync for T {
47    async fn handle(&mut self, msg: <Self as Actor>::Msg, ctx: &Context<Self>) {
48        <Self as ActorBehavior>::handle(self, msg, ctx);
49    }
50}
51
52/// Default implementation for `ActorHooksAsync` given `ActorHooks`
53#[async_trait]
54impl<T: ActorHooks> ActorHooksAsync for T {
55    async fn started(&mut self, ctx: &Context<Self>) {
56        <Self as ActorHooks>::started(self, ctx);
57    }
58
59    async fn stopped(&mut self) {
60        <Self as ActorHooks>::stopped(self);
61    }
62}
63
64pub trait ActorExt: Actor + ActorBehaviorAsync + ActorHooksAsync {
65    /// Starts the actor.
66    fn start(self) -> Addr<Self> {
67        let (sender, receiver) = unbounded::<ActorMessage<Self::Msg>>();
68        let mailbox = Mailbox {
69            receiver,
70            consumed: false,
71        };
72        let addr = Addr(Chan(sender));
73        let context = Context {
74            myself: addr.clone(),
75        };
76
77        ActorSystem::spawn_actor(actor_loop(self, context, mailbox));
78
79        addr
80    }
81}
82
83impl<T: Actor + ActorBehaviorAsync + ActorHooksAsync> ActorExt for T {}
84
/// Something that can report how many items it currently holds.
///
/// The implementations in this file forward to the length of the underlying
/// channel.
pub trait HasLength {
    /// Number of items currently held.
    fn len(&self) -> usize;

    /// Returns `true` when [`len`](HasLength::len) is zero.
    ///
    /// Provided with a default body so existing implementors keep compiling.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
88
89pub trait Channel<T: Sized>: HasLength + Send {
90    fn send(&self, msg: T);
91}
92
93pub struct MappedChannel<FROM, TO, CHAN, MAP>
94where
95    FROM: Send,
96    TO: Send,
97    CHAN: Channel<FROM>,
98    MAP: Fn(TO) -> FROM + Send,
99{
100    channel: CHAN,
101    map: MAP,
102    _from: std::marker::PhantomData<FROM>,
103    _to: std::marker::PhantomData<TO>,
104}
105
106impl<FROM, TO, CHAN, MAP> MappedChannel<FROM, TO, CHAN, MAP>
107where
108    FROM: Send,
109    TO: Send,
110    CHAN: Channel<FROM>,
111    MAP: Fn(TO) -> FROM + Send,
112{
113    pub fn from(channel: CHAN, map: MAP) -> Self {
114        Self {
115            channel,
116            map,
117            _from: std::marker::PhantomData,
118            _to: std::marker::PhantomData,
119        }
120    }
121}
122
123impl<FROM, TO, CHAN, MAP> HasLength for MappedChannel<FROM, TO, CHAN, MAP>
124where
125    FROM: Send,
126    TO: Send,
127    CHAN: Channel<FROM>,
128    MAP: Fn(TO) -> FROM + Send,
129{
130    fn len(&self) -> usize {
131        self.channel.len()
132    }
133}
134
135impl<FROM, TO, CHAN, MAP> Channel<TO> for MappedChannel<FROM, TO, CHAN, MAP>
136where
137    FROM: Send,
138    TO: Send,
139    CHAN: Channel<FROM>,
140    MAP: Fn(TO) -> FROM + Send,
141{
142    fn send(&self, msg: TO) {
143        self.channel.send((self.map)(msg));
144    }
145}
146
147pub trait Address<T: Actor>: HasLength {
148    fn send(&self, msg: T::Msg);
149}
150
/// Envelope for everything that travels through an actor's mailbox.
enum ActorMessage<T>
where
    T: Send,
{
    /// A new `Chan` handle was created; `process_messages` increments its
    /// reference count when it sees this.
    Ref,
    /// A `Chan` handle was dropped; `process_messages` decrements its
    /// reference count when it sees this.
    UnRef,
    /// A user message to be passed to the actor's handler.
    Msg(T),
}
157
158pub struct Chan<T: Send>(Sender<ActorMessage<T>>);
159
160impl<T: Send> Clone for Chan<T> {
161    fn clone(&self) -> Self {
162        let () = self.0.send(ActorMessage::Ref).unwrap();
163        Self(self.0.clone())
164    }
165}
166
167impl<T: Send> HasLength for Chan<T> {
168    fn len(&self) -> usize {
169        self.0.len()
170    }
171}
172
173impl<T: Send> Channel<T> for Chan<T> {
174    fn send(&self, msg: T) {
175        let () = self.0.send(ActorMessage::Msg(msg)).unwrap();
176    }
177}
178
179impl<T: Send> Drop for Chan<T> {
180    fn drop(&mut self) {
181        let () = self.0.send(ActorMessage::UnRef).unwrap();
182    }
183}
184
185/// Address to send an actor a message.
186pub struct Addr<T: Actor>(Chan<T::Msg>);
187
188impl<T: Actor> Clone for Addr<T> {
189    fn clone(&self) -> Self {
190        Self(self.0.clone())
191    }
192}
193
194impl<T: Actor> HasLength for Addr<T> {
195    fn len(&self) -> usize {
196        self.0.len()
197    }
198}
199
200impl<T: Actor> Address<T> for Addr<T> {
201    fn send(&self, msg: T::Msg) {
202        self.0.send(msg);
203    }
204}
205
206impl<T: Actor> Addr<T> {
207    pub fn as_chan(&self) -> &Chan<T::Msg> {
208        &self.0
209    }
210
211    pub fn chan(&self) -> Chan<T::Msg> {
212        self.0.clone()
213    }
214
215    pub fn into_chan(self) -> Chan<T::Msg> {
216        self.0
217    }
218}
219
220pub struct Context<T: Actor> {
221    myself: Addr<T>,
222}
223
224impl<T: Actor> Context<T> {
225    pub fn myself(&self) -> &Addr<T> {
226        &self.myself
227    }
228}
229
230pub(crate) struct Mailbox<T: Send + Sized> {
231    pub(crate) receiver: Receiver<ActorMessage<T>>,
232    pub(crate) consumed: bool,
233}
234
235async fn process_messages<T>(actor: &mut T, mailbox: &mut Mailbox<T::Msg>, context: Context<T>)
236where
237    T: Actor + ActorBehaviorAsync,
238{
239    let mut ref_cnt: usize = 0;
240
241    let mut next_msg = mailbox.receiver.recv_async().await.ok();
242
243    loop {
244        match next_msg.take() {
245            Some(ActorMessage::Msg(msg)) => actor.handle(msg, &context).await,
246            Some(ActorMessage::Ref) => {
247                ref_cnt += 1;
248            }
249            Some(ActorMessage::UnRef) => {
250                ref_cnt -= 1;
251            }
252            None => {
253                if ref_cnt == 0 {
254                    mailbox.consumed = true;
255                    break;
256                } else {
257                    panic!("Mailbox was closed prematurely");
258                }
259            }
260        }
261
262        if ActorSystem::is_terminating() {
263            break;
264        }
265
266        next_msg = if ref_cnt > 0 {
267            mailbox.receiver.recv_async().await.ok()
268        } else {
269            // non-blocking read in case there are no further references
270            mailbox.receiver.try_recv().ok()
271        };
272    }
273
274    drop(context); // This should still send a last UnRef msg.
275}
276
277async fn actor_loop<T: Actor + ActorBehaviorAsync + ActorHooksAsync>(
278    mut actor: T,
279    context: Context<T>,
280    mut mailbox: Mailbox<T::Msg>,
281) -> (T, Mailbox<T::Msg>) {
282    let () = actor.started(&context).await;
283    let () = process_messages(&mut actor, &mut mailbox, context).await;
284    let () = actor.stopped().await;
285    (actor, mailbox)
286}