1use crate::actor::actor_address::ActorAddress;
2use crate::actor::actor_config::ActorConfig;
3use crate::actor::actor_factory::ActorFactory;
4use crate::actor::actor_state::ActorState;
5use crate::actor::actor_wrapper::ActorWrapper;
6use crate::actor::context::ActorContext;
7use crate::actor::handler::Handler;
8use crate::actor::mailbox::Mailbox;
9use crate::message::actor_message::BaseActorMessage;
10use crate::message::envelope::{MessageEnvelope, MessageEnvelopeTrait};
11use crate::message::system_stop_message::SystemStopMessage;
12use crate::prelude::{Actor, ActorPanicSource, ActorResult};
13use crate::system::actor_error::ActorError;
14use crate::system::actor_system::ActorSystem;
15use log::debug;
16use std::error::Error;
17use std::panic::{catch_unwind, AssertUnwindSafe};
18use std::sync::atomic::Ordering;
19use std::time::{Duration, Instant};
20
21pub trait ExecutorTrait: Send + Sync {
22 fn handle(&mut self, is_system_stopping: bool) -> ActorState;
23 fn get_config(&self) -> &ActorConfig;
24 fn get_address(&self) -> ActorAddress;
25 fn is_sleeping(&self) -> bool;
26 fn is_stopped(&self) -> bool;
27 fn wakeup(&mut self);
28 fn on_actor_panic(&mut self, source: ActorPanicSource) -> ActorState;
29 fn restart_actor(&mut self) -> ActorState;
30 fn stop_actor(&mut self, immediately: bool) -> ActorState;
31 fn handle_actor_result(&mut self, result: Result<ActorResult, Box<dyn Error>>) -> ActorState;
32}
33
34pub struct Executor<A, P>
35where
36 A: Actor + 'static,
37 P: ActorFactory<A>,
38{
39 actor: A,
40 actor_props: P,
41 actor_config: ActorConfig,
42 mailbox: Mailbox<A>,
43 queue: flume::Receiver<MessageEnvelope<A>>,
44 actor_address: ActorAddress,
45 is_startup: bool,
46 system_triggered_stop: bool,
47 last_wakeup: Instant,
48 context: ActorContext<A>,
49}
50
51unsafe impl<A, P> Send for Executor<A, P>
52where
53 A: Actor + 'static,
54 P: ActorFactory<A>,
55{
56}
57unsafe impl<A, P> Sync for Executor<A, P>
58where
59 A: Actor + 'static,
60 P: ActorFactory<A>,
61{
62}
63
64impl<A, P> ExecutorTrait for Executor<A, P>
65where
66 A: Actor + 'static,
67 P: ActorFactory<A>,
68{
69 fn handle(&mut self, system_is_stopping: bool) -> ActorState {
70 if system_is_stopping && !self.system_triggered_stop {
71 let result = self.send(SystemStopMessage::new());
72 if result.is_ok() {
73 self.system_triggered_stop = true;
74 }
75 }
76 if self.is_startup {
77 self.is_startup = false;
78 let result = catch_unwind(AssertUnwindSafe(|| {
79 return self.actor.pre_start(&self.context);
80 }));
81 return if result.is_err() {
82 self.on_actor_panic(ActorPanicSource::PreStart)
83 } else {
84 self.handle_actor_result(result.unwrap())
85 };
86 }
87 let _len = self.queue.len();
88 let m = self.queue.try_recv();
89
90 if m.is_err() {
91 if self.is_stopped() {
92 let _ = catch_unwind(AssertUnwindSafe(|| self.actor.post_stop(&self.context)));
93 return ActorState::Stopped;
94 }
95 self.mailbox.is_sleeping.store(true, Ordering::Relaxed);
96 let duration = self.last_wakeup.elapsed();
97 if duration >= Duration::from_millis(5000) {
98 return ActorState::Inactive;
99 }
100 self.mailbox.is_sleeping.store(false, Ordering::Relaxed);
101 return ActorState::Running;
102 }
103
104 let mut msg = m.unwrap();
105 let result = catch_unwind(AssertUnwindSafe(|| {
106 let actor_result = msg.handle(&mut self.actor, &self.context);
107 return self.handle_actor_result(actor_result);
108 }));
109 if result.is_err() {
110 return self.on_actor_panic(ActorPanicSource::Message);
111 }
112 return result.unwrap();
113 }
114
115 fn stop_actor(&mut self, immediately: bool) -> ActorState {
116 self.mailbox.is_stopped.store(true, Ordering::Relaxed);
117 if immediately {
118 let _ = catch_unwind(AssertUnwindSafe(|| self.actor.post_stop(&self.context)));
119 return ActorState::Stopped;
120 }
121 return ActorState::Running;
122 }
123
124 fn restart_actor(&mut self) -> ActorState {
125 let result = catch_unwind(AssertUnwindSafe(|| {
126 self.actor.pre_restart(&self.context);
127 let actor = self.actor_props.new_actor(self.context.clone());
128 if actor.is_err() {
129 let err = actor.as_ref().err().unwrap();
130 debug!("{:?}", err);
131 }
132 return actor.unwrap();
133 }));
134 if result.is_err() {
135 return self.on_actor_panic(ActorPanicSource::Restart);
136 } else {
137 self.actor = result.unwrap();
138 self.is_startup = true;
139 }
140 return ActorState::Running;
141 }
142
143 fn on_actor_panic(&mut self, source: ActorPanicSource) -> ActorState {
144 let result = catch_unwind(AssertUnwindSafe(|| {
145 let actor_result = self.actor.on_panic(&self.context, source);
146 return self.handle_actor_result(actor_result);
147 }));
148 if result.is_err() {
149 let result = catch_unwind(AssertUnwindSafe(|| {
150 let actor_result = self
151 .actor
152 .on_panic(&self.context, ActorPanicSource::OnPanic);
153 return self.handle_actor_result(actor_result);
154 }));
155 if result.is_err() {
156 self.stop_actor(true);
157 }
158 return result.unwrap();
159 }
160 return result.unwrap();
161 }
162
163 fn get_config(&self) -> &ActorConfig {
164 &self.actor_config
165 }
166
167 fn get_address(&self) -> ActorAddress {
168 self.actor_address.clone()
169 }
170
171 fn is_sleeping(&self) -> bool {
172 self.mailbox.is_sleeping.load(Ordering::Relaxed)
173 }
174
175 fn is_stopped(&self) -> bool {
176 self.mailbox.is_stopped.load(Ordering::Relaxed)
177 }
178
179 fn wakeup(&mut self) {
180 self.mailbox.is_sleeping.store(false, Ordering::Relaxed);
181 self.last_wakeup = Instant::now();
182 }
183
184 fn handle_actor_result(&mut self, result: Result<ActorResult, Box<dyn Error>>) -> ActorState {
185 let res: ActorResult;
186 if result.is_err() {
187 let catch_result = catch_unwind(AssertUnwindSafe(|| {
188 let actor_result = self.actor.on_error(&self.context, result.unwrap_err());
189 return actor_result;
190 }));
191 if catch_result.is_err() {
192 return self.stop_actor(true);
193 }
194 res = catch_result.unwrap();
195 } else {
196 res = result.unwrap();
197 }
198 return match res {
199 ActorResult::Ok => ActorState::Running,
200 ActorResult::Restart => self.restart_actor(),
201 ActorResult::Stop => self.stop_actor(false),
202 ActorResult::Kill => self.stop_actor(true),
203 ActorResult::Sleep(duration) => ActorState::Sleeping(duration),
204 };
205 }
206}
207
208impl<A, P> Executor<A, P>
209where
210 A: Actor,
211 P: ActorFactory<A>,
212{
213 pub fn new(
214 mut actor_props: P,
215 actor_address: ActorAddress,
216 actor_config: ActorConfig,
217 mailbox: Mailbox<A>,
218 receiver: flume::Receiver<MessageEnvelope<A>>,
219 system: ActorSystem,
220 actor_ref: ActorWrapper<A>,
221 ) -> Result<Self, ActorError> {
222 let context = ActorContext {
223 actor_ref,
224 system: system.clone(),
225 };
226
227 let actor = catch_unwind(AssertUnwindSafe(|| {
228 let to_return = actor_props.new_actor(context.clone());
229 if to_return.is_err() {
230 let err = to_return.as_ref().err().unwrap();
231 debug!("{:?}", err);
232 }
233 return to_return.unwrap();
234 }));
235 if actor.is_err() {
236 return Err(ActorError::InitError);
237 }
238 return Ok(Self {
239 actor: actor.unwrap(),
240 actor_props,
241 actor_config,
242 mailbox,
243 queue: receiver,
244 actor_address,
245 is_startup: true,
246 system_triggered_stop: false,
247 last_wakeup: Instant::now(),
248 context,
249 });
250 }
251 pub fn send<M>(&self, msg: M) -> Result<(), flume::SendTimeoutError<MessageEnvelope<A>>>
252 where
253 A: Handler<M>,
254 M: BaseActorMessage + 'static,
255 {
256 return self
257 .mailbox
258 .msg_in
259 .send_timeout(MessageEnvelope::new(msg), Duration::from_millis(10));
260 }
261}