1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
// When the Cargo feature `impl_trait_in_assoc_type` is enabled, also switch on
// the compiler feature gate of the same name for the whole crate. `feature(..)`
// gates are only accepted by nightly toolchains, so builds without the Cargo
// feature do not depend on it.
#![cfg_attr(
    feature = "impl_trait_in_assoc_type",
    feature(impl_trait_in_assoc_type)
)]

use std::pin::Pin;

use futures_core::Future;
use read::{AsyncAsyncRead, PollRead};
use reusable_box_future::ReusableBoxFuture;
use tokio::io::{AsyncRead, AsyncWrite};
use write::{AsyncAsyncWrite, PollWrite};

pub mod read;
pub mod write;

/// A read half and a write half bundled into one value.
///
/// The `AsyncRead` impl for this type forwards to the `read` field and the
/// `AsyncWrite` impl forwards to the `write` field, so a `PollIo` can be used
/// where a single bidirectional stream is expected.
#[derive(Debug)]
pub struct PollIo<R, W> {
    /// Half that serves `poll_read`.
    read: PollRead<R>,
    /// Half that serves `poll_write`, `poll_flush` and `poll_shutdown`.
    write: PollWrite<W>,
}

impl<R, W> PollIo<R, W> {
    pub fn new(read: PollRead<R>, write: PollWrite<W>) -> Self {
        Self { read, write }
    }

    pub fn into_split(self) -> (PollRead<R>, PollWrite<W>) {
        (self.read, self.write)
    }

    pub fn split(&self) -> (&PollRead<R>, &PollWrite<W>) {
        (&self.read, &self.write)
    }

    pub fn split_mut(&mut self) -> (&mut PollRead<R>, &mut PollWrite<W>) {
        (&mut self.read, &mut self.write)
    }
}

/// Reads on a [`PollIo`] are served by its `read` half.
impl<R, W> AsyncRead for PollIo<R, W>
where
    W: Unpin,
    R: AsyncAsyncRead + Unpin + Send + 'static,
{
    /// Forwards the poll, with the same context and buffer, to the wrapped
    /// `PollRead<R>` and returns its result unchanged.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> std::task::Poll<std::io::Result<()>> {
        // `Pin::get_mut` needs `Self: Unpin`; the `Unpin` bounds above are
        // presumably what lets `PollIo` satisfy that.
        let Self { read, .. } = self.get_mut();
        Pin::new(read).poll_read(cx, buf)
    }
}

/// Writes, flushes and shutdowns on a [`PollIo`] are served by its `write` half.
impl<R, W> AsyncWrite for PollIo<R, W>
where
    W: AsyncAsyncWrite + Unpin + Send + 'static,
    R: Unpin,
{
    /// Forwards the write poll to the wrapped `PollWrite<W>` and returns its
    /// result unchanged.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &[u8],
    ) -> std::task::Poll<Result<usize, std::io::Error>> {
        // `Pin::get_mut` needs `Self: Unpin`; the `Unpin` bounds above are
        // presumably what lets `PollIo` satisfy that.
        let Self { write, .. } = self.get_mut();
        Pin::new(write).poll_write(cx, buf)
    }

    /// Forwards the flush poll to the wrapped `PollWrite<W>`.
    fn poll_flush(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), std::io::Error>> {
        let Self { write, .. } = self.get_mut();
        Pin::new(write).poll_flush(cx)
    }

    /// Forwards the shutdown poll to the wrapped `PollWrite<W>`.
    fn poll_shutdown(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), std::io::Error>> {
        let Self { write, .. } = self.get_mut();
        Pin::new(write).poll_shutdown(cx)
    }
}

/// Puts `fut` into a [`ReusableBoxFuture`], recycling `fut_box` when one is given.
///
/// * `Some(existing)`: `fut` is installed into `existing` with `set`, and that
///   same box is returned.
/// * `None`: a new box is created with `ReusableBoxFuture::new`.
fn box_fut<F, O>(fut: F, fut_box: Option<ReusableBoxFuture<O>>) -> ReusableBoxFuture<O>
where
    F: Future<Output = O> + Send + 'static,
{
    if let Some(mut recycled) = fut_box {
        recycled.set(fut);
        return recycled;
    }

    ReusableBoxFuture::new(fut)
}