use std::any::Any;
use std::convert::Infallible;
use std::fmt;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Weak};
use std::time::Instant;
use crate::quickwit_common::metrics::IntCounter;
use async_trait::async_trait;
use tokio::sync::oneshot;
use crate::channel_with_priority::{Receiver, Sender, TrySendError};
use crate::envelope::{wrap_in_envelope, Envelope};
use crate::scheduler::SchedulerClient;
use crate::{
Actor, ActorContext, ActorExitStatus, AskError, DeferableReplyHandler, Handler, QueueCapacity,
RecvError, SendError,
};
/// Handle used to send messages to an actor of type `A`.
///
/// All handles obtained through `Clone` share the same `inner` state and the
/// same `ref_count` counter.
pub struct Mailbox<A: Actor> {
// Shared channel sender, optional scheduler client and instance id.
inner: Arc<Inner<A>>,
// Explicit count of live `Mailbox` handles: incremented in `Clone`, decremented
// in `Drop`. It is tracked separately from the strong count of the `Arc`s.
ref_count: Arc<AtomicUsize>,
}
impl<A: Actor> Mailbox<A> {
    /// Builds a [`WeakMailbox`] referring to the same shared state as this mailbox.
    ///
    /// Only weak references are taken: neither the `Arc` strong counts nor the
    /// value stored in `ref_count` are modified.
    pub fn downgrade(&self) -> WeakMailbox<A> {
        let inner = Arc::downgrade(&self.inner);
        let ref_count = Arc::downgrade(&self.ref_count);
        WeakMailbox { inner, ref_count }
    }
}
impl<A: Actor> Drop for Mailbox<A> {
    /// Decrements the shared handle counter and, when the counter goes from 2 to 1,
    /// notifies the actor by sending it a high-priority `LastMailbox` message.
    fn drop(&mut self) {
        let previous_count = self.ref_count.fetch_sub(1, Ordering::SeqCst);
        if previous_count != 2 {
            return;
        }
        // Exactly one handle is left after this drop.
        // NOTE(review): presumably that remaining handle is the one owned by the
        // actor itself — confirm against the actor context code.
        // The send result is ignored on purpose: nothing useful can be done about a
        // failed send while dropping.
        let _ = self.send_message_with_high_priority(LastMailbox);
    }
}
/// Internal marker message sent by `Mailbox::drop` (with high priority) when the
/// handle counter goes from 2 to 1.
#[derive(Debug)]
struct LastMailbox;
#[async_trait]
impl<A: Actor> Handler<LastMailbox> for A {
    type Reply = ();

    /// Blanket no-op handler: every actor accepts `LastMailbox` and simply
    /// acknowledges it without touching its own state.
    async fn handle(
        &mut self,
        _last_mailbox: LastMailbox,
        _actor_ctx: &ActorContext<Self>,
    ) -> Result<(), ActorExitStatus> {
        Ok(())
    }
}
/// Selects which queue of the mailbox channel a message is sent to
/// (see `Mailbox::send_message_with_priority`).
#[derive(Copy, Clone)]
pub(crate) enum Priority {
// Sent through `send_high_priority`, which does not await.
High,
// Sent through `send_low_priority`, which is awaited.
Low,
}
impl<A: Actor> Clone for Mailbox<A> {
fn clone(&self) -> Self {
self.ref_count.fetch_add(1, Ordering::SeqCst);
Mailbox {
inner: self.inner.clone(),
ref_count: self.ref_count.clone(),
}
}
}
impl<A: Actor> Mailbox<A> {
pub(crate) fn is_last_mailbox(&self) -> bool {
self.ref_count.load(Ordering::SeqCst) == 1
}
pub fn id(&self) -> &str {
&self.inner.instance_id
}
pub(crate) fn scheduler_client(&self) -> Option<&SchedulerClient> {
self.inner.scheduler_client_opt.as_ref()
}
}
/// State shared (behind an `Arc`) by every `Mailbox` handle of a given actor instance.
struct Inner<A: Actor> {
// Sending half of the priority channel carrying the actor's envelopes.
pub(crate) tx: Sender<Envelope<A>>,
// Optional scheduler client; when present, `Mailbox::wrap_in_envelope` takes a
// `no_advance_time_guard` from it for each message.
scheduler_client_opt: Option<SchedulerClient>,
// Identifier generated by `new_coolid` from the actor name in `create_mailbox`.
instance_id: String,
}
impl<A: Actor> fmt::Debug for Mailbox<A> {
    /// Formats the mailbox as the tuple `Mailbox(<actor instance id>)`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let instance_id = self.actor_instance_id();
        let mut debug_tuple = f.debug_tuple("Mailbox");
        debug_tuple.field(&instance_id);
        debug_tuple.finish()
    }
}
impl<A: Actor> Mailbox<A> {
    /// Returns the instance id stored in this mailbox.
    pub fn actor_instance_id(&self) -> &str {
        self.inner.instance_id.as_str()
    }

    /// Returns whether the channel sender reports being disconnected.
    pub fn is_disconnected(&self) -> bool {
        let sender = &self.inner.tx;
        sender.is_disconnected()
    }

    /// Sends a message on the low-priority queue, waiting if the queue is full.
    ///
    /// Equivalent to [`Self::send_message_with_backpressure_counter`] without a
    /// counter. On success, returns the receiver on which the reply can be awaited.
    pub async fn send_message<M>(
        &self,
        message: M,
    ) -> Result<oneshot::Receiver<A::Reply>, SendError>
    where
        A: DeferableReplyHandler<M>,
        M: 'static + Send + Sync + fmt::Debug,
    {
        let no_counter: Option<&IntCounter> = None;
        self.send_message_with_backpressure_counter(message, no_counter)
            .await
    }

    /// Attempts to enqueue a message on the low-priority queue without waiting.
    ///
    /// # Errors
    ///
    /// - `TrySendError::Disconnected` if the channel is disconnected.
    /// - `TrySendError::Full(message)` if the queue is full; the original message
    ///   is handed back to the caller.
    pub fn try_send_message<M>(
        &self,
        message: M,
    ) -> Result<oneshot::Receiver<A::Reply>, TrySendError<M>>
    where
        A: DeferableReplyHandler<M>,
        M: 'static + Send + Sync + fmt::Debug,
    {
        let (envelope, response_rx) = self.wrap_in_envelope(message);
        match self.inner.tx.try_send_low_priority(envelope) {
            Ok(()) => Ok(response_rx),
            Err(TrySendError::Disconnected) => Err(TrySendError::Disconnected),
            Err(TrySendError::Full(mut rejected_envelope)) => {
                // The channel hands back the envelope: unpack it to return the
                // caller's message. The envelope was built from an `M` just above,
                // so the typed extraction is expected to succeed.
                let message: M = rejected_envelope.message_typed().unwrap();
                Err(TrySendError::Full(message))
            }
        }
    }

    /// Wraps `message` into an envelope and returns it with the reply receiver.
    ///
    /// When a scheduler client is configured, a `no_advance_time_guard` is taken
    /// from it and passed along to the envelope constructor.
    fn wrap_in_envelope<M>(&self, message: M) -> (Envelope<A>, oneshot::Receiver<A::Reply>)
    where
        A: DeferableReplyHandler<M>,
        M: 'static + Send + Sync + fmt::Debug,
    {
        let scheduler_client_opt = self.inner.scheduler_client_opt.as_ref();
        let guard_opt = scheduler_client_opt.map(|client| client.no_advance_time_guard());
        wrap_in_envelope(message, guard_opt)
    }

    /// Sends a message on the low-priority queue, optionally measuring backpressure.
    ///
    /// A non-waiting send is attempted first. Only if the queue is full does this
    /// method fall back to the waiting send; in that case, and if a counter was
    /// supplied, the time spent waiting (in microseconds) is added to the counter.
    ///
    /// # Errors
    ///
    /// Returns `SendError::Disconnected` if the first attempt finds the channel
    /// disconnected, or propagates the error of the waiting send.
    pub async fn send_message_with_backpressure_counter<M>(
        &self,
        message: M,
        backpressure_micros_counter_opt: Option<&IntCounter>,
    ) -> Result<oneshot::Receiver<A::Reply>, SendError>
    where
        A: DeferableReplyHandler<M>,
        M: 'static + Send + Sync + fmt::Debug,
    {
        let (envelope, response_rx) = self.wrap_in_envelope(message);
        let sender = &self.inner.tx;
        let rejected_envelope = match sender.try_send_low_priority(envelope) {
            Ok(()) => return Ok(response_rx),
            Err(TrySendError::Disconnected) => return Err(SendError::Disconnected),
            Err(TrySendError::Full(rejected_envelope)) => rejected_envelope,
        };
        // The clock is only read when there is a counter to feed.
        let timing_opt =
            backpressure_micros_counter_opt.map(|counter| (counter, Instant::now()));
        sender.send_low_priority(rejected_envelope).await?;
        if let Some((backpressure_micros_counter, wait_start)) = timing_opt {
            let waited = wait_start.elapsed();
            backpressure_micros_counter.inc_by(waited.as_micros() as u64);
        }
        Ok(response_rx)
    }

    /// Sends a message on the high-priority queue without awaiting.
    pub(crate) fn send_message_with_high_priority<M>(
        &self,
        message: M,
    ) -> Result<oneshot::Receiver<A::Reply>, SendError>
    where
        A: DeferableReplyHandler<M>,
        M: 'static + Send + Sync + fmt::Debug,
    {
        let (envelope, response_rx) = self.wrap_in_envelope(message);
        let sender = &self.inner.tx;
        sender.send_high_priority(envelope)?;
        Ok(response_rx)
    }

    /// Sends a message on the queue selected by `priority`.
    ///
    /// Only the low-priority path awaits.
    pub(crate) async fn send_message_with_priority<M>(
        &self,
        message: M,
        priority: Priority,
    ) -> Result<oneshot::Receiver<A::Reply>, SendError>
    where
        A: DeferableReplyHandler<M>,
        M: 'static + Send + Sync + fmt::Debug,
    {
        let (envelope, response_rx) = self.wrap_in_envelope(message);
        match priority {
            Priority::High => {
                self.inner.tx.send_high_priority(envelope)?;
            }
            Priority::Low => {
                self.inner.tx.send_low_priority(envelope).await?;
            }
        }
        Ok(response_rx)
    }

    /// Sends a message and waits for the actor's reply.
    ///
    /// Equivalent to [`Self::ask_with_backpressure_counter`] without a counter.
    pub async fn ask<M, T>(&self, message: M) -> Result<T, AskError<Infallible>>
    where
        A: DeferableReplyHandler<M, Reply = T>,
        M: 'static + Send + Sync + fmt::Debug,
    {
        self.ask_with_backpressure_counter(message, None).await
    }

    /// Sends a message (optionally measuring backpressure) and waits for the reply.
    ///
    /// # Errors
    ///
    /// - `AskError::MessageNotDelivered` if the message could not be sent.
    /// - `AskError::ProcessMessageError` if awaiting the reply receiver fails.
    pub async fn ask_with_backpressure_counter<M, T>(
        &self,
        message: M,
        backpressure_micros_counter_opt: Option<&IntCounter>,
    ) -> Result<T, AskError<Infallible>>
    where
        A: DeferableReplyHandler<M, Reply = T>,
        M: 'static + Send + Sync + fmt::Debug,
    {
        let response_rx = self
            .send_message_with_backpressure_counter(message, backpressure_micros_counter_opt)
            .await
            .map_err(|_send_error| AskError::MessageNotDelivered)?;
        response_rx
            .await
            .map_err(|_| AskError::ProcessMessageError)
    }

    /// Sends a message whose reply is itself a `Result`, and flattens the outcome.
    ///
    /// # Errors
    ///
    /// - `AskError::MessageNotDelivered` if the message could not be sent.
    /// - `AskError::ProcessMessageError` if awaiting the reply receiver fails.
    /// - The handler's own error `E`, converted through `AskError::from`.
    pub async fn ask_for_res<M, T, E>(&self, message: M) -> Result<T, AskError<E>>
    where
        A: DeferableReplyHandler<M, Reply = Result<T, E>>,
        M: fmt::Debug + Send + Sync + 'static,
        E: fmt::Debug,
    {
        let response_rx = self
            .send_message(message)
            .await
            .map_err(|_send_error| AskError::MessageNotDelivered)?;
        let reply: Result<T, E> = response_rx
            .await
            .map_err(|_| AskError::ProcessMessageError)?;
        reply.map_err(AskError::from)
    }
}
/// Receiving side of an actor's mailbox channel.
///
/// Clones share the same underlying receiver through an `Arc`.
pub struct Inbox<A: Actor> {
// Shared receiving half of the priority channel.
rx: Arc<Receiver<Envelope<A>>>,
}
impl<A: Actor> Clone for Inbox<A> {
fn clone(&self) -> Self {
Inbox {
rx: self.rx.clone(),
}
}
}
impl<A: Actor> Inbox<A> {
    /// Returns whether the underlying receiver reports being empty.
    pub(crate) fn is_empty(&self) -> bool {
        let receiver = &self.rx;
        receiver.is_empty()
    }

    /// Waits for the next envelope from the receiver.
    pub(crate) async fn recv(&self) -> Result<Envelope<A>, RecvError> {
        let receiver = &self.rx;
        receiver.recv().await
    }

    /// Waits for the next envelope from the high-priority side of the receiver.
    pub(crate) async fn recv_cmd_and_scheduled_msg_only(&self) -> Envelope<A> {
        let receiver = &self.rx;
        receiver.recv_high_priority().await
    }

    /// Non-waiting variant of [`Self::recv`].
    pub(crate) fn try_recv(&self) -> Result<Envelope<A>, RecvError> {
        let receiver = &self.rx;
        receiver.try_recv()
    }

    /// Receives envelopes until one carries a message of type `M`, and returns it.
    ///
    /// Envelopes whose message is not an `M` are consumed and skipped. Returns
    /// `None` as soon as receiving fails.
    pub async fn recv_typed_message<M: 'static>(&self) -> Option<M> {
        loop {
            let mut envelope = self.rx.recv().await.ok()?;
            if let Some(typed_message) = envelope.message_typed() {
                return Some(typed_message);
            }
        }
    }

    /// Non-waiting variant of [`Self::recv_cmd_and_scheduled_msg_only`].
    #[allow(dead_code)]
    pub(crate) fn try_recv_cmd_and_scheduled_msg_only(&self) -> Result<Envelope<A>, RecvError> {
        let receiver = &self.rx;
        receiver.try_recv_high_priority_message()
    }

    /// Drains the low-priority queue and returns the messages as type-erased boxes.
    pub fn drain_for_test(&self) -> Vec<Box<dyn Any>> {
        let mut messages: Vec<Box<dyn Any>> = Vec::new();
        for mut envelope in self.rx.drain_low_priority() {
            messages.push(envelope.message());
        }
        messages
    }

    /// Drains the low-priority queue and returns only the messages of type `M`.
    ///
    /// Drained envelopes carrying another message type are discarded.
    pub fn drain_for_test_typed<M: 'static>(&self) -> Vec<M> {
        let drained_envelopes = self.rx.drain_low_priority();
        drained_envelopes
            .into_iter()
            .filter_map(|mut envelope| envelope.message_typed())
            .collect()
    }
}
/// Creates a connected `(Mailbox, Inbox)` pair for an actor of type `A`.
///
/// The channel is built with the given `queue_capacity`, the instance id is derived
/// from `actor_name` via `new_coolid`, and the handle counter starts at 1 to account
/// for the returned mailbox.
pub(crate) fn create_mailbox<A: Actor>(
    actor_name: String,
    queue_capacity: QueueCapacity,
    scheduler_client_opt: Option<SchedulerClient>,
) -> (Mailbox<A>, Inbox<A>) {
    let (tx, rx) = crate::channel_with_priority::channel(queue_capacity);
    let instance_id = crate::quickwit_common::new_coolid(&actor_name);
    let shared_state = Inner {
        tx,
        scheduler_client_opt,
        instance_id,
    };
    let mailbox = Mailbox {
        inner: Arc::new(shared_state),
        ref_count: Arc::new(AtomicUsize::new(1)),
    };
    (mailbox, Inbox { rx: Arc::new(rx) })
}
/// Weak counterpart of `Mailbox`, produced by `Mailbox::downgrade`.
///
/// It holds only `Weak` references, so it does not keep the shared state alive.
pub struct WeakMailbox<A: Actor> {
// Weak reference to the state shared by the mailbox handles.
inner: Weak<Inner<A>>,
// Weak reference to the shared handle counter.
ref_count: Weak<AtomicUsize>,
}
impl<A: Actor> WeakMailbox<A> {
    /// Attempts to turn this weak handle back into a usable [`Mailbox`].
    ///
    /// Returns `None` if either of the shared allocations (the inner state or the
    /// handle counter) has already been released, i.e. no `Mailbox` is alive anymore.
    ///
    /// The returned `Mailbox` is registered in the shared handle counter, exactly
    /// like a handle produced by `Mailbox::clone`.
    pub fn upgrade(&self) -> Option<Mailbox<A>> {
        let inner = self.inner.upgrade()?;
        let ref_count = self.ref_count.upgrade()?;
        // `Mailbox::drop` unconditionally decrements the counter, so every new
        // `Mailbox` must be counted when it is created. Without this increment,
        // each upgrade/drop cycle would lower the counter by one, which can emit
        // `LastMailbox` prematurely, make `is_last_mailbox` report `true` while
        // other handles exist, and eventually wrap the counter around.
        ref_count.fetch_add(1, Ordering::SeqCst);
        Some(Mailbox { inner, ref_count })
    }
}
// Unit tests for `Mailbox`, `WeakMailbox` and the backpressure counter.
#[cfg(test)]
mod tests {
use std::mem;
use std::time::Duration;
use super::*;
use crate::tests::{Ping, PingReceiverActor};
use crate::Universe;
// A weak mailbox can be upgraded while the mailbox it came from is still alive.
#[tokio::test]
async fn test_weak_mailbox_downgrade_upgrade() {
let universe = Universe::with_accelerated_time();
let (mailbox, _inbox) = universe.create_test_mailbox::<PingReceiverActor>();
let weak_mailbox = mailbox.downgrade();
assert!(weak_mailbox.upgrade().is_some());
}
// Once the only mailbox has been dropped, upgrading the weak mailbox fails.
#[tokio::test]
async fn test_weak_mailbox_failing_upgrade() {
let universe = Universe::with_accelerated_time();
let (mailbox, _inbox) = universe.create_test_mailbox::<PingReceiverActor>();
let weak_mailbox = mailbox.downgrade();
drop(mailbox);
assert!(weak_mailbox.upgrade().is_none());
}
// Test actor with a `Bounded(0)` queue. Each `Duration` message makes it sleep
// for that duration (no sleep at all for a zero duration).
struct BackPressureActor;
impl Actor for BackPressureActor {
type ObservableState = ();
fn observable_state(&self) -> Self::ObservableState {}
fn queue_capacity(&self) -> QueueCapacity {
QueueCapacity::Bounded(0)
}
fn yield_after_each_message(&self) -> bool {
false
}
}
use async_trait::async_trait;
#[async_trait]
impl Handler<Duration> for BackPressureActor {
type Reply = ();
async fn handle(
&mut self,
sleep_duration: Duration,
_ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
if !sleep_duration.is_zero() {
tokio::time::sleep(sleep_duration).await;
}
Ok(())
}
}
// Sending a single message to an idle actor should record (almost) no backpressure,
// both right after the send and after the message has been processed.
#[tokio::test]
async fn test_mailbox_send_with_backpressure_counter_low_backpressure() {
let universe = Universe::with_accelerated_time();
let back_pressure_actor = BackPressureActor;
let (mailbox, _handle) = universe.spawn_builder().spawn(back_pressure_actor);
// Round trip with a zero-duration message before taking any measurement.
mailbox
.ask_with_backpressure_counter(Duration::default(), None)
.await
.unwrap();
let backpressure_micros_counter =
IntCounter::new("test_counter", "help for test_counter").unwrap();
let wait_duration = Duration::from_millis(1);
let processed = mailbox
.send_message_with_backpressure_counter(
wait_duration,
Some(&backpressure_micros_counter),
)
.await
.unwrap();
assert!(backpressure_micros_counter.get() < 500);
processed.await.unwrap();
assert!(backpressure_micros_counter.get() < 500);
universe.assert_quit().await;
}
// A second message sent while the actor is busy sleeping for 1ms has to wait for
// the queue, and that wait must show up in the counter.
#[tokio::test]
async fn test_mailbox_send_with_backpressure_counter_backpressure() {
let universe = Universe::with_accelerated_time();
let back_pressure_actor = BackPressureActor;
let (mailbox, _handle) = universe.spawn_builder().spawn(back_pressure_actor);
mailbox
.ask_with_backpressure_counter(Duration::default(), None)
.await
.unwrap();
let backpressure_micros_counter =
IntCounter::new("test_counter", "help for test_counter").unwrap();
let wait_duration = Duration::from_millis(1);
mailbox
.send_message_with_backpressure_counter(
wait_duration,
Some(&backpressure_micros_counter),
)
.await
.unwrap();
mailbox
.send_message_with_backpressure_counter(
Duration::default(),
Some(&backpressure_micros_counter),
)
.await
.unwrap();
assert!(backpressure_micros_counter.get() > 1_000u64);
universe.assert_quit().await;
}
// Time spent waiting for the reply of an `ask` is not meant to be counted as
// backpressure.
// NOTE(review): `backpressure_micros_counter` is created below but never handed to
// the mailbox (the measured `ask_with_backpressure_counter` call receives `None`),
// so the final `assert_eq!(.., 0)` cannot fail. Passing
// `Some(&backpressure_micros_counter)` would make the assertion meaningful, but
// with a `Bounded(0)` queue an exact-zero check may then be timing dependent —
// confirm the intent before changing it.
#[tokio::test]
async fn test_mailbox_waiting_for_processing_does_not_counter_as_backpressure() {
let universe = Universe::with_accelerated_time();
let back_pressure_actor = BackPressureActor;
let (mailbox, _handle) = universe.spawn_builder().spawn(back_pressure_actor);
mailbox
.ask_with_backpressure_counter(Duration::default(), None)
.await
.unwrap();
let backpressure_micros_counter =
IntCounter::new("test_counter", "help for test_counter").unwrap();
let start = Instant::now();
mailbox
.ask_with_backpressure_counter(Duration::from_millis(1), None)
.await
.unwrap();
let elapsed = start.elapsed();
assert!(elapsed.as_micros() > 1000);
assert_eq!(backpressure_micros_counter.get(), 0);
universe.assert_quit().await;
}
// With a queue of capacity 1 and nobody receiving, the second `try_send_message`
// fails with `Full` and hands the message back.
#[tokio::test]
async fn test_try_send() {
let universe = Universe::with_accelerated_time();
let (mailbox, _inbox) = universe
.create_mailbox::<PingReceiverActor>("hello".to_string(), QueueCapacity::Bounded(1));
assert!(mailbox.try_send_message(Ping).is_ok());
assert!(matches!(
mailbox.try_send_message(Ping).unwrap_err(),
TrySendError::Full(Ping)
));
}
// After the inbox has been dropped, `try_send_message` reports `Disconnected`.
#[tokio::test]
async fn test_try_send_disconnect() {
let universe = Universe::with_accelerated_time();
let (mailbox, inbox) = universe
.create_mailbox::<PingReceiverActor>("hello".to_string(), QueueCapacity::Bounded(1));
assert!(mailbox.try_send_message(Ping).is_ok());
mem::drop(inbox);
assert!(matches!(
mailbox.try_send_message(Ping).unwrap_err(),
TrySendError::Disconnected
));
}
}