async_chanx 0.1.0-alpha.5

Implement `Sink` for some channel implementations.
Documentation
use
{
	crate         :: { import::*, ChanErr                             } ,
	async_channel :: { Sender, TrySendError                           } ,
	std           :: { task::{ Waker, Poll, Context }, future::Future } ,
	futures       :: { FutureExt, ready                               } ,
};


/// A wrapper around piper::Sender that implements Sink.
//
pub struct AsyncSender<I>
{
	sender: Option< Sender<I>                                         > ,
	buffer: Option< I                                                 > ,
	waker : Option< Waker                                             > ,
	send  : Option< Pin<Box< dyn Future<Output = Sender<I>> + Send >> > ,
}


impl<I: fmt::Debug> fmt::Debug for AsyncSender<I>
{
	fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result
	{
		fmt.debug_struct( "AsyncSender" )

			.field( "sender", &self.sender )
			.field( "buffer", &self.buffer )
			.field( "waker" , &self.waker )
			.field( "send"  , &self.send.as_ref().map( |_| "future processing send" ) )

		.finish()
	}
}



impl<I> AsyncSender<I>
{
	/// Constructor
	//
	pub fn new( sender: Sender<I> ) -> Self
	{
		Self
		{
			sender: Some(sender) ,
			buffer: None         ,
			waker : None         ,
			send  : None         ,
		}
	}
}



impl<I: 'static +  Send + Unpin> Sink<I> for AsyncSender<I>
{
	type Error = ChanErr<I>;


	fn poll_ready( mut self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll< Result<(), Self::Error> >
	{
		if self.buffer.is_some()
		{
			self.waker = Some( cx.waker().clone() );
			return Poll::Pending;
		}

		Poll::Ready( Ok(()) )
	}


	fn start_send( mut self: Pin<&mut Self>, msg: I ) -> Result<(), Self::Error>
	{
		if self.buffer.is_some()
		{
			panic!( "call `poll_ready` before start_send" )
		}

		self.buffer = Some(msg);

		Ok(())
	}


	fn poll_flush( mut self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Result<(), Self::Error>>
	{
		if let Some(future) = self.send.as_mut()
		{
			self.sender = Some( ready!(future.as_mut().poll(cx)) );
			self.send   = None;
		}

		match self.buffer.take()
		{
			None => Poll::Ready( Ok(()) ),

			Some(msg) =>
			{
				// take the sender
				//
				let mut sender = self.as_mut().sender.take().unwrap();


				match sender.try_send( msg )
				{
					Ok(()) =>
					{
						self.sender = Some(sender);
						return Poll::Ready( Ok(()) );
					}

					Err( TrySendError::Full ) =>
					{
						self.buffer = Some( msg );

					}

					Err( TrySendError::Closed ) =>
					{
						self.sender = Some(sender);

						return ChanErr::ClosedI(msg);
					}
				}





				let send = async move
				{
					sender.send( msg ).await;
					sender

				}.boxed();

				// store it in the future
				//
				self.send = Some( send );

				// leave it that way until it's ready
				//
				sender = ready!( self.as_mut().send.as_mut().unwrap().as_mut().poll(cx) );

				// Put the sender back and drop the future.
				//
				self.sender = Some(sender);
				self.send   = None;

				Poll::Ready( Ok(()) )
			}
		}
	}


	fn poll_close( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Result<(), Self::Error>>
	{
		self.poll_flush( cx )
	}
}



impl<I> Clone for AsyncSender<I>
{
	fn clone(&self) -> Self
	{
		Self
		{
			sender: self.sender.clone() ,
			buffer: None                ,
			waker : None                ,
			send  : None                ,
		}
	}
}