tor_rtcompat/impls/
async_std.rs1mod net {
10 use crate::network::{TcpConnectOptions, TcpListenOptions};
11 #[cfg(unix)]
12 use crate::network::{UnixConnectOptions, UnixListenOptions};
13 use crate::{impls, traits};
14
15 use async_std_crate::net::{TcpListener, TcpStream, UdpSocket as StdUdpSocket};
16 #[cfg(unix)]
17 use async_std_crate::os::unix::net::{UnixListener, UnixStream};
18 use async_trait::async_trait;
19 use futures::stream::{self, Stream};
20 use paste::paste;
21 use std::io::Result as IoResult;
22 use std::net::SocketAddr;
23 use std::pin::Pin;
24 use std::task::{Context, Poll};
25 #[cfg(unix)]
26 use tor_general_addr::unix;
27 use tracing::instrument;
28
29 macro_rules! impl_stream {
31 { $kind:ident, $addr:ty } => {paste!{
32 pub struct [<Incoming $kind Streams>] {
38 inner: Pin<Box<dyn Stream<Item = IoResult<([<$kind Stream>], $addr)>> + Send + Sync>>,
40 }
41 impl [<Incoming $kind Streams>] {
42 pub fn from_listener(lis: [<$kind Listener>]) -> [<Incoming $kind Streams>] {
44 let stream = stream::unfold(lis, |lis| async move {
45 let result = lis.accept().await;
46 Some((result, lis))
47 });
48 Self {
49 inner: Box::pin(stream),
50 }
51 }
52 }
53 impl Stream for [< Incoming $kind Streams >] {
54 type Item = IoResult<([<$kind Stream>], $addr)>;
55
56 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
57 self.inner.as_mut().poll_next(cx)
58 }
59 }
60 impl traits::NetStreamListener<$addr> for [<$kind Listener>] {
61 type Stream = [<$kind Stream>];
62 type Incoming = [<Incoming $kind Streams>];
63 fn incoming(self) -> [<Incoming $kind Streams>] {
64 [<Incoming $kind Streams>]::from_listener(self)
65 }
66 fn local_addr(&self) -> IoResult<$addr> {
67 [<$kind Listener>]::local_addr(self)
68 }
69 }
70 }}
71 }
72
73 impl_stream! { Tcp, std::net::SocketAddr }
74 #[cfg(unix)]
75 impl_stream! { Unix, unix::SocketAddr}
76
77 #[async_trait]
78 impl traits::NetStreamProvider<std::net::SocketAddr> for async_executors::AsyncStd {
79 type Stream = TcpStream;
80 type Listener = TcpListener;
81 type ConnectOptions = TcpConnectOptions;
82 type ListenOptions = TcpListenOptions;
83 #[instrument(skip_all, level = "trace")]
84 async fn connect(
85 &self,
86 addr: &SocketAddr,
87 options: &Self::ConnectOptions,
88 ) -> IoResult<Self::Stream> {
89 let stream = impls::tcp_async_io_connect(addr, options).await?;
91 Ok(stream.into())
92 }
93 async fn listen(
94 &self,
95 addr: &SocketAddr,
96 options: &Self::ListenOptions,
97 ) -> IoResult<Self::Listener> {
98 Ok(impls::tcp_listen(addr, options)?.into())
100 }
101 }
102
103 #[cfg(unix)]
104 #[async_trait]
105 impl traits::NetStreamProvider<unix::SocketAddr> for async_executors::AsyncStd {
106 type Stream = UnixStream;
107 type Listener = UnixListener;
108 type ConnectOptions = UnixConnectOptions;
109 type ListenOptions = UnixListenOptions;
110 #[instrument(skip_all, level = "trace")]
111 async fn connect(
112 &self,
113 addr: &unix::SocketAddr,
114 options: &Self::ConnectOptions,
115 ) -> IoResult<Self::Stream> {
116 let UnixConnectOptions {} = options;
118
119 let path = addr
120 .as_pathname()
121 .ok_or(crate::unix::UnsupportedAfUnixAddressType)?;
122 UnixStream::connect(path).await
123 }
124 async fn listen(
125 &self,
126 addr: &unix::SocketAddr,
127 options: &Self::ListenOptions,
128 ) -> IoResult<Self::Listener> {
129 let UnixListenOptions {} = options;
131
132 let path = addr
133 .as_pathname()
134 .ok_or(crate::unix::UnsupportedAfUnixAddressType)?;
135 UnixListener::bind(path).await
136 }
137 }
138
139 #[cfg(not(unix))]
140 crate::impls::impl_unix_non_provider! { async_executors::AsyncStd }
141
142 #[async_trait]
143 impl traits::UdpProvider for async_executors::AsyncStd {
144 type UdpSocket = UdpSocket;
145
146 async fn bind(&self, addr: &std::net::SocketAddr) -> IoResult<Self::UdpSocket> {
147 StdUdpSocket::bind(*addr)
148 .await
149 .map(|socket| UdpSocket { socket })
150 }
151 }
152
153 pub struct UdpSocket {
155 socket: StdUdpSocket,
157 }
158
159 #[async_trait]
160 impl traits::UdpSocket for UdpSocket {
161 async fn recv(&self, buf: &mut [u8]) -> IoResult<(usize, SocketAddr)> {
162 self.socket.recv_from(buf).await
163 }
164
165 async fn send(&self, buf: &[u8], target: &SocketAddr) -> IoResult<usize> {
166 self.socket.send_to(buf, target).await
167 }
168
169 fn local_addr(&self) -> IoResult<SocketAddr> {
170 self.socket.local_addr()
171 }
172 }
173
174 impl traits::StreamOps for TcpStream {
175 fn set_tcp_notsent_lowat(&self, notsent_lowat: u32) -> IoResult<()> {
176 impls::streamops::set_tcp_notsent_lowat(self, notsent_lowat)
177 }
178
179 #[cfg(target_os = "linux")]
180 fn new_handle(&self) -> Box<dyn traits::StreamOps + Send + Unpin> {
181 Box::new(impls::streamops::TcpSockFd::from_fd(self))
182 }
183 }
184
185 #[cfg(unix)]
186 impl traits::StreamOps for UnixStream {
187 fn set_tcp_notsent_lowat(&self, _notsent_lowat: u32) -> IoResult<()> {
188 Err(traits::UnsupportedStreamOp::new(
189 "set_tcp_notsent_lowat",
190 "unsupported on Unix streams",
191 )
192 .into())
193 }
194 }
195}
196
197use futures::{Future, FutureExt};
200use std::pin::Pin;
201use std::time::Duration;
202
203use crate::traits::*;
204
205pub fn create_runtime() -> async_executors::AsyncStd {
207 async_executors::AsyncStd::new()
208}
209
210impl SleepProvider for async_executors::AsyncStd {
211 type SleepFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
212 fn sleep(&self, duration: Duration) -> Self::SleepFuture {
213 Box::pin(async_io::Timer::after(duration).map(|_| ()))
214 }
215}
216
217impl ToplevelBlockOn for async_executors::AsyncStd {
218 fn block_on<F: Future>(&self, f: F) -> F::Output {
219 async_executors::AsyncStd::block_on(f)
220 }
221}
222
223impl Blocking for async_executors::AsyncStd {
224 type ThreadHandle<T: Send + 'static> = async_executors::BlockingHandle<T>;
225
226 fn spawn_blocking<F, T>(&self, f: F) -> async_executors::BlockingHandle<T>
227 where
228 F: FnOnce() -> T + Send + 'static,
229 T: Send + 'static,
230 {
231 async_executors::SpawnBlocking::spawn_blocking(&self, f)
232 }
233
234 fn reenter_block_on<F: Future>(&self, f: F) -> F::Output {
235 async_executors::AsyncStd::block_on(f)
236 }
237}