use futures::Future;
use std::fmt;
use std::marker::PhantomData;
mod envelope;
mod message;
mod queue;
mod sync;
pub(crate) mod sync_channel;
mod unsync;
mod unsync_channel;
use actor::{Actor, AsyncContext};
use handler::{Handler, Message};
pub use self::envelope::{EnvelopeProxy, MessageEnvelope, SyncEnvelope,
SyncMessageEnvelope, ToEnvelope, UnsyncEnvelope};
pub use self::message::Request;
pub use self::sync::{Syn, SyncRecipientRequest};
pub(crate) use self::sync_channel::SyncAddressReceiver;
pub use self::unsync::{Unsync, UnsyncRecipientRequest};
pub(crate) use self::unsync_channel::UnsyncAddrReceiver;
pub enum SendError<T> {
Full(T),
Closed(T),
}
#[derive(Fail)]
pub enum MailboxError {
#[fail(display = "Mailbox has closed")]
Closed,
#[fail(display = "Message delivery timed out")]
Timeout,
}
impl<T> SendError<T> {
pub fn into_inner(self) -> T {
match self {
SendError::Full(msg) | SendError::Closed(msg) => msg,
}
}
}
impl<T> fmt::Debug for SendError<T> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match *self {
SendError::Full(_) => write!(fmt, "SendError::Full(..)"),
SendError::Closed(_) => write!(fmt, "SendError::Closed(..)"),
}
}
}
impl<T> fmt::Display for SendError<T> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match *self {
SendError::Full(_) => write!(fmt, "send failed because receiver is full"),
SendError::Closed(_) => write!(fmt, "send failed because receiver is gone"),
}
}
}
impl fmt::Debug for MailboxError {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "MailboxError({})", self)
}
}
pub trait ActorAddress<A, T>
where
A: Actor,
{
fn get(ctx: &mut A::Context) -> T;
}
impl<A> ActorAddress<A, Addr<Unsync, A>> for A
where
A: Actor,
A::Context: AsyncContext<A>,
{
fn get(ctx: &mut A::Context) -> Addr<Unsync, A> {
ctx.unsync_address()
}
}
impl<A> ActorAddress<A, Addr<Syn, A>> for A
where
A: Actor,
A::Context: AsyncContext<A>,
{
fn get(ctx: &mut A::Context) -> Addr<Syn, A> {
ctx.sync_address()
}
}
impl<A> ActorAddress<A, (Addr<Unsync, A>, Addr<Syn, A>)> for A
where
A: Actor,
A::Context: AsyncContext<A>,
{
fn get(ctx: &mut A::Context) -> (Addr<Unsync, A>, Addr<Syn, A>) {
(ctx.unsync_address(), ctx.sync_address())
}
}
impl<A> ActorAddress<A, ()> for A
where
A: Actor,
{
fn get(_: &mut A::Context) -> () {
()
}
}
pub trait Destination<A>: Sized {
type Transport: Clone;
fn connected(tx: &Self::Transport) -> bool;
}
#[allow(unused_variables)]
pub trait MessageDestination<A, M>: Destination<A>
where
A: Handler<M>,
A::Context: ToEnvelope<Self, A, M>,
M: Message + 'static,
Self::Transport: MessageDestinationTransport<Self, A, M>,
{
type Envelope;
type ResultSender;
type ResultReceiver: Future<Item = M::Result>;
fn do_send(tx: &Self::Transport, msg: M);
fn try_send(tx: &Self::Transport, msg: M) -> Result<(), SendError<M>>;
fn send(tx: &Self::Transport, msg: M) -> Request<Self, A, M>;
fn recipient(tx: <Self as Destination<A>>::Transport) -> Recipient<Self, M>
where
Self: MessageRecipient<M>;
}
pub trait MessageDestinationTransport<T: MessageDestination<A, M>, A, M>
where
M: Message + 'static,
A: Handler<M>,
A::Context: ToEnvelope<T, A, M>,
T::Transport: MessageDestinationTransport<T, A, M>,
{
fn send(&self, msg: M) -> Result<T::ResultReceiver, SendError<M>>;
}
#[allow(unused_variables)]
pub trait MessageRecipient<M>: Sized
where
M: Message + 'static,
{
type Envelope: From<M>;
type Transport;
type SendError;
type MailboxError;
type Request: Future<Item = M::Result, Error = Self::MailboxError>;
fn do_send(tx: &Self::Transport, msg: M) -> Result<(), Self::SendError>;
fn try_send(tx: &Self::Transport, msg: M) -> Result<(), Self::SendError>;
fn send(tx: &Self::Transport, msg: M) -> Self::Request;
fn clone(tx: &Self::Transport) -> Self::Transport;
}
pub struct Addr<T: Destination<A>, A> {
tx: T::Transport,
act: PhantomData<A>,
}
unsafe impl<A: Actor> Send for Addr<Syn, A> {}
unsafe impl<A: Actor> Sync for Addr<Syn, A> {}
impl<T: Destination<A>, A> Addr<T, A> {
pub fn new(tx: T::Transport) -> Addr<T, A> {
Addr {
tx,
act: PhantomData,
}
}
pub fn connected(&self) -> bool {
T::connected(&self.tx)
}
pub fn do_send<M>(&self, msg: M)
where
T: MessageDestination<A, M>,
T::Transport: MessageDestinationTransport<T, A, M>,
M: Message + 'static,
A: Handler<M>,
A::Context: ToEnvelope<T, A, M>,
{
T::do_send(&self.tx, msg)
}
pub fn send<M>(&self, msg: M) -> Request<T, A, M>
where
T: MessageDestination<A, M>,
T::Transport: MessageDestinationTransport<T, A, M>,
M: Message + 'static,
A: Handler<M>,
A::Context: ToEnvelope<T, A, M>,
{
T::send(&self.tx, msg)
}
pub fn try_send<M>(&self, msg: M) -> Result<(), SendError<M>>
where
T: MessageDestination<A, M>,
T::Transport: MessageDestinationTransport<T, A, M>,
M: Message + 'static,
A: Handler<M>,
A::Context: ToEnvelope<T, A, M>,
{
T::try_send(&self.tx, msg)
}
pub fn recipient<M>(self) -> Recipient<T, M>
where
T: MessageDestination<A, M> + MessageRecipient<M>,
A: Handler<M>,
A::Context: ToEnvelope<T, A, M>,
<T as Destination<A>>::Transport: MessageDestinationTransport<T, A, M>,
M: Message + 'static,
{
T::recipient(self.tx)
}
}
impl<T: Destination<A>, A> Clone for Addr<T, A> {
fn clone(&self) -> Addr<T, A> {
Addr {
tx: self.tx.clone(),
act: PhantomData,
}
}
}
pub struct Recipient<T: MessageRecipient<M>, M: Message + 'static> {
tx: T::Transport,
msg: PhantomData<M>,
}
unsafe impl<M> Send for Recipient<Syn, M>
where
M: Message + Send + 'static,
M::Result: Send,
{
}
unsafe impl<M> Sync for Recipient<Syn, M>
where
M: Message + Send + 'static,
M::Result: Send,
{
}
impl<T, M> Recipient<T, M>
where
T: MessageRecipient<M>,
M: Message + 'static,
{
pub fn new(tx: T::Transport) -> Recipient<T, M> {
Recipient {
tx,
msg: PhantomData,
}
}
pub fn do_send(&self, msg: M) -> Result<(), T::SendError> {
T::do_send(&self.tx, msg)
}
pub fn try_send(&self, msg: M) -> Result<(), T::SendError> {
T::try_send(&self.tx, msg)
}
pub fn send(&self, msg: M) -> T::Request {
T::send(&self.tx, msg)
}
}
impl<T, M> Clone for Recipient<T, M>
where
T: MessageRecipient<M>,
M: Message + 'static,
{
fn clone(&self) -> Recipient<T, M> {
Recipient {
tx: T::clone(&self.tx),
msg: PhantomData,
}
}
}