black_box/
executor.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
use async_channel::Receiver;

use crate::{message::Envelope, Actor, Address};

const DEFAULT_CAP: usize = 100;

#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum State {
    #[default]
    Continue,
    Shutdown,
}

/// A cloneable context for the actor.
///
/// Currently this fuctions as a means by which to alter the state of the [`Executor`], it is
/// cloneable and can thus be sent to other threads, runtimes or even other actors to trigger a
/// shutdown.
#[derive(Clone, Debug)]
pub struct Context {
    sender: async_channel::Sender<State>,
}

impl Context {
    /// Triggers the end of the executor.
    ///
    /// Once triggered, no new messages will be processed and the actor will exit after resolving
    /// [`Actor::stopping`]
    pub fn shutdown(&self) {
        let _ = self.sender.force_send(State::Shutdown);
    }
}

/// The event loop for an actor
///
/// Handles the receipt of messages, and state management of the actor. The primary method exposed
/// by the executor is [`Executor::run`], which is used to execute the event loop.
///
/// # Example
///
/// A common pattern is to spawn the executor onto an async runtime like tokio.
///
/// ```ignore
/// let my_actor = MyActor;
/// let (executor, addr) = Executor::new(my_actor);
///
/// tokio::spawn(executor.run());
/// ```
pub struct Executor<A> {
    actor: A,
    context: Context,
    state: State,
    from_context: Receiver<State>,
    receiver: Receiver<Envelope<A>>,
}

impl<A> Executor<A> {
    pub fn new(actor: A) -> (Self, Address<A>) {
        let (sender, receiver) = async_channel::bounded(DEFAULT_CAP);
        let (state_tx, state_rx) = async_channel::unbounded();
        let me = Self {
            actor,
            receiver,
            context: Context { sender: state_tx },
            from_context: state_rx,
            state: Default::default(),
        };
        let address = Address::new(sender);

        (me, address)
    }
}

enum Race<A> {
    State(State),
    Envelope(Envelope<A>),
}

impl<A> Executor<A>
where
    A: Actor,
{
    pub async fn run(mut self) {
        self.actor.starting().await;

        // TODO: In the future we will likely add more states, this is fine for now
        #[allow(clippy::while_let_loop)]
        loop {
            match self.state {
                State::Continue => self.continuation().await,
                State::Shutdown => break,
            }
        }

        self.actor.stopping().await;
    }

    async fn continuation(&mut self) {
        let fut1 = async { self.from_context.recv().await.map(|val| Race::State(val)) };
        let fut2 = async { self.receiver.recv().await.map(|val| Race::Envelope(val)) };

        let result = futures_lite::future::race(fut1, fut2).await;

        match result {
            Ok(Race::State(state)) => self.state = state,
            Ok(Race::Envelope(env)) => env.resolve(&mut self.actor, &self.context).await,
            Err(_) => {
                self.state = State::Shutdown;
            }
        }
    }
}