ant_quic/quinn_high_level/runtime/
tokio.rs1use std::{
2 future::Future,
3 io,
4 net::SocketAddr,
5 pin::Pin,
6 sync::Arc,
7 task::{Context, Poll},
8};
9
10use tokio::{
11 io::ReadBuf,
12 time::{sleep_until, Sleep},
13};
14
15use super::{AsyncTimer, AsyncUdpSocket, Runtime, UdpPoller, UdpPollHelper};
16use crate::Instant;
17
18#[derive(Debug)]
20pub struct TokioRuntime;
21
22impl Runtime for TokioRuntime {
23 fn new_timer(&self, i: Instant) -> Pin<Box<dyn AsyncTimer>> {
24 Box::pin(TokioTimer(Box::pin(sleep_until(i.into()))))
25 }
26
27 fn spawn(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
28 tokio::spawn(future);
29 }
30
31 fn wrap_udp_socket(&self, t: std::net::UdpSocket) -> io::Result<Arc<dyn AsyncUdpSocket>> {
32 t.set_nonblocking(true)?;
33 Ok(Arc::new(UdpSocket {
34 inner: tokio::net::UdpSocket::from_std(t)?,
35 may_fragment: true, }))
37 }
38
39 fn now(&self) -> Instant {
40 Instant::from(tokio::time::Instant::now())
41 }
42}
43
44#[derive(Debug)]
46struct TokioTimer(Pin<Box<Sleep>>);
47
48impl AsyncTimer for TokioTimer {
49 fn reset(mut self: Pin<&mut Self>, i: Instant) {
50 self.0.as_mut().reset(i.into())
51 }
52
53 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
54 self.0.as_mut().poll(cx).map(|_| ())
55 }
56}
57
58#[derive(Debug)]
60struct UdpSocket {
61 inner: tokio::net::UdpSocket,
62 may_fragment: bool,
63}
64
65impl AsyncUdpSocket for UdpSocket {
66 fn create_io_poller(self: Arc<Self>) -> Pin<Box<dyn UdpPoller>> {
67 Box::pin(UdpPollHelper::new(move || {
68 let socket = self.clone();
69 async move {
70 loop {
71 socket.inner.writable().await?;
72 return Ok(());
73 }
74 }
75 }))
76 }
77
78 fn try_send(&self, transmit: &quinn_udp::Transmit) -> io::Result<()> {
79 self.inner.try_send_to(&transmit.contents, transmit.destination)?;
80 Ok(())
81 }
82
83 fn poll_recv(
84 &self,
85 cx: &mut Context,
86 bufs: &mut [std::io::IoSliceMut<'_>],
87 meta: &mut [quinn_udp::RecvMeta],
88 ) -> Poll<io::Result<usize>> {
89 if bufs.is_empty() || meta.is_empty() {
93 return Poll::Ready(Ok(0));
94 }
95
96 let mut buf = ReadBuf::new(&mut bufs[0]);
97 let addr = match self.inner.poll_recv_from(cx, &mut buf) {
98 Poll::Ready(Ok(addr)) => addr,
99 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
100 Poll::Pending => return Poll::Pending,
101 };
102
103 let len = buf.filled().len();
104 meta[0] = quinn_udp::RecvMeta {
105 len,
106 stride: len,
107 addr,
108 ecn: None,
109 dst_ip: None,
110 };
111
112 Poll::Ready(Ok(1))
113 }
114
115 fn local_addr(&self) -> io::Result<SocketAddr> {
116 self.inner.local_addr()
117 }
118
119 fn may_fragment(&self) -> bool {
120 self.may_fragment
121 }
122}
123
124pub(super) trait HandleRuntime {
126 fn as_runtime(&self) -> TokioRuntime;
128}
129
130impl HandleRuntime for tokio::runtime::Handle {
131 fn as_runtime(&self) -> TokioRuntime {
132 TokioRuntime
133 }
134}