use
{
thespis :: { * } ,
thespis_impl :: { * } ,
tracing :: { * } ,
futures::task :: { Spawn, SpawnExt } ,
std :: { error::Error } ,
async_executors :: { AsyncStd, JoinHandle, SpawnHandleExt } ,
};
#[ derive( Actor ) ] struct Counter;
struct Add(usize);
impl Message for Add { type Return = usize; }
impl Handler< Add > for Counter
{
#[async_fn] fn handle( &mut self, msg: Add ) -> usize
{
if msg.0 % 2 == 0 { panic!(); }
msg.0
}
}
#[ derive( Actor ) ]
struct Supervisor
{
exec: Box<dyn Spawn + Send>
}
struct Supervise<A: Actor>
{
mailbox : Option< JoinHandle<MailboxEnd<A>> > ,
create: Box< dyn FnMut() ->A + Send > ,
}
impl<A: Actor + Send> std::panic::UnwindSafe for Supervise<A> {}
impl<A: Actor + Send> Message for Supervise<A>
{
type Return = Option< Addr<A> >;
}
impl<A: Actor + Send> Handler< Supervise<A> > for Supervisor
{
#[async_fn] fn handle( &mut self, mut actor: Supervise<A> ) -> Option< Addr<A> >
{
let mut addr = None;
let mut mb_handle = if actor.mailbox.is_none()
{
let (addr_new, mb_handle) = Addr::builder( "supervised" )
.spawn_handle( (actor.create)(), &AsyncStd )
.unwrap()
;
addr = Some(addr_new);
mb_handle
}
else { actor.mailbox.take().unwrap() };
let supervisor = async move
{
while let MailboxEnd::Mailbox(mb) = mb_handle.await
{
mb_handle = AsyncStd.spawn_handle( mb.start( (actor.create)() ) ).unwrap();
}
};
self.exec.spawn( supervisor ).unwrap();
addr
}
}
#[async_std::main]
async fn main() -> Result< (), Box<dyn Error> >
{
tracing_subscriber::fmt::Subscriber::builder()
.with_max_level( Level::DEBUG )
.init()
;
let mut supervisor = Addr::builder( "supervisor" )
.spawn( Supervisor{ exec: Box::new( AsyncStd ) }, &AsyncStd )?
;
let create = Box::new( ||
{
debug!( "Creating a new Counter" );
Counter
});
let supervise = Supervise
{
create,
mailbox: None,
};
let mut addr = supervisor.call( supervise ).await?.unwrap();
assert!(matches!( addr.call( Add(10) ).await, Err( ThesErr::ActorStoppedBeforeResponse{..} ) ));
assert!(matches!( addr.send( Add(10) ).await, Ok(()) ));
assert_eq!( addr.call( Add(11) ).await, Ok(11) );
Ok(())
}