async_std/net/tcp/stream.rs
1use std::io::{IoSlice, IoSliceMut, Read as _, Write as _};
2use std::net::SocketAddr;
3use std::pin::Pin;
4use std::sync::Arc;
5
6use crate::future;
7use crate::io::{self, Read, Write};
8use crate::net::driver::Watcher;
9use crate::net::ToSocketAddrs;
10use crate::task::{Context, Poll};
11
12/// A TCP stream between a local and a remote socket.
13///
14/// A `TcpStream` can either be created by connecting to an endpoint, via the [`connect`] method,
15/// or by [accepting] a connection from a [listener]. It can be read or written to using the
16/// [`AsyncRead`], [`AsyncWrite`], and related extension traits in [`futures::io`].
17///
18/// The connection will be closed when the value is dropped. The reading and writing portions of
19/// the connection can also be shut down individually with the [`shutdown`] method.
20///
21/// This type is an async version of [`std::net::TcpStream`].
22///
23/// [`connect`]: struct.TcpStream.html#method.connect
24/// [accepting]: struct.TcpListener.html#method.accept
25/// [listener]: struct.TcpListener.html
26/// [`AsyncRead`]: https://docs.rs/futures/0.3/futures/io/trait.AsyncRead.html
27/// [`AsyncWrite`]: https://docs.rs/futures/0.3/futures/io/trait.AsyncWrite.html
28/// [`futures::io`]: https://docs.rs/futures/0.3/futures/io/index.html
29/// [`shutdown`]: struct.TcpStream.html#method.shutdown
30/// [`std::net::TcpStream`]: https://doc.rust-lang.org/std/net/struct.TcpStream.html
31///
32/// ## Examples
33///
34/// ```no_run
35/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
36/// #
37/// use async_std::net::TcpStream;
38/// use async_std::prelude::*;
39///
40/// let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
41/// stream.write_all(b"hello world").await?;
42///
43/// let mut buf = vec![0u8; 1024];
44/// let n = stream.read(&mut buf).await?;
45/// #
46/// # Ok(()) }) }
47/// ```
48#[derive(Debug, Clone)]
49pub struct TcpStream {
50 pub(super) watcher: Arc<Watcher<mio::net::TcpStream>>,
51}
52
53impl TcpStream {
54 /// Creates a new TCP stream connected to the specified address.
55 ///
56 /// This method will create a new TCP socket and attempt to connect it to the `addr`
57 /// provided. The [returned future] will be resolved once the stream has successfully
58 /// connected, or it will return an error if one occurs.
59 ///
60 /// [returned future]: struct.Connect.html
61 ///
62 /// # Examples
63 ///
64 /// ```no_run
65 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
66 /// #
67 /// use async_std::net::TcpStream;
68 ///
69 /// let stream = TcpStream::connect("127.0.0.1:0").await?;
70 /// #
71 /// # Ok(()) }) }
72 /// ```
73 pub async fn connect<A: ToSocketAddrs>(addrs: A) -> io::Result<TcpStream> {
74 let mut last_err = None;
75 let addrs = addrs.to_socket_addrs().await?;
76
77 for addr in addrs {
78 // mio's TcpStream::connect is non-blocking and may just be in progress
79 // when it returns with `Ok`. We therefore wait for write readiness to
80 // be sure the connection has either been established or there was an
81 // error which we check for afterwards.
82 let watcher = match mio::net::TcpStream::connect(&addr) {
83 Ok(s) => Watcher::new(s),
84 Err(e) => {
85 last_err = Some(e);
86 continue;
87 }
88 };
89
90 future::poll_fn(|cx| watcher.poll_write_ready(cx)).await;
91
92 match watcher.get_ref().take_error() {
93 Ok(None) => {
94 return Ok(TcpStream {
95 watcher: Arc::new(watcher),
96 });
97 }
98 Ok(Some(e)) => last_err = Some(e),
99 Err(e) => last_err = Some(e),
100 }
101 }
102
103 Err(last_err.unwrap_or_else(|| {
104 io::Error::new(
105 io::ErrorKind::InvalidInput,
106 "could not resolve to any addresses",
107 )
108 }))
109 }
110
111 /// Returns the local address that this stream is connected to.
112 ///
113 /// ## Examples
114 ///
115 /// ```no_run
116 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
117 /// #
118 /// use async_std::net::TcpStream;
119 ///
120 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
121 /// let addr = stream.local_addr()?;
122 /// #
123 /// # Ok(()) }) }
124 /// ```
125 pub fn local_addr(&self) -> io::Result<SocketAddr> {
126 self.watcher.get_ref().local_addr()
127 }
128
129 /// Returns the remote address that this stream is connected to.
130 ///
131 /// ## Examples
132 ///
133 /// ```no_run
134 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
135 /// #
136 /// use async_std::net::TcpStream;
137 ///
138 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
139 /// let peer = stream.peer_addr()?;
140 /// #
141 /// # Ok(()) }) }
142 /// ```
143 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
144 self.watcher.get_ref().peer_addr()
145 }
146
147 /// Gets the value of the `IP_TTL` option for this socket.
148 ///
149 /// For more information about this option, see [`set_ttl`].
150 ///
151 /// [`set_ttl`]: #method.set_ttl
152 ///
153 /// # Examples
154 ///
155 /// ```no_run
156 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
157 /// #
158 /// use async_std::net::TcpStream;
159 ///
160 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
161 ///
162 /// stream.set_ttl(100)?;
163 /// assert_eq!(stream.ttl()?, 100);
164 /// #
165 /// # Ok(()) }) }
166 /// ```
167 pub fn ttl(&self) -> io::Result<u32> {
168 self.watcher.get_ref().ttl()
169 }
170
171 /// Sets the value for the `IP_TTL` option on this socket.
172 ///
173 /// This value sets the time-to-live field that is used in every packet sent
174 /// from this socket.
175 ///
176 /// # Examples
177 ///
178 /// ```no_run
179 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
180 /// #
181 /// use async_std::net::TcpStream;
182 ///
183 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
184 ///
185 /// stream.set_ttl(100)?;
186 /// assert_eq!(stream.ttl()?, 100);
187 /// #
188 /// # Ok(()) }) }
189 /// ```
190 pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
191 self.watcher.get_ref().set_ttl(ttl)
192 }
193
194 /// Receives data on the socket from the remote address to which it is connected, without
195 /// removing that data from the queue.
196 ///
197 /// On success, returns the number of bytes peeked.
198 ///
199 /// Successive calls return the same data. This is accomplished by passing `MSG_PEEK` as a flag
200 /// to the underlying `recv` system call.
201 ///
202 /// # Examples
203 ///
204 /// ```no_run
205 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
206 /// #
207 /// use async_std::net::TcpStream;
208 ///
209 /// let stream = TcpStream::connect("127.0.0.1:8000").await?;
210 ///
211 /// let mut buf = vec![0; 1024];
212 /// let n = stream.peek(&mut buf).await?;
213 /// #
214 /// # Ok(()) }) }
215 /// ```
216 pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
217 future::poll_fn(|cx| self.watcher.poll_read_with(cx, |inner| inner.peek(buf))).await
218 }
219
220 /// Gets the value of the `TCP_NODELAY` option on this socket.
221 ///
222 /// For more information about this option, see [`set_nodelay`].
223 ///
224 /// [`set_nodelay`]: #method.set_nodelay
225 ///
226 /// # Examples
227 ///
228 /// ```no_run
229 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
230 /// #
231 /// use async_std::net::TcpStream;
232 ///
233 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
234 ///
235 /// stream.set_nodelay(true)?;
236 /// assert_eq!(stream.nodelay()?, true);
237 /// #
238 /// # Ok(()) }) }
239 /// ```
240 pub fn nodelay(&self) -> io::Result<bool> {
241 self.watcher.get_ref().nodelay()
242 }
243
244 /// Sets the value of the `TCP_NODELAY` option on this socket.
245 ///
246 /// If set, this option disables the Nagle algorithm. This means that
247 /// segments are always sent as soon as possible, even if there is only a
248 /// small amount of data. When not set, data is buffered until there is a
249 /// sufficient amount to send out, thereby avoiding the frequent sending of
250 /// small packets.
251 ///
252 /// # Examples
253 ///
254 /// ```no_run
255 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
256 /// #
257 /// use async_std::net::TcpStream;
258 ///
259 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
260 ///
261 /// stream.set_nodelay(true)?;
262 /// assert_eq!(stream.nodelay()?, true);
263 /// #
264 /// # Ok(()) }) }
265 /// ```
266 pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
267 self.watcher.get_ref().set_nodelay(nodelay)
268 }
269
270 /// Shuts down the read, write, or both halves of this connection.
271 ///
272 /// This method will cause all pending and future I/O on the specified portions to return
273 /// immediately with an appropriate value (see the documentation of [`Shutdown`]).
274 ///
275 /// [`Shutdown`]: https://doc.rust-lang.org/std/net/enum.Shutdown.html
276 ///
277 /// # Examples
278 ///
279 /// ```no_run
280 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
281 /// #
282 /// use std::net::Shutdown;
283 ///
284 /// use async_std::net::TcpStream;
285 ///
286 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
287 /// stream.shutdown(Shutdown::Both)?;
288 /// #
289 /// # Ok(()) }) }
290 /// ```
291 pub fn shutdown(&self, how: std::net::Shutdown) -> std::io::Result<()> {
292 self.watcher.get_ref().shutdown(how)
293 }
294}
295
296impl Read for TcpStream {
297 fn poll_read(
298 self: Pin<&mut Self>,
299 cx: &mut Context<'_>,
300 buf: &mut [u8],
301 ) -> Poll<io::Result<usize>> {
302 Pin::new(&mut &*self).poll_read(cx, buf)
303 }
304
305 fn poll_read_vectored(
306 self: Pin<&mut Self>,
307 cx: &mut Context<'_>,
308 bufs: &mut [IoSliceMut<'_>],
309 ) -> Poll<io::Result<usize>> {
310 Pin::new(&mut &*self).poll_read_vectored(cx, bufs)
311 }
312}
313
314impl Read for &TcpStream {
315 fn poll_read(
316 self: Pin<&mut Self>,
317 cx: &mut Context<'_>,
318 buf: &mut [u8],
319 ) -> Poll<io::Result<usize>> {
320 self.watcher.poll_read_with(cx, |mut inner| inner.read(buf))
321 }
322}
323
324impl Write for TcpStream {
325 fn poll_write(
326 self: Pin<&mut Self>,
327 cx: &mut Context<'_>,
328 buf: &[u8],
329 ) -> Poll<io::Result<usize>> {
330 Pin::new(&mut &*self).poll_write(cx, buf)
331 }
332
333 fn poll_write_vectored(
334 self: Pin<&mut Self>,
335 cx: &mut Context<'_>,
336 bufs: &[IoSlice<'_>],
337 ) -> Poll<io::Result<usize>> {
338 Pin::new(&mut &*self).poll_write_vectored(cx, bufs)
339 }
340
341 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
342 Pin::new(&mut &*self).poll_flush(cx)
343 }
344
345 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
346 Pin::new(&mut &*self).poll_close(cx)
347 }
348}
349
350impl Write for &TcpStream {
351 fn poll_write(
352 self: Pin<&mut Self>,
353 cx: &mut Context<'_>,
354 buf: &[u8],
355 ) -> Poll<io::Result<usize>> {
356 self.watcher
357 .poll_write_with(cx, |mut inner| inner.write(buf))
358 }
359
360 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
361 self.watcher.poll_write_with(cx, |mut inner| inner.flush())
362 }
363
364 fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
365 self.shutdown(std::net::Shutdown::Write)?;
366 Poll::Ready(Ok(()))
367 }
368}
369
370impl From<std::net::TcpStream> for TcpStream {
371 /// Converts a `std::net::TcpStream` into its asynchronous equivalent.
372 fn from(stream: std::net::TcpStream) -> TcpStream {
373 let mio_stream = mio::net::TcpStream::from_stream(stream).unwrap();
374 TcpStream {
375 watcher: Arc::new(Watcher::new(mio_stream)),
376 }
377 }
378}
379
380cfg_unix! {
381 use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
382
383 impl AsRawFd for TcpStream {
384 fn as_raw_fd(&self) -> RawFd {
385 self.watcher.get_ref().as_raw_fd()
386 }
387 }
388
389 impl FromRawFd for TcpStream {
390 unsafe fn from_raw_fd(fd: RawFd) -> TcpStream {
391 std::net::TcpStream::from_raw_fd(fd).into()
392 }
393 }
394
395 impl IntoRawFd for TcpStream {
396 fn into_raw_fd(self) -> RawFd {
397 // TODO(stjepang): This does not mean `RawFd` is now the sole owner of the file
398 // descriptor because it's possible that there are other clones of this `TcpStream`
399 // using it at the same time. We should probably document that behavior.
400 self.as_raw_fd()
401 }
402 }
403}
404
405cfg_windows! {
406 // use crate::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle};
407 //
408 // impl AsRawSocket for TcpStream {
409 // fn as_raw_socket(&self) -> RawSocket {
410 // self.raw_socket
411 // }
412 // }
413 //
414 // impl FromRawSocket for TcpStream {
415 // unsafe fn from_raw_socket(handle: RawSocket) -> TcpStream {
416 // net::TcpStream::from_raw_socket(handle).try_into().unwrap()
417 // }
418 // }
419 //
420 // impl IntoRawSocket for TcpListener {
421 // fn into_raw_socket(self) -> RawSocket {
422 // self.raw_socket
423 // }
424 // }
425}