witty_actors/
actor_handle.rs

1// Copyright (C) 2023 Quickwit, Inc.
2//
3// Quickwit is offered under the AGPL v3.0 and as commercial software.
4// For commercial licensing, contact us at hello@quickwit.io.
5//
6// AGPL:
7// This program is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Affero General Public License as
9// published by the Free Software Foundation, either version 3 of the
10// License, or (at your option) any later version.
11//
12// This program is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU Affero General Public License for more details.
16//
17// You should have received a copy of the GNU Affero General Public License
18// along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20use std::fmt;
21
22use serde::Serialize;
23use tokio::sync::{oneshot, watch};
24use tracing::error;
25
26use crate::actor_state::ActorState;
27use crate::command::Observe;
28use crate::mailbox::Priority;
29use crate::observation::ObservationType;
30use crate::registry::ActorJoinHandle;
31use crate::{Actor, ActorContext, ActorExitStatus, Command, Mailbox, Observation};
32
33/// An Actor Handle serves as an address to communicate with an actor.
34pub struct ActorHandle<A: Actor> {
35    actor_context: ActorContext<A>,
36    last_state: watch::Receiver<A::ObservableState>,
37    join_handle: ActorJoinHandle,
38}
39
40/// Describes the health of a given actor.
41#[derive(Clone, Eq, PartialEq, Debug, Hash, Serialize)]
42pub enum Health {
43    /// The actor is running and behaving as expected.
44    Healthy,
45    /// No progress was registered, or the process terminated with an error
46    FailureOrUnhealthy,
47    /// The actor terminated successfully.
48    Success,
49}
50
/// Unit message handled by health-probe handlers.
#[derive(Debug, Clone)]
pub struct Healthz;
54
55impl<A: Actor> fmt::Debug for ActorHandle<A> {
56    fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
57        formatter
58            .debug_struct("ActorHandle")
59            .field("name", &self.actor_context.actor_instance_id())
60            .finish()
61    }
62}
63
64pub trait Supervisable {
65    fn name(&self) -> &str;
66    fn harvest_health(&self) -> Health;
67}
68
69impl<A: Actor> Supervisable for ActorHandle<A> {
70    fn name(&self) -> &str {
71        self.actor_context.actor_instance_id()
72    }
73
74    /// Harvests the health of the actor by checking its state (see [`ActorState`]) and/or progress
75    /// (see `Progress`). When the actor is running, calling this method resets its progress state
76    /// to "no update" (see `ProgressState`). As a consequence, only one supervisor or probe
77    /// should periodically invoke this method during the lifetime of the actor.
78    fn harvest_health(&self) -> Health {
79        let actor_state = self.state();
80        if actor_state == ActorState::Success {
81            Health::Success
82        } else if actor_state == ActorState::Failure {
83            error!(actor = self.name(), "actor-exit-without-success");
84            Health::FailureOrUnhealthy
85        } else if self
86            .actor_context
87            .progress()
88            .registered_activity_since_last_call()
89        {
90            Health::Healthy
91        } else {
92            error!(actor = self.name(), "actor-timeout");
93            Health::FailureOrUnhealthy
94        }
95    }
96}
97
98impl<A: Actor> ActorHandle<A> {
99    pub(crate) fn new(
100        last_state: watch::Receiver<A::ObservableState>,
101        join_handle: ActorJoinHandle,
102        actor_context: ActorContext<A>,
103    ) -> Self {
104        ActorHandle {
105            actor_context,
106            last_state,
107            join_handle,
108        }
109    }
110
111    pub fn state(&self) -> ActorState {
112        self.actor_context.state()
113    }
114
115    /// Process all of the pending messages, and returns a snapshot of
116    /// the observable state of the actor after this.
117    ///
118    /// This method is mostly useful for tests.
119    ///
120    /// To actually observe the state of an actor for ops purpose,
121    /// prefer using the `.observe()` method.
122    ///
123    /// This method timeout if reaching the end of the message takes more than an HEARTBEAT.
124    pub async fn process_pending_and_observe(&self) -> Observation<A::ObservableState> {
125        self.observe_with_priority(Priority::Low).await
126    }
127
128    /// Observe the current state.
129    ///
130    /// The observation will be scheduled as a high priority message, therefore it will be executed
131    /// after the current active message and the current command queue have been processed.
132    pub async fn observe(&self) -> Observation<A::ObservableState> {
133        self.observe_with_priority(Priority::High).await
134    }
135
136    async fn observe_with_priority(&self, priority: Priority) -> Observation<A::ObservableState> {
137        if !self.actor_context.state().is_exit() {
138            if let Ok(oneshot_rx) = self
139                .actor_context
140                .mailbox()
141                .send_message_with_priority(Observe, priority)
142                .await
143            {
144                // The timeout is required here. If the actor fails, its inbox is properly dropped
145                // but the send channel might actually prevent the onechannel
146                // Receiver from being dropped.
147                return self.wait_for_observable_state_callback(oneshot_rx).await;
148            } else {
149                error!(
150                    actor_id = self.actor_context.actor_instance_id(),
151                    "Failed to send observe message"
152                );
153            }
154        }
155        let state = self.last_observation();
156        Observation {
157            obs_type: ObservationType::PostMortem,
158            state,
159        }
160    }
161
162    /// Pauses the actor. The actor will stop processing messages from the low priority
163    /// channel, but its work can be resumed by calling the method `.resume()`.
164    pub fn pause(&self) {
165        let _ = self
166            .actor_context
167            .mailbox()
168            .send_message_with_high_priority(Command::Pause);
169    }
170
171    /// Resumes a paused actor.
172    pub fn resume(&self) {
173        let _ = self
174            .actor_context
175            .mailbox()
176            .send_message_with_high_priority(Command::Resume);
177    }
178
179    /// Kills the actor. Its finalize function will still be called.
180    ///
181    /// This function also actionnates the actor kill switch.
182    ///
183    /// The other difference with quit is the exit status. It is important,
184    /// as the finalize logic may behave differently depending on the exit status.
185    pub async fn kill(self) -> (ActorExitStatus, A::ObservableState) {
186        self.actor_context.kill_switch().kill();
187        let _ = self
188            .actor_context
189            .mailbox()
190            .send_message_with_high_priority(Command::Nudge);
191        self.join().await
192    }
193
194    /// Gracefully quit the actor, regardless of whether there are pending messages or not.
195    /// Its finalize function will be called.
196    ///
197    /// The kill switch is not actionated.
198    ///
199    /// The other difference with kill is the exit status. It is important,
200    /// as the finalize logic may behave differently depending on the exit status.
201    pub async fn quit(self) -> (ActorExitStatus, A::ObservableState) {
202        let _ = self
203            .actor_context
204            .mailbox()
205            .send_message_with_high_priority(Command::Quit);
206        self.join().await
207    }
208
209    /// Waits until the actor exits by itself. This is the equivalent of `Thread::join`.
210    pub async fn join(self) -> (ActorExitStatus, A::ObservableState) {
211        let exit_status = self.join_handle.join().await;
212        let observation = self.last_state.borrow().clone();
213        (exit_status, observation)
214    }
215
216    pub fn last_observation(&self) -> A::ObservableState {
217        self.last_state.borrow().clone()
218    }
219
220    async fn wait_for_observable_state_callback(
221        &self,
222        rx: oneshot::Receiver<A::ObservableState>,
223    ) -> Observation<A::ObservableState> {
224        let scheduler_client = &self.actor_context.spawn_ctx().scheduler_client;
225        let observable_state_or_timeout =
226            scheduler_client.timeout(crate::OBSERVE_TIMEOUT, rx).await;
227        match observable_state_or_timeout {
228            Ok(Ok(state)) => {
229                let obs_type = ObservationType::Alive;
230                Observation { obs_type, state }
231            }
232            Ok(Err(_)) => {
233                let state = self.last_observation();
234                let obs_type = ObservationType::PostMortem;
235                Observation { obs_type, state }
236            }
237            Err(_) => {
238                let state = self.last_observation();
239                let obs_type = if self.actor_context.state().is_exit() {
240                    ObservationType::PostMortem
241                } else {
242                    ObservationType::Timeout
243                };
244                Observation { obs_type, state }
245            }
246        }
247    }
248
249    pub fn mailbox(&self) -> &Mailbox<A> {
250        self.actor_context.mailbox()
251    }
252}
253
#[cfg(test)]
mod tests {
    use async_trait::async_trait;

    use super::*;
    use crate::{Handler, Universe};

    /// Actor whose `Panic` handler bumps a counter and then panics.
    #[derive(Default)]
    struct PanickingActor {
        count: usize,
    }

    impl Actor for PanickingActor {
        type ObservableState = usize;
        fn observable_state(&self) -> usize {
            self.count
        }
    }

    #[derive(Debug)]
    struct Panic;

    #[async_trait]
    impl Handler<Panic> for PanickingActor {
        type Reply = ();
        async fn handle(
            &mut self,
            _message: Panic,
            _ctx: &ActorContext<Self>,
        ) -> Result<(), ActorExitStatus> {
            self.count += 1;
            panic!("Oops");
        }
    }

    /// Actor whose `Exit` handler bumps a counter and then exits with an error status.
    #[derive(Default)]
    struct ExitActor {
        count: usize,
    }

    impl Actor for ExitActor {
        type ObservableState = usize;
        fn observable_state(&self) -> usize {
            self.count
        }
    }

    #[derive(Debug)]
    struct Exit;

    #[async_trait]
    impl Handler<Exit> for ExitActor {
        type Reply = ();

        async fn handle(
            &mut self,
            _msg: Exit,
            _ctx: &ActorContext<Self>,
        ) -> Result<(), ActorExitStatus> {
            self.count += 1;
            Err(ActorExitStatus::DownstreamClosed)
        }
    }

    #[tokio::test]
    async fn test_panic_in_actor() -> anyhow::Result<()> {
        let universe = Universe::with_accelerated_time();
        let (mailbox, handle) = universe.spawn_builder().spawn(PanickingActor::default());
        mailbox.send_message(Panic).await?;
        let (exit_status, count) = handle.join().await;
        assert!(matches!(exit_status, ActorExitStatus::Panicked));
        // `join` returns the last published observable state; the counter is incremented
        // before the panic, and that increment is expected to be visible.
        assert_eq!(count, 1);
        Ok(())
    }

    #[tokio::test]
    async fn test_exit() -> anyhow::Result<()> {
        let universe = Universe::with_accelerated_time();
        let (mailbox, handle) = universe.spawn_builder().spawn(ExitActor::default());
        mailbox.send_message(Exit).await?;
        let (exit_status, count) = handle.join().await;
        assert!(matches!(exit_status, ActorExitStatus::DownstreamClosed));
        // The handler ran exactly once before returning the error exit status.
        assert_eq!(count, 1);
        Ok(())
    }
}