roa_core/app/
stream.rs

1use std::fmt::Debug;
2use std::io;
3use std::net::SocketAddr;
4use std::pin::Pin;
5use std::task::{self, Poll};
6
7use futures::ready;
8use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
9use tracing::{instrument, trace};
10
11/// A transport returned yieled by `AddrIncoming`.
12pub struct AddrStream<IO> {
13    /// The remote address of this stream.
14    pub remote_addr: SocketAddr,
15
16    /// The inner stream.
17    pub stream: IO,
18}
19
20impl<IO> AddrStream<IO> {
21    /// Construct an AddrStream from an addr and a AsyncReadWriter.
22    #[inline]
23    pub fn new(remote_addr: SocketAddr, stream: IO) -> AddrStream<IO> {
24        AddrStream {
25            remote_addr,
26            stream,
27        }
28    }
29}
30
31impl<IO> AsyncRead for AddrStream<IO>
32where
33    IO: Unpin + AsyncRead,
34{
35    #[inline]
36    #[instrument(skip(cx, buf))]
37    fn poll_read(
38        mut self: Pin<&mut Self>,
39        cx: &mut task::Context<'_>,
40        buf: &mut ReadBuf<'_>,
41    ) -> Poll<io::Result<()>> {
42        let poll = Pin::new(&mut self.stream).poll_read(cx, buf);
43        trace!("poll read: {:?}", poll);
44        ready!(poll)?;
45        trace!("read {} bytes", buf.filled().len());
46        Poll::Ready(Ok(()))
47    }
48}
49
50impl<IO> AsyncWrite for AddrStream<IO>
51where
52    IO: Unpin + AsyncWrite,
53{
54    #[inline]
55    #[instrument(skip(cx, buf))]
56    fn poll_write(
57        mut self: Pin<&mut Self>,
58        cx: &mut task::Context<'_>,
59        buf: &[u8],
60    ) -> Poll<io::Result<usize>> {
61        let write_size = ready!(Pin::new(&mut self.stream).poll_write(cx, buf))?;
62        trace!("wrote {} bytes", write_size);
63        Poll::Ready(Ok(write_size))
64    }
65
66    #[inline]
67    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
68        Pin::new(&mut self.stream).poll_flush(cx)
69    }
70
71    #[inline]
72    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
73        Pin::new(&mut self.stream).poll_shutdown(cx)
74    }
75}
76
77impl<IO> Debug for AddrStream<IO> {
78    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79        f.debug_struct("AddrStream")
80            .field("remote_addr", &self.remote_addr)
81            .finish()
82    }
83}