#![cfg_attr( all( not(doctest), doc ), feature(doc_cfg, external_doc) )]
#![cfg_attr( all( not(doctest), doc ), doc(include = "../README.md") )]
#![doc = ""]
#![ doc ( html_root_url = "https://docs.rs/ws_stream_io" ) ]
#![ deny ( missing_docs ) ]
#![ forbid ( unsafe_code ) ]
#![ allow ( clippy::suspicious_else_formatting ) ]
#![ warn
(
anonymous_parameters ,
missing_copy_implementations ,
missing_debug_implementations ,
nonstandard_style ,
rust_2018_idioms ,
trivial_casts ,
trivial_numeric_casts ,
unreachable_pub ,
unused_extern_crates ,
unused_qualifications ,
variant_size_differences ,
)]
use
{
std :: { fmt, io::{ self, Read, Cursor, IoSlice, IoSliceMut, BufRead } } ,
std :: { pin::Pin, task::{ Poll, Context }, borrow::{ Borrow, BorrowMut } } ,
futures_core :: { TryStream, ready } ,
futures_sink :: { Sink } ,
futures_task :: { noop_waker } ,
futures_io :: { AsyncRead, AsyncWrite, AsyncBufRead } ,
};
#[ cfg( feature = "tokio_io" ) ]
use tokio::io::{ AsyncRead as TokAsyncRead, AsyncWrite as TokAsyncWrite };
#[ cfg( feature = "map_pharos" ) ]
use pharos::{ Observable, ObserveConfig, Events };
/// Progress of the reading half between two calls.
///
/// `Chunk` is the item type yielded by the wrapped stream.
//
#[ derive( Debug ) ]
//
enum ReadState<Chunk>
{
    /// An item received from the stream; the cursor position marks how much
    /// of it has already been handed to the reader.
    //
    Ready { chunk: Cursor<Chunk> },

    /// An error that was obtained from the stream but has not yet been
    /// returned to the reader.
    //
    Error { error: io::Error },

    /// The stream reported its end.
    //
    Eof,
}
/// Wrapper around a sink and/or stream of byte chunks.
///
/// Depending on what `St` implements, the wrapper provides `AsyncRead` and
/// `AsyncBufRead` (when `St` is a `TryStream` whose items are `AsRef<[u8]>`)
/// and `AsyncWrite` (when `St` is a `Sink` whose items can be built from a
/// `Vec<u8>`). `I` is the item type travelling over `St`.
//
pub struct IoStream<St: Unpin, I>
{
    /// The wrapped sink/stream.
    //
    inner: St,

    /// Leftover from the previous read: a partially consumed chunk, a
    /// deferred error, or the end-of-stream marker. `None` when there is
    /// nothing buffered.
    //
    state: Option<ReadState<I>>,

    /// Error produced by the flush attempted right after a write. It is
    /// handed out by the next write call.
    //
    write_err: Option<io::Error>,
}
// Written by hand so that `IoStream` is `Unpin` whenever `St` is, with no
// requirement on the item type `I` (which the auto trait would otherwise
// impose through the buffered chunk).
//
impl<St: Unpin, I> Unpin for IoStream<St, I> {}
impl<St, I> IoStream<St, I>
where
St: Unpin,
{
/// Create a new `IoStream` around `inner`.
///
/// The wrapper starts out with no buffered chunk and no deferred write error.
//
pub fn new( inner: St ) -> Self
{
    let state     = None;
    let write_err = None;

    Self { inner, state, write_err }
}
pub fn inner( &self ) -> &St
{
&self.inner
}
pub fn inner_mut( &mut self ) -> &mut St
{
&mut self.inner
}
fn poll_read_impl( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8] ) -> Poll< io::Result<usize> >
where
St: TryStream< Ok=I, Error=io::Error >,
I: AsRef<[u8]>,
{
let mut have_read = 0;
let mut state = self.state.take();
loop { match state
{
Some( ReadState::Eof ) => return Poll::Ready( Ok(0) ),
Some( ReadState::Error{ error } ) =>
{
self.state = None;
return Poll::Ready( Err(error) )
}
Some( ReadState::Ready{ ref mut chunk } ) =>
{
have_read += chunk.read( &mut buf[have_read..] ).expect( "no io errors on cursor" );
if chunk.position() == chunk.get_ref().as_ref().len() as u64
{
state = None;
}
if have_read == buf.len()
{
self.state = state;
return Poll::Ready( Ok(have_read) );
}
}
None =>
{
if have_read == 0
{
match ready!( Pin::new( &mut self.inner ).try_poll_next( cx ) )
{
Some(Ok( chunk )) =>
{
state = ReadState::Ready { chunk: Cursor::new(chunk) }.into();
}
None =>
{
self.state = ReadState::Eof.into();
return Ok(0).into();
}
Some( Err(err) ) =>
{
self.state = None;
return Poll::Ready(Err( err ))
}
}
}
else
{
let waker = noop_waker();
let mut context = Context::from_waker( &waker );
match Pin::new( &mut self.inner ).try_poll_next( &mut context )
{
Poll::Ready( Some(Ok( chunk )) ) =>
state = ReadState::Ready { chunk: Cursor::new(chunk) }.into(),
Poll::Ready( None ) =>
{
self.state = ReadState::Eof.into();
return Ok(have_read).into();
}
Poll::Ready(Some( Err(err) )) =>
{
self.state = ReadState::Error{ error: err }.into();
return Ok(have_read).into();
}
Poll::Pending =>
{
self.state = None;
return Ok(have_read).into();
}
}
}
}
}}
}
fn poll_read_vectored_impl( mut self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>] ) -> Poll< io::Result<usize> >
where
St: TryStream< Ok=I, Error=io::Error >,
I: AsRef<[u8]>,
{
let mut have_read = 0;
for b in bufs
{
if !b.is_empty()
{
if have_read == 0
{
match ready!( self.as_mut().poll_read_impl( cx, b ) )
{
Err(e) => return Poll::Ready( Err(e) ) ,
Ok (n) if n < b.len() => return Poll::Ready( Ok (n) ) ,
Ok(n) =>
{
debug_assert!( n == b.len() );
have_read += n;
}
}
}
else
{
let waker = noop_waker();
let mut context = Context::from_waker( &waker );
match self.as_mut().poll_read_impl( &mut context, b )
{
Poll::Pending => return Poll::Ready( Ok(have_read ) ) ,
Poll::Ready( Ok(n) ) if n < b.len() => return Poll::Ready( Ok(have_read + n) ) ,
Poll::Ready( Ok(n) ) =>
{
debug_assert!( n == b.len() );
have_read += n;
}
Poll::Ready( Err(e) ) =>
{
self.state = ReadState::Error{ error: e }.into();
return Poll::Ready( Ok(have_read) );
}
}
}
}
}
if have_read == 0 { self.poll_read_impl( cx, &mut [] ) }
else { Poll::Ready( Ok(have_read) ) }
}
/// Shared implementation behind the `poll_write` methods.
///
/// Sends a copy of `buf` as a single item over the sink and then makes one
/// flush attempt with a no-op waker. A flush that is still pending is left
/// for a later explicit flush; a flush error is stored in `write_err` and
/// returned by the next write, since this write itself was accepted.
//
fn poll_write_impl<'a>( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &'a [u8] ) -> Poll< io::Result<usize> >
where
    St: Sink< I, Error=io::Error >,
    I : From< Vec<u8> >,
{
    // An error left behind by the flush of the previous write.
    //
    if let Some( deferred ) = self.write_err.take()
    {
        return Poll::Ready( Err(deferred) );
    }

    if let Err( e ) = ready!( Pin::new( &mut self.inner ).poll_ready(cx) )
    {
        return Poll::Ready( Err(e) );
    }

    let item: I = buf.to_vec().into();
    let sent    = Pin::new( &mut self.inner ).start_send( item );

    if let Err( e ) = sent
    {
        return Poll::Ready( Err(e) );
    }

    let waker     = noop_waker();
    let mut quiet = Context::from_waker( &waker );
    let flushed   = Pin::new( &mut self.inner ).poll_flush( &mut quiet );

    if let Poll::Ready( Err(e) ) = flushed
    {
        self.write_err = Some( e );
    }

    Poll::Ready( Ok( buf.len() ) )
}
/// Shared implementation behind `poll_write_vectored`.
///
/// Concatenates all of `bufs` into one item, sends it over the sink and then
/// makes one flush attempt with a no-op waker. As in `poll_write_impl`, a
/// flush error is stored in `write_err` and surfaces on the next write.
//
fn poll_write_vectored_impl<'a>( mut self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &'a[ IoSlice<'a> ] ) -> Poll< io::Result<usize> >
where
    St: Sink< I, Error=io::Error >,
    I : From< Vec<u8> >,
{
    // An error left behind by the flush of the previous write.
    //
    if let Some( deferred ) = self.write_err.take()
    {
        return Poll::Ready( Err(deferred) );
    }

    if let Err( e ) = ready!( Pin::new( &mut self.inner ).poll_ready(cx) )
    {
        return Poll::Ready( Err(e) );
    }

    let total: usize = bufs.iter().map( |piece| piece.len() ).sum();

    let mut joined: Vec<u8> = Vec::with_capacity( total );

    for piece in bufs.iter()
    {
        joined.extend_from_slice( piece );
    }

    let sent = Pin::new( &mut self.inner ).start_send( joined.into() );

    if let Err( e ) = sent
    {
        return Poll::Ready( Err(e) );
    }

    let waker     = noop_waker();
    let mut quiet = Context::from_waker( &waker );
    let flushed   = Pin::new( &mut self.inner ).poll_flush( &mut quiet );

    if let Poll::Ready( Err(e) ) = flushed
    {
        self.write_err = Some( e );
    }

    Poll::Ready( Ok( total ) )
}
fn poll_flush_impl(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll< io::Result<()> >
where
St: Sink< I, Error=io::Error >
{
match ready!( Pin::new( &mut self.inner ).poll_flush(cx) )
{
Ok (_) => Poll::Ready(Ok ( () )) ,
Err(e) => Poll::Ready(Err( e )) ,
}
}
fn poll_close_impl( mut self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll< io::Result<()> >
where
St: Sink< I, Error=io::Error >
{
ready!( Pin::new( &mut self.inner ).poll_close( cx ) ).into()
}
}
// A fixed label is printed, so neither `St` nor `I` has to implement `Debug`.
//
impl<St, I> fmt::Debug for IoStream<St, I>
where
    St: Unpin,
{
    fn fmt( &self, f: &mut fmt::Formatter<'_> ) -> fmt::Result
    {
        f.write_str( "IoStream over Tungstenite" )
    }
}
// The futures flavour of `AsyncWrite`; every method forwards to the matching
// inherent `*_impl` method.
//
impl<St, I> AsyncWrite for IoStream<St, I>
where
    St: Sink< I, Error=io::Error > + Unpin,
    I : From< Vec<u8> >,
{
    // Each call sends one item holding a copy of `buf`.
    //
    fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8] ) -> Poll< io::Result<usize> >
    {
        Self::poll_write_impl( self, cx, buf )
    }

    // All buffers are joined into one item.
    //
    fn poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[ IoSlice<'_> ] ) -> Poll< io::Result<usize> >
    {
        Self::poll_write_vectored_impl( self, cx, bufs )
    }

    fn poll_flush( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll< io::Result<()> >
    {
        Self::poll_flush_impl( self, cx )
    }

    fn poll_close( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll< io::Result<()> >
    {
        Self::poll_close_impl( self, cx )
    }
}
// The tokio flavour of `AsyncWrite`; forwards to the same inherent `*_impl`
// methods as the futures one, with shutdown mapped onto closing the sink.
//
#[ cfg( feature = "tokio_io" ) ]
#[ cfg_attr( feature = "docs", doc(cfg( feature = "tokio_io" )) ) ]
//
impl<St, I> TokAsyncWrite for IoStream<St, I>
where
    St: Sink< I, Error=io::Error > + Unpin,
    I : From< Vec<u8> >,
{
    fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8] ) -> Poll< io::Result<usize> >
    {
        Self::poll_write_impl( self, cx, buf )
    }

    fn poll_flush( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll< io::Result<()> >
    {
        Self::poll_flush_impl( self, cx )
    }

    fn poll_shutdown( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll< io::Result<()> >
    {
        Self::poll_close_impl( self, cx )
    }
}
// The futures flavour of `AsyncRead`; both methods forward to the matching
// inherent `*_impl` method.
//
impl<St, I> AsyncRead for IoStream<St, I>
where
    St: TryStream< Ok=I, Error=io::Error > + Unpin,
    I : AsRef<[u8]>,
{
    fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8] ) -> Poll< io::Result<usize> >
    {
        Self::poll_read_impl( self, cx, buf )
    }

    fn poll_read_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>] ) -> Poll< io::Result<usize> >
    {
        Self::poll_read_vectored_impl( self, cx, bufs )
    }
}
// The tokio flavour of `AsyncRead`; forwards to the same inherent
// `poll_read_impl` as the futures one.
//
#[ cfg( feature = "tokio_io" ) ]
#[ cfg_attr( feature = "docs", doc(cfg( feature = "tokio_io" )) ) ]
//
impl<St, I> TokAsyncRead for IoStream<St, I>
where
    St: TryStream< Ok=I, Error=io::Error > + Unpin,
    I : AsRef<[u8]>,
{
    fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8] ) -> Poll< io::Result<usize> >
    {
        Self::poll_read_impl( self, cx, buf )
    }
}
// Lets callers observe the events of the wrapped object through the wrapper.
//
#[ cfg( feature = "map_pharos" ) ]
//
impl<St, I, Ev> Observable<Ev> for IoStream<St, I>
where
    St: Sink< I, Error=io::Error > + TryStream< Ok=I, Error=io::Error > + Observable<Ev> + Unpin,
    Ev: Clone + Send + 'static,
{
    type Error = <St as Observable<Ev>>::Error;

    fn observe( &mut self, options: ObserveConfig<Ev> ) -> Result< Events<Ev>, Self::Error >
    {
        // The error type is the inner one, so the result passes through as is.
        //
        Observable::observe( &mut self.inner, options )
    }
}
// Borrowing an `IoStream` as `St` yields the wrapped object.
//
impl<St, I> Borrow<St> for IoStream<St, I>
where
    St: Sink< I, Error=io::Error > + TryStream< Ok=I, Error=io::Error > + Unpin,
{
    fn borrow( &self ) -> &St
    {
        self.inner()
    }
}
// Mutably borrowing an `IoStream` as `St` yields the wrapped object.
//
impl<St, I> BorrowMut<St> for IoStream<St, I>
where
    St: Sink< I, Error=io::Error > + TryStream< Ok=I, Error=io::Error > + Unpin,
{
    fn borrow_mut( &mut self ) -> &mut St
    {
        self.inner_mut()
    }
}
impl<St, I> AsyncBufRead for IoStream<St, I>
where
St: TryStream<Ok=I, Error = io::Error> + Unpin ,
I : AsRef<[u8]> + Unpin ,
{
fn poll_fill_buf( mut self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll< io::Result<&[u8]> >
{
if self.state.is_none()
{
match ready!( Pin::new( &mut self.inner ).try_poll_next(cx) )
{
Some( Ok(chunk) ) =>
{
if !chunk.as_ref().is_empty()
{
self.state = ReadState::Ready
{
chunk: Cursor::new( chunk ),
}.into();
}
}
Some( Err(error) ) =>
{
self.state = ReadState::Error{ error }.into();
}
None =>
{
self.state = ReadState::Eof.into();
}
}
}
match self.state.take()
{
Some( ReadState::Error{ error } ) =>
{
self.state = None;
Poll::Ready( Err(error) )
}
Some( ReadState::Eof ) =>
{
Poll::Ready( Ok(&[]) )
}
Some(x) =>
{
self.state = Some(x);
if let Some( ReadState::Ready{ ref mut chunk } ) = self.get_mut().state
{
return Poll::Ready( chunk.fill_buf() );
}
unreachable!();
}
None => unreachable!(),
}
}
fn consume( mut self: Pin<&mut Self>, amount: usize )
{
if amount == 0 { return }
if let Some( ReadState::Ready{ chunk } ) = &mut self.state
{
chunk.consume( amount );
match chunk.get_ref().as_ref().len() as u64
{
x if x == chunk.position() => self.state = None,
x if x < chunk.position() => debug_assert!( false, "Attempted to consume more than available bytes" ),
_ => {}
}
}
else
{
debug_assert!( false, "Attempted to consume from IntoAsyncRead without chunk" );
}
}
}