async_async_io/
lib.rs

1#![cfg_attr(
2    feature = "impl_trait_in_assoc_type",
3    feature(impl_trait_in_assoc_type)
4)]
5
6use core::{
7    future::Future,
8    pin::Pin,
9    task::{Context, Poll},
10};
11use std::io;
12
13use read::{AsyncAsyncRead, PollRead};
14use reusable_box_future::ReusableBoxFuture;
15use tokio::io::{AsyncRead, AsyncWrite};
16use write::{AsyncAsyncWrite, PollWrite};
17
18pub mod read;
19pub mod write;
20
21#[derive(Debug)]
22pub struct PollIo<R, W> {
23    read: PollRead<R>,
24    write: PollWrite<W>,
25}
26
27impl<R, W> PollIo<R, W> {
28    pub fn new(read: PollRead<R>, write: PollWrite<W>) -> Self {
29        Self { read, write }
30    }
31
32    pub fn into_split(self) -> (PollRead<R>, PollWrite<W>) {
33        (self.read, self.write)
34    }
35
36    pub fn split(&self) -> (&PollRead<R>, &PollWrite<W>) {
37        (&self.read, &self.write)
38    }
39
40    pub fn split_mut(&mut self) -> (&mut PollRead<R>, &mut PollWrite<W>) {
41        (&mut self.read, &mut self.write)
42    }
43}
44
45impl<R, W> AsyncRead for PollIo<R, W>
46where
47    R: AsyncAsyncRead + Unpin + Send + 'static,
48    W: Unpin,
49{
50    fn poll_read(
51        self: Pin<&mut Self>,
52        cx: &mut Context<'_>,
53        buf: &mut tokio::io::ReadBuf<'_>,
54    ) -> Poll<io::Result<()>> {
55        Pin::new(&mut self.get_mut().read).poll_read(cx, buf)
56    }
57}
58
59impl<R, W> AsyncWrite for PollIo<R, W>
60where
61    R: Unpin,
62    W: AsyncAsyncWrite + Unpin + Send + 'static,
63{
64    fn poll_write(
65        self: Pin<&mut Self>,
66        cx: &mut Context<'_>,
67        buf: &[u8],
68    ) -> Poll<Result<usize, io::Error>> {
69        Pin::new(&mut self.get_mut().write).poll_write(cx, buf)
70    }
71
72    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
73        Pin::new(&mut self.get_mut().write).poll_flush(cx)
74    }
75
76    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
77        Pin::new(&mut self.get_mut().write).poll_shutdown(cx)
78    }
79}
80
81fn box_fut<F, O>(fut: F, fut_box: Option<ReusableBoxFuture<O>>) -> ReusableBoxFuture<O>
82where
83    F: Future<Output = O> + Send + 'static,
84{
85    match fut_box {
86        Some(mut fut_box) => {
87            fut_box.set(fut);
88            fut_box
89        }
90        None => ReusableBoxFuture::new(fut),
91    }
92}