use crate::{ import::*, Addr, ChanReceiver, RxStrong, ChanSender, ActorInfo };
#[ derive(Debug) ]
pub enum MailboxEnd<A: Actor>
{
Actor( A ) ,
Mailbox( Mailbox<A> ) ,
}
pub struct Mailbox<A> where A: Actor
{
rx : RxStrong<A> ,
info: Arc<ActorInfo> ,
}
impl<A> Mailbox<A> where A: Actor
{
pub fn new( name: impl AsRef<str>, rx: ChanReceiver<A> ) -> Self
{
static MB_COUNTER: AtomicUsize = AtomicUsize::new( 1 );
let id = MB_COUNTER.fetch_add( 1, Ordering::Relaxed );
let rx = RxStrong::new(rx);
let info = Arc::new( ActorInfo::new::<A>( id, name.as_ref().into() ) );
Self { rx, info }
}
pub fn info( &self ) -> &ActorInfo
{
&self.info
}
pub fn addr( &self, tx: ChanSender<A> ) -> Addr<A>
{
Addr::new( tx, self.info.clone(), RxStrong::count( &self.rx ) )
}
pub async fn start( mut self, mut actor: A ) -> MailboxEnd<A>
where A: Send
{
let span = self.info.span();
async
{
actor.started().await;
trace!( "Mailbox started" );
while let Some( envl ) = self.rx.next().await
{
trace!( "Will process a message." );
if let Err( e ) = FutureExt::catch_unwind( AssertUnwindSafe( envl.handle( &mut actor ) ) ).await
{
error!( "Actor panicked, with error: {:?}", e );
return MailboxEnd::Mailbox(self);
}
trace!( "Finished handling message. Waiting for next message" );
}
actor.stopped().await;
trace!( "Mailbox stopped" );
MailboxEnd::Actor( actor )
}
.instrument( span )
.await
}
pub async fn start_local( mut self, mut actor: A ) -> MailboxEnd<A>
{
let span = self.info.span();
async
{
actor.started().await;
trace!( "Mailbox started" );
while let Some( envl ) = self.rx.next().await
{
trace!( "Actor will process a message." );
if let Err( e ) = FutureExt::catch_unwind( AssertUnwindSafe( envl.handle_local( &mut actor ) ) ).await
{
error!( "Actor panicked with error: {:?}", e );
return MailboxEnd::Mailbox( self );
}
trace!( "Actor finished handling it's message. Waiting for next message" );
}
actor.stopped().await;
trace!( "Mailbox stopped" );
MailboxEnd::Actor( actor )
}
.instrument( span )
.await
}
}
impl<A: Actor> Identify for Mailbox<A>
{
fn id( &self ) -> usize
{
self.info.id
}
fn name( &self ) -> Arc<str>
{
self.info.name.clone()
}
}
impl<A: Actor> fmt::Debug for Mailbox<A>
{
fn fmt( &self, f: &mut fmt::Formatter<'_> ) -> fmt::Result
{
match self.info.name.is_empty()
{
true => write!( f, "Mailbox<{}> ~ id: {}" , std::any::type_name::<A>(), &self.info.id ) ,
false => write!( f, "Mailbox<{}> ~ id: {}, name: {}", std::any::type_name::<A>(), &self.info.id, self.info.name ) ,
}
}
}
impl<A: Actor> fmt::Display for Mailbox<A>
{
fn fmt( &self, f: &mut fmt::Formatter<'_> ) -> fmt::Result
{
match self.info.name.is_empty()
{
true => write!( f, "{} ({})" , self.info.type_name(), self.info.id ) ,
false => write!( f, "{} ({}, {})", self.info.type_name(), self.info.id, self.info.name ) ,
}
}
}