futures_net/tcp/stream.rs
1//! A TCP stream between a local and a remote socket.
2
3use std::fmt;
4use std::io;
5use std::mem;
6use std::net::{Shutdown, SocketAddr};
7use std::pin::Pin;
8use std::task::{Context, Poll};
9use std::time::Duration;
10
11use async_ready::{AsyncReadReady, AsyncWriteReady};
12use futures_core::Future;
13use futures_io::{AsyncRead, AsyncWrite};
14
15use crate::driver::sys;
16use crate::driver::PollEvented;
17
18/// A TCP stream between a local and a remote socket.
19///
20/// A `TcpStream` can either be created by connecting to an endpoint, via the
21/// [`connect`] method, or by [accepting] a connection from a [listener].
22/// It can be read or written to using the `AsyncRead`, `AsyncWrite`, and related
23/// extension traits in `futures::io`.
24///
25/// The connection will be closed when the value is dropped. The reading and writing
26/// portions of the connection can also be shut down individually with the [`shutdown`]
27/// method.
28///
29/// [`connect`]: struct.TcpStream.html#method.connect
30/// [accepting]: struct.TcpListener.html#method.accept
31/// [listener]: struct.TcpListener.html
32pub struct TcpStream {
33 io: PollEvented<sys::net::TcpStream>,
34}
35
36/// The future returned by `TcpStream::connect`, which will resolve to a `TcpStream`
37/// when the stream is connected.
38#[must_use = "futures do nothing unless polled"]
39#[derive(Debug)]
40pub struct ConnectFuture {
41 inner: ConnectFutureState,
42}
43
44#[must_use = "futures do nothing unless polled"]
45#[derive(Debug)]
46enum ConnectFutureState {
47 Waiting(TcpStream),
48 Error(io::Error),
49 Empty,
50}
51
52impl Unpin for TcpStream {}
53
54impl TcpStream {
55 /// Create a new TCP stream connected to the specified address.
56 ///
57 /// This function will create a new TCP socket and attempt to connect it to
58 /// the `addr` provided. The returned future will be resolved once the
59 /// stream has successfully connected, or it will return an error if one
60 /// occurs.
61 ///
62 /// # Examples
63 ///
64 /// ```no_run
65 /// # use std::io;
66 /// use futures_net::tcp::TcpStream;
67 ///
68 /// # async fn connect_localhost() -> io::Result<TcpStream> {
69 /// let addr = "127.0.0.1".parse().unwrap();
70 /// TcpStream::connect(&addr).await
71 /// # }
72 /// ```
73 pub fn connect(addr: &SocketAddr) -> ConnectFuture {
74 use self::ConnectFutureState::*;
75
76 let inner = match sys::net::TcpStream::connect(addr) {
77 Ok(tcp) => Waiting(TcpStream::new(tcp)),
78 Err(e) => Error(e),
79 };
80
81 ConnectFuture { inner }
82 }
83
84 pub(crate) fn new(connected: sys::net::TcpStream) -> TcpStream {
85 let io = PollEvented::new(connected);
86 TcpStream { io }
87 }
88
89 /// Returns the local address that this stream is bound to.
90 ///
91 /// # Examples
92 ///
93 /// ```rust
94 /// use futures_net::tcp::TcpStream;
95 /// use std::net::{IpAddr, Ipv4Addr};
96 ///
97 /// # async fn run () -> Result<(), Box<dyn std::error::Error + 'static>> {
98 /// let addr = "127.0.0.1:8080".parse()?;
99 /// let stream = TcpStream::connect(&addr).await?;
100 ///
101 /// let expected = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
102 /// assert_eq!(stream.local_addr()?.ip(), expected);
103 /// # Ok(())}
104 /// ```
105 pub fn local_addr(&self) -> io::Result<SocketAddr> {
106 self.io.get_ref().local_addr()
107 }
108
109 /// Returns the remote address that this stream is connected to.
110 ///
111 /// # Examples
112 ///
113 /// ```rust
114 /// use futures_net::tcp::TcpStream;
115 /// use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
116 ///
117 /// # async fn run () -> Result<(), Box<dyn std::error::Error + 'static>> {
118 /// let addr = "127.0.0.1:8080".parse()?;
119 /// let stream = TcpStream::connect(&addr).await?;
120 ///
121 /// let expected = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 8080);
122 /// assert_eq!(stream.peer_addr()?, SocketAddr::V4(expected));
123 /// # Ok(())}
124 /// ```
125 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
126 self.io.get_ref().peer_addr()
127 }
128
129 /// Shuts down the read, write, or both halves of this connection.
130 ///
131 /// This function will cause all pending and future I/O on the specified
132 /// portions to return immediately with an appropriate value (see the
133 /// documentation of `Shutdown`).
134 ///
135 /// # Examples
136 ///
137 /// ```rust
138 /// use futures_net::tcp::TcpStream;
139 /// use std::net::Shutdown;
140 ///
141 /// # async fn run () -> Result<(), Box<dyn std::error::Error + 'static>> {
142 /// let addr = "127.0.0.1:8080".parse()?;
143 /// let stream = TcpStream::connect(&addr).await?;
144 ///
145 /// stream.shutdown(Shutdown::Both)?;
146 /// # Ok(())}
147 /// ```
148 pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
149 self.io.get_ref().shutdown(how)
150 }
151
152 /// Gets the value of the `TCP_NODELAY` option on this socket.
153 ///
154 /// For more information about this option, see [`set_nodelay`].
155 ///
156 /// [`set_nodelay`]: #method.set_nodelay
157 ///
158 /// # Examples
159 ///
160 /// ```rust
161 /// use futures_net::tcp::TcpStream;
162 ///
163 /// # async fn run () -> Result<(), Box<dyn std::error::Error + 'static>> {
164 /// let addr = "127.0.0.1:8080".parse()?;
165 /// let stream = TcpStream::connect(&addr).await?;
166 ///
167 /// stream.set_nodelay(true)?;
168 /// assert_eq!(stream.nodelay()?, true);
169 /// # Ok(())}
170 /// ```
171 pub fn nodelay(&self) -> io::Result<bool> {
172 self.io.get_ref().nodelay()
173 }
174
175 /// Sets the value of the `TCP_NODELAY` option on this socket.
176 ///
177 /// If set, this option disables the Nagle algorithm. This means that
178 /// segments are always sent as soon as possible, even if there is only a
179 /// small amount of data. When not set, data is buffered until there is a
180 /// sufficient amount to send out, thereby avoiding the frequent sending of
181 /// small packets.
182 ///
183 /// # Examples
184 ///
185 /// ```rust
186 /// use futures_net::tcp::TcpStream;
187 ///
188 /// # async fn run () -> Result<(), Box<dyn std::error::Error + 'static>> {
189 /// let addr = "127.0.0.1:8080".parse()?;
190 /// let stream = TcpStream::connect(&addr).await?;
191 ///
192 /// stream.set_nodelay(true)?;
193 /// # Ok(())}
194 /// ```
195 pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
196 self.io.get_ref().set_nodelay(nodelay)
197 }
198
199 /// Gets the value of the `SO_RCVBUF` option on this socket.
200 ///
201 /// For more information about this option, see [`set_recv_buffer_size`].
202 ///
203 /// [`set_recv_buffer_size`]: #tymethod.set_recv_buffer_size
204 ///
205 /// # Examples
206 ///
207 /// ```rust
208 /// use futures_net::tcp::TcpStream;
209 ///
210 /// # async fn run () -> Result<(), Box<dyn std::error::Error + 'static>> {
211 /// let addr = "127.0.0.1:8080".parse()?;
212 /// let stream = TcpStream::connect(&addr).await?;
213 ///
214 /// stream.set_recv_buffer_size(100);
215 /// assert_eq!(stream.recv_buffer_size()?, 100);
216 /// # Ok(())}
217 /// ```
218 pub fn recv_buffer_size(&self) -> io::Result<usize> {
219 self.io.get_ref().recv_buffer_size()
220 }
221
222 /// Sets the value of the `SO_RCVBUF` option on this socket.
223 ///
224 /// Changes the size of the operating system's receive buffer associated
225 /// with the socket.
226 ///
227 /// # Examples
228 ///
229 /// ```rust
230 /// use futures_net::tcp::TcpStream;
231 ///
232 /// # async fn run () -> Result<(), Box<dyn std::error::Error + 'static>> {
233 /// let addr = "127.0.0.1:8080".parse()?;
234 /// let stream = TcpStream::connect(&addr).await?;
235 ///
236 /// stream.set_recv_buffer_size(100);
237 /// # Ok(())}
238 /// ```
239 pub fn set_recv_buffer_size(&self, size: usize) -> io::Result<()> {
240 self.io.get_ref().set_recv_buffer_size(size)
241 }
242
243 /// Gets the value of the `SO_SNDBUF` option on this socket.
244 ///
245 /// For more information about this option, see [`set_send_buffer`].
246 ///
247 /// [`set_send_buffer`]: #tymethod.set_send_buffer
248 ///
249 /// # Examples
250 ///
251 /// ```rust
252 /// use futures_net::tcp::TcpStream;
253 ///
254 /// # async fn run () -> Result<(), Box<dyn std::error::Error + 'static>> {
255 /// let addr = "127.0.0.1:8080".parse()?;
256 /// let stream = TcpStream::connect(&addr).await?;
257 ///
258 /// stream.set_send_buffer_size(100);
259 /// assert_eq!(stream.send_buffer_size()?, 100);
260 /// # Ok(())}
261 /// ```
262 pub fn send_buffer_size(&self) -> io::Result<usize> {
263 self.io.get_ref().send_buffer_size()
264 }
265
266 /// Sets the value of the `SO_SNDBUF` option on this socket.
267 ///
268 /// Changes the size of the operating system's send buffer associated with
269 /// the socket.
270 ///
271 /// # Examples
272 ///
273 /// ```rust
274 /// use futures_net::tcp::TcpStream;
275 ///
276 /// # async fn run () -> Result<(), Box<dyn std::error::Error + 'static>> {
277 /// let addr = "127.0.0.1:8080".parse()?;
278 /// let stream = TcpStream::connect(&addr).await?;
279 ///
280 /// stream.set_send_buffer_size(100);
281 /// # Ok(())}
282 /// ```
283 pub fn set_send_buffer_size(&self, size: usize) -> io::Result<()> {
284 self.io.get_ref().set_send_buffer_size(size)
285 }
286
287 /// Returns whether keepalive messages are enabled on this socket, and if so
288 /// the duration of time between them.
289 ///
290 /// For more information about this option, see [`set_keepalive`].
291 ///
292 /// [`set_keepalive`]: #tymethod.set_keepalive
293 ///
294 /// # Examples
295 ///
296 /// ```rust
297 /// use futures_net::tcp::TcpStream;
298 /// use std::time::Duration;
299 ///
300 /// # async fn run () -> Result<(), Box<dyn std::error::Error + 'static>> {
301 /// let addr = "127.0.0.1:8080".parse()?;
302 /// let stream = TcpStream::connect(&addr).await?;
303 ///
304 /// stream.set_keepalive(Some(Duration::from_secs(60)))?;
305 /// assert_eq!(stream.keepalive()?, Some(Duration::from_secs(60)));
306 /// # Ok(())}
307 /// ```
308 pub fn keepalive(&self) -> io::Result<Option<Duration>> {
309 self.io.get_ref().keepalive()
310 }
311
312 /// Sets whether keepalive messages are enabled to be sent on this socket.
313 ///
314 /// On Unix, this option will set the `SO_KEEPALIVE` as well as the
315 /// `TCP_KEEPALIVE` or `TCP_KEEPIDLE` option (depending on your platform).
316 /// On Windows, this will set the `SIO_KEEPALIVE_VALS` option.
317 ///
318 /// If `None` is specified then keepalive messages are disabled, otherwise
319 /// the duration specified will be the time to remain idle before sending a
320 /// TCP keepalive probe.
321 ///
322 /// Some platforms specify this value in seconds, so sub-second
323 /// specifications may be omitted.
324 ///
325 /// # Examples
326 ///
327 /// ```rust
328 /// use futures_net::tcp::TcpStream;
329 /// use std::time::Duration;
330 ///
331 /// # async fn run () -> Result<(), Box<dyn std::error::Error + 'static>> {
332 /// let addr = "127.0.0.1:8080".parse()?;
333 /// let stream = TcpStream::connect(&addr).await?;
334 ///
335 /// stream.set_keepalive(Some(Duration::from_secs(60)))?;
336 /// # Ok(())}
337 /// ```
338 pub fn set_keepalive(&self, keepalive: Option<Duration>) -> io::Result<()> {
339 self.io.get_ref().set_keepalive(keepalive)
340 }
341
342 /// Gets the value of the `IP_TTL` option for this socket.
343 ///
344 /// For more information about this option, see [`set_ttl`].
345 ///
346 /// [`set_ttl`]: #tymethod.set_ttl
347 ///
348 /// # Examples
349 ///
350 /// ```rust
351 /// use futures_net::tcp::TcpStream;
352 ///
353 /// # async fn run () -> Result<(), Box<dyn std::error::Error + 'static>> {
354 /// let addr = "127.0.0.1:8080".parse()?;
355 /// let stream = TcpStream::connect(&addr).await?;
356 ///
357 /// stream.set_ttl(100)?;
358 /// assert_eq!(stream.ttl()?, 100);
359 /// # Ok(())}
360 /// ```
361 pub fn ttl(&self) -> io::Result<u32> {
362 self.io.get_ref().ttl()
363 }
364
365 /// Sets the value for the `IP_TTL` option on this socket.
366 ///
367 /// This value sets the time-to-live field that is used in every packet sent
368 /// from this socket.
369 ///
370 /// # Examples
371 ///
372 /// ```rust
373 /// use futures_net::tcp::TcpStream;
374 ///
375 /// # async fn run () -> Result<(), Box<dyn std::error::Error + 'static>> {
376 /// let addr = "127.0.0.1:8080".parse()?;
377 /// let stream = TcpStream::connect(&addr).await?;
378 ///
379 /// stream.set_ttl(100)?;
380 /// # Ok(())}
381 /// ```
382 pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
383 self.io.get_ref().set_ttl(ttl)
384 }
385
386 /// Reads the linger duration for this socket by getting the `SO_LINGER`
387 /// option.
388 ///
389 /// For more information about this option, see [`set_linger`].
390 ///
391 /// [`set_linger`]: #tymethod.set_linger
392 ///
393 /// # Examples
394 ///
395 /// ```rust
396 /// use futures_net::tcp::TcpStream;
397 /// use std::time::Duration;
398 ///
399 /// # async fn run () -> Result<(), Box<dyn std::error::Error + 'static>> {
400 /// let addr = "127.0.0.1:8080".parse()?;
401 /// let stream = TcpStream::connect(&addr).await?;
402 ///
403 /// stream.set_linger(Some(Duration::from_millis(100)))?;
404 /// assert_eq!(stream.linger()?, Some(Duration::from_millis(100)));
405 /// # Ok(())}
406 /// ```
407 pub fn linger(&self) -> io::Result<Option<Duration>> {
408 self.io.get_ref().linger()
409 }
410
411 /// Sets the linger duration of this socket by setting the `SO_LINGER`
412 /// option.
413 ///
414 /// This option controls the action taken when a stream has unsent messages
415 /// and the stream is closed. If `SO_LINGER` is set, the system
416 /// shall block the process until it can transmit the data or until the
417 /// time expires.
418 ///
419 /// If `SO_LINGER` is not specified, and the stream is closed, the system
420 /// handles the call in a way that allows the process to continue as quickly
421 /// as possible.
422 ///
423 /// # Examples
424 ///
425 /// ```rust
426 /// use futures_net::tcp::TcpStream;
427 /// use std::time::Duration;
428 ///
429 /// # async fn run () -> Result<(), Box<dyn std::error::Error + 'static>> {
430 /// let addr = "127.0.0.1:8080".parse()?;
431 /// let stream = TcpStream::connect(&addr).await?;
432 ///
433 /// stream.set_linger(Some(Duration::from_millis(100)))?;
434 /// # Ok(())}
435 /// ```
436 pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
437 self.io.get_ref().set_linger(dur)
438 }
439}
440
441impl AsyncRead for TcpStream {
442 fn poll_read(
443 mut self: Pin<&mut Self>,
444 cx: &mut Context<'_>,
445 buf: &mut [u8],
446 ) -> Poll<io::Result<usize>> {
447 Pin::new(&mut self.io).poll_read(cx, buf)
448 }
449}
450
451impl AsyncWrite for TcpStream {
452 fn poll_write(
453 mut self: Pin<&mut Self>,
454 cx: &mut Context<'_>,
455 buf: &[u8],
456 ) -> Poll<io::Result<usize>> {
457 Pin::new(&mut self.io).poll_write(cx, buf)
458 }
459
460 fn poll_flush(
461 mut self: Pin<&mut Self>,
462 cx: &mut Context<'_>,
463 ) -> Poll<io::Result<()>> {
464 Pin::new(&mut self.io).poll_flush(cx)
465 }
466
467 fn poll_close(
468 mut self: Pin<&mut Self>,
469 cx: &mut Context<'_>,
470 ) -> Poll<io::Result<()>> {
471 Pin::new(&mut self.io).poll_close(cx)
472 }
473}
474
475impl AsyncReadReady for TcpStream {
476 type Ok = sys::event::Ready;
477 type Err = io::Error;
478
479 /// Poll the TCP stream's readiness for reading.
480 ///
481 /// If the stream is not ready for a read then the method will return `Poll::Pending`
482 /// and schedule the current task for wakeup upon read-readiness.
483 ///
484 /// Once the stream is ready for reading, it will remain so until all available
485 /// bytes have been extracted (via `futures::io::AsyncRead` and related traits).
486 fn poll_read_ready(
487 mut self: Pin<&mut Self>,
488 cx: &mut Context<'_>,
489 ) -> Poll<Result<Self::Ok, Self::Err>> {
490 Pin::new(&mut self.io).poll_read_ready(cx)
491 }
492}
493
494impl AsyncWriteReady for TcpStream {
495 type Ok = sys::event::Ready;
496 type Err = io::Error;
497
498 /// Check the TCP stream's write readiness state.
499 ///
500 /// This always checks for writable readiness and also checks for HUP
501 /// readiness on platforms that support it.
502 ///
503 /// If the resource is not ready for a write then `Poll::Pending` is
504 /// returned and the current task is notified once a new event is received.
505 ///
506 /// The I/O resource will remain in a write-ready state until calls to
507 /// `poll_write` return `NotReady`.
508 ///
509 /// # Panics
510 ///
511 /// This function panics if called from outside of a task context.
512 fn poll_write_ready(
513 self: Pin<&mut Self>,
514 cx: &mut Context<'_>,
515 ) -> Poll<Result<Self::Ok, Self::Err>> {
516 self.io.poll_write_ready(cx)
517 }
518}
519
520impl fmt::Debug for TcpStream {
521 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
522 self.io.get_ref().fmt(f)
523 }
524}
525
526impl Future for ConnectFuture {
527 type Output = io::Result<TcpStream>;
528
529 fn poll(
530 mut self: Pin<&mut Self>,
531 cx: &mut Context<'_>,
532 ) -> Poll<io::Result<TcpStream>> {
533 match mem::replace(&mut self.inner, ConnectFutureState::Empty) {
534 ConnectFutureState::Waiting(stream) => {
535 // Once we've connected, wait for the stream to be writable as
536 // that's when the actual connection has been initiated. Once we're
537 // writable we check for `take_socket_error` to see if the connect
538 // actually hit an error or not.
539 //
540 // If all that succeeded then we ship everything on up.
541 if let Poll::Pending = stream.io.poll_write_ready(cx)? {
542 self.inner = ConnectFutureState::Waiting(stream);
543 return Poll::Pending;
544 }
545
546 if let Some(e) = stream.io.get_ref().take_error()? {
547 return Poll::Ready(Err(e));
548 }
549
550 Poll::Ready(Ok(stream))
551 }
552 ConnectFutureState::Error(e) => Poll::Ready(Err(e)),
553 ConnectFutureState::Empty => panic!("can't poll TCP stream twice"),
554 }
555 }
556}
557
558impl std::convert::TryFrom<std::net::TcpStream> for TcpStream {
559 type Error = io::Error;
560
561 fn try_from(stream: std::net::TcpStream) -> Result<Self, Self::Error> {
562 let tcp = sys::net::TcpStream::from_stream(stream)?;
563 Ok(TcpStream::new(tcp))
564 }
565}
566
567impl std::convert::TryFrom<&std::net::SocketAddr> for TcpStream {
568 type Error = io::Error;
569
570 fn try_from(addr: &std::net::SocketAddr) -> Result<Self, Self::Error> {
571 let tcp = sys::net::TcpStream::connect(&addr)?;
572 Ok(TcpStream::new(tcp))
573 }
574}
575
576use std::os::unix::prelude::*;
577
578impl AsRawFd for TcpStream {
579 fn as_raw_fd(&self) -> RawFd {
580 self.io.get_ref().as_raw_fd()
581 }
582}