1use std::fmt::Debug;
2use std::io;
3use std::net::SocketAddr;
4use std::pin::Pin;
5use std::task::{self, Poll};
6
7use futures::ready;
8use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
9use tracing::{instrument, trace};
10
/// An I/O object stored together with a [`SocketAddr`].
///
/// The type is generic over the wrapped I/O object and places no bounds on
/// it here; the trait impls add the bounds they need.
pub struct AddrStream<IO> {
    /// Address kept alongside the stream.
    ///
    /// NOTE(review): presumably the peer's address, going by the name —
    /// confirm against the code that constructs this value.
    pub remote_addr: SocketAddr,

    /// The wrapped I/O object that reads and writes are delegated to.
    pub stream: IO,
}
19
20impl<IO> AddrStream<IO> {
21 #[inline]
23 pub fn new(remote_addr: SocketAddr, stream: IO) -> AddrStream<IO> {
24 AddrStream {
25 remote_addr,
26 stream,
27 }
28 }
29}
30
31impl<IO> AsyncRead for AddrStream<IO>
32where
33 IO: Unpin + AsyncRead,
34{
35 #[inline]
36 #[instrument(skip(cx, buf))]
37 fn poll_read(
38 mut self: Pin<&mut Self>,
39 cx: &mut task::Context<'_>,
40 buf: &mut ReadBuf<'_>,
41 ) -> Poll<io::Result<()>> {
42 let poll = Pin::new(&mut self.stream).poll_read(cx, buf);
43 trace!("poll read: {:?}", poll);
44 ready!(poll)?;
45 trace!("read {} bytes", buf.filled().len());
46 Poll::Ready(Ok(()))
47 }
48}
49
50impl<IO> AsyncWrite for AddrStream<IO>
51where
52 IO: Unpin + AsyncWrite,
53{
54 #[inline]
55 #[instrument(skip(cx, buf))]
56 fn poll_write(
57 mut self: Pin<&mut Self>,
58 cx: &mut task::Context<'_>,
59 buf: &[u8],
60 ) -> Poll<io::Result<usize>> {
61 let write_size = ready!(Pin::new(&mut self.stream).poll_write(cx, buf))?;
62 trace!("wrote {} bytes", write_size);
63 Poll::Ready(Ok(write_size))
64 }
65
66 #[inline]
67 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
68 Pin::new(&mut self.stream).poll_flush(cx)
69 }
70
71 #[inline]
72 fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
73 Pin::new(&mut self.stream).poll_shutdown(cx)
74 }
75}
76
77impl<IO> Debug for AddrStream<IO> {
78 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79 f.debug_struct("AddrStream")
80 .field("remote_addr", &self.remote_addr)
81 .finish()
82 }
83}