async_std/net/tcp/
stream.rs

1use std::io::{IoSlice, IoSliceMut, Read as _, Write as _};
2use std::net::SocketAddr;
3use std::pin::Pin;
4use std::sync::Arc;
5
6use crate::future;
7use crate::io::{self, Read, Write};
8use crate::net::driver::Watcher;
9use crate::net::ToSocketAddrs;
10use crate::task::{Context, Poll};
11
12/// A TCP stream between a local and a remote socket.
13///
14/// A `TcpStream` can either be created by connecting to an endpoint, via the [`connect`] method,
15/// or by [accepting] a connection from a [listener].  It can be read or written to using the
16/// [`AsyncRead`], [`AsyncWrite`], and related extension traits in [`futures::io`].
17///
18/// The connection will be closed when the value is dropped. The reading and writing portions of
19/// the connection can also be shut down individually with the [`shutdown`] method.
20///
21/// This type is an async version of [`std::net::TcpStream`].
22///
23/// [`connect`]: struct.TcpStream.html#method.connect
24/// [accepting]: struct.TcpListener.html#method.accept
25/// [listener]: struct.TcpListener.html
26/// [`AsyncRead`]: https://docs.rs/futures/0.3/futures/io/trait.AsyncRead.html
27/// [`AsyncWrite`]: https://docs.rs/futures/0.3/futures/io/trait.AsyncWrite.html
28/// [`futures::io`]: https://docs.rs/futures/0.3/futures/io/index.html
29/// [`shutdown`]: struct.TcpStream.html#method.shutdown
30/// [`std::net::TcpStream`]: https://doc.rust-lang.org/std/net/struct.TcpStream.html
31///
32/// ## Examples
33///
34/// ```no_run
35/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
36/// #
37/// use async_std::net::TcpStream;
38/// use async_std::prelude::*;
39///
40/// let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
41/// stream.write_all(b"hello world").await?;
42///
43/// let mut buf = vec![0u8; 1024];
44/// let n = stream.read(&mut buf).await?;
45/// #
46/// # Ok(()) }) }
47/// ```
48#[derive(Debug, Clone)]
49pub struct TcpStream {
50    pub(super) watcher: Arc<Watcher<mio::net::TcpStream>>,
51}
52
53impl TcpStream {
54    /// Creates a new TCP stream connected to the specified address.
55    ///
56    /// This method will create a new TCP socket and attempt to connect it to the `addr`
57    /// provided. The [returned future] will be resolved once the stream has successfully
58    /// connected, or it will return an error if one occurs.
59    ///
60    /// [returned future]: struct.Connect.html
61    ///
62    /// # Examples
63    ///
64    /// ```no_run
65    /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
66    /// #
67    /// use async_std::net::TcpStream;
68    ///
69    /// let stream = TcpStream::connect("127.0.0.1:0").await?;
70    /// #
71    /// # Ok(()) }) }
72    /// ```
73    pub async fn connect<A: ToSocketAddrs>(addrs: A) -> io::Result<TcpStream> {
74        let mut last_err = None;
75        let addrs = addrs.to_socket_addrs().await?;
76
77        for addr in addrs {
78            // mio's TcpStream::connect is non-blocking and may just be in progress
79            // when it returns with `Ok`. We therefore wait for write readiness to
80            // be sure the connection has either been established or there was an
81            // error which we check for afterwards.
82            let watcher = match mio::net::TcpStream::connect(&addr) {
83                Ok(s) => Watcher::new(s),
84                Err(e) => {
85                    last_err = Some(e);
86                    continue;
87                }
88            };
89
90            future::poll_fn(|cx| watcher.poll_write_ready(cx)).await;
91
92            match watcher.get_ref().take_error() {
93                Ok(None) => {
94                    return Ok(TcpStream {
95                        watcher: Arc::new(watcher),
96                    });
97                }
98                Ok(Some(e)) => last_err = Some(e),
99                Err(e) => last_err = Some(e),
100            }
101        }
102
103        Err(last_err.unwrap_or_else(|| {
104            io::Error::new(
105                io::ErrorKind::InvalidInput,
106                "could not resolve to any addresses",
107            )
108        }))
109    }
110
111    /// Returns the local address that this stream is connected to.
112    ///
113    /// ## Examples
114    ///
115    /// ```no_run
116    /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
117    /// #
118    /// use async_std::net::TcpStream;
119    ///
120    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
121    /// let addr = stream.local_addr()?;
122    /// #
123    /// # Ok(()) }) }
124    /// ```
125    pub fn local_addr(&self) -> io::Result<SocketAddr> {
126        self.watcher.get_ref().local_addr()
127    }
128
129    /// Returns the remote address that this stream is connected to.
130    ///
131    /// ## Examples
132    ///
133    /// ```no_run
134    /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
135    /// #
136    /// use async_std::net::TcpStream;
137    ///
138    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
139    /// let peer = stream.peer_addr()?;
140    /// #
141    /// # Ok(()) }) }
142    /// ```
143    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
144        self.watcher.get_ref().peer_addr()
145    }
146
147    /// Gets the value of the `IP_TTL` option for this socket.
148    ///
149    /// For more information about this option, see [`set_ttl`].
150    ///
151    /// [`set_ttl`]: #method.set_ttl
152    ///
153    /// # Examples
154    ///
155    /// ```no_run
156    /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
157    /// #
158    /// use async_std::net::TcpStream;
159    ///
160    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
161    ///
162    /// stream.set_ttl(100)?;
163    /// assert_eq!(stream.ttl()?, 100);
164    /// #
165    /// # Ok(()) }) }
166    /// ```
167    pub fn ttl(&self) -> io::Result<u32> {
168        self.watcher.get_ref().ttl()
169    }
170
171    /// Sets the value for the `IP_TTL` option on this socket.
172    ///
173    /// This value sets the time-to-live field that is used in every packet sent
174    /// from this socket.
175    ///
176    /// # Examples
177    ///
178    /// ```no_run
179    /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
180    /// #
181    /// use async_std::net::TcpStream;
182    ///
183    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
184    ///
185    /// stream.set_ttl(100)?;
186    /// assert_eq!(stream.ttl()?, 100);
187    /// #
188    /// # Ok(()) }) }
189    /// ```
190    pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
191        self.watcher.get_ref().set_ttl(ttl)
192    }
193
194    /// Receives data on the socket from the remote address to which it is connected, without
195    /// removing that data from the queue.
196    ///
197    /// On success, returns the number of bytes peeked.
198    ///
199    /// Successive calls return the same data. This is accomplished by passing `MSG_PEEK` as a flag
200    /// to the underlying `recv` system call.
201    ///
202    /// # Examples
203    ///
204    /// ```no_run
205    /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
206    /// #
207    /// use async_std::net::TcpStream;
208    ///
209    /// let stream = TcpStream::connect("127.0.0.1:8000").await?;
210    ///
211    /// let mut buf = vec![0; 1024];
212    /// let n = stream.peek(&mut buf).await?;
213    /// #
214    /// # Ok(()) }) }
215    /// ```
216    pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
217        future::poll_fn(|cx| self.watcher.poll_read_with(cx, |inner| inner.peek(buf))).await
218    }
219
220    /// Gets the value of the `TCP_NODELAY` option on this socket.
221    ///
222    /// For more information about this option, see [`set_nodelay`].
223    ///
224    /// [`set_nodelay`]: #method.set_nodelay
225    ///
226    /// # Examples
227    ///
228    /// ```no_run
229    /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
230    /// #
231    /// use async_std::net::TcpStream;
232    ///
233    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
234    ///
235    /// stream.set_nodelay(true)?;
236    /// assert_eq!(stream.nodelay()?, true);
237    /// #
238    /// # Ok(()) }) }
239    /// ```
240    pub fn nodelay(&self) -> io::Result<bool> {
241        self.watcher.get_ref().nodelay()
242    }
243
244    /// Sets the value of the `TCP_NODELAY` option on this socket.
245    ///
246    /// If set, this option disables the Nagle algorithm. This means that
247    /// segments are always sent as soon as possible, even if there is only a
248    /// small amount of data. When not set, data is buffered until there is a
249    /// sufficient amount to send out, thereby avoiding the frequent sending of
250    /// small packets.
251    ///
252    /// # Examples
253    ///
254    /// ```no_run
255    /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
256    /// #
257    /// use async_std::net::TcpStream;
258    ///
259    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
260    ///
261    /// stream.set_nodelay(true)?;
262    /// assert_eq!(stream.nodelay()?, true);
263    /// #
264    /// # Ok(()) }) }
265    /// ```
266    pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
267        self.watcher.get_ref().set_nodelay(nodelay)
268    }
269
270    /// Shuts down the read, write, or both halves of this connection.
271    ///
272    /// This method will cause all pending and future I/O on the specified portions to return
273    /// immediately with an appropriate value (see the documentation of [`Shutdown`]).
274    ///
275    /// [`Shutdown`]: https://doc.rust-lang.org/std/net/enum.Shutdown.html
276    ///
277    /// # Examples
278    ///
279    /// ```no_run
280    /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
281    /// #
282    /// use std::net::Shutdown;
283    ///
284    /// use async_std::net::TcpStream;
285    ///
286    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
287    /// stream.shutdown(Shutdown::Both)?;
288    /// #
289    /// # Ok(()) }) }
290    /// ```
291    pub fn shutdown(&self, how: std::net::Shutdown) -> std::io::Result<()> {
292        self.watcher.get_ref().shutdown(how)
293    }
294}
295
296impl Read for TcpStream {
297    fn poll_read(
298        self: Pin<&mut Self>,
299        cx: &mut Context<'_>,
300        buf: &mut [u8],
301    ) -> Poll<io::Result<usize>> {
302        Pin::new(&mut &*self).poll_read(cx, buf)
303    }
304
305    fn poll_read_vectored(
306        self: Pin<&mut Self>,
307        cx: &mut Context<'_>,
308        bufs: &mut [IoSliceMut<'_>],
309    ) -> Poll<io::Result<usize>> {
310        Pin::new(&mut &*self).poll_read_vectored(cx, bufs)
311    }
312}
313
314impl Read for &TcpStream {
315    fn poll_read(
316        self: Pin<&mut Self>,
317        cx: &mut Context<'_>,
318        buf: &mut [u8],
319    ) -> Poll<io::Result<usize>> {
320        self.watcher.poll_read_with(cx, |mut inner| inner.read(buf))
321    }
322}
323
324impl Write for TcpStream {
325    fn poll_write(
326        self: Pin<&mut Self>,
327        cx: &mut Context<'_>,
328        buf: &[u8],
329    ) -> Poll<io::Result<usize>> {
330        Pin::new(&mut &*self).poll_write(cx, buf)
331    }
332
333    fn poll_write_vectored(
334        self: Pin<&mut Self>,
335        cx: &mut Context<'_>,
336        bufs: &[IoSlice<'_>],
337    ) -> Poll<io::Result<usize>> {
338        Pin::new(&mut &*self).poll_write_vectored(cx, bufs)
339    }
340
341    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
342        Pin::new(&mut &*self).poll_flush(cx)
343    }
344
345    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
346        Pin::new(&mut &*self).poll_close(cx)
347    }
348}
349
350impl Write for &TcpStream {
351    fn poll_write(
352        self: Pin<&mut Self>,
353        cx: &mut Context<'_>,
354        buf: &[u8],
355    ) -> Poll<io::Result<usize>> {
356        self.watcher
357            .poll_write_with(cx, |mut inner| inner.write(buf))
358    }
359
360    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
361        self.watcher.poll_write_with(cx, |mut inner| inner.flush())
362    }
363
364    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
365        self.shutdown(std::net::Shutdown::Write)?;
366        Poll::Ready(Ok(()))
367    }
368}
369
370impl From<std::net::TcpStream> for TcpStream {
371    /// Converts a `std::net::TcpStream` into its asynchronous equivalent.
372    fn from(stream: std::net::TcpStream) -> TcpStream {
373        let mio_stream = mio::net::TcpStream::from_stream(stream).unwrap();
374        TcpStream {
375            watcher: Arc::new(Watcher::new(mio_stream)),
376        }
377    }
378}
379
380cfg_unix! {
381    use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
382
383    impl AsRawFd for TcpStream {
384        fn as_raw_fd(&self) -> RawFd {
385            self.watcher.get_ref().as_raw_fd()
386        }
387    }
388
389    impl FromRawFd for TcpStream {
390        unsafe fn from_raw_fd(fd: RawFd) -> TcpStream {
391            std::net::TcpStream::from_raw_fd(fd).into()
392        }
393    }
394
395    impl IntoRawFd for TcpStream {
396        fn into_raw_fd(self) -> RawFd {
397            // TODO(stjepang): This does not mean `RawFd` is now the sole owner of the file
398            // descriptor because it's possible that there are other clones of this `TcpStream`
399            // using it at the same time. We should probably document that behavior.
400            self.as_raw_fd()
401        }
402    }
403}
404
405cfg_windows! {
406    // use crate::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle};
407    //
408    // impl AsRawSocket for TcpStream {
409    //     fn as_raw_socket(&self) -> RawSocket {
410    //         self.raw_socket
411    //     }
412    // }
413    //
414    // impl FromRawSocket for TcpStream {
415    //     unsafe fn from_raw_socket(handle: RawSocket) -> TcpStream {
416    //         net::TcpStream::from_raw_socket(handle).try_into().unwrap()
417    //     }
418    // }
419    //
420    // impl IntoRawSocket for TcpListener {
421    //     fn into_raw_socket(self) -> RawSocket {
422    //         self.raw_socket
423    //     }
424    // }
425}