async_actor/
lib.rs

//! This crate provides a simple, runtime-agnostic actor framework that aims
//! to be minimal and to get out of your way.
3//!
4//! ```rust
5//! use async_oneshot_channel::oneshot;
6//! use async_actor::{Actor, ActorRef, WeakActorRef};
7//!
8//! struct CounterActor(usize);
9//!
10//! impl Actor for CounterActor {
11//!     type Error = ();
12//!     type Message = usize;
13//!
14//!     async fn on_msg(
15//!         &mut self,
16//!         _: &WeakActorRef<Self>,
17//!         msg: Self::Message,
18//!     ) -> Result<(), Self::Error> {
19//!         self.0 += msg;
20//!         println!("Received message: {}. Current state: {}", msg, self.0);
21//!
22//!         Ok(())
23//!     }
24//! }
25//!
26//! #[tokio::main]
27//! async fn main() {
28//!     let actor = CounterActor(0);
29//!     let (actor_ref, fut) = actor.into_future(None);
30//!     let handle = tokio::spawn(fut);
31//!
32//!     actor_ref.send(3).await.unwrap();
33//!     actor_ref.send(7).await.unwrap();
34//!
35//!     actor_ref.stop(0).unwrap();
36//!     let res = handle.await;
37//!     assert!(res.is_ok());
38//! }
39//! ```
40
41mod actor;
42mod actor_ref;
43mod actor_run;
44mod mailbox;
45
46pub use actor::*;
47pub use actor_ref::*;
48pub use actor_run::*;
49pub use mailbox::Mailbox;
50
#[cfg(test)]
mod test {
    // End-to-end tests for the actor machinery re-exported by this crate.
    //
    // Every test follows the same shape: build an actor, turn it into an
    // `(ActorRef, future)` pair with `into_future(None)`, spawn the future on
    // tokio, interact through the reference, and finally await the join
    // handle. The assertions only check the *join* result (`res.is_ok()`);
    // the value produced by the actor future itself is never inspected.
    //
    // NOTE(review): the meaning of the `None` argument passed to
    // `into_future` is defined elsewhere in the crate — confirm there.

    use super::*;

    /// Actor that keeps a running sum of every `usize` it receives.
    struct MyActor(usize);

    impl Actor for MyActor {
        type Error = ();
        type Message = usize;

        /// Adds `msg` to the running total, prints both values, and always
        /// succeeds.
        async fn on_msg(
            &mut self,
            _: &WeakActorRef<Self>,
            msg: Self::Message,
        ) -> Result<(), Self::Error> {
            self.0 += msg;
            println!("Received message: {msg}. Current state: {}", self.0);

            Ok(())
        }
    }

    /// Sends two messages, issues an explicit `stop`, and expects the spawned
    /// task to be joinable.
    #[tokio::test]
    async fn test_actor() {
        let actor = MyActor(0);
        let (actor_ref, fut) = actor.into_future(None);
        let handle = tokio::spawn(fut);

        actor_ref.send(3).await.unwrap();
        actor_ref.send(7).await.unwrap();

        actor_ref.stop(0).unwrap();

        let res = handle.await;
        assert!(res.is_ok());
    }

    /// Same as `test_actor`, but pushes 1000 messages through the actor
    /// before stopping it.
    #[tokio::test]
    async fn test_actor_long() {
        let actor = MyActor(0);
        let (actor_ref, fut) = actor.into_future(None);
        let handle = tokio::spawn(fut);

        for i in 0..1000 {
            actor_ref.send(i).await.unwrap();
        }

        actor_ref.stop(0).unwrap();

        let res = handle.await;
        assert!(res.is_ok());
    }

    /// Drops the whole `ActorRef` instead of calling `stop`; the spawned task
    /// is still expected to finish so that the join handle resolves.
    #[tokio::test]
    async fn test_drop() {
        let actor = MyActor(0);
        let (actor_ref, fut) = actor.into_future(None);
        let handle = tokio::spawn(fut);

        actor_ref.send(3).await.unwrap();
        actor_ref.send(7).await.unwrap();

        drop(actor_ref);

        let res = handle.await;
        assert!(res.is_ok());
    }

    /// Splits the `ActorRef` into its two fields and drops only `stop`.
    ///
    /// `sender` is still alive while the join handle is awaited (it is only
    /// dropped at the end of the function), so this checks that losing the
    /// `stop` half alone lets the task finish.
    #[tokio::test]
    async fn test_drop_partial() {
        let actor = MyActor(0);
        let (actor_ref, fut) = actor.into_future(None);
        let handle = tokio::spawn(fut);

        let ActorRef { sender, stop } = actor_ref;
        sender.send(3).await.unwrap();
        sender.send(7).await.unwrap();

        drop(stop);

        let res = handle.await;
        assert!(res.is_ok());
    }

    /// Moves only `sender` out of the `ActorRef` and drops it.
    ///
    /// Because the pattern uses `..`, the `stop` field is *not* moved: it
    /// stays inside the partially moved `actor_ref` and lives until the end
    /// of the function. The test therefore checks that dropping the `sender`
    /// half alone lets the task finish.
    #[tokio::test]
    async fn test_drop_partial_sender() {
        let actor = MyActor(0);
        let (actor_ref, fut) = actor.into_future(None);
        let handle = tokio::spawn(fut);

        let ActorRef { sender, .. } = actor_ref;
        sender.send(3).await.unwrap();
        sender.send(7).await.unwrap();

        drop(sender);

        let res = handle.await;
        assert!(res.is_ok());
    }

    /// Stateless actor that answers each request with `request + 1`.
    struct PlusOneActor;

    /// Messages understood by `PlusOneActor`.
    #[derive(Debug)]
    enum PlusOneActorMessage {
        /// Request carrying the number and the one-shot channel to reply on.
        PlusOne(usize, async_oneshot_channel::Sender<usize>),
        /// Message handed to `ActorRef::stop` in `test_reply`.
        Stop,
    }

    impl Actor for PlusOneActor {
        type Message = PlusOneActorMessage;
        type Error = ();

        /// Replies to `PlusOne` with the incremented number; `Stop` is
        /// reported as an error.
        async fn on_msg(
            &mut self,
            _: &WeakActorRef<Self>,
            msg: Self::Message,
        ) -> Result<(), Self::Error> {
            // Every variant is listed explicitly (no `_` arm) so that adding
            // a new message variant becomes a compile error here instead of
            // silently falling into the error path.
            match msg {
                PlusOneActorMessage::PlusOne(num, reply) => {
                    // The outcome of the reply is deliberately ignored.
                    let _ = reply.send(num + 1);
                    Ok(())
                }
                PlusOneActorMessage::Stop => Err(()),
            }
        }
    }

    /// Performs two request/reply round trips over one-shot channels and then
    /// stops the actor.
    #[tokio::test]
    async fn test_reply() {
        let actor = PlusOneActor;
        let (actor_ref, fut) = actor.into_future(None);
        let handle = tokio::spawn(fut);

        let (reply_sender, reply_receiver) = async_oneshot_channel::oneshot();
        actor_ref
            .send(PlusOneActorMessage::PlusOne(3, reply_sender))
            .await
            .unwrap();
        let res = reply_receiver.recv().await;
        assert_eq!(res, Some(4));

        let (reply_sender, reply_receiver) = async_oneshot_channel::oneshot();
        actor_ref
            .send(PlusOneActorMessage::PlusOne(7, reply_sender))
            .await
            .unwrap();
        let res = reply_receiver.recv().await;
        assert_eq!(res, Some(8));

        actor_ref.stop(PlusOneActorMessage::Stop).unwrap();

        let res = handle.await;
        assert!(res.is_ok());
    }

    /// Actor that forwards every ping to the `PongActor` it holds a
    /// reference to.
    struct PingActor(ActorRef<PongActor>);

    /// Messages understood by `PingActor`.
    #[derive(Debug)]
    enum PingActorMessage {
        Ping(usize),
        Stop,
    }

    impl Actor for PingActor {
        type Message = PingActorMessage;
        type Error = ();

        /// Forwards `Ping(n)` to the pong actor as `Pong(n)`; on `Stop`,
        /// issues a stop through the weak self-reference.
        ///
        /// Both arms `unwrap` the result of the call they make, so a failed
        /// send or stop panics inside the actor task.
        async fn on_msg(
            &mut self,
            this: &WeakActorRef<Self>,
            msg: Self::Message,
        ) -> Result<(), Self::Error> {
            match msg {
                PingActorMessage::Ping(num) => {
                    println!("PingActor received Ping({num})");
                    self.0.send(PongActorMessage::Pong(num)).await.unwrap();
                    Ok(())
                }
                PingActorMessage::Stop => {
                    println!("PingActor received Stop");
                    this.stop(PingActorMessage::Stop).unwrap();
                    Ok(())
                }
            }
        }
    }

    /// Counterpart of `PingActor`.
    struct PongActor;

    /// Messages understood by `PongActor`.
    #[derive(Debug)]
    enum PongActorMessage {
        Pong(usize),
        Stop,
    }

    impl Actor for PongActor {
        type Message = PongActorMessage;
        type Error = ();

        /// Logs the incoming message. On `Pong` it additionally issues a stop
        /// through the weak self-reference, ignoring that call's result.
        async fn on_msg(
            &mut self,
            this: &WeakActorRef<Self>,
            msg: Self::Message,
        ) -> Result<(), Self::Error> {
            match msg {
                PongActorMessage::Pong(num) => {
                    println!("PongActor received Pong({num})");
                    let _ = this.stop(PongActorMessage::Stop);
                    Ok(())
                }
                PongActorMessage::Stop => {
                    println!("PongActor received Stop");
                    Ok(())
                }
            }
        }
    }

    /// Wires a `PingActor` to a `PongActor`, sends two pings, stops the ping
    /// actor, and expects both spawned tasks to be joinable.
    #[tokio::test]
    async fn test_ping_pong() {
        let pong_actor = PongActor;
        let (pong_actor_ref, pong_fut) = pong_actor.into_future(None);
        let pong_handle = tokio::spawn(pong_fut);

        // The ping actor takes ownership of the only pong reference.
        let ping_actor = PingActor(pong_actor_ref);
        let (ping_actor_ref, ping_fut) = ping_actor.into_future(None);
        let ping_handle = tokio::spawn(ping_fut);

        ping_actor_ref
            .send(PingActorMessage::Ping(3))
            .await
            .unwrap();
        ping_actor_ref
            .send(PingActorMessage::Ping(7))
            .await
            .unwrap();

        ping_actor_ref.stop(PingActorMessage::Stop).unwrap();

        let res = ping_handle.await;
        assert!(res.is_ok());

        let res = pong_handle.await;
        assert!(res.is_ok());
    }
}