ant_quic/high_level/runtime/
tokio.rs

1// Copyright 2024 Saorsa Labs Ltd.
2//
3// This Saorsa Network Software is licensed under the General Public License (GPL), version 3.
4// Please see the file LICENSE-GPL, or visit <http://www.gnu.org/licenses/> for the full text.
5//
6// Full details available at https://saorsalabs.com/licenses
7
8use std::{
9    future::Future,
10    io,
11    net::SocketAddr,
12    pin::Pin,
13    sync::Arc,
14    task::{Context, Poll},
15};
16
17use tokio::{
18    io::ReadBuf,
19    time::{Sleep, sleep_until},
20};
21
22use super::{AsyncTimer, AsyncUdpSocket, Runtime, UdpPollHelper, UdpPoller};
23use crate::Instant;
24
/// Marker type selecting Tokio as the async runtime.
///
/// This is a unit struct and holds no state of its own.
#[derive(Debug)]
pub struct TokioRuntime;
28
29impl Runtime for TokioRuntime {
30    fn new_timer(&self, i: Instant) -> Pin<Box<dyn AsyncTimer>> {
31        Box::pin(TokioTimer(Box::pin(sleep_until(i.into()))))
32    }
33
34    fn spawn(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
35        tokio::spawn(future);
36    }
37
38    fn wrap_udp_socket(&self, t: std::net::UdpSocket) -> io::Result<Arc<dyn AsyncUdpSocket>> {
39        t.set_nonblocking(true)?;
40        Ok(Arc::new(UdpSocket {
41            inner: tokio::net::UdpSocket::from_std(t)?,
42            may_fragment: true, // Default to true for now
43        }))
44    }
45
46    fn now(&self) -> Instant {
47        Instant::from(tokio::time::Instant::now())
48    }
49}
50
51/// Tokio timer implementation
52#[derive(Debug)]
53struct TokioTimer(Pin<Box<Sleep>>);
54
55impl AsyncTimer for TokioTimer {
56    fn reset(mut self: Pin<&mut Self>, i: Instant) {
57        self.0.as_mut().reset(i.into())
58    }
59
60    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
61        self.0.as_mut().poll(cx).map(|_| ())
62    }
63}
64
65/// Tokio UDP socket implementation
66#[derive(Debug)]
67struct UdpSocket {
68    inner: tokio::net::UdpSocket,
69    may_fragment: bool,
70}
71
72impl AsyncUdpSocket for UdpSocket {
73    fn create_io_poller(self: Arc<Self>) -> Pin<Box<dyn UdpPoller>> {
74        Box::pin(UdpPollHelper::new(move || {
75            let socket = self.clone();
76            async move {
77                loop {
78                    socket.inner.writable().await?;
79                    return Ok(());
80                }
81            }
82        }))
83    }
84
85    fn try_send(&self, transmit: &quinn_udp::Transmit) -> io::Result<()> {
86        self.inner
87            .try_send_to(transmit.contents, transmit.destination)?;
88        Ok(())
89    }
90
91    fn poll_recv(
92        &self,
93        cx: &mut Context,
94        bufs: &mut [std::io::IoSliceMut<'_>],
95        meta: &mut [quinn_udp::RecvMeta],
96    ) -> Poll<io::Result<usize>> {
97        // For now, use a simple single-packet receive
98        // In production, should use quinn_udp::recv for GSO/GRO support
99
100        if bufs.is_empty() || meta.is_empty() {
101            return Poll::Ready(Ok(0));
102        }
103
104        let mut buf = ReadBuf::new(&mut bufs[0]);
105        let addr = match self.inner.poll_recv_from(cx, &mut buf) {
106            Poll::Ready(Ok(addr)) => addr,
107            Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
108            Poll::Pending => return Poll::Pending,
109        };
110
111        let len = buf.filled().len();
112        meta[0] = quinn_udp::RecvMeta {
113            len,
114            stride: len,
115            addr,
116            ecn: None,
117            dst_ip: None,
118        };
119
120        Poll::Ready(Ok(1))
121    }
122
123    fn local_addr(&self) -> io::Result<SocketAddr> {
124        self.inner.local_addr()
125    }
126
127    fn may_fragment(&self) -> bool {
128        self.may_fragment
129    }
130}
131
132/// Extension trait to convert tokio::Handle to Runtime
133#[allow(dead_code)]
134pub(super) trait HandleRuntime {
135    /// Create a Runtime implementation from this handle
136    fn as_runtime(&self) -> TokioRuntime;
137}
138
139impl HandleRuntime for tokio::runtime::Handle {
140    fn as_runtime(&self) -> TokioRuntime {
141        TokioRuntime
142    }
143}