// thespis_impl 0.2.1
//
// Reference implementation for the thespis actor model
// This benchmark allows profiling where the time is spent. It contains an outer actor which has
// to do an async operation in its handler and an inner actor which just does a sync addition of u64.
// It turns out that bounded channels are actually faster. Both tokio::sync and futures::mpsc::unbounded
// take 2x the time compared to tokio::sync bounded with buffer size 16.
use
{
	async_chanx       :: { *          } ,
	thespis           :: { *          } ,
	thespis_impl      :: { *          } ,
	std               :: { thread     } ,
	tokio             :: { sync::mpsc } ,
	tokio_stream      :: { wrappers::UnboundedReceiverStream } ,
};

// Upper bound of the benchmark loop in main() (`for _ in 0..MESSAGES`), i.e. the number of
// round trips made to the outer actor; it also appears in the expected total asserted there.
//
// NOTE(review): the comment that used to sit here described a queue size for a bounded channel
// (16, same as the actix default size). No such constant is visible in this file, and both
// mailboxes in main() are built on `mpsc::unbounded_channel()`.
const MESSAGES: usize = 100_000;

#[ derive( Actor ) ] struct Sum
	pub total: u64,
	pub inner: Addr<SumIn>,

#[ derive( Actor ) ] struct SumIn
	pub count: u64,

// Message carrying a u64 amount. It is handled by `Sum` (see `Handler< Add > for Sum`).
struct Add ( u64 );
// Payload-free message. Both `Sum` and `SumIn` implement a handler for it.
struct Show       ;

// Reply types: handling `Add` yields nothing, handling `Show` yields a u64.
impl Message for Add  { type Return = () ; }
impl Message for Show { type Return = u64; }

impl Handler< Add > for Sum
	fn handle( &mut self, msg: Add ) -> Return<()> { Box::pin( async move
		let inner = Show ).await.expect( "call inner" ); += msg.0 + inner;


impl Handler< Show > for Sum
	fn handle( &mut self, _msg: Show ) -> Return<u64> { Box::pin( async move


impl Handler< Show > for SumIn
	fn handle( &mut self, _msg: Show ) -> Return<u64> { Box::pin( async move

		self.count += 1;


fn main()
	let (tx, rx)    = mpsc::unbounded_channel()                                                                ;
	let tx          = Box::new( TokioUnboundedSender::new( tx ).sink_map_err( |e| Box::new(e) as DynError ) ) ;
	let sum_in_mb   = Mailbox::new( None, Box::new( UnboundedReceiverStream::new(rx) ) )                                                     ;
	let sum_in_addr = sum_in_mb.addr( tx )                                                                     ;

	let (tx, rx)     = mpsc::unbounded_channel()                                                                ;
	let     tx       = Box::new( TokioUnboundedSender::new( tx ).sink_map_err( |e| Box::new(e) as DynError ) ) ;
	let     sum_mb   = Mailbox::new( None, Box::new( UnboundedReceiverStream::new(rx) ) )                                                     ;
	let mut sum_addr = sum_mb.addr( tx )                                                                        ;
	let     sum      = Sum{ total: 5, inner: sum_in_addr }                                                      ;

	let sumin_thread = thread::spawn( move ||
		let sum_in = SumIn{ count: 0 };

		async_std::task::block_on( sum_in_mb.start( sum_in ) );

	let sum_thread = thread::spawn( move ||
		async_std::task::block_on( sum_mb.start( sum ) );

	async_std::task::block_on( async move
		for _ in 0..MESSAGES
			// sum_addr.send( Add( 10 ) ).await.expect( "Send failed" ); Add( 10 ) ).await.expect( "Send failed" );

		let res = Show{} ).await.expect( "Call failed" );

		dbg!( res );

		assert_eq!( MESSAGES as u64 *10 + 5 + termial( MESSAGES as u64 ), res );

	sumin_thread.join().expect( "join sum_in thread" );
	sum_thread  .join().expect( "join sum    thread" );

// Returns the n-th triangular number, 1 + 2 + ... + n (0 for n == 0).
//
// Exactly one of `n` and `n + 1` is even; that one is halved *before* multiplying so the
// intermediate product never exceeds the final result. The textbook form `n * ( n + 1 ) / 2`
// overflows u64 for inputs whose result would still fit.
//
// Overflow is still possible when the result itself does not fit in a u64.
//
fn termial( n: u64 ) -> u64
{
	if n % 2 == 0
	{
		( n / 2 ) * ( n + 1 )
	}

	else
	{
		// For odd n, ( n + 1 ) / 2 == n / 2 + 1; this form also avoids computing n + 1.
		n * ( n / 2 + 1 )
	}
}