tor_rtcompat/impls/
async_std.rs1mod net {
10 use crate::network::TcpListenOptions;
11 #[cfg(unix)]
12 use crate::network::UnixListenOptions;
13 use crate::{impls, traits};
14
15 use async_std_crate::net::{TcpListener, TcpStream, UdpSocket as StdUdpSocket};
16 #[cfg(unix)]
17 use async_std_crate::os::unix::net::{UnixListener, UnixStream};
18 use async_trait::async_trait;
19 use futures::stream::{self, Stream};
20 use paste::paste;
21 use std::io::Result as IoResult;
22 use std::net::SocketAddr;
23 use std::pin::Pin;
24 use std::task::{Context, Poll};
25 #[cfg(unix)]
26 use tor_general_addr::unix;
27 use tracing::instrument;
28
29 macro_rules! impl_stream {
31 { $kind:ident, $addr:ty } => {paste!{
32 pub struct [<Incoming $kind Streams>] {
38 inner: Pin<Box<dyn Stream<Item = IoResult<([<$kind Stream>], $addr)>> + Send + Sync>>,
40 }
41 impl [<Incoming $kind Streams>] {
42 pub fn from_listener(lis: [<$kind Listener>]) -> [<Incoming $kind Streams>] {
44 let stream = stream::unfold(lis, |lis| async move {
45 let result = lis.accept().await;
46 Some((result, lis))
47 });
48 Self {
49 inner: Box::pin(stream),
50 }
51 }
52 }
53 impl Stream for [< Incoming $kind Streams >] {
54 type Item = IoResult<([<$kind Stream>], $addr)>;
55
56 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
57 self.inner.as_mut().poll_next(cx)
58 }
59 }
60 impl traits::NetStreamListener<$addr> for [<$kind Listener>] {
61 type Stream = [<$kind Stream>];
62 type Incoming = [<Incoming $kind Streams>];
63 fn incoming(self) -> [<Incoming $kind Streams>] {
64 [<Incoming $kind Streams>]::from_listener(self)
65 }
66 fn local_addr(&self) -> IoResult<$addr> {
67 [<$kind Listener>]::local_addr(self)
68 }
69 }
70 }}
71 }
72
73 impl_stream! { Tcp, std::net::SocketAddr }
74 #[cfg(unix)]
75 impl_stream! { Unix, unix::SocketAddr}
76
77 #[async_trait]
78 impl traits::NetStreamProvider<std::net::SocketAddr> for async_executors::AsyncStd {
79 type Stream = TcpStream;
80 type Listener = TcpListener;
81 type ListenOptions = TcpListenOptions;
82 #[instrument(skip_all, level = "trace")]
83 async fn connect(&self, addr: &SocketAddr) -> IoResult<Self::Stream> {
84 TcpStream::connect(addr).await
85 }
86 async fn listen(
87 &self,
88 addr: &SocketAddr,
89 options: &Self::ListenOptions,
90 ) -> IoResult<Self::Listener> {
91 Ok(impls::tcp_listen(addr, options)?.into())
93 }
94 }
95
96 #[cfg(unix)]
97 #[async_trait]
98 impl traits::NetStreamProvider<unix::SocketAddr> for async_executors::AsyncStd {
99 type Stream = UnixStream;
100 type Listener = UnixListener;
101 type ListenOptions = UnixListenOptions;
102 #[instrument(skip_all, level = "trace")]
103 async fn connect(&self, addr: &unix::SocketAddr) -> IoResult<Self::Stream> {
104 let path = addr
105 .as_pathname()
106 .ok_or(crate::unix::UnsupportedAfUnixAddressType)?;
107 UnixStream::connect(path).await
108 }
109 async fn listen(
110 &self,
111 addr: &unix::SocketAddr,
112 options: &Self::ListenOptions,
113 ) -> IoResult<Self::Listener> {
114 let UnixListenOptions {} = options;
116
117 let path = addr
118 .as_pathname()
119 .ok_or(crate::unix::UnsupportedAfUnixAddressType)?;
120 UnixListener::bind(path).await
121 }
122 }
123
124 #[cfg(not(unix))]
125 crate::impls::impl_unix_non_provider! { async_executors::AsyncStd }
126
127 #[async_trait]
128 impl traits::UdpProvider for async_executors::AsyncStd {
129 type UdpSocket = UdpSocket;
130
131 async fn bind(&self, addr: &std::net::SocketAddr) -> IoResult<Self::UdpSocket> {
132 StdUdpSocket::bind(*addr)
133 .await
134 .map(|socket| UdpSocket { socket })
135 }
136 }
137
138 pub struct UdpSocket {
140 socket: StdUdpSocket,
142 }
143
144 #[async_trait]
145 impl traits::UdpSocket for UdpSocket {
146 async fn recv(&self, buf: &mut [u8]) -> IoResult<(usize, SocketAddr)> {
147 self.socket.recv_from(buf).await
148 }
149
150 async fn send(&self, buf: &[u8], target: &SocketAddr) -> IoResult<usize> {
151 self.socket.send_to(buf, target).await
152 }
153
154 fn local_addr(&self) -> IoResult<SocketAddr> {
155 self.socket.local_addr()
156 }
157 }
158
159 impl traits::StreamOps for TcpStream {
160 fn set_tcp_notsent_lowat(&self, notsent_lowat: u32) -> IoResult<()> {
161 impls::streamops::set_tcp_notsent_lowat(self, notsent_lowat)
162 }
163
164 #[cfg(target_os = "linux")]
165 fn new_handle(&self) -> Box<dyn traits::StreamOps + Send + Unpin> {
166 Box::new(impls::streamops::TcpSockFd::from_fd(self))
167 }
168 }
169
170 #[cfg(unix)]
171 impl traits::StreamOps for UnixStream {
172 fn set_tcp_notsent_lowat(&self, _notsent_lowat: u32) -> IoResult<()> {
173 Err(traits::UnsupportedStreamOp::new(
174 "set_tcp_notsent_lowat",
175 "unsupported on Unix streams",
176 )
177 .into())
178 }
179 }
180}
181
182use futures::{Future, FutureExt};
185use std::pin::Pin;
186use std::time::Duration;
187
188use crate::traits::*;
189
190pub fn create_runtime() -> async_executors::AsyncStd {
192 async_executors::AsyncStd::new()
193}
194
195impl SleepProvider for async_executors::AsyncStd {
196 type SleepFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
197 fn sleep(&self, duration: Duration) -> Self::SleepFuture {
198 Box::pin(async_io::Timer::after(duration).map(|_| ()))
199 }
200}
201
202impl ToplevelBlockOn for async_executors::AsyncStd {
203 fn block_on<F: Future>(&self, f: F) -> F::Output {
204 async_executors::AsyncStd::block_on(f)
205 }
206}
207
208impl Blocking for async_executors::AsyncStd {
209 type ThreadHandle<T: Send + 'static> = async_executors::BlockingHandle<T>;
210
211 fn spawn_blocking<F, T>(&self, f: F) -> async_executors::BlockingHandle<T>
212 where
213 F: FnOnce() -> T + Send + 'static,
214 T: Send + 'static,
215 {
216 async_executors::SpawnBlocking::spawn_blocking(&self, f)
217 }
218
219 fn reenter_block_on<F: Future>(&self, f: F) -> F::Output {
220 async_executors::AsyncStd::block_on(f)
221 }
222}