use crate::{ import::*, BoxEnvelope, ChanSender, ChanReceiver, CloneSinkExt, Addr, ThesErr, Mailbox, MailboxEnd };
/// Default value of the `bounded` setting of a freshly created [`ActorBuilder`].
///
/// When the builder has to create its own channel, this number is passed as the
/// buffer size to `futures::channel::mpsc::channel`.
pub const BOUNDED: usize = 16;
/// Builder that assembles the channel, [`Addr`] and [`Mailbox`] for an actor and
/// can optionally spawn the mailbox on an executor.
pub struct ActorBuilder<A>
where
    A: Actor,
{
    /// Sending half of the mailbox channel. `None` until `channel` supplies one
    /// or `build` creates the default channel.
    tx: Option<ChanSender<A>>,

    /// Receiving half of the mailbox channel. Set together with `tx`.
    rx: Option<ChanReceiver<A>>,

    /// Buffer size for the channel `build` creates when none was supplied.
    /// `None` selects an unbounded channel.
    bounded: Option<usize>,

    /// Name handed to the mailbox when it is created.
    name: Arc<str>,
}
impl<A: Actor> ActorBuilder<A>
{
pub fn new( name: impl AsRef<str> ) -> Self
{
Self
{
tx : None ,
rx : None ,
bounded: Some( BOUNDED ) ,
name : name.as_ref().into() ,
}
}
pub fn bounded( mut self, bounded: Option<usize> ) -> Self
{
debug_assert!( self.tx.is_none() );
debug_assert!( self.rx.is_none() );
self.bounded = bounded;
self
}
pub fn channel<E, TX, RX>( mut self, tx:TX, rx: RX ) -> Self
where TX: Sink<BoxEnvelope<A>, Error=E> + Clone + Unpin + Send + 'static,
E : Error + Sync + Send + 'static,
RX: Stream<Item=BoxEnvelope<A>> + Send + Unpin + 'static,
{
debug_assert!( self.bounded == Some( BOUNDED ) );
self.tx = Some( tx.dyned() );
self.rx = Some( Box::new(rx) );
self
}
pub fn build( mut self ) -> (Addr<A>, Mailbox<A>)
{
if self.rx.is_none() || self.tx.is_none()
{
if let Some( bounded ) = self.bounded
{
let (tx, rx) = futures::channel::mpsc::channel( bounded );
self.tx = Some( tx.dyned() );
self.rx = Some( Box::new(rx) );
}
else
{
let (tx, rx) = futures::channel::mpsc::unbounded();
self.tx = Some( tx.dyned() );
self.rx = Some( Box::new(rx) );
}
}
let rx = self.rx.unwrap();
let mb = Mailbox::new( self.name, rx );
let addr = mb.addr( self.tx.unwrap() );
(addr, mb)
}
pub fn spawn( self, actor: A, exec: &dyn Spawn ) -> Result< Addr<A>, ThesErr >
where A: Send
{
let (addr, mb) = self.build();
let fut = mb.start( actor );
exec.spawn( async { fut.await; } )
.map_err( |src| ThesErr::Spawn{ info: addr.info(), src } )?
;
Ok(addr)
}
#[allow(clippy::type_complexity)] pub fn spawn_handle( self, actor: A, exec: & dyn SpawnHandle< MailboxEnd<A> > )
-> Result< (Addr<A>, JoinHandle< MailboxEnd<A> >), ThesErr >
where A: Send
{
let (addr, mb) = self.build();
let fut = mb.start( actor );
let handle = exec.spawn_handle( fut )
.map_err( |src| ThesErr::Spawn{ info: addr.info(), src } )?
;
Ok(( addr, handle ))
}
pub fn spawn_local( self, actor: A, exec: & dyn LocalSpawn ) -> Result< Addr<A>, ThesErr >
{
let (addr, mb) = self.build();
let fut = mb.start_local( actor );
exec.spawn_local( async { fut.await; } )
.map_err( |src| ThesErr::Spawn{ info: addr.info(), src } )?
;
Ok(addr)
}
#[allow(clippy::type_complexity)] pub fn spawn_handle_local( self, actor: A, exec: & dyn LocalSpawnHandle< MailboxEnd<A> > )
-> Result< (Addr<A>, JoinHandle< MailboxEnd<A> >), ThesErr >
{
let (addr, mb) = self.build();
let fut = mb.start_local( actor );
let handle = exec.spawn_handle_local( fut )
.map_err( |src| ThesErr::Spawn{ info: addr.info(), src } )?
;
Ok(( addr, handle ))
}
}
/// Debug output lists only the `bounded` setting and the name; the channel
/// halves are left out.
impl<A> fmt::Debug for ActorBuilder<A>
where
    A: Actor,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("ActorBuilder<A>");

        out.field("bounded", &self.bounded);
        out.field("name", &self.name);

        out.finish()
    }
}