1use std::io::Result;
2use std::net::SocketAddr;
3use std::future::Future;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use tokio::net::UdpSocket;
8use tokio::time::{sleep, Sleep, Instant};
9use tokio::io::{ReadBuf, AsyncRead, AsyncWrite};
10
11use crate::{get_timeout, new_udp_socket};
12
13pub struct UdpStreamRemote {
19 socket: UdpSocket,
20 timeout: Pin<Box<Sleep>>,
21}
22
23impl UdpStreamRemote {
24 #[inline]
26 pub async fn new(local_addr: SocketAddr, remote_addr: SocketAddr) -> std::io::Result<Self> {
27 let socket = new_udp_socket(local_addr)?;
28 socket.connect(remote_addr).await?;
29 Ok(Self {
30 socket,
31 timeout: Box::pin(sleep(get_timeout())),
32 })
33 }
34
35 #[inline]
37 pub fn peer_addr(&self) -> SocketAddr { self.socket.peer_addr().unwrap() }
38
39 #[inline]
41 pub fn local_addr(&self) -> SocketAddr { self.socket.local_addr().unwrap() }
42
43 #[inline]
45 pub const fn inner_socket(&self) -> &UdpSocket { &self.socket }
46}
47
48impl AsyncRead for UdpStreamRemote {
49 fn poll_read(
50 self: Pin<&mut Self>,
51 cx: &mut Context<'_>,
52 buf: &mut ReadBuf<'_>,
53 ) -> Poll<Result<()>> {
54 let this = self.get_mut();
55
56 if let Poll::Ready(result) = this.socket.poll_recv(cx, buf) {
57 this.timeout.as_mut().reset(Instant::now() + get_timeout());
59
60 return match result {
61 Ok(_) => Poll::Ready(Ok(())),
62 Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => Poll::Pending,
63 Err(e) => Poll::Ready(Err(e)),
64 };
65 }
66
67 if this.timeout.as_mut().poll(cx).is_ready() {
69 buf.clear();
70 return Poll::Ready(Ok(()));
71 }
72
73 Poll::Pending
74 }
75}
76
77impl AsyncWrite for UdpStreamRemote {
78 fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
79 let this = self.get_mut();
80 this.socket.poll_send(cx, buf)
81 }
82
83 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
84 Poll::Ready(Ok(()))
85 }
86
87 fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
88 Poll::Ready(Ok(()))
89 }
90}