tor_rtcompat/impls/
async_std.rs

1//! Re-exports of the async_std runtime for use with arti.
2//!
3//! This crate helps define a slim API around our async runtime so that we
4//! can easily swap it out.
5//!
6//! We'll probably want to support tokio as well in the future.
7
8/// Types used for networking (async_std implementation)
9mod net {
10    use crate::{impls, traits};
11
12    use async_std_crate::net::{TcpListener, TcpStream, UdpSocket as StdUdpSocket};
13    #[cfg(unix)]
14    use async_std_crate::os::unix::net::{UnixListener, UnixStream};
15    use async_trait::async_trait;
16    use futures::future::Future;
17    use futures::stream::Stream;
18    use paste::paste;
19    use std::io::Result as IoResult;
20    use std::net::SocketAddr;
21    use std::pin::Pin;
22    use std::task::{Context, Poll};
23    #[cfg(unix)]
24    use tor_general_addr::unix;
25    use tracing::instrument;
26
27    /// Implement NetStreamProvider-related functionality for a single address type.
28    macro_rules! impl_stream {
29        { $kind:ident, $addr:ty } => {paste!{
30            /// A `Stream` of incoming streams.
31            ///
32            /// Differs from the output of `*Listener::incoming` in that this
33            /// struct is a real type, and that it returns a stream and an address
34            /// for each input.
35            pub struct [<Incoming $kind Streams>] {
36                /// A state object, stored in an Option so we can take ownership of it
37                /// while poll is being called.
38                // TODO(nickm): I hate using this trick.  At some point in the
39                // future, once Rust has nice support for async traits, maybe
40                // we can refactor it.
41                state: Option<[<Incoming $kind StreamsState>]>,
42            }
43            /// The result type returned by `take_and_poll_*`.
44            ///
45            /// It has to include the Listener, since take_and_poll() has
46            /// ownership of the listener.
47            type [<$kind FResult>] = (IoResult<([<$kind Stream>], $addr)>, [<$kind Listener>]);
48            /// Helper to implement `Incoming*Streams`
49            ///
50            /// This function calls `Listener::accept` while owning the
51            /// listener.  Thus, it returns a future that itself owns the listener,
52            /// and we don't have lifetime troubles.
53            async fn [<take_and_poll_ $kind:lower>](lis: [<$kind Listener>]) -> [<$kind FResult>] {
54                let result = lis.accept().await;
55                (result, lis)
56            }
57            /// The possible states for an `Incoming*Streams`.
58            enum [<Incoming $kind StreamsState>] {
59                /// We're ready to call `accept` on the listener again.
60                Ready([<$kind Listener>]),
61                /// We've called `accept` on the listener, and we're waiting
62                /// for a future to complete.
63                Accepting(Pin<Box<dyn Future<Output = [<$kind FResult>]> + Send + Sync>>),
64            }
65            impl [<Incoming $kind Streams>] {
66                /// Create a new IncomingStreams from a Listener.
67                pub fn from_listener(lis: [<$kind Listener>]) -> [<Incoming $kind Streams>] {
68                    Self {
69                        state: Some([<Incoming $kind StreamsState>]::Ready(lis)),
70                    }
71                }
72            }
73            impl Stream for [< Incoming $kind Streams >] {
74                type Item = IoResult<([<$kind Stream>], $addr)>;
75
76                fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
77                    use [<Incoming $kind StreamsState>] as St;
78                    let state = self.state.take().expect("No valid state!");
79                    let mut future = match state {
80                        St::Ready(lis) => Box::pin([<take_and_poll_ $kind:lower>](lis)),
81                        St::Accepting(fut) => fut,
82                    };
83                    match future.as_mut().poll(cx) {
84                        Poll::Ready((val, lis)) => {
85                            self.state = Some(St::Ready(lis));
86                            Poll::Ready(Some(val))
87                        }
88                        Poll::Pending => {
89                            self.state = Some(St::Accepting(future));
90                            Poll::Pending
91                        }
92                    }
93                }
94            }
95            impl traits::NetStreamListener<$addr> for [<$kind Listener>] {
96                type Stream = [<$kind Stream>];
97                type Incoming = [<Incoming $kind Streams>];
98                fn incoming(self) -> [<Incoming $kind Streams>] {
99                    [<Incoming $kind Streams>]::from_listener(self)
100                }
101                fn local_addr(&self) -> IoResult<$addr> {
102                    [<$kind Listener>]::local_addr(self)
103                }
104            }
105        }}
106    }
107
108    impl_stream! { Tcp, std::net::SocketAddr }
109    #[cfg(unix)]
110    impl_stream! { Unix, unix::SocketAddr}
111
112    #[async_trait]
113    impl traits::NetStreamProvider<std::net::SocketAddr> for async_executors::AsyncStd {
114        type Stream = TcpStream;
115        type Listener = TcpListener;
116        #[instrument(skip_all, level = "trace")]
117        async fn connect(&self, addr: &SocketAddr) -> IoResult<Self::Stream> {
118            TcpStream::connect(addr).await
119        }
120        async fn listen(&self, addr: &SocketAddr) -> IoResult<Self::Listener> {
121            // Use an implementation that's the same across all runtimes.
122            Ok(impls::tcp_listen(addr)?.into())
123        }
124    }
125
126    #[cfg(unix)]
127    #[async_trait]
128    impl traits::NetStreamProvider<unix::SocketAddr> for async_executors::AsyncStd {
129        type Stream = UnixStream;
130        type Listener = UnixListener;
131        #[instrument(skip_all, level = "trace")]
132        async fn connect(&self, addr: &unix::SocketAddr) -> IoResult<Self::Stream> {
133            let path = addr
134                .as_pathname()
135                .ok_or(crate::unix::UnsupportedAfUnixAddressType)?;
136            UnixStream::connect(path).await
137        }
138        async fn listen(&self, addr: &unix::SocketAddr) -> IoResult<Self::Listener> {
139            let path = addr
140                .as_pathname()
141                .ok_or(crate::unix::UnsupportedAfUnixAddressType)?;
142            UnixListener::bind(path).await
143        }
144    }
145
146    #[cfg(not(unix))]
147    crate::impls::impl_unix_non_provider! { async_executors::AsyncStd }
148
149    #[async_trait]
150    impl traits::UdpProvider for async_executors::AsyncStd {
151        type UdpSocket = UdpSocket;
152
153        async fn bind(&self, addr: &std::net::SocketAddr) -> IoResult<Self::UdpSocket> {
154            StdUdpSocket::bind(*addr)
155                .await
156                .map(|socket| UdpSocket { socket })
157        }
158    }
159
160    /// Wrap a AsyncStd UdpSocket
161    pub struct UdpSocket {
162        /// The underlying UdpSocket
163        socket: StdUdpSocket,
164    }
165
166    #[async_trait]
167    impl traits::UdpSocket for UdpSocket {
168        async fn recv(&self, buf: &mut [u8]) -> IoResult<(usize, SocketAddr)> {
169            self.socket.recv_from(buf).await
170        }
171
172        async fn send(&self, buf: &[u8], target: &SocketAddr) -> IoResult<usize> {
173            self.socket.send_to(buf, target).await
174        }
175
176        fn local_addr(&self) -> IoResult<SocketAddr> {
177            self.socket.local_addr()
178        }
179    }
180
181    impl traits::StreamOps for TcpStream {
182        fn set_tcp_notsent_lowat(&self, notsent_lowat: u32) -> IoResult<()> {
183            impls::streamops::set_tcp_notsent_lowat(self, notsent_lowat)
184        }
185
186        #[cfg(target_os = "linux")]
187        fn new_handle(&self) -> Box<dyn traits::StreamOps + Send + Unpin> {
188            Box::new(impls::streamops::TcpSockFd::from_fd(self))
189        }
190    }
191
192    #[cfg(unix)]
193    impl traits::StreamOps for UnixStream {
194        fn set_tcp_notsent_lowat(&self, _notsent_lowat: u32) -> IoResult<()> {
195            Err(traits::UnsupportedStreamOp::new(
196                "set_tcp_notsent_lowat",
197                "unsupported on Unix streams",
198            )
199            .into())
200        }
201    }
202}
203
204// ==============================
205
206use futures::{Future, FutureExt};
207use std::pin::Pin;
208use std::time::Duration;
209
210use crate::traits::*;
211
212/// Create and return a new `async_std` runtime.
213pub fn create_runtime() -> async_executors::AsyncStd {
214    async_executors::AsyncStd::new()
215}
216
217impl SleepProvider for async_executors::AsyncStd {
218    type SleepFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
219    fn sleep(&self, duration: Duration) -> Self::SleepFuture {
220        Box::pin(async_io::Timer::after(duration).map(|_| ()))
221    }
222}
223
224impl ToplevelBlockOn for async_executors::AsyncStd {
225    fn block_on<F: Future>(&self, f: F) -> F::Output {
226        async_executors::AsyncStd::block_on(f)
227    }
228}
229
230impl Blocking for async_executors::AsyncStd {
231    type ThreadHandle<T: Send + 'static> = async_executors::BlockingHandle<T>;
232
233    fn spawn_blocking<F, T>(&self, f: F) -> async_executors::BlockingHandle<T>
234    where
235        F: FnOnce() -> T + Send + 'static,
236        T: Send + 'static,
237    {
238        async_executors::SpawnBlocking::spawn_blocking(&self, f)
239    }
240
241    fn reenter_block_on<F: Future>(&self, f: F) -> F::Output {
242        async_executors::AsyncStd::block_on(f)
243    }
244}