1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
use std::{
    any::{self, Any},
    convert::Infallible,
    time::Duration,
};

use async_trait::async_trait;
use simpl_actor::{actor, Actor, ActorError, ActorRef, ActorStopReason, ShouldRestart, Spawn};

pub struct CounterActor {
    count: i64,
}

#[async_trait]
impl Actor for CounterActor {
    type Error = Infallible;

    async fn on_start(&mut self, _id: u64) -> Result<(), Self::Error> {
        println!("starting actor {}", any::type_name::<Self>());
        Ok(())
    }

    async fn on_panic(&mut self, _err: Box<dyn Any + Send>) -> Result<ShouldRestart, Self::Error> {
        println!("restarting actor {}", any::type_name::<Self>());
        Ok(ShouldRestart::Yes)
    }

    async fn on_stop(self, reason: ActorStopReason) -> Result<(), Self::Error> {
        println!(
            "stopping actor {} because {reason}",
            any::type_name::<Self>()
        );
        Ok(())
    }
}

#[actor]
impl CounterActor {
    pub fn new() -> Self {
        CounterActor { count: 0 }
    }

    /// A message with no return value
    #[message]
    pub fn inc(&mut self, amount: i64) {
        self.count += amount;
    }

    /// A message returning a value
    #[message]
    pub fn count(&self) -> i64 {
        self.count
    }

    /// A message that uses async
    #[message]
    pub async fn sleep(&self) {
        tokio::time::sleep(Duration::from_millis(500)).await;
    }

    /// A message that sends a message to itself
    #[message]
    pub async fn inc_myself(&self) -> Result<(), ActorError<i64>> {
        self.actor_ref().inc_async(1).await
    }

    /// A message that panics
    #[message]
    pub fn force_panic(&self) {
        panic!("forced panic, don't worry this is correct and the actor will be restarted")
    }
}

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let counter = CounterActor::new();
    let actor = counter.spawn();

    // Increment
    assert_eq!(actor.inc(2).await, Ok(()));
    // Count should be 2
    assert_eq!(actor.count().await, Ok(2));

    // Trigger the actor to sleep for 500ms in the background
    assert_eq!(actor.sleep_async().await, Ok(()));
    // Fill the mailbox with messages
    for _ in 0..CounterActor::mailbox_size() {
        assert_eq!(actor.inc_async(1).await, Ok(()));
    }
    // Mailbox should be full, so if we try to send a message without backpressure using the try_ method,
    // we should get an error that the mailbox is full
    assert_eq!(actor.try_inc(1).await, Err(ActorError::MailboxFull(1)));
    // And if we try to increment, waiting for 200ms for there to be capacity, we should timeout since the actor was sleeping for 500ms
    assert_eq!(
        actor.inc_timeout(1, Duration::from_millis(200)).await,
        Err(ActorError::Timeout(1))
    );

    // If a panic occurs when running a message, we should receive an error that the actor was sopped
    assert_eq!(actor.force_panic().await, Err(ActorError::ActorStopped));
    // But we've implemented the `Actor::pre_restart` method to return `true`, so the actor will be restarted,
    // and new messages should be handled sucessfully, even with the state being preserved
    assert_eq!(actor.inc(1).await, Ok(()));
    assert_eq!(actor.count().await, Ok(67));

    // Stop the actor, dropping any pending messages
    actor.stop_immediately();
    // Await the actor to stop
    actor.wait_for_stop().await;
    // Any new messages should error since the actor is no longer running
    assert_eq!(actor.inc(1).await, Err(ActorError::ActorNotRunning(1)));
}