1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
//! Re-exports of the tokio runtime for use with arti.
//!
//! This crate helps define a slim API around our async runtime so that we
//! can easily swap it out.

use std::convert::TryInto;

/// Types used for networking (tokio implementation)
mod net {
    use crate::traits;
    use async_trait::async_trait;

    pub(crate) use tokio_crate::net::{
        TcpListener as TokioTcpListener, TcpStream as TokioTcpStream,
    };

    use futures::io::{AsyncRead, AsyncWrite};
    use tokio_util::compat::{Compat, TokioAsyncReadCompatExt as _};

    use std::io::Result as IoResult;
    use std::net::SocketAddr;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    /// A TCP stream backed by Tokio.
    ///
    /// Tokio's own `TcpStream` is held inside a `tokio_util` compatibility
    /// adapter, which lets this type implement the `futures::io`
    /// `AsyncRead` and `AsyncWrite` traits.
    pub struct TcpStream {
        /// The Tokio stream, inside its `tokio_util::compat::Compat` adapter.
        s: Compat<TokioTcpStream>,
    }
    impl From<TokioTcpStream> for TcpStream {
        /// Wrap a Tokio `TcpStream` in the compatibility adapter.
        fn from(tokio_stream: TokioTcpStream) -> Self {
            Self {
                s: tokio_stream.compat(),
            }
        }
    }
    impl AsyncRead for TcpStream {
        /// Forward the read to the wrapped stream.
        fn poll_read(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &mut [u8],
        ) -> Poll<IoResult<usize>> {
            // Re-pin the inner adapter so that the call can be delegated to it.
            let inner = Pin::new(&mut self.get_mut().s);
            inner.poll_read(cx, buf)
        }
    }
    impl AsyncWrite for TcpStream {
        /// Forward the write to the wrapped stream.
        fn poll_write(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<IoResult<usize>> {
            let inner = Pin::new(&mut self.get_mut().s);
            inner.poll_write(cx, buf)
        }
        /// Forward the flush to the wrapped stream.
        fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
            let inner = Pin::new(&mut self.get_mut().s);
            inner.poll_flush(cx)
        }
        /// Forward the close to the wrapped stream.
        fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
            let inner = Pin::new(&mut self.get_mut().s);
            inner.poll_close(cx)
        }
    }

    /// Wrapper around a Tokio `TcpListener`.
    ///
    /// This type implements this crate's `traits::TcpListener` on top of
    /// the Tokio listener that it owns.
    pub struct TcpListener {
        /// The underlying Tokio listener.
        pub(super) lis: TokioTcpListener,
    }

    /// Asynchronous stream that yields incoming connections from a
    /// TcpListener.
    ///
    /// This is analogous to async_std::net::Incoming.
    pub struct IncomingTcpStreams {
        /// Reference to the underlying listener.
        pub(super) lis: TokioTcpListener,
    }

    impl futures::stream::Stream for IncomingTcpStreams {
        type Item = IoResult<(TcpStream, SocketAddr)>;

        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            match self.lis.poll_accept(cx) {
                Poll::Ready(Ok((s, a))) => Poll::Ready(Some(Ok((s.into(), a)))),
                Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
                Poll::Pending => Poll::Pending,
            }
        }
    }
    #[async_trait]
    impl traits::TcpListener for TcpListener {
        type TcpStream = TcpStream;
        type Incoming = IncomingTcpStreams;
        async fn accept(&self) -> IoResult<(Self::TcpStream, SocketAddr)> {
            let (stream, addr) = self.lis.accept().await?;
            Ok((stream.into(), addr))
        }
        fn incoming(self) -> Self::Incoming {
            IncomingTcpStreams { lis: self.lis }
        }
        fn local_addr(&self) -> IoResult<SocketAddr> {
            self.lis.local_addr()
        }
    }
}

/// Implement a set of TLS wrappers for use with tokio.
///
/// Right now only tokio_native_tls is supported.
mod tls {
    use async_trait::async_trait;
    use tokio_util::compat::{Compat, TokioAsyncReadCompatExt as _};

    use futures::io::{AsyncRead, AsyncWrite};

    use std::convert::TryFrom;
    use std::io::{Error as IoError, Result as IoResult};
    use std::net::SocketAddr;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    /// Factory for TLS connections, built on Tokio and native_tls.
    pub struct TlsConnector {
        /// The inner connector object.
        connector: tokio_native_tls::TlsConnector,
    }

    impl TryFrom<native_tls::TlsConnectorBuilder> for TlsConnector {
        type Error = native_tls::Error;
        /// Build the connector described by `builder` and wrap it for use
        /// with Tokio.
        ///
        /// Any error from the build step is returned unchanged.
        fn try_from(builder: native_tls::TlsConnectorBuilder) -> native_tls::Result<TlsConnector> {
            builder.build().map(|built| TlsConnector {
                connector: tokio_native_tls::TlsConnector::from(built),
            })
        }
    }

    /// A TLS-over-TCP stream, using Tokio.
    ///
    /// The `tokio_native_tls` stream is held inside a `tokio_util` `Compat`
    /// adapter.
    pub struct TlsStream {
        /// The inner TLS stream object, wrapped in the compatibility adapter.
        s: Compat<tokio_native_tls::TlsStream<tokio_crate::net::TcpStream>>,
    }

    #[async_trait]
    impl crate::traits::TlsConnector for TlsConnector {
        type Conn = TlsStream;

        /// Open a TCP connection to `addr`, then negotiate TLS over it,
        /// passing `hostname` to the inner connector.
        ///
        /// A failure from the TLS step is reported as a `std::io::Error` of
        /// kind `Other` that wraps the original error.
        async fn connect_unvalidated(
            &self,
            addr: &SocketAddr,
            hostname: &str,
        ) -> IoResult<Self::Conn> {
            let tcp = tokio_crate::net::TcpStream::connect(addr).await?;

            match self.connector.connect(hostname, tcp).await {
                Ok(tls) => Ok(TlsStream { s: tls.compat() }),
                Err(e) => Err(IoError::new(std::io::ErrorKind::Other, e)),
            }
        }
    }

    impl AsyncRead for TlsStream {
        /// Forward the read to the wrapped TLS stream.
        fn poll_read(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &mut [u8],
        ) -> Poll<IoResult<usize>> {
            // Re-pin the inner adapter so that the call can be delegated to it.
            let inner = Pin::new(&mut self.get_mut().s);
            inner.poll_read(cx, buf)
        }
    }

    impl AsyncWrite for TlsStream {
        /// Forward the write to the wrapped TLS stream.
        fn poll_write(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<IoResult<usize>> {
            let inner = Pin::new(&mut self.get_mut().s);
            inner.poll_write(cx, buf)
        }
        /// Forward the flush to the wrapped TLS stream.
        fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
            let inner = Pin::new(&mut self.get_mut().s);
            inner.poll_flush(cx)
        }
        /// Forward the close to the wrapped TLS stream.
        fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
            let inner = Pin::new(&mut self.get_mut().s);
            inner.poll_close(cx)
        }
    }

    impl crate::traits::CertifiedConn for TlsStream {
        /// Return the DER encoding of the peer's certificate, if there is one.
        ///
        /// Errors from looking up or encoding the certificate are wrapped in
        /// a `std::io::Error` of kind `Other`.
        fn peer_certificate(&self) -> IoResult<Option<Vec<u8>>> {
            self.s
                .get_ref()
                .get_ref()
                .peer_certificate()
                .map_err(|e| IoError::new(std::io::ErrorKind::Other, e))?
                .map(|cert| {
                    cert.to_der()
                        .map_err(|e| IoError::new(std::io::ErrorKind::Other, e))
                })
                .transpose()
        }
    }
}

// ==============================

use crate::traits::*;
use async_trait::async_trait;
use futures::Future;
use std::io::Result as IoResult;
use std::time::Duration;

/// Implement `SleepProvider`, `TlsProvider`, and `TcpProvider` for the given
/// runtime type, using the Tokio-based types defined in this file.
macro_rules! implement_traits_for {
    ($runtime:ty) => {
        impl SleepProvider for $runtime {
            type SleepFuture = tokio_crate::time::Sleep;
            fn sleep(&self, duration: Duration) -> Self::SleepFuture {
                tokio_crate::time::sleep(duration)
            }
        }

        impl TlsProvider for $runtime {
            type TlsStream = tls::TlsStream;
            type Connector = tls::TlsConnector;

            fn tls_connector(&self) -> tls::TlsConnector {
                // Despite the alarming method names, all that gets turned
                // off here is the check that the certificate's signer
                // participates in the web PKI, and the check of the
                // hostname in the certificate.
                let mut builder = native_tls::TlsConnector::builder();
                builder.danger_accept_invalid_certs(true);
                builder.danger_accept_invalid_hostnames(true);

                builder.try_into().expect("Couldn't build a TLS connector!")
            }
        }

        #[async_trait]
        impl crate::traits::TcpProvider for $runtime {
            type TcpStream = net::TcpStream;
            type TcpListener = net::TcpListener;

            async fn connect(&self, addr: &std::net::SocketAddr) -> IoResult<Self::TcpStream> {
                let connected = net::TokioTcpStream::connect(addr).await;
                connected.map(net::TcpStream::from)
            }
            async fn listen(&self, addr: &std::net::SocketAddr) -> IoResult<Self::TcpListener> {
                let bound = net::TokioTcpListener::bind(*addr).await;
                bound.map(|lis| net::TcpListener { lis })
            }
        }
    };
}

/// Build a new multithreaded Tokio runtime with all of Tokio's optional
/// drivers enabled.
///
/// # Errors
///
/// Returns whatever I/O error the underlying builder reports.
pub fn create_runtime() -> IoResult<async_executors::TokioTp> {
    let mut tp_builder = async_executors::TokioTpBuilder::new();
    let tokio_builder = tp_builder.tokio_builder();
    tokio_builder.enable_all();
    tp_builder.build()
}

/// A Tokio runtime `Handle`, wrapped so that Arti can use it as a runtime.
///
/// # Limitations
///
/// Arti needs the runtime behind this handle to have working
/// implementations of Tokio's time, net, and io facilities. There is no
/// good way to check for that when this object is created.
#[derive(Clone, Debug)]
pub struct TokioRuntimeHandle {
    /// The wrapped Tokio handle.
    handle: tokio_crate::runtime::Handle,
}

impl TokioRuntimeHandle {
    /// Wrap a Tokio runtime handle in a form that Arti can use.
    ///
    /// # Limitations
    ///
    /// Arti needs the runtime behind this handle to have working
    /// implementations of Tokio's time, net, and io facilities. There is no
    /// good way to check for that when this object is created.
    pub fn new(handle: tokio_crate::runtime::Handle) -> Self {
        Self::from(handle)
    }
}

impl From<tokio_crate::runtime::Handle> for TokioRuntimeHandle {
    fn from(handle: tokio_crate::runtime::Handle) -> Self {
        Self { handle }
    }
}

/// Blocking entry point for the `async_executors` Tokio thread-pool runtime.
impl SpawnBlocking for async_executors::TokioTp {
    /// Delegate to `TokioTp::block_on`, named by its full path.
    // NOTE(review): presumably this resolves to the type's own inherent
    // `block_on` rather than back to this trait method -- confirm.
    fn block_on<F: Future>(&self, f: F) -> F::Output {
        async_executors::TokioTp::block_on(self, f)
    }
}

impl SpawnBlocking for TokioRuntimeHandle {
    /// Delegate to the wrapped handle's `block_on`.
    fn block_on<F: Future>(&self, f: F) -> F::Output {
        let Self { handle } = self;
        handle.block_on(f)
    }
}

impl futures::task::Spawn for TokioRuntimeHandle {
    /// Hand `future` to the wrapped handle as a detached task.
    ///
    /// This always returns `Ok(())`.
    fn spawn_obj(
        &self,
        future: futures::task::FutureObj<'static, ()>,
    ) -> Result<(), futures::task::SpawnError> {
        // The join handle is never inspected: dropping it right away is
        // what makes the task detached.
        drop(self.handle.spawn(future));
        Ok(())
    }
}

// Generate the sleep, TLS, and TCP provider implementations for both of the
// runtime types defined or wrapped in this file.
implement_traits_for! {async_executors::TokioTp}
implement_traits_for! {TokioRuntimeHandle}