witty_actors/
actor_context.rs

1// Copyright (C) 2023 Quickwit, Inc.
2//
3// Quickwit is offered under the AGPL v3.0 and as commercial software.
4// For commercial licensing, contact us at hello@quickwit.io.
5//
6// AGPL:
7// This program is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Affero General Public License as
9// published by the Free Software Foundation, either version 3 of the
10// License, or (at your option) any later version.
11//
12// This program is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU Affero General Public License for more details.
16//
17// You should have received a copy of the GNU Affero General Public License
18// along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20use std::convert::Infallible;
21use std::fmt;
22use std::future::Future;
23use std::ops::Deref;
24use std::sync::Arc;
25use std::time::Duration;
26
27use crate::quickwit_common::metrics::IntCounter;
28use crate::quickwit_common::{KillSwitch, Progress, ProtectedZoneGuard};
29use tokio::sync::{oneshot, watch};
30use tracing::{debug, error};
31
32use crate::actor_state::AtomicState;
33use crate::registry::ActorRegistry;
34use crate::spawn_builder::{SpawnBuilder, SpawnContext};
35#[cfg(any(test, feature = "testsuite"))]
36use crate::Universe;
37use crate::{
38    Actor, ActorExitStatus, ActorState, AskError, Command, DeferableReplyHandler, Mailbox,
39    SendError, TrySendError,
40};
41
42// TODO hide all of this public stuff
43pub struct ActorContext<A: Actor> {
44    inner: Arc<ActorContextInner<A>>,
45}
46
47impl<A: Actor> Clone for ActorContext<A> {
48    fn clone(&self) -> Self {
49        ActorContext {
50            inner: self.inner.clone(),
51        }
52    }
53}
54
55impl<A: Actor> Deref for ActorContext<A> {
56    type Target = ActorContextInner<A>;
57
58    fn deref(&self) -> &Self::Target {
59        self.inner.as_ref()
60    }
61}
62
63pub struct ActorContextInner<A: Actor> {
64    spawn_ctx: SpawnContext,
65    self_mailbox: Mailbox<A>,
66    progress: Progress,
67    actor_state: AtomicState,
68    backpressure_micros_counter_opt: Option<IntCounter>,
69    observable_state_tx: watch::Sender<A::ObservableState>,
70}
71
72impl<A: Actor> ActorContext<A> {
73    pub(crate) fn new(
74        self_mailbox: Mailbox<A>,
75        spawn_ctx: SpawnContext,
76        observable_state_tx: watch::Sender<A::ObservableState>,
77        backpressure_micros_counter_opt: Option<IntCounter>,
78    ) -> Self {
79        ActorContext {
80            inner: ActorContextInner {
81                self_mailbox,
82                spawn_ctx,
83                progress: Progress::default(),
84                actor_state: AtomicState::default(),
85                observable_state_tx,
86                backpressure_micros_counter_opt,
87            }
88            .into(),
89        }
90    }
91
92    pub fn spawn_ctx(&self) -> &SpawnContext {
93        &self.spawn_ctx
94    }
95
96    /// Sleeps for a given amount of time.
97    ///
98    /// That sleep is measured by the universe scheduler, which means that it can be
99    /// shortened if `Universe::simulate_sleep(..)` is used.
100    ///
101    /// While sleeping, an actor is NOT protected from its supervisor.
102    /// It is up to the user to call `ActorContext::protect_future(..)`.
103    pub async fn sleep(&self, duration: Duration) {
104        let scheduler_client = &self.spawn_ctx().scheduler_client;
105        scheduler_client.dec_no_advance_time();
106        scheduler_client.sleep(duration).await;
107        scheduler_client.inc_no_advance_time();
108    }
109
110    #[cfg(any(test, feature = "testsuite"))]
111    pub fn for_test(
112        universe: &Universe,
113        actor_mailbox: Mailbox<A>,
114        observable_state_tx: watch::Sender<A::ObservableState>,
115    ) -> Self {
116        Self::new(
117            actor_mailbox,
118            universe.spawn_ctx.clone(),
119            observable_state_tx,
120            None,
121        )
122    }
123
124    pub fn mailbox(&self) -> &Mailbox<A> {
125        &self.self_mailbox
126    }
127
128    pub(crate) fn registry(&self) -> &ActorRegistry {
129        &self.spawn_ctx.registry
130    }
131
132    pub fn actor_instance_id(&self) -> &str {
133        self.mailbox().actor_instance_id()
134    }
135
136    /// This function returns a guard that prevents any supervisor from identifying the
137    /// actor as dead.
138    /// The protection ends when the `ProtectZoneGuard` is dropped.
139    ///
140    /// In an ideal world, you should never need to call this function.
141    /// It is only useful in some corner cases, like calling a long blocking
142    /// from an external library that you trust.
143    pub fn protect_zone(&self) -> ProtectedZoneGuard {
144        self.progress.protect_zone()
145    }
146
147    /// Executes a future in a protected zone.
148    pub async fn protect_future<Fut, T>(&self, future: Fut) -> T
149    where
150        Fut: Future<Output = T>,
151    {
152        let _guard = self.protect_zone();
153        future.await
154    }
155
156    /// Cooperatively yields, while keeping the actor protected.
157    pub async fn yield_now(&self) {
158        self.protect_future(tokio::task::yield_now()).await;
159    }
160
161    /// Gets a copy of the actor kill switch.
162    /// This should rarely be used.
163    ///
164    /// For instance, when quitting from the process_message function, prefer simply
165    /// returning `Error(ActorExitStatus::Failure(..))`
166    pub fn kill_switch(&self) -> &KillSwitch {
167        &self.spawn_ctx.kill_switch
168    }
169
170    #[must_use]
171    pub fn progress(&self) -> &Progress {
172        &self.progress
173    }
174
175    pub fn spawn_actor<SpawnedActor: Actor>(&self) -> SpawnBuilder<SpawnedActor> {
176        self.spawn_ctx.clone().spawn_builder()
177    }
178
179    /// Records some progress.
180    /// This function is only useful when implementing actors that may take more than
181    /// `HEARTBEAT` to process a single message.
182    /// In that case, you can call this function in the middle of the process_message method
183    /// to prevent the actor from being identified as blocked or dead.
184    pub fn record_progress(&self) {
185        self.progress.record_progress();
186    }
187
188    pub(crate) fn state(&self) -> ActorState {
189        self.actor_state.get_state()
190    }
191
192    pub(crate) fn process(&self) {
193        self.actor_state.process();
194    }
195
196    pub(crate) fn idle(&self) {
197        self.actor_state.idle();
198    }
199
200    pub(crate) fn pause(&self) {
201        self.actor_state.pause();
202    }
203
204    pub(crate) fn resume(&self) {
205        self.actor_state.resume();
206    }
207
208    pub(crate) fn observe(&self, actor: &mut A) -> A::ObservableState {
209        let obs_state = actor.observable_state();
210        let _ = self.observable_state_tx.send(obs_state.clone());
211        obs_state
212    }
213
214    pub(crate) fn exit(&self, exit_status: &ActorExitStatus) {
215        self.actor_state.exit(exit_status.is_success());
216        if should_activate_kill_switch(exit_status) {
217            error!(actor=%self.actor_instance_id(), exit_status=?exit_status, "exit activating-kill-switch");
218            self.kill_switch().kill();
219        }
220    }
221
222    /// Posts a message in an actor's mailbox.
223    ///
224    /// This method does not wait for the message to be handled by the
225    /// target actor. However, it returns a oneshot receiver that the caller
226    /// that makes it possible to `.await` it.
227    /// If the reply is important, chances are the `.ask(...)` method is
228    /// more indicated.
229    ///
230    /// Droppping the receiver channel will not cancel the
231    /// processing of the message. It is a very common usage.
232    /// In fact most actors are expected to send message in a
233    /// fire-and-forget fashion.
234    ///
235    /// Regular messages (as opposed to commands) are queued and guaranteed
236    /// to be processed in FIFO order.
237    ///
238    /// This method hides logic to prevent an actor from being identified
239    /// as frozen if the destination actor channel is saturated, and we
240    /// are simply experiencing back pressure.
241    pub async fn send_message<DestActor: Actor, M>(
242        &self,
243        mailbox: &Mailbox<DestActor>,
244        msg: M,
245    ) -> Result<oneshot::Receiver<DestActor::Reply>, SendError>
246    where
247        DestActor: DeferableReplyHandler<M>,
248        M: 'static + Send + Sync + fmt::Debug,
249    {
250        let _guard = self.protect_zone();
251        debug!(from=%self.self_mailbox.actor_instance_id(), send=%mailbox.actor_instance_id(), msg=?msg);
252        mailbox
253            .send_message_with_backpressure_counter(
254                msg,
255                self.backpressure_micros_counter_opt.as_ref(),
256            )
257            .await
258    }
259
260    pub async fn ask<DestActor: Actor, M, T>(
261        &self,
262        mailbox: &Mailbox<DestActor>,
263        msg: M,
264    ) -> Result<T, AskError<Infallible>>
265    where
266        DestActor: DeferableReplyHandler<M, Reply = T>,
267        M: 'static + Send + Sync + fmt::Debug,
268    {
269        let _guard = self.protect_zone();
270        debug!(from=%self.self_mailbox.actor_instance_id(), send=%mailbox.actor_instance_id(), msg=?msg, "ask");
271        mailbox
272            .ask_with_backpressure_counter(msg, self.backpressure_micros_counter_opt.as_ref())
273            .await
274    }
275
276    /// Similar to `send_message`, except this method
277    /// waits asynchronously for the actor reply.
278    pub async fn ask_for_res<DestActor: Actor, M, T, E>(
279        &self,
280        mailbox: &Mailbox<DestActor>,
281        msg: M,
282    ) -> Result<T, AskError<E>>
283    where
284        DestActor: DeferableReplyHandler<M, Reply = Result<T, E>>,
285        M: fmt::Debug + Send + Sync + 'static,
286        E: fmt::Debug,
287    {
288        let _guard = self.protect_zone();
289        debug!(from=%self.self_mailbox.actor_instance_id(), send=%mailbox.actor_instance_id(), msg=?msg, "ask");
290        mailbox.ask_for_res(msg).await
291    }
292
293    /// Send the Success message to terminate the destination actor with the Success exit status.
294    ///
295    /// The message is queued like any regular message, so that pending messages will be processed
296    /// first.
297    pub async fn send_exit_with_success<Dest: Actor>(
298        &self,
299        mailbox: &Mailbox<Dest>,
300    ) -> Result<(), SendError> {
301        let _guard = self.protect_zone();
302        debug!(from=%self.self_mailbox.actor_instance_id(), to=%mailbox.actor_instance_id(), "success");
303        mailbox.send_message(Command::ExitWithSuccess).await?;
304        Ok(())
305    }
306
307    /// Sends a message to an actor's own mailbox.
308    ///
309    /// Warning: This method is dangerous as it can very easily
310    /// cause a deadlock.
311    pub async fn send_self_message<M>(
312        &self,
313        msg: M,
314    ) -> Result<oneshot::Receiver<A::Reply>, SendError>
315    where
316        A: DeferableReplyHandler<M>,
317        M: 'static + Sync + Send + fmt::Debug,
318    {
319        debug!(self=%self.self_mailbox.actor_instance_id(), msg=?msg, "self_send");
320        self.self_mailbox.send_message(msg).await
321    }
322
323    /// Attempts to send a message to itself.
324    /// The message will be queue to self's low_priority queue.
325    ///
326    /// Warning: This method will always fail if
327    /// an actor has a capacity of 0.
328    pub fn try_send_self_message<M>(
329        &self,
330        msg: M,
331    ) -> Result<oneshot::Receiver<A::Reply>, TrySendError<M>>
332    where
333        A: DeferableReplyHandler<M>,
334        M: 'static + Sync + Send + fmt::Debug,
335    {
336        self.self_mailbox.try_send_message(msg)
337    }
338
339    /// Schedules a message that will be sent to the high-priority
340    /// queue of the actor Mailbox once `after_duration` has elapsed.
341    pub async fn schedule_self_msg<M>(&self, after_duration: Duration, message: M)
342    where
343        A: DeferableReplyHandler<M>,
344        M: Sync + Send + std::fmt::Debug + 'static,
345    {
346        let self_mailbox = self.inner.self_mailbox.clone();
347        let callback = move || {
348            let _ = self_mailbox.send_message_with_high_priority(message);
349        };
350        self.inner
351            .spawn_ctx
352            .scheduler_client
353            .schedule_event(callback, after_duration);
354    }
355}
356
357/// If an actor exits in an unexpected manner, its kill
358/// switch will be activated, and all other actors under the same
359/// kill switch will be killed.
360fn should_activate_kill_switch(exit_status: &ActorExitStatus) -> bool {
361    match exit_status {
362        ActorExitStatus::DownstreamClosed => true,
363        ActorExitStatus::Failure(_) => true,
364        ActorExitStatus::Panicked => true,
365        ActorExitStatus::Success => false,
366        ActorExitStatus::Quit => false,
367        ActorExitStatus::Killed => false,
368    }
369}