iroh_quinn/runtime/
tokio.rs1use std::{
2 fmt::Debug,
3 future::Future,
4 io,
5 pin::Pin,
6 sync::Arc,
7 task::{Context, Poll},
8 time::Instant,
9};
10
11use tokio::{
12 io::Interest,
13 time::{sleep_until, Sleep},
14};
15
16use super::{AsyncTimer, AsyncUdpSocket, Runtime, UdpSenderHelper, UdpSenderHelperSocket};
17
18#[derive(Debug)]
20pub struct TokioRuntime;
21
22impl Runtime for TokioRuntime {
23 fn new_timer(&self, t: Instant) -> Pin<Box<dyn AsyncTimer>> {
24 Box::pin(sleep_until(t.into()))
25 }
26
27 fn spawn(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
28 tokio::spawn(future);
29 }
30
31 fn wrap_udp_socket(&self, sock: std::net::UdpSocket) -> io::Result<Box<dyn AsyncUdpSocket>> {
32 Ok(Box::new(UdpSocket {
33 inner: Arc::new(udp::UdpSocketState::new((&sock).into())?),
34 io: Arc::new(tokio::net::UdpSocket::from_std(sock)?),
35 }))
36 }
37
38 fn now(&self) -> Instant {
39 tokio::time::Instant::now().into_std()
40 }
41}
42
43impl AsyncTimer for Sleep {
44 fn reset(self: Pin<&mut Self>, t: Instant) {
45 Self::reset(self, t.into())
46 }
47 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
48 Future::poll(self, cx)
49 }
50}
51
52#[derive(Debug, Clone)]
53struct UdpSocket {
54 io: Arc<tokio::net::UdpSocket>,
55 inner: Arc<udp::UdpSocketState>,
56}
57
58impl UdpSenderHelperSocket for UdpSocket {
59 fn max_transmit_segments(&self) -> usize {
60 self.inner.max_gso_segments()
61 }
62
63 fn try_send(&self, transmit: &udp::Transmit) -> io::Result<()> {
64 self.io.try_io(Interest::WRITABLE, || {
65 self.inner.send((&self.io).into(), transmit)
66 })
67 }
68}
69
70impl AsyncUdpSocket for UdpSocket {
71 fn create_sender(&self) -> Pin<Box<dyn super::UdpSender>> {
72 Box::pin(UdpSenderHelper::new(self.clone(), |socket: &UdpSocket| {
73 let socket = socket.clone();
74 async move { socket.io.writable().await }
75 }))
76 }
77
78 fn poll_recv(
79 &mut self,
80 cx: &mut Context,
81 bufs: &mut [std::io::IoSliceMut<'_>],
82 meta: &mut [udp::RecvMeta],
83 ) -> Poll<io::Result<usize>> {
84 loop {
85 ready!(self.io.poll_recv_ready(cx))?;
86 if let Ok(res) = self.io.try_io(Interest::READABLE, || {
87 self.inner.recv((&self.io).into(), bufs, meta)
88 }) {
89 return Poll::Ready(Ok(res));
90 }
91 }
92 }
93
94 fn local_addr(&self) -> io::Result<std::net::SocketAddr> {
95 self.io.local_addr()
96 }
97
98 fn may_fragment(&self) -> bool {
99 self.inner.may_fragment()
100 }
101
102 fn max_receive_segments(&self) -> usize {
103 self.inner.gro_segments()
104 }
105}