quickwit_actors/
actor_handle.rs

1// Copyright (C) 2021 Quickwit, Inc.
2//
3// Quickwit is offered under the AGPL v3.0 and as commercial software.
4// For commercial licensing, contact us at hello@quickwit.io.
5//
6// AGPL:
7// This program is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Affero General Public License as
9// published by the Free Software Foundation, either version 3 of the
10// License, or (at your option) any later version.
11//
12// This program is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU Affero General Public License for more details.
16//
17// You should have received a copy of the GNU Affero General Public License
18// along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20use std::any::Any;
21use std::borrow::Borrow;
22use std::fmt;
23
24use tokio::sync::{oneshot, watch};
25use tokio::time::timeout;
26use tracing::error;
27
28use crate::actor_state::ActorState;
29use crate::channel_with_priority::Priority;
30use crate::join_handle::JoinHandle;
31use crate::mailbox::Command;
32use crate::observation::ObservationType;
33use crate::{Actor, ActorContext, ActorExitStatus, Mailbox, Observation};
34
35/// An Actor Handle serves as an address to communicate with an actor.
36pub struct ActorHandle<A: Actor> {
37    actor_context: ActorContext<A>,
38    last_state: watch::Receiver<A::ObservableState>,
39    join_handle: JoinHandle,
40}
41
/// Health of a supervised actor, as reported by [`Supervisable::health`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Health {
    /// The actor is running and registered activity since the previous health check.
    Healthy,
    /// Either no progress was registered, or the actor terminated with a failure.
    FailureOrUnhealthy,
    /// The actor terminated successfully.
    Success,
}
52
53impl<A: Actor> fmt::Debug for ActorHandle<A> {
54    fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
55        formatter
56            .debug_struct("ActorHandle")
57            .field("name", &self.actor_context.actor_instance_id())
58            .finish()
59    }
60}
61
62pub trait Supervisable {
63    fn name(&self) -> &str;
64    fn health(&self) -> Health;
65}
66
67impl<A: Actor> Supervisable for ActorHandle<A> {
68    fn name(&self) -> &str {
69        self.actor_context.actor_instance_id()
70    }
71
72    fn health(&self) -> Health {
73        let actor_state = self.state();
74        if actor_state == ActorState::Success {
75            Health::Success
76        } else if actor_state == ActorState::Failure {
77            error!(actor = self.name(), "actor-exit-without-success");
78            Health::FailureOrUnhealthy
79        } else if self
80            .actor_context
81            .progress()
82            .registered_activity_since_last_call()
83        {
84            Health::Healthy
85        } else {
86            error!(actor = self.name(), "actor-timeout");
87            Health::FailureOrUnhealthy
88        }
89    }
90}
91
92impl<A: Actor> ActorHandle<A> {
93    pub(crate) fn new(
94        last_state: watch::Receiver<A::ObservableState>,
95        join_handle: JoinHandle,
96        actor_context: ActorContext<A>,
97    ) -> Self {
98        ActorHandle {
99            actor_context,
100            last_state,
101            join_handle,
102        }
103    }
104
105    pub fn state(&self) -> ActorState {
106        self.actor_context.state()
107    }
108
109    /// Process all of the pending messages, and returns a snapshot of
110    /// the observable state of the actor after this.
111    ///
112    /// This method is mostly useful for tests.
113    ///
114    /// To actually observe the state of an actor for ops purpose,
115    /// prefer using the `.observe()` method.
116    ///
117    /// This method timeout if reaching the end of the message takes more than an HEARTBEAT.
118    pub async fn process_pending_and_observe(&self) -> Observation<A::ObservableState> {
119        let (tx, rx) = oneshot::channel();
120        if !self.actor_context.state().is_exit()
121            && self
122                .actor_context
123                .mailbox()
124                .send_with_priority(Command::Observe(tx).into(), Priority::Low)
125                .await
126                .is_err()
127        {
128            error!(
129                actor = self.actor_context.actor_instance_id(),
130                "Failed to send observe message"
131            );
132        }
133        // The timeout is required here. If the actor fails, its inbox is properly dropped but the
134        // send channel might actually prevent the onechannel Receiver from being dropped.
135        self.wait_for_observable_state_callback(rx).await
136    }
137
138    /// Pauses the actor. The actor will stop processing the message, but its
139    /// work can be resumed by calling the method `.resume()`.
140    pub async fn pause(&self) {
141        let _ = self
142            .actor_context
143            .mailbox()
144            .send_command(Command::Pause)
145            .await;
146    }
147
148    /// Resumes a paused actor.
149    pub async fn resume(&self) {
150        let _ = self
151            .actor_context
152            .mailbox()
153            .send_command(Command::Resume)
154            .await;
155    }
156
157    /// Kills the actor. Its finalize function will still be called.
158    ///
159    /// This function also actionnates the actor kill switch.
160    ///
161    /// The other difference with quit is the exit status. It is important,
162    /// as the finalize logic may behave differently depending on the exit status.
163    pub async fn kill(self) -> (ActorExitStatus, A::ObservableState) {
164        self.actor_context.kill_switch().kill();
165        let _ = self
166            .actor_context
167            .mailbox()
168            .send_command(Command::Kill)
169            .await;
170        self.join().await
171    }
172
173    /// Gracefully quit the actor, regardless of whether there are pending messages or not.
174    /// Its finalize function will be called.
175    ///
176    /// The kill switch is not actionated.
177    ///
178    /// The other difference with kill is the exit status. It is important,
179    /// as the finalize logic may behave differently depending on the exit status.
180    pub async fn quit(self) -> (ActorExitStatus, A::ObservableState) {
181        let _ = self
182            .actor_context
183            .mailbox()
184            .send_command(Command::Quit)
185            .await;
186        self.join().await
187    }
188
189    /// Waits until the actor exits by itself. This is the equivalent of `Thread::join`.
190    pub async fn join(self) -> (ActorExitStatus, A::ObservableState) {
191        let exit_status = self.join_handle.join().await;
192        let observation = self.last_state.borrow().clone();
193        (exit_status, observation)
194    }
195
196    /// Observe the current state.
197    ///
198    /// The observation will be scheduled as a command message, therefore it will be executed
199    /// after the current active message and the current command queue have been processed.
200    pub async fn observe(&self) -> Observation<A::ObservableState> {
201        let (tx, rx) = oneshot::channel();
202        if self.actor_context.state().is_exit() {
203            let state = self.last_observation().borrow().clone();
204            return Observation {
205                obs_type: ObservationType::PostMortem,
206                state,
207            };
208        }
209        if self
210            .actor_context
211            .mailbox()
212            .send_command(Command::Observe(tx))
213            .await
214            .is_err()
215        {
216            error!(
217                actor_id = self.actor_context.actor_instance_id(),
218                "Failed to send observe message"
219            );
220        }
221        self.wait_for_observable_state_callback(rx).await
222    }
223
224    pub fn last_observation(&self) -> A::ObservableState {
225        self.last_state.borrow().clone()
226    }
227
228    async fn wait_for_observable_state_callback(
229        &self,
230        rx: oneshot::Receiver<Box<dyn Any + Send>>,
231    ) -> Observation<A::ObservableState> {
232        let observable_state_or_timeout = timeout(crate::HEARTBEAT, rx).await;
233        match observable_state_or_timeout {
234            Ok(Ok(observable_state_any)) => {
235                let state: A::ObservableState = *observable_state_any
236                    .downcast()
237                    .expect("The type is guaranteed logically by the ActorHandle.");
238                let obs_type = ObservationType::Alive;
239                Observation { obs_type, state }
240            }
241            Ok(Err(_)) => {
242                let state = self.last_observation();
243                let obs_type = ObservationType::PostMortem;
244                Observation { obs_type, state }
245            }
246            Err(_) => {
247                let state = self.last_observation();
248                let obs_type = if self.actor_context.state().is_exit() {
249                    ObservationType::PostMortem
250                } else {
251                    ObservationType::Timeout
252                };
253                Observation { obs_type, state }
254            }
255        }
256    }
257
258    pub fn mailbox(&self) -> &Mailbox<A> {
259        self.actor_context.mailbox()
260    }
261}
262
#[cfg(test)]
mod tests {
    use async_trait::async_trait;

    use super::*;
    use crate::{ActorRunner, Handler, Universe};

    /// Actor whose `Panic` handler increments a counter and then panics.
    #[derive(Default)]
    struct PanickingActor {
        count: usize,
    }

    impl Actor for PanickingActor {
        type ObservableState = usize;
        fn observable_state(&self) -> usize {
            self.count
        }
    }

    #[derive(Debug)]
    struct Panic;

    #[async_trait]
    impl Handler<Panic> for PanickingActor {
        type Reply = ();
        async fn handle(
            &mut self,
            _message: Panic,
            _ctx: &ActorContext<Self>,
        ) -> Result<(), ActorExitStatus> {
            self.count += 1;
            panic!("Oops");
        }
    }

    /// Actor whose `Exit` handler increments a counter and then exits with
    /// `ActorExitStatus::DownstreamClosed`.
    #[derive(Default)]
    struct ExitActor {
        count: usize,
    }

    impl Actor for ExitActor {
        type ObservableState = usize;
        fn observable_state(&self) -> usize {
            self.count
        }
    }

    #[derive(Debug)]
    struct Exit;

    #[async_trait]
    impl Handler<Exit> for ExitActor {
        type Reply = ();

        async fn handle(
            &mut self,
            _msg: Exit,
            _ctx: &ActorContext<Self>,
        ) -> Result<(), ActorExitStatus> {
            self.count += 1;
            Err(ActorExitStatus::DownstreamClosed)
        }
    }

    async fn test_panic_in_actor_aux(runner: ActorRunner) -> anyhow::Result<()> {
        let universe = Universe::new();
        let (mailbox, handle) = universe
            .spawn_actor(PanickingActor::default())
            .spawn_with_forced_runner(runner);
        mailbox.send_message(Panic).await?;
        let (exit_status, count) = handle.join().await;
        assert!(matches!(exit_status, ActorExitStatus::Panicked));
        // The increment performed by the handler before panicking is reflected in the
        // state returned by `join`.
        assert_eq!(count, 1);
        Ok(())
    }

    #[tokio::test]
    async fn test_panic_in_actor_dedicated_thread() -> anyhow::Result<()> {
        test_panic_in_actor_aux(ActorRunner::DedicatedThread).await
    }

    #[tokio::test]
    async fn test_panic_in_actor_tokio_task() -> anyhow::Result<()> {
        test_panic_in_actor_aux(ActorRunner::GlobalRuntime).await
    }

    async fn test_exit_aux(runner: ActorRunner) -> anyhow::Result<()> {
        let universe = Universe::new();
        let (mailbox, handle) = universe
            .spawn_actor(ExitActor::default())
            .spawn_with_forced_runner(runner);
        mailbox.send_message(Exit).await?;
        let (exit_status, count) = handle.join().await;
        assert!(matches!(exit_status, ActorExitStatus::DownstreamClosed));
        // After a non-panicking exit, the post-mortem state is available through `join`.
        assert_eq!(count, 1);
        Ok(())
    }

    #[tokio::test]
    async fn test_exit_dedicated_thread() -> anyhow::Result<()> {
        test_exit_aux(ActorRunner::DedicatedThread).await
    }

    #[tokio::test]
    async fn test_exit_tokio_task() -> anyhow::Result<()> {
        test_exit_aux(ActorRunner::GlobalRuntime).await
    }
}