tor_rtcompat/impls/
smol.rs1pub(crate) mod net {
7 use super::SmolRuntime;
8 use crate::network::TcpListenOptions;
9 #[cfg(unix)]
10 use crate::network::UnixListenOptions;
11 use crate::{impls, traits};
12 use async_trait::async_trait;
13 use futures::stream::{self, Stream};
14 use paste::paste;
15 use smol::Async;
16 #[cfg(unix)]
17 use smol::net::unix::{UnixListener, UnixStream};
18 use smol::net::{TcpListener, TcpStream, UdpSocket as SmolUdpSocket};
19 use std::io::Result as IoResult;
20 use std::net::SocketAddr;
21 use std::pin::Pin;
22 use std::task::{Context, Poll};
23 use tor_general_addr::unix;
24 use tracing::instrument;
25
26 macro_rules! impl_stream {
29 { $kind:ident, $addr:ty } => { paste! {
30
31 pub struct [<Incoming $kind Streams>] {
33 inner: Pin<Box<dyn Stream<Item = IoResult<([<$kind Stream>], $addr)>> + Send + Sync>>,
35 }
36
37 impl [<Incoming $kind Streams>] {
38 pub fn from_listener(lis: [<$kind Listener>]) -> Self {
40 let stream = stream::unfold(lis, |lis| async move {
41 let result = lis.accept().await;
42 Some((result, lis))
43 });
44 Self {
45 inner: Box::pin(stream),
46 }
47 }
48 }
49
50 impl Stream for [<Incoming $kind Streams>] {
51 type Item = IoResult<([<$kind Stream>], $addr)>;
52
53 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
54 self.inner.as_mut().poll_next(cx)
55 }
56 }
57
58 impl traits::NetStreamListener<$addr> for [<$kind Listener>] {
59 type Stream = [<$kind Stream>];
60 type Incoming = [<Incoming $kind Streams>];
61
62 fn incoming(self) -> Self::Incoming {
63 [<Incoming $kind Streams>]::from_listener(self)
64 }
65
66 fn local_addr(&self) -> IoResult<$addr> {
67 [<$kind Listener>]::local_addr(self)
68 }
69 }
70 }}
71 }
72
73 impl_stream! { Tcp, SocketAddr }
74 #[cfg(unix)]
75 impl_stream! { Unix, unix::SocketAddr }
76
77 #[async_trait]
78 impl traits::NetStreamProvider<SocketAddr> for SmolRuntime {
79 type Stream = TcpStream;
80 type Listener = TcpListener;
81 type ListenOptions = TcpListenOptions;
82
83 #[instrument(skip_all, level = "trace")]
84 async fn connect(&self, addr: &SocketAddr) -> IoResult<Self::Stream> {
85 TcpStream::connect(addr).await
86 }
87
88 async fn listen(
89 &self,
90 addr: &SocketAddr,
91 options: &Self::ListenOptions,
92 ) -> IoResult<Self::Listener> {
93 Ok(Async::new_nonblocking(impls::tcp_listen(addr, options)?)?.into())
98 }
99 }
100
101 #[cfg(unix)]
102 #[async_trait]
103 impl traits::NetStreamProvider<unix::SocketAddr> for SmolRuntime {
104 type Stream = UnixStream;
105 type Listener = UnixListener;
106 type ListenOptions = UnixListenOptions;
107
108 #[instrument(skip_all, level = "trace")]
109 async fn connect(&self, addr: &unix::SocketAddr) -> IoResult<Self::Stream> {
110 let path = addr
111 .as_pathname()
112 .ok_or(crate::unix::UnsupportedAfUnixAddressType)?;
113 UnixStream::connect(path).await
114 }
115
116 async fn listen(
117 &self,
118 addr: &unix::SocketAddr,
119 options: &Self::ListenOptions,
120 ) -> IoResult<Self::Listener> {
121 let UnixListenOptions {} = options;
123
124 let path = addr
125 .as_pathname()
126 .ok_or(crate::unix::UnsupportedAfUnixAddressType)?;
127 UnixListener::bind(path)
128 }
129 }
130
131 #[cfg(not(unix))]
132 crate::impls::impl_unix_non_provider! { SmolRuntime }
133
134 #[async_trait]
135 impl traits::UdpProvider for SmolRuntime {
136 type UdpSocket = UdpSocket;
137
138 async fn bind(&self, addr: &SocketAddr) -> IoResult<Self::UdpSocket> {
139 SmolUdpSocket::bind(addr)
140 .await
141 .map(|socket| UdpSocket { socket })
142 }
143 }
144
145 pub struct UdpSocket {
148 socket: SmolUdpSocket,
150 }
151
152 #[async_trait]
153 impl traits::UdpSocket for UdpSocket {
154 async fn recv(&self, buf: &mut [u8]) -> IoResult<(usize, SocketAddr)> {
155 self.socket.recv_from(buf).await
156 }
157
158 async fn send(&self, buf: &[u8], target: &SocketAddr) -> IoResult<usize> {
159 self.socket.send_to(buf, target).await
160 }
161
162 fn local_addr(&self) -> IoResult<SocketAddr> {
163 self.socket.local_addr()
164 }
165 }
166
167 impl traits::StreamOps for TcpStream {
168 fn set_tcp_notsent_lowat(&self, lowat: u32) -> IoResult<()> {
169 impls::streamops::set_tcp_notsent_lowat(self, lowat)
170 }
171
172 #[cfg(target_os = "linux")]
173 fn new_handle(&self) -> Box<dyn traits::StreamOps + Send + Unpin> {
174 Box::new(impls::streamops::TcpSockFd::from_fd(self))
175 }
176 }
177
178 #[cfg(unix)]
179 impl traits::StreamOps for UnixStream {
180 fn set_tcp_notsent_lowat(&self, _notsent_lowat: u32) -> IoResult<()> {
181 Err(traits::UnsupportedStreamOp::new(
182 "set_tcp_notsent_lowat",
183 "unsupported on Unix streams",
184 )
185 .into())
186 }
187 }
188}
189
190use crate::traits::*;
193use futures::task::{FutureObj, Spawn, SpawnError};
194use futures::{Future, FutureExt};
195use std::pin::Pin;
196use std::time::Duration;
197
198#[derive(Clone)]
200pub struct SmolRuntime {
201 executor: std::sync::Arc<smol::Executor<'static>>,
203}
204
205pub fn create_runtime() -> SmolRuntime {
209 SmolRuntime {
210 executor: std::sync::Arc::new(smol::Executor::new()),
211 }
212}
213
214impl SleepProvider for SmolRuntime {
215 type SleepFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
216 fn sleep(&self, duration: Duration) -> Self::SleepFuture {
217 Box::pin(async_io::Timer::after(duration).map(|_| ()))
218 }
219}
220
221impl ToplevelBlockOn for SmolRuntime {
222 fn block_on<F: Future>(&self, f: F) -> F::Output {
223 smol::block_on(self.executor.run(f))
224 }
225}
226
227impl Blocking for SmolRuntime {
228 type ThreadHandle<T: Send + 'static> = blocking::Task<T>;
229
230 fn spawn_blocking<F, T>(&self, f: F) -> blocking::Task<T>
231 where
232 F: FnOnce() -> T + Send + 'static,
233 T: Send + 'static,
234 {
235 smol::unblock(f)
236 }
237
238 fn reenter_block_on<F: Future>(&self, f: F) -> F::Output {
239 smol::block_on(self.executor.run(f))
240 }
241}
242
243impl Spawn for SmolRuntime {
244 fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
245 self.executor.spawn(future).detach();
246 Ok(())
247 }
248}