ant_quic/high_level/runtime/
tokio.rs1use std::{
9 future::Future,
10 io,
11 net::SocketAddr,
12 pin::Pin,
13 sync::Arc,
14 task::{Context, Poll},
15};
16
17use tokio::{
18 io::ReadBuf,
19 time::{Sleep, sleep_until},
20};
21
22use super::{AsyncTimer, AsyncUdpSocket, Runtime, UdpPollHelper, UdpPoller};
23use crate::Instant;
24
/// Zero-sized handle selecting Tokio as the async backend.
///
/// The type carries no state of its own; the `Runtime` implementation for it
/// forwards timer creation, task spawning, and UDP socket wrapping to `tokio`.
#[derive(Debug)]
pub struct TokioRuntime;
28
29impl Runtime for TokioRuntime {
30 fn new_timer(&self, i: Instant) -> Pin<Box<dyn AsyncTimer>> {
31 Box::pin(TokioTimer(Box::pin(sleep_until(i.into()))))
32 }
33
34 fn spawn(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
35 tokio::spawn(future);
36 }
37
38 fn wrap_udp_socket(&self, t: std::net::UdpSocket) -> io::Result<Arc<dyn AsyncUdpSocket>> {
39 t.set_nonblocking(true)?;
40 Ok(Arc::new(UdpSocket {
41 inner: tokio::net::UdpSocket::from_std(t)?,
42 may_fragment: true, }))
44 }
45
46 fn now(&self) -> Instant {
47 Instant::from(tokio::time::Instant::now())
48 }
49}
50
51#[derive(Debug)]
53struct TokioTimer(Pin<Box<Sleep>>);
54
55impl AsyncTimer for TokioTimer {
56 fn reset(mut self: Pin<&mut Self>, i: Instant) {
57 self.0.as_mut().reset(i.into())
58 }
59
60 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
61 self.0.as_mut().poll(cx).map(|_| ())
62 }
63}
64
65#[derive(Debug)]
67struct UdpSocket {
68 inner: tokio::net::UdpSocket,
69 may_fragment: bool,
70}
71
72impl AsyncUdpSocket for UdpSocket {
73 fn create_io_poller(self: Arc<Self>) -> Pin<Box<dyn UdpPoller>> {
74 Box::pin(UdpPollHelper::new(move || {
75 let socket = self.clone();
76 async move {
77 loop {
78 socket.inner.writable().await?;
79 return Ok(());
80 }
81 }
82 }))
83 }
84
85 fn try_send(&self, transmit: &quinn_udp::Transmit) -> io::Result<()> {
86 self.inner
87 .try_send_to(transmit.contents, transmit.destination)?;
88 Ok(())
89 }
90
91 fn poll_recv(
92 &self,
93 cx: &mut Context,
94 bufs: &mut [std::io::IoSliceMut<'_>],
95 meta: &mut [quinn_udp::RecvMeta],
96 ) -> Poll<io::Result<usize>> {
97 if bufs.is_empty() || meta.is_empty() {
101 return Poll::Ready(Ok(0));
102 }
103
104 let mut buf = ReadBuf::new(&mut bufs[0]);
105 let addr = match self.inner.poll_recv_from(cx, &mut buf) {
106 Poll::Ready(Ok(addr)) => addr,
107 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
108 Poll::Pending => return Poll::Pending,
109 };
110
111 let len = buf.filled().len();
112 meta[0] = quinn_udp::RecvMeta {
113 len,
114 stride: len,
115 addr,
116 ecn: None,
117 dst_ip: None,
118 };
119
120 Poll::Ready(Ok(1))
121 }
122
123 fn local_addr(&self) -> io::Result<SocketAddr> {
124 self.inner.local_addr()
125 }
126
127 fn may_fragment(&self) -> bool {
128 self.may_fragment
129 }
130}
131
132#[allow(dead_code)]
134pub(super) trait HandleRuntime {
135 fn as_runtime(&self) -> TokioRuntime;
137}
138
139impl HandleRuntime for tokio::runtime::Handle {
140 fn as_runtime(&self) -> TokioRuntime {
141 TokioRuntime
142 }
143}