iroh_quinn/runtime/
tokio.rs

1use std::{
2    fmt::Debug,
3    future::Future,
4    io,
5    pin::Pin,
6    sync::Arc,
7    task::{Context, Poll},
8    time::Instant,
9};
10
11use tokio::{
12    io::Interest,
13    time::{sleep_until, Sleep},
14};
15
16use super::{AsyncTimer, AsyncUdpSocket, Runtime, UdpSenderHelper, UdpSenderHelperSocket};
17
/// Quinn [`Runtime`] implementation backed by Tokio.
///
/// A stateless unit type: its [`Runtime`] impl forwards timer creation, task spawning,
/// UDP socket wrapping and clock reads to the `tokio` crate.
#[derive(Debug)]
pub struct TokioRuntime;
21
22impl Runtime for TokioRuntime {
23    fn new_timer(&self, t: Instant) -> Pin<Box<dyn AsyncTimer>> {
24        Box::pin(sleep_until(t.into()))
25    }
26
27    fn spawn(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
28        tokio::spawn(future);
29    }
30
31    fn wrap_udp_socket(&self, sock: std::net::UdpSocket) -> io::Result<Box<dyn AsyncUdpSocket>> {
32        Ok(Box::new(UdpSocket {
33            inner: Arc::new(udp::UdpSocketState::new((&sock).into())?),
34            io: Arc::new(tokio::net::UdpSocket::from_std(sock)?),
35        }))
36    }
37
38    fn now(&self) -> Instant {
39        tokio::time::Instant::now().into_std()
40    }
41}
42
43impl AsyncTimer for Sleep {
44    fn reset(self: Pin<&mut Self>, t: Instant) {
45        Self::reset(self, t.into())
46    }
47    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
48        Future::poll(self, cx)
49    }
50}
51
52#[derive(Debug, Clone)]
53struct UdpSocket {
54    io: Arc<tokio::net::UdpSocket>,
55    inner: Arc<udp::UdpSocketState>,
56}
57
58impl UdpSenderHelperSocket for UdpSocket {
59    fn max_transmit_segments(&self) -> usize {
60        self.inner.max_gso_segments()
61    }
62
63    fn try_send(&self, transmit: &udp::Transmit) -> io::Result<()> {
64        self.io.try_io(Interest::WRITABLE, || {
65            self.inner.send((&self.io).into(), transmit)
66        })
67    }
68}
69
70impl AsyncUdpSocket for UdpSocket {
71    fn create_sender(&self) -> Pin<Box<dyn super::UdpSender>> {
72        Box::pin(UdpSenderHelper::new(self.clone(), |socket: &UdpSocket| {
73            let socket = socket.clone();
74            async move { socket.io.writable().await }
75        }))
76    }
77
78    fn poll_recv(
79        &mut self,
80        cx: &mut Context,
81        bufs: &mut [std::io::IoSliceMut<'_>],
82        meta: &mut [udp::RecvMeta],
83    ) -> Poll<io::Result<usize>> {
84        loop {
85            ready!(self.io.poll_recv_ready(cx))?;
86            if let Ok(res) = self.io.try_io(Interest::READABLE, || {
87                self.inner.recv((&self.io).into(), bufs, meta)
88            }) {
89                return Poll::Ready(Ok(res));
90            }
91        }
92    }
93
94    fn local_addr(&self) -> io::Result<std::net::SocketAddr> {
95        self.io.local_addr()
96    }
97
98    fn may_fragment(&self) -> bool {
99        self.inner.may_fragment()
100    }
101
102    fn max_receive_segments(&self) -> usize {
103        self.inner.gro_segments()
104    }
105}