use crate::{ import::*, RingBuffer };
impl AsyncRead for RingBuffer<u8>
{
fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, dst: &mut [u8] ) -> Poll< Result<usize, io::Error> >
{
if dst.is_empty()
{
return Poll::Ready( Ok(0) );
}
let read = self.consumer.pop_slice( dst );
if read != 0
{
if let Some(waker) = self.write_waker.take()
{
waker.wake();
}
Poll::Ready( Ok(read) )
}
else if self.closed
{
Ok(0).into()
}
else
{
self.read_waker.replace( cx.waker().clone() );
Poll::Pending
}
}
}
#[cfg(test)]
mod tests
{
use crate::{ import::{ *, assert_eq }, RingBuffer };
#[test]
fn async_read() { block_on( async
{
let mut ring = RingBuffer::<u8>::new(2);
ring.producer.push( b'a' ).expect( "write" );
ring.producer.push( b'b' ).expect( "write" );
let mut read_buf = [0u8;1];
AsyncReadExt::read( &mut ring, &mut read_buf ).await.unwrap();
assert!( !ring.is_empty() );
assert!( !ring.is_full() );
assert_eq!( ring.len() , 1 );
assert_eq!( ring.remaining(), 1 );
assert!( ring.read_waker .is_none() );
assert!( ring.write_waker.is_none() );
assert_eq!( b'a', read_buf[0] );
AsyncReadExt::read( &mut ring, &mut read_buf ).await.unwrap();
assert!( ring.is_empty() );
assert!( !ring.is_full() );
assert_eq!( ring.len() , 0 );
assert_eq!( ring.remaining(), 2 );
assert!( ring.read_waker .is_none() );
assert!( ring.write_waker.is_none() );
assert_eq!( b'b', read_buf[0] );
let (waker, count) = new_count_waker();
let mut cx = Context::from_waker( &waker );
assert!( AsyncRead::poll_read( Pin::new( &mut ring ), &mut cx, &mut read_buf ).is_pending() );
assert!( ring.is_empty() );
assert!( !ring.is_full() );
assert_eq!( ring.len() , 0 );
assert_eq!( ring.remaining(), 2 );
assert!( ring.read_waker .is_some() );
assert!( ring.write_waker.is_none() );
let arr = [ b'c' ];
AsyncWriteExt::write( &mut ring, &arr ).await.expect( "write" );
assert!( !ring.is_empty() );
assert!( !ring.is_full() );
assert_eq!( ring.len() , 1 );
assert_eq!( ring.remaining(), 1 );
assert!( ring.read_waker.is_none() );
assert_eq!( count, 1 );
assert_eq!( 1, AsyncReadExt::read( &mut ring, &mut read_buf ).await.unwrap() );
assert_eq!( b'c', read_buf[0] );
assert!( ring.is_empty() );
assert!( !ring.is_full() );
assert_eq!( ring.len() , 0 );
assert_eq!( ring.remaining(), 2 );
})}
#[test]
fn closed_read() { block_on( async
{
let mut ring = RingBuffer::<u8>::new(2) ;
let mut read_buf = [0u8;1] ;
let arr = [ b'a' ] ;
AsyncWriteExt::write( &mut ring, &arr ).await.expect( "write" );
ring.close().await.unwrap();
AsyncReadExt::read( &mut ring, &mut read_buf ).await.unwrap();
assert_eq!( b'a', read_buf[0] );
assert_eq!( AsyncReadExt::read( &mut ring, &mut read_buf ).await.unwrap(), 0 );
assert_eq!( AsyncReadExt::read( &mut ring, &mut read_buf ).await.unwrap(), 0 );
})}
}