use std::{
future::Future,
ops::DerefMut,
pin::Pin,
task::{Context, Poll},
};
use futures_util::{AsyncRead, AsyncWrite};
use super::*;
mod peek;
pub use peek::Peek;
mod peek_exact;
pub use peek_exact::PeekExact;
mod peek_to_string;
pub use peek_to_string::PeekToString;
mod peek_to_end;
pub use peek_to_end::PeekToEnd;
mod peek_vectored;
pub use peek_vectored::PeekVectored;
mod fill_peek_buf;
pub use fill_peek_buf::FillPeekBuf;
/// An [`AsyncRead`] source that can additionally be polled for a *peek*.
///
/// In this file the trait is implemented for [`AsyncPeekable`] and forwarded
/// through `Box<T>`, `&mut T` and `Pin<P>`.
pub trait AsyncPeek: AsyncRead {
    /// Polls for a peek into `buf`.
    ///
    /// On success the returned `usize` is the number of bytes placed at the
    /// start of `buf`. The [`AsyncPeekable`] implementation keeps those bytes
    /// buffered, so a later read returns them again.
    fn poll_peek(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>>;

    /// Polls for a peek into a list of buffers.
    ///
    /// The default implementation forwards to [`poll_peek`](Self::poll_peek)
    /// with the first non-empty buffer in `bufs`; when every buffer is empty
    /// (or `bufs` itself is empty) it forwards an empty slice instead.
    fn poll_peek_vectored(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &mut [IoSliceMut<'_>],
    ) -> Poll<Result<usize>> {
        match bufs.iter_mut().find(|slot| !slot.is_empty()) {
            Some(slot) => self.poll_peek(cx, slot),
            None => self.poll_peek(cx, &mut []),
        }
    }
}
// Expands to the two `AsyncPeek` methods for a pointer-like `Self` whose
// pointee is `Unpin`: the outer pin is unwrapped, the pointee is re-pinned
// with `Pin::new`, and the call is forwarded unchanged.
macro_rules! deref_async_peek {
    () => {
        #[cfg_attr(not(tarpaulin), inline(always))]
        fn poll_peek(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &mut [u8],
        ) -> Poll<Result<usize>> {
            // First `*` steps through the `&mut Self`, second through the pointer itself.
            let pointee = &mut **self.get_mut();
            Pin::new(pointee).poll_peek(cx, buf)
        }

        #[cfg_attr(not(tarpaulin), inline(always))]
        fn poll_peek_vectored(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            bufs: &mut [IoSliceMut<'_>],
        ) -> Poll<Result<usize>> {
            let pointee = &mut **self.get_mut();
            Pin::new(pointee).poll_peek_vectored(cx, bufs)
        }
    };
}
/// A boxed peeker peeks by forwarding to the value inside the box.
impl<T> AsyncPeek for Box<T>
where
    T: ?Sized + AsyncPeek + Unpin,
{
    deref_async_peek!();
}
/// A mutable reference to a peeker peeks by forwarding to the referent.
impl<T> AsyncPeek for &mut T
where
    T: ?Sized + AsyncPeek + Unpin,
{
    deref_async_peek!();
}
/// A pinned pointer to a peeker peeks by forwarding to its pinned target.
impl<P> AsyncPeek for Pin<P>
where
    P: DerefMut + Unpin,
    P::Target: AsyncPeek,
{
    #[cfg_attr(not(tarpaulin), inline(always))]
    fn poll_peek(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
        // `Pin<P>` is `Unpin` because `P` is, so the outer pin can be removed
        // safely; `as_mut` then yields the already-pinned target.
        let target = Pin::get_mut(self).as_mut();
        target.poll_peek(cx, buf)
    }

    #[cfg_attr(not(tarpaulin), inline(always))]
    fn poll_peek_vectored(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &mut [IoSliceMut<'_>],
    ) -> Poll<Result<usize>> {
        let target = Pin::get_mut(self).as_mut();
        target.poll_peek_vectored(cx, bufs)
    }
}
pin_project_lite::pin_project! {
/// A wrapper that pairs an inner reader `R` with a buffer `B` holding bytes
/// that have been peeked but not yet read.
///
/// `B` defaults to `DefaultBuffer`.
#[derive(Debug)]
pub struct AsyncPeekable<R, B = DefaultBuffer> {
// The wrapped I/O object; structurally pinned so it can be polled through `project()`.
#[pin]
reader: R,
// Bytes already pulled from `reader` by a peek and still waiting to be read.
buffer: B,
// Capacity requested at construction, if any; `consume` uses it to size the replacement buffer.
buf_cap: Option<usize>,
}
}
impl<R> From<R> for AsyncPeekable<R> {
#[cfg_attr(not(tarpaulin), inline(always))]
fn from(reader: R) -> Self {
Self::new(reader)
}
}
impl<R> From<(usize, R)> for AsyncPeekable<R> {
#[cfg_attr(not(tarpaulin), inline(always))]
fn from((cap, reader): (usize, R)) -> Self {
Self::with_capacity(reader, cap)
}
}
impl<R, B> AsyncRead for AsyncPeekable<R, B>
where
R: AsyncRead,
B: Buffer,
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
mut buf: &mut [u8],
) -> Poll<Result<usize>> {
let want_read = buf.len();
let buffer_len = self.buffer.len();
let this = self.project();
if buffer_len > 0 {
return match want_read.cmp(&buffer_len) {
cmp::Ordering::Less => {
buf.copy_from_slice(&this.buffer.as_slice()[..want_read]);
this.buffer.consume(..want_read);
return Poll::Ready(Ok(want_read));
}
cmp::Ordering::Equal => {
buf.copy_from_slice(this.buffer.as_slice());
this.buffer.clear();
return Poll::Ready(Ok(want_read));
}
cmp::Ordering::Greater => {
buf[..buffer_len].copy_from_slice(this.buffer.as_slice());
buf = &mut buf[buffer_len..];
match this.reader.poll_read(cx, buf) {
Poll::Ready(Ok(bytes)) => {
this.buffer.clear();
Poll::Ready(Ok(bytes + buffer_len))
}
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Pending => {
this.buffer.clear();
Poll::Ready(Ok(buffer_len))
}
}
}
};
}
this.reader.poll_read(cx, buf)
}
}
/// Writing is a pure pass-through: the peek buffer only concerns the read
/// half, so every write operation is delegated to the wrapped object (stored
/// in the `reader` field even when it is used as a writer).
impl<W, B> AsyncWrite for AsyncPeekable<W, B>
where
    W: AsyncWrite,
{
    /// Forwards the write to the wrapped writer.
    #[cfg_attr(not(tarpaulin), inline(always))]
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
        self.project().reader.poll_write(cx, buf)
    }

    /// Forwards the vectored write to the wrapped writer.
    ///
    /// Without this override the trait default would be used, which only
    /// writes the first non-empty buffer through `poll_write` and so hides
    /// any real vectored-write support the wrapped writer has.
    #[cfg_attr(not(tarpaulin), inline(always))]
    fn poll_write_vectored(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &[std::io::IoSlice<'_>],
    ) -> Poll<Result<usize>> {
        self.project().reader.poll_write_vectored(cx, bufs)
    }

    /// Forwards the flush to the wrapped writer.
    #[cfg_attr(not(tarpaulin), inline(always))]
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        self.project().reader.poll_flush(cx)
    }

    /// Forwards the close to the wrapped writer.
    #[cfg_attr(not(tarpaulin), inline(always))]
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        self.project().reader.poll_close(cx)
    }
}
impl<R, B> AsyncPeek for AsyncPeekable<R, B>
where
R: AsyncRead,
B: Buffer,
{
fn poll_peek(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
let want_peek = buf.len();
let buffer_len = self.buffer.len();
if buffer_len > 0 {
return match want_peek.cmp(&buffer_len) {
cmp::Ordering::Less => {
buf.copy_from_slice(&self.buffer.as_slice()[..want_peek]);
Poll::Ready(Ok(want_peek))
}
cmp::Ordering::Equal => {
buf.copy_from_slice(self.buffer.as_slice());
Poll::Ready(Ok(want_peek))
}
cmp::Ordering::Greater => {
let this = self.project();
this.buffer.resize(want_peek)?;
match this
.reader
.poll_read(cx, &mut this.buffer.as_mut_slice()[buffer_len..])
{
Poll::Ready(Ok(n)) => {
this.buffer.truncate(n + buffer_len);
buf[..buffer_len + n].copy_from_slice(this.buffer.as_slice());
Poll::Ready(Ok(buffer_len + n))
}
Poll::Ready(Err(e)) => {
this.buffer.truncate(buffer_len);
Poll::Ready(Err(e))
}
Poll::Pending => {
this.buffer.truncate(buffer_len);
buf[..buffer_len].copy_from_slice(this.buffer.as_slice());
Poll::Ready(Ok(buffer_len))
}
}
}
};
}
let this = self.project();
match this.reader.poll_read(cx, buf) {
Poll::Ready(Ok(bytes)) => {
this.buffer.extend_from_slice(&buf[..bytes])?;
Poll::Ready(Ok(bytes))
}
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Pending => Poll::Pending,
}
}
}
impl<R> AsyncPeekable<R> {
    /// Wraps `reader` with an empty `DefaultBuffer` and no recorded capacity.
    #[cfg_attr(not(tarpaulin), inline(always))]
    pub fn new(reader: R) -> Self {
        let buffer = DefaultBuffer::new();
        Self::construct(reader, buffer, None)
    }

    /// Wraps `reader` with a `DefaultBuffer` created via
    /// `DefaultBuffer::with_capacity(capacity)`.
    ///
    /// The capacity is remembered so that [`consume`](Self::consume) can
    /// create its replacement buffer with the same capacity.
    #[cfg_attr(not(tarpaulin), inline(always))]
    pub fn with_capacity(reader: R, capacity: usize) -> Self {
        let buffer = DefaultBuffer::with_capacity(capacity);
        let remembered = Some(capacity);
        Self::construct(reader, buffer, remembered)
    }
}
impl<R, B> AsyncPeekable<R, B> {
#[cfg_attr(not(tarpaulin), inline(always))]
pub fn with_buffer(reader: R) -> Self
where
B: Buffer,
{
Self::construct(reader, B::new(), None)
}
#[cfg_attr(not(tarpaulin), inline(always))]
pub fn with_capacity_and_buffer(reader: R, capacity: usize) -> Self
where
B: Buffer,
{
Self::construct(reader, B::with_capacity(capacity), Some(capacity))
}
#[cfg_attr(not(tarpaulin), inline(always))]
fn construct(reader: R, buffer: B, buf_cap: Option<usize>) -> Self {
Self {
reader,
buffer,
buf_cap,
}
}
#[cfg_attr(not(tarpaulin), inline(always))]
pub fn consume(&mut self) -> B
where
B: Buffer,
{
let buf = match self.buf_cap {
Some(capacity) => B::with_capacity(capacity),
None => B::new(),
};
mem::replace(&mut self.buffer, buf)
}
#[cfg_attr(not(tarpaulin), inline(always))]
pub fn consume_in_place(&mut self)
where
B: Buffer,
{
self.buffer.clear();
}
#[cfg_attr(not(tarpaulin), inline(always))]
pub fn get_mut(&mut self) -> (&[u8], &mut R)
where
B: Buffer,
{
(self.buffer.as_slice(), &mut self.reader)
}
#[cfg_attr(not(tarpaulin), inline(always))]
pub fn get_ref(&self) -> (&[u8], &R)
where
B: Buffer,
{
(self.buffer.as_slice(), &self.reader)
}
#[cfg_attr(not(tarpaulin), inline(always))]
pub fn into_components(self) -> (B, R) {
(self.buffer, self.reader)
}
}
/// Future-returning convenience methods. Each one only builds the matching
/// future type from its own submodule; the actual work happens when that
/// future is polled.
impl<R, B> AsyncPeekable<R, B>
where
    R: AsyncRead + Unpin,
    B: Buffer,
{
    /// Creates a [`Peek`] future over this reader and `buf`.
    pub fn peek<'a>(&'a mut self, buf: &'a mut [u8]) -> Peek<'a, R, B>
    where
        Self: Unpin,
    {
        let fut = Peek::new(self, buf);
        assert_future(fut)
    }

    /// Creates a [`PeekVectored`] future over this reader and `bufs`.
    pub fn peek_vectored<'a>(&'a mut self, bufs: &'a mut [IoSliceMut<'a>]) -> PeekVectored<'a, R, B>
    where
        Self: Unpin,
    {
        let fut = PeekVectored::new(self, bufs);
        assert_future(fut)
    }

    /// Creates a [`PeekExact`] future over this reader and `buf`.
    ///
    /// The future resolves to `Result<()>`.
    pub fn peek_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> PeekExact<'a, R, B>
    where
        Self: Unpin,
    {
        let fut = PeekExact::new(self, buf);
        assert_future::<Result<()>, _>(fut)
    }

    /// Creates a [`PeekToEnd`] future over this reader and the byte vector `buf`.
    ///
    /// The future resolves to `Result<usize>`.
    pub fn peek_to_end<'a>(&'a mut self, buf: &'a mut Vec<u8>) -> PeekToEnd<'a, R, B>
    where
        Self: Unpin,
    {
        let fut = PeekToEnd::new(self, buf);
        assert_future::<Result<usize>, _>(fut)
    }

    /// Creates a [`PeekToString`] future over this reader and the string `buf`.
    ///
    /// The future resolves to `Result<usize>`.
    pub fn peek_to_string<'a>(&'a mut self, buf: &'a mut String) -> PeekToString<'a, R, B>
    where
        Self: Unpin,
    {
        let fut = PeekToString::new(self, buf);
        assert_future::<Result<usize>, _>(fut)
    }

    /// Creates a [`FillPeekBuf`] future over this reader.
    ///
    /// The future resolves to `Result<usize>`.
    pub fn fill_peek_buf(&mut self) -> FillPeekBuf<'_, R, B> {
        let fut = FillPeekBuf::new(self);
        assert_future::<Result<usize>, _>(fut)
    }
}
/// Extension methods that turn any [`AsyncRead`] into an [`AsyncPeekable`].
pub trait AsyncPeekExt: AsyncRead {
    /// Wraps `self` in an [`AsyncPeekable`] that uses the default buffer type.
    fn peekable(self) -> AsyncPeekable<Self>
    where
        Self: Sized,
    {
        AsyncPeekable::new(self)
    }

    /// Wraps `self` in an [`AsyncPeekable`] whose default buffer is created
    /// with the given `capacity`.
    fn peekable_with_capacity(self, capacity: usize) -> AsyncPeekable<Self>
    where
        Self: Sized,
    {
        AsyncPeekable::with_capacity(self, capacity)
    }

    /// Wraps `self` in an [`AsyncPeekable`] that uses the caller-chosen
    /// buffer type `B`.
    fn peekable_with_buffer<B>(self) -> AsyncPeekable<Self, B>
    where
        Self: Sized,
        B: Buffer,
    {
        AsyncPeekable::<Self, B>::with_buffer(self)
    }

    /// Wraps `self` in an [`AsyncPeekable`] that uses buffer type `B`
    /// created with the given `capacity`.
    fn peekable_with_capacity_and_buffer<B>(self, capacity: usize) -> AsyncPeekable<Self, B>
    where
        Self: Sized,
        B: Buffer,
    {
        AsyncPeekable::<Self, B>::with_capacity_and_buffer(self, capacity)
    }
}
/// Blanket implementation: every [`AsyncRead`] type gets the
/// [`AsyncPeekExt`] adapter methods.
impl<R> AsyncPeekExt for R where R: AsyncRead {}
/// Identity function that makes the compiler check `future` really is a
/// [`Future`] with output type `T`.
///
/// The value is returned untouched; the only effect is the compile-time
/// bound check, which pins down the output type of the future types built
/// in this module.
pub(crate) fn assert_future<T, F: Future<Output = T>>(future: F) -> F {
    future
}
#[cfg(test)]
mod tests {
    use super::*;
    use futures::io::{AsyncReadExt, Cursor};

    /// Bytes served by the in-memory reader in every case below.
    const SOURCE_BYTES: [u8; 9] = [1, 2, 3, 4, 5, 6, 7, 8, 9];

    // Generates one test that peeks 2 bytes, peeks 4 bytes, then reads 4
    // bytes, checking that both peeks start at the beginning of the stream
    // and that the read returns the peeked bytes again. Only the way the
    // peekable wrapper is constructed differs between cases.
    macro_rules! peek_peek_read_case {
        ($test_name:ident, $peekable:expr) => {
            #[test]
            fn $test_name() {
                futures::executor::block_on(async move {
                    let mut reader = $peekable;

                    let mut short_peek = [0; 2];
                    reader.peek_exact(&mut short_peek).await.unwrap();
                    assert_eq!(short_peek, [1, 2]);

                    let mut long_peek = [0; 4];
                    reader.peek_exact(&mut long_peek).await.unwrap();
                    assert_eq!(long_peek, [1, 2, 3, 4]);

                    let mut consumed = [0; 4];
                    reader.read_exact(&mut consumed).await.unwrap();
                    assert_eq!(consumed, [1, 2, 3, 4]);
                });
            }
        };
    }

    // Default buffer type.
    peek_peek_read_case!(
        test_peek_exact_peek_exact_read_exact,
        Cursor::new(SOURCE_BYTES).peekable()
    );

    // Explicit `Vec<u8>` buffer.
    peek_peek_read_case!(
        test_peek_exact_peek_exact_read_exact_1,
        Cursor::new(SOURCE_BYTES).peekable_with_buffer::<Vec<u8>>()
    );

    // Explicit `Vec<u8>` buffer with a preset capacity.
    peek_peek_read_case!(
        test_peek_exact_peek_exact_read_exact_2,
        Cursor::new(SOURCE_BYTES).peekable_with_capacity_and_buffer::<Vec<u8>>(24)
    );
}