witty_actors/
actor_handle.rs1use std::fmt;
21
22use serde::Serialize;
23use tokio::sync::{oneshot, watch};
24use tracing::error;
25
26use crate::actor_state::ActorState;
27use crate::command::Observe;
28use crate::mailbox::Priority;
29use crate::observation::ObservationType;
30use crate::registry::ActorJoinHandle;
31use crate::{Actor, ActorContext, ActorExitStatus, Command, Mailbox, Observation};
32
33pub struct ActorHandle<A: Actor> {
35 actor_context: ActorContext<A>,
36 last_state: watch::Receiver<A::ObservableState>,
37 join_handle: ActorJoinHandle,
38}
39
40#[derive(Clone, Eq, PartialEq, Debug, Hash, Serialize)]
42pub enum Health {
43 Healthy,
45 FailureOrUnhealthy,
47 Success,
49}
50
51#[derive(Clone, Debug)]
53pub struct Healthz;
54
55impl<A: Actor> fmt::Debug for ActorHandle<A> {
56 fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
57 formatter
58 .debug_struct("ActorHandle")
59 .field("name", &self.actor_context.actor_instance_id())
60 .finish()
61 }
62}
63
64pub trait Supervisable {
65 fn name(&self) -> &str;
66 fn harvest_health(&self) -> Health;
67}
68
69impl<A: Actor> Supervisable for ActorHandle<A> {
70 fn name(&self) -> &str {
71 self.actor_context.actor_instance_id()
72 }
73
74 fn harvest_health(&self) -> Health {
79 let actor_state = self.state();
80 if actor_state == ActorState::Success {
81 Health::Success
82 } else if actor_state == ActorState::Failure {
83 error!(actor = self.name(), "actor-exit-without-success");
84 Health::FailureOrUnhealthy
85 } else if self
86 .actor_context
87 .progress()
88 .registered_activity_since_last_call()
89 {
90 Health::Healthy
91 } else {
92 error!(actor = self.name(), "actor-timeout");
93 Health::FailureOrUnhealthy
94 }
95 }
96}
97
98impl<A: Actor> ActorHandle<A> {
99 pub(crate) fn new(
100 last_state: watch::Receiver<A::ObservableState>,
101 join_handle: ActorJoinHandle,
102 actor_context: ActorContext<A>,
103 ) -> Self {
104 ActorHandle {
105 actor_context,
106 last_state,
107 join_handle,
108 }
109 }
110
111 pub fn state(&self) -> ActorState {
112 self.actor_context.state()
113 }
114
115 pub async fn process_pending_and_observe(&self) -> Observation<A::ObservableState> {
125 self.observe_with_priority(Priority::Low).await
126 }
127
128 pub async fn observe(&self) -> Observation<A::ObservableState> {
133 self.observe_with_priority(Priority::High).await
134 }
135
136 async fn observe_with_priority(&self, priority: Priority) -> Observation<A::ObservableState> {
137 if !self.actor_context.state().is_exit() {
138 if let Ok(oneshot_rx) = self
139 .actor_context
140 .mailbox()
141 .send_message_with_priority(Observe, priority)
142 .await
143 {
144 return self.wait_for_observable_state_callback(oneshot_rx).await;
148 } else {
149 error!(
150 actor_id = self.actor_context.actor_instance_id(),
151 "Failed to send observe message"
152 );
153 }
154 }
155 let state = self.last_observation();
156 Observation {
157 obs_type: ObservationType::PostMortem,
158 state,
159 }
160 }
161
162 pub fn pause(&self) {
165 let _ = self
166 .actor_context
167 .mailbox()
168 .send_message_with_high_priority(Command::Pause);
169 }
170
171 pub fn resume(&self) {
173 let _ = self
174 .actor_context
175 .mailbox()
176 .send_message_with_high_priority(Command::Resume);
177 }
178
179 pub async fn kill(self) -> (ActorExitStatus, A::ObservableState) {
186 self.actor_context.kill_switch().kill();
187 let _ = self
188 .actor_context
189 .mailbox()
190 .send_message_with_high_priority(Command::Nudge);
191 self.join().await
192 }
193
194 pub async fn quit(self) -> (ActorExitStatus, A::ObservableState) {
202 let _ = self
203 .actor_context
204 .mailbox()
205 .send_message_with_high_priority(Command::Quit);
206 self.join().await
207 }
208
209 pub async fn join(self) -> (ActorExitStatus, A::ObservableState) {
211 let exit_status = self.join_handle.join().await;
212 let observation = self.last_state.borrow().clone();
213 (exit_status, observation)
214 }
215
216 pub fn last_observation(&self) -> A::ObservableState {
217 self.last_state.borrow().clone()
218 }
219
220 async fn wait_for_observable_state_callback(
221 &self,
222 rx: oneshot::Receiver<A::ObservableState>,
223 ) -> Observation<A::ObservableState> {
224 let scheduler_client = &self.actor_context.spawn_ctx().scheduler_client;
225 let observable_state_or_timeout =
226 scheduler_client.timeout(crate::OBSERVE_TIMEOUT, rx).await;
227 match observable_state_or_timeout {
228 Ok(Ok(state)) => {
229 let obs_type = ObservationType::Alive;
230 Observation { obs_type, state }
231 }
232 Ok(Err(_)) => {
233 let state = self.last_observation();
234 let obs_type = ObservationType::PostMortem;
235 Observation { obs_type, state }
236 }
237 Err(_) => {
238 let state = self.last_observation();
239 let obs_type = if self.actor_context.state().is_exit() {
240 ObservationType::PostMortem
241 } else {
242 ObservationType::Timeout
243 };
244 Observation { obs_type, state }
245 }
246 }
247 }
248
249 pub fn mailbox(&self) -> &Mailbox<A> {
250 self.actor_context.mailbox()
251 }
252}
253
254#[cfg(test)]
255mod tests {
256 use async_trait::async_trait;
257
258 use super::*;
259 use crate::{Handler, Universe};
260
261 #[derive(Default)]
262 struct PanickingActor {
263 count: usize,
264 }
265
266 impl Actor for PanickingActor {
267 type ObservableState = usize;
268 fn observable_state(&self) -> usize {
269 self.count
270 }
271 }
272
273 #[derive(Debug)]
274 struct Panic;
275
276 #[async_trait]
277 impl Handler<Panic> for PanickingActor {
278 type Reply = ();
279 async fn handle(
280 &mut self,
281 _message: Panic,
282 _ctx: &ActorContext<Self>,
283 ) -> Result<(), ActorExitStatus> {
284 self.count += 1;
285 panic!("Oops");
286 }
287 }
288
289 #[derive(Default)]
290 struct ExitActor {
291 count: usize,
292 }
293
294 impl Actor for ExitActor {
295 type ObservableState = usize;
296 fn observable_state(&self) -> usize {
297 self.count
298 }
299 }
300
301 #[derive(Debug)]
302 struct Exit;
303
304 #[async_trait]
305 impl Handler<Exit> for ExitActor {
306 type Reply = ();
307
308 async fn handle(
309 &mut self,
310 _msg: Exit,
311 _ctx: &ActorContext<Self>,
312 ) -> Result<(), ActorExitStatus> {
313 self.count += 1;
314 Err(ActorExitStatus::DownstreamClosed)
315 }
316 }
317
318 #[tokio::test]
319 async fn test_panic_in_actor() -> anyhow::Result<()> {
320 let universe = Universe::with_accelerated_time();
321 let (mailbox, handle) = universe.spawn_builder().spawn(PanickingActor::default());
322 mailbox.send_message(Panic).await?;
323 let (exit_status, count) = handle.join().await;
324 assert!(matches!(exit_status, ActorExitStatus::Panicked));
325 assert!(matches!(count, 1)); Ok(())
327 }
328
329 #[tokio::test]
330 async fn test_exit() -> anyhow::Result<()> {
331 let universe = Universe::with_accelerated_time();
332 let (mailbox, handle) = universe.spawn_builder().spawn(ExitActor::default());
333 mailbox.send_message(Exit).await?;
334 let (exit_status, count) = handle.join().await;
335 assert!(matches!(exit_status, ActorExitStatus::DownstreamClosed));
336 assert!(matches!(count, 1)); Ok(())
338 }
339}