1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
use std::future::Future;
use std::io::{self, Read, Write};
use std::mem;
use std::pin::Pin;
use std::task::{Context, Poll};

use futures_io_traits_sync_wrapper::Wrapper as AsyncRWSyncWrapper;
use futures_util::io::{AsyncRead, AsyncWrite};
use rustls::{Session, Stream};

// The `acceptor` submodule is compiled only when the `acceptor` feature is enabled;
// its `Accept` and `TlsAcceptor` items are re-exported from this module.
#[cfg(feature = "acceptor")]
mod acceptor;
#[cfg(feature = "acceptor")]
pub use acceptor::{Accept, TlsAcceptor};

// The `connector` submodule is compiled only when the `connector` feature is enabled;
// its `Connect` and `TlsConnector` items are re-exported from this module.
#[cfg(feature = "connector")]
mod connector;
#[cfg(feature = "connector")]
pub use connector::{Connect, TlsConnector};

/// Convenience re-exports of `rustls` items, plus `webpki` / `webpki_roots` items when the
/// `connector` feature is enabled, so callers can pull them in with a single `use`.
pub mod prelude {
    // `rustls` configuration, session and PEM-file items. The `Session` trait is re-exported
    // under the alias `RustlsSession`.
    pub use rustls::{
        internal::pemfile, ClientConfig, ClientSession, NoClientAuth, ServerConfig, ServerSession,
        Session as RustlsSession,
    };

    // The two re-exports below exist only when the `connector` feature is enabled.
    #[cfg(feature = "connector")]
    pub use webpki::DNSNameRef;
    #[cfg(feature = "connector")]
    pub use webpki_roots::TLS_SERVER_ROOTS;
}

/// Handshake progress recorded on a `TlsStream`.
///
/// The read/write implementations check this with `debug_assert!` before doing any work.
#[derive(PartialEq, Eq)]
enum State {
    /// Initial value assigned by `TlsStream::new`.
    Pending,
    /// Set by `MidHandshake::poll` right before it yields the stream to the caller.
    HandshakeCompleted,
}

/// A TLS session of type `S` paired with the IO object of type `IO` it communicates over.
///
/// `AsyncRead` / `AsyncWrite` are implemented for it when `S: Session` and `IO` is itself an
/// async reader and writer.
pub struct TlsStream<S, IO> {
    // The TLS session state machine.
    session: S,
    // The underlying IO object, stored on the heap.
    io: Box<IO>,
    // Whether the handshake has been driven to completion yet.
    state: State,
}

impl<S, IO> TlsStream<S, IO> {
    fn new(session: S, io: IO) -> Self {
        Self {
            session,
            io: Box::new(io),
            state: State::Pending,
        }
    }

    pub fn get_mut(&mut self) -> (&mut S, &mut IO) {
        (&mut self.session, self.io.as_mut())
    }

    pub fn get_ref(&self) -> (&S, &IO) {
        (&self.session, self.io.as_ref())
    }

    pub fn into_inner(self) -> (S, IO) {
        (self.session, *self.io)
    }
}

/// Future created by `handshake`; it resolves to the `TlsStream` once the TLS handshake is done.
pub enum MidHandshake<S, IO> {
    /// The handshake is still in progress; holds the stream being set up.
    Handshaking(TlsStream<S, IO>),
    /// Placeholder left behind after the stream has been moved out to the caller.
    /// Polling the future in this state panics.
    End,
}

/// Wraps `session` and `io` in a new `TlsStream` and returns the `MidHandshake` future that
/// drives its handshake.
///
/// This function only constructs values; the handshake itself runs when the returned future
/// is polled.
pub fn handshake<S, IO>(session: S, io: IO) -> MidHandshake<S, IO> {
    let stream = TlsStream::new(session, io);

    MidHandshake::Handshaking(stream)
}

impl<S, IO> Future for MidHandshake<S, IO>
where
    S: Session + Unpin,
    IO: AsyncRead + AsyncWrite + Unpin,
{
    type Output = io::Result<TlsStream<S, IO>>;

    /// Drives the TLS handshake by calling `Session::complete_io` on a synchronous wrapper
    /// around the async IO object.
    ///
    /// * `io::ErrorKind::WouldBlock` from the wrapper is reported as `Poll::Pending`.
    /// * Any other error is returned as `Poll::Ready(Err(_))`; the future stays in the
    ///   `Handshaking` state in that case.
    /// * On success the stream is marked `State::HandshakeCompleted`, moved out to the
    ///   caller, and `MidHandshake::End` is left in its place.
    ///
    /// # Panics
    ///
    /// Panics if polled again after it has already yielded the stream.
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let this = self.get_mut();

        // Check the state exactly once: either advance the handshake or reject a poll
        // that comes after completion.
        match this {
            MidHandshake::Handshaking(tls_stream) => {
                let mut io = AsyncRWSyncWrapper::new(&mut tls_stream.io, cx);

                match tls_stream.session.complete_io(&mut io) {
                    Ok(_) => {}
                    Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
                        return Poll::Pending
                    }
                    Err(err) => return Poll::Ready(Err(err)),
                }
            }
            MidHandshake::End => panic!("`MidHandshake` polled after completion"),
        }

        // `mem::replace` already leaves `End` behind, so no further assignment is needed.
        match mem::replace(this, MidHandshake::End) {
            MidHandshake::Handshaking(mut tls_stream) => {
                tls_stream.state = State::HandshakeCompleted;
                Poll::Ready(Ok(tls_stream))
            }
            MidHandshake::End => unreachable!("the `End` state is rejected above"),
        }
    }
}

impl<S, IO> AsyncRead for TlsStream<S, IO>
where
    S: Session + Unpin,
    IO: AsyncRead + AsyncWrite + Unpin,
{
    /// Reads decrypted bytes into `buf` through a `rustls::Stream` built over a synchronous
    /// wrapper around the async IO object.
    ///
    /// An `io::ErrorKind::WouldBlock` error is reported as `Poll::Pending`; every other
    /// outcome is returned as `Poll::Ready`. In debug builds this asserts that the handshake
    /// has completed.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let tls = self.get_mut();
        debug_assert!(tls.state == State::HandshakeCompleted);

        let mut sync_io = AsyncRWSyncWrapper::new(&mut tls.io, cx);
        let outcome = Stream::new(&mut tls.session, &mut sync_io).read(buf);

        match outcome {
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Poll::Pending,
            finished => Poll::Ready(finished),
        }
    }
}

impl<S, IO> AsyncWrite for TlsStream<S, IO>
where
    S: Session + Unpin,
    IO: AsyncRead + AsyncWrite + Unpin,
{
    /// Writes plaintext from `buf` through a `rustls::Stream` built over a synchronous
    /// wrapper around the async IO object.
    ///
    /// An `io::ErrorKind::WouldBlock` error is reported as `Poll::Pending`. In debug builds
    /// this asserts that the handshake has completed.
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
        let this = self.get_mut();

        debug_assert!(this.state == State::HandshakeCompleted);

        let mut io = AsyncRWSyncWrapper::new(&mut this.io, cx);

        let mut stream = Stream::new(&mut this.session, &mut io);

        match stream.write(buf) {
            Ok(n) => Poll::Ready(Ok(n)),
            Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => Poll::Pending,
            Err(err) => Poll::Ready(Err(err)),
        }
    }

    /// Flushes the TLS stream and then the underlying IO object.
    ///
    /// An `io::ErrorKind::WouldBlock` error from the TLS-level flush is reported as
    /// `Poll::Pending`, the same way `poll_write` treats it, instead of being handed to the
    /// caller as a ready error.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        let this = self.get_mut();

        let mut io = AsyncRWSyncWrapper::new(&mut this.io, cx);

        let mut stream = Stream::new(&mut this.session, &mut io);

        // A plain `?` here would turn `WouldBlock` into `Poll::Ready(Err(_))`.
        match stream.flush() {
            Ok(()) => {}
            Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => return Poll::Pending,
            Err(err) => return Poll::Ready(Err(err)),
        }

        Pin::new(&mut this.io).poll_flush(cx)
    }

    /// Closes the underlying IO object by delegating to its `poll_close`.
    ///
    // NOTE(review): the session is not touched here, so no TLS `close_notify` alert is
    // queued or flushed before the IO object is closed. Confirm whether that is intended.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        let this = self.get_mut();

        Pin::new(&mut this.io).poll_close(cx)
    }
}