1use std::future::Future;
2
3use async_channel::{Receiver, Sender};
4
5use crate::{
6 Actor, Address, WeakAddress,
7 error::{ActorError, AddressError},
8 message::Envelope,
9};
10
11const DEFAULT_CAP: usize = 100;
12
13#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
14enum State {
15 #[default]
16 Continue,
17 Shutdown,
18 SendersClosed,
19}
20
21#[derive(Debug, Clone)]
27pub struct Context<A> {
28 sender: async_channel::Sender<State>,
29 address: WeakAddress<A>,
30}
31
32impl<A> Context<A> {
33 pub fn shutdown(&self) {
38 let _ = self.sender.force_send(State::Shutdown);
39 }
40
41 pub fn address(&self) -> &WeakAddress<A> {
45 &self.address
46 }
47}
48
49#[derive(Debug)]
68pub struct Executor<A> {
69 actor: A,
70 context: Context<A>,
71 state: State,
72 from_context: Receiver<State>,
73 receiver: Receiver<Envelope<A>>,
74}
75
76#[derive(Debug, Clone)]
77pub struct ShutdownHandle(Sender<State>);
78
79impl ShutdownHandle {
80 pub fn shutdown(&self) -> Result<(), ActorError> {
81 self.0
82 .force_send(State::Shutdown)
83 .map(|_| ())
84 .map_err(|_| ActorError::Shutdown)
85 }
86}
87
88impl<A> Executor<A> {
89 pub fn new(actor: A) -> (Self, Address<A>) {
90 Self::new_with_capacity(actor, DEFAULT_CAP)
91 }
92
93 pub fn new_with_capacity(actor: A, cap: usize) -> (Self, Address<A>) {
94 let (sender, receiver) = async_channel::bounded(cap);
95 let address = Address::new(sender);
96 let (state_tx, state_rx) = async_channel::unbounded();
97 let me = Self {
98 actor,
99 receiver,
100 context: Context {
101 sender: state_tx,
102 address: address.downgrade(),
103 },
104 from_context: state_rx,
105 state: Default::default(),
106 };
107
108 (me, address)
109 }
110
111 pub fn shutdown_handle(&self) -> ShutdownHandle {
113 let sender = self.context.sender.clone();
114 ShutdownHandle(sender)
115 }
116}
117
118enum Race<A> {
119 State(State),
120 Envelope(Envelope<A>),
121}
122
123impl<A> Executor<A>
124where
125 A: Actor,
126{
127 pub async fn run(&mut self) -> Result<(), AddressError> {
133 self.reset_state();
134 self.actor.starting(&self.context).await;
135
136 #[allow(clippy::while_let_loop)]
138 let result = loop {
139 match self.state {
140 State::Continue => self.continuation().await,
141 State::Shutdown => break Ok(()),
142 State::SendersClosed => break Err(AddressError::Closed),
143 }
144 };
145
146 self.actor.stopping(&self.context).await;
147
148 result
149 }
150
151 pub async fn run_against<F>(&mut self, fut: F) -> Result<bool, AddressError>
159 where
160 F: Future<Output = ()>,
161 {
162 self.reset_state();
163 let fut1 = async { self.run().await.map(|_| false) };
164 let fut2 = async {
165 fut.await;
166 Ok(true)
167 };
168
169 crate::futures::race_biased(fut1, fut2).await
170 }
171
172 fn reset_state(&mut self) {
174 while self.from_context.try_recv().is_ok() {}
175 self.state = State::Continue;
176 }
177
178 pub fn actor_ref(&self) -> &A {
179 &self.actor
180 }
181
182 pub fn actor_mut(&mut self) -> &mut A {
183 &mut self.actor
184 }
185
186 async fn continuation(&mut self) {
187 let fut1 = async { self.from_context.recv().await.map(|val| Race::State(val)) };
188 let fut2 = async { self.receiver.recv().await.map(|val| Race::Envelope(val)) };
189
190 let result = crate::futures::race_biased(fut1, fut2).await;
191
192 match result {
193 Ok(Race::State(state)) => self.state = state,
194 Ok(Race::Envelope(env)) => env.resolve(&mut self.actor, &self.context).await,
195 Err(_) => {
196 self.state = State::SendersClosed;
197 }
198 }
199 }
200}
201
202#[cfg(test)]
203mod tests {
204 use super::*;
205
206 pub struct Foo;
207
208 impl Actor for Foo {}
209
210 #[tokio::test]
211 async fn dropped_address_exits() {
212 let (mut actor, addr) = Executor::new(Foo);
213 let handle = tokio::spawn(async move { actor.run().await });
214 assert!(!handle.is_finished());
215 drop(addr);
216 assert!(handle.await.unwrap().is_err())
217 }
218}