use std::convert::Infallible;
use std::fmt;
use std::future::Future;
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;
use crate::quickwit_common::metrics::IntCounter;
use crate::quickwit_common::{KillSwitch, Progress, ProtectedZoneGuard};
use tokio::sync::{oneshot, watch};
use tracing::{debug, error};
use crate::actor_state::AtomicState;
use crate::registry::ActorRegistry;
use crate::spawn_builder::{SpawnBuilder, SpawnContext};
#[cfg(any(test, feature = "testsuite"))]
use crate::Universe;
use crate::{
Actor, ActorExitStatus, ActorState, AskError, Command, DeferableReplyHandler, Mailbox,
SendError, TrySendError,
};
pub struct ActorContext<A: Actor> {
inner: Arc<ActorContextInner<A>>,
}
impl<A: Actor> Clone for ActorContext<A> {
fn clone(&self) -> Self {
ActorContext {
inner: self.inner.clone(),
}
}
}
impl<A: Actor> Deref for ActorContext<A> {
type Target = ActorContextInner<A>;
fn deref(&self) -> &Self::Target {
self.inner.as_ref()
}
}
pub struct ActorContextInner<A: Actor> {
spawn_ctx: SpawnContext,
self_mailbox: Mailbox<A>,
progress: Progress,
actor_state: AtomicState,
backpressure_micros_counter_opt: Option<IntCounter>,
observable_state_tx: watch::Sender<A::ObservableState>,
}
impl<A: Actor> ActorContext<A> {
pub(crate) fn new(
self_mailbox: Mailbox<A>,
spawn_ctx: SpawnContext,
observable_state_tx: watch::Sender<A::ObservableState>,
backpressure_micros_counter_opt: Option<IntCounter>,
) -> Self {
ActorContext {
inner: ActorContextInner {
self_mailbox,
spawn_ctx,
progress: Progress::default(),
actor_state: AtomicState::default(),
observable_state_tx,
backpressure_micros_counter_opt,
}
.into(),
}
}
pub fn spawn_ctx(&self) -> &SpawnContext {
&self.spawn_ctx
}
pub async fn sleep(&self, duration: Duration) {
let scheduler_client = &self.spawn_ctx().scheduler_client;
scheduler_client.dec_no_advance_time();
scheduler_client.sleep(duration).await;
scheduler_client.inc_no_advance_time();
}
#[cfg(any(test, feature = "testsuite"))]
pub fn for_test(
universe: &Universe,
actor_mailbox: Mailbox<A>,
observable_state_tx: watch::Sender<A::ObservableState>,
) -> Self {
Self::new(
actor_mailbox,
universe.spawn_ctx.clone(),
observable_state_tx,
None,
)
}
pub fn mailbox(&self) -> &Mailbox<A> {
&self.self_mailbox
}
pub(crate) fn registry(&self) -> &ActorRegistry {
&self.spawn_ctx.registry
}
pub fn actor_instance_id(&self) -> &str {
self.mailbox().actor_instance_id()
}
pub fn protect_zone(&self) -> ProtectedZoneGuard {
self.progress.protect_zone()
}
pub async fn protect_future<Fut, T>(&self, future: Fut) -> T
where
Fut: Future<Output = T>,
{
let _guard = self.protect_zone();
future.await
}
pub async fn yield_now(&self) {
self.protect_future(tokio::task::yield_now()).await;
}
pub fn kill_switch(&self) -> &KillSwitch {
&self.spawn_ctx.kill_switch
}
#[must_use]
pub fn progress(&self) -> &Progress {
&self.progress
}
pub fn spawn_actor<SpawnedActor: Actor>(&self) -> SpawnBuilder<SpawnedActor> {
self.spawn_ctx.clone().spawn_builder()
}
pub fn record_progress(&self) {
self.progress.record_progress();
}
pub(crate) fn state(&self) -> ActorState {
self.actor_state.get_state()
}
pub(crate) fn process(&self) {
self.actor_state.process();
}
pub(crate) fn idle(&self) {
self.actor_state.idle();
}
pub(crate) fn pause(&self) {
self.actor_state.pause();
}
pub(crate) fn resume(&self) {
self.actor_state.resume();
}
pub(crate) fn observe(&self, actor: &mut A) -> A::ObservableState {
let obs_state = actor.observable_state();
let _ = self.observable_state_tx.send(obs_state.clone());
obs_state
}
pub(crate) fn exit(&self, exit_status: &ActorExitStatus) {
self.actor_state.exit(exit_status.is_success());
if should_activate_kill_switch(exit_status) {
error!(actor=%self.actor_instance_id(), exit_status=?exit_status, "exit activating-kill-switch");
self.kill_switch().kill();
}
}
pub async fn send_message<DestActor: Actor, M>(
&self,
mailbox: &Mailbox<DestActor>,
msg: M,
) -> Result<oneshot::Receiver<DestActor::Reply>, SendError>
where
DestActor: DeferableReplyHandler<M>,
M: 'static + Send + Sync + fmt::Debug,
{
let _guard = self.protect_zone();
debug!(from=%self.self_mailbox.actor_instance_id(), send=%mailbox.actor_instance_id(), msg=?msg);
mailbox
.send_message_with_backpressure_counter(
msg,
self.backpressure_micros_counter_opt.as_ref(),
)
.await
}
pub async fn ask<DestActor: Actor, M, T>(
&self,
mailbox: &Mailbox<DestActor>,
msg: M,
) -> Result<T, AskError<Infallible>>
where
DestActor: DeferableReplyHandler<M, Reply = T>,
M: 'static + Send + Sync + fmt::Debug,
{
let _guard = self.protect_zone();
debug!(from=%self.self_mailbox.actor_instance_id(), send=%mailbox.actor_instance_id(), msg=?msg, "ask");
mailbox
.ask_with_backpressure_counter(msg, self.backpressure_micros_counter_opt.as_ref())
.await
}
pub async fn ask_for_res<DestActor: Actor, M, T, E>(
&self,
mailbox: &Mailbox<DestActor>,
msg: M,
) -> Result<T, AskError<E>>
where
DestActor: DeferableReplyHandler<M, Reply = Result<T, E>>,
M: fmt::Debug + Send + Sync + 'static,
E: fmt::Debug,
{
let _guard = self.protect_zone();
debug!(from=%self.self_mailbox.actor_instance_id(), send=%mailbox.actor_instance_id(), msg=?msg, "ask");
mailbox.ask_for_res(msg).await
}
pub async fn send_exit_with_success<Dest: Actor>(
&self,
mailbox: &Mailbox<Dest>,
) -> Result<(), SendError> {
let _guard = self.protect_zone();
debug!(from=%self.self_mailbox.actor_instance_id(), to=%mailbox.actor_instance_id(), "success");
mailbox.send_message(Command::ExitWithSuccess).await?;
Ok(())
}
pub async fn send_self_message<M>(
&self,
msg: M,
) -> Result<oneshot::Receiver<A::Reply>, SendError>
where
A: DeferableReplyHandler<M>,
M: 'static + Sync + Send + fmt::Debug,
{
debug!(self=%self.self_mailbox.actor_instance_id(), msg=?msg, "self_send");
self.self_mailbox.send_message(msg).await
}
pub fn try_send_self_message<M>(
&self,
msg: M,
) -> Result<oneshot::Receiver<A::Reply>, TrySendError<M>>
where
A: DeferableReplyHandler<M>,
M: 'static + Sync + Send + fmt::Debug,
{
self.self_mailbox.try_send_message(msg)
}
pub async fn schedule_self_msg<M>(&self, after_duration: Duration, message: M)
where
A: DeferableReplyHandler<M>,
M: Sync + Send + std::fmt::Debug + 'static,
{
let self_mailbox = self.inner.self_mailbox.clone();
let callback = move || {
let _ = self_mailbox.send_message_with_high_priority(message);
};
self.inner
.spawn_ctx
.scheduler_client
.schedule_event(callback, after_duration);
}
}
fn should_activate_kill_switch(exit_status: &ActorExitStatus) -> bool {
match exit_status {
ActorExitStatus::DownstreamClosed => true,
ActorExitStatus::Failure(_) => true,
ActorExitStatus::Panicked => true,
ActorExitStatus::Success => false,
ActorExitStatus::Quit => false,
ActorExitStatus::Killed => false,
}
}