iroh_quinn/runtime.rs
1use std::{
2 fmt::Debug,
3 future::Future,
4 io::{self, IoSliceMut},
5 net::SocketAddr,
6 pin::Pin,
7 sync::Arc,
8 task::{Context, Poll},
9};
10
11use udp::{RecvMeta, Transmit};
12
13use crate::Instant;
14
15/// Abstracts I/O and timer operations for runtime independence
16pub trait Runtime: Send + Sync + Debug + 'static {
17 /// Construct a timer that will expire at `i`
18 fn new_timer(&self, i: Instant) -> Pin<Box<dyn AsyncTimer>>;
19 /// Drive `future` to completion in the background
20 fn spawn(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>);
21 /// Convert `t` into the socket type used by this runtime
22 #[cfg(not(wasm_browser))]
23 fn wrap_udp_socket(&self, t: std::net::UdpSocket) -> io::Result<Box<dyn AsyncUdpSocket>>;
24 /// Look up the current time
25 ///
26 /// Allows simulating the flow of time for testing.
27 fn now(&self) -> Instant {
28 Instant::now()
29 }
30}
31
/// Runtime-independent interface to a resettable asynchronous timer
pub trait AsyncTimer: Send + Debug + 'static {
    /// Move the expiry of this timer to `i`
    fn reset(self: Pin<&mut Self>, i: Instant);

    /// Poll the timer for expiry
    ///
    /// Returns `Poll::Ready(())` once the timer has expired; otherwise the task behind `cx`
    /// is registered to be woken.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>;
}
39
40/// Abstract implementation of a UDP socket for runtime independence
41pub trait AsyncUdpSocket: Send + Sync + Debug + 'static {
42 /// Create a [`UdpSender`] that can register a single task for write-readiness notifications
43 /// and send a transmit, if ready.
44 ///
45 /// A `poll_send` method on a single object can usually store only one [`Waker`] at a time,
46 /// i.e. allow at most one caller to wait for an event. This method allows any number of
47 /// interested tasks to construct their own [`UdpSender`] object. They can all then wait for the
48 /// same event and be notified concurrently, because each [`UdpSender`] can store a separate
49 /// [`Waker`].
50 ///
51 /// [`Waker`]: std::task::Waker
52 fn create_sender(&self) -> Pin<Box<dyn UdpSender>>;
53
54 /// Receive UDP datagrams, or register to be woken if receiving may succeed in the future
55 fn poll_recv(
56 &mut self,
57 cx: &mut Context,
58 bufs: &mut [IoSliceMut<'_>],
59 meta: &mut [RecvMeta],
60 ) -> Poll<io::Result<usize>>;
61
62 /// Look up the local IP address and port used by this socket
63 fn local_addr(&self) -> io::Result<SocketAddr>;
64
65 /// Maximum number of datagrams that might be described by a single [`RecvMeta`]
66 fn max_receive_segments(&self) -> usize {
67 1
68 }
69
70 /// Whether datagrams might get fragmented into multiple parts
71 ///
72 /// Sockets should prevent this for best performance. See e.g. the `IPV6_DONTFRAG` socket
73 /// option.
74 fn may_fragment(&self) -> bool {
75 true
76 }
77}
78
79/// An object for asynchronously writing to an associated [`AsyncUdpSocket`].
80///
81/// Any number of [`UdpSender`]s may exist for a single [`AsyncUdpSocket`]. Each [`UdpSender`] is
82/// responsible for notifying at most one task for send readiness.
83pub trait UdpSender: Send + Sync + Debug + 'static {
84 /// Send a UDP datagram, or register to be woken if sending may succeed in the future.
85 ///
86 /// Usually implementations of this will poll the socket for writability before trying to
87 /// write to them, and retry both if writing fails.
88 ///
89 /// Quinn will create multiple [`UdpSender`]s, one for each task it's using it from. Thus it's
90 /// important to poll the underlying socket in a way that doesn't overwrite wakers.
91 ///
92 /// A single [`UdpSender`] will be re-used, even if `poll_send` returns `Poll::Ready` once,
93 /// unlike [`Future::poll`], so calling it again after readiness should not panic.
94 fn poll_send(
95 self: Pin<&mut Self>,
96 transmit: &Transmit,
97 cx: &mut Context,
98 ) -> Poll<io::Result<()>>;
99
100 /// Maximum number of datagrams that a [`Transmit`] may encode.
101 fn max_transmit_segments(&self) -> usize {
102 1
103 }
104
105 /// Try to send a UDP datagram, if the socket happens to be write-ready.
106 ///
107 /// This may fail with [`io::ErrorKind::WouldBlock`], if the socket is currently full.
108 ///
109 /// The quinn endpoint uses this function when sending
110 ///
111 /// - A version negotiation response due to an unknown version
112 /// - A `CLOSE` due to a malformed or unwanted connection attempt
113 /// - A stateless reset due to an unrecognized connection
114 /// - A `Retry` packet due to a connection attempt when `use_retry` is set
115 ///
116 /// If sending in these cases fails, a well-behaved peer will re-try. Thus it's fine
117 /// if we drop datagrams sometimes with this function.
118 fn try_send(self: Pin<&mut Self>, transmit: &Transmit) -> io::Result<()>;
119}
120
121pin_project_lite::pin_project! {
122 /// A helper for constructing [`UdpSender`]s from an underlying `Socket` type.
123 ///
124 /// This struct implements [`UdpSender`] if `MakeWritableFn` produces a `WritableFut`.
125 ///
126 /// Also serves as a trick, since `WritableFut` doesn't need to be a named future,
127 /// it can be an anonymous async block, as long as `MakeWritableFn` produces that
128 /// anonymous async block type.
129 ///
130 /// The `UdpSenderHelper` generic type parameters don't need to named, as it will be
131 /// used in its dyn-compatible form as a `Pin<Box<dyn UdpSender>>`.
132 pub struct UdpSenderHelper<Socket, MakeWritableFutFn, WritableFut> {
133 socket: Socket,
134 make_writable_fut_fn: MakeWritableFutFn,
135 #[pin]
136 writable_fut: Option<WritableFut>,
137 }
138}
139
140impl<Socket, MakeWritableFutFn, WritableFut> Debug
141 for UdpSenderHelper<Socket, MakeWritableFutFn, WritableFut>
142{
143 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
144 f.write_str("UdpSender")
145 }
146}
147
148impl<Socket, MakeWritableFutFn, WriteableFut>
149 UdpSenderHelper<Socket, MakeWritableFutFn, WriteableFut>
150{
151 /// Create helper that implements [`UdpSender`] from a socket.
152 ///
153 /// Additionally you need to provide what is essentially an async function
154 /// that resolves once the socket is write-ready.
155 ///
156 /// See also the bounds on this struct's [`UdpSender`] implementation.
157 pub fn new(inner: Socket, make_fut: MakeWritableFutFn) -> Self {
158 Self {
159 socket: inner,
160 make_writable_fut_fn: make_fut,
161 writable_fut: None,
162 }
163 }
164}
165
166impl<Socket, MakeWritableFutFn, WritableFut> super::UdpSender
167 for UdpSenderHelper<Socket, MakeWritableFutFn, WritableFut>
168where
169 Socket: UdpSenderHelperSocket,
170 MakeWritableFutFn: Fn(&Socket) -> WritableFut + Send + Sync + 'static,
171 WritableFut: Future<Output = io::Result<()>> + Send + Sync + 'static,
172{
173 fn poll_send(
174 self: Pin<&mut Self>,
175 transmit: &udp::Transmit,
176 cx: &mut Context,
177 ) -> Poll<io::Result<()>> {
178 let mut this = self.project();
179 loop {
180 if this.writable_fut.is_none() {
181 this.writable_fut
182 .set(Some((this.make_writable_fut_fn)(&this.socket)));
183 }
184 // We're forced to `unwrap` here because `Fut` may be `!Unpin`, which means we can't safely
185 // obtain an `&mut WritableFut` after storing it in `self.writable_fut` when `self` is already behind `Pin`,
186 // and if we didn't store it then we wouldn't be able to keep it alive between
187 // `poll_send` calls.
188 let result = ready!(this.writable_fut.as_mut().as_pin_mut().unwrap().poll(cx));
189
190 // Polling an arbitrary `Future` after it becomes ready is a logic error, so arrange for
191 // a new `Future` to be created on the next call.
192 this.writable_fut.set(None);
193
194 // If .writable() fails, propagate the error
195 result?;
196
197 let result = this.socket.try_send(transmit);
198
199 match result {
200 // We thought the socket was writable, but it wasn't, then retry so that either another
201 // `writable().await` call determines that the socket is indeed not writable and
202 // registers us for a wakeup, or the send succeeds if this really was just a
203 // transient failure.
204 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
205 // In all other cases, either propagate the error or we're Ok
206 _ => return Poll::Ready(result),
207 }
208 }
209 }
210
211 fn max_transmit_segments(&self) -> usize {
212 self.socket.max_transmit_segments()
213 }
214
215 fn try_send(self: Pin<&mut Self>, transmit: &udp::Transmit) -> io::Result<()> {
216 self.socket.try_send(transmit)
217 }
218}
219
220/// Parts of the [`UdpSender`] trait that aren't asynchronous or require storing wakers.
221///
222/// This trait is used by [`UdpSenderHelper`] to help construct [`UdpSender`]s.
223pub trait UdpSenderHelperSocket: Send + Sync + 'static {
224 /// Try to send a transmit, if the socket happens to be write-ready.
225 ///
226 /// Supposed to work identically to [`UdpSender::try_send`], see also its documentation.
227 fn try_send(&self, transmit: &udp::Transmit) -> io::Result<()>;
228
229 /// See [`UdpSender::max_transmit_segments`].
230 fn max_transmit_segments(&self) -> usize;
231}
232
233/// Automatically select an appropriate runtime from those enabled at compile time
234///
235/// If `runtime-tokio` is enabled and this function is called from within a Tokio runtime context,
236/// then `TokioRuntime` is returned. Otherwise, if `runtime-async-std` is enabled, `AsyncStdRuntime`
237/// is returned. Otherwise, if `runtime-smol` is enabled, `SmolRuntime` is returned.
238/// Otherwise, `None` is returned.
239#[allow(clippy::needless_return)] // Be sure we return the right thing
240pub fn default_runtime() -> Option<Arc<dyn Runtime>> {
241 #[cfg(feature = "runtime-tokio")]
242 {
243 if ::tokio::runtime::Handle::try_current().is_ok() {
244 return Some(Arc::new(TokioRuntime));
245 }
246 }
247
248 #[cfg(feature = "runtime-async-std")]
249 {
250 return Some(Arc::new(AsyncStdRuntime));
251 }
252
253 #[cfg(all(feature = "runtime-smol", not(feature = "runtime-async-std")))]
254 {
255 return Some(Arc::new(SmolRuntime));
256 }
257
258 #[cfg(not(any(feature = "runtime-async-std", feature = "runtime-smol")))]
259 None
260}
261
262#[cfg(feature = "runtime-tokio")]
263mod tokio;
264// Due to MSRV, we must specify `self::` where there's crate/module ambiguity
265#[cfg(feature = "runtime-tokio")]
266pub use self::tokio::TokioRuntime;
267
268#[cfg(feature = "async-io")]
269mod async_io;
270// Due to MSRV, we must specify `self::` where there's crate/module ambiguity
271#[cfg(feature = "async-io")]
272pub use self::async_io::*;