udpflow/
streamr.rs

1use std::io::Result;
2use std::net::SocketAddr;
3use std::future::Future;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use tokio::net::UdpSocket;
8use tokio::time::{sleep, Sleep, Instant};
9use tokio::io::{ReadBuf, AsyncRead, AsyncWrite};
10
11use crate::{get_timeout, new_udp_socket};
12
13/// Udp stream which is actively established.
14///
15/// A `Read` call times out when there is no packet received
16/// during a period of time. This is treated as `EOF`, and
17/// a `Ok(0)` will be returned.
18pub struct UdpStreamRemote {
19    socket: UdpSocket,
20    timeout: Pin<Box<Sleep>>,
21}
22
23impl UdpStreamRemote {
24    /// Create from a **bound** udp socket.
25    #[inline]
26    pub async fn new(local_addr: SocketAddr, remote_addr: SocketAddr) -> std::io::Result<Self> {
27        let socket = new_udp_socket(local_addr)?;
28        socket.connect(remote_addr).await?;
29        Ok(Self {
30            socket,
31            timeout: Box::pin(sleep(get_timeout())),
32        })
33    }
34
35    /// Get peer sockaddr.
36    #[inline]
37    pub fn peer_addr(&self) -> SocketAddr { self.socket.peer_addr().unwrap() }
38
39    /// Get local sockaddr.
40    #[inline]
41    pub fn local_addr(&self) -> SocketAddr { self.socket.local_addr().unwrap() }
42
43    /// Get inner udp socket.
44    #[inline]
45    pub const fn inner_socket(&self) -> &UdpSocket { &self.socket }
46}
47
48impl AsyncRead for UdpStreamRemote {
49    fn poll_read(
50        self: Pin<&mut Self>,
51        cx: &mut Context<'_>,
52        buf: &mut ReadBuf<'_>,
53    ) -> Poll<Result<()>> {
54        let this = self.get_mut();
55
56        if let Poll::Ready(result) = this.socket.poll_recv(cx, buf) {
57            // reset timer
58            this.timeout.as_mut().reset(Instant::now() + get_timeout());
59
60            return match result {
61                Ok(_) => Poll::Ready(Ok(())),
62                Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => Poll::Pending,
63                Err(e) => Poll::Ready(Err(e)),
64            };
65        }
66
67        // EOF
68        if this.timeout.as_mut().poll(cx).is_ready() {
69            buf.clear();
70            return Poll::Ready(Ok(()));
71        }
72
73        Poll::Pending
74    }
75}
76
77impl AsyncWrite for UdpStreamRemote {
78    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
79        let this = self.get_mut();
80        this.socket.poll_send(cx, buf)
81    }
82
83    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
84        Poll::Ready(Ok(()))
85    }
86
87    fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
88        Poll::Ready(Ok(()))
89    }
90}