use std::hash::{Hash, Hasher};
use std::{error, fmt};
use derive_more::Display;
pub(crate) mod channel;
mod envelope;
mod message;
mod queue;
use crate::actor::Actor;
use crate::handler::{Handler, Message};
pub use self::envelope::{Envelope, EnvelopeProxy, ToEnvelope};
pub use self::message::{RecipientRequest, Request};
pub(crate) use self::channel::{AddressReceiver, AddressSenderProducer};
use self::channel::{AddressSender, Sender, WeakAddressSender};
/// Error produced when a value could not be handed over to a receiver.
///
/// Both variants carry the value that failed to send, so it can be
/// recovered with `into_inner`.
pub enum SendError<T> {
/// The send failed because the receiver is full.
Full(T),
/// The send failed because the receiver is gone.
Closed(T),
}
/// Mailbox-level failure. The `Display` text of each variant comes from its
/// `#[display]` attribute.
#[derive(Display, Clone, Copy)]
pub enum MailboxError {
/// The mailbox has closed.
#[display(fmt = "Mailbox has closed")]
Closed,
/// Message delivery timed out.
#[display(fmt = "Message delivery timed out")]
Timeout,
}
// Marks `MailboxError` as a standard error type; no trait methods are
// overridden, so the trait's default implementations apply.
impl error::Error for MailboxError {}
impl<T> SendError<T> {
pub fn into_inner(self) -> T {
match self {
SendError::Full(msg) | SendError::Closed(msg) => msg,
}
}
}
// Marks `SendError<T>` as a standard error type for every `T`; no trait
// methods are overridden, so the trait's default implementations apply.
impl<T> error::Error for SendError<T> {}
/// Debug output names the variant only; the payload is elided as `..`,
/// which is why no `T: Debug` bound is needed.
impl<T> fmt::Debug for SendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            SendError::Full(_) => "SendError::Full(..)",
            SendError::Closed(_) => "SendError::Closed(..)",
        };
        f.write_str(label)
    }
}
/// Human-readable reason for the failed send; the payload is not printed.
impl<T> fmt::Display for SendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let reason = match self {
            SendError::Full(_) => "send failed because receiver is full",
            SendError::Closed(_) => "send failed because receiver is gone",
        };
        f.write_str(reason)
    }
}
/// Debug output wraps the `Display` text of the error in `MailboxError(..)`.
impl fmt::Debug for MailboxError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("MailboxError({})", self))
    }
}
/// Address of an actor of type `A`: a handle around the `AddressSender`
/// used to pass messages to it.
///
/// NOTE(review): `#[derive(Debug)]` on a generic struct adds an `A: Debug`
/// bound to the generated impl, so `Addr<A>` is only `Debug` when the actor
/// type itself is.
#[derive(Debug)]
pub struct Addr<A: Actor> {
// Sending half that all the methods on `Addr` delegate to.
tx: AddressSender<A>,
}
impl<A: Actor> Addr<A> {
    /// Wraps the given sender in an address.
    pub fn new(tx: AddressSender<A>) -> Self {
        Self { tx }
    }

    /// Returns whatever the underlying sender reports from `connected()`.
    #[inline]
    pub fn connected(&self) -> bool {
        self.tx.connected()
    }

    /// Hands `msg` to the underlying sender and discards the outcome.
    ///
    /// The value returned by the sender's `do_send` is dropped on purpose,
    /// so the caller receives no feedback about delivery.
    #[inline]
    pub fn do_send<M>(&self, msg: M)
    where
        M: Message + Send,
        M::Result: Send,
        A: Handler<M>,
        A::Context: ToEnvelope<A, M>,
    {
        // Best-effort send: the result is intentionally ignored.
        let _ = self.tx.do_send(msg);
    }

    /// Attempts to send `msg`, returning the sender's result unchanged.
    ///
    /// On failure the message is handed back inside the [`SendError`].
    /// The `true` flag is passed straight to the sender's `try_send`; its
    /// meaning is defined there.
    pub fn try_send<M>(&self, msg: M) -> Result<(), SendError<M>>
    where
        M: Message + Send + 'static,
        M::Result: Send,
        A: Handler<M>,
        A::Context: ToEnvelope<A, M>,
    {
        self.tx.try_send(msg, true)
    }

    /// Sends `msg` and returns a [`Request`] for the reply.
    ///
    /// The request is built from the sender's answer:
    /// * accepted: it holds the receiver the sender returned;
    /// * full: it holds a clone of the sender plus the unsent message
    ///   (presumably so the request can retry later; see `Request`);
    /// * closed: it holds neither.
    #[inline]
    pub fn send<M>(&self, msg: M) -> Request<A, M>
    where
        M: Message + Send,
        M::Result: Send,
        A: Handler<M>,
        A::Context: ToEnvelope<A, M>,
    {
        match self.tx.send(msg) {
            Ok(reply_rx) => Request::new(Some(reply_rx), None),
            Err(SendError::Closed(_)) => Request::new(None, None),
            Err(SendError::Full(unsent)) => {
                let retry_tx = self.tx.clone();
                Request::new(None, Some((retry_tx, unsent)))
            }
        }
    }

    /// Consumes the address and converts it into a [`Recipient`] for the
    /// single message type `M`.
    pub fn recipient<M>(self) -> Recipient<M>
    where
        A: Handler<M>,
        A::Context: ToEnvelope<A, M>,
        M: Message + Send + 'static,
        M::Result: Send,
    {
        self.into()
    }

    /// Creates a [`WeakAddr`] from the downgraded form of the sender.
    pub fn downgrade(&self) -> WeakAddr<A> {
        let wtx = self.tx.downgrade();
        WeakAddr { wtx }
    }
}
/// Cloning an address clones the underlying sender. Written by hand so that
/// no `A: Clone` bound is required of the actor type.
impl<A: Actor> Clone for Addr<A> {
    fn clone(&self) -> Self {
        let tx = self.tx.clone();
        Self { tx }
    }
}
/// Two addresses are equal exactly when their underlying senders compare
/// equal.
impl<A: Actor> PartialEq for Addr<A> {
    fn eq(&self, other: &Self) -> bool {
        PartialEq::eq(&self.tx, &other.tx)
    }
}
// Marker impl: declares the `PartialEq` comparison on `Addr` (which
// compares the underlying senders) to be a full equivalence relation.
impl<A: Actor> Eq for Addr<A> {}
/// Hashes an address by feeding the underlying sender into the hasher,
/// mirroring the `PartialEq` impl, which also compares the senders.
impl<A: Actor> Hash for Addr<A> {
    fn hash<H: Hasher>(&self, state: &mut H) {
        Hash::hash(&self.tx, state)
    }
}
/// Weak counterpart of `Addr`, holding a `WeakAddressSender` instead of an
/// `AddressSender`. Use `upgrade` to try to obtain an `Addr` again.
#[derive(Debug)]
pub struct WeakAddr<A: Actor> {
// Weak form of the sender, as produced by `AddressSender::downgrade`.
wtx: WeakAddressSender<A>,
}
impl<A: Actor> WeakAddr<A> {
    /// Attempts to turn this weak address back into an [`Addr`].
    ///
    /// Returns `None` when the weak sender cannot be upgraded, and also
    /// when it can be upgraded but the resulting sender reports that it is
    /// no longer connected.
    pub fn upgrade(&self) -> Option<Addr<A>> {
        self.wtx
            .upgrade()
            // An upgraded sender that is not connected is dropped here.
            .filter(|tx| tx.connected())
            .map(Addr::new)
    }
}
/// Handle that can send messages of one type `M`, backed by a boxed
/// `Sender<M>` trait object rather than a concrete actor address.
pub struct Recipient<M>
where
    M: Message + Send,
    M::Result: Send,
{
    // Type-erased sender that every method on `Recipient` delegates to.
    tx: Box<dyn Sender<M>>,
}
impl<M> Recipient<M>
where
    M: Message + Send,
    M::Result: Send,
{
    /// Builds a recipient around an already boxed sender.
    pub(crate) fn new(tx: Box<dyn Sender<M>>) -> Self {
        Self { tx }
    }

    /// Forwards `msg` to the boxed sender's `do_send` and returns its
    /// result unchanged; on failure the message comes back inside the
    /// [`SendError`].
    pub fn do_send(&self, msg: M) -> Result<(), SendError<M>> {
        self.tx.do_send(msg)
    }

    /// Forwards `msg` to the boxed sender's `try_send` and returns its
    /// result unchanged.
    pub fn try_send(&self, msg: M) -> Result<(), SendError<M>> {
        self.tx.try_send(msg)
    }

    /// Sends `msg` and returns a [`RecipientRequest`] for the reply.
    ///
    /// The request is built from the sender's answer:
    /// * accepted: it holds the receiver the sender returned;
    /// * full: it holds a fresh boxed copy of the sender plus the unsent
    ///   message (presumably so the request can retry later);
    /// * closed: it holds neither.
    pub fn send(&self, msg: M) -> RecipientRequest<M> {
        match self.tx.send(msg) {
            Ok(reply_rx) => RecipientRequest::new(Some(reply_rx), None),
            Err(SendError::Closed(_)) => RecipientRequest::new(None, None),
            Err(SendError::Full(unsent)) => {
                RecipientRequest::new(None, Some((self.tx.boxed(), unsent)))
            }
        }
    }

    /// Returns whatever the boxed sender reports from `connected()`.
    pub fn connected(&self) -> bool {
        self.tx.connected()
    }
}
/// Converts an actor address into a [`Recipient`] for one message type by
/// boxing the address's sender as a `Sender<M>` trait object.
///
/// Implemented as `From` rather than `Into`: the standard library's blanket
/// impl still provides `Into<Recipient<M>> for Addr<A>`, so existing
/// `addr.into()` call sites keep working, and `Recipient::from(addr)`
/// becomes available as well.
impl<A: Actor, M: Message + Send + 'static> From<Addr<A>> for Recipient<M>
where
    A: Handler<M>,
    M::Result: Send,
    A::Context: ToEnvelope<A, M>,
{
    fn from(addr: Addr<A>) -> Self {
        Recipient::new(Box::new(addr.tx))
    }
}
/// Cloning a recipient asks the boxed sender for a new boxed copy of
/// itself via `boxed()`.
impl<M> Clone for Recipient<M>
where
    M: Message + Send,
    M::Result: Send,
{
    fn clone(&self) -> Self {
        let tx = self.tx.boxed();
        Self { tx }
    }
}
/// Two recipients are equal when their senders report the same value from
/// the sender's own `hash()` method.
impl<M> PartialEq for Recipient<M>
where
    M: Message + Send,
    M::Result: Send,
{
    fn eq(&self, other: &Self) -> bool {
        let lhs = self.tx.hash();
        let rhs = other.tx.hash();
        lhs == rhs
    }
}
// Marker impl: declares the `PartialEq` comparison on `Recipient` (which
// compares the senders' `hash()` values) to be a full equivalence relation.
impl<M> Eq for Recipient<M>
where
M: Message + Send,
M::Result: Send,
{
}
/// Hashes a recipient by feeding the value of the sender's own `hash()`
/// method into the hasher, the same value the `PartialEq` impl compares.
impl<M> Hash for Recipient<M>
where
    M: Message + Send,
    M::Result: Send,
{
    fn hash<H: Hasher>(&self, state: &mut H) {
        let sender_id = self.tx.hash();
        sender_id.hash(state)
    }
}
/// Debug output is a fixed placeholder; the boxed sender is not printed.
impl<M> fmt::Debug for Recipient<M>
where
    M: Message + Send,
    M::Result: Send,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Recipient { /* omitted */ }")
    }
}
// Tests for sending through an `Addr` when more messages are sent than are
// accepted immediately.
#[cfg(test)]
mod tests {
use crate::prelude::*;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
// Test actor holding a shared counter that its handler overwrites.
struct ActorWithSmallMailBox(Arc<AtomicUsize>);
impl Actor for ActorWithSmallMailBox {
type Context = Context<Self>;
// Sets the mailbox capacity to 1 when the actor starts.
fn started(&mut self, ctx: &mut Self::Context) {
ctx.set_mailbox_capacity(1);
}
}
// Message asking the actor to store the wrapped value in its counter.
pub struct SetCounter(usize);
impl Message for SetCounter {
type Result = ();
}
impl Handler<SetCounter> for ActorWithSmallMailBox {
type Result = <SetCounter as Message>::Result;
// Overwrites the shared counter with the value carried by the message.
fn handle(&mut self, ping: SetCounter, _: &mut Context<Self>) -> Self::Result {
self.0.store(ping.0, Ordering::Relaxed)
}
}
// Sends three messages back to back before awaiting any of them. The first
// two requests must report `rx_is_some()`, the third must not. After all
// three are awaited, the counter must hold 3, i.e. the third message was
// still handled (and handled last).
#[test]
fn test_send_over_limit() {
let count = Arc::new(AtomicUsize::new(0));
let count2 = Arc::clone(&count);
System::run(move || {
// The capacity is also set to 1 here, before the actor value is built.
let addr = ActorWithSmallMailBox::create(|ctx| {
ctx.set_mailbox_capacity(1);
ActorWithSmallMailBox(count2)
});
let fut = async move {
let send = addr.clone().send(SetCounter(1));
assert!(send.rx_is_some());
let addr2 = addr.clone();
let send2 = addr2.send(SetCounter(2));
assert!(send2.rx_is_some());
// Third send: the request is created without a receiver.
let send3 = addr2.send(SetCounter(3));
assert!(!send3.rx_is_some());
// Await in send order; the results themselves are ignored.
let _ = send.await;
let _ = send2.await;
let _ = send3.await;
System::current().stop();
};
Arbiter::spawn(fut);
})
.unwrap();
// Checked after the system has stopped: the last stored value is 3.
assert_eq!(count.load(Ordering::Relaxed), 3);
}
}