ant_quic/high_level/runtime/
tokio.rs1use std::{
2 future::Future,
3 io,
4 net::SocketAddr,
5 pin::Pin,
6 sync::Arc,
7 task::{Context, Poll},
8};
9
10use tokio::{
11 io::ReadBuf,
12 time::{Sleep, sleep_until},
13};
14
15use super::{AsyncTimer, AsyncUdpSocket, Runtime, UdpPollHelper, UdpPoller};
16use crate::Instant;
17
18#[derive(Debug)]
20pub struct TokioRuntime;
21
22impl Runtime for TokioRuntime {
23 fn new_timer(&self, i: Instant) -> Pin<Box<dyn AsyncTimer>> {
24 Box::pin(TokioTimer(Box::pin(sleep_until(i.into()))))
25 }
26
27 fn spawn(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
28 tokio::spawn(future);
29 }
30
31 fn wrap_udp_socket(&self, t: std::net::UdpSocket) -> io::Result<Arc<dyn AsyncUdpSocket>> {
32 t.set_nonblocking(true)?;
33 Ok(Arc::new(UdpSocket {
34 inner: tokio::net::UdpSocket::from_std(t)?,
35 may_fragment: true, }))
37 }
38
39 fn now(&self) -> Instant {
40 Instant::from(tokio::time::Instant::now())
41 }
42}
43
44#[derive(Debug)]
46struct TokioTimer(Pin<Box<Sleep>>);
47
48impl AsyncTimer for TokioTimer {
49 fn reset(mut self: Pin<&mut Self>, i: Instant) {
50 self.0.as_mut().reset(i.into())
51 }
52
53 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
54 self.0.as_mut().poll(cx).map(|_| ())
55 }
56}
57
58#[derive(Debug)]
60struct UdpSocket {
61 inner: tokio::net::UdpSocket,
62 may_fragment: bool,
63}
64
65impl AsyncUdpSocket for UdpSocket {
66 fn create_io_poller(self: Arc<Self>) -> Pin<Box<dyn UdpPoller>> {
67 Box::pin(UdpPollHelper::new(move || {
68 let socket = self.clone();
69 async move {
70 loop {
71 socket.inner.writable().await?;
72 return Ok(());
73 }
74 }
75 }))
76 }
77
78 fn try_send(&self, transmit: &quinn_udp::Transmit) -> io::Result<()> {
79 self.inner
80 .try_send_to(transmit.contents, transmit.destination)?;
81 Ok(())
82 }
83
84 fn poll_recv(
85 &self,
86 cx: &mut Context,
87 bufs: &mut [std::io::IoSliceMut<'_>],
88 meta: &mut [quinn_udp::RecvMeta],
89 ) -> Poll<io::Result<usize>> {
90 if bufs.is_empty() || meta.is_empty() {
94 return Poll::Ready(Ok(0));
95 }
96
97 let mut buf = ReadBuf::new(&mut bufs[0]);
98 let addr = match self.inner.poll_recv_from(cx, &mut buf) {
99 Poll::Ready(Ok(addr)) => addr,
100 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
101 Poll::Pending => return Poll::Pending,
102 };
103
104 let len = buf.filled().len();
105 meta[0] = quinn_udp::RecvMeta {
106 len,
107 stride: len,
108 addr,
109 ecn: None,
110 dst_ip: None,
111 };
112
113 Poll::Ready(Ok(1))
114 }
115
116 fn local_addr(&self) -> io::Result<SocketAddr> {
117 self.inner.local_addr()
118 }
119
120 fn may_fragment(&self) -> bool {
121 self.may_fragment
122 }
123}
124
125pub(super) trait HandleRuntime {
127 fn as_runtime(&self) -> TokioRuntime;
129}
130
131impl HandleRuntime for tokio::runtime::Handle {
132 fn as_runtime(&self) -> TokioRuntime {
133 TokioRuntime
134 }
135}