async_listen/
byte_stream.rs

1use std::fmt;
2use std::io;
3use std::net::SocketAddr;
4use std::path::PathBuf;
5use std::pin::Pin;
6use std::task::{Poll, Context};
7
8use async_std::io::{Read, Write, IoSlice, IoSliceMut};
9use async_std::net::{TcpStream, Shutdown};
10#[cfg(unix)] use async_std::os::unix::net::UnixStream;
11
12use crate::backpressure::Token;
13
14
/// The concrete socket wrapped by a `ByteStream`.
///
/// The `Unix` variant is only compiled on unix targets, so every `match` on
/// this enum guards that arm with `#[cfg(unix)]` as well.
#[derive(Debug, Clone)]
enum Stream {
    /// A TCP connection.
    Tcp(TcpStream),
    /// A Unix socket connection.
    #[cfg(unix)]
    Unix(UnixStream),
}
21
/// A peer address for either a TCP or a Unix socket.
///
/// This enum is returned by
/// [`ByteStream::peer_addr`](struct.ByteStream.html#method.peer_addr).
///
/// The enum contains the `Unix` variant even on platforms that don't support
/// unix sockets (Windows) to make code easier to write (fewer `#[cfg(unix)]`
/// attributes all over the code).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum PeerAddr {
    /// The peer address is a TCP socket address.
    Tcp(SocketAddr),
    /// The peer address is a Unix socket path. `None` if the socket is unnamed.
    Unix(Option<PathBuf>),
}
38
/// A wrapper around `TcpStream` and `UnixStream`.
///
/// This structure is yielded by the stream created by
/// [`ListenExt::backpressure_wrapper`](trait.ListenExt.html#method.backpressure_wrapper)
///
/// This wrapper serves two purposes:
///
/// 1. Holds the backpressure token
/// 2. Abstracts away the differences between `TcpStream` and `UnixStream`
///
/// The structure implements `AsyncRead` and `AsyncWrite`, so it can be used
/// for protocol implementation directly.
///
/// # Notes on Cloning
///
/// Cloning a `ByteStream` is a shallow clone: both resulting `ByteStream`
/// structures hold the same backpressure token (and the same underlying OS
/// socket). The backpressure slot will be freed (which means a new connection
/// can be accepted) when the last clone of `ByteStream` is dropped.
#[derive(Debug, Clone)]
pub struct ByteStream {
    // The wrapped TCP or Unix socket.
    stream: Stream,
    // Backpressure token; `None` for streams built with the `*_detached`
    // constructors.
    token: Option<Token>,
}
63
// Compile-time assertion: this private trait declares no methods. The empty
// impl below only type-checks if `ByteStream` satisfies every supertrait
// bound (`Read + Write + Send + Unpin + 'static`), so losing any of them
// becomes a build error in this file.
trait Assert: Read + Write + Send + Unpin + 'static { }
impl Assert for ByteStream {}
66
67impl fmt::Display for PeerAddr {
68    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
69        match self {
70            PeerAddr::Tcp(s) => s.fmt(f),
71            PeerAddr::Unix(None) => "<unnamed>".fmt(f),
72            PeerAddr::Unix(Some(s)) => s.display().fmt(f),
73        }
74    }
75}
76
77impl ByteStream {
78    /// Create a bytestream for a tcp socket
79    pub fn new_tcp(token: Token, stream: TcpStream) -> ByteStream {
80        ByteStream {
81            stream: Stream::Tcp(stream),
82            token: Some(token),
83        }
84    }
85
86    /// Create a bytestream for a tcp socket (without token)
87    ///
88    /// This can be used with interfaces that require a `ByteStream` but
89    /// aren't got from the listener that have backpressure applied. For
90    /// example, if you have two listeners in the single app or even for
91    /// client connections.
92    pub fn new_tcp_detached(stream: TcpStream) -> ByteStream {
93        ByteStream {
94            stream: Stream::Tcp(stream),
95            token: None,
96        }
97    }
98
99    /// Create a bytestream for a unix socket
100    #[cfg(unix)]
101    pub fn new_unix(token: Token, stream: UnixStream) -> ByteStream {
102        ByteStream {
103            stream: Stream::Unix(stream),
104            token: Some(token),
105        }
106    }
107
108    /// Create a bytestream for a unix socket (without token)
109    ///
110    /// This can be used with interfaces that require a `ByteStream` but
111    /// aren't got from the listener that have backpressure applied. For
112    /// example, if you have two listeners in the single app or even for
113    /// client connections.
114    #[cfg(unix)]
115    pub fn new_unix_detached(stream: UnixStream) -> ByteStream {
116        ByteStream {
117            stream: Stream::Unix(stream),
118            token: None,
119        }
120    }
121
122    /// Returns the remote address that this stream is connected to.
123    ///
124    /// Note: even on non-unix platforms (Windows)
125    /// [`PeerAddr`](enum.PeerAddr.html) still contains `Unix` option so you
126    /// don't have to use conditional compilation when matching.
127    ///
128    /// ## Examples
129    ///
130    /// ```ignore
131    /// let peer = stream.peer_addr()?;
132    /// match peer.peer_addr()? {
133    ///     PeerAddr::Tcp(addr) => println!("TCP addr {}", addr),
134    ///     PeerAddr::Unix(None) => println!("Unnamed unix socket"),
135    ///     PeerAddr::Unix(Some(path)) => println!("Unix {}", path.display()),
136    /// }
137    /// ```
138    pub fn peer_addr(&self) -> io::Result<PeerAddr> {
139        match &self.stream {
140            Stream::Tcp(s) => s.peer_addr().map(PeerAddr::Tcp),
141            #[cfg(unix)]
142            Stream::Unix(s) => {
143                s.peer_addr()
144                .map(|a| a.as_pathname().map(|p| p.to_owned()))
145                .map(PeerAddr::Unix)
146            }
147        }
148    }
149
150    /// Gets the value of the `TCP_NODELAY` option on this socket.
151    ///
152    /// For Unix sockets this function always returns true (Unix sockets
153    /// always behave like the option is off).
154    ///
155    /// For more information about this option, see [`set_nodelay`].
156    ///
157    /// [`set_nodelay`]: #method.set_nodelay
158    pub fn nodelay(&self) -> io::Result<bool> {
159        match &self.stream {
160            Stream::Tcp(s) => s.nodelay(),
161            #[cfg(unix)]
162            Stream::Unix(_) => Ok(true),
163        }
164    }
165
166    /// Sets the value of the `TCP_NODELAY` option on this socket.
167    ///
168    /// If set, this option disables the Nagle algorithm. This means that
169    /// segments are always sent as soon as possible, even if there is only a
170    /// small amount of data. When not set, data is buffered until there is a
171    /// sufficient amount to send out, thereby avoiding the frequent sending of
172    /// small packets.
173    ///
174    /// For Unix sockets this function does nothing (Unix sockets always behave
175    /// like the option is enabled, and there is no way to change that).
176    pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
177        match &self.stream {
178            Stream::Tcp(s) => s.set_nodelay(nodelay),
179            #[cfg(unix)]
180            Stream::Unix(_) => Ok(()),
181        }
182    }
183
184    /// Shuts down the read, write, or both halves of this connection.
185    ///
186    /// This function will cause all pending and future I/O calls on the
187    /// specified portions to immediately return with an appropriate value
188    /// (see the documentation of Shutdown).
189    pub fn shutdown(&self, how: Shutdown) -> Result<(), io::Error> {
190        match &self.stream {
191            Stream::Tcp(s) => s.shutdown(how),
192            #[cfg(unix)]
193            Stream::Unix(s) => s.shutdown(how),
194        }
195    }
196}
197
198impl From<(Token, TcpStream)> for ByteStream {
199    fn from((token, stream): (Token, TcpStream)) -> ByteStream {
200        ByteStream::new_tcp(token, stream)
201    }
202}
203
204#[cfg(unix)]
205impl From<(Token, UnixStream)> for ByteStream {
206    fn from((token, stream): (Token, UnixStream)) -> ByteStream {
207        ByteStream::new_unix(token, stream)
208    }
209}
210
211impl Read for ByteStream {
212
213    fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8])
214        -> Poll<Result<usize, io::Error>>
215    {
216        match self.stream {
217            Stream::Tcp(ref s) => {
218                Pin::new(&mut &*s).poll_read(cx, buf)
219            }
220            #[cfg(unix)]
221            Stream::Unix(ref s) => {
222                Pin::new(&mut &*s).poll_read(cx, buf)
223            }
224        }
225    }
226
227    fn poll_read_vectored(self: Pin<&mut Self>, cx: &mut Context,
228        bufs: &mut [IoSliceMut])
229        -> Poll<Result<usize, io::Error>>
230    {
231        match self.stream {
232            Stream::Tcp(ref s) => {
233                Pin::new(&mut &*s).poll_read_vectored(cx, bufs)
234            }
235            #[cfg(unix)]
236            Stream::Unix(ref s) => {
237                Pin::new(&mut &*s).poll_read_vectored(cx, bufs)
238            }
239        }
240    }
241}
242
243impl Read for &ByteStream {
244    fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8])
245        -> Poll<Result<usize, io::Error>>
246    {
247        match self.stream {
248            Stream::Tcp(ref s) => {
249                Pin::new(&mut &*s).poll_read(cx, buf)
250            }
251            #[cfg(unix)]
252            Stream::Unix(ref s) => {
253                Pin::new(&mut &*s).poll_read(cx, buf)
254            }
255        }
256    }
257    fn poll_read_vectored(self: Pin<&mut Self>, cx: &mut Context,
258        bufs: &mut [IoSliceMut])
259        -> Poll<Result<usize, io::Error>>
260    {
261        match self.stream {
262            Stream::Tcp(ref s) => {
263                Pin::new(&mut &*s).poll_read_vectored(cx, bufs)
264            }
265            #[cfg(unix)]
266            Stream::Unix(ref s) => {
267                Pin::new(&mut &*s).poll_read_vectored(cx, bufs)
268            }
269        }
270    }
271}
272
273impl Write for ByteStream {
274    fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8])
275        -> Poll<Result<usize, io::Error>>
276    {
277        match self.stream {
278            Stream::Tcp(ref s) => {
279                Pin::new(&mut &*s).poll_write(cx, buf)
280            }
281            #[cfg(unix)]
282            Stream::Unix(ref s) => {
283                Pin::new(&mut &*s).poll_write(cx, buf)
284            }
285        }
286    }
287    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context)
288        -> Poll<Result<(), io::Error>>
289    {
290        match self.stream {
291            Stream::Tcp(ref s) => {
292                Pin::new(&mut &*s).poll_flush(cx)
293            }
294            #[cfg(unix)]
295            Stream::Unix(ref s) => {
296                Pin::new(&mut &*s).poll_flush(cx)
297            }
298        }
299    }
300    fn poll_close(self: Pin<&mut Self>, cx: &mut Context)
301        -> Poll<Result<(), io::Error>>
302    {
303        match self.stream {
304            Stream::Tcp(ref s) => {
305                Pin::new(&mut &*s).poll_close(cx)
306            }
307            #[cfg(unix)]
308            Stream::Unix(ref s) => {
309                Pin::new(&mut &*s).poll_close(cx)
310            }
311        }
312    }
313    fn poll_write_vectored(self: Pin<&mut Self>, cx: &mut Context,
314        bufs: &[IoSlice])
315        -> Poll<Result<usize, io::Error>>
316    {
317        match self.stream {
318            Stream::Tcp(ref s) => {
319                Pin::new(&mut &*s).poll_write_vectored(cx, bufs)
320            }
321            #[cfg(unix)]
322            Stream::Unix(ref s) => {
323                Pin::new(&mut &*s).poll_write_vectored(cx, bufs)
324            }
325        }
326    }
327}
328
329impl Write for &ByteStream {
330    fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8])
331        -> Poll<Result<usize, io::Error>>
332    {
333        match self.stream {
334            Stream::Tcp(ref s) => {
335                Pin::new(&mut &*s).poll_write(cx, buf)
336            }
337            #[cfg(unix)]
338            Stream::Unix(ref s) => {
339                Pin::new(&mut &*s).poll_write(cx, buf)
340            }
341        }
342    }
343    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context)
344        -> Poll<Result<(), io::Error>>
345    {
346        match self.stream {
347            Stream::Tcp(ref s) => {
348                Pin::new(&mut &*s).poll_flush(cx)
349            }
350            #[cfg(unix)]
351            Stream::Unix(ref s) => {
352                Pin::new(&mut &*s).poll_flush(cx)
353            }
354        }
355    }
356    fn poll_close(self: Pin<&mut Self>, cx: &mut Context)
357        -> Poll<Result<(), io::Error>>
358    {
359        match self.stream {
360            Stream::Tcp(ref s) => {
361                Pin::new(&mut &*s).poll_close(cx)
362            }
363            #[cfg(unix)]
364            Stream::Unix(ref s) => {
365                Pin::new(&mut &*s).poll_close(cx)
366            }
367        }
368    }
369    fn poll_write_vectored(self: Pin<&mut Self>, cx: &mut Context,
370        bufs: &[IoSlice])
371        -> Poll<Result<usize, io::Error>>
372    {
373        match self.stream {
374            Stream::Tcp(ref s) => {
375                Pin::new(&mut &*s).poll_write_vectored(cx, bufs)
376            }
377            #[cfg(unix)]
378            Stream::Unix(ref s) => {
379                Pin::new(&mut &*s).poll_write_vectored(cx, bufs)
380            }
381        }
382    }
383}