tiny_tokio_actor/
system.rs

1use std::{fmt, any::Any, collections::HashMap, sync::Arc};
2
3use tokio::sync::RwLock;
4
5use crate::{
6    actor::{runner::ActorRunner, Actor, ActorRef},
7    bus::{EventBus, EventReceiver},
8    ActorError, ActorPath,
9};
10
/// Marker trait for the event type an actor system carries on its event bus.
///
/// The trait declares no methods; it only bundles the `Clone + Send + Sync +
/// 'static` bounds that every event must satisfy.
pub trait SystemEvent: Clone + Send + Sync + 'static {}
13
/// A named registry of running actors together with the event bus they share.
///
/// The actor map lives behind an `Arc`, so every clone of an `ActorSystem`
/// refers to the same registry.
#[derive(Clone)]
pub struct ActorSystem<E: SystemEvent> {
    /// Name given to the system at construction time.
    name: String,
    /// Registered actors keyed by path. Each value is a type-erased
    /// `ActorRef<E, A>`; `get_actor` recovers the concrete type by downcasting.
    actors: Arc<RwLock<HashMap<ActorPath, Box<dyn Any + Send + Sync + 'static>>>>,
    /// Event bus used by `publish` and `events`.
    bus: EventBus<E>,
}
20
21impl<E: SystemEvent> ActorSystem<E> {
22    /// The name given to this actor system
23    pub fn name(&self) -> &str {
24        &self.name
25    }
26
27    /// Publish an event on the actor system's event bus. These events can be
28    /// received by other actors in the same actor system.
29    pub fn publish(&self, event: E) {
30        self.bus.send(event).unwrap_or_else(|error| {
31            log::warn!(
32                "No listeners active on event bus. Dropping event: {:?}",
33                &error.to_string(),
34            );
35            0
36        });
37    }
38
    /// Subscribe to events of this actor system.
    ///
    /// Returns a new receiver obtained from the system's event bus.
    pub fn events(&self) -> EventReceiver<E> {
        self.bus.subscribe()
    }
43
44    /// Retrieves an actor running in this actor system. If actor does not exist, a None
45    /// is returned instead.
46    pub async fn get_actor<A: Actor<E>>(&self, path: &ActorPath) -> Option<ActorRef<E, A>> {
47        let actors = self.actors.read().await;
48        actors
49            .get(path)
50            .and_then(|any| any.downcast_ref::<ActorRef<E, A>>().cloned())
51    }
52
53        pub(crate) async fn create_actor_path<A: Actor<E>>(
54        &self,
55        path: ActorPath,
56        actor: A,
57    ) -> Result<ActorRef<E, A>, ActorError> {
58        log::debug!("Creating actor '{}' on system '{}'...", &path, &self.name);
59
60        let mut actors = self.actors.write().await;
61        if actors.contains_key(&path) {
62            return Err(ActorError::Exists(path));
63        }
64
65        let system = self.clone();
66        let (mut runner, actor_ref) = ActorRunner::create(path, actor);
67        tokio::spawn(async move {
68            runner.start(system).await;
69        });
70
71        let path = actor_ref.path().clone();
72        let any = Box::new(actor_ref.clone());
73
74        actors.insert(path, any);
75
76        Ok(actor_ref)
77    }
78
79    /// Launches a new top level actor on this actor system at the '/user' actor path. If another actor with
80    /// the same name already exists, an `Err(ActorError::Exists(ActorPath))` is returned instead.
81        pub async fn create_actor<A: Actor<E>>(
82        &self,
83        name: &str,
84        actor: A,
85    ) -> Result<ActorRef<E, A>, ActorError> {
86        let path = ActorPath::from("/user") / name;
87        self.create_actor_path(path, actor).await
88    }
89
90    /// Retrieve or create a new actor on this actor system if it does not exist yet.
91        pub async fn get_or_create_actor<A, F>(
92        &self,
93        name: &str,
94        actor_fn: F,
95    ) -> Result<ActorRef<E, A>, ActorError>
96    where
97        A: Actor<E>,
98        F: FnOnce() -> A,
99    {
100        let path = ActorPath::from("/user") / name;
101        self.get_or_create_actor_path(&path, actor_fn).await
102    }
103
104        pub(crate) async fn get_or_create_actor_path<A, F>(
105        &self,
106        path: &ActorPath,
107        actor_fn: F,
108    ) -> Result<ActorRef<E, A>, ActorError>
109    where
110        A: Actor<E>,
111        F: FnOnce() -> A,
112    {
113        let actors = self.actors.read().await;
114        match self.get_actor(path).await {
115            Some(actor) => Ok(actor),
116            None => {
117                drop(actors);
118                self.create_actor_path(path.clone(), actor_fn()).await
119            }
120        }
121    }
122
123    /// Stops the actor on this actor system. All its children will also be stopped.
124        pub async fn stop_actor(&self, path: &ActorPath) {
125        log::debug!("Stopping actor '{}' on system '{}'...", &path, &self.name);
126        let mut paths: Vec<ActorPath> = vec![path.clone()];
127        {
128            let running_actors = self.actors.read().await;
129            for running in running_actors.keys() {
130                if running.is_descendant_of(path) {
131                    paths.push(running.clone());
132                }
133            }
134        }
135        paths.sort_unstable();
136        paths.reverse();
137        let mut actors = self.actors.write().await;
138        for path in &paths {
139            actors.remove(path);
140        }
141    }
142
143    /// Creats a new actor system on which you can create actors.
144    pub fn new(name: &str, bus: EventBus<E>) -> Self {
145        let name = name.to_string();
146        let actors = Arc::new(RwLock::new(HashMap::new()));
147        ActorSystem { name, actors, bus }
148    }
149}
150
151impl<E: SystemEvent> fmt::Debug for ActorSystem<E> {
152    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
153        f.debug_struct("ActorSystem")
154            .field("name", &self.name)
155            .finish()
156    }
157}
158
159#[cfg(test)]
160mod tests {
161
162    use crate::actor::{Actor, ActorContext, Handler, Message};
163    use async_trait::async_trait;
164    use thiserror::Error;
165
166    use super::*;
167
    /// Event type used by the actor systems in these tests; wraps a text payload.
    #[derive(Clone, Debug)]
    struct TestEvent(String);

    // Marks `TestEvent` as usable as a system's event type.
    impl SystemEvent for TestEvent {}
172
    /// Test actor holding a counter that its `TestMessage` handler increments.
    #[derive(Default)]
    struct TestActor {
        /// Number of `TestMessage`s handled so far.
        counter: usize,
    }

    #[async_trait]
    impl Actor<TestEvent> for TestActor {
        /// Logs that the actor is starting; always succeeds.
        async fn pre_start(
            &mut self,
            _ctx: &mut ActorContext<TestEvent>,
        ) -> Result<(), ActorError> {
            log::debug!("Starting actor TestActor!");
            Ok(())
        }

        /// Logs that the actor has stopped.
        async fn post_stop(&mut self, _ctx: &mut ActorContext<TestEvent>) {
            log::debug!("Stopped actor TestActor!");
        }
    }
192
    /// Message handled by `TestActor`; the handler replies with the actor's
    /// updated counter.
    #[derive(Clone, Debug)]
    struct TestMessage(usize);

    impl Message for TestMessage {
        // The reply carries the counter value after the increment.
        type Response = usize;
    }

    // NOTE(review): no test visible in this module uses `TestMessage` as a
    // system event — confirm whether this impl is still needed.
    impl SystemEvent for TestMessage {}
201
202    #[async_trait]
203    impl Handler<TestEvent, TestMessage> for TestActor {
204        async fn handle(&mut self, msg: TestMessage, ctx: &mut ActorContext<TestEvent>) -> usize {
205            log::debug!("received message! {:?}", &msg);
206            self.counter += 1;
207            log::debug!("counter is now {}", &self.counter);
208            log::debug!("{} on system {}", &ctx.path, ctx.system.name());
209            ctx.system
210                .publish(TestEvent("Message received!".to_string()));
211            self.counter
212        }
213    }
214
215    #[derive(Default, Clone)]
216    struct OtherActor {
217        message: String,
218        child: Option<ActorRef<TestEvent, TestActor>>,
219    }
220
221    #[async_trait]
222    impl Actor<TestEvent> for OtherActor {
223        async fn pre_start(&mut self, ctx: &mut ActorContext<TestEvent>) -> Result<(), ActorError> {
224            log::debug!("OtherActor initial message: {}", &self.message);
225            let child = TestActor { counter: 0 };
226            self.child = ctx.create_child("child", child).await.ok();
227            Ok(())
228        }
229
230        async fn post_stop(&mut self, _ctx: &mut ActorContext<TestEvent>) {
231            log::debug!("OtherActor stopped.");
232        }
233    }
234
    /// Message handled by `OtherActor`; carries the text that replaces the
    /// actor's stored message.
    #[derive(Clone, Debug)]
    struct OtherMessage(String);

    impl Message for OtherMessage {
        // The handler replies with the message text now stored in the actor.
        type Response = String;
    }
241
242    #[async_trait]
243    impl Handler<TestEvent, OtherMessage> for OtherActor {
244        async fn handle(&mut self, msg: OtherMessage, ctx: &mut ActorContext<TestEvent>) -> String {
245            log::debug!("OtherActor received message! {:?}", &msg);
246            log::debug!("original message is {}", &self.message);
247            self.message = msg.0;
248            log::debug!("message is now {}", &self.message);
249            log::debug!("{} on system {}", &ctx.path, ctx.system.name());
250            ctx.system
251                .publish(TestEvent("Received message!".to_string()));
252            self.message.clone()
253        }
254    }
255
256    #[tokio::test]
257    async fn actor_create() {
258        if std::env::var("RUST_LOG").is_err() {
259            std::env::set_var("RUST_LOG", "trace");
260        }
261        let _ = env_logger::builder().is_test(true).try_init();
262
263        let actor = TestActor { counter: 0 };
264        let msg = TestMessage(10);
265
266        let bus = EventBus::<TestEvent>::new(1000);
267        let system = ActorSystem::new("test", bus);
268        let actor_ref = system.create_actor("test-actor", actor).await.unwrap();
269        let result = actor_ref.ask(msg).await.unwrap();
270
271        assert_eq!(result, 1);
272    }
273
    /// Error type whose display text is the fixed string "custom error".
    // NOTE(review): not referenced by any test visible in this module —
    // confirm whether it is still needed.
    #[derive(Debug, Error)]
    #[error("custom error")]
    struct CustomError(String);
277
278    fn create_other(message: String) -> OtherActor {
279        OtherActor {
280            message,
281            child: None,
282        }
283    }
284
285    #[tokio::test]
286    async fn actor_get_or_create() {
287        if std::env::var("RUST_LOG").is_err() {
288            std::env::set_var("RUST_LOG", "trace");
289        }
290        let _ = env_logger::builder().is_test(true).try_init();
291
292        let bus = EventBus::<TestEvent>::new(1000);
293        let system = ActorSystem::new("test", bus);
294
295        let initial_message = "hello world!".to_string();
296        let actor_fn = || create_other(initial_message);
297        let actor_ref = system
298            .get_or_create_actor("test-actor", actor_fn)
299            .await
300            .unwrap();
301
302        let msg = OtherMessage("Updated message.".to_string());
303        let result = actor_ref.ask(msg).await.unwrap();
304
305        assert_eq!(result, "Updated message.".to_string());
306    }
307
308    #[tokio::test]
309    async fn actor_stop() {
310        if std::env::var("RUST_LOG").is_err() {
311            std::env::set_var("RUST_LOG", "trace");
312        }
313        let _ = env_logger::builder().is_test(true).try_init();
314
315        let actor = TestActor { counter: 0 };
316        let msg = TestMessage(10);
317
318        let bus = EventBus::<TestEvent>::new(1000);
319        let system = ActorSystem::new("test", bus);
320
321        {
322            let actor_ref = system.create_actor("test-actor", actor).await.unwrap();
323            let result = actor_ref.ask(msg).await.unwrap();
324
325            assert_eq!(result, 1);
326
327            system.stop_actor(actor_ref.path()).await;
328            assert!(system.get_actor::<TestActor>(actor_ref.path()).await.is_none());
329        }
330
331        tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
332    }
333
334    #[tokio::test]
335    async fn actor_events() {
336        if std::env::var("RUST_LOG").is_err() {
337            std::env::set_var("RUST_LOG", "trace");
338        }
339        let _ = env_logger::builder().is_test(true).try_init();
340
341        let actor = TestActor { counter: 0 };
342        let msg = TestMessage(10);
343
344        let bus = EventBus::<TestEvent>::new(1000);
345        let system = ActorSystem::new("test", bus);
346        let actor_ref = system.create_actor("test-actor", actor).await.unwrap();
347
348        let mut events = system.events();
349        tokio::spawn(async move {
350            loop {
351                match events.recv().await {
352                    Ok(event) => println!("Received event! {event:?}"),
353                    Err(err) => println!("Error receivng event!!! {err:?}"),
354                }
355            }
356        });
357
358        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
359
360        let result = actor_ref.ask(msg).await.unwrap();
361
362        assert_eq!(result, 1);
363    }
364
365    #[tokio::test]
366    async fn actor_get() {
367        if std::env::var("RUST_LOG").is_err() {
368            std::env::set_var("RUST_LOG", "trace");
369        }
370        let _ = env_logger::builder().is_test(true).try_init();
371
372        let actor = TestActor { counter: 0 };
373
374        let bus = EventBus::<TestEvent>::new(1000);
375        let system = ActorSystem::new("test", bus);
376        let original = system.create_actor("test-actor", actor).await.unwrap();
377
378        if let Some(actor_ref) = system.get_actor::<TestActor>(original.path()).await {
379            let msg = TestMessage(10);
380            let result = actor_ref.ask(msg).await.unwrap();
381            assert_eq!(result, 1);
382        } else {
383            panic!("It should have retrieved the actor!")
384        }
385
386        if let Some(actor_ref) = system.get_actor::<OtherActor>(original.path()).await {
387            let msg = OtherMessage("Hello world!".to_string());
388            let result = actor_ref.ask(msg).await.unwrap();
389            println!("Result is: {result}");
390            panic!("It should not go here!");
391        }
392
393        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
394    }
395
396    #[tokio::test]
397    async fn actor_parent_child() {
398        if std::env::var("RUST_LOG").is_err() {
399            std::env::set_var("RUST_LOG", "trace");
400        }
401        let _ = env_logger::builder().is_test(true).try_init();
402
403        let actor = OtherActor {
404            message: "Initial".to_string(),
405            child: None,
406        };
407
408        let bus = EventBus::<TestEvent>::new(1000);
409        let system = ActorSystem::new("test", bus);
410
411        {
412            let actor_ref = system.create_actor("test-actor", actor).await.unwrap();
413            let msg = OtherMessage("new message!".to_string());
414            let result = actor_ref.ask(msg).await.unwrap();
415            assert_eq!(result, "new message!".to_string());
416
417            tokio::time::sleep(tokio::time::Duration::from_millis(2000)).await;
418            system.stop_actor(actor_ref.path()).await;
419        }
420
421        tokio::time::sleep(tokio::time::Duration::from_millis(2000)).await;
422        let actors = system.actors.read().await;
423        for actor in actors.keys() {
424            println!("Still active!: {actor:?}");
425        }
426        assert_eq!(actors.len(), 0);
427    }
428}