1use std::{fmt, any::Any, collections::HashMap, sync::Arc};
2
3use tokio::sync::RwLock;
4
5use crate::{
6 actor::{runner::ActorRunner, Actor, ActorRef},
7 bus::{EventBus, EventReceiver},
8 ActorError, ActorPath,
9};
10
/// Marker trait for the event type an [`ActorSystem`] is parameterised over.
///
/// It declares no methods. Implementors must be cloneable, safe to send and
/// share between threads, and free of non-`'static` borrows.
pub trait SystemEvent
where
    Self: Clone + Send + Sync + 'static,
{
}
13
14#[derive(Clone)]
15pub struct ActorSystem<E: SystemEvent> {
16 name: String,
17 actors: Arc<RwLock<HashMap<ActorPath, Box<dyn Any + Send + Sync + 'static>>>>,
18 bus: EventBus<E>,
19}
20
21impl<E: SystemEvent> ActorSystem<E> {
22 pub fn name(&self) -> &str {
24 &self.name
25 }
26
27 pub fn publish(&self, event: E) {
30 self.bus.send(event).unwrap_or_else(|error| {
31 log::warn!(
32 "No listeners active on event bus. Dropping event: {:?}",
33 &error.to_string(),
34 );
35 0
36 });
37 }
38
39 pub fn events(&self) -> EventReceiver<E> {
41 self.bus.subscribe()
42 }
43
44 pub async fn get_actor<A: Actor<E>>(&self, path: &ActorPath) -> Option<ActorRef<E, A>> {
47 let actors = self.actors.read().await;
48 actors
49 .get(path)
50 .and_then(|any| any.downcast_ref::<ActorRef<E, A>>().cloned())
51 }
52
53 pub(crate) async fn create_actor_path<A: Actor<E>>(
54 &self,
55 path: ActorPath,
56 actor: A,
57 ) -> Result<ActorRef<E, A>, ActorError> {
58 log::debug!("Creating actor '{}' on system '{}'...", &path, &self.name);
59
60 let mut actors = self.actors.write().await;
61 if actors.contains_key(&path) {
62 return Err(ActorError::Exists(path));
63 }
64
65 let system = self.clone();
66 let (mut runner, actor_ref) = ActorRunner::create(path, actor);
67 tokio::spawn(async move {
68 runner.start(system).await;
69 });
70
71 let path = actor_ref.path().clone();
72 let any = Box::new(actor_ref.clone());
73
74 actors.insert(path, any);
75
76 Ok(actor_ref)
77 }
78
79 pub async fn create_actor<A: Actor<E>>(
82 &self,
83 name: &str,
84 actor: A,
85 ) -> Result<ActorRef<E, A>, ActorError> {
86 let path = ActorPath::from("/user") / name;
87 self.create_actor_path(path, actor).await
88 }
89
90 pub async fn get_or_create_actor<A, F>(
92 &self,
93 name: &str,
94 actor_fn: F,
95 ) -> Result<ActorRef<E, A>, ActorError>
96 where
97 A: Actor<E>,
98 F: FnOnce() -> A,
99 {
100 let path = ActorPath::from("/user") / name;
101 self.get_or_create_actor_path(&path, actor_fn).await
102 }
103
104 pub(crate) async fn get_or_create_actor_path<A, F>(
105 &self,
106 path: &ActorPath,
107 actor_fn: F,
108 ) -> Result<ActorRef<E, A>, ActorError>
109 where
110 A: Actor<E>,
111 F: FnOnce() -> A,
112 {
113 let actors = self.actors.read().await;
114 match self.get_actor(path).await {
115 Some(actor) => Ok(actor),
116 None => {
117 drop(actors);
118 self.create_actor_path(path.clone(), actor_fn()).await
119 }
120 }
121 }
122
123 pub async fn stop_actor(&self, path: &ActorPath) {
125 log::debug!("Stopping actor '{}' on system '{}'...", &path, &self.name);
126 let mut paths: Vec<ActorPath> = vec![path.clone()];
127 {
128 let running_actors = self.actors.read().await;
129 for running in running_actors.keys() {
130 if running.is_descendant_of(path) {
131 paths.push(running.clone());
132 }
133 }
134 }
135 paths.sort_unstable();
136 paths.reverse();
137 let mut actors = self.actors.write().await;
138 for path in &paths {
139 actors.remove(path);
140 }
141 }
142
143 pub fn new(name: &str, bus: EventBus<E>) -> Self {
145 let name = name.to_string();
146 let actors = Arc::new(RwLock::new(HashMap::new()));
147 ActorSystem { name, actors, bus }
148 }
149}
150
151impl<E: SystemEvent> fmt::Debug for ActorSystem<E> {
152 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
153 f.debug_struct("ActorSystem")
154 .field("name", &self.name)
155 .finish()
156 }
157}
158
#[cfg(test)]
mod tests {

    use crate::actor::{Actor, ActorContext, Handler, Message};
    use async_trait::async_trait;
    use thiserror::Error;

    use super::*;

    /// Defaults `RUST_LOG` to `trace` when unset and initialises `env_logger`
    /// in test mode. Repeated initialisation attempts are ignored.
    fn init_logging() {
        if std::env::var("RUST_LOG").is_err() {
            std::env::set_var("RUST_LOG", "trace");
        }
        let _ = env_logger::builder().is_test(true).try_init();
    }

    /// Builds a fresh system named "test" on a new event bus created with `1000`.
    fn test_system() -> ActorSystem<TestEvent> {
        let bus = EventBus::<TestEvent>::new(1000);
        ActorSystem::new("test", bus)
    }

    /// Suspends the current test task for `millis` milliseconds.
    async fn pause(millis: u64) {
        tokio::time::sleep(tokio::time::Duration::from_millis(millis)).await;
    }

    /// Event type published by the test actors.
    #[derive(Clone, Debug)]
    struct TestEvent(String);

    impl SystemEvent for TestEvent {}

    /// Actor that counts the messages it has handled.
    #[derive(Default)]
    struct TestActor {
        counter: usize,
    }

    #[async_trait]
    impl Actor<TestEvent> for TestActor {
        async fn pre_start(
            &mut self,
            _ctx: &mut ActorContext<TestEvent>,
        ) -> Result<(), ActorError> {
            log::debug!("Starting actor TestActor!");
            Ok(())
        }

        async fn post_stop(&mut self, _ctx: &mut ActorContext<TestEvent>) {
            log::debug!("Stopped actor TestActor!");
        }
    }

    /// Message answered with the receiving actor's updated counter.
    #[derive(Clone, Debug)]
    struct TestMessage(usize);

    impl Message for TestMessage {
        type Response = usize;
    }

    impl SystemEvent for TestMessage {}

    #[async_trait]
    impl Handler<TestEvent, TestMessage> for TestActor {
        /// Increments the counter, publishes an event and returns the new count.
        async fn handle(&mut self, msg: TestMessage, ctx: &mut ActorContext<TestEvent>) -> usize {
            log::debug!("received message! {:?}", &msg);
            self.counter += 1;
            log::debug!("counter is now {}", &self.counter);
            log::debug!("{} on system {}", &ctx.path, ctx.system.name());

            let event = TestEvent("Message received!".to_string());
            ctx.system.publish(event);

            self.counter
        }
    }

    /// Actor that stores a message and creates a `TestActor` child on start.
    #[derive(Default, Clone)]
    struct OtherActor {
        message: String,
        child: Option<ActorRef<TestEvent, TestActor>>,
    }

    #[async_trait]
    impl Actor<TestEvent> for OtherActor {
        async fn pre_start(&mut self, ctx: &mut ActorContext<TestEvent>) -> Result<(), ActorError> {
            log::debug!("OtherActor initial message: {}", &self.message);
            // A failure to create the child is tolerated: the field just stays `None`.
            self.child = ctx.create_child("child", TestActor::default()).await.ok();
            Ok(())
        }

        async fn post_stop(&mut self, _ctx: &mut ActorContext<TestEvent>) {
            log::debug!("OtherActor stopped.");
        }
    }

    /// Message that replaces `OtherActor`'s stored text and is answered with it.
    #[derive(Clone, Debug)]
    struct OtherMessage(String);

    impl Message for OtherMessage {
        type Response = String;
    }

    #[async_trait]
    impl Handler<TestEvent, OtherMessage> for OtherActor {
        /// Stores the incoming text, publishes an event and returns the stored text.
        async fn handle(&mut self, msg: OtherMessage, ctx: &mut ActorContext<TestEvent>) -> String {
            log::debug!("OtherActor received message! {:?}", &msg);
            log::debug!("original message is {}", &self.message);
            self.message = msg.0;
            log::debug!("message is now {}", &self.message);
            log::debug!("{} on system {}", &ctx.path, ctx.system.name());

            let event = TestEvent("Received message!".to_string());
            ctx.system.publish(event);

            self.message.clone()
        }
    }

    /// Creating an actor and asking it once yields a counter of 1.
    #[tokio::test]
    async fn actor_create() {
        init_logging();

        let system = test_system();
        let actor_ref = system
            .create_actor("test-actor", TestActor::default())
            .await
            .unwrap();

        let result = actor_ref.ask(TestMessage(10)).await.unwrap();

        assert_eq!(result, 1);
    }

    /// Error type kept alongside the fixtures; no test in this module references it.
    #[derive(Debug, Error)]
    #[error("custom error")]
    struct CustomError(String);

    /// Builds an `OtherActor` holding `message` and no child yet.
    fn create_other(message: String) -> OtherActor {
        OtherActor {
            message,
            child: None,
        }
    }

    /// `get_or_create_actor` builds the actor from the factory when it is absent.
    #[tokio::test]
    async fn actor_get_or_create() {
        init_logging();

        let system = test_system();

        let initial_message = "hello world!".to_string();
        let actor_ref = system
            .get_or_create_actor("test-actor", move || create_other(initial_message))
            .await
            .unwrap();

        let result = actor_ref
            .ask(OtherMessage("Updated message.".to_string()))
            .await
            .unwrap();

        assert_eq!(result, "Updated message.".to_string());
    }

    /// After `stop_actor` the path can no longer be looked up.
    #[tokio::test]
    async fn actor_stop() {
        init_logging();

        let system = test_system();

        {
            let actor_ref = system
                .create_actor("test-actor", TestActor::default())
                .await
                .unwrap();

            let result = actor_ref.ask(TestMessage(10)).await.unwrap();
            assert_eq!(result, 1);

            system.stop_actor(actor_ref.path()).await;

            let lookup = system.get_actor::<TestActor>(actor_ref.path()).await;
            assert!(lookup.is_none());
        }

        pause(1000).await;
    }

    /// Asking an actor still works while a background task listens on the bus.
    #[tokio::test]
    async fn actor_events() {
        init_logging();

        let system = test_system();
        let actor_ref = system
            .create_actor("test-actor", TestActor::default())
            .await
            .unwrap();

        let mut events = system.events();
        tokio::spawn(async move {
            loop {
                match events.recv().await {
                    Ok(event) => println!("Received event! {event:?}"),
                    Err(err) => println!("Error receivng event!!! {err:?}"),
                }
            }
        });

        pause(100).await;

        let result = actor_ref.ask(TestMessage(10)).await.unwrap();

        assert_eq!(result, 1);
    }

    /// `get_actor` returns the actor for the matching type and nothing for another type.
    #[tokio::test]
    async fn actor_get() {
        init_logging();

        let system = test_system();
        let original = system
            .create_actor("test-actor", TestActor::default())
            .await
            .unwrap();

        match system.get_actor::<TestActor>(original.path()).await {
            Some(actor_ref) => {
                let result = actor_ref.ask(TestMessage(10)).await.unwrap();
                assert_eq!(result, 1);
            }
            None => panic!("It should have retrieved the actor!"),
        }

        // The same path requested with the wrong actor type must not resolve.
        if let Some(actor_ref) = system.get_actor::<OtherActor>(original.path()).await {
            let result = actor_ref
                .ask(OtherMessage("Hello world!".to_string()))
                .await
                .unwrap();
            println!("Result is: {result}");
            panic!("It should not go here!");
        }

        pause(100).await;
    }

    /// Stopping a parent leaves the registry empty, child included.
    #[tokio::test]
    async fn actor_parent_child() {
        init_logging();

        let parent = create_other("Initial".to_string());
        let system = test_system();

        {
            let actor_ref = system.create_actor("test-actor", parent).await.unwrap();

            let result = actor_ref
                .ask(OtherMessage("new message!".to_string()))
                .await
                .unwrap();
            assert_eq!(result, "new message!".to_string());

            pause(2000).await;
            system.stop_actor(actor_ref.path()).await;
        }

        pause(2000).await;

        let actors = system.actors.read().await;
        for actor in actors.keys() {
            println!("Still active!: {actor:?}");
        }
        assert_eq!(actors.len(), 0);
    }
}