ant_quic/high_level/runtime/
tokio.rs

1use std::{
2    future::Future,
3    io,
4    net::SocketAddr,
5    pin::Pin,
6    sync::Arc,
7    task::{Context, Poll},
8};
9
10use tokio::{
11    io::ReadBuf,
12    time::{Sleep, sleep_until},
13};
14
15use super::{AsyncTimer, AsyncUdpSocket, Runtime, UdpPollHelper, UdpPoller};
16use crate::Instant;
17
18/// Tokio runtime implementation
19#[derive(Debug)]
20pub struct TokioRuntime;
21
22impl Runtime for TokioRuntime {
23    fn new_timer(&self, i: Instant) -> Pin<Box<dyn AsyncTimer>> {
24        Box::pin(TokioTimer(Box::pin(sleep_until(i.into()))))
25    }
26
27    fn spawn(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
28        tokio::spawn(future);
29    }
30
31    fn wrap_udp_socket(&self, t: std::net::UdpSocket) -> io::Result<Arc<dyn AsyncUdpSocket>> {
32        t.set_nonblocking(true)?;
33        Ok(Arc::new(UdpSocket {
34            inner: tokio::net::UdpSocket::from_std(t)?,
35            may_fragment: true, // Default to true for now
36        }))
37    }
38
39    fn now(&self) -> Instant {
40        Instant::from(tokio::time::Instant::now())
41    }
42}
43
44/// Tokio timer implementation
45#[derive(Debug)]
46struct TokioTimer(Pin<Box<Sleep>>);
47
48impl AsyncTimer for TokioTimer {
49    fn reset(mut self: Pin<&mut Self>, i: Instant) {
50        self.0.as_mut().reset(i.into())
51    }
52
53    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
54        self.0.as_mut().poll(cx).map(|_| ())
55    }
56}
57
58/// Tokio UDP socket implementation
59#[derive(Debug)]
60struct UdpSocket {
61    inner: tokio::net::UdpSocket,
62    may_fragment: bool,
63}
64
65impl AsyncUdpSocket for UdpSocket {
66    fn create_io_poller(self: Arc<Self>) -> Pin<Box<dyn UdpPoller>> {
67        Box::pin(UdpPollHelper::new(move || {
68            let socket = self.clone();
69            async move {
70                loop {
71                    socket.inner.writable().await?;
72                    return Ok(());
73                }
74            }
75        }))
76    }
77
78    fn try_send(&self, transmit: &quinn_udp::Transmit) -> io::Result<()> {
79        self.inner
80            .try_send_to(transmit.contents, transmit.destination)?;
81        Ok(())
82    }
83
84    fn poll_recv(
85        &self,
86        cx: &mut Context,
87        bufs: &mut [std::io::IoSliceMut<'_>],
88        meta: &mut [quinn_udp::RecvMeta],
89    ) -> Poll<io::Result<usize>> {
90        // For now, use a simple single-packet receive
91        // In production, should use quinn_udp::recv for GSO/GRO support
92
93        if bufs.is_empty() || meta.is_empty() {
94            return Poll::Ready(Ok(0));
95        }
96
97        let mut buf = ReadBuf::new(&mut bufs[0]);
98        let addr = match self.inner.poll_recv_from(cx, &mut buf) {
99            Poll::Ready(Ok(addr)) => addr,
100            Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
101            Poll::Pending => return Poll::Pending,
102        };
103
104        let len = buf.filled().len();
105        meta[0] = quinn_udp::RecvMeta {
106            len,
107            stride: len,
108            addr,
109            ecn: None,
110            dst_ip: None,
111        };
112
113        Poll::Ready(Ok(1))
114    }
115
116    fn local_addr(&self) -> io::Result<SocketAddr> {
117        self.inner.local_addr()
118    }
119
120    fn may_fragment(&self) -> bool {
121        self.may_fragment
122    }
123}
124
125/// Extension trait to convert tokio::Handle to Runtime
126pub(super) trait HandleRuntime {
127    /// Create a Runtime implementation from this handle
128    fn as_runtime(&self) -> TokioRuntime;
129}
130
131impl HandleRuntime for tokio::runtime::Handle {
132    fn as_runtime(&self) -> TokioRuntime {
133        TokioRuntime
134    }
135}