[][src]Crate async_stdio

Adapter for using async read/write streams in std::io contexts

Sometimes, you'll come across an interface that only takes std::io Read + Write types, but that also needs to be adapted for an async/await application. The AsStdIo adapter allows an AsyncRead + AsyncWrite stream to be used as its counterpart from std::io. Assuming that whatever is consuming the wrapped stream will bubble up io::ErrorKind::WouldBlock errors and allow operations to be resumed, this provides a way both to use an async stream with the std::io-only interface and to write an async wrapper around it.

Example

use async_stdio::*;

/// Stand-in for a synchronous reader that only understands `std::io::Read`.
///
/// Its fields are elided in this example.
struct ChunkReader<R> {
    // ...
}

impl<R: Read> ChunkReader<R> {
    /// Builds a chunk reader over `reader`.
    ///
    /// The body is elided in this example; presumably `chunk_size` is the
    /// target length of each chunk handed back by `read_chunk`.
    fn new(reader: R, chunk_size: usize) -> Self {
        // ...
    }

    /// Reads a chunk from the stream
    ///
    /// If the stream ends before a full chunk is read, may return a smaller
    /// chunk. Returns an empty chunk if there is no more to be read.
    ///
    /// For the async wrapper in this example to work, an
    /// `io::ErrorKind::WouldBlock` error from the underlying reader is
    /// expected to be returned to the caller so the read can be retried later.
    fn read_chunk(&mut self) -> io::Result<Vec<u8>> {
        // ...
    }
}

/// Wrapper around the std-only `ChunkReader` to turn it
/// into an async `Stream`
struct AsyncChunked<S> {
    /// The synchronous chunk reader, reading from the async stream `S`
    /// through the `AsStdIo` adapter
    inner: ChunkReader<AsStdIo<S>>,
    /// Handle used to register the polling task's waker before each read
    waker_ctrl: WakerCtrl,
}

impl<S: AsyncRead + Unpin> AsyncChunked<S> {
    /// Wraps the async `stream` so it can be consumed chunk by chunk.
    ///
    /// The stream is first adapted with `AsStdIo`, then handed to the
    /// std-only `ChunkReader` along with `chunk_size`.
    fn new(stream: S, chunk_size: usize) -> AsyncChunked<S> {
        // The adapter comes paired with the waker control that `poll_next`
        // later uses to register the task's waker.
        let (std_stream, waker_ctrl) = AsStdIo::new(stream, None);

        AsyncChunked {
            inner: ChunkReader::new(std_stream, chunk_size),
            waker_ctrl,
        }
    }
}

impl<S: AsyncRead + Unpin> Stream for AsyncChunked<S> {
    type Item = io::Result<Vec<u8>>;

    /// Polls the wrapped `ChunkReader` for its next chunk.
    ///
    /// Yields `Some(Ok(chunk))` for a non-empty chunk, `Some(Err(_))` for an
    /// I/O error, and `None` once the reader hands back an empty chunk.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let Self { inner, waker_ctrl } = self.get_mut();

        // The waker has to be in place before any `std::io::Read::read` call
        // happens inside `read_chunk`.
        waker_ctrl.register(cx.waker());

        // `into_poll` (from `ResultExt`) turns a `WouldBlock` error into
        // `Pending`, which `ready!` returns early on.
        let outcome = ready!(inner.read_chunk().into_poll());

        // An empty chunk marks the end of the stream; everything else is
        // passed through as an item.
        let item = match outcome {
            Ok(chunk) if chunk.is_empty() => None,
            Ok(chunk) => Some(Ok(chunk)),
            Err(err) => Some(Err(err)),
        };

        Poll::Ready(item)
    }
}

// Pretend this doesn't already implement `io::Read`
let stream = io::Cursor::new(vec![0, 1, 2, 3, 4, 5]);
// The wrapper is moved into `map` below, so the binding need not be `mut`
let async_chunked = AsyncChunked::new(stream, 2);

// Drive the stream to completion; any I/O error panics via `unwrap`
let chunks: Vec<Vec<u8>> = block_on(async_chunked.map(|chunk| chunk.unwrap()).collect());

// Six bytes read with a chunk size of 2 come out as three full chunks
let expected: Vec<Vec<u8>> = vec![vec![0, 1], vec![2, 3], vec![4, 5]];

assert_eq!(chunks, expected);

Re-exports

pub use ext::PollExt as _;
pub use ext::ResultExt as _;

Modules

ext

Extension traits for Poll and io::Result

Structs

AsStdIo

Adapter to use an async byte stream as a std::io::{Read, Write} type

WakerCtrl

Waker control for the AsStdIo wrapper