use std::any::Any;
use std::convert::Infallible;
use std::fmt;
use std::hash::Hash;
use std::sync::Arc;
use tokio::sync::oneshot;
use crate::channel_with_priority::{Priority, Receiver, Sender};
use crate::envelope::{wrap_in_envelope, Envelope};
use crate::{Actor, AskError, Handler, QueueCapacity, RecvError, SendError};
/// Cloneable handle used to deliver messages and commands to an actor of type `A`.
///
/// Every clone points at the same reference-counted `Inner`, which owns the
/// sending half of the priority channel and the actor instance id.
pub struct Mailbox<A: Actor> {
// Shared state; cloning the mailbox only clones this `Arc`.
pub(crate) inner: Arc<Inner<A>>,
}
impl<A: Actor> Clone for Mailbox<A> {
fn clone(&self) -> Self {
Mailbox {
inner: self.inner.clone(),
}
}
}
impl<A: Actor> Mailbox<A> {
    /// Returns `true` when this handle holds the only strong reference to the
    /// shared mailbox state.
    pub(crate) fn is_last_mailbox(&self) -> bool {
        let strong_refs = Arc::strong_count(&self.inner);
        strong_refs == 1
    }

    /// Returns the instance id stored in the shared mailbox state.
    pub fn id(&self) -> &str {
        self.inner.instance_id.as_str()
    }
}
/// Item carried by the mailbox channel: either a message for actor `A` or a
/// control `Command`.
pub(crate) enum CommandOrMessage<A: Actor> {
/// A boxed, type-erased envelope addressed to actor `A`.
Message(Box<dyn Envelope<A>>),
/// A control command.
Command(Command),
}
impl<A: Actor> From<Command> for CommandOrMessage<A> {
    /// Wraps a bare `Command` in the `Command` variant, so that callers can
    /// write `command.into()`.
    fn from(cmd: Command) -> Self {
        Self::Command(cmd)
    }
}
/// State shared by every clone of a `Mailbox`.
pub(crate) struct Inner<A: Actor> {
// Sending half of the priority channel created in `create_mailbox`.
pub(crate) tx: Sender<CommandOrMessage<A>>,
// Id generated in `create_mailbox` from the actor name; it backs `id()`,
// `actor_instance_id()` and the `Debug`/`Hash`/`PartialEq` impls of `Mailbox`.
instance_id: String,
}
/// Control commands that can be delivered to an actor through its mailbox.
///
/// `Mailbox::send_command` enqueues these with `Priority::High`.
///
/// NOTE(review): the effect of each variant is implemented by the code that
/// consumes the inbox, which is not visible in this file. The per-variant notes
/// below are inferred from the names only — confirm against the actor loop.
pub enum Command {
// Presumably suspends regular message processing — TODO confirm.
Pause,
// Presumably undoes `Pause` — TODO confirm.
Resume,
// Presumably asks the actor to terminate with a success status — TODO confirm.
// Its `Debug` representation is "Success", not the variant name.
ExitWithSuccess,
// Carries a oneshot sender through which a type-erased `Box<dyn Any + Send>`
// value can be sent back to the requester; presumably the actor's observable
// state — TODO confirm.
Observe(oneshot::Sender<Box<dyn Any + Send>>),
// Presumably a graceful stop request — TODO confirm how it differs from `Kill`.
Quit,
// Presumably a forceful stop request — TODO confirm how it differs from `Quit`.
Kill,
}
impl fmt::Debug for Command {
    /// Writes a short fixed label for the command; the payload of `Observe` is
    /// not printed.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let label = match self {
            Command::Pause => "Pause",
            Command::Resume => "Resume",
            // This label intentionally mirrors the existing output, which is
            // "Success" rather than the variant name.
            Command::ExitWithSuccess => "Success",
            Command::Observe(_) => "Observe",
            Command::Quit => "Quit",
            Command::Kill => "Kill",
        };
        f.write_str(label)
    }
}
impl<A: Actor> fmt::Debug for Mailbox<A> {
    /// Formats the mailbox as `Mailbox(<actor instance id>)`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let instance_id = self.actor_instance_id();
        f.write_fmt(format_args!("Mailbox({})", instance_id))
    }
}
impl<A: Actor> Hash for Mailbox<A> {
    /// Feeds only the instance id into the hasher, matching the `PartialEq`
    /// impl, which also compares instance ids only.
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        let instance_id = &self.inner.instance_id;
        Hash::hash(instance_id, state);
    }
}
impl<A: Actor> PartialEq for Mailbox<A> {
    /// Two mailboxes are equal when their instance ids are equal.
    fn eq(&self, other: &Self) -> bool {
        let (own_id, other_id) = (&self.inner.instance_id, &other.inner.instance_id);
        own_id == other_id
    }
}
// `PartialEq` compares the instance id strings, and string equality is a full
// equivalence relation, so the `Eq` marker is sound.
impl<A: Actor> Eq for Mailbox<A> {}
impl<A: Actor> Mailbox<A> {
    /// Returns the instance id stored in the shared mailbox state.
    pub fn actor_instance_id(&self) -> &str {
        self.inner.instance_id.as_str()
    }

    /// Pushes `cmd_or_msg` onto the underlying channel with the given `priority`.
    ///
    /// The outcome of the channel's `send` is returned unchanged.
    pub(crate) async fn send_with_priority(
        &self,
        cmd_or_msg: CommandOrMessage<A>,
        priority: Priority,
    ) -> Result<(), SendError> {
        let sender = &self.inner.tx;
        sender.send(cmd_or_msg, priority).await
    }

    /// Enqueues `message` with `Priority::Low`.
    ///
    /// The message is wrapped with `wrap_in_envelope`. On success, the oneshot
    /// receiver produced by that wrapping is returned so that the caller can
    /// await the reply.
    ///
    /// # Errors
    ///
    /// Returns the `SendError` reported by the channel if the message could
    /// not be enqueued.
    pub async fn send_message<M>(
        &self,
        message: M,
    ) -> Result<oneshot::Receiver<A::Reply>, SendError>
    where
        A: Handler<M>,
        M: 'static + Send + Sync + fmt::Debug,
    {
        let (envelope, reply_rx) = wrap_in_envelope(message);
        let queued = CommandOrMessage::Message(envelope);
        self.send_with_priority(queued, Priority::Low)
            .await
            .map(|()| reply_rx)
    }

    /// Enqueues `command` with `Priority::High`.
    ///
    /// # Errors
    ///
    /// Returns the `SendError` reported by the channel if the command could
    /// not be enqueued.
    pub async fn send_command(&self, command: Command) -> Result<(), SendError> {
        let queued = CommandOrMessage::from(command);
        self.send_with_priority(queued, Priority::High).await
    }

    /// Sends `message` and waits for the reply.
    ///
    /// # Errors
    ///
    /// - `AskError::MessageNotDelivered` if the message could not be enqueued.
    /// - `AskError::ProcessMessageError` if awaiting the reply receiver fails.
    pub async fn ask<M, T>(&self, message: M) -> Result<T, AskError<Infallible>>
    where
        A: Handler<M, Reply = T>,
        M: 'static + Send + Sync + fmt::Debug,
    {
        let reply_rx = match self.send_message(message).await {
            Ok(reply_rx) => reply_rx,
            Err(_send_error) => return Err(AskError::MessageNotDelivered),
        };
        match reply_rx.await {
            Ok(reply) => Ok(reply),
            Err(_) => Err(AskError::ProcessMessageError),
        }
    }

    /// Same as [`Mailbox::ask`], for handlers whose reply is itself a `Result`.
    ///
    /// The handler's `Ok` value is returned directly, and its `Err` value is
    /// converted with `AskError::from`.
    ///
    /// # Errors
    ///
    /// - `AskError::MessageNotDelivered` if the message could not be enqueued.
    /// - `AskError::ProcessMessageError` if awaiting the reply receiver fails.
    /// - The `AskError` built from the handler's own error otherwise.
    pub async fn ask_for_res<M, T, E: fmt::Debug>(&self, message: M) -> Result<T, AskError<E>>
    where
        A: Handler<M, Reply = Result<T, E>>,
        M: 'static + Send + Sync + fmt::Debug,
    {
        let reply_rx = match self.send_message(message).await {
            Ok(reply_rx) => reply_rx,
            Err(_send_error) => return Err(AskError::MessageNotDelivered),
        };
        match reply_rx.await {
            Ok(Ok(value)) => Ok(value),
            Ok(Err(handler_error)) => Err(AskError::from(handler_error)),
            Err(_) => Err(AskError::ProcessMessageError),
        }
    }
}
/// Receiving side of a mailbox: wraps the receiving half of the priority
/// channel created in `create_mailbox`.
pub struct Inbox<A: Actor> {
// Receiver paired with the `Sender` held by the matching `Mailbox`.
rx: Receiver<CommandOrMessage<A>>,
}
impl<A: Actor> Inbox<A> {
    /// Receives the next item from the channel, passing the value returned by
    /// `crate::message_timeout()` as the timeout.
    pub(crate) async fn recv_timeout(&mut self) -> Result<CommandOrMessage<A>, RecvError> {
        let timeout = crate::message_timeout();
        self.rx.recv_timeout(timeout).await
    }

    /// Same as `recv_timeout`, but only reads from the high-priority side of
    /// the channel (`recv_high_priority_timeout`).
    pub(crate) async fn recv_timeout_cmd_and_scheduled_msg_only(
        &mut self,
    ) -> Result<CommandOrMessage<A>, RecvError> {
        let timeout = crate::message_timeout();
        self.rx.recv_high_priority_timeout(timeout).await
    }

    /// Test helper: drains the low-priority queue and returns its content as
    /// type-erased boxes.
    ///
    /// Messages are extracted from their envelope via `Envelope::message`, and
    /// commands are boxed as-is.
    pub fn drain_for_test(&self) -> Vec<Box<dyn Any>> {
        let mut drained: Vec<Box<dyn Any>> = Vec::new();
        for item in self.rx.drain_low_priority() {
            let payload: Box<dyn Any> = match item {
                CommandOrMessage::Command(command) => Box::new(command),
                CommandOrMessage::Message(mut envelope) => envelope.message(),
            };
            drained.push(payload);
        }
        drained
    }
}
/// Creates a connected `(Mailbox, Inbox)` pair for an actor of type `A`.
///
/// The underlying priority channel is built with `queue_capacity`. The
/// mailbox instance id is generated from `actor_name` with
/// `quickwit_common::new_coolid`.
pub fn create_mailbox<A: Actor>(
    actor_name: String,
    queue_capacity: QueueCapacity,
) -> (Mailbox<A>, Inbox<A>) {
    let (tx, rx) = crate::channel_with_priority::channel(queue_capacity);
    let instance_id = quickwit_common::new_coolid(&actor_name);
    let inner = Arc::new(Inner { tx, instance_id });
    (Mailbox { inner }, Inbox { rx })
}
/// Creates a mailbox/inbox pair for tests: the actor name is `"test-mailbox"`
/// and the queue capacity is `QueueCapacity::Unbounded`.
pub fn create_test_mailbox<A: Actor>() -> (Mailbox<A>, Inbox<A>) {
    let actor_name = String::from("test-mailbox");
    create_mailbox::<A>(actor_name, QueueCapacity::Unbounded)
}