1use std::convert::Infallible;
21use std::fmt;
22use std::future::Future;
23use std::ops::Deref;
24use std::sync::Arc;
25use std::time::Duration;
26
27use crate::quickwit_common::metrics::IntCounter;
28use crate::quickwit_common::{KillSwitch, Progress, ProtectedZoneGuard};
29use tokio::sync::{oneshot, watch};
30use tracing::{debug, error};
31
32use crate::actor_state::AtomicState;
33use crate::registry::ActorRegistry;
34use crate::spawn_builder::{SpawnBuilder, SpawnContext};
35#[cfg(any(test, feature = "testsuite"))]
36use crate::Universe;
37use crate::{
38 Actor, ActorExitStatus, ActorState, AskError, Command, DeferableReplyHandler, Mailbox,
39 SendError, TrySendError,
40};
41
42pub struct ActorContext<A: Actor> {
44 inner: Arc<ActorContextInner<A>>,
45}
46
47impl<A: Actor> Clone for ActorContext<A> {
48 fn clone(&self) -> Self {
49 ActorContext {
50 inner: self.inner.clone(),
51 }
52 }
53}
54
55impl<A: Actor> Deref for ActorContext<A> {
56 type Target = ActorContextInner<A>;
57
58 fn deref(&self) -> &Self::Target {
59 self.inner.as_ref()
60 }
61}
62
63pub struct ActorContextInner<A: Actor> {
64 spawn_ctx: SpawnContext,
65 self_mailbox: Mailbox<A>,
66 progress: Progress,
67 actor_state: AtomicState,
68 backpressure_micros_counter_opt: Option<IntCounter>,
69 observable_state_tx: watch::Sender<A::ObservableState>,
70}
71
72impl<A: Actor> ActorContext<A> {
73 pub(crate) fn new(
74 self_mailbox: Mailbox<A>,
75 spawn_ctx: SpawnContext,
76 observable_state_tx: watch::Sender<A::ObservableState>,
77 backpressure_micros_counter_opt: Option<IntCounter>,
78 ) -> Self {
79 ActorContext {
80 inner: ActorContextInner {
81 self_mailbox,
82 spawn_ctx,
83 progress: Progress::default(),
84 actor_state: AtomicState::default(),
85 observable_state_tx,
86 backpressure_micros_counter_opt,
87 }
88 .into(),
89 }
90 }
91
92 pub fn spawn_ctx(&self) -> &SpawnContext {
93 &self.spawn_ctx
94 }
95
96 pub async fn sleep(&self, duration: Duration) {
104 let scheduler_client = &self.spawn_ctx().scheduler_client;
105 scheduler_client.dec_no_advance_time();
106 scheduler_client.sleep(duration).await;
107 scheduler_client.inc_no_advance_time();
108 }
109
110 #[cfg(any(test, feature = "testsuite"))]
111 pub fn for_test(
112 universe: &Universe,
113 actor_mailbox: Mailbox<A>,
114 observable_state_tx: watch::Sender<A::ObservableState>,
115 ) -> Self {
116 Self::new(
117 actor_mailbox,
118 universe.spawn_ctx.clone(),
119 observable_state_tx,
120 None,
121 )
122 }
123
124 pub fn mailbox(&self) -> &Mailbox<A> {
125 &self.self_mailbox
126 }
127
128 pub(crate) fn registry(&self) -> &ActorRegistry {
129 &self.spawn_ctx.registry
130 }
131
132 pub fn actor_instance_id(&self) -> &str {
133 self.mailbox().actor_instance_id()
134 }
135
136 pub fn protect_zone(&self) -> ProtectedZoneGuard {
144 self.progress.protect_zone()
145 }
146
147 pub async fn protect_future<Fut, T>(&self, future: Fut) -> T
149 where
150 Fut: Future<Output = T>,
151 {
152 let _guard = self.protect_zone();
153 future.await
154 }
155
156 pub async fn yield_now(&self) {
158 self.protect_future(tokio::task::yield_now()).await;
159 }
160
161 pub fn kill_switch(&self) -> &KillSwitch {
167 &self.spawn_ctx.kill_switch
168 }
169
170 #[must_use]
171 pub fn progress(&self) -> &Progress {
172 &self.progress
173 }
174
175 pub fn spawn_actor<SpawnedActor: Actor>(&self) -> SpawnBuilder<SpawnedActor> {
176 self.spawn_ctx.clone().spawn_builder()
177 }
178
179 pub fn record_progress(&self) {
185 self.progress.record_progress();
186 }
187
188 pub(crate) fn state(&self) -> ActorState {
189 self.actor_state.get_state()
190 }
191
192 pub(crate) fn process(&self) {
193 self.actor_state.process();
194 }
195
196 pub(crate) fn idle(&self) {
197 self.actor_state.idle();
198 }
199
200 pub(crate) fn pause(&self) {
201 self.actor_state.pause();
202 }
203
204 pub(crate) fn resume(&self) {
205 self.actor_state.resume();
206 }
207
208 pub(crate) fn observe(&self, actor: &mut A) -> A::ObservableState {
209 let obs_state = actor.observable_state();
210 let _ = self.observable_state_tx.send(obs_state.clone());
211 obs_state
212 }
213
214 pub(crate) fn exit(&self, exit_status: &ActorExitStatus) {
215 self.actor_state.exit(exit_status.is_success());
216 if should_activate_kill_switch(exit_status) {
217 error!(actor=%self.actor_instance_id(), exit_status=?exit_status, "exit activating-kill-switch");
218 self.kill_switch().kill();
219 }
220 }
221
222 pub async fn send_message<DestActor: Actor, M>(
242 &self,
243 mailbox: &Mailbox<DestActor>,
244 msg: M,
245 ) -> Result<oneshot::Receiver<DestActor::Reply>, SendError>
246 where
247 DestActor: DeferableReplyHandler<M>,
248 M: 'static + Send + Sync + fmt::Debug,
249 {
250 let _guard = self.protect_zone();
251 debug!(from=%self.self_mailbox.actor_instance_id(), send=%mailbox.actor_instance_id(), msg=?msg);
252 mailbox
253 .send_message_with_backpressure_counter(
254 msg,
255 self.backpressure_micros_counter_opt.as_ref(),
256 )
257 .await
258 }
259
260 pub async fn ask<DestActor: Actor, M, T>(
261 &self,
262 mailbox: &Mailbox<DestActor>,
263 msg: M,
264 ) -> Result<T, AskError<Infallible>>
265 where
266 DestActor: DeferableReplyHandler<M, Reply = T>,
267 M: 'static + Send + Sync + fmt::Debug,
268 {
269 let _guard = self.protect_zone();
270 debug!(from=%self.self_mailbox.actor_instance_id(), send=%mailbox.actor_instance_id(), msg=?msg, "ask");
271 mailbox
272 .ask_with_backpressure_counter(msg, self.backpressure_micros_counter_opt.as_ref())
273 .await
274 }
275
276 pub async fn ask_for_res<DestActor: Actor, M, T, E>(
279 &self,
280 mailbox: &Mailbox<DestActor>,
281 msg: M,
282 ) -> Result<T, AskError<E>>
283 where
284 DestActor: DeferableReplyHandler<M, Reply = Result<T, E>>,
285 M: fmt::Debug + Send + Sync + 'static,
286 E: fmt::Debug,
287 {
288 let _guard = self.protect_zone();
289 debug!(from=%self.self_mailbox.actor_instance_id(), send=%mailbox.actor_instance_id(), msg=?msg, "ask");
290 mailbox.ask_for_res(msg).await
291 }
292
293 pub async fn send_exit_with_success<Dest: Actor>(
298 &self,
299 mailbox: &Mailbox<Dest>,
300 ) -> Result<(), SendError> {
301 let _guard = self.protect_zone();
302 debug!(from=%self.self_mailbox.actor_instance_id(), to=%mailbox.actor_instance_id(), "success");
303 mailbox.send_message(Command::ExitWithSuccess).await?;
304 Ok(())
305 }
306
307 pub async fn send_self_message<M>(
312 &self,
313 msg: M,
314 ) -> Result<oneshot::Receiver<A::Reply>, SendError>
315 where
316 A: DeferableReplyHandler<M>,
317 M: 'static + Sync + Send + fmt::Debug,
318 {
319 debug!(self=%self.self_mailbox.actor_instance_id(), msg=?msg, "self_send");
320 self.self_mailbox.send_message(msg).await
321 }
322
323 pub fn try_send_self_message<M>(
329 &self,
330 msg: M,
331 ) -> Result<oneshot::Receiver<A::Reply>, TrySendError<M>>
332 where
333 A: DeferableReplyHandler<M>,
334 M: 'static + Sync + Send + fmt::Debug,
335 {
336 self.self_mailbox.try_send_message(msg)
337 }
338
339 pub async fn schedule_self_msg<M>(&self, after_duration: Duration, message: M)
342 where
343 A: DeferableReplyHandler<M>,
344 M: Sync + Send + std::fmt::Debug + 'static,
345 {
346 let self_mailbox = self.inner.self_mailbox.clone();
347 let callback = move || {
348 let _ = self_mailbox.send_message_with_high_priority(message);
349 };
350 self.inner
351 .spawn_ctx
352 .scheduler_client
353 .schedule_event(callback, after_duration);
354 }
355}
356
357fn should_activate_kill_switch(exit_status: &ActorExitStatus) -> bool {
361 match exit_status {
362 ActorExitStatus::DownstreamClosed => true,
363 ActorExitStatus::Failure(_) => true,
364 ActorExitStatus::Panicked => true,
365 ActorExitStatus::Success => false,
366 ActorExitStatus::Quit => false,
367 ActorExitStatus::Killed => false,
368 }
369}