iroh_quinn/runtime/
async_io.rs

1use std::{
2    future::Future,
3    io,
4    pin::Pin,
5    sync::Arc,
6    task::{Context, Poll},
7    time::Instant,
8};
9
10use async_io::{Async, Timer};
11
12use super::{AsyncTimer, AsyncUdpSocket, Runtime, UdpSenderHelper};
13
14#[cfg(feature = "smol")]
15// Due to MSRV, we must specify `self::` where there's crate/module ambiguity
16pub use self::smol::SmolRuntime;
17
18#[cfg(feature = "smol")]
19mod smol {
20    use super::*;
21
22    /// A Quinn runtime for smol
23    #[derive(Debug)]
24    pub struct SmolRuntime;
25
26    impl Runtime for SmolRuntime {
27        fn new_timer(&self, t: Instant) -> Pin<Box<dyn AsyncTimer>> {
28            Box::pin(Timer::at(t))
29        }
30
31        fn spawn(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
32            ::smol::spawn(future).detach();
33        }
34
35        fn wrap_udp_socket(
36            &self,
37            sock: std::net::UdpSocket,
38        ) -> io::Result<Box<dyn AsyncUdpSocket>> {
39            Ok(Box::new(UdpSocket::new(sock)?))
40        }
41    }
42}
43
44#[cfg(feature = "async-std")]
45// Due to MSRV, we must specify `self::` where there's crate/module ambiguity
46pub use self::async_std::AsyncStdRuntime;
47
48#[cfg(feature = "async-std")]
49mod async_std {
50    use super::*;
51
52    /// A Quinn runtime for async-std
53    #[derive(Debug)]
54    pub struct AsyncStdRuntime;
55
56    impl Runtime for AsyncStdRuntime {
57        fn new_timer(&self, t: Instant) -> Pin<Box<dyn AsyncTimer>> {
58            Box::pin(Timer::at(t))
59        }
60
61        fn spawn(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
62            ::async_std::task::spawn(future);
63        }
64
65        fn wrap_udp_socket(
66            &self,
67            sock: std::net::UdpSocket,
68        ) -> io::Result<Box<dyn AsyncUdpSocket>> {
69            Ok(Box::new(UdpSocket::new(sock)?))
70        }
71    }
72}
73
74impl AsyncTimer for Timer {
75    fn reset(mut self: Pin<&mut Self>, t: Instant) {
76        self.set_at(t)
77    }
78
79    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
80        Future::poll(self, cx).map(|_| ())
81    }
82}
83
84#[derive(Debug, Clone)]
85struct UdpSocket {
86    io: Arc<Async<std::net::UdpSocket>>,
87    inner: Arc<udp::UdpSocketState>,
88}
89
90impl UdpSocket {
91    fn new(sock: std::net::UdpSocket) -> io::Result<Self> {
92        Ok(Self {
93            inner: Arc::new(udp::UdpSocketState::new((&sock).into())?),
94            io: Arc::new(Async::new_nonblocking(sock)?),
95        })
96    }
97}
98
99impl super::UdpSenderHelperSocket for UdpSocket {
100    fn max_transmit_segments(&self) -> usize {
101        self.inner.max_gso_segments()
102    }
103
104    fn try_send(&self, transmit: &udp::Transmit) -> io::Result<()> {
105        self.inner.send((&self.io).into(), transmit)
106    }
107}
108
109impl AsyncUdpSocket for UdpSocket {
110    fn create_sender(&self) -> Pin<Box<dyn super::UdpSender>> {
111        Box::pin(UdpSenderHelper::new(self.clone(), |socket: &UdpSocket| {
112            let socket = socket.clone();
113            async move { socket.io.writable().await }
114        }))
115    }
116
117    fn poll_recv(
118        &mut self,
119        cx: &mut Context,
120        bufs: &mut [io::IoSliceMut<'_>],
121        meta: &mut [udp::RecvMeta],
122    ) -> Poll<io::Result<usize>> {
123        loop {
124            ready!(self.io.poll_readable(cx))?;
125            if let Ok(res) = self.inner.recv((&self.io).into(), bufs, meta) {
126                return Poll::Ready(Ok(res));
127            }
128        }
129    }
130
131    fn local_addr(&self) -> io::Result<std::net::SocketAddr> {
132        self.io.as_ref().as_ref().local_addr()
133    }
134
135    fn may_fragment(&self) -> bool {
136        self.inner.may_fragment()
137    }
138
139    fn max_receive_segments(&self) -> usize {
140        self.inner.gro_segments()
141    }
142}