ant_quic/high_level/
runtime.rs

1// Copyright 2024 Saorsa Labs Ltd.
2//
3// This Saorsa Network Software is licensed under the General Public License (GPL), version 3.
4// Please see the file LICENSE-GPL, or visit <http://www.gnu.org/licenses/> for the full text.
5//
6// Full details available at https://saorsalabs.com/licenses
7
8use std::{
9    fmt::Debug,
10    future::Future,
11    io::{self, IoSliceMut},
12    net::SocketAddr,
13    pin::Pin,
14    sync::Arc,
15    task::{Context, Poll},
16};
17
18use quinn_udp::{RecvMeta, Transmit};
19use tracing::error;
20
21use crate::Instant;
22
/// Abstracts I/O and timer operations for runtime independence
///
/// An implementation supplies three services: timers ([`Runtime::new_timer`]), background task
/// execution ([`Runtime::spawn`]) and UDP socket wrapping (`wrap_udp_socket`). Implementations
/// must be `Send + Sync + Debug + 'static`; [`default_runtime`] hands them out as
/// `Arc<dyn Runtime>`.
pub trait Runtime: Send + Sync + Debug + 'static {
    /// Construct a timer that will expire at `i`
    ///
    /// The timer is returned as a pinned, boxed [`AsyncTimer`] trait object.
    fn new_timer(&self, i: Instant) -> Pin<Box<dyn AsyncTimer>>;
    /// Drive `future` to completion in the background
    ///
    /// The future must be `Send` and produces no value.
    fn spawn(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>);
    /// Convert `t` into the socket type used by this runtime
    ///
    /// Only declared when the `wasm_browser` cfg is not set. On success the socket is returned as
    /// a shared [`AsyncUdpSocket`] trait object; failures are reported as an [`io::Error`].
    #[cfg(not(wasm_browser))]
    fn wrap_udp_socket(&self, t: std::net::UdpSocket) -> io::Result<Arc<dyn AsyncUdpSocket>>;
    /// Look up the current time
    ///
    /// Allows simulating the flow of time for testing. The default implementation simply returns
    /// `Instant::now()`.
    fn now(&self) -> Instant {
        Instant::now()
    }
}
39
/// Abstract implementation of an async timer for runtime independence
///
/// Timers are produced by [`Runtime::new_timer`] and are always accessed through a pinned
/// reference.
pub trait AsyncTimer: Send + Debug + 'static {
    /// Update the timer to expire at `i`
    fn reset(self: Pin<&mut Self>, i: Instant);
    /// Check whether the timer has expired, and register to be woken if not
    ///
    /// Returns [`Poll::Ready`] once the timer has expired and [`Poll::Pending`] otherwise.
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()>;
}
47
/// Abstract implementation of a UDP socket for runtime independence
///
/// Sending is split in two: [`AsyncUdpSocket::try_send`] attempts a send without registering a
/// waker, and a [`UdpPoller`] obtained from [`AsyncUdpSocket::create_io_poller`] is used to wait
/// for write-readiness. Receiving is done through [`AsyncUdpSocket::poll_recv`].
pub trait AsyncUdpSocket: Send + Sync + Debug + 'static {
    /// Create a [`UdpPoller`] that can register a single task for write-readiness notifications
    ///
    /// A `poll_send` method on a single object can usually store only one [`Waker`] at a time,
    /// i.e. allow at most one caller to wait for an event. This method allows any number of
    /// interested tasks to construct their own [`UdpPoller`] object. They can all then wait for the
    /// same event and be notified concurrently, because each [`UdpPoller`] can store a separate
    /// [`Waker`].
    ///
    /// [`Waker`]: std::task::Waker
    fn create_io_poller(self: Arc<Self>) -> Pin<Box<dyn UdpPoller>>;

    /// Send the UDP datagrams described by `transmit`, or return `WouldBlock` and clear the
    /// underlying socket's readiness, or return an I/O error
    ///
    /// If this returns [`io::ErrorKind::WouldBlock`], [`UdpPoller::poll_writable`] must be called
    /// to register the calling task to be woken when a send should be attempted again.
    fn try_send(&self, transmit: &Transmit) -> io::Result<()>;

    /// Receive UDP datagrams, or register to be woken if receiving may succeed in the future
    ///
    /// Received payloads are written into `bufs` and their metadata into `meta`.
    // NOTE(review): the `usize` in the result is presumably the number of `meta`/`bufs` entries
    // that were filled -- confirm against the concrete socket implementations.
    fn poll_recv(
        &self,
        cx: &mut Context,
        bufs: &mut [IoSliceMut<'_>],
        meta: &mut [RecvMeta],
    ) -> Poll<io::Result<usize>>;

    /// Look up the local IP address and port used by this socket
    fn local_addr(&self) -> io::Result<SocketAddr>;

    /// Maximum number of datagrams that a [`Transmit`] may encode
    ///
    /// Defaults to `1`.
    fn max_transmit_segments(&self) -> usize {
        1
    }

    /// Maximum number of datagrams that might be described by a single [`RecvMeta`]
    ///
    /// Defaults to `1`.
    fn max_receive_segments(&self) -> usize {
        1
    }

    /// Whether datagrams might get fragmented into multiple parts
    ///
    /// Sockets should prevent this for best performance. See e.g. the `IPV6_DONTFRAG` socket
    /// option.
    ///
    /// Defaults to `true`, the conservative answer for implementations that do not override it.
    fn may_fragment(&self) -> bool {
        true
    }
}
97
/// An object polled to detect when an associated [`AsyncUdpSocket`] is writable
///
/// Any number of `UdpPoller`s may exist for a single [`AsyncUdpSocket`]. Each `UdpPoller` is
/// responsible for notifying at most one task when that socket becomes writable.
///
/// Instances are created with [`AsyncUdpSocket::create_io_poller`].
pub trait UdpPoller: Send + Sync + Debug + 'static {
    /// Check whether the associated socket is likely to be writable
    ///
    /// Must be called after [`AsyncUdpSocket::try_send`] returns [`io::ErrorKind::WouldBlock`] to
    /// register the task associated with `cx` to be woken when a send should be attempted
    /// again. Unlike in [`Future::poll`], a [`UdpPoller`] may be reused indefinitely no matter how
    /// many times `poll_writable` returns [`Poll::Ready`].
    ///
    /// A ready result carries an [`io::Result`], so readiness checks can themselves report an
    /// I/O error.
    fn poll_writable(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>;
}
111
pin_project_lite::pin_project! {
    /// Helper adapting a function `MakeFut` that constructs a single-use future `Fut` into a
    /// [`UdpPoller`] that may be reused indefinitely
    struct UdpPollHelper<MakeFut, Fut> {
        // Factory invoked whenever a fresh readiness future is needed.
        make_fut: MakeFut,
        // The in-flight readiness future, if any. It is structurally pinned (`#[pin]`) so that a
        // `Fut` that is not `Unpin` can be polled in place.
        #[pin]
        fut: Option<Fut>,
    }
}
121
122impl<MakeFut, Fut> UdpPollHelper<MakeFut, Fut> {
123    /// Construct a [`UdpPoller`] that calls `make_fut` to get the future to poll, storing it until
124    /// it yields [`Poll::Ready`], then creating a new one on the next
125    /// [`poll_writable`](UdpPoller::poll_writable)
126    #[cfg(any(
127        feature = "runtime-async-std",
128        feature = "runtime-smol",
129        feature = "runtime-tokio",
130    ))]
131    fn new(make_fut: MakeFut) -> Self {
132        Self {
133            make_fut,
134            fut: None,
135        }
136    }
137}
138
139impl<MakeFut, Fut> UdpPoller for UdpPollHelper<MakeFut, Fut>
140where
141    MakeFut: Fn() -> Fut + Send + Sync + 'static,
142    Fut: Future<Output = io::Result<()>> + Send + Sync + 'static,
143{
144    fn poll_writable(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
145        let mut this = self.project();
146        if this.fut.is_none() {
147            this.fut.set(Some((this.make_fut)()));
148        }
149        // We're forced to use expect here because `Fut` may be `!Unpin`, which means we can't safely
150        // obtain an `&mut Fut` after storing it in `self.fut` when `self` is already behind `Pin`,
151        // and if we didn't store it then we wouldn't be able to keep it alive between
152        // `poll_writable` calls.
153        let result = match this.fut.as_mut().as_pin_mut() {
154            Some(fut) => fut.poll(cx),
155            None => {
156                error!("Future not set when UdpPollHelper is polled");
157                Poll::Ready(Err(std::io::Error::other("Future not set")))
158            }
159        };
160        if result.is_ready() {
161            // Polling an arbitrary `Future` after it becomes ready is a logic error, so arrange for
162            // a new `Future` to be created on the next call.
163            this.fut.set(None);
164        }
165        result
166    }
167}
168
169impl<MakeFut, Fut> Debug for UdpPollHelper<MakeFut, Fut> {
170    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
171        f.debug_struct("UdpPollHelper").finish_non_exhaustive()
172    }
173}
174
/// Automatically select an appropriate runtime from those enabled at compile time
///
/// If `runtime-tokio` is enabled and this function is called from within a Tokio runtime context,
/// then `TokioRuntime` is returned. Otherwise, if `runtime-async-std` is enabled, `AsyncStdRuntime`
/// is returned. Otherwise, if `runtime-smol` is enabled, `SmolRuntime` is returned.
/// Otherwise, `None` is returned.
///
/// In particular, with only `runtime-tokio` enabled, `None` is returned when no Tokio runtime
/// context is active at the call site.
#[allow(clippy::needless_return)] // Be sure we return the right thing
pub fn default_runtime() -> Option<Arc<dyn Runtime>> {
    // Tokio is only chosen when a runtime handle is actually available on this thread.
    #[cfg(feature = "runtime-tokio")]
    {
        if ::tokio::runtime::Handle::try_current().is_ok() {
            return Some(Arc::new(TokioRuntime));
        }
    }

    // async-std takes precedence over smol when both features are enabled.
    #[cfg(feature = "runtime-async-std")]
    {
        return Some(Arc::new(AsyncStdRuntime));
    }

    #[cfg(all(feature = "runtime-smol", not(feature = "runtime-async-std")))]
    {
        return Some(Arc::new(SmolRuntime));
    }

    // Tail expression; compiled only when neither unconditional `return` above exists.
    #[cfg(not(any(feature = "runtime-async-std", feature = "runtime-smol")))]
    None
}
203
// Tokio-backed runtime, compiled only with the `runtime-tokio` feature.
#[cfg(feature = "runtime-tokio")]
mod tokio;
// Due to MSRV, we must specify `self::` where there's crate/module ambiguity
#[cfg(feature = "runtime-tokio")]
pub use self::tokio::TokioRuntime;

// The `async_io` module is compiled when either the `runtime-smol` or the `runtime-async-std`
// feature is enabled; all of its public items are re-exported from here.
#[cfg(any(feature = "runtime-smol", feature = "runtime-async-std"))]
mod async_io;
// Due to MSRV, we must specify `self::` where there's crate/module ambiguity
#[cfg(any(feature = "runtime-smol", feature = "runtime-async-std"))]
pub use self::async_io::*;