udpflow/
streaml.rs

1use std::io::Result;
2use std::sync::Arc;
3use std::net::SocketAddr;
4use std::future::Future;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8use tokio::net::UdpSocket;
9use tokio::time::{sleep, Sleep, Instant};
10use tokio::sync::mpsc::Receiver;
11use tokio::io::{ReadBuf, AsyncRead, AsyncWrite};
12
13use crate::sockmap::{SockMap, Packet};
14use crate::get_timeout;
15
16/// Udp stream accepted from local listener.
17///
18/// A `Read` call times out when there is no packet received
19/// during a period of time. This is treated as `EOF`, and
20/// a `Ok(0)` will be returned.
21pub struct UdpStreamLocal {
22    rx: Receiver<Packet>,
23    socket: Arc<UdpSocket>,
24    timeout: Pin<Box<Sleep>>,
25    sockmap: SockMap,
26    addr: SocketAddr,
27}
28
29impl UdpStreamLocal {
30    pub(crate) fn new(
31        rx: Receiver<Packet>,
32        socket: Arc<UdpSocket>,
33        sockmap: SockMap,
34        addr: SocketAddr,
35    ) -> Self {
36        Self {
37            rx,
38            socket,
39            addr,
40            sockmap,
41            timeout: Box::pin(sleep(get_timeout())),
42        }
43    }
44
45    /// Get peer sockaddr.
46    #[inline]
47    pub const fn peer_addr(&self) -> SocketAddr { self.addr }
48
49    /// Get local sockaddr.
50    #[inline]
51    pub fn local_addr(&self) -> SocketAddr { self.socket.local_addr().unwrap() }
52
53    /// Get inner udp socket.
54    #[inline]
55    pub const fn inner_socket(&self) -> &Arc<UdpSocket> { &self.socket }
56}
57
58impl Drop for UdpStreamLocal {
59    fn drop(&mut self) {
60        self.sockmap.remove(&self.addr);
61        // left elements are popped
62    }
63}
64
65impl AsyncRead for UdpStreamLocal {
66    fn poll_read(
67        self: Pin<&mut Self>,
68        cx: &mut Context<'_>,
69        buf: &mut ReadBuf<'_>,
70    ) -> Poll<Result<()>> {
71        let this = self.get_mut();
72
73        if let Poll::Ready(Some(pkt)) = this.rx.poll_recv(cx) {
74            buf.put_slice(&pkt);
75
76            // reset timer
77            this.timeout.as_mut().reset(Instant::now() + get_timeout());
78
79            return Poll::Ready(Ok(()));
80        }
81
82        // EOF
83        if this.timeout.as_mut().poll(cx).is_ready() {
84            return Poll::Ready(Ok(()));
85        }
86
87        Poll::Pending
88    }
89}
90
91impl AsyncWrite for UdpStreamLocal {
92    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
93        let this = self.get_mut();
94        this.socket.poll_send_to(cx, buf, this.addr)
95    }
96
97    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
98        Poll::Ready(Ok(()))
99    }
100
101    fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
102        self.get_mut().rx.close();
103        Poll::Ready(Ok(()))
104    }
105}