// iroh_quinn/runtime.rs

1use std::{
2    fmt::Debug,
3    future::Future,
4    io::{self, IoSliceMut},
5    net::SocketAddr,
6    pin::Pin,
7    sync::Arc,
8    task::{Context, Poll},
9};
10
11use udp::{RecvMeta, Transmit};
12
13use crate::Instant;
14
15/// Abstracts I/O and timer operations for runtime independence
16pub trait Runtime: Send + Sync + Debug + 'static {
17    /// Construct a timer that will expire at `i`
18    fn new_timer(&self, i: Instant) -> Pin<Box<dyn AsyncTimer>>;
19    /// Drive `future` to completion in the background
20    fn spawn(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>);
21    /// Convert `t` into the socket type used by this runtime
22    #[cfg(not(wasm_browser))]
23    fn wrap_udp_socket(&self, t: std::net::UdpSocket) -> io::Result<Box<dyn AsyncUdpSocket>>;
24    /// Look up the current time
25    ///
26    /// Allows simulating the flow of time for testing.
27    fn now(&self) -> Instant {
28        Instant::now()
29    }
30}
31
32/// Abstract implementation of an async timer for runtime independence
33pub trait AsyncTimer: Send + Debug + 'static {
34    /// Update the timer to expire at `i`
35    fn reset(self: Pin<&mut Self>, i: Instant);
36    /// Check whether the timer has expired, and register to be woken if not
37    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()>;
38}
39
40/// Abstract implementation of a UDP socket for runtime independence
41pub trait AsyncUdpSocket: Send + Sync + Debug + 'static {
42    /// Create a [`UdpSender`] that can register a single task for write-readiness notifications
43    /// and send a transmit, if ready.
44    ///
45    /// A `poll_send` method on a single object can usually store only one [`Waker`] at a time,
46    /// i.e. allow at most one caller to wait for an event. This method allows any number of
47    /// interested tasks to construct their own [`UdpSender`] object. They can all then wait for the
48    /// same event and be notified concurrently, because each [`UdpSender`] can store a separate
49    /// [`Waker`].
50    ///
51    /// [`Waker`]: std::task::Waker
52    fn create_sender(&self) -> Pin<Box<dyn UdpSender>>;
53
54    /// Receive UDP datagrams, or register to be woken if receiving may succeed in the future
55    fn poll_recv(
56        &mut self,
57        cx: &mut Context,
58        bufs: &mut [IoSliceMut<'_>],
59        meta: &mut [RecvMeta],
60    ) -> Poll<io::Result<usize>>;
61
62    /// Look up the local IP address and port used by this socket
63    fn local_addr(&self) -> io::Result<SocketAddr>;
64
65    /// Maximum number of datagrams that might be described by a single [`RecvMeta`]
66    fn max_receive_segments(&self) -> usize {
67        1
68    }
69
70    /// Whether datagrams might get fragmented into multiple parts
71    ///
72    /// Sockets should prevent this for best performance. See e.g. the `IPV6_DONTFRAG` socket
73    /// option.
74    fn may_fragment(&self) -> bool {
75        true
76    }
77}
78
79/// An object for asynchronously writing to an associated [`AsyncUdpSocket`].
80///
81/// Any number of [`UdpSender`]s may exist for a single [`AsyncUdpSocket`]. Each [`UdpSender`] is
82/// responsible for notifying at most one task for send readiness.
83pub trait UdpSender: Send + Sync + Debug + 'static {
84    /// Send a UDP datagram, or register to be woken if sending may succeed in the future.
85    ///
86    /// Usually implementations of this will poll the socket for writability before trying to
87    /// write to them, and retry both if writing fails.
88    ///
89    /// Quinn will create multiple [`UdpSender`]s, one for each task it's using it from. Thus it's
90    /// important to poll the underlying socket in a way that doesn't overwrite wakers.
91    ///
92    /// A single [`UdpSender`] will be re-used, even if `poll_send` returns `Poll::Ready` once,
93    /// unlike [`Future::poll`], so calling it again after readiness should not panic.
94    fn poll_send(
95        self: Pin<&mut Self>,
96        transmit: &Transmit,
97        cx: &mut Context,
98    ) -> Poll<io::Result<()>>;
99
100    /// Maximum number of datagrams that a [`Transmit`] may encode.
101    fn max_transmit_segments(&self) -> usize {
102        1
103    }
104
105    /// Try to send a UDP datagram, if the socket happens to be write-ready.
106    ///
107    /// This may fail with [`io::ErrorKind::WouldBlock`], if the socket is currently full.
108    ///
109    /// The quinn endpoint uses this function when sending
110    ///
111    /// - A version negotiation response due to an unknown version
112    /// - A `CLOSE` due to a malformed or unwanted connection attempt
113    /// - A stateless reset due to an unrecognized connection
114    /// - A `Retry` packet due to a connection attempt when `use_retry` is set
115    ///
116    /// If sending in these cases fails, a well-behaved peer will re-try. Thus it's fine
117    /// if we drop datagrams sometimes with this function.
118    fn try_send(self: Pin<&mut Self>, transmit: &Transmit) -> io::Result<()>;
119}
120
121pin_project_lite::pin_project! {
122    /// A helper for constructing [`UdpSender`]s from an underlying `Socket` type.
123    ///
124    /// This struct implements [`UdpSender`] if `MakeWritableFn` produces a `WritableFut`.
125    ///
126    /// Also serves as a trick, since `WritableFut` doesn't need to be a named future,
127    /// it can be an anonymous async block, as long as `MakeWritableFn` produces that
128    /// anonymous async block type.
129    ///
130    /// The `UdpSenderHelper` generic type parameters don't need to named, as it will be
131    /// used in its dyn-compatible form as a `Pin<Box<dyn UdpSender>>`.
132    pub struct UdpSenderHelper<Socket, MakeWritableFutFn, WritableFut> {
133        socket: Socket,
134        make_writable_fut_fn: MakeWritableFutFn,
135        #[pin]
136        writable_fut: Option<WritableFut>,
137    }
138}
139
140impl<Socket, MakeWritableFutFn, WritableFut> Debug
141    for UdpSenderHelper<Socket, MakeWritableFutFn, WritableFut>
142{
143    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
144        f.write_str("UdpSender")
145    }
146}
147
148impl<Socket, MakeWritableFutFn, WriteableFut>
149    UdpSenderHelper<Socket, MakeWritableFutFn, WriteableFut>
150{
151    /// Create helper that implements [`UdpSender`] from a socket.
152    ///
153    /// Additionally you need to provide what is essentially an async function
154    /// that resolves once the socket is write-ready.
155    ///
156    /// See also the bounds on this struct's [`UdpSender`] implementation.
157    pub fn new(inner: Socket, make_fut: MakeWritableFutFn) -> Self {
158        Self {
159            socket: inner,
160            make_writable_fut_fn: make_fut,
161            writable_fut: None,
162        }
163    }
164}
165
166impl<Socket, MakeWritableFutFn, WritableFut> super::UdpSender
167    for UdpSenderHelper<Socket, MakeWritableFutFn, WritableFut>
168where
169    Socket: UdpSenderHelperSocket,
170    MakeWritableFutFn: Fn(&Socket) -> WritableFut + Send + Sync + 'static,
171    WritableFut: Future<Output = io::Result<()>> + Send + Sync + 'static,
172{
173    fn poll_send(
174        self: Pin<&mut Self>,
175        transmit: &udp::Transmit,
176        cx: &mut Context,
177    ) -> Poll<io::Result<()>> {
178        let mut this = self.project();
179        loop {
180            if this.writable_fut.is_none() {
181                this.writable_fut
182                    .set(Some((this.make_writable_fut_fn)(&this.socket)));
183            }
184            // We're forced to `unwrap` here because `Fut` may be `!Unpin`, which means we can't safely
185            // obtain an `&mut WritableFut` after storing it in `self.writable_fut` when `self` is already behind `Pin`,
186            // and if we didn't store it then we wouldn't be able to keep it alive between
187            // `poll_send` calls.
188            let result = ready!(this.writable_fut.as_mut().as_pin_mut().unwrap().poll(cx));
189
190            // Polling an arbitrary `Future` after it becomes ready is a logic error, so arrange for
191            // a new `Future` to be created on the next call.
192            this.writable_fut.set(None);
193
194            // If .writable() fails, propagate the error
195            result?;
196
197            let result = this.socket.try_send(transmit);
198
199            match result {
200                // We thought the socket was writable, but it wasn't, then retry so that either another
201                // `writable().await` call determines that the socket is indeed not writable and
202                // registers us for a wakeup, or the send succeeds if this really was just a
203                // transient failure.
204                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
205                // In all other cases, either propagate the error or we're Ok
206                _ => return Poll::Ready(result),
207            }
208        }
209    }
210
211    fn max_transmit_segments(&self) -> usize {
212        self.socket.max_transmit_segments()
213    }
214
215    fn try_send(self: Pin<&mut Self>, transmit: &udp::Transmit) -> io::Result<()> {
216        self.socket.try_send(transmit)
217    }
218}
219
220/// Parts of the [`UdpSender`] trait that aren't asynchronous or require storing wakers.
221///
222/// This trait is used by [`UdpSenderHelper`] to help construct [`UdpSender`]s.
223pub trait UdpSenderHelperSocket: Send + Sync + 'static {
224    /// Try to send a transmit, if the socket happens to be write-ready.
225    ///
226    /// Supposed to work identically to [`UdpSender::try_send`], see also its documentation.
227    fn try_send(&self, transmit: &udp::Transmit) -> io::Result<()>;
228
229    /// See [`UdpSender::max_transmit_segments`].
230    fn max_transmit_segments(&self) -> usize;
231}
232
233/// Automatically select an appropriate runtime from those enabled at compile time
234///
235/// If `runtime-tokio` is enabled and this function is called from within a Tokio runtime context,
236/// then `TokioRuntime` is returned. Otherwise, if `runtime-async-std` is enabled, `AsyncStdRuntime`
237/// is returned. Otherwise, if `runtime-smol` is enabled, `SmolRuntime` is returned.
238/// Otherwise, `None` is returned.
239#[allow(clippy::needless_return)] // Be sure we return the right thing
240pub fn default_runtime() -> Option<Arc<dyn Runtime>> {
241    #[cfg(feature = "runtime-tokio")]
242    {
243        if ::tokio::runtime::Handle::try_current().is_ok() {
244            return Some(Arc::new(TokioRuntime));
245        }
246    }
247
248    #[cfg(feature = "runtime-async-std")]
249    {
250        return Some(Arc::new(AsyncStdRuntime));
251    }
252
253    #[cfg(all(feature = "runtime-smol", not(feature = "runtime-async-std")))]
254    {
255        return Some(Arc::new(SmolRuntime));
256    }
257
258    #[cfg(not(any(feature = "runtime-async-std", feature = "runtime-smol")))]
259    None
260}
261
#[cfg(feature = "runtime-tokio")]
mod tokio;
// Due to MSRV, we must specify `self::` where there's crate/module ambiguity
#[cfg(feature = "runtime-tokio")]
pub use self::tokio::TokioRuntime;

#[cfg(feature = "async-io")]
mod async_io;
// Due to MSRV, we must specify `self::` where there's crate/module ambiguity
#[cfg(feature = "async-io")]
pub use self::async_io::*;