black_box/
executor.rs

1use async_channel::Receiver;
2
3use crate::{message::Envelope, Actor, Address};
4
5const DEFAULT_CAP: usize = 100;
6
7#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
8enum State {
9    #[default]
10    Continue,
11    Shutdown,
12}
13
14/// A cloneable context for the actor.
15///
16/// Currently this fuctions as a means by which to alter the state of the [`Executor`], it is
17/// cloneable and can thus be sent to other threads, runtimes or even other actors to trigger a
18/// shutdown.
19#[derive(Clone)]
20pub struct Context<A> {
21    sender: async_channel::Sender<State>,
22    address: Address<A>,
23}
24
25impl<A> Context<A> {
26    /// Triggers the end of the executor.
27    ///
28    /// Once triggered, no new messages will be processed and the actor will exit after resolving
29    /// [`Actor::stopping`]
30    pub fn shutdown(&self) {
31        let _ = self.sender.force_send(State::Shutdown);
32    }
33
34    /// Retrieve the address for the executor's actor
35    ///
36    /// This is useful when an actor wants to emit messages to itself.
37    pub fn address(&self) -> &Address<A> {
38        &self.address
39    }
40}
41
42/// The event loop for an actor
43///
44/// Handles the receipt of messages, and state management of the actor. The primary method exposed
45/// by the executor is [`Executor::run`], which is used to execute the event loop.
46///
47/// # Example
48///
49/// A common pattern is to spawn the executor onto an async runtime like tokio.
50///
51/// ```no_run
52/// # use black_box::*;
53/// # struct MyActor;
54/// # impl Actor for MyActor {}
55/// let my_actor = MyActor;
56/// let (executor, addr) = Executor::new(my_actor);
57///
58/// tokio::spawn(executor.run());
59/// ```
60pub struct Executor<A> {
61    actor: A,
62    context: Context<A>,
63    state: State,
64    from_context: Receiver<State>,
65    receiver: Receiver<Envelope<A>>,
66}
67
68impl<A> Executor<A> {
69    pub fn new(actor: A) -> (Self, Address<A>) {
70        let (sender, receiver) = async_channel::bounded(DEFAULT_CAP);
71        let address = Address::new(sender);
72        let (state_tx, state_rx) = async_channel::unbounded();
73        let me = Self {
74            actor,
75            receiver,
76            context: Context {
77                sender: state_tx,
78                address: address.clone(),
79            },
80            from_context: state_rx,
81            state: Default::default(),
82        };
83
84        (me, address)
85    }
86}
87
88enum Race<A> {
89    State(State),
90    Envelope(Envelope<A>),
91}
92
93impl<A> Executor<A>
94where
95    A: Actor,
96{
97    pub async fn run(mut self) {
98        self.actor.starting(&self.context).await;
99
100        // TODO: In the future we will likely add more states, this is fine for now
101        #[allow(clippy::while_let_loop)]
102        loop {
103            match self.state {
104                State::Continue => self.continuation().await,
105                State::Shutdown => break,
106            }
107        }
108
109        self.actor.stopping(&self.context).await;
110    }
111
112    async fn continuation(&mut self) {
113        let fut1 = async { self.from_context.recv().await.map(|val| Race::State(val)) };
114        let fut2 = async { self.receiver.recv().await.map(|val| Race::Envelope(val)) };
115
116        let result = crate::futures::race_biased(fut1, fut2).await;
117
118        match result {
119            Ok(Race::State(state)) => self.state = state,
120            Ok(Race::Envelope(env)) => env.resolve(&mut self.actor, &self.context).await,
121            Err(_) => {
122                self.state = State::Shutdown;
123            }
124        }
125    }
126}