quickwit_actors/
actor_handle.rs1use std::any::Any;
21use std::borrow::Borrow;
22use std::fmt;
23
24use tokio::sync::{oneshot, watch};
25use tokio::time::timeout;
26use tracing::error;
27
28use crate::actor_state::ActorState;
29use crate::channel_with_priority::Priority;
30use crate::join_handle::JoinHandle;
31use crate::mailbox::Command;
32use crate::observation::ObservationType;
33use crate::{Actor, ActorContext, ActorExitStatus, Mailbox, Observation};
34
35pub struct ActorHandle<A: Actor> {
37 actor_context: ActorContext<A>,
38 last_state: watch::Receiver<A::ObservableState>,
39 join_handle: JoinHandle,
40}
41
/// Health verdict reported by a [`Supervisable`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Health {
    /// The supervised item is running and showed signs of activity.
    Healthy,
    /// The supervised item either terminated without success, or is still
    /// running but is considered unhealthy.
    FailureOrUnhealthy,
    /// The supervised item terminated successfully.
    Success,
}
52
53impl<A: Actor> fmt::Debug for ActorHandle<A> {
54 fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
55 formatter
56 .debug_struct("ActorHandle")
57 .field("name", &self.actor_context.actor_instance_id())
58 .finish()
59 }
60}
61
62pub trait Supervisable {
63 fn name(&self) -> &str;
64 fn health(&self) -> Health;
65}
66
67impl<A: Actor> Supervisable for ActorHandle<A> {
68 fn name(&self) -> &str {
69 self.actor_context.actor_instance_id()
70 }
71
72 fn health(&self) -> Health {
73 let actor_state = self.state();
74 if actor_state == ActorState::Success {
75 Health::Success
76 } else if actor_state == ActorState::Failure {
77 error!(actor = self.name(), "actor-exit-without-success");
78 Health::FailureOrUnhealthy
79 } else if self
80 .actor_context
81 .progress()
82 .registered_activity_since_last_call()
83 {
84 Health::Healthy
85 } else {
86 error!(actor = self.name(), "actor-timeout");
87 Health::FailureOrUnhealthy
88 }
89 }
90}
91
92impl<A: Actor> ActorHandle<A> {
93 pub(crate) fn new(
94 last_state: watch::Receiver<A::ObservableState>,
95 join_handle: JoinHandle,
96 actor_context: ActorContext<A>,
97 ) -> Self {
98 ActorHandle {
99 actor_context,
100 last_state,
101 join_handle,
102 }
103 }
104
105 pub fn state(&self) -> ActorState {
106 self.actor_context.state()
107 }
108
109 pub async fn process_pending_and_observe(&self) -> Observation<A::ObservableState> {
119 let (tx, rx) = oneshot::channel();
120 if !self.actor_context.state().is_exit()
121 && self
122 .actor_context
123 .mailbox()
124 .send_with_priority(Command::Observe(tx).into(), Priority::Low)
125 .await
126 .is_err()
127 {
128 error!(
129 actor = self.actor_context.actor_instance_id(),
130 "Failed to send observe message"
131 );
132 }
133 self.wait_for_observable_state_callback(rx).await
136 }
137
138 pub async fn pause(&self) {
141 let _ = self
142 .actor_context
143 .mailbox()
144 .send_command(Command::Pause)
145 .await;
146 }
147
148 pub async fn resume(&self) {
150 let _ = self
151 .actor_context
152 .mailbox()
153 .send_command(Command::Resume)
154 .await;
155 }
156
157 pub async fn kill(self) -> (ActorExitStatus, A::ObservableState) {
164 self.actor_context.kill_switch().kill();
165 let _ = self
166 .actor_context
167 .mailbox()
168 .send_command(Command::Kill)
169 .await;
170 self.join().await
171 }
172
173 pub async fn quit(self) -> (ActorExitStatus, A::ObservableState) {
181 let _ = self
182 .actor_context
183 .mailbox()
184 .send_command(Command::Quit)
185 .await;
186 self.join().await
187 }
188
189 pub async fn join(self) -> (ActorExitStatus, A::ObservableState) {
191 let exit_status = self.join_handle.join().await;
192 let observation = self.last_state.borrow().clone();
193 (exit_status, observation)
194 }
195
196 pub async fn observe(&self) -> Observation<A::ObservableState> {
201 let (tx, rx) = oneshot::channel();
202 if self.actor_context.state().is_exit() {
203 let state = self.last_observation().borrow().clone();
204 return Observation {
205 obs_type: ObservationType::PostMortem,
206 state,
207 };
208 }
209 if self
210 .actor_context
211 .mailbox()
212 .send_command(Command::Observe(tx))
213 .await
214 .is_err()
215 {
216 error!(
217 actor_id = self.actor_context.actor_instance_id(),
218 "Failed to send observe message"
219 );
220 }
221 self.wait_for_observable_state_callback(rx).await
222 }
223
224 pub fn last_observation(&self) -> A::ObservableState {
225 self.last_state.borrow().clone()
226 }
227
228 async fn wait_for_observable_state_callback(
229 &self,
230 rx: oneshot::Receiver<Box<dyn Any + Send>>,
231 ) -> Observation<A::ObservableState> {
232 let observable_state_or_timeout = timeout(crate::HEARTBEAT, rx).await;
233 match observable_state_or_timeout {
234 Ok(Ok(observable_state_any)) => {
235 let state: A::ObservableState = *observable_state_any
236 .downcast()
237 .expect("The type is guaranteed logically by the ActorHandle.");
238 let obs_type = ObservationType::Alive;
239 Observation { obs_type, state }
240 }
241 Ok(Err(_)) => {
242 let state = self.last_observation();
243 let obs_type = ObservationType::PostMortem;
244 Observation { obs_type, state }
245 }
246 Err(_) => {
247 let state = self.last_observation();
248 let obs_type = if self.actor_context.state().is_exit() {
249 ObservationType::PostMortem
250 } else {
251 ObservationType::Timeout
252 };
253 Observation { obs_type, state }
254 }
255 }
256 }
257
258 pub fn mailbox(&self) -> &Mailbox<A> {
259 self.actor_context.mailbox()
260 }
261}
262
263#[cfg(test)]
264mod tests {
265 use async_trait::async_trait;
266
267 use super::*;
268 use crate::{ActorRunner, Handler, Universe};
269
/// Test actor whose `Panic` handler panics.
#[derive(Default)]
struct PanickingActor {
    /// Number of `Panic` messages whose handling was started.
    count: usize,
}
274
275 impl Actor for PanickingActor {
276 type ObservableState = usize;
277 fn observable_state(&self) -> usize {
278 self.count
279 }
280 }
281
/// Message handled by `PanickingActor`.
#[derive(Debug)]
struct Panic;
284
285 #[async_trait]
286 impl Handler<Panic> for PanickingActor {
287 type Reply = ();
288 async fn handle(
289 &mut self,
290 _message: Panic,
291 _ctx: &ActorContext<Self>,
292 ) -> Result<(), ActorExitStatus> {
293 self.count += 1;
294 panic!("Oops");
295 }
296 }
297
/// Test actor whose `Exit` handler returns an exit status.
#[derive(Default)]
struct ExitActor {
    /// Number of `Exit` messages handled.
    count: usize,
}
302
303 impl Actor for ExitActor {
304 type ObservableState = usize;
305 fn observable_state(&self) -> usize {
306 self.count
307 }
308 }
309
/// Message handled by `ExitActor`.
#[derive(Debug)]
struct Exit;
312
313 #[async_trait]
314 impl Handler<Exit> for ExitActor {
315 type Reply = ();
316
317 async fn handle(
318 &mut self,
319 _msg: Exit,
320 _ctx: &ActorContext<Self>,
321 ) -> Result<(), ActorExitStatus> {
322 self.count += 1;
323 Err(ActorExitStatus::DownstreamClosed)
324 }
325 }
326
327 #[track_caller]
328 async fn test_panic_in_actor_aux(runner: ActorRunner) -> anyhow::Result<()> {
329 let universe = Universe::new();
330 let (mailbox, handle) = universe
331 .spawn_actor(PanickingActor::default())
332 .spawn_with_forced_runner(runner);
333 mailbox.send_message(Panic).await?;
334 let (exit_status, count) = handle.join().await;
335 assert!(matches!(exit_status, ActorExitStatus::Panicked));
336 assert!(matches!(count, 1)); Ok(())
338 }
339
340 #[tokio::test]
341 async fn test_panic_in_actor_dedicated_thread() -> anyhow::Result<()> {
342 test_panic_in_actor_aux(ActorRunner::DedicatedThread).await?;
343 Ok(())
344 }
345
346 #[tokio::test]
347 async fn test_panic_in_actor_tokio_task() -> anyhow::Result<()> {
348 test_panic_in_actor_aux(ActorRunner::GlobalRuntime).await?;
349 Ok(())
350 }
351
352 #[track_caller]
353 async fn test_exit_aux(runner: ActorRunner) -> anyhow::Result<()> {
354 let universe = Universe::new();
355 let (mailbox, handle) = universe
356 .spawn_actor(ExitActor::default())
357 .spawn_with_forced_runner(runner);
358 mailbox.send_message(Exit).await?;
359 let (exit_status, count) = handle.join().await;
360 assert!(matches!(exit_status, ActorExitStatus::DownstreamClosed));
361 assert!(matches!(count, 1)); Ok(())
363 }
364
365 #[tokio::test]
366 async fn test_exit_dedicated_thread() -> anyhow::Result<()> {
367 test_exit_aux(ActorRunner::DedicatedThread).await
368 }
369
370 #[tokio::test]
371 async fn test_exit_tokio_task() -> anyhow::Result<()> {
372 test_exit_aux(ActorRunner::GlobalRuntime).await
373 }
374}