tyra/actor/
executor.rs

1use crate::actor::actor_address::ActorAddress;
2use crate::actor::actor_config::ActorConfig;
3use crate::actor::actor_factory::ActorFactory;
4use crate::actor::actor_state::ActorState;
5use crate::actor::actor_wrapper::ActorWrapper;
6use crate::actor::context::ActorContext;
7use crate::actor::handler::Handler;
8use crate::actor::mailbox::Mailbox;
9use crate::message::actor_message::BaseActorMessage;
10use crate::message::envelope::{MessageEnvelope, MessageEnvelopeTrait};
11use crate::message::system_stop_message::SystemStopMessage;
12use crate::prelude::{Actor, ActorPanicSource, ActorResult};
13use crate::system::actor_error::ActorError;
14use crate::system::actor_system::ActorSystem;
15use log::debug;
16use std::error::Error;
17use std::panic::{catch_unwind, AssertUnwindSafe};
18use std::sync::atomic::Ordering;
19use std::time::{Duration, Instant};
20
21pub trait ExecutorTrait: Send + Sync {
22    fn handle(&mut self, is_system_stopping: bool) -> ActorState;
23    fn get_config(&self) -> &ActorConfig;
24    fn get_address(&self) -> ActorAddress;
25    fn is_sleeping(&self) -> bool;
26    fn is_stopped(&self) -> bool;
27    fn wakeup(&mut self);
28    fn on_actor_panic(&mut self, source: ActorPanicSource) -> ActorState;
29    fn restart_actor(&mut self) -> ActorState;
30    fn stop_actor(&mut self, immediately: bool) -> ActorState;
31    fn handle_actor_result(&mut self, result: Result<ActorResult, Box<dyn Error>>) -> ActorState;
32}
33
34pub struct Executor<A, P>
35where
36    A: Actor + 'static,
37    P: ActorFactory<A>,
38{
39    actor: A,
40    actor_props: P,
41    actor_config: ActorConfig,
42    mailbox: Mailbox<A>,
43    queue: flume::Receiver<MessageEnvelope<A>>,
44    actor_address: ActorAddress,
45    is_startup: bool,
46    system_triggered_stop: bool,
47    last_wakeup: Instant,
48    context: ActorContext<A>,
49}
50
51unsafe impl<A, P> Send for Executor<A, P>
52where
53    A: Actor + 'static,
54    P: ActorFactory<A>,
55{
56}
57unsafe impl<A, P> Sync for Executor<A, P>
58where
59    A: Actor + 'static,
60    P: ActorFactory<A>,
61{
62}
63
64impl<A, P> ExecutorTrait for Executor<A, P>
65where
66    A: Actor + 'static,
67    P: ActorFactory<A>,
68{
69    fn handle(&mut self, system_is_stopping: bool) -> ActorState {
70        if system_is_stopping && !self.system_triggered_stop {
71            let result = self.send(SystemStopMessage::new());
72            if result.is_ok() {
73                self.system_triggered_stop = true;
74            }
75        }
76        if self.is_startup {
77            self.is_startup = false;
78            let result = catch_unwind(AssertUnwindSafe(|| {
79                return self.actor.pre_start(&self.context);
80            }));
81            return if result.is_err() {
82                self.on_actor_panic(ActorPanicSource::PreStart)
83            } else {
84                self.handle_actor_result(result.unwrap())
85            };
86        }
87        let _len = self.queue.len();
88        let m = self.queue.try_recv();
89
90        if m.is_err() {
91            if self.is_stopped() {
92                let _ = catch_unwind(AssertUnwindSafe(|| self.actor.post_stop(&self.context)));
93                return ActorState::Stopped;
94            }
95            self.mailbox.is_sleeping.store(true, Ordering::Relaxed);
96            let duration = self.last_wakeup.elapsed();
97            if duration >= Duration::from_millis(5000) {
98                return ActorState::Inactive;
99            }
100            self.mailbox.is_sleeping.store(false, Ordering::Relaxed);
101            return ActorState::Running;
102        }
103
104        let mut msg = m.unwrap();
105        let result = catch_unwind(AssertUnwindSafe(|| {
106            let actor_result = msg.handle(&mut self.actor, &self.context);
107            return self.handle_actor_result(actor_result);
108        }));
109        if result.is_err() {
110            return self.on_actor_panic(ActorPanicSource::Message);
111        }
112        return result.unwrap();
113    }
114
115    fn stop_actor(&mut self, immediately: bool) -> ActorState {
116        self.mailbox.is_stopped.store(true, Ordering::Relaxed);
117        if immediately {
118            let _ = catch_unwind(AssertUnwindSafe(|| self.actor.post_stop(&self.context)));
119            return ActorState::Stopped;
120        }
121        return ActorState::Running;
122    }
123
124    fn restart_actor(&mut self) -> ActorState {
125        let result = catch_unwind(AssertUnwindSafe(|| {
126            self.actor.pre_restart(&self.context);
127            let actor = self.actor_props.new_actor(self.context.clone());
128            if actor.is_err() {
129                let err = actor.as_ref().err().unwrap();
130                debug!("{:?}", err);
131            }
132            return actor.unwrap();
133        }));
134        if result.is_err() {
135            return self.on_actor_panic(ActorPanicSource::Restart);
136        } else {
137            self.actor = result.unwrap();
138            self.is_startup = true;
139        }
140        return ActorState::Running;
141    }
142
143    fn on_actor_panic(&mut self, source: ActorPanicSource) -> ActorState {
144        let result = catch_unwind(AssertUnwindSafe(|| {
145            let actor_result = self.actor.on_panic(&self.context, source);
146            return self.handle_actor_result(actor_result);
147        }));
148        if result.is_err() {
149            let result = catch_unwind(AssertUnwindSafe(|| {
150                let actor_result = self
151                    .actor
152                    .on_panic(&self.context, ActorPanicSource::OnPanic);
153                return self.handle_actor_result(actor_result);
154            }));
155            if result.is_err() {
156                self.stop_actor(true);
157            }
158            return result.unwrap();
159        }
160        return result.unwrap();
161    }
162
163    fn get_config(&self) -> &ActorConfig {
164        &self.actor_config
165    }
166
167    fn get_address(&self) -> ActorAddress {
168        self.actor_address.clone()
169    }
170
171    fn is_sleeping(&self) -> bool {
172        self.mailbox.is_sleeping.load(Ordering::Relaxed)
173    }
174
175    fn is_stopped(&self) -> bool {
176        self.mailbox.is_stopped.load(Ordering::Relaxed)
177    }
178
179    fn wakeup(&mut self) {
180        self.mailbox.is_sleeping.store(false, Ordering::Relaxed);
181        self.last_wakeup = Instant::now();
182    }
183
184    fn handle_actor_result(&mut self, result: Result<ActorResult, Box<dyn Error>>) -> ActorState {
185        let res: ActorResult;
186        if result.is_err() {
187            let catch_result = catch_unwind(AssertUnwindSafe(|| {
188                let actor_result = self.actor.on_error(&self.context, result.unwrap_err());
189                return actor_result;
190            }));
191            if catch_result.is_err() {
192                return self.stop_actor(true);
193            }
194            res = catch_result.unwrap();
195        } else {
196            res = result.unwrap();
197        }
198        return match res {
199            ActorResult::Ok => ActorState::Running,
200            ActorResult::Restart => self.restart_actor(),
201            ActorResult::Stop => self.stop_actor(false),
202            ActorResult::Kill => self.stop_actor(true),
203            ActorResult::Sleep(duration) => ActorState::Sleeping(duration),
204        };
205    }
206}
207
208impl<A, P> Executor<A, P>
209where
210    A: Actor,
211    P: ActorFactory<A>,
212{
213    pub fn new(
214        mut actor_props: P,
215        actor_address: ActorAddress,
216        actor_config: ActorConfig,
217        mailbox: Mailbox<A>,
218        receiver: flume::Receiver<MessageEnvelope<A>>,
219        system: ActorSystem,
220        actor_ref: ActorWrapper<A>,
221    ) -> Result<Self, ActorError> {
222        let context = ActorContext {
223            actor_ref,
224            system: system.clone(),
225        };
226
227        let actor = catch_unwind(AssertUnwindSafe(|| {
228            let to_return = actor_props.new_actor(context.clone());
229            if to_return.is_err() {
230                let err = to_return.as_ref().err().unwrap();
231                debug!("{:?}", err);
232            }
233            return to_return.unwrap();
234        }));
235        if actor.is_err() {
236            return Err(ActorError::InitError);
237        }
238        return Ok(Self {
239            actor: actor.unwrap(),
240            actor_props,
241            actor_config,
242            mailbox,
243            queue: receiver,
244            actor_address,
245            is_startup: true,
246            system_triggered_stop: false,
247            last_wakeup: Instant::now(),
248            context,
249        });
250    }
251    pub fn send<M>(&self, msg: M) -> Result<(), flume::SendTimeoutError<MessageEnvelope<A>>>
252    where
253        A: Handler<M>,
254        M: BaseActorMessage + 'static,
255    {
256        return self
257            .mailbox
258            .msg_in
259            .send_timeout(MessageEnvelope::new(msg), Duration::from_millis(10));
260    }
261}