use crate::{ import::*, ChanSender, BoxEnvelope, StrongCount, ActorInfo, envelope::*, error::* };
/// Crate-internal address of an actor of type `A`.
///
/// Bundles the sending side of the actor's mailbox with the shared metadata
/// describing the actor. Messages sent through this type are boxed into
/// envelopes and pushed into `mb`.
pub(crate) struct AddrInner< A: Actor >
{
// Sender used to push boxed envelopes into the actor's mailbox.
mb : ChanSender<A> ,
// Shared actor metadata; this file reads its id, name, span and type name.
pub(crate) info : Arc<ActorInfo> ,
// Shared, mutex-guarded `StrongCount`.
// NOTE(review): presumably tracks how many strong addresses exist; this type only
// stores and clones the handle -- confirm against `StrongCount`'s definition.
pub(crate) strong: Arc<Mutex< StrongCount >> ,
}
impl<A: Actor> Clone for AddrInner<A> {
    /// Produces a second handle to the same actor.
    ///
    /// The mailbox sender is duplicated through `clone_sink`, while `info` and
    /// `strong` are shared by bumping their `Arc` reference counts.
    fn clone(&self) -> Self {
        let mb = self.mb.clone_sink();
        let info = Arc::clone(&self.info);
        let strong = Arc::clone(&self.strong);

        Self { mb, info, strong }
    }
}
impl<A: Actor> PartialEq for AddrInner<A> {
    /// Two addresses compare equal when the ids stored in their `ActorInfo`
    /// are equal. The mailbox sender and the strong count are not compared.
    fn eq(&self, other: &Self) -> bool {
        let ours = &self.info.id;
        let theirs = &other.info.id;

        ours == theirs
    }
}
// Marker impl: declares the `PartialEq` relation on `AddrInner` to be a full equivalence relation.
impl< A: Actor > Eq for AddrInner<A>{}
impl<A: Actor> fmt::Debug for AddrInner<A> {
    /// Formats as `AddrInner<TYPE> ~ ID`, followed by `, NAME` when the actor
    /// has a non-empty name.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Only actors with a non-empty name get the ", name" suffix.
        let suffix = if self.info.name().is_empty() {
            String::new()
        } else {
            format!( ", {}", &self.info.name )
        };

        let actor_type = std::any::type_name::<A>();

        write!(f, "AddrInner<{}> ~ {}{}", actor_type, &self.info.id, suffix)
    }
}
impl<A> AddrInner<A>
where
    A: Actor,
{
    /// Assembles an address from a mailbox sender, the actor's shared info
    /// and the shared strong count.
    pub(crate) fn new(
        tx: ChanSender<A>,
        info: Arc<ActorInfo>,
        strong: Arc<Mutex<StrongCount>>,
    ) -> Self {
        Self {
            mb: tx,
            info,
            strong,
        }
    }

    /// Returns the span provided by the actor's `ActorInfo`.
    pub(crate) fn span(&self) -> Span {
        let info = &self.info;
        info.span()
    }

    /// Returns the type name recorded in the actor's `ActorInfo`.
    pub(crate) fn type_name(&self) -> &str {
        let info = &self.info;
        info.type_name()
    }
}
impl<A, M> Address<M> for AddrInner<A>
where
    A: Actor + Handler<M>,
    M: Message,
{
    /// Sends `msg` to the actor and waits for the handler's return value.
    ///
    /// The message is wrapped in a `CallEnvelope` together with the sending
    /// half of a oneshot channel; the returned future resolves with whatever
    /// arrives on the receiving half.
    ///
    /// # Errors
    ///
    /// * `ThesErr::MailboxClosed` when the envelope cannot be sent to the mailbox.
    /// * `ThesErr::ActorStoppedBeforeResponse` when awaiting the oneshot
    ///   receiver fails.
    fn call(&mut self, msg: M) -> Return<'_, ThesRes<<M as Message>::Return>> {
        async move {
            let (ret_tx, ret_rx) = oneshot::<M::Return>();
            let envl: BoxEnvelope<A> = Box::new(CallEnvelope::new(msg, ret_tx));

            // Step 1: deliver the envelope; bail out if the mailbox rejects it.
            let delivery = self.mb.send(envl).await;

            if let Err(e) = delivery {
                return Err(ThesErr::MailboxClosed {
                    info: self.info.clone(),
                    src: e.into(),
                });
            }

            // Step 2: wait for the answer on the oneshot channel.
            match ret_rx.await {
                Ok(response) => Ok(response),
                Err(_) => Err(ThesErr::ActorStoppedBeforeResponse {
                    info: self.info.clone(),
                }),
            }
        }
        .boxed()
    }

    /// Clones this address and returns the copy as a boxed `Address<M>`.
    fn clone_box(&self) -> BoxAddress<M, ThesErr> {
        let duplicate: Self = self.clone();
        Box::new(duplicate)
    }
}
impl<A> Identify for AddrInner<A>
where
    A: Actor,
{
    /// The actor's id, as reported by its `ActorInfo`.
    fn id(&self) -> usize {
        let id: usize = self.info.id();
        id
    }

    /// The actor's name, as reported by its `ActorInfo`.
    fn name(&self) -> Arc<str> {
        let name: Arc<str> = self.info.name();
        name
    }
}
impl<A, M> Sink<M> for AddrInner<A>
where
    A: Actor + Handler<M>,
    M: Message,
{
    type Error = ThesErr;

    /// Polls the mailbox sender for readiness.
    ///
    /// Emits a trace event inside the actor's span for both the ready and the
    /// pending outcome. An error from the sender is reported as
    /// `ThesErr::MailboxClosed`.
    fn poll_ready(
        mut self: Pin<&mut Self>,
        cx: &mut TaskContext<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        match Pin::new(&mut self.mb).poll_ready(cx) {
            Poll::Pending => {
                self.info.span().in_scope(|| trace!( "Mailbox giving backpressure." ));
                Poll::Pending
            }

            Poll::Ready(Ok(_)) => {
                self.info.span().in_scope(|| trace!( "Mailbox ready for message." ));
                Poll::Ready(Ok(()))
            }

            Poll::Ready(Err(e)) => Poll::Ready(Err(ThesErr::MailboxClosed {
                info: self.info.clone(),
                src: e.into(),
            })),
        }
    }

    /// Wraps `msg` in a `SendEnvelope` and hands it to the mailbox sender.
    ///
    /// An error from the sender is reported as `ThesErr::MailboxClosed`.
    fn start_send(mut self: Pin<&mut Self>, msg: M) -> Result<(), Self::Error> {
        let envl: BoxEnvelope<A> = Box::new(SendEnvelope::new(msg));

        match Pin::new(&mut self.mb).start_send(envl) {
            Ok(()) => Ok(()),
            Err(e) => Err(ThesErr::MailboxClosed {
                info: self.info.clone(),
                src: e.into(),
            }),
        }
    }

    /// Flushes the mailbox sender, reporting any error from it as
    /// `ThesErr::MailboxClosed`.
    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut TaskContext<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        // `Poll::map_err` leaves `Pending` and `Ready(Ok(..))` untouched and
        // only rewrites the error of a `Ready(Err(..))`.
        Pin::new(&mut self.mb)
            .poll_flush(cx)
            .map_err(|e| ThesErr::MailboxClosed {
                info: self.info.clone(),
                src: e.into(),
            })
    }

    /// Always reports success immediately; the mailbox sender is not touched.
    fn poll_close(
        self: Pin<&mut Self>,
        _cx: &mut TaskContext<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }
}