tor_rtcompat/impls/
async_std.rs1mod net {
10 use crate::{impls, traits};
11
12 use async_std_crate::net::{TcpListener, TcpStream, UdpSocket as StdUdpSocket};
13 #[cfg(unix)]
14 use async_std_crate::os::unix::net::{UnixListener, UnixStream};
15 use async_trait::async_trait;
16 use futures::future::Future;
17 use futures::stream::Stream;
18 use paste::paste;
19 use std::io::Result as IoResult;
20 use std::net::SocketAddr;
21 use std::pin::Pin;
22 use std::task::{Context, Poll};
23 #[cfg(unix)]
24 use tor_general_addr::unix;
25 use tracing::instrument;
26
27 macro_rules! impl_stream {
29 { $kind:ident, $addr:ty } => {paste!{
30 pub struct [<Incoming $kind Streams>] {
36 state: Option<[<Incoming $kind StreamsState>]>,
42 }
43 type [<$kind FResult>] = (IoResult<([<$kind Stream>], $addr)>, [<$kind Listener>]);
48 async fn [<take_and_poll_ $kind:lower>](lis: [<$kind Listener>]) -> [<$kind FResult>] {
54 let result = lis.accept().await;
55 (result, lis)
56 }
57 enum [<Incoming $kind StreamsState>] {
59 Ready([<$kind Listener>]),
61 Accepting(Pin<Box<dyn Future<Output = [<$kind FResult>]> + Send + Sync>>),
64 }
65 impl [<Incoming $kind Streams>] {
66 pub fn from_listener(lis: [<$kind Listener>]) -> [<Incoming $kind Streams>] {
68 Self {
69 state: Some([<Incoming $kind StreamsState>]::Ready(lis)),
70 }
71 }
72 }
73 impl Stream for [< Incoming $kind Streams >] {
74 type Item = IoResult<([<$kind Stream>], $addr)>;
75
76 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
77 use [<Incoming $kind StreamsState>] as St;
78 let state = self.state.take().expect("No valid state!");
79 let mut future = match state {
80 St::Ready(lis) => Box::pin([<take_and_poll_ $kind:lower>](lis)),
81 St::Accepting(fut) => fut,
82 };
83 match future.as_mut().poll(cx) {
84 Poll::Ready((val, lis)) => {
85 self.state = Some(St::Ready(lis));
86 Poll::Ready(Some(val))
87 }
88 Poll::Pending => {
89 self.state = Some(St::Accepting(future));
90 Poll::Pending
91 }
92 }
93 }
94 }
95 impl traits::NetStreamListener<$addr> for [<$kind Listener>] {
96 type Stream = [<$kind Stream>];
97 type Incoming = [<Incoming $kind Streams>];
98 fn incoming(self) -> [<Incoming $kind Streams>] {
99 [<Incoming $kind Streams>]::from_listener(self)
100 }
101 fn local_addr(&self) -> IoResult<$addr> {
102 [<$kind Listener>]::local_addr(self)
103 }
104 }
105 }}
106 }
107
108 impl_stream! { Tcp, std::net::SocketAddr }
109 #[cfg(unix)]
110 impl_stream! { Unix, unix::SocketAddr}
111
112 #[async_trait]
113 impl traits::NetStreamProvider<std::net::SocketAddr> for async_executors::AsyncStd {
114 type Stream = TcpStream;
115 type Listener = TcpListener;
116 #[instrument(skip_all, level = "trace")]
117 async fn connect(&self, addr: &SocketAddr) -> IoResult<Self::Stream> {
118 TcpStream::connect(addr).await
119 }
120 async fn listen(&self, addr: &SocketAddr) -> IoResult<Self::Listener> {
121 Ok(impls::tcp_listen(addr)?.into())
123 }
124 }
125
126 #[cfg(unix)]
127 #[async_trait]
128 impl traits::NetStreamProvider<unix::SocketAddr> for async_executors::AsyncStd {
129 type Stream = UnixStream;
130 type Listener = UnixListener;
131 #[instrument(skip_all, level = "trace")]
132 async fn connect(&self, addr: &unix::SocketAddr) -> IoResult<Self::Stream> {
133 let path = addr
134 .as_pathname()
135 .ok_or(crate::unix::UnsupportedAfUnixAddressType)?;
136 UnixStream::connect(path).await
137 }
138 async fn listen(&self, addr: &unix::SocketAddr) -> IoResult<Self::Listener> {
139 let path = addr
140 .as_pathname()
141 .ok_or(crate::unix::UnsupportedAfUnixAddressType)?;
142 UnixListener::bind(path).await
143 }
144 }
145
146 #[cfg(not(unix))]
147 crate::impls::impl_unix_non_provider! { async_executors::AsyncStd }
148
149 #[async_trait]
150 impl traits::UdpProvider for async_executors::AsyncStd {
151 type UdpSocket = UdpSocket;
152
153 async fn bind(&self, addr: &std::net::SocketAddr) -> IoResult<Self::UdpSocket> {
154 StdUdpSocket::bind(*addr)
155 .await
156 .map(|socket| UdpSocket { socket })
157 }
158 }
159
160 pub struct UdpSocket {
162 socket: StdUdpSocket,
164 }
165
166 #[async_trait]
167 impl traits::UdpSocket for UdpSocket {
168 async fn recv(&self, buf: &mut [u8]) -> IoResult<(usize, SocketAddr)> {
169 self.socket.recv_from(buf).await
170 }
171
172 async fn send(&self, buf: &[u8], target: &SocketAddr) -> IoResult<usize> {
173 self.socket.send_to(buf, target).await
174 }
175
176 fn local_addr(&self) -> IoResult<SocketAddr> {
177 self.socket.local_addr()
178 }
179 }
180
181 impl traits::StreamOps for TcpStream {
182 fn set_tcp_notsent_lowat(&self, notsent_lowat: u32) -> IoResult<()> {
183 impls::streamops::set_tcp_notsent_lowat(self, notsent_lowat)
184 }
185
186 #[cfg(target_os = "linux")]
187 fn new_handle(&self) -> Box<dyn traits::StreamOps + Send + Unpin> {
188 Box::new(impls::streamops::TcpSockFd::from_fd(self))
189 }
190 }
191
192 #[cfg(unix)]
193 impl traits::StreamOps for UnixStream {
194 fn set_tcp_notsent_lowat(&self, _notsent_lowat: u32) -> IoResult<()> {
195 Err(traits::UnsupportedStreamOp::new(
196 "set_tcp_notsent_lowat",
197 "unsupported on Unix streams",
198 )
199 .into())
200 }
201 }
202}
203
204use futures::{Future, FutureExt};
207use std::pin::Pin;
208use std::time::Duration;
209
210use crate::traits::*;
211
212pub fn create_runtime() -> async_executors::AsyncStd {
214 async_executors::AsyncStd::new()
215}
216
217impl SleepProvider for async_executors::AsyncStd {
218 type SleepFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
219 fn sleep(&self, duration: Duration) -> Self::SleepFuture {
220 Box::pin(async_io::Timer::after(duration).map(|_| ()))
221 }
222}
223
224impl ToplevelBlockOn for async_executors::AsyncStd {
225 fn block_on<F: Future>(&self, f: F) -> F::Output {
226 async_executors::AsyncStd::block_on(f)
227 }
228}
229
230impl Blocking for async_executors::AsyncStd {
231 type ThreadHandle<T: Send + 'static> = async_executors::BlockingHandle<T>;
232
233 fn spawn_blocking<F, T>(&self, f: F) -> async_executors::BlockingHandle<T>
234 where
235 F: FnOnce() -> T + Send + 'static,
236 T: Send + 'static,
237 {
238 async_executors::SpawnBlocking::spawn_blocking(&self, f)
239 }
240
241 fn reenter_block_on<F: Future>(&self, f: F) -> F::Output {
242 async_executors::AsyncStd::block_on(f)
243 }
244}