Skip to main content

black_box/
executor.rs

1use std::future::Future;
2
3use async_channel::{Receiver, Sender};
4
5use crate::{
6    Actor, Address, WeakAddress,
7    error::{ActorError, AddressError},
8    message::Envelope,
9};
10
11const DEFAULT_CAP: usize = 100;
12
13#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
14enum State {
15    #[default]
16    Continue,
17    Shutdown,
18    SendersClosed,
19}
20
21/// A cloneable context for the actor.
22///
23/// Currently this fuctions as a means by which to alter the state of the [`Executor`], it is
24/// cloneable and can thus be sent to other threads, runtimes or even other actors to trigger a
25/// shutdown.
26#[derive(Debug, Clone)]
27pub struct Context<A> {
28    sender: async_channel::Sender<State>,
29    address: WeakAddress<A>,
30}
31
32impl<A> Context<A> {
33    /// Triggers the end of the executor.
34    ///
35    /// Once triggered, no new messages will be processed and the actor will exit after resolving
36    /// [`Actor::stopping`]
37    pub fn shutdown(&self) {
38        let _ = self.sender.force_send(State::Shutdown);
39    }
40
41    /// Retrieve the address for the executor's actor
42    ///
43    /// This is useful when an actor wants to emit messages to itself.
44    pub fn address(&self) -> &WeakAddress<A> {
45        &self.address
46    }
47}
48
49/// The event loop for an actor
50///
51/// Handles the receipt of messages, and state management of the actor. The primary method exposed
52/// by the executor is [`Executor::run`], which is used to execute the event loop.
53///
54/// # Example
55///
56/// A common pattern is to spawn the executor onto an async runtime like tokio.
57///
58/// ```no_run
59/// # use black_box::*;
60/// # struct MyActor;
61/// # impl Actor for MyActor {}
62/// let my_actor = MyActor;
63/// let (mut executor, addr) = Executor::new(my_actor);
64///
65/// tokio::spawn(async move { executor.run().await });
66/// ```
67#[derive(Debug)]
68pub struct Executor<A> {
69    actor: A,
70    context: Context<A>,
71    state: State,
72    from_context: Receiver<State>,
73    receiver: Receiver<Envelope<A>>,
74}
75
76#[derive(Debug, Clone)]
77pub struct ShutdownHandle(Sender<State>);
78
79impl ShutdownHandle {
80    pub fn shutdown(&self) -> Result<(), ActorError> {
81        self.0
82            .force_send(State::Shutdown)
83            .map(|_| ())
84            .map_err(|_| ActorError::Shutdown)
85    }
86}
87
88impl<A> Executor<A> {
89    pub fn new(actor: A) -> (Self, Address<A>) {
90        Self::new_with_capacity(actor, DEFAULT_CAP)
91    }
92
93    pub fn new_with_capacity(actor: A, cap: usize) -> (Self, Address<A>) {
94        let (sender, receiver) = async_channel::bounded(cap);
95        let address = Address::new(sender);
96        let (state_tx, state_rx) = async_channel::unbounded();
97        let me = Self {
98            actor,
99            receiver,
100            context: Context {
101                sender: state_tx,
102                address: address.downgrade(),
103            },
104            from_context: state_rx,
105            state: Default::default(),
106        };
107
108        (me, address)
109    }
110
111    /// Construct a new shutdown handle to be able to remotely shutdown the actor
112    pub fn shutdown_handle(&self) -> ShutdownHandle {
113        let sender = self.context.sender.clone();
114        ShutdownHandle(sender)
115    }
116}
117
118enum Race<A> {
119    State(State),
120    Envelope(Envelope<A>),
121}
122
123impl<A> Executor<A>
124where
125    A: Actor,
126{
127    /// Runs the executor, returns `Ok(())` if the actor invoked shutdown manually, and `Err(_)`
128    /// if all addresses to the actor have been dropped
129    ///
130    /// This function should be likely be handed off to the spawn function of your async runtime
131    /// of choice.
132    pub async fn run(&mut self) -> Result<(), AddressError> {
133        self.reset_state();
134        self.actor.starting(&self.context).await;
135
136        // TODO: In the future we will likely add more states, this is fine for now
137        #[allow(clippy::while_let_loop)]
138        let result = loop {
139            match self.state {
140                State::Continue => self.continuation().await,
141                State::Shutdown => break Ok(()),
142                State::SendersClosed => break Err(AddressError::Closed),
143            }
144        };
145
146        self.actor.stopping(&self.context).await;
147
148        result
149    }
150
151    /// Runs the executor, halting execution early if the provided future polls ready.
152    ///
153    /// Returns `Ok(true)` if the provided future resolved, and `Ok(false)` if the the actor was
154    /// shut down
155    ///
156    /// This can be used in conjunction with [`Self::actor_mut`] to periodically alter the state of
157    /// the actor.
158    pub async fn run_against<F>(&mut self, fut: F) -> Result<bool, AddressError>
159    where
160        F: Future<Output = ()>,
161    {
162        self.reset_state();
163        let fut1 = async { self.run().await.map(|_| false) };
164        let fut2 = async {
165            fut.await;
166            Ok(true)
167        };
168
169        crate::futures::race_biased(fut1, fut2).await
170    }
171
172    /// Resets the actor's state
173    fn reset_state(&mut self) {
174        while self.from_context.try_recv().is_ok() {}
175        self.state = State::Continue;
176    }
177
178    pub fn actor_ref(&self) -> &A {
179        &self.actor
180    }
181
182    pub fn actor_mut(&mut self) -> &mut A {
183        &mut self.actor
184    }
185
186    async fn continuation(&mut self) {
187        let fut1 = async { self.from_context.recv().await.map(|val| Race::State(val)) };
188        let fut2 = async { self.receiver.recv().await.map(|val| Race::Envelope(val)) };
189
190        let result = crate::futures::race_biased(fut1, fut2).await;
191
192        match result {
193            Ok(Race::State(state)) => self.state = state,
194            Ok(Race::Envelope(env)) => env.resolve(&mut self.actor, &self.context).await,
195            Err(_) => {
196                self.state = State::SendersClosed;
197            }
198        }
199    }
200}
201
202#[cfg(test)]
203mod tests {
204    use super::*;
205
206    pub struct Foo;
207
208    impl Actor for Foo {}
209
210    #[tokio::test]
211    async fn dropped_address_exits() {
212        let (mut actor, addr) = Executor::new(Foo);
213        let handle = tokio::spawn(async move { actor.run().await });
214        assert!(!handle.is_finished());
215        drop(addr);
216        assert!(handle.await.unwrap().is_err())
217    }
218}